Java lambdas, stateless lambdas and parallel execution

While trying to learn Java lambdas, I came across an article (linked below) where, in a section on limitations of the stream API, the author states: "Stateful lambdas are usually not a problem when executed sequentially, but when the stream execution is parallelized, it breaks". He then gives this code as an example of problems caused by the order of execution:

    List<String> ss = ...;
    List<String> result = ...;
    Stream<String> stream = ss.stream();
    stream.map(s -> {
            synchronized (result) {
                if (result.size() < 10) {
                    result.add(s);
                }
            }
        })
        .forEach(e -> { });

I can see how it would be non-deterministic if it were parallelized, but what I can't see is how you would fix it with stateless lambdas. Isn't there something inherently non-deterministic about adding things to a list in parallel? An example that a six-year-old in a hat could understand, perhaps in C#, would be greatly appreciated.

Link to the original article http://blog.hartveld.com/2013/03/jdk-8-33-stream-api.html

java lambda parallel-processing java-8
3 answers

I see what you are getting at with your question, and I will do my best to explain.

Consider a list of input data, consisting of 8 elements:

[1, 2, 3, 4, 5, 6, 7, 8]

Suppose that the stream parallelizes it as follows. In reality it does not; the exact process of parallelization is quite hard to follow. But for now, suppose it keeps dividing the input in two until chunks of two elements are left.

The branching will look like this:

  • First division:

    [1, 2, 3, 4]
    [5, 6, 7, 8]

  • Second division:

    [1, 2]
    [3, 4]
    [5, 6]
    [7, 8]
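The halving above can be sketched as a small recursive function. This is only an illustration of the simplified model used in this answer, not how the stream framework really splits its input; the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HalvingSketch {
    // Recursively halves the input until each chunk holds at most two elements.
    // Mirrors the simplified model in the text, not the real stream splitting logic.
    static List<List<Integer>> split(List<Integer> input) {
        List<List<Integer>> chunks = new ArrayList<>();
        if (input.size() <= 2) {
            chunks.add(input);
            return chunks;
        }
        int mid = input.size() / 2;
        chunks.addAll(split(input.subList(0, mid)));            // left half first
        chunks.addAll(split(input.subList(mid, input.size()))); // then right half
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(split(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)));
        // prints [[1, 2], [3, 4], [5, 6], [7, 8]]
    }
}
```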

Now we have four chunks that (in our theory) are processed by four different threads that know nothing about each other.
If those threads all write to shared state, the writes race. This can be fixed by synchronizing on some external resource, but then you lose the benefits of parallelization. So we have to assume that we do not synchronize, and when we do not synchronize, threads will not see what the other threads have done, so our result will be garbage.

Now to the part of your question about statelessness: how can it be handled correctly in parallel? How do you add items that are processed in parallel to a list in the correct order?

First, suppose a simple mapping function in which you map with the lambda i -> i + 10, and then print the result using System.out::println in forEach.

Now after the second division, the following will happen:

[1, 2] -> [11, 12] -> { System.out.println(11); System.out.println(12); }
[3, 4] -> [13, 14] -> { System.out.println(13); System.out.println(14); }
[5, 6] -> [15, 16] -> { System.out.println(15); System.out.println(16); }
[7, 8] -> [17, 18] -> { System.out.println(17); System.out.println(18); }

There is no guarantee on the order, except that all elements processed by the same thread are processed in order (and that is internal behavior, not something to rely on).

If you want to process them in order, you need to use forEachOrdered, which ensures that the action is applied to the elements in the correct order. You do not lose too much of the benefit of parallelization because of this, since it only applies to the final stage.
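As a minimal sketch of the difference (the class and method names are invented for the example): the mapping lambda stays stateless and may run on any thread, while forEachOrdered hands the results to its action one at a time in encounter order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ForEachOrderedSketch {
    // Maps in parallel with the stateless lambda i -> i + 10, then lets
    // forEachOrdered apply its action in encounter order. forEachOrdered runs
    // the action for one element at a time, so a plain ArrayList is enough here.
    static List<Integer> mapInOrder(List<Integer> input) {
        List<Integer> seen = new ArrayList<>();
        input.parallelStream()
             .map(i -> i + 10)
             .forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        // forEach on a parallel stream: the print order may differ between runs.
        input.parallelStream().map(i -> i + 10).forEach(System.out::println);

        // forEachOrdered: always in encounter order.
        System.out.println(mapInOrder(input));
        // prints [11, 12, 13, 14, 15, 16, 17, 18]
    }
}
```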

To see how you can add items processed in parallel to a list, consider Collectors.toList(), which provides methods for:

  • Creating a new list.
  • Adding a value to the list.
  • Combining two lists.

After the second division, each of the four threads does the following (only one thread is shown here):

  • We had [1, 2].
  • We map it to [11, 12].
  • We create an empty List<Integer>.
  • We add 11 to the list.
  • We add 12 to the list.

Now all the threads have done this, and we have four two-item lists.

Now the following merges occur in the order specified:

  • [11, 12] ++ [13, 14] = [11, 12, 13, 14]
  • [15, 16] ++ [17, 18] = [15, 16, 17, 18]
  • Finally [11, 12, 13, 14] ++ [15, 16, 17, 18] = [11, 12, 13, 14, 15, 16, 17, 18]

And therefore the resulting list is ordered, even though the mapping was done in parallel. Now you should also see why parallelization needs a higher minimum chunk size than just two elements: otherwise creating new lists and merging them becomes too expensive.
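The three steps (create, add, combine) can be spelled out with the three-argument Stream.collect. This is a sketch of the same idea rather than what Collectors.toList() does internally; the class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CollectSketch {
    // Each chunk gets its own list, so no thread ever touches another
    // thread's list; partial lists are merged left before right.
    static List<Integer> mapAndCollect(List<Integer> input) {
        ArrayList<Integer> result = input.parallelStream()
                .map(i -> i + 10)
                .collect(ArrayList::new,     // create a new, empty list
                         ArrayList::add,     // add one value to a list
                         ArrayList::addAll); // combine two lists
        return result;
    }

    public static void main(String[] args) {
        System.out.println(mapAndCollect(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)));
        // prints [11, 12, 13, 14, 15, 16, 17, 18]
    }
}
```

In everyday code, collect(Collectors.toList()) is the shorter way to get the same ordered result.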

Hopefully you now understand why stream operations must be stateless in order to take full advantage of parallelization.


Reducing this problem, it looks like the code just finds the first 10 elements in the stream and, separately, does a forEach over the whole stream: s.limit(10).collect(...); and s.forEach(...);. Also, the lambda passed to map does not actually return anything, so I doubt it compiles.
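A minimal sketch of that stateless version, assuming the goal really is "the first 10 elements" (the class name, method name, and sample input are made up for the example):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FirstTenSketch {
    // No shared result list and no synchronized block: limit(10) keeps the
    // first ten elements in encounter order, and collect builds the list.
    static List<String> firstTen(List<String> ss) {
        return ss.parallelStream()
                 .limit(10)
                 .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical input: the strings "s0" .. "s99".
        List<String> ss = IntStream.range(0, 100)
                                   .mapToObj(i -> "s" + i)
                                   .collect(Collectors.toList());
        System.out.println(firstTen(ss));
        // prints [s0, s1, s2, s3, s4, s5, s6, s7, s8, s9]
    }
}
```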


That was a good example from @skiwi; let me see if I can add a bit.

The term "ordered" in parallel computing usually means returning the result in the same order as a sequential process would. That is, whether you call sequential.method() or parallel.method(), the result looks the same.

The problem with forEachOrdered() is that the framework cannot create unique objects for each task result and order them at the end without stalling. Therefore, it treats the stream as a balanced tree. The framework creates a ConcurrentHashMap with parent/child associations. It runs the left child first, then the right child, and then the parent, forcing a happens-before relationship where the processing should be concurrent. It goes from ordered results to ordered sequential processing.

What you need to do is order the results, not process the elements in encounter order. Create objects that each hold a part of the array for one final split (using @skiwi's second division here), a slot for the results that the computation fills in, and a sequence number. Let the threads process the objects concurrently. When all threads have finished, order the objects by sequence number and you are done.
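Here is a rough sketch of that idea using a plain thread pool. Everything in it (the Chunk class, the pool size of 4, the i + 10 computation borrowed from the other answer, the 10-second timeout) is an assumption made for illustration, not part of any framework.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SequenceNumberSketch {
    // Hypothetical holder: one part of the input, its sequence number,
    // and a slot for the result of processing that part.
    static final class Chunk {
        final int sequence;
        final List<Integer> input;
        List<Integer> output;

        Chunk(int sequence, List<Integer> input) {
            this.sequence = sequence;
            this.input = input;
        }
    }

    static List<Integer> process(List<Integer> data, int chunkSize) {
        Queue<Chunk> done = new ConcurrentLinkedQueue<>(); // filled in completion order
        ExecutorService pool = Executors.newFixedThreadPool(4);
        int sequence = 0;
        for (int start = 0; start < data.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, data.size());
            Chunk chunk = new Chunk(sequence++, data.subList(start, end));
            pool.submit(() -> {
                List<Integer> out = new ArrayList<>();
                for (int i : chunk.input) {
                    out.add(i + 10);
                }
                chunk.output = out;
                done.add(chunk); // chunks may arrive here in any order
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // Only now impose the order: sort the finished chunks by sequence number.
        List<Chunk> ordered = new ArrayList<>(done);
        ordered.sort(Comparator.comparingInt(c -> c.sequence));
        List<Integer> result = new ArrayList<>();
        for (Chunk c : ordered) {
            result.addAll(c.output);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2));
        // prints [11, 12, 13, 14, 15, 16, 17, 18]
    }
}
```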

