teste / scripts /log_analysis /realtime_monitor.py
torxyton's picture
feat: Implementa estrutura completa de testes com pytest
b9c68d4
#!/usr/bin/env python3
"""
Monitor de Logs em Tempo Real para Desenvolvedores
Este script monitora os logs em tempo real e alerta sobre
problemas críticos, erros de importação e problemas de performance.
Autor: Sistema de Análise de Mercado
Versão: 1.0.0
"""
import os
import re
import time
import sqlite3
from datetime import datetime
from typing import Dict, List, Optional
import argparse
from collections import deque
class RealtimeLogMonitor:
"""Monitor de logs em tempo real"""
def __init__(self, log_db_path: str = None, alert_threshold: int = 5):
self.log_db_path = log_db_path or "logs/application.db"
self.alert_threshold = alert_threshold
self.last_check = datetime.now()
self.error_buffer = deque(maxlen=100) # Buffer para últimos 100 erros
# Padrões críticos para alertas
self.critical_patterns = {
'import_error': re.compile(r'ImportError|ModuleNotFoundError|not defined', re.IGNORECASE),
'server_error': re.compile(r'server.*error|connection.*failed|timeout', re.IGNORECASE),
'memory_error': re.compile(r'MemoryError|OutOfMemory|memory.*exceeded', re.IGNORECASE),
'gradio_error': re.compile(r'gradio.*error|interface.*failed', re.IGNORECASE),
'performance_critical': re.compile(r'took ([5-9]\d+|\d{3,})\s*(ms|seconds?)', re.IGNORECASE)
}
# Cores para output no terminal
self.colors = {
'red': '\033[91m',
'yellow': '\033[93m',
'green': '\033[92m',
'blue': '\033[94m',
'purple': '\033[95m',
'cyan': '\033[96m',
'white': '\033[97m',
'bold': '\033[1m',
'end': '\033[0m'
}
def connect_db(self) -> sqlite3.Connection:
"""Conecta ao banco de dados de logs"""
if not os.path.exists(self.log_db_path):
raise FileNotFoundError(f"Banco de dados de logs não encontrado: {self.log_db_path}")
return sqlite3.connect(self.log_db_path)
def colorize(self, text: str, color: str) -> str:
"""Adiciona cor ao texto"""
return f"{self.colors.get(color, '')}{text}{self.colors['end']}"
def check_new_logs(self) -> List[Dict]:
"""Verifica novos logs desde a última verificação"""
try:
conn = self.connect_db()
cursor = conn.cursor()
cursor.execute("""
SELECT timestamp, level, message, module
FROM logs
WHERE timestamp > ?
ORDER BY timestamp ASC
""", (self.last_check.isoformat(),))
new_logs = cursor.fetchall()
conn.close()
if new_logs:
self.last_check = datetime.fromisoformat(new_logs[-1][0])
return [{
'timestamp': log[0],
'level': log[1],
'message': log[2],
'module': log[3]
} for log in new_logs]
except Exception as e:
print(self.colorize(f"❌ Erro ao verificar logs: {str(e)}", 'red'))
return []
def analyze_log_entry(self, log: Dict) -> Optional[Dict]:
"""Analisa uma entrada de log para problemas críticos"""
message = log['message']
alerts = []
# Verifica padrões críticos
for pattern_name, pattern in self.critical_patterns.items():
if pattern.search(message):
severity = self._get_severity(pattern_name, message)
alerts.append({
'type': pattern_name,
'severity': severity,
'message': message,
'module': log['module'],
'timestamp': log['timestamp']
})
# Verifica nível de log crítico
if log['level'] in ['ERROR', 'CRITICAL']:
alerts.append({
'type': 'log_level_critical',
'severity': 'high' if log['level'] == 'CRITICAL' else 'medium',
'message': message,
'module': log['module'],
'timestamp': log['timestamp']
})
return alerts if alerts else None
def _get_severity(self, pattern_name: str, message: str) -> str:
"""Determina a severidade do alerta"""
high_severity = ['memory_error', 'server_error']
medium_severity = ['import_error', 'gradio_error']
if pattern_name in high_severity:
return 'high'
elif pattern_name in medium_severity:
return 'medium'
else:
return 'low'
def display_alert(self, alerts: List[Dict]):
"""Exibe alertas no terminal"""
for alert in alerts:
timestamp = datetime.fromisoformat(alert['timestamp']).strftime('%H:%M:%S')
# Escolhe cor baseada na severidade
if alert['severity'] == 'high':
color = 'red'
icon = '🚨'
elif alert['severity'] == 'medium':
color = 'yellow'
icon = '⚠️'
else:
color = 'cyan'
icon = 'ℹ️'
print(f"{icon} {self.colorize('[' + timestamp + ']', 'white')} "
f"{self.colorize(alert['type'].upper(), color)} "
f"em {self.colorize(alert['module'], 'blue')}")
# Exibe mensagem truncada
msg = alert['message'][:100] + '...' if len(alert['message']) > 100 else alert['message']
print(f" {self.colorize(msg, 'white')}")
print()
def display_stats(self, logs_checked: int, alerts_count: int):
"""Exibe estatísticas do monitoramento"""
current_time = datetime.now().strftime('%H:%M:%S')
print(f"\r{self.colorize('[' + current_time + ']', 'green')} "
f"Logs verificados: {logs_checked} | "
f"Alertas: {self.colorize(str(alerts_count), 'yellow' if alerts_count > 0 else 'green')}",
end='', flush=True)
def run_monitor(self, interval: int = 5, verbose: bool = False):
"""Executa o monitor em tempo real"""
print(self.colorize("🔍 Iniciando monitor de logs em tempo real...", 'green'))
print(f"📊 Intervalo de verificação: {interval}s")
print(f"📁 Banco de dados: {self.log_db_path}")
print(f"🚨 Limite de alertas: {self.alert_threshold}")
print("\n" + "="*60)
print(self.colorize("Monitor ativo - Pressione Ctrl+C para parar", 'bold'))
print("="*60 + "\n")
total_logs_checked = 0
total_alerts = 0
try:
while True:
new_logs = self.check_new_logs()
if new_logs:
total_logs_checked += len(new_logs)
for log in new_logs:
if verbose:
timestamp = datetime.fromisoformat(log['timestamp']).strftime('%H:%M:%S')
level_color = {
'ERROR': 'red',
'WARNING': 'yellow',
'INFO': 'green',
'DEBUG': 'cyan'
}.get(log['level'], 'white')
print(f"[{timestamp}] {self.colorize(log['level'], level_color)} "
f"{log['module']}: {log['message'][:80]}...")
alerts = self.analyze_log_entry(log)
if alerts:
total_alerts += len(alerts)
self.display_alert(alerts)
# Adiciona ao buffer de erros
for alert in alerts:
self.error_buffer.append(alert)
# Exibe estatísticas se não estiver em modo verbose
if not verbose:
self.display_stats(total_logs_checked, total_alerts)
time.sleep(interval)
except KeyboardInterrupt:
print(f"\n\n{self.colorize('🛑 Monitor interrompido pelo usuário', 'yellow')}")
print(f"📊 Resumo da sessão:")
print(f" Logs verificados: {total_logs_checked}")
print(f" Alertas gerados: {total_alerts}")
if self.error_buffer:
print(f"\n🔍 Últimos {len(self.error_buffer)} problemas detectados:")
for i, error in enumerate(list(self.error_buffer)[-5:], 1):
timestamp = datetime.fromisoformat(error['timestamp']).strftime('%H:%M:%S')
print(f" {i}. [{timestamp}] {error['type']} em {error['module']}")
except Exception as e:
print(f"\n{self.colorize(f'❌ Erro inesperado: {str(e)}', 'red')}")
def main():
"""Função principal do script"""
parser = argparse.ArgumentParser(description='Monitor de Logs em Tempo Real')
parser.add_argument('--interval', type=int, default=5, help='Intervalo de verificação em segundos (padrão: 5)')
parser.add_argument('--db-path', type=str, help='Caminho para o banco de dados de logs')
parser.add_argument('--threshold', type=int, default=5, help='Limite de alertas por verificação (padrão: 5)')
parser.add_argument('--verbose', action='store_true', help='Modo verboso (mostra todos os logs)')
args = parser.parse_args()
try:
monitor = RealtimeLogMonitor(args.db_path, args.threshold)
monitor.run_monitor(args.interval, args.verbose)
except FileNotFoundError as e:
print(f"❌ Arquivo não encontrado: {str(e)}")
print("💡 Certifique-se de que a aplicação está rodando e gerando logs.")
return 1
except Exception as e:
print(f"❌ Erro inesperado: {str(e)}")
return 1
if __name__ == "__main__":
exit(main())