""" AI Observability Platform Integration Router Handles connections to external AI observability platforms like Langfuse and LangSmith. Provides endpoints for: - Platform connection management - Trace fetching and importing - Automated synchronization """ import base64 import gc import json import logging import time import uuid from datetime import datetime from typing import Dict, List, Optional, cast import psutil from utils.environment import get_environment_info, debug_environment import requests from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import JSONResponse from langsmith import Client as LangsmithClient from pydantic import BaseModel from sqlalchemy import Column from sqlalchemy.orm import Session from sqlalchemy.orm.attributes import flag_modified from agentgraph.input.text_processing.trace_preprocessor import filter_langfuse_session, filter_langsmith_trace from backend.database import get_db from backend.database.models import FetchedTrace, ObservabilityConnection from backend.database.utils import save_trace from backend.routers.observe_models import LangFuseSession, LangSmithRun, LangSmithTrace from backend.services.platform.langfuse_downloader import LangfuseDownloader from backend.services.task_store_service import task_store logger = logging.getLogger("agent_monitoring_server.routers.observability") router = APIRouter(prefix="/api/observability", tags=["observability"]) def truncate_long_strings(obj, max_string_length=500): """ Recursively process JSON object to truncate very long strings No depth limit - all keys and array items are preserved Only truncates string values that are too long """ if isinstance(obj, dict): truncated = {} # Process ALL keys, no limit on key count or depth for key, value in obj.items(): truncated[key] = truncate_long_strings(value, max_string_length) return truncated elif isinstance(obj, list): truncated = [] # Process ALL items, no limit on item count or depth for item in obj: truncated.append(truncate_long_strings(item, max_string_length)) return truncated elif isinstance(obj, str) and len(obj) > max_string_length: return f"{obj[:max_string_length]}...({len(obj)} chars)" return obj # Helper Functions for Common Operations def get_langfuse_projects(public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]: """Fetch projects from Langfuse API""" try: # Create Basic Auth header auth_string = f"{public_key}:{secret_key}" auth_bytes = auth_string.encode('ascii') auth_b64 = base64.b64encode(auth_bytes).decode('ascii') headers = { 'Authorization': f'Basic {auth_b64}', 'Content-Type': 'application/json' } # Get projects from Langfuse API host_url = host or "https://cloud.langfuse.com" projects_url = f"{host_url}/api/public/projects" response = requests.get(projects_url, headers=headers, timeout=10) response.raise_for_status() projects_data = response.json() projects_info = [] # Extract project information if 'data' in projects_data: for project in projects_data['data']: project_info = { "id": project.get('id', ''), "name": project.get('name', ''), "description": project.get('description', ''), "created_at": project.get('createdAt', None) } projects_info.append(project_info) if not projects_info: # Fallback to default project if no projects found projects_info = [{"name": "Default", "id": "default", "description": "Langfuse workspace"}] logger.info(f"Successfully fetched {len(projects_info)} Langfuse projects") return projects_info except Exception as e: logger.warning(f"Failed to fetch Langfuse projects: {str(e)}, using default project") return [{"name": "Default", "id": "default", "description": "Langfuse workspace"}] def get_langsmith_projects(api_key: str) -> List[Dict]: """Fetch projects from LangSmith API""" try: client = LangsmithClient(api_key=api_key) projects = list(client.list_projects()) logger.info(f"Successfully fetched {len(projects)} LangSmith projects") # Extract project information projects_info = [] for project in projects: project_info = { "id": str(project.id), "name": project.name, "description": getattr(project, 'description', ''), "created_at": getattr(project, 'created_at', None) } projects_info.append(project_info) return projects_info except Exception as e: logger.error(f"Failed to fetch LangSmith projects: {str(e)}") raise def test_langfuse_connection(public_key: str, secret_key: str, host: Optional[str]) -> bool: """Test Langfuse connection by fetching traces""" try: downloader = LangfuseDownloader( secret_key=secret_key, public_key=public_key, host=host or "https://cloud.langfuse.com" ) # Test connection by fetching a small number of traces test_traces = downloader.download_recent_traces(limit=1) logger.info(f"Successfully tested Langfuse connection, found {len(test_traces)} traces") return True except Exception as e: logger.error(f"Failed to connect to Langfuse: {str(e)}") raise HTTPException(status_code=400, detail="Failed to connect to Langfuse") from e def test_langsmith_connection(api_key: str) -> bool: """Test LangSmith connection by listing projects""" try: client = LangsmithClient(api_key=api_key) projects = list(client.list_projects()) logger.info(f"Successfully tested LangSmith connection, found {len(projects)} projects") return True except Exception as e: logger.error(f"Failed to connect to LangSmith: {str(e)}") raise HTTPException(status_code=400, detail="Failed to connect to LangSmith") from e def get_connection_projects(platform: str, public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]: """Get projects for a platform connection""" platform = platform.lower() if platform == "langfuse": test_langfuse_connection(public_key, secret_key, host) return get_langfuse_projects(public_key, secret_key, host) elif platform == "langsmith": if not public_key: raise HTTPException(status_code=400, detail="LangSmith requires an API token") test_langsmith_connection(public_key) return get_langsmith_projects(public_key) else: raise HTTPException(status_code=400, detail=f"Unsupported platform: {platform}") def get_last_fetch_time(db: Session, connection_id: str, platform: str, project_name: Optional[str] = None) -> Optional[datetime]: """Get last fetch time for a connection and optionally a specific project""" query = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection_id, FetchedTrace.platform == platform ) if project_name: query = query.filter(FetchedTrace.project_name == project_name) last_trace = query.order_by(FetchedTrace.fetched_at.desc()).first() return cast(datetime, last_trace.fetched_at) if last_trace else None def create_fetched_trace(trace_id: str, name: str, platform: str, connection_id: str, data: Dict, project_name: Optional[str] = None) -> FetchedTrace: """Create a FetchedTrace object""" return FetchedTrace( trace_id=trace_id, name=name, platform=platform, connection_id=connection_id, project_name=project_name, data=data ) def fetch_langfuse_sessions(connection: ObservabilityConnection, db: Session, project_name: str, limit: int = 50) -> List[Dict]: """Fetch sessions from Langfuse""" downloader = LangfuseDownloader( secret_key=cast(str, connection.secret_key), public_key=cast(str, connection.public_key), host=cast(str, connection.host) ) # Get last fetched time for this specific project from_timestamp = get_last_fetch_time(db, cast(str, connection.connection_id), "langfuse", project_name) if from_timestamp: logger.info(f"Fetching sessions for project {project_name} from {from_timestamp} onwards") else: logger.info(f"No previous fetches found for project {project_name}, fetching all sessions") # List sessions to get session IDs if from_timestamp: sessions_response = downloader.client.api.sessions.list( limit=limit, from_timestamp=from_timestamp ) else: sessions_response = downloader.client.api.sessions.list(limit=limit) # Handle different response formats if hasattr(sessions_response, 'data'): sessions = [downloader._convert_to_dict(session) for session in sessions_response.data] else: sessions = [downloader._convert_to_dict(session) for session in sessions_response] logger.info(f"Found {len(sessions)} sessions") # Store each session as a fetched trace for session in sessions: session_id = session['id'] # Check if session already exists existing_session = db.query(FetchedTrace).filter( FetchedTrace.trace_id == session_id, FetchedTrace.connection_id == connection.connection_id ).first() if not existing_session: try: traces_response = downloader.client.api.trace.list(session_id=session_id) if hasattr(traces_response, 'data'): session_traces = [downloader._convert_to_dict(trace) for trace in traces_response.data] else: session_traces = [downloader._convert_to_dict(trace) for trace in traces_response] # Get detailed trace data for each trace detailed_traces = [] for i, trace_summary in enumerate(session_traces): trace_id = trace_summary['id'] if i > 0: time.sleep(1) detailed_trace = downloader.client.api.trace.get(trace_id) trace_data = downloader._convert_to_dict(detailed_trace) detailed_traces.append(trace_data) logger.info(f"Downloaded detailed trace: {trace_id} ({i+1}/{len(session_traces)})") # Create session data - correct LangFuseSession format session_data = LangFuseSession( session_id=session_id, session_name=session_id, project_name=project_name, export_timestamp=datetime.now().isoformat(), total_traces=len(detailed_traces), traces=detailed_traces ) # Convert to JSON-serializable format data_json = session_data.model_dump() fetched_trace = create_fetched_trace( trace_id=session_id, name=session_id, platform="langfuse", connection_id=cast(str, connection.connection_id), data=data_json, project_name=project_name ) db.add(fetched_trace) logger.info(f"Stored session {session_id} with {len(detailed_traces)} traces") except Exception as e: logger.error(f"Error processing session {session_id}: {e}") continue db.commit() logger.info(f"Fetched {len(sessions)} sessions from Langfuse") return sessions def fetch_langsmith_traces(connection: ObservabilityConnection, db: Session, project_name: str, limit: int = 50) -> List[Dict]: """Fetch traces from LangSmith""" try: client = LangsmithClient(api_key=cast(str, connection.public_key)) logger.info("Connected to LangSmith successfully") # Get all projects try: projects = list(client.list_projects()) logger.info(f"Found {len(projects)} projects") except Exception as e: logger.error(f"Error listing projects: {e}") raise HTTPException(status_code=500, detail="An internal error occurred while listing projects") from e # Export runs from specific project only all_traces = [] total_limit = limit # Get existing trace IDs to avoid duplicates existing_traces = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection.connection_id, FetchedTrace.platform == "langsmith", FetchedTrace.project_name == project_name ).all() existing_trace_ids = {cast(str, trace.trace_id) for trace in existing_traces} logger.info(f"Exporting specific project: {project_name}") # Get last fetched time for this specific project project_from_timestamp = get_last_fetch_time( db, cast(str, connection.connection_id), "langsmith", project_name ) if project_from_timestamp: logger.info(f"Fetching {project_name} runs from {project_from_timestamp} onwards") else: logger.info(f"No previous fetches found for {project_name}, fetching all runs") # Get all runs (top-level runs only) - same as langsmith_exporter.py list_runs_kwargs = { "project_name": project_name, "is_root": True, "limit": limit } # Add start_time filter if we have a project-specific timestamp if project_from_timestamp: list_runs_kwargs["start_time"] = project_from_timestamp runs = list(client.list_runs(**list_runs_kwargs)) logger.info(f"Found {len(runs)} runs in project {project_name}") # Process runs in batch new_traces_to_add = [] for run in runs: run_name = getattr(run, 'name', 'unnamed') run_id = str(run.id) unique_trace_id = f"{run_name}_{run_id}" # Skip if already exists if unique_trace_id in existing_trace_ids: logger.debug(f"Skipping existing trace: {unique_trace_id}") continue # Get all traces for this run (including nested children) - same as langsmith_exporter.py all_runs: List[LangSmithRun] = [] try: # Get the root run and all its children trace_runs = client.list_runs(project_name=project_name, trace_id=run.trace_id) run_list = list(trace_runs) # Sort traces by start_time descending (latest first) sorted_runs = sorted(run_list, key=lambda t: getattr(t, 'start_time', None) or datetime.min) for run_item in sorted_runs: run_data = run_item.dict() if hasattr(run_item, 'dict') else dict(run_item) all_runs.append(LangSmithRun(**run_data)) except Exception as e: logger.warning(f"Could not get child traces for run {run_id}: {e}") # Fallback to just the main run run_data = run.dict() if hasattr(run, 'dict') else dict(run) all_runs = [LangSmithRun(**run_data)] # Create run export structure - same format as langsmith_exporter.py run_export = LangSmithTrace( trace_id=run_id, trace_name=run_name, project_name=project_name, export_time=datetime.now().isoformat(), total_runs=len(all_runs), runs=all_runs ) # Prepare for batch database insert try: clean_data = run_export.model_dump() fetched_trace = create_fetched_trace( trace_id=unique_trace_id, name=f"{run_name}_{run_id[:8]}", platform="langsmith", connection_id=cast(str, connection.connection_id), data=clean_data, project_name=project_name ) new_traces_to_add.append(fetched_trace) all_traces.append(clean_data) existing_trace_ids.add(unique_trace_id) except Exception as e: logger.error(f"Error preparing trace {unique_trace_id}: {e}") continue # Stop if we've reached the limit if len(all_traces) >= total_limit: break # Batch insert new traces if new_traces_to_add: db.add_all(new_traces_to_add) logger.info(f"Added {len(new_traces_to_add)} new traces from project {project_name}") # Single commit for all operations db.commit() logger.info(f"Fetched {len(all_traces)} traces from LangSmith") return all_traces except Exception as e: logger.error(f"Error fetching LangSmith traces: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while fetching traces") from e # Request/Response Models class ConnectionRequest(BaseModel): platform: str publicKey: str secretKey: str host: Optional[str] = None class ConnectionResponse(BaseModel): status: str message: str connection_id: str class TraceFetchRequest(BaseModel): limit: int = 50 start_date: Optional[str] = None end_date: Optional[str] = None project_name: str class PreprocessingOptions(BaseModel): """Preprocessing options for trace filtering""" max_char: Optional[int] = 1000 topk: int = 10 raw: bool = False hierarchy: bool = False replace: bool = False truncate_enabled: bool = False class TraceImportRequest(BaseModel): trace_ids: List[str] preprocessing: Optional[PreprocessingOptions] = PreprocessingOptions() @router.post("/connect", response_model=ConnectionResponse) async def connect_platform(request: ConnectionRequest, db: Session = Depends(get_db)): # noqa: B008 """Connect to an AI observability platform""" try: platform = request.platform.lower() public_key = request.publicKey secret_key = request.secretKey # Get projects and test connection projects_info = get_connection_projects(platform, public_key, secret_key, request.host) # Store connection info in database connection_id = str(uuid.uuid4()) db_connection = ObservabilityConnection( connection_id=connection_id, platform=platform, public_key=public_key, secret_key=secret_key, host=request.host, projects=projects_info, status="connected" ) db.add(db_connection) db.commit() db.refresh(db_connection) logger.info(f"Successfully connected to {platform} with connection ID: {connection_id}") return ConnectionResponse( status="success", message=f"Successfully connected to {platform.title()}", connection_id=connection_id ) except HTTPException: raise except Exception as e: logger.error(f"Unexpected error connecting to platform: {str(e)}") raise HTTPException(status_code=500, detail="Internal server error") from e @router.get("/connections") async def get_connections(db: Session = Depends(get_db)): # noqa: B008 """Get all active platform connections""" connections = db.query(ObservabilityConnection).all() return {"connections": [conn.to_dict() for conn in connections]} @router.put("/connections/{connection_id}") async def update_connection( connection_id: str, request: ConnectionRequest, db: Session = Depends(get_db) # noqa: B008 ): """Update an existing platform connection""" try: # Find existing connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id ).first() if not connection: raise HTTPException(status_code=404, detail="Connection not found") platform = request.platform.lower() public_key = request.publicKey secret_key = request.secretKey # Test connection and get projects projects_info = get_connection_projects(platform, public_key, secret_key, request.host) # Update connection in database connection.public_key = cast(Column[str], public_key) connection.secret_key = cast(Column[str], secret_key) connection.host = cast(Column[str], request.host) connection.projects = cast(Column[List[Dict]], projects_info) connection.status = cast(Column[str], "connected") db.commit() db.refresh(connection) logger.info(f"Successfully updated {platform} connection: {connection_id}") return { "status": "success", "message": f"Successfully updated {platform.title()} connection" } except HTTPException: raise except Exception as e: logger.error(f"Unexpected error updating connection: {str(e)}") raise HTTPException(status_code=500, detail="Internal server error") from e @router.delete("/connections/{connection_id}") async def disconnect_platform(connection_id: str, db: Session = Depends(get_db)): # noqa: B008 """Disconnect from a platform""" connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id ).first() if not connection: raise HTTPException(status_code=404, detail="Connection not found") platform = connection.platform # Delete all fetched traces for this connection fetched_traces = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection_id ).all() deleted_traces_count = len(fetched_traces) # Delete fetched traces for fetched_trace in fetched_traces: db.delete(fetched_trace) # Remove connection from database db.delete(connection) db.commit() logger.info(f"Disconnected from {platform} (connection ID: {connection_id})") logger.info(f"Deleted {deleted_traces_count} fetched traces for connection {connection_id}") return { "status": "success", "message": f"Disconnected from {platform.title()}", "deleted_fetched_traces": deleted_traces_count, "disconnected_at": datetime.now().isoformat() } # Connection-specific routes (required by frontend) @router.get("/connections/{connection_id}/fetched-traces") async def get_fetched_traces_by_connection(connection_id: str, db: Session = Depends(get_db)): # noqa: B008 """Get all fetched traces for a specific connection""" # Get connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}") # Get all fetched traces for this connection fetched_traces = db.query(FetchedTrace).filter( FetchedTrace.connection_id == connection_id ).order_by(FetchedTrace.fetched_at.desc()).all() return { "traces": [trace.to_dict() for trace in fetched_traces], "total": len(fetched_traces), "platform": connection.platform } @router.post("/connections/{connection_id}/fetch") async def fetch_traces_by_connection( connection_id: str, request: TraceFetchRequest, db: Session = Depends(get_db) # noqa: B008 ): """Fetch traces from a specific connection""" # Get connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}") try: import asyncio # Run blocking operations in executor to avoid blocking the event loop loop = asyncio.get_event_loop() def sync_fetch(): # Create new db session for the thread from backend.database import get_db thread_db = next(get_db()) try: project_name = request.project_name if cast(str, connection.platform) == "langfuse": traces = fetch_langfuse_sessions(connection, thread_db, project_name, request.limit) elif cast(str, connection.platform) == "langsmith": traces = fetch_langsmith_traces(connection, thread_db, project_name, request.limit) else: raise HTTPException(status_code=400, detail=f"Unsupported platform: {connection.platform}") # Update last sync time connection.last_sync = cast(Column[datetime], datetime.now()) thread_db.commit() return traces finally: thread_db.close() # Execute in thread pool to avoid blocking traces = await loop.run_in_executor(None, sync_fetch) return { "status": "success", "message": f"Successfully fetched {len(traces)} traces from {connection.platform}", "platform": connection.platform, "traces_count": len(traces), "completed_at": datetime.now().isoformat() } except Exception as e: logger.error(f"Failed to fetch traces from connection {connection_id}: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while fetching traces") from e @router.post("/connections/{connection_id}/import") async def import_traces_by_connection( connection_id: str, request: TraceImportRequest, db: Session = Depends(get_db) # noqa: B008 ): """Import specific traces from a connection to local database""" # Get connection connection = db.query(ObservabilityConnection).filter( ObservabilityConnection.connection_id == connection_id, ObservabilityConnection.status == "connected" ).first() if not connection: raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}") try: imported_count = 0 errors = [] for trace_id in request.trace_ids: try: # Get trace from fetched_traces table trace = db.query(FetchedTrace).filter( FetchedTrace.trace_id == trace_id, FetchedTrace.connection_id == connection_id ).first() if not trace: errors.append(f"Trace {trace_id} not found in fetched traces for connection {connection_id}") continue # Process based on platform preprocessing_opts = request.preprocessing or PreprocessingOptions() if cast(str, connection.platform) == "langfuse": trace_data = trace.get_full_data()["data"] filtered_trace = filter_langfuse_session( LangFuseSession(**trace_data), max_char=preprocessing_opts.max_char, topk=preprocessing_opts.topk, raw=preprocessing_opts.raw, hierarchy=preprocessing_opts.hierarchy, replace=preprocessing_opts.replace ) processed_trace = save_trace( session=db, content=json.dumps(filtered_trace, indent=2, default=str), filename=f"langfuse_trace_{trace_id}", title=f"Langfuse trace {trace_id}", description=f"Imported from Langfuse on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", trace_type="langfuse", trace_source="langfuse", tags=["imported", "langfuse"] ) if processed_trace: imported_count += 1 logger.info(f"Successfully imported Langfuse trace {trace_id} as {processed_trace.trace_id}") # Run trace characteristics analysis to generate statistics try: from agentgraph.input.trace_management import analyze_trace_characteristics raw_content_for_analysis = json.dumps(filtered_trace, indent=2, default=str) trace_analysis = analyze_trace_characteristics(raw_content_for_analysis, optimize_content=False) # Update trace metadata with analysis results if processed_trace.trace_metadata: processed_trace.trace_metadata.update(trace_analysis) else: processed_trace.trace_metadata = trace_analysis flag_modified(processed_trace, "trace_metadata") db.commit() logger.info(f"Added trace characteristics analysis to imported trace {processed_trace.trace_id}") except Exception as e: logger.warning(f"Failed to analyze trace characteristics for imported trace {processed_trace.trace_id}: {str(e)}") # Auto-generate context documents using universal parser try: from backend.services.universal_parser_service import auto_generate_context_documents raw_content = json.dumps(filtered_trace, indent=2, default=str) created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db) if created_docs: logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}") except Exception as e: logger.warning(f"Failed to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}") elif cast(str, connection.platform) == "langsmith": langsmith_export = trace.get_full_data()["data"] filtered_export = filter_langsmith_trace( LangSmithTrace(**langsmith_export), max_char=preprocessing_opts.max_char, topk=preprocessing_opts.topk, raw=preprocessing_opts.raw, hierarchy=preprocessing_opts.hierarchy, replace=preprocessing_opts.replace ) processed_trace = save_trace( session=db, content=json.dumps(filtered_export, indent=2, default=str), filename=f"langsmith_trace_{trace_id}", title=f"LangSmith trace {trace_id}", description=f"Imported from LangSmith on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", trace_type="langsmith", trace_source="langsmith", tags=["imported", "langsmith"] ) if processed_trace: imported_count += 1 logger.info(f"Successfully imported LangSmith trace {trace_id} as {processed_trace.trace_id}") # Run trace characteristics analysis to generate statistics try: from agentgraph.input.trace_management import analyze_trace_characteristics # Use the original langsmith_export for better analysis results raw_content_for_analysis = json.dumps(langsmith_export, indent=2, default=str) trace_analysis = analyze_trace_characteristics(raw_content_for_analysis, optimize_content=False) # Update trace metadata with analysis results if processed_trace.trace_metadata: processed_trace.trace_metadata.update(trace_analysis) else: processed_trace.trace_metadata = trace_analysis flag_modified(processed_trace, "trace_metadata") db.commit() logger.info(f"Added trace characteristics analysis to imported trace {processed_trace.trace_id}") except Exception as e: logger.warning(f"Failed to analyze trace characteristics for imported trace {processed_trace.trace_id}: {str(e)}") # Auto-generate context documents using universal parser (use raw content, not processed) try: from backend.services.universal_parser_service import auto_generate_context_documents # Use the original langsmith_export for better parsing results raw_content = json.dumps(langsmith_export, indent=2, default=str) created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db) if created_docs: logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}") except Exception as e: logger.warning(f"Failed to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}") except Exception as e: error_msg = f"Failed to import trace {trace_id}: {str(e)}" errors.append(error_msg) logger.error(error_msg) # Update last sync time connection.last_sync = cast(Column[datetime], datetime.now()) db.commit() return { "imported": imported_count, "errors": errors, "platform": connection.platform, "import_completed_at": datetime.now().isoformat() } except Exception as e: logger.error(f"Failed to import traces from connection {connection_id}: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while importing traces") from e @router.get("/traces/{trace_id}/download") async def download_trace_by_id(trace_id: str, db: Session = Depends(get_db)): # noqa: B008 """Download full trace data by trace ID (platform-agnostic)""" trace = db.query(FetchedTrace).filter( FetchedTrace.trace_id == trace_id ).first() if not trace: raise HTTPException(status_code=404, detail="Trace not found") return trace.get_full_data() @router.get("/resource-usage") async def get_resource_usage(): """Get resource usage information for the current process.""" try: cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent return {"cpu_usage": cpu_usage, "memory_usage": memory_usage} except Exception as e: logger.error(f"Error retrieving resource usage: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while retrieving resource usage") from e @router.post("/clean-up") async def clean_up(session: Session = Depends(get_db)): # noqa: B008 """Clean up resources by closing database connections and freeing up memory.""" try: session.close() gc.collect() return {"success": True, "message": "Resources cleaned up successfully"} except Exception as e: logger.error(f"Error cleaning up resources: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while cleaning up resources") from e @router.get("/environment") async def get_environment(): """Get environment information and authentication status.""" env_info = get_environment_info() return { "environment": env_info, "timestamp": datetime.now().isoformat() } @router.get("/usage-summary") async def get_usage_summary(request: Request): """ Get a summary of recent API usage for monitoring purposes. This helps track OpenAI API costs and detect potential abuse. """ # Only authenticated users can see usage data user = getattr(request.state, "user", None) if not user: raise HTTPException(status_code=401, detail="Authentication required") # In a production system, you'd query a database or log aggregation service # For now, we'll return a summary based on recent log entries return { "message": "Usage tracking is active", "tracking_enabled": True, "openai_endpoints_monitored": [ "/api/knowledge-graphs/extract", "/api/knowledge-graphs/analyze", "/api/methods/", "/api/traces/analyze", "/api/causal/", ], "current_user": { "username": user.get("username", "unknown"), "auth_method": user.get("auth_method", "unknown"), }, "note": "Detailed usage logs are available in the application logs for administrator review", "timestamp": datetime.now().isoformat() } @router.get("/health-check") async def health_check(): """Comprehensive health check for the system.""" try: cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent total_tasks = len(task_store.tasks) pending_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "PENDING"]) processing_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "PROCESSING"]) completed_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "COMPLETED"]) failed_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "FAILED"]) stuck_tasks = [] current_time = datetime.now() for task_id, task in task_store.tasks.items(): if task.get("status") == "PROCESSING": updated_at_str = task.get("update_timestamp") or task.get("creation_timestamp") if updated_at_str: updated_at = datetime.fromisoformat(updated_at_str.replace("Z", "+00:00")) if updated_at.tzinfo is None: updated_at = updated_at.astimezone() time_diff = (current_time.astimezone() - updated_at).total_seconds() if time_diff > 3600: stuck_tasks.append({"task_id": task_id, "stuck_duration": time_diff}) health_status = "healthy" issues = [] if cpu_usage > 90: health_status = "warning" issues.append(f"High CPU usage: {cpu_usage}%") if memory_usage > 90: health_status = "critical" issues.append(f"High memory usage: {memory_usage}%") if stuck_tasks: health_status = "warning" issues.append(f"{len(stuck_tasks)} tasks appear stuck") tasks = { "total": total_tasks, "pending": pending_tasks, "processing": processing_tasks, "completed": completed_tasks, "failed": failed_tasks, "stuck": stuck_tasks } return { "status": health_status, "issues": issues, "resources": {"cpu_usage": cpu_usage, "memory_usage": memory_usage}, "tasks": tasks, "timestamp": current_time.isoformat() } except Exception as e: logger.error(f"Error in health check: {str(e)}") return JSONResponse(status_code=500, content={"status": "error", "error": str(e)}) @router.post("/cleanup-stuck-tasks") async def cleanup_stuck_tasks(): """Clean up tasks that have been stuck in processing state for more than 1 hour.""" try: current_time = datetime.now() cleaned_tasks = [] for task_id, task in list(task_store.tasks.items()): # Iterate over a copy if task.get("status") == "PROCESSING": updated_at_str = task.get("update_timestamp") or task.get("creation_timestamp") if updated_at_str: updated_at = datetime.fromisoformat(updated_at_str.replace("Z", "+00:00")) if updated_at.tzinfo is None: updated_at = updated_at.astimezone() time_diff = (current_time.astimezone() - updated_at).total_seconds() if time_diff > 3600: task_store.update_task(task_id, status="FAILED", error="Task timed out and was cleaned up.") cleaned_tasks.append(task_id) gc.collect() return {"success": True, "cleaned_tasks": cleaned_tasks} except Exception as e: logger.error(f"Error cleaning up stuck tasks: {str(e)}") raise HTTPException(status_code=500, detail="An internal error occurred while cleaning up stuck tasks") from e