I have a Vagrant image running Spark Notebook, Spark, Accumulo 1.6 and Hadoop. From the notebook, I can manually create a Scanner and pull the test data out of a table I created using one of Accumulo's examples:
import scala.collection.JavaConversions._
import java.util.Map.Entry
import org.apache.accumulo.core.client.{Connector, Instance, ZooKeeperInstance}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Key, Range, Value}
import org.apache.accumulo.core.security.Authorizations

val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector("root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)
scanner.setRange(new Range("row_0000000000", "row_0000000010"))
for (entry: Entry[Key, Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}
That gives back the first ten rows of table data.
When I try to create an RDD this way:
val rdd2 =
  sparkContext.newAPIHadoopRDD(
    new Configuration(),
    classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
    classOf[org.apache.accumulo.core.data.Key],
    classOf[org.apache.accumulo.core.data.Value]
  )
I get an RDD back, but I can't do much with it because of the following error:
java.io.IOException: Input info has not been set.
  at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
  at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
  at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
  at org.apache.spark.rdd.RDD.count(RDD.scala:927)
Which makes sense, given that I have not supplied any connection parameters - username, password, auths and so on.
So my question is: what do I need to do from here to get those rows of Accumulo table data into an RDD?
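For concreteness, this is the sort of configuration I gather the input format wants to see before newAPIHadoopRDD is called. It is only a sketch on my part - it assumes the mapreduce flavour of AccumuloInputFormat, a Hadoop 2 style Job, and the same instance/table/credentials as the scanner example above, and I have not verified it against this image:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.accumulo.core.client.ClientConfiguration
import org.apache.accumulo.core.client.mapreduce.{AbstractInputFormat, AccumuloInputFormat, InputFormatBase}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.security.Authorizations

// carry the Accumulo settings in a Job's Configuration instead of handing Spark an empty one
val job = Job.getInstance(new Configuration())
AbstractInputFormat.setConnectorInfo(job, "root", new PasswordToken("password"))
AbstractInputFormat.setZooKeeperInstance(job,
  new ClientConfiguration().withInstance("accumulo").withZkHosts("localhost:2181"))
AbstractInputFormat.setScanAuthorizations(job, new Authorizations("exampleVis"))
InputFormatBase.setInputTableName(job, "batchtest1")

val rdd2 = sparkContext.newAPIHadoopRDD(
  job.getConfiguration,
  classOf[AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value]
)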
* edit 1 *
Still doesn't work, but I did learn a few things. It turns out there are two nearly identical packages,
org.apache.accumulo.core.client.mapreduce
&
org.apache.accumulo.core.client.mapred
Both have almost the same members, except that some of the method signatures differ. I'm not sure why both exist, as there is no deprecation notice that I could see. I tried to implement Sietse's suggestions, with mixed results. (I'll note how I think the two packages line up with Spark's two entry points after the error output below.) Here is what I did, and the responses I got back:
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
jobConf: org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
val rdd2 =
  sparkContext.hadoopRDD(
    jobConf,
    classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
    classOf[org.apache.accumulo.core.data.Key],
    classOf[org.apache.accumulo.core.data.Value],
    1
  )
rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at <console>:62
rdd2.first
java.io.IOException: Input info has not been set.
  at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
  at org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308)
  at org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1077)
  at org.apache.spark.rdd.RDD.first(RDD.scala:1110)
  at $iwC$$iwC$$iwC$$iwC... (REPL-generated frames elided) ...
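As an aside - this is purely my own reading, not something from the Accumulo docs - the two packages seem to mirror Hadoop's old and new MapReduce APIs, so they should pair up with Spark's two entry points roughly as sketched below, assuming jobConf (a JobConf) and conf (a Configuration) have already been populated with the Accumulo settings:

// old Hadoop API: org.apache.accumulo.core.client.mapred goes with JobConf and hadoopRDD
val rddOldApi = sparkContext.hadoopRDD(
  jobConf,
  classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value])

// new Hadoop API: org.apache.accumulo.core.client.mapreduce goes with Configuration and newAPIHadoopRDD
val rddNewApi = sparkContext.newAPIHadoopRDD(
  conf,
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value])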
* edit 2 *
re: Holden - here is what I tried:
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
InputFormatBase.setInputTableName(jobConf, "batchtest1")
val rddX = sparkContext.newAPIHadoopRDD(
  jobConf,
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value]
)
rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at newAPIHadoopRDD at <console>:58
Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at <console>:58
rddX.first
java.io.IOException: Input info has not been set.
  at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
  at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
  at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1077)
  at org.apache.spark.rdd.RDD.first(RDD.scala:1110)
  at $iwC$$iwC$$iwC$$iwC... (REPL-generated frames elided)
* edit 3 * - progress!
, "input INFO not set". , , , '('
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")
Because I was doing this in the notebook, I had been clicking execute and moving on since I wasn't seeing an error. What I forgot is that the notebook, just like the spark-shell, will simply sit and wait for the missing ')' to be typed, so the error was the result of the setConnectorInfo method never actually being executed.
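With the closing parenthesis added, the line reads:

AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password"))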
Unfortunately, I still can't get the Accumulo table data into an RDD that is usable to me. When I run:
rddX.count
res15: Long = 10000
which is the correct response - there are 10,000 rows of data in the table I pointed it at. However, when I try to grab the first element of data:
rddX.first
I get the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key
Any thoughts on where to go from here?
* edit 4 * - success!
The accepted answer plus the comments got me 90% of the way there - except for the fact that the Accumulo Key and Value need to be cast into something serializable. I got that working by invoking the .toString() methods on both. I'll try to post some working code as soon as I can, in case anyone else runs into the same issue.
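In the meantime, the gist of what worked for me is just projecting the Key/Value pairs to their string forms before asking Spark to ship results back - a sketch only, using the rddX built in edit 2:

// Key and Value are not serializable, so map them to plain Strings first
val rddStrings = rddX.map { case (key, value) => (key.toString, value.toString) }

rddStrings.count   // still 10000
rddStrings.first   // now returns the first pair as strings instead of throwing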