How to read Avro file in PySpark

I am writing a spark job using Python. However, I need to read a whole bunch of avro files.

This is the closest solution I found in the Spark examples folder. However, you need to send this script to python using spark-submit. In the spark-submit command line, you can specify the driver class, in which case your whole avrokey class, avrovalue, will be located.

avro_rdd = sc.newAPIHadoopFile( path, "org.apache.avro.mapreduce.AvroKeyInputFormat", "org.apache.avro.mapred.AvroKey", "org.apache.hadoop.io.NullWritable", keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter", conf=conf) 

In my case, I need to run everything in a Python script, I tried to create an environment variable that includes a jar file, the Cross Cross Python method will add jar to the path, but obviously it is not, it gives me an unexpected error class.

 os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar" 

Can someone help me how to read avro file in one Python script?

+12
python apache-spark pyspark avro
source share
2 answers

Spark> = 2.4.0

You can use the built-in support for Avro . The API is backward compatible with the spark-avro package, with a few additions (especially the to_avro function from_avro / to_avro ).

Please note that the module is not associated with standard Spark binaries and must be enabled using spark.jars.packages or an equivalent mechanism.

See also Pyspark 2.4.0, read avro with kafka with read stream - Python

Spark <2.4.0

You can use the spark-avro library. First, let's create an example dataset:

 import avro.schema from avro.datafile import DataFileReader, DataFileWriter schema_string ='''{"namespace": "example.avro", "type": "record", "name": "KeyValue", "fields": [ {"name": "key", "type": "string"}, {"name": "value", "type": ["int", "null"]} ] }''' schema = avro.schema.parse(schema_string) with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt: wrt.append({"key": "foo", "value": -1}) wrt.append({"key": "bar", "value": 1}) 

Reading it with spark-csv is as simple as this:

 df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro") df.show() ## +---+-----+ ## |key|value| ## +---+-----+ ## |foo| -1| ## |bar| 1| ## +---+-----+ 
+5
source share

The previous solution requires you to install a third-party Java dependency, which is not something more than most Python developers have. But you do not need an external library if all you want to do is analyze your Avro files using this scheme. You can simply read the binaries and parse them with your favorite Avro python package.

For example, you can upload Avro files using fastavro :

 from io import BytesIO import fastavro schema = { ... } rdd = sc.binaryFiles("/path/to/dataset/*.avro")\ .flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema)) print(rdd.collect()) 
+5
source share

All Articles