The documentation for Stream.flatMap reads:
Each mapped stream is closed after its contents have been placed into this stream.
In other words, no additional action is required for the ordinary closing of the streams. However, since only processed streams are closed automatically, you should not create streams eagerly without knowing whether the outer stream will process them later:
    private static void foo(String path, String... files) throws IOException {
        Arrays.stream(files).flatMap(file -> {
                try {
                    return Files.lines(Paths.get(path, file))
                        .onClose(() -> System.out.println("Closed " + file));
                } catch(IOException ex) {
                    throw new UncheckedIOException(ex);
                }
            })
            .parallel()
            .distinct()
            .sorted()
            .limit(10)
            .forEachOrdered(System.out::println);
    }
Because the sub-streams are created inside flatMap, each one is created only if the stream is actually going to process it. Thus, this solution closes all sub-streams even without the outer Stream being inside a try-with-resources statement. The drawback is that, in the exceptional case, it does not clean up the way a try-with-resources statement would, and even putting the outer stream into a try-with-resources statement would not fix that.
I consider this missing exception safety a bug in the Stream implementation that should be fixed.
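The ordinary (non-exceptional) closing behavior quoted from the documentation can be observed in a small, self-contained sketch. The class name FlatMapCloseDemo, the run helper, and the in-memory sub-streams used in place of files are assumptions made for illustration only. The stream is kept sequential so that the recorded order is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatMapCloseDemo {

    // Hypothetical helper: records in 'log' when each sub-stream is
    // created and when its onClose handler runs.
    static List<String> run(List<String> log) {
        return Stream.of("a", "b")
            .flatMap(name -> {
                log.add("opened " + name);
                return Stream.of(name + "1", name + "2")
                             .onClose(() -> log.add("closed " + name));
            })
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(run(log)); // elements of both sub-streams
        System.out.println(log);      // open/close events in order
    }
}
```

Each sub-stream is opened only when flatMap reaches its element and is closed before the next one is opened, without any try-with-resources statement in the calling code.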
Working around the missing exception safety in your own code is complicated if you want closing behavior that acts like a try-with-resources statement, that is, behavior that guarantees not only that all resources are closed but also that no exception thrown by a close invocation is swallowed. It involves re-examining what Files.lines(…) does internally and then adding the missing bits:
    // Chains two resources: closing the result closes b, then a,
    // with the exception handling of try-with-resources.
    static Closeable join(Closeable a, Closeable b) {
        return a==null? b: ()->{ try(Closeable c=a) { b.close(); } };
    }
    private static void foo(String path, String... files) throws IOException {
        // Holder for the chain of all readers opened so far.
        Closeable[] cl={ null };
        try(Closeable c=()->{ if(cl[0]!=null) cl[0].close(); }) {
            Stream.Builder<Stream<String>> b=Stream.builder();
            for(String file: files) {
                BufferedReader br=Files.newBufferedReader(Paths.get(path, file));
                cl[0]=join(cl[0], br);
                b.add(br.lines());
            }
            b.build()
                .parallel()
                .flatMap(Function.identity())
                .distinct()
                .sorted()
                .limit(10)
                .forEachOrdered(System.out::println);
        }
    }
This closes all resources regardless of whether and where exceptions occur: during stream processing, closing, or initialization (in the last case, it closes every resource that has been allocated so far).
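What the join helper contributes can be checked in isolation. The following is a minimal sketch, assuming three hypothetical resources whose close methods always fail; the class name JoinCloseDemo and the helpers failing and closeAll are invented for illustration. It chains the resources with the same join method and then inspects the resulting exception.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class JoinCloseDemo {

    // Same helper as above: closing the result closes b first, then a.
    static Closeable join(Closeable a, Closeable b) {
        return a == null ? b : () -> { try (Closeable c = a) { b.close(); } };
    }

    // Hypothetical resource that logs its close call and then fails.
    static Closeable failing(String name, List<String> log) {
        return () -> {
            log.add("closing " + name);
            throw new IOException(name + " failed");
        };
    }

    // Chains resources a, b, c, closes the chain, and returns the
    // exception that escaped (or null if none did).
    static IOException closeAll(List<String> log) {
        Closeable all = null;
        for (String name : new String[] {"a", "b", "c"}) {
            all = join(all, failing(name, log));
        }
        try {
            all.close();
            return null;
        } catch (IOException ex) {
            return ex;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        IOException ex = closeAll(log);
        System.out.println(log);
        System.out.println(ex + " suppressed: "
            + java.util.Arrays.toString(ex.getSuppressed()));
    }
}
```

The resources are closed in reverse order of registration, every close method is invoked even though each one throws, and the later failures are attached as suppressed exceptions rather than lost, which mirrors what nested try-with-resources statements would do.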
If you want to test the behavior, you can use the following code:
    private static void foo2(String path, String... files) throws IOException {
        Closeable[] cl={ null };
        try(Closeable c=()->{ if(cl[0]!=null) cl[0].close(); }) {
            Stream.Builder<Stream<String>> b=Stream.builder();
            for(String file: files) {
                if(Math.random()>0.7)
                    throw new RuntimeException("simulated opening failure");
                BufferedReader br=Files.newBufferedReader(Paths.get(path, file));
                System.out.println("opened "+file);
                br=new BufferedReader(br) {
                    @Override
                    public void close() throws IOException {
                        System.out.println("closing "+file);
                        super.close();
                        throw new RuntimeException("simulated closing "+file+" failure");
                    }
                };
                cl[0]=join(cl[0], br);
                b.add(br.lines());
            }
            b.build()
                .parallel()
                .flatMap(Function.identity())
                .distinct()
                .sorted()
                .limit(10)
                .forEachOrdered(System.out::println);
            throw new RuntimeException("simulated processing failure");
        }
    }