
Apache Kafka: An Intro

What is Kafka?

In simple terms, Kafka is a messaging system that allows different applications to talk to each other by sending messages. It is fast, scalable, and reliable, which makes it ideal for handling large volumes of data.

Think of it like a mailroom in an office building, where mail comes in from different sources (producers) and is sorted and distributed to different departments (consumers) based on their needs.

Kafka Terminology:

When events occur, the messages have to be stored in a central place where the consumers can access them.

 


Broker: A broker is a Kafka server that is responsible for the communication between multiple services. The system can have multiple brokers (which is recommended for high availability and fault tolerance), and each broker is identified by an ID. Each broker can hold one or more topic partitions, and those partitions are replicated across the cluster. Multiple brokers together form a Kafka cluster.

Producer and Consumer: The clients/services that produce events to the Kafka broker are referred to as producers, and those that consume these events are referred to as consumers. The same service can both produce and consume messages from Kafka.

Event: The messages that are produced to or consumed from the Kafka broker are called events.

Consumer groups: A consumer group is a set of consumer instances that together consume a set of partitions in parallel. Each partition can be consumed by only one consumer within a group at a time. If two consumers have subscribed to the same topic and are in the same consumer group, the two consumers are assigned different sets of partitions, and neither of them receives the same messages as the other.
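
To see this in practice, you could start Kafka's console consumer twice with the same group (a sketch; it assumes a topic named demo with two partitions and a broker on localhost:9092):

$ bin/kafka-console-consumer.sh --topic demo --group demoGroup --bootstrap-server localhost:9092

With one instance running, it reads both partitions; start the same command in a second terminal and each instance is assigned one partition.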

Topic: A message sent to the Kafka broker can be in a variety of formats (plain text, JSON). To differentiate the types of events stored in Kafka, topics are used. A producer sends a certain type of message to a certain topic.

Partition: Topics are split into partitions numbered 0, 1, 2, 3, and so on. Each message within a partition gets an incremental ID called an offset, and these offsets have meaning only within that specific partition.

Offset: To keep track of which events have already been consumed by a consumer, an index pointing to the latest consumed message is stored inside Kafka; this index is called the offset.

Replication Factor: A replica of a partition is a backup copy of that partition. The replication factor of a topic decides how many replicas of each partition in that topic the Kafka cluster should maintain. A topic with one partition and a replication factor of 2 means two copies of the same partition, with the same data, are stored in the Kafka cluster.
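
To make this concrete, such a topic could be created with the CLI tools that ship with Kafka (a sketch; the topic name is illustrative, and a replication factor of 2 requires at least two running brokers):

$ bin/kafka-topics.sh --create --topic demo --partitions 1 --replication-factor 2 --bootstrap-server localhost:9092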

Zookeeper: Zookeeper is an essential component that manages Kafka's configuration, metadata, and coordination across brokers. It maintains the cluster ACLs, stores the offsets for all the partitions of all the topics, tracks the status of the Kafka broker nodes, and maintains the client quotas (how much data a producer/consumer is allowed to read/write).

How to set up Kafka locally?

Download the latest version of Apache Kafka and extract it.
Ensure a compatible Java version is available on the system.
Open a terminal (CLI) and check: java -version
If Java is not installed, download and install it first.
 
Start the Kafka environment with Zookeeper

Move into the extracted Kafka directory and start the Zookeeper service:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Open another terminal session and start the Kafka broker service:

$ bin/kafka-server-start.sh config/server.properties
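
With both services running, you can sanity-check the broker from another terminal by listing its topics (the list will be empty on a fresh install):

$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092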

 

First Spring Boot Kafka Project

From Spring Initializr, generate a starter Spring Boot application with the following settings and dependencies:

 


Maven, Spring Boot 2.7.9, Group: com.alphanove, Name: spring-boot-kafka, Packaging: JAR, Java 11, Dependencies: Spring Web and Spring for Apache Kafka.

Open IntelliJ IDEA and write the code: a REST endpoint will trigger the producer to send a message to the Kafka broker, and the consumer will subscribe to the topic and receive it.

Establish the connection to Kafka in the application.properties file.

Configure the Kafka producer and consumer:

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.consumer.bootstrap-servers: localhost:9092

The Kafka broker runs on port 9092 by default.

If you have more than one broker (three are recommended for fault tolerance, so that messages are not lost if a server breaks), list all of them:

#spring.kafka.consumer.bootstrap-servers: localhost:9092,localhost:9091,localhost:9093

Configure the consumer group by providing an ID:

spring.kafka.consumer.group-id: myConsumerGroup

Set the offset reset policy to earliest, so a new consumer group reads the topic from the beginning:

spring.kafka.consumer.auto-offset-reset: earliest

For the first task, let's send messages from the producer to the consumer in plain text, using the built-in StringSerializer and StringDeserializer:

#consumer key and value deserializer
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

#producer key and value serializer
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer

With that, the configuration is done. Let's start on the actual code.

Create a package (name: config) in the com.alphanove.springbootkafka package and a class (name: KafkaTopicConfig) in the config package. Create a plainTextTopic method that creates the topic used for plain text messages.

@Configuration
public class KafkaTopicConfig {

    // Define the topic; Spring creates it on the broker if it does not already exist
    @Bean
    public NewTopic plainTextTopic() {
        return TopicBuilder.name("PlainTextJavaExample")
                // .partitions(10) omitted, so the default partition count is used
                .build();
    }
}

Import the necessary built-in classes

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
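
If you later want explicit control over the partition count and replication factor, TopicBuilder exposes both; a minimal sketch (the bean name and the values are illustrative):

@Bean
public NewTopic tunedTopic() {
    return TopicBuilder.name("PlainTextJavaExampleTuned")
            .partitions(3)
            .replicas(1)
            .build();
}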

Create a package (name: kafka) in the com.alphanove.springbootkafka package and a class (name: KafkaProducer) in the kafka package.

@Service
public class KafkaProducer {

    // Default logger to log the messages
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    // Inject the built-in Spring Kafka template used to produce messages
    private final KafkaTemplate<String, String> kafkaTemplate;

    // Constructor injection
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Send a message to the PlainTextJavaExample topic
    public void sendMessage(String message) {
        LOGGER.info(String.format("Message sent is : %s", message));
        kafkaTemplate.send("PlainTextJavaExample", message);
    }
}

Import the necessary classes

import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
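
KafkaTemplate.send() is asynchronous; with the Spring Kafka version that Spring Boot 2.7.x brings in, it returns a ListenableFuture, so the outcome of each send can optionally be logged. A sketch of an alternative send method (not required for this tutorial):

// Optional variant: log the delivery metadata or the failure for each send
public void sendMessageWithCallback(String message) {
    kafkaTemplate.send("PlainTextJavaExample", message).addCallback(
            result -> LOGGER.info("Delivered to partition {} at offset {}",
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset()),
            ex -> LOGGER.error("Failed to send message", ex));
}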

Create a class (name: KafkaConsumer) in the kafka package.

@Service
public class KafkaConsumer {

    // Logger
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    // Subscribe to the topic; the consumer group is provided in the @KafkaListener annotation
    @KafkaListener(topics = "PlainTextJavaExample", groupId = "myConsumerGroup")
    public void consume(String message) {
        LOGGER.info(String.format("Received message: %s", message));
    }
}

The imports!

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
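
If the consumer also needs the record metadata (partition, offset), the listener method can take a ConsumerRecord (import org.apache.kafka.clients.consumer.ConsumerRecord) instead of the plain payload. A sketch of such an alternative listener (use it instead of, not alongside, the one above, since two listeners in the same group would split the partitions between them):

@KafkaListener(topics = "PlainTextJavaExample", groupId = "myConsumerGroup")
public void consume(ConsumerRecord<String, String> record) {
    LOGGER.info(String.format("Received %s from partition %d at offset %d",
            record.value(), record.partition(), record.offset()));
}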

Now we need an endpoint to send messages with the producer.

Create a package (name: controller) in the com.alphanove.springbootkafka package and a class (name: MsgController) in the controller package.

@RestController
@RequestMapping("/api/v1/kafka")
public class MsgController {

    // Inject the Kafka producer
    private final KafkaProducer kafkaProducer;

    // @Autowired is not required since only one constructor is available
    public MsgController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    // http://localhost:8080/api/v1/kafka/publish?message=hello world
    @GetMapping("/publish")
    public ResponseEntity<String> publish(@RequestParam("message") String message) {
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok("Message sent to the topic is : " + message);
    }
}

And the imports

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

Start a console consumer on the CLI, specifying the topic name, and run the application in IntelliJ IDEA:

$ bin/kafka-console-consumer.sh --topic PlainTextJavaExample --from-beginning --bootstrap-server localhost:9092

In your browser, send a GET request; the message can be seen on the CLI and in the logs in IntelliJ:

http://localhost:8080/api/v1/kafka/publish?message=hello springboot kafka

Postman can also be used to call the API endpoints.
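
The same request can be sent from the CLI with curl (with the spaces in the query string URL-encoded):

$ curl "http://localhost:8080/api/v1/kafka/publish?message=hello%20springboot%20kafka"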