In the previous article we saw how to write a Kafka producer in Java. In this article we will see how to write a Kafka consumer in Java with manual offset committing to read data from a Kafka cluster.

Maven dependencies required for Kafka Java consumer

In order to read data from Kafka using a Java consumer, we need to add the following Maven dependency (kafka-clients) to our pom.xml.

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.0.0</version>
</dependency>
Kafka consumers work in such a way that only one consumer of the same consumer group gets a message from a given topic partition; in other words, different consumers with the same consumer group consume messages from different partitions of a topic.

For example, if a topic has 3 partitions and we have 4 consumers, then ideally 3 consumers will consume from the three partitions and one consumer will be left idle. If there are 3 partitions and only 2 consumers, one consumer will consume from one partition and the other will consume from the remaining two. In any case, each partition is consumed by at most one consumer of the same group.
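
To see this assignment in action, here is a minimal, hypothetical sketch (the class name GroupAssignmentDemo is made up; it assumes a local broker on 127.0.0.1:9092 and that the TEST-1 topic used in this article has 3 partitions) that starts two consumers in the same group and prints which partition each record came from:

package com.til.kafka.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch: two consumers in the same group share the partitions of one topic
public class GroupAssignmentDemo {
	public static void main(String[] args) {
		for (int i = 0; i < 2; i++) {
			final String name = "consumer-" + i;
			new Thread(new Runnable() {
				@Override
				public void run() {
					Properties props = new Properties();
					props.put("bootstrap.servers", "127.0.0.1:9092");
					props.put("group.id", "TEST");
					props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
					props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
					// KafkaConsumer is not thread-safe, so each thread creates its own instance;
					// default auto-commit settings are fine for this illustration
					KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
					consumer.subscribe(Arrays.asList("TEST-1"));
					while (true) {
						ConsumerRecords<String, String> records = consumer.poll(100);
						for (ConsumerRecord<String, String> record : records)
							System.out.printf("%s read partition = %d, offset = %d, value = %s%n",
									name, record.partition(), record.offset(), record.value());
					}
				}
			}).start();
		}
	}
}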

The Kafka Java client provides three main consumer patterns:

1) Automatic offset committing consumer
2) Manual offset committing consumer
3) Manually assigning a consumer to a partition

In this article we will see how to implement a "Manual offset committing consumer". With this type of consumer, the offset up to which data has been consumed in a partition is committed manually by the consumer rather than automatically by the client.

Manual offset committing Kafka Java consumer

The following class can be used as a "Manual offset committing Kafka Java consumer"; please read the code and the explanation written below it:
package com.til.kafka.consumer;

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.tb.constants.KafkaConstants;

//Manual Offset Control
public class MOCKafkaConsumer {
	Properties props;
	KafkaConsumer<String, String> consumer;

	public MOCKafkaConsumer(String brokerString) {
		props = new Properties();
		props.put("bootstrap.servers", brokerString);
		props.put("group.id", KafkaConstants.KAFKA_CONSUMER_GROUP);
		props.put("enable.auto.commit", "false");
		props.put("key.deserializer", KafkaConstants.KAFKA_KEY_SERIALIZER);
		props.put("value.deserializer", KafkaConstants.KAFKA_VALUE_SERIALIZER);
		consumer = new KafkaConsumer<>(props);
	}

	public void subscribe(List<String> topics) {
		consumer.subscribe(topics);
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records)
				System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
			// This line of code manually commits offset to kafka
			consumer.commitSync();
		}
	}
}
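
consumer.commitSync() with no arguments commits the offsets of all records returned by the last poll(). The consumer API also accepts an explicit offset map, so offsets can be committed partition by partition. The sketch below is a hypothetical variant of the subscribe() method above (it additionally needs the java.util.Collections, org.apache.kafka.clients.consumer.OffsetAndMetadata and org.apache.kafka.common.TopicPartition imports); note the +1, because the committed offset must be that of the next record to be read:

	public void subscribeWithPerPartitionCommit(List<String> topics) {
		consumer.subscribe(topics);
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (TopicPartition partition : records.partitions()) {
				List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
				for (ConsumerRecord<String, String> record : partitionRecords)
					System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
				// Commit only this partition; +1 because we commit the offset of the next record to read
				long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
				consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
			}
		}
	}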

Utility class to hold constants

package com.tb.constants;

public class KafkaConstants {
	public static String KAFKA_BROKER_STRING = 
			"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";
	// Serializers are used by the producer (previous article)
	public static String KAFKA_KEY_SERIALIZER = 
			"org.apache.kafka.common.serialization.StringSerializer";
	public static String KAFKA_VALUE_SERIALIZER = 
			"org.apache.kafka.common.serialization.StringSerializer";
	// Deserializers are used by the consumer in this article
	public static String KAFKA_KEY_DESERIALIZER = 
			"org.apache.kafka.common.serialization.StringDeserializer";
	public static String KAFKA_VALUE_DESERIALIZER = 
			"org.apache.kafka.common.serialization.StringDeserializer";
	public static String KAFKA_TOPIC = "TEST-1";
	public static String KAFKA_CONSUMER_GROUP = "TEST";
}

"bootstrap.servers", this property specifies the kafka cluster brokers.

"group.id", this specifies the consumer group, only one consumer of a group gets a message from a topic's partition.

"enable.auto.commit", this property specifies that the offset by which the data has been consumed in a partition should not be committed automatically. Offset is committed only when "consumer.commitSync()" is called.

"key.serializer" and "value.serializer", are classes to be used to decode the message into bytes.

Testing the Kafka consumer

Let's run our Kafka consumer from a main() method and try to read data:
package com.tb.manager;

import java.util.Arrays;

import com.tb.constants.KafkaConstants;
import com.til.kafka.consumer.MOCKafkaConsumer;

public class App {
	public static void main(String[] args) {

		// This will start a consumer in a new thread
		new Thread(new Runnable() {
			@Override
			public void run() {
				MOCKafkaConsumer mocKafkaConsumer = new MOCKafkaConsumer(KafkaConstants.KAFKA_BROKER_STRING);
				mocKafkaConsumer.subscribe(Arrays.asList(KafkaConstants.KAFKA_TOPIC));

			}
		}).start();
	}
}
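
The subscribe() loop above runs forever. If you need to stop the consumer cleanly from another thread (for example from a JVM shutdown hook), KafkaConsumer.wakeup() is the thread-safe way to break out of a blocking poll(): it makes poll() throw a WakeupException. The following is a sketch of how the MOCKafkaConsumer loop could be adapted; it is an optional variant, not part of the class shown earlier:

	public void subscribe(List<String> topics) {
		consumer.subscribe(topics);
		try {
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(100);
				for (ConsumerRecord<String, String> record : records)
					System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
				consumer.commitSync();
			}
		} catch (org.apache.kafka.common.errors.WakeupException e) {
			// Expected when another thread calls shutdown(); fall through to close()
		} finally {
			consumer.close();
		}
	}

	// Call from another thread to interrupt poll() and stop the consumer
	public void shutdown() {
		consumer.wakeup();
	}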

In this article we have seen how to write a "Manual offset committing consumer" in Java. In coming articles we will see more about Kafka producers and consumers in Java.