Drools In Spark for Streaming File

We have successfully integrated Drools with Spark. We can apply the Drools rules to a batch file stored in HDFS, but we also want to apply them to a streaming source so that we can make decisions immediately, and we have not been able to figure out how. Below are snippets of what we are trying to do.
Case 1: when we read a batch file from HDFS

    SparkConf conf = new SparkConf().setAppName("sample");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> javaRDD = sc.textFile("/user/root/spark/sample.dat");
    // collect() brings the whole file back to the driver as a List
    List<String> store = javaRDD.collect();

Case 2: when we use a streaming context

    SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
    // Duration is in milliseconds: this is a 1-second batch interval
    JavaStreamingContext ssc = new JavaStreamingContext(sparkconf, new Duration(1000));
    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);

In the first case we can apply our rules to the list store, but in the second case we cannot find a way to apply them to the DStream lines.

If anyone has an idea how to do this, it would be a great help.

1 answer

Here is one way to do it.

  • First, create your knowledge session with your business rules.

     // Create the knowledge base and session (Drools 5.x knowledge API)
     KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
     kbuilder.add(ResourceFactory.newFileResource("rulefile.drl"), ResourceType.DRL);
     // Fail fast if the DRL file does not compile
     if (kbuilder.hasErrors()) {
         throw new IllegalStateException(kbuilder.getErrors().toString());
     }
     Collection<KnowledgePackage> pkgs = kbuilder.getKnowledgePackages();
     KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
     kbase.addKnowledgePackages(pkgs);
     final StatelessKnowledgeSession ksession = kbase.newStatelessKnowledgeSession();
  • Create a JavaDStream using a JavaStreamingContext.

     SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
     // Duration is in milliseconds: this is a 1-second batch interval
     JavaStreamingContext ssc = new JavaStreamingContext(sparkconf, new Duration(1000));
     JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);
  • Call foreachRDD on the DStream to turn each batch into facts and run your rules on them.

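     lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
         @Override
         public void call(JavaRDD<String> rdd) throws Exception {
             // Runs on the driver once per batch: collect this batch's lines as facts
             List<String> facts = rdd.collect();
             // Apply the rules to the facts of this batch
             ksession.execute(facts);
         }
     });
     // Nothing is processed until the streaming context is started
     ssc.start();
     ssc.awaitTermination();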
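The steps above can be modeled in plain Java to show why they fit together. This is a sketch under stated assumptions, not the Drools or Spark API: it uses no Spark or Drools classes, and MicroBatchRules, Rule, execute and foreachBatch are hypothetical names. execute stands in for a stateless session (each call sees only the facts passed to it), and foreachBatch stands in for foreachRDD (the handler runs once per batch, on the collected contents of that batch).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Plain-Java model of the answer's pattern. Hypothetical names throughout;
// no Spark or Drools classes are used.
class MicroBatchRules {

    // A rule: a name plus a condition tested against each fact.
    static final class Rule {
        final String name;
        final Predicate<String> when;

        Rule(String name, Predicate<String> when) {
            this.name = name;
            this.when = when;
        }
    }

    // Stand-in for a stateless session: every call starts from an empty
    // working memory holding only the facts passed in, so no fact from an
    // earlier call can take part in a match.
    static List<String> execute(List<Rule> rules, Iterable<String> facts) {
        List<String> workingMemory = new ArrayList<>();
        facts.forEach(workingMemory::add);
        List<String> fired = new ArrayList<>();
        for (String fact : workingMemory) {
            for (Rule rule : rules) {
                if (rule.when.test(fact)) {
                    fired.add(rule.name + ": " + fact);
                }
            }
        }
        return fired;
    }

    // Stand-in for lines.foreachRDD(...): each inner list plays the role of
    // one batch after rdd.collect(), and the handler runs once per batch,
    // including batches in which nothing arrived.
    static void foreachBatch(List<List<String>> batches, Consumer<List<String>> handler) {
        for (List<String> batch : batches) {
            handler.accept(batch);
        }
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(new Rule("alert", f -> f.startsWith("ERROR")));
        List<List<String>> batches = List.of(
                List.of("INFO start", "ERROR disk full"),
                List.of(),
                List.of("ERROR timeout"));
        // Print the rules that fired for each batch
        foreachBatch(batches, facts -> System.out.println(execute(rules, facts)));
    }
}
```

Running main prints one line per batch: [alert: ERROR disk full], then [] for the empty middle batch, then [alert: ERROR timeout]. As far as the model carries over, two points follow: because a stateless session keeps nothing between execute calls, rules that need to correlate facts from different batches will not fire; and because the function passed to foreachRDD also runs for empty batches, it may be worth skipping them, for example with rdd.isEmpty().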

Source: https://habr.com/ru/post/1212956/

