After doing some research at a local Starbucks, here's what I came up with:
, ( "old_index" ), ... ( "new_index" ), (, STRING vs INT , , ..).
- ( "old_index" ) ( "new_index" ). , :
1.
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
, , . .. : " , , , .
Java API , : https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/scrolling.html
2. . . Ingest Java API: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/bulk.html#_using_bulk_processor
ho, ...
1. , ""
SearchResponse scrollResp = client.prepareSearch("old_index")
.setSearchType(SearchType.SCAN)
.setScroll(new TimeValue(60000))
.setQuery(QueryBuilders.matchAllQuery())
.setSize(100).execute().actionGet();
2. .
int BULK_ACTIONS_THRESHOLD = 1000;
int BULK_CONCURRENT_REQUESTS = 1;
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("Bulk Going to execute new bulk composed of {} actions", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.info("Executed bulk composed of {} actions", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.warn("Error executing bulk", failure);
}
}).setBulkActions(BULK_ACTIONS_THRESHOLD).setConcurrentRequests(BULK_CONCURRENT_REQUESTS).setFlushInterval(TimeValue.timeValueMillis(5)).build();
3. 1 , mo-
while (true) {
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
if (scrollResp.getHits().getHits().length == 0) {
logger.info("Closing the bulk processor");
bulkProcessor.close();
break;
}
for (SearchHit hit: scrollResp.getHits()) {
IndexRequest request = new IndexRequest("new_index", hit.type(), hit.id());
Map source = ((Map) ((Map) hit.getSource()));
request.source(source);
bulkProcessor.add(request);
}
}
4. , , . , . , , , . : ElasticSeach JAVA API
client.admin().indices().prepareAliases().addAlias("new_index", "alias_name").get();
,
client.admin().indices().prepareAliases().removeAlias("old_index", "alias_name").execute().actionGet();
client.admin().indices().prepareDelete("old_index").execute().actionGet();