How to query Elasticsearch for the current queue load?

With a broad ES search request, I get

    Failed to execute [org.elasticsearch.action.search.SearchRequest@59e634e2] lastShard [true]
    org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@75bd024b
        at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:79)
        at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:551)
        at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteQuery(SearchServiceTransportAction.java:228)
        at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction.sendExecuteFirstPhase(TransportSearchQueryThenFetchAction.java:83)

on a very regular basis.

Now my plan is to pause query requests until the queue load drops below x. You can ask the client for stats:

 client.admin().cluster().threadPool().stats().iterator(); 

But since my client is not a data node itself (which I suppose is the reason), this returns queue=0 for me, while the server nodes keep throwing the above error.

I know what causes this, and I know how to raise the queue-capacity setting, but that only postpones the error and creates other problems...

How can I ask the cluster nodes for their current queue load?

PS: I use the Java API.

What I tried so far, without success (an empty line indicates another attempt, unless stated otherwise):

    // Nodes stats
    final NodesStatsResponse nodesStatsResponse = client.admin().cluster().prepareNodesStats().execute().actionGet();
    final NodeStats nodeStats = nodesStatsResponse.getNodes()[0];
    final String nodeId = nodeStats.getNode().getId(); // need this later on

    // same as before, but with an explicit NodesStatsRequest (with id)
    final NodesStatsResponse response = client.admin().cluster().nodesStats(new NodesStatsRequest(nodeId)).actionGet();
    final NodeStats[] nodeStats2 = response.getNodes();
    for (NodeStats nodeStats3 : nodeStats2) {
        Stats stats = nodeStats3.getThreadPool().iterator().next();
    }

    // Cluster?
    final ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequestBuilder(client.admin().cluster()).request();
    final ClusterStatsResponse clusterStatsResponse = client.admin().cluster().clusterStats(clusterStatsRequest).actionGet();
    final ClusterStatsNodes clusterStatsNodes = clusterStatsResponse.getNodesStats();

    // Nodes info?
    final NodesInfoResponse infoResponse = client.admin().cluster().nodesInfo(new NodesInfoRequest(nodeId)).actionGet(); // here
    final NodeInfo[] nodeInfos = infoResponse.getNodes();
    for (final NodeInfo nodeInfo : nodeInfos) {
        final ThreadPoolInfo info = nodeInfo.getThreadPool();
        final Iterator<Info> infoIterator = info.iterator();
        while (infoIterator.hasNext()) {
            final Info realInfo = infoIterator.next();
            SizeValue sizeValue = realInfo.getQueueSize();
            // if this is null, skip (does that ever happen? I was expecting a NullPointerException, but the entry was simply missing)
            if (sizeValue == null) continue;
            // this is the configured queue size, not the load (oddly I found the expected 1000, but also 200 on one node?)
            final long queueSize = sizeValue.getSingles();
        }
    }

The problem is that some requests must be answered immediately (for example, user queries), while others may wait when the cluster is too busy (background processes). Ideally I would reserve one part of the queue for immediate requests and the other part for background processes, but I did not find such an option.

Update: It seems I did not realize that even a single bulk/multi-search request can overload the queue, once the total number of individual shard-level requests exceeds 1000 (with x shards or x indices per search, you are limited to roughly 1000 / x searches per request; for example, with 5 shards per index, a multi-search of 250 queries already produces 1250 shard tasks). So bulking is not a way out when you cannot fit everything into one request anyway. Consequently, when you fire 700 searches at once (given the statement above), you need to know whether there are already more than 300 items in the queue, because then it will start throwing rejections.

Summarizing:

- Suppose the load per call is already a maximal bulk request, so I cannot combine requests any further.
- How can I then start pausing requests before Elasticsearch begins to throw the above exception, so that I can pause one part of my application but not another? Say, if the queue is half full, the background processes should sleep for a while.
- How do I find out the (approximate) queue load?
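
To make the goal concrete, here is a minimal sketch of what I am after; the getSearchQueueLoad() helper is hypothetical and is exactly the part I am missing:

    // Hypothetical: gate background searches on the cluster's search queue load.
    // User queries skip this check and are always sent immediately.
    void runBackgroundSearch(SearchRequestBuilder search) throws InterruptedException {
        // getSearchQueueLoad() does not exist yet; this is the part I cannot figure out
        while (getSearchQueueLoad() > 500) { // e.g. back off while the 1000-slot queue is half full
            Thread.sleep(1000);
        }
        search.execute().actionGet();
    }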

2 answers

The way you are trying to read the queue usage will not work, because you are not looking at the right statistics.

Take a look at this piece of code:

    final NodesStatsResponse response = client.admin().cluster().prepareNodesStats().setThreadPool(true).execute().actionGet();
    final NodeStats[] nodeStats2 = response.getNodes();
    for (NodeStats nodeStats3 : nodeStats2) {
        ThreadPoolStats stats = nodeStats3.getThreadPool();
        if (stats != null)
            for (ThreadPoolStats.Stats threadPoolStat : stats) {
                System.out.println("node `" + nodeStats3.getNode().getName() + "`"
                        + " has pool `" + threadPoolStat.getName()
                        + "` with current queue size " + threadPoolStat.getQueue());
            }
    }

First of all, you need setThreadPool(true) to get thread pool statistics back at all; otherwise it will be null.

Secondly, you need ThreadPoolStats, not ThreadPoolInfo, which only describes the thread pool settings.

So this is close to your second attempt, but it was incomplete: the 1000 you saw is the setting itself (the maximum queue size), not the actual load.
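
For the throttling the question asks about, these per-pool numbers could be folded into a helper along the following lines. This is only a rough sketch against the 1.x Java API; the pool name "search" and the choice to take the maximum across nodes are my own assumptions, not something prescribed anywhere:

    // Returns the largest current "search" queue across all nodes,
    // based on the same per-node thread pool stats as above.
    long maxSearchQueue(Client client) {
        final NodesStatsResponse response = client.admin().cluster()
                .prepareNodesStats().setThreadPool(true).execute().actionGet();
        long max = 0;
        for (NodeStats nodeStats : response.getNodes()) {
            final ThreadPoolStats stats = nodeStats.getThreadPool();
            if (stats == null) continue;
            for (ThreadPoolStats.Stats poolStats : stats) {
                if ("search".equals(poolStats.getName())) {
                    max = Math.max(max, poolStats.getQueue());
                }
            }
        }
        return max;
    }

A background process could then poll this value and sleep while it is above some threshold, e.g. while (maxSearchQueue(client) > 500) Thread.sleep(1000);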


I hope this is not the answer, but here is the relevant documentation; source: https://www.elastic.co/guide/en/elasticsearch/guide/current/_monitoring_individual_nodes.html#_threadpool_section

Bulk Rejections

If you are going to encounter queue rejections, it will most likely be caused by bulk indexing requests. It is easy to send many bulk requests to Elasticsearch by using concurrent import processes. More is better, right?

In reality, every cluster has a certain limit at which it can no longer keep up with ingestion. Once this threshold is crossed, the queue will quickly fill up, and new bulks will be rejected.

This is a good thing. Queue rejections are a useful form of back pressure. They let you know that your cluster is at maximum capacity, which is much better than sticking data into an in-memory queue. Increasing the queue size does not increase performance; it just hides the problem. If your cluster can process only 10,000 documents per second, it does not matter whether the queue is 100 or 10,000,000: your cluster can still process only 10,000 documents per second.

The queue just hides the performance problem and carries a real risk of data loss. Anything sitting in a queue is, by definition, not processed yet. If the node goes down, all of those requests are lost forever. Furthermore, the queue eats up a lot of memory, which is not ideal.

It is much better to handle queuing in your application by gracefully handling the back pressure from a full queue. When you receive bulk rejections, you should take these steps:

1. Pause the import thread for 3-5 seconds.
2. Extract the rejected actions from the bulk response, since it is probable that many of the actions were successful. The bulk response will tell you which succeeded and which were rejected.
3. Send a new bulk request with just the rejected actions.
4. Repeat from step 1 if rejections are encountered again.

Using this procedure, your code naturally adapts to the load of your cluster and naturally backs off.

Rejections are not errors: they just mean you should try again later.
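
For illustration, a rough sketch of that retry loop against the 1.x Java bulk API (classes from org.elasticsearch.action.bulk and org.elasticsearch.action.index); the five-second pause, the helper name and the overall structure are my own choices, not something the guide prescribes:

    // Resubmit only the rejected/failed items, backing off between attempts.
    void indexWithBackoff(Client client, List<IndexRequest> requests) throws InterruptedException {
        List<IndexRequest> pending = requests;
        while (!pending.isEmpty()) {
            final BulkRequestBuilder bulk = client.prepareBulk();
            for (IndexRequest request : pending) {
                bulk.add(request);
            }
            final BulkResponse response = bulk.execute().actionGet();
            if (!response.hasFailures()) {
                return; // everything was accepted
            }
            // Steps 2 and 3: keep only the items that failed; the item id is the position in the request.
            final List<IndexRequest> rejected = new ArrayList<>();
            for (BulkItemResponse item : response) {
                if (item.isFailed()) {
                    rejected.add(pending.get(item.getItemId()));
                }
            }
            pending = rejected;
            Thread.sleep(5000); // step 1: pause the import thread before retrying
        }
    }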

In particular, I don't like the part that says "When you receive bulk rejections, you should take these steps". We should be able to address the problem beforehand, rather than only react once rejections occur.

