"""
Trace Management Service

This service handles all database operations for trace management, providing
a clean interface between the database layer and the pure analysis functions
in agentgraph.input.
"""

import os
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional

from sqlalchemy.orm import Session

from backend.database.models import Trace, KnowledgeGraph
from backend.database.utils import (
    save_trace,
    get_trace,
    get_all_traces,
    update_trace_status,
    link_knowledge_graph_to_trace,
    get_knowledge_graphs_for_trace,
)

# Import pure analysis functions
from agentgraph.input.trace_management import (
    analyze_trace_characteristics,
    display_trace_summary,
    preprocess_content_for_cost_optimization,
)

logger = logging.getLogger(__name__)


class TraceManagementService:
    """
    Service for orchestrating trace management with database operations.

    This service fetches data from the database, calls pure analysis
    functions from agentgraph.input, and saves the results back to the
    database.
    """

    def __init__(self, session: Session):
        # All operations run against this caller-owned SQLAlchemy session;
        # the service never commits/closes it except to roll back on error.
        self.session = session

    def upload_trace(
        self,
        file_path: str,
        title: Optional[str] = None,
        description: Optional[str] = None,
        trace_type: Optional[str] = None,
        uploader: Optional[str] = None,
        tags: Optional[List[str]] = None,
        analyze: bool = True,
    ) -> Trace:
        """
        Upload a trace from a file to the database.

        Args:
            file_path: Path to the trace file
            title: Optional title for the trace (defaults to one derived
                from the file name)
            description: Optional description
            trace_type: Optional type of trace; if omitted and ``analyze``
                is True, the type detected by analysis is used
            uploader: Optional name of uploader
            tags: Optional list of tags
            analyze: Whether to analyze and display trace characteristics

        Returns:
            The created Trace object

        Raises:
            OSError: If the trace file cannot be read.
            Exception: Re-raised (after session rollback) on DB failure.
        """
        # Read the trace file
        with open(file_path, 'r', encoding='utf-8') as f:
            original_content = f.read()

        logger.info(f"Uploading trace: {file_path}")
        logger.info(f"Original length of trace: {len(original_content)}")

        # Use original content without preprocessing - line splitting will
        # be handled during chunking
        content = original_content
        logger.info(f"Content length: {len(content)} characters")

        # Derive a filename from the path for storage/display
        filename = os.path.basename(file_path)

        # Analyze trace characteristics if requested (use processed content)
        trace_analysis: Optional[Dict[str, Any]] = None
        if analyze:
            trace_analysis = analyze_trace_characteristics(content)
            display_trace_summary(trace_analysis)

            # If trace_type wasn't provided, use the one from analysis
            if not trace_type and 'trace_type' in trace_analysis:
                trace_type = trace_analysis['trace_type']

        try:
            # Save the trace to the database.
            # BUGFIX: the title fallback previously embedded the literal
            # placeholder "(unknown)"; use the actual source filename.
            trace = save_trace(
                session=self.session,
                content=content,
                filename=filename,
                title=title or f"Trace from {filename}",
                description=description,
                trace_type=trace_type,
                uploader=uploader,
                tags=tags or [],
            )

            # Add analysis data if available
            if trace_analysis:
                # Update trace with analysis info if needed
                # This would require adding additional fields to the Trace
                # model
                pass

            logger.info(
                f"Trace uploaded successfully (ID: {trace.id}, "
                f"trace_id: {trace.trace_id})"
            )
            logger.info(f"  Title: {trace.title}")
            logger.info(
                f"  Characters: {trace.character_count}, "
                f"Turns: {trace.turn_count}"
            )
            return trace
        except Exception as e:
            logger.error(f"Error uploading trace: {str(e)}")
            # Undo the partial insert so the session stays usable
            self.session.rollback()
            raise

    def list_traces(self) -> List[Trace]:
        """
        List all traces in the database.

        Prints a human-readable summary of each trace (and up to three of
        its linked knowledge graphs) to stdout.

        Returns:
            List of all Trace objects; empty list when none exist or on
            error.
        """
        try:
            traces = get_all_traces(self.session)
            if not traces:
                logger.info("No traces found in the database")
                return []

            logger.info(f"Found {len(traces)} traces in the database:")
            for i, trace in enumerate(traces, 1):
                print(
                    f"\n{i}. {trace.title} (ID: {trace.id}, "
                    f"trace_id: {trace.trace_id})"
                )
                print(
                    f"   Uploaded: "
                    f"{trace.upload_timestamp.strftime('%Y-%m-%d %H:%M:%S')}"
                )
                print(
                    f"   Type: {trace.trace_type or 'Unknown'}, "
                    f"Status: {trace.status}"
                )
                print(
                    f"   Size: {trace.character_count} chars, "
                    f"{trace.turn_count} turns"
                )

                # Get linked knowledge graphs
                kgs = get_knowledge_graphs_for_trace(
                    self.session, trace.trace_id
                )
                if kgs:
                    print(f"   Knowledge Graphs: {len(kgs)}")
                    for kg in kgs[:3]:  # Show just the first three
                        print(f"     - {kg.filename} (ID: {kg.id})")
                    if len(kgs) > 3:
                        print(f"     - ...and {len(kgs) - 3} more")
                else:
                    print("   No linked knowledge graphs")

            return traces
        except Exception as e:
            logger.error(f"Error listing traces: {str(e)}")
            return []

    async def process_trace(
        self,
        trace_id: str,
        window_size: Optional[int] = None,
        overlap_size: Optional[int] = None,
        batch_size: int = 5,
        parallel: bool = True,
    ) -> Optional[KnowledgeGraph]:
        """
        Process a trace using the sliding window monitor.

        Args:
            trace_id: ID of the trace to process
            window_size: Optional window size in characters
            overlap_size: Optional overlap size in characters
            batch_size: Number of windows to process in parallel
            parallel: Whether to process windows in parallel

        Returns:
            Created KnowledgeGraph object or None if failed
        """
        try:
            # Get the trace
            trace = get_trace(self.session, trace_id)
            if not trace:
                logger.error(f"Trace with ID {trace_id} not found")
                return None

            # Update status to processing
            update_trace_status(self.session, trace.trace_id, "processing")

            logger.info(
                f"Processing trace: {trace.title} (ID: {trace.id}, "
                f"trace_id: {trace.trace_id})"
            )
            logger.info(
                f"  Size: {trace.character_count} chars, "
                f"{trace.turn_count} turns"
            )

            # Import here to avoid circular imports
            from agentgraph.extraction.graph_processing import (
                SlidingWindowMonitor,
            )

            # Initialize the sliding window monitor; let it auto-determine
            # window parameters when either was left unspecified
            monitor = SlidingWindowMonitor(
                window_size=window_size,
                overlap_size=overlap_size,
                batch_size=batch_size,
                parallel_processing=parallel,
                model="gpt-4.1",
                auto_determine_params=(
                    window_size is None or overlap_size is None
                ),
            )

            # Generate an output identifier
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_identifier = f"kg_trace_{trace.id}_{timestamp}"

            # Process the trace
            logger.info("Starting sliding window analysis...")
            result = await monitor.process_trace(
                trace.content, output_identifier
            )

            # Update trace status
            update_trace_status(self.session, trace.trace_id, "completed")

            # Get the created knowledge graph
            kg = result.get('knowledge_graph')
            if kg:
                logger.info(
                    f"Processing complete. Knowledge graph created: "
                    f"{kg.filename} (ID: {kg.id})"
                )
                logger.info(
                    f"  Entities: {kg.entity_count}, "
                    f"Relations: {kg.relation_count}"
                )
            return kg
        except Exception as e:
            logger.error(f"Error processing trace: {str(e)}")
            # BUGFIX: the handler previously referenced `trace.trace_id`,
            # which raises NameError if get_trace itself failed. Use the
            # caller-supplied trace_id and never let the status update mask
            # the original failure.
            try:
                update_trace_status(self.session, trace_id, "error")
            except Exception as status_err:
                logger.error(
                    f"Failed to mark trace {trace_id} as error: "
                    f"{str(status_err)}"
                )
            return None

    def analyze_trace(self, trace_id: str) -> Optional[Dict[str, Any]]:
        """
        Analyze a trace without processing it.

        Args:
            trace_id: ID of the trace to analyze

        Returns:
            Trace analysis dictionary or None if failed
        """
        try:
            # Get the trace
            trace = get_trace(self.session, trace_id)
            if not trace:
                logger.error(f"Trace with ID {trace_id} not found")
                return None

            logger.info(
                f"Analyzing trace: {trace.title} (ID: {trace.id}, "
                f"trace_id: {trace.trace_id})"
            )

            # Analyze trace using pure function
            trace_analysis = analyze_trace_characteristics(trace.content)
            display_trace_summary(trace_analysis)
            return trace_analysis
        except Exception as e:
            logger.error(f"Error analyzing trace: {str(e)}")
            return None

    def get_trace_content(self, trace_id: str) -> Optional[str]:
        """
        Get the content of a trace by ID.

        Args:
            trace_id: ID of the trace to get content for

        Returns:
            Trace content string or None if not found
        """
        try:
            trace = get_trace(self.session, trace_id)
            if not trace:
                logger.error(f"Trace with ID {trace_id} not found")
                return None
            return trace.content
        except Exception as e:
            logger.error(f"Error getting trace content: {str(e)}")
            return None

    def get_trace_by_id(self, trace_id: str) -> Optional[Trace]:
        """
        Get a trace by ID.

        Args:
            trace_id: ID of the trace to get

        Returns:
            Trace object or None if not found
        """
        try:
            return get_trace(self.session, trace_id)
        except Exception as e:
            logger.error(f"Error getting trace by ID {trace_id}: {str(e)}")
            return None

    def update_trace_status(self, trace_id: str, status: str) -> bool:
        """
        Update the status of a trace.

        Note: the name intentionally mirrors the module-level helper
        ``backend.database.utils.update_trace_status``; inside this method
        the bare name resolves to that imported function, not to the method.

        Args:
            trace_id: ID of the trace to update
            status: New status value

        Returns:
            True if successful, False otherwise
        """
        try:
            update_trace_status(self.session, trace_id, status)
            return True
        except Exception as e:
            logger.error(f"Error updating trace status: {str(e)}")
            return False