"""Error handling and logging system.

- Centralized error tracking
- Error recovery mechanisms
- Audit trail for failures

Importing this module has side effects: it creates the log directory,
attaches file handlers to the ``error``, ``audit`` and ``performance``
loggers, and creates the SQLite tables used for tracking.
"""
import json
import logging
import os
import sqlite3
import sys
import time
import traceback
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Optional

# Setup logging: everything lives under $OUTPUT_DIR/logs (default ./output/logs).
LOG_DIR = Path(os.environ.get('OUTPUT_DIR', './output')) / 'logs'
LOG_DIR.mkdir(parents=True, exist_ok=True)


def _file_logger(name: str, level: int, filename: str, fmt: str):
    """Return ``(logger, handler)`` for a logger that writes to LOG_DIR/filename.

    If the logger already has a FileHandler for the same file (for example
    after a module reload), that handler is reused so each record is written
    only once.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # FileHandler stores the absolute path of its target in ``baseFilename``.
    target = os.path.abspath(str(LOG_DIR / filename))
    for existing in logger.handlers:
        if (isinstance(existing, logging.FileHandler)
                and existing.baseFilename == target):
            return logger, existing
    handler = logging.FileHandler(LOG_DIR / filename)
    handler.setFormatter(logging.Formatter(fmt))
    logger.addHandler(handler)
    return logger, handler


# File handlers
error_log, error_handler = _file_logger(
    'error', logging.ERROR, 'errors.log',
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
audit_log, audit_handler = _file_logger(
    'audit', logging.INFO, 'audit.log', '%(asctime)s - %(message)s')
perf_log, perf_handler = _file_logger(
    'performance', logging.INFO, 'performance.log',
    '%(asctime)s - %(message)s')

# Database for error tracking
ERROR_DB = LOG_DIR / 'errors.db'


@contextmanager
def _db_connection(row_factory=None) -> Iterator[sqlite3.Connection]:
    """Yield a connection to ERROR_DB; commit on success, always close.

    ``with conn:`` only commits or rolls back the transaction, it does not
    close the connection, hence the explicit ``finally``.
    """
    conn = sqlite3.connect(str(ERROR_DB))
    if row_factory is not None:
        conn.row_factory = row_factory
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def init_error_db():
    """Initialize error tracking database.

    Creates the ``errors``, ``audit_trail`` and ``performance_metrics``
    tables if they do not exist yet. Safe to call repeatedly.
    """
    with _db_connection() as conn:
        conn.execute("""CREATE TABLE IF NOT EXISTS errors (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            error_type TEXT,
            message TEXT,
            traceback TEXT,
            endpoint TEXT,
            user_id INTEGER,
            status_code INTEGER,
            resolved BOOLEAN DEFAULT 0,
            resolution TEXT,
            retry_count INTEGER DEFAULT 0
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS audit_trail (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            user_id INTEGER,
            action TEXT,
            resource TEXT,
            status TEXT,
            details TEXT
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS performance_metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            endpoint TEXT,
            method TEXT,
            response_time_ms FLOAT,
            status_code INTEGER,
            user_id INTEGER
        )""")


init_error_db()


class AppError(Exception):
    """Base application error.

    Carries a human-readable ``message``, an HTTP-style ``status_code`` and
    a short machine-readable ``error_type`` tag.
    """

    def __init__(self, message: str, status_code: int = 500,
                 error_type: str = 'INTERNAL_ERROR'):
        self.message = message
        self.status_code = status_code
        self.error_type = error_type
        super().__init__(self.message)


class ValidationError(AppError):
    """Validation error (status 400, type ``VALIDATION_ERROR``)."""

    def __init__(self, message: str):
        super().__init__(message, 400, 'VALIDATION_ERROR')


class AuthenticationError(AppError):
    """Authentication error (status 401, type ``AUTH_ERROR``)."""

    def __init__(self, message: str = 'Unauthorized'):
        super().__init__(message, 401, 'AUTH_ERROR')


class AuthorizationError(AppError):
    """Authorization error (status 403, type ``AUTHZ_ERROR``)."""

    def __init__(self, message: str = 'Forbidden'):
        super().__init__(message, 403, 'AUTHZ_ERROR')


class NotFoundError(AppError):
    """Resource not found (status 404, type ``NOT_FOUND``)."""

    def __init__(self, resource: str):
        super().__init__(f'{resource} not found', 404, 'NOT_FOUND')


class RateLimitError(AppError):
    """Rate limit exceeded (status 429, type ``RATE_LIMIT``)."""

    def __init__(self, message: str = 'Rate limit exceeded'):
        super().__init__(message, 429, 'RATE_LIMIT')


def log_error(error_type: str, message: str, endpoint: str = '',
              user_id: Optional[int] = None, status_code: int = 500,
              tb: Optional[str] = None) -> int:
    """Log error to database and file.

    Args:
        error_type: Short tag for the error category.
        message: Human-readable description.
        endpoint: Endpoint on which the error occurred, if any.
        user_id: Id of the affected user, if known.
        status_code: Status code associated with the error.
        tb: Pre-formatted traceback. When omitted, the traceback of the
            exception currently being handled is used; if no exception is
            active an empty string is stored.

    Returns:
        The row id of the inserted ``errors`` record.
    """
    error_log.error(f'{error_type}: {message}')
    if not tb:
        # format_exc() yields the useless text "NoneType: None" when called
        # outside an ``except`` block, so only use it while handling one.
        tb = traceback.format_exc() if sys.exc_info()[0] is not None else ''
    with _db_connection() as conn:
        cur = conn.execute(
            """INSERT INTO errors
               (error_type, message, traceback, endpoint, user_id, status_code)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (error_type, message, tb, endpoint, user_id, status_code))
        error_id = cur.lastrowid
    return error_id


def log_audit(user_id: Optional[int], action: str, resource: str,
              status: str, details: str = ''):
    """Log audit trail.

    Writes one line to the audit log file and one row to ``audit_trail``.
    """
    audit_log.info(f'User {user_id}: {action} {resource} - {status}')
    with _db_connection() as conn:
        conn.execute(
            """INSERT INTO audit_trail
               (user_id, action, resource, status, details)
               VALUES (?, ?, ?, ?, ?)""",
            (user_id, action, resource, status, details))


def log_performance(endpoint: str, method: str, response_time_ms: float,
                    status_code: int, user_id: Optional[int] = None):
    """Log performance metrics.

    Writes one line to the performance log file and one row to
    ``performance_metrics``.
    """
    perf_log.info(
        f'{method} {endpoint} - {response_time_ms:.2f}ms - {status_code}')
    with _db_connection() as conn:
        conn.execute(
            """INSERT INTO performance_metrics
               (endpoint, method, response_time_ms, status_code, user_id)
               VALUES (?, ?, ?, ?, ?)""",
            (endpoint, method, response_time_ms, status_code, user_id))


def get_errors(limit: int = 50, unresolved_only: bool = False) -> list:
    """Get recent errors.

    Args:
        limit: Maximum number of rows to return.
        unresolved_only: When true, only rows with ``resolved = 0``.

    Returns:
        A list of dicts (one per row), newest first.
    """
    query = 'SELECT * FROM errors'
    if unresolved_only:
        query += ' WHERE resolved = 0'
    # ``id DESC`` breaks ties between rows recorded within the same second.
    query += ' ORDER BY timestamp DESC, id DESC LIMIT ?'
    with _db_connection(sqlite3.Row) as conn:
        rows = conn.execute(query, (limit,)).fetchall()
    return [dict(r) for r in rows]


def get_audit_trail(user_id: Optional[int] = None, limit: int = 100) -> list:
    """Get audit trail.

    Args:
        user_id: Restrict to this user's entries; ``None`` returns entries
            for all users. ``0`` is treated as a real user id.
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts (one per row), newest first.
    """
    query = 'SELECT * FROM audit_trail'
    params: List[Any] = []
    if user_id is not None:
        query += ' WHERE user_id = ?'
        params.append(user_id)
    # ``id DESC`` breaks ties between rows recorded within the same second.
    query += ' ORDER BY timestamp DESC, id DESC LIMIT ?'
    params.append(limit)
    with _db_connection(sqlite3.Row) as conn:
        rows = conn.execute(query, params).fetchall()
    return [dict(r) for r in rows]


def get_performance_stats(endpoint: str = '',
                          hours: int = 24) -> List[Dict[str, Any]]:
    """Get performance statistics.

    Args:
        endpoint: Restrict to this endpoint; empty string means all.
        hours: Only consider metrics recorded within the last ``hours`` hours.

    Returns:
        A list of dicts, one per (endpoint, method) pair, with the keys
        ``endpoint``, ``method``, ``total_requests``, ``avg_response_time``,
        ``max_response_time``, ``min_response_time`` and ``error_count``
        (requests with status code >= 400).
    """
    query = """SELECT endpoint, method,
                      COUNT(*) as total_requests,
                      AVG(response_time_ms) as avg_response_time,
                      MAX(response_time_ms) as max_response_time,
                      MIN(response_time_ms) as min_response_time,
                      SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as error_count
               FROM performance_metrics
               WHERE timestamp > datetime('now', '-' || ? || ' hours')"""
    params: List[Any] = [hours]
    if endpoint:
        query += ' AND endpoint = ?'
        params.append(endpoint)
    query += ' GROUP BY endpoint, method'
    with _db_connection(sqlite3.Row) as conn:
        rows = conn.execute(query, params).fetchall()
    return [dict(r) for r in rows]


def resolve_error(error_id: int, resolution: str) -> bool:
    """Mark error as resolved.

    Args:
        error_id: Id of the ``errors`` row to update.
        resolution: Free-text description of how it was resolved.

    Returns:
        True if a row with that id existed and was updated, else False.
    """
    with _db_connection() as conn:
        cur = conn.execute(
            'UPDATE errors SET resolved = 1, resolution = ? WHERE id = ?',
            (resolution, error_id))
        updated = cur.rowcount > 0
    return updated


def get_error_summary() -> Dict[str, Any]:
    """Get error summary statistics.

    Returns:
        A dict with ``total_errors``, ``unresolved`` and ``by_type`` (a list
        of ``{'type': ..., 'count': ...}`` dicts, most frequent first).
    """
    with _db_connection() as conn:
        total = conn.execute('SELECT COUNT(*) FROM errors').fetchone()[0]
        unresolved = conn.execute(
            'SELECT COUNT(*) FROM errors WHERE resolved = 0').fetchone()[0]
        by_type = conn.execute(
            """SELECT error_type, COUNT(*) as count
               FROM errors
               GROUP BY error_type
               ORDER BY count DESC""").fetchall()
    return {
        'total_errors': total,
        'unresolved': unresolved,
        'by_type': [{'type': t[0], 'count': t[1]} for t in by_type],
    }


def retry_failed_operation(func: Callable[[], Any], max_retries: int = 3,
                           backoff: float = 1.0):
    """Retry failed operation with exponential backoff.

    Args:
        func: Zero-argument callable to invoke.
        max_retries: Total number of attempts (must be at least 1).
        backoff: Base delay in seconds; attempt ``k`` (0-based) waits
            ``backoff * 2 ** k`` before the next try.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (otherwise ``func``
            would never be called and ``None`` silently returned).
        Exception: The exception from the final attempt is re-raised.
    """
    if max_retries < 1:
        raise ValueError('max_retries must be at least 1')
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            wait_time = backoff * (2 ** attempt)
            perf_log.warning(
                f'Retry attempt {attempt + 1}/{max_retries} after {wait_time}s: {str(e)}')
            time.sleep(wait_time)