PatrickRedStar's picture
Add: Streaming processing toggle and speed optimization guide - Add streaming processing option in Gradio UI - Auto-enable for files >500 lines - Add chunk size slider - Create SPEED_OPTIMIZATION.md with model suggestions
4f1eb8e
"""
Gradio приложение для мультиагентной системы анализа логов.
Оркестрирует работу агентов на основе smolagents и предоставляет веб-интерфейс.
"""
import gradio as gr
import json
import os
from typing import Tuple
from agents import run_parser_agent, run_anomaly_agent, run_rca_agent, run_gpt_prompt_agent
from utils.streaming_processor import process_logs_streaming
def format_rca_as_markdown(rca_result: dict) -> str:
"""Форматирует результат RCA агента в Markdown."""
markdown_parts = []
markdown_parts.append("# Анализ первопричин и рекомендации\n")
# Анализ первопричин
analysis = rca_result.get("analysis", {})
root_causes = analysis.get("root_causes", [])
details = analysis.get("details", [])
if root_causes:
markdown_parts.append("\n## Возможные первопричины\n\n")
for i, cause in enumerate(root_causes, 1):
markdown_parts.append(f"{i}. {cause}\n")
# Детали по типам аномалий
if details:
markdown_parts.append("\n## Детальный анализ аномалий\n\n")
for detail in details:
anomaly_type = detail.get("anomaly_type", "UNKNOWN")
severity = detail.get("severity", "UNKNOWN")
description = detail.get("description", "")
possible_causes = detail.get("possible_causes", [])
markdown_parts.append(f"### {anomaly_type} ({severity})\n\n")
markdown_parts.append(f"{description}\n\n")
if possible_causes:
markdown_parts.append("#### Возможные первопричины:\n\n")
for i, cause in enumerate(possible_causes, 1):
markdown_parts.append(f"{i}. {cause}\n")
markdown_parts.append("\n")
# Рекомендации
recommendations = rca_result.get("recommendations", [])
if recommendations:
markdown_parts.append("\n## Рекомендации по устранению\n\n")
for i, rec in enumerate(recommendations, 1):
priority = rec.get("priority", "MEDIUM")
text = rec.get("text", "")
actions = rec.get("actions", [])
emoji_map = {"CRITICAL": "🔴", "HIGH": "🟠", "MEDIUM": "🟡", "LOW": "🟢"}
emoji = emoji_map.get(priority, "⚪")
markdown_parts.append(f"### {emoji} Рекомендация {i} (Приоритет: {priority})\n\n")
markdown_parts.append(f"{text}\n\n")
if actions:
markdown_parts.append("**Конкретные действия:**\n\n")
for action in actions:
markdown_parts.append(f"- {action}\n")
markdown_parts.append("\n")
# Общие рекомендации
general = rca_result.get("general_recommendations", [])
if general:
markdown_parts.append("\n## Общие рекомендации\n\n")
for rec in general:
markdown_parts.append(f"- {rec}\n")
return "".join(markdown_parts)
def analyze_logs(raw_logs: str, use_streaming: bool = False, chunk_size: int = 100) -> Tuple[str, str, str, str]:
"""
Обёртка для Gradio интерфейса.
Args:
raw_logs: Сырые логи из интерфейса
use_streaming: Использовать потоковую обработку для больших логов
chunk_size: Размер чанка для потоковой обработки (в строках)
Returns:
Кортеж результатов для отображения:
- Распарсенные логи (JSON)
- Обнаруженные аномалии (JSON)
- Рекомендации (Markdown)
- Промпт для GPT (текст)
"""
try:
# Валидация входных данных
if not raw_logs or not raw_logs.strip():
error_json = json.dumps({"error": "Логи не предоставлены"}, ensure_ascii=False, indent=2)
error_prompt = "# Ошибка\n\nЛоги не предоставлены для анализа."
return error_json, error_json, "# Ошибка\n\nЛоги не предоставлены для анализа.", error_prompt
# Определяем количество строк
log_lines = len(raw_logs.strip().split('\n'))
# Автоматически включаем потоковую обработку для больших файлов
# или если пользователь явно включил её
should_stream = use_streaming or log_lines > 500
if should_stream:
print(f"[Streaming Mode] Processing {log_lines} lines in chunks of {chunk_size}...")
return analyze_logs_streaming(raw_logs, chunk_size)
# Agent 1: Парсинг логов
try:
structured_data = run_parser_agent(raw_logs)
parsed_json = json.dumps(structured_data, ensure_ascii=False, indent=2)
except Exception as e:
error_json = json.dumps({"error": f"Ошибка парсинга логов: {str(e)}"}, ensure_ascii=False, indent=2)
error_prompt = f"# Ошибка\n\nОшибка на этапе парсинга: {str(e)}"
return error_json, error_json, f"# Ошибка\n\nОшибка на этапе парсинга: {str(e)}", error_prompt
# Agent 2: Обнаружение аномалий
try:
anomaly_report = run_anomaly_agent(structured_data)
anomalies_json = json.dumps(anomaly_report, ensure_ascii=False, indent=2)
except Exception as e:
error_json = json.dumps({"error": f"Ошибка обнаружения аномалий: {str(e)}"}, ensure_ascii=False, indent=2)
error_prompt = f"# Ошибка\n\nОшибка на этапе обнаружения аномалий: {str(e)}"
return parsed_json, error_json, f"# Ошибка\n\nОшибка на этапе обнаружения аномалий: {str(e)}", error_prompt
# Agent 3: Анализ первопричин и рекомендации
try:
rca_result = run_rca_agent(anomaly_report)
recommendations_md = format_rca_as_markdown(rca_result)
except Exception as e:
recommendations_md = f"# Ошибка\n\nОшибка на этапе анализа первопричин: {str(e)}"
rca_result = {}
# Agent 4: Генерация промпта для GPT
try:
gpt_prompt = run_gpt_prompt_agent(
structured_data,
anomaly_report,
recommendations_md
)
except Exception as e:
gpt_prompt = f"# Ошибка генерации промпта\n\nПроизошла ошибка при генерации промпта для GPT: {str(e)}\n\nПопробуйте использовать информацию из других вкладок."
return parsed_json, anomalies_json, recommendations_md, gpt_prompt
except Exception as e:
error_json = json.dumps({"error": f"Критическая ошибка: {str(e)}"}, ensure_ascii=False, indent=2)
error_prompt = f"# Критическая ошибка\n\n{str(e)}"
return error_json, error_json, f"# Критическая ошибка\n\n{str(e)}", error_prompt
def analyze_logs_streaming(raw_logs: str, chunk_size: int = 100) -> Tuple[str, str, str, str]:
"""
Потоковая обработка логов для больших файлов.
Args:
raw_logs: Сырые логи из интерфейса
chunk_size: Размер чанка в строках
Returns:
Кортеж результатов для отображения
"""
try:
result = process_logs_streaming(raw_logs, chunk_size=chunk_size)
structured_data = result['structured_data']
anomaly_report = result['anomaly_report']
rca_result = result['rca_result']
gpt_prompt = result['gpt_prompt']
parsed_json = json.dumps(structured_data, ensure_ascii=False, indent=2)
anomalies_json = json.dumps(anomaly_report, ensure_ascii=False, indent=2)
recommendations_md = format_rca_as_markdown(rca_result)
# Добавляем информацию о потоковой обработке
info_note = f"\n\n*Обработано потоковым способом: {result['chunks_processed']} чанков по {result['chunk_size']} строк*\n"
recommendations_md = recommendations_md + info_note
return parsed_json, anomalies_json, recommendations_md, gpt_prompt
except Exception as e:
error_json = json.dumps({"error": f"Ошибка потоковой обработки: {str(e)}"}, ensure_ascii=False, indent=2)
error_prompt = f"# Ошибка\n\nОшибка потоковой обработки: {str(e)}"
return error_json, error_json, error_prompt, error_prompt
def create_interface():
"""Создаёт и настраивает Gradio интерфейс."""
# Описание интерфейса
description = """
# 🔍 Мультиагентная система анализа логов
Система использует четырёх независимых агентов на основе трансформеров для анализа логов:
1. **Log Parser Agent** - парсит и структурирует логи с помощью ИИ
2. **Anomaly Detection Agent** - обнаруживает аномалии и паттерны с помощью ИИ
3. **Root Cause Agent** - анализирует первопричины и генерирует рекомендации с помощью ИИ
4. **GPT Prompt Agent** - формирует готовый промпт для GPT-моделей с полным контекстом проблемы
Вставьте логи в поле ниже или загрузите лог-файл, затем нажмите "Анализировать".
"""
# Создание интерфейса
with gr.Blocks(title="Multi-Agent Log Analysis") as app:
gr.Markdown(description)
with gr.Row():
with gr.Column(scale=1):
log_input = gr.Textbox(
label="Логи для анализа",
placeholder="Вставьте логи здесь или используйте кнопку загрузки ниже...",
lines=15,
max_lines=30
)
upload_btn = gr.UploadButton(
"📁 Загрузить лог-файл",
file_types=[".log", ".txt"],
file_count="single"
)
# Настройки обработки
with gr.Accordion("⚙️ Настройки обработки", open=False):
use_streaming = gr.Checkbox(
label="Потоковая обработка (для больших файлов >500 строк)",
value=False,
info="Автоматически включается для файлов >500 строк. Разбивает логи на части для более быстрой обработки."
)
chunk_size = gr.Slider(
label="Размер чанка (строк)",
minimum=50,
maximum=200,
value=100,
step=50,
info="Количество строк в одном чанке при потоковой обработке"
)
analyze_btn = gr.Button("🔍 Анализировать", variant="primary", size="lg")
# Обработчик загрузки файла
def load_file(file):
if file is None:
return ""
try:
with open(file.name, 'r', encoding='utf-8') as f:
content = f.read()
return content
except Exception as e:
return f"Ошибка чтения файла: {str(e)}"
upload_btn.upload(load_file, inputs=upload_btn, outputs=log_input)
with gr.Row():
with gr.Tabs():
with gr.Tab("📊 Распарсенные логи (JSON)"):
parsed_output = gr.Code(
label="Структурированные данные",
language="json",
lines=10
)
with gr.Tab("⚠️ Обнаруженные аномалии (JSON)"):
anomalies_output = gr.Code(
label="Отчёт об аномалиях",
language="json",
lines=10
)
with gr.Tab("💡 Анализ и рекомендации (Markdown)"):
recommendations_output = gr.Markdown(
label="Рекомендации по устранению проблем"
)
with gr.Tab("🤖 Промпт для GPT"):
gpt_prompt_output = gr.Textbox(
label="Готовый промпт для вставки в GPT",
placeholder="Промпт будет сгенерирован после анализа логов...",
lines=20,
max_lines=50,
interactive=True
)
gr.Markdown("""
**💡 Подсказка:**
- Скопируйте этот промпт и вставьте в ChatGPT, Claude, или любую другую GPT-модель
- Промпт содержит всю необходимую информацию о проблеме, контекст системы и примеры ошибок
- GPT сможет предложить конкретные шаги для решения проблемы
""")
# Примеры логов для тестирования
gr.Markdown("### 📝 Примеры логов для тестирования")
with gr.Row():
example_logs = [
"""2024-01-15 10:00:00 INFO Application started
2024-01-15 10:00:05 INFO Database connection established
2024-01-15 10:01:00 ERROR Connection timeout to external API
2024-01-15 10:01:05 ERROR Connection timeout to external API
2024-01-15 10:01:10 ERROR Connection timeout to external API
2024-01-15 10:01:15 WARNING High memory usage detected: 85%
2024-01-15 10:02:00 CRITICAL System crash detected
2024-01-15 10:02:01 INFO Application shutdown""",
"""[2024-01-15 14:30:00] INFO User authentication successful
[2024-01-15 14:30:01] DEBUG Request received: GET /api/users
[2024-01-15 14:30:02] ERROR Database query failed: connection lost
[2024-01-15 14:30:03] ERROR Database query failed: connection lost
[2024-01-15 14:30:04] ERROR Database query failed: connection lost
[2024-01-15 14:30:05] ERROR Database query failed: connection lost
[2024-01-15 14:30:06] WARNING Retrying database connection
[2024-01-15 14:30:10] INFO Database connection restored"""
]
example_btn1 = gr.Button("Загрузить пример 1", size="sm")
example_btn2 = gr.Button("Загрузить пример 2", size="sm")
example_btn1.click(
lambda: example_logs[0],
outputs=log_input
)
example_btn2.click(
lambda: example_logs[1],
outputs=log_input
)
# Связывание кнопки анализа с обработчиком
analyze_btn.click(
fn=analyze_logs,
inputs=[log_input, use_streaming, chunk_size],
outputs=[parsed_output, anomalies_output, recommendations_output, gpt_prompt_output]
)
# Информация о системе
gr.Markdown("""
---
### ℹ️ Информация о системе
- **Архитектура:** Мультиагентная система (4 независимых агента на основе DeepSeek через smolagents)
- **Платформа:** Hugging Face Spaces
- **Интерфейс:** Gradio
- **Поддержка:** До 10,000 строк логов
- **Интеграция:** Генерация промптов для GPT-моделей
- **Модель:** DeepSeek-V3.1-Terminus (через smolagents)
""")
return app
# Создание приложения
demo = create_interface()
# Запуск приложения
if __name__ == "__main__":
# Проверка, запущено ли в Hugging Face Space
in_hf_space = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE"))
share_flag = False if in_hf_space else False
host = os.getenv("GRADIO_HOST") or os.getenv("GRADIO_SERVER_NAME") or "0.0.0.0"
port = int(os.getenv("PORT") or os.getenv("GRADIO_SERVER_PORT") or 7860)
demo.queue(api_open=False).launch(
server_name=host,
server_port=port,
share=share_flag,
show_api=False,
)