Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Monitor de Logs em Tempo Real para Desenvolvedores | |
| Este script monitora os logs em tempo real e alerta sobre | |
| problemas críticos, erros de importação e problemas de performance. | |
| Autor: Sistema de Análise de Mercado | |
| Versão: 1.0.0 | |
| """ | |
| import os | |
| import re | |
| import time | |
| import sqlite3 | |
| from datetime import datetime | |
| from typing import Dict, List, Optional | |
| import argparse | |
| from collections import deque | |
| class RealtimeLogMonitor: | |
| """Monitor de logs em tempo real""" | |
| def __init__(self, log_db_path: str = None, alert_threshold: int = 5): | |
| self.log_db_path = log_db_path or "logs/application.db" | |
| self.alert_threshold = alert_threshold | |
| self.last_check = datetime.now() | |
| self.error_buffer = deque(maxlen=100) # Buffer para últimos 100 erros | |
| # Padrões críticos para alertas | |
| self.critical_patterns = { | |
| 'import_error': re.compile(r'ImportError|ModuleNotFoundError|not defined', re.IGNORECASE), | |
| 'server_error': re.compile(r'server.*error|connection.*failed|timeout', re.IGNORECASE), | |
| 'memory_error': re.compile(r'MemoryError|OutOfMemory|memory.*exceeded', re.IGNORECASE), | |
| 'gradio_error': re.compile(r'gradio.*error|interface.*failed', re.IGNORECASE), | |
| 'performance_critical': re.compile(r'took ([5-9]\d+|\d{3,})\s*(ms|seconds?)', re.IGNORECASE) | |
| } | |
| # Cores para output no terminal | |
| self.colors = { | |
| 'red': '\033[91m', | |
| 'yellow': '\033[93m', | |
| 'green': '\033[92m', | |
| 'blue': '\033[94m', | |
| 'purple': '\033[95m', | |
| 'cyan': '\033[96m', | |
| 'white': '\033[97m', | |
| 'bold': '\033[1m', | |
| 'end': '\033[0m' | |
| } | |
| def connect_db(self) -> sqlite3.Connection: | |
| """Conecta ao banco de dados de logs""" | |
| if not os.path.exists(self.log_db_path): | |
| raise FileNotFoundError(f"Banco de dados de logs não encontrado: {self.log_db_path}") | |
| return sqlite3.connect(self.log_db_path) | |
| def colorize(self, text: str, color: str) -> str: | |
| """Adiciona cor ao texto""" | |
| return f"{self.colors.get(color, '')}{text}{self.colors['end']}" | |
| def check_new_logs(self) -> List[Dict]: | |
| """Verifica novos logs desde a última verificação""" | |
| try: | |
| conn = self.connect_db() | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| SELECT timestamp, level, message, module | |
| FROM logs | |
| WHERE timestamp > ? | |
| ORDER BY timestamp ASC | |
| """, (self.last_check.isoformat(),)) | |
| new_logs = cursor.fetchall() | |
| conn.close() | |
| if new_logs: | |
| self.last_check = datetime.fromisoformat(new_logs[-1][0]) | |
| return [{ | |
| 'timestamp': log[0], | |
| 'level': log[1], | |
| 'message': log[2], | |
| 'module': log[3] | |
| } for log in new_logs] | |
| except Exception as e: | |
| print(self.colorize(f"❌ Erro ao verificar logs: {str(e)}", 'red')) | |
| return [] | |
| def analyze_log_entry(self, log: Dict) -> Optional[Dict]: | |
| """Analisa uma entrada de log para problemas críticos""" | |
| message = log['message'] | |
| alerts = [] | |
| # Verifica padrões críticos | |
| for pattern_name, pattern in self.critical_patterns.items(): | |
| if pattern.search(message): | |
| severity = self._get_severity(pattern_name, message) | |
| alerts.append({ | |
| 'type': pattern_name, | |
| 'severity': severity, | |
| 'message': message, | |
| 'module': log['module'], | |
| 'timestamp': log['timestamp'] | |
| }) | |
| # Verifica nível de log crítico | |
| if log['level'] in ['ERROR', 'CRITICAL']: | |
| alerts.append({ | |
| 'type': 'log_level_critical', | |
| 'severity': 'high' if log['level'] == 'CRITICAL' else 'medium', | |
| 'message': message, | |
| 'module': log['module'], | |
| 'timestamp': log['timestamp'] | |
| }) | |
| return alerts if alerts else None | |
| def _get_severity(self, pattern_name: str, message: str) -> str: | |
| """Determina a severidade do alerta""" | |
| high_severity = ['memory_error', 'server_error'] | |
| medium_severity = ['import_error', 'gradio_error'] | |
| if pattern_name in high_severity: | |
| return 'high' | |
| elif pattern_name in medium_severity: | |
| return 'medium' | |
| else: | |
| return 'low' | |
| def display_alert(self, alerts: List[Dict]): | |
| """Exibe alertas no terminal""" | |
| for alert in alerts: | |
| timestamp = datetime.fromisoformat(alert['timestamp']).strftime('%H:%M:%S') | |
| # Escolhe cor baseada na severidade | |
| if alert['severity'] == 'high': | |
| color = 'red' | |
| icon = '🚨' | |
| elif alert['severity'] == 'medium': | |
| color = 'yellow' | |
| icon = '⚠️' | |
| else: | |
| color = 'cyan' | |
| icon = 'ℹ️' | |
| print(f"{icon} {self.colorize('[' + timestamp + ']', 'white')} " | |
| f"{self.colorize(alert['type'].upper(), color)} " | |
| f"em {self.colorize(alert['module'], 'blue')}") | |
| # Exibe mensagem truncada | |
| msg = alert['message'][:100] + '...' if len(alert['message']) > 100 else alert['message'] | |
| print(f" {self.colorize(msg, 'white')}") | |
| print() | |
| def display_stats(self, logs_checked: int, alerts_count: int): | |
| """Exibe estatísticas do monitoramento""" | |
| current_time = datetime.now().strftime('%H:%M:%S') | |
| print(f"\r{self.colorize('[' + current_time + ']', 'green')} " | |
| f"Logs verificados: {logs_checked} | " | |
| f"Alertas: {self.colorize(str(alerts_count), 'yellow' if alerts_count > 0 else 'green')}", | |
| end='', flush=True) | |
| def run_monitor(self, interval: int = 5, verbose: bool = False): | |
| """Executa o monitor em tempo real""" | |
| print(self.colorize("🔍 Iniciando monitor de logs em tempo real...", 'green')) | |
| print(f"📊 Intervalo de verificação: {interval}s") | |
| print(f"📁 Banco de dados: {self.log_db_path}") | |
| print(f"🚨 Limite de alertas: {self.alert_threshold}") | |
| print("\n" + "="*60) | |
| print(self.colorize("Monitor ativo - Pressione Ctrl+C para parar", 'bold')) | |
| print("="*60 + "\n") | |
| total_logs_checked = 0 | |
| total_alerts = 0 | |
| try: | |
| while True: | |
| new_logs = self.check_new_logs() | |
| if new_logs: | |
| total_logs_checked += len(new_logs) | |
| for log in new_logs: | |
| if verbose: | |
| timestamp = datetime.fromisoformat(log['timestamp']).strftime('%H:%M:%S') | |
| level_color = { | |
| 'ERROR': 'red', | |
| 'WARNING': 'yellow', | |
| 'INFO': 'green', | |
| 'DEBUG': 'cyan' | |
| }.get(log['level'], 'white') | |
| print(f"[{timestamp}] {self.colorize(log['level'], level_color)} " | |
| f"{log['module']}: {log['message'][:80]}...") | |
| alerts = self.analyze_log_entry(log) | |
| if alerts: | |
| total_alerts += len(alerts) | |
| self.display_alert(alerts) | |
| # Adiciona ao buffer de erros | |
| for alert in alerts: | |
| self.error_buffer.append(alert) | |
| # Exibe estatísticas se não estiver em modo verbose | |
| if not verbose: | |
| self.display_stats(total_logs_checked, total_alerts) | |
| time.sleep(interval) | |
| except KeyboardInterrupt: | |
| print(f"\n\n{self.colorize('🛑 Monitor interrompido pelo usuário', 'yellow')}") | |
| print(f"📊 Resumo da sessão:") | |
| print(f" Logs verificados: {total_logs_checked}") | |
| print(f" Alertas gerados: {total_alerts}") | |
| if self.error_buffer: | |
| print(f"\n🔍 Últimos {len(self.error_buffer)} problemas detectados:") | |
| for i, error in enumerate(list(self.error_buffer)[-5:], 1): | |
| timestamp = datetime.fromisoformat(error['timestamp']).strftime('%H:%M:%S') | |
| print(f" {i}. [{timestamp}] {error['type']} em {error['module']}") | |
| except Exception as e: | |
| print(f"\n{self.colorize(f'❌ Erro inesperado: {str(e)}', 'red')}") | |
| def main(): | |
| """Função principal do script""" | |
| parser = argparse.ArgumentParser(description='Monitor de Logs em Tempo Real') | |
| parser.add_argument('--interval', type=int, default=5, help='Intervalo de verificação em segundos (padrão: 5)') | |
| parser.add_argument('--db-path', type=str, help='Caminho para o banco de dados de logs') | |
| parser.add_argument('--threshold', type=int, default=5, help='Limite de alertas por verificação (padrão: 5)') | |
| parser.add_argument('--verbose', action='store_true', help='Modo verboso (mostra todos os logs)') | |
| args = parser.parse_args() | |
| try: | |
| monitor = RealtimeLogMonitor(args.db_path, args.threshold) | |
| monitor.run_monitor(args.interval, args.verbose) | |
| except FileNotFoundError as e: | |
| print(f"❌ Arquivo não encontrado: {str(e)}") | |
| print("💡 Certifique-se de que a aplicação está rodando e gerando logs.") | |
| return 1 | |
| except Exception as e: | |
| print(f"❌ Erro inesperado: {str(e)}") | |
| return 1 | |
| if __name__ == "__main__": | |
| exit(main()) |