Using a custom DataFlow data source on DirectPipelineRunner

I am writing my own unbounded Dataflow source that reads from Kafka 0.8. I would like to run it locally using DirectPipelineRunner. However, I get the following stack trace:

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)

This makes sense, since I never registered an evaluator for my custom source.

Reading https://github.com/GoogleCloudPlatform/DataflowJavaSDK, it seems that evaluators are registered only for bounded sources. What is the recommended way to define and register an evaluator for a custom unbounded source?

1 answer

DirectPipelineRunner currently works only with bounded sources. We are actively working on removing this limitation and expect to release the fix soon.

In the meantime, you can trivially turn any UnboundedSource into a BoundedSource for testing purposes using withMaxNumRecords, as in the following example:

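 // UnboundedSource takes two type parameters: the record type and the checkpoint mark type.
 UnboundedSource<String, ?> unboundedSource = ...; // make a Kafka source
 // Capping the record count makes the read bounded, so DirectPipelineRunner can evaluate it.
 PCollection<String> boundedKafkaCollection =
     p.apply(Read.from(unboundedSource).withMaxNumRecords(10));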

See this issue on GitHub for more details.
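
To make the effect of withMaxNumRecords concrete, here is a minimal plain-Java sketch of the idea: read from a source that never ends, but stop after a fixed number of records. It does not use the Dataflow SDK at all. The class BoundedReadSketch and the helpers endlessRecords and readAtMost are hypothetical names made up for this illustration, and the SDK's actual implementation may differ.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BoundedReadSketch {

    /** Hypothetical stand-in for an unbounded source: yields record-0, record-1, ... forever. */
    static Iterator<String> endlessRecords() {
        return new Iterator<String>() {
            private long next = 0;

            @Override
            public boolean hasNext() {
                return true; // an unbounded source never runs out of records
            }

            @Override
            public String next() {
                return "record-" + next++;
            }
        };
    }

    /** Reads at most maxNumRecords elements from the source, then stops. */
    static <T> List<T> readAtMost(Iterator<T> source, long maxNumRecords) {
        List<T> out = new ArrayList<>();
        while (out.size() < maxNumRecords && source.hasNext()) {
            out.add(source.next());
        }
        return out;
    }

    public static void main(String[] args) {
        // Analogous in spirit to Read.from(unboundedSource).withMaxNumRecords(10).
        List<String> records = readAtMost(endlessRecords(), 10);
        System.out.println(records);
    }
}
```

Because the read terminates after a known number of records, the result is a finite collection that a local, batch-style runner can process for testing.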


Separately, there are several efforts underway to build a Kafka connector. You may want to engage with us and other contributors through our GitHub repository.
