teste / scripts /log_analysis /advanced_log_analyzer.py
torxyton's picture
feat: Implementa estrutura completa de testes com pytest
b9c68d4
#!/usr/bin/env python3
"""
Script de Análise Avançada de Logs para Desenvolvedores
Este script fornece análise detalhada dos logs da aplicação,
incluindo detecção de padrões, análise de performance e
identificação de problemas comuns.
Autor: Sistema de Análise de Mercado
Versão: 1.0.0
"""
import os
import re
import json
import sqlite3
from datetime import datetime, timedelta
from collections import defaultdict, Counter
from typing import Dict, List, Tuple, Optional
import argparse
from pathlib import Path
class AdvancedLogAnalyzer:
"""Analisador avançado de logs para desenvolvedores"""
def __init__(self, log_db_path: str = None):
self.log_db_path = log_db_path or "logs/application.db"
self.patterns = {
'error': re.compile(r'ERROR|Exception|Traceback|Failed', re.IGNORECASE),
'warning': re.compile(r'WARNING|WARN', re.IGNORECASE),
'performance': re.compile(r'took (\d+\.?\d*)\s*(ms|seconds?)', re.IGNORECASE),
'memory': re.compile(r'memory|RAM|heap', re.IGNORECASE),
'import_error': re.compile(r'ImportError|ModuleNotFoundError|not defined', re.IGNORECASE),
'gradio': re.compile(r'gradio|Running on|localhost', re.IGNORECASE)
}
def connect_db(self) -> sqlite3.Connection:
"""Conecta ao banco de dados de logs"""
if not os.path.exists(self.log_db_path):
raise FileNotFoundError(f"Banco de dados de logs não encontrado: {self.log_db_path}")
return sqlite3.connect(self.log_db_path)
def analyze_recent_logs(self, hours: int = 24) -> Dict:
"""Analisa logs das últimas N horas"""
try:
conn = self.connect_db()
cursor = conn.cursor()
# Busca logs recentes
since_time = datetime.now() - timedelta(hours=hours)
cursor.execute("""
SELECT timestamp, level, message, module
FROM logs
WHERE timestamp >= ?
ORDER BY timestamp DESC
""", (since_time.isoformat(),))
logs = cursor.fetchall()
conn.close()
return self._process_logs(logs)
except Exception as e:
return {"error": f"Erro ao analisar logs: {str(e)}"}
def _process_logs(self, logs: List[Tuple]) -> Dict:
"""Processa e analisa os logs"""
analysis = {
'total_logs': len(logs),
'by_level': Counter(),
'by_module': Counter(),
'errors': [],
'warnings': [],
'performance_issues': [],
'import_errors': [],
'gradio_events': [],
'timeline': defaultdict(int)
}
for timestamp, level, message, module in logs:
# Contadores básicos
analysis['by_level'][level] += 1
analysis['by_module'][module] += 1
# Timeline por hora
hour = datetime.fromisoformat(timestamp).strftime('%Y-%m-%d %H:00')
analysis['timeline'][hour] += 1
# Análise de padrões
if self.patterns['error'].search(message):
analysis['errors'].append({
'timestamp': timestamp,
'module': module,
'message': message[:200] + '...' if len(message) > 200 else message
})
if self.patterns['warning'].search(message):
analysis['warnings'].append({
'timestamp': timestamp,
'module': module,
'message': message[:200] + '...' if len(message) > 200 else message
})
if self.patterns['import_error'].search(message):
analysis['import_errors'].append({
'timestamp': timestamp,
'module': module,
'message': message
})
if self.patterns['gradio'].search(message):
analysis['gradio_events'].append({
'timestamp': timestamp,
'message': message
})
# Análise de performance
perf_match = self.patterns['performance'].search(message)
if perf_match:
time_value = float(perf_match.group(1))
unit = perf_match.group(2).lower()
# Converte para milissegundos
if 'second' in unit:
time_value *= 1000
if time_value > 1000: # Mais de 1 segundo
analysis['performance_issues'].append({
'timestamp': timestamp,
'module': module,
'duration_ms': time_value,
'message': message
})
return analysis
def generate_report(self, analysis: Dict) -> str:
"""Gera relatório detalhado da análise"""
report = []
report.append("=" * 60)
report.append("RELATÓRIO DE ANÁLISE AVANÇADA DE LOGS")
report.append("=" * 60)
report.append(f"Gerado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"Total de logs analisados: {analysis['total_logs']}")
report.append("")
# Resumo por nível
report.append("📊 DISTRIBUIÇÃO POR NÍVEL:")
for level, count in analysis['by_level'].most_common():
percentage = (count / analysis['total_logs']) * 100
report.append(f" {level}: {count} ({percentage:.1f}%)")
report.append("")
# Módulos mais ativos
report.append("🔧 MÓDULOS MAIS ATIVOS:")
for module, count in analysis['by_module'].most_common(10):
report.append(f" {module}: {count} logs")
report.append("")
# Erros críticos
if analysis['errors']:
report.append(f"❌ ERROS ENCONTRADOS ({len(analysis['errors'])}):")
for error in analysis['errors'][:5]: # Mostra apenas os 5 mais recentes
report.append(f" [{error['timestamp']}] {error['module']}")
report.append(f" {error['message']}")
if len(analysis['errors']) > 5:
report.append(f" ... e mais {len(analysis['errors']) - 5} erros")
report.append("")
# Problemas de importação
if analysis['import_errors']:
report.append(f"📦 ERROS DE IMPORTAÇÃO ({len(analysis['import_errors'])}):")
for error in analysis['import_errors']:
report.append(f" [{error['timestamp']}] {error['module']}")
report.append(f" {error['message']}")
report.append("")
# Problemas de performance
if analysis['performance_issues']:
report.append(f"⚡ PROBLEMAS DE PERFORMANCE ({len(analysis['performance_issues'])}):")
for issue in analysis['performance_issues'][:5]:
report.append(f" [{issue['timestamp']}] {issue['module']}")
report.append(f" Duração: {issue['duration_ms']:.0f}ms")
report.append(f" {issue['message'][:100]}...")
report.append("")
# Eventos do Gradio
if analysis['gradio_events']:
report.append(f"🌐 EVENTOS DO GRADIO ({len(analysis['gradio_events'])}):")
for event in analysis['gradio_events'][-3:]: # Últimos 3 eventos
report.append(f" [{event['timestamp']}] {event['message']}")
report.append("")
# Timeline
if analysis['timeline']:
report.append("📈 ATIVIDADE POR HORA:")
sorted_timeline = sorted(analysis['timeline'].items())
for hour, count in sorted_timeline[-12:]: # Últimas 12 horas
bar = "█" * min(count // 10, 20) # Barra visual
report.append(f" {hour}: {count:3d} {bar}")
report.append("")
report.append("=" * 60)
return "\n".join(report)
def export_json(self, analysis: Dict, output_file: str):
"""Exporta análise em formato JSON"""
# Converte Counter para dict para serialização JSON
json_analysis = {
'total_logs': analysis['total_logs'],
'by_level': dict(analysis['by_level']),
'by_module': dict(analysis['by_module']),
'errors': analysis['errors'],
'warnings': analysis['warnings'],
'performance_issues': analysis['performance_issues'],
'import_errors': analysis['import_errors'],
'gradio_events': analysis['gradio_events'],
'timeline': dict(analysis['timeline']),
'generated_at': datetime.now().isoformat()
}
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(json_analysis, f, indent=2, ensure_ascii=False)
def main():
"""Função principal do script"""
parser = argparse.ArgumentParser(description='Análise Avançada de Logs para Desenvolvedores')
parser.add_argument('--hours', type=int, default=24, help='Horas para analisar (padrão: 24)')
parser.add_argument('--db-path', type=str, help='Caminho para o banco de dados de logs')
parser.add_argument('--output', type=str, help='Arquivo de saída para relatório')
parser.add_argument('--json', type=str, help='Exportar análise em JSON')
parser.add_argument('--quiet', action='store_true', help='Modo silencioso (apenas erros)')
args = parser.parse_args()
try:
# Inicializa o analisador
analyzer = AdvancedLogAnalyzer(args.db_path)
if not args.quiet:
print(f"🔍 Analisando logs das últimas {args.hours} horas...")
# Executa análise
analysis = analyzer.analyze_recent_logs(args.hours)
if 'error' in analysis:
print(f"❌ Erro: {analysis['error']}")
return 1
# Gera relatório
report = analyzer.generate_report(analysis)
# Salva ou exibe relatório
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
f.write(report)
if not args.quiet:
print(f"📄 Relatório salvo em: {args.output}")
else:
print(report)
# Exporta JSON se solicitado
if args.json:
analyzer.export_json(analysis, args.json)
if not args.quiet:
print(f"📊 Análise JSON salva em: {args.json}")
# Resumo final
if not args.quiet:
total_issues = len(analysis['errors']) + len(analysis['import_errors']) + len(analysis['performance_issues'])
if total_issues > 0:
print(f"\n⚠️ Total de problemas encontrados: {total_issues}")
else:
print(f"\n✅ Nenhum problema crítico encontrado!")
return 0
except Exception as e:
print(f"❌ Erro inesperado: {str(e)}")
return 1
if __name__ == "__main__":
exit(main())