Creating an array in Executor in Spark and merging in RDD

I am moving from MPI based systems to Apache Spark. I need to do the following in Spark.

Suppose I have peaks n. I want to create a list of edges from these vertices n. An edge is just a set of two integers (u, v), no attributes are required.

However, I want to create them in parallel independently of each other in each artist. Therefore, I want to create arrays with edges Pindependently for PSpark Executors. Each array can have different sizes and depends on the vertices, so I also need the identifier of the executor from 0to n-1. Then I want to have a global array of RDD edges.

In MPI, I would create an array in each processor using the processor rank. How to do this in Spark, especially using the library GraphX?

Therefore, my main goal is to create an array of edges in each artist and combine them into one RDD.

First, I try to use one modified version of the Erdos model - Renyi. As a parameter, I have only the number of nodes n and the probability p.

Assume that the contractor ishould process nodes from 101to 200. For any node, say node, 101it will create edges from 101to 102 -- nwith probability p. After each performer creates a dedicated rib, I create an instance of GraphX EdgeRDDand VertexRDD. So my plan is to create lists of edges independently of each other in each artist and combine them into RDD.

+2
source share
1 answer

Let's start with some import and variables that will be required for further processing:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner

val nPartitions: Integer = ???
val n: Long = ??? 
val p: Double = ???

RDD , . :

sc.parallelize(0L to n)

node id, . :

sc.parallelize(0L to n)
  .map((_, None))
  .partitionBy(new HashPartitioner(nPartitions))
  .keys

- RDD . :

def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
  (0L until n).filter(_ % nPartitions == i).toIterator
}

:

val empty = sc.parallelize(Seq.empty[Int], nPartitions)
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))

( , ):

require(ids.distinct.count == n) 

:

def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
  (i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j, ()))
}

def genEdgesForPartition(iter: Iterator[Long]) = {
  // It could be an overkill but better safe than sorry
  // Depending on your requirement it could worth to
  // consider using commons-math
  // https://commons.apache.org/proper/commons-math/userguide/random.html
  val random = new Random(new java.security.SecureRandom())
  iter.flatMap(genEdgesForId(p, n, random))
}

val edges = ids.mapPartitions(genEdgesForPartition)

, :

val graph = Graph.fromEdges(edges, ())
+4

All Articles