|
|
""" |
|
|
Enterprise Structured Logging with Correlation IDs

Features:
- Structured logging with structlog
- Correlation ID tracking across requests
- Request/response logging
- Performance timing
- JSON output for log aggregation (ELK, Datadog, etc.)
|
|
""" |
|
|
import logging
import os
import sys
import time
import uuid
from contextvars import ContextVar
from typing import Any, Optional

import structlog
from aiohttp import web
|
|
|
|
|
|
|
|
# Correlation ID bound to the current execution context.
# ``None`` means no ID has been assigned in this context yet.
correlation_id_var: ContextVar[Optional[str]] = ContextVar(
    "correlation_id",
    default=None,
)

# Start time of the request being handled in the current context
# (a ``time.time()`` value stored by the request middleware); ``None`` if unset.
request_start_time_var: ContextVar[Optional[float]] = ContextVar(
    "request_start_time",
    default=None,
)
|
|
|
|
|
|
|
|
def get_correlation_id() -> str:
    """Return the correlation ID of the current context, creating one if needed.

    When the context holds no ID (unset or empty), a new UUID4 string is
    generated, stored in ``correlation_id_var`` and returned, so later calls
    in the same context yield the same value.
    """
    current = correlation_id_var.get()
    if current:
        return current

    generated = str(uuid.uuid4())
    correlation_id_var.set(generated)
    return generated
|
|
|
|
|
|
|
|
def set_correlation_id(corr_id: str) -> None:
    """Store ``corr_id`` as the correlation ID of the current context.

    Args:
        corr_id: Identifier to bind; written to ``correlation_id_var``.
    """
    correlation_id_var.set(corr_id)
|
|
|
|
|
|
|
|
def add_correlation_id(logger, method_name, event_dict):
    """Processor that writes the current correlation ID into the event.

    Uses the ``(logger, method_name, event_dict)`` processor signature; only
    ``event_dict`` is used. Any ``correlation_id`` already present in the
    event is replaced by the value from ``get_correlation_id()``.

    Returns:
        The same ``event_dict`` object, mutated in place.
    """
    event_dict.update(correlation_id=get_correlation_id())
    return event_dict
|
|
|
|
|
|
|
|
def add_timestamp(logger, method_name, event_dict):
    """Processor that stamps the event with an ISO 8601 UTC timestamp.

    The value has second resolution and an explicit ``Z`` suffix, e.g.
    ``2024-05-01T12:34:56Z``. A timestamp without a zone marker is ambiguous
    once logs from hosts in different time zones are aggregated, so UTC is
    used rather than the host's local time.

    Uses the ``(logger, method_name, event_dict)`` processor signature; only
    ``event_dict`` is used.

    Returns:
        The same ``event_dict`` object with its ``"timestamp"`` key set.
    """
    event_dict["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    return event_dict
|
|
|
|
|
|
|
|
def add_service_info(logger, method_name, event_dict):
    """Processor that tags the event with service and environment names.

    Values are read from the process environment on every call:
    ``SERVICE_NAME`` (default ``"cx_ai_agent"``) and ``ENVIRONMENT``
    (default ``"development"``).

    Returns:
        The same ``event_dict`` object with ``"service"`` and
        ``"environment"`` set.
    """
    env = os.environ
    event_dict["service"] = env.get("SERVICE_NAME", "cx_ai_agent")
    event_dict["environment"] = env.get("ENVIRONMENT", "development")
    return event_dict
|
|
|
|
|
|
|
|
def configure_logging(
    level: str = "INFO",
    json_output: bool = False,
    service_name: str = "cx_ai_agent",
):
    """
    Configure structlog and the standard-library root logger.

    Side effects: exports ``service_name`` as the ``SERVICE_NAME`` environment
    variable (read back by ``add_service_info``), installs the global
    structlog configuration, calls ``logging.basicConfig`` so bare messages go
    to stdout, and emits one "Structured logging configured" info event.

    Args:
        level: Log level name (DEBUG, INFO, WARNING, ERROR, CRITICAL);
            matched case-insensitively.
        json_output: Render events as JSON (for production / log aggregation)
            instead of the coloured console format.
        service_name: Service name attached to every log event.

    Raises:
        AttributeError: If ``level`` does not name an attribute of ``logging``.
    """
    os.environ["SERVICE_NAME"] = service_name

    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.filter_by_level,
        add_correlation_id,
        add_timestamp,
        add_service_info,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        # NOTE: TimeStamper writes to its default "timestamp" key and therefore
        # overrides the value produced by add_timestamp above.
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        # Turn ``exc_info`` into a formatted "exception" string before
        # rendering. Previously only the console branch did this, so JSON
        # output carried a bare ``exc_info`` flag instead of the traceback.
        structlog.processors.format_exc_info,
    ]

    if json_output:
        processors.append(structlog.processors.JSONRenderer())
    else:
        processors.append(structlog.dev.ConsoleRenderer(colors=True))

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    # structlog has already rendered the full line, so the stdlib handler
    # prints the message verbatim.
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, level.upper()),
    )

    logger = structlog.get_logger()
    logger.info("Structured logging configured", level=level, json_output=json_output)
|
|
|
|
|
|
|
|
def get_logger(name: Optional[str] = None) -> structlog.stdlib.BoundLogger:
    """
    Return a structlog logger.

    Thin wrapper around ``structlog.get_logger`` so callers only need to
    import this module.

    Args:
        name: Logger name passed through to structlog (optional).

    Returns:
        Structured logger instance
    """
    return structlog.get_logger(name)
|
|
|
|
|
|
|
|
class LoggingMiddleware:
    """aiohttp middleware for request/response logging.

    For every request it binds a correlation ID, logs ``request_started``,
    then logs ``request_completed`` (with status and duration) or
    ``request_failed`` (with error details and traceback).
    """

    def __init__(self, logger_name: str = "mcp.server"):
        """Create the middleware.

        Args:
            logger_name: Name of the structured logger used for request events.
        """
        self.logger = get_logger(logger_name)

    @staticmethod
    def _elapsed_ms(started: float) -> float:
        """Return milliseconds elapsed since the ``time.perf_counter()`` reading ``started``."""
        return round((time.perf_counter() - started) * 1000, 2)

    @web.middleware
    async def middleware(self, request: web.Request, handler):
        """Log the request, delegate to ``handler``, and log the outcome.

        Args:
            request: Incoming aiohttp request.
            handler: Next handler in the middleware chain.

        Returns:
            The handler's response with an ``X-Correlation-ID`` header added.

        Raises:
            web.HTTPException: Re-raised unchanged apart from the added
                ``X-Correlation-ID`` header; logged as ``request_completed``.
            Exception: Any other handler error is logged as ``request_failed``
                and re-raised.
        """
        # Reuse the caller's ID when one is supplied; otherwise mint a new one.
        corr_id = (
            request.headers.get("X-Correlation-ID")
            or request.headers.get("X-Request-ID")
            or str(uuid.uuid4())
        )
        set_correlation_id(corr_id)

        # The context variable keeps the wall-clock start; the duration is
        # measured with a monotonic clock so system clock adjustments cannot
        # skew it or make it negative.
        request_start_time_var.set(time.time())
        started = time.perf_counter()

        method = request.method
        path = request.path

        self.logger.info(
            "request_started",
            method=method,
            path=path,
            client_ip=request.remote or "unknown",
            user_agent=request.headers.get("User-Agent", "unknown"),
            correlation_id=corr_id,
        )

        try:
            response = await handler(request)
        except web.HTTPException as http_exc:
            # aiohttp handlers may raise HTTP exceptions (redirects, 404, ...)
            # as ordinary responses. Treat them as completed requests rather
            # than failures, and keep the correlation header on them.
            self.logger.info(
                "request_completed",
                method=method,
                path=path,
                status=http_exc.status,
                duration_ms=self._elapsed_ms(started),
                correlation_id=corr_id,
            )
            http_exc.headers["X-Correlation-ID"] = corr_id
            raise
        except Exception as exc:
            self.logger.error(
                "request_failed",
                method=method,
                path=path,
                error=str(exc),
                error_type=type(exc).__name__,
                duration_ms=self._elapsed_ms(started),
                correlation_id=corr_id,
                exc_info=True,
            )
            raise

        self.logger.info(
            "request_completed",
            method=method,
            path=path,
            status=response.status,
            duration_ms=self._elapsed_ms(started),
            correlation_id=corr_id,
        )

        response.headers["X-Correlation-ID"] = corr_id
        return response
|
|
|
|
|
|
|
|
class PerformanceLogger:
    """Context manager for performance logging.

    Emits ``<operation>_started`` (debug) on entry, then on exit either
    ``<operation>_completed`` (info) or ``<operation>_failed`` (error, with
    the exception type, message and ``exc_info=True``). Both exit events carry
    ``duration_ms``. Exceptions raised inside the block are not suppressed.
    """

    def __init__(
        self,
        operation: str,
        logger: Optional["structlog.stdlib.BoundLogger"] = None,
    ):
        """Create the timer.

        Args:
            operation: Prefix used to build the event names.
            logger: Logger to emit on; defaults to ``get_logger()``.
        """
        self.operation = operation
        # Explicit None check so a supplied logger is never replaced merely
        # because it happens to evaluate as falsy.
        self.logger = logger if logger is not None else get_logger()
        # Wall-clock time of entry (``time.time()``); None until entered.
        self.start_time: Optional[float] = None
        # Monotonic reading used for the duration; None until entered.
        self._perf_start: Optional[float] = None

    def __enter__(self):
        self.start_time = time.time()
        # Durations come from perf_counter: unlike time.time() it is not
        # affected by system clock adjustments, so it cannot go backwards.
        self._perf_start = time.perf_counter()
        self.logger.debug(f"{self.operation}_started")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        elapsed = time.perf_counter() - self._perf_start
        duration_ms = round(elapsed * 1000, 2)

        if exc_type is None:
            self.logger.info(
                f"{self.operation}_completed",
                duration_ms=duration_ms,
            )
        else:
            self.logger.error(
                f"{self.operation}_failed",
                duration_ms=duration_ms,
                error_type=exc_type.__name__,
                error=str(exc_val),
                exc_info=True,
            )
        # Falls through returning None, so any exception propagates.
|
|
|
|
|
|
|
|
def log_mcp_call(
    logger: "structlog.stdlib.BoundLogger",
    server: str,
    method: str,
    params: Optional[dict],
    result: Any = None,
    error: Optional[Exception] = None,
    duration_ms: Optional[float] = None,
):
    """
    Log MCP call with structured data.

    Emits ``mcp_call_failed`` at error level when ``error`` is given,
    otherwise ``mcp_call_success`` at info level. Only the parameter *names*
    and the result's *type name* are logged, never their values.

    Args:
        logger: Structured logger
        server: MCP server name (search, email, store, etc.)
        method: MCP method name
        params: Method parameters; ``None`` or empty logs an empty key list
        result: Method result (optional)
        error: Error if call failed (optional)
        duration_ms: Call duration in milliseconds (optional); rounded to
            two decimals when present
    """
    log_data = {
        "mcp_server": server,
        "mcp_method": method,
        "mcp_params_keys": list(params) if params else [],
    }

    if duration_ms is not None:
        log_data["duration_ms"] = round(duration_ms, 2)

    # Identity check: an exception object must count as a failure even if its
    # class happens to define falsy truthiness.
    if error is not None:
        logger.error(
            "mcp_call_failed",
            **log_data,
            error=str(error),
            error_type=type(error).__name__,
        )
    else:
        # ``is not None`` rather than truthiness, so legitimate falsy results
        # (0, "", [], False, {}) still report their type.
        logger.info(
            "mcp_call_success",
            **log_data,
            result_type=type(result).__name__ if result is not None else None,
        )
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: exercises each helper once with console rendering.
    configure_logging(level="DEBUG", json_output=False)

    logger = get_logger(__name__)

    # Fixed ID so every line below carries the same correlation_id.
    set_correlation_id("test-correlation-123")

    # One event per severity level.
    logger.info("Application started", version="1.0.0")
    logger.debug("Debug message", data={"key": "value"})
    logger.warning("Warning message")

    # Error event logged from inside an exception handler.
    try:
        raise ValueError("Test error")
    except Exception:
        logger.error("Error occurred", exc_info=True)

    # Timed block: logs database_query_started / database_query_completed.
    with PerformanceLogger("database_query", logger):
        time.sleep(0.1)
|
|
|