Serializing Thrift structures for Kafka in C++

I have a set of structs defined in Thrift, for example:

 struct Foo { 1: i32 a, 2: i64 b } 

I need to do the following in C++:

(a) Serialize Foo instances into Thrift-compatible bytes (using the Thrift Binary or Compact protocol)

(b) Publish the serialized bytes to a Kafka topic

Question

How do I send serialized Thrift instances to a Kafka cluster?

Thanks in advance

1 answer

I found the answer to my own question.

Serialization

The following code snippet illustrates serializing an instance of Foo into Thrift-compatible bytes using the Thrift Compact protocol. To use the Binary protocol instead, replace TCompactProtocol with TBinaryProtocol.

    #include <thrift/transport/TBufferTransports.h>
    #include <thrift/protocol/TCompactProtocol.h>

    using apache::thrift::protocol::TCompactProtocol;
    using apache::thrift::transport::TMemoryBuffer;

    ...
    ...

    boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
    boost::shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(buffer));

    uint8_t *serialized_bytes = nullptr;
    uint32_t num_bytes = 0;

    // 'foo' is an instance of Foo
    foo->write(protocol.get());

    // getBuffer() points serialized_bytes at the memory buffer's internal
    // storage (still owned by the TMemoryBuffer) and sets num_bytes.
    buffer->getBuffer(&serialized_bytes, &num_bytes);

Sending to Kafka Cluster

The following code snippet illustrates how to send Thrift-compatible bytes to a Kafka cluster.

NOTE: The Kafka client library used below is librdkafka.

    #include <iostream>
    #include "rdkafkacpp.h"

    std::string errstr;

    // Create global configuration
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("metadata.broker.list", "localhost:9092", errstr);
    conf->set("api.version.request", "true", errstr);

    // Create the Kafka producer
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);

    // Create topic-specific configuration
    RdKafka::Topic *topic =
        RdKafka::Topic::create(producer, "topic_name", nullptr, errstr);

    auto partition = 1;

    // Publish the serialized bytes to the Kafka cluster.
    // 'serialized_bytes' and 'num_bytes' come from the serialization step above.
    auto res = producer->produce(
        topic, partition,
        RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
        serialized_bytes, num_bytes,
        NULL, NULL);

    if (res != RdKafka::ERR_NO_ERROR) {
        std::cerr << "Failed to publish message: "
                  << RdKafka::err2str(res) << std::endl;
    } else {
        std::cout << "Published message of " << num_bytes << " bytes" << std::endl;
    }

    // Wait up to 10 seconds for outstanding messages to be delivered
    producer->flush(10000);