|
|
""" |
|
|
Агенты для анализа логов на основе smolagents. |
|
|
Используют трансформеры для интеллектуального анализа. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
from smolagents import ToolCallingAgent, InferenceClientModel, FinalAnswerTool |
|
|
from schemas.schemas import parser_schema, anomaly_schema, rca_schema |
|
|
from utils.validation import validate_schema |
|
|
from utils.agent_runner import run_agent_safely |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
hf_token = os.getenv("HF_TOKEN") |
|
|
if not hf_token: |
|
|
raise ValueError( |
|
|
"HF_TOKEN environment variable is not set. " |
|
|
"Please set it in Hugging Face Spaces secrets or as an environment variable." |
|
|
) |
|
|
|
|
|
model = InferenceClientModel( |
|
|
model_id="deepseek-ai/DeepSeek-V3.1-Terminus", |
|
|
token=hf_token, |
|
|
max_tokens=4096 |
|
|
) |
|
|
|
|
|
final_tool = FinalAnswerTool() |
|
|
|
|
|
|
|
|
parser_agent = ToolCallingAgent( |
|
|
model=model, |
|
|
tools=[final_tool], |
|
|
instructions=""" |
|
|
Ты эксперт по анализу системных логов. Твоя задача - парсить сырые логи и преобразовывать их в структурированный JSON формат. |
|
|
|
|
|
ВАЖНО: Твой ответ должен быть ПОЛНЫМ и ЗАВЕРШЁННЫМ JSON объектом. Не обрезай ответ! |
|
|
|
|
|
Для каждой строки лога определи: |
|
|
- timestamp: временная метка (если есть) |
|
|
- level: уровень логирования (INFO, WARNING, ERROR, CRITICAL, DEBUG, TRACE) |
|
|
- message: основное сообщение |
|
|
- type: тип события (SYSTEM, HTTP_REQUEST, DATABASE, AUTHENTICATION, CONNECTION, EXCEPTION, GENERAL) |
|
|
|
|
|
Также определи: |
|
|
- errors: список всех событий с уровнем ERROR или CRITICAL (полные объекты событий) |
|
|
- warnings: список всех событий с уровнем WARNING (полные объекты событий) |
|
|
- statistics: статистика с total_lines, parsed_events, errors, warnings, info_messages, event_types, time_range |
|
|
|
|
|
Временной диапазон (time_range) должен содержать start и end - первую и последнюю временную метку. |
|
|
|
|
|
Ответ строго верни в JSON через final_answer в следующем формате (ОБЯЗАТЕЛЬНО ПОЛНЫЙ JSON): |
|
|
{ |
|
|
"events": [{"line_number": int, "timestamp": "str|null", "level": "str", "message": "str", "type": "str"}, ...], |
|
|
"errors": [{"line_number": int, "timestamp": "str|null", "level": "str", "message": "str", "type": "str"}, ...], |
|
|
"warnings": [{"line_number": int, "timestamp": "str|null", "level": "str", "message": "str", "type": "str"}, ...], |
|
|
"statistics": { |
|
|
"total_lines": int, |
|
|
"parsed_events": int, |
|
|
"errors": int, |
|
|
"warnings": int, |
|
|
"info_messages": int, |
|
|
"event_types": {"TYPE": count, ...}, |
|
|
"time_range": {"start": "str|null", "end": "str|null"} | null |
|
|
} |
|
|
} |
|
|
""", |
|
|
name="LogParserAgent", |
|
|
max_steps=10, |
|
|
) |
|
|
|
|
|
|
|
|
anomaly_agent = ToolCallingAgent( |
|
|
model=model, |
|
|
tools=[final_tool], |
|
|
instructions=""" |
|
|
Ты эксперт по обнаружению аномалий в системных логах. Твоя задача - анализировать структурированные логи и находить проблемные паттерны. |
|
|
|
|
|
Ищи следующие типы аномалий: |
|
|
1. BURST_ERRORS - всплески ошибок (5+ ошибок за короткий промежуток времени) |
|
|
2. REPEATED_ERRORS - повторяющиеся ошибки (одна и та же ошибка 3+ раза) |
|
|
3. ERROR_BEFORE_CRASH - ошибки перед крашем системы (критические ошибки в конце логов) |
|
|
4. TEMPORAL_SPIKE - временные всплески событий (превышение среднего в 2 раза) |
|
|
5. REPEATED_STACK_TRACES - повторяющиеся stack traces |
|
|
|
|
|
Для каждой аномалии укажи: |
|
|
- type: тип аномалии |
|
|
- severity: CRITICAL, HIGH, MEDIUM, LOW |
|
|
- description: описание проблемы |
|
|
- count: количество вхождений (если применимо) |
|
|
- error_message: пример сообщения об ошибке (если есть) |
|
|
- metadata: дополнительная информация (affected_lines и т.д.) |
|
|
|
|
|
Также вычисли статистику: |
|
|
- total: общее количество аномалий |
|
|
- by_type: распределение по типам |
|
|
- by_severity: распределение по серьёзности |
|
|
|
|
|
Ответ строго верни в JSON через final_answer в следующем формате: |
|
|
{ |
|
|
"anomalies": [{"type": "str", "severity": "str", "description": "str", "count": int|null, "error_message": "str|null", "metadata": object|null}, ...], |
|
|
"statistics": { |
|
|
"total": int, |
|
|
"by_type": {"TYPE": count, ...}, |
|
|
"by_severity": {"SEVERITY": count, ...} |
|
|
}, |
|
|
"severity_summary": {"CRITICAL": int, "HIGH": int, "MEDIUM": int, "LOW": int} |
|
|
} |
|
|
""", |
|
|
name="AnomalyDetectionAgent", |
|
|
max_steps=10, |
|
|
) |
|
|
|
|
|
|
|
|
rca_agent = ToolCallingAgent( |
|
|
model=model, |
|
|
tools=[final_tool], |
|
|
instructions=""" |
|
|
Ты эксперт по анализу первопричин и формированию рекомендаций. Твоя задача - интерпретировать обнаруженные аномалии и предлагать конкретные решения. |
|
|
|
|
|
Для каждой группы аномалий определи: |
|
|
- possible_causes: возможные первопричины проблемы |
|
|
- recommendations: рекомендации по устранению с приоритетами (CRITICAL, HIGH, MEDIUM, LOW) |
|
|
- actions: конкретные действия для решения проблемы |
|
|
|
|
|
Также сформулируй общие рекомендации для улучшения системы мониторинга и предотвращения подобных проблем. |
|
|
|
|
|
Ответ строго верни в JSON через final_answer в следующем формате: |
|
|
{ |
|
|
"analysis": { |
|
|
"root_causes": ["причина1", "причина2", ...], |
|
|
"details": [{ |
|
|
"anomaly_type": "str", |
|
|
"severity": "str", |
|
|
"description": "str", |
|
|
"possible_causes": ["причина1", ...] |
|
|
}, ...] |
|
|
}, |
|
|
"recommendations": [{ |
|
|
"priority": "str", |
|
|
"text": "str", |
|
|
"actions": ["действие1", "действие2", ...] |
|
|
}, ...], |
|
|
"general_recommendations": ["рекомендация1", "рекомендация2", ...] |
|
|
} |
|
|
""", |
|
|
name="RootCauseAgent", |
|
|
max_steps=10, |
|
|
) |
|
|
|
|
|
|
|
|
def run_parser_agent(raw_logs: str): |
|
|
"""Парсит сырые логи в структурированный формат.""" |
|
|
if not raw_logs or not raw_logs.strip(): |
|
|
return { |
|
|
"events": [], |
|
|
"errors": [], |
|
|
"warnings": [], |
|
|
"statistics": { |
|
|
"total_lines": 0, |
|
|
"parsed_events": 0, |
|
|
"errors": 0, |
|
|
"warnings": 0, |
|
|
"info_messages": 0, |
|
|
"event_types": {}, |
|
|
"time_range": None |
|
|
} |
|
|
} |
|
|
|
|
|
result = run_agent_safely(parser_agent, task=raw_logs) |
|
|
validate_schema(result, parser_schema) |
|
|
return result |
|
|
|
|
|
|
|
|
def run_anomaly_agent(structured_data: dict): |
|
|
"""Обнаруживает аномалии в структурированных логах.""" |
|
|
input_data = json.dumps(structured_data, ensure_ascii=False, indent=2) |
|
|
result = run_agent_safely(anomaly_agent, task=input_data) |
|
|
validate_schema(result, anomaly_schema) |
|
|
return result |
|
|
|
|
|
|
|
|
def run_rca_agent(anomaly_report: dict): |
|
|
"""Анализирует первопричины и генерирует рекомендации.""" |
|
|
input_data = json.dumps(anomaly_report, ensure_ascii=False, indent=2) |
|
|
result = run_agent_safely(rca_agent, task=input_data) |
|
|
validate_schema(result, rca_schema) |
|
|
return result |
|
|
|
|
|
|
|
|
gpt_prompt_agent = ToolCallingAgent( |
|
|
model=model, |
|
|
tools=[final_tool], |
|
|
instructions=""" |
|
|
Ты эксперт по созданию детальных промптов для GPT-моделей. Твоя задача - создать готовый промпт для анализа проблем на основе структурированных данных о логах, аномалиях и рекомендациях. |
|
|
|
|
|
Промпт должен быть структурированным и содержать: |
|
|
1. Контекст проблемы - общее описание ситуации |
|
|
2. Информация о системе - статистика, временные диапазоны, типы событий |
|
|
3. Обнаруженные проблемы - детальное описание аномалий с приоритетами |
|
|
4. Статистика и метрики - количественные показатели |
|
|
5. Примеры ошибок - ключевые ошибки из логов |
|
|
6. Предварительный анализ - рекомендации от предыдущих агентов (если есть) |
|
|
7. Запрос на решение - конкретные вопросы для GPT |
|
|
|
|
|
Промпт должен быть готов к использованию - его можно скопировать и вставить в ChatGPT, Claude или другую GPT-модель. |
|
|
|
|
|
Ответ верни как обычный текст (не JSON), используя final_answer. Это должен быть готовый промпт на русском языке в формате Markdown. |
|
|
""", |
|
|
name="GPTPromptAgent", |
|
|
max_steps=10, |
|
|
) |
|
|
|
|
|
|
|
|
def run_gpt_prompt_agent(structured_data: dict, anomaly_report: dict, recommendations: str = None): |
|
|
"""Генерирует промпт для GPT на основе всех данных анализа.""" |
|
|
|
|
|
input_data = { |
|
|
"structured_data": structured_data, |
|
|
"anomaly_report": anomaly_report, |
|
|
"recommendations": recommendations |
|
|
} |
|
|
input_json = json.dumps(input_data, ensure_ascii=False, indent=2) |
|
|
|
|
|
result = run_agent_safely(gpt_prompt_agent, task=input_json, return_string=True) |
|
|
|
|
|
|
|
|
if isinstance(result, str): |
|
|
return result |
|
|
elif isinstance(result, dict): |
|
|
|
|
|
if "answer" in result: |
|
|
return result["answer"] |
|
|
elif "prompt" in result: |
|
|
return result["prompt"] |
|
|
else: |
|
|
return json.dumps(result, ensure_ascii=False, indent=2) |
|
|
else: |
|
|
return str(result) |
|
|
|
|
|
__all__ = [ |
|
|
'run_parser_agent', |
|
|
'run_anomaly_agent', |
|
|
'run_rca_agent', |
|
|
'run_gpt_prompt_agent', |
|
|
] |
|
|
|