MultiAgentLogsAnalyze / utils /streaming_processor.py
PatrickRedStar's picture
Add: Streaming processing toggle and speed optimization guide - Add streaming processing option in Gradio UI - Auto-enable for files >500 lines - Add chunk size slider - Create SPEED_OPTIMIZATION.md with model suggestions
4f1eb8e
"""
Потоковая обработка больших логов.
Разбивает логи на чанки и обрабатывает их параллельно или последовательно.
"""
from typing import List, Dict, Any, Callable, Generator
from agents import run_parser_agent, run_anomaly_agent, run_rca_agent, run_gpt_prompt_agent
def split_logs_into_chunks(logs: str, chunk_size: int = 100) -> Generator[str, None, None]:
"""
Разбивает логи на чанки по количеству строк.
Args:
logs: Полный текст логов
chunk_size: Количество строк в одном чанке
Yields:
str: Чанк логов
"""
lines = logs.strip().split('\n')
total_lines = len(lines)
for i in range(0, total_lines, chunk_size):
chunk_lines = lines[i:i + chunk_size]
yield '\n'.join(chunk_lines)
def process_logs_streaming(
raw_logs: str,
chunk_size: int = 100,
parallel: bool = False
) -> Dict[str, Any]:
"""
Обрабатывает логи потоковым способом - разбивает на чанки и обрабатывает каждый.
Args:
raw_logs: Сырые логи для обработки
chunk_size: Размер чанка в строках
parallel: Если True, обрабатывает чанки параллельно (требует threading/multiprocessing)
Returns:
dict: Объединенные результаты всех чанков
"""
chunks = list(split_logs_into_chunks(raw_logs, chunk_size))
total_chunks = len(chunks)
all_events = []
all_errors = []
all_warnings = []
all_anomalies = []
statistics_accumulator = {
'total_lines': 0,
'parsed_events': 0,
'errors': 0,
'warnings': 0,
'info_messages': 0,
'event_types': {},
'time_range': None
}
# Обрабатываем каждый чанк
for chunk_idx, chunk in enumerate(chunks, 1):
print(f"[Streaming] Processing chunk {chunk_idx}/{total_chunks} ({len(chunk.split(chr(10)))} lines)...")
# Шаг 1: Парсинг чанка
try:
structured_data = run_parser_agent(chunk)
# Объединяем результаты
all_events.extend(structured_data.get('events', []))
all_errors.extend(structured_data.get('errors', []))
all_warnings.extend(structured_data.get('warnings', []))
# Обновляем статистику
stats = structured_data.get('statistics', {})
statistics_accumulator['total_lines'] += stats.get('total_lines', 0)
statistics_accumulator['parsed_events'] += stats.get('parsed_events', 0)
statistics_accumulator['errors'] += stats.get('errors', 0)
statistics_accumulator['warnings'] += stats.get('warnings', 0)
statistics_accumulator['info_messages'] += stats.get('info_messages', 0)
# Объединяем типы событий
for event_type, count in stats.get('event_types', {}).items():
statistics_accumulator['event_types'][event_type] = \
statistics_accumulator['event_types'].get(event_type, 0) + count
# Обновляем временной диапазон (берем самый ранний start и самый поздний end)
chunk_time_range = stats.get('time_range')
if chunk_time_range:
if statistics_accumulator['time_range'] is None:
statistics_accumulator['time_range'] = chunk_time_range.copy()
else:
if chunk_time_range.get('start'):
if (statistics_accumulator['time_range'].get('start') is None or
chunk_time_range['start'] < statistics_accumulator['time_range']['start']):
statistics_accumulator['time_range']['start'] = chunk_time_range['start']
if chunk_time_range.get('end'):
if (statistics_accumulator['time_range'].get('end') is None or
chunk_time_range['end'] > statistics_accumulator['time_range']['end']):
statistics_accumulator['time_range']['end'] = chunk_time_range['end']
except Exception as e:
print(f"[Streaming] Error processing chunk {chunk_idx}: {e}")
continue
# Обновляем номера строк в объединенных событиях
for idx, event in enumerate(all_events, 1):
event['line_number'] = idx
# Шаг 2: Обнаружение аномалий на объединенных данных
merged_structured_data = {
'events': all_events,
'errors': all_errors,
'warnings': all_warnings,
'statistics': statistics_accumulator
}
print(f"[Streaming] Analyzing {len(all_events)} total events for anomalies...")
anomaly_report = run_anomaly_agent(merged_structured_data)
all_anomalies = anomaly_report.get('anomalies', [])
# Шаг 3: Анализ первопричин
print(f"[Streaming] Running root cause analysis...")
rca_result = run_rca_agent(anomaly_report)
# Шаг 4: Генерация промпта для GPT
print(f"[Streaming] Generating GPT prompt...")
recommendations_md = format_rca_as_markdown_streaming(rca_result)
gpt_prompt = run_gpt_prompt_agent(merged_structured_data, anomaly_report, recommendations_md)
return {
'structured_data': merged_structured_data,
'anomaly_report': anomaly_report,
'rca_result': rca_result,
'gpt_prompt': gpt_prompt,
'chunks_processed': total_chunks,
'chunk_size': chunk_size
}
def format_rca_as_markdown_streaming(rca_result: dict) -> str:
"""Форматирует результат RCA в Markdown (копия из app.py для избежания циклических зависимостей)."""
markdown_parts = []
# Возможные первопричины
analysis = rca_result.get("analysis", {})
root_causes = analysis.get("root_causes", [])
if root_causes:
markdown_parts.append("## Возможные первопричины\n\n")
for i, cause in enumerate(root_causes, 1):
markdown_parts.append(f"{i}. {cause}\n")
markdown_parts.append("\n")
# Детальный анализ аномалий
details = analysis.get("details", [])
if details:
markdown_parts.append("## Детальный анализ аномалий\n\n")
for detail in details:
anomaly_type = detail.get("anomaly_type", "UNKNOWN")
severity = detail.get("severity", "MEDIUM")
description = detail.get("description", "")
possible_causes = detail.get("possible_causes", [])
emoji = "🔴" if severity == "CRITICAL" else "🟡" if severity == "HIGH" else "🟢" if severity == "MEDIUM" else "⚪"
markdown_parts.append(f"### {emoji} {anomaly_type} ({severity})\n\n{description}\n\n")
if possible_causes:
markdown_parts.append("#### Возможные первопричины:\n\n")
for i, cause in enumerate(possible_causes, 1):
markdown_parts.append(f"{i}. {cause}\n")
markdown_parts.append("\n")
# Рекомендации по устранению
recommendations = rca_result.get("recommendations", [])
if recommendations:
markdown_parts.append("## Рекомендации по устранению\n\n")
for rec in recommendations:
priority = rec.get("priority", "MEDIUM")
text = rec.get("text", "")
actions = rec.get("actions", [])
emoji = "🔴" if priority == "CRITICAL" else "🟡" if priority == "HIGH" else "🟢" if priority == "MEDIUM" else "⚪"
markdown_parts.append(f"### {emoji} Рекомендация (Приоритет: {priority})\n\n{text}\n\n")
if actions:
markdown_parts.append("**Конкретные действия:**\n\n")
for action in actions:
markdown_parts.append(f"- {action}\n")
markdown_parts.append("\n")
# Общие рекомендации
general = rca_result.get("general_recommendations", [])
if general:
markdown_parts.append("## Общие рекомендации\n\n")
for rec in general:
markdown_parts.append(f"- {rec}\n")
return "".join(markdown_parts)