| """ |
| Error Handling & Logging System |
| - Centralized error tracking |
| - Error recovery mechanisms |
| - Audit trail for failures |
| """ |
|
|
| import logging |
| import json |
| import traceback |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Optional, Dict, Any |
| import sqlite3 |
| import os |
|
|
| |
# Directory holding every log file and the SQLite tracking database.
# Rooted at $OUTPUT_DIR when that variable is set, otherwise at ./output.
LOG_DIR = Path(os.environ.get('OUTPUT_DIR', './output'), 'logs')
LOG_DIR.mkdir(parents=True, exist_ok=True)


def _attach_file_handler(logger, filename, fmt):
    """Attach a FileHandler for LOG_DIR/filename to *logger* and return it."""
    handler = logging.FileHandler(LOG_DIR / filename)
    handler.setFormatter(logging.Formatter(fmt))
    logger.addHandler(handler)
    return handler


# Errors: ERROR level and above, written with logger name and level.
error_log = logging.getLogger('error')
error_log.setLevel(logging.ERROR)
error_handler = _attach_file_handler(
    error_log,
    'errors.log',
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Audit trail: INFO level and above, timestamp plus message only.
audit_log = logging.getLogger('audit')
audit_log.setLevel(logging.INFO)
audit_handler = _attach_file_handler(audit_log, 'audit.log', '%(asctime)s - %(message)s')

# Performance metrics: INFO level and above, timestamp plus message only.
perf_log = logging.getLogger('performance')
perf_log.setLevel(logging.INFO)
perf_handler = _attach_file_handler(perf_log, 'performance.log', '%(asctime)s - %(message)s')

# SQLite database file used by the error, audit and performance tables.
ERROR_DB = LOG_DIR / 'errors.db'
|
|
def init_error_db():
    """Create the tracking tables in ERROR_DB if they do not exist yet.

    Three tables are created: ``errors``, ``audit_trail`` and
    ``performance_metrics``. Every statement uses ``CREATE TABLE IF NOT
    EXISTS``, so calling this repeatedly is harmless.

    Raises:
        sqlite3.Error: if the database cannot be opened or a statement fails.
    """
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.execute("""CREATE TABLE IF NOT EXISTS errors (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            error_type TEXT,
            message TEXT,
            traceback TEXT,
            endpoint TEXT,
            user_id INTEGER,
            status_code INTEGER,
            resolved BOOLEAN DEFAULT 0,
            resolution TEXT,
            retry_count INTEGER DEFAULT 0
        )""")

        conn.execute("""CREATE TABLE IF NOT EXISTS audit_trail (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            user_id INTEGER,
            action TEXT,
            resource TEXT,
            status TEXT,
            details TEXT
        )""")

        conn.execute("""CREATE TABLE IF NOT EXISTS performance_metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            endpoint TEXT,
            method TEXT,
            response_time_ms FLOAT,
            status_code INTEGER,
            user_id INTEGER
        )""")

        conn.commit()
    finally:
        # Close even when a statement fails so the connection is not leaked.
        conn.close()
|
|
# Create the tracking tables at import time so the logging helpers in this
# module can insert into them without a separate setup step.
init_error_db()
|
|
class AppError(Exception):
    """Root of the application's exception hierarchy.

    Attributes:
        message: human-readable description, also passed to ``Exception``.
        status_code: numeric status associated with the error (default 500).
        error_type: short machine-readable tag (default 'INTERNAL_ERROR').
    """

    def __init__(self, message: str, status_code: int = 500, error_type: str = 'INTERNAL_ERROR'):
        super().__init__(message)
        self.message, self.status_code, self.error_type = message, status_code, error_type
|
|
class ValidationError(AppError):
    """Raised for invalid input; carries status 400 and type 'VALIDATION_ERROR'."""

    def __init__(self, message: str):
        super().__init__(message, status_code=400, error_type='VALIDATION_ERROR')
|
|
class AuthenticationError(AppError):
    """Raised when authentication fails; carries status 401 and type 'AUTH_ERROR'."""

    def __init__(self, message: str = 'Unauthorized'):
        super().__init__(message, status_code=401, error_type='AUTH_ERROR')
|
|
class AuthorizationError(AppError):
    """Raised when access is denied; carries status 403 and type 'AUTHZ_ERROR'."""

    def __init__(self, message: str = 'Forbidden'):
        super().__init__(message, status_code=403, error_type='AUTHZ_ERROR')
|
|
class NotFoundError(AppError):
    """Raised when a resource is missing; carries status 404 and type 'NOT_FOUND'.

    The message is built as ``'<resource> not found'``.
    """

    def __init__(self, resource: str):
        description = f'{resource} not found'
        super().__init__(description, status_code=404, error_type='NOT_FOUND')
|
|
class RateLimitError(AppError):
    """Raised when a rate limit is hit; carries status 429 and type 'RATE_LIMIT'."""

    def __init__(self, message: str = 'Rate limit exceeded'):
        super().__init__(message, status_code=429, error_type='RATE_LIMIT')
|
|
def log_error(error_type: str, message: str, endpoint: str = '', user_id: Optional[int] = None,
              status_code: int = 500, tb: Optional[str] = None) -> int:
    """Record an error in the error log file and in the ``errors`` table.

    Args:
        error_type: short tag for the error, stored and logged as given.
        message: human-readable description.
        endpoint: endpoint associated with the error, if any.
        user_id: id of the affected user, or None.
        status_code: status code stored with the row.
        tb: preformatted traceback text. When empty or None, the traceback
            of the exception currently being handled is used; if no
            exception is active, NULL is stored.

    Returns:
        The row id of the inserted ``errors`` record.

    Raises:
        sqlite3.Error: if the insert fails.
    """
    import sys

    error_log.error(f'{error_type}: {message}')

    if not tb:
        # traceback.format_exc() yields the junk text "NoneType: None" when
        # called outside an except block, so only use it while an exception
        # is actually being handled.
        tb = traceback.format_exc() if sys.exc_info()[0] is not None else None

    conn = sqlite3.connect(str(ERROR_DB))
    try:
        cur = conn.execute(
            """INSERT INTO errors
            (error_type, message, traceback, endpoint, user_id, status_code)
            VALUES (?, ?, ?, ?, ?, ?)""",
            (error_type, message, tb, endpoint, user_id, status_code),
        )
        error_id = cur.lastrowid
        conn.commit()
    finally:
        # Close even when the insert fails so the connection is not leaked.
        conn.close()

    return error_id
|
|
def log_audit(user_id: Optional[int], action: str, resource: str, status: str, details: str = ''):
    """Record an audit event in the audit log file and the ``audit_trail`` table.

    Args:
        user_id: id of the acting user, or None.
        action: what was done.
        resource: what it was done to.
        status: outcome of the action.
        details: optional free-form text stored with the row (not written to
            the log file).

    Raises:
        sqlite3.Error: if the insert fails.
    """
    audit_log.info(f'User {user_id}: {action} {resource} - {status}')

    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.execute(
            """INSERT INTO audit_trail
            (user_id, action, resource, status, details)
            VALUES (?, ?, ?, ?, ?)""",
            (user_id, action, resource, status, details),
        )
        conn.commit()
    finally:
        # Close even when the insert fails so the connection is not leaked.
        conn.close()
|
|
def log_performance(endpoint: str, method: str, response_time_ms: float, status_code: int,
                    user_id: Optional[int] = None):
    """Record one request timing in the performance log and the metrics table.

    Args:
        endpoint: endpoint that was called.
        method: request method.
        response_time_ms: elapsed time in milliseconds (logged with two decimals).
        status_code: status code of the response.
        user_id: id of the requesting user, or None.

    Raises:
        sqlite3.Error: if the insert fails.
    """
    perf_log.info(f'{method} {endpoint} - {response_time_ms:.2f}ms - {status_code}')

    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.execute(
            """INSERT INTO performance_metrics
            (endpoint, method, response_time_ms, status_code, user_id)
            VALUES (?, ?, ?, ?, ?)""",
            (endpoint, method, response_time_ms, status_code, user_id),
        )
        conn.commit()
    finally:
        # Close even when the insert fails so the connection is not leaked.
        conn.close()
|
|
def get_errors(limit: int = 50, unresolved_only: bool = False) -> list:
    """Return the most recent rows of the ``errors`` table, newest first.

    Args:
        limit: maximum number of rows to return.
        unresolved_only: when True, only rows with ``resolved = 0`` are returned.

    Returns:
        A list of dicts, one per row, keyed by column name.

    Raises:
        sqlite3.Error: if the query fails.
    """
    query = 'SELECT * FROM errors'
    if unresolved_only:
        query += ' WHERE resolved = 0'
    # CURRENT_TIMESTAMP has one-second resolution, so rows logged within the
    # same second need the autoincrement id to get a stable newest-first order.
    query += ' ORDER BY timestamp DESC, id DESC LIMIT ?'

    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(query, (limit,)).fetchall()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()

    return [dict(r) for r in rows]
|
|
def get_audit_trail(user_id: Optional[int] = None, limit: int = 100) -> list:
    """Return the most recent rows of the ``audit_trail`` table, newest first.

    Args:
        user_id: when not None, only that user's rows are returned. An id of
            0 is a valid filter value.
        limit: maximum number of rows to return.

    Returns:
        A list of dicts, one per row, keyed by column name.

    Raises:
        sqlite3.Error: if the query fails.
    """
    query = 'SELECT * FROM audit_trail'
    params = []
    # Compare against None explicitly: a plain truthiness test would treat
    # user id 0 as "no filter" and return every user's rows.
    if user_id is not None:
        query += ' WHERE user_id = ?'
        params.append(user_id)
    # CURRENT_TIMESTAMP has one-second resolution, so rows logged within the
    # same second need the autoincrement id to get a stable newest-first order.
    query += ' ORDER BY timestamp DESC, id DESC LIMIT ?'
    params.append(limit)

    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(query, params).fetchall()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()

    return [dict(r) for r in rows]
|
|
def get_performance_stats(endpoint: str = '', hours: int = 24) -> list:
    """Aggregate request metrics per (endpoint, method) over a recent window.

    Args:
        endpoint: when non-empty, restrict the statistics to this endpoint.
        hours: size of the look-back window, in hours before now.

    Returns:
        A list of dicts, one per (endpoint, method) group, with the keys
        ``endpoint``, ``method``, ``total_requests``, ``avg_response_time``,
        ``max_response_time``, ``min_response_time`` and ``error_count``
        (requests whose status code is 400 or above).

    Raises:
        sqlite3.Error: if the query fails.
    """
    query = """SELECT
        endpoint, method,
        COUNT(*) as total_requests,
        AVG(response_time_ms) as avg_response_time,
        MAX(response_time_ms) as max_response_time,
        MIN(response_time_ms) as min_response_time,
        SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as error_count
        FROM performance_metrics
        WHERE timestamp > datetime('now', '-' || ? || ' hours')"""

    params = [hours]

    if endpoint:
        query += ' AND endpoint = ?'
        params.append(endpoint)

    query += ' GROUP BY endpoint, method'

    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(query, params).fetchall()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()

    return [dict(r) for r in rows]
|
|
def resolve_error(error_id: int, resolution: str):
    """Mark one row of the ``errors`` table as resolved.

    Args:
        error_id: id of the row to update. An id that matches no row leaves
            the table unchanged.
        resolution: text stored in the row's ``resolution`` column.

    Raises:
        sqlite3.Error: if the update fails.
    """
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.execute(
            'UPDATE errors SET resolved = 1, resolution = ? WHERE id = ?',
            (resolution, error_id),
        )
        conn.commit()
    finally:
        # Close even when the update fails so the connection is not leaked.
        conn.close()
|
|
def get_error_summary() -> Dict[str, Any]:
    """Summarize the contents of the ``errors`` table.

    Returns:
        A dict with three keys:
        ``total_errors`` (row count), ``unresolved`` (rows with
        ``resolved = 0``) and ``by_type`` (a list of
        ``{'type': ..., 'count': ...}`` dicts, most frequent type first).

    Raises:
        sqlite3.Error: if a query fails.
    """
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        total = conn.execute('SELECT COUNT(*) FROM errors').fetchone()[0]
        unresolved = conn.execute('SELECT COUNT(*) FROM errors WHERE resolved = 0').fetchone()[0]
        by_type = conn.execute(
            """SELECT error_type, COUNT(*) as count
            FROM errors GROUP BY error_type ORDER BY count DESC"""
        ).fetchall()
    finally:
        # Close even when a query fails so the connection is not leaked.
        conn.close()

    return {
        'total_errors': total,
        'unresolved': unresolved,
        'by_type': [{'type': name, 'count': count} for name, count in by_type],
    }
|
|
def retry_failed_operation(func, max_retries: int = 3, backoff: float = 1.0):
    """Call *func* until it succeeds, sleeping with exponential backoff between tries.

    Args:
        func: zero-argument callable to invoke.
        max_retries: total number of attempts. Values below 1 are treated as
            1, so *func* is always called at least once.
        backoff: base delay in seconds; the wait after attempt ``k``
            (0-based) is ``backoff * 2 ** k``.

    Returns:
        Whatever *func* returns on its first successful call.

    Raises:
        Exception: the exception from the final attempt is re-raised unchanged.
    """
    import time

    # Without this floor a non-positive max_retries skips the loop entirely
    # and returns None without ever calling func.
    attempts = max(1, max_retries)

    for attempt in range(attempts):
        try:
            return func()
        except Exception as e:
            if attempt == attempts - 1:
                raise
            wait_time = backoff * (2 ** attempt)
            perf_log.warning(f'Retry attempt {attempt + 1}/{max_retries} after {wait_time}s: {e}')
            time.sleep(wait_time)
|
|