Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Script de Análise Avançada de Logs para Desenvolvedores | |
| Este script fornece análise detalhada dos logs da aplicação, | |
| incluindo detecção de padrões, análise de performance e | |
| identificação de problemas comuns. | |
| Autor: Sistema de Análise de Mercado | |
| Versão: 1.0.0 | |
| """ | |
| import os | |
| import re | |
| import json | |
| import sqlite3 | |
| from datetime import datetime, timedelta | |
| from collections import defaultdict, Counter | |
| from typing import Dict, List, Tuple, Optional | |
| import argparse | |
| from pathlib import Path | |
| class AdvancedLogAnalyzer: | |
| """Analisador avançado de logs para desenvolvedores""" | |
| def __init__(self, log_db_path: str = None): | |
| self.log_db_path = log_db_path or "logs/application.db" | |
| self.patterns = { | |
| 'error': re.compile(r'ERROR|Exception|Traceback|Failed', re.IGNORECASE), | |
| 'warning': re.compile(r'WARNING|WARN', re.IGNORECASE), | |
| 'performance': re.compile(r'took (\d+\.?\d*)\s*(ms|seconds?)', re.IGNORECASE), | |
| 'memory': re.compile(r'memory|RAM|heap', re.IGNORECASE), | |
| 'import_error': re.compile(r'ImportError|ModuleNotFoundError|not defined', re.IGNORECASE), | |
| 'gradio': re.compile(r'gradio|Running on|localhost', re.IGNORECASE) | |
| } | |
| def connect_db(self) -> sqlite3.Connection: | |
| """Conecta ao banco de dados de logs""" | |
| if not os.path.exists(self.log_db_path): | |
| raise FileNotFoundError(f"Banco de dados de logs não encontrado: {self.log_db_path}") | |
| return sqlite3.connect(self.log_db_path) | |
| def analyze_recent_logs(self, hours: int = 24) -> Dict: | |
| """Analisa logs das últimas N horas""" | |
| try: | |
| conn = self.connect_db() | |
| cursor = conn.cursor() | |
| # Busca logs recentes | |
| since_time = datetime.now() - timedelta(hours=hours) | |
| cursor.execute(""" | |
| SELECT timestamp, level, message, module | |
| FROM logs | |
| WHERE timestamp >= ? | |
| ORDER BY timestamp DESC | |
| """, (since_time.isoformat(),)) | |
| logs = cursor.fetchall() | |
| conn.close() | |
| return self._process_logs(logs) | |
| except Exception as e: | |
| return {"error": f"Erro ao analisar logs: {str(e)}"} | |
| def _process_logs(self, logs: List[Tuple]) -> Dict: | |
| """Processa e analisa os logs""" | |
| analysis = { | |
| 'total_logs': len(logs), | |
| 'by_level': Counter(), | |
| 'by_module': Counter(), | |
| 'errors': [], | |
| 'warnings': [], | |
| 'performance_issues': [], | |
| 'import_errors': [], | |
| 'gradio_events': [], | |
| 'timeline': defaultdict(int) | |
| } | |
| for timestamp, level, message, module in logs: | |
| # Contadores básicos | |
| analysis['by_level'][level] += 1 | |
| analysis['by_module'][module] += 1 | |
| # Timeline por hora | |
| hour = datetime.fromisoformat(timestamp).strftime('%Y-%m-%d %H:00') | |
| analysis['timeline'][hour] += 1 | |
| # Análise de padrões | |
| if self.patterns['error'].search(message): | |
| analysis['errors'].append({ | |
| 'timestamp': timestamp, | |
| 'module': module, | |
| 'message': message[:200] + '...' if len(message) > 200 else message | |
| }) | |
| if self.patterns['warning'].search(message): | |
| analysis['warnings'].append({ | |
| 'timestamp': timestamp, | |
| 'module': module, | |
| 'message': message[:200] + '...' if len(message) > 200 else message | |
| }) | |
| if self.patterns['import_error'].search(message): | |
| analysis['import_errors'].append({ | |
| 'timestamp': timestamp, | |
| 'module': module, | |
| 'message': message | |
| }) | |
| if self.patterns['gradio'].search(message): | |
| analysis['gradio_events'].append({ | |
| 'timestamp': timestamp, | |
| 'message': message | |
| }) | |
| # Análise de performance | |
| perf_match = self.patterns['performance'].search(message) | |
| if perf_match: | |
| time_value = float(perf_match.group(1)) | |
| unit = perf_match.group(2).lower() | |
| # Converte para milissegundos | |
| if 'second' in unit: | |
| time_value *= 1000 | |
| if time_value > 1000: # Mais de 1 segundo | |
| analysis['performance_issues'].append({ | |
| 'timestamp': timestamp, | |
| 'module': module, | |
| 'duration_ms': time_value, | |
| 'message': message | |
| }) | |
| return analysis | |
| def generate_report(self, analysis: Dict) -> str: | |
| """Gera relatório detalhado da análise""" | |
| report = [] | |
| report.append("=" * 60) | |
| report.append("RELATÓRIO DE ANÁLISE AVANÇADA DE LOGS") | |
| report.append("=" * 60) | |
| report.append(f"Gerado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| report.append(f"Total de logs analisados: {analysis['total_logs']}") | |
| report.append("") | |
| # Resumo por nível | |
| report.append("📊 DISTRIBUIÇÃO POR NÍVEL:") | |
| for level, count in analysis['by_level'].most_common(): | |
| percentage = (count / analysis['total_logs']) * 100 | |
| report.append(f" {level}: {count} ({percentage:.1f}%)") | |
| report.append("") | |
| # Módulos mais ativos | |
| report.append("🔧 MÓDULOS MAIS ATIVOS:") | |
| for module, count in analysis['by_module'].most_common(10): | |
| report.append(f" {module}: {count} logs") | |
| report.append("") | |
| # Erros críticos | |
| if analysis['errors']: | |
| report.append(f"❌ ERROS ENCONTRADOS ({len(analysis['errors'])}):") | |
| for error in analysis['errors'][:5]: # Mostra apenas os 5 mais recentes | |
| report.append(f" [{error['timestamp']}] {error['module']}") | |
| report.append(f" {error['message']}") | |
| if len(analysis['errors']) > 5: | |
| report.append(f" ... e mais {len(analysis['errors']) - 5} erros") | |
| report.append("") | |
| # Problemas de importação | |
| if analysis['import_errors']: | |
| report.append(f"📦 ERROS DE IMPORTAÇÃO ({len(analysis['import_errors'])}):") | |
| for error in analysis['import_errors']: | |
| report.append(f" [{error['timestamp']}] {error['module']}") | |
| report.append(f" {error['message']}") | |
| report.append("") | |
| # Problemas de performance | |
| if analysis['performance_issues']: | |
| report.append(f"⚡ PROBLEMAS DE PERFORMANCE ({len(analysis['performance_issues'])}):") | |
| for issue in analysis['performance_issues'][:5]: | |
| report.append(f" [{issue['timestamp']}] {issue['module']}") | |
| report.append(f" Duração: {issue['duration_ms']:.0f}ms") | |
| report.append(f" {issue['message'][:100]}...") | |
| report.append("") | |
| # Eventos do Gradio | |
| if analysis['gradio_events']: | |
| report.append(f"🌐 EVENTOS DO GRADIO ({len(analysis['gradio_events'])}):") | |
| for event in analysis['gradio_events'][-3:]: # Últimos 3 eventos | |
| report.append(f" [{event['timestamp']}] {event['message']}") | |
| report.append("") | |
| # Timeline | |
| if analysis['timeline']: | |
| report.append("📈 ATIVIDADE POR HORA:") | |
| sorted_timeline = sorted(analysis['timeline'].items()) | |
| for hour, count in sorted_timeline[-12:]: # Últimas 12 horas | |
| bar = "█" * min(count // 10, 20) # Barra visual | |
| report.append(f" {hour}: {count:3d} {bar}") | |
| report.append("") | |
| report.append("=" * 60) | |
| return "\n".join(report) | |
| def export_json(self, analysis: Dict, output_file: str): | |
| """Exporta análise em formato JSON""" | |
| # Converte Counter para dict para serialização JSON | |
| json_analysis = { | |
| 'total_logs': analysis['total_logs'], | |
| 'by_level': dict(analysis['by_level']), | |
| 'by_module': dict(analysis['by_module']), | |
| 'errors': analysis['errors'], | |
| 'warnings': analysis['warnings'], | |
| 'performance_issues': analysis['performance_issues'], | |
| 'import_errors': analysis['import_errors'], | |
| 'gradio_events': analysis['gradio_events'], | |
| 'timeline': dict(analysis['timeline']), | |
| 'generated_at': datetime.now().isoformat() | |
| } | |
| with open(output_file, 'w', encoding='utf-8') as f: | |
| json.dump(json_analysis, f, indent=2, ensure_ascii=False) | |
| def main(): | |
| """Função principal do script""" | |
| parser = argparse.ArgumentParser(description='Análise Avançada de Logs para Desenvolvedores') | |
| parser.add_argument('--hours', type=int, default=24, help='Horas para analisar (padrão: 24)') | |
| parser.add_argument('--db-path', type=str, help='Caminho para o banco de dados de logs') | |
| parser.add_argument('--output', type=str, help='Arquivo de saída para relatório') | |
| parser.add_argument('--json', type=str, help='Exportar análise em JSON') | |
| parser.add_argument('--quiet', action='store_true', help='Modo silencioso (apenas erros)') | |
| args = parser.parse_args() | |
| try: | |
| # Inicializa o analisador | |
| analyzer = AdvancedLogAnalyzer(args.db_path) | |
| if not args.quiet: | |
| print(f"🔍 Analisando logs das últimas {args.hours} horas...") | |
| # Executa análise | |
| analysis = analyzer.analyze_recent_logs(args.hours) | |
| if 'error' in analysis: | |
| print(f"❌ Erro: {analysis['error']}") | |
| return 1 | |
| # Gera relatório | |
| report = analyzer.generate_report(analysis) | |
| # Salva ou exibe relatório | |
| if args.output: | |
| with open(args.output, 'w', encoding='utf-8') as f: | |
| f.write(report) | |
| if not args.quiet: | |
| print(f"📄 Relatório salvo em: {args.output}") | |
| else: | |
| print(report) | |
| # Exporta JSON se solicitado | |
| if args.json: | |
| analyzer.export_json(analysis, args.json) | |
| if not args.quiet: | |
| print(f"📊 Análise JSON salva em: {args.json}") | |
| # Resumo final | |
| if not args.quiet: | |
| total_issues = len(analysis['errors']) + len(analysis['import_errors']) + len(analysis['performance_issues']) | |
| if total_issues > 0: | |
| print(f"\n⚠️ Total de problemas encontrados: {total_issues}") | |
| else: | |
| print(f"\n✅ Nenhum problema crítico encontrado!") | |
| return 0 | |
| except Exception as e: | |
| print(f"❌ Erro inesperado: {str(e)}") | |
| return 1 | |
| if __name__ == "__main__": | |
| exit(main()) |