| |
|
|
| |
| |
| |
| |
|
|
|
|
| import json |
| import kombu |
| import sys |
| import socket |
| from configlib import getConfig, OptionParser |
| from datetime import datetime |
| from kombu import Connection, Queue, Exchange |
| from kombu.mixins import ConsumerMixin |
|
|
| from mozdef_util.elasticsearch_client import ( |
| ElasticsearchClient, |
| ElasticsearchBadServer, |
| ElasticsearchInvalidIndex, |
| ElasticsearchException, |
| ) |
|
|
| from mozdef_util.utilities.toUTC import toUTC |
| from mozdef_util.utilities.logger import logger, initLogger |
| from mozdef_util.utilities.to_unicode import toUnicode |
| from mozdef_util.utilities.remove_at import removeAt |
|
|
| from lib.plugins import sendEventToPlugins, registerPlugins |
|
|
|
|
| |
# uwsgi is only importable when this worker runs inside a uwsgi process;
# record whether it is available so callers can skip mule-specific calls.
try:
    import uwsgi
except ImportError:
    hasUWSGI = False
else:
    hasUWSGI = True
|
|
|
|
def keyMapping(aDict):
    """Map common keys/fields of an event to a normalized structure.

    Values are explicitly typed when possible to avoid schema changes for
    upstream consumers. Special accommodations are made for logstash, nxlog,
    beaver, heka and CEF. Some shippers attempt to conform to the
    logstash-style @fieldname convention; the leading at symbol is stripped
    (via removeAt) since it breaks some elastic search libraries like
    elasticutils.

    Args:
        aDict: the raw event, a mapping of field names to values.

    Returns:
        The normalized dict, or None when normalization raised an exception
        (the exception and the offending message are logged).
    """
    returndict = dict()

    # Fields that every normalized event carries, regardless of input.
    returndict["receivedtimestamp"] = toUTC(datetime.now()).isoformat()
    returndict["mozdefhostname"] = options.mozdefhostname
    returndict["details"] = {}
    try:
        # Loop-invariant: when both keys are present every field of the
        # event is copied verbatim into details (nxlog windows log handling).
        copyAllToDetails = "Domain" in aDict and "SourceModuleType" in aDict

        for k, v in aDict.items():
            k = removeAt(k).lower()

            if k == "sourceip":
                returndict["details"]["eventsourceipaddress"] = v

            if k in ("facility", "source"):
                returndict["source"] = v

            if k in ("message", "summary"):
                returndict["summary"] = toUnicode(v)

            # NOTE: compare with ==, not `k in ("payload")`. The latter is a
            # substring test against the string "payload" and also matched
            # keys such as "", "a", "pay" or "load".
            if k == "payload":
                if "summary" not in aDict:
                    # no summary supplied: the payload becomes the summary
                    returndict["summary"] = toUnicode(v)
                else:
                    returndict["details"]["payload"] = toUnicode(v)

            if k in ("eventtime", "timestamp", "utctimestamp", "date"):
                normalizedTimestamp = toUTC(v).isoformat()
                returndict["utctimestamp"] = normalizedTimestamp
                returndict["timestamp"] = normalizedTimestamp

            if k in ("hostname", "source_host", "host"):
                returndict["hostname"] = toUnicode(v)

            # NOTE: exact match only. The previous `k in ("tags")` substring
            # test also treated keys like "tag", "t" or "s" as tags.
            if k == "tags":
                if "tags" not in returndict:
                    returndict["tags"] = []
                if isinstance(v, list):
                    returndict["tags"] += v
                else:
                    if len(v) > 0:
                        returndict["tags"].append(v)

            if k in ("syslogseverity", "severity", "severityvalue", "level", "priority"):
                returndict["severity"] = toUnicode(v).upper()

            if k in ("facility", "syslogfacility"):
                returndict["facility"] = toUnicode(v)

            if k in ("pid", "processid"):
                returndict["processid"] = toUnicode(v)

            if k in ("pname", "processname", "sourcename", "program"):
                returndict["processname"] = toUnicode(v)

            if k in ("path", "logger", "file"):
                returndict["eventsource"] = toUnicode(v)

            if k in ("type", "eventtype", "category"):
                returndict["category"] = toUnicode(v)

            if k in ("fields", "details"):
                if type(v) is not dict:
                    # a non-dict value is kept as a single details message
                    returndict["details"]["message"] = v
                else:
                    # merge the nested dict into details, key by key
                    returndict["details"].update(v)

            # Flattened "fields.x" / "details.x" keys land in details under
            # "x"; a _int/_float suffix forces the corresponding type.
            if k.startswith("fields.") or k.startswith("details."):
                newName = k.replace("fields.", "").replace("details.", "")
                if newName.endswith("_int"):
                    returndict["details"][newName] = int(v)
                elif newName.endswith("_float"):
                    returndict["details"][newName] = float(v)
                else:
                    returndict["details"][newName] = toUnicode(v)

            if copyAllToDetails:
                returndict["details"][k] = v

        if "utctimestamp" not in returndict:
            # default in case no timestamp-like key was found
            returndict["utctimestamp"] = toUTC(datetime.now()).isoformat()

        if "type" not in returndict:
            returndict["type"] = "event"

    except Exception as e:
        logger.exception("Received exception while normalizing message: %r" % e)
        logger.error("Malformed message: %r" % aDict)
        return None

    return returndict
|
|
|
|
def esConnect():
    """Build a fresh ElasticsearchClient from the configured servers.

    Used both for the initial connection and to re-open one; reads
    options.esservers and options.esbulksize.
    """
    serverList = ["{0}".format(server) for server in options.esservers]
    return ElasticsearchClient(serverList, options.esbulksize)
|
|
|
|
class taskConsumer(ConsumerMixin):
    """Consume events from the task queue, normalize them and index them.

    Each message is normalized (keyMapping), passed through the registered
    plugins and then saved through the Elasticsearch connection.
    """

    def __init__(self, mqConnection, taskQueue, topicExchange, esConnection):
        """Store the connections/queue and create a JSON producer."""
        self.connection = mqConnection
        self.esConnection = esConnection
        self.taskQueue = taskQueue
        self.topicExchange = topicExchange
        self.mqproducer = self.connection.Producer(serializer="json")
        # mule id is only available under uwsgi; 0 otherwise
        self.muleid = uwsgi.mule_id() if hasUWSGI else 0

    def get_consumers(self, Consumer, channel):
        """Return the single consumer bound to the task queue."""
        eventConsumer = Consumer(
            self.taskQueue,
            callbacks=[self.on_message],
            accept=["json", "text/plain"],
            no_ack=(not options.mqack),
        )
        eventConsumer.qos(prefetch_count=options.prefetch)
        return [eventConsumer]

    def on_message(self, body, message):
        """Normalize one message and save it as an event.

        Messages that cannot be decoded or normalized are acked and dropped;
        messages that fail to index are requeued when possible.
        """
        try:
            # default destination; plugins may replace it
            metadata = {"index": "events", "id": None}

            if isinstance(body, dict):
                bodyDict = body
            elif isinstance(body, str):
                try:
                    bodyDict = json.loads(body)
                except ValueError:
                    logger.error("Exception: unknown body type received: %r" % body)
                    message.ack()
                    return
            else:
                logger.error("Exception: unknown body type received: %r" % body)
                message.ack()
                return

            if "customendpoint" in bodyDict and bodyDict["customendpoint"]:
                # custom documents skip normalization and go straight to plugins
                normalizedDict, metadata = sendEventToPlugins(bodyDict, metadata, pluginList)
            else:
                normalizedDict = keyMapping(bodyDict)
                if isinstance(normalizedDict, dict):
                    normalizedDict, metadata = sendEventToPlugins(normalizedDict, metadata, pluginList)

            # normalization failed or a plugin dropped the event
            if normalizedDict is None:
                message.ack()
                return

            jbody = json.dumps(normalizedDict)
            useBulk = options.esbulksize != 0

            try:
                self.esConnection.save_event(
                    index=metadata["index"], doc_id=metadata["id"], body=jbody, bulk=useBulk
                )
            except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
                # reconnect and hand the message back to the queue
                try:
                    self.esConnection = esConnect()
                    message.requeue()
                except kombu.exceptions.MessageStateError:
                    logger.exception(
                        "Elastic Search and RabbitMQ exception (messages lost) while indexing event: %r" % e
                    )
                return
            except ElasticsearchException as e:
                try:
                    logger.exception("ElasticSearchException while indexing event: %r" % e)
                    logger.error("Malformed message body: %r" % body)
                    message.requeue()
                except kombu.exceptions.MessageStateError:
                    logger.exception(
                        "Elastic Search and RabbitMQ exception (messages lost) while indexing event: %r" % e
                    )
                return
            else:
                # indexed (or queued for bulk) without error
                message.ack()
        except Exception as e:
            logger.exception(e)
            logger.error("Malformed message body: %r" % body)
|
|
|
|
def main():
    """Declare the exchanges/queue on the broker and run the task consumer.

    Connection parameters come from the global options; the Elasticsearch
    connection is the module-level ``es``.
    """
    connString = "amqp://{0}:{1}@{2}:{3}/{4}".format(
        options.mquser, options.mqpassword, options.mqserver, options.mqport, options.mqvhost
    )
    mqConn = Connection(connString, ssl=(options.mqprotocol == "amqps"))

    # delivery_mode 2 when acks are enabled, 1 otherwise
    # (in AMQP: 2 = persistent, 1 = transient)
    taskDeliveryMode = 2 if options.mqack else 1
    eventTaskExchange = Exchange(
        name=options.taskexchange, type="direct", durable=True, delivery_mode=taskDeliveryMode
    )
    eventTaskExchange(mqConn).declare()

    # the queue acknowledges messages only when mqack is enabled
    eventTaskQueue = Queue(
        options.taskexchange,
        exchange=eventTaskExchange,
        routing_key=options.taskexchange,
        durable=True,
        no_ack=False if options.mqack else True,
    )
    eventTaskQueue(mqConn).declare()

    eventTopicExchange = Exchange(name=options.eventexchange, type="topic", durable=False, delivery_mode=1)
    eventTopicExchange(mqConn).declare()

    if not hasUWSGI:
        logger.info("started without uwsgi")
    else:
        logger.info("started as uwsgi mule {0}".format(uwsgi.mule_id()))

    worker = taskConsumer(mqConn, eventTaskQueue, eventTopicExchange, es)
    worker.run()
|
|
|
|
def initConfig():
    """Fill the global options from the config file, with defaults.

    Every setting is read with getConfig(name, default, options.configfile)
    and stored on ``options`` under the same name.
    """
    settingDefaults = (
        # identity of this worker
        ("mozdefhostname", socket.gethostname()),
        # elastic search
        ("esservers", "http://localhost:9200"),
        ("esbulksize", 0),
        ("esbulktimeout", 30),
        # message queue
        ("mqserver", "localhost"),
        ("taskexchange", "eventtask"),
        ("eventexchange", "events"),
        ("prefetch", 50),
        ("mquser", "guest"),
        ("mqpassword", "guest"),
        ("mqport", 5672),
        ("mqvhost", "/"),
        ("mqprotocol", "amqp"),
        ("mqack", True),
    )
    for settingName, defaultValue in settingDefaults:
        setattr(options, settingName, getConfig(settingName, defaultValue, options.configfile))

    # esservers is configured as a comma-separated string; keep it as a list
    options.esservers = list(options.esservers.split(","))
|
|
|
|
if __name__ == "__main__":
    # The config file defaults to the script path with .py replaced by .conf.
    parser = OptionParser()
    parser.add_option(
        "-c", dest="configfile", default=sys.argv[0].replace(".py", ".conf"), help="configuration file to use"
    )
    options, args = parser.parse_args()
    initConfig()
    initLogger(options)

    # Module-level globals used by main() and the consumer.
    es = esConnect()
    pluginList = registerPlugins()

    try:
        main()
    except (KeyboardInterrupt, Exception) as err:
        interrupted = isinstance(err, KeyboardInterrupt)
        if interrupted:
            logger.info("Exiting worker")
        # flush any events still buffered for bulk indexing
        if options.esbulksize != 0:
            es.finish_bulk()
        # Ctrl-C is a clean exit; anything else propagates
        if not interrupted:
            raise
|
|