Conditionally add an operation to a Java 8 stream

I am wondering if I can add an operation to a stream based on some condition set outside the stream. For example, I want to add a limit operation to the stream if my limit variable is not equal to -1.

Currently, my code looks like this, but I have yet to see other examples of streams being used this way, where the Stream object is reassigned to the result of an intermediate operation applied to itself:

 // Do some stream stuff
 stream = stream.filter(e -> e.getTimestamp() < max);

 // Limit the stream
 if (limit != -1) {
     stream = stream.limit(limit);
 }

 // Collect stream to list
 stream.collect(Collectors.toList());

As stated in this Stack Overflow post, the filter is not actually applied until a terminal operation is called. Since I reassign the value of stream before a terminal operation is called, is the above code still a proper way to use Java 8 streams?

+6
3 answers

There is no semantic difference between a chain of method invocations and a series of invocations that store the intermediate return values. Thus, the following code fragments are equivalent:

 a = object.foo(); b = a.bar(); c = b.baz(); 

and

 c = object.foo().bar().baz(); 

In either case, each method is invoked on the result of the previous invocation. But in the latter case, the intermediate results are not stored and are lost with the next invocation. In the case of the stream API, an intermediate result must not be used after you have called the next method on it, so chaining is the natural way to use a stream: it intrinsically ensures that you do not invoke more than one method on a returned reference.

However, it is not wrong to store the reference to the stream, as long as you obey the contract of not using a returned reference more than once. Using it the way you do in your question, that is, overwriting the variable with the result of the next invocation, also ensures that you do not invoke more than one method on a returned reference, so it is a correct usage. Of course, this only works with intermediate results of the same type, so when you use map or flatMap and get a stream of a different element type, you cannot overwrite the local variable. Then you have to be careful not to use the old local variable again but, as said, as long as you do not use it after the next invocation, there is nothing wrong with the intermediate storage.
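The contract described above can be illustrated with a minimal sketch. The class name `StreamReassignDemo`, the helper names, and the use of plain integers instead of the question's timestamped elements are assumptions made for illustration only. The stream documentation only says an implementation may throw `IllegalStateException` when it detects reuse; the sketch relies on the standard JDK streams doing so.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamReassignDemo {

    // Hypothetical helper mirroring the question: the variable is overwritten
    // with each intermediate result, and limit is only added when limit != -1.
    static List<Integer> collectLimited(List<Integer> source, int max, long limit) {
        Stream<Integer> stream = source.stream();
        stream = stream.filter(e -> e < max);
        if (limit != -1) {
            stream = stream.limit(limit);
        }
        return stream.collect(Collectors.toList());
    }

    // Returns true if invoking a second operation on an already used
    // stream reference is rejected with an IllegalStateException.
    static boolean reuseFails(List<Integer> source) {
        Stream<Integer> original = source.stream();
        Stream<Integer> filtered = original.filter(e -> e > 0); // first use of 'original'
        try {
            original.limit(1); // second use of the same reference: violates the contract
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(collectLimited(data, 5, -1)); // no limit applied
        System.out.println(collectLimited(data, 5, 2));  // limit applied
        System.out.println(reuseFails(data));
    }
}
```

Overwriting the variable makes the old reference unreachable, which is why this style is as safe as chaining.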

Sometimes you have to store it, for example:

 try(Stream<String> stream = Files.lines(Paths.get("myFile.txt"))) { stream.filter(s -> !s.isEmpty()).forEach(System.out::println); } 

Note that the code is equivalent to the following alternatives:

 try(Stream<String> stream = Files.lines(Paths.get("myFile.txt")).filter(s->!s.isEmpty())) { stream.forEach(System.out::println); } 

and

 try(Stream<String> srcStream = Files.lines(Paths.get("myFile.txt"))) {
     Stream<String> tmp = srcStream.filter(s -> !s.isEmpty());
     // must not use variable srcStream here:
     tmp.forEach(System.out::println);
 }

They are equivalent because forEach is always invoked on the result of filter, which is always invoked on the result of Files.lines, and it does not matter on which result the final close() operation is invoked, since closing affects the entire stream pipeline.
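The claim that closing affects the whole pipeline can be checked with a small sketch. To avoid depending on a file, it registers a close handler with `onClose` on an in-memory stream; the class name `CloseDemo` and the method name are hypothetical. Only the derived stream is managed by try-with-resources, yet the handler registered on the source is expected to run.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class CloseDemo {

    // Returns true if closing the filtered stream also ran the close
    // handler that was registered on the source stream.
    static boolean closedViaDerivedStream() {
        AtomicBoolean closed = new AtomicBoolean(false);
        Stream<String> src = Stream.of("a", "", "b").onClose(() -> closed.set(true));
        try (Stream<String> tmp = src.filter(s -> !s.isEmpty())) {
            // must not use variable src here
            tmp.forEach(s -> { });
        }
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println(closedViaDerivedStream());
    }
}
```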


To put it in one sentence, the way you use it is correct.


I even prefer to do it this way, since not chaining a limit operation when you do not want to apply a limit is the cleanest way of expressing your intent. It is also worth noting that the suggested alternatives may work in a lot of cases, but they are not semantically equivalent:

 .limit(condition? aLimit: Long.MAX_VALUE) 

assumes that the maximum number of elements you can ever encounter is Long.MAX_VALUE, but streams can have more elements than that; they can even be infinite.

 .limit(condition? aLimit: list.size()) 

when the stream source is list, breaks the lazy evaluation of the stream. In principle, a mutable stream source may legally be modified arbitrarily up to the point where the terminal operation commences. The result will reflect all modifications made up to that point. When you add an intermediate operation that incorporates list.size(), that is, the actual size of the list at that point, subsequent modifications applied to the collection between that point and the terminal operation may give this value a different meaning than the intended "effectively no limit" semantics.

Compare with "Non Interference" in the API documentation :

For well-behaved stream sources, the source can be modified before the terminal operation commences and those modifications will be reflected in the covered elements. For example, consider the following code:

 List<String> l = new ArrayList(Arrays.asList("one", "two")); Stream<String> sl = l.stream(); l.add("three"); String s = sl.collect(joining(" ")); 

First a list is created consisting of two strings: "one" and "two". Then a stream is created from that list. Next the list is modified by adding a third string: "three". Finally the elements of the stream are collected and joined together. Since the list was modified before the terminal collect operation commenced, the result will be a string of "one two three".

Of course, this is a rare corner case, as a programmer will normally formulate the entire stream pipeline without modifying the source collection in between. Still, the different semantics remain, and if you ever run into such a corner case, the resulting bug can be very hard to find.
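The corner case can be reproduced with a short sketch. It assumes an ArrayList source, whose stream binds to the list contents only when the terminal operation starts; the class name `LazySizeDemo` and both method names are hypothetical. The first method evaluates list.size() eagerly while building the pipeline, the second adds limit only when one is wanted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazySizeDemo {

    // Uses list.size() as a stand-in for "no limit". The size is read
    // when the pipeline is built, before the list is modified.
    static List<String> withSizeAsLimit(boolean limited, long aLimit) {
        List<String> list = new ArrayList<>(Arrays.asList("one", "two"));
        Stream<String> stream = list.stream().limit(limited ? aLimit : list.size());
        list.add("three"); // legal: the terminal operation has not started yet
        return stream.collect(Collectors.toList());
    }

    // Adds the limit operation only when a limit is actually wanted.
    static List<String> withConditionalLimit(boolean limited, long aLimit) {
        List<String> list = new ArrayList<>(Arrays.asList("one", "two"));
        Stream<String> stream = list.stream();
        if (limited) {
            stream = stream.limit(aLimit);
        }
        list.add("three");
        return stream.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(withSizeAsLimit(false, 0));      // "three" is cut off
        System.out.println(withConditionalLimit(false, 0)); // all three elements
    }
}
```

With the captured size, the element added later is silently dropped even though no limit was intended.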

In addition, since they are not equivalent, the stream API will never recognize these values as "effectively unlimited". Even specifying Long.MAX_VALUE implies that the stream implementation has to track the number of processed elements to ensure that the limit is obeyed. Thus, not adding a limit operation can have a significant performance advantage over adding a limit with a number that the programmer never expects to be exceeded.

+10

I think your first line should be:

 stream = stream.filter(e -> e.getTimestamp() < max); 

so that subsequent operations use the stream returned by filter rather than the original stream.

+5

There are two ways to do this:

 // Do some stream stuff
 List<E> results = list.stream()
     .filter(e -> e.getTimestamp() < max)
     .limit(limit > 0 ? limit : list.size())
     .collect(Collectors.toList());

OR

 // Do some stream stuff
 stream = stream.filter(e -> e.getTimestamp() < max);

 // Limit the stream
 if (limit != -1) {
     stream = stream.limit(limit);
 }

 // Collect stream to list
 List<E> results = stream.collect(Collectors.toList());

As this is functional programming, you should always work on the result of each function. You should specifically avoid modifying anything in this style of programming and treat everything as if it were immutable, if possible.

Since I reassign the value of stream before a terminal operation is called, is the above code still a proper way to use Java 8 streams?

It should work, however it reads as a mix of imperative and functional coding. I would suggest writing it as a fixed stream, as in my first example.

+5
