#!/usr/bin/env python3 # -*- coding: utf-8 -*- # FOGLAMP_BEGIN # See: http://foglamp.readthedocs.io/ # FOGLAMP_END """ fogbench -- a Python script used to test FogLAMP. The objective is to simulate payloads for input, REST and other requests against one or more FogLAMP instances. This version of fogbench is meant to test the CoAP and HTTP plugins interface of FogLAMP southbound services. fogbench [IN] -h --help Print this help -i --interval The interval in seconds between each iteration (default: 0) [IN] -k --keep Do not delete (keep) the running sample (default: no) [IN] -o --output Set the output file for statistics [IN] -p --payload Type of payload and protocol (default: coap) [IN] -t --template Set the template to use [IN] -v --version Display the version and exit [IN] -H --host The FogLAMP host (default: localhost) -I --iterations The number of iterations of the test (default: 1) [IN] -O --occurrences The number of occurrences of the template (default: 1) [IN] -P --port The FogLAMP port. Default depends on payload and protocol [IN] -S --statistic The type of statistics to collect Example: $ cd $FOGLAMP_ROOT/bin $ ./fogbench Help: $ ./fogbench -h * Create reading objects from given template, as per the json file name specified with -t * Save those objects to the file, as per the file name specified with -o * Read those objects * Send those to CoAP or HTTP south plugin server, on specific host and port .. todo:: * Try generators """ import sys import os import random import json from datetime import datetime, timezone import argparse import collections import asyncio import aiohttp from .exceptions import * __author__ = "Praveen Garg" __copyright__ = "Copyright (c) 2017 OSIsoft, LLC" __license__ = "Apache 2.0" __version__ = "${VERSION}" _FOGBENCH_VERSION = u"0.1.1" _start_time = [] _end_time = [] _tot_msgs_transferred = [] _tot_byte_transferred = [] _num_iterated = 0 """Statistics to be collected""" # _logger = logger.setup(__name__) def local_timestamp(): """ :return: str - current time stamp with microseconds and machine timezone info :example '2018-05-08 14:06:40.517313+05:30' """ return str(datetime.now(timezone.utc).astimezone()) def read_templates(): templates = [] return templates def parse_template_and_prepare_json(_template_file, _write_to_file=None, _occurrences=1): # template_file = os.path.join(os.path.dirname(__file__), "templates/" + _template_file) with open(_template_file) as data_file: data = json.load(data_file) supported_format_types = ["number", "enum"] for _ in range(_occurrences): readings_ = _prepare_sensor_reading(data, supported_format_types) for r in readings_: _write_readings_to_file(_write_to_file, r) def _write_readings_to_file(to_file, r): with open(to_file, 'a') as the_file: json.dump(r, the_file) the_file.write(os.linesep) def _prepare_sensor_reading(data, supported_format_types): readings = [] for d in data: x_sensor_values = dict() _sensor_value_object_formats = d["sensor_values"] for fmt in _sensor_value_object_formats: if fmt["type"] not in supported_format_types: raise InvalidSensorValueObjectTemplateFormat(u"Invalid format, " u"Can not parse type {}".format(fmt["type"])) if fmt["type"] == "number": # check float precision if any precision = fmt.get("precision", None) min_val = fmt.get("min", None) max_val = fmt.get("max", None) if min_val is None or max_val is None: raise InvalidSensorValueObjectTemplateFormat(u"Invalid format, " u"Min and Max values must be defined for type number.") # print(precision) # print(min_val) # print(max_val) reading = round(random.uniform(min_val, max_val), precision) elif fmt["type"] == "enum": reading = random.choice(fmt["list"]) # print(fmt["name"], reading) x_sensor_values[fmt["name"]] = reading # print(d["name"]) sensor_value_object = dict() sensor_value_object["asset"] = d['name'] sensor_value_object["readings"] = x_sensor_values sensor_value_object["timestamp"] = "{!s}".format(local_timestamp()) # print(json.dumps(sensor_value_object)) ord_dict = collections.OrderedDict(sorted(sensor_value_object.items())) readings.append(ord_dict) return readings def read_out_file(_file=None, _keep=False, _iterations=1, _interval=0, send_to='coap'): global _start_time global _end_time global _tot_msgs_transferred global _tot_byte_transferred global _num_iterated # from pprint import pprint import time # _file = os.path.join(os.path.dirname(__file__), "out/{}".format(outfile)) with open(_file) as f: readings_list = [json.loads(line) for line in f] loop = asyncio.get_event_loop() while _iterations > 0: # Pre-calculate the messages and size msg_transferred_itr = 0 # Messages transferred in every iteration byte_transferred_itr = 0 # Bytes transferred in every iteration for r in readings_list: msg_transferred_itr += 1 byte_transferred_itr += sys.getsizeof(r) if send_to == 'coap': _start_time.append(datetime.now()) for r in readings_list: is_sent = loop.run_until_complete(send_to_coap(r)) if not is_sent: break elif send_to == 'http': _start_time.append(datetime.now()) loop.run_until_complete(send_to_http(readings_list)) _end_time.append(datetime.now()) # End time of every iteration _tot_msgs_transferred.append(msg_transferred_itr) _tot_byte_transferred.append(byte_transferred_itr) _iterations -= 1 _num_iterated += 1 if _iterations != 0: # print(u"Iteration {} completed, waiting for {} seconds".format(_iterations, _interval)) time.sleep(_interval) if not _keep: os.remove(_file) async def send_to_coap(payload): """ POST request to: localhost port 5683 (official IANA assigned CoAP port), URI "/other/sensor-values". """ from aiocoap import Context, Message from aiocoap.numbers.codes import Code from cbor2 import dumps context = await Context.create_client_context() request = Message(payload=dumps(payload), code=Code.POST) request.opt.uri_host = arg_host request.opt.uri_port = arg_port request.opt.uri_path = ("other", "sensor-values") response = await context.request(request).response str_res = str(response.code) status_code = str_res[:4] # or str_res.split()[0] if status_code == "4.00" or status_code == "5.00": print("Error: ", str_res) return False return True async def send_to_http(payload): """ POST request to: host localhost port 6683 (default HTTP south plugin port), uri sensor-reading """ headers = {'content-type': 'application/json'} url = 'http://{}:{}/sensor-reading'.format(arg_host, arg_port) async with aiohttp.ClientSession() as session: async with session.post(url, data=json.dumps(payload), headers=headers) as resp: await resp.text() status_code = resp.status if status_code in range(400, 500): print("Bad request error | code:{}, reason: {}".format(status_code, resp.reason)) return False if status_code in range(500, 600): print("Server error | code:{}, reason: {}".format(status_code, resp.reason)) return False return True def get_statistics(_stats_type=None, _out_file=None): stat = '' global _start_time global _end_time global _tot_msgs_transferred global _tot_byte_transferred global _num_iterated if _stats_type == 'total': stat += u"Total Statistics:\n" stat += (u"\nStart Time: {}".format(datetime.strftime(_start_time[0], "%Y-%m-%d %H:%M:%S.%f"))) stat += (u"\nEnd Time: {}\n".format(datetime.strftime(_end_time[-1], "%Y-%m-%d %H:%M:%S.%f"))) stat += (u"\nTotal Messages Transferred: {}".format(sum(_tot_msgs_transferred))) stat += (u"\nTotal Bytes Transferred: {}\n".format(sum(_tot_byte_transferred))) stat += (u"\nTotal Iterations: {}".format(_num_iterated)) stat += (u"\nTotal Messages per Iteration: {}".format(sum(_tot_msgs_transferred)/_num_iterated)) stat += (u"\nTotal Bytes per Iteration: {}\n".format(sum(_tot_byte_transferred)/_num_iterated)) _msg_rate = [] _byte_rate = [] for itr in range(_num_iterated): time_taken = _end_time[itr] - _start_time[itr] _msg_rate.append(_tot_msgs_transferred[itr]/(time_taken.seconds+time_taken.microseconds/1E6)) _byte_rate.append(_tot_byte_transferred[itr] / (time_taken.seconds+time_taken.microseconds/1E6)) stat += (u"\nMin messages/second: {}".format(min(_msg_rate))) stat += (u"\nMax messages/second: {}".format(max(_msg_rate))) stat += (u"\nAvg messages/second: {}\n".format(sum(_msg_rate)/_num_iterated)) stat += (u"\nMin Bytes/second: {}".format(min(_byte_rate))) stat += (u"\nMax Bytes/second: {}".format(max(_byte_rate))) stat += (u"\nAvg Bytes/second: {}".format(sum(_byte_rate)/_num_iterated)) if _out_file: with open(_out_file, 'w') as f: f.write(stat) else: print(stat) # should we also show total time diff? end_time - start_time def check_server(payload_type='coap'): template_str = ">>> Make sure south {} plugin service is running \n & listening on specified host and port \n" if payload_type == 'coap': print(template_str.format("CoAP")) elif payload_type == 'http': print(template_str.format("HTTP")) parser = argparse.ArgumentParser(prog='fogbench') parser.description = '%(prog)s -- a Python script used to test FogLAMP (simulate payloads)' parser.epilog = 'The initial version of %(prog)s is meant to test the south plugin interface of ' \ 'FogLAMP using CoAP or HTTP' parser.add_argument('-v', '--version', action='version', version='%(prog)s {0!s}'.format(_FOGBENCH_VERSION)) parser.add_argument('-k', '--keep', default=False, choices=['y', 'yes', 'n', 'no'], help='Do not delete the running sample (default: no)') parser.add_argument('-t', '--template', required=True, help='Set the template file, json extension') parser.add_argument('-o', '--output', default=None, help='Set the statistics output file') parser.add_argument('-p', '--payload', default='coap', choices=['coap', 'http'], help='Type of payload ' 'and protocol (default: coap)') parser.add_argument('-I', '--iterations', help='The number of iterations of the test (default: 1)') parser.add_argument('-O', '--occurrences', help='The number of occurrences of the template (default: 1)') parser.add_argument('-H', '--host', help='Server host address (default: localhost)') parser.add_argument('-P', '--port', help='The FogLAMP port. (default: 5683)') parser.add_argument('-i', '--interval', default=0, help='The interval in seconds for each iteration (default: 0)') parser.add_argument('-S', '--statistics', default='total', choices=['total'], help='The type of statistics to collect ' '(default: total)') namespace = parser.parse_args(sys.argv[1:]) infile = '{0}'.format(namespace.template if namespace.template else '') statistics_file = os.path.join(os.path.dirname(__file__), "out/{}".format(namespace.output)) if namespace.output else None keep_the_file = True if namespace.keep in ['y', 'yes'] else False # iterations and occurrences arg_iterations = int(namespace.iterations) if namespace.iterations else 1 arg_occurrences = int(namespace.occurrences) if namespace.occurrences else 1 # interval between each iteration arg_interval = int(namespace.interval) if namespace.interval else 0 arg_stats_type = '{0}'.format(namespace.statistics) if namespace.statistics else 'total' if namespace.payload: arg_payload_protocol = namespace.payload arg_host = '{0}'.format(namespace.host) if namespace.host else 'localhost' default_port = 6683 if arg_payload_protocol == 'http' else 5683 arg_port = int(namespace.port) if namespace.port else default_port check_server(arg_payload_protocol) sample_file = os.path.join("/tmp", "foglamp_running_sample.{}".format(os.getpid())) parse_template_and_prepare_json(_template_file=infile, _write_to_file=sample_file, _occurrences=arg_occurrences) read_out_file(_file=sample_file, _keep=keep_the_file, _iterations=arg_iterations, _interval=arg_interval, send_to=arg_payload_protocol) get_statistics(_stats_type=arg_stats_type, _out_file=statistics_file) # TODO: Change below per local_timestamp() values """ Expected output from given template { "timestamp" : "2017-08-04T06:59:57.503Z", "asset" : "TI sensorTag/luxometer", "sensor_values" : { "lux" : 49 } } { "timestamp" : "2017-08-04T06:59:57.863Z", "asset" : "TI sensorTag/pressure", "sensor_values" : { "pressure" : 1021.2 } } { "timestamp" : "2017-08-04T06:59:58.863Z", "asset" : "TI sensorTag/humidity", "sensor_values" : { "humidity" : 71.2, "temperature" : 18.6 } } { "timestamp" : "2017-08-04T06:59:59.863Z", "asset" : "TI sensorTag/temperature", "sensor_values" : { "object" : 18.2, "ambient" : 21.6 } } { "timestamp" : "2017-08-04T07:00:00.863Z", "asset" : "TI sensorTag/accelerometer", "sensor_values" : { "x" : 1.2, "y" : 0.0, "z" : -0.6 } } { "timestamp" : "2017-08-04T07:00:01.863Z", "asset" : "TI sensorTag/gyroscope", "sensor_values" : { "x" : 101.2, "y" : 46.2, "z" : -12.6 } } { "timestamp" : "2017-08-04T07:00:02.863Z", "asset" : "TI sensorTag/magnetometer", "sensor_values" : { "x" : 101.2, "y" : 46.2, "z" : -12.6 } } { "timestamp" : "2017-08-04T07:00:03.863Z", "asset" : "mouse", "sensor_values" : { "button" : "down" } } { "timestamp" : "2017-08-04T07:00:04.863Z", "asset" : "wall clock", "sensor_values" : { "tick" : "tock" } } """