"""Structured logging utilities for the trading system.
Provides JSON-formatted logging with correlation IDs for tracing
requests across services.
"""
import logging
import json
import time
import uuid
import threading
from functools import wraps
# Thread-local storage for the correlation ID: each thread reads and writes
# its own `correlation_id` attribute on this object.
_context = threading.local()
def get_correlation_id():
    """Return the calling thread's correlation ID, creating one if needed.

    When the thread has no ID yet (the attribute is missing or None), a new
    8-character ID taken from a random UUID is stored and returned.

    Returns:
        The correlation ID bound to the current thread.
    """
    current = getattr(_context, 'correlation_id', None)
    if current is None:
        # The first 8 characters of a UUID4 are hex digits either way.
        current = uuid.uuid4().hex[:8]
        _context.correlation_id = current
    return current
def set_correlation_id(correlation_id):
    """Bind *correlation_id* to the calling thread.

    Args:
        correlation_id: Value stored in the thread-local context for the
            current thread.
    """
    setattr(_context, 'correlation_id', correlation_id)
def clear_correlation_id():
    """Reset the calling thread's correlation ID to None."""
    setattr(_context, 'correlation_id', None)
class JSONFormatter(logging.Formatter):
    """JSON log formatter for structured logging.

    Every entry carries ``timestamp`` (UTC, second resolution, no zone
    suffix), ``level``, ``logger``, ``message`` and ``correlation_id``.
    Optional record attributes add further keys:

    * ``component`` -- copied when the record has that attribute.
    * ``extra_data`` -- a mapping merged into the top level of the entry;
      its keys replace already-set keys of the same name.
    * ``exception`` -- the formatted traceback when ``exc_info`` is set.
    * ``source`` -- file/line/function for ERROR and above; written last,
      so it replaces an ``extra_data`` key named ``source``.
    """

    def format(self, record):
        """Serialize *record* to a JSON string.

        Args:
            record: The logging.LogRecord to render.

        Returns:
            The JSON text of the log entry.
        """
        log_entry = {
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(record.created)),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": get_correlation_id(),
        }

        # Add component if set
        if hasattr(record, 'component'):
            log_entry["component"] = record.component

        # Add extra fields
        if hasattr(record, 'extra_data') and record.extra_data:
            log_entry.update(record.extra_data)

        # Add exception info if present
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        # Add source location for errors
        if record.levelno >= logging.ERROR:
            log_entry["source"] = {
                "file": record.filename,
                "line": record.lineno,
                "function": record.funcName
            }

        # default=str is deliberate: extra fields may hold values json cannot
        # encode natively (Decimal, datetime, UUID, ...). Stringifying them
        # keeps the entry; raising TypeError here would lose it.
        return json.dumps(log_entry, default=str)
def setup_logging(component_name, level=logging.INFO, json_format=True):
    """Build a console logger dedicated to one service component.

    Handlers already attached to the named logger are discarded and a single
    stream handler is installed in their place.

    Args:
        component_name: Name of the service (e.g., "matcher", "dashboard");
            used as the logger name.
        level: Threshold applied to both the logger and its handler.
        json_format: When true, records are rendered by JSONFormatter;
            otherwise a plain "time - name - level - message" layout is used.

    Returns:
        The configured logging.Logger instance.
    """
    component_logger = logging.getLogger(component_name)
    component_logger.setLevel(level)
    # Drop whatever was attached before so repeated calls do not stack handlers.
    component_logger.handlers = []

    console = logging.StreamHandler()
    console.setLevel(level)
    formatter = JSONFormatter() if json_format else logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console.setFormatter(formatter)

    component_logger.addHandler(console)
    return component_logger
class StructuredLogger:
    """Convenience wrapper for structured logging.

    Wraps the logger returned by setup_logging() and attaches two attributes
    to every record it emits: ``component`` (the component name) and
    ``extra_data`` (the keyword arguments of the call, or None when there are
    none).
    """

    def __init__(self, component_name, json_format=True):
        """Configure the component's logger via setup_logging().

        Args:
            component_name: Logger name; also stamped on every record as
                ``component``.
            json_format: Passed through to setup_logging().
        """
        self.logger = setup_logging(component_name, json_format=json_format)
        self.component = component_name

    @staticmethod
    def _find_caller():
        """Return (filename, lineno, function) of the first frame outside this file.

        Frames whose code lives in the same file as this method (``_log``, the
        level helpers, the event helpers) are skipped. Falls back to
        ``("", 0, None)`` when no such frame exists or the interpreter does
        not provide ``sys._getframe``.
        """
        import sys

        getframe = getattr(sys, "_getframe", None)
        if getframe is None:
            return "", 0, None

        frame = getframe()
        own_file = frame.f_code.co_filename
        while frame is not None and frame.f_code.co_filename == own_file:
            frame = frame.f_back
        if frame is None:
            return "", 0, None
        return frame.f_code.co_filename, frame.f_lineno, frame.f_code.co_name

    def _log(self, level, message, **extra):
        """Internal logging method with extra data.

        Args:
            level: Numeric logging level of the record.
            message: Log message, used verbatim (no %-formatting arguments).
            **extra: Structured fields stored on the record as ``extra_data``.
        """
        # Logger.handle() does not consult the logger's level, so check it
        # here; otherwise records below the configured level still reach
        # the handlers.
        if not self.logger.isEnabledFor(level):
            return

        # Record the real call site so location fields (filename, lineno,
        # funcName) are meaningful instead of "" / 0 / None.
        filename, lineno, func_name = self._find_caller()
        record = self.logger.makeRecord(
            self.logger.name, level, filename, lineno, message, (), None,
            func_name
        )
        record.component = self.component
        record.extra_data = extra if extra else None
        self.logger.handle(record)

    def info(self, message, **extra):
        """Log *message* at INFO level with optional structured fields."""
        self._log(logging.INFO, message, **extra)

    def debug(self, message, **extra):
        """Log *message* at DEBUG level with optional structured fields."""
        self._log(logging.DEBUG, message, **extra)

    def warning(self, message, **extra):
        """Log *message* at WARNING level with optional structured fields."""
        self._log(logging.WARNING, message, **extra)

    def error(self, message, **extra):
        """Log *message* at ERROR level with optional structured fields."""
        self._log(logging.ERROR, message, **extra)

    def order_received(self, order_id, symbol, side, quantity, price, source=None):
        """Log order received event."""
        self.info("order_received", event="order_received",
                  order_id=order_id, symbol=symbol, side=side,
                  quantity=quantity, price=price, source=source)

    def trade_executed(self, trade_id, symbol, price, quantity, buy_id, sell_id):
        """Log trade execution event."""
        self.info("trade_executed", event="trade_executed",
                  trade_id=trade_id, symbol=symbol, price=price,
                  quantity=quantity, buy_id=buy_id, sell_id=sell_id)

    def order_cancelled(self, order_id, reason=None):
        """Log order cancellation event."""
        self.info("order_cancelled", event="order_cancelled",
                  order_id=order_id, reason=reason)
def with_correlation_id(func):
    """Decorator to set correlation ID for request handling.

    Before each call, the ID is read from the ``X-Correlation-ID`` header of
    the current Flask request. When the header is absent or empty, or Flask
    cannot be imported or queried, a fresh 8-character ID is generated
    instead. The ID is cleared again once *func* returns or raises.

    Args:
        func: The callable to wrap (e.g. a request handler).

    Returns:
        The wrapped callable; it returns whatever *func* returns.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        correlation_id = None
        try:
            # Check for correlation ID in headers (for Flask). The import
            # sits inside the try so a missing Flask falls back to a
            # generated ID.
            from flask import request
            correlation_id = request.headers.get('X-Correlation-ID')
        except Exception:
            # Best effort only. Catch Exception rather than using a bare
            # except so KeyboardInterrupt and SystemExit still propagate.
            correlation_id = None

        set_correlation_id(correlation_id or str(uuid.uuid4())[:8])
        try:
            return func(*args, **kwargs)
        finally:
            clear_correlation_id()
    return wrapper
|