""" Gradio приложение для мультиагентной системы анализа логов. Оркестрирует работу агентов на основе smolagents и предоставляет веб-интерфейс. """ import gradio as gr import json import os from typing import Tuple from agents import run_parser_agent, run_anomaly_agent, run_rca_agent, run_gpt_prompt_agent from utils.streaming_processor import process_logs_streaming def format_rca_as_markdown(rca_result: dict) -> str: """Форматирует результат RCA агента в Markdown.""" markdown_parts = [] markdown_parts.append("# Анализ первопричин и рекомендации\n") # Анализ первопричин analysis = rca_result.get("analysis", {}) root_causes = analysis.get("root_causes", []) details = analysis.get("details", []) if root_causes: markdown_parts.append("\n## Возможные первопричины\n\n") for i, cause in enumerate(root_causes, 1): markdown_parts.append(f"{i}. {cause}\n") # Детали по типам аномалий if details: markdown_parts.append("\n## Детальный анализ аномалий\n\n") for detail in details: anomaly_type = detail.get("anomaly_type", "UNKNOWN") severity = detail.get("severity", "UNKNOWN") description = detail.get("description", "") possible_causes = detail.get("possible_causes", []) markdown_parts.append(f"### {anomaly_type} ({severity})\n\n") markdown_parts.append(f"{description}\n\n") if possible_causes: markdown_parts.append("#### Возможные первопричины:\n\n") for i, cause in enumerate(possible_causes, 1): markdown_parts.append(f"{i}. {cause}\n") markdown_parts.append("\n") # Рекомендации recommendations = rca_result.get("recommendations", []) if recommendations: markdown_parts.append("\n## Рекомендации по устранению\n\n") for i, rec in enumerate(recommendations, 1): priority = rec.get("priority", "MEDIUM") text = rec.get("text", "") actions = rec.get("actions", []) emoji_map = {"CRITICAL": "🔴", "HIGH": "🟠", "MEDIUM": "🟡", "LOW": "🟢"} emoji = emoji_map.get(priority, "⚪") markdown_parts.append(f"### {emoji} Рекомендация {i} (Приоритет: {priority})\n\n") markdown_parts.append(f"{text}\n\n") if actions: markdown_parts.append("**Конкретные действия:**\n\n") for action in actions: markdown_parts.append(f"- {action}\n") markdown_parts.append("\n") # Общие рекомендации general = rca_result.get("general_recommendations", []) if general: markdown_parts.append("\n## Общие рекомендации\n\n") for rec in general: markdown_parts.append(f"- {rec}\n") return "".join(markdown_parts) def analyze_logs(raw_logs: str, use_streaming: bool = False, chunk_size: int = 100) -> Tuple[str, str, str, str]: """ Обёртка для Gradio интерфейса. Args: raw_logs: Сырые логи из интерфейса use_streaming: Использовать потоковую обработку для больших логов chunk_size: Размер чанка для потоковой обработки (в строках) Returns: Кортеж результатов для отображения: - Распарсенные логи (JSON) - Обнаруженные аномалии (JSON) - Рекомендации (Markdown) - Промпт для GPT (текст) """ try: # Валидация входных данных if not raw_logs or not raw_logs.strip(): error_json = json.dumps({"error": "Логи не предоставлены"}, ensure_ascii=False, indent=2) error_prompt = "# Ошибка\n\nЛоги не предоставлены для анализа." return error_json, error_json, "# Ошибка\n\nЛоги не предоставлены для анализа.", error_prompt # Определяем количество строк log_lines = len(raw_logs.strip().split('\n')) # Автоматически включаем потоковую обработку для больших файлов # или если пользователь явно включил её should_stream = use_streaming or log_lines > 500 if should_stream: print(f"[Streaming Mode] Processing {log_lines} lines in chunks of {chunk_size}...") return analyze_logs_streaming(raw_logs, chunk_size) # Agent 1: Парсинг логов try: structured_data = run_parser_agent(raw_logs) parsed_json = json.dumps(structured_data, ensure_ascii=False, indent=2) except Exception as e: error_json = json.dumps({"error": f"Ошибка парсинга логов: {str(e)}"}, ensure_ascii=False, indent=2) error_prompt = f"# Ошибка\n\nОшибка на этапе парсинга: {str(e)}" return error_json, error_json, f"# Ошибка\n\nОшибка на этапе парсинга: {str(e)}", error_prompt # Agent 2: Обнаружение аномалий try: anomaly_report = run_anomaly_agent(structured_data) anomalies_json = json.dumps(anomaly_report, ensure_ascii=False, indent=2) except Exception as e: error_json = json.dumps({"error": f"Ошибка обнаружения аномалий: {str(e)}"}, ensure_ascii=False, indent=2) error_prompt = f"# Ошибка\n\nОшибка на этапе обнаружения аномалий: {str(e)}" return parsed_json, error_json, f"# Ошибка\n\nОшибка на этапе обнаружения аномалий: {str(e)}", error_prompt # Agent 3: Анализ первопричин и рекомендации try: rca_result = run_rca_agent(anomaly_report) recommendations_md = format_rca_as_markdown(rca_result) except Exception as e: recommendations_md = f"# Ошибка\n\nОшибка на этапе анализа первопричин: {str(e)}" rca_result = {} # Agent 4: Генерация промпта для GPT try: gpt_prompt = run_gpt_prompt_agent( structured_data, anomaly_report, recommendations_md ) except Exception as e: gpt_prompt = f"# Ошибка генерации промпта\n\nПроизошла ошибка при генерации промпта для GPT: {str(e)}\n\nПопробуйте использовать информацию из других вкладок." return parsed_json, anomalies_json, recommendations_md, gpt_prompt except Exception as e: error_json = json.dumps({"error": f"Критическая ошибка: {str(e)}"}, ensure_ascii=False, indent=2) error_prompt = f"# Критическая ошибка\n\n{str(e)}" return error_json, error_json, f"# Критическая ошибка\n\n{str(e)}", error_prompt def analyze_logs_streaming(raw_logs: str, chunk_size: int = 100) -> Tuple[str, str, str, str]: """ Потоковая обработка логов для больших файлов. Args: raw_logs: Сырые логи из интерфейса chunk_size: Размер чанка в строках Returns: Кортеж результатов для отображения """ try: result = process_logs_streaming(raw_logs, chunk_size=chunk_size) structured_data = result['structured_data'] anomaly_report = result['anomaly_report'] rca_result = result['rca_result'] gpt_prompt = result['gpt_prompt'] parsed_json = json.dumps(structured_data, ensure_ascii=False, indent=2) anomalies_json = json.dumps(anomaly_report, ensure_ascii=False, indent=2) recommendations_md = format_rca_as_markdown(rca_result) # Добавляем информацию о потоковой обработке info_note = f"\n\n*Обработано потоковым способом: {result['chunks_processed']} чанков по {result['chunk_size']} строк*\n" recommendations_md = recommendations_md + info_note return parsed_json, anomalies_json, recommendations_md, gpt_prompt except Exception as e: error_json = json.dumps({"error": f"Ошибка потоковой обработки: {str(e)}"}, ensure_ascii=False, indent=2) error_prompt = f"# Ошибка\n\nОшибка потоковой обработки: {str(e)}" return error_json, error_json, error_prompt, error_prompt def create_interface(): """Создаёт и настраивает Gradio интерфейс.""" # Описание интерфейса description = """ # 🔍 Мультиагентная система анализа логов Система использует четырёх независимых агентов на основе трансформеров для анализа логов: 1. **Log Parser Agent** - парсит и структурирует логи с помощью ИИ 2. **Anomaly Detection Agent** - обнаруживает аномалии и паттерны с помощью ИИ 3. **Root Cause Agent** - анализирует первопричины и генерирует рекомендации с помощью ИИ 4. **GPT Prompt Agent** - формирует готовый промпт для GPT-моделей с полным контекстом проблемы Вставьте логи в поле ниже или загрузите лог-файл, затем нажмите "Анализировать". """ # Создание интерфейса with gr.Blocks(title="Multi-Agent Log Analysis") as app: gr.Markdown(description) with gr.Row(): with gr.Column(scale=1): log_input = gr.Textbox( label="Логи для анализа", placeholder="Вставьте логи здесь или используйте кнопку загрузки ниже...", lines=15, max_lines=30 ) upload_btn = gr.UploadButton( "📁 Загрузить лог-файл", file_types=[".log", ".txt"], file_count="single" ) # Настройки обработки with gr.Accordion("⚙️ Настройки обработки", open=False): use_streaming = gr.Checkbox( label="Потоковая обработка (для больших файлов >500 строк)", value=False, info="Автоматически включается для файлов >500 строк. Разбивает логи на части для более быстрой обработки." ) chunk_size = gr.Slider( label="Размер чанка (строк)", minimum=50, maximum=200, value=100, step=50, info="Количество строк в одном чанке при потоковой обработке" ) analyze_btn = gr.Button("🔍 Анализировать", variant="primary", size="lg") # Обработчик загрузки файла def load_file(file): if file is None: return "" try: with open(file.name, 'r', encoding='utf-8') as f: content = f.read() return content except Exception as e: return f"Ошибка чтения файла: {str(e)}" upload_btn.upload(load_file, inputs=upload_btn, outputs=log_input) with gr.Row(): with gr.Tabs(): with gr.Tab("📊 Распарсенные логи (JSON)"): parsed_output = gr.Code( label="Структурированные данные", language="json", lines=10 ) with gr.Tab("⚠️ Обнаруженные аномалии (JSON)"): anomalies_output = gr.Code( label="Отчёт об аномалиях", language="json", lines=10 ) with gr.Tab("💡 Анализ и рекомендации (Markdown)"): recommendations_output = gr.Markdown( label="Рекомендации по устранению проблем" ) with gr.Tab("🤖 Промпт для GPT"): gpt_prompt_output = gr.Textbox( label="Готовый промпт для вставки в GPT", placeholder="Промпт будет сгенерирован после анализа логов...", lines=20, max_lines=50, interactive=True ) gr.Markdown(""" **💡 Подсказка:** - Скопируйте этот промпт и вставьте в ChatGPT, Claude, или любую другую GPT-модель - Промпт содержит всю необходимую информацию о проблеме, контекст системы и примеры ошибок - GPT сможет предложить конкретные шаги для решения проблемы """) # Примеры логов для тестирования gr.Markdown("### 📝 Примеры логов для тестирования") with gr.Row(): example_logs = [ """2024-01-15 10:00:00 INFO Application started 2024-01-15 10:00:05 INFO Database connection established 2024-01-15 10:01:00 ERROR Connection timeout to external API 2024-01-15 10:01:05 ERROR Connection timeout to external API 2024-01-15 10:01:10 ERROR Connection timeout to external API 2024-01-15 10:01:15 WARNING High memory usage detected: 85% 2024-01-15 10:02:00 CRITICAL System crash detected 2024-01-15 10:02:01 INFO Application shutdown""", """[2024-01-15 14:30:00] INFO User authentication successful [2024-01-15 14:30:01] DEBUG Request received: GET /api/users [2024-01-15 14:30:02] ERROR Database query failed: connection lost [2024-01-15 14:30:03] ERROR Database query failed: connection lost [2024-01-15 14:30:04] ERROR Database query failed: connection lost [2024-01-15 14:30:05] ERROR Database query failed: connection lost [2024-01-15 14:30:06] WARNING Retrying database connection [2024-01-15 14:30:10] INFO Database connection restored""" ] example_btn1 = gr.Button("Загрузить пример 1", size="sm") example_btn2 = gr.Button("Загрузить пример 2", size="sm") example_btn1.click( lambda: example_logs[0], outputs=log_input ) example_btn2.click( lambda: example_logs[1], outputs=log_input ) # Связывание кнопки анализа с обработчиком analyze_btn.click( fn=analyze_logs, inputs=[log_input, use_streaming, chunk_size], outputs=[parsed_output, anomalies_output, recommendations_output, gpt_prompt_output] ) # Информация о системе gr.Markdown(""" --- ### ℹ️ Информация о системе - **Архитектура:** Мультиагентная система (4 независимых агента на основе DeepSeek через smolagents) - **Платформа:** Hugging Face Spaces - **Интерфейс:** Gradio - **Поддержка:** До 10,000 строк логов - **Интеграция:** Генерация промптов для GPT-моделей - **Модель:** DeepSeek-V3.1-Terminus (через smolagents) """) return app # Создание приложения demo = create_interface() # Запуск приложения if __name__ == "__main__": # Проверка, запущено ли в Hugging Face Space in_hf_space = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE")) share_flag = False if in_hf_space else False host = os.getenv("GRADIO_HOST") or os.getenv("GRADIO_SERVER_NAME") or "0.0.0.0" port = int(os.getenv("PORT") or os.getenv("GRADIO_SERVER_PORT") or 7860) demo.queue(api_open=False).launch( server_name=host, server_port=port, share=share_flag, show_api=False, )