Mozdef / cron /sqs_queue_status.py
ineso22's picture
Upload folder using huggingface_hub
7c89ed7 verified
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# Copyright (c) 2014 Mozilla Corporation
# kombu's support for SQS is buggy
# so this version uses boto
# to read an SQS queue and put events into elastic search
# in the same manner as esworker_eventtask.py
import json
import os
import sys
from configlib import getConfig, OptionParser
from datetime import datetime
from hashlib import md5
import boto3
from mozdef_util.utilities.toUTC import toUTC
from mozdef_util.utilities.logger import logger
from mozdef_util.elasticsearch_client import ElasticsearchClient
def getDocID(sqsregionidentifier):
# create a hash to use as the ES doc id
# hostname plus salt as doctype.latest
hash = md5()
seed = '{0}.mozdefhealth.latest'.format(sqsregionidentifier)
hash.update(seed.encode())
return hash.hexdigest()
def getQueueSizes():
logger.debug('starting')
logger.debug(options)
es = ElasticsearchClient(options.esservers)
sqs_client = boto3.client(
"sqs",
region_name=options.region,
aws_access_key_id=options.accesskey,
aws_secret_access_key=options.secretkey
)
queues_stats = {
'queues': [],
'total_feeds': len(options.taskexchange),
'total_messages_ready': 0,
'username': 'mozdef'
}
for queue_name in options.taskexchange:
logger.debug('Looking for sqs queue stats in queue' + queue_name)
queue_url = sqs_client.get_queue_url(QueueName=queue_name)['QueueUrl']
queue_attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])['Attributes']
queue_stats = {
'queue': queue_name,
}
if 'ApproximateNumberOfMessages' in queue_attributes:
queue_stats['messages_ready'] = int(queue_attributes['ApproximateNumberOfMessages'])
queues_stats['total_messages_ready'] += queue_stats['messages_ready']
if 'ApproximateNumberOfMessagesNotVisible' in queue_attributes:
queue_stats['messages_inflight'] = int(queue_attributes['ApproximateNumberOfMessagesNotVisible'])
if 'ApproximateNumberOfMessagesDelayed' in queue_attributes:
queue_stats['messages_delayed'] = int(queue_attributes['ApproximateNumberOfMessagesDelayed'])
queues_stats['queues'].append(queue_stats)
# setup a log entry for health/status.
sqsid = '{0}-{1}'.format(options.account, options.region)
healthlog = dict(
utctimestamp=toUTC(datetime.now()).isoformat(),
hostname=sqsid,
processid=os.getpid(),
processname=sys.argv[0],
severity='INFO',
summary='mozdef health/status',
category='mozdef',
source='aws-sqs',
tags=[],
details=queues_stats)
healthlog['tags'] = ['mozdef', 'status', 'sqs']
healthlog['type'] = 'mozdefhealth'
# post to elasticsearch servers directly without going through
# message queues in case there is an availability issue
es.save_event(index=options.index, body=json.dumps(healthlog))
# post another doc with a static docid and tag
# for use when querying for the latest sqs status
healthlog['tags'] = ['mozdef', 'status', 'sqs-latest']
es.save_event(index=options.index, doc_id=getDocID(sqsid), body=json.dumps(healthlog))
def main():
logger.debug('Starting')
logger.debug(options)
getQueueSizes()
def initConfig():
# aws options
options.accesskey = getConfig('accesskey', '', options.configfile)
options.secretkey = getConfig('secretkey', '', options.configfile)
options.region = getConfig('region', 'us-west-1', options.configfile)
options.taskexchange = getConfig('taskexchange', 'nsmglobalssqslists', options.configfile).split(',')
options.output = getConfig('output', 'stdout', options.configfile)
# mozdef options
options.sysloghostname = getConfig('sysloghostname', 'localhost', options.configfile)
options.syslogport = getConfig('syslogport', 514, options.configfile)
options.esservers = list(getConfig('esservers', 'http://localhost:9200', options.configfile).split(','))
options.index = getConfig('index', 'mozdefstate', options.configfile)
options.account = getConfig('account', '', options.configfile)
if __name__ == '__main__':
# configure ourselves
parser = OptionParser()
parser.add_option("-c", dest='configfile', default=sys.argv[0].replace('.py', '.conf'), help="configuration file to use")
(options, args) = parser.parse_args()
initConfig()
main()