"""Advanced SQLite-backed application logging: structured logs, performance metrics and system events."""
| import sqlite3 | |
| import json | |
| import datetime | |
| import threading | |
| import os | |
| from typing import Dict, Any, Optional, List | |
| from enum import Enum | |
| from dataclasses import dataclass, asdict | |
| from contextlib import contextmanager | |
# Log severity levels, declared via the functional Enum API.
# Name/value pairs are identical to the class-based declaration.
LogLevel = Enum(
    "LogLevel",
    [(name, name) for name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")],
)
LogLevel.__doc__ = "Available log severity levels."
# Log categories used to organize entries, declared via the functional
# Enum API; member names and string values match the class-based form.
LogCategory = Enum(
    "LogCategory",
    [
        (name, name)
        for name in (
            "SYSTEM",
            "API",
            "AI_MODEL",
            "MARKET_ANALYSIS",
            "SENTIMENT_ANALYSIS",
            "PERFORMANCE",
            "USER_INTERACTION",
            "ERROR_TRACKING",
            "SECURITY",
        )
    ],
)
LogCategory.__doc__ = "Log categories for organizing entries."
@dataclass
class LogEntry:
    """Structured representation of a single log record.

    Bug fix: the ``@dataclass`` decorator was missing, so the class had no
    generated ``__init__`` and ``DatabaseLogger.log()`` could not construct
    it with keyword arguments (``dataclass``/``asdict`` are already imported
    by this module, confirming the intent).
    """
    timestamp: str            # ISO-8601 creation time
    level: str                # a LogLevel value, e.g. "INFO"
    category: str             # a LogCategory value, e.g. "SYSTEM"
    message: str              # human-readable log text
    module: str               # originating module name
    function: str             # originating function name
    line_number: int          # originating line number
    user_id: Optional[str] = None     # optional correlation ids
    session_id: Optional[str] = None
    request_id: Optional[str] = None
    # NOTE(review): DatabaseLogger.log() stores a JSON-encoded *string*
    # here, not a dict — the annotation reflects the direct-construction
    # case; confirm which contract is intended.
    metadata: Optional[Dict[str, Any]] = None
    stack_trace: Optional[str] = None       # formatted traceback, if any
    execution_time: Optional[float] = None  # duration measurement, units set by caller
class DatabaseLogger:
    """Advanced logging system backed by a SQLite3 database.

    Persists structured log entries, performance metrics and system events
    in three dedicated tables. All writes are serialized through a lock so
    one instance can be shared between threads; every operation opens its
    own short-lived connection, so no handle is held between calls.
    """

    def __init__(self, db_path: str = "logs/application.db"):
        """Create the logger and initialize the database schema.

        Args:
            db_path: Location of the SQLite file; missing parent
                directories are created automatically.
        """
        self.db_path = db_path
        self._lock = threading.Lock()  # serializes concurrent writers
        self._ensure_directory_exists()
        self._initialize_database()

    def _ensure_directory_exists(self):
        """Create the database file's parent directory when it is missing."""
        directory = os.path.dirname(self.db_path)
        # A bare filename has no directory part; os.makedirs("") raises,
        # so only create the directory when there is one.
        if directory:
            os.makedirs(directory, exist_ok=True)

    def _initialize_database(self):
        """Create the tables and indexes used by the logger (idempotent)."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            # Main log table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    level TEXT NOT NULL,
                    category TEXT NOT NULL,
                    message TEXT NOT NULL,
                    module TEXT NOT NULL,
                    function TEXT NOT NULL,
                    line_number INTEGER NOT NULL,
                    user_id TEXT,
                    session_id TEXT,
                    request_id TEXT,
                    metadata TEXT,
                    stack_trace TEXT,
                    execution_time REAL,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # Performance metrics table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS performance_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    metric_name TEXT NOT NULL,
                    metric_value REAL NOT NULL,
                    unit TEXT,
                    category TEXT,
                    metadata TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # System events table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS system_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    event_type TEXT NOT NULL,
                    event_name TEXT NOT NULL,
                    description TEXT,
                    severity TEXT,
                    metadata TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # Indexes covering the filters used by the get_* query methods
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_category ON logs(category)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_session ON logs(session_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_performance_timestamp ON performance_metrics(timestamp)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_events_timestamp ON system_events(timestamp)")
            conn.commit()

    @contextmanager
    def _get_connection(self):
        """Yield a database connection that is always closed on exit.

        Bug fix: the ``@contextmanager`` decorator was missing. Without it
        this generator function is not a context manager, and every
        ``with self._get_connection() as conn:`` caller in this class
        would fail with AttributeError on ``__enter__``.
        """
        conn = sqlite3.connect(self.db_path, timeout=30.0)
        conn.row_factory = sqlite3.Row  # rows support access by column name
        try:
            yield conn
        finally:
            conn.close()

    def log(self, level: LogLevel, category: LogCategory, message: str,
            module: str, function: str, line_number: int,
            user_id: Optional[str] = None, session_id: Optional[str] = None,
            request_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None,
            stack_trace: Optional[str] = None, execution_time: Optional[float] = None):
        """Persist one structured log entry into the ``logs`` table.

        Args:
            level: Severity of the message.
            category: Functional area the message belongs to.
            message: Human-readable log text.
            module: Originating module name.
            function: Originating function name.
            line_number: Originating line number.
            user_id: Optional user correlation id.
            session_id: Optional session correlation id.
            request_id: Optional request correlation id.
            metadata: Extra data; JSON-encoded before storage.
            stack_trace: Optional formatted traceback.
            execution_time: Optional duration measurement.
        """
        log_entry = LogEntry(
            timestamp=datetime.datetime.now().isoformat(),
            level=level.value,
            category=category.value,
            message=message,
            module=module,
            function=function,
            line_number=line_number,
            user_id=user_id,
            session_id=session_id,
            request_id=request_id,
            metadata=json.dumps(metadata) if metadata else None,
            stack_trace=stack_trace,
            execution_time=execution_time
        )
        with self._lock:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO logs (
                        timestamp, level, category, message, module, function,
                        line_number, user_id, session_id, request_id, metadata,
                        stack_trace, execution_time
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    log_entry.timestamp, log_entry.level, log_entry.category,
                    log_entry.message, log_entry.module, log_entry.function,
                    log_entry.line_number, log_entry.user_id, log_entry.session_id,
                    log_entry.request_id, log_entry.metadata, log_entry.stack_trace,
                    log_entry.execution_time
                ))
                conn.commit()

    def log_performance_metric(self, metric_name: str, metric_value: float,
                               unit: Optional[str] = None, category: Optional[str] = None,
                               metadata: Optional[Dict[str, Any]] = None):
        """Record one performance measurement in ``performance_metrics``.

        Args:
            metric_name: Identifier of the metric (e.g. a latency name).
            metric_value: Measured value.
            unit: Optional unit label stored alongside the value.
            category: Optional grouping label.
            metadata: Extra data; JSON-encoded before storage.
        """
        with self._lock:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO performance_metrics (
                        timestamp, metric_name, metric_value, unit, category, metadata
                    ) VALUES (?, ?, ?, ?, ?, ?)
                """, (
                    datetime.datetime.now().isoformat(),
                    metric_name,
                    metric_value,
                    unit,
                    category,
                    json.dumps(metadata) if metadata else None
                ))
                conn.commit()

    def log_system_event(self, event_type: str, event_name: str,
                         description: Optional[str] = None, severity: str = "INFO",
                         metadata: Optional[Dict[str, Any]] = None):
        """Record one system event in ``system_events``.

        Args:
            event_type: Broad kind of event (caller-defined taxonomy).
            event_name: Specific event identifier.
            description: Optional free-text description.
            severity: Severity label; defaults to "INFO".
            metadata: Extra data; JSON-encoded before storage.
        """
        with self._lock:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO system_events (
                        timestamp, event_type, event_name, description, severity, metadata
                    ) VALUES (?, ?, ?, ?, ?, ?)
                """, (
                    datetime.datetime.now().isoformat(),
                    event_type,
                    event_name,
                    description,
                    severity,
                    json.dumps(metadata) if metadata else None
                ))
                conn.commit()

    def get_logs(self, level: Optional[str] = None, category: Optional[str] = None,
                 start_time: Optional[str] = None, end_time: Optional[str] = None,
                 limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]:
        """Fetch log rows, newest first, with optional filters.

        All filters are combined with AND; ``start_time``/``end_time`` are
        compared lexicographically against the stored ISO-8601 timestamps.
        Returns each row as a plain dict.
        """
        query = "SELECT * FROM logs WHERE 1=1"
        params: List[Any] = []
        if level:
            query += " AND level = ?"
            params.append(level)
        if category:
            query += " AND category = ?"
            params.append(category)
        if start_time:
            query += " AND timestamp >= ?"
            params.append(start_time)
        if end_time:
            query += " AND timestamp <= ?"
            params.append(end_time)
        query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]

    def get_performance_metrics(self, metric_name: Optional[str] = None,
                                start_time: Optional[str] = None,
                                end_time: Optional[str] = None,
                                limit: int = 100) -> List[Dict[str, Any]]:
        """Fetch performance metric rows, newest first, with optional filters."""
        query = "SELECT * FROM performance_metrics WHERE 1=1"
        params: List[Any] = []
        if metric_name:
            query += " AND metric_name = ?"
            params.append(metric_name)
        if start_time:
            query += " AND timestamp >= ?"
            params.append(start_time)
        if end_time:
            query += " AND timestamp <= ?"
            params.append(end_time)
        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]

    def get_system_events(self, event_type: Optional[str] = None,
                          severity: Optional[str] = None,
                          start_time: Optional[str] = None,
                          end_time: Optional[str] = None,
                          limit: int = 100) -> List[Dict[str, Any]]:
        """Fetch system event rows, newest first, with optional filters."""
        query = "SELECT * FROM system_events WHERE 1=1"
        params: List[Any] = []
        if event_type:
            query += " AND event_type = ?"
            params.append(event_type)
        if severity:
            query += " AND severity = ?"
            params.append(severity)
        if start_time:
            query += " AND timestamp >= ?"
            params.append(start_time)
        if end_time:
            query += " AND timestamp <= ?"
            params.append(end_time)
        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]

    def cleanup_old_logs(self, days_to_keep: int = 30):
        """Delete rows older than ``days_to_keep`` days and compact the file.

        Bug fix: ``VACUUM`` cannot run inside a transaction, but the DELETE
        statements open an implicit one under sqlite3's default isolation —
        the original code raised ``sqlite3.OperationalError``. The deletes
        are now committed before VACUUM runs.
        """
        cutoff_date = (datetime.datetime.now() - datetime.timedelta(days=days_to_keep)).isoformat()
        with self._lock:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("DELETE FROM logs WHERE timestamp < ?", (cutoff_date,))
                cursor.execute("DELETE FROM performance_metrics WHERE timestamp < ?", (cutoff_date,))
                cursor.execute("DELETE FROM system_events WHERE timestamp < ?", (cutoff_date,))
                # End the implicit transaction before VACUUM, which must
                # run outside any transaction.
                conn.commit()
                cursor.execute("VACUUM")

    def get_statistics(self) -> Dict[str, Any]:
        """Return summary statistics about the stored log entries.

        Returns a dict with total log count, per-level and per-category
        counts, the count for the last 24 hours, and the database path.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            # Total number of log rows
            cursor.execute("SELECT COUNT(*) as total FROM logs")
            total_logs = cursor.fetchone()['total']
            # Per-level breakdown
            cursor.execute("""
                SELECT level, COUNT(*) as count
                FROM logs
                GROUP BY level
                ORDER BY count DESC
            """)
            logs_by_level = {row['level']: row['count'] for row in cursor.fetchall()}
            # Per-category breakdown
            cursor.execute("""
                SELECT category, COUNT(*) as count
                FROM logs
                GROUP BY category
                ORDER BY count DESC
            """)
            logs_by_category = {row['category']: row['count'] for row in cursor.fetchall()}
            # Activity in the last 24 hours (ISO timestamps compare lexicographically)
            last_24h = (datetime.datetime.now() - datetime.timedelta(hours=24)).isoformat()
            cursor.execute("SELECT COUNT(*) as count FROM logs WHERE timestamp >= ?", (last_24h,))
            logs_last_24h = cursor.fetchone()['count']
            return {
                'total_logs': total_logs,
                'logs_by_level': logs_by_level,
                'logs_by_category': logs_by_category,
                'logs_last_24h': logs_last_24h,
                'database_path': self.db_path
            }
# Module-level singleton, created lazily by get_logger().
_logger_instance = None

def get_logger() -> DatabaseLogger:
    """Return the shared DatabaseLogger, creating it on first access."""
    global _logger_instance
    if _logger_instance is not None:
        return _logger_instance
    _logger_instance = DatabaseLogger()
    return _logger_instance
def initialize_logger(db_path: str = "logs/application.db") -> DatabaseLogger:
    """Build a DatabaseLogger at *db_path*, install it globally, and return it."""
    global _logger_instance
    logger = DatabaseLogger(db_path)
    _logger_instance = logger
    return logger