import logging
import asyncio
import uuid
import time
import traceback
from typing import Any, Dict, Optional

from sqlalchemy.orm import Session

from backend.database import models
from backend.database.utils import (
    get_trace,
    save_knowledge_graph,
    update_trace_status,
    get_context_documents_from_trace,
)
from agentgraph.extraction.graph_processing import SlidingWindowMonitor
from agentgraph.input.text_processing import ChunkingService
from backend.services.task_service import update_task_status

logger = logging.getLogger(__name__)


class PipelineError(Exception):
    """Exception raised for errors in the pipeline processing."""
    pass


async def process_trace_task(
    trace_id: str,
    session: Session,
    task_id: Optional[str] = None,
    splitter_type: str = "agent_semantic",
    force_regenerate: bool = False,
    method_name: str = "production",
    model: str = "gpt-5-mini",
    chunking_config=None,
) -> Dict[str, Any]:
    """
    Process a single trace and return the merged knowledge graph, enforcing a
    maximum processing time throughout the pipeline.
    """
    start_time = time.time()
    max_processing_time = 3600  # 1 hour maximum processing time

    try:
        if task_id:
            update_task_status(task_id, "PROCESSING", "Initializing trace processing", 5)

        trace = get_trace(session, trace_id)
        if not trace:
            raise PipelineError(f"Trace with ID {trace_id} not found")

        if not force_regenerate:
            existing_kgs = session.query(models.KnowledgeGraph).filter(
                models.KnowledgeGraph.trace_id == trace_id,
                models.KnowledgeGraph.status == "created"
            ).all()
            if existing_kgs and task_id:
                update_task_status(
                    task_id,
                    "COMPLETED",
                    f"Found {len(existing_kgs)} existing knowledge graphs. "
                    "Use force_regenerate=True to create new graphs.",
                    100
                )
                return {
                    "message": f"Trace {trace_id} already processed",
                    "existing_graphs": len(existing_kgs),
                    "status": "already_processed",
                    "suggestion": "Use force_regenerate=True to bypass this check and generate new graphs"
                }
        else:
            if task_id:
                update_task_status(
                    task_id,
                    "PROCESSING",
                    "Force regenerate enabled - creating new knowledge graphs",
                    5
                )

        trace_content = trace.content
        if not trace_content:
            raise PipelineError(f"Trace {trace_id} has no content")

        context_documents = get_context_documents_from_trace(session, trace_id)
        context_count = len(context_documents) if context_documents else 0
        logger.info(f"Retrieved {context_count} context documents for trace {trace_id}")

        def check_timeout():
            if time.time() - start_time > max_processing_time:
                raise PipelineError(f"Processing timeout exceeded {max_processing_time} seconds")

        check_timeout()

        if task_id:
            context_msg = f" with {context_count} context documents" if context_count > 0 else ""
            update_task_status(
                task_id,
                "PROCESSING",
                f"Splitting trace content ({len(trace_content)} characters){context_msg}",
                10
            )

        logger.info(f"Splitting trace content with {splitter_type} splitter")
        chunking_service = ChunkingService(
            default_batch_size=3,
            default_model="gpt-5-mini"
        )

        check_timeout()

        # Extract chunking parameters from the optional config.
        chunk_kwargs = {}
        if chunking_config:
            # Map min_chunk_size and max_chunk_size to the ChunkingService parameters.
            # For AgentAwareSemanticSplitter, max_chunk_size maps to window_size.
            if chunking_config.max_chunk_size:
                chunk_kwargs["window_size"] = chunking_config.max_chunk_size
            # Pass min_chunk_size through directly.
            if chunking_config.min_chunk_size:
                chunk_kwargs["min_chunk_size"] = chunking_config.min_chunk_size
            # Derive overlap_size from the window size (5% of it, with a 1000-character floor).
            if chunking_config.max_chunk_size:
                chunk_kwargs["overlap_size"] = max(1000, int(chunking_config.max_chunk_size * 0.05))
            logger.info(f"Using custom chunking config: {chunking_config}")
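            # Illustrative mapping (hypothetical numbers, not taken from any real run):
            # a chunking_config with max_chunk_size=40_000 and min_chunk_size=2_000 yields
            #     chunk_kwargs == {
            #         "window_size": 40_000,
            #         "min_chunk_size": 2_000,
            #         "overlap_size": 2_000,   # max(1000, int(40_000 * 0.05))
            #     }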
logger.info(f"Mapped to chunk_kwargs: {chunk_kwargs}") chunks = chunking_service.chunk_trace_content( content=trace_content, splitter_type=splitter_type, **chunk_kwargs ) logger.info(f"Split content into {len(chunks)} chunks") if not chunks: raise PipelineError("No chunks were created from the trace content") if task_id: update_task_status(task_id, "PROCESSING", f"Processing {len(chunks)} chunks", 20) processing_run_id = str(uuid.uuid4())[:8] monitor = SlidingWindowMonitor( batch_size=3, parallel_processing=True, model=model, source_trace_id=trace_id, processing_run_id=processing_run_id, method_name=method_name, context_documents=context_documents, trace_content=trace_content, trace_metadata=trace.trace_metadata ) if task_id: def progress_callback(stage, step, total_steps, message=None): check_timeout() base_progress = 30 stage_progress = 50 raw_progress = base_progress + int((step / total_steps) * stage_progress) overall_progress = min(raw_progress, 80) status_message = f"{stage}: {step}/{total_steps}" if message: status_message += f" - {message}" update_task_status(task_id, "PROCESSING", status_message, overall_progress) else: def progress_callback(stage, step, total_steps, message=None): check_timeout() pass check_timeout() output_identifier = f"trace_{trace_id}_{processing_run_id}" if task_id: update_task_status(task_id, "PROCESSING", "Starting knowledge graph processing", 30) try: final_kg_data = await asyncio.wait_for( monitor.process_trace(chunks, output_identifier, progress_callback=progress_callback), timeout=max_processing_time - (time.time() - start_time) ) except asyncio.TimeoutError: raise PipelineError(f"Trace processing timed out after {max_processing_time} seconds") check_timeout() if not isinstance(final_kg_data, dict): raise PipelineError("Invalid result format from trace processing - expected dictionary") if 'final_kg' not in final_kg_data: raise PipelineError("Invalid result format - missing 'final_kg' key in response") final_kg_info = final_kg_data['final_kg'] if 'graph_data' not in final_kg_info: raise PipelineError("Invalid result format - missing 'graph_data' key in final_kg") actual_graph_data = final_kg_info['graph_data'] try: from agentgraph.reconstruction.content_reference_resolver import ContentReferenceResolver resolver = ContentReferenceResolver() actual_graph_data = resolver.resolve_knowledge_graph_content( actual_graph_data, trace_content, {"window_index": 0} ) except Exception as _resolver_err: logger.warning(f"ContentReferenceResolver failed on merged KG: {_resolver_err}") if not actual_graph_data.get('entities') and not actual_graph_data.get('relations'): raise PipelineError("Knowledge graph processing completed but no entities or relations were found") if task_id: update_task_status(task_id, "PROCESSING", "Saving knowledge graph to database", 85) window_count = final_kg_data.get('window_count', len(chunks)) kg_result = save_knowledge_graph( session, output_identifier, actual_graph_data, trace_id=trace_id, window_total=window_count, processing_run_id=processing_run_id ) kg_id = kg_result.id window_kgs_saved = 0 if 'window_kgs' in final_kg_data and final_kg_data['window_kgs']: if task_id: update_task_status(task_id, "PROCESSING", "Saving window knowledge graphs to database", 90) logger.info(f"Saving {len(final_kg_data['window_kgs'])} window knowledge graphs") for window_kg_data in final_kg_data['window_kgs']: try: window_filename = window_kg_data.get('filename') window_graph_data = window_kg_data.get('graph_data') window_index = 
window_kg_data.get('window_index') window_total = window_kg_data.get('window_total') window_start_char = window_kg_data.get('window_start_char') window_end_char = window_kg_data.get('window_end_char') window_processing_run_id = window_kg_data.get('processing_run_id', processing_run_id) window_trace_id = window_kg_data.get('trace_id', trace_id) window_kg_result = save_knowledge_graph( session, window_filename, window_graph_data, trace_id=window_trace_id, window_index=window_index, window_total=window_total, window_start_char=window_start_char, window_end_char=window_end_char, processing_run_id=window_processing_run_id ) window_kgs_saved += 1 logger.debug(f"Saved window KG {window_index}: {window_filename} (ID: {window_kg_result.id})") except Exception as window_error: logger.error(f"Failed to save window knowledge graph {window_kg_data.get('filename', 'unknown')}: {window_error}") logger.info(f"Successfully saved {window_kgs_saved} window knowledge graphs") update_trace_status(session, trace_id, "processed") processing_duration = time.time() - start_time if task_id: update_task_status( task_id, "COMPLETED", f"Processing completed in {processing_duration:.1f}s. Knowledge graph saved with ID: {kg_id}", 100 ) logger.info(f"Successfully processed trace {trace_id} in {processing_duration:.1f}s") return { "trace_id": trace_id, "knowledge_graph_id": kg_id, "processing_duration": processing_duration, "chunks_processed": len(chunks), "window_graphs_saved": window_kgs_saved, "entities_count": len(actual_graph_data.get('entities', [])), "relations_count": len(actual_graph_data.get('relations', [])), "status": "completed" } except Exception as e: error_message = f"Error processing trace {trace_id}: {str(e)}" logger.error(error_message) logger.error(traceback.format_exc()) if task_id: update_task_status(task_id, "FAILED", error_message) try: update_trace_status(session, trace_id, "failed") except Exception as update_error: logger.error(f"Failed to update trace status: {update_error}") raise PipelineError(error_message)
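
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the pipeline): how a worker
# might invoke process_trace_task. The session factory import path and the
# trace ID below are assumptions for the example, not real identifiers.
# ---------------------------------------------------------------------------
#
#   import asyncio
#   from backend.database import SessionLocal  # hypothetical session factory
#
#   async def run_example():
#       session = SessionLocal()
#       try:
#           result = await process_trace_task(
#               trace_id="example-trace-id",   # hypothetical
#               session=session,
#               task_id=None,                  # skip task status updates
#               force_regenerate=True,
#           )
#           print(result["knowledge_graph_id"], result["entities_count"])
#       finally:
#           session.close()
#
#   # asyncio.run(run_example())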