In the previous article we saw how to write a Kafka producer in Java. In this article we will see how to write a Kafka consumer in Java that is manually assigned to a specific partition.

Maven dependencies required for Kafka Java consumer

To read data from Kafka with a Java consumer, add the following Maven dependency (kafka-clients) to your pom.xml:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.0.0</version>
</dependency>

Within one consumer group, each partition of a topic is read by only one consumer. In other words, consumers that share a group ID consume messages from different partitions of the topic.

For example, if a topic has 3 partitions and a group has 4 consumers, 3 consumers will each read from one partition and the fourth will sit idle. If the topic has 3 partitions and the group has 2 consumers, one consumer will read from one partition and the other from the remaining two. In every case, a partition is consumed by at most one consumer of the same group.
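The distribution described above can be modeled in a few lines. This is a simplified sketch loosely following Kafka's range strategy (the real strategy is pluggable through the consumer's partition.assignment.strategy setting); GroupAssignmentModel is a hypothetical name, and nothing here talks to Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of group partition assignment: partitions are handed out
// in contiguous ranges, so each partition goes to exactly one consumer and
// consumers beyond the partition count receive nothing.
public class GroupAssignmentModel {

    // Returns, for each consumer index, the partition numbers it would own.
    public static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = partitions / consumers; // base share for everyone
        int extra = partitions % consumers;       // first `extra` consumers get one more
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 4)); // [[0], [1], [2], []]
        System.out.println(assign(3, 2)); // [[0, 1], [2]]
    }
}
```

Which consumer receives the extra partition depends on the strategy; in this model it is the first one.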

We can bypass the consumer group mechanism by manually assigning a consumer to a specific partition.

The Kafka Java client's KafkaConsumer class supports three main ways of consuming:

1) Consumer with automatic offset committing
2) Consumer with manual offset committing
3) Consumer manually assigned to a partition

This article covers the third approach: the consumer is assigned to a specific partition with assign() instead of subscribing to a topic, so the consumer group mechanism is bypassed.
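For comparison, here is a minimal sketch of both approaches side by side, assuming kafka-clients is on the classpath and reusing the broker, topic and group names from this article. subscribe() leaves partition selection to the consumer group and needs a group.id; assign() pins the consumer to exactly the partitions listed. SubscribeVersusAssign and its methods are hypothetical names chosen for illustration.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical helper contrasting group-managed and manually assigned consumers.
public class SubscribeVersusAssign {

    // Settings shared by both consumers: brokers, no auto-commit, String deserializers.
    public static Properties baseProps(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    // Group-managed: needs a group.id; the group decides which partitions it reads.
    public static KafkaConsumer<String, String> groupConsumer(
            String brokers, String groupId, String topic) {
        Properties props = baseProps(brokers);
        props.put("group.id", groupId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    // Manually assigned: no group.id; reads exactly the partition given here.
    public static KafkaConsumer<String, String> manualConsumer(
            String brokers, String topic, int partition) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(baseProps(brokers));
        consumer.assign(Collections.singletonList(new TopicPartition(topic, partition)));
        return consumer;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> grouped =
                     groupConsumer("127.0.0.1:9092", "TEST", "TEST-1");
             KafkaConsumer<String, String> manual =
                     manualConsumer("127.0.0.1:9092", "TEST-1", 0)) {
            // Both calls report local state; neither contacts the broker.
            System.out.println("subscribed topics: " + grouped.subscription());
            System.out.println("assigned partitions: " + manual.assignment());
        }
    }
}
```

In both cases records are only fetched once the consumer calls poll().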

Manually assign a consumer to a kafka partition in java

The following class implements a manually assigned consumer; an explanation of its configuration follows the code:
package com.til.kafka.consumer;

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Manual Partition Assignment (MPA) consumer
public class MPAKafkaConsumer {
	private Properties props;
	private KafkaConsumer<String, String> consumer;

	public MPAKafkaConsumer(String brokerString) {

		props = new Properties();
		props.put("bootstrap.servers", brokerString);
		// group.id is not required when partitions are assigned manually
		// props.put("group.id", KafkaConstants.KAFKA_CONSUMER_GROUP);
		props.put("enable.auto.commit", "false");
		// A consumer needs deserializers (bytes -> String), not serializers
		props.put("key.deserializer", StringDeserializer.class.getName());
		props.put("value.deserializer", StringDeserializer.class.getName());
		consumer = new KafkaConsumer<>(props);

	}

	// Assigns this consumer directly to the given partitions with assign(),
	// bypassing consumer-group partition assignment (despite the method name,
	// it does not call KafkaConsumer.subscribe())
	public void subscribe(List<TopicPartition> topicPartitions) {
		consumer.assign(topicPartitions);

	}

}

Utility class to hold constants

package com.tb.constants;

public class KafkaConstants {
	public static String KAFKA_BROKER_STRING = 
			"127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";
	public static String KAFKA_KEY_SERIALIZER = 
			"org.apache.kafka.common.serialization.StringSerializer";
	public static String KAFKA_VALUE_SERIALIZER = 
			"org.apache.kafka.common.serialization.StringSerializer";
	public static String KAFKA_TOPIC = "TEST-1";
	public static String KAFKA_CONSUMER_GROUP = "TEST";
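}

"bootstrap.servers" lists the Kafka brokers (host:port pairs) that the consumer uses to make its initial connection to the cluster.

"group.id" specifies the consumer group; within a group, only one consumer receives messages from a given partition. It is commented out in this example because a manually assigned consumer does not need a group. You should still set a group.id if you want to commit offsets to Kafka, because committed offsets are stored per consumer group.

"enable.auto.commit" set to "false" means the consumer's offset (its read position in a partition) is not committed automatically. Offsets are committed only when consumer.commitSync() or consumer.commitAsync() is called.

"key.deserializer" and "value.deserializer" name the classes that decode each message's key and value from bytes back into objects, here Strings. Serializers do the opposite and belong in the producer's configuration.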

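With enable.auto.commit set to false, progress is saved only when you commit explicitly. A minimal sketch of that step follows, assuming a consumer that does have a group.id (committed offsets are stored per group) and kafka-clients on the classpath; ManualCommitHelper and its methods are hypothetical names. It follows Kafka's convention that the committed offset is the offset of the next record to read, i.e. the last processed offset plus one.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper for manual offset commits on an assigned partition.
public class ManualCommitHelper {

    // Builds the commit request: the stored offset is the NEXT offset to read,
    // so it is one past the last record that was fully processed.
    public static Map<TopicPartition, OffsetAndMetadata> commitMap(
            TopicPartition partition, long lastProcessedOffset) {
        return Collections.singletonMap(partition,
                new OffsetAndMetadata(lastProcessedOffset + 1));
    }

    // Blocks until the broker confirms the commit, or throws on failure.
    // The consumer must be configured with a group.id for this to work.
    public static void commitProcessed(KafkaConsumer<?, ?> consumer,
            TopicPartition partition, long lastProcessedOffset) {
        consumer.commitSync(commitMap(partition, lastProcessedOffset));
    }
}
```

For example, after processing the record at offset 41 of partition 0, commitProcessed(consumer, partition, 41) stores 42, so a consumer restarted with the same group.id resumes at offset 42.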
Testing kafka consumer

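Let's start the consumer from a main() method and assign it to partition 0 of the topic. Note that this only assigns the partition; no records are read until the consumer calls poll():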
package com.tb.manager;

import java.util.Arrays;

import org.apache.kafka.common.TopicPartition;

import com.tb.constants.KafkaConstants;
import com.til.kafka.consumer.MPAKafkaConsumer;

public class App {
	public static void main(String[] args) {

		// Partition 0 of the topic; the consumer will be assigned to it
		TopicPartition partition = new 
				TopicPartition(KafkaConstants.KAFKA_TOPIC, 0);

		// Create the consumer and assign it to the partition in a new thread
		new Thread(() -> {
			MPAKafkaConsumer mpaKafkaConsumer = 
					new MPAKafkaConsumer(KafkaConstants.KAFKA_BROKER_STRING);

			mpaKafkaConsumer.subscribe(Arrays.asList(partition));
		}).start();
	}
}
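The App above assigns the partition but never calls poll(), so it does not actually read any records. A minimal sketch of a reading loop follows, assuming kafka-clients 1.0.0 (hence poll(long); 2.0+ clients prefer poll(Duration)), the broker list and topic name from KafkaConstants, and a topic with 3 partitions. Because KafkaConsumer is not thread-safe, it gives each thread its own consumer for one partition. PartitionReaderApp and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical example: one manually assigned consumer per partition, one thread each.
public class PartitionReaderApp {

    // Builds TopicPartition objects for partitions 0..count-1 of a topic.
    // The count is assumed known; consumer.partitionsFor(topic) can look it up.
    public static List<TopicPartition> partitions(String topic, int count) {
        List<TopicPartition> result = new ArrayList<>();
        for (int p = 0; p < count; p++) {
            result.add(new TopicPartition(topic, p));
        }
        return result;
    }

    // Assigns a fresh consumer to one partition, prints what it reads during
    // a fixed number of polls, then closes the consumer.
    static void readPartition(String brokers, TopicPartition tp, int maxPolls) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            for (int i = 0; i < maxPolls; i++) {
                // Waits up to 1000 ms for records from the assigned partition
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    System.out.printf("%s offset=%d key=%s value=%s%n",
                            tp, record.offset(), record.key(), record.value());
                }
            }
        }
    }

    public static void main(String[] args) {
        String brokers = "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";
        for (TopicPartition tp : partitions("TEST-1", 3)) {
            // KafkaConsumer is not thread-safe, so every thread owns its own instance
            new Thread(() -> readPartition(brokers, tp, 60), "reader-" + tp).start();
        }
    }
}
```

With no committed offset, each consumer starts where auto.offset.reset points, which defaults to latest, so only new messages are printed; calling consumer.seekToBeginning(consumer.assignment()) after assign() would replay the partition from the start.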
In this article we have seen how to write a Kafka consumer in Java that is manually assigned to a specific partition. In upcoming articles we will see more about Kafka producers and consumers in Java.
  • By Techburps.com
  • Jan 22, 2018
  • Big Data