"""
Trace Management Service

This service handles all database operations for trace management,
providing a clean interface between the database layer and the pure
analysis functions in agentgraph.input.
"""
import os
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional

from sqlalchemy.orm import Session

from backend.database.models import Trace, KnowledgeGraph
from backend.database.utils import (
    save_trace, get_trace, get_all_traces, update_trace_status,
    link_knowledge_graph_to_trace, get_knowledge_graphs_for_trace
)
# Import pure analysis functions
from agentgraph.input.trace_management import (
    analyze_trace_characteristics, display_trace_summary, preprocess_content_for_cost_optimization
)

logger = logging.getLogger(__name__)
class TraceManagementService:
    """
    Service for orchestrating trace management with database operations.

    This service fetches data from the database, calls pure analysis functions
    from agentgraph.input, and saves the results back to the database.
    """

    def __init__(self, session: Session):
        # Caller-owned SQLAlchemy session; the service only rolls it back on
        # upload failure and never commits/closes it on the caller's behalf.
        self.session = session

    def upload_trace(
        self,
        file_path: str,
        title: Optional[str] = None,
        description: Optional[str] = None,
        trace_type: Optional[str] = None,
        uploader: Optional[str] = None,
        tags: Optional[List[str]] = None,
        analyze: bool = True,
    ) -> Trace:
        """
        Upload a trace from a file to the database.

        Args:
            file_path: Path to the trace file
            title: Optional title for the trace
            description: Optional description
            trace_type: Optional type of trace
            uploader: Optional name of uploader
            tags: Optional list of tags
            analyze: Whether to analyze and display trace characteristics

        Returns:
            The created Trace object

        Raises:
            OSError: If the trace file cannot be read.
            Exception: Re-raised after rollback if the database save fails.
        """
        # Read the trace file
        with open(file_path, 'r', encoding='utf-8') as f:
            original_content = f.read()
        logger.info(f"Uploading trace: {file_path}")
        logger.info(f"Original length of trace: {len(original_content)}")

        # Use original content without preprocessing - line splitting will be
        # handled during chunking
        content = original_content
        logger.info(f"Content length: {len(content)} characters")

        # Generate a filename if not provided
        filename = os.path.basename(file_path)

        # Analyze trace characteristics if requested (use processed content)
        trace_analysis = None
        if analyze:
            trace_analysis = analyze_trace_characteristics(content)
            display_trace_summary(trace_analysis)
            # If trace_type wasn't provided, use the one from analysis
            if not trace_type and 'trace_type' in trace_analysis:
                trace_type = trace_analysis['trace_type']

        try:
            # Save the trace to the database.
            # Bug fix: the title fallback previously used a broken literal
            # ("Trace from (unknown)") instead of interpolating the filename.
            trace = save_trace(
                session=self.session,
                content=content,
                filename=filename,
                title=title or f"Trace from {filename}",
                description=description,
                trace_type=trace_type,
                uploader=uploader,
                tags=tags or []
            )
            # Add analysis data if available
            if trace_analysis:
                # Update trace with analysis info if needed
                # This would require adding additional fields to the Trace model
                pass
            logger.info(f"Trace uploaded successfully (ID: {trace.id}, trace_id: {trace.trace_id})")
            logger.info(f"  Title: {trace.title}")
            logger.info(f"  Characters: {trace.character_count}, Turns: {trace.turn_count}")
            return trace
        except Exception as e:
            logger.error(f"Error uploading trace: {str(e)}")
            # Undo the partial insert so the caller's session stays usable.
            self.session.rollback()
            raise

    def list_traces(self) -> List[Trace]:
        """
        List all traces in the database, printing a human-readable summary.

        Returns:
            List of all Trace objects ([] when none exist or on error).
        """
        try:
            traces = get_all_traces(self.session)
            if not traces:
                logger.info("No traces found in the database")
                return []
            logger.info(f"Found {len(traces)} traces in the database:")
            for i, trace in enumerate(traces, 1):
                print(f"\n{i}. {trace.title} (ID: {trace.id}, trace_id: {trace.trace_id})")
                print(f"   Uploaded: {trace.upload_timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
                print(f"   Type: {trace.trace_type or 'Unknown'}, Status: {trace.status}")
                print(f"   Size: {trace.character_count} chars, {trace.turn_count} turns")
                # Get linked knowledge graphs
                kgs = get_knowledge_graphs_for_trace(self.session, trace.trace_id)
                if kgs:
                    print(f"   Knowledge Graphs: {len(kgs)}")
                    for kg in kgs[:3]:  # Show just the first three
                        print(f"     - {kg.filename} (ID: {kg.id})")
                    if len(kgs) > 3:
                        print(f"     - ...and {len(kgs) - 3} more")
                else:
                    print("   No linked knowledge graphs")
            return traces
        except Exception as e:
            logger.error(f"Error listing traces: {str(e)}")
            return []

    async def process_trace(
        self,
        trace_id: str,
        window_size: Optional[int] = None,
        overlap_size: Optional[int] = None,
        batch_size: int = 5,
        parallel: bool = True,
    ) -> Optional[KnowledgeGraph]:
        """
        Process a trace using the sliding window monitor.

        Args:
            trace_id: ID of the trace to process
            window_size: Optional window size in characters
            overlap_size: Optional overlap size in characters
            batch_size: Number of windows to process in parallel
            parallel: Whether to process windows in parallel

        Returns:
            Created KnowledgeGraph object or None if failed
        """
        # Bug fix: previously the except handler dereferenced `trace` even
        # when get_trace itself raised, producing an UnboundLocalError that
        # masked the real exception. Initialize it so the handler can guard.
        trace = None
        try:
            # Get the trace
            trace = get_trace(self.session, trace_id)
            if not trace:
                logger.error(f"Trace with ID {trace_id} not found")
                return None

            # Update status to processing
            update_trace_status(self.session, trace.trace_id, "processing")
            logger.info(f"Processing trace: {trace.title} (ID: {trace.id}, trace_id: {trace.trace_id})")
            logger.info(f"  Size: {trace.character_count} chars, {trace.turn_count} turns")

            # Import here to avoid circular imports
            from agentgraph.extraction.graph_processing import SlidingWindowMonitor

            # Initialize the sliding window monitor; auto-derive windowing
            # parameters when the caller did not pin both of them.
            monitor = SlidingWindowMonitor(
                window_size=window_size,
                overlap_size=overlap_size,
                batch_size=batch_size,
                parallel_processing=parallel,
                model="gpt-4.1",
                auto_determine_params=(window_size is None or overlap_size is None)
            )

            # Generate an output identifier
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_identifier = f"kg_trace_{trace.id}_{timestamp}"

            # Process the trace
            logger.info("Starting sliding window analysis...")
            result = await monitor.process_trace(trace.content, output_identifier)

            # Update trace status
            update_trace_status(self.session, trace.trace_id, "completed")

            # Get the created knowledge graph
            kg = result.get('knowledge_graph')
            if kg:
                logger.info(f"Processing complete. Knowledge graph created: {kg.filename} (ID: {kg.id})")
                logger.info(f"  Entities: {kg.entity_count}, Relations: {kg.relation_count}")
                return kg
            return None
        except Exception as e:
            logger.error(f"Error processing trace: {str(e)}")
            # Only flag the trace as errored if we actually found it.
            if trace is not None:
                update_trace_status(self.session, trace.trace_id, "error")
            return None

    def analyze_trace(self, trace_id: str) -> Optional[Dict[str, Any]]:
        """
        Analyze a trace without processing it.

        Args:
            trace_id: ID of the trace to analyze

        Returns:
            Trace analysis dictionary or None if failed
        """
        try:
            # Get the trace
            trace = get_trace(self.session, trace_id)
            if not trace:
                logger.error(f"Trace with ID {trace_id} not found")
                return None
            logger.info(f"Analyzing trace: {trace.title} (ID: {trace.id}, trace_id: {trace.trace_id})")
            # Analyze trace using pure function
            trace_analysis = analyze_trace_characteristics(trace.content)
            display_trace_summary(trace_analysis)
            return trace_analysis
        except Exception as e:
            logger.error(f"Error analyzing trace: {str(e)}")
            return None

    def get_trace_content(self, trace_id: str) -> Optional[str]:
        """
        Get the content of a trace by ID.

        Args:
            trace_id: ID of the trace to get content for

        Returns:
            Trace content string or None if not found
        """
        try:
            trace = get_trace(self.session, trace_id)
            if not trace:
                logger.error(f"Trace with ID {trace_id} not found")
                return None
            return trace.content
        except Exception as e:
            logger.error(f"Error getting trace content: {str(e)}")
            return None

    def get_trace_by_id(self, trace_id: str) -> Optional[Trace]:
        """
        Get a trace by ID.

        Args:
            trace_id: ID of the trace to get

        Returns:
            Trace object or None if not found
        """
        try:
            return get_trace(self.session, trace_id)
        except Exception as e:
            logger.error(f"Error getting trace by ID {trace_id}: {str(e)}")
            return None

    def update_trace_status(self, trace_id: str, status: str) -> bool:
        """
        Update the status of a trace.

        Args:
            trace_id: ID of the trace to update
            status: New status value

        Returns:
            True if successful, False otherwise
        """
        try:
            # NOTE: this resolves to the module-level helper imported from
            # backend.database.utils, not to this method (no recursion).
            update_trace_status(self.session, trace_id, status)
            return True
        except Exception as e:
            logger.error(f"Error updating trace status: {str(e)}")
            return False