How to access DenseVector values in PySpark

I trained a LogisticRegression model in PySpark (ML library). The prediction result is a DataFrame (called predictions), from which I select only three columns, as shown below.

 prunedPredictions = predictions.select(predictions["prediction"], predictions["probability"], predictions["label"]) 

The probability column is of type DenseVector. I am trying to create another DataFrame in which the probability column is replaced with one of its elements (the first element). I tried several things, but none of them works. Here is one of them:

 prunedPredictionsDF = prunedPredictions.map(lambda r: Row(prediction = r[0],probability = r[1].apply(1),label = r[2])) 

or

 prunedPredictionsDF = prunedPredictions.map(lambda r: Row(prediction = r[0],probability = r[1][1],label = r[2])) 

How can I do this? I get the following error message:

 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
   File "/usr/lib/spark/python/pyspark/rdd.py", line 1317, in first
     rs = self.take(1)
   File "/usr/lib/spark/python/pyspark/rdd.py", line 1299, in take
     res = self.context.runJob(self, takeUpToNumLeft, p)
   File "/usr/lib/spark/python/pyspark/context.py", line 916, in runJob
     port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
   File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
     return f(*a, **kw)
   File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 235.0 failed 4 times, most recent failure: Lost task 0.3 in stage 235.0 (TID 61305, anp-r03wn03.c03.hadoop.td.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
   File "/usr/lib/spark/python/pyspark/worker.py", line 111, in main
     process()
   File "/usr/lib/spark/python/pyspark/worker.py", line 106, in process
     serializer.dump_stream(func(split_index, iterator), outfile)
   File "/usr/lib/spark/python/pyspark/serializers.py", line 263, in dump_stream
     vs = list(itertools.islice(iterator, batch))
   File "/usr/lib/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
     yield next(iterator)
   File "/usr/lib/spark/python/pyspark/serializers.py", line 139, in load_stream
     yield self._read_with_length(stream)
   File "/usr/lib/spark/python/pyspark/serializers.py", line 164, in _read_with_length
     return self.loads(obj)
   File "/usr/lib/spark/python/pyspark/serializers.py", line 422, in loads
     return pickle.loads(obj)
   File "/usr/lib/spark/python/pyspark/sql/types.py", line 728, in _parse_datatype_json_string
     return _parse_datatype_json_value(json.loads(json_string))
   File "/usr/lib/spark/python/pyspark/sql/types.py", line 748, in _parse_datatype_json_value
     return _all_complex_types[tpe].fromJson(json_value)
   File "/usr/lib/spark/python/pyspark/sql/types.py", line 525, in fromJson
     return StructType([StructField.fromJson(f) for f in json["fields"]])
   File "/usr/lib/spark/python/pyspark/sql/types.py", line 425, in fromJson
     _parse_datatype_json_value(json["type"]),
   File "/usr/lib/spark/python/pyspark/sql/types.py", line 750, in _parse_datatype_json_value
     return UserDefinedType.fromJson(json_value)
   File "/usr/lib/spark/python/pyspark/sql/types.py", line 663, in fromJson
     m = __import__(pyModule, globals(), locals(), [pyClass])
   File "/usr/lib/spark/python/pyspark/mllib/__init__.py", line 25, in <module>
     import numpy
 ImportError: ('No module named numpy', <function _parse_datatype_json_string at 0xcd9d70>, (u'{"type":"struct","fields":[{"name":"prediction","type":"double","nullable":true,"metadata":{}},{"name":"probability","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"label","type":"double","nullable":true,"metadata":{"ml_attr":{"vals":["0","1"],"type":"nominal","name":"label"}}}]}',))
     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
     at org.apache.spark.scheduler.Task.run(Task.scala:88)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     ... 1 more
apache-spark pyspark pyspark-sql
1 answer

In PySpark you can access the elements of a DenseVector by index.

For example:

 from pyspark.mllib.linalg import DenseVector

 a = DenseVector([1.0, 2.0, 3.0, 4.0, 5.0])
 print(a[1])
 # 2.0

Alternatively, you can iterate over the DenseVector:

 for e in a:
     print(e)
 # 1.0
 # 2.0
 # 3.0
 # 4.0
 # 5.0

Another way is to use DenseVector.values, which gives you all the values at once as a NumPy array; call .tolist() on it if you need a plain Python list.
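Applied to the DataFrame from the question, the same indexing can be wrapped in a UDF so that the probability column is replaced by a single element. This is only a minimal sketch: the names element_at and with_probability_element are hypothetical helpers introduced here for illustration, and it assumes a Spark version in which withColumn replaces an existing column of the same name.

```python
def element_at(vector, index=0):
    """Return one element of an indexable vector as a plain Python float."""
    # DenseVector supports integer indexing; float() turns the NumPy scalar
    # it returns into a value that Spark SQL's DoubleType accepts.
    return float(vector[index])


def with_probability_element(df, index=0, column="probability"):
    """Replace a vector column with its element at `index` (hypothetical helper)."""
    # Imported lazily so element_at can be tried without Spark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    pick = udf(lambda v: element_at(v, index), DoubleType())
    return df.withColumn(column, pick(df[column]))


# Usage with the DataFrame from the question (requires a running Spark session):
# prunedPredictionsDF = with_probability_element(prunedPredictions, index=0)
```

Note that the traceback in the question ends in ImportError: No module named numpy, raised on a worker while importing pyspark.mllib. Any Python-side access to the vector, including this UDF, will probably fail the same way until NumPy is installed on every worker node.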
