"""
Temporal Graph API Router
Provides endpoints for temporal force-directed graph visualization.
"""
import logging
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from backend.database.models import KnowledgeGraph
from backend.database.utils import get_temporal_windows_by_trace_id, get_trace
from backend.dependencies import get_db
# Module-level logger and router: every endpoint below is mounted under /api.
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api", tags=["temporal-graphs"])
@router.get("/temporal-graph/{trace_id}")
async def get_temporal_graph_data(
    trace_id: str,
    processing_run_id: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """
    Get temporal graph data for a specific trace.

    Returns all knowledge graph windows in sequence for animation, plus the
    full/merged version when one is available.

    Args:
        trace_id: ID of the trace.
        processing_run_id: Optional processing run ID to filter by specific run.
        db: SQLAlchemy session (injected by FastAPI).

    Raises:
        HTTPException: 404 if the trace or its windows cannot be found,
            400 if fewer than two windows exist (not enough for animation),
            500 for any unexpected error.
    """
    try:
        logger.info(f"Attempting to load temporal graph for trace_id: {trace_id}")
        if processing_run_id:
            logger.info(f"Filtering by processing_run_id: {processing_run_id}")
        # Get temporal windows data (includes windowed KGs and the full KG).
        temporal_data = get_temporal_windows_by_trace_id(db, trace_id, processing_run_id)
        if not temporal_data["trace_info"]:
            logger.warning(f"Trace with ID {trace_id} not found in database")
            raise HTTPException(status_code=404, detail=f"Trace with ID {trace_id} not found")
        windows = temporal_data["windows"]
        full_kg = temporal_data["full_kg"]
        trace_info = temporal_data["trace_info"]
        logger.info(f"Found trace: {trace_info['trace_id']} (title: {trace_info['title']})")
        logger.info(f"Found {len(windows)} windows and {'1' if full_kg else '0'} full KG for trace {trace_id}")
        if not windows:
            raise HTTPException(
                status_code=404,
                detail=f"No knowledge graph windows found for trace {trace_id}" +
                (f" with processing_run_id {processing_run_id}" if processing_run_id else "")
            )
        # Temporal animation needs at least two frames to transition between.
        if len(windows) < 2:
            raise HTTPException(
                status_code=400,
                detail=f"Trace {trace_id} has insufficient windows for temporal visualization (found {len(windows)}, need at least 2)"
            )
        # Serialize each windowed KG; graph_data may be NULL in the DB,
        # so fall back to an empty dict before reading entities/relations.
        windows_data = []
        for window in windows:
            graph_data = window.graph_data or {}
            window_info = {
                "window_index": window.window_index,
                "entities": graph_data.get("entities", []),
                "relations": graph_data.get("relations", []),
                "metadata": {
                    "entity_count": window.entity_count,
                    "relation_count": window.relation_count,
                    "window_start_char": window.window_start_char,
                    "window_end_char": window.window_end_char,
                    "creation_timestamp": window.creation_timestamp.isoformat() if window.creation_timestamp else None,
                    "status": window.status,
                    "filename": window.filename
                }
            }
            windows_data.append(window_info)
        # Serialize the merged/full KG when one exists; otherwise leave None.
        full_kg_data = None
        if full_kg:
            graph_data = full_kg.graph_data or {}
            full_kg_data = {
                "entities": graph_data.get("entities", []),
                "relations": graph_data.get("relations", []),
                "metadata": {
                    "entity_count": full_kg.entity_count,
                    "relation_count": full_kg.relation_count,
                    "window_total": full_kg.window_total,
                    "creation_timestamp": full_kg.creation_timestamp.isoformat() if full_kg.creation_timestamp else None,
                    "status": full_kg.status,
                    "filename": full_kg.filename
                }
            }
        # Sort by window_index so the client always receives frames in
        # temporal order, regardless of database result ordering.
        windows_data.sort(key=lambda x: x["window_index"])
        response = {
            "trace_id": trace_id,
            "trace_title": trace_info["title"] or f"Trace {trace_id[:8]}",
            "trace_description": trace_info["description"],
            "total_windows": len(windows_data),
            "windows": windows_data,
            "full_kg": full_kg_data,
            "has_full_version": full_kg is not None
        }
        logger.info(f"Returning temporal graph data for trace {trace_id} with {len(windows_data)} windows and {'full version' if full_kg else 'no full version'}")
        return response
    except HTTPException:
        # Re-raise deliberate HTTP errors (404/400) untouched.
        raise
    except Exception:
        # logger.exception records the full traceback, not just str(e).
        logger.exception("Error getting temporal graph data for trace %s", trace_id)
        raise HTTPException(status_code=500, detail="An internal error occurred")
@router.get("/traces-with-windows")
async def get_traces_with_windows(db: Session = Depends(get_db)):
    """
    List every trace eligible for temporal visualization (>= 2 KG windows).

    Covers both traces with a proper trace record and "orphaned" knowledge
    graphs that carry a trace_id without any matching trace record.
    """
    try:
        from sqlalchemy import func
        from backend.database.models import Trace, KnowledgeGraph

        # Traces that have a record and at least two knowledge graph windows.
        recorded = (
            db.query(
                Trace.trace_id,
                Trace.title,
                Trace.description,
                Trace.upload_timestamp,
                func.count(KnowledgeGraph.id).label('window_count')
            )
            .join(KnowledgeGraph, Trace.trace_id == KnowledgeGraph.trace_id)
            .group_by(Trace.trace_id, Trace.title, Trace.description, Trace.upload_timestamp)
            .having(func.count(KnowledgeGraph.id) >= 2)
            .order_by(Trace.upload_timestamp.desc())
            .all()
        )

        # KGs whose trace_id matches no trace record, again with >= 2 windows.
        orphaned = (
            db.query(
                KnowledgeGraph.trace_id,
                func.count(KnowledgeGraph.id).label('window_count'),
                func.min(KnowledgeGraph.creation_timestamp).label('earliest_creation')
            )
            .outerjoin(Trace, KnowledgeGraph.trace_id == Trace.trace_id)
            .filter(Trace.trace_id.is_(None))           # no trace record exists
            .filter(KnowledgeGraph.trace_id.isnot(None))  # but KG has a trace_id
            .group_by(KnowledgeGraph.trace_id)
            .having(func.count(KnowledgeGraph.id) >= 2)
            .order_by(func.min(KnowledgeGraph.creation_timestamp).desc())
            .all()
        )

        # Flatten both result sets into a single response schema.
        entries = [
            {
                "trace_id": row.trace_id,
                "title": row.title or f"Trace {row.trace_id[:8]}...",
                "description": row.description,
                "upload_timestamp": row.upload_timestamp.isoformat() if row.upload_timestamp else None,
                "window_count": row.window_count,
                "has_trace_record": True
            }
            for row in recorded
        ] + [
            {
                "trace_id": row.trace_id,
                "title": f"Trace {row.trace_id[:8]}...",
                "description": "Knowledge graphs exist but no trace record found",
                "upload_timestamp": row.earliest_creation.isoformat() if row.earliest_creation else None,
                "window_count": row.window_count,
                "has_trace_record": False
            }
            for row in orphaned
        ]

        # Newest first; entries without a timestamp sink to the bottom.
        entries = sorted(entries, key=lambda e: e["upload_timestamp"] or "1970-01-01", reverse=True)
        logger.info(f"Found {len(recorded)} traces with records and {len(orphaned)} orphaned KG traces")
        return {"traces": entries}
    except Exception as e:
        logger.error(f"Error getting traces with windows: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred")
@router.get("/trace/{trace_id}/processing-runs")
async def get_processing_runs_for_trace(trace_id: str, db: Session = Depends(get_db)):
    """
    Get all processing runs for a specific trace.

    Returns information about each processing run including run ID, creation
    timestamps, window count, and aggregated entity/relation totals. Legacy
    knowledge graphs (those without a processing_run_id) are reported as a
    single pseudo-run with ``processing_run_id = None`` and ``is_legacy = True``.

    Args:
        trace_id: ID of the trace whose runs are listed.
        db: SQLAlchemy session (injected by FastAPI).

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        from sqlalchemy import func
        from backend.database.models import KnowledgeGraph

        # One row per processing run, with counts and aggregate stats.
        processing_runs = (
            db.query(
                KnowledgeGraph.processing_run_id,
                func.count(KnowledgeGraph.id).label('kg_count'),
                func.min(KnowledgeGraph.creation_timestamp).label('earliest_creation'),
                func.max(KnowledgeGraph.creation_timestamp).label('latest_creation'),
                func.sum(KnowledgeGraph.entity_count).label('total_entities'),
                func.sum(KnowledgeGraph.relation_count).label('total_relations')
            )
            .filter(KnowledgeGraph.trace_id == trace_id)
            .filter(KnowledgeGraph.processing_run_id.isnot(None))
            .group_by(KnowledgeGraph.processing_run_id)
            .order_by(func.min(KnowledgeGraph.creation_timestamp).desc())
            .all()
        )
        # Also aggregate KGs without processing_run_id (legacy). Previously the
        # legacy entry reported zeros/None; select the same aggregates as the
        # per-run query so the response carries real values.
        legacy_kgs = (
            db.query(
                func.count(KnowledgeGraph.id).label('kg_count'),
                func.min(KnowledgeGraph.creation_timestamp).label('earliest_creation'),
                func.max(KnowledgeGraph.creation_timestamp).label('latest_creation'),
                func.sum(KnowledgeGraph.entity_count).label('total_entities'),
                func.sum(KnowledgeGraph.relation_count).label('total_relations')
            )
            .filter(KnowledgeGraph.trace_id == trace_id)
            .filter(KnowledgeGraph.processing_run_id.is_(None))
            .first()
        )
        result = []
        for run_info in processing_runs:
            result.append({
                "processing_run_id": run_info.processing_run_id,
                "kg_count": run_info.kg_count,
                "earliest_creation": run_info.earliest_creation.isoformat() if run_info.earliest_creation else None,
                "latest_creation": run_info.latest_creation.isoformat() if run_info.latest_creation else None,
                "total_entities": run_info.total_entities or 0,
                "total_relations": run_info.total_relations or 0,
                "is_legacy": False
            })
        # Append the legacy pseudo-run only when legacy KGs actually exist.
        if legacy_kgs and legacy_kgs.kg_count > 0:
            result.append({
                "processing_run_id": None,
                "kg_count": legacy_kgs.kg_count,
                "earliest_creation": legacy_kgs.earliest_creation.isoformat() if legacy_kgs.earliest_creation else None,
                "latest_creation": legacy_kgs.latest_creation.isoformat() if legacy_kgs.latest_creation else None,
                "total_entities": legacy_kgs.total_entities or 0,
                "total_relations": legacy_kgs.total_relations or 0,
                "is_legacy": True
            })
        logger.info(f"Found {len(processing_runs)} processing runs for trace {trace_id}")
        return {"processing_runs": result}
    except Exception:
        # logger.exception records the full traceback, not just str(e).
        logger.exception("Error getting processing runs for trace %s", trace_id)
        raise HTTPException(status_code=500, detail="An internal error occurred")