The amount of data flow in a java stream

I am reading a file through the java nio interface directly to the stream. This launches asynchronous HTTP requests and processes them in the future. Every 10,000 records, I upload this result to the server, and I clear the records, so this frees up memory consumption.
I start with an array of bytes that remains in memory all the time. The http ( commons CloseableHttpAsyncClient) client runs async requests, so they all start at once at the beginning.
Is there a way to limit the lambda stream so that I can limit the number of lines that are processed at the same time? Thus, I control my memory.

new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file)))
    .lines()
    .map(line -> CsvLine.create(line))
    .filter(line -> !line.isHeader())
    .forEach(line -> getResult(line, new FutureCallback<HttpResponse>() {
        @Override
        public void completed(HttpResponse response) {
            try {
                result.addLine(response);
            } catch (IOException e) {
                LOGGER.error("IOException, cannot write to server", e);
                todo.set(-1); // finish in error
            } finally {
                todo.decrementAndGet();
            }
       }

       @Override
       public void failed(Exception ex) {
           handleError();
       }

       @Override
       public void cancelled() {
           handleError();
       }
    }
));
+4
source share
1 answer

, . :

Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS, true); // false if FIFO is not important
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file)))
.lines()
        .map(line -> CsvLine.create(line))
        .filter(line -> !line.isHeader())
        .forEach(line -> {
            try {
                if (!semaphore.tryAcquire(ASYNC_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    handleTimeout();
                } else {
                    getResult(line, new FutureCallback<HttpResponse>() {
                        @Override
                        public void completed(HttpResponse response) {
                            try {
                                result.addLine(response);
                            } catch (IOException e) {
                                LOGGER.error("IOException, cannot write to server", e);
                                todo.set(-1); // finish in error
                            } finally {
                                todo.decrementAndGet();
                                semaphore.release();
                            }
                        }

                        @Override
                        public void failed(Exception ex) {
                            handleError();
                            semaphore.release();
                        }

                        @Override
                        public void cancelled() {
                            handleError();
                            semaphore.release();
                        }
                    }
                    );
                }
            } catch (InterruptedException e) {
                // handle appropriately
            }

        });
+1

All Articles