Environment
DB: Sybase Logstash: 2.2.0 with JDBC plugin, Elasticsearch exit plugin
SQL query:
select res.id as 'res.id', res.name as 'res.name', tag.name as 'tag.name'
from Res res, ResTags rt, Tags tag
where res.id *= rt.resrow and rt.tagid *= tag.id
SQL result:
res.id | res.name | tag.name
0 | result0 | null
0 | result0 | tagA
1 | result1 | tagA
1 | result1 | tagB
2 | result2 | tagA
2 | result2 | tagC
Display indices:
{
"mappings": {
"res": {
"properties": {
"id": { "type": "long"},
"name": { "type": "string" },
"tags": {
"type": "nested",
"properties": { "tagname": { "type": "string" }}
}
}
}
}
Conf File:
input {
jdbc {
jdbc_driver_library => "jtds-1.3.1.jar"
jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
jdbc_connection_string => "jdbc:jtds:sybase://hostname.com:1234/schema"
jdbc_user => "george"
jdbc_password => "monkey"
jdbc_fetch_size => 100
statement_filepath => "/home/george/sql"
}
}
output {
elasticsearch {
action => "update"
index => "myres"
document_type => "res"
document_id => "%{res.id}"
script_lang => "groovy"
hosts => [ "my.other.host.com:5921" ]
upsert => ' {
"id" : %{res.id},
"name" : "%{res.name}",
"tags" :[{ "tagname": "%{tag.name}" }]
}'
script => '
if (ctx._source.res.tags.containsValue(null)) {
// if null has been added replace it with actual value
cts._source.res.tags = [{"tagname": "%{tag.name}" }];
else {
// if you find the tag, then do nothing
if (ctx._source.res.tags.containsValue("%{tag.name}")) {}
else {
// if the value you try to add is not null
if (%{tag.name} != null)
// add it as a new object into the tag array
ctx._source.res.tags += {"tagname": "%{tag.name}"};
}
}
'
}
}
The GOAL is to add multiple rows returned from the database to ES by combining the tags as new objects (this is a simplified example, so add_tag and filters do not do this work since I have a json structure deeper than 2 level (nested nested, etc.))
The desired result after mass loading in ES will be:
{
"hits": {
"total": 3,
"max_score": 1,
"hits": [ {
"_index": "myres",
"_type": "res",
"_id": 0,
"_score": 1,
"_source": {
"res": {
"id":0,
"name": "result0",
"tags": [{"tagname": "tagA"}],
"@version": "2",
"@timestamp": "2016-xx-yy..."
}
},{
"_index": "myres",
"_type": "res",
"_id": 1,
"_score": 1,
"_source": {
"res": {
"id":1,
"name": "result1",
"tags": [{"tagname": "tagA"},{"tagname": "tagB"}],
"@version": "2",
"@timestamp": "2016-xx-yy..."
}
}{
"_index": "myres",
"_type": "res",
"_id": 2,
"_score": 1,
"_source": {
"res": {
"id":2,
"name": "result2",
"tags": [{"tagname": "tagA"},{"tagname": "tagC"],
"@version": "2",
"@timestamp": "2016-xx-yy..."
}
}
}
...
QUESTION: if in the conf section, the script output is not commented out, the error below appears. If the script is not included, then only the original tags are imported (as expected), and in the second - no.
, script elasticsearch.
ERROR:
[400] {"error":"ActionRequestValidationException[Validation Failed:
1: script or doc is missing;
2: script or doc is missing;
3: script or doc is missing;],"status":400]} {:class=> ... bla bla ...}
- , doc_as_upsert = > true . / db.
- , river jdbc ES , eithe