File size: 3,625 Bytes
7c89ed7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation

import json
import os
import typing as types

import netaddr

from lib.config import ES
from mozdef_util.elasticsearch_client import ElasticsearchClient
from mozdef_util.query_models import SearchQuery, TermMatch, PhraseMatch
from mozdef_util.utilities.toUTC import toUTC


# Path of the plugin's JSON configuration: a file that sits next to this
# module and shares its base name, with a `.json` extension.
# Only the extension of the file name is swapped.  A blanket
# `__file__.replace('py', 'json')` would also rewrite any "py" occurring in a
# directory name (e.g. ".../python3/...") and point at a non-existent path.
CONFIG_FILE = os.path.join(
    os.path.dirname(__file__),
    os.path.splitext(os.path.basename(__file__))[0] + '.json',
)


# TODO: Switch to dataclass when we adopt Python 3.7+

class Config(types.NamedTuple):
    '''Expected configuration for the plugin, loaded from JSON.

    The JSON document must be an object whose keys are exactly the field
    names declared below; its values are passed to the constructor as
    keyword arguments.
    '''

    # Names of the indices handed to each search.
    indices_to_search: types.List[str]
    # Size, in hours, of the search window given to the query.
    search_window_hours: int
    # CIDR strings; only alerts whose source IP is inside one are enriched.
    vpn_ip_cidrs: types.List[str]

    @classmethod
    def load(cls, file_path: str) -> 'Config':
        '''Attempt to parse a configuration from a JSON file.

        Declared as a classmethod so that it can be invoked both as
        `Config.load(path)` and on an existing instance.

        Args:
            file_path: Path of the JSON file to read.

        Returns:
            A `Config` populated from the top-level keys of the JSON object.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file does not contain valid JSON.
            TypeError: If the JSON keys do not match the declared fields.
        '''

        with open(file_path) as cfg_file:
            return cls(**json.load(cfg_file))


class message(object):
    '''Alert plugin registered for every alert (`'*'`) that tries to attach
    VPN IP address assignment information to it.

    When enrichment succeeds, the alert gains these fields:

    ```json
    {
        "details": {
            "vpnassignment": {
                "username": "user@mozilla.com",
                "originalip": "1.2.3.4"
            }
        }
    }
    ```
    '''

    def __init__(self):
        # Load the plugin settings and subscribe to all alerts.
        self.config = Config.load(CONFIG_FILE)
        self.registration = ['*']

        client = ElasticsearchClient(ES['servers'])

        def run_query(query):
            # Execute `query` against the configured indices using the
            # client captured above and hand back its 'hits' entry
            # (an empty list when that key is absent).
            results = query.execute(
                client,
                indices=self.config.indices_to_search,
            )
            return results.get('hits', [])

        # Stored as an attribute so `onMessage` can pass it on to `enrich`.
        self.search = run_query

    def onMessage(self, msg):
        # Delegate to the module-level `enrich` with the loaded settings.
        cfg = self.config
        return enrich(
            msg,
            cfg.search_window_hours,
            cfg.vpn_ip_cidrs,
            self.search,
        )


def enrich(
    alert: dict,
    search_window_hours: int,
    vpn_ip_cidrs: types.List[str],
    search_fn: types.Callable[['SearchQuery'], types.List[dict]],
) -> dict:
    '''Search for events that describe an assignment of a VPN IP address to
    the sourceipaddress in an alert.

    Args:
        alert: The alert to enrich; read from and written to via its
            `details` dictionary.
        search_window_hours: Value of the `hours` criterion given to the
            search query.
        vpn_ip_cidrs: CIDR strings; the alert is only enriched when its
            `details.sourceipaddress` falls inside one of them.
        search_fn: Callable that runs a `SearchQuery` and returns a list of
            hits, each expected to carry the event under `_source`.

    Returns:
        The same `alert` object.  It is returned unmodified when it has no
        source IP, when the source IP cannot be parsed, when the IP is not
        in a VPN range, or when no assignment event is found.  Otherwise
        `details.vpnassignment` is set to the `username` and
        `sourceipaddress` of the most recent matching event.

    Raises:
        KeyError: If a matching event lacks `utctimestamp`,
            `details.username` or `details.sourceipaddress`.
    '''

    details = alert.get('details', {})

    source_ip = details.get('sourceipaddress')

    if source_ip is None:
        return alert

    # This plugin sees every alert, so the source IP field is not guaranteed
    # to hold a well-formed address; treat an unparseable value the same as a
    # non-VPN address rather than letting the exception escape.
    try:
        address = netaddr.IPAddress(source_ip)
    except (netaddr.AddrFormatError, ValueError, TypeError):
        return alert

    if address not in netaddr.IPSet(vpn_ip_cidrs):
        return alert

    search_vpn_assignment = SearchQuery({
        'hours': search_window_hours,
    })
    search_vpn_assignment.add_must([
        TermMatch('tags', 'vpn'),
        TermMatch('tags', 'netfilter'),
        TermMatch('details.success', 'true'),
        TermMatch('details.vpnip', source_ip),
        PhraseMatch('summary', 'netfilter add upon connection'),
    ])

    assign_events = [
        hit.get('_source', {})
        for hit in search_fn(search_vpn_assignment)
    ]

    if not assign_events:
        return alert

    # Only the most recent assignment is needed, so pick the maximum by
    # timestamp instead of sorting the whole list.  On ties the first event
    # in result order wins, as with a stable descending sort.
    event = max(assign_events, key=lambda evt: toUTC(evt['utctimestamp']))

    details['vpnassignment'] = {
        'username': event['details']['username'],
        'originalip': event['details']['sourceipaddress'],
    }

    # Re-attach in case the alert had no `details` key to begin with.
    alert['details'] = details

    return alert