# AgentGraph — backend/routers/temporal_graphs.py
# NOTE: header reconstructed from file-page residue (uploader: wu981526092,
# rev bcbd2ec — "Security: Fix critical vulnerabilities before public release").
"""
Temporal Graph API Router
Provides endpoints for temporal force-directed graph visualization.
"""
import logging
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from backend.database.models import KnowledgeGraph
from backend.database.utils import get_temporal_windows_by_trace_id, get_trace
from backend.dependencies import get_db
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api", tags=["temporal-graphs"])
def _window_payload(window) -> Dict[str, Any]:
    """Serialize one windowed KnowledgeGraph row into the API response shape.

    `graph_data` may be NULL in the DB, so it is defaulted to an empty dict
    before reading entities/relations.
    """
    graph_data = window.graph_data or {}
    return {
        "window_index": window.window_index,
        "entities": graph_data.get("entities", []),
        "relations": graph_data.get("relations", []),
        "metadata": {
            "entity_count": window.entity_count,
            "relation_count": window.relation_count,
            "window_start_char": window.window_start_char,
            "window_end_char": window.window_end_char,
            "creation_timestamp": window.creation_timestamp.isoformat() if window.creation_timestamp else None,
            "status": window.status,
            "filename": window.filename,
        },
    }


def _full_kg_payload(full_kg) -> Dict[str, Any]:
    """Serialize the merged/full KnowledgeGraph row into the API response shape."""
    graph_data = full_kg.graph_data or {}
    return {
        "entities": graph_data.get("entities", []),
        "relations": graph_data.get("relations", []),
        "metadata": {
            "entity_count": full_kg.entity_count,
            "relation_count": full_kg.relation_count,
            "window_total": full_kg.window_total,
            "creation_timestamp": full_kg.creation_timestamp.isoformat() if full_kg.creation_timestamp else None,
            "status": full_kg.status,
            "filename": full_kg.filename,
        },
    }


@router.get("/temporal-graph/{trace_id}")
async def get_temporal_graph_data(
    trace_id: str,
    processing_run_id: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """
    Get temporal graph data for a specific trace.

    Returns all knowledge graph windows in sequence for animation, plus the
    full/merged version when one exists.

    Args:
        trace_id: ID of the trace.
        processing_run_id: Optional processing run ID to filter by specific run.
        db: Database session (injected by FastAPI).

    Raises:
        HTTPException: 404 if the trace or its windows are missing, 400 if
            fewer than two windows exist (a temporal animation needs at least
            two frames), 500 on unexpected errors (details are logged server
            side only, never leaked to the client).
    """
    try:
        logger.info("Attempting to load temporal graph for trace_id: %s", trace_id)
        if processing_run_id:
            logger.info("Filtering by processing_run_id: %s", processing_run_id)

        # Get temporal windows data (includes windowed KGs and the full KG).
        temporal_data = get_temporal_windows_by_trace_id(db, trace_id, processing_run_id)
        if not temporal_data["trace_info"]:
            logger.warning("Trace with ID %s not found in database", trace_id)
            raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found")

        windows = temporal_data["windows"]
        full_kg = temporal_data["full_kg"]
        trace_info = temporal_data["trace_info"]
        logger.info("Found trace: %s (title: %s)", trace_info["trace_id"], trace_info["title"])
        logger.info(
            "Found %d windows and %s full KG for trace %s",
            len(windows), "1" if full_kg else "0", trace_id,
        )

        if not windows:
            raise HTTPException(
                status_code=404,
                detail=f"No knowledge graph windows found for trace {trace_id}" +
                (f" with processing_run_id {processing_run_id}" if processing_run_id else "")
            )

        # Check if we have enough windows for temporal visualization.
        if len(windows) < 2:
            raise HTTPException(
                status_code=400,
                detail=f"Trace {trace_id} has insufficient windows for temporal visualization (found {len(windows)}, need at least 2)"
            )

        # Serialize windowed KGs and sort by window_index so the client can
        # animate them in temporal order regardless of DB row order.
        windows_data = sorted(
            (_window_payload(window) for window in windows),
            key=lambda w: w["window_index"],
        )

        # Serialize the full/merged KG when one is available.
        full_kg_data = _full_kg_payload(full_kg) if full_kg else None

        response = {
            "trace_id": trace_id,
            "trace_title": trace_info["title"] or f"Trace {trace_id[:8]}",
            "trace_description": trace_info["description"],
            "total_windows": len(windows_data),
            "windows": windows_data,
            "full_kg": full_kg_data,
            "has_full_version": full_kg is not None,
        }
        logger.info(
            "Returning temporal graph data for trace %s with %d windows and %s",
            trace_id, len(windows_data), "full version" if full_kg else "no full version",
        )
        return response
    except HTTPException:
        # Re-raise deliberate HTTP errors unchanged.
        raise
    except Exception:
        # logger.exception records the full traceback; the client only sees
        # a generic message (avoids leaking internals — see security fix note).
        logger.exception("Error getting temporal graph data for trace %s", trace_id)
        raise HTTPException(status_code=500, detail="An internal error occurred")
@router.get("/traces-with-windows")
async def get_traces_with_windows(db: Session = Depends(get_db)):
    """
    Get all traces that have multiple knowledge graph windows suitable for
    temporal visualization.

    Covers two populations: traces with a proper Trace record, and orphaned
    knowledge graphs that carry a trace_id but have no matching Trace row.
    """
    try:
        from sqlalchemy import func
        from backend.database.models import Trace, KnowledgeGraph

        # Traces backed by a Trace record with at least two KG windows.
        recorded = (
            db.query(
                Trace.trace_id,
                Trace.title,
                Trace.description,
                Trace.upload_timestamp,
                func.count(KnowledgeGraph.id).label('window_count')
            )
            .join(KnowledgeGraph, Trace.trace_id == KnowledgeGraph.trace_id)
            .group_by(Trace.trace_id, Trace.title, Trace.description, Trace.upload_timestamp)
            .having(func.count(KnowledgeGraph.id) >= 2)
            .order_by(Trace.upload_timestamp.desc())
            .all()
        )

        # Orphaned KGs: trace_id present on the KG rows, but no Trace record.
        orphaned = (
            db.query(
                KnowledgeGraph.trace_id,
                func.count(KnowledgeGraph.id).label('window_count'),
                func.min(KnowledgeGraph.creation_timestamp).label('earliest_creation')
            )
            .outerjoin(Trace, KnowledgeGraph.trace_id == Trace.trace_id)
            .filter(Trace.trace_id.is_(None))  # no trace record exists
            .filter(KnowledgeGraph.trace_id.isnot(None))  # but KG has a trace_id
            .group_by(KnowledgeGraph.trace_id)
            .having(func.count(KnowledgeGraph.id) >= 2)
            .order_by(func.min(KnowledgeGraph.creation_timestamp).desc())
            .all()
        )

        # Assemble one flat list: recorded traces first, then orphans.
        entries = [
            {
                "trace_id": row.trace_id,
                "title": row.title or f"Trace {row.trace_id[:8]}...",
                "description": row.description,
                "upload_timestamp": row.upload_timestamp.isoformat() if row.upload_timestamp else None,
                "window_count": row.window_count,
                "has_trace_record": True,
            }
            for row in recorded
        ]
        entries += [
            {
                "trace_id": row.trace_id,
                "title": f"Trace {row.trace_id[:8]}...",
                "description": "Knowledge graphs exist but no trace record found",
                "upload_timestamp": row.earliest_creation.isoformat() if row.earliest_creation else None,
                "window_count": row.window_count,
                "has_trace_record": False,
            }
            for row in orphaned
        ]

        # Newest first; missing timestamps sink to the bottom via epoch fallback.
        entries.sort(key=lambda entry: entry["upload_timestamp"] or "1970-01-01", reverse=True)
        logger.info(f"Found {len(recorded)} traces with records and {len(orphaned)} orphaned KG traces")
        return {"traces": entries}
    except Exception as e:
        logger.error(f"Error getting traces with windows: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred")
@router.get("/trace/{trace_id}/processing-runs")
async def get_processing_runs_for_trace(trace_id: str, db: Session = Depends(get_db)):
    """
    Get all processing runs for a specific trace.

    Returns, per run: the run ID, earliest/latest creation timestamps,
    KG window count, and entity/relation totals. Legacy KGs (rows without a
    processing_run_id) are reported as one synthetic entry with is_legacy=True.
    """
    try:
        from sqlalchemy import func
        from backend.database.models import KnowledgeGraph

        # Aggregate per processing run for this trace.
        runs = (
            db.query(
                KnowledgeGraph.processing_run_id,
                func.count(KnowledgeGraph.id).label('kg_count'),
                func.min(KnowledgeGraph.creation_timestamp).label('earliest_creation'),
                func.max(KnowledgeGraph.creation_timestamp).label('latest_creation'),
                func.sum(KnowledgeGraph.entity_count).label('total_entities'),
                func.sum(KnowledgeGraph.relation_count).label('total_relations')
            )
            .filter(KnowledgeGraph.trace_id == trace_id)
            .filter(KnowledgeGraph.processing_run_id.isnot(None))
            .group_by(KnowledgeGraph.processing_run_id)
            .order_by(func.min(KnowledgeGraph.creation_timestamp).desc())
            .all()
        )

        # Count legacy KGs that predate processing_run_id tracking.
        legacy = (
            db.query(func.count(KnowledgeGraph.id).label('kg_count'))
            .filter(KnowledgeGraph.trace_id == trace_id)
            .filter(KnowledgeGraph.processing_run_id.is_(None))
            .first()
        )

        entries = [
            {
                "processing_run_id": run.processing_run_id,
                "kg_count": run.kg_count,
                "earliest_creation": run.earliest_creation.isoformat() if run.earliest_creation else None,
                "latest_creation": run.latest_creation.isoformat() if run.latest_creation else None,
                "total_entities": run.total_entities or 0,
                "total_relations": run.total_relations or 0,
                "is_legacy": False,
            }
            for run in runs
        ]

        # Append a single placeholder entry when legacy KGs exist.
        if legacy and legacy.kg_count > 0:
            entries.append({
                "processing_run_id": None,
                "kg_count": legacy.kg_count,
                "earliest_creation": None,
                "latest_creation": None,
                "total_entities": 0,
                "total_relations": 0,
                "is_legacy": True,
            })

        logger.info(f"Found {len(runs)} processing runs for trace {trace_id}")
        return {"processing_runs": entries}
    except Exception as e:
        logger.error(f"Error getting processing runs for trace {trace_id}: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred")