How to add a custom StateStore to a Kafka Streams DSL processor?

For one of my Kafka Streams applications, I need to use both the DSL and the Processor API. My application's stream is:

source -> selectKey -> filter -> aggregate (on a window) -> sink 

After the aggregation, I need to send a single aggregated message downstream. Therefore, I define my topology as follows:

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.stream(source_stream);
    source.selectKey(new MyKeyValueMapper())
          .filterNot((k, v) -> k.equals("UnknownGroup"))
          .process(() -> new MyProcessor());

I define a custom StateStore and register it inside my processor, as shown below:

    public class MyProcessor implements Processor<String, String> {

        private ProcessorContext context = null;
        Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);
        KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
                .withKeys(Serdes.String())
                .withValues(invSerde)
                .persistent()
                .build()
                .get();

        public MyProcessor() {}

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.register(invStore, false, null); // register the store
            this.context.schedule(10 * 60 * 1000L);
        }

        @Override
        public void process(String partitionKey, String message) {
            try {
                MessageModel smb = new MessageModel(message);
                HashMapStore oldStore = invStore.get(partitionKey);
                if (oldStore == null) {
                    oldStore = new HashMapStore();
                }
                oldStore.addSmb(smb);
                invStore.put(partitionKey, oldStore);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void punctuate(long timestamp) {
            // processes all the messages in the state store and sends a single aggregate message
        }

        @Override
        public void close() {
            invStore.close();
        }
    }

When I run the application, I get a java.lang.NullPointerException:

    Exception in thread "StreamThread-18" java.lang.NullPointerException
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:332)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:404)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

Any idea what is going wrong here?

apache-kafka-streams
1 Answer

You need to register your store outside of your processor, using KStreamBuilder. First you create the store, then you register it with the KStreamBuilder, and when you add the processor, you provide the store name to connect the processor and the store.

    KStreamBuilder builder = new KStreamBuilder();

    // create the store
    StateStoreSupplier storeSupplier = Stores.create("invStore")
            .withKeys(Serdes.String())
            .withValues(invSerde)
            .persistent()
            .build();

    // register the store with the builder
    builder.addStateStore(storeSupplier);

    KStream<String, String> source = builder.stream(source_stream);
    source.selectKey(new MyKeyValueMapper())
          .filterNot((k, v) -> k.equals("UnknownGroup"))
          .process(() -> new MyProcessor(), "invStore"); // connect the store to the processor by providing the store name
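With the store registered on the builder, the processor must not create or register the store itself; instead it looks the store up by name in init(). Here is a minimal sketch of how MyProcessor changes (assuming the HashMapStore and MessageModel types from the question; this is a framework-dependent sketch, not a complete implementation):

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, HashMapStore> invStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // retrieve the store registered via builder.addStateStore(...) by its name
        this.invStore = (KeyValueStore<String, HashMapStore>) context.getStateStore("invStore");
        this.context.schedule(10 * 60 * 1000L);
    }

    @Override
    public void process(String partitionKey, String message) {
        // same aggregation logic as before, using invStore.get(...) / invStore.put(...)
    }

    @Override
    public void punctuate(long timestamp) {
        // emit the aggregated message
    }

    @Override
    public void close() {
        // do not call invStore.close() here; Kafka Streams manages the store's lifecycle
    }
}
```

Because Kafka Streams now owns the store's creation, flushing, and closing, the NullPointerException during flush (caused by the store never being initialized by the library) goes away.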
