MultiAgentLogsAnalyze / agents /parser_agent.py
PatrickRedStar's picture
add
d6f4b44
raw
history blame
8.66 kB
"""
Agent 1: Log Parser Agent
Преобразует сырые логи в структурированное представление.
"""
import re
import json
from datetime import datetime
from typing import Dict, List, Any
from collections import defaultdict
class LogParserAgent:
"""Парсит сырые логи и преобразует их в структурированный JSON."""
# Паттерны для распознавания уровней логирования
LOG_LEVELS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'TRACE']
# Паттерны для временных меток (поддержка различных форматов)
TIMESTAMP_PATTERNS = [
r'\d{4}-\d{2}-\d{2}[\sT]\d{2}:\d{2}:\d{2}(?:\.\d+)?', # ISO 8601
r'\d{2}/\d{2}/\d{4}[\s]\d{2}:\d{2}:\d{2}', # DD/MM/YYYY HH:MM:SS
r'\[(?:[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\]', # [Mon Jan 1 12:00:00]
]
def __init__(self):
"""Инициализация агента."""
self.compiled_patterns = [re.compile(pattern) for pattern in self.TIMESTAMP_PATTERNS]
def parse(self, raw_logs: str) -> Dict[str, Any]:
"""
Парсит сырые логи и возвращает структурированный JSON.
Args:
raw_logs: Строка с сырыми логами
Returns:
Структурированный JSON-объект с событиями, ошибками, предупреждениями и статистикой
"""
if not raw_logs or not raw_logs.strip():
return self._empty_result()
lines = raw_logs.strip().split('\n')
events = []
errors = []
warnings = []
for line_num, line in enumerate(lines, start=1):
if not line.strip():
continue
parsed_event = self._parse_line(line, line_num)
if parsed_event:
events.append(parsed_event)
level = parsed_event.get('level', '').upper()
if level == 'ERROR' or level == 'CRITICAL':
errors.append(parsed_event)
elif level == 'WARNING':
warnings.append(parsed_event)
# Группировка по типам событий
event_types = defaultdict(int)
for event in events:
event_type = event.get('type', 'UNKNOWN')
event_types[event_type] += 1
# Статистика
statistics = {
'total_lines': len(lines),
'parsed_events': len(events),
'errors': len(errors),
'warnings': len(warnings),
'info_messages': len([e for e in events if e.get('level', '').upper() == 'INFO']),
'event_types': dict(event_types),
'time_range': self._calculate_time_range(events),
}
return {
'events': events,
'errors': errors,
'warnings': warnings,
'statistics': statistics
}
def _parse_line(self, line: str, line_num: int) -> Dict[str, Any] | None:
"""
Парсит одну строку лога.
Args:
line: Строка лога
line_num: Номер строки
Returns:
Словарь с распарсенными данными или None
"""
# Поиск временной метки
timestamp = None
timestamp_str = None
for pattern in self.compiled_patterns:
match = pattern.search(line)
if match:
timestamp_str = match.group(0)
try:
# Попытка парсинга различных форматов
timestamp = self._parse_timestamp(timestamp_str)
except:
pass
break
# Поиск уровня логирования
level = None
for log_level in self.LOG_LEVELS:
if log_level in line.upper():
level = log_level
break
# Если уровень не найден, определяем по ключевым словам
if not level:
line_upper = line.upper()
if any(word in line_upper for word in ['ERROR', 'EXCEPTION', 'FAILED', 'FAILURE']):
level = 'ERROR'
elif any(word in line_upper for word in ['WARN', 'WARNING']):
level = 'WARNING'
elif any(word in line_upper for word in ['INFO', 'INFORMATION']):
level = 'INFO'
elif any(word in line_upper for word in ['DEBUG']):
level = 'DEBUG'
else:
level = 'INFO' # По умолчанию
# Извлечение сообщения (часть после временной метки и уровня)
message = line
if timestamp_str:
message = message.replace(timestamp_str, '', 1).strip()
# Определение типа события
event_type = self._detect_event_type(line)
return {
'line_number': line_num,
'timestamp': timestamp_str if timestamp_str else None,
'level': level,
'message': message.strip(),
'type': event_type,
'raw': line
}
def _parse_timestamp(self, timestamp_str: str) -> datetime | None:
"""Парсит строку временной метки в объект datetime."""
# Удаление скобок если есть
timestamp_str = timestamp_str.strip('[]')
# Попытка различных форматов
formats = [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%d %H:%M:%S.%f',
'%Y-%m-%dT%H:%M:%S.%f',
'%d/%m/%Y %H:%M:%S',
'%a %b %d %H:%M:%S %Y', # [Mon Jan 1 12:00:00 2024]
]
for fmt in formats:
try:
return datetime.strptime(timestamp_str, fmt)
except ValueError:
continue
return None
def _detect_event_type(self, line: str) -> str:
"""Определяет тип события по содержимому строки."""
line_lower = line.lower()
if any(keyword in line_lower for keyword in ['connection', 'connect', 'disconnect']):
return 'CONNECTION'
elif any(keyword in line_lower for keyword in ['request', 'response', 'http', 'api']):
return 'HTTP_REQUEST'
elif any(keyword in line_lower for keyword in ['database', 'db', 'query', 'sql']):
return 'DATABASE'
elif any(keyword in line_lower for keyword in ['authentication', 'auth', 'login', 'logout']):
return 'AUTHENTICATION'
elif any(keyword in line_lower for keyword in ['exception', 'error', 'failure']):
return 'EXCEPTION'
elif any(keyword in line_lower for keyword in ['start', 'stop', 'shutdown', 'initialized']):
return 'SYSTEM'
else:
return 'GENERAL'
def _calculate_time_range(self, events: List[Dict[str, Any]]) -> Dict[str, str] | None:
"""Вычисляет временной диапазон событий."""
timestamps = [e.get('timestamp') for e in events if e.get('timestamp')]
if not timestamps:
return None
return {
'start': timestamps[0],
'end': timestamps[-1]
}
def _empty_result(self) -> Dict[str, Any]:
"""Возвращает пустой результат при отсутствии логов."""
return {
'events': [],
'errors': [],
'warnings': [],
'statistics': {
'total_lines': 0,
'parsed_events': 0,
'errors': 0,
'warnings': 0,
'info_messages': 0,
'event_types': {},
'time_range': None
}
}