""" API endpoints for working with traces """ from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, BackgroundTasks, Body from typing import List, Dict, Any, Optional from sqlalchemy.orm import Session import logging from datetime import datetime from pydantic import BaseModel import time from backend.dependencies import get_db from backend.database.utils import get_all_traces, get_trace, delete_trace, get_knowledge_graphs_for_trace, save_trace, update_trace_content from backend.services.context_service import ContextService from backend.services.cost_calculation_service import cost_service from backend.models import ( ContextDocument, CreateContextRequest, UpdateContextRequest, ContextDocumentResponse, ContextDocumentType ) from backend.services.trace_management_service import TraceManagementService from backend.services.processing_service import process_trace_task, PipelineError from backend.services.task_service import create_task logger = logging.getLogger(__name__) router = APIRouter( prefix="/api/traces", tags=["traces"], ) def update_kg_statistics(session: Session, kg_list: List) -> None: """ Update entity_count and relation_count for knowledge graphs if they're missing but graph_data exists. Args: session: Database session kg_list: List of knowledge graph database objects """ updated_count = 0 for kg in kg_list: if kg and kg.graph_data: needs_update = False # Update entity count if missing/zero but entities exist in graph_data if kg.entity_count is None or kg.entity_count == 0: entities = kg.graph_data.get("entities", []) if entities: kg.entity_count = len(entities) needs_update = True # Update relation count if missing/zero but relations exist in graph_data if kg.relation_count is None or kg.relation_count == 0: relations = kg.graph_data.get("relations", []) if relations: kg.relation_count = len(relations) needs_update = True if needs_update: session.add(kg) updated_count += 1 if updated_count > 0: session.commit() logging.getLogger("agent_monitoring_server").info(f"Updated entity/relation counts for {updated_count} knowledge graphs") @router.get("/") async def list_traces(db: Session = Depends(get_db)): """ List all traces stored in the database with their associated knowledge graphs. """ traces = get_all_traces(db) # Convert to a list of dictionaries and include knowledge graphs trace_list = [] for trace in traces: # Get related knowledge graphs for each trace knowledge_graphs = get_knowledge_graphs_for_trace(db, trace.trace_id) # Convert knowledge graphs to proper dict format with is_final field kg_list = [] for kg in knowledge_graphs: # Extract processing metadata from graph_data if available processing_metadata = {} system_info = {} if kg.graph_data and isinstance(kg.graph_data, dict): metadata = kg.graph_data.get("metadata", {}) processing_params = metadata.get("processing_params", {}) processing_metadata = { "method_name": processing_params.get("method_name", "unknown"), "splitter_type": processing_params.get("splitter_type", "unknown"), "window_size": processing_params.get("window_size", "unknown"), "overlap_size": processing_params.get("overlap_size", "unknown") } system_info = { "system_name": kg.graph_data.get("system_name"), "system_summary": kg.graph_data.get("system_summary"), "graph_data": kg.graph_data } # Determine if this is a final KG using the same logic as the specific endpoint is_final = (kg.window_index is None and kg.window_total is not None) kg_dict = { "kg_id": kg.id, "id": kg.id, "filename": kg.filename, "created_at": kg.creation_timestamp.isoformat() if kg.creation_timestamp else None, "status": kg.status, "is_final": is_final, "window_index": kg.window_index, "window_total": kg.window_total, "window_start_char": kg.window_start_char, "window_end_char": kg.window_end_char, "processing_run_id": kg.processing_run_id, "entity_count": kg.entity_count or 0, "relation_count": kg.relation_count or 0, "is_enriched": kg.status == "enriched" or kg.status == "perturbed" or kg.status == "analyzed", "is_perturbed": kg.status == "perturbed" or kg.status == "analyzed", "is_analyzed": kg.status == "analyzed", "processing_metadata": processing_metadata, "system_name": system_info.get("system_name"), "system_summary": system_info.get("system_summary"), "graph_data": system_info.get("graph_data"), } kg_list.append(kg_dict) # Prepare the trace response with knowledge graphs trace_dict = trace.to_dict() trace_dict["knowledge_graphs"] = kg_list trace_list.append(trace_dict) return { "status": "success", "traces": trace_list } @router.post("/") async def upload_trace( trace_file: UploadFile = File(...), db: Session = Depends(get_db) ): """ Upload a trace file to the database. Args: trace_file: The trace file to upload db: Database session Returns: Status and trace ID """ try: # Read the uploaded file content file_content = await trace_file.read() file_content_str = file_content.decode('utf-8') try: # Import trace analysis utilities from agentgraph.input.trace_management import analyze_trace_characteristics # Analyze the trace to determine its characteristics trace_analysis = analyze_trace_characteristics(file_content_str) # Save the trace to the database trace = save_trace( session=db, content=file_content_str, filename=trace_file.filename, title=f"Trace from {trace_file.filename}", description=f"Uploaded via Stage Processor on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", trace_type=trace_analysis.get('trace_type', "user_upload"), trace_source="stage_processor", tags=["stage_processor"], trace_metadata=trace_analysis ) logger = logging.getLogger("agent_monitoring_server") logger.info(f"Trace saved to database: ID={trace.id}, trace_id={trace.trace_id}") logger.info(f"Trace characteristics: {trace_analysis.get('trace_type', 'unknown')} type, " + f"{trace_analysis.get('line_count', 0)} lines, {trace_analysis.get('total_length', 0)} chars") # Auto-generate context documents using universal parser context_docs_created = 0 try: from backend.services.universal_parser_service import auto_generate_context_documents created_docs = auto_generate_context_documents(trace.trace_id, file_content_str, db) context_docs_created = len(created_docs) if context_docs_created > 0: logger.info(f"Auto-generated {context_docs_created} context documents for trace {trace.trace_id}") except Exception as e: logger.warning(f"Failed to auto-generate context documents for trace {trace.trace_id}: {str(e)}") return { "status": "success", "message": "Trace uploaded successfully", "trace_id": trace.trace_id, "title": trace.title, "character_count": trace.character_count, "turn_count": trace.turn_count, "context_documents_generated": context_docs_created } except ImportError: # Fall back to basic trace saving if trace_uploader is not available logger = logging.getLogger("agent_monitoring_server") logger.warning("Could not import trace analysis utilities") # Save the trace to the database without analysis trace = save_trace( session=db, content=file_content_str, filename=trace_file.filename, title=f"Trace from {trace_file.filename}", trace_type="user_upload", trace_source="stage_processor", tags=["stage_processor"] ) logger.info(f"Trace saved to database: ID={trace.id}, trace_id={trace.trace_id}") # Auto-generate context documents using universal parser context_docs_created = 0 try: from backend.services.universal_parser_service import auto_generate_context_documents created_docs = auto_generate_context_documents(trace.trace_id, file_content_str, db) context_docs_created = len(created_docs) if context_docs_created > 0: logger.info(f"Auto-generated {context_docs_created} context documents for trace {trace.trace_id}") except Exception as e: logger.warning(f"Failed to auto-generate context documents for trace {trace.trace_id}: {str(e)}") return { "status": "success", "message": "Trace uploaded successfully", "trace_id": trace.trace_id, "title": trace.title, "character_count": trace.character_count, "turn_count": trace.turn_count, "context_documents_generated": context_docs_created } except Exception as e: logger = logging.getLogger("agent_monitoring_server") logger.error(f"Error uploading trace: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while uploading trace") @router.get("/{trace_id}") async def get_trace_by_id(trace_id: str, db: Session = Depends(get_db)): """ Get a specific trace by ID. """ trace = get_trace(db, trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") # Get related knowledge graphs knowledge_graphs = get_knowledge_graphs_for_trace(db, trace.trace_id) # Convert knowledge graphs to proper dict format with is_final field kg_list = [] for kg in knowledge_graphs: # Extract processing metadata from graph_data if available processing_metadata = {} system_info = {} if kg.graph_data and isinstance(kg.graph_data, dict): metadata = kg.graph_data.get("metadata", {}) processing_params = metadata.get("processing_params", {}) processing_metadata = { "method_name": processing_params.get("method_name", "unknown"), "splitter_type": processing_params.get("splitter_type", "unknown"), "window_size": processing_params.get("window_size", "unknown"), "overlap_size": processing_params.get("overlap_size", "unknown") } system_info = { "system_name": kg.graph_data.get("system_name"), "system_summary": kg.graph_data.get("system_summary"), "graph_data": kg.graph_data } # Determine if this is a final KG using the same logic as the specific endpoint is_final = (kg.window_index is None and kg.window_total is not None) kg_dict = { "kg_id": kg.id, "id": kg.id, "filename": kg.filename, "created_at": kg.creation_timestamp.isoformat() if kg.creation_timestamp else None, "status": kg.status, "is_final": is_final, "window_index": kg.window_index, "window_total": kg.window_total, "window_start_char": kg.window_start_char, "window_end_char": kg.window_end_char, "processing_run_id": kg.processing_run_id, "entity_count": kg.entity_count or 0, "relation_count": kg.relation_count or 0, "is_enriched": kg.status == "enriched" or kg.status == "perturbed" or kg.status == "analyzed", "is_perturbed": kg.status == "perturbed" or kg.status == "analyzed", "is_analyzed": kg.status == "analyzed", "processing_metadata": processing_metadata, "system_name": system_info.get("system_name"), "system_summary": system_info.get("system_summary"), "graph_data": system_info.get("graph_data"), } kg_list.append(kg_dict) # Prepare the response result = trace.to_dict() result["knowledge_graphs"] = kg_list return { "status": "success", "trace": result } @router.delete("/{trace_id}") async def delete_trace_by_id( trace_id: str, delete_related_kgs: bool = False, db: Session = Depends(get_db) ): """ Delete a trace by ID. """ success = delete_trace(db, trace_id, delete_related_kgs) if not success: raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") return { "status": "success", "message": f"Trace {trace_id} deleted successfully" } @router.get("/{trace_id}/knowledge-graphs") async def get_knowledge_graphs_for_trace_id(trace_id: str, db: Session = Depends(get_db)): """ Get all knowledge graphs associated with a specific trace. Separates final merged KGs from individual window KGs and groups them appropriately. Args: trace_id: The ID of the trace db: Database session Returns: A list of final knowledge graphs with their associated window KGs nested underneath """ trace = get_trace(db, trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") try: # Get all knowledge graphs for this trace all_knowledge_graphs = get_knowledge_graphs_for_trace(db, trace.trace_id) # Update statistics for all knowledge graphs before processing update_kg_statistics(db, all_knowledge_graphs) # Separate final KGs from window KGs # Final KGs: window_index IS NULL AND window_total IS NOT NULL # Window KGs: window_index IS NOT NULL final_kgs = [] window_kgs = [] for kg in all_knowledge_graphs: if kg.window_index is None and kg.window_total is not None: # This is a final merged KG final_kgs.append(kg) elif kg.window_index is not None: # This is a window KG window_kgs.append(kg) # Skip KGs that don't fit either pattern (legacy or malformed) # Group window KGs by processing_run_id to associate them with final KGs window_kgs_by_run = {} for window_kg in window_kgs: run_id = window_kg.processing_run_id or 'legacy' if run_id not in window_kgs_by_run: window_kgs_by_run[run_id] = [] window_kgs_by_run[run_id].append(window_kg) # Build the final response kg_list = [] for final_kg in final_kgs: # Get associated window KGs for this final KG run_id = final_kg.processing_run_id or 'legacy' associated_windows = window_kgs_by_run.get(run_id, []) # Sort window KGs by window_index associated_windows.sort(key=lambda wkg: wkg.window_index if wkg.window_index is not None else 0) # Convert window KGs to dict format window_kg_list = [] for window_kg in associated_windows: window_data = { "kg_id": window_kg.id, "filename": window_kg.filename, "window_index": window_kg.window_index, "window_start_char": window_kg.window_start_char, "window_end_char": window_kg.window_end_char, "created_at": window_kg.creation_timestamp.isoformat() if window_kg.creation_timestamp else None, "status": window_kg.status, "entity_count": window_kg.entity_count or 0, "relation_count": window_kg.relation_count or 0 } window_kg_list.append(window_data) # Extract processing metadata from graph_data if available processing_metadata = {} if final_kg.graph_data and isinstance(final_kg.graph_data, dict): metadata = final_kg.graph_data.get("metadata", {}) processing_params = metadata.get("processing_params", {}) processing_metadata = { "method_name": processing_params.get("method_name", "unknown"), "splitter_type": processing_params.get("splitter_type", "unknown"), "window_size": processing_params.get("window_size", "unknown"), "overlap_size": processing_params.get("overlap_size", "unknown") } # Extract system information from graph_data system_info = {} if final_kg.graph_data and isinstance(final_kg.graph_data, dict): system_info = { "system_name": final_kg.graph_data.get("system_name"), "system_summary": final_kg.graph_data.get("system_summary"), "graph_data": final_kg.graph_data # Include full graph_data for frontend access } # Build final KG data final_kg_data = { "kg_id": final_kg.id, "filename": final_kg.filename, "created_at": final_kg.creation_timestamp.isoformat() if final_kg.creation_timestamp else None, "updated_at": final_kg.update_timestamp.isoformat() if final_kg.update_timestamp else None, "status": final_kg.status, "is_final": True, "window_total": final_kg.window_total, "window_index": final_kg.window_index, # Should be None for final KGs "processing_run_id": final_kg.processing_run_id, "entity_count": final_kg.entity_count or 0, "relation_count": final_kg.relation_count or 0, "is_enriched": final_kg.status == "enriched" or final_kg.status == "perturbed" or final_kg.status == "analyzed", "is_perturbed": final_kg.status == "perturbed" or final_kg.status == "analyzed", "is_analyzed": final_kg.status == "analyzed", "processing_metadata": processing_metadata, # Add processing metadata "system_name": system_info.get("system_name"), # Add system_name to top level "system_summary": system_info.get("system_summary"), # Add system_summary to top level "graph_data": system_info.get("graph_data"), # Add graph_data to top level "window_knowledge_graphs": window_kg_list } kg_list.append(final_kg_data) # Handle any orphaned window KGs (those without associated final KGs) processed_run_ids = {kg.processing_run_id or 'legacy' for kg in final_kgs} orphaned_windows = [] for run_id, windows in window_kgs_by_run.items(): if run_id not in processed_run_ids: orphaned_windows.extend(windows) # Add orphaned windows as individual entries (for backward compatibility) for orphaned_kg in orphaned_windows: # Extract processing metadata for orphaned KGs too orphaned_metadata = {} orphaned_system_info = {} if orphaned_kg.graph_data and isinstance(orphaned_kg.graph_data, dict): metadata = orphaned_kg.graph_data.get("metadata", {}) processing_params = metadata.get("processing_params", {}) orphaned_metadata = { "method_name": processing_params.get("method_name", "unknown"), "splitter_type": processing_params.get("splitter_type", "unknown"), "window_size": processing_params.get("window_size", "unknown"), "overlap_size": processing_params.get("overlap_size", "unknown") } orphaned_system_info = { "system_name": orphaned_kg.graph_data.get("system_name"), "system_summary": orphaned_kg.graph_data.get("system_summary"), "graph_data": orphaned_kg.graph_data } orphaned_data = { "kg_id": orphaned_kg.id, "filename": orphaned_kg.filename, "created_at": orphaned_kg.creation_timestamp.isoformat() if orphaned_kg.creation_timestamp else None, "updated_at": orphaned_kg.update_timestamp.isoformat() if orphaned_kg.update_timestamp else None, "status": orphaned_kg.status, "is_final": False, # Mark as not final "window_total": orphaned_kg.window_total, "window_index": orphaned_kg.window_index, "processing_run_id": orphaned_kg.processing_run_id, "entity_count": orphaned_kg.entity_count or 0, "relation_count": orphaned_kg.relation_count or 0, "is_enriched": orphaned_kg.status == "enriched" or orphaned_kg.status == "perturbed" or orphaned_kg.status == "analyzed", "is_perturbed": orphaned_kg.status == "perturbed" or orphaned_kg.status == "analyzed", "is_analyzed": orphaned_kg.status == "analyzed", "processing_metadata": orphaned_metadata, # Add processing metadata "system_name": orphaned_system_info.get("system_name"), # Add system_name to top level "system_summary": orphaned_system_info.get("system_summary"), # Add system_summary to top level "graph_data": orphaned_system_info.get("graph_data"), # Add graph_data to top level "window_knowledge_graphs": [] # No nested windows for orphaned KGs } kg_list.append(orphaned_data) # Sort final list by creation timestamp, newest first kg_list.sort(key=lambda kg: kg["created_at"] if kg["created_at"] else "", reverse=True) return { "status": "success", "knowledge_graphs": kg_list } except Exception as e: logger = logging.getLogger("agent_monitoring_server") logger.error(f"Error retrieving knowledge graphs for trace {trace_id}: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while retrieving knowledge graphs") @router.get("/{trace_id}/content") async def get_trace_content(trace_id: str, db: Session = Depends(get_db)): """ Get the content of a specific trace by ID. Args: trace_id: The ID of the trace db: Database session Returns: The content of the trace """ trace = get_trace(db, trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") if not hasattr(trace, 'content') or not trace.content: raise HTTPException(status_code=404, detail=f"No content available for trace with ID {trace_id}") return { "status": "success", "content": trace.content } @router.post("/{trace_id}/content") async def update_trace_content( trace_id: str, content_data: dict = Body(...), db: Session = Depends(get_db) ): """ Update the content of a specific trace by ID. Args: trace_id: The ID of the trace content_data: Dictionary containing the new content db: Database session Returns: Success status """ trace = get_trace(db, trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") # Extract content from request body if 'content' not in content_data: raise HTTPException(status_code=400, detail="Content field is required") new_content = content_data['content'] # Update trace content trace.content = new_content # Update character count and other metadata trace.character_count = len(new_content) # Estimate turn count (approximate) turn_markers = [ "user:", "assistant:", "system:", "human:", "ai:", "User:", "Assistant:", "System:", "Human:", "AI:" ] turn_count = 0 for marker in turn_markers: turn_count += new_content.count(marker) trace.turn_count = max(1, turn_count) # At least 1 turn # Update timestamp trace.update_timestamp = datetime.utcnow() # Save to database db.add(trace) db.commit() return { "status": "success", "message": "Trace content updated successfully" } @router.post("/{trace_id}/regenerate-metadata") async def regenerate_trace_metadata( trace_id: str, db: Session = Depends(get_db) ) -> Dict[str, Any]: """ Regenerate metadata for a trace using the universal parser. Args: trace_id: The ID of the trace to regenerate metadata for db: Database session Returns: Success status and metadata info """ try: # Get the trace trace = get_trace(db, trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace {trace_id} not found") if not trace.content: raise HTTPException(status_code=400, detail="Trace has no content to analyze") # Use UniversalParserService to regenerate metadata from backend.services.universal_parser_service import UniversalParserService parser_service = UniversalParserService(db) # This will regenerate and store the schema_analytics metadata context_docs = parser_service.generate_trace_context_documents(trace_id, trace.content) # Refresh the trace to get updated metadata db.refresh(trace) logger = logging.getLogger("agent_monitoring_server") logger.info(f"Successfully regenerated metadata for trace {trace_id}") return { "status": "success", "message": "Trace metadata regenerated successfully", "context_documents_created": len(context_docs), "metadata_updated": bool(trace.trace_metadata and trace.trace_metadata.get("schema_analytics")) } except Exception as e: logger = logging.getLogger("agent_monitoring_server") logger.error(f"Error regenerating metadata for trace {trace_id}: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while regenerating metadata") @router.post("/{trace_id}/fix-long-lines") async def fix_long_lines( trace_id: str, request_data: Dict[str, Any], db: Session = Depends(get_db) ) -> Dict[str, Any]: """ Apply rule-based line splitting to a trace content. Args: trace_id: ID of the trace to process request_data: Request data containing max_line_length db: Database session Returns: Dictionary with processed content and metadata """ try: # Get the trace trace = get_trace(db, trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace {trace_id} not found") # Get max_line_length from request, default to 800 max_line_length = request_data.get("max_line_length", 800) # Apply line splitting using ChunkingService from agentgraph.input.text_processing import ChunkingService chunking_service = ChunkingService() processed_content = chunking_service.fix_long_lines_in_content( trace.content, max_line_length ) # Update the trace content in database update_trace_content(db, trace.trace_id, processed_content) # Calculate statistics original_lines = len(trace.content.split('\n')) processed_lines = len(processed_content.split('\n')) return { "success": True, "content": processed_content, "statistics": { "original_lines": original_lines, "processed_lines": processed_lines, "lines_added": processed_lines - original_lines, "max_line_length": max_line_length }, "message": f"Applied line splitting: {original_lines} → {processed_lines} lines" } except Exception as e: logger = logging.getLogger("agent_monitoring_server") logger.error(f"Error applying line splitting to trace {trace_id}: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while applying line splitting") @router.get("/{trace_id}/content-numbered") async def get_trace_content_numbered(trace_id: str, db: Session = Depends(get_db)): """ Return the trace content with line numbers already added using the same TraceLineNumberProcessor that the extraction pipeline employs. This guarantees front-end alignment with reference ranges. """ trace = get_trace(db, trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") if not hasattr(trace, "content") or not trace.content: raise HTTPException(status_code=404, detail="Trace has no content") try: from agentgraph.input.text_processing.trace_line_processor import TraceLineNumberProcessor processor = TraceLineNumberProcessor(max_line_length=120) numbered, _ = processor.add_line_numbers(trace.content) except Exception as err: logging.getLogger("agent_monitoring_server").error( f"Failed to generate numbered trace for {trace_id}: {err}" ) raise HTTPException(status_code=500, detail="Failed to generate numbered trace") return {"status": "success", "content": numbered} # Context Documents Endpoints @router.post("/{trace_id}/context") async def create_context_document( trace_id: str, request: CreateContextRequest, db: Session = Depends(get_db) ) -> ContextDocumentResponse: """Create a new context document for a trace.""" try: context_service = ContextService(db) document = context_service.create_context_document( trace_id=trace_id, title=request.title, document_type=request.document_type, content=request.content, file_name=request.file_name ) return ContextDocumentResponse( success=True, message="Context document created successfully", data=document ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: raise HTTPException(status_code=500, detail="An internal error occurred while creating context document") @router.get("/{trace_id}/context") async def get_context_documents( trace_id: str, db: Session = Depends(get_db) ) -> List[ContextDocument]: """Get all context documents for a trace.""" try: context_service = ContextService(db) documents = context_service.get_context_documents(trace_id) return documents except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: raise HTTPException(status_code=500, detail="An internal error occurred while retrieving context documents") @router.put("/{trace_id}/context/{context_id}") async def update_context_document( trace_id: str, context_id: str, request: UpdateContextRequest, db: Session = Depends(get_db) ) -> ContextDocumentResponse: """Update an existing context document.""" try: context_service = ContextService(db) document = context_service.update_context_document( trace_id=trace_id, context_id=context_id, updates=request ) return ContextDocumentResponse( success=True, message="Context document updated successfully", data=document ) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: raise HTTPException(status_code=500, detail="An internal error occurred while updating context document") @router.delete("/{trace_id}/context/{context_id}") async def delete_context_document( trace_id: str, context_id: str, db: Session = Depends(get_db) ) -> ContextDocumentResponse: """Delete a context document.""" try: context_service = ContextService(db) success = context_service.delete_context_document(trace_id, context_id) return ContextDocumentResponse( success=success, message="Context document deleted successfully" ) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: raise HTTPException(status_code=500, detail="An internal error occurred while deleting context document") @router.post("/{trace_id}/context/upload") async def upload_context_file( trace_id: str, file: UploadFile = File(...), title: str = Form(...), document_type: ContextDocumentType = Form(...), db: Session = Depends(get_db) ) -> ContextDocumentResponse: """Upload a file as a context document.""" try: # Validate file type allowed_extensions = ['.txt', '.md', '.json', '.csv'] if not any(file.filename.lower().endswith(ext) for ext in allowed_extensions): raise HTTPException( status_code=400, detail=f"File type not allowed. Supported types: {', '.join(allowed_extensions)}" ) # Validate file size (1MB limit) if file.size > 1024 * 1024: raise HTTPException(status_code=400, detail="File size exceeds 1MB limit") # Read file content content = await file.read() file_content = content.decode('utf-8') context_service = ContextService(db) document = context_service.process_uploaded_file( file_content=file_content, trace_id=trace_id, title=title, document_type=document_type, file_name=file.filename ) return ContextDocumentResponse( success=True, message="Context file uploaded successfully", data=document ) except UnicodeDecodeError: raise HTTPException(status_code=400, detail="File must be UTF-8 encoded text") except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: raise HTTPException(status_code=500, detail="An internal error occurred while uploading context file") @router.post("/{trace_id}/context/auto-generate") async def auto_generate_context_documents_endpoint( trace_id: str, force: bool = False, db: Session = Depends(get_db) ) -> Dict[str, Any]: """ Auto-generate context documents for a trace using the universal parser. Args: trace_id: ID of the trace force: Whether to remove existing auto-generated context documents first db: Database session Returns: Status and information about generated context documents """ try: # Get the trace trace = get_trace(db, trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace {trace_id} not found") if not trace.content: raise HTTPException(status_code=400, detail=f"Trace {trace_id} has no content") # Generate or regenerate context documents from backend.services.universal_parser_service import regenerate_context_documents created_docs = regenerate_context_documents(trace_id, trace.content, db, force=force) return { "status": "success", "message": f"{'Regenerated' if force else 'Generated'} {len(created_docs)} context documents", "trace_id": trace_id, "context_documents_generated": len(created_docs), "force_regenerate": force, "documents": [{"title": doc.get("title"), "type": doc.get("document_type")} for doc in created_docs] } except HTTPException: raise except Exception as e: logger = logging.getLogger("agent_monitoring_server") logger.error(f"Error auto-generating context documents for trace {trace_id}: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while auto-generating context documents") @router.get("/{trace_id}/enhanced-statistics") async def get_enhanced_trace_statistics(trace_id: str, db: Session = Depends(get_db)): """ Get enhanced trace statistics including cost information and detailed token analytics. Args: trace_id: The ID of the trace db: Database session Returns: Enhanced statistics with cost calculations """ trace = get_trace(db, trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") try: # Get basic trace information basic_stats = { "trace_id": trace.trace_id, "character_count": trace.character_count or 0, "turn_count": trace.turn_count or 0, "upload_timestamp": trace.upload_timestamp.isoformat() if trace.upload_timestamp else None, "trace_type": trace.trace_type, } # Get schema analytics if available schema_analytics = None if trace.trace_metadata and trace.trace_metadata.get("schema_analytics"): schema_analytics = trace.trace_metadata["schema_analytics"] enhanced_stats = {"basic": basic_stats} if schema_analytics: # Get token analytics token_analytics = schema_analytics.get("numerical_overview", {}).get("token_analytics", {}) prompt_analytics = schema_analytics.get("prompt_analytics", {}) # Calculate cost information cost_info = cost_service.calculate_trace_costs(schema_analytics) # Enhanced token statistics enhanced_stats["tokens"] = { "total_tokens": token_analytics.get("total_tokens", 0), "total_prompt_tokens": token_analytics.get("total_prompt_tokens", 0), "total_completion_tokens": token_analytics.get("total_completion_tokens", 0), "avg_tokens_per_component": token_analytics.get("avg_tokens_per_component", 0), "prompt_to_completion_ratio": token_analytics.get("prompt_to_completion_ratio", 0), } # Enhanced prompt call statistics enhanced_stats["prompt_calls"] = { "total_calls": prompt_analytics.get("prompt_calls_detected", 0), "successful_calls": prompt_analytics.get("successful_calls", 0), "failed_calls": prompt_analytics.get("failed_calls", 0), "avg_prompt_tokens_per_call": cost_info.get("avg_prompt_tokens_per_call", 0), "avg_completion_tokens_per_call": cost_info.get("avg_completion_tokens_per_call", 0), } # Cost information enhanced_stats["cost"] = { "total_cost_usd": cost_info.get("total_cost_usd", 0.0), "input_cost_usd": cost_info.get("input_cost_usd", 0.0), "output_cost_usd": cost_info.get("output_cost_usd", 0.0), "avg_cost_per_call_usd": cost_info.get("avg_cost_per_call_usd", 0.0), "model_used": cost_info.get("model_used", "gpt-5-mini"), "pricing_source": cost_info.get("pricing_source", "fallback"), "cost_efficiency_tokens_per_dollar": cost_info.get("cost_efficiency_tokens_per_dollar", 0), "model_metadata": cost_info.get("model_metadata"), } # Performance analytics timing_analytics = schema_analytics.get("numerical_overview", {}).get("timing_analytics", {}) enhanced_stats["performance"] = { "total_execution_time_ms": timing_analytics.get("total_execution_time_ms", 0), "total_execution_time_seconds": timing_analytics.get("total_execution_time_seconds", 0), "avg_execution_time_ms": timing_analytics.get("avg_execution_time_ms", 0), "max_execution_time_ms": timing_analytics.get("max_execution_time_ms", 0), "min_execution_time_ms": timing_analytics.get("min_execution_time_ms", 0), } # Component statistics component_stats = schema_analytics.get("numerical_overview", {}).get("component_stats", {}) enhanced_stats["components"] = { "total_components": component_stats.get("total_components", 0), "unique_component_types": component_stats.get("unique_component_types", 0), "max_depth": component_stats.get("max_depth", 0), "success_rate": component_stats.get("success_rate", 0), "error_components": component_stats.get("error_components", 0), } else: # Provide basic fallback statistics when schema analytics is not available enhanced_stats.update({ "tokens": { "total_tokens": 0, "total_prompt_tokens": 0, "total_completion_tokens": 0, "avg_tokens_per_component": 0, "prompt_to_completion_ratio": 0, }, "prompt_calls": { "total_calls": 0, "successful_calls": 0, "failed_calls": 0, "avg_prompt_tokens_per_call": 0, "avg_completion_tokens_per_call": 0, }, "cost": { "total_cost_usd": 0.0, "input_cost_usd": 0.0, "output_cost_usd": 0.0, "avg_cost_per_call_usd": 0.0, "model_used": "unknown", "pricing_source": "unavailable", "cost_efficiency_tokens_per_dollar": 0, }, "performance": { "total_execution_time_ms": 0, "total_execution_time_seconds": 0, "avg_execution_time_ms": 0, "max_execution_time_ms": 0, "min_execution_time_ms": 0, }, "components": { "total_components": 0, "unique_component_types": 0, "max_depth": 0, "success_rate": 0, "error_components": 0, } }) return { "status": "success", "enhanced_statistics": enhanced_stats, "has_schema_analytics": schema_analytics is not None } except Exception as e: logger = logging.getLogger("agent_monitoring_server") logger.error(f"Error generating enhanced statistics for trace {trace_id}: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while generating enhanced statistics") class ChunkingConfig(BaseModel): min_chunk_size: Optional[int] = None max_chunk_size: Optional[int] = None class ProcessTraceRequest(BaseModel): splitter_type: str = "agent_semantic" force_regenerate: bool = True method_name: str = "production" model: str = "gpt-5-mini" chunking_config: Optional[ChunkingConfig] = None @router.post("/{trace_id}/process") async def process_trace( trace_id: str, background_tasks: BackgroundTasks, request: ProcessTraceRequest, session: Session = Depends(get_db) ): """ Process a trace to create a knowledge graph using sliding window analysis. """ splitter_type = request.splitter_type force_regenerate = request.force_regenerate method_name = request.method_name model = request.model chunking_config = request.chunking_config logger.info(f"Processing trace {trace_id} with splitter_type={splitter_type}, force_regenerate={force_regenerate}, method_name={method_name}, model={model}, chunking_config={chunking_config}") valid_splitters = ["agent_semantic", "json", "prompt_interaction"] if splitter_type not in valid_splitters: raise HTTPException( status_code=400, detail=f"Invalid splitter_type '{splitter_type}'. Must be one of: {', '.join(valid_splitters)}" ) from agentgraph.shared.method_registry import is_valid_method, get_method_names if not is_valid_method(method_name): available_methods = get_method_names() raise HTTPException( status_code=400, detail=f"Invalid method_name '{method_name}'. Must be one of: {', '.join(available_methods)}" ) task_id = f"process_trace_{trace_id}_{int(time.time())}" try: task_message = f"Processing trace {trace_id} with {splitter_type} splitter, {method_name} method, and {model} model" if force_regenerate: task_message += " (force regenerate enabled)" create_task(task_id, "process_trace", task_message) background_tasks.add_task(process_trace_task, trace_id, session, task_id, splitter_type, force_regenerate, method_name, model, chunking_config) return { "status": "success", "task_id": task_id, "splitter_type": splitter_type, "force_regenerate": force_regenerate, "method_name": method_name, "model": model, "message": f"Started processing trace {trace_id} with {splitter_type} splitter, {method_name} method, and {model} model" + (" (force regenerate enabled)" if force_regenerate else "") } except Exception as e: logger.error(f"Error starting trace processing task: {e}") raise HTTPException( status_code=500, detail="An internal error occurred while starting trace processing" )