Convert Pandas data to Spark dataframe error

I am trying to convert Pandas DF to Spark. DF head:

 10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
 10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
 10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691

code:

 dataset = pd.read_csv("data/AS/test_v2.csv")
 sc = SparkContext(conf=conf)
 sqlCtx = SQLContext(sc)
 sdf = sqlCtx.createDataFrame(dataset)

And I get this error:

 TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'> 
5 answers

You need to make sure your pandas dataframe columns are appropriate for the types Spark is inferring. If your pandas dataframe lists something like:

 df.info()
 <class 'pandas.core.frame.DataFrame'>
 RangeIndex: 5062 entries, 0 to 5061
 Data columns (total 51 columns):
 SomeCol    5062 non-null object
 Col2       5062 non-null object

And you are getting this error, try:

 df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str) 

Now, make sure .astype(str) is actually the type you want those columns to be. Basically, when the underlying Java code tries to infer a type from an object in Python, it uses some observations and makes a guess; if that guess does not hold for all the data in the column(s) it is trying to convert from pandas, the conversion fails.
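As a minimal sketch of that check-and-cast step (the column names and values here are hypothetical; the point is to cast every object-dtype column to str before handing the frame to Spark):

 import pandas as pd
 from pyspark.sql import SparkSession

 spark = SparkSession.builder.appName('castExample').getOrCreate()

 # hypothetical frame: 'Col2' mixes floats and strings, which is exactly what
 # triggers the "Can not merge type StringType and DoubleType" error
 pdf = pd.DataFrame({'SomeCol': ['a', 'b', None], 'Col2': [1.0, 'x', 3.0]})

 # cast every object-dtype column to str so Spark infers a single StringType
 obj_cols = pdf.select_dtypes(include='object').columns
 pdf[obj_cols] = pdf[obj_cols].astype(str)

 sdf = spark.createDataFrame(pdf)
 sdf.printSchema()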


Type-related errors can be avoided by imposing a schema explicitly, as follows:

Note: a text file (test.csv) was created from the source data shown above, and hypothetical column names ("col1", "col2", ..., "col25") were added.

 import pyspark
 from pyspark.sql import SparkSession
 import pandas as pd

 spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
 pdDF = pd.read_csv("test.csv")

the contents of the pandas data frame:

 pdDF
        col1  col2  col3  col4   col5 col6   col7  col8  col9  col10  ...  col16  col17  col18  col19  col20  col21  col22  col23  col24  col25
 0  10000001     1     0     1  12:35   OK  10002     1     0      9  ...      3      9      0      0      1      1      0      0      4    543
 1  10000001     2     0     1  12:36   OK  10002     1     0      9  ...      3      9      2      1      1      3      1      3      2    611
 2  10000002     1     0     4  12:19   PA  10003     1     1      7  ...      2     15      2      0      2      3      1      2      2    691

Next, define the schema:

 from pyspark.sql.types import *

 mySchema = StructType([
     StructField("Col1", LongType(), True),
     StructField("Col2", IntegerType(), True),
     StructField("Col3", IntegerType(), True),
     StructField("Col4", IntegerType(), True),
     StructField("Col5", StringType(), True),
     StructField("Col6", StringType(), True),
     StructField("Col7", IntegerType(), True),
     StructField("Col8", IntegerType(), True),
     StructField("Col9", IntegerType(), True),
     StructField("Col10", IntegerType(), True),
     StructField("Col11", StringType(), True),
     StructField("Col12", StringType(), True),
     StructField("Col13", IntegerType(), True),
     StructField("Col14", IntegerType(), True),
     StructField("Col15", IntegerType(), True),
     StructField("Col16", IntegerType(), True),
     StructField("Col17", IntegerType(), True),
     StructField("Col18", IntegerType(), True),
     StructField("Col19", IntegerType(), True),
     StructField("Col20", IntegerType(), True),
     StructField("Col21", IntegerType(), True),
     StructField("Col22", IntegerType(), True),
     StructField("Col23", IntegerType(), True),
     StructField("Col24", IntegerType(), True),
     StructField("Col25", IntegerType(), True)])

Note: True means the field is nullable.

create a pyspark data frame:

 df = spark.createDataFrame(pdDF,schema=mySchema) 

confirm that the pandas dataframe has been converted to a pyspark dataframe:

 type(df) 

output:

 pyspark.sql.dataframe.DataFrame 
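As an extra sanity check (not part of the original answer), you can also print the schema that Spark actually applied:

 df.printSchema()
 # root
 #  |-- Col1: long (nullable = true)
 #  |-- Col2: integer (nullable = true)
 #  ...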

Aside:

To answer Kate's comment below: to impose a generic (all-string) schema, you can do the following:

 df=spark.createDataFrame(pdDF.astype(str)) 

I tried this with your data and it works:

 %pyspark
 import pandas as pd
 from pyspark.sql import SQLContext

 print(sc)
 df = pd.read_csv("test.csv")
 print(type(df))
 print(df)

 sqlCtx = SQLContext(sc)
 sqlCtx.createDataFrame(df).show()

I got a similar error message once; in my case it happened because my pandas dataframe contained NULLs. I recommend handling this in pandas before converting to Spark (that solved the problem in my case).
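A minimal sketch of that idea, assuming you simply want to fill the NULLs before converting (the fill values are only examples; choose whatever makes sense for your data):

 import pandas as pd
 from pyspark.sql import SparkSession

 spark = SparkSession.builder.getOrCreate()
 pdf = pd.read_csv("data/AS/test_v2.csv")

 # replace NaN/None before conversion: empty string for object columns, 0 for numeric ones
 fill_values = {col: "" if pdf[col].dtype == object else 0 for col in pdf.columns}
 pdf = pdf.fillna(fill_values)

 sdf = spark.createDataFrame(pdf)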


I wrote this helper; it worked for my 10 pandas dataframes:

 from pyspark.sql.types import *

 # Auxiliary functions
 def equivalent_type(f):
     if f == 'datetime64[ns]': return DateType()
     elif f == 'int64': return LongType()
     elif f == 'int32': return IntegerType()
     elif f == 'float64': return FloatType()
     else: return StringType()

 def define_structure(string, format_type):
     try:
         typo = equivalent_type(format_type)
     except:
         typo = StringType()
     return StructField(string, typo)

 # Given a pandas dataframe, return a spark dataframe.
 def pandas_to_spark(pandas_df):
     columns = list(pandas_df.columns)
     types = list(pandas_df.dtypes)
     struct_list = []
     for column, typo in zip(columns, types):
         struct_list.append(define_structure(column, typo))
     p_schema = StructType(struct_list)
     return sqlContext.createDataFrame(pandas_df, p_schema)

You can also see it in this gist.

To use it, just call spark_df = pandas_to_spark(pandas_df).
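Note that the helper refers to a sqlContext that must already exist in scope. A hedged usage sketch, assuming you create it from a SparkSession first:

 import pandas as pd
 from pyspark.sql import SparkSession, SQLContext

 spark = SparkSession.builder.getOrCreate()
 sqlContext = SQLContext(spark.sparkContext)  # the name the helper expects

 pandas_df = pd.read_csv("test.csv")
 spark_df = pandas_to_spark(pandas_df)
 spark_df.printSchema()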

