#!/usr/bin/env python3 """ Monitor de Logs em Tempo Real para Desenvolvedores Este script monitora os logs em tempo real e alerta sobre problemas críticos, erros de importação e problemas de performance. Autor: Sistema de Análise de Mercado Versão: 1.0.0 """ import os import re import time import sqlite3 from datetime import datetime from typing import Dict, List, Optional import argparse from collections import deque class RealtimeLogMonitor: """Monitor de logs em tempo real""" def __init__(self, log_db_path: str = None, alert_threshold: int = 5): self.log_db_path = log_db_path or "logs/application.db" self.alert_threshold = alert_threshold self.last_check = datetime.now() self.error_buffer = deque(maxlen=100) # Buffer para últimos 100 erros # Padrões críticos para alertas self.critical_patterns = { 'import_error': re.compile(r'ImportError|ModuleNotFoundError|not defined', re.IGNORECASE), 'server_error': re.compile(r'server.*error|connection.*failed|timeout', re.IGNORECASE), 'memory_error': re.compile(r'MemoryError|OutOfMemory|memory.*exceeded', re.IGNORECASE), 'gradio_error': re.compile(r'gradio.*error|interface.*failed', re.IGNORECASE), 'performance_critical': re.compile(r'took ([5-9]\d+|\d{3,})\s*(ms|seconds?)', re.IGNORECASE) } # Cores para output no terminal self.colors = { 'red': '\033[91m', 'yellow': '\033[93m', 'green': '\033[92m', 'blue': '\033[94m', 'purple': '\033[95m', 'cyan': '\033[96m', 'white': '\033[97m', 'bold': '\033[1m', 'end': '\033[0m' } def connect_db(self) -> sqlite3.Connection: """Conecta ao banco de dados de logs""" if not os.path.exists(self.log_db_path): raise FileNotFoundError(f"Banco de dados de logs não encontrado: {self.log_db_path}") return sqlite3.connect(self.log_db_path) def colorize(self, text: str, color: str) -> str: """Adiciona cor ao texto""" return f"{self.colors.get(color, '')}{text}{self.colors['end']}" def check_new_logs(self) -> List[Dict]: """Verifica novos logs desde a última verificação""" try: conn = self.connect_db() cursor = conn.cursor() cursor.execute(""" SELECT timestamp, level, message, module FROM logs WHERE timestamp > ? ORDER BY timestamp ASC """, (self.last_check.isoformat(),)) new_logs = cursor.fetchall() conn.close() if new_logs: self.last_check = datetime.fromisoformat(new_logs[-1][0]) return [{ 'timestamp': log[0], 'level': log[1], 'message': log[2], 'module': log[3] } for log in new_logs] except Exception as e: print(self.colorize(f"❌ Erro ao verificar logs: {str(e)}", 'red')) return [] def analyze_log_entry(self, log: Dict) -> Optional[Dict]: """Analisa uma entrada de log para problemas críticos""" message = log['message'] alerts = [] # Verifica padrões críticos for pattern_name, pattern in self.critical_patterns.items(): if pattern.search(message): severity = self._get_severity(pattern_name, message) alerts.append({ 'type': pattern_name, 'severity': severity, 'message': message, 'module': log['module'], 'timestamp': log['timestamp'] }) # Verifica nível de log crítico if log['level'] in ['ERROR', 'CRITICAL']: alerts.append({ 'type': 'log_level_critical', 'severity': 'high' if log['level'] == 'CRITICAL' else 'medium', 'message': message, 'module': log['module'], 'timestamp': log['timestamp'] }) return alerts if alerts else None def _get_severity(self, pattern_name: str, message: str) -> str: """Determina a severidade do alerta""" high_severity = ['memory_error', 'server_error'] medium_severity = ['import_error', 'gradio_error'] if pattern_name in high_severity: return 'high' elif pattern_name in medium_severity: return 'medium' else: return 'low' def display_alert(self, alerts: List[Dict]): """Exibe alertas no terminal""" for alert in alerts: timestamp = datetime.fromisoformat(alert['timestamp']).strftime('%H:%M:%S') # Escolhe cor baseada na severidade if alert['severity'] == 'high': color = 'red' icon = '🚨' elif alert['severity'] == 'medium': color = 'yellow' icon = '⚠️' else: color = 'cyan' icon = 'ℹ️' print(f"{icon} {self.colorize('[' + timestamp + ']', 'white')} " f"{self.colorize(alert['type'].upper(), color)} " f"em {self.colorize(alert['module'], 'blue')}") # Exibe mensagem truncada msg = alert['message'][:100] + '...' if len(alert['message']) > 100 else alert['message'] print(f" {self.colorize(msg, 'white')}") print() def display_stats(self, logs_checked: int, alerts_count: int): """Exibe estatísticas do monitoramento""" current_time = datetime.now().strftime('%H:%M:%S') print(f"\r{self.colorize('[' + current_time + ']', 'green')} " f"Logs verificados: {logs_checked} | " f"Alertas: {self.colorize(str(alerts_count), 'yellow' if alerts_count > 0 else 'green')}", end='', flush=True) def run_monitor(self, interval: int = 5, verbose: bool = False): """Executa o monitor em tempo real""" print(self.colorize("🔍 Iniciando monitor de logs em tempo real...", 'green')) print(f"📊 Intervalo de verificação: {interval}s") print(f"📁 Banco de dados: {self.log_db_path}") print(f"🚨 Limite de alertas: {self.alert_threshold}") print("\n" + "="*60) print(self.colorize("Monitor ativo - Pressione Ctrl+C para parar", 'bold')) print("="*60 + "\n") total_logs_checked = 0 total_alerts = 0 try: while True: new_logs = self.check_new_logs() if new_logs: total_logs_checked += len(new_logs) for log in new_logs: if verbose: timestamp = datetime.fromisoformat(log['timestamp']).strftime('%H:%M:%S') level_color = { 'ERROR': 'red', 'WARNING': 'yellow', 'INFO': 'green', 'DEBUG': 'cyan' }.get(log['level'], 'white') print(f"[{timestamp}] {self.colorize(log['level'], level_color)} " f"{log['module']}: {log['message'][:80]}...") alerts = self.analyze_log_entry(log) if alerts: total_alerts += len(alerts) self.display_alert(alerts) # Adiciona ao buffer de erros for alert in alerts: self.error_buffer.append(alert) # Exibe estatísticas se não estiver em modo verbose if not verbose: self.display_stats(total_logs_checked, total_alerts) time.sleep(interval) except KeyboardInterrupt: print(f"\n\n{self.colorize('🛑 Monitor interrompido pelo usuário', 'yellow')}") print(f"📊 Resumo da sessão:") print(f" Logs verificados: {total_logs_checked}") print(f" Alertas gerados: {total_alerts}") if self.error_buffer: print(f"\n🔍 Últimos {len(self.error_buffer)} problemas detectados:") for i, error in enumerate(list(self.error_buffer)[-5:], 1): timestamp = datetime.fromisoformat(error['timestamp']).strftime('%H:%M:%S') print(f" {i}. [{timestamp}] {error['type']} em {error['module']}") except Exception as e: print(f"\n{self.colorize(f'❌ Erro inesperado: {str(e)}', 'red')}") def main(): """Função principal do script""" parser = argparse.ArgumentParser(description='Monitor de Logs em Tempo Real') parser.add_argument('--interval', type=int, default=5, help='Intervalo de verificação em segundos (padrão: 5)') parser.add_argument('--db-path', type=str, help='Caminho para o banco de dados de logs') parser.add_argument('--threshold', type=int, default=5, help='Limite de alertas por verificação (padrão: 5)') parser.add_argument('--verbose', action='store_true', help='Modo verboso (mostra todos os logs)') args = parser.parse_args() try: monitor = RealtimeLogMonitor(args.db_path, args.threshold) monitor.run_monitor(args.interval, args.verbose) except FileNotFoundError as e: print(f"❌ Arquivo não encontrado: {str(e)}") print("💡 Certifique-se de que a aplicação está rodando e gerando logs.") return 1 except Exception as e: print(f"❌ Erro inesperado: {str(e)}") return 1 if __name__ == "__main__": exit(main())