| |
| |
| |
| |
|
|
| import json |
| import os |
| import typing as types |
|
|
| import netaddr |
|
|
| from lib.config import ES |
| from mozdef_util.elasticsearch_client import ElasticsearchClient |
| from mozdef_util.query_models import SearchQuery, TermMatch, PhraseMatch |
| from mozdef_util.utilities.toUTC import toUTC |
|
|
|
|
# Path of the plugin's JSON configuration file: it lives in the same
# directory as this module and shares its base name, with a `.json`
# extension.  Only the extension of the base name is swapped; a blanket
# `str.replace('py', 'json')` would also rewrite any other "py" occurring in
# the path (for example a ".../python3/..." directory).
CONFIG_FILE = os.path.join(
    os.path.dirname(__file__),
    os.path.splitext(os.path.basename(__file__))[0] + '.json',
)
|
|
|
|
| |
|
|
class Config(types.NamedTuple):
    '''Expected configuration for the plugin, loaded from JSON.

    The JSON document must be an object whose keys are exactly the field
    names declared below.
    '''

    # Passed as the `indices` argument when the plugin executes a search.
    indices_to_search: types.List[str]
    # Used as the `hours` value of the search query built for each alert.
    search_window_hours: int
    # CIDR strings; only alerts whose source IP is inside one of them are
    # enriched.
    vpn_ip_cidrs: types.List[str]

    @staticmethod
    def load(file_path: str) -> 'Config':
        '''Attempt to parse a configuration from a JSON file.

        Declared as a static method so that it behaves the same whether it
        is called on the class (`Config.load(path)`) or on an instance.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: if the file does not contain valid JSON.
            TypeError: if the JSON content is not an object, or its keys do
                not match the fields of `Config`.
        '''

        with open(file_path) as cfg_file:
            return Config(**json.load(cfg_file))
|
|
|
|
class message(object):
    '''Alert plugin that accepts any alert and, where possible, annotates it
    with details about the VPN IP address assignment behind its source IP.

    When an assignment is found, the alert gains the following fields:

    ```json
    {
        "details": {
            "vpnassignment": {
                "username": "user@mozilla.com",
                "originalip": "1.2.3.4"
            }
        }
    }
    ```
    '''

    def __init__(self):
        # Wildcard registration: this plugin handles any alert.
        self.registration = ['*']

        self.config = Config.load(CONFIG_FILE)

        client = ElasticsearchClient(ES['servers'])

        def run_query(query):
            # Execute the query against the configured indices and hand back
            # only the hits (an empty list when the response has none).
            response = query.execute(
                client,
                indices=self.config.indices_to_search,
            )
            return response.get('hits', [])

        # Stored as an attribute so `onMessage` can hand it to `enrich`.
        self.search = run_query

    def onMessage(self, msg):
        '''Enrich `msg` using the loaded configuration and return the
        result.
        '''

        settings = self.config

        return enrich(
            alert=msg,
            search_window_hours=settings.search_window_hours,
            vpn_ip_cidrs=settings.vpn_ip_cidrs,
            search_fn=self.search,
        )
|
|
|
|
def enrich(
    alert: dict,
    search_window_hours: int,
    vpn_ip_cidrs: types.List[str],
    search_fn: types.Callable[[SearchQuery], types.List[dict]],
) -> dict:
    '''Search for events that describe an assignment of a VPN IP address to
    the sourceipaddress in an alert.

    Args:
        alert: The alert to enrich.  It is modified in place.
        search_window_hours: Value supplied as `hours` to the search query.
        vpn_ip_cidrs: CIDR strings describing the VPN address space.
        search_fn: Callable that runs a `SearchQuery` and returns the hits.

    Returns:
        The same `alert` object.  It is returned untouched when it has no
        `details.sourceipaddress`, when that value is not a parseable IP
        address, when the address is outside `vpn_ip_cidrs`, or when no
        assignment event is found.  Otherwise `details.vpnassignment` is set
        from the most recent matching event.
    '''

    details = alert.get('details', {})
    source_ip = details.get('sourceipaddress')

    if source_ip is None:
        return alert

    # This function is fed arbitrary alerts, so the field may hold something
    # that is not an IP address.  Such an alert cannot be a VPN address and
    # is passed through rather than crashing the plugin.
    try:
        address = netaddr.IPAddress(source_ip)
    except (netaddr.AddrFormatError, ValueError, TypeError):
        return alert

    if address not in netaddr.IPSet(vpn_ip_cidrs):
        return alert

    # NOTE(review): the time window is passed as a positional dict, exactly
    # as before -- confirm SearchQuery honours it in this form.
    search_vpn_assignment = SearchQuery({
        'hours': search_window_hours,
    })
    search_vpn_assignment.add_must([
        TermMatch('tags', 'vpn'),
        TermMatch('tags', 'netfilter'),
        TermMatch('details.success', 'true'),
        TermMatch('details.vpnip', source_ip),
        PhraseMatch('summary', 'netfilter add upon connection'),
    ])

    assign_events = [
        hit.get('_source', {})
        for hit in search_fn(search_vpn_assignment)
    ]

    if not assign_events:
        return alert

    # Only the newest assignment matters, so take the maximum instead of
    # sorting every hit.  On equal timestamps `max` keeps the first event
    # encountered, which matches a stable descending sort.
    event = max(assign_events, key=lambda evt: toUTC(evt['utctimestamp']))

    details['vpnassignment'] = {
        'username': event['details']['username'],
        'originalip': event['details']['sourceipaddress'],
    }

    # Re-attach `details` in case the alert had no such key to begin with.
    alert['details'] = details

    return alert
|
|