Let's start with some imports and variables that will be required for further processing:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner
// Parameters of the random graph; replace each ??? with a concrete value before running.
// Number of partitions the node ids (and therefore the edges) are spread over.
// Declared as Int (not the boxed java.lang.Integer) because every consumer expects Int.
val nPartitions: Int = ???
// Number of nodes; node ids are 0 until n.
val n: Long = ???
// Probability that any given pair of distinct nodes is connected by an edge.
val p: Double = ???
First, we need an RDD of node ids. The simplest approach is to parallelize a range:
// Node ids 0 .. n-1: `until` yields exactly n ids, consistent with genNodeIds and the
// `count == n` sanity check (`to` would add a spurious extra id n).
sc.parallelize(0L until n)
If you want better control over how the node ids are partitioned, you can do something like this:
// Pair every id with a dummy value so it can be hash-partitioned into nPartitions,
// then drop the dummy again. `until` yields exactly n ids (0 .. n-1); `to` would add
// a spurious id n.
sc.parallelize(0L until n)
  .map((_, None))
  .partitionBy(new HashPartitioner(nPartitions))
  .keys
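The drawback is that this requires creating an RDD and shuffling the data. Instead, we can generate the node ids locally on each partition: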
/** Generates the node ids assigned to partition `i` when the ids `0 until n` are dealt
  * round-robin over `nPartitions` partitions, i.e. the ids with `id % nPartitions == i`.
  *
  * Walks a strided range instead of filtering the whole `0 until n` range, so each
  * partition does O(n / nPartitions) work instead of O(n), and ids are produced lazily
  * instead of being materialized up front.
  *
  * @param nPartitions total number of partitions; must be positive
  * @param n           total number of node ids (ids are `0 until n`)
  * @param i           index of the partition to generate ids for
  * @return iterator over the ids of partition `i`; empty when `i` is outside `[0, nPartitions)`
  * @throws IllegalArgumentException if `nPartitions` is not positive
  */
def genNodeIds(nPartitions: Int, n: Long)(i: Int): Iterator[Long] = {
  require(nPartitions > 0, s"nPartitions must be positive, got $nPartitions")
  // Ids are non-negative, so no id can satisfy id % nPartitions == i for such an i.
  if (i < 0 || i >= nPartitions) Iterator.empty
  else (i.toLong until n by nPartitions.toLong).iterator
}
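Now we can create an empty RDD with the desired number of partitions and generate the ids inside each partition: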
// An RDD with no data but nPartitions partitions: it only provides the partition slots.
val empty: RDD[Int] = sc.parallelize(Seq.empty[Int], nPartitions)
// Each partition ignores its (empty) contents and emits its own share of the node ids.
val ids: RDD[Long] = empty.mapPartitionsWithIndex { (partitionIndex, _) =>
  genNodeIds(nPartitions, n)(partitionIndex)
}
A quick sanity check (fairly expensive, so you may want to skip it on large graphs):
// Sanity check: the ids must be unique and there must be exactly n of them.
// Expensive, because distinct() shuffles every id. Spark actions are side-effecting
// and are called with (); auto-application without parens is deprecated in Scala 2.13.
{
  val distinctIds = ids.distinct().count()
  require(distinctIds == n, s"expected $n distinct node ids, found $distinctIds")
}
Next, we need functions that generate the edges for each node and for each partition:
/** Generates the edges from node `i` to every higher-numbered node, keeping each
  * candidate independently with probability `p`.
  *
  * Only targets `j` with `i < j < n` are considered, so across all `i` every unordered
  * pair of nodes is tried exactly once.
  *
  * @param p      probability of keeping each candidate edge
  * @param n      number of nodes (ids are `0 until n`)
  * @param random source of randomness; one draw is made per candidate `j`, in increasing `j` order
  * @param i      id of the source node
  * @return the kept edges `i -> j`, each with a Unit attribute
  */
def genEdgesForId(p: Double, n: Long, random: Random)(i: Long): IndexedSeq[Edge[Unit]] = {
  // nextDouble() is side-effecting, so call it with (); auto-application is deprecated in 2.13.
  (i + 1 until n).filter(_ => random.nextDouble() < p).map(j => Edge(i, j, ()))
}
/** Generates the edges of all node ids in one partition.
  *
  * SecureRandom is used only to draw a seed for an ordinary Random, so each partition
  * gets its own unpredictable seed. Wrapping SecureRandom directly (as before) routes
  * every Bernoulli draw through the much slower secure generator, and genEdgesForId
  * makes one draw per candidate pair.
  *
  * @param iter node ids of the partition
  * @return edges of those nodes, generated lazily as the iterator is consumed
  */
def genEdgesForPartition(iter: Iterator[Long]): Iterator[Edge[Unit]] = {
  val random = new Random(new java.security.SecureRandom().nextLong())
  iter.flatMap(genEdgesForId(p, n, random))
}
// Expand each partition's node ids into their edges, partition by partition.
val edges: RDD[Edge[Unit]] = ids.mapPartitions(partitionIds => genEdgesForPartition(partitionIds))
Finally, we can build the graph:
// Build the graph from explicit vertices as well as edges. Graph.fromEdges only creates
// vertices that appear in at least one edge, so isolated nodes (common for small p) would
// be silently dropped and the graph could end up with fewer than n vertices.
val graph = Graph(ids.map(id => (id, ())), edges, ())