Spaces:
Sleeping
Sleeping
| """ | |
| Temporal Graph API Router | |
| Provides endpoints for temporal force-directed graph visualization. | |
| """ | |
import logging
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from backend.database.models import KnowledgeGraph
from backend.database.utils import get_temporal_windows_by_trace_id, get_trace
from backend.dependencies import get_db
# Module-level logger for this router's request/diagnostic messages.
logger = logging.getLogger(__name__)

# All endpoints in this module are served under the /api prefix and grouped
# under the "temporal-graphs" tag in the generated OpenAPI docs.
router = APIRouter(prefix="/api", tags=["temporal-graphs"])
async def get_temporal_graph_data(
    trace_id: str,
    processing_run_id: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """
    Get temporal graph data for a specific trace.

    Returns all knowledge graph windows in sequence for animation, plus the
    full/merged version.

    Args:
        trace_id: ID of the trace.
        processing_run_id: Optional processing run ID to filter by specific run.
        db: Database session (injected by FastAPI).

    Raises:
        HTTPException: 404 if the trace or its windows are missing, 400 if
            there are fewer than 2 windows (not enough frames to animate),
            500 on unexpected errors.
    """
    try:
        logger.info(f"Attempting to load temporal graph for trace_id: {trace_id}")
        if processing_run_id:
            logger.info(f"Filtering by processing_run_id: {processing_run_id}")

        # Get temporal windows data (includes windowed KGs and full KG).
        temporal_data = get_temporal_windows_by_trace_id(db, trace_id, processing_run_id)

        if not temporal_data["trace_info"]:
            logger.warning(f"Trace with ID {trace_id} not found in database")
            raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found")

        windows = temporal_data["windows"]
        full_kg = temporal_data["full_kg"]
        trace_info = temporal_data["trace_info"]

        logger.info(f"Found trace: {trace_info['trace_id']} (title: {trace_info['title']})")
        logger.info(f"Found {len(windows)} windows and {'1' if full_kg else '0'} full KG for trace {trace_id}")

        if not windows:
            raise HTTPException(
                status_code=404,
                detail=f"No knowledge graph windows found for trace {trace_id}" +
                (f" with processing_run_id {processing_run_id}" if processing_run_id else "")
            )

        # Temporal animation needs at least two frames to step between.
        if len(windows) < 2:
            raise HTTPException(
                status_code=400,
                detail=f"Trace {trace_id} has insufficient windows for temporal visualization (found {len(windows)}, need at least 2)"
            )

        # Build response data for windowed KGs. graph_data may be NULL in the
        # DB, so fall back to an empty dict before extracting entity lists.
        windows_data = []
        for window in windows:
            graph_data = window.graph_data or {}
            windows_data.append({
                "window_index": window.window_index,
                "entities": graph_data.get("entities", []),
                "relations": graph_data.get("relations", []),
                "metadata": {
                    "entity_count": window.entity_count,
                    "relation_count": window.relation_count,
                    "window_start_char": window.window_start_char,
                    "window_end_char": window.window_end_char,
                    "creation_timestamp": window.creation_timestamp.isoformat() if window.creation_timestamp else None,
                    "status": window.status,
                    "filename": window.filename,
                },
            })

        # Build response data for the full/merged KG, when one exists.
        full_kg_data = None
        if full_kg:
            graph_data = full_kg.graph_data or {}
            full_kg_data = {
                "entities": graph_data.get("entities", []),
                "relations": graph_data.get("relations", []),
                "metadata": {
                    "entity_count": full_kg.entity_count,
                    "relation_count": full_kg.relation_count,
                    "window_total": full_kg.window_total,
                    "creation_timestamp": full_kg.creation_timestamp.isoformat() if full_kg.creation_timestamp else None,
                    "status": full_kg.status,
                    "filename": full_kg.filename,
                },
            }

        # Sort windowed data by window_index to ensure proper temporal order.
        windows_data.sort(key=lambda x: x["window_index"])

        response = {
            "trace_id": trace_id,
            "trace_title": trace_info["title"] or f"Trace {trace_id[:8]}",
            "trace_description": trace_info["description"],
            "total_windows": len(windows_data),
            "windows": windows_data,
            "full_kg": full_kg_data,
            "has_full_version": full_kg is not None,
        }

        logger.info(f"Returning temporal graph data for trace {trace_id} with {len(windows_data)} windows and {'full version' if full_kg else 'no full version'}")
        return response

    except HTTPException:
        # Re-raise our own 4xx responses untouched.
        raise
    except Exception as e:
        # logger.exception preserves the traceback (logger.error dropped it).
        logger.exception(f"Error getting temporal graph data for trace {trace_id}: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred")
async def get_traces_with_windows(db: Session = Depends(get_db)):
    """
    Get all traces that have multiple knowledge graph windows suitable for
    temporal visualization.

    Includes both traces with records and orphaned KGs (KGs with trace_id but
    no trace record).
    """
    try:
        from sqlalchemy import func
        from backend.database.models import Trace, KnowledgeGraph

        # Traces that own a Trace record and have at least two KG windows.
        recorded = (
            db.query(
                Trace.trace_id,
                Trace.title,
                Trace.description,
                Trace.upload_timestamp,
                func.count(KnowledgeGraph.id).label('window_count')
            )
            .join(KnowledgeGraph, Trace.trace_id == KnowledgeGraph.trace_id)
            .group_by(Trace.trace_id, Trace.title, Trace.description, Trace.upload_timestamp)
            .having(func.count(KnowledgeGraph.id) >= 2)
            .order_by(Trace.upload_timestamp.desc())
            .all()
        )

        # KG rows that carry a trace_id with no matching Trace record
        # (orphans), again keeping only groups with 2+ windows.
        orphans = (
            db.query(
                KnowledgeGraph.trace_id,
                func.count(KnowledgeGraph.id).label('window_count'),
                func.min(KnowledgeGraph.creation_timestamp).label('earliest_creation')
            )
            .outerjoin(Trace, KnowledgeGraph.trace_id == Trace.trace_id)
            .filter(Trace.trace_id.is_(None))  # No trace record exists
            .filter(KnowledgeGraph.trace_id.isnot(None))  # But KG has a trace_id
            .group_by(KnowledgeGraph.trace_id)
            .having(func.count(KnowledgeGraph.id) >= 2)
            .order_by(func.min(KnowledgeGraph.creation_timestamp).desc())
            .all()
        )

        entries = [
            {
                "trace_id": row.trace_id,
                "title": row.title or f"Trace {row.trace_id[:8]}...",
                "description": row.description,
                "upload_timestamp": row.upload_timestamp.isoformat() if row.upload_timestamp else None,
                "window_count": row.window_count,
                "has_trace_record": True,
            }
            for row in recorded
        ] + [
            {
                "trace_id": row.trace_id,
                "title": f"Trace {row.trace_id[:8]}...",
                "description": "Knowledge graphs exist but no trace record found",
                "upload_timestamp": row.earliest_creation.isoformat() if row.earliest_creation else None,
                "window_count": row.window_count,
                "has_trace_record": False,
            }
            for row in orphans
        ]

        # Newest first; entries with no timestamp sink to the bottom via the
        # epoch sentinel string.
        entries.sort(key=lambda entry: entry["upload_timestamp"] or "1970-01-01", reverse=True)

        logger.info(f"Found {len(recorded)} traces with records and {len(orphans)} orphaned KG traces")
        return {"traces": entries}
    except Exception as e:
        logger.error(f"Error getting traces with windows: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred")
async def get_processing_runs_for_trace(trace_id: str, db: Session = Depends(get_db)):
    """
    Get all processing runs for a specific trace.

    Returns information about each processing run including run ID, timestamp,
    and window count. Knowledge graphs created before processing_run_id was
    tracked are reported as a single "legacy" entry.

    Args:
        trace_id: ID of the trace to list runs for.
        db: Database session (injected by FastAPI).

    Raises:
        HTTPException: 500 on unexpected errors.
    """
    try:
        from sqlalchemy import func
        from backend.database.models import KnowledgeGraph

        # Aggregate per-run stats over every KG row tagged with a run id.
        processing_runs = (
            db.query(
                KnowledgeGraph.processing_run_id,
                func.count(KnowledgeGraph.id).label('kg_count'),
                func.min(KnowledgeGraph.creation_timestamp).label('earliest_creation'),
                func.max(KnowledgeGraph.creation_timestamp).label('latest_creation'),
                func.sum(KnowledgeGraph.entity_count).label('total_entities'),
                func.sum(KnowledgeGraph.relation_count).label('total_relations')
            )
            .filter(KnowledgeGraph.trace_id == trace_id)
            .filter(KnowledgeGraph.processing_run_id.isnot(None))
            .group_by(KnowledgeGraph.processing_run_id)
            .order_by(func.min(KnowledgeGraph.creation_timestamp).desc())
            .all()
        )

        # Also check for KGs without processing_run_id (legacy).
        legacy_kgs = (
            db.query(func.count(KnowledgeGraph.id).label('kg_count'))
            .filter(KnowledgeGraph.trace_id == trace_id)
            .filter(KnowledgeGraph.processing_run_id.is_(None))
            .first()
        )

        result = [
            {
                "processing_run_id": run_info.processing_run_id,
                "kg_count": run_info.kg_count,
                "earliest_creation": run_info.earliest_creation.isoformat() if run_info.earliest_creation else None,
                "latest_creation": run_info.latest_creation.isoformat() if run_info.latest_creation else None,
                # SUM() yields NULL (None) when counts are missing; coerce to 0.
                "total_entities": run_info.total_entities or 0,
                "total_relations": run_info.total_relations or 0,
                "is_legacy": False,
            }
            for run_info in processing_runs
        ]

        # Append a synthetic entry covering pre-run-id ("legacy") KGs, if any.
        if legacy_kgs and legacy_kgs.kg_count > 0:
            result.append({
                "processing_run_id": None,
                "kg_count": legacy_kgs.kg_count,
                "earliest_creation": None,
                "latest_creation": None,
                "total_entities": 0,
                "total_relations": 0,
                "is_legacy": True,
            })

        logger.info(f"Found {len(processing_runs)} processing runs for trace {trace_id}")
        return {"processing_runs": result}
    except HTTPException:
        # Consistency with the other endpoints: never wrap our own 4xx in a 500.
        raise
    except Exception as e:
        # logger.exception preserves the traceback (logger.error dropped it).
        logger.exception(f"Error getting processing runs for trace {trace_id}: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred")