""" Agent 1: Log Parser Agent Преобразует сырые логи в структурированное представление. """ import re import json from datetime import datetime from typing import Dict, List, Any from collections import defaultdict class LogParserAgent: """Парсит сырые логи и преобразует их в структурированный JSON.""" # Паттерны для распознавания уровней логирования LOG_LEVELS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'TRACE'] # Паттерны для временных меток (поддержка различных форматов) TIMESTAMP_PATTERNS = [ r'\d{4}-\d{2}-\d{2}[\sT]\d{2}:\d{2}:\d{2}(?:\.\d+)?', # ISO 8601 r'\d{2}/\d{2}/\d{4}[\s]\d{2}:\d{2}:\d{2}', # DD/MM/YYYY HH:MM:SS r'\[(?:[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\]', # [Mon Jan 1 12:00:00] ] def __init__(self): """Инициализация агента.""" self.compiled_patterns = [re.compile(pattern) for pattern in self.TIMESTAMP_PATTERNS] def parse(self, raw_logs: str) -> Dict[str, Any]: """ Парсит сырые логи и возвращает структурированный JSON. Args: raw_logs: Строка с сырыми логами Returns: Структурированный JSON-объект с событиями, ошибками, предупреждениями и статистикой """ if not raw_logs or not raw_logs.strip(): return self._empty_result() lines = raw_logs.strip().split('\n') events = [] errors = [] warnings = [] for line_num, line in enumerate(lines, start=1): if not line.strip(): continue parsed_event = self._parse_line(line, line_num) if parsed_event: events.append(parsed_event) level = parsed_event.get('level', '').upper() if level == 'ERROR' or level == 'CRITICAL': errors.append(parsed_event) elif level == 'WARNING': warnings.append(parsed_event) # Группировка по типам событий event_types = defaultdict(int) for event in events: event_type = event.get('type', 'UNKNOWN') event_types[event_type] += 1 # Статистика statistics = { 'total_lines': len(lines), 'parsed_events': len(events), 'errors': len(errors), 'warnings': len(warnings), 'info_messages': len([e for e in events if e.get('level', '').upper() == 'INFO']), 'event_types': dict(event_types), 'time_range': self._calculate_time_range(events), } return { 'events': events, 'errors': errors, 'warnings': warnings, 'statistics': statistics } def _parse_line(self, line: str, line_num: int) -> Dict[str, Any] | None: """ Парсит одну строку лога. Args: line: Строка лога line_num: Номер строки Returns: Словарь с распарсенными данными или None """ # Поиск временной метки timestamp = None timestamp_str = None for pattern in self.compiled_patterns: match = pattern.search(line) if match: timestamp_str = match.group(0) try: # Попытка парсинга различных форматов timestamp = self._parse_timestamp(timestamp_str) except: pass break # Поиск уровня логирования level = None for log_level in self.LOG_LEVELS: if log_level in line.upper(): level = log_level break # Если уровень не найден, определяем по ключевым словам if not level: line_upper = line.upper() if any(word in line_upper for word in ['ERROR', 'EXCEPTION', 'FAILED', 'FAILURE']): level = 'ERROR' elif any(word in line_upper for word in ['WARN', 'WARNING']): level = 'WARNING' elif any(word in line_upper for word in ['INFO', 'INFORMATION']): level = 'INFO' elif any(word in line_upper for word in ['DEBUG']): level = 'DEBUG' else: level = 'INFO' # По умолчанию # Извлечение сообщения (часть после временной метки и уровня) message = line if timestamp_str: message = message.replace(timestamp_str, '', 1).strip() # Определение типа события event_type = self._detect_event_type(line) return { 'line_number': line_num, 'timestamp': timestamp_str if timestamp_str else None, 'level': level, 'message': message.strip(), 'type': event_type, 'raw': line } def _parse_timestamp(self, timestamp_str: str) -> datetime | None: """Парсит строку временной метки в объект datetime.""" # Удаление скобок если есть timestamp_str = timestamp_str.strip('[]') # Попытка различных форматов formats = [ '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S.%f', '%d/%m/%Y %H:%M:%S', '%a %b %d %H:%M:%S %Y', # [Mon Jan 1 12:00:00 2024] ] for fmt in formats: try: return datetime.strptime(timestamp_str, fmt) except ValueError: continue return None def _detect_event_type(self, line: str) -> str: """Определяет тип события по содержимому строки.""" line_lower = line.lower() if any(keyword in line_lower for keyword in ['connection', 'connect', 'disconnect']): return 'CONNECTION' elif any(keyword in line_lower for keyword in ['request', 'response', 'http', 'api']): return 'HTTP_REQUEST' elif any(keyword in line_lower for keyword in ['database', 'db', 'query', 'sql']): return 'DATABASE' elif any(keyword in line_lower for keyword in ['authentication', 'auth', 'login', 'logout']): return 'AUTHENTICATION' elif any(keyword in line_lower for keyword in ['exception', 'error', 'failure']): return 'EXCEPTION' elif any(keyword in line_lower for keyword in ['start', 'stop', 'shutdown', 'initialized']): return 'SYSTEM' else: return 'GENERAL' def _calculate_time_range(self, events: List[Dict[str, Any]]) -> Dict[str, str] | None: """Вычисляет временной диапазон событий.""" timestamps = [e.get('timestamp') for e in events if e.get('timestamp')] if not timestamps: return None return { 'start': timestamps[0], 'end': timestamps[-1] } def _empty_result(self) -> Dict[str, Any]: """Возвращает пустой результат при отсутствии логов.""" return { 'events': [], 'errors': [], 'warnings': [], 'statistics': { 'total_lines': 0, 'parsed_events': 0, 'errors': 0, 'warnings': 0, 'info_messages': 0, 'event_types': {}, 'time_range': None } }