""" Агенты для анализа логов на основе smolagents. Используют трансформеры для интеллектуального анализа. """ import os import json from smolagents import ToolCallingAgent, InferenceClientModel, FinalAnswerTool from schemas.schemas import parser_schema, anomaly_schema, rca_schema from utils.validation import validate_schema from utils.agent_runner import run_agent_safely # Загружаем модель # Токен должен быть установлен через переменную окружения HF_TOKEN # В Hugging Face Spaces установите секрет через настройки Space hf_token = os.getenv("HF_TOKEN") if not hf_token: raise ValueError( "HF_TOKEN environment variable is not set. " "Please set it in Hugging Face Spaces secrets or as an environment variable." ) model = InferenceClientModel( model_id="deepseek-ai/DeepSeek-V3.1-Terminus", token=hf_token, max_tokens=4096 # Увеличено для больших промптов и JSON ) final_tool = FinalAnswerTool() # Агент 1: Парсинг логов parser_agent = ToolCallingAgent( model=model, tools=[final_tool], instructions=""" Ты эксперт по анализу системных логов. Твоя задача - парсить сырые логи и преобразовывать их в структурированный JSON формат. ВАЖНО: Твой ответ должен быть ПОЛНЫМ и ЗАВЕРШЁННЫМ JSON объектом. Не обрезай ответ! Для каждой строки лога определи: - timestamp: временная метка (если есть) - level: уровень логирования (INFO, WARNING, ERROR, CRITICAL, DEBUG, TRACE) - message: основное сообщение - type: тип события (SYSTEM, HTTP_REQUEST, DATABASE, AUTHENTICATION, CONNECTION, EXCEPTION, GENERAL) Также определи: - errors: список всех событий с уровнем ERROR или CRITICAL (полные объекты событий) - warnings: список всех событий с уровнем WARNING (полные объекты событий) - statistics: статистика с total_lines, parsed_events, errors, warnings, info_messages, event_types, time_range Временной диапазон (time_range) должен содержать start и end - первую и последнюю временную метку. Ответ строго верни в JSON через final_answer в следующем формате (ОБЯЗАТЕЛЬНО ПОЛНЫЙ JSON): { "events": [{"line_number": int, "timestamp": "str|null", "level": "str", "message": "str", "type": "str"}, ...], "errors": [{"line_number": int, "timestamp": "str|null", "level": "str", "message": "str", "type": "str"}, ...], "warnings": [{"line_number": int, "timestamp": "str|null", "level": "str", "message": "str", "type": "str"}, ...], "statistics": { "total_lines": int, "parsed_events": int, "errors": int, "warnings": int, "info_messages": int, "event_types": {"TYPE": count, ...}, "time_range": {"start": "str|null", "end": "str|null"} | null } } """, name="LogParserAgent", max_steps=10, ) # Агент 2: Обнаружение аномалий anomaly_agent = ToolCallingAgent( model=model, tools=[final_tool], instructions=""" Ты эксперт по обнаружению аномалий в системных логах. Твоя задача - анализировать структурированные логи и находить проблемные паттерны. Ищи следующие типы аномалий: 1. BURST_ERRORS - всплески ошибок (5+ ошибок за короткий промежуток времени) 2. REPEATED_ERRORS - повторяющиеся ошибки (одна и та же ошибка 3+ раза) 3. ERROR_BEFORE_CRASH - ошибки перед крашем системы (критические ошибки в конце логов) 4. TEMPORAL_SPIKE - временные всплески событий (превышение среднего в 2 раза) 5. REPEATED_STACK_TRACES - повторяющиеся stack traces Для каждой аномалии укажи: - type: тип аномалии - severity: CRITICAL, HIGH, MEDIUM, LOW - description: описание проблемы - count: количество вхождений (если применимо) - error_message: пример сообщения об ошибке (если есть) - metadata: дополнительная информация (affected_lines и т.д.) Также вычисли статистику: - total: общее количество аномалий - by_type: распределение по типам - by_severity: распределение по серьёзности Ответ строго верни в JSON через final_answer в следующем формате: { "anomalies": [{"type": "str", "severity": "str", "description": "str", "count": int|null, "error_message": "str|null", "metadata": object|null}, ...], "statistics": { "total": int, "by_type": {"TYPE": count, ...}, "by_severity": {"SEVERITY": count, ...} }, "severity_summary": {"CRITICAL": int, "HIGH": int, "MEDIUM": int, "LOW": int} } """, name="AnomalyDetectionAgent", max_steps=10, ) # Агент 3: Анализ первопричин и рекомендации rca_agent = ToolCallingAgent( model=model, tools=[final_tool], instructions=""" Ты эксперт по анализу первопричин и формированию рекомендаций. Твоя задача - интерпретировать обнаруженные аномалии и предлагать конкретные решения. Для каждой группы аномалий определи: - possible_causes: возможные первопричины проблемы - recommendations: рекомендации по устранению с приоритетами (CRITICAL, HIGH, MEDIUM, LOW) - actions: конкретные действия для решения проблемы Также сформулируй общие рекомендации для улучшения системы мониторинга и предотвращения подобных проблем. Ответ строго верни в JSON через final_answer в следующем формате: { "analysis": { "root_causes": ["причина1", "причина2", ...], "details": [{ "anomaly_type": "str", "severity": "str", "description": "str", "possible_causes": ["причина1", ...] }, ...] }, "recommendations": [{ "priority": "str", "text": "str", "actions": ["действие1", "действие2", ...] }, ...], "general_recommendations": ["рекомендация1", "рекомендация2", ...] } """, name="RootCauseAgent", max_steps=10, ) # Pipeline шаг 1 — парсинг логов def run_parser_agent(raw_logs: str): """Парсит сырые логи в структурированный формат.""" if not raw_logs or not raw_logs.strip(): return { "events": [], "errors": [], "warnings": [], "statistics": { "total_lines": 0, "parsed_events": 0, "errors": 0, "warnings": 0, "info_messages": 0, "event_types": {}, "time_range": None } } result = run_agent_safely(parser_agent, task=raw_logs) validate_schema(result, parser_schema) return result # Pipeline шаг 2 — обнаружение аномалий def run_anomaly_agent(structured_data: dict): """Обнаруживает аномалии в структурированных логах.""" input_data = json.dumps(structured_data, ensure_ascii=False, indent=2) result = run_agent_safely(anomaly_agent, task=input_data) validate_schema(result, anomaly_schema) return result # Pipeline шаг 3 — анализ первопричин и рекомендации def run_rca_agent(anomaly_report: dict): """Анализирует первопричины и генерирует рекомендации.""" input_data = json.dumps(anomaly_report, ensure_ascii=False, indent=2) result = run_agent_safely(rca_agent, task=input_data) validate_schema(result, rca_schema) return result # Агент 4: Генерация промпта для GPT gpt_prompt_agent = ToolCallingAgent( model=model, tools=[final_tool], instructions=""" Ты эксперт по созданию детальных промптов для GPT-моделей. Твоя задача - создать готовый промпт для анализа проблем на основе структурированных данных о логах, аномалиях и рекомендациях. Промпт должен быть структурированным и содержать: 1. Контекст проблемы - общее описание ситуации 2. Информация о системе - статистика, временные диапазоны, типы событий 3. Обнаруженные проблемы - детальное описание аномалий с приоритетами 4. Статистика и метрики - количественные показатели 5. Примеры ошибок - ключевые ошибки из логов 6. Предварительный анализ - рекомендации от предыдущих агентов (если есть) 7. Запрос на решение - конкретные вопросы для GPT Промпт должен быть готов к использованию - его можно скопировать и вставить в ChatGPT, Claude или другую GPT-модель. Ответ верни как обычный текст (не JSON), используя final_answer. Это должен быть готовый промпт на русском языке в формате Markdown. """, name="GPTPromptAgent", max_steps=10, ) # Pipeline шаг 4 — генерация промпта для GPT def run_gpt_prompt_agent(structured_data: dict, anomaly_report: dict, recommendations: str = None): """Генерирует промпт для GPT на основе всех данных анализа.""" # Подготавливаем входные данные для агента input_data = { "structured_data": structured_data, "anomaly_report": anomaly_report, "recommendations": recommendations } input_json = json.dumps(input_data, ensure_ascii=False, indent=2) result = run_agent_safely(gpt_prompt_agent, task=input_json, return_string=True) # Результат должен быть строкой (промпт) if isinstance(result, str): return result elif isinstance(result, dict): # Если вернулся словарь, попробуем извлечь текст if "answer" in result: return result["answer"] elif "prompt" in result: return result["prompt"] else: return json.dumps(result, ensure_ascii=False, indent=2) else: return str(result) __all__ = [ 'run_parser_agent', 'run_anomaly_agent', 'run_rca_agent', 'run_gpt_prompt_agent', ]