# Spaces: Running
# File size: 6,515 Bytes
# c2ea5ed
"""
Context Service for managing context documents in trace metadata.
"""
import json
import uuid
from datetime import datetime
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified
from backend.database.models import Trace
from backend.models import (
ContextDocument,
CreateContextRequest,
UpdateContextRequest,
ContextDocumentType
)
class ContextService:
    """Service for managing context documents stored in trace metadata.

    Context documents are persisted as a list of JSON-serializable dicts under
    the ``"context_documents"`` key of ``Trace.trace_metadata``. Every mutating
    operation reads the full list, changes it in memory, and writes the whole
    list back in a single commit.
    """

    # Maximum number of context documents a single trace may hold.
    MAX_DOCUMENTS_PER_TRACE = 20
    # Maximum uploaded file content length, measured in characters (``len``).
    MAX_FILE_CONTENT_LENGTH = 100000

    def __init__(self, db: Session):
        """Store the SQLAlchemy session used for all trace queries and commits."""
        self.db = db

    def create_context_document(
        self,
        trace_id: str,
        title: str,
        document_type: ContextDocumentType,
        content: str,
        file_name: Optional[str] = None
    ) -> ContextDocument:
        """Create a new context document for a trace and persist it.

        Args:
            trace_id: Identifier of the trace that will own the document.
            title: Document title; must be unique among the trace's documents.
            document_type: Kind of document; checked by
                ``validate_document_type``.
            content: Document body.
            file_name: Optional name of the file the content came from.

        Returns:
            The newly created ``ContextDocument``.

        Raises:
            ValueError: If the document type is invalid, the trace does not
                exist, the per-trace document limit is reached, or a document
                with the same title already exists.
        """
        self.validate_document_type(document_type)

        current_docs = self.get_context_documents(trace_id)

        if len(current_docs) >= self.MAX_DOCUMENTS_PER_TRACE:
            raise ValueError(
                f"Maximum of {self.MAX_DOCUMENTS_PER_TRACE} context documents per trace allowed"
            )

        if any(doc.title == title for doc in current_docs):
            raise ValueError(f"Context document with title '{title}' already exists")

        new_doc = ContextDocument(
            id=self._generate_context_id(),
            title=title,
            document_type=document_type,
            content=content,
            file_name=file_name,
            # Naive UTC timestamp. NOTE(review): datetime.utcnow() is deprecated
            # since Python 3.12; kept because switching to an aware datetime
            # would change the serialized format of stored documents.
            created_at=datetime.utcnow(),
            is_active=True
        )

        current_docs.append(new_doc)
        self._update_trace_metadata(trace_id, current_docs)
        return new_doc

    def get_context_documents(self, trace_id: str) -> List[ContextDocument]:
        """Return all context documents stored on a trace.

        Args:
            trace_id: Identifier of the trace to read.

        Returns:
            The trace's documents, or an empty list when the trace has no
            metadata or no stored documents.

        Raises:
            ValueError: If no trace with ``trace_id`` exists.
        """
        trace = self._get_trace(trace_id)
        metadata = trace.trace_metadata or {}
        # A missing key and an explicit null both mean "no documents yet".
        docs_data = metadata.get("context_documents") or []
        # Stored entries are plain dicts; rebuild the model objects from them.
        return [ContextDocument.model_validate(doc_data) for doc_data in docs_data]

    def update_context_document(
        self,
        trace_id: str,
        context_id: str,
        updates: UpdateContextRequest
    ) -> ContextDocument:
        """Apply a partial update to an existing context document.

        Only fields explicitly set on ``updates`` are applied.

        Args:
            trace_id: Identifier of the trace that owns the document.
            context_id: ``id`` of the document to update.
            updates: Request object carrying the fields to change.

        Returns:
            The updated ``ContextDocument``.

        Raises:
            ValueError: If the trace or the document is not found, or the new
                title collides with another document on the same trace.
        """
        current_docs = self.get_context_documents(trace_id)

        doc_index = next(
            (i for i, doc in enumerate(current_docs) if doc.id == context_id),
            None,
        )
        if doc_index is None:
            raise ValueError(f"Context document {context_id} not found")

        # model_dump is the Pydantic v2 spelling of the deprecated .dict();
        # the rest of this class already relies on the v2 API.
        update_data = updates.model_dump(exclude_unset=True)

        # Reject a colliding title before touching the document, looking only
        # at the fields that will actually be applied.
        new_title = update_data.get("title")
        if new_title and any(
            other.title == new_title
            for i, other in enumerate(current_docs)
            if i != doc_index
        ):
            raise ValueError(f"Context document with title '{new_title}' already exists")

        doc = current_docs[doc_index]
        for field, value in update_data.items():
            setattr(doc, field, value)

        self._update_trace_metadata(trace_id, current_docs)
        return doc

    def delete_context_document(self, trace_id: str, context_id: str) -> bool:
        """Delete a context document from a trace.

        Args:
            trace_id: Identifier of the trace that owns the document.
            context_id: ``id`` of the document to remove.

        Returns:
            ``True`` once the document has been removed and the change
            committed.

        Raises:
            ValueError: If the trace or the document is not found.
        """
        current_docs = self.get_context_documents(trace_id)

        updated_docs = [doc for doc in current_docs if doc.id != context_id]
        # Same length after filtering means nothing matched ``context_id``.
        if len(updated_docs) == len(current_docs):
            raise ValueError(f"Context document {context_id} not found")

        self._update_trace_metadata(trace_id, updated_docs)
        return True

    def validate_document_type(self, document_type: ContextDocumentType) -> bool:
        """Check that ``document_type`` is a known ``ContextDocumentType``.

        Accepts either an enum member or one of the enum's raw values.

        Returns:
            ``True`` when the type is valid.

        Raises:
            ValueError: If the value is neither a member nor a member's value.
        """
        # A genuine member is always valid. Checking it explicitly keeps this
        # correct even if the enum's members do not compare equal to their
        # values (i.e. it is not a str/int mixin enum).
        if isinstance(document_type, ContextDocumentType):
            return True

        valid_types = [item.value for item in ContextDocumentType]
        if document_type not in valid_types:
            raise ValueError(f"Invalid document type. Must be one of: {valid_types}")
        return True

    def process_uploaded_file(
        self,
        file_content: str,
        trace_id: str,
        title: str,
        document_type: ContextDocumentType,
        file_name: str
    ) -> ContextDocument:
        """Create a context document from the text of an uploaded file.

        Args:
            file_content: Decoded text content of the uploaded file.
            trace_id: Identifier of the trace that will own the document.
            title: Document title.
            document_type: Kind of document.
            file_name: Name of the uploaded file.

        Returns:
            The newly created ``ContextDocument``.

        Raises:
            ValueError: If the content is longer than
                ``MAX_FILE_CONTENT_LENGTH`` characters, or for any reason
                ``create_context_document`` raises.
        """
        if len(file_content) > self.MAX_FILE_CONTENT_LENGTH:
            raise ValueError(
                f"File content exceeds maximum length of {self.MAX_FILE_CONTENT_LENGTH:,} characters"
            )

        return self.create_context_document(
            trace_id=trace_id,
            title=title,
            document_type=document_type,
            content=file_content,
            file_name=file_name
        )

    def _get_trace(self, trace_id: str) -> Trace:
        """Fetch a trace by ``trace_id`` or raise ``ValueError`` if it is missing."""
        trace = self.db.query(Trace).filter(Trace.trace_id == trace_id).first()
        if not trace:
            raise ValueError(f"Trace {trace_id} not found")
        return trace

    def _update_trace_metadata(self, trace_id: str, context_documents: List[ContextDocument]) -> None:
        """Replace the trace's stored context documents and commit.

        Args:
            trace_id: Identifier of the trace to update.
            context_documents: Full list of documents to store; it replaces
                whatever was stored before.

        Raises:
            ValueError: If no trace with ``trace_id`` exists.
        """
        trace = self._get_trace(trace_id)

        if not trace.trace_metadata:
            trace.trace_metadata = {}

        # mode='json' makes datetime fields serialize as strings so the list
        # can be stored in the JSON metadata.
        docs_data = [doc.model_dump(mode='json') for doc in context_documents]
        trace.trace_metadata["context_documents"] = docs_data

        # The dict is mutated in place, so the attribute is flagged explicitly
        # to make sure SQLAlchemy writes the change.
        flag_modified(trace, "trace_metadata")

        try:
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller after a failed commit.
            self.db.rollback()
            raise

    def _generate_context_id(self) -> str:
        """Generate a unique ID (random UUID4 string) for a context document."""
        return str(uuid.uuid4())