How to query an Elasticsearch index using PySpark and DataFrames

The Elasticsearch documentation only covers loading the full index into Spark.

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)
    df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
    df.printSchema()

How can you execute a query to return data from an Elasticsearch index and load it into Spark as a DataFrame using pyspark?

+6
3 answers

The following shows how I do it.

General setup and environment commands:

    export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
    export PYSPARK_DRIVER_PYTHON=ipython2
    ./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar

The code:

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    conf = SparkConf().setAppName("ESTest")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    # only return documents that have a "label" field
    q = """{
      "query": {
        "filtered": {
          "filter": {
            "exists": { "field": "label" }
          },
          "query": {
            "match_all": {}
          }
        }
      }
    }"""

    es_read_conf = {
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "titanic/passenger",
        "es.query": q
    }

    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)

    sqlContext.createDataFrame(es_rdd).collect()

You can also define the columns of the DataFrame explicitly. For more information see here.
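For example, here is a minimal sketch (not part of the original answer) of building the DataFrame with explicit columns from the (document id, field dict) tuples that the RDD returns; the field names "Name" and "Survived" are only assumptions about what a titanic/passenger document might contain:

    from pyspark.sql import Row

    def to_row(hit):
        # each element of es_rdd is a (document id, dict of fields) pair;
        # "Name" and "Survived" are assumed field names for illustration
        doc_id, fields = hit
        return Row(id=doc_id,
                   name=fields.get("Name"),
                   survived=fields.get("Survived"))

    df = sqlContext.createDataFrame(es_rdd.map(to_row))
    df.printSchema()
    df.show(5)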

Hope this helps!

+4

I run my code in an Amazon EMR cluster using pyspark. These are the steps I followed:

1) I added this bootstrap action when creating the cluster (it installs a local Elasticsearch server):

 s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb 

2) I ran these commands to populate the Elasticsearch database with some data:

    curl -XPUT "http://localhost:9200/movies/movie/1" -d'
    {
       "title": "The Godfather",
       "director": "Francis Ford Coppola",
       "year": 1972
    }'

You can also run other curl commands if you want:

    curl -XGET "http://localhost:9200/_search?pretty=true&q={'matchAll':{''}}"

3) I started pyspark with the following options:

 pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar 

I had previously downloaded the Python Elasticsearch client.

4) I ran the following code:

    from pyspark import SparkConf
    from pyspark.sql import SQLContext

    # sc and sqlContext are already provided by the pyspark shell

    q = """{
      "query": {
        "match_all": {}
      }
    }"""

    es_read_conf = {
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "movies/movie",
        "es.query": q
    }

    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)

    sqlContext.createDataFrame(es_rdd).collect()

Then I finally got a successful result from the command.
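To go a step further and treat the hits as a regular DataFrame instead of (document id, field dict) tuples, something like the following sketch can be used. Converting each hit into a Row and the SQL query are my own additions, not part of the original steps; they assume the title/director/year fields of the document indexed in step 2:

    from pyspark.sql import Row

    # keep only the field dict from each (document id, dict) pair
    movies_df = sqlContext.createDataFrame(es_rdd.map(lambda hit: Row(**hit[1])))

    movies_df.registerTempTable("movies")
    sqlContext.sql("SELECT title, director FROM movies WHERE year >= 1970").show()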

0

I ran into a similar problem trying to get geo-filtered data into a PySpark DataFrame. I am using elasticsearch-spark-20_2.11-5.2.2.jar with Spark version 2.1.1 and ES version 5.2. I was able to load the data into a DataFrame by specifying my query as an option when creating the DataFrame.

My geo query:

 q ="""{ "query": { "bool" : { "must" : { "match_all" : {} }, "filter" : { "geo_distance" : { "distance" : "100km", "location" : { "lat" : 35.825, "lon" : -87.99 } } } } } }""" 

I used the following command to load the data into a DataFrame:

 spark_df = spark.read.format("es").option("es.query", q).load("index_name") 
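If Elasticsearch is not running on localhost, the connection settings can be passed the same way as the query. A small sketch, assuming a reachable cluster (the host name is a placeholder; es.nodes and es.port are standard elasticsearch-hadoop options):

    # sketch only: replace the host with your own cluster; q and "index_name" as above
    spark_df = (spark.read.format("org.elasticsearch.spark.sql")
                .option("es.nodes", "my-es-host")
                .option("es.port", "9200")
                .option("es.query", q)
                .load("index_name"))

    spark_df.show(5)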
0
