Here is the complete implementation from my project. It is an abstract spliterator that splits its source into batches of a fixed size, which lets you efficiently parallelize the processing of any I/O-based stream source:
```java
import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
    private final int batchSize;
    private final int characteristics;
    private long est;

    public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
        // Batches preserve encounter order; a sized source also yields sized splits.
        characteristics |= ORDERED;
        if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED;
        this.characteristics = characteristics;
        this.batchSize = batchSize;
        this.est = est;
    }

    public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
        this(characteristics, batchSize, Long.MAX_VALUE);
    }

    public FixedBatchSpliteratorBase(int characteristics) {
        this(characteristics, 64, Long.MAX_VALUE);
    }

    @Override
    public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!tryAdvance(holder)) return null; // source exhausted, nothing to split off
        // Pull up to batchSize elements from the (sequential) source into an array...
        final Object[] a = new Object[batchSize];
        int j = 0;
        do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
        if (est != Long.MAX_VALUE) est -= j; // keep a known size estimate accurate
        // ...and hand them out as an array-backed spliterator that splits evenly.
        return spliterator(a, 0, j, characteristics());
    }

    @Override
    public Comparator<? super T> getComparator() {
        if (hasCharacteristics(SORTED)) return null;
        throw new IllegalStateException();
    }

    @Override
    public long estimateSize() {
        return est;
    }

    @Override
    public int characteristics() {
        return characteristics;
    }

    // Captures the single element passed to tryAdvance().
    static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }
}
```
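If you want to try the fixed-batch idea without opencsv, here is a minimal sketch applied to plain text lines. LineBatchSpliterator is a hypothetical name; it inlines the same batching trySplit instead of extending the base class so that it compiles on its own, and, as an assumption for self-containment, it wraps IOException in UncheckedIOException rather than using sneaky-throw helpers.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

// Hypothetical example: a line-reading spliterator that splits off fixed-size
// batches, mirroring the trySplit logic of FixedBatchSpliteratorBase.
class LineBatchSpliterator implements Spliterator<String> {
    private final BufferedReader reader;
    private final int batchSize;

    LineBatchSpliterator(BufferedReader reader, int batchSize) {
        this.reader = reader;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        final String line;
        try {
            line = reader.readLine();
        } catch (IOException e) {
            // Assumption: wrap instead of sneaky-throwing, to stay self-contained.
            throw new UncheckedIOException(e);
        }
        if (line == null) return false; // end of input
        action.accept(line);
        return true;
    }

    @Override
    public Spliterator<String> trySplit() {
        final Object[] holder = new Object[1];
        final Object[] batch = new Object[batchSize];
        int n = 0;
        // Read up to batchSize lines sequentially from the I/O source.
        while (n < batchSize && tryAdvance(line -> holder[0] = line)) {
            batch[n++] = holder[0];
        }
        if (n == 0) return null; // source exhausted: nothing to split off
        // The array-backed spliterator is SIZED/SUBSIZED and splits evenly.
        return Spliterators.spliterator(batch, 0, n, characteristics());
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // unknown size, like the base class default
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL;
    }
}
```

Each trySplit call hands at most batchSize lines (fewer at the end of input) to a new array-backed spliterator, which the fork/join framework can keep halving on its own, while the I/O source itself is only ever read sequentially.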
And here is the opencsv spliterator built on FixedBatchSpliteratorBase:
```java
import java.util.function.Consumer;

import com.opencsv.CSVReader; // opencsv 3.x+; 2.x used au.com.bytecode.opencsv.CSVReader

// uncheckRun and uncheckCall are the static helpers shown below (assumed statically imported).
public class CsvSpliterator extends FixedBatchSpliteratorBase<String[]> {
    private final CSVReader cr;

    CsvSpliterator(CSVReader cr, int batchSize) {
        super(NONNULL, batchSize);
        if (cr == null) throw new NullPointerException("CSVReader is null");
        this.cr = cr;
    }

    public CsvSpliterator(CSVReader cr) {
        this(cr, 100);
    }

    @Override
    public void forEachRemaining(Consumer<? super String[]> action) {
        if (action == null) throw new NullPointerException();
        // Read rows until readNext() signals end of input by returning null.
        uncheckRun(() -> {
            for (String[] row; (row = cr.readNext()) != null;) action.accept(row);
        });
    }

    @Override
    public boolean tryAdvance(Consumer<? super String[]> action) {
        if (action == null) throw new NullPointerException();
        return uncheckCall(() -> {
            final String[] row = cr.readNext();
            if (row == null) return false;
            action.accept(row);
            return true;
        });
    }
}
```
where uncheckRun and uncheckCall are helpers that rethrow any checked exception unwrapped (the "sneaky throw" trick):
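```java
import java.util.concurrent.Callable;

public final class Util {
    // Assumed definition (not shown in the original): a Runnable whose run() may throw.
    @FunctionalInterface
    public interface RunnableExc {
        void run() throws Exception;
    }

    public static <T> T uncheckCall(Callable<T> callable) {
        try {
            return callable.call();
        } catch (Exception e) {
            return sneakyThrow(e);
        }
    }

    public static void uncheckRun(RunnableExc r) {
        try {
            r.run();
        } catch (Exception e) {
            sneakyThrow(e);
        }
    }

    // Never returns normally; the return type lets callers write "return sneakyThrow(e)".
    public static <T> T sneakyThrow(Throwable e) {
        return Util.<RuntimeException, T>sneakyThrow0(e);
    }

    // The cast to E is erased at runtime, so the checked exception escapes
    // without the compiler demanding a throws clause.
    @SuppressWarnings("unchecked")
    private static <E extends Throwable, T> T sneakyThrow0(Throwable t) throws E {
        throw (E) t;
    }
}
```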
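To illustrate what the sneaky-throw trick buys you, here is a minimal self-contained sketch. SneakyDemo and its readNext method are hypothetical stand-ins (readNext plays the role of cr.readNext()); the sketch shows that a checked IOException rethrown this way reaches the caller with its original type and message, not wrapped in a RuntimeException.

```java
import java.io.IOException;

// Hypothetical demo class: a sneakily rethrown checked exception
// reaches the caller unwrapped, with its original type intact.
final class SneakyDemo {
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> void sneakyThrow0(Throwable t) throws E {
        throw (E) t; // the cast is erased, so no wrapper exception is created
    }

    // Stand-in for cr.readNext(): a method that declares a checked exception.
    static String readNext() throws IOException {
        throw new IOException("simulated read failure");
    }

    // Compiles without a throws clause because E is chosen as RuntimeException.
    static String readUnchecked() {
        try {
            return readNext();
        } catch (IOException e) {
            SneakyDemo.<RuntimeException>sneakyThrow0(e);
            return null; // never reached at runtime; sneakyThrow0 always throws
        }
    }

    // Returns whatever readUnchecked() threw, so its type can be inspected.
    static Throwable capture() {
        try {
            readUnchecked();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }
}
```

One consequence of this design: because readUnchecked() declares no checked exception, javac rejects a catch (IOException e) clause around a direct call to it. Callers that want to handle the I/O failure must catch Exception (or Throwable) and test the type, as capture() does.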
Usage:
```java
import static java.util.stream.StreamSupport.stream;
....
final CSVReader cr = new CSVReader(new InputStreamReader(yourInputStream), separator, '"');
// The reader is closed only when the caller closes the returned stream.
return stream(new CsvSpliterator(cr), true).onClose(() -> uncheckRun(cr::close));
```
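Because the returned stream closes the CSVReader only through its onClose hook, the caller has to close the stream, typically with try-with-resources; a terminal operation alone does not run close handlers. Below is a minimal self-contained sketch of that pattern. An array of rows stands in for CsvSpliterator so there is no opencsv dependency, and CloseHookDemo and its closed flag are hypothetical names used only to make the effect observable.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo: a parallel stream with an onClose hook, consumed inside
// try-with-resources so that the hook is guaranteed to run.
final class CloseHookDemo {
    static final AtomicBoolean closed = new AtomicBoolean(false);

    static long sumFirstColumn() {
        // Stand-in for CsvSpliterator: an array-backed spliterator over rows.
        Object[] rows = { new String[] {"1", "a"}, new String[] {"2", "b"}, new String[] {"3", "c"} };
        Spliterator<String[]> source = Spliterators.spliterator(
                rows, 0, rows.length, Spliterator.ORDERED | Spliterator.NONNULL);
        try (Stream<String[]> stream = StreamSupport.stream(source, true)
                .onClose(() -> closed.set(true))) { // plays the role of cr::close
            return stream.mapToLong(row -> Long.parseLong(row[0])).sum();
        } // stream.close() runs here, invoking the onClose handler
    }
}
```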
Marko Topolnik