Creating Custom Spark RDD in Python

Is it possible to extend Spark RDDs in Python to add custom operators? If it is not possible, how can one wrap the Scala code of a class that extends RDD, such as the one here: http://blog.madhukaraphatak.com/extending-spark-api/

Edit: I'm trying to create a new RDD, say PersonRDD, and add a set of new methods to it, for example PersonRDD.computeMedianIncome(). According to the link below, it is not trivial to do this in Python. However, since that is an old thread, I was wondering whether there have been any new developments. If not, I would like to use Scala for this, but I'm not sure how to call the class from Python using Py4J (mail-archives.us.apache.org/mod_mbox/spark-user/201308.mbox/...)
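For the Py4J part, here is a very rough sketch of what calling into a JVM class from PySpark can look like. Everything in it is an assumption: the Scala class name is made up, sc._jvm is a private gateway, and the JVM side only receives pickled Python objects, which is exactly the non-trivial bit discussed in that thread.

people = sc.parallelize([{"name": "Ann", "income": 52000},
                         {"name": "Bob", "income": 61000}])

# sc._jvm is PySpark's Py4J gateway into the driver JVM (a private API).
# com.example.PersonRDDFunctions is a hypothetical Scala class that would
# already have to be on the classpath (e.g. shipped with --jars).
helper = sc._jvm.com.example.PersonRDDFunctions()

# people._jrdd is a JavaRDD of pickled Python objects, so the Scala code
# would have to deserialize them before computing anything.
median = helper.computeMedianIncome(people._jrdd)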

Any advice or help is appreciated.

Mandy

2 answers

Calculating an exact median in a distributed environment takes some effort, so let's say instead that you want something like squaring all the values in an RDD. Call this method squares and assume it should work as follows:

assert rdd.squares().collect() == rdd.map(lambda x: x * x).collect()

1. Change the definition of pyspark.RDD:

from pyspark import RDD

def squares(self):
    return self.map(lambda x: x * x)

# Monkey-patch: attach the function to the RDD class itself
RDD.squares = squares
rdd = sc.parallelize([1, 2, 3])
assert rdd.squares().collect() == [1, 4, 9]

Note: because this changes the class definition itself, every RDD instance gains access to squares.

2. Create a subclass of RDD:

class RDDWithSquares(RDD):
    def squares(self):
        return self.map(lambda x: x * x)

rdd = sc.parallelize([1, 2, 3])
rdd.__class__ = RDDWithSquares # WARNING: see a comment below

Assigning to __class__ like this is a dirty hack, so in practice you should construct the RDD properly (see, for example, how context.parallelize is implemented).
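For example, a tiny helper (not part of PySpark, just a sketch) can build the subclass from an existing RDD's internals instead of reassigning __class__; RDD.__init__ only needs the Java RDD, the SparkContext and the deserializer:

def as_squares_rdd(rdd):
    # Re-create the RDD as an RDDWithSquares rather than mutating __class__
    return RDDWithSquares(rdd._jrdd, rdd.ctx, rdd._jrdd_deserializer)

rdd = as_squares_rdd(sc.parallelize([1, 2, 3]))
assert rdd.squares().collect() == [1, 4, 9]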

3. Add a method to an instance:

import types

rdd = sc.parallelize([1, 2, 3])
# Reusing squares function defined above
rdd.squares = types.MethodType(squares, rdd)

Disclaimer

I haven't tested any of these options long enough to be sure there are no hidden problems.

Moreover, I am not sure the whole thing is worth the fuss in the first place. In most cases a plain standalone function does the job just as well, and if you want composition you can reach for currying or pipes:

from toolz import pipe
pipe(
    sc.parallelize([1, 2, 3]),
    squares,
    lambda rdd: rdd.collect())
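As an aside on the median from the question: even a naive exact median shows why it "requires some effort" in a distributed setting, since it needs a global sort plus lookups by position (a sketch only, ignoring empty RDDs and efficiency):

def exact_median(rdd):
    # Sort the values, index them, and pull out the middle element(s)
    n = rdd.count()
    indexed = (rdd.sortBy(lambda x: x)
                  .zipWithIndex()
                  .map(lambda vi: (vi[1], vi[0])))  # (index, value) pairs
    if n % 2 == 1:
        return indexed.lookup(n // 2)[0]
    lower, upper = indexed.lookup(n // 2 - 1)[0], indexed.lookup(n // 2)[0]
    return (lower + upper) / 2.0

assert exact_median(sc.parallelize([3, 1, 2])) == 2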

I had a similar problem and, although I haven't tested the full functionality of a normal RDD on my extended version, so far it works as expected. It takes some work and may not be the best solution, but what I am doing is extending the RDD class, re-implementing the methods that return a new RDD by wrapping the result in the constructor of the new class, and adding my own methods to it. Here is a short part of the code:

from pyspark.rdd import RDD, PipelinedRDD

class CustomRDD(RDD):
    def __init__(self, rdd, first=True):
        # custom_parser is assumed to be defined elsewhere; it is applied
        # only when a plain RDD is wrapped for the first time
        if first:
            rdd = custom_parser(rdd)
        # Copy the internals of the wrapped RDD instead of calling RDD.__init__
        self._jrdd = rdd._jrdd
        self.is_cached = rdd.is_cached
        self.is_checkpointed = rdd.is_checkpointed
        self.ctx = rdd.ctx
        self._jrdd_deserializer = rdd._jrdd_deserializer
        self._id = rdd._id
        self.partitioner = rdd.partitioner

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        # Re-wrap the result so that transformations keep returning a CustomRDD
        return CustomRDD(PipelinedRDD(self, f, preservesPartitioning), False)

    def union(self, other):
        # union does not go through mapPartitionsWithIndex, so wrap it explicitly
        return CustomRDD(super(CustomRDD, self).union(other), False)

    def custom_method(self):
        # has_property is a placeholder for whatever the parsed records expose
        return CustomRDD(self.filter(lambda x: x.has_property()), False)

mapPartitionsWithIndex is the workhorse behind most other RDD transformations, so overriding it (along with union, which does not go through it) to return a CustomRDD means that chained transformations keep giving you back a CustomRDD instead of a plain RDD.
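A hypothetical usage sketch (raw_records, custom_parser and has_property stand in for whatever your data actually provides), showing that the subclass survives ordinary transformations:

rdd = CustomRDD(sc.parallelize(raw_records))  # parsed once via custom_parser
filtered = rdd.custom_method()                # still a CustomRDD
mapped = rdd.map(lambda x: x)                 # map() calls mapPartitionsWithIndex,
print(type(mapped).__name__)                  # so this prints "CustomRDD"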

