How to process a Java stream with more than the default number of threads?

By default, Java parallel streams are processed by the common fork-join pool, which is constructed with default parameters. As mentioned in another question, you can configure these defaults by supplying your own pool or by setting the system property java.util.concurrent.ForkJoinPool.common.parallelism .
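
To make those two approaches concrete, here is a minimal sketch of what I mean (the class and variable names are just for illustration, not my actual test code):

 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ForkJoinPool;

 public class ParallelismConfigDemo {
     public static void main(String[] args) throws Exception {
         // Approach 1: raise the common pool's target parallelism.
         // Must be set before the common pool is first used; it can also be
         // passed on the command line as
         // -Djava.util.concurrent.ForkJoinPool.common.parallelism=100
         System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");

         List<String> data = Arrays.asList("192.0.2.1", "192.0.2.2", "192.0.2.3");

         // Approach 2: run the parallel stream inside a custom ForkJoinPool,
         // so its worker threads execute the stream's tasks.
         ForkJoinPool customPool = new ForkJoinPool(100);
         customPool.submit(() ->
                 data.parallelStream().forEach(System.out::println)
         ).get();
         customPool.shutdown();
     }
 }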

However, I was not able to increase the number of threads assigned to process the stream by either of these two methods. As an example, consider the program below, which processes the list of IP addresses contained in the file specified in its first argument and prints the resolved host names. Running it on a file with approximately 13,000 unique IP addresses, I can see in Oracle Java Mission Control that there are only 16 threads, and only five of them are ForkJoinPool workers. Yet this particular task would benefit from many more threads, because they spend most of their time waiting for DNS responses. So my question is: how can I increase the number of threads used?

I tried the program in three environments; these are the thread counts reported by the OS.

  • Java SE Runtime Environment build 1.8.0_73-b02 on an 8-core computer running Windows 7: 17 threads
  • Java SE Runtime Environment build 1.8.0_66-b17 on a dual-core computer running OS X Darwin 15.2.0: 23 threads
  • openjdk version 1.8.0_72 on a 24-core machine using FreeBSD 11.0: 44 threads
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.concurrent.ForkJoinPool;

 /** Resolve IP addresses in file args[0] using 100 threads */
 public class Resolve100 {
     /** Resolve the passed IP address into a name */
     static String addressName(String ipAddress) {
         try {
             return InetAddress.getByName(ipAddress).getHostName();
         } catch (UnknownHostException e) {
             return ipAddress;
         }
     }

     public static void main(String[] args) {
         Path path = Paths.get(args[0]);
         ForkJoinPool fjp = new ForkJoinPool(100);
         try {
             fjp.submit(() -> {
                 try {
                     Files.lines(path)
                          .parallel()
                          .map(line -> addressName(line))
                          .forEach(System.out::println);
                 } catch (IOException e) {
                     System.err.println("Failed: " + e);
                 }
             }).get();
         } catch (Exception e) {
             System.err.println("Failed: " + e);
         }
     }
 }
1 answer

There are two problems with your approach. First, using a custom ForkJoinPool will not change the maximum number of individual tasks created by the Stream API, because that limit is defined as follows:

 static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; 

Thus, even if you use a custom pool, the number of parallel tasks is limited to commonPoolParallelism * 4 (strictly speaking this is a target rather than a hard limit, but in many cases the number of tasks ends up equal to it).
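
As a quick illustrative sketch (the class name is arbitrary), you can print the values involved; the << 2 shift simply reproduces the JDK formula above:

 import java.util.concurrent.ForkJoinPool;

 public class LeafTargetDemo {
     public static void main(String[] args) {
         // Common-pool parallelism defaults to availableProcessors() - 1
         // (but at least 1) unless overridden by the system property.
         int parallelism = ForkJoinPool.getCommonPoolParallelism();
         // Same formula as the JDK's LEAF_TARGET: parallelism * 4.
         int leafTarget = parallelism << 2;
         System.out.println("Common pool parallelism: " + parallelism);
         System.out.println("Stream leaf task target: " + leafTarget);
         // For example, on an 8-core machine this typically prints 7 and 28,
         // so only about 28 chunks of work are targeted regardless of the
         // size of the custom pool the stream runs in.
     }
 }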

The above problem can be fixed using the system property java.util.concurrent.ForkJoinPool.common.parallelism , but here you run into another problem: Files.lines parallelizes very poorly. See this question for more details. In particular, for 13,000 input lines the maximum possible speedup is about 3.17x (assuming every line takes roughly the same time to process), even if you have 100 CPUs. My StreamEx library provides a workaround for this (create the stream using StreamEx.ofLines(path).parallel() ). Another possible solution is to read the lines of the file into a List sequentially and then create a parallel stream from it:

 Files.readAllLines(path).parallelStream()... 

This will work together with the system property. However, in general the Stream API is not well suited to parallel processing when the tasks involve I/O. A more flexible solution is to use a CompletableFuture for each line:

 ForkJoinPool fjp = new ForkJoinPool(100);
 List<CompletableFuture<String>> list = Files.lines(path)
     .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
     .collect(Collectors.toList());
 list.stream().map(CompletableFuture::join)
     .forEach(System.out::println);

This way you do not need to set the system property at all, and you can use separate pools for separate tasks.
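
For reference, a self-contained sketch of this approach; it reuses the addressName helper from the question, and the class name ResolveFutures, the try-with-resources, and the final pool shutdown are my additions rather than part of the snippet above:

 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

 public class ResolveFutures {
     /** Resolve the passed IP address into a name */
     static String addressName(String ipAddress) {
         try {
             return InetAddress.getByName(ipAddress).getHostName();
         } catch (UnknownHostException e) {
             return ipAddress;
         }
     }

     public static void main(String[] args) throws IOException {
         Path path = Paths.get(args[0]);
         ForkJoinPool fjp = new ForkJoinPool(100);
         try (Stream<String> lines = Files.lines(path)) {
             // One asynchronous task per line; the stream itself stays sequential,
             // so the poor splitting of Files.lines no longer matters.
             List<CompletableFuture<String>> futures = lines
                 .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
                 .collect(Collectors.toList());
             // Print results in input order, blocking on each future as it is reached.
             futures.stream()
                    .map(CompletableFuture::join)
                    .forEach(System.out::println);
         } finally {
             fjp.shutdown();
         }
     }
 }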
