# AgentGraph: backend/services/trace_management_service.py
# Deployed as part of "Deploy AgentGraph: Complete agent monitoring and
# knowledge graph system" (commit c2ea5ed).
"""
Trace Management Service
This service handles all database operations for trace management,
providing a clean interface between the database layer and the pure
analysis functions in agentgraph.input.
"""
import os
import logging
from typing import Dict, List, Any, Optional
from sqlalchemy.orm import Session
from datetime import datetime
from backend.database.models import Trace, KnowledgeGraph
from backend.database.utils import (
save_trace, get_trace, get_all_traces, update_trace_status,
link_knowledge_graph_to_trace, get_knowledge_graphs_for_trace
)
# Import pure analysis functions
from agentgraph.input.trace_management import (
analyze_trace_characteristics, display_trace_summary, preprocess_content_for_cost_optimization
)
logger = logging.getLogger(__name__)
class TraceManagementService:
    """
    Service for orchestrating trace management with database operations.

    This service fetches data from the database, calls pure analysis functions
    from agentgraph.input, and saves the results back to the database.
    """

    def __init__(self, session: Session):
        # The session is injected so the caller controls transaction scope
        # and lifetime (e.g. per-request sessions in a web backend).
        self.session = session

    def upload_trace(
        self,
        file_path: str,
        title: Optional[str] = None,
        description: Optional[str] = None,
        trace_type: Optional[str] = None,
        uploader: Optional[str] = None,
        tags: Optional[List[str]] = None,
        analyze: bool = True
    ) -> Trace:
        """
        Upload a trace from a file to the database.

        Args:
            file_path: Path to the trace file
            title: Optional title for the trace (defaults to one derived
                from the file name)
            description: Optional description
            trace_type: Optional type of trace; if omitted and ``analyze``
                is True, the type inferred by analysis is used
            uploader: Optional name of uploader
            tags: Optional list of tags
            analyze: Whether to analyze and display trace characteristics

        Returns:
            The created Trace object

        Raises:
            OSError: If the trace file cannot be read.
            Exception: Re-raises any database error after rolling back.
        """
        # Read the trace file up front; a read failure should surface
        # before any database work begins.
        with open(file_path, 'r', encoding='utf-8') as f:
            original_content = f.read()
        logger.info(f"Uploading trace: {file_path}")
        logger.info(f"Original length of trace: {len(original_content)}")
        # Use original content without preprocessing - line splitting will be
        # handled during chunking
        content = original_content
        logger.info(f"Content length: {len(content)} characters")
        # Derive a filename from the path for storage/display purposes
        filename = os.path.basename(file_path)
        # Analyze trace characteristics if requested (uses the stored content)
        trace_analysis = None
        if analyze:
            trace_analysis = analyze_trace_characteristics(content)
            display_trace_summary(trace_analysis)
            # If trace_type wasn't provided, use the one from analysis
            if not trace_type and 'trace_type' in trace_analysis:
                trace_type = trace_analysis['trace_type']
        try:
            # Save the trace to the database
            trace = save_trace(
                session=self.session,
                content=content,
                filename=filename,
                # BUG FIX: the fallback title previously interpolated a
                # garbled "(unknown)" placeholder; use the real file name.
                title=title or f"Trace from {filename}",
                description=description,
                trace_type=trace_type,
                uploader=uploader,
                tags=tags or []
            )
            # Analysis results are currently display-only; persisting them
            # would require additional fields on the Trace model.
            if trace_analysis:
                pass
            logger.info(f"Trace uploaded successfully (ID: {trace.id}, trace_id: {trace.trace_id})")
            logger.info(f"  Title: {trace.title}")
            logger.info(f"  Characters: {trace.character_count}, Turns: {trace.turn_count}")
            return trace
        except Exception as e:
            logger.error(f"Error uploading trace: {str(e)}")
            # Roll back so the session remains usable by the caller.
            self.session.rollback()
            raise

    def list_traces(self) -> List[Trace]:
        """
        List all traces in the database.

        Prints a human-readable summary of each trace (and up to three of
        its linked knowledge graphs) to stdout.

        Returns:
            List of all Trace objects; empty list on error or when none exist.
        """
        try:
            traces = get_all_traces(self.session)
            if not traces:
                logger.info("No traces found in the database")
                return []
            logger.info(f"Found {len(traces)} traces in the database:")
            for i, trace in enumerate(traces, 1):
                print(f"\n{i}. {trace.title} (ID: {trace.id}, trace_id: {trace.trace_id})")
                print(f"   Uploaded: {trace.upload_timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
                print(f"   Type: {trace.trace_type or 'Unknown'}, Status: {trace.status}")
                print(f"   Size: {trace.character_count} chars, {trace.turn_count} turns")
                # Get linked knowledge graphs
                kgs = get_knowledge_graphs_for_trace(self.session, trace.trace_id)
                if kgs:
                    print(f"   Knowledge Graphs: {len(kgs)}")
                    for kg in kgs[:3]:  # Show just the first three
                        print(f"     - {kg.filename} (ID: {kg.id})")
                    if len(kgs) > 3:
                        print(f"     - ...and {len(kgs) - 3} more")
                else:
                    print("   No linked knowledge graphs")
            return traces
        except Exception as e:
            logger.error(f"Error listing traces: {str(e)}")
            return []

    async def process_trace(
        self,
        trace_id: str,
        window_size: Optional[int] = None,
        overlap_size: Optional[int] = None,
        batch_size: int = 5,
        parallel: bool = True
    ) -> Optional[KnowledgeGraph]:
        """
        Process a trace using the sliding window monitor.

        Args:
            trace_id: ID of the trace to process
            window_size: Optional window size in characters (auto-determined
                when omitted)
            overlap_size: Optional overlap size in characters (auto-determined
                when omitted)
            batch_size: Number of windows to process in parallel
            parallel: Whether to process windows in parallel

        Returns:
            Created KnowledgeGraph object or None if failed
        """
        # BUG FIX: initialize before the try block so the except handler
        # cannot hit a NameError when get_trace itself raises.
        trace = None
        try:
            # Get the trace
            trace = get_trace(self.session, trace_id)
            if not trace:
                logger.error(f"Trace with ID {trace_id} not found")
                return None
            # Update status to processing
            update_trace_status(self.session, trace.trace_id, "processing")
            logger.info(f"Processing trace: {trace.title} (ID: {trace.id}, trace_id: {trace.trace_id})")
            logger.info(f"  Size: {trace.character_count} chars, {trace.turn_count} turns")
            # Import here to avoid circular imports
            from agentgraph.extraction.graph_processing import SlidingWindowMonitor
            # Initialize the sliding window monitor; window parameters are
            # auto-determined when either size was not supplied.
            monitor = SlidingWindowMonitor(
                window_size=window_size,
                overlap_size=overlap_size,
                batch_size=batch_size,
                parallel_processing=parallel,
                model="gpt-4.1",
                auto_determine_params=(window_size is None or overlap_size is None)
            )
            # Generate an output identifier unique per trace and run
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_identifier = f"kg_trace_{trace.id}_{timestamp}"
            # Process the trace
            logger.info("Starting sliding window analysis...")
            result = await monitor.process_trace(trace.content, output_identifier)
            # Update trace status
            update_trace_status(self.session, trace.trace_id, "completed")
            # Get the created knowledge graph
            kg = result.get('knowledge_graph')
            if kg:
                logger.info(f"Processing complete. Knowledge graph created: {kg.filename} (ID: {kg.id})")
                logger.info(f"  Entities: {kg.entity_count}, Relations: {kg.relation_count}")
            # Returns None when the monitor produced no knowledge graph.
            return kg
        except Exception as e:
            logger.error(f"Error processing trace: {str(e)}")
            if trace is not None:
                # Best-effort: flag the trace as errored; the session may be
                # unusable after the original failure, so never let this
                # secondary update mask the real error path.
                try:
                    update_trace_status(self.session, trace.trace_id, "error")
                except Exception as status_error:
                    logger.error(f"Error updating trace status: {str(status_error)}")
            return None

    def analyze_trace(self, trace_id: str) -> Optional[Dict[str, Any]]:
        """
        Analyze a trace without processing it.

        Args:
            trace_id: ID of the trace to analyze

        Returns:
            Trace analysis dictionary or None if failed
        """
        try:
            # Get the trace
            trace = get_trace(self.session, trace_id)
            if not trace:
                logger.error(f"Trace with ID {trace_id} not found")
                return None
            logger.info(f"Analyzing trace: {trace.title} (ID: {trace.id}, trace_id: {trace.trace_id})")
            # Analyze trace using pure function
            trace_analysis = analyze_trace_characteristics(trace.content)
            display_trace_summary(trace_analysis)
            return trace_analysis
        except Exception as e:
            logger.error(f"Error analyzing trace: {str(e)}")
            return None

    def get_trace_content(self, trace_id: str) -> Optional[str]:
        """
        Get the content of a trace by ID.

        Args:
            trace_id: ID of the trace to get content for

        Returns:
            Trace content string or None if not found
        """
        try:
            trace = get_trace(self.session, trace_id)
            if not trace:
                logger.error(f"Trace with ID {trace_id} not found")
                return None
            return trace.content
        except Exception as e:
            logger.error(f"Error getting trace content: {str(e)}")
            return None

    def get_trace_by_id(self, trace_id: str) -> Optional[Trace]:
        """
        Get a trace by ID.

        Args:
            trace_id: ID of the trace to get

        Returns:
            Trace object or None if not found
        """
        try:
            return get_trace(self.session, trace_id)
        except Exception as e:
            logger.error(f"Error getting trace by ID {trace_id}: {str(e)}")
            return None

    def update_trace_status(self, trace_id: str, status: str) -> bool:
        """
        Update the status of a trace.

        Args:
            trace_id: ID of the trace to update
            status: New status value

        Returns:
            True if successful, False otherwise
        """
        try:
            # Delegates to the module-level backend.database.utils helper of
            # the same name (the call resolves to the import, not this method).
            update_trace_status(self.session, trace_id, status)
            return True
        except Exception as e:
            logger.error(f"Error updating trace status: {str(e)}")
            return False