Mozdef / tests /mozdef_util /query_models /test_aggregation.py
ineso22's picture
Upload folder using huggingface_hub
7c89ed7 verified
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation
from mozdef_util.query_models import SearchQuery, Aggregation, TermMatch, ExistsMatch
from tests.unit_test_suite import UnitTestSuite
class TestAggregation(UnitTestSuite):
def test_simple_aggregation_source_field(self):
events = [
{"test": "value", "source": "somesource1"},
{"test": "value", "source": "anothersource1"},
{"test": "value", "source": "anothersource1"},
{"test": "value", "note": "think"},
{"test": "value", "source": "somesource1 anothersource1"},
]
for event in events:
self.populate_test_object(event)
self.refresh(self.event_index_name)
search_query = SearchQuery()
search_query.add_must(TermMatch('test', 'value'))
search_query.add_aggregation(Aggregation('source'))
results = search_query.execute(self.es_client)
assert list(results['aggregations'].keys()) == ['source']
assert list(results['aggregations']['source'].keys()) == ['terms']
assert len(results['aggregations']['source']['terms']) == 3
assert list(results['aggregations']['source']['terms'][0].keys()) == ['count', 'key']
assert results['aggregations']['source']['terms'][0]['count'] == 2
assert results['aggregations']['source']['terms'][0]['key'] == 'anothersource1'
assert results['aggregations']['source']['terms'][1]['count'] == 1
assert results['aggregations']['source']['terms'][1]['key'] == 'somesource1'
assert results['aggregations']['source']['terms'][2]['count'] == 1
assert results['aggregations']['source']['terms'][2]['key'] == 'somesource1 anothersource1'
def test_simple_aggregation_note_field(self):
events = [
{"test": "value", "note": "abvc"},
{"test": "value", "note": "abvc"},
{"test": "value", "note": "think"},
{"test": "value", "summary": "think"},
{"test": "value", "note": "abvc space line"},
]
for event in events:
self.populate_test_object(event)
self.refresh(self.event_index_name)
search_query = SearchQuery()
search_query.add_must(TermMatch('test', 'value'))
search_query.add_aggregation(Aggregation('note'))
results = search_query.execute(self.es_client)
assert list(results['aggregations'].keys()) == ['note']
assert list(results['aggregations']['note'].keys()) == ['terms']
assert len(results['aggregations']['note']['terms']) == 3
assert list(results['aggregations']['note']['terms'][0].keys()) == ['count', 'key']
assert results['aggregations']['note']['terms'][0]['count'] == 2
assert results['aggregations']['note']['terms'][0]['key'] == 'abvc'
assert results['aggregations']['note']['terms'][1]['count'] == 1
assert results['aggregations']['note']['terms'][1]['key'] == 'abvc space line'
assert results['aggregations']['note']['terms'][2]['count'] == 1
assert results['aggregations']['note']['terms'][2]['key'] == 'think'
def test_multiple_aggregations(self):
events = [
{"test": "value", "note": "abvc"},
{"test": "value", "note": "abvc"},
{"test": "value", "note": "think"},
{"test": "value", "summary": "think"},
]
for event in events:
self.populate_test_object(event)
self.refresh(self.event_index_name)
search_query = SearchQuery()
search_query.add_must(TermMatch('test', 'value'))
search_query.add_aggregation(Aggregation('note'))
search_query.add_aggregation(Aggregation('test'))
results = search_query.execute(self.es_client)
aggregation_keys = sorted(results['aggregations'].keys())
assert aggregation_keys == ['note', 'test']
assert list(results['aggregations']['note'].keys()) == ['terms']
assert len(results['aggregations']['note']['terms']) == 2
assert list(results['aggregations']['note']['terms'][0].keys()) == ['count', 'key']
assert results['aggregations']['note']['terms'][0]['count'] == 2
assert results['aggregations']['note']['terms'][0]['key'] == 'abvc'
assert results['aggregations']['note']['terms'][1]['count'] == 1
assert results['aggregations']['note']['terms'][1]['key'] == 'think'
assert list(results['aggregations']['test'].keys()) == ['terms']
assert len(results['aggregations']['test']['terms']) == 1
assert list(results['aggregations']['test']['terms'][0].keys()) == ['count', 'key']
assert results['aggregations']['test']['terms'][0]['count'] == 4
assert results['aggregations']['test']['terms'][0]['key'] == 'value'
def test_aggregation_non_existing_term(self):
events = [
{"test": "value", "note": "abvc"},
{"test": "value", "note": "abvc"},
{"test": "value", "note": "think"},
{"test": "value", "summary": "think"},
]
for event in events:
self.populate_test_object(event)
self.refresh(self.event_index_name)
search_query = SearchQuery()
search_query.add_must(TermMatch('test', 'value'))
search_query.add_aggregation(Aggregation('example'))
results = search_query.execute(self.es_client)
assert sorted(results.keys()) == ['aggregations', 'hits', 'meta']
assert len(results['hits']) == 4
assert list(results['aggregations'].keys()) == ['example']
assert list(results['aggregations']['example'].keys()) == ['terms']
assert results['aggregations']['example']['terms'] == []
def test_aggregation_multiple_layers(self):
events = [
{
"test": "value",
"details": {"ip": "127.0.0.1"},
},
{
"test": "value",
"details": {"ip": "127.0.0.1"},
},
{
"test": "value",
"details": {"ip": "192.168.1.1"},
},
]
for event in events:
self.populate_test_object(event)
self.refresh(self.event_index_name)
search_query = SearchQuery()
search_query.add_must(TermMatch('test', 'value'))
search_query.add_aggregation(Aggregation('details.ip'))
results = search_query.execute(self.es_client)
assert list(results['aggregations'].keys()) == ['details.ip']
assert list(results['aggregations']['details.ip'].keys()) == ['terms']
assert len(results['aggregations']['details.ip']['terms']) == 2
assert results['aggregations']['details.ip']['terms'][0]['count'] == 2
assert results['aggregations']['details.ip']['terms'][0]['key'] == "127.0.0.1"
assert results['aggregations']['details.ip']['terms'][1]['count'] == 1
assert results['aggregations']['details.ip']['terms'][1]['key'] == "192.168.1.1"
def test_aggregation_non_existing_layers_term(self):
events = [
{"test": "value", "note": "abvc"},
{"test": "value", "note": "abvc"},
{"test": "value", "note": "think"},
{"test": "value", "summary": "think"},
]
for event in events:
self.populate_test_object(event)
self.refresh(self.event_index_name)
search_query = SearchQuery()
search_query.add_must(TermMatch('test', 'value'))
search_query.add_aggregation(Aggregation('details.ipinformation'))
results = search_query.execute(self.es_client)
assert list(results['aggregations'].keys()) == ['details.ipinformation']
assert list(results['aggregations']['details.ipinformation'].keys()) == ['terms']
assert len(results['aggregations']['details.ipinformation']['terms']) == 0
def test_aggregation_with_default_size(self):
for num in range(0, 100):
event = {'keyname': 'value' + str(num)}
self.populate_test_object(event)
self.refresh(self.event_index_name)
search_query = SearchQuery()
search_query.add_must(ExistsMatch('keyname'))
search_query.add_aggregation(Aggregation('keyname'))
results = search_query.execute(self.es_client)
assert len(results['aggregations']['keyname']['terms']) == 20
def test_aggregation_with_aggregation_size(self):
for num in range(0, 100):
event = {'keyname': 'value' + str(num)}
self.populate_test_object(event)
self.refresh(self.event_index_name)
search_query = SearchQuery()
search_query.add_must(ExistsMatch('keyname'))
search_query.add_aggregation(Aggregation('keyname', 2))
results = search_query.execute(self.es_client)
assert len(results['aggregations']['keyname']['terms']) == 2