How to filter keys and values with a processor using the Kafka Streams DSL

I have a Processor that interacts with a StateStore to filter records and execute complex message logic. In the process(key, value) method I use context.forward(key, value) to emit the keys and values that I need. For debugging purposes, I also print them.
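A minimal sketch of what such a Processor might look like against the 0.10.1.0 Processor API; the class name, store name ("stateStoreName"), and the filter condition are placeholders, not taken from the question:

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class FilteringProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<String, String>) context.getStateStore("stateStoreName");
    }

    @Override
    public void process(String key, String value) {
        // placeholder for the "complex message logic" against the state store:
        // here we keep a record only the first time its key is seen
        if (store.get(key) == null) {
            store.put(key, value);
            System.out.println("forwarding: " + key + " -> " + value); // debug print
            context.forward(key, value); // emit only the records we want to keep
        }
    }

    @Override
    public void punctuate(long timestamp) { } // required by the 0.10.1.0 interface

    @Override
    public void close() { }
}
```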

I have a KStream mergedStream, which is the result of merging two other streams. I want to apply the processor to the records of this stream, which I do with: mergedStream.process(myprocessor, "stateStoreName")

When I run this program, I can see the correct values printed on my console. However, if I write mergedStream to a topic using mergedStream.to("topic"), the values in the topic are not the ones I forwarded from the processor, but the original source records.
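The topology described above might look like the following sketch (topic names and the processor class are assumptions). It illustrates why the topic sees the source records: process() and to() each get their own copy of mergedStream, so to() never sees the forwarded values.

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> stream1 = builder.stream("input-topic-1");
KStream<String, String> stream2 = builder.stream("input-topic-2");
KStream<String, String> mergedStream = builder.merge(stream1, stream2);

// copy 1: flows into the processor; forwarded records have no successor
mergedStream.process(FilteringProcessor::new, "stateStoreName");

// copy 2: flows directly to the output topic, bypassing the processor
mergedStream.to("topic");
```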

I am using kafka-streams 0.10.1.0.

What is the best way to get the values that I forwarded from the processor into another topic?

Can I mix the Processor API with streams created by the KStream DSL?

1 answer

Short answer:

To solve your problem, you can use transform(...) instead of process(...); transform(...) also gives you access to the Processor API within the DSL.

Long answer:

If you use process(...), you apply the processor to the stream; however, this is a terminal (or sink) operation (its return type is void), i.e., it does not return any result. Here, "terminal" means only that the operator has no successor; it does not mean that any result is written anywhere.

In addition, if you call both mergedStream.process(...) and mergedStream.to(...), you essentially branch and duplicate your stream, sending one copy to each downstream operator (i.e., one copy to process(...) and one copy to to(...)).

Mixing the DSL and the Processor API is absolutely possible (you've already done that ;)). However, when using process(...), the data you forward(...) does not flow back into the DSL. If you want to use the result of your Processor-API logic downstream in the DSL, use transform(...) instead of process(...).
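As a hedged sketch of the suggested fix, the same filtering logic can be expressed as a Transformer (class name, store name, and filter condition are again placeholders). Unlike a Processor, a Transformer returns a KeyValue, so the records it emits continue through the DSL and can be written to a topic:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class FilteringTransformer
        implements Transformer<String, String, KeyValue<String, String>> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.store = (KeyValueStore<String, String>) context.getStateStore("stateStoreName");
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // same placeholder filter as before: keep first occurrence of each key
        if (store.get(key) == null) {
            store.put(key, value);
            return KeyValue.pair(key, value); // returned records stay in the DSL
        }
        return null; // returning null drops the record
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        return null; // no scheduled output (required by the 0.10.1.0 interface)
    }

    @Override
    public void close() { }
}
```

Usage: write the transformed stream, not the source stream, to the topic, e.g. `mergedStream.transform(FilteringTransformer::new, "stateStoreName").to("topic");`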
