How To Send Protobuf Messages To Kafka With A Custom Schema

Author: Dheeraj Mishra

What is Protobuf?

Protocol Buffers (Protobuf) is a free, open-source, cross-platform format for serializing and deserializing structured data efficiently. It is platform-neutral and language-independent: data structures are described once in an interface description language (a .proto file), and that description can be used to generate code for different programming languages.

Advantages of Protobuf
  1. It supports the complex data structures needed to model structured data.
  2. It uses a compact binary format on the wire, which makes transmission faster and is efficient in environments with limited storage or bandwidth.
  3. Its schema is concise and largely self-documenting, so little additional documentation is needed.
  4. It is well suited to communication between microservices because serialization and deserialization are fast.
Disadvantages of Protobuf

  1. It is not supported by every programming language; official support is limited to languages such as Java, Python, C++, and Ruby.
  2. It is not suitable for non-object-oriented programming.
Protobuf vs JSON vs XML
  1. Protobuf transfers data much faster than JSON and XML; in typical comparisons it takes roughly half the time of JSON-structured data.
  2. It has schema validation, which establishes a set of rules for the transferred data. Plain JSON and XML do not validate data by themselves.
  3. It supports additional data types such as Any, oneof, and enum, which JSON and XML do not provide.
  4. It is primarily suited to sending data between internal services, whereas JSON is typically used for exchanging data between client and server.
Protobuf Schema

The Protobuf schema is a set of rules that defines the message fields, their data types, and their tags (field numbers). It determines how the data is organized and drives serialization and deserialization. A Protobuf schema is stored in a file with the ".proto" extension. For example:
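A minimal schema for the horse record used in the rest of this article could look like the sketch below. The java_package and java_outer_classname options, the nested Name message, and the field numbers are assumptions, inferred from the generated HorseMessage.HorseDetails class and the sample JSON shown later.

syntax = "proto3";

option java_package = "com.protobuf.generator";
option java_outer_classname = "HorseMessage";

message HorseDetails {
  string breed = 1;          // e.g. "Mustang"
  repeated Name names = 2;   // list of name objects
  string tag = 3;
  string company = 4;
  int32 id = 5;
}

message Name {
  string name = 1;
}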

Compiling and Generating the Java Code


To compile this schema, the Protobuf compiler (protoc) is required. Download it from https://github.com/protocolbuffers/protobuf/releases/, install it, and add it to your PATH environment variable. Then compile the schema, specifying the target location. We will generate Java code from the Protobuf schema. This step is needed on both the sender and receiver sides to perform serialization and deserialization.

The command is:

protoc {protoschemafile.proto} --java_out={destinationpath}
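For example, if the schema above is saved as HorseMessage.proto and the generated sources should go under src/main/java (both names are assumptions), the invocation would be:

protoc HorseMessage.proto --java_out=src/main/java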

After successfully compiling the .proto file, a Java file will be generated at the specified destination location.

Implementation of JSON to Protobuf Message Converter

To convert a JSON message to Protobuf format and vice versa, we need the "protobuf-java-util" library provided by Google; for converting between JSON and Java objects, Gson is used. Add these dependencies to the "pom.xml" file.

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java-util</artifactId>
    <version>3.21.12</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.10.1</version>
</dependency>
TypeAdapter:

This is an abstract class provided by Gson for writing a custom serializer/deserializer. It has two abstract methods.

read():

This method converts JSON into the corresponding Java object. It takes a JsonReader instance as input, which reads the JSON to be parsed.

write():

This method converts a Java object to JSON. It takes two parameters: a JsonWriter instance and the object from which the JSON is going to be produced.

package com.protobuf.operation;

import com.google.gson.JsonParser;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import com.google.protobuf.util.JsonFormat;
import com.protobuf.generator.HorseMessage;

import java.io.IOException;

public class Adapter extends TypeAdapter<HorseMessage.HorseDetails> {

    @Override
    public void write(JsonWriter jsonWriter, HorseMessage.HorseDetails horseDetails) throws IOException {
        // Serialize the Protobuf message to its JSON representation and write it out.
        jsonWriter.jsonValue(JsonFormat.printer().print(horseDetails));
    }

    @Override
    public HorseMessage.HorseDetails read(JsonReader jsonReader) throws IOException {
        // Parse the incoming JSON and merge it into a Protobuf builder.
        HorseMessage.HorseDetails.Builder horseDetailsBuilder = HorseMessage.HorseDetails.newBuilder();
        JsonFormat.parser().merge(JsonParser.parseReader(jsonReader).toString(), horseDetailsBuilder);
        return horseDetailsBuilder.build();
    }
}
registerTypeAdapter:

This is a method used to configure Gson with custom serializers/deserializers. It takes two parameters as input: the type to register the adapter for and the custom adapter instance.

GsonBuilder gsonBuilderObj = new GsonBuilder();
Gson gsonObj = gsonBuilderObj.registerTypeAdapter(HorseMessage.HorseDetails.class, new Adapter()).create();
Create Instance for Conversion:
package com.protobuf.operation;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.protobuf.generator.HorseMessage;

import java.io.IOException;

public class JsonConverter {

    // Converts a JSON string into a Protobuf HorseDetails message.
    public static HorseMessage.HorseDetails fromJson(String json) throws IOException {
        GsonBuilder gsonBuilderObj = new GsonBuilder();
        Gson gsonObj = gsonBuilderObj.registerTypeAdapter(HorseMessage.HorseDetails.class, new Adapter()).create();
        HorseMessage.HorseDetails horseDetailsObj = gsonObj.fromJson(json, HorseMessage.HorseDetails.class);
        return horseDetailsObj;
    }

    // Converts a Protobuf byte array back into a JSON string.
    public static String toJson(byte[] protobufMessageByteArray) throws InvalidProtocolBufferException {
        HorseMessage.HorseDetails horseDetails = HorseMessage.HorseDetails.parseFrom(protobufMessageByteArray);
        String jsonData = JsonFormat.printer().print(horseDetails);
        return jsonData;
    }
}
fromJson:

This method converts a JSON string into a Protobuf message. The message can then be converted into a byte array by calling the "toByteArray()" method.

Input: “{\”breed\”:\”Mustang\”,\”names\”:[{\”name\”:\”Abc\”}],\”tag\”:\”JT\”,\”company\”:\”MST\”,\”id\”:1456789}”

Output (Protobuf message):

toJson:

This method creates a JSON message from the corresponding Protobuf byte array. It takes the byte array as input and returns the resulting JSON string.

Input:

Output:
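Putting the two methods together, a minimal round-trip sketch (using the sample JSON shown above; the variable names are illustrative) would be:

String json = "{\"breed\":\"Mustang\",\"names\":[{\"name\":\"Abc\"}],\"tag\":\"JT\",\"company\":\"MST\",\"id\":1456789}";

// JSON -> Protobuf message -> byte array (this is what gets published to Kafka)
HorseMessage.HorseDetails horseDetails = JsonConverter.fromJson(json);
byte[] protobufBytes = horseDetails.toByteArray();

// byte array -> JSON string (this is what the listener reconstructs)
String restoredJson = JsonConverter.toJson(protobufBytes);
System.out.println(restoredJson);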

Sending Protobuf message to Kafka Stream:

Now, create a flow configuring Kafka and add a “Kafka publish” connector. Provide the required details to establish a connection with Kafka.

Kafka Configuration:

Now, provide the topic name to which we need to publish the data in the stream.

Now, trigger the endpoint with the provided JSON data.

This data will be converted to a string, and then it will be transformed into a Protobuf message byte array using Java code.

This data will be published to the Kafka stream.
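The flow above uses a "Kafka publish" connector for this step. As a plain-Java sketch of the same idea (assuming the kafka-clients library is on the classpath; the broker address and topic name are assumptions), publishing the byte array could look like this:

package com.protobuf.operation;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProtobufProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        // Convert the incoming JSON to a Protobuf message and then to a byte array.
        String json = "{\"breed\":\"Mustang\",\"names\":[{\"name\":\"Abc\"}],\"tag\":\"JT\",\"company\":\"MST\",\"id\":1456789}";
        byte[] protobufBytes = JsonConverter.fromJson(json).toByteArray();

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // "horse-topic" is an assumed topic name.
            producer.send(new ProducerRecord<>("horse-topic", protobufBytes)).get();
        }
    }
}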

Converting Protobuf to JSON:

Now, configure a "Kafka listener" to receive the same Protobuf message and convert it back to a JSON message.

Now, the triggered message will be received by the listener, and this message will be converted to a JSON string using a Java method.
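For reference, a plain-Java sketch of the listening side (same assumptions as the producer sketch above, plus an assumed consumer group id) could look like this:

package com.protobuf.operation;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ProtobufConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "horse-consumer-group");      // assumed consumer group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("horse-topic")); // assumed topic name
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Convert the Protobuf byte array back into a JSON string.
                    System.out.println(JsonConverter.toJson(record.value()));
                }
            }
        }
    }
}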

Output:

{
    "breed": "Mustang",
    "names": [
        {
            "name": "Abc"
        }
    ],
    "tag": "JT",
    "company": "MST",
    "id": 1456789
}

NOTE: Kafka itself is not mandatory; it is used here to demonstrate publishing the byte message and then subscribing to the same message and converting it back to JSON.
