Error "state should be: open" when using Spark with mongo-hadoop to read from and write to MongoDB

Scenario: I have an RDD built from Kafka data. After mapping and reducing it, I want to write the result to the collection date_xx in db_aggregate.

During the map step, I need to read the previous result back from this same database.

For example, to write record A I need result B (written earlier) for the calculation, and only then do I write A to the database.

I think the problem is that I read from db_aggregate while I am also writing new records to it, and the db cursor seems to get closed by one of the two actions (the write or the read).

I am using Spark 1.4.1, mongo-hadoop 1.4.1, and MongoDB 2.6.

Functions:

 /**
  * Looks up the most recent stored aggregate for the given campaign/publisher/size,
  * first in the collection for the requested day and, if nothing is found there,
  * in the collection for the previous calendar day.
  *
  * NOTE(review): `java.util.Calendar` months are 0-based; this assumes `month` is
  * passed in the same convention that is used to name the databases -- confirm
  * against the caller.
  *
  * @return the previous totals (`totalBudgetSpent`, `totalAuctions`, `totalWin`),
  *         or `null` when neither day has a matching record. `null` is kept for
  *         backward compatibility with callers that null-check the result.
  */
 def getPreviousAggregate(campaignId: String, publisher: String, width: Int, height: Int, date: Int, month: Int, year: Int): BasicBSONObject = {
   findLatestAggregate(campaignId, publisher, width, height, date, month, year)
     .orElse {
       // Nothing for the requested day: retry once with the day before.
       println("Not found previous date ....")
       val previousDate = Calendar.getInstance()
       previousDate.set(year, month, date)
       previousDate.add(Calendar.DATE, -1)
       findLatestAggregate(
         campaignId, publisher, width, height,
         previousDate.get(Calendar.DATE),
         previousDate.get(Calendar.MONTH),
         previousDate.get(Calendar.YEAR))
     }
     .orNull
 }

 /**
  * Reads the aggregate collection for one day through mongo-hadoop and returns the
  * totals of the matching record with the highest `timestamp`.
  *
  * The collection is addressed as `DB_AGGREGATE_<month>_<year>.COLL_AGGREGATE_<date>`.
  * Records are filtered first and only the matches are sorted, newest first, so the
  * single row taken really is the latest one (the earlier ascending sort returned
  * the oldest match).
  *
  * NOTE(review): timestamps are compared as strings, as before; this orders
  * correctly only while all timestamps have the same number of digits.
  *
  * @return `Some` with `totalBudgetSpent`, `totalAuctions` and `totalWin` copied from
  *         the latest matching record, or `None` when there is no match or a
  *         `MongoCommandException` is raised while reading (its message is printed).
  */
 def findLatestAggregate(campaignId: String, publisher: String, width: Int, height: Int, date: Int, month: Int, year: Int): Option[BasicBSONObject] = {
   val config = new Configuration()
   val outDb = DB_AGGREGATE + "_%02d_%s".format(month, year)
   val collName: String = COLL_AGGREGATE + "_%02d".format(date)
   val mongoInputUri = "mongodb://%s:%s/%s.%s".format(DB_STATISTIC_HOST, DB_STATISTIC_PORT, outDb, collName)
   config.set("mongo.input.uri", mongoInputUri)

   try {
     val aggregate = sc.newAPIHadoopRDD(config, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])
     aggregate
       .map(_._2)
       .filter(doc =>
         // Disabled upper bound on the record time, kept from the original:
         //   Integer.parseInt(doc.get("timestamp").toString) <= timestamp - BATCH_TIME
         Integer.parseInt(doc.get("width").toString) == width &&
           Integer.parseInt(doc.get("height").toString) == height &&
           doc.get("publisher").toString == publisher &&
           doc.get("campaignId").toString == campaignId
       )
       .sortBy(_.get("timestamp").toString, ascending = false)
       .take(1)
       .headOption
       .map { latest =>
         println("\nfound previous record")
         // Copy only the running totals; the caller does not need the other fields.
         val bson = new BasicBSONObject()
         bson.put("totalBudgetSpent", latest.get("totalBudgetSpent"))
         bson.put("totalAuctions", latest.get("totalAuctions"))
         bson.put("totalWin", latest.get("totalWin"))
         bson
       }
   } catch {
     case ex: MongoCommandException =>
       // Best effort: a failed read is reported and treated as "no previous record".
       println(ex.getMessage)
       None
   }
 }

The main job is here:

 // Store the aggregate data in MongoDB via the mongo-hadoop output format.
 // The target is <DB_AGGREGATE>_<month>_<year>.<COLL_AGGREGATE>_<date>.
 //
 // NOTE(review): getPreviousAggregate (defined earlier in this file) reaches
 // sc.newAPIHadoopRDD, and here it is invoked from inside an RDD `map` closure,
 // i.e. a Mongo read job is started while the write task is running. Spark does
 // not support launching RDD operations from within a transformation, and this
 // nested read may be related to the reported "state should be: open" failure --
 // confirm, and consider resolving the previous aggregates on the driver first.
 // NOTE(review): the reduced value (x._2) is never read below, and the captured
 // totalAuctions / totalWin / totalBudgetSpent are mutated inside the closure --
 // verify that this is intended.
 val outDb = DB_AGGREGATE + "_%02d_%s".format(month, year)
 val _config = new Configuration()
 val collName: String = COLL_AGGREGATE + "_%02d".format(date)
 val mongoOutputAggregateUri: String =
   "mongodb://%s:%s/%s.%s".format(DB_STATISTIC_HOST, DB_STATISTIC_PORT, outDb, collName)
 _config.set("mongo.output.uri", mongoOutputAggregateUri)

 bidWinBSON
   .map(x => (x._1, (totalAuctions, totalWin, totalBudgetSpent)))
   .reduceByKey { (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3) }
   .map { x =>
     // The key carries (timestamp, campaignId, publisher, width, height).
     val key = x._1
     val timestamp: java.lang.Long = java.lang.Long.parseLong(key._1.toString)
     val campaignId = key._2.toString
     val publisher = key._3
     val width = key._4
     val height = key._5

     // Fold the previously stored totals (if any) into the running totals.
     Option(getPreviousAggregate(campaignId, publisher, width, height, date, month, year)).foreach { previousResult =>
       print("\n\ngetPreviousAggregate\n\n")
       totalBudgetSpent += java.lang.Long.parseLong(previousResult.get("totalBudgetSpent").toString)
       totalAuctions += java.lang.Long.parseLong(previousResult.get("totalAuctions").toString)
       totalWin += java.lang.Long.parseLong(previousResult.get("totalWin").toString)
     }

     println("\nAggregate ............................")
     val bson = new BasicBSONObject()
     bson.put("timestamp", timestamp)
     bson.put("campaignId", campaignId)
     bson.put("publisher", publisher)
     bson.put("width", width)
     bson.put("height", height)
     bson.put("totalAuctions", totalAuctions)
     bson.put("totalWin", totalWin)
     bson.put("totalBudgetSpent", totalBudgetSpent)
     // The output key is null; only the BSON document is passed as the value.
     (null, bson)
   }
   .saveAsNewAPIHadoopFile(
     "file:///xxx",
     classOf[Any],
     classOf[Any],
     classOf[MongoOutputFormat[Any, Any]],
     _config)

I get the following error:

 15/08/27 10:35:44 ERROR Executor: Exception in task 0.0 in stage 19.0 (TID 23) java.lang.IllegalStateException: state should be: open at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:79) at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75) at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71) at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68) at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:175) at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:141) at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:72) at com.mongodb.Mongo.execute(Mongo.java:745) at com.mongodb.Mongo$2.execute(Mongo.java:728) at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1968) at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1962) at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:98) at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:133) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1045) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/08/27 10:35:44 WARN TaskSetManager: Lost task 0.0 in stage 19.0 (TID 23, localhost): 
java.lang.IllegalStateException: state should be: open at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:79) at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75) at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71) at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68) at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:175) at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:141) at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:72) at com.mongodb.Mongo.execute(Mongo.java:745) at com.mongodb.Mongo$2.execute(Mongo.java:728) at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1968) at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1962) at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:98) at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:133) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1045) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 
+5
source share

All Articles