""" Temporal Graph API Router Provides endpoints for temporal force-directed graph visualization. """ import logging from typing import List, Dict, Any from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from backend.dependencies import get_db from backend.database.utils import get_temporal_windows_by_trace_id, get_trace from backend.database.models import KnowledgeGraph logger = logging.getLogger(__name__) router = APIRouter(prefix="/api", tags=["temporal-graphs"]) @router.get("/temporal-graph/{trace_id}") async def get_temporal_graph_data(trace_id: str, processing_run_id: str = None, db: Session = Depends(get_db)): """ Get temporal graph data for a specific trace. Returns all knowledge graph windows in sequence for animation, plus the full/merged version. Args: trace_id: ID of the trace processing_run_id: Optional processing run ID to filter by specific run """ try: logger.info(f"Attempting to load temporal graph for trace_id: {trace_id}") if processing_run_id: logger.info(f"Filtering by processing_run_id: {processing_run_id}") # Get temporal windows data (includes windowed KGs and full KG) temporal_data = get_temporal_windows_by_trace_id(db, trace_id, processing_run_id) if not temporal_data["trace_info"]: logger.warning(f"Trace with ID {trace_id} not found in database") raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found") windows = temporal_data["windows"] full_kg = temporal_data["full_kg"] trace_info = temporal_data["trace_info"] logger.info(f"Found trace: {trace_info['trace_id']} (title: {trace_info['title']})") logger.info(f"Found {len(windows)} windows and {'1' if full_kg else '0'} full KG for trace {trace_id}") if not windows: raise HTTPException( status_code=404, detail=f"No knowledge graph windows found for trace {trace_id}" + (f" with processing_run_id {processing_run_id}" if processing_run_id else "") ) # Check if we have enough windows for temporal visualization if len(windows) < 2: raise HTTPException( status_code=400, detail=f"Trace {trace_id} has insufficient windows for temporal visualization (found {len(windows)}, need at least 2)" ) # Build response data for windowed KGs windows_data = [] for window in windows: graph_data = window.graph_data or {} window_info = { "window_index": window.window_index, "entities": graph_data.get("entities", []), "relations": graph_data.get("relations", []), "metadata": { "entity_count": window.entity_count, "relation_count": window.relation_count, "window_start_char": window.window_start_char, "window_end_char": window.window_end_char, "creation_timestamp": window.creation_timestamp.isoformat() if window.creation_timestamp else None, "status": window.status, "filename": window.filename } } windows_data.append(window_info) # Build response data for full KG (if available) full_kg_data = None if full_kg: graph_data = full_kg.graph_data or {} full_kg_data = { "entities": graph_data.get("entities", []), "relations": graph_data.get("relations", []), "metadata": { "entity_count": full_kg.entity_count, "relation_count": full_kg.relation_count, "window_total": full_kg.window_total, "creation_timestamp": full_kg.creation_timestamp.isoformat() if full_kg.creation_timestamp else None, "status": full_kg.status, "filename": full_kg.filename } } # Sort windowed data by window_index to ensure proper temporal order windows_data.sort(key=lambda x: x["window_index"]) response = { "trace_id": trace_id, "trace_title": trace_info["title"] or f"Trace {trace_id[:8]}", "trace_description": trace_info["description"], "total_windows": len(windows_data), "windows": windows_data, "full_kg": full_kg_data, "has_full_version": full_kg is not None } logger.info(f"Returning temporal graph data for trace {trace_id} with {len(windows_data)} windows and {'full version' if full_kg else 'no full version'}") return response except HTTPException: raise except Exception as e: logger.error(f"Error getting temporal graph data for trace {trace_id}: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred") @router.get("/traces-with-windows") async def get_traces_with_windows(db: Session = Depends(get_db)): """ Get all traces that have multiple knowledge graph windows suitable for temporal visualization. Includes both traces with records and orphaned KGs (KGs with trace_id but no trace record). """ try: from sqlalchemy import func from backend.database.models import Trace, KnowledgeGraph # Query traces that have multiple knowledge graph windows (with trace records) traces_with_records = ( db.query( Trace.trace_id, Trace.title, Trace.description, Trace.upload_timestamp, func.count(KnowledgeGraph.id).label('window_count') ) .join(KnowledgeGraph, Trace.trace_id == KnowledgeGraph.trace_id) .group_by(Trace.trace_id, Trace.title, Trace.description, Trace.upload_timestamp) .having(func.count(KnowledgeGraph.id) >= 2) .order_by(Trace.upload_timestamp.desc()) .all() ) # Query orphaned KGs (KGs with trace_id but no trace record) that have multiple windows orphaned_kg_traces = ( db.query( KnowledgeGraph.trace_id, func.count(KnowledgeGraph.id).label('window_count'), func.min(KnowledgeGraph.creation_timestamp).label('earliest_creation') ) .outerjoin(Trace, KnowledgeGraph.trace_id == Trace.trace_id) .filter(Trace.trace_id.is_(None)) # No trace record exists .filter(KnowledgeGraph.trace_id.isnot(None)) # But KG has a trace_id .group_by(KnowledgeGraph.trace_id) .having(func.count(KnowledgeGraph.id) >= 2) .order_by(func.min(KnowledgeGraph.creation_timestamp).desc()) .all() ) result = [] # Add traces with records for trace_info in traces_with_records: result.append({ "trace_id": trace_info.trace_id, "title": trace_info.title or f"Trace {trace_info.trace_id[:8]}...", "description": trace_info.description, "upload_timestamp": trace_info.upload_timestamp.isoformat() if trace_info.upload_timestamp else None, "window_count": trace_info.window_count, "has_trace_record": True }) # Add orphaned KG traces for kg_trace_info in orphaned_kg_traces: result.append({ "trace_id": kg_trace_info.trace_id, "title": f"Trace {kg_trace_info.trace_id[:8]}...", "description": "Knowledge graphs exist but no trace record found", "upload_timestamp": kg_trace_info.earliest_creation.isoformat() if kg_trace_info.earliest_creation else None, "window_count": kg_trace_info.window_count, "has_trace_record": False }) # Sort by upload/creation timestamp result.sort(key=lambda x: x["upload_timestamp"] or "1970-01-01", reverse=True) logger.info(f"Found {len(traces_with_records)} traces with records and {len(orphaned_kg_traces)} orphaned KG traces") return {"traces": result} except Exception as e: logger.error(f"Error getting traces with windows: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred") @router.get("/trace/{trace_id}/processing-runs") async def get_processing_runs_for_trace(trace_id: str, db: Session = Depends(get_db)): """ Get all processing runs for a specific trace. Returns information about each processing run including run ID, timestamp, and window count. """ try: from sqlalchemy import func from backend.database.models import KnowledgeGraph # Query all processing runs for this trace processing_runs = ( db.query( KnowledgeGraph.processing_run_id, func.count(KnowledgeGraph.id).label('kg_count'), func.min(KnowledgeGraph.creation_timestamp).label('earliest_creation'), func.max(KnowledgeGraph.creation_timestamp).label('latest_creation'), func.sum(KnowledgeGraph.entity_count).label('total_entities'), func.sum(KnowledgeGraph.relation_count).label('total_relations') ) .filter(KnowledgeGraph.trace_id == trace_id) .filter(KnowledgeGraph.processing_run_id.isnot(None)) .group_by(KnowledgeGraph.processing_run_id) .order_by(func.min(KnowledgeGraph.creation_timestamp).desc()) .all() ) # Also check for KGs without processing_run_id (legacy) legacy_kgs = ( db.query(func.count(KnowledgeGraph.id).label('kg_count')) .filter(KnowledgeGraph.trace_id == trace_id) .filter(KnowledgeGraph.processing_run_id.is_(None)) .first() ) result = [] # Add processing runs for run_info in processing_runs: result.append({ "processing_run_id": run_info.processing_run_id, "kg_count": run_info.kg_count, "earliest_creation": run_info.earliest_creation.isoformat() if run_info.earliest_creation else None, "latest_creation": run_info.latest_creation.isoformat() if run_info.latest_creation else None, "total_entities": run_info.total_entities or 0, "total_relations": run_info.total_relations or 0, "is_legacy": False }) # Add legacy KGs if any exist if legacy_kgs and legacy_kgs.kg_count > 0: result.append({ "processing_run_id": None, "kg_count": legacy_kgs.kg_count, "earliest_creation": None, "latest_creation": None, "total_entities": 0, "total_relations": 0, "is_legacy": True }) logger.info(f"Found {len(processing_runs)} processing runs for trace {trace_id}") return {"processing_runs": result} except Exception as e: logger.error(f"Error getting processing runs for trace {trace_id}: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred")