""" Потоковая обработка больших логов. Разбивает логи на чанки и обрабатывает их параллельно или последовательно. """ from typing import List, Dict, Any, Callable, Generator from agents import run_parser_agent, run_anomaly_agent, run_rca_agent, run_gpt_prompt_agent def split_logs_into_chunks(logs: str, chunk_size: int = 100) -> Generator[str, None, None]: """ Разбивает логи на чанки по количеству строк. Args: logs: Полный текст логов chunk_size: Количество строк в одном чанке Yields: str: Чанк логов """ lines = logs.strip().split('\n') total_lines = len(lines) for i in range(0, total_lines, chunk_size): chunk_lines = lines[i:i + chunk_size] yield '\n'.join(chunk_lines) def process_logs_streaming( raw_logs: str, chunk_size: int = 100, parallel: bool = False ) -> Dict[str, Any]: """ Обрабатывает логи потоковым способом - разбивает на чанки и обрабатывает каждый. Args: raw_logs: Сырые логи для обработки chunk_size: Размер чанка в строках parallel: Если True, обрабатывает чанки параллельно (требует threading/multiprocessing) Returns: dict: Объединенные результаты всех чанков """ chunks = list(split_logs_into_chunks(raw_logs, chunk_size)) total_chunks = len(chunks) all_events = [] all_errors = [] all_warnings = [] all_anomalies = [] statistics_accumulator = { 'total_lines': 0, 'parsed_events': 0, 'errors': 0, 'warnings': 0, 'info_messages': 0, 'event_types': {}, 'time_range': None } # Обрабатываем каждый чанк for chunk_idx, chunk in enumerate(chunks, 1): print(f"[Streaming] Processing chunk {chunk_idx}/{total_chunks} ({len(chunk.split(chr(10)))} lines)...") # Шаг 1: Парсинг чанка try: structured_data = run_parser_agent(chunk) # Объединяем результаты all_events.extend(structured_data.get('events', [])) all_errors.extend(structured_data.get('errors', [])) all_warnings.extend(structured_data.get('warnings', [])) # Обновляем статистику stats = structured_data.get('statistics', {}) statistics_accumulator['total_lines'] += stats.get('total_lines', 0) statistics_accumulator['parsed_events'] += stats.get('parsed_events', 0) statistics_accumulator['errors'] += stats.get('errors', 0) statistics_accumulator['warnings'] += stats.get('warnings', 0) statistics_accumulator['info_messages'] += stats.get('info_messages', 0) # Объединяем типы событий for event_type, count in stats.get('event_types', {}).items(): statistics_accumulator['event_types'][event_type] = \ statistics_accumulator['event_types'].get(event_type, 0) + count # Обновляем временной диапазон (берем самый ранний start и самый поздний end) chunk_time_range = stats.get('time_range') if chunk_time_range: if statistics_accumulator['time_range'] is None: statistics_accumulator['time_range'] = chunk_time_range.copy() else: if chunk_time_range.get('start'): if (statistics_accumulator['time_range'].get('start') is None or chunk_time_range['start'] < statistics_accumulator['time_range']['start']): statistics_accumulator['time_range']['start'] = chunk_time_range['start'] if chunk_time_range.get('end'): if (statistics_accumulator['time_range'].get('end') is None or chunk_time_range['end'] > statistics_accumulator['time_range']['end']): statistics_accumulator['time_range']['end'] = chunk_time_range['end'] except Exception as e: print(f"[Streaming] Error processing chunk {chunk_idx}: {e}") continue # Обновляем номера строк в объединенных событиях for idx, event in enumerate(all_events, 1): event['line_number'] = idx # Шаг 2: Обнаружение аномалий на объединенных данных merged_structured_data = { 'events': all_events, 'errors': all_errors, 'warnings': all_warnings, 'statistics': statistics_accumulator } print(f"[Streaming] Analyzing {len(all_events)} total events for anomalies...") anomaly_report = run_anomaly_agent(merged_structured_data) all_anomalies = anomaly_report.get('anomalies', []) # Шаг 3: Анализ первопричин print(f"[Streaming] Running root cause analysis...") rca_result = run_rca_agent(anomaly_report) # Шаг 4: Генерация промпта для GPT print(f"[Streaming] Generating GPT prompt...") recommendations_md = format_rca_as_markdown_streaming(rca_result) gpt_prompt = run_gpt_prompt_agent(merged_structured_data, anomaly_report, recommendations_md) return { 'structured_data': merged_structured_data, 'anomaly_report': anomaly_report, 'rca_result': rca_result, 'gpt_prompt': gpt_prompt, 'chunks_processed': total_chunks, 'chunk_size': chunk_size } def format_rca_as_markdown_streaming(rca_result: dict) -> str: """Форматирует результат RCA в Markdown (копия из app.py для избежания циклических зависимостей).""" markdown_parts = [] # Возможные первопричины analysis = rca_result.get("analysis", {}) root_causes = analysis.get("root_causes", []) if root_causes: markdown_parts.append("## Возможные первопричины\n\n") for i, cause in enumerate(root_causes, 1): markdown_parts.append(f"{i}. {cause}\n") markdown_parts.append("\n") # Детальный анализ аномалий details = analysis.get("details", []) if details: markdown_parts.append("## Детальный анализ аномалий\n\n") for detail in details: anomaly_type = detail.get("anomaly_type", "UNKNOWN") severity = detail.get("severity", "MEDIUM") description = detail.get("description", "") possible_causes = detail.get("possible_causes", []) emoji = "🔴" if severity == "CRITICAL" else "🟡" if severity == "HIGH" else "🟢" if severity == "MEDIUM" else "⚪" markdown_parts.append(f"### {emoji} {anomaly_type} ({severity})\n\n{description}\n\n") if possible_causes: markdown_parts.append("#### Возможные первопричины:\n\n") for i, cause in enumerate(possible_causes, 1): markdown_parts.append(f"{i}. {cause}\n") markdown_parts.append("\n") # Рекомендации по устранению recommendations = rca_result.get("recommendations", []) if recommendations: markdown_parts.append("## Рекомендации по устранению\n\n") for rec in recommendations: priority = rec.get("priority", "MEDIUM") text = rec.get("text", "") actions = rec.get("actions", []) emoji = "🔴" if priority == "CRITICAL" else "🟡" if priority == "HIGH" else "🟢" if priority == "MEDIUM" else "⚪" markdown_parts.append(f"### {emoji} Рекомендация (Приоритет: {priority})\n\n{text}\n\n") if actions: markdown_parts.append("**Конкретные действия:**\n\n") for action in actions: markdown_parts.append(f"- {action}\n") markdown_parts.append("\n") # Общие рекомендации general = rca_result.get("general_recommendations", []) if general: markdown_parts.append("## Общие рекомендации\n\n") for rec in general: markdown_parts.append(f"- {rec}\n") return "".join(markdown_parts)