How to create a request body for Python Elasticsearch mSearch

I am trying to run a multi-search query with the Elasticsearch Python client. I can correctly perform a single search, but I can't figure out how to format the multi-search request. According to the documentation, the request body must be formatted as:

The request definitions (metadata / search request definition pairs), as either a newline-separated string, or a sequence of dicts to serialize (one per line).

What is the best way to create this request body? I searched for examples but didn't seem to find one.

+4
source share
4 answers

Like the Bulk API, `msearch` in the Python Elasticsearch client accepts a newline-delimited JSON string as the body — alternating header and body lines, each one a serialized JSON object.

For example:

def msearch():
    """Run a multi-search request built as a newline-delimited JSON string.

    The msearch body is a sequence of header/body pairs, one JSON object
    per line, each line terminated by a newline (the same NDJSON format
    the Bulk API uses).

    :return: dict, raw msearch response; the 'responses' key holds one
             result per query, in request order.
    """
    es = get_es_instance()

    search_arr = []
    # request header: which index / mapping type this query targets
    search_arr.append({'index': 'my_test_index', 'type': 'doc_type_1'})
    # request body: the actual query DSL
    search_arr.append({"query": {"term" : {"text" : "bag"}}, 'from': 0, 'size': 2})

    # second header/body pair
    search_arr.append({'index': 'my_test_index', 'type': 'doc_type_2'})
    search_arr.append({"query": {"match_all" : {}}, 'from': 0, 'size': 2})

    # Serialize one JSON object per line; the trailing newline on every
    # line (including the last) is required by the msearch protocol.
    request = ''.join('%s\n' % json.dumps(each) for each in search_arr)

    # The index and doc_type are carried in the header lines, so they
    # are not passed to msearch() itself.
    resp = es.msearch(body=request)
    # Bug fix: the original computed resp but never returned it.
    return resp

The body is composed of request units. Each request unit consists of:

request_header (search control: index name, optional mapping types, search types, etc.)\n
request_body (the query detail for this request)\n

Alternatively, you can pass a list of dicts and let the client serialize them for you:

def msearch():
    """Run a multi-search request passing a list of dicts as the body.

    The Python client also accepts a flat sequence of dicts
    (header, body, header, body, ...) and serializes them to
    newline-delimited JSON itself, so no manual json.dumps is needed.

    :return: dict, raw msearch response; the 'responses' key holds one
             result per query, in request order.
    """
    es = get_es_instance()

    request = []

    # First query: term search against doc_type_1.
    req_head = {'index': 'my_test_index', 'type': 'doc_type_1'}
    req_body = {
        'query': {'term': {'text' : 'bag'}},
        'from' : 0, 'size': 2  }
    request.extend([req_head, req_body])

    # Second query: range filter against doc_type_2.
    req_head = {'index': 'my_test_index', 'type': 'doc_type_2'}
    req_body = {
        'query': {'range': {'price': {'gte': 100, 'lt': 300}}},
        'from' : 0, 'size': 2  }
    request.extend([req_head, req_body])

    resp = es.msearch(body=request)
    # Bug fix: the original computed resp but never returned it.
    return resp

That is all there is to it — just remember the alternating header/body structure when building a body for msearch.

+8

If you are using elasticsearch-dsl, you can use the MultiSearch class instead of building the body by hand.

Usage:

from elasticsearch_dsl import MultiSearch, Search

# Queue up several searches against the 'blogs' index; add() returns a
# new MultiSearch, so rebind on each call.
multi = MultiSearch(index='blogs')
multi = multi.add(Search().filter('term', tags='python'))
multi = multi.add(Search().filter('term', tags='elasticsearch'))

# execute() fires a single msearch request and yields one response per
# queued search, in the order they were added.
for resp in multi.execute():
    print("Results for query %r." % resp.search.query)
    for hit in resp:
        print(hit.title)
+3

Thanks! I finally got a working batched version based on the accepted answer:

es = ElasticSearch("myurl")

# Send queries to msearch in batches of 20 header/body pairs.
query_list = ""
query_count = 0  # bug fix: was never initialized before `query_count += 1`
for obj in my_list:
    # bug fix: original called constructQuery(name) with an undefined `name`
    query = constructQuery(obj)
    query_count += 1
    # msearch needs a header line before every query body; an empty {}
    # header falls back to the index passed to es.msearch() below.
    # Bug fix: header and body must be on separate lines, and every line
    # (including the last) needs a trailing newline.
    query_list += json.dumps({}) + "\n"
    query_list += json.dumps(query) + "\n"
    if query_count == 20:
        es.msearch(index="m_index", body=query_list)
        # Bug fix: reset the batch so already-sent queries are not re-sent.
        query_list = ""
        query_count = 0

# Bug fix: flush the final partial batch (the original silently dropped it).
if query_list:
    es.msearch(index="m_index", body=query_list)

I originally caused an error by specifying the index twice. Even when using the Python client, you still have to include the header part described in the official docs. It works now!

0
source

Here is what I came up with. I use the same document type and index, so I optimized the code to run multiple queries with the same header:

import json
import logging
import time

from elasticsearch import Elasticsearch
from elasticsearch import exceptions as es_exceptions

RETRY_ATTEMPTS = 10
RECONNECT_SLEEP_SECS = 0.5

def msearch(es_conn, queries, index, doc_type, retries=0):
    """
    Es multi-search query with retry on transient connection failures.

    :param es_conn: Elasticsearch, connected client instance
    :param queries: list of dict, es query bodies
    :param index: str, index to query against
    :param doc_type: str, defined doc type i.e. event
    :param retries: int, current retry attempt
    :return: list, found docs (one hit list per query, in request order)
    """
    # Every query targets the same index/type, so serialize the header once
    # and reuse it for each header/body pair.
    search_header = json.dumps({'index': index, 'type': doc_type})
    request = ''.join(
        '{}\n{}\n'.format(search_header, json.dumps(q)) for q in queries)
    try:
        resp = es_conn.msearch(body=request, index=index)
        found = [r['hits']['hits'] for r in resp['responses']]
    except (es_exceptions.ConnectionTimeout, es_exceptions.ConnectionError,
            es_exceptions.TransportError):  # pragma: no cover
        logging.warning("msearch connection failed, retrying...")  # Retry on timeout
        if retries > RETRY_ATTEMPTS:  # pragma: no cover
            raise
        time.sleep(RECONNECT_SLEEP_SECS)
        # Bug fix: the original recursive call dropped es_conn and doc_type,
        # which raised a TypeError on the first retry attempt.
        found = msearch(es_conn, queries, index=index, doc_type=doc_type,
                        retries=retries + 1)
    except Exception as e:  # pragma: no cover
        logging.critical("msearch error {} on query {}".format(e, queries))
        raise
    return found

es_conn = Elasticsearch()

# Three independent queries against the same index and doc type.
queries = [
    {"min_score": 2.0,
     "query": {"bool": {"should": [{"match": {"name.tokenized": {"query": "batman"}}}]}}},
    {"min_score": 1.0,
     "query": {"bool": {"should": [{"match": {"name.tokenized": {"query": "ironman"}}}]}}},
    {"track_scores": True, "min_score": 9.0,
     "query": {"bool": {"should": [{"match": {"name": {"query": "not-findable"}}}]}}},
]

q_results = msearch(es_conn, queries, index='pipeliner_current', doc_type='event')

This may be what some of you are looking for if you want to make multiple queries in the same index type and document type.

0
source

All Articles