I am trying to process Kinesis stream data with Spark Streaming. I start my stream with 2 shards. It works fine until I manually split one of the shards. After that, my program crashes when new data arrives.
Here is the error message:
ERROR ShutdownTask: Application exception. java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000001
How should I handle checkpointing for the closed shard in my program?
EDIT: adding sample code to this question (which is also very important to me).
Code example:
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(2))
  ssc.checkpoint(checkpointDirectory)
  val kinesisStreams = KinesisUtils.createStream(
    ssc, appName, streamName, endpointUrl, awsRegion,
    InitialPositionInStream.LATEST, Seconds(1), StorageLevel.MEMORY_ONLY)
  kinesisStreams.foreachRDD(rdd => ...)
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()