How to create a Spark RDD from Accumulo 1.6 in spark-notebook?

I have a Vagrant image running Spark Notebook, Spark, Accumulo 1.6 and Hadoop. From the notebook, I can manually create a Scanner and pull test data out of the table that I created using one of Accumulo's examples:

// imports needed by this snippet
import org.apache.accumulo.core.client.{Connector, Instance, ZooKeeperInstance}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Key, Range, Value}
import org.apache.accumulo.core.security.Authorizations
import java.util.Map.Entry
import scala.collection.JavaConversions._

val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector("root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)

scanner.setRange(new Range("row_0000000000", "row_0000000010"))

for(entry: Entry[Key, Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}

will give the first ten rows of table data.

When I try to create an RDD this way:

val rdd2 =
  sparkContext.newAPIHadoopRDD(
    new Configuration(),   // a blank Configuration - nothing identifies the instance, table, or credentials
    classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
    classOf[org.apache.accumulo.core.data.Key],
    classOf[org.apache.accumulo.core.data.Value]
  )

I get an RDD back, but I can't do much with it because of the following error:

java.io.IOException: Input info has not been set.
    at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
    at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
    at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
    at org.apache.spark.rdd.RDD.count(RDD.scala:927)

which makes sense, given that I haven't specified any parameters about which table to connect to, what the auths are, and so on.

So my question is: what do I need to do from here to get those rows of table data into an RDD?

* edit 1 *

Still no luck, but I did discover a couple of things. It turns out there are two nearly identical packages,

org.apache.accumulo.core.client.mapreduce

&

org.apache.accumulo.core.client.mapred

Both have nearly identical members, except that some of the method signatures differ. I'm not sure why both exist, since there's no deprecation notice that I could see. I tried to implement Sietse's answer, with no luck. Below is what I did, and the responses:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
jobConf: org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml

Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml

AbstractInputFormat.setConnectorInfo(jobConf, 
                                     "root", 
                                     new PasswordToken("password")

AbstractInputFormat.setScanAuthorizations(jobConf, auths)

AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)

val rdd2 = 
  sparkContext.hadoopRDD (
    jobConf, 
    classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value], 
    1
  )

rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at :62

rdd2.first

java.io.IOException: Input info has not been set.
    at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
    at org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308)
    at org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1077)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1110)
    at $iwC$$iwC$$iwC ... (spark-notebook REPL wrapper frames)
    ...
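As an aside on the two packages mentioned above: they line up with Spark's two Hadoop entry points. A minimal sketch of the pairing, reusing the jobConf built above (whichever package the InputFormat comes from, the key and value classes are the same, and the configuration setters still need to be applied first):

// Old ("mapred") API: a mapred InputFormat goes with hadoopRDD and a JobConf.
val oldApiRdd = sparkContext.hadoopRDD(
  jobConf,
  classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value],
  1
)

// New ("mapreduce") API: a mapreduce InputFormat goes with newAPIHadoopRDD and a
// plain Configuration (a JobConf also works here, since JobConf extends Configuration).
val newApiRdd = sparkContext.newAPIHadoopRDD(
  jobConf,
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value]
)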

* edit 2 *

re: Holden's answer - still no luck:

    AbstractInputFormat.setConnectorInfo(jobConf, 
                                         "root", 
                                         new PasswordToken("password")
    AbstractInputFormat.setScanAuthorizations(jobConf, auths)
    AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
    InputFormatBase.setInputTableName(jobConf, "batchtest1")
    val rddX = sparkContext.newAPIHadoopRDD(
      jobConf, 
      classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
      classOf[org.apache.accumulo.core.data.Key], 
      classOf[org.apache.accumulo.core.data.Value]
      )

rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at newAPIHadoopRDD at :58

Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at :58

rddX.first

java.io.IOException: Input info has not been set.
    at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
    at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
    at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1077)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1110)
    at $iwC$$iwC$$iwC ... (spark-notebook REPL wrapper frames)
    ...

* edit 3 - progress! *

, "input INFO not set". , , , '('

AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password") 

Because I'm doing this in spark-notebook, I'd been clicking the execute button and moving on because I wasn't seeing an error. What I forgot is that the notebook does what spark-notebook does when you execute a statement with a missing closing ')': it waits forever for you to add it. So the error was the result of the setConnectorInfo method never actually being executed.
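For completeness, the corrected call is simply the same line with its closing parenthesis in place:

// with the closing ')' the statement actually executes
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password"))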

Unfortunately, I'm still unable to get the Accumulo table data into an RDD that's usable to me. When I execute

rddX.count

res15: Long = 10000

which is the correct response - there are 10,000 rows of data in the table I pointed it at. However, when I try to grab the first element of data like so:

rddX.first

I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key

Any thoughts on where to go from here?

* edit 4 - success! *

The accepted answer plus its comments got me 90% of the way there - except for the fact that the Accumulo Key/Value need to be turned into something serializable. I got this working by invoking .toString() on both. I'll try to post some working example code soon in case anyone else runs into the same wall.
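Pending that working code, a minimal sketch of the conversion described above, assuming the rddX built in the earlier edits:

// Key and Value are not serializable, so map them to Strings (or some other
// serializable representation) before any action that ships rows to the driver.
val stringRdd = rddX.map { case (key, value) => (key.toString, value.toString) }

stringRdd.first   // now returns a (String, String) pair instead of failing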


Generally, with custom Hadoop InputFormats the information is specified using a JobConf. As @Sietse pointed out, there are some static methods on AccumuloInputFormat that you can use to configure the JobConf. In this case what I would do is:

val jobConf = new JobConf() // Create a job conf
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf, principal, token)
AccumuloInputFormat.setScanAuthorizations(jobConf, authorizations)
val clientConfig =  new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf, clientConfig)
AccumuloInputFormat.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(jobConf,
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[org.apache.accumulo.core.data.Key],
  classOf[org.apache.accumulo.core.data.Value]
)

Note: after digging into the code, it seems the "is configured" property is set based in part on the class that is called (which makes sense, presumably to avoid conflicting with other packages), so when we go and fetch it back in the concrete class later it fails to find the "is configured" flag. The solution to this is to not use the Abstract classes (see https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127 for details). If you can't call the method on the concrete class from spark-notebook, using spark-shell or a regularly built jar is probably the easiest solution.

