MultiAgentLogsAnalyze / agents /rca_agent.py
PatrickRedStar's picture
add
d6f4b44
raw
history blame
18.1 kB
"""
Agent 3: Root Cause & Recommendation Agent
Интерпретирует аномалии и формирует рекомендации.
"""
from typing import Dict, List, Any
import json
class RootCauseAgent:
"""Анализирует аномалии и генерирует рекомендации."""
def __init__(self):
"""Инициализация агента."""
self.root_cause_templates = self._init_root_cause_templates()
self.recommendation_templates = self._init_recommendation_templates()
def analyze(self, anomaly_report: Dict[str, Any]) -> str:
"""
Анализирует отчёт об аномалиях и генерирует рекомендации.
Args:
anomaly_report: Отчёт об аномалиях от AnomalyDetectionAgent
Returns:
Markdown-текст с анализом и рекомендациями
"""
if not anomaly_report or not anomaly_report.get('anomalies'):
return self._generate_no_anomalies_report()
anomalies = anomaly_report.get('anomalies', [])
statistics = anomaly_report.get('statistics', {})
severity_summary = anomaly_report.get('severity_summary', {})
# Генерация отчёта
report_parts = []
# Заголовок
report_parts.append("# Анализ первопричин и рекомендации\n")
report_parts.append(f"**Обнаружено аномалий:** {statistics.get('total', 0)}\n")
# Сводка по серьёзности
if severity_summary:
report_parts.append("\n## Сводка по уровням серьёзности\n")
severity_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
for severity in severity_order:
count = severity_summary.get(severity, 0)
if count > 0:
emoji = self._get_severity_emoji(severity)
report_parts.append(f"- {emoji} **{severity}:** {count}\n")
# Группировка аномалий по типам
anomalies_by_type = {}
for anomaly in anomalies:
anomaly_type = anomaly.get('type', 'UNKNOWN')
if anomaly_type not in anomalies_by_type:
anomalies_by_type[anomaly_type] = []
anomalies_by_type[anomaly_type].append(anomaly)
# Анализ каждого типа аномалий
report_parts.append("\n## Детальный анализ аномалий\n")
for anomaly_type, type_anomalies in anomalies_by_type.items():
report_parts.append(f"\n### {self._get_anomaly_type_name(anomaly_type)}\n")
# Анализ первопричин
root_causes = self._identify_root_causes(anomaly_type, type_anomalies)
if root_causes:
report_parts.append("#### Возможные первопричины:\n")
for i, cause in enumerate(root_causes, 1):
report_parts.append(f"{i}. {cause}\n")
# Детали аномалий
report_parts.append("\n#### Детали:\n")
for i, anomaly in enumerate(type_anomalies[:5], 1): # Показываем до 5 примеров
severity = anomaly.get('severity', 'UNKNOWN')
description = anomaly.get('description', 'Без описания')
report_parts.append(f"**Аномалия {i}** ({severity}):\n")
report_parts.append(f"- {description}\n")
# Дополнительная информация
if anomaly.get('count'):
report_parts.append(f"- Количество: {anomaly.get('count')}\n")
if anomaly.get('error_message'):
error_msg = anomaly.get('error_message', '')[:150]
report_parts.append(f"- Сообщение: `{error_msg}`\n")
if anomaly.get('metadata'):
metadata = anomaly.get('metadata', {})
if metadata.get('affected_lines'):
lines = metadata.get('affected_lines', [])[:5]
report_parts.append(f"- Затронутые строки: {', '.join(map(str, lines))}\n")
if len(type_anomalies) > 5:
report_parts.append(f"\n*... и ещё {len(type_anomalies) - 5} аномалий этого типа*\n")
# Рекомендации
report_parts.append("\n## Рекомендации по устранению\n")
recommendations = self._generate_recommendations(anomalies)
for i, recommendation in enumerate(recommendations, 1):
priority = recommendation.get('priority', 'MEDIUM')
emoji = self._get_priority_emoji(priority)
report_parts.append(f"\n### {emoji} Рекомендация {i} (Приоритет: {priority})\n")
report_parts.append(f"{recommendation.get('text', '')}\n")
if recommendation.get('actions'):
report_parts.append("**Конкретные действия:**\n")
for action in recommendation.get('actions', []):
report_parts.append(f"- {action}\n")
# Общие рекомендации
report_parts.append("\n## Общие рекомендации\n")
general_recommendations = self._generate_general_recommendations(anomalies, statistics)
for rec in general_recommendations:
report_parts.append(f"- {rec}\n")
return ''.join(report_parts)
def _identify_root_causes(self, anomaly_type: str, anomalies: List[Dict]) -> List[str]:
"""Определяет возможные первопричины для типа аномалий."""
causes = []
if anomaly_type == 'BURST_ERRORS':
causes.extend([
"Внезапная перегрузка системы или внешнего сервиса",
"Сбой в инфраструктуре (сеть, база данных, диск)",
"Проблемы с зависимыми сервисами или API",
"Некорректное обновление или развертывание кода"
])
elif anomaly_type == 'REPEATED_ERRORS':
causes.extend([
"Проблема в коде, которая воспроизводится при определённых условиях",
"Недостаточная обработка ошибок в цикле или повторяющемся процессе",
"Проблема конфигурации, влияющая на конкретную функциональность",
"Ресурсные ограничения (память, диск, соединения)"
])
elif anomaly_type == 'ERROR_BEFORE_CRASH':
causes.extend([
"Критическая ошибка, приводящая к падению процесса",
"Исчерпание ресурсов (память, дескрипторы файлов)",
"Некорректное состояние приложения после длительной работы",
"Проблемы с внешними зависимостями"
])
elif anomaly_type == 'TEMPORAL_SPIKE':
causes.extend([
"Плановые задачи (cron jobs, scheduled tasks)",
"Резкое увеличение нагрузки от пользователей",
"Внешние события, вызывающие массовые запросы",
"Проблемы с кэшированием или сессиями"
])
elif anomaly_type == 'REPEATED_STACK_TRACES':
causes.extend([
"Необработанное исключение в часто вызываемом коде",
"Проблема в библиотеке или зависимостях",
"Некорректные входные данные, вызывающие исключение",
"Race condition или проблема конкурентности"
])
else:
causes.append("Требуется дополнительный анализ для определения первопричины")
return causes
def _generate_recommendations(self, anomalies: List[Dict]) -> List[Dict[str, Any]]:
"""Генерирует рекомендации на основе обнаруженных аномалий."""
recommendations = []
# Группировка по типам для приоритизации
anomaly_types = [a.get('type') for a in anomalies]
severities = [a.get('severity') for a in anomalies]
has_critical = any(s == 'CRITICAL' for s in severities)
has_high = any(s == 'HIGH' for s in severities)
has_burst = 'BURST_ERRORS' in anomaly_types
has_crash = 'ERROR_BEFORE_CRASH' in anomaly_types
# Критические рекомендации
if has_crash:
recommendations.append({
'priority': 'CRITICAL',
'text': 'Обнаружены признаки возможного краша системы. Требуется немедленное внимание.',
'actions': [
'Проверить состояние системы и процессов',
'Проанализировать последние ошибки перед крашем',
'Убедиться, что мониторинг и алертинг настроены корректно',
'Рассмотреть возможность отката последних изменений'
]
})
if has_burst:
recommendations.append({
'priority': 'HIGH',
'text': 'Обнаружены всплески ошибок. Необходимо определить источник нагрузки.',
'actions': [
'Проверить метрики нагрузки (CPU, память, сеть)',
'Изучить логи зависимых сервисов',
'Проверить состояние базы данных и внешних API',
'Рассмотреть возможность масштабирования или rate limiting'
]
})
# Рекомендации по повторяющимся ошибкам
if 'REPEATED_ERRORS' in anomaly_types:
recommendations.append({
'priority': 'HIGH',
'text': 'Обнаружены повторяющиеся ошибки. Требуется исправление в коде или конфигурации.',
'actions': [
'Идентифицировать конкретный участок кода, вызывающий ошибку',
'Добавить более детальное логирование для отладки',
'Улучшить обработку ошибок с логированием контекста',
'Провести code review проблемного участка'
]
})
# Рекомендации по stack traces
if 'REPEATED_STACK_TRACES' in anomaly_types:
recommendations.append({
'priority': 'MEDIUM',
'text': 'Обнаружены повторяющиеся stack traces. Необходимо исправить необработанные исключения.',
'actions': [
'Найти и исправить источник исключения',
'Добавить обработку исключений (try-except блоки)',
'Улучшить валидацию входных данных',
'Обновить проблемные библиотеки или зависимости'
]
})
# Общие рекомендации по мониторингу
if has_high or has_critical:
recommendations.append({
'priority': 'MEDIUM',
'text': 'Улучшить систему мониторинга и алертинга для раннего обнаружения проблем.',
'actions': [
'Настроить алерты на критические ошибки',
'Внедрить мониторинг метрик производительности',
'Настроить дашборды для визуализации состояния системы',
'Реализовать автоматические проверки здоровья (health checks)'
]
})
return recommendations
def _generate_general_recommendations(self, anomalies: List[Dict], statistics: Dict) -> List[str]:
"""Генерирует общие рекомендации."""
recommendations = []
total = statistics.get('total', 0)
if total == 0:
return ["Логи не содержат аномалий. Система работает стабильно."]
recommendations.append("Регулярно проверяйте логи на наличие паттернов и трендов")
recommendations.append("Настройте автоматическое уведомление о критических ошибках")
if total > 10:
recommendations.append("Обнаружено значительное количество аномалий - рекомендуется провести комплексный анализ системы")
recommendations.append("Ведите документацию по известным проблемам и их решениям")
recommendations.append("Рассмотрите возможность внедрения централизованного логирования (ELK, Splunk и т.д.)")
return recommendations
def _get_anomaly_type_name(self, anomaly_type: str) -> str:
"""Возвращает читаемое название типа аномалии."""
names = {
'BURST_ERRORS': 'Всплески ошибок',
'REPEATED_ERRORS': 'Повторяющиеся ошибки',
'ERROR_BEFORE_CRASH': 'Ошибки перед крашем',
'TEMPORAL_SPIKE': 'Временные всплески',
'REPEATED_STACK_TRACES': 'Повторяющиеся stack traces'
}
return names.get(anomaly_type, anomaly_type)
def _get_severity_emoji(self, severity: str) -> str:
"""Возвращает emoji для уровня серьёзности."""
emoji_map = {
'CRITICAL': '🔴',
'HIGH': '🟠',
'MEDIUM': '🟡',
'LOW': '🟢'
}
return emoji_map.get(severity, '⚪')
def _get_priority_emoji(self, priority: str) -> str:
"""Возвращает emoji для приоритета."""
emoji_map = {
'CRITICAL': '🔴',
'HIGH': '🟠',
'MEDIUM': '🟡',
'LOW': '🟢'
}
return emoji_map.get(priority, '⚪')
def _init_root_cause_templates(self) -> Dict[str, List[str]]:
"""Инициализирует шаблоны первопричин."""
return {}
def _init_recommendation_templates(self) -> Dict[str, List[str]]:
"""Инициализирует шаблоны рекомендаций."""
return {}
def _generate_no_anomalies_report(self) -> str:
"""Генерирует отчёт, когда аномалий не обнаружено."""
return """# Анализ первопричин и рекомендации
## Результаты анализа
**Обнаружено аномалий:** 0
✅ **Система работает стабильно.** В логах не обнаружено значительных аномалий или паттернов, указывающих на проблемы.
### Общие рекомендации
- Продолжайте регулярный мониторинг логов
- Поддерживайте текущий уровень логирования
- Настройте автоматические проверки для раннего обнаружения проблем
- Регулярно просматривайте метрики производительности
"""