How to close a variable number of threads?

I create several threads that I should access in parallel (or, possibly, in parallel). I know how to do try-with-resources when the amount of resources is fixed at compile time, but what if the amount of resources is determined by a parameter?

I have something like this:

private static void foo(String path, String... files) throws IOException { @SuppressWarnings("unchecked") Stream<String>[] streams = new Stream[files.length]; try { for (int i = 0; i < files.length; i++) { final String file = files[i]; streams[i] = Files.lines(Paths.get(path, file)) .onClose(() -> System.out.println("Closed " + file)); } // do something with streams Stream.of(streams) .parallel() .flatMap(x -> x) .distinct() .sorted() .limit(10) .forEach(System.out::println); } finally { for (Stream<String> s : streams) { if (s != null) { s.close(); } } } } 
+5
source share
2 answers

You can write an AutoCloseable compound to control the dynamic amount of AutoCloseable :

 import java.util.ArrayList; import java.util.List; public class CompositeAutoclosable<T extends AutoCloseable> implements AutoCloseable { private final List<T> components= new ArrayList<>(); public void addComponent(T component) { components.add(component); } public List<T> getComponents() { return components; } @Override public void close() throws Exception { Exception e = null; for (T component : components) { try { component.close(); } catch (Exception closeException) { if (e == null) { e = closeException; } else { e.addSuppressed(closeException); } } } if (e != null) { throw e; } } } 

and you can use it in your method:

 private static void foo(String path, String... files) throws Exception { try (CompositeAutoclosable<Stream<String>> streams = new CompositeAutoclosable<Stream<String>>()) { for (int i = 0; i < files.length; i++) { final String file = files[i]; streams.addComponent(Files.lines(Paths.get(path, file)) .onClose(() -> System.out.println("Closed " + file))); } streams.getComponents().stream() .parallel() .flatMap(x -> x) .distinct() .sorted() .limit(10) .forEach(System.out::println); } } 
+5
source

The documentation for Stream.flatMap reads:

Each displayed stream is closed after its contents have been placed in this stream.

In other words, no additional actions are required to close threads normally. However, since only processed threads are closed, you should not be looking forward to creating threads without knowing whether they will be processed later by the thread:

 private static void foo(String path, String... files) throws IOException { Arrays.stream(files).flatMap(file-> { try { return Files.lines(Paths.get(path, file)) .onClose(() -> System.out.println("Closed " + file)); } catch(IOException ex) { throw new UncheckedIOException(ex); } }) .parallel() .distinct() .sorted() .limit(10) .forEachOrdered(System.out::println); } 

By creating substreams in flatMap , it ensures that each will only be created if the stream will process it. Thus, this solution will close all substreams even without an external Stream inside the try-with-resource statement. The disadvantage of this solution is that it will not be cleared like a try-with-resource statement in an exceptional case, and even putting an external thread into a try-with-resource statement will not fix this.

I consider this missing exception - security - a bug in the Stream implementation that needs to be fixed.

Trying to fix this in your code to get a safe closing behavior that acts like a try-with-resource statement, that is, not only guarantees that all resources are closed, but also that no exceptions caused by close invocations are swallowed, complicated . This involves re-checking what Files.lines(…) does inside, and then adds the missing bits:

 static Closeable join(Closeable a, Closeable b) { return a==null? b: ()->{ try(Closeable c=a) { b.close(); }}; } private static void foo(String path, String... files) throws IOException { Closeable[] cl={ null }; try(Closeable c=()->{ if(cl[0]!=null) cl[0].close(); }) { Stream.Builder<Stream<String>> b=Stream.builder(); for(String file: files) { BufferedReader br=Files.newBufferedReader(Paths.get(path, file)); cl[0]=join(cl[0], br); b.add(br.lines()); } b.build() .parallel() .flatMap(Function.identity()) .distinct() .sorted() .limit(10) .forEachOrdered(System.out::println); } } 

This will close all resources, regardless of whether and where there were exceptions, during thread processing, closing, or initialization (it will close the entire resource that has been allocated so far).

If you want to test the behavior, you can use the following code:

 private static void foo2(String path, String... files) throws IOException { Closeable[] cl={ null }; try(Closeable c=()->{ if(cl[0]!=null) cl[0].close(); }) { Stream.Builder<Stream<String>> b=Stream.builder(); for(String file: files) { if(Math.random()>0.7) throw new RuntimeException("simulated opening failure"); BufferedReader br=Files.newBufferedReader(Paths.get(path, file)); System.out.println("opened "+file); br=new BufferedReader(br) { @Override public void close() throws IOException { System.out.println("closing "+file); super.close(); throw new RuntimeException("simulated closing "+file+" failure"); } }; cl[0]=join(cl[0], br); b.add(br.lines()); } b.build() .parallel() .flatMap(Function.identity()) .distinct() .sorted() .limit(10) .forEachOrdered(System.out::println); throw new RuntimeException("simulated processing failure"); } } 
+1
source

All Articles