Spaces:
Running
Running
File size: 15,901 Bytes
c2ea5ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 |
"""
Universal Parser Service for automated context document generation.
This service integrates the universal LangSmith trace parser to automatically
generate context documents when traces are uploaded or updated.
"""
import json
import logging
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified
from backend.services.context_service import ContextService
from backend.models import ContextDocumentType
from backend.database.models import Trace
logger = logging.getLogger("agent_monitoring_server.services.universal_parser")
class UniversalParserService:
    """Service for automatically generating context documents from traces using universal parser."""

    def __init__(self, db: Session):
        # Database session is shared with the ContextService so document
        # creation and metadata updates happen in the same transaction scope.
        self.db = db
        self.context_service = ContextService(db)

    def generate_trace_context_documents(self, trace_id: str, trace_content: str) -> List[Dict[str, Any]]:
        """
        Generate context documents for a trace using the universal parser.

        Args:
            trace_id: ID of the trace
            trace_content: Raw content of the trace

        Returns:
            List of created context document dictionaries. Empty list when the
            content is not parseable, the parser is unavailable, or parsing fails.
        """
        try:
            # Imported lazily: the parser package is an optional dependency and
            # its absence is handled gracefully below.
            from agentgraph.input.parsers import GenericLangSmithParser
            import tempfile
            import os

            # Check if trace content looks like LangSmith format.
            if not self._is_parseable_trace(trace_content):
                logger.info("Trace %s is not in a parseable format, skipping universal parser", trace_id)
                return []

            parser = GenericLangSmithParser()

            # Normalize the content handed to the parser: strip line-number
            # prefixes first, then unwrap fetched traces (which nest the real
            # payload under 'data' next to a 'platform' key).
            try:
                cleaned_trace_content = self._strip_line_numbers(trace_content)
                data = json.loads(cleaned_trace_content)
                if isinstance(data, dict) and 'data' in data and 'platform' in data:
                    logger.info("Detected fetched trace structure, extracting nested data for parsing")
                    trace_content_for_parser = json.dumps(data['data'], indent=2)
                else:
                    trace_content_for_parser = json.dumps(data, indent=2)
            except json.JSONDecodeError:
                # Not valid JSON even after cleaning; pass the cleaned text as-is.
                logger.info("Content is not valid JSON after cleaning, using cleaned content as-is")
                trace_content_for_parser = self._strip_line_numbers(trace_content)

            # The parser API takes a file path, so stage the content in a
            # temp file (delete=False because we reopen it by path).
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
                temp_file.write(trace_content_for_parser)
                temp_path = temp_file.name

            try:
                logger.info("Running universal parser on trace %s", trace_id)
                parsed_result = parser.parse_trace_file(temp_path)
                if 'error' in parsed_result:
                    logger.warning("Parser error for trace %s: %s", trace_id, parsed_result['error'])
                    return []

                # Persist the global schema view in trace metadata so the
                # frontend can read it without re-parsing.
                global_view = parsed_result.get('global_schema_view')
                if global_view:
                    self._store_schema_metadata(trace_id, global_view)
                    logger.info("Stored global schema view metadata for trace %s", trace_id)

                # Generate context documents from the parsed results.
                context_docs = parser.generate_universal_context_documents(parsed_result)
                if not context_docs:
                    logger.info("No context documents generated for trace %s", trace_id)
                    return []

                # Create context documents in the database. Failures on one
                # document must not abort the rest, hence per-item handling.
                created_docs = []
                for doc in context_docs:
                    try:
                        doc_type = self._map_document_type(doc.get('document_type', 'technical'))
                        created_doc = self.context_service.create_context_document(
                            trace_id=trace_id,
                            title=doc.get('title', 'Universal Parser Analysis'),
                            document_type=doc_type,
                            content=doc.get('content', ''),
                            file_name=f"universal_parser_{doc.get('document_type', 'analysis')}.md"
                        )
                        created_docs.append(created_doc.dict())
                        logger.info("Created context document: %s", created_doc.title)
                    except ValueError as e:
                        # Duplicate titles are expected on re-runs; anything
                        # else is a genuine validation failure.
                        if "already exists" in str(e):
                            logger.info("Context document already exists: %s", doc.get('title'))
                        else:
                            logger.warning("Failed to create context document: %s", str(e))
                    except Exception as e:
                        logger.error("Error creating context document: %s", str(e))

                logger.info("Successfully created %d context documents for trace %s", len(created_docs), trace_id)
                return created_docs
            finally:
                # Clean up the temporary file regardless of parse outcome.
                if os.path.exists(temp_path):
                    os.remove(temp_path)
        except ImportError:
            # Fixed message: the failing import is the agentgraph parser
            # package, not a local trace_schema_parser.py file.
            logger.warning("Universal parser not available - agentgraph.input.parsers could not be imported")
            return []
        except Exception as e:
            logger.error("Error running universal parser on trace %s: %s", trace_id, str(e))
            return []

    def _store_schema_metadata(self, trace_id: str, global_view) -> None:
        """Store global schema view data in trace metadata for frontend access.

        Best-effort: errors are logged, never raised, so metadata storage
        failures cannot break context-document generation.
        """
        try:
            trace = self.db.query(Trace).filter(Trace.trace_id == trace_id).first()
            if not trace:
                logger.warning("Trace %s not found for schema metadata storage", trace_id)
                return

            # Ensure trace_metadata exists before writing into it.
            if not trace.trace_metadata:
                trace.trace_metadata = {}

            # Convert the GlobalSchemaView object to a plain dict for JSON
            # column storage.
            schema_data = {
                'architecture_description': global_view.architecture_description,
                'execution_flow_summary': global_view.execution_flow_summary,
                'component_hierarchy': global_view.component_hierarchy,
                'numerical_overview': global_view.numerical_overview,
                'prompt_analytics': global_view.prompt_analytics,
                'system_complexity_assessment': global_view.system_complexity_assessment
            }
            trace.trace_metadata['schema_analytics'] = schema_data

            # In-place mutation of a JSON column is invisible to SQLAlchemy
            # change tracking; flag it explicitly before committing.
            flag_modified(trace, "trace_metadata")
            self.db.commit()
            logger.info("Successfully stored schema analytics metadata for trace %s", trace_id)
        except Exception as e:
            logger.error("Error storing schema metadata for trace %s: %s", trace_id, str(e))

    def regenerate_context_documents(self, trace_id: str, trace_content: str, force: bool = False) -> List[Dict[str, Any]]:
        """
        Regenerate context documents for a trace, optionally removing existing auto-generated ones.

        Args:
            trace_id: ID of the trace
            trace_content: Raw content of the trace
            force: Whether to remove existing auto-generated context documents first

        Returns:
            List of created context document dictionaries
        """
        if force:
            try:
                # Only remove documents this service created (recognized by
                # title prefix or generated file name), never user uploads.
                existing_docs = self.context_service.get_context_documents(trace_id)
                for doc in existing_docs:
                    if doc.title.startswith("Auto-generated:") or doc.file_name.startswith("universal_parser_"):
                        try:
                            self.context_service.delete_context_document(trace_id, doc.id)
                            logger.info("Removed existing auto-generated context document: %s", doc.title)
                        except Exception as e:
                            logger.warning("Failed to remove context document %s: %s", doc.title, str(e))
            except Exception as e:
                logger.warning("Error removing existing context documents: %s", str(e))

        # Generate new context documents (this also refreshes schema metadata).
        return self.generate_trace_context_documents(trace_id, trace_content)

    def _is_parseable_trace(self, trace_content: str) -> bool:
        """
        Check if trace content is in a format that the universal parser can handle.

        Args:
            trace_content: Raw trace content

        Returns:
            True if the trace appears to be parseable (LangSmith-like JSON).
        """
        try:
            data = json.loads(self._strip_line_numbers(trace_content))
        except json.JSONDecodeError:
            # Not JSON; other trace formats could be added here later.
            return False
        if not isinstance(data, dict):
            return False

        # Fields that identify a LangSmith export vs. a single trace/run record.
        langsmith_indicators = ['traces', 'run_id', 'run_name', 'export_timestamp', 'trace_id', 'trace_name']
        trace_indicators = ['start_time', 'end_time', 'run_type', 'inputs', 'outputs']

        def has_any(obj: Any, fields: List[str]) -> bool:
            # True when obj is a dict containing at least one of the fields.
            return isinstance(obj, dict) and any(field in obj for field in fields)

        def first_item(seq: Any) -> Any:
            # First element of a non-empty list, else None.
            return seq[0] if isinstance(seq, list) and seq else None

        # Top-level LangSmith export or single-trace structure.
        if has_any(data, langsmith_indicators) or has_any(data, trace_indicators):
            return True

        # Fetched traces nest the real payload under 'data'.
        nested = data.get('data')
        if isinstance(nested, dict):
            if has_any(nested, langsmith_indicators) or has_any(nested, trace_indicators):
                return True
            if has_any(first_item(nested.get('traces', [])), trace_indicators):
                return True

        # Arrays of traces or runs at the top level.
        if has_any(first_item(data.get('traces', [])), trace_indicators):
            return True
        if has_any(first_item(data.get('runs', [])), trace_indicators):
            return True
        return False

    def _strip_line_numbers(self, content: str) -> str:
        """
        Strip line numbers from content if present.

        Handles formats like:
            <L1> content
            <L2> content

        Args:
            content: Content that may have line numbers

        Returns:
            Content with line numbers stripped
        """
        import re
        # Cheap substring guard avoids running the regex over content that
        # clearly has no <L...> markers.
        if '<L' in content and '>' in content:
            # Pattern matches <L{number}> (plus trailing whitespace) at the
            # start of each line.
            return re.sub(r'^<L\d+>\s*', '', content, flags=re.MULTILINE)
        return content

    def _map_document_type(self, parser_doc_type: str) -> ContextDocumentType:
        """
        Map universal parser document types to our ContextDocumentType enum.

        Args:
            parser_doc_type: Document type from the universal parser

        Returns:
            ContextDocumentType enum value (DOCUMENTATION for unknown types)
        """
        type_mapping = {
            'component_structure': ContextDocumentType.SCHEMA,
            'execution_pattern': ContextDocumentType.DOCUMENTATION,
            'performance_profile': ContextDocumentType.DOCUMENTATION,
            'system_indicators': ContextDocumentType.DOCUMENTATION,
            'langgraph_workflow': ContextDocumentType.GUIDELINES,
            'human_interaction': ContextDocumentType.GUIDELINES
        }
        return type_mapping.get(parser_doc_type, ContextDocumentType.DOCUMENTATION)
def auto_generate_context_documents(trace_id: str, trace_content: str, db: Session) -> List[Dict[str, Any]]:
    """
    Convenience wrapper that auto-generates context documents for a trace.

    Delegates to UniversalParserService.generate_trace_context_documents.

    Args:
        trace_id: ID of the trace
        trace_content: Raw content of the trace
        db: Database session

    Returns:
        List of created context document dictionaries
    """
    return UniversalParserService(db).generate_trace_context_documents(trace_id, trace_content)
def regenerate_context_documents(trace_id: str, trace_content: str, db: Session, force: bool = False) -> List[Dict[str, Any]]:
    """
    Convenience wrapper that regenerates context documents for a trace.

    Delegates to UniversalParserService.regenerate_context_documents.

    Args:
        trace_id: ID of the trace
        trace_content: Raw content of the trace
        db: Database session
        force: Whether to remove existing auto-generated context documents first

    Returns:
        List of created context document dictionaries
    """
    return UniversalParserService(db).regenerate_context_documents(trace_id, trace_content, force)