Found out the answer to my own question.
Serialization
The following code snippet illustrates serializing an instance of Foo to Thrift-compatible bytes using the Thrift Compact protocol. To use the Binary protocol instead, replace TCompactProtocol with TBinaryProtocol.
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TCompactProtocol.h>

using apache::thrift::protocol::TCompactProtocol;
using apache::thrift::transport::TMemoryBuffer;

...
...

boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
boost::shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(buffer));

// getBuffer() hands back a pointer into the TMemoryBuffer's internal
// storage, so no separate allocation is needed.
uint8_t *serialized_bytes = nullptr;
uint32_t num_bytes = 0;

// 'foo' is an instance of Foo
foo->write(protocol.get());
buffer->getBuffer(&serialized_bytes, &num_bytes);
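For the receiving side, deserialization is the mirror image: wrap the bytes in a TMemoryBuffer and call the generated read() method. This is a sketch, assuming the same Thrift-generated Foo class and a `serialized_bytes`/`num_bytes` pair like the one produced above.

```cpp
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TCompactProtocol.h>

using apache::thrift::protocol::TCompactProtocol;
using apache::thrift::transport::TMemoryBuffer;

// TMemoryBuffer can wrap an existing byte array; by default it only
// observes the memory and does not take ownership or copy it.
boost::shared_ptr<TMemoryBuffer> in_buffer(
    new TMemoryBuffer(serialized_bytes, num_bytes));
boost::shared_ptr<TCompactProtocol> in_protocol(new TCompactProtocol(in_buffer));

Foo decoded_foo;
decoded_foo.read(in_protocol.get());  // populate decoded_foo from the bytes
```

As with serialization, the protocol class here must match the one the producer used (Compact vs. Binary), or read() will fail.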
Sending to a Kafka Cluster
The following code snippet illustrates how to send Thrift-compatible bytes to a Kafka cluster.
NOTE: The Kafka client library used below is librdkafka.
#include <iostream>
#include "rdkafkacpp.h"

std::string errstr;

// Create global configuration
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", "localhost:9092", errstr);
conf->set("api.version.request", "true", errstr);

// Create Kafka producer
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);

// Create topic handle (nullptr = no topic-specific configuration)
RdKafka::Topic *topic =
    RdKafka::Topic::create(producer, "topic_name", nullptr, errstr);
auto partition = 1;

// Send the serialized bytes to the Kafka cluster
auto res = producer->produce(
    topic,
    partition,
    RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
    serialized_bytes,
    num_bytes,
    NULL,   // no message key
    NULL);  // no per-message opaque pointer

if (res != RdKafka::ERR_NO_ERROR) {
  std::cerr << "Failed to publish message: " << RdKafka::err2str(res) << std::endl;
} else {
  std::cout << "Published message of " << num_bytes << " bytes" << std::endl;
}

// produce() is asynchronous; wait up to 10s for outstanding
// deliveries before shutting down.
producer->flush(10000);
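Note that produce() only enqueues the message; success at enqueue time does not mean the broker accepted it. To learn the final outcome per message, librdkafka lets you register a delivery report callback on the global configuration before creating the producer. A minimal sketch (the class name ExampleDeliveryReportCb is my own):

```cpp
#include <iostream>
#include "rdkafkacpp.h"

// librdkafka invokes dr_cb() from poll()/flush() once each message has
// been acknowledged by the broker or has permanently failed.
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
 public:
  void dr_cb(RdKafka::Message &message) override {
    if (message.err() != RdKafka::ERR_NO_ERROR) {
      std::cerr << "Delivery failed: " << message.errstr() << std::endl;
    } else {
      std::cout << "Delivered to partition " << message.partition()
                << " at offset " << message.offset() << std::endl;
    }
  }
};

// Register on the global configuration *before* Producer::create():
ExampleDeliveryReportCb dr_cb;
conf->set("dr_cb", &dr_cb, errstr);
```

With the callback registered, call producer->poll(0) periodically (or rely on flush()) so that queued delivery reports are actually dispatched.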
jithinpt