# server/error_handler.py
# Deploy metadata (Moharek GEO Platform, commit a74b879) — kept as a comment;
# these lines were raw paste residue and broke the module syntax.
"""
Error Handling & Logging System
- Centralized error tracking
- Error recovery mechanisms
- Audit trail for failures
"""
import logging
import json
import traceback
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any
import sqlite3
import os
# Setup logging
LOG_DIR = Path(os.environ.get('OUTPUT_DIR', './output')) / 'logs'
LOG_DIR.mkdir(parents=True, exist_ok=True)
# File handlers
error_log = logging.getLogger('error')
error_log.setLevel(logging.ERROR)
error_handler = logging.FileHandler(LOG_DIR / 'errors.log')
error_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
error_log.addHandler(error_handler)
audit_log = logging.getLogger('audit')
audit_log.setLevel(logging.INFO)
audit_handler = logging.FileHandler(LOG_DIR / 'audit.log')
audit_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
audit_log.addHandler(audit_handler)
perf_log = logging.getLogger('performance')
perf_log.setLevel(logging.INFO)
perf_handler = logging.FileHandler(LOG_DIR / 'performance.log')
perf_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
perf_log.addHandler(perf_handler)
# Database for error tracking
ERROR_DB = LOG_DIR / 'errors.db'
def init_error_db(db_path: Optional[Path] = None) -> None:
    """Create the error-tracking tables if they do not already exist.

    Creates three tables: `errors` (tracked failures with resolution state),
    `audit_trail` (who did what), and `performance_metrics` (per-request
    timings).

    Args:
        db_path: SQLite file to initialize. Defaults to the module-level
            ERROR_DB under the log directory; the parameter exists so tests
            and tooling can build a database elsewhere.
    """
    conn = sqlite3.connect(str(db_path if db_path is not None else ERROR_DB))
    try:
        conn.execute("""CREATE TABLE IF NOT EXISTS errors (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            error_type TEXT,
            message TEXT,
            traceback TEXT,
            endpoint TEXT,
            user_id INTEGER,
            status_code INTEGER,
            resolved BOOLEAN DEFAULT 0,
            resolution TEXT,
            retry_count INTEGER DEFAULT 0
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS audit_trail (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            user_id INTEGER,
            action TEXT,
            resource TEXT,
            status TEXT,
            details TEXT
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS performance_metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            endpoint TEXT,
            method TEXT,
            response_time_ms FLOAT,
            status_code INTEGER,
            user_id INTEGER
        )""")
        conn.commit()
    finally:
        # Close even if a CREATE fails so the handle never leaks
        # (the original only closed on the success path).
        conn.close()
init_error_db()  # run at import time so the tables exist before any logging helper is called
class AppError(Exception):
    """Root of the application's error hierarchy.

    Attributes:
        message: human-readable description of the failure.
        status_code: HTTP status to return for this error.
        error_type: machine-readable error category code.
    """

    def __init__(self, message: str, status_code: int = 500, error_type: str = 'INTERNAL_ERROR'):
        self.message = message
        self.status_code = status_code
        self.error_type = error_type
        super().__init__(message)


class ValidationError(AppError):
    """Request input failed validation (HTTP 400)."""

    def __init__(self, message: str):
        super().__init__(message, 400, 'VALIDATION_ERROR')


class AuthenticationError(AppError):
    """Caller is not authenticated (HTTP 401)."""

    def __init__(self, message: str = 'Unauthorized'):
        super().__init__(message, 401, 'AUTH_ERROR')


class AuthorizationError(AppError):
    """Caller is authenticated but lacks permission (HTTP 403)."""

    def __init__(self, message: str = 'Forbidden'):
        super().__init__(message, 403, 'AUTHZ_ERROR')


class NotFoundError(AppError):
    """Requested resource does not exist (HTTP 404)."""

    def __init__(self, resource: str):
        super().__init__(f'{resource} not found', 404, 'NOT_FOUND')


class RateLimitError(AppError):
    """Caller exceeded the request rate limit (HTTP 429)."""

    def __init__(self, message: str = 'Rate limit exceeded'):
        super().__init__(message, 429, 'RATE_LIMIT')
def log_error(error_type: str, message: str, endpoint: str = '', user_id: Optional[int] = None,
              status_code: int = 500, tb: Optional[str] = None) -> int:
    """Record an error in errors.log and the `errors` table.

    Args:
        error_type: machine-readable category (e.g. 'VALIDATION_ERROR').
        message: human-readable description.
        endpoint: request path where the error occurred, if any.
        user_id: id of the affected user, if known.
        status_code: HTTP status associated with the error.
        tb: preformatted traceback; when omitted, the traceback of the
            exception currently being handled is captured via format_exc().

    Returns:
        The id of the inserted `errors` row.
    """
    error_log.error(f'{error_type}: {message}')
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        cur = conn.execute("""INSERT INTO errors
            (error_type, message, traceback, endpoint, user_id, status_code)
            VALUES (?, ?, ?, ?, ?, ?)""",
            (error_type, message, tb or traceback.format_exc(), endpoint, user_id, status_code))
        error_id = cur.lastrowid
        conn.commit()
        return error_id
    finally:
        # Close even when the INSERT raises; the original leaked the handle.
        conn.close()
def log_audit(user_id: Optional[int], action: str, resource: str, status: str, details: str = '') -> None:
    """Append an entry to audit.log and the `audit_trail` table.

    Args:
        user_id: id of the acting user (None for anonymous/system actions).
        action: what was attempted (e.g. 'DELETE').
        resource: what it was attempted on.
        status: outcome label (e.g. 'success', 'denied').
        details: optional free-form context.
    """
    audit_log.info(f'User {user_id}: {action} {resource} - {status}')
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.execute("""INSERT INTO audit_trail
            (user_id, action, resource, status, details)
            VALUES (?, ?, ?, ?, ?)""",
            (user_id, action, resource, status, details))
        conn.commit()
    finally:
        # Close even when the INSERT raises; the original leaked the handle.
        conn.close()
def log_performance(endpoint: str, method: str, response_time_ms: float, status_code: int,
                    user_id: Optional[int] = None) -> None:
    """Record one request's timing in performance.log and the metrics table.

    Args:
        endpoint: request path.
        method: HTTP method.
        response_time_ms: wall-clock handling time in milliseconds.
        status_code: HTTP status returned.
        user_id: id of the requesting user, if known.
    """
    perf_log.info(f'{method} {endpoint} - {response_time_ms:.2f}ms - {status_code}')
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.execute("""INSERT INTO performance_metrics
            (endpoint, method, response_time_ms, status_code, user_id)
            VALUES (?, ?, ?, ?, ?)""",
            (endpoint, method, response_time_ms, status_code, user_id))
        conn.commit()
    finally:
        # Close even when the INSERT raises; the original leaked the handle.
        conn.close()
def get_errors(limit: int = 50, unresolved_only: bool = False) -> list:
    """Return up to `limit` most recent errors as dicts, newest first.

    Args:
        limit: maximum number of rows to return.
        unresolved_only: when True, include only errors with resolved = 0.
    """
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.row_factory = sqlite3.Row
        query = 'SELECT * FROM errors'
        if unresolved_only:
            query += ' WHERE resolved = 0'
        query += ' ORDER BY timestamp DESC LIMIT ?'
        rows = conn.execute(query, (limit,)).fetchall()
        return [dict(r) for r in rows]
    finally:
        # Close even when the query raises; the original leaked the handle.
        conn.close()
def get_audit_trail(user_id: Optional[int] = None, limit: int = 100) -> list:
    """Return recent audit entries as dicts, newest first.

    Args:
        user_id: when given, restrict to this user's actions. Compared with
            `is not None` so a legitimate user id of 0 is filtered correctly
            (the original truthiness check silently ignored user 0).
        limit: maximum number of rows to return.
    """
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.row_factory = sqlite3.Row
        if user_id is not None:
            rows = conn.execute(
                'SELECT * FROM audit_trail WHERE user_id = ? ORDER BY timestamp DESC LIMIT ?',
                (user_id, limit)).fetchall()
        else:
            rows = conn.execute(
                'SELECT * FROM audit_trail ORDER BY timestamp DESC LIMIT ?',
                (limit,)).fetchall()
        return [dict(r) for r in rows]
    finally:
        # Close even when the query raises; the original leaked the handle.
        conn.close()
def get_performance_stats(endpoint: str = '', hours: int = 24) -> list:
    """Aggregate performance metrics per (endpoint, method) over a window.

    Args:
        endpoint: restrict to one endpoint; empty string means all endpoints.
        hours: look-back window, in hours, from now.

    Returns:
        A list of dicts, one per (endpoint, method) group, with keys
        total_requests, avg/max/min_response_time and error_count
        (requests with status_code >= 400). The original annotated the
        return as Dict[str, Any] but always returned a list; the
        annotation is corrected here — runtime behavior is unchanged.
    """
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.row_factory = sqlite3.Row
        query = """SELECT
            endpoint, method,
            COUNT(*) as total_requests,
            AVG(response_time_ms) as avg_response_time,
            MAX(response_time_ms) as max_response_time,
            MIN(response_time_ms) as min_response_time,
            SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as error_count
            FROM performance_metrics
            WHERE timestamp > datetime('now', '-' || ? || ' hours')"""
        params = [hours]
        if endpoint:
            query += ' AND endpoint = ?'
            params.append(endpoint)
        query += ' GROUP BY endpoint, method'
        rows = conn.execute(query, params).fetchall()
        return [dict(r) for r in rows]
    finally:
        # Close even when the query raises; the original leaked the handle.
        conn.close()
def resolve_error(error_id: int, resolution: str) -> None:
    """Mark an error row as resolved and record how it was fixed.

    Args:
        error_id: id of the row in the `errors` table.
        resolution: free-form note describing the fix. A nonexistent id is
            a silent no-op (UPDATE matches zero rows), as in the original.
    """
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        conn.execute('UPDATE errors SET resolved = 1, resolution = ? WHERE id = ?',
                     (resolution, error_id))
        conn.commit()
    finally:
        # Close even when the UPDATE raises; the original leaked the handle.
        conn.close()
def get_error_summary() -> Dict[str, Any]:
    """Return summary statistics for the `errors` table.

    Returns:
        Dict with 'total_errors' (all rows), 'unresolved' (rows with
        resolved = 0), and 'by_type' (list of {'type', 'count'} dicts,
        most frequent first).
    """
    conn = sqlite3.connect(str(ERROR_DB))
    try:
        total = conn.execute('SELECT COUNT(*) FROM errors').fetchone()[0]
        unresolved = conn.execute('SELECT COUNT(*) FROM errors WHERE resolved = 0').fetchone()[0]
        by_type = conn.execute("""SELECT error_type, COUNT(*) as count
            FROM errors GROUP BY error_type ORDER BY count DESC""").fetchall()
        return {
            'total_errors': total,
            'unresolved': unresolved,
            'by_type': [{'type': t[0], 'count': t[1]} for t in by_type],
        }
    finally:
        # Close even when a query raises; the original leaked the handle.
        conn.close()
def retry_failed_operation(func, max_retries: int = 3, backoff: float = 1.0):
    """Call func(), retrying with exponential backoff on any exception.

    Args:
        func: zero-argument callable to invoke.
        max_retries: total number of attempts; must be >= 1.
        backoff: base delay in seconds; attempt n (0-based) waits
            backoff * 2**n before the next try.

    Returns:
        Whatever func() returns on the first successful attempt.

    Raises:
        ValueError: if max_retries < 1 (the original silently returned
            None in that case, masking the bug).
        Exception: the exception from the final attempt, re-raised once
            all attempts are exhausted.
    """
    import time
    if max_retries < 1:
        raise ValueError('max_retries must be at least 1')
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise  # last attempt: propagate rather than swallow
            wait_time = backoff * (2 ** attempt)
            perf_log.warning(f'Retry attempt {attempt + 1}/{max_retries} after {wait_time}s: {str(e)}')
            time.sleep(wait_time)