Configuration Of Kafka Topic, Producer And Consumer Using Python (Part-2)

Author: Pulkit Bindal

Introduction:

Apache Kafka is an Open-Source and Distributed Stream Processing platform that stores and handles real-time data. In other words, Kafka is an Event Streaming service that allows users to build event-driven or data-driven applications.

Kafka offers its consumers three primary features:

1. Create and follow streams of records.

2. Store streams of records efficiently in the order that they were created.

3. Real-time records stream processing.

In this blog, you will learn about Kafka Topics, Producers, and Consumers, and the steps for creating Kafka Producers and Consumers using Python. You will also learn about their internal configurations.

What are Kafka Topics?

Kafka Topics are named groups or logs that store messages and events in a logical order, allowing users to send and receive data between Kafka servers/clusters with ease. When a Producer sends messages to a specific Kafka Topic, the topic appends them one after another, creating a buffer of records. Producers push messages onto the tail of these logs, while consumers pull messages from a specific Kafka Topic.

By creating Kafka Topics, users can perform Logical Segregation between Messages and Events, much like different tables holding different types of data in a database. In Apache Kafka, you can create any number of topics based on your use cases. However, each topic should have a unique and identifiable name to differentiate it across various Kafka Brokers.
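To make this concrete, here is a minimal sketch of creating a topic programmatically with the confluent-kafka Python client's AdminClient; the topic name dummy-data and the broker address localhost:9092 are assumptions taken from the examples later in this post:

from confluent_kafka.admin import AdminClient, NewTopic

# Connect to a local Kafka broker (assumed to be listening on localhost:9092)
admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Ask the broker to create a topic with one partition and replication factor 1
futures = admin.create_topics([NewTopic('dummy-data', num_partitions=1, replication_factor=1)])

# create_topics() returns one future per requested topic
for topic, future in futures.items():
    try:
        future.result()  # raises if creation failed (e.g. the topic already exists)
        print(f'Topic {topic} created')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')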

Why do we need Kafka Topics?

Kafka topics are needed to organize and manage the data being sent to a Kafka cluster by producers. Think of it like organizing your email inbox into different folders for work, personal, and social emails. Kafka topics allow data to be categorized and labeled so that consumers can easily find and retrieve the data they need. Topics also enable the scaling of the cluster to handle large amounts of data and ensure that messages are processed in the correct order.

Why do we need Kafka Producers?

Kafka producers are needed to send data to a Kafka cluster. Just like a person sending a message to a friend, a producer sends data to Kafka. The data can be anything from website logs to stock market data. Producers ensure that the data is formatted correctly and sent to the right place. Once the data is in Kafka topics, consumers can retrieve it and do something useful with it, like analyze it or display it. 

Why do we need Kafka Consumers?

Kafka consumers are needed to retrieve and process data from a Kafka cluster. They act like a person receiving a message from a friend, reading and interpreting it. Consumers retrieve data from Kafka topics and do something useful with it, like storing it in a database or analyzing it to generate insights. They ensure that data is processed in the correct order and can handle large amounts of data, allowing for scalable data processing.

Real-time example Of Kafka:

Here’s an example of how Kafka is used by the following technologies in real time:

1. Slack: When you send a message in a Slack channel, Kafka is responsible for handling that message and delivering it to all the members of that channel in real time. Kafka allows Slack to handle this at scale as it can handle millions of messages per second and ensure that they are delivered to the correct recipients.  Kafka also enables Slack to implement features like real-time presence updates which show when users are online or offline. This is achieved by using Kafka to track the presence of users across different devices and channels and update their status in real-time.

2. LinkedIn: The social networking platform uses Kafka to process large amounts of data in real-time. They use Kafka for various use cases including data ingestion, real-time analytics, and log processing.

3. Uber: The ride-hailing company uses Kafka to handle real-time data streams. They use Kafka to track real-time metrics, monitor their systems, and perform real-time analytics.

4. Twitter: The social networking platform uses Kafka to handle its real-time data streams. They use Kafka to process billions of tweets and user interactions every day and to power real-time analytics and monitoring tools.

5. OTT: Live video streaming platforms such as Amazon, Lionsgate, Netflix, and Zee5 use Kafka to handle their real-time data streams. They use Kafka to process user interactions, analyze viewing patterns, and deliver personalized recommendations to users.

Prerequisites:

  1. Docker Daemon/Desktop and a Kafka Cluster/Broker should be up and running. (Ex. localhost:9092)
  2. Package Installer / Utility (Ex. pip)
  3. Python Interpreter (Python 3.x)

Steps to solve the problem:

Here is the step-by-step process you will need to perform to create a Real-Time Data Pipeline with Kafka (Producers & Consumers) using Python. 

By following these steps, you can handle large volumes of data and provide near-real-time insight into various services.

Step 1: Installing Kafka and Creating a Kafka Topic

  1. Download Kafka: Download the most recent stable version of Kafka from the Kafka website (https://kafka.apache.org/downloads). The “binary” download listed in the “Recommended Downloads” section is the one you want.
  2. Extract Kafka: Once you’ve downloaded Kafka, unzip the files to a directory of your choosing, such as C:\kafka_2.13-3.4.0.
  3. Create a Kafka Topic: With Kafka up and running, it can now be used and tested. From the Kafka bin directory, run the following command to create a new topic (on Windows, use the equivalent kafka-topics.bat script in bin\windows):
Command:  ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic dummy-data
  4. Verify the Topic: Run the following command to list all available Kafka topics:
Command:  ./kafka-topics.sh --list --bootstrap-server localhost:9092

All the topics that are currently accessible in your Kafka cluster will be listed by this command. Replace 9092 with the correct port number if your Kafka cluster is running on a different port.

Note: You are done! You now know how to use the kafka-topics.sh (or kafka-topics.bat on Windows) utility to create and list Kafka topics.
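If you prefer to verify the topic from Python rather than the shell, a minimal sketch using the confluent-kafka AdminClient (the library is installed in the next step, and the broker address localhost:9092 is the one assumed throughout this post) looks like this:

from confluent_kafka.admin import AdminClient

# Connect to the local broker and request cluster metadata
admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
metadata = admin.list_topics(timeout=10)

# metadata.topics is a dict keyed by topic name
for name in metadata.topics:
    print(name)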

Step 2: Installing the Kafka Python Client Library

With Kafka installed, we also need a Python client library for Kafka before we can begin creating our data pipeline. Several Python client libraries for Kafka are available, and Kafka may be deployed locally or on a remote server. We’ll use the confluent-kafka Python client library in this example, which can be installed using pip:

pip install confluent_kafka
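To confirm that the library installed correctly, a quick check (assuming a standard installation) is to print the client version and the bundled librdkafka version:

import confluent_kafka

# version() reports the Python client version, libversion() the underlying librdkafka version
print(confluent_kafka.version())
print(confluent_kafka.libversion())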

Step 3: Creating a Kafka Producer

Now that the confluent-kafka library is installed, we can start creating our data pipeline. The first piece is a Kafka producer that generates fictitious data and sends it to a topic. The produce() function in the code below creates some dummy data and uses a Kafka producer to deliver it to the dummy-data topic:

Complete Source Code: KAFKA-BROKER

from confluent_kafka import Producer
import json
import time

# Kafka configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'python-producer'
}

# Kafka producer function
def produce():
    producer = Producer(conf)

    # Produce some dummy data to the Kafka topic
    for i in range(10):
        data = {
            'id': i,
            'name': f'name{i}',
            'value': i * 10
        }
        producer.produce('dummy-data', key=str(i), value=json.dumps(data))
        print(data)
        time.sleep(5)

    # Wait until all buffered messages have been delivered
    producer.flush()

# Run the producer
produce()

We first define the Kafka configuration in this code, which comprises the address of the Kafka broker and the client ID for the producer. The produce() function then creates a Kafka producer and uses it to deliver some fictitious data to the dummy-data topic. The data variable holds a dictionary of fictitious values, which the producer sends to Kafka with its produce() method. To create a delay between messages so that we can watch them being consumed in real time, we also add a time.sleep() call.
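The code above does not check whether each message actually reached the broker. confluent-kafka supports this through a delivery callback passed to produce(); here is a minimal sketch of that pattern (the delivery_report function name is just an illustrative choice):

from confluent_kafka import Producer
import json

producer = Producer({'bootstrap.servers': 'localhost:9092', 'client.id': 'python-producer'})

# Called once per message after the broker acknowledges it (or delivery fails)
def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}')

producer.produce('dummy-data', key='0', value=json.dumps({'id': 0}), callback=delivery_report)
producer.flush()  # wait for outstanding messages and trigger the callbacks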

Step 4: Creating a Kafka Consumer

Now that we have a Kafka producer sending data to a Kafka topic, we can create a Kafka consumer that subscribes to that topic and receives the data in real time. In the following code, we write a consume() function that creates a Kafka consumer and subscribes to the dummy-data topic:

Complete Source Code: KAFKA-BROKER

from confluent_kafka import Consumer
import json

# Kafka consumer function
def consume():
    consumer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'python-consumer',
        'auto.offset.reset': 'earliest'
    }
    consumer = Consumer(consumer_conf)

    # Subscribe to the Kafka topic
    consumer.subscribe(['dummy-data'])

    # Consume messages from the Kafka topic
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(f'Consumer error: {msg.error()}')
                continue

            # Process the received message
            key = msg.key().decode('utf-8') if msg.key() else None
            value = json.loads(msg.value())
            print(f'Key: {key}, Value: {value}')
    finally:
        # Leave the consumer group cleanly when the loop is interrupted (e.g. Ctrl+C)
        consumer.close()

consume()

The consume() function defined in this code creates a Kafka consumer and subscribes it to the dummy-data topic. The consumer’s Kafka configuration is stored in the consumer_conf dictionary; it includes the address of the Kafka broker, the consumer’s group ID, and an auto.offset.reset value set to earliest so that the consumer starts consuming from the beginning of the topic. We then use the Consumer() constructor from the confluent-kafka library to build a Kafka consumer and the subscribe() method to subscribe to the dummy-data topic.

Within the while loop, we use the poll() method to retrieve messages from the Kafka topic. When a message arrives, we first call error() to check whether it carries an error. If not, we use the key() and value() methods to read the message’s key and value. The value here is a JSON string, so before processing it we convert it into a dictionary with json.loads(). The key and value are then printed to the console. The loop is wrapped in a try/finally block so that consumer.close() runs even when the loop is interrupted, letting the consumer leave its group cleanly.
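By default, the consumer commits offsets automatically in the background. If you want a message to count as processed only after your own code has handled it, you can disable auto-commit and commit explicitly; a minimal sketch of that variation (configuration values are illustrative) looks like this:

from confluent_kafka import Consumer
import json

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'python-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False  # we will commit offsets ourselves
})
consumer.subscribe(['dummy-data'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        print(json.loads(msg.value()))
        consumer.commit(message=msg)  # mark this message as processed
finally:
    consumer.close()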

Step 5: Running the Producer and Consumer

With the producer and consumer functions defined, we can run them in separate terminal windows to start sending and receiving data in real time. Save the producer code as producer.py and the consumer code as consumer.py, then run the producer with the command below:

python producer.py

This will generate some dummy data and send it to the dummy-data Kafka topic.

To run the consumer, execute the below command:

python consumer.py

The consumer will start reading data from the dummy-data topic and print each message’s key and value to the console.
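Given the dummy data generated by the producer, the consumer output should look roughly like this (one line roughly every five seconds):

Key: 0, Value: {'id': 0, 'name': 'name0', 'value': 0}
Key: 1, Value: {'id': 1, 'name': 'name1', 'value': 10}
Key: 2, Value: {'id': 2, 'name': 'name2', 'value': 20}
...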

Conclusion:

In this blog, we have learned about Apache Kafka, Kafka Topics, Producers, and Consumers, and the steps to configure them. We learned the manual method of creating topics and customizing topic configurations in Apache Kafka using the command-line tools. By following these steps, you can handle large volumes of data and provide near-real-time insights for various applications.

However, you can also use the Kafka Admin API (for example, the TopicBuilder class) to implement topic creation and related operations programmatically. We can discuss that in a later blog. Overall, Kafka and Python provide a powerful combination for building data-intensive applications.
