Kafka Integration With Mulesoft

Author: Ravikant Kumar

1. Introduction To Apache Kafka

Apache Kafka is a framework implementation of a software bus using stream-processing. It is an open-source software platform developed by the Apache Software Foundation and it is written in Scala and JAVA. 

The project aims to provide:

  • Unified
  • High-Throughput
  • Low-latency platform
  • Scalable
  • No Data Loss
  • Reduces Complexity

For handling the real-time data feeds.

2. Basic Terminologies of Kafka

Topic- is a category or feed name to which messages are published. A topic can have a zero, one or many consumers who can subscribe to the data written to it.

Partition- A topic can have one or more partitions associated with handling large volumes of data. Each partition is an ordered, immutable sequence of records continually appended to- a structured commit log.

Partition Offset- The partitions’ records are each assigned a sequential id number that uniquely identifies each record within the partition.

Brokers- A kafka server is known as a broker. Every broker has a unique id to identify it. It manages the storage of messages in the topic.

Kafka Cluster- Kafka brokers form a Kafka cluster. More than one broker form a cluster and each broker has a unique id. The kafka cluster consists of many brokers running on many servers.

Producers- Publish data to the topics. The producer is responsible for choosing which record to assign to which partition within the topic.

Consumers- Read and process data from topics.

3. Software Versions Used in this Demo
  • Java 1.8
  • Windows 10
  • Mule Runtime 4.3.0 EE
  • Anypoint Studio 7.10.0
  • Apache Kafka 2.13-2.8.0
  • Apache Kafka Connector-Mule 4 Version-4.3.3
4. Getting started with Apache Kafka

To use kafka you must have pre pre-installed jdk on your system. If not, kindly do that before proceeding. Install the latest version of Apache kafka binaries from https://kafka.apache.org/downloads . Extract the binary in your C folder.

Note: In the earlier version of kafka, we also needed to install zookeeper separately, but in recent versions, all the zookeeper dependencies are already included in the kafka binaries itself.

4.1 Disabling Hyper-V

If you are using any Docker server, on your local server, kafka will not work; disable your Hyper-V by following the below instructions:

Go to your control panel and then uninstall a program 

Click on Turn windows features on or off

Ensure the Hyper-V is unchecked, and click on it. It may ask you to restart your system, do that.

Now, you are ready to proceed working with kafka.

4.2 Necessary changes in extracted kafka files

Go to your kafka>config folder. You should see the following files. We need to make changes in the zookeeper.properties file and server.properties file.

.

Open the zookeeper.properties file using Notepad/Notepad++ and give a proper address to the dataDir variable. You can also change the port from the same file if you needed.

Open the server.properties file using any editor of your choice and properly address the logs.dirs variable.

Note: These dirs should be changed because of the storage issue. By default it comes with the temp file location, and we have to change it to our folder to increase the storage capacity of logs and to run the server and zookeeper smoothly.

Also add the following lines in the internal topic settings section as shown in the exhibit below:

offsets.topic.num.partitions=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1min.insync.replicas=1default.replication.factor=1

Now we are ready to deploy the kafka server locally.

4.3 Zookeeper Server Deployment

Kafka needs a zookeeper to manage the cluster. Zookeeper is used to coordinate the brokers/clusters network. Zookeeper is a consistent file system for configuration information. So, we need to start the zookeeper prior to our kafka server on our machine.

Open command prompt and navigate to the kafka directory. Run the following command to start the zookeeper:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Your zookeeper should start on port 2181.

4.4 Kafka Server Deployment

Open a new command prompt and go to your kafka folder. Navigate to the path bin/windows.  And then run the following command to deploy your server:

kafka-server-start.bat ../../config/server.properties

Your kafka server should start.

4.5 Create a Topic

Open a new command prompt in kafka/bin/windows and run the following command to create a topic:

kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic <TopicName>

To verify, if the topic is successfully created or not, use the following command:

kafka-topics.bat -list -zookeeper localhost:2181

You can see the list of topics present.

5. Create Mulesoft Flow for Publish Operation

Note: Before running the flows make sure that the zookeeper and the server are running on your local machine.

Let us now integrate this kafka server with Mule ESB. Create a new Project and import the latest version of Apache Kafka Connector from Anypoint Exchange.

You should see the list of kafka processors available in the module as shown above.

Create a simple flow that looks like as follows:

Set all the configurations as follows:

Listener Configuration

Set the method to POST in the advanced tab of the listener properties.

Leave the rest of the configurations as default.

Logger

Log a message stating that the message has been received to publish to kafka.

Publish

Create a global connector configuration element for kafka producer as shown in the below exhibit.

And click on the test connection. Your connection should be successful.

Click OK and then again OK.

Add this configuration in the general tab of publish properties. Give the topic name which we created earlier. And in the message section we will take the data from the HTTP request body. Hence, in the message section give payload in expression mode.

Also we are giving the key as the current time using #[now()].

Following is the xml code for the flow:

<?xml version=”1.0″ encoding=”UTF-8″?>
<mule xmlns:http=”http://www.mulesoft.org/schema/mule/http” xmlns:kafka=”http://www.mulesoft.org/schema/mule/kafka” xmlns=”http://www.mulesoft.org/schema/mule/core”  xmlns:doc=”http://www.mulesoft.org/schema/mule/documentation”  xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”  xsi:schemaLocation=”http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsdhttp://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsdhttp://www.mulesoft.org/schema/mule/secure-properties http://www.mulesoft.org/schema/mule/secure-properties/current/mule-secure-properties.xsdhttp://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd”> <http:listener-config name=”HTTP_Listener_config” doc:name=”HTTP Listener config” doc:id=”6bece4e7-abe1-467d-b73a-c56c9e80dc7c” > <http:listener-connection host=”0.0.0.0″ port=”8089″ /> </http:listener-config> <kafka:producer-config name=”Apache_Kafka_Producer_configuration” doc:name=”Apache Kafka Producer configuration”  doc:id=”d8e90197-b797-4d14-95de-0d80448da6a9″ > <kafka:producer-plaintext-connection partitioner=”ROUND_ROBIN”> <kafka:bootstrap-servers > <kafka:bootstrap-server value=”localhost:9092″ /> </kafka:bootstrap-servers> </kafka:producer-plaintext-connection> </kafka:producer-config>          <flow name=”Publish-flow” doc:id=”5b88c75c-8072-46c1-b000-b71f0b7caaaf” > <http:listener doc:name=”Listener” doc:id=”fb52500c-604c-4939-9a4d-4725bb1bb56e” config-ref=”HTTP_Listener_config” path=”/publish” allowedMethods=”POST”> <http:response > <http:body ><![CDATA[#[“Message sent to kafka producer”]]]></http:body> </http:response> </http:listener> <logger level=”INFO” doc:name=”Logger” doc:id=”192c2478-fefa-45cb-87d9-e3d8c87e1f6e” message=”Received message to publish”/> <kafka:publish doc:name=”Publish” doc:id=”a248980c-efd0-4368-b7cf-49b0cc703d0e” topic=”MuleTopic” config-ref=”Apache_Kafka_Producer_configuration” key=”#[now()]”> <reconnect /> </kafka:publish>     </flow></mule>
6. Create Mulesoft Consume Flow

Now we will create a flow to consume the messages published in the kafka topic.

Create a simple flow as below:

Set all the configurations as below:

Create a global connector configuration element for kafka Consumer as shown in the below exhibit.

Click on test connection. Your connection should be successful.

Click on OK and then again OK.

Leave all other properties to default.

Logger:

Just logging a message to know that the message is successfully consumed.

XML code for the flow is given below:

<?xml version=”1.0″ encoding=”UTF-8″?>
<mule xmlns:http=”http://www.mulesoft.org/schema/mule/http” xmlns:kafka=”http://www.mulesoft.org/schema/mule/kafka” xmlns=”http://www.mulesoft.org/schema/mule/core”  xmlns:doc=”http://www.mulesoft.org/schema/mule/documentation”  xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”  xsi:schemaLocation=”http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsdhttp://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsdhttp://www.mulesoft.org/schema/mule/secure-properties http://www.mulesoft.org/schema/mule/secure-properties/current/mule-secure-properties.xsdhttp://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd”> <http:listener-config name=”HTTP_Listener_config” doc:name=”HTTP Listener config” doc:id=”6bece4e7-abe1-467d-b73a-c56c9e80dc7c” > <http:listener-connection host=”0.0.0.0″ port=”8089″ /> </http:listener-config>          <kafka:consumer-config name=”Apache_Kafka_Consumer_configuration” doc:name=”Apache Kafka Consumer configuration” doc:id=”64f6184c-016f-4403-b246-1cd551e13b79″ > <kafka:consumer-plaintext-connection groupId=”test-consumer-group”> <kafka:bootstrap-servers > <kafka:bootstrap-server value=”localhost:9092″ /> </kafka:bootstrap-servers> <kafka:topic-patterns > <kafka:topic-pattern value=”DemoTopic” /> </kafka:topic-patterns> </kafka:consumer-plaintext-connection> </kafka:consumer-config><flow name=”ConsumeFlow” doc:id=”558e2042-e8d3-400c-ac2e-fc6009ea43bd” > <kafka:message-listener doc:name=”Message listener” doc:id=”0f8dbcc6-4675-42c6-ad1c-728e858ca99d” config-ref=”Apache_Kafka_Consumer_configuration” ackMode=”AUTO”/> <logger level=”INFO” doc:name=”Logger” doc:id=”0a9cea42-188c-4a87-8264-30417014d178″ message=”#[‘\n Logging the consumed payload: ‘++ payload]”/></flow>

Now we are ready to test the application. Deploy the application.

Check in the console, once the application is deployed, go to your Postman/AdvanceRestClient and send a POST request as shown below.

You should receive the following message in the response tab:

By checking the console of anypoint studio, confirm that you get a response from the publish flow logger component as below:

And by consume-flow logger component also:

7. Parallel Processing in Kafka

In Kafka, the topic partition is the unit of parallelism. The more partitions, the higher the processing parallelism. For example, if a topic has 30 partitions, then an application can run up to 30 instances of itself such as 30 Docker containers to process the topic’s data collaboratively and in parallel. Each of the 30 instances will get exclusive access to one partition, whose messages it will process sequentially. Any instances beyond 30 will remain idle. Message ordering is also guaranteed by the partition construct; each message will be processed in the order in which it is written to the partition.

  1. To achieve parallel processing using Kafka first we will have to create a topic with multiple partitions.
  2. Then in the general properties of Message Listener increase the amount of parallel consumers and then you can achieve parallelism in kafka.

Try Running the application and you will see the parallelism working on your local machine.

8. Commands Used for Local Kafka Setup
Start Zookeeper: .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
Go to kafka/bin/windows folder and then give the below command
Start Kafka Server: kafka-server-start.bat ../../config/server.properties
Create a Topic: kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic TopicName<topic name>
Describe a Topic: kafka-topics.bat –describe –zookeeper localhost:2181 –topic TopicName<topic name>
List Kafka Topics: .\bin\windows\kafka-topics.bat –list –zookeeper localhost:2181Delete a topic: kafka-run-class.bat kafka.admin.TopicCommand –delete –topic TopicName<topic name> –zookeeper localhost:2181
Start Kafka Producer: kafka-console-producer.bat –broker-list localhost:9092 –topic TopicName<topic name>
Start Kafka Consumer: kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic TopicName<topic name>

We use cookies on this site to enhance your user experience. For a complete overview of how we use cookies, please see our privacy policy.