Spaces:
Sleeping
Sleeping
"""
API endpoints for working with traces, their associated knowledge graphs,
and per-trace context documents.
"""
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, BackgroundTasks, Body | |
| from typing import List, Dict, Any, Optional | |
| from sqlalchemy.orm import Session | |
| import logging | |
| from datetime import datetime | |
| from pydantic import BaseModel | |
| import time | |
| from backend.dependencies import get_db | |
| from backend.database.utils import get_all_traces, get_trace, delete_trace, get_knowledge_graphs_for_trace, save_trace, update_trace_content | |
| from backend.services.context_service import ContextService | |
| from backend.services.cost_calculation_service import cost_service | |
| from backend.models import ( | |
| ContextDocument, | |
| CreateContextRequest, | |
| UpdateContextRequest, | |
| ContextDocumentResponse, | |
| ContextDocumentType | |
| ) | |
| from backend.services.trace_management_service import TraceManagementService | |
| from backend.services.processing_service import process_trace_task, PipelineError | |
| from backend.services.task_service import create_task | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter( | |
| prefix="/api/traces", | |
| tags=["traces"], | |
| ) | |
| def update_kg_statistics(session: Session, kg_list: List) -> None: | |
| """ | |
| Update entity_count and relation_count for knowledge graphs if they're missing but graph_data exists. | |
| Args: | |
| session: Database session | |
| kg_list: List of knowledge graph database objects | |
| """ | |
| updated_count = 0 | |
| for kg in kg_list: | |
| if kg and kg.graph_data: | |
| needs_update = False | |
| # Update entity count if missing/zero but entities exist in graph_data | |
| if kg.entity_count is None or kg.entity_count == 0: | |
| entities = kg.graph_data.get("entities", []) | |
| if entities: | |
| kg.entity_count = len(entities) | |
| needs_update = True | |
| # Update relation count if missing/zero but relations exist in graph_data | |
| if kg.relation_count is None or kg.relation_count == 0: | |
| relations = kg.graph_data.get("relations", []) | |
| if relations: | |
| kg.relation_count = len(relations) | |
| needs_update = True | |
| if needs_update: | |
| session.add(kg) | |
| updated_count += 1 | |
| if updated_count > 0: | |
| session.commit() | |
| logging.getLogger("agent_monitoring_server").info(f"Updated entity/relation counts for {updated_count} knowledge graphs") | |
| async def list_traces(db: Session = Depends(get_db)): | |
| """ | |
| List all traces stored in the database with their associated knowledge graphs. | |
| """ | |
| traces = get_all_traces(db) | |
| # Convert to a list of dictionaries and include knowledge graphs | |
| trace_list = [] | |
| for trace in traces: | |
| # Get related knowledge graphs for each trace | |
| knowledge_graphs = get_knowledge_graphs_for_trace(db, trace.trace_id) | |
| # Convert knowledge graphs to proper dict format with is_final field | |
| kg_list = [] | |
| for kg in knowledge_graphs: | |
| # Extract processing metadata from graph_data if available | |
| processing_metadata = {} | |
| system_info = {} | |
| if kg.graph_data and isinstance(kg.graph_data, dict): | |
| metadata = kg.graph_data.get("metadata", {}) | |
| processing_params = metadata.get("processing_params", {}) | |
| processing_metadata = { | |
| "method_name": processing_params.get("method_name", "unknown"), | |
| "splitter_type": processing_params.get("splitter_type", "unknown"), | |
| "window_size": processing_params.get("window_size", "unknown"), | |
| "overlap_size": processing_params.get("overlap_size", "unknown") | |
| } | |
| system_info = { | |
| "system_name": kg.graph_data.get("system_name"), | |
| "system_summary": kg.graph_data.get("system_summary"), | |
| "graph_data": kg.graph_data | |
| } | |
| # Determine if this is a final KG using the same logic as the specific endpoint | |
| is_final = (kg.window_index is None and kg.window_total is not None) | |
| kg_dict = { | |
| "kg_id": kg.id, | |
| "id": kg.id, | |
| "filename": kg.filename, | |
| "created_at": kg.creation_timestamp.isoformat() if kg.creation_timestamp else None, | |
| "status": kg.status, | |
| "is_final": is_final, | |
| "window_index": kg.window_index, | |
| "window_total": kg.window_total, | |
| "window_start_char": kg.window_start_char, | |
| "window_end_char": kg.window_end_char, | |
| "processing_run_id": kg.processing_run_id, | |
| "entity_count": kg.entity_count or 0, | |
| "relation_count": kg.relation_count or 0, | |
| "is_enriched": kg.status == "enriched" or kg.status == "perturbed" or kg.status == "analyzed", | |
| "is_perturbed": kg.status == "perturbed" or kg.status == "analyzed", | |
| "is_analyzed": kg.status == "analyzed", | |
| "processing_metadata": processing_metadata, | |
| "system_name": system_info.get("system_name"), | |
| "system_summary": system_info.get("system_summary"), | |
| "graph_data": system_info.get("graph_data"), | |
| } | |
| kg_list.append(kg_dict) | |
| # Prepare the trace response with knowledge graphs | |
| trace_dict = trace.to_dict() | |
| trace_dict["knowledge_graphs"] = kg_list | |
| trace_list.append(trace_dict) | |
| return { | |
| "status": "success", | |
| "traces": trace_list | |
| } | |
| async def upload_trace( | |
| trace_file: UploadFile = File(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Upload a trace file to the database. | |
| Args: | |
| trace_file: The trace file to upload | |
| db: Database session | |
| Returns: | |
| Status and trace ID | |
| """ | |
| try: | |
| # Read the uploaded file content | |
| file_content = await trace_file.read() | |
| file_content_str = file_content.decode('utf-8') | |
| try: | |
| # Import trace analysis utilities | |
| from agentgraph.input.trace_management import analyze_trace_characteristics | |
| # Analyze the trace to determine its characteristics | |
| trace_analysis = analyze_trace_characteristics(file_content_str) | |
| # Save the trace to the database | |
| trace = save_trace( | |
| session=db, | |
| content=file_content_str, | |
| filename=trace_file.filename, | |
| title=f"Trace from {trace_file.filename}", | |
| description=f"Uploaded via Stage Processor on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", | |
| trace_type=trace_analysis.get('trace_type', "user_upload"), | |
| trace_source="stage_processor", | |
| tags=["stage_processor"], | |
| trace_metadata=trace_analysis | |
| ) | |
| logger = logging.getLogger("agent_monitoring_server") | |
| logger.info(f"Trace saved to database: ID={trace.id}, trace_id={trace.trace_id}") | |
| logger.info(f"Trace characteristics: {trace_analysis.get('trace_type', 'unknown')} type, " + | |
| f"{trace_analysis.get('line_count', 0)} lines, {trace_analysis.get('total_length', 0)} chars") | |
| # Auto-generate context documents using universal parser | |
| context_docs_created = 0 | |
| try: | |
| from backend.services.universal_parser_service import auto_generate_context_documents | |
| created_docs = auto_generate_context_documents(trace.trace_id, file_content_str, db) | |
| context_docs_created = len(created_docs) | |
| if context_docs_created > 0: | |
| logger.info(f"Auto-generated {context_docs_created} context documents for trace {trace.trace_id}") | |
| except Exception as e: | |
| logger.warning(f"Failed to auto-generate context documents for trace {trace.trace_id}: {str(e)}") | |
| return { | |
| "status": "success", | |
| "message": "Trace uploaded successfully", | |
| "trace_id": trace.trace_id, | |
| "title": trace.title, | |
| "character_count": trace.character_count, | |
| "turn_count": trace.turn_count, | |
| "context_documents_generated": context_docs_created | |
| } | |
| except ImportError: | |
| # Fall back to basic trace saving if trace_uploader is not available | |
| logger = logging.getLogger("agent_monitoring_server") | |
| logger.warning("Could not import trace analysis utilities") | |
| # Save the trace to the database without analysis | |
| trace = save_trace( | |
| session=db, | |
| content=file_content_str, | |
| filename=trace_file.filename, | |
| title=f"Trace from {trace_file.filename}", | |
| trace_type="user_upload", | |
| trace_source="stage_processor", | |
| tags=["stage_processor"] | |
| ) | |
| logger.info(f"Trace saved to database: ID={trace.id}, trace_id={trace.trace_id}") | |
| # Auto-generate context documents using universal parser | |
| context_docs_created = 0 | |
| try: | |
| from backend.services.universal_parser_service import auto_generate_context_documents | |
| created_docs = auto_generate_context_documents(trace.trace_id, file_content_str, db) | |
| context_docs_created = len(created_docs) | |
| if context_docs_created > 0: | |
| logger.info(f"Auto-generated {context_docs_created} context documents for trace {trace.trace_id}") | |
| except Exception as e: | |
| logger.warning(f"Failed to auto-generate context documents for trace {trace.trace_id}: {str(e)}") | |
| return { | |
| "status": "success", | |
| "message": "Trace uploaded successfully", | |
| "trace_id": trace.trace_id, | |
| "title": trace.title, | |
| "character_count": trace.character_count, | |
| "turn_count": trace.turn_count, | |
| "context_documents_generated": context_docs_created | |
| } | |
| except Exception as e: | |
| logger = logging.getLogger("agent_monitoring_server") | |
| logger.error(f"Error uploading trace: {str(e)}") | |
| raise HTTPException(status_code=500, detail="An internal error occurred while uploading trace") | |
| async def get_trace_by_id(trace_id: str, db: Session = Depends(get_db)): | |
| """ | |
| Get a specific trace by ID. | |
| """ | |
| trace = get_trace(db, trace_id) | |
| if not trace: | |
| raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") | |
| # Get related knowledge graphs | |
| knowledge_graphs = get_knowledge_graphs_for_trace(db, trace.trace_id) | |
| # Convert knowledge graphs to proper dict format with is_final field | |
| kg_list = [] | |
| for kg in knowledge_graphs: | |
| # Extract processing metadata from graph_data if available | |
| processing_metadata = {} | |
| system_info = {} | |
| if kg.graph_data and isinstance(kg.graph_data, dict): | |
| metadata = kg.graph_data.get("metadata", {}) | |
| processing_params = metadata.get("processing_params", {}) | |
| processing_metadata = { | |
| "method_name": processing_params.get("method_name", "unknown"), | |
| "splitter_type": processing_params.get("splitter_type", "unknown"), | |
| "window_size": processing_params.get("window_size", "unknown"), | |
| "overlap_size": processing_params.get("overlap_size", "unknown") | |
| } | |
| system_info = { | |
| "system_name": kg.graph_data.get("system_name"), | |
| "system_summary": kg.graph_data.get("system_summary"), | |
| "graph_data": kg.graph_data | |
| } | |
| # Determine if this is a final KG using the same logic as the specific endpoint | |
| is_final = (kg.window_index is None and kg.window_total is not None) | |
| kg_dict = { | |
| "kg_id": kg.id, | |
| "id": kg.id, | |
| "filename": kg.filename, | |
| "created_at": kg.creation_timestamp.isoformat() if kg.creation_timestamp else None, | |
| "status": kg.status, | |
| "is_final": is_final, | |
| "window_index": kg.window_index, | |
| "window_total": kg.window_total, | |
| "window_start_char": kg.window_start_char, | |
| "window_end_char": kg.window_end_char, | |
| "processing_run_id": kg.processing_run_id, | |
| "entity_count": kg.entity_count or 0, | |
| "relation_count": kg.relation_count or 0, | |
| "is_enriched": kg.status == "enriched" or kg.status == "perturbed" or kg.status == "analyzed", | |
| "is_perturbed": kg.status == "perturbed" or kg.status == "analyzed", | |
| "is_analyzed": kg.status == "analyzed", | |
| "processing_metadata": processing_metadata, | |
| "system_name": system_info.get("system_name"), | |
| "system_summary": system_info.get("system_summary"), | |
| "graph_data": system_info.get("graph_data"), | |
| } | |
| kg_list.append(kg_dict) | |
| # Prepare the response | |
| result = trace.to_dict() | |
| result["knowledge_graphs"] = kg_list | |
| return { | |
| "status": "success", | |
| "trace": result | |
| } | |
| async def delete_trace_by_id( | |
| trace_id: str, | |
| delete_related_kgs: bool = False, | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Delete a trace by ID. | |
| """ | |
| success = delete_trace(db, trace_id, delete_related_kgs) | |
| if not success: | |
| raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") | |
| return { | |
| "status": "success", | |
| "message": f"Trace {trace_id} deleted successfully" | |
| } | |
| async def get_knowledge_graphs_for_trace_id(trace_id: str, db: Session = Depends(get_db)): | |
| """ | |
| Get all knowledge graphs associated with a specific trace. | |
| Separates final merged KGs from individual window KGs and groups them appropriately. | |
| Args: | |
| trace_id: The ID of the trace | |
| db: Database session | |
| Returns: | |
| A list of final knowledge graphs with their associated window KGs nested underneath | |
| """ | |
| trace = get_trace(db, trace_id) | |
| if not trace: | |
| raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") | |
| try: | |
| # Get all knowledge graphs for this trace | |
| all_knowledge_graphs = get_knowledge_graphs_for_trace(db, trace.trace_id) | |
| # Update statistics for all knowledge graphs before processing | |
| update_kg_statistics(db, all_knowledge_graphs) | |
| # Separate final KGs from window KGs | |
| # Final KGs: window_index IS NULL AND window_total IS NOT NULL | |
| # Window KGs: window_index IS NOT NULL | |
| final_kgs = [] | |
| window_kgs = [] | |
| for kg in all_knowledge_graphs: | |
| if kg.window_index is None and kg.window_total is not None: | |
| # This is a final merged KG | |
| final_kgs.append(kg) | |
| elif kg.window_index is not None: | |
| # This is a window KG | |
| window_kgs.append(kg) | |
| # Skip KGs that don't fit either pattern (legacy or malformed) | |
| # Group window KGs by processing_run_id to associate them with final KGs | |
| window_kgs_by_run = {} | |
| for window_kg in window_kgs: | |
| run_id = window_kg.processing_run_id or 'legacy' | |
| if run_id not in window_kgs_by_run: | |
| window_kgs_by_run[run_id] = [] | |
| window_kgs_by_run[run_id].append(window_kg) | |
| # Build the final response | |
| kg_list = [] | |
| for final_kg in final_kgs: | |
| # Get associated window KGs for this final KG | |
| run_id = final_kg.processing_run_id or 'legacy' | |
| associated_windows = window_kgs_by_run.get(run_id, []) | |
| # Sort window KGs by window_index | |
| associated_windows.sort(key=lambda wkg: wkg.window_index if wkg.window_index is not None else 0) | |
| # Convert window KGs to dict format | |
| window_kg_list = [] | |
| for window_kg in associated_windows: | |
| window_data = { | |
| "kg_id": window_kg.id, | |
| "filename": window_kg.filename, | |
| "window_index": window_kg.window_index, | |
| "window_start_char": window_kg.window_start_char, | |
| "window_end_char": window_kg.window_end_char, | |
| "created_at": window_kg.creation_timestamp.isoformat() if window_kg.creation_timestamp else None, | |
| "status": window_kg.status, | |
| "entity_count": window_kg.entity_count or 0, | |
| "relation_count": window_kg.relation_count or 0 | |
| } | |
| window_kg_list.append(window_data) | |
| # Extract processing metadata from graph_data if available | |
| processing_metadata = {} | |
| if final_kg.graph_data and isinstance(final_kg.graph_data, dict): | |
| metadata = final_kg.graph_data.get("metadata", {}) | |
| processing_params = metadata.get("processing_params", {}) | |
| processing_metadata = { | |
| "method_name": processing_params.get("method_name", "unknown"), | |
| "splitter_type": processing_params.get("splitter_type", "unknown"), | |
| "window_size": processing_params.get("window_size", "unknown"), | |
| "overlap_size": processing_params.get("overlap_size", "unknown") | |
| } | |
| # Extract system information from graph_data | |
| system_info = {} | |
| if final_kg.graph_data and isinstance(final_kg.graph_data, dict): | |
| system_info = { | |
| "system_name": final_kg.graph_data.get("system_name"), | |
| "system_summary": final_kg.graph_data.get("system_summary"), | |
| "graph_data": final_kg.graph_data # Include full graph_data for frontend access | |
| } | |
| # Build final KG data | |
| final_kg_data = { | |
| "kg_id": final_kg.id, | |
| "filename": final_kg.filename, | |
| "created_at": final_kg.creation_timestamp.isoformat() if final_kg.creation_timestamp else None, | |
| "updated_at": final_kg.update_timestamp.isoformat() if final_kg.update_timestamp else None, | |
| "status": final_kg.status, | |
| "is_final": True, | |
| "window_total": final_kg.window_total, | |
| "window_index": final_kg.window_index, # Should be None for final KGs | |
| "processing_run_id": final_kg.processing_run_id, | |
| "entity_count": final_kg.entity_count or 0, | |
| "relation_count": final_kg.relation_count or 0, | |
| "is_enriched": final_kg.status == "enriched" or final_kg.status == "perturbed" or final_kg.status == "analyzed", | |
| "is_perturbed": final_kg.status == "perturbed" or final_kg.status == "analyzed", | |
| "is_analyzed": final_kg.status == "analyzed", | |
| "processing_metadata": processing_metadata, # Add processing metadata | |
| "system_name": system_info.get("system_name"), # Add system_name to top level | |
| "system_summary": system_info.get("system_summary"), # Add system_summary to top level | |
| "graph_data": system_info.get("graph_data"), # Add graph_data to top level | |
| "window_knowledge_graphs": window_kg_list | |
| } | |
| kg_list.append(final_kg_data) | |
| # Handle any orphaned window KGs (those without associated final KGs) | |
| processed_run_ids = {kg.processing_run_id or 'legacy' for kg in final_kgs} | |
| orphaned_windows = [] | |
| for run_id, windows in window_kgs_by_run.items(): | |
| if run_id not in processed_run_ids: | |
| orphaned_windows.extend(windows) | |
| # Add orphaned windows as individual entries (for backward compatibility) | |
| for orphaned_kg in orphaned_windows: | |
| # Extract processing metadata for orphaned KGs too | |
| orphaned_metadata = {} | |
| orphaned_system_info = {} | |
| if orphaned_kg.graph_data and isinstance(orphaned_kg.graph_data, dict): | |
| metadata = orphaned_kg.graph_data.get("metadata", {}) | |
| processing_params = metadata.get("processing_params", {}) | |
| orphaned_metadata = { | |
| "method_name": processing_params.get("method_name", "unknown"), | |
| "splitter_type": processing_params.get("splitter_type", "unknown"), | |
| "window_size": processing_params.get("window_size", "unknown"), | |
| "overlap_size": processing_params.get("overlap_size", "unknown") | |
| } | |
| orphaned_system_info = { | |
| "system_name": orphaned_kg.graph_data.get("system_name"), | |
| "system_summary": orphaned_kg.graph_data.get("system_summary"), | |
| "graph_data": orphaned_kg.graph_data | |
| } | |
| orphaned_data = { | |
| "kg_id": orphaned_kg.id, | |
| "filename": orphaned_kg.filename, | |
| "created_at": orphaned_kg.creation_timestamp.isoformat() if orphaned_kg.creation_timestamp else None, | |
| "updated_at": orphaned_kg.update_timestamp.isoformat() if orphaned_kg.update_timestamp else None, | |
| "status": orphaned_kg.status, | |
| "is_final": False, # Mark as not final | |
| "window_total": orphaned_kg.window_total, | |
| "window_index": orphaned_kg.window_index, | |
| "processing_run_id": orphaned_kg.processing_run_id, | |
| "entity_count": orphaned_kg.entity_count or 0, | |
| "relation_count": orphaned_kg.relation_count or 0, | |
| "is_enriched": orphaned_kg.status == "enriched" or orphaned_kg.status == "perturbed" or orphaned_kg.status == "analyzed", | |
| "is_perturbed": orphaned_kg.status == "perturbed" or orphaned_kg.status == "analyzed", | |
| "is_analyzed": orphaned_kg.status == "analyzed", | |
| "processing_metadata": orphaned_metadata, # Add processing metadata | |
| "system_name": orphaned_system_info.get("system_name"), # Add system_name to top level | |
| "system_summary": orphaned_system_info.get("system_summary"), # Add system_summary to top level | |
| "graph_data": orphaned_system_info.get("graph_data"), # Add graph_data to top level | |
| "window_knowledge_graphs": [] # No nested windows for orphaned KGs | |
| } | |
| kg_list.append(orphaned_data) | |
| # Sort final list by creation timestamp, newest first | |
| kg_list.sort(key=lambda kg: kg["created_at"] if kg["created_at"] else "", reverse=True) | |
| return { | |
| "status": "success", | |
| "knowledge_graphs": kg_list | |
| } | |
| except Exception as e: | |
| logger = logging.getLogger("agent_monitoring_server") | |
| logger.error(f"Error retrieving knowledge graphs for trace {trace_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="An internal error occurred while retrieving knowledge graphs") | |
| async def get_trace_content(trace_id: str, db: Session = Depends(get_db)): | |
| """ | |
| Get the content of a specific trace by ID. | |
| Args: | |
| trace_id: The ID of the trace | |
| db: Database session | |
| Returns: | |
| The content of the trace | |
| """ | |
| trace = get_trace(db, trace_id) | |
| if not trace: | |
| raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") | |
| if not hasattr(trace, 'content') or not trace.content: | |
| raise HTTPException(status_code=404, detail=f"No content available for trace with ID {trace_id}") | |
| return { | |
| "status": "success", | |
| "content": trace.content | |
| } | |
| async def update_trace_content( | |
| trace_id: str, | |
| content_data: dict = Body(...), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update the content of a specific trace by ID. | |
| Args: | |
| trace_id: The ID of the trace | |
| content_data: Dictionary containing the new content | |
| db: Database session | |
| Returns: | |
| Success status | |
| """ | |
| trace = get_trace(db, trace_id) | |
| if not trace: | |
| raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") | |
| # Extract content from request body | |
| if 'content' not in content_data: | |
| raise HTTPException(status_code=400, detail="Content field is required") | |
| new_content = content_data['content'] | |
| # Update trace content | |
| trace.content = new_content | |
| # Update character count and other metadata | |
| trace.character_count = len(new_content) | |
| # Estimate turn count (approximate) | |
| turn_markers = [ | |
| "user:", "assistant:", "system:", "human:", "ai:", | |
| "User:", "Assistant:", "System:", "Human:", "AI:" | |
| ] | |
| turn_count = 0 | |
| for marker in turn_markers: | |
| turn_count += new_content.count(marker) | |
| trace.turn_count = max(1, turn_count) # At least 1 turn | |
| # Update timestamp | |
| trace.update_timestamp = datetime.utcnow() | |
| # Save to database | |
| db.add(trace) | |
| db.commit() | |
| return { | |
| "status": "success", | |
| "message": "Trace content updated successfully" | |
| } | |
| async def regenerate_trace_metadata( | |
| trace_id: str, | |
| db: Session = Depends(get_db) | |
| ) -> Dict[str, Any]: | |
| """ | |
| Regenerate metadata for a trace using the universal parser. | |
| Args: | |
| trace_id: The ID of the trace to regenerate metadata for | |
| db: Database session | |
| Returns: | |
| Success status and metadata info | |
| """ | |
| try: | |
| # Get the trace | |
| trace = get_trace(db, trace_id) | |
| if not trace: | |
| raise HTTPException(status_code=404, detail=f"Trace {trace_id} not found") | |
| if not trace.content: | |
| raise HTTPException(status_code=400, detail="Trace has no content to analyze") | |
| # Use UniversalParserService to regenerate metadata | |
| from backend.services.universal_parser_service import UniversalParserService | |
| parser_service = UniversalParserService(db) | |
| # This will regenerate and store the schema_analytics metadata | |
| context_docs = parser_service.generate_trace_context_documents(trace_id, trace.content) | |
| # Refresh the trace to get updated metadata | |
| db.refresh(trace) | |
| logger = logging.getLogger("agent_monitoring_server") | |
| logger.info(f"Successfully regenerated metadata for trace {trace_id}") | |
| return { | |
| "status": "success", | |
| "message": "Trace metadata regenerated successfully", | |
| "context_documents_created": len(context_docs), | |
| "metadata_updated": bool(trace.trace_metadata and trace.trace_metadata.get("schema_analytics")) | |
| } | |
| except Exception as e: | |
| logger = logging.getLogger("agent_monitoring_server") | |
| logger.error(f"Error regenerating metadata for trace {trace_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="An internal error occurred while regenerating metadata") | |
| async def fix_long_lines( | |
| trace_id: str, | |
| request_data: Dict[str, Any], | |
| db: Session = Depends(get_db) | |
| ) -> Dict[str, Any]: | |
| """ | |
| Apply rule-based line splitting to a trace content. | |
| Args: | |
| trace_id: ID of the trace to process | |
| request_data: Request data containing max_line_length | |
| db: Database session | |
| Returns: | |
| Dictionary with processed content and metadata | |
| """ | |
| try: | |
| # Get the trace | |
| trace = get_trace(db, trace_id) | |
| if not trace: | |
| raise HTTPException(status_code=404, detail=f"Trace {trace_id} not found") | |
| # Get max_line_length from request, default to 800 | |
| max_line_length = request_data.get("max_line_length", 800) | |
| # Apply line splitting using ChunkingService | |
| from agentgraph.input.text_processing import ChunkingService | |
| chunking_service = ChunkingService() | |
| processed_content = chunking_service.fix_long_lines_in_content( | |
| trace.content, max_line_length | |
| ) | |
| # Update the trace content in database | |
| update_trace_content(db, trace.trace_id, processed_content) | |
| # Calculate statistics | |
| original_lines = len(trace.content.split('\n')) | |
| processed_lines = len(processed_content.split('\n')) | |
| return { | |
| "success": True, | |
| "content": processed_content, | |
| "statistics": { | |
| "original_lines": original_lines, | |
| "processed_lines": processed_lines, | |
| "lines_added": processed_lines - original_lines, | |
| "max_line_length": max_line_length | |
| }, | |
| "message": f"Applied line splitting: {original_lines} → {processed_lines} lines" | |
| } | |
| except Exception as e: | |
| logger = logging.getLogger("agent_monitoring_server") | |
| logger.error(f"Error applying line splitting to trace {trace_id}: {str(e)}") | |
| raise HTTPException(status_code=500, detail="An internal error occurred while applying line splitting") | |
| async def get_trace_content_numbered(trace_id: str, db: Session = Depends(get_db)): | |
| """ | |
| Return the trace content with line numbers already added using the same | |
| TraceLineNumberProcessor that the extraction pipeline employs. | |
| This guarantees front-end alignment with reference ranges. | |
| """ | |
| trace = get_trace(db, trace_id) | |
| if not trace: | |
| raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") | |
| if not hasattr(trace, "content") or not trace.content: | |
| raise HTTPException(status_code=404, detail="Trace has no content") | |
| try: | |
| from agentgraph.input.text_processing.trace_line_processor import TraceLineNumberProcessor | |
| processor = TraceLineNumberProcessor(max_line_length=120) | |
| numbered, _ = processor.add_line_numbers(trace.content) | |
| except Exception as err: | |
| logging.getLogger("agent_monitoring_server").error( | |
| f"Failed to generate numbered trace for {trace_id}: {err}" | |
| ) | |
| raise HTTPException(status_code=500, detail="Failed to generate numbered trace") | |
| return {"status": "success", "content": numbered} | |
| # Context Documents Endpoints | |
| async def create_context_document( | |
| trace_id: str, | |
| request: CreateContextRequest, | |
| db: Session = Depends(get_db) | |
| ) -> ContextDocumentResponse: | |
| """Create a new context document for a trace.""" | |
| try: | |
| context_service = ContextService(db) | |
| document = context_service.create_context_document( | |
| trace_id=trace_id, | |
| title=request.title, | |
| document_type=request.document_type, | |
| content=request.content, | |
| file_name=request.file_name | |
| ) | |
| return ContextDocumentResponse( | |
| success=True, | |
| message="Context document created successfully", | |
| data=document | |
| ) | |
| except ValueError as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail="An internal error occurred while creating context document") | |
| async def get_context_documents( | |
| trace_id: str, | |
| db: Session = Depends(get_db) | |
| ) -> List[ContextDocument]: | |
| """Get all context documents for a trace.""" | |
| try: | |
| context_service = ContextService(db) | |
| documents = context_service.get_context_documents(trace_id) | |
| return documents | |
| except ValueError as e: | |
| raise HTTPException(status_code=404, detail=str(e)) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail="An internal error occurred while retrieving context documents") | |
| async def update_context_document( | |
| trace_id: str, | |
| context_id: str, | |
| request: UpdateContextRequest, | |
| db: Session = Depends(get_db) | |
| ) -> ContextDocumentResponse: | |
| """Update an existing context document.""" | |
| try: | |
| context_service = ContextService(db) | |
| document = context_service.update_context_document( | |
| trace_id=trace_id, | |
| context_id=context_id, | |
| updates=request | |
| ) | |
| return ContextDocumentResponse( | |
| success=True, | |
| message="Context document updated successfully", | |
| data=document | |
| ) | |
| except ValueError as e: | |
| raise HTTPException(status_code=404, detail=str(e)) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail="An internal error occurred while updating context document") | |
| async def delete_context_document( | |
| trace_id: str, | |
| context_id: str, | |
| db: Session = Depends(get_db) | |
| ) -> ContextDocumentResponse: | |
| """Delete a context document.""" | |
| try: | |
| context_service = ContextService(db) | |
| success = context_service.delete_context_document(trace_id, context_id) | |
| return ContextDocumentResponse( | |
| success=success, | |
| message="Context document deleted successfully" | |
| ) | |
| except ValueError as e: | |
| raise HTTPException(status_code=404, detail=str(e)) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail="An internal error occurred while deleting context document") | |
async def upload_context_file(
    trace_id: str,
    file: UploadFile = File(...),
    title: str = Form(...),
    document_type: ContextDocumentType = Form(...),
    db: Session = Depends(get_db)
) -> ContextDocumentResponse:
    """Upload a file as a context document.

    The file must be UTF-8 text with an allowed extension and at most
    1MB in size; the decoded content is stored via ContextService.

    Raises:
        HTTPException 400: disallowed extension, oversized file,
            non-UTF-8 content, or a ValueError from the service layer.
        HTTPException 500: any other unexpected failure.
    """
    try:
        # Validate file type; filename may be None on some clients, so
        # normalize to "" (which then fails the extension check with a 400
        # instead of an AttributeError).
        allowed_extensions = ['.txt', '.md', '.json', '.csv']
        filename = file.filename or ""
        if not any(filename.lower().endswith(ext) for ext in allowed_extensions):
            raise HTTPException(
                status_code=400,
                detail=f"File type not allowed. Supported types: {', '.join(allowed_extensions)}"
            )
        # Validate file size (1MB limit). UploadFile.size can be None when
        # the client omits a length header, so also verify the actual
        # number of bytes read below.
        if file.size is not None and file.size > 1024 * 1024:
            raise HTTPException(status_code=400, detail="File size exceeds 1MB limit")
        # Read file content
        content = await file.read()
        if len(content) > 1024 * 1024:
            raise HTTPException(status_code=400, detail="File size exceeds 1MB limit")
        file_content = content.decode('utf-8')
        context_service = ContextService(db)
        document = context_service.process_uploaded_file(
            file_content=file_content,
            trace_id=trace_id,
            title=title,
            document_type=document_type,
            file_name=file.filename
        )
        return ContextDocumentResponse(
            success=True,
            message="Context file uploaded successfully",
            data=document
        )
    except HTTPException:
        # Re-raise our own validation errors unchanged; without this the
        # generic handler below would mask the 400s above as 500s
        # (HTTPException is a subclass of Exception).
        raise
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="File must be UTF-8 encoded text")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        raise HTTPException(status_code=500, detail="An internal error occurred while uploading context file")
async def auto_generate_context_documents_endpoint(
    trace_id: str,
    force: bool = False,
    db: Session = Depends(get_db)
) -> Dict[str, Any]:
    """
    Auto-generate context documents for a trace using the universal parser.

    Args:
        trace_id: ID of the trace
        force: Whether to remove existing auto-generated context documents first
        db: Database session

    Returns:
        Status and information about generated context documents

    Raises:
        HTTPException 404: when the trace does not exist.
        HTTPException 400: when the trace has no content to parse.
        HTTPException 500: on any unexpected failure during generation.
    """
    try:
        # Get the trace
        trace = get_trace(db, trace_id)
        if not trace:
            raise HTTPException(status_code=404, detail=f"Trace {trace_id} not found")
        if not trace.content:
            raise HTTPException(status_code=400, detail=f"Trace {trace_id} has no content")
        # Generate or regenerate context documents. Imported lazily to
        # avoid a module-level import cycle with the parser service.
        from backend.services.universal_parser_service import regenerate_context_documents
        created_docs = regenerate_context_documents(trace_id, trace.content, db, force=force)
        return {
            "status": "success",
            "message": f"{'Regenerated' if force else 'Generated'} {len(created_docs)} context documents",
            "trace_id": trace_id,
            "context_documents_generated": len(created_docs),
            "force_regenerate": force,
            "documents": [{"title": doc.get("title"), "type": doc.get("document_type")} for doc in created_docs]
        }
    except HTTPException:
        raise
    except Exception:
        # Use the module-level logger (previously a new local logger was
        # created here, shadowing it); logger.exception also records the
        # traceback, which logger.error(f"...") did not.
        logger.exception("Error auto-generating context documents for trace %s", trace_id)
        raise HTTPException(status_code=500, detail="An internal error occurred while auto-generating context documents")
async def get_enhanced_trace_statistics(trace_id: str, db: Session = Depends(get_db)):
    """
    Get enhanced trace statistics including cost information and detailed token analytics.

    Args:
        trace_id: The ID of the trace
        db: Database session

    Returns:
        Enhanced statistics with cost calculations, grouped into "basic",
        "tokens", "prompt_calls", "cost", "performance" and "components"
        sections. When the trace has no stored schema analytics, the
        non-basic sections contain zeroed fallback values.

    Raises:
        HTTPException 404: when the trace does not exist.
        HTTPException 500: on any unexpected failure while aggregating.
    """
    trace = get_trace(db, trace_id)
    if not trace:
        raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found")
    try:
        # Get basic trace information (always available on the trace row).
        basic_stats = {
            "trace_id": trace.trace_id,
            "character_count": trace.character_count or 0,
            "turn_count": trace.turn_count or 0,
            "upload_timestamp": trace.upload_timestamp.isoformat() if trace.upload_timestamp else None,
            "trace_type": trace.trace_type,
        }
        # Get schema analytics if available; they are stored in the trace
        # metadata and may be absent for unprocessed traces.
        schema_analytics = None
        if trace.trace_metadata and trace.trace_metadata.get("schema_analytics"):
            schema_analytics = trace.trace_metadata["schema_analytics"]
        enhanced_stats = {"basic": basic_stats}
        if schema_analytics:
            # Get token analytics
            token_analytics = schema_analytics.get("numerical_overview", {}).get("token_analytics", {})
            prompt_analytics = schema_analytics.get("prompt_analytics", {})
            # Calculate cost information via the shared cost service.
            cost_info = cost_service.calculate_trace_costs(schema_analytics)
            # Enhanced token statistics
            enhanced_stats["tokens"] = {
                "total_tokens": token_analytics.get("total_tokens", 0),
                "total_prompt_tokens": token_analytics.get("total_prompt_tokens", 0),
                "total_completion_tokens": token_analytics.get("total_completion_tokens", 0),
                "avg_tokens_per_component": token_analytics.get("avg_tokens_per_component", 0),
                "prompt_to_completion_ratio": token_analytics.get("prompt_to_completion_ratio", 0),
            }
            # Enhanced prompt call statistics
            enhanced_stats["prompt_calls"] = {
                "total_calls": prompt_analytics.get("prompt_calls_detected", 0),
                "successful_calls": prompt_analytics.get("successful_calls", 0),
                "failed_calls": prompt_analytics.get("failed_calls", 0),
                "avg_prompt_tokens_per_call": cost_info.get("avg_prompt_tokens_per_call", 0),
                "avg_completion_tokens_per_call": cost_info.get("avg_completion_tokens_per_call", 0),
            }
            # Cost information
            enhanced_stats["cost"] = {
                "total_cost_usd": cost_info.get("total_cost_usd", 0.0),
                "input_cost_usd": cost_info.get("input_cost_usd", 0.0),
                "output_cost_usd": cost_info.get("output_cost_usd", 0.0),
                "avg_cost_per_call_usd": cost_info.get("avg_cost_per_call_usd", 0.0),
                "model_used": cost_info.get("model_used", "gpt-5-mini"),
                "pricing_source": cost_info.get("pricing_source", "fallback"),
                "cost_efficiency_tokens_per_dollar": cost_info.get("cost_efficiency_tokens_per_dollar", 0),
                "model_metadata": cost_info.get("model_metadata"),
            }
            # Performance analytics
            timing_analytics = schema_analytics.get("numerical_overview", {}).get("timing_analytics", {})
            enhanced_stats["performance"] = {
                "total_execution_time_ms": timing_analytics.get("total_execution_time_ms", 0),
                "total_execution_time_seconds": timing_analytics.get("total_execution_time_seconds", 0),
                "avg_execution_time_ms": timing_analytics.get("avg_execution_time_ms", 0),
                "max_execution_time_ms": timing_analytics.get("max_execution_time_ms", 0),
                "min_execution_time_ms": timing_analytics.get("min_execution_time_ms", 0),
            }
            # Component statistics
            component_stats = schema_analytics.get("numerical_overview", {}).get("component_stats", {})
            enhanced_stats["components"] = {
                "total_components": component_stats.get("total_components", 0),
                "unique_component_types": component_stats.get("unique_component_types", 0),
                "max_depth": component_stats.get("max_depth", 0),
                "success_rate": component_stats.get("success_rate", 0),
                "error_components": component_stats.get("error_components", 0),
            }
        else:
            # Provide basic fallback statistics when schema analytics is not available
            enhanced_stats.update({
                "tokens": {
                    "total_tokens": 0,
                    "total_prompt_tokens": 0,
                    "total_completion_tokens": 0,
                    "avg_tokens_per_component": 0,
                    "prompt_to_completion_ratio": 0,
                },
                "prompt_calls": {
                    "total_calls": 0,
                    "successful_calls": 0,
                    "failed_calls": 0,
                    "avg_prompt_tokens_per_call": 0,
                    "avg_completion_tokens_per_call": 0,
                },
                "cost": {
                    "total_cost_usd": 0.0,
                    "input_cost_usd": 0.0,
                    "output_cost_usd": 0.0,
                    "avg_cost_per_call_usd": 0.0,
                    "model_used": "unknown",
                    "pricing_source": "unavailable",
                    "cost_efficiency_tokens_per_dollar": 0,
                },
                "performance": {
                    "total_execution_time_ms": 0,
                    "total_execution_time_seconds": 0,
                    "avg_execution_time_ms": 0,
                    "max_execution_time_ms": 0,
                    "min_execution_time_ms": 0,
                },
                "components": {
                    "total_components": 0,
                    "unique_component_types": 0,
                    "max_depth": 0,
                    "success_rate": 0,
                    "error_components": 0,
                }
            })
        return {
            "status": "success",
            "enhanced_statistics": enhanced_stats,
            "has_schema_analytics": schema_analytics is not None
        }
    except Exception:
        # Use the module-level logger (previously a new local logger was
        # created here, shadowing it); logger.exception also keeps the
        # traceback, which the former logger.error(f"...") dropped.
        logger.exception("Error generating enhanced statistics for trace %s", trace_id)
        raise HTTPException(status_code=500, detail="An internal error occurred while generating enhanced statistics")
class ChunkingConfig(BaseModel):
    """Optional chunk-size bounds forwarded to the trace-processing task."""
    # Lower/upper bounds for chunk sizes; None presumably defers to the
    # pipeline's defaults (passed through to process_trace_task unmodified).
    min_chunk_size: Optional[int] = None
    max_chunk_size: Optional[int] = None
class ProcessTraceRequest(BaseModel):
    """Request body for the trace-processing endpoint (process_trace)."""
    # Must be one of: agent_semantic, json, prompt_interaction
    # (validated in process_trace).
    splitter_type: str = "agent_semantic"
    # Forwarded to process_trace_task; when True the task message notes
    # "(force regenerate enabled)".
    force_regenerate: bool = True
    # Must be a name registered in agentgraph.shared.method_registry
    # (validated in process_trace).
    method_name: str = "production"
    # Model identifier forwarded to the processing pipeline.
    model: str = "gpt-5-mini"
    # Optional chunk-size overrides; forwarded as-is.
    chunking_config: Optional[ChunkingConfig] = None
async def process_trace(
    trace_id: str,
    background_tasks: BackgroundTasks,
    request: ProcessTraceRequest,
    session: Session = Depends(get_db)
):
    """
    Process a trace to create a knowledge graph using sliding window analysis.

    Validates the splitter and method names, registers a task record, and
    schedules the processing work as a background task.

    Raises:
        HTTPException 400: unknown splitter_type or method_name.
        HTTPException 500: failure while registering/scheduling the task.
    """
    splitter_type = request.splitter_type
    force_regenerate = request.force_regenerate
    method_name = request.method_name
    model = request.model
    chunking_config = request.chunking_config
    logger.info(f"Processing trace {trace_id} with splitter_type={splitter_type}, force_regenerate={force_regenerate}, method_name={method_name}, model={model}, chunking_config={chunking_config}")
    valid_splitters = ["agent_semantic", "json", "prompt_interaction"]
    if splitter_type not in valid_splitters:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid splitter_type '{splitter_type}'. Must be one of: {', '.join(valid_splitters)}"
        )
    # Imported lazily; the registry lives in a separate package.
    from agentgraph.shared.method_registry import is_valid_method, get_method_names
    if not is_valid_method(method_name):
        available_methods = get_method_names()
        raise HTTPException(
            status_code=400,
            detail=f"Invalid method_name '{method_name}'. Must be one of: {', '.join(available_methods)}"
        )
    # Timestamp-suffixed id keeps concurrent runs on the same trace distinct.
    task_id = f"process_trace_{trace_id}_{int(time.time())}"
    try:
        # Build the shared message fragments once instead of duplicating
        # them between the task record and the response message.
        detail = f"{splitter_type} splitter, {method_name} method, and {model} model"
        suffix = " (force regenerate enabled)" if force_regenerate else ""
        create_task(task_id, "process_trace", f"Processing trace {trace_id} with {detail}{suffix}")
        # NOTE(review): the request-scoped `session` is handed to the
        # background task, which runs after the response (and possibly after
        # the session's request scope ends) — confirm process_trace_task
        # manages/recreates the session safely.
        background_tasks.add_task(process_trace_task, trace_id, session, task_id, splitter_type, force_regenerate, method_name, model, chunking_config)
        return {
            "status": "success",
            "task_id": task_id,
            "splitter_type": splitter_type,
            "force_regenerate": force_regenerate,
            "method_name": method_name,
            "model": model,
            "message": f"Started processing trace {trace_id} with {detail}{suffix}"
        }
    except Exception:
        # logger.exception keeps the traceback (logger.error with an
        # f-string did not).
        logger.exception("Error starting trace processing task")
        raise HTTPException(
            status_code=500,
            detail="An internal error occurred while starting trace processing"
        )