For one of my Kafka Streams applications, I need to use features of both the DSL and the Processor API. My application's stream flow is:
source -> selectKey -> filter -> aggregate (on a window) -> sink
After aggregation, I need to send a SINGLE aggregated message to the sink. So I define my topology as follows:
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.stream(source_stream);
    source.selectKey(new MyKeyValueMapper())
          .filterNot((k, v) -> k.equals("UnknownGroup"))
          .process(() -> new MyProcessor());
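To make the intended semantics concrete, here is a minimal plain-Java sketch of what the pipeline is meant to compute: re-key each record, drop the "UnknownGroup" records, and collect the rest into one aggregate per key and window. It does not use the Kafka Streams API at all. The names `PipelineSketch`, `Event`, and `selectKey`, the "text before the colon" keying rule (a stand-in for `MyKeyValueMapper`, whose logic is not shown), and the tumbling-window assumption are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of: source -> selectKey -> filter -> aggregate (on a window) -> sink.
// Hypothetical illustration only; this is not Kafka Streams code.
final class PipelineSketch {

    // One input record: an event timestamp in milliseconds and a raw value.
    static final class Event {
        final long timestampMs;
        final String value;

        Event(long timestampMs, String value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Assumed stand-in for MyKeyValueMapper: the key is the text before ':'.
    // Values without a ':' fall into "UnknownGroup".
    static String selectKey(String value) {
        int idx = value.indexOf(':');
        return idx < 0 ? "UnknownGroup" : value.substring(0, idx);
    }

    // Groups values by (key, tumbling window start). Each map entry represents
    // the single aggregated message that would go to the sink for that window.
    static Map<String, List<String>> aggregate(List<Event> events, long windowMs) {
        Map<String, List<String>> windows = new LinkedHashMap<>();
        for (Event e : events) {
            String key = selectKey(e.value);
            if (key.equals("UnknownGroup")) {
                continue; // mirrors filterNot((k, v) -> k.equals("UnknownGroup"))
            }
            long windowStart = (e.timestampMs / windowMs) * windowMs;
            windows.computeIfAbsent(key + "@" + windowStart, k -> new ArrayList<>())
                   .add(e.value);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(0L, "a:1"));
        events.add(new Event(500L, "a:2"));
        events.add(new Event(1500L, "a:3"));
        events.add(new Event(100L, "noise"));
        // Prints one entry per (key, window) pair.
        aggregate(events, 1000L).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

In the real application, the per-window map would live in the state store and the single message per window would be forwarded from the processor; the sketch only shows the grouping logic.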
I define a custom StateStore and register it with my processor, as shown below:
    public class MyProcessor implements Processor<String, String> {

        private ProcessorContext context = null;

        Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);

        KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
                .withKeys(Serdes.String())
                .withValues(invSerde)
                .persistent()
                .build()
                .get();

        public MyProcessor() {
        }

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.register(invStore, false, null);
When I run the application, I get a java.lang.NullPointerException:
    Exception in thread "StreamThread-18" java.lang.NullPointerException
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Any idea what is going wrong here?
apache-kafka-streams
Samy