Add: Streaming processing toggle and speed optimization guide

- Add a streaming processing option in the Gradio UI
- Auto-enable streaming for files longer than 500 lines
- Add a chunk size slider
- Create SPEED_OPTIMIZATION.md with model suggestions
4f1eb8e
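The Gradio-side changes named above (streaming toggle, auto-enable for files over 500 lines, chunk size slider) live in the app code, not in the module added below. A minimal sketch of how those controls might be wired to process_logs_streaming, assuming standard gr.Checkbox / gr.Slider components; the helper name, the module name streaming, and the threshold constant are illustrative, not taken from this commit:

import gradio as gr

from streaming import process_logs_streaming  # module name is an assumption

AUTO_STREAM_THRESHOLD = 500  # lines; mirrors the ">500 lines" auto-enable rule

def analyze(raw_logs: str, use_streaming: bool, chunk_size: float) -> str:
    # Auto-enable streaming for large inputs even if the toggle is off
    if len(raw_logs.strip().split("\n")) > AUTO_STREAM_THRESHOLD:
        use_streaming = True

    if use_streaming:
        result = process_logs_streaming(raw_logs, chunk_size=int(chunk_size))
        return result["gpt_prompt"]

    # Otherwise the regular single-pass pipeline (not shown here) would run
    return "Streaming disabled; single-pass pipeline would handle this input."

with gr.Blocks() as demo:
    logs_box = gr.Textbox(label="Logs", lines=20)
    streaming_toggle = gr.Checkbox(label="Streaming processing", value=False)
    chunk_slider = gr.Slider(minimum=50, maximum=500, step=50, value=100, label="Chunk size (lines)")
    output_box = gr.Textbox(label="GPT prompt")
    gr.Button("Analyze").click(analyze, [logs_box, streaming_toggle, chunk_slider], output_box)

# demo.launch()

The streaming module itself follows.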
| """ | |
| Потоковая обработка больших логов. | |
| Разбивает логи на чанки и обрабатывает их параллельно или последовательно. | |
| """ | |
| from typing import List, Dict, Any, Callable, Generator | |
| from agents import run_parser_agent, run_anomaly_agent, run_rca_agent, run_gpt_prompt_agent | |
| def split_logs_into_chunks(logs: str, chunk_size: int = 100) -> Generator[str, None, None]: | |
| """ | |
| Разбивает логи на чанки по количеству строк. | |
| Args: | |
| logs: Полный текст логов | |
| chunk_size: Количество строк в одном чанке | |
| Yields: | |
| str: Чанк логов | |
| """ | |
| lines = logs.strip().split('\n') | |
| total_lines = len(lines) | |
| for i in range(0, total_lines, chunk_size): | |
| chunk_lines = lines[i:i + chunk_size] | |
| yield '\n'.join(chunk_lines) | |


def process_logs_streaming(
    raw_logs: str,
    chunk_size: int = 100,
    parallel: bool = False
) -> Dict[str, Any]:
    """
    Processes the logs in streaming fashion: splits them into chunks and processes each chunk.

    Args:
        raw_logs: Raw logs to process
        chunk_size: Chunk size in lines
        parallel: If True, process chunks in parallel (requires threading/multiprocessing).
            Not used yet: chunks are currently processed sequentially.

    Returns:
        dict: Merged results from all chunks
    """
    chunks = list(split_logs_into_chunks(raw_logs, chunk_size))
    total_chunks = len(chunks)

    all_events = []
    all_errors = []
    all_warnings = []
    all_anomalies = []

    statistics_accumulator = {
        'total_lines': 0,
        'parsed_events': 0,
        'errors': 0,
        'warnings': 0,
        'info_messages': 0,
        'event_types': {},
        'time_range': None
    }

    # Process each chunk
    for chunk_idx, chunk in enumerate(chunks, 1):
        # chr(10) is "\n"; f-strings cannot contain backslashes before Python 3.12
        print(f"[Streaming] Processing chunk {chunk_idx}/{total_chunks} ({len(chunk.split(chr(10)))} lines)...")

        # Step 1: parse the chunk
        try:
            structured_data = run_parser_agent(chunk)

            # Merge the parsed results
            all_events.extend(structured_data.get('events', []))
            all_errors.extend(structured_data.get('errors', []))
            all_warnings.extend(structured_data.get('warnings', []))

            # Update the statistics
            stats = structured_data.get('statistics', {})
            statistics_accumulator['total_lines'] += stats.get('total_lines', 0)
            statistics_accumulator['parsed_events'] += stats.get('parsed_events', 0)
            statistics_accumulator['errors'] += stats.get('errors', 0)
            statistics_accumulator['warnings'] += stats.get('warnings', 0)
            statistics_accumulator['info_messages'] += stats.get('info_messages', 0)

            # Merge the event type counters
            for event_type, count in stats.get('event_types', {}).items():
                statistics_accumulator['event_types'][event_type] = \
                    statistics_accumulator['event_types'].get(event_type, 0) + count

            # Update the time range (keep the earliest start and the latest end)
            chunk_time_range = stats.get('time_range')
            if chunk_time_range:
                if statistics_accumulator['time_range'] is None:
                    statistics_accumulator['time_range'] = chunk_time_range.copy()
                else:
                    if chunk_time_range.get('start'):
                        if (statistics_accumulator['time_range'].get('start') is None or
                                chunk_time_range['start'] < statistics_accumulator['time_range']['start']):
                            statistics_accumulator['time_range']['start'] = chunk_time_range['start']
                    if chunk_time_range.get('end'):
                        if (statistics_accumulator['time_range'].get('end') is None or
                                chunk_time_range['end'] > statistics_accumulator['time_range']['end']):
                            statistics_accumulator['time_range']['end'] = chunk_time_range['end']

        except Exception as e:
            # Skip a failed chunk so one bad chunk does not abort the whole run
            print(f"[Streaming] Error processing chunk {chunk_idx}: {e}")
            continue

    # Renumber the lines in the merged events
    for idx, event in enumerate(all_events, 1):
        event['line_number'] = idx

    # Step 2: anomaly detection on the merged data
    merged_structured_data = {
        'events': all_events,
        'errors': all_errors,
        'warnings': all_warnings,
        'statistics': statistics_accumulator
    }

    print(f"[Streaming] Analyzing {len(all_events)} total events for anomalies...")
    anomaly_report = run_anomaly_agent(merged_structured_data)
    all_anomalies = anomaly_report.get('anomalies', [])

    # Step 3: root cause analysis
    print("[Streaming] Running root cause analysis...")
    rca_result = run_rca_agent(anomaly_report)

    # Step 4: generate the GPT prompt
    print("[Streaming] Generating GPT prompt...")
    recommendations_md = format_rca_as_markdown_streaming(rca_result)
    gpt_prompt = run_gpt_prompt_agent(merged_structured_data, anomaly_report, recommendations_md)

    return {
        'structured_data': merged_structured_data,
        'anomaly_report': anomaly_report,
        'rca_result': rca_result,
        'gpt_prompt': gpt_prompt,
        'chunks_processed': total_chunks,
        'chunk_size': chunk_size
    }


def format_rca_as_markdown_streaming(rca_result: dict) -> str:
    """Formats the RCA result as Markdown (copy of the helper in app.py, to avoid a circular import)."""
    markdown_parts = []

    # Possible root causes
    analysis = rca_result.get("analysis", {})
    root_causes = analysis.get("root_causes", [])
    if root_causes:
        markdown_parts.append("## Возможные первопричины\n\n")
        for i, cause in enumerate(root_causes, 1):
            markdown_parts.append(f"{i}. {cause}\n")
        markdown_parts.append("\n")

    # Detailed anomaly analysis
    details = analysis.get("details", [])
    if details:
        markdown_parts.append("## Детальный анализ аномалий\n\n")
        for detail in details:
            anomaly_type = detail.get("anomaly_type", "UNKNOWN")
            severity = detail.get("severity", "MEDIUM")
            description = detail.get("description", "")
            possible_causes = detail.get("possible_causes", [])

            emoji = "🔴" if severity == "CRITICAL" else "🟡" if severity == "HIGH" else "🟢" if severity == "MEDIUM" else "⚪"
            markdown_parts.append(f"### {emoji} {anomaly_type} ({severity})\n\n{description}\n\n")

            if possible_causes:
                markdown_parts.append("#### Возможные первопричины:\n\n")
                for i, cause in enumerate(possible_causes, 1):
                    markdown_parts.append(f"{i}. {cause}\n")
                markdown_parts.append("\n")

    # Remediation recommendations
    recommendations = rca_result.get("recommendations", [])
    if recommendations:
        markdown_parts.append("## Рекомендации по устранению\n\n")
        for rec in recommendations:
            priority = rec.get("priority", "MEDIUM")
            text = rec.get("text", "")
            actions = rec.get("actions", [])

            emoji = "🔴" if priority == "CRITICAL" else "🟡" if priority == "HIGH" else "🟢" if priority == "MEDIUM" else "⚪"
            markdown_parts.append(f"### {emoji} Рекомендация (Приоритет: {priority})\n\n{text}\n\n")

            if actions:
                markdown_parts.append("**Конкретные действия:**\n\n")
                for action in actions:
                    markdown_parts.append(f"- {action}\n")
                markdown_parts.append("\n")

    # General recommendations
    general = rca_result.get("general_recommendations", [])
    if general:
        markdown_parts.append("## Общие рекомендации\n\n")
        for rec in general:
            markdown_parts.append(f"- {rec}\n")

    return "".join(markdown_parts)