| """
|
| Agent 3: Root Cause & Recommendation Agent
|
| Интерпретирует аномалии и формирует рекомендации.
|
| """
|
|
|
| from typing import Dict, List, Any
|
| import json
|
|
|
|
|
| class RootCauseAgent:
|
| """Анализирует аномалии и генерирует рекомендации."""
|
|
|
| def __init__(self):
|
| """Инициализация агента."""
|
| self.root_cause_templates = self._init_root_cause_templates()
|
| self.recommendation_templates = self._init_recommendation_templates()
|
|
|
| def analyze(self, anomaly_report: Dict[str, Any]) -> str:
|
| """
|
| Анализирует отчёт об аномалиях и генерирует рекомендации.
|
|
|
| Args:
|
| anomaly_report: Отчёт об аномалиях от AnomalyDetectionAgent
|
|
|
| Returns:
|
| Markdown-текст с анализом и рекомендациями
|
| """
|
| if not anomaly_report or not anomaly_report.get('anomalies'):
|
| return self._generate_no_anomalies_report()
|
|
|
| anomalies = anomaly_report.get('anomalies', [])
|
| statistics = anomaly_report.get('statistics', {})
|
| severity_summary = anomaly_report.get('severity_summary', {})
|
|
|
|
|
| report_parts = []
|
|
|
|
|
| report_parts.append("# Анализ первопричин и рекомендации\n")
|
| report_parts.append(f"**Обнаружено аномалий:** {statistics.get('total', 0)}\n")
|
|
|
|
|
| if severity_summary:
|
| report_parts.append("\n## Сводка по уровням серьёзности\n")
|
| severity_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
|
| for severity in severity_order:
|
| count = severity_summary.get(severity, 0)
|
| if count > 0:
|
| emoji = self._get_severity_emoji(severity)
|
| report_parts.append(f"- {emoji} **{severity}:** {count}\n")
|
|
|
|
|
| anomalies_by_type = {}
|
| for anomaly in anomalies:
|
| anomaly_type = anomaly.get('type', 'UNKNOWN')
|
| if anomaly_type not in anomalies_by_type:
|
| anomalies_by_type[anomaly_type] = []
|
| anomalies_by_type[anomaly_type].append(anomaly)
|
|
|
|
|
| report_parts.append("\n## Детальный анализ аномалий\n")
|
|
|
| for anomaly_type, type_anomalies in anomalies_by_type.items():
|
| report_parts.append(f"\n### {self._get_anomaly_type_name(anomaly_type)}\n")
|
|
|
|
|
| root_causes = self._identify_root_causes(anomaly_type, type_anomalies)
|
| if root_causes:
|
| report_parts.append("#### Возможные первопричины:\n")
|
| for i, cause in enumerate(root_causes, 1):
|
| report_parts.append(f"{i}. {cause}\n")
|
|
|
|
|
| report_parts.append("\n#### Детали:\n")
|
| for i, anomaly in enumerate(type_anomalies[:5], 1):
|
| severity = anomaly.get('severity', 'UNKNOWN')
|
| description = anomaly.get('description', 'Без описания')
|
| report_parts.append(f"**Аномалия {i}** ({severity}):\n")
|
| report_parts.append(f"- {description}\n")
|
|
|
|
|
| if anomaly.get('count'):
|
| report_parts.append(f"- Количество: {anomaly.get('count')}\n")
|
| if anomaly.get('error_message'):
|
| error_msg = anomaly.get('error_message', '')[:150]
|
| report_parts.append(f"- Сообщение: `{error_msg}`\n")
|
| if anomaly.get('metadata'):
|
| metadata = anomaly.get('metadata', {})
|
| if metadata.get('affected_lines'):
|
| lines = metadata.get('affected_lines', [])[:5]
|
| report_parts.append(f"- Затронутые строки: {', '.join(map(str, lines))}\n")
|
|
|
| if len(type_anomalies) > 5:
|
| report_parts.append(f"\n*... и ещё {len(type_anomalies) - 5} аномалий этого типа*\n")
|
|
|
|
|
| report_parts.append("\n## Рекомендации по устранению\n")
|
| recommendations = self._generate_recommendations(anomalies)
|
|
|
| for i, recommendation in enumerate(recommendations, 1):
|
| priority = recommendation.get('priority', 'MEDIUM')
|
| emoji = self._get_priority_emoji(priority)
|
| report_parts.append(f"\n### {emoji} Рекомендация {i} (Приоритет: {priority})\n")
|
| report_parts.append(f"{recommendation.get('text', '')}\n")
|
|
|
| if recommendation.get('actions'):
|
| report_parts.append("**Конкретные действия:**\n")
|
| for action in recommendation.get('actions', []):
|
| report_parts.append(f"- {action}\n")
|
|
|
|
|
| report_parts.append("\n## Общие рекомендации\n")
|
| general_recommendations = self._generate_general_recommendations(anomalies, statistics)
|
| for rec in general_recommendations:
|
| report_parts.append(f"- {rec}\n")
|
|
|
| return ''.join(report_parts)
|
|
|
| def _identify_root_causes(self, anomaly_type: str, anomalies: List[Dict]) -> List[str]:
|
| """Определяет возможные первопричины для типа аномалий."""
|
| causes = []
|
|
|
| if anomaly_type == 'BURST_ERRORS':
|
| causes.extend([
|
| "Внезапная перегрузка системы или внешнего сервиса",
|
| "Сбой в инфраструктуре (сеть, база данных, диск)",
|
| "Проблемы с зависимыми сервисами или API",
|
| "Некорректное обновление или развертывание кода"
|
| ])
|
| elif anomaly_type == 'REPEATED_ERRORS':
|
| causes.extend([
|
| "Проблема в коде, которая воспроизводится при определённых условиях",
|
| "Недостаточная обработка ошибок в цикле или повторяющемся процессе",
|
| "Проблема конфигурации, влияющая на конкретную функциональность",
|
| "Ресурсные ограничения (память, диск, соединения)"
|
| ])
|
| elif anomaly_type == 'ERROR_BEFORE_CRASH':
|
| causes.extend([
|
| "Критическая ошибка, приводящая к падению процесса",
|
| "Исчерпание ресурсов (память, дескрипторы файлов)",
|
| "Некорректное состояние приложения после длительной работы",
|
| "Проблемы с внешними зависимостями"
|
| ])
|
| elif anomaly_type == 'TEMPORAL_SPIKE':
|
| causes.extend([
|
| "Плановые задачи (cron jobs, scheduled tasks)",
|
| "Резкое увеличение нагрузки от пользователей",
|
| "Внешние события, вызывающие массовые запросы",
|
| "Проблемы с кэшированием или сессиями"
|
| ])
|
| elif anomaly_type == 'REPEATED_STACK_TRACES':
|
| causes.extend([
|
| "Необработанное исключение в часто вызываемом коде",
|
| "Проблема в библиотеке или зависимостях",
|
| "Некорректные входные данные, вызывающие исключение",
|
| "Race condition или проблема конкурентности"
|
| ])
|
| else:
|
| causes.append("Требуется дополнительный анализ для определения первопричины")
|
|
|
| return causes
|
|
|
| def _generate_recommendations(self, anomalies: List[Dict]) -> List[Dict[str, Any]]:
|
| """Генерирует рекомендации на основе обнаруженных аномалий."""
|
| recommendations = []
|
|
|
|
|
| anomaly_types = [a.get('type') for a in anomalies]
|
| severities = [a.get('severity') for a in anomalies]
|
|
|
| has_critical = any(s == 'CRITICAL' for s in severities)
|
| has_high = any(s == 'HIGH' for s in severities)
|
| has_burst = 'BURST_ERRORS' in anomaly_types
|
| has_crash = 'ERROR_BEFORE_CRASH' in anomaly_types
|
|
|
|
|
| if has_crash:
|
| recommendations.append({
|
| 'priority': 'CRITICAL',
|
| 'text': 'Обнаружены признаки возможного краша системы. Требуется немедленное внимание.',
|
| 'actions': [
|
| 'Проверить состояние системы и процессов',
|
| 'Проанализировать последние ошибки перед крашем',
|
| 'Убедиться, что мониторинг и алертинг настроены корректно',
|
| 'Рассмотреть возможность отката последних изменений'
|
| ]
|
| })
|
|
|
| if has_burst:
|
| recommendations.append({
|
| 'priority': 'HIGH',
|
| 'text': 'Обнаружены всплески ошибок. Необходимо определить источник нагрузки.',
|
| 'actions': [
|
| 'Проверить метрики нагрузки (CPU, память, сеть)',
|
| 'Изучить логи зависимых сервисов',
|
| 'Проверить состояние базы данных и внешних API',
|
| 'Рассмотреть возможность масштабирования или rate limiting'
|
| ]
|
| })
|
|
|
|
|
| if 'REPEATED_ERRORS' in anomaly_types:
|
| recommendations.append({
|
| 'priority': 'HIGH',
|
| 'text': 'Обнаружены повторяющиеся ошибки. Требуется исправление в коде или конфигурации.',
|
| 'actions': [
|
| 'Идентифицировать конкретный участок кода, вызывающий ошибку',
|
| 'Добавить более детальное логирование для отладки',
|
| 'Улучшить обработку ошибок с логированием контекста',
|
| 'Провести code review проблемного участка'
|
| ]
|
| })
|
|
|
|
|
| if 'REPEATED_STACK_TRACES' in anomaly_types:
|
| recommendations.append({
|
| 'priority': 'MEDIUM',
|
| 'text': 'Обнаружены повторяющиеся stack traces. Необходимо исправить необработанные исключения.',
|
| 'actions': [
|
| 'Найти и исправить источник исключения',
|
| 'Добавить обработку исключений (try-except блоки)',
|
| 'Улучшить валидацию входных данных',
|
| 'Обновить проблемные библиотеки или зависимости'
|
| ]
|
| })
|
|
|
|
|
| if has_high or has_critical:
|
| recommendations.append({
|
| 'priority': 'MEDIUM',
|
| 'text': 'Улучшить систему мониторинга и алертинга для раннего обнаружения проблем.',
|
| 'actions': [
|
| 'Настроить алерты на критические ошибки',
|
| 'Внедрить мониторинг метрик производительности',
|
| 'Настроить дашборды для визуализации состояния системы',
|
| 'Реализовать автоматические проверки здоровья (health checks)'
|
| ]
|
| })
|
|
|
| return recommendations
|
|
|
| def _generate_general_recommendations(self, anomalies: List[Dict], statistics: Dict) -> List[str]:
|
| """Генерирует общие рекомендации."""
|
| recommendations = []
|
|
|
| total = statistics.get('total', 0)
|
| if total == 0:
|
| return ["Логи не содержат аномалий. Система работает стабильно."]
|
|
|
| recommendations.append("Регулярно проверяйте логи на наличие паттернов и трендов")
|
| recommendations.append("Настройте автоматическое уведомление о критических ошибках")
|
|
|
| if total > 10:
|
| recommendations.append("Обнаружено значительное количество аномалий - рекомендуется провести комплексный анализ системы")
|
|
|
| recommendations.append("Ведите документацию по известным проблемам и их решениям")
|
| recommendations.append("Рассмотрите возможность внедрения централизованного логирования (ELK, Splunk и т.д.)")
|
|
|
| return recommendations
|
|
|
| def _get_anomaly_type_name(self, anomaly_type: str) -> str:
|
| """Возвращает читаемое название типа аномалии."""
|
| names = {
|
| 'BURST_ERRORS': 'Всплески ошибок',
|
| 'REPEATED_ERRORS': 'Повторяющиеся ошибки',
|
| 'ERROR_BEFORE_CRASH': 'Ошибки перед крашем',
|
| 'TEMPORAL_SPIKE': 'Временные всплески',
|
| 'REPEATED_STACK_TRACES': 'Повторяющиеся stack traces'
|
| }
|
| return names.get(anomaly_type, anomaly_type)
|
|
|
| def _get_severity_emoji(self, severity: str) -> str:
|
| """Возвращает emoji для уровня серьёзности."""
|
| emoji_map = {
|
| 'CRITICAL': '🔴',
|
| 'HIGH': '🟠',
|
| 'MEDIUM': '🟡',
|
| 'LOW': '🟢'
|
| }
|
| return emoji_map.get(severity, '⚪')
|
|
|
| def _get_priority_emoji(self, priority: str) -> str:
|
| """Возвращает emoji для приоритета."""
|
| emoji_map = {
|
| 'CRITICAL': '🔴',
|
| 'HIGH': '🟠',
|
| 'MEDIUM': '🟡',
|
| 'LOW': '🟢'
|
| }
|
| return emoji_map.get(priority, '⚪')
|
|
|
| def _init_root_cause_templates(self) -> Dict[str, List[str]]:
|
| """Инициализирует шаблоны первопричин."""
|
| return {}
|
|
|
| def _init_recommendation_templates(self) -> Dict[str, List[str]]:
|
| """Инициализирует шаблоны рекомендаций."""
|
| return {}
|
|
|
| def _generate_no_anomalies_report(self) -> str:
|
| """Генерирует отчёт, когда аномалий не обнаружено."""
|
| return """# Анализ первопричин и рекомендации
|
|
|
| ## Результаты анализа
|
|
|
| **Обнаружено аномалий:** 0
|
|
|
| ✅ **Система работает стабильно.** В логах не обнаружено значительных аномалий или паттернов, указывающих на проблемы.
|
|
|
| ### Общие рекомендации
|
|
|
| - Продолжайте регулярный мониторинг логов
|
| - Поддерживайте текущий уровень логирования
|
| - Настройте автоматические проверки для раннего обнаружения проблем
|
| - Регулярно просматривайте метрики производительности
|
| """
|
|
|