# AgentGraph/backend/services/context_service.py
# Last commit by wu981526092 (c2ea5ed):
# 🚀 Deploy AgentGraph: Complete agent monitoring and knowledge graph system
"""
Context Service for managing context documents in trace metadata.
"""
import json
import uuid
from datetime import datetime
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified
from backend.database.models import Trace
from backend.models import (
ContextDocument,
CreateContextRequest,
UpdateContextRequest,
ContextDocumentType
)
class ContextService:
    """Manage the context documents stored inside a trace's metadata.

    Documents are persisted as a list of JSON-compatible dicts under the
    ``"context_documents"`` key of ``Trace.trace_metadata``. Every mutating
    method rewrites that list and commits the session.
    """

    # Upper bound on the number of context documents attached to one trace.
    MAX_DOCUMENTS_PER_TRACE = 20
    # Upper bound, in characters, on the content of an uploaded file.
    MAX_FILE_CONTENT_LENGTH = 100_000

    def __init__(self, db: Session):
        """Store the SQLAlchemy session used for all trace lookups and commits."""
        self.db = db

    def create_context_document(
        self,
        trace_id: str,
        title: str,
        document_type: ContextDocumentType,
        content: str,
        file_name: Optional[str] = None
    ) -> ContextDocument:
        """Create a new context document and attach it to a trace.

        Args:
            trace_id: Identifier of the trace that will own the document.
            title: Document title; must be unique within the trace.
            document_type: Enum member (or raw enum value) for the document kind.
            content: Document body.
            file_name: Optional name of the file the content came from.

        Returns:
            The newly created ``ContextDocument``.

        Raises:
            ValueError: If the document type is invalid, the trace does not
                exist, the per-trace document limit is reached, or the title
                is already used by another document of the trace.
        """
        self.validate_document_type(document_type)

        documents = self.get_context_documents(trace_id)

        if len(documents) >= self.MAX_DOCUMENTS_PER_TRACE:
            raise ValueError(
                f"Maximum of {self.MAX_DOCUMENTS_PER_TRACE} context documents per trace allowed"
            )

        if any(doc.title == title for doc in documents):
            raise ValueError(f"Context document with title '{title}' already exists")

        new_doc = ContextDocument(
            id=self._generate_context_id(),
            title=title,
            document_type=document_type,
            content=content,
            file_name=file_name,
            # Naive UTC timestamp; kept naive so the stored format is unchanged.
            created_at=datetime.utcnow(),
            is_active=True
        )

        documents.append(new_doc)
        self._update_trace_metadata(trace_id, documents)
        return new_doc

    def get_context_documents(self, trace_id: str) -> List[ContextDocument]:
        """Return all context documents stored on a trace.

        Args:
            trace_id: Identifier of the trace to read.

        Returns:
            The stored documents, or an empty list when the trace has no
            metadata or no (or a null) ``"context_documents"`` entry.

        Raises:
            ValueError: If the trace does not exist.
        """
        trace = self._get_trace(trace_id)
        metadata = trace.trace_metadata or {}
        # ``or []`` also covers a key that is present but stored as null.
        stored = metadata.get("context_documents") or []
        return [ContextDocument.model_validate(item) for item in stored]

    def update_context_document(
        self,
        trace_id: str,
        context_id: str,
        updates: UpdateContextRequest
    ) -> ContextDocument:
        """Apply a partial update to an existing context document.

        Only the fields explicitly set on ``updates`` are written.

        Args:
            trace_id: Identifier of the trace owning the document.
            context_id: ID of the document to update.
            updates: Request object carrying the fields to change.

        Returns:
            The updated ``ContextDocument``.

        Raises:
            ValueError: If the trace or the document does not exist, or the
                new title collides with another document of the trace.
        """
        documents = self.get_context_documents(trace_id)

        index = next(
            (i for i, doc in enumerate(documents) if doc.id == context_id),
            None,
        )
        if index is None:
            raise ValueError(f"Context document {context_id} not found")

        # Reject a colliding title before touching the document.
        if updates.title and any(
            other.title == updates.title
            for i, other in enumerate(documents)
            if i != index
        ):
            raise ValueError(f"Context document with title '{updates.title}' already exists")

        target = documents[index]
        # model_dump is the pydantic v2 spelling of the deprecated .dict().
        for field, value in updates.model_dump(exclude_unset=True).items():
            setattr(target, field, value)

        self._update_trace_metadata(trace_id, documents)
        return target

    def delete_context_document(self, trace_id: str, context_id: str) -> bool:
        """Remove a context document from a trace.

        Args:
            trace_id: Identifier of the trace owning the document.
            context_id: ID of the document to delete.

        Returns:
            ``True`` once the document has been removed.

        Raises:
            ValueError: If the trace or the document does not exist.
        """
        documents = self.get_context_documents(trace_id)

        remaining = [doc for doc in documents if doc.id != context_id]
        if len(remaining) == len(documents):
            raise ValueError(f"Context document {context_id} not found")

        self._update_trace_metadata(trace_id, remaining)
        return True

    def validate_document_type(self, document_type: ContextDocumentType) -> bool:
        """Check that ``document_type`` is a valid ``ContextDocumentType``.

        Both enum members and their raw values are accepted.

        Args:
            document_type: Enum member or raw enum value to validate.

        Returns:
            ``True`` when the type is valid.

        Raises:
            ValueError: If ``document_type`` matches no enum member.
        """
        try:
            # Enum lookup by value; passing a member returns that member.
            ContextDocumentType(document_type)
        except ValueError:
            valid_types = [item.value for item in ContextDocumentType]
            raise ValueError(
                f"Invalid document type. Must be one of: {valid_types}"
            ) from None
        return True

    def process_uploaded_file(
        self,
        file_content: str,
        trace_id: str,
        title: str,
        document_type: ContextDocumentType,
        file_name: str
    ) -> ContextDocument:
        """Store the text of an uploaded file as a context document.

        Args:
            file_content: Decoded text content of the uploaded file.
            trace_id: Identifier of the trace that will own the document.
            title: Document title; must be unique within the trace.
            document_type: Enum member (or raw enum value) for the document kind.
            file_name: Name of the uploaded file.

        Returns:
            The newly created ``ContextDocument``.

        Raises:
            ValueError: If the content is longer than
                ``MAX_FILE_CONTENT_LENGTH`` characters, or for any reason
                ``create_context_document`` raises.
        """
        if len(file_content) > self.MAX_FILE_CONTENT_LENGTH:
            raise ValueError(
                f"File content exceeds maximum length of {self.MAX_FILE_CONTENT_LENGTH:,} characters"
            )

        return self.create_context_document(
            trace_id=trace_id,
            title=title,
            document_type=document_type,
            content=file_content,
            file_name=file_name
        )

    def _get_trace(self, trace_id: str) -> Trace:
        """Fetch a trace by its ``trace_id`` or raise ``ValueError`` if missing."""
        trace = self.db.query(Trace).filter(Trace.trace_id == trace_id).first()
        if not trace:
            raise ValueError(f"Trace {trace_id} not found")
        return trace

    def _update_trace_metadata(self, trace_id: str, context_documents: List[ContextDocument]) -> None:
        """Persist ``context_documents`` into the trace's metadata and commit.

        Args:
            trace_id: Identifier of the trace to update.
            context_documents: Full list of documents to store; it replaces
                whatever was stored before.

        Raises:
            ValueError: If the trace does not exist.
        """
        trace = self._get_trace(trace_id)

        if not trace.trace_metadata:
            trace.trace_metadata = {}

        # mode='json' serialises datetimes (and other rich types) to
        # JSON-compatible values so the list can live in a JSON column.
        trace.trace_metadata["context_documents"] = [
            doc.model_dump(mode='json') for doc in context_documents
        ]

        # The dict is mutated in place, so the change must be flagged
        # explicitly for SQLAlchemy to include the column in the flush.
        flag_modified(trace, "trace_metadata")

        try:
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller after a failed commit.
            self.db.rollback()
            raise

    def _generate_context_id(self) -> str:
        """Return a new random (version 4) UUID string for a context document."""
        return str(uuid.uuid4())