Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-applicationinsights
/botbuilder
/applicationinsights
/flask
/flask_telemetry_middleware.py
| # Copyright (c) Microsoft Corporation. All rights reserved. | |
| # Licensed under the MIT License. | |
| """Flask Telemetry Bot Middleware.""" | |
| from io import BytesIO | |
| from threading import current_thread | |
| # Map of thread id => POST body text | |
| _REQUEST_BODIES = {} | |
| def retrieve_flask_body(): | |
| """retrieve_flask_body | |
| Retrieve the POST body text from temporary cache. | |
| The POST body corresponds with the thread id and should resides in | |
| cache just for lifetime of request. | |
| """ | |
| result = _REQUEST_BODIES.pop(current_thread().ident, None) | |
| return result | |
| class BotTelemetryMiddleware: | |
| """Bot Telemetry Middleware | |
| Save off the POST body to later populate bot-specific properties to | |
| add to Application Insights. | |
| Example adding telemetry middleware to Flask: | |
| app = Flask(__name__) | |
| app.wsgi_app = BotTelemetryMiddleware(app.wsgi_app) | |
| """ | |
| def __init__(self, app): | |
| self.app = app | |
| def __call__(self, environ, start_response): | |
| self.process_request(environ) | |
| return self.app(environ, start_response) | |
| def process_request(self, environ) -> bool: | |
| """Process the incoming Flask request.""" | |
| # Bot Service doesn't handle anything over 256k | |
| length = int(environ.get("CONTENT_LENGTH", "0")) | |
| if length > 256 * 1024: | |
| print(f"request too long - rejected") | |
| else: | |
| body_bytes = environ["wsgi.input"].read(length) | |
| environ["wsgi.input"] = BytesIO(body_bytes) | |
| body_unicode = body_bytes.decode("utf-8") | |
| # Sanity check JSON | |
| if body_unicode is not None: | |
| # Integration layer expecting just the json text. | |
| _REQUEST_BODIES[current_thread().ident] = body_unicode | |
| return True | |