Spaces:
Running
Running
import asyncio
import logging
import time
import traceback
import uuid
from typing import Any, Dict, Optional

from sqlalchemy.orm import Session

from agentgraph.extraction.graph_processing import SlidingWindowMonitor
from agentgraph.input.text_processing import ChunkingService
from backend.database import models
from backend.database.utils import get_trace, save_knowledge_graph, update_trace_status, get_context_documents_from_trace
from backend.services.task_service import update_task_status
| logger = logging.getLogger(__name__) | |
class PipelineError(Exception):
    """Raised when a step of the trace-processing pipeline fails."""
def _build_chunk_kwargs(chunking_config: Any) -> Dict[str, Any]:
    """Translate an optional chunking config into ChunkingService keyword args.

    For the agent-aware semantic splitter, ``max_chunk_size`` maps to
    ``window_size``; ``min_chunk_size`` passes through unchanged; and
    ``overlap_size`` is derived as 5% of the window, floored at 1000 chars.

    Args:
        chunking_config: Optional object exposing ``min_chunk_size`` and
            ``max_chunk_size`` attributes (presumably a request/config model —
            verify against the caller).

    Returns:
        Keyword arguments for ``ChunkingService.chunk_trace_content`` (empty
        dict when no config was supplied).
    """
    chunk_kwargs: Dict[str, Any] = {}
    if chunking_config:
        if chunking_config.max_chunk_size:
            # max_chunk_size maps to window_size for AgentAwareSemanticSplitter.
            chunk_kwargs["window_size"] = chunking_config.max_chunk_size
            # Overlap is 5% of the window, but never below 1000 characters.
            chunk_kwargs["overlap_size"] = max(1000, int(chunking_config.max_chunk_size * 0.05))
        if chunking_config.min_chunk_size:
            chunk_kwargs["min_chunk_size"] = chunking_config.min_chunk_size
        logger.info(f"Using custom chunking config: {chunking_config}")
        logger.info(f"Mapped to chunk_kwargs: {chunk_kwargs}")
    return chunk_kwargs


def _save_window_graphs(session: Session, final_kg_data: Dict[str, Any], processing_run_id: str, trace_id: str, task_id: Optional[str]) -> int:
    """Persist per-window knowledge graphs from a processing result, if any.

    Each window is saved independently: a failure for one window is logged and
    skipped so the remaining windows (and the already-saved merged KG) survive.

    Args:
        session: Active SQLAlchemy session used for writes.
        final_kg_data: Result dict from ``SlidingWindowMonitor.process_trace``;
            window graphs are read from its optional ``'window_kgs'`` list.
        processing_run_id: Fallback run id when a window omits its own.
        trace_id: Fallback trace id when a window omits its own.
        task_id: Optional task id for progress reporting.

    Returns:
        Number of window graphs successfully saved (0 when none present).
    """
    window_kgs_saved = 0
    if 'window_kgs' in final_kg_data and final_kg_data['window_kgs']:
        if task_id:
            update_task_status(task_id, "PROCESSING", "Saving window knowledge graphs to database", 90)
        logger.info(f"Saving {len(final_kg_data['window_kgs'])} window knowledge graphs")
        for window_kg_data in final_kg_data['window_kgs']:
            try:
                window_kg_result = save_knowledge_graph(
                    session,
                    window_kg_data.get('filename'),
                    window_kg_data.get('graph_data'),
                    trace_id=window_kg_data.get('trace_id', trace_id),
                    window_index=window_kg_data.get('window_index'),
                    window_total=window_kg_data.get('window_total'),
                    window_start_char=window_kg_data.get('window_start_char'),
                    window_end_char=window_kg_data.get('window_end_char'),
                    processing_run_id=window_kg_data.get('processing_run_id', processing_run_id)
                )
                window_kgs_saved += 1
                logger.debug(f"Saved window KG {window_kg_data.get('window_index')}: {window_kg_data.get('filename')} (ID: {window_kg_result.id})")
            except Exception as window_error:
                # Best-effort: one bad window must not abort the others.
                logger.error(f"Failed to save window knowledge graph {window_kg_data.get('filename', 'unknown')}: {window_error}")
        logger.info(f"Successfully saved {window_kgs_saved} window knowledge graphs")
    return window_kgs_saved


async def process_trace_task(trace_id: str, session: Session, task_id: Optional[str] = None, splitter_type: str = "agent_semantic", force_regenerate: bool = False, method_name: str = "production", model: str = "gpt-5-mini", chunking_config: Any = None) -> Dict[str, Any]:
    """
    Process a single trace and return the merged knowledge graph with proper timeout handling.

    Pipeline stages: fetch trace -> idempotency check -> load context docs ->
    chunk content -> run SlidingWindowMonitor (bounded by a 1-hour wall clock)
    -> validate/resolve the merged graph -> persist merged + per-window graphs
    -> mark the trace processed. Progress is reported via ``update_task_status``
    whenever ``task_id`` is supplied.

    Args:
        trace_id: Identifier of the trace row to process.
        session: Active SQLAlchemy session for all DB reads/writes.
        task_id: Optional task id; when given, progress updates are emitted.
        splitter_type: Chunking strategy name passed to ChunkingService.
        force_regenerate: When False, short-circuits if "created" knowledge
            graphs already exist for this trace.
        method_name: Processing-method label forwarded to SlidingWindowMonitor.
        model: LLM model name used for extraction.
        chunking_config: Optional object with ``min_chunk_size`` /
            ``max_chunk_size`` attributes; see ``_build_chunk_kwargs``.

    Returns:
        Summary dict: either an ``"already_processed"`` short-circuit payload,
        or a ``"completed"`` payload with counts, duration and the saved KG id.

    Raises:
        PipelineError: On a missing trace or content, timeout, malformed
            processing result, empty graph, or any unexpected failure (the
            original exception is chained as the cause).
    """
    start_time = time.time()
    max_processing_time = 3600  # hard ceiling: 1 hour wall-clock per trace

    def check_timeout() -> None:
        # Cooperative timeout guard invoked between pipeline stages and from
        # the progress callback, so long-running stages cannot overrun quietly.
        if time.time() - start_time > max_processing_time:
            raise PipelineError(f"Processing timeout exceeded {max_processing_time} seconds")

    try:
        if task_id:
            update_task_status(task_id, "PROCESSING", "Initializing trace processing", 5)
        trace = get_trace(session, trace_id)
        if not trace:
            raise PipelineError(f"Trace with ID {trace_id} not found")
        if not force_regenerate:
            existing_kgs = session.query(models.KnowledgeGraph).filter(
                models.KnowledgeGraph.trace_id == trace_id,
                models.KnowledgeGraph.status == "created"
            ).all()
            if existing_kgs:
                # FIX: previously this short-circuit only fired when task_id was
                # set, so background callers silently regenerated graphs despite
                # force_regenerate=False. Now the check applies to all callers.
                if task_id:
                    update_task_status(task_id, "COMPLETED", f"Found {len(existing_kgs)} existing knowledge graphs. Use force_regenerate=True to create new graphs.", 100)
                return {
                    "message": f"Trace {trace_id} already processed",
                    "existing_graphs": len(existing_kgs),
                    "status": "already_processed",
                    "suggestion": "Use force_regenerate=True to bypass this check and generate new graphs"
                }
        else:
            if task_id:
                update_task_status(task_id, "PROCESSING", "Force regenerate enabled - creating new knowledge graphs", 5)

        trace_content = trace.content
        if not trace_content:
            raise PipelineError(f"Trace {trace_id} has no content")

        context_documents = get_context_documents_from_trace(session, trace_id)
        context_count = len(context_documents) if context_documents else 0
        logger.info(f"Retrieved {context_count} context documents for trace {trace_id}")

        check_timeout()
        if task_id:
            context_msg = f" with {context_count} context documents" if context_count > 0 else ""
            update_task_status(task_id, "PROCESSING", f"Splitting trace content ({len(trace_content)} characters){context_msg}", 10)
        logger.info(f"Splitting trace content with {splitter_type} splitter")
        chunking_service = ChunkingService(
            default_batch_size=3,
            default_model="gpt-5-mini"
        )
        check_timeout()
        chunk_kwargs = _build_chunk_kwargs(chunking_config)
        chunks = chunking_service.chunk_trace_content(
            content=trace_content,
            splitter_type=splitter_type,
            **chunk_kwargs
        )
        logger.info(f"Split content into {len(chunks)} chunks")
        if not chunks:
            raise PipelineError("No chunks were created from the trace content")
        if task_id:
            update_task_status(task_id, "PROCESSING", f"Processing {len(chunks)} chunks", 20)

        processing_run_id = str(uuid.uuid4())[:8]
        monitor = SlidingWindowMonitor(
            batch_size=3,
            parallel_processing=True,
            model=model,
            source_trace_id=trace_id,
            processing_run_id=processing_run_id,
            method_name=method_name,
            context_documents=context_documents,
            trace_content=trace_content,
            trace_metadata=trace.trace_metadata
        )

        if task_id:
            def progress_callback(stage, step, total_steps, message=None):
                # Map per-stage progress into the 30-80% band of overall progress.
                check_timeout()
                base_progress = 30
                stage_progress = 50
                raw_progress = base_progress + int((step / total_steps) * stage_progress)
                overall_progress = min(raw_progress, 80)
                status_message = f"{stage}: {step}/{total_steps}"
                if message:
                    status_message += f" - {message}"
                update_task_status(task_id, "PROCESSING", status_message, overall_progress)
        else:
            def progress_callback(stage, step, total_steps, message=None):
                # No task to report to; still enforce the cooperative timeout.
                check_timeout()

        check_timeout()
        output_identifier = f"trace_{trace_id}_{processing_run_id}"
        if task_id:
            update_task_status(task_id, "PROCESSING", "Starting knowledge graph processing", 30)
        try:
            # Outer wait_for covers only the time remaining in the 1h budget.
            final_kg_data = await asyncio.wait_for(
                monitor.process_trace(chunks, output_identifier, progress_callback=progress_callback),
                timeout=max_processing_time - (time.time() - start_time)
            )
        except asyncio.TimeoutError:
            raise PipelineError(f"Trace processing timed out after {max_processing_time} seconds")

        check_timeout()
        # Validate the result shape before touching the database.
        if not isinstance(final_kg_data, dict):
            raise PipelineError("Invalid result format from trace processing - expected dictionary")
        if 'final_kg' not in final_kg_data:
            raise PipelineError("Invalid result format - missing 'final_kg' key in response")
        final_kg_info = final_kg_data['final_kg']
        if 'graph_data' not in final_kg_info:
            raise PipelineError("Invalid result format - missing 'graph_data' key in final_kg")
        actual_graph_data = final_kg_info['graph_data']

        try:
            # Local import keeps the resolver optional; resolution is
            # best-effort and an unresolved KG is still saveable.
            from agentgraph.reconstruction.content_reference_resolver import ContentReferenceResolver
            resolver = ContentReferenceResolver()
            actual_graph_data = resolver.resolve_knowledge_graph_content(
                actual_graph_data,
                trace_content,
                {"window_index": 0}
            )
        except Exception as _resolver_err:
            logger.warning(f"ContentReferenceResolver failed on merged KG: {_resolver_err}")

        if not actual_graph_data.get('entities') and not actual_graph_data.get('relations'):
            raise PipelineError("Knowledge graph processing completed but no entities or relations were found")

        if task_id:
            update_task_status(task_id, "PROCESSING", "Saving knowledge graph to database", 85)
        window_count = final_kg_data.get('window_count', len(chunks))
        kg_result = save_knowledge_graph(
            session,
            output_identifier,
            actual_graph_data,
            trace_id=trace_id,
            window_total=window_count,
            processing_run_id=processing_run_id
        )
        kg_id = kg_result.id

        window_kgs_saved = _save_window_graphs(session, final_kg_data, processing_run_id, trace_id, task_id)

        update_trace_status(session, trace_id, "processed")
        processing_duration = time.time() - start_time
        if task_id:
            update_task_status(
                task_id,
                "COMPLETED",
                f"Processing completed in {processing_duration:.1f}s. Knowledge graph saved with ID: {kg_id}",
                100
            )
        logger.info(f"Successfully processed trace {trace_id} in {processing_duration:.1f}s")
        return {
            "trace_id": trace_id,
            "knowledge_graph_id": kg_id,
            "processing_duration": processing_duration,
            "chunks_processed": len(chunks),
            "window_graphs_saved": window_kgs_saved,
            "entities_count": len(actual_graph_data.get('entities', [])),
            "relations_count": len(actual_graph_data.get('relations', [])),
            "status": "completed"
        }
    except Exception as e:
        error_message = f"Error processing trace {trace_id}: {str(e)}"
        logger.error(error_message)
        logger.error(traceback.format_exc())
        if task_id:
            update_task_status(task_id, "FAILED", error_message)
        try:
            # Best-effort status update; failure here must not mask the cause.
            update_trace_status(session, trace_id, "failed")
        except Exception as update_error:
            logger.error(f"Failed to update trace status: {update_error}")
        # FIX: chain the original exception so the real cause isn't lost.
        raise PipelineError(error_message) from e