Skip to content
Advertisement

ElasticSearch how to aggregation the “copy_to” field

I need to group by 9 fileds and get the count for each group in ElasticSearch, the orignal code use the “Script” and the performance is bad so i need to optimize it. I managed to create a new field and use “copy_to”, but when I aggregate with the new filed i found some problem.

I use the ‘srcIp‘ and ‘dstIp‘ fields as test, the copy_to field is ‘aggCondition‘. Here is the mapping:

PUT /test_index
{
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 1
  },
  "mappings": {
      "dynamic_templates": [
    {
      "set_copy_to": {
        "match": "^(src|dst).+",
        "match_pattern": "regex",
        "mapping": {
          "copy_to": "aggCondition",
          "fields": {
            "keyword": {
              "ignore_above": 256,
              "type": "keyword"
            }
          },
          "type": "text"
        }
      }
    }
  ]
  }
}

Then I add some data to it

{
  "srcIp":"192.0.0.1",
  "dstIp":"192.0.1.1"
}
{
  "srcIp":"192.0.1.1",
  "dstIp":"192.0.2.1"
}
{
  "srcIp":"192.0.2.1",
  "dstIp":"192.0.0.1"
}

Then I see the mappings in the kibana and it looks like this:

{
  "mappings": {
    "_doc": {
      "dynamic_templates": [
        {
          "set_copy_to": {
            "match": "^(src|dst).+",
            "match_pattern": "regex",
            "mapping": {
              "copy_to": "aggCondition",
              "fields": {
                "keyword": {
                  "ignore_above": 256,
                  "type": "keyword"
                }
              },
              "type": "text"
            }
          }
        }
      ],
      "properties": {
        "aggCondition": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "dstIp": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          },
          "copy_to": [
            "aggCondition"
          ]
        },
        "srcIp": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          },
          "copy_to": [
            "aggCondition"
          ]
        }
      }
    }
  }
}

Then I aggregate use the new field ‘aggCondition’:

GET /test_index/_search
{
  "aggs": {
    "Ips": {
      "terms": {
        "field": "aggCondition.keyword"
      }
    }
  }
}

The result is

  "aggregations" : {
    "Ips" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "192.0.0.1",
          "doc_count" : 2
        },
        {
          "key" : "192.0.1.1",
          "doc_count" : 2
        },
        {
          "key" : "192.0.2.1",
          "doc_count" : 2
        }
      ]
    }
  }

But what I expect is like

  "aggregations" : {
    "Ips" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "[192.0.0.1 192.0.1.1]",
          "doc_count" : 1
        },
        {
          "key" : "[192.0.1.1 192.0.2.1]",
          "doc_count" : 1
        },
        {
          "key" : "[192.0.2.1 192.0.0.1]",
          "doc_count" : 1
        }
      ]
    }
  }

What can I do to get my expected result or is there other way to aggregate multi field efficently?

Advertisement

Answer

dynamic_templates and copy_to is not the way to go in your case. You’d be better off dynamically computing a new field that indexes the src/dst IP pairs. You can achieve this using an ingest pipeline with an append and join processor to create the new field.

PUT _ingest/pipeline/ip-pipeline
{
  "processors": [
    {
      "append": {
        "field": "srcDst",
        "value": ["{{{srcIp}}}", "{{{dstIp}}}"]
      }
    },
    {
      "join": {
        "field": "srcDst",
        "separator": "-"
      }
    }
  ]
}

Then when you index a new document, you can specify this pipeline and the new field will be created:

PUT my-index/_doc/1?pipeline=ip-pipeline
{
  "srcIp":"192.0.0.1",
  "dstIp":"192.0.1.1"
}

Your indexed document will look like this:

{
  "srcIp":"192.0.0.1",
  "dstIp":"192.0.1.1",
  "srcDst": "192.0.0.1-192.0.1.1"
}

And then you can run your aggregation query on that new srcDst field and get the result you expect.

User contributions licensed under: CC BY-SA
1 People found this is helpful
Advertisement