Apache Kafka: An Intro
What is Kafka?
In simple terms, Kafka is a messaging system that lets different applications talk to each other by sending messages. It is fast, scalable, and fault tolerant, which makes it ideal for handling large volumes of data.
Think of it like a mailroom in an office building, where mail comes in from different sources (producers) and is sorted and distributed to different departments (consumers) based on their needs.
Kafka Terminology:
Broker: A broker is a Kafka server that receives messages from producers and serves them to consumers. The system can have multiple brokers (recommended for high availability and fault tolerance), each identified by an ID. Each broker can host one or more topic partitions, and those partitions are replicated across the cluster. Multiple brokers together form a Kafka cluster.
Producer and Consumer: The clients/services that publish events to the Kafka broker are called producers, and those that consume these events are called consumers. The same service can both produce and consume messages from Kafka.
Event: The messages that are produced to or consumed from the Kafka broker are called events.
Consumer groups: A consumer group is a set of consumer instances that together consume a set of partitions in parallel. Each partition can be consumed by only one consumer within a group at a time. If two consumers subscribe to the same topic and belong to the same consumer group, they are assigned different sets of partitions, so neither of them receives the same messages.
Topic: A message sent to the Kafka broker can be in a variety of formats (plain text, JSON, etc.). Topics are used to differentiate the types of events stored in Kafka: a producer sends a certain type of message to a certain topic.
Partition: Topics are split into partitions numbered 0, 1, 2, 3, and so on, and each message within a partition gets an incremental ID called an offset. These offsets are meaningful only within that specific partition.
Offset: To keep track of which events have already been consumed by a consumer, an index pointing to the latest consumed message is stored inside Kafka; this index is called the offset.
Replication Factor: A replica of a partition is a backup copy of that partition. The replication factor of a topic decides how many replicas of each partition the Kafka cluster should maintain. A topic with one partition and a replication factor of 2 means two copies of that partition, with the same data, are stored in the cluster (see the AdminClient sketch after this list).
Zookeeper: ZooKeeper is an essential component that manages the cluster's configuration, metadata, and coordination across the Kafka brokers: it maintains the cluster ACLs, tracks the status of the broker nodes, and maintains the client quotas (how much data a producer/consumer is allowed to write/read). Older Kafka versions also stored consumer offsets in ZooKeeper; newer versions keep them in an internal Kafka topic.
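To see how brokers, topics, partitions, and the replication factor fit together, here is a minimal sketch that creates a topic with Kafka's Java AdminClient (the topic name demo-topic is hypothetical, and the replication factor of 2 assumes a cluster with at least two brokers):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // hypothetical topic: 3 partitions, each replicated on 2 brokers
            NewTopic topic = new NewTopic("demo-topic", 3, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}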
How to set up Kafka locally?
Move to the downloaded Kafka directory and start the ZooKeeper service:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Open another terminal session and start the Kafka broker service:
$ bin/kafka-server-start.sh config/server.properties
First Spring Boot Kafka Project
From Spring Initializr, generate a starter Spring Boot application with the following settings and dependencies:
Maven, Spring Boot 2.7.9, Group: com.alphanove, Name: spring-boot-kafka, Packaging: JAR, Java 11, Dependencies: Spring Web, Spring for Apache Kafka.
Open the project in IntelliJ IDEA. We will write a REST endpoint that triggers the producer to send a message to the Kafka broker, and a consumer that subscribes to the topic.
Establish the connection to Kafka in the application.properties file.
Configure the Kafka producer and consumer:
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.consumer.bootstrap-servers: localhost:9092
The Kafka broker runs on port 9092 by default.
If you have more than one broker (three are recommended for fault tolerance, so that messages are not lost if a server goes down), list all of them:
#spring.kafka.consumer.bootstrap-servers:localhost:9092,localhost:9091,localhost:9093
Configure the consumer group by providing a group ID:
spring.kafka.consumer.group-id: myConsumerGroup
Set the offset reset policy to earliest, so a new consumer group starts reading from the beginning of the topic (the default, latest, only reads messages produced after the consumer starts):
spring.kafka.consumer.auto-offset-reset: earliest
For the first task, let's send messages from the producer to the consumer in plain text, using the built-in StringSerializer and StringDeserializer.
#consumer key and value deserializer
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#producer key and value serializer
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
With that, the configuration is done. Let's start on the actual code.
Create a package (name: config) in the application's base package (Java package names cannot contain hyphens, so Spring Initializr turns com.alphanove.spring-boot-kafka into com.alphanove.springbootkafka) and a class (name: KafkaTopicConfig) in the config package. Create a plainTextTopic method that declares the topic for the plain-text messages.
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic plainTextTopic() {
        // no explicit partition count: the broker default is used
        return TopicBuilder.name("PlainTextJavaExample")
                //.partitions(10)
                .build();
    }
}
Import the necessary built-in classes:
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
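The broker defaults were used above; if you want the topic created with an explicit partition count and replication factor, TopicBuilder supports both. A sketch with illustrative values (the replica count must not exceed the number of brokers in the cluster):

@Bean
public NewTopic partitionedTopic() {
    // hypothetical second topic with explicit sizing
    return TopicBuilder.name("PartitionedExample")
            .partitions(3)
            .replicas(1)
            .build();
}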
Create a package (name: kafka) in the base package and a class (name: KafkaProducer) in the kafka package.
@Service
public class KafkaProducer {

    // default logger to log the message
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    // inject the built-in Spring KafkaTemplate used to produce messages
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // send a message to the PlainTextJavaExample topic
    public void sendMessage(String message) {
        LOGGER.info(String.format("Message sent: %s", message));
        kafkaTemplate.send("PlainTextJavaExample", message);
    }
}
Import the necessary classes
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
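Note that kafkaTemplate.send() is asynchronous: it returns before the broker acknowledges the message. With Spring Boot 2.7 (spring-kafka 2.8.x) it returns a ListenableFuture, so a callback can be attached for delivery confirmation. A minimal sketch (the method name sendMessageWithCallback is illustrative; in spring-kafka 3.x the return type changed to CompletableFuture):

public void sendMessageWithCallback(String message) {
    kafkaTemplate.send("PlainTextJavaExample", message)
            .addCallback(
                    // success: log where the broker stored the record
                    result -> LOGGER.info("Delivered to partition {} at offset {}",
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset()),
                    // failure: log the delivery error
                    ex -> LOGGER.error("Delivery failed", ex));
}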
Create a class (name: KafkaConsumer) in the kafka package.
@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    // subscribe to the topic; the consumer group is provided in the @KafkaListener annotation
    @KafkaListener(topics = "PlainTextJavaExample", groupId = "myConsumerGroup")
    public void consume(String message) {
        LOGGER.info(String.format("Received message: %s", message));
    }
}
The imports!
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
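If the consumer also needs the record's key, partition, or offset, the @KafkaListener method can take the whole ConsumerRecord instead of just the value. A sketch (use it as a replacement for consume above, not alongside it, since two listeners in the same group would split the topic's partitions between them; it also needs import org.apache.kafka.clients.consumer.ConsumerRecord;):

@KafkaListener(topics = "PlainTextJavaExample", groupId = "myConsumerGroup")
public void consume(ConsumerRecord<String, String> record) {
    // partition and offset tell you exactly where the record sits in the topic
    LOGGER.info(String.format("Received message: %s (partition=%d, offset=%d)",
            record.value(), record.partition(), record.offset()));
}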
Now we need an endpoint that triggers the producer to send a message.
Create a package (name: controller) in the base package and a class (name: MsgController) in the controller package.
@RestController
@RequestMapping("/api/v1/kafka")
public class MsgController {

    // inject the Kafka producer
    // @Autowired is not required since only one constructor is available
    private final KafkaProducer kafkaProducer;

    public MsgController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    // http://localhost:8080/api/v1/kafka/publish?message=hello world
    @GetMapping("/publish")
    public ResponseEntity<String> publish(@RequestParam("message") String message) {
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok("Message sent to the topic is: " + message);
    }
}
And the imports
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
To watch the messages arrive on the topic, start a console consumer on the CLI (specifying the topic name), then run the application in IntelliJ IDEA:
$ bin/kafka-console-consumer.sh --topic PlainTextJavaExample --from-beginning --bootstrap-server localhost:9092
In your browser, send a GET request; the message can be seen on the CLI and in the logs in IntelliJ:
http://localhost:8080/api/v1/kafka/publish?message=hello springboot kafka
Postman can also be used to call the API endpoints.