# backend/services/processing_service.py
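# Orchestrates the trace -> chunks -> sliding-window extraction -> merged
# knowledge graph pipeline and persists the results, reporting progress
# through the task service along the way.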
import logging
import asyncio
import uuid
import time
import traceback
from typing import Any, Dict, Optional
from sqlalchemy.orm import Session
from backend.database import models
from backend.database.utils import get_trace, save_knowledge_graph, update_trace_status, get_context_documents_from_trace
from agentgraph.extraction.graph_processing import SlidingWindowMonitor
from agentgraph.input.text_processing import ChunkingService
from backend.services.task_service import update_task_status
logger = logging.getLogger(__name__)
class PipelineError(Exception):
"""Exception raised for errors in the pipeline processing."""
pass
async def process_trace_task(
    trace_id: str,
    session: Session,
    task_id: Optional[str] = None,
    splitter_type: str = "agent_semantic",
    force_regenerate: bool = False,
    method_name: str = "production",
    model: str = "gpt-5-mini",
    chunking_config=None,
) -> Dict[str, Any]:
"""
Process a single trace and return the merged knowledge graph with proper timeout handling.
"""
start_time = time.time()
max_processing_time = 3600 # 1 hour maximum processing time
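    # Overall wall-clock budget for this trace; enforced cooperatively via
    # check_timeout() below and as a hard limit on the asyncio.wait_for call.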
try:
if task_id:
update_task_status(task_id, "PROCESSING", "Initializing trace processing", 5)
trace = get_trace(session, trace_id)
if not trace:
raise PipelineError(f"Trace with ID {trace_id} not found")
if not force_regenerate:
existing_kgs = session.query(models.KnowledgeGraph).filter(
models.KnowledgeGraph.trace_id == trace_id,
models.KnowledgeGraph.status == "created"
).all()
            if existing_kgs:
                if task_id:
                    update_task_status(task_id, "COMPLETED", f"Found {len(existing_kgs)} existing knowledge graphs. Use force_regenerate=True to create new graphs.", 100)
                return {
                    "message": f"Trace {trace_id} already processed",
                    "existing_graphs": len(existing_kgs),
                    "status": "already_processed",
                    "suggestion": "Use force_regenerate=True to bypass this check and generate new graphs"
                }
else:
if task_id:
update_task_status(task_id, "PROCESSING", "Force regenerate enabled - creating new knowledge graphs", 5)
trace_content = trace.content
if not trace_content:
raise PipelineError(f"Trace {trace_id} has no content")
context_documents = get_context_documents_from_trace(session, trace_id)
context_count = len(context_documents) if context_documents else 0
logger.info(f"Retrieved {context_count} context documents for trace {trace_id}")
def check_timeout():
if time.time() - start_time > max_processing_time:
raise PipelineError(f"Processing timeout exceeded {max_processing_time} seconds")
check_timeout()
if task_id:
context_msg = f" with {context_count} context documents" if context_count > 0 else ""
update_task_status(task_id, "PROCESSING", f"Splitting trace content ({len(trace_content)} characters){context_msg}", 10)
logger.info(f"Splitting trace content with {splitter_type} splitter")
chunking_service = ChunkingService(
default_batch_size=3,
default_model="gpt-5-mini"
)
check_timeout()
# Extract chunking parameters from config
chunk_kwargs = {}
if chunking_config:
# Map min_chunk_size and max_chunk_size to the ChunkingService parameters
# For AgentAwareSemanticSplitter, max_chunk_size maps to window_size
if chunking_config.max_chunk_size:
chunk_kwargs["window_size"] = chunking_config.max_chunk_size
# Pass min_chunk_size directly
if chunking_config.min_chunk_size:
chunk_kwargs["min_chunk_size"] = chunking_config.min_chunk_size
            # Derive overlap_size from the window size: 5% of the window,
            # with a 1,000-character floor
            if chunking_config.max_chunk_size:
                chunk_kwargs["overlap_size"] = max(1000, int(chunking_config.max_chunk_size * 0.05))
logger.info(f"Using custom chunking config: {chunking_config}")
logger.info(f"Mapped to chunk_kwargs: {chunk_kwargs}")
chunks = chunking_service.chunk_trace_content(
content=trace_content,
splitter_type=splitter_type,
**chunk_kwargs
)
logger.info(f"Split content into {len(chunks)} chunks")
if not chunks:
raise PipelineError("No chunks were created from the trace content")
if task_id:
update_task_status(task_id, "PROCESSING", f"Processing {len(chunks)} chunks", 20)
processing_run_id = str(uuid.uuid4())[:8]
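        # Short run identifier that ties together the merged graph and the
        # per-window graphs produced during this processing pass.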
monitor = SlidingWindowMonitor(
batch_size=3,
parallel_processing=True,
model=model,
source_trace_id=trace_id,
processing_run_id=processing_run_id,
method_name=method_name,
context_documents=context_documents,
trace_content=trace_content,
trace_metadata=trace.trace_metadata
)
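        # Progress reporting: pipeline progress is mapped onto the 30-80% band
        # of the task status; saving and completion use the remaining 20%.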
if task_id:
def progress_callback(stage, step, total_steps, message=None):
check_timeout()
base_progress = 30
stage_progress = 50
raw_progress = base_progress + int((step / total_steps) * stage_progress)
overall_progress = min(raw_progress, 80)
status_message = f"{stage}: {step}/{total_steps}"
if message:
status_message += f" - {message}"
update_task_status(task_id, "PROCESSING", status_message, overall_progress)
        else:
            def progress_callback(stage, step, total_steps, message=None):
                # No task to report to; still enforce the processing timeout
                # from within the pipeline's progress hook.
                check_timeout()
check_timeout()
output_identifier = f"trace_{trace_id}_{processing_run_id}"
if task_id:
update_task_status(task_id, "PROCESSING", "Starting knowledge graph processing", 30)
try:
final_kg_data = await asyncio.wait_for(
monitor.process_trace(chunks, output_identifier, progress_callback=progress_callback),
timeout=max_processing_time - (time.time() - start_time)
)
except asyncio.TimeoutError:
raise PipelineError(f"Trace processing timed out after {max_processing_time} seconds")
check_timeout()
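        # Validate the result shape before persisting: the monitor is expected
        # to return {'final_kg': {'graph_data': ...}}, optionally alongside
        # 'window_kgs' and 'window_count'.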
if not isinstance(final_kg_data, dict):
raise PipelineError("Invalid result format from trace processing - expected dictionary")
if 'final_kg' not in final_kg_data:
raise PipelineError("Invalid result format - missing 'final_kg' key in response")
final_kg_info = final_kg_data['final_kg']
if 'graph_data' not in final_kg_info:
raise PipelineError("Invalid result format - missing 'graph_data' key in final_kg")
actual_graph_data = final_kg_info['graph_data']
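        # Best-effort pass to resolve content references in the merged graph
        # against the original trace text; a failure here only logs a warning.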
try:
from agentgraph.reconstruction.content_reference_resolver import ContentReferenceResolver
resolver = ContentReferenceResolver()
actual_graph_data = resolver.resolve_knowledge_graph_content(
actual_graph_data,
trace_content,
{"window_index": 0}
)
except Exception as _resolver_err:
logger.warning(f"ContentReferenceResolver failed on merged KG: {_resolver_err}")
if not actual_graph_data.get('entities') and not actual_graph_data.get('relations'):
raise PipelineError("Knowledge graph processing completed but no entities or relations were found")
if task_id:
update_task_status(task_id, "PROCESSING", "Saving knowledge graph to database", 85)
window_count = final_kg_data.get('window_count', len(chunks))
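        # Persist the merged knowledge graph; window_total records how many
        # windows contributed to the merge.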
kg_result = save_knowledge_graph(
session,
output_identifier,
actual_graph_data,
trace_id=trace_id,
window_total=window_count,
processing_run_id=processing_run_id
)
kg_id = kg_result.id
window_kgs_saved = 0
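        # Persist the per-window knowledge graphs when the monitor returned
        # them; a failure on an individual window is logged but does not abort
        # the run.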
if 'window_kgs' in final_kg_data and final_kg_data['window_kgs']:
if task_id:
update_task_status(task_id, "PROCESSING", "Saving window knowledge graphs to database", 90)
logger.info(f"Saving {len(final_kg_data['window_kgs'])} window knowledge graphs")
for window_kg_data in final_kg_data['window_kgs']:
try:
window_filename = window_kg_data.get('filename')
window_graph_data = window_kg_data.get('graph_data')
window_index = window_kg_data.get('window_index')
window_total = window_kg_data.get('window_total')
window_start_char = window_kg_data.get('window_start_char')
window_end_char = window_kg_data.get('window_end_char')
window_processing_run_id = window_kg_data.get('processing_run_id', processing_run_id)
window_trace_id = window_kg_data.get('trace_id', trace_id)
window_kg_result = save_knowledge_graph(
session,
window_filename,
window_graph_data,
trace_id=window_trace_id,
window_index=window_index,
window_total=window_total,
window_start_char=window_start_char,
window_end_char=window_end_char,
processing_run_id=window_processing_run_id
)
window_kgs_saved += 1
logger.debug(f"Saved window KG {window_index}: {window_filename} (ID: {window_kg_result.id})")
except Exception as window_error:
logger.error(f"Failed to save window knowledge graph {window_kg_data.get('filename', 'unknown')}: {window_error}")
logger.info(f"Successfully saved {window_kgs_saved} window knowledge graphs")
update_trace_status(session, trace_id, "processed")
processing_duration = time.time() - start_time
if task_id:
update_task_status(
task_id,
"COMPLETED",
f"Processing completed in {processing_duration:.1f}s. Knowledge graph saved with ID: {kg_id}",
100
)
logger.info(f"Successfully processed trace {trace_id} in {processing_duration:.1f}s")
return {
"trace_id": trace_id,
"knowledge_graph_id": kg_id,
"processing_duration": processing_duration,
"chunks_processed": len(chunks),
"window_graphs_saved": window_kgs_saved,
"entities_count": len(actual_graph_data.get('entities', [])),
"relations_count": len(actual_graph_data.get('relations', [])),
"status": "completed"
}
except Exception as e:
error_message = f"Error processing trace {trace_id}: {str(e)}"
logger.error(error_message)
logger.error(traceback.format_exc())
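        # Failure path: mark the task and the trace as failed, then surface a
        # single PipelineError to the caller.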
if task_id:
update_task_status(task_id, "FAILED", error_message)
try:
update_trace_status(session, trace_id, "failed")
except Exception as update_error:
logger.error(f"Failed to update trace status: {update_error}")
        raise PipelineError(error_message) from e
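
# Illustrative usage (not part of the service): a minimal sketch of how this
# coroutine might be driven, assuming a SQLAlchemy session factory named
# SessionLocal and an existing trace id; both are assumptions, not names
# taken from this file.
#
#     import asyncio
#     from backend.database.session import SessionLocal  # hypothetical import path
#
#     async def main():
#         with SessionLocal() as session:
#             result = await process_trace_task(
#                 trace_id="example-trace-id",  # hypothetical id
#                 session=session,
#                 force_regenerate=False,
#             )
#             print(result["status"], result.get("knowledge_graph_id"))
#
#     asyncio.run(main())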