Access Amazon S3 Public File from Apache Spark

I have a public Amazon S3 resource (a text file) that I want to access from Spark. I don't have AWS credentials - and that works fine if I just download the file directly:

    import com.amazonaws.services.s3.AmazonS3Client

    val bucket = "<my-bucket>"
    val key = "<my-key>"
    val client = new AmazonS3Client
    val o = client.getObject(bucket, key)
    val content = o.getObjectContent // <= can be read and used as an input stream

However, when I try to access the same resource from a Spark context:

    val conf = new SparkConf().setAppName("app").setMaster("local")
    val sc = new SparkContext(conf)
    val f = sc.textFile(s"s3a://$bucket/$key")
    println(f.count())

I get the following error with a stack trace:

    Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
        at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
        at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
        at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
        at com.example.Main$.main(Main.scala:14)
        at com.example.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

I do not want to provide any AWS credentials - I just want to access the resource anonymously (for now). How can I do this? I probably need to use something like AnonymousAWSCredentialsProvider, but how do I plug it into Spark or Hadoop?

PS: My build.sbt, just in case:

    scalaVersion := "2.11.7"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.4.1",
      "org.apache.hadoop" % "hadoop-aws" % "2.7.1"
    )

UPDATE: after doing some research, I see why it does not work.

First of all, S3AFileSystem creates the AWS client with the following credential provider order:

    AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
        new BasicAWSCredentialsProvider(accessKey, secretKey),
        new InstanceProfileCredentialsProvider(),
        new AnonymousAWSCredentialsProvider()
    );

The values of accessKey and secretKey are taken from the Hadoop configuration instance (the keys must be "fs.s3a.access.key" and "fs.s3a.secret.key", or org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY and org.apache.hadoop.fs.s3a.Constants.SECRET_KEY, which is more convenient).
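For illustration, a minimal sketch (not from the original post) of how these keys could be set on the Hadoop configuration that Spark hands to S3AFileSystem; the key values are placeholders, not real credentials:

    // Sketch: setting the S3A credential keys on Spark's Hadoop configuration.
    // The values below are placeholders.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("app").setMaster("local")
    val sc = new SparkContext(conf)
    sc.hadoopConfiguration.set("fs.s3a.access.key", "<access-key>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<secret-key>")
    // With these set, BasicAWSCredentialsProvider (the first provider in the
    // chain above) would be used instead of the anonymous provider.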

Secondly, you can see that AnonymousAWSCredentialsProvider is the third option (lowest priority) - so what could be wrong? Look at the AnonymousAWSCredentials implementation:

    public class AnonymousAWSCredentials implements AWSCredentials {

        public String getAWSAccessKeyId() {
            return null;
        }

        public String getAWSSecretKey() {
            return null;
        }
    }

It just returns null for both the access key and the secret key. Sounds reasonable. But take a look inside AWSCredentialsProviderChain:

    AWSCredentials credentials = provider.getCredentials();

    if (credentials.getAWSAccessKeyId() != null &&
        credentials.getAWSSecretKey() != null) {
        log.debug("Loading credentials from " + provider.toString());

        lastUsedProvider = provider;
        return credentials;
    }

The chain never selects a provider whose keys are both null - which means anonymous credentials cannot work. This looks like a bug in aws-java-sdk-1.7.4. I tried using a newer version, but it is not compatible with hadoop-aws-2.7.1.
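As a sanity check that anonymous access itself works and the limitation is only in the provider chain, here is a small sketch using the plain SDK directly; the bucket and key names are placeholders:

    // Sketch: anonymous access works when AnonymousAWSCredentials is passed
    // directly to the client, bypassing AWSCredentialsProviderChain.
    import com.amazonaws.auth.AnonymousAWSCredentials
    import com.amazonaws.services.s3.AmazonS3Client

    val anonClient = new AmazonS3Client(new AnonymousAWSCredentials())
    val obj = anonClient.getObject("<my-bucket>", "<my-key>")
    val stream = obj.getObjectContent // readable without any credentials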

Any other ideas?

scala amazon-s3 apache-spark
3 answers

I personally have never accessed public data from Spark. You can try using dummy credentials, or create a set just for this purpose, and set them directly on the SparkConf object:

    val sparkConf: SparkConf = ???
    val accessKeyId: String = ???
    val secretAccessKey: String = ???

    sparkConf.set("spark.hadoop.fs.s3.awsAccessKeyId", accessKeyId)
    sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", accessKeyId)
    sparkConf.set("spark.hadoop.fs.s3.awsSecretAccessKey", secretAccessKey)
    sparkConf.set("spark.hadoop.fs.s3n.awsSecretAccessKey", secretAccessKey)

Alternatively, read the DefaultAWSCredentialsProviderChain documentation to see where it looks for credentials, in this order:

  • Environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
  • Java system properties - aws.accessKeyId and aws.secretKey (see the sketch after this list)
  • Credential profile file at the default location (~/.aws/credentials), shared by all AWS SDKs and the AWS CLI
  • Instance profile credentials delivered through the Amazon EC2 metadata service
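For illustration, a minimal sketch of the Java system properties option; the values are placeholders and must be set before the first AWS client or S3 filesystem is created:

    // Sketch: Java system properties read by DefaultAWSCredentialsProviderChain.
    // Values are placeholders, not real credentials.
    System.setProperty("aws.accessKeyId", "<access-key>")
    System.setProperty("aws.secretKey", "<secret-key>")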

It looks like you can now use the fs.s3a.aws.credentials.provider configuration key to plug in org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider, which handles the anonymous case correctly. However, you need a newer version of hadoop-aws than 2.7, which means you also need a Spark build without the bundled Hadoop.

Here is how I did it on Google Colab:

    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q http://apache.osuosl.org/spark/spark-2.3.1/spark-2.3.1-bin-without-hadoop.tgz
    !tar xf spark-2.3.1-bin-without-hadoop.tgz
    !pip install -q findspark
    !pip install -q pyarrow

Now we install Hadoop on the side and set the output of the hadoop classpath command as SPARK_DIST_CLASSPATH, so that Spark can see the Hadoop jars:

    import os

    !wget -q http://mirror.nbtelecom.com.br/apache/hadoop/common/hadoop-2.8.4/hadoop-2.8.4.tar.gz
    !tar xf hadoop-2.8.4.tar.gz

    os.environ['HADOOP_HOME'] = '/content/hadoop-2.8.4'
    os.environ["SPARK_DIST_CLASSPATH"] = "/content/hadoop-2.8.4/etc/hadoop:/content/hadoop-2.8.4/share/hadoop/common/lib/*:/content/hadoop-2.8.4/share/hadoop/common/*:/content/hadoop-2.8.4/share/hadoop/hdfs:/content/hadoop-2.8.4/share/hadoop/hdfs/lib/*:/content/hadoop-2.8.4/share/hadoop/hdfs/*:/content/hadoop-2.8.4/share/hadoop/yarn/lib/*:/content/hadoop-2.8.4/share/hadoop/yarn/*:/content/hadoop-2.8.4/share/hadoop/mapreduce/lib/*:/content/hadoop-2.8.4/share/hadoop/mapreduce/*:/content/hadoop-2.8.4/contrib/capacity-scheduler/*.jar"

Then we do the same as in https://mikestaszel.com/2018/03/07/apache-spark-on-google-colaboratory/, but add support for s3a and anonymous reads, which is what we are after:

    import os

    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-without-hadoop"
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.10.6,org.apache.hadoop:hadoop-aws:2.8.4 --conf spark.sql.execution.arrow.enabled=true --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider pyspark-shell'

And finally, we can create a session.

    import findspark
    findspark.init()

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()

Here is what helped me:

    // filesFromS3 is a sequence of s3a:// paths to the public objects to read
    val session = SparkSession.builder()
      .appName("App")
      .master("local[*]")
      .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
      .getOrCreate()

    val df = session.read.csv(filesFromS3: _*)

Versions:

 "org.apache.spark" %% "spark-sql" % "2.4.0", "org.apache.hadoop" % "hadoop-aws" % "2.8.5", 

Documentation: https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Authentication_properties
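For completeness, a small sketch of what filesFromS3 could look like; the bucket and key are placeholders, not from the original answer:

    // Hypothetical example paths to public CSV objects; replace with real ones.
    val filesFromS3: Seq[String] = Seq(
      "s3a://<my-bucket>/<my-key>.csv"
    )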

