Scala, Eratosthenes: Is there an easy way to replace a stream with an iteration?

I wrote a function that generates prime numbers indefinitely (Wikipedia: incremental sieve of Eratosthenes) using streams. It returns a stream, but it also merges streams of prime multiples internally to mark upcoming composites. The definition is concise, functional, elegant and easy to understand, if I do say so myself:

    def primes(): Stream[Int] = {
      def merge(a: Stream[Int], b: Stream[Int]): Stream[Int] = {
        def next = a.head min b.head
        Stream.cons(next, merge(if (a.head == next) a.tail else a,
                                if (b.head == next) b.tail else b))
      }
      def test(n: Int, compositeStream: Stream[Int]): Stream[Int] = {
        if (n == compositeStream.head) test(n+1, compositeStream.tail)
        else Stream.cons(n, test(n+1, merge(compositeStream, Stream.from(n*n, n))))
      }
      test(2, Stream.from(4, 2))
    }

But I get "java.lang.OutOfMemoryError: GC overhead limit exceeded" when I try to generate the 1000th prime.

I have an alternative solution that returns an iterator over primes and internally uses a priority queue of tuples (multiple, prime used to generate that multiple) to mark upcoming composites. It works well, but it takes about twice as much code, and I basically had to restart from scratch:

    import scala.collection.mutable.PriorityQueue

    def primes(): Iterator[Int] = {
      // Tuple (composite, prime) is used to generate a prime's multiples
      object CompositeGeneratorOrdering extends Ordering[(Long, Int)] {
        def compare(a: (Long, Int), b: (Long, Int)) = b._1 compare a._1
      }
      var n = 2
      val composites = PriorityQueue(((n*n).toLong, n))(CompositeGeneratorOrdering)
      def advance = {
        while (n == composites.head._1) { // n is composite
          while (n == composites.head._1) { // duplicate composites
            val (multiple, prime) = composites.dequeue
            composites.enqueue((multiple + prime, prime))
          }
          n += 1
        }
        assert(n < composites.head._1)
        val prime = n
        n += 1
        composites.enqueue((prime.toLong * prime.toLong, prime))
        prime
      }
      Iterator.continually(advance)
    }

Is there an easy way to translate code that uses streams into code that uses iterators? Or is there an easy way to make my first attempt more memory efficient?

It's easier to think in terms of streams; I would rather start that way and then tweak my code if necessary.

+8
iterator scala stream out-of-memory primes
Jan 08
4 answers

In your first code, you should postpone the merging until the square of a prime is seen among the candidates. This drastically reduces the number of streams in use and radically improves the memory usage. To get the 1000th prime, 7919, we only need to consider primes not exceeding its square root, 88. That is just 23 primes and streams of their multiples instead of 999 (22 if we ignore the evens from the outset). For the 10,000th prime, it is the difference between 9,999 streams of multiples and just 66. And for the 100,000th, only 189 are needed.

The trick is to separate the primes being consumed from the primes being produced, through a recursive invocation:
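As a quick sanity check of those counts, here is a minimal sketch that counts the primes not exceeding the square root of a given number by plain trial division. The helper name `basePrimeCount` is made up for illustration and is not part of the code in the question.

```scala
// Hypothetical helper: counts the primes p with p * p <= n using trial division.
def basePrimeCount(n: Int): Int = {
  def isPrime(k: Int): Boolean =
    k >= 2 && Iterator.from(2).takeWhile(d => d * d <= k).forall(d => k % d != 0)
  // p.toLong avoids Int overflow when squaring the candidate
  Iterator.from(2).takeWhile(p => p.toLong * p <= n).count(isPrime)
}

println(basePrimeCount(7919))    // base primes needed for the 1,000th prime
println(basePrimeCount(104729))  // ... for the 10,000th prime
println(basePrimeCount(1299709)) // ... for the 100,000th prime
```

The three arguments are the 1,000th, 10,000th and 100,000th primes, so the printed values can be compared against the figures quoted above.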

    def primes(): Stream[Int] = {
      def merge(a: Stream[Int], b: Stream[Int]): Stream[Int] = {
        def next = a.head min b.head
        Stream.cons(next, merge(if (a.head == next) a.tail else a,
                                if (b.head == next) b.tail else b))
      }
      def test(n: Int, q: Int,
               compositeStream: Stream[Int],
               primesStream: Stream[Int]): Stream[Int] = {
        if (n == q) test(n+2, primesStream.tail.head*primesStream.tail.head,
                         merge(compositeStream,
                               Stream.from(q, 2*primesStream.head).tail),
                         primesStream.tail)
        else if (n == compositeStream.head) test(n+2, q, compositeStream.tail,
                                                 primesStream)
        else Stream.cons(n, test(n+2, q, compositeStream, primesStream))
      }
      Stream.cons(2, Stream.cons(3, Stream.cons(5,
        test(7, 25, Stream.from(9, 6), primes().tail.tail))))
    }

As an added bonus, there is no need to store the squares of primes as Longs. It will also be much faster and have better algorithmic complexity (time and space), as it avoids a lot of superfluous work. Initial testing shows that it runs at about ~ n^1.5..1.6 empirical orders of growth when producing up to n = 80,000 primes.

There is still an algorithmic problem here: the structure created is still a linear left-leaning structure (((mults_of_2 + mults_of_3) + mults_of_5) + ...), with the more frequently producing streams situated deeper inside it (so the numbers have more levels to percolate up through). A right-leaning structure should be better, mults_of_2 + (mults_of_3 + (mults_of_5 + ...)). Making it a tree should bring a real improvement in time complexity (typically pushing it down to about ~ n^1.2..1.25). For a related discussion, see this haskellwiki page.

The "real" imperative sieve of Eratosthenes usually runs at around ~ n^1.1 (in n primes produced), and an optimal trial division sieve at ~ n^1.40..1.45. Your original code runs in about cubic time, or worse. Using an imperative mutable array is usually the fastest, working by segments (a.k.a. the segmented sieve of Eratosthenes).

In the context of your second code, this is how it is achieved in Python.
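For comparison, a minimal, unsegmented sketch of such an imperative sieve over a mutable array might look like the following. The name `sieveUpTo` and the fixed upper limit are assumptions for illustration; a segmented version would process the range in cache-sized chunks instead of one big array.

```scala
// Hypothetical helper: classic bounded sieve of Eratosthenes over a mutable Boolean array.
// Assumes 0 <= limit and that limit is well below Int.MaxValue (m += p must not overflow).
def sieveUpTo(limit: Int): Vector[Int] = {
  val composite = new Array[Boolean](limit + 1)
  var p = 2
  while (p.toLong * p <= limit) {
    if (!composite(p)) {
      var m = p * p // smaller multiples were already marked by smaller primes
      while (m <= limit) { composite(m) = true; m += p }
    }
    p += 1
  }
  (2 to limit).filterNot(i => composite(i)).toVector
}

println(sieveUpTo(30).mkString(", "))
```

Unlike the stream versions, this needs the upper bound in advance, which is the price paid for the speed.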
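The linked Python code is not reproduced here. As a rough Scala sketch of the same postponement idea applied to an iterator, the following adds a prime's multiples only once its square shows up among the candidates. It is an adaptation under my own assumptions, not the linked implementation: it uses a mutable HashMap instead of your priority queue, works on odd numbers only, and the names `postponedPrimes`, `stepOf` and `schedule` are made up.

```scala
import scala.collection.mutable

def postponedPrimes(): Iterator[Int] = {
  val rest: Iterator[Int] = new Iterator[Int] {
    // upcoming odd composite -> step (2 * the prime that generates it)
    private val stepOf = mutable.HashMap.empty[Long, Long]
    // separate, recursively created supply of base primes: 3, 5, 7, ...
    private lazy val base: Iterator[Int] = postponedPrimes().drop(1)
    private var p = 0L // current base prime (0 = not yet initialised)
    private var q = 9L // its square
    private var c = 7L // last candidate examined

    // store `step` under the first free slot first, first + step, ...
    private def schedule(first: Long, step: Long): Unit = {
      var m = first
      while (stepOf.contains(m)) m += step
      stepOf(m) = step
    }

    def hasNext: Boolean = true

    def next(): Int = {
      if (p == 0L) { p = base.next().toLong; q = p * p }
      var found = false
      while (!found) {
        c += 2
        stepOf.remove(c) match {
          case Some(step)    => schedule(c + step, step) // known composite: advance it
          case None if c < q => found = true             // below the next square: prime
          case None =>                                   // c == q: start this prime's multiples
            schedule(q + 2 * p, 2 * p)
            p = base.next().toLong
            q = p * p
        }
      }
      c.toInt
    }
  }
  Iterator(2, 3, 5, 7) ++ rest
}

println(postponedPrimes().take(12).mkString(", "))
```

The map only ever holds one entry per base prime below the square root of the current candidate, which is where the memory saving comes from.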

+7
Jan 08 '14 at 9:35

I assume this is a bug in the current Stream implementation.

primes().drop(999).head works fine:

 primes().drop(999).head // Int = 7919 

You will get an OutOfMemoryError if you keep a reference to the Stream, like this:

    val prs = primes()

    prs.drop(999).head
    // Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

The problem here is with the implementation of the Cons class: it holds not only the calculated tail, but also the function that calculates this tail, even after the tail has been calculated and the function is no longer needed!

In this case, the functions are extremely heavy, so you get an OutOfMemoryError even with 1000 functions stored.

We have to drop these functions somehow.

The intuitive fix fails:

    val prs = primes().iterator.toStream

    prs.drop(999).head
    // Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

Calling iterator on a Stream gives you a StreamIterator, and StreamIterator#toStream gives you back the initial heavy Stream.

Workaround

So we have to convert it manually:

    def toNewStream[T](i: Iterator[T]): Stream[T] =
      if (i.hasNext) Stream.cons(i.next, toNewStream(i))
      else Stream.empty

    val prs = toNewStream(primes().iterator)
    // Stream[Int] = Stream(2, ?)

    prs.drop(999).head
    // Int = 7919
+9
Jan 08 '14 at 2:37

Is there an easy way to translate code that uses streams into code that uses iterators? Or is there an easy way to make my first attempt more memory efficient?

@Will Ness has given you an improved answer using Streams and has explained why your code takes so much memory and time (it adds streams too early and builds a left-leaning linear structure), but no one has fully answered the second (or perhaps the main) part of your question: whether a true incremental Sieve of Eratosthenes can be implemented with Iterators.

First, we should properly credit this right-leaning algorithm, of which your first code is a crude (left-leaning) example, since it prematurely adds all the prime composite streams to the merge operations. It is due to Richard Bird, as given in the epilogue of Melissa E. O'Neill's definitive paper on incremental Sieves of Eratosthenes.

Second, no, it is not really possible to substitute Iterators for Streams in this algorithm, as it depends on moving through a stream without restarting it. Although one can access the head of an iterator (the current position), using the next value (skipping over the head) to generate the rest of the iteration as a stream requires building a completely new iterator, at a terrible cost in memory and time. However, we can use an Iterator to output the resulting sequence of primes, which minimizes memory use and makes it easy to use the iterator's higher-order functions, as you will see in my code below.

Now, Will Ness has walked you through the principle of postponing the addition of prime composite streams to the calculation until they are needed. This works well when one stores them in a structure such as a Priority Queue or a HashMap, and it was even missed in the O'Neill paper. For the Richard Bird algorithm, however, it is not necessary, since future stream values are not accessed until needed and so are not stored, provided the streams are built properly lazily (that is, lazily and left-leaning). In fact, this algorithm does not even need the memoization and overhead of a full Stream, because each composite-culling sequence only moves forward without reference to any past primes. The one exception is that a separate source of base primes is required, which can be supplied by a recursive call of the same algorithm.

For reference, here is Richard Bird's Haskell code:

    primes = 2:([3..] `minus` composites)
      where
        composites = union [multiples p | p <- primes]
    multiples n = map (n*) [n..]
    (x:xs) `minus` (y:ys)
      | x < y = x:(xs `minus` (y:ys))
      | x == y = xs `minus` ys
      | x > y = (x:xs) `minus` ys
    union = foldr merge []
      where
        merge (x:xs) ys = x:merge' xs ys
        merge' (x:xs) (y:ys)
          | x < y = x:merge' xs (y:ys)
          | x == y = x:merge' xs ys
          | x > y = y:merge' (x:xs) ys

In the following code, I have simplified the "minus" function (called "minusStrtAt"), since we do not need to build a completely new stream but can fold the composite subtraction into the generation of the original (in my case, odds only) sequence. I have also simplified the "union" function (renamed "mrgMltpls").

The stream operations are implemented with a non-memoizing generic Co-Inductive Stream (CIS) class, where the first field of the class is the value at the current position of the stream and the second is a thunk (a zero-argument function that returns the next element of the stream, capturing whatever arguments it needs in its closure).
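Before the full listing, the CIS idea in isolation can be sketched as follows. The class has the same shape as the one described above; the helpers `from` and `toIterator` are hypothetical names added only for this illustration.

```scala
// A non-memoizing stream cell: current value plus a thunk that builds the next cell.
class CIS[A](val v: A, val cont: () => CIS[A])

// Hypothetical helper: arithmetic progression n, n + step, n + 2 * step, ...
def from(n: Long, step: Long): CIS[Long] =
  new CIS(n, () => from(n + step, step))

// Hypothetical helper: walk the cells with an Iterator, keeping no reference to earlier cells.
def toIterator[A](start: CIS[A]): Iterator[A] =
  Iterator.iterate(start)(cell => cell.cont()).map(cell => cell.v)

// odd multiples of 3 starting at 9, as used for culling
println(toIterator(from(9L, 6L)).take(4).mkString(", "))
```

Because nothing caches the result of `cont()`, calling it twice rebuilds the next cell; that is the trade made to let consumed cells be garbage collected.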

    def primes(): Iterator[Long] = {
      // generic class as a Co Inductive Stream element
      class CIS[A](val v: A, val cont: () => CIS[A])

      def mltpls(p: Long): CIS[Long] = {
        val px2 = p * 2
        def nxtmltpl(cmpst: Long): CIS[Long] =
          new CIS(cmpst, () => nxtmltpl(cmpst + px2))
        nxtmltpl(p * p)
      }
      def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
        new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
      def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
        if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
        else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
        else new CIS(b.v, () => merge(a.cont(), b.cont()))
      def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
        new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(mlps.cont())))
      def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
        if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
        else minusStrtAt(n + 2, cmpsts.cont())
      // the following are recursive, where cmpsts uses oddPrms and
      // oddPrms uses a delayed version of cmpsts in order to avoid a race
      // as oddPrms will already have a first value when cmpsts is called to generate the second
      def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
      def oddPrms(): CIS[Long] = new CIS(3L, () => minusStrtAt(5L, cmpsts()))
      Iterator.iterate(new CIS(2L, () => oddPrms())) { (cis: CIS[Long]) => cis.cont() }
        .map { (cis: CIS[Long]) => cis.v }
    }

The above code generates the 100,000th prime (1299709) on ideone in about 1.3 seconds, of which about 0.36 seconds is overhead, and has an empirical computational complexity of about 1.43 up to 600,000 primes. The memory use is only negligibly above that used by the program code.

The above code could be implemented using Scala's built-in Streams, but they carry a performance and memory overhead (by a constant factor) that this algorithm does not require. Using Streams would mean that one could use them directly without the extra Iterator generation code, but as that is used only for the final output of the sequence, it does not cost much.
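For illustration only, a sketch of what such a version with the built-in Stream type could look like is given below. It is an assumption-laden adaptation rather than the code that was timed: the name `birdPrimes` is made up, it uses Int (so squares overflow for base primes above 46340), and it assumes a Scala version where Stream is available (it is deprecated in favour of LazyList from 2.13 on).

```scala
def birdPrimes(): Stream[Int] = {
  // odd multiples of p, starting at p * p
  def multiples(p: Int): Stream[Int] = Stream.from(p * p, 2 * p)

  def merge(a: Stream[Int], b: Stream[Int]): Stream[Int] =
    if (a.head < b.head) a.head #:: merge(a.tail, b)
    else if (a.head > b.head) b.head #:: merge(a, b.tail)
    else a.head #:: merge(a.tail, b.tail)

  // right-leaning union: the head is produced before the rest of the primes is touched
  def union(ps: Stream[Int]): Stream[Int] = {
    val m = multiples(ps.head)
    m.head #:: merge(m.tail, union(ps.tail))
  }

  // odd candidates from n upward with the composites removed
  def minus(n: Int, cs: Stream[Int]): Stream[Int] =
    if (n < cs.head) n #:: minus(n + 2, cs)
    else minus(n + 2, cs.tail)

  lazy val oddPrimes: Stream[Int] = 3 #:: minus(5, union(oddPrimes))
  2 #:: oddPrimes
}

println(birdPrimes().take(10).mkString(", "))
```

Here the memoized `oddPrimes` stream is shared between producer and consumer, which is convenient but keeps every produced prime reachable for as long as the stream is referenced.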

To implement basic tree folding, as Will Ness suggested, one just needs to add a "pairs" function and hook it into the "mrgMltpls" function:

    def primes(): Iterator[Long] = {
      // generic class as a Co Inductive Stream element
      class CIS[A](val v: A, val cont: () => CIS[A])

      def mltpls(p: Long): CIS[Long] = {
        val px2 = p * 2
        def nxtmltpl(cmpst: Long): CIS[Long] =
          new CIS(cmpst, () => nxtmltpl(cmpst + px2))
        nxtmltpl(p * p)
      }
      def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
        new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
      def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
        if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
        else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
        else new CIS(b.v, () => merge(a.cont(), b.cont()))
      def pairs(mltplss: CIS[CIS[Long]]): CIS[CIS[Long]] = {
        val tl = mltplss.cont()
        new CIS(merge(mltplss.v, tl.v), () => pairs(tl.cont()))
      }
      def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
        new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(pairs(mlps.cont()))))
      def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
        if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
        else minusStrtAt(n + 2, cmpsts.cont())
      // the following are recursive, where cmpsts uses oddPrms and
      // oddPrms uses a delayed version of cmpsts in order to avoid a race
      // as oddPrms will already have a first value when cmpsts is called to generate the second
      def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
      def oddPrms(): CIS[Long] = new CIS(3L, () => minusStrtAt(5L, cmpsts()))
      Iterator.iterate(new CIS(2L, () => oddPrms())) { (cis: CIS[Long]) => cis.cont() }
        .map { (cis: CIS[Long]) => cis.v }
    }

The above code generates the 100,000th prime (1299709) on ideone in about 0.75 seconds, of which about 0.37 seconds is overhead, and has an empirical computational complexity of about 1.09 up to the 1,000,000th prime (15485863, reached in 5.13 seconds). The memory use is only negligibly above that used by the program code.

Note that the codes above are purely functional in that no mutable state is used at all, but the Bird algorithm (even in the tree folding version) is not as fast as using a Priority Queue or a HashMap for larger ranges. The number of operations needed to handle the tree merging has a higher computational complexity than the log n overhead of the Priority Queue or the linear (amortized) performance of a HashMap (although hashing has a large constant-factor overhead, so that advantage is not really seen until some truly large ranges are used).

The reason these codes use so little memory is that the CIS streams are formulated with no permanent reference to the start of the streams, so the streams are garbage collected as they are consumed. What remains is only the minimal number of base prime composite stream placeholders, which, as Will Ness has explained, is very small: only 546 base prime composite streams to generate the first million primes up to 15485863. Each placeholder takes only a few tens of bytes (eight for the Long number, eight for the 64-bit function reference, another couple of eight bytes for the pointer to the closure arguments, and a few more bytes of function and class overhead), for perhaps 40 bytes per stream placeholder, or not much more than 20 kilobytes in total to generate the sequence for a million primes.

+5
Apr 02 '15 at 14:02

If you just want an endless stream of primes, this is the most elegant way, in my opinion:

    def primes = {
      def sieve(from: Stream[Int]): Stream[Int] =
        from.head #:: sieve(from.tail.filter(_ % from.head != 0))
      sieve(Stream.from(2))
    }
0
Jan 08 '14 at 3:23