Kafka topic partitions and Spark Streaming executor assignment

I use Spark Streaming with Kafka. The topic is created with 5 partitions. All my messages are published to this Kafka topic using the table name as the key. Given this, I assume that all messages for a given table should end up in the same partition. But I notice in the executor logs that messages for the same table sometimes go to an executor on node-1 and sometimes to an executor on node-2.
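
My assumption comes from the producer side: with Kafka's default partitioner, records that share a key are hashed to the same topic partition (as long as the partition count does not change). A minimal sketch of the kind of keyed produce call I mean, assuming the standard Kafka Java producer; the broker address, topic name and payload below are placeholders:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Default partitioner: hash(key) % numPartitions, so every record keyed with
    // the same table name lands in the same partition of the topic.
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>("my-topic", "my_table", "{...json payload...}"));
    producer.close();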

I run the code in yarn-cluster mode using the following command:

spark-submit --name DataProcessor \
    --master yarn-cluster \
    --files /opt/ETL_JAR/executor-log4j-spark.xml,/opt/ETL_JAR/driver-log4j-spark.xml,/opt/ETL_JAR/application.properties \
    --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=driver-log4j-spark.xml" \
    --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=executor-log4j-spark.xml" \
    --class com.test.DataProcessor \
    /opt/ETL_JAR/etl-all-1.0.jar

This creates 1 driver (say, on node-1) and 2 executors, one on node-1 and one on node-2.

I do not want the node-1 and node-2 executors to read the same partition, but that is what happens.

I also tried the following setting to specify the consumer group, but it made no difference.

kafkaParams.put("group.id", "app1");

This is how we create the stream, using the createDirectStream method (not through ZooKeeper):

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);
    kafkaParams.put("auto.offset.reset", "largest");
    kafkaParams.put("group.id", "app1");

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    );

The code:

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class DataProcessor2 implements Serializable {
    private static final long serialVersionUID = 3071125481526170241L;

    private static Logger log = LoggerFactory.getLogger("DataProcessor");

    public static void main(String[] args) {
        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);
        DataProcessorContextFactory3 factory = new DataProcessorContextFactory3();
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(sparkCheckPointDir, factory);

        // Start the process
        jssc.start();
        jssc.awaitTermination();
    }

}

class DataProcessorContextFactory3 implements JavaStreamingContextFactory, Serializable {
    private static final long serialVersionUID = 6070911284191531450L;

    private static Logger logger = LoggerFactory.getLogger(DataProcessorContextFactory3.class);

    DataProcessorContextFactory3() {
    }

    @Override
    public JavaStreamingContext create() {
        logger.debug("creating new context..!");

        final String brokers = ApplicationProperties.getProperty(Consts.KAFKA_BROKERS_NAME);
        final String topic = ApplicationProperties.getProperty(Consts.KAFKA_TOPIC_NAME);
        final String app = "app1";
        final String offset = ApplicationProperties.getProperty(Consts.KAFKA_CONSUMER_OFFSET, "largest");

        logger.debug("Data processing configuration. brokers={}, topic={}, app={}, offset={}", brokers, topic, app,
                offset);
        if (StringUtils.isBlank(brokers) || StringUtils.isBlank(topic) || StringUtils.isBlank(app)) {
            System.err.println("Usage: DataProcessor <brokers> <topic>\n" + Consts.KAFKA_BROKERS_NAME
                    + " is a list of one or more Kafka brokers separated by comma\n" + Consts.KAFKA_TOPIC_NAME
                    + " is a kafka topic to consume from \n\n\n");
            System.exit(1);
        }
        final String majorVersion = "1.0";
        final String minorVersion = "3";
        final String version = majorVersion + "." + minorVersion;
        final String applicationName = "DataProcessor-" + topic + "-" + version;
        // for dev environment
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(applicationName);
        // for cluster environment
        // SparkConf sparkConf = new SparkConf().setAppName(applicationName);
        final long sparkBatchDuration = Long
                .valueOf(ApplicationProperties.getProperty(Consts.SPARK_BATCH_DURATION, "10"));

        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchDuration));
        logger.debug("setting checkpoint directory={}", sparkCheckPointDir);
        jssc.checkpoint(sparkCheckPointDir);

        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("auto.offset.reset", offset);
        kafkaParams.put("group.id", "app1");

        // @formatter:off
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );
        // @formatter:on
        processRDD(messages, app);
        return jssc;
    }

    private void processRDD(JavaPairInputDStream<String, String> messages, final String app) {
        JavaDStream<MsgStruct> rdd = messages.map(new MessageProcessFunction());

        rdd.foreachRDD(new Function<JavaRDD<MsgStruct>, Void>() {

            private static final long serialVersionUID = 250647626267731218L;

            @Override
            public Void call(JavaRDD<MsgStruct> currentRdd) throws Exception {
                if (!currentRdd.isEmpty()) {
                    logger.debug("Receive RDD. Create JobDispatcherFunction at HOST={}", FunctionUtil.getHostName());
                    currentRdd.foreachPartition(new VoidFunction<Iterator<MsgStruct>>() {

                        @Override
                        public void call(Iterator<MsgStruct> arg0) throws Exception {
                            while(arg0.hasNext()){
                                System.out.println(arg0.next().toString());
                            }
                        }
                    });
                } else {
                    logger.debug("Current RDD is empty.");
                }
                return null;
            }
        });
    }
    public static class MessageProcessFunction implements Function<Tuple2<String, String>, MsgStruct> {
        @Override
        public MsgStruct call(Tuple2<String, String> data) throws Exception {
            String message = data._2();
            System.out.println("message:"+message);
            return MsgStruct.parse(message);
        }

    }
    public static class MsgStruct implements Serializable{
        private String message;
        public static MsgStruct parse(String msg){
            MsgStruct m = new MsgStruct();
            m.message = msg;
            return m;
        }
        public String toString(){
            return "content inside="+message;
        }
    }

}
2 answers

With the directStream approach, the correct assumption to make is that messages sent to the same Kafka partition will land in the same Spark partition.

What we cannot assume is that each Spark partition will be processed by the same Spark worker every time. In each batch interval, a Spark task is created for the OffsetRange of each partition and sent to the cluster for processing, landing on some available worker.
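
To see this in the logs, you can print, per batch, which host processes which Kafka partition. This is a sketch under the same spark-streaming-kafka 0.8 direct API the question uses (messages is the stream from the question; as in the question's code, the enclosing class must be Serializable):

    import java.net.InetAddress;
    import java.util.Iterator;

    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.OffsetRange;

    import scala.Tuple2;

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            // One OffsetRange per Kafka partition of this batch.
            final OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
                @Override
                public void call(Iterator<Tuple2<String, String>> records) throws Exception {
                    OffsetRange range = ranges[TaskContext.get().partitionId()];
                    // The host can differ from batch to batch for the same Kafka partition.
                    System.out.println("Kafka partition " + range.partition()
                            + " processed on " + InetAddress.getLocalHost().getHostName());
                }
            });
            return null;
        }
    });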

What you are looking for is locality between Kafka partitions and Spark executors. The only locality the direct Kafka consumer offers is the Kafka host contained in the OffsetRange being processed, and that only helps when the Spark executors are co-located with the Kafka brokers, which is not a common deployment topology.

If your requirements dictate that kind of partition-to-host affinity, you may want to look at Apache Samza instead.


With the Spark Streaming + Kafka integration for Kafka 0.10.0 and later, you can control on which executor host each Kafka partition is processed.

For example, suppose you have two worker hosts (h1 and h2) and the Kafka topic topic-name has three partitions. The following Java code pins each partition to a host:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

Map<TopicPartition, String> partitionMapToHost = new HashMap<>();
// partition 0 -> h1, partition 1 and 2 -> h2
partitionMapToHost.put(new TopicPartition("topic-name", 0), "h1");
partitionMapToHost.put(new TopicPartition("topic-name", 1), "h2");
partitionMapToHost.put(new TopicPartition("topic-name", 2), "h2");
List<String> topicCollection = Arrays.asList("topic-name");
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.0.0.2:9092,10.0.0.3:9092");
kafkaParams.put("group.id", "group-id-name");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
JavaInputDStream<ConsumerRecord<String, String>> records = KafkaUtils.createDirectStream(jssc,
    LocationStrategies.PreferFixed(partitionMapToHost), // PreferFixed is the key
    ConsumerStrategies.Subscribe(topicCollection, kafkaParams));

If you use LocationStrategies.PreferConsistent() instead, partitions are distributed evenly across the available executors, with no guarantee that a given partition is processed by the same executor in every batch.
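
For comparison, a minimal sketch of the PreferConsistent variant, reusing the topicCollection and kafkaParams from the snippet above:

    // Spreads the partitions evenly over the available executors instead of
    // pinning each partition to a fixed host.
    JavaInputDStream<ConsumerRecord<String, String>> spreadRecords = KafkaUtils.createDirectStream(jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topicCollection, kafkaParams));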
