Spaces:
Running
Running
| """ | |
| AI Observability Platform Integration Router | |
| Handles connections to external AI observability platforms like Langfuse and LangSmith. | |
| Provides endpoints for: | |
| - Platform connection management | |
| - Trace fetching and importing | |
| - Automated synchronization | |
| """ | |
| import base64 | |
| import gc | |
| import json | |
| import logging | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, cast | |
| import psutil | |
| from utils.environment import get_environment_info, debug_environment | |
| import requests | |
| from fastapi import APIRouter, Depends, HTTPException, Request | |
| from fastapi.responses import JSONResponse | |
| from langsmith import Client as LangsmithClient | |
| from pydantic import BaseModel | |
| from sqlalchemy import Column | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy.orm.attributes import flag_modified | |
| from agentgraph.input.text_processing.trace_preprocessor import filter_langfuse_session, filter_langsmith_trace | |
| from backend.database import get_db | |
| from backend.database.models import FetchedTrace, ObservabilityConnection | |
| from backend.database.utils import save_trace | |
| from backend.routers.observe_models import LangFuseSession, LangSmithRun, LangSmithTrace | |
| from backend.services.platform.langfuse_downloader import LangfuseDownloader | |
| from backend.services.task_store_service import task_store | |
| logger = logging.getLogger("agent_monitoring_server.routers.observability") | |
| router = APIRouter(prefix="/api/observability", tags=["observability"]) | |
def truncate_long_strings(obj, max_string_length=500):
    """
    Recursively walk a JSON-like structure, shortening oversized strings.

    Dicts and lists are traversed in full — no limit on depth, key count,
    or item count. Only string values longer than ``max_string_length``
    are truncated; the replacement keeps the first ``max_string_length``
    characters and appends a marker with the original length.
    """
    if isinstance(obj, dict):
        # Rebuild the mapping with every value processed recursively
        return {key: truncate_long_strings(value, max_string_length) for key, value in obj.items()}
    if isinstance(obj, list):
        # Rebuild the sequence with every element processed recursively
        return [truncate_long_strings(item, max_string_length) for item in obj]
    if isinstance(obj, str) and len(obj) > max_string_length:
        return f"{obj[:max_string_length]}...({len(obj)} chars)"
    # Any other scalar passes through untouched
    return obj
| # Helper Functions for Common Operations | |
def get_langfuse_projects(public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]:
    """Fetch the list of projects from the Langfuse public API.

    Falls back to a single "Default" project entry when the request fails
    or the API returns no projects.
    """
    default_projects = [{"name": "Default", "id": "default", "description": "Langfuse workspace"}]
    try:
        # Langfuse authenticates with HTTP Basic auth over the key pair
        credentials = base64.b64encode(f"{public_key}:{secret_key}".encode('ascii')).decode('ascii')
        headers = {
            'Authorization': f'Basic {credentials}',
            'Content-Type': 'application/json',
        }
        base_url = host or "https://cloud.langfuse.com"
        response = requests.get(f"{base_url}/api/public/projects", headers=headers, timeout=10)
        response.raise_for_status()
        payload = response.json()

        projects_info = [
            {
                "id": project.get('id', ''),
                "name": project.get('name', ''),
                "description": project.get('description', ''),
                "created_at": project.get('createdAt', None),
            }
            for project in payload.get('data', [])
        ]
        if not projects_info:
            # Fallback to default project if no projects found
            projects_info = default_projects
        logger.info(f"Successfully fetched {len(projects_info)} Langfuse projects")
        return projects_info
    except Exception as e:
        logger.warning(f"Failed to fetch Langfuse projects: {str(e)}, using default project")
        return default_projects
def get_langsmith_projects(api_key: str) -> List[Dict]:
    """Fetch projects from LangSmith API.

    Re-raises any client/listing failure after logging it.
    """
    try:
        client = LangsmithClient(api_key=api_key)
        projects = list(client.list_projects())
        logger.info(f"Successfully fetched {len(projects)} LangSmith projects")
        # Flatten the SDK project objects into plain dicts for storage
        return [
            {
                "id": str(project.id),
                "name": project.name,
                "description": getattr(project, 'description', ''),
                "created_at": getattr(project, 'created_at', None),
            }
            for project in projects
        ]
    except Exception as e:
        logger.error(f"Failed to fetch LangSmith projects: {str(e)}")
        raise
def test_langfuse_connection(public_key: str, secret_key: str, host: Optional[str]) -> bool:
    """Verify Langfuse credentials by attempting a minimal trace download.

    Returns True on success; raises HTTPException (400) if the attempt fails.
    """
    try:
        probe = LangfuseDownloader(
            secret_key=secret_key,
            public_key=public_key,
            host=host or "https://cloud.langfuse.com"
        )
        # Downloading a single trace is enough to exercise auth + connectivity
        test_traces = probe.download_recent_traces(limit=1)
        logger.info(f"Successfully tested Langfuse connection, found {len(test_traces)} traces")
        return True
    except Exception as e:
        logger.error(f"Failed to connect to Langfuse: {str(e)}")
        raise HTTPException(status_code=400, detail="Failed to connect to Langfuse") from e
def test_langsmith_connection(api_key: str) -> bool:
    """Verify a LangSmith API token by listing the account's projects.

    Returns True on success; raises HTTPException (400) if the attempt fails.
    """
    try:
        probe = LangsmithClient(api_key=api_key)
        found = list(probe.list_projects())
        logger.info(f"Successfully tested LangSmith connection, found {len(found)} projects")
        return True
    except Exception as e:
        logger.error(f"Failed to connect to LangSmith: {str(e)}")
        raise HTTPException(status_code=400, detail="Failed to connect to LangSmith") from e
def get_connection_projects(platform: str, public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]:
    """Validate credentials for *platform* and return its project list.

    Raises HTTPException (400) for an unsupported platform, a missing
    LangSmith token, or a failed connection test.
    """
    normalized = platform.lower()
    if normalized == "langfuse":
        test_langfuse_connection(public_key, secret_key, host)
        return get_langfuse_projects(public_key, secret_key, host)
    if normalized == "langsmith":
        # LangSmith authenticates with a single API token, carried in public_key
        if not public_key:
            raise HTTPException(status_code=400, detail="LangSmith requires an API token")
        test_langsmith_connection(public_key)
        return get_langsmith_projects(public_key)
    raise HTTPException(status_code=400, detail=f"Unsupported platform: {normalized}")
def get_last_fetch_time(db: Session, connection_id: str, platform: str, project_name: Optional[str] = None) -> Optional[datetime]:
    """Return the most recent fetched_at timestamp for a connection/platform.

    When *project_name* is provided, only traces from that project are
    considered. Returns None if nothing has been fetched yet.
    """
    criteria = [
        FetchedTrace.connection_id == connection_id,
        FetchedTrace.platform == platform,
    ]
    if project_name:
        criteria.append(FetchedTrace.project_name == project_name)
    latest = db.query(FetchedTrace).filter(*criteria).order_by(FetchedTrace.fetched_at.desc()).first()
    if latest is None:
        return None
    return cast(datetime, latest.fetched_at)
def create_fetched_trace(trace_id: str, name: str, platform: str, connection_id: str,
                         data: Dict, project_name: Optional[str] = None) -> FetchedTrace:
    """Build (but do not persist) a FetchedTrace row from the given fields."""
    fields = {
        "trace_id": trace_id,
        "name": name,
        "platform": platform,
        "connection_id": connection_id,
        "project_name": project_name,
        "data": data,
    }
    return FetchedTrace(**fields)
def fetch_langfuse_sessions(connection: ObservabilityConnection, db: Session, project_name: str, limit: int = 50) -> List[Dict]:
    """Fetch sessions from Langfuse and persist them as FetchedTrace rows.

    Performs an incremental fetch: only sessions newer than the last recorded
    fetch time for this connection/project are listed, and sessions already
    stored for this connection are skipped. Per-session download failures are
    logged and skipped rather than aborting the whole fetch.

    Args:
        connection: Langfuse connection holding API credentials and host.
        db: Active database session used for lookups and inserts.
        project_name: Project name recorded on each stored session.
        limit: Maximum number of sessions to list from the API.

    Returns:
        The list of session summary dicts returned by the Langfuse API
        (including sessions that already existed and were skipped).
    """
    downloader = LangfuseDownloader(
        secret_key=cast(str, connection.secret_key),
        public_key=cast(str, connection.public_key),
        host=cast(str, connection.host)
    )
    # Get last fetched time for this specific project (None => first fetch)
    from_timestamp = get_last_fetch_time(db, cast(str, connection.connection_id), "langfuse", project_name)
    if from_timestamp:
        logger.info(f"Fetching sessions for project {project_name} from {from_timestamp} onwards")
    else:
        logger.info(f"No previous fetches found for project {project_name}, fetching all sessions")
    # List sessions to get session IDs; only pass from_timestamp when set
    if from_timestamp:
        sessions_response = downloader.client.api.sessions.list(
            limit=limit,
            from_timestamp=from_timestamp
        )
    else:
        sessions_response = downloader.client.api.sessions.list(limit=limit)
    # Handle different response formats (paginated object with .data vs plain iterable)
    if hasattr(sessions_response, 'data'):
        sessions = [downloader._convert_to_dict(session) for session in sessions_response.data]
    else:
        sessions = [downloader._convert_to_dict(session) for session in sessions_response]
    logger.info(f"Found {len(sessions)} sessions")
    # Store each session as a fetched trace
    for session in sessions:
        session_id = session['id']
        # Check if session already exists for this connection (dedupe)
        existing_session = db.query(FetchedTrace).filter(
            FetchedTrace.trace_id == session_id,
            FetchedTrace.connection_id == connection.connection_id
        ).first()
        if not existing_session:
            try:
                traces_response = downloader.client.api.trace.list(session_id=session_id)
                # Same dual response-format handling as for sessions above
                if hasattr(traces_response, 'data'):
                    session_traces = [downloader._convert_to_dict(trace) for trace in traces_response.data]
                else:
                    session_traces = [downloader._convert_to_dict(trace) for trace in traces_response]
                # Get detailed trace data for each trace
                detailed_traces = []
                for i, trace_summary in enumerate(session_traces):
                    trace_id = trace_summary['id']
                    if i > 0:
                        # Throttle per-trace detail requests between calls
                        time.sleep(1)
                    detailed_trace = downloader.client.api.trace.get(trace_id)
                    trace_data = downloader._convert_to_dict(detailed_trace)
                    detailed_traces.append(trace_data)
                    logger.info(f"Downloaded detailed trace: {trace_id} ({i+1}/{len(session_traces)})")
                # Create session data - correct LangFuseSession format
                # (session_name falls back to the session id)
                session_data = LangFuseSession(
                    session_id=session_id,
                    session_name=session_id,
                    project_name=project_name,
                    export_timestamp=datetime.now().isoformat(),
                    total_traces=len(detailed_traces),
                    traces=detailed_traces
                )
                # Convert to JSON-serializable format
                data_json = session_data.model_dump()
                fetched_trace = create_fetched_trace(
                    trace_id=session_id,
                    name=session_id,
                    platform="langfuse",
                    connection_id=cast(str, connection.connection_id),
                    data=data_json,
                    project_name=project_name
                )
                db.add(fetched_trace)
                logger.info(f"Stored session {session_id} with {len(detailed_traces)} traces")
            except Exception as e:
                # Best-effort: skip sessions that fail and continue with the rest
                logger.error(f"Error processing session {session_id}: {e}")
                continue
    # Single commit for all newly added sessions
    db.commit()
    logger.info(f"Fetched {len(sessions)} sessions from Langfuse")
    return sessions
def fetch_langsmith_traces(connection: ObservabilityConnection, db: Session, project_name: str, limit: int = 50) -> List[Dict]:
    """Fetch root runs (with their child runs) from a LangSmith project and store them.

    Performs an incremental fetch: only runs started after the last recorded
    fetch time for this connection/project are listed, and runs whose
    composite "{name}_{id}" trace ID already exists in the database are
    skipped. New traces are inserted in a single batch/commit.

    Args:
        connection: LangSmith connection; public_key carries the API token.
        db: Active database session used for lookups and inserts.
        project_name: LangSmith project to export runs from.
        limit: Maximum number of root runs to list and store.

    Returns:
        List of newly stored trace export dicts (LangSmithTrace.model_dump()).

    Raises:
        HTTPException: 500 if listing projects or the overall fetch fails.
    """
    try:
        client = LangsmithClient(api_key=cast(str, connection.public_key))
        logger.info("Connected to LangSmith successfully")
        # Get all projects (acts as a connectivity check; result is only logged)
        try:
            projects = list(client.list_projects())
            logger.info(f"Found {len(projects)} projects")
        except Exception as e:
            logger.error(f"Error listing projects: {e}")
            raise HTTPException(status_code=500, detail="An internal error occurred while listing projects") from e
        # Export runs from specific project only
        all_traces = []
        total_limit = limit
        # Get existing trace IDs to avoid duplicates
        existing_traces = db.query(FetchedTrace).filter(
            FetchedTrace.connection_id == connection.connection_id,
            FetchedTrace.platform == "langsmith",
            FetchedTrace.project_name == project_name
        ).all()
        existing_trace_ids = {cast(str, trace.trace_id) for trace in existing_traces}
        logger.info(f"Exporting specific project: {project_name}")
        # Get last fetched time for this specific project
        project_from_timestamp = get_last_fetch_time(
            db, cast(str, connection.connection_id), "langsmith", project_name
        )
        if project_from_timestamp:
            logger.info(f"Fetching {project_name} runs from {project_from_timestamp} onwards")
        else:
            logger.info(f"No previous fetches found for {project_name}, fetching all runs")
        # Get all runs (top-level runs only) - same as langsmith_exporter.py
        list_runs_kwargs = {
            "project_name": project_name,
            "is_root": True,
            "limit": limit
        }
        # Add start_time filter if we have a project-specific timestamp
        if project_from_timestamp:
            list_runs_kwargs["start_time"] = project_from_timestamp
        runs = list(client.list_runs(**list_runs_kwargs))
        logger.info(f"Found {len(runs)} runs in project {project_name}")
        # Process runs in batch
        new_traces_to_add = []
        for run in runs:
            run_name = getattr(run, 'name', 'unnamed')
            run_id = str(run.id)
            # Composite ID makes the stored trace_id unique per run name + run id
            unique_trace_id = f"{run_name}_{run_id}"
            # Skip if already exists
            if unique_trace_id in existing_trace_ids:
                logger.debug(f"Skipping existing trace: {unique_trace_id}")
                continue
            # Get all traces for this run (including nested children) - same as langsmith_exporter.py
            all_runs: List[LangSmithRun] = []
            try:
                # Get the root run and all its children
                trace_runs = client.list_runs(project_name=project_name, trace_id=run.trace_id)
                run_list = list(trace_runs)
                # Sort runs by start_time ascending (earliest first);
                # runs missing start_time sort first via the datetime.min fallback
                sorted_runs = sorted(run_list, key=lambda t: getattr(t, 'start_time', None) or datetime.min)
                for run_item in sorted_runs:
                    run_data = run_item.dict() if hasattr(run_item, 'dict') else dict(run_item)
                    all_runs.append(LangSmithRun(**run_data))
            except Exception as e:
                logger.warning(f"Could not get child traces for run {run_id}: {e}")
                # Fallback to just the main run
                run_data = run.dict() if hasattr(run, 'dict') else dict(run)
                all_runs = [LangSmithRun(**run_data)]
            # Create run export structure - same format as langsmith_exporter.py
            run_export = LangSmithTrace(
                trace_id=run_id,
                trace_name=run_name,
                project_name=project_name,
                export_time=datetime.now().isoformat(),
                total_runs=len(all_runs),
                runs=all_runs
            )
            # Prepare for batch database insert
            try:
                clean_data = run_export.model_dump()
                fetched_trace = create_fetched_trace(
                    trace_id=unique_trace_id,
                    name=f"{run_name}_{run_id[:8]}",
                    platform="langsmith",
                    connection_id=cast(str, connection.connection_id),
                    data=clean_data,
                    project_name=project_name
                )
                new_traces_to_add.append(fetched_trace)
                all_traces.append(clean_data)
                # Record the new ID so a duplicate run later in this batch is skipped
                existing_trace_ids.add(unique_trace_id)
            except Exception as e:
                logger.error(f"Error preparing trace {unique_trace_id}: {e}")
                continue
            # Stop if we've reached the limit
            if len(all_traces) >= total_limit:
                break
        # Batch insert new traces
        if new_traces_to_add:
            db.add_all(new_traces_to_add)
            logger.info(f"Added {len(new_traces_to_add)} new traces from project {project_name}")
        # Single commit for all operations
        db.commit()
        logger.info(f"Fetched {len(all_traces)} traces from LangSmith")
        return all_traces
    except Exception as e:
        logger.error(f"Error fetching LangSmith traces: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred while fetching traces") from e
| # Request/Response Models | |
class ConnectionRequest(BaseModel):
    """Payload for creating or updating a platform connection."""
    platform: str  # "langfuse" or "langsmith" (matched case-insensitively)
    publicKey: str  # Langfuse public key; for LangSmith this carries the API token
    secretKey: str  # Langfuse secret key; not referenced for LangSmith in this module
    host: Optional[str] = None  # Platform host URL; Langfuse defaults to https://cloud.langfuse.com
class ConnectionResponse(BaseModel):
    """Response returned after successfully connecting to a platform."""
    status: str  # "success" when the connection was stored
    message: str  # Human-readable confirmation message
    connection_id: str  # UUID assigned to the stored connection
class TraceFetchRequest(BaseModel):
    """Parameters controlling a trace fetch from a connected platform."""
    limit: int = 50  # Maximum number of traces/sessions to fetch
    start_date: Optional[str] = None  # NOTE(review): not referenced by the fetch handlers in this file
    end_date: Optional[str] = None  # NOTE(review): not referenced by the fetch handlers in this file
    project_name: str  # Platform project to fetch from (required)
class PreprocessingOptions(BaseModel):
    """Preprocessing options for trace filtering.

    All fields except truncate_enabled are forwarded verbatim to
    filter_langfuse_session / filter_langsmith_trace during import.
    """
    max_char: Optional[int] = 1000  # Character cap passed to the trace filters
    topk: int = 10  # Forwarded to the trace filters
    raw: bool = False  # Forwarded to the trace filters
    hierarchy: bool = False  # Forwarded to the trace filters
    replace: bool = False  # Forwarded to the trace filters
    truncate_enabled: bool = False  # NOTE(review): not referenced in this module
class TraceImportRequest(BaseModel):
    """Request to import previously fetched traces into the local database."""
    trace_ids: List[str]  # trace_id values of FetchedTrace rows to import
    # Pydantic deep-copies field defaults per instance, so this shared default
    # model instance is not mutated across requests.
    preprocessing: Optional[PreprocessingOptions] = PreprocessingOptions()
async def connect_platform(request: ConnectionRequest, db: Session = Depends(get_db)):  # noqa: B008
    """Connect to an AI observability platform.

    Validates the supplied credentials, fetches the platform's project
    list, persists the connection, and returns its new connection ID.
    """
    try:
        platform = request.platform.lower()
        # Validate credentials against the platform and retrieve its projects
        projects_info = get_connection_projects(platform, request.publicKey, request.secretKey, request.host)

        # Persist the verified connection with a freshly generated ID
        connection_id = str(uuid.uuid4())
        db_connection = ObservabilityConnection(
            connection_id=connection_id,
            platform=platform,
            public_key=request.publicKey,
            secret_key=request.secretKey,
            host=request.host,
            projects=projects_info,
            status="connected"
        )
        db.add(db_connection)
        db.commit()
        db.refresh(db_connection)
        logger.info(f"Successfully connected to {platform} with connection ID: {connection_id}")
        return ConnectionResponse(
            status="success",
            message=f"Successfully connected to {platform.title()}",
            connection_id=connection_id
        )
    except HTTPException:
        # Propagate deliberate HTTP errors (bad credentials, unsupported platform)
        raise
    except Exception as e:
        logger.error(f"Unexpected error connecting to platform: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error") from e
async def get_connections(db: Session = Depends(get_db)):  # noqa: B008
    """Get all active platform connections"""
    stored = db.query(ObservabilityConnection).all()
    return {"connections": [connection.to_dict() for connection in stored]}
async def update_connection(
    connection_id: str,
    request: ConnectionRequest,
    db: Session = Depends(get_db)  # noqa: B008
):
    """Update an existing platform connection.

    Re-validates the supplied credentials, refreshes the project list,
    and saves the new values on the stored connection.
    """
    try:
        connection = db.query(ObservabilityConnection).filter(
            ObservabilityConnection.connection_id == connection_id
        ).first()
        if connection is None:
            raise HTTPException(status_code=404, detail="Connection not found")

        platform = request.platform.lower()
        # Re-validate the (possibly changed) credentials before saving them
        projects_info = get_connection_projects(platform, request.publicKey, request.secretKey, request.host)

        # Apply the verified credentials and the refreshed project list
        connection.public_key = cast(Column[str], request.publicKey)
        connection.secret_key = cast(Column[str], request.secretKey)
        connection.host = cast(Column[str], request.host)
        connection.projects = cast(Column[List[Dict]], projects_info)
        connection.status = cast(Column[str], "connected")
        db.commit()
        db.refresh(connection)
        logger.info(f"Successfully updated {platform} connection: {connection_id}")
        return {
            "status": "success",
            "message": f"Successfully updated {platform.title()} connection"
        }
    except HTTPException:
        # Pass through intentional HTTP errors (404, validation failures)
        raise
    except Exception as e:
        logger.error(f"Unexpected error updating connection: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error") from e
async def disconnect_platform(connection_id: str, db: Session = Depends(get_db)):  # noqa: B008
    """Disconnect from a platform.

    Deletes the connection row and every FetchedTrace associated with it,
    and reports how many traces were removed.

    Raises:
        HTTPException: 404 if no connection with this ID exists.
    """
    connection = db.query(ObservabilityConnection).filter(
        ObservabilityConnection.connection_id == connection_id
    ).first()
    if not connection:
        raise HTTPException(status_code=404, detail="Connection not found")
    platform = connection.platform
    # Bulk-delete fetched traces in one statement instead of loading every
    # row and deleting them individually (N+1); delete() returns the row count.
    deleted_traces_count = db.query(FetchedTrace).filter(
        FetchedTrace.connection_id == connection_id
    ).delete(synchronize_session=False)
    # Remove connection from database
    db.delete(connection)
    db.commit()
    logger.info(f"Disconnected from {platform} (connection ID: {connection_id})")
    logger.info(f"Deleted {deleted_traces_count} fetched traces for connection {connection_id}")
    return {
        "status": "success",
        "message": f"Disconnected from {platform.title()}",
        "deleted_fetched_traces": deleted_traces_count,
        "disconnected_at": datetime.now().isoformat()
    }
| # Connection-specific routes (required by frontend) | |
async def get_fetched_traces_by_connection(connection_id: str, db: Session = Depends(get_db)):  # noqa: B008
    """Get all fetched traces for a specific connection"""
    # The connection must exist and still be marked as connected
    connection = db.query(ObservabilityConnection).filter(
        ObservabilityConnection.connection_id == connection_id,
        ObservabilityConnection.status == "connected"
    ).first()
    if connection is None:
        raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}")
    # Newest-first listing of everything fetched through this connection
    stored_traces = db.query(FetchedTrace).filter(
        FetchedTrace.connection_id == connection_id
    ).order_by(FetchedTrace.fetched_at.desc()).all()
    return {
        "traces": [stored.to_dict() for stored in stored_traces],
        "total": len(stored_traces),
        "platform": connection.platform
    }
async def fetch_traces_by_connection(
    connection_id: str,
    request: TraceFetchRequest,
    db: Session = Depends(get_db)  # noqa: B008
):
    """Fetch traces from a specific connection.

    Runs the blocking platform fetch in a worker thread (with its own DB
    session) so the event loop stays responsive.

    Raises:
        HTTPException: 404 if the connection is unknown/inactive, 400 for
            an unsupported platform, 500 on unexpected fetch failures.
    """
    # Get connection
    connection = db.query(ObservabilityConnection).filter(
        ObservabilityConnection.connection_id == connection_id,
        ObservabilityConnection.status == "connected"
    ).first()
    if not connection:
        raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}")
    try:
        import asyncio
        # get_running_loop() is the supported way to obtain the loop inside a
        # coroutine; get_event_loop() is deprecated for this use since 3.10.
        loop = asyncio.get_running_loop()

        def sync_fetch():
            # Create a fresh db session for the worker thread — the
            # request-scoped session must not be shared across threads.
            from backend.database import get_db
            thread_db = next(get_db())
            try:
                project_name = request.project_name
                if cast(str, connection.platform) == "langfuse":
                    traces = fetch_langfuse_sessions(connection, thread_db, project_name, request.limit)
                elif cast(str, connection.platform) == "langsmith":
                    traces = fetch_langsmith_traces(connection, thread_db, project_name, request.limit)
                else:
                    raise HTTPException(status_code=400, detail=f"Unsupported platform: {connection.platform}")
                # Update last sync time
                connection.last_sync = cast(Column[datetime], datetime.now())
                thread_db.commit()
                return traces
            finally:
                thread_db.close()

        # Execute in the default thread pool to avoid blocking the event loop
        traces = await loop.run_in_executor(None, sync_fetch)
        return {
            "status": "success",
            "message": f"Successfully fetched {len(traces)} traces from {connection.platform}",
            "platform": connection.platform,
            "traces_count": len(traces),
            "completed_at": datetime.now().isoformat()
        }
    except HTTPException:
        # Preserve intentional HTTP errors (e.g. the 400 for an unsupported
        # platform raised inside sync_fetch) instead of masking them as 500s.
        raise
    except Exception as e:
        logger.error(f"Failed to fetch traces from connection {connection_id}: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred while fetching traces") from e
async def import_traces_by_connection(
    connection_id: str,
    request: TraceImportRequest,
    db: Session = Depends(get_db)  # noqa: B008
):
    """Import specific traces from a connection to local database.

    For each requested trace ID: loads the stored FetchedTrace payload,
    filters it with the platform-specific preprocessor, and saves it as a
    local trace. After a successful save, two best-effort steps run —
    trace characteristics analysis and context-document generation — whose
    failures are logged as warnings, never fatal. Per-trace failures are
    collected into the returned "errors" list.

    Returns:
        Summary dict with imported count, per-trace errors, platform, and
        completion timestamp.

    Raises:
        HTTPException: 404 if the connection is unknown/inactive, 500 on
            unexpected failures outside the per-trace loop.
    """
    # Get connection
    connection = db.query(ObservabilityConnection).filter(
        ObservabilityConnection.connection_id == connection_id,
        ObservabilityConnection.status == "connected"
    ).first()
    if not connection:
        raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}")
    try:
        imported_count = 0
        errors = []
        for trace_id in request.trace_ids:
            try:
                # Get trace from fetched_traces table
                trace = db.query(FetchedTrace).filter(
                    FetchedTrace.trace_id == trace_id,
                    FetchedTrace.connection_id == connection_id
                ).first()
                if not trace:
                    errors.append(f"Trace {trace_id} not found in fetched traces for connection {connection_id}")
                    continue
                # Process based on platform; fall back to defaults if no options given
                preprocessing_opts = request.preprocessing or PreprocessingOptions()
                if cast(str, connection.platform) == "langfuse":
                    trace_data = trace.get_full_data()["data"]
                    # Filter/shrink the session payload per the preprocessing options
                    filtered_trace = filter_langfuse_session(
                        LangFuseSession(**trace_data),
                        max_char=preprocessing_opts.max_char,
                        topk=preprocessing_opts.topk,
                        raw=preprocessing_opts.raw,
                        hierarchy=preprocessing_opts.hierarchy,
                        replace=preprocessing_opts.replace
                    )
                    processed_trace = save_trace(
                        session=db,
                        content=json.dumps(filtered_trace, indent=2, default=str),
                        filename=f"langfuse_trace_{trace_id}",
                        title=f"Langfuse trace {trace_id}",
                        description=f"Imported from Langfuse on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                        trace_type="langfuse",
                        trace_source="langfuse",
                        tags=["imported", "langfuse"]
                    )
                    if processed_trace:
                        imported_count += 1
                        logger.info(f"Successfully imported Langfuse trace {trace_id} as {processed_trace.trace_id}")
                        # Run trace characteristics analysis to generate statistics (best-effort)
                        try:
                            from agentgraph.input.trace_management import analyze_trace_characteristics
                            raw_content_for_analysis = json.dumps(filtered_trace, indent=2, default=str)
                            trace_analysis = analyze_trace_characteristics(raw_content_for_analysis, optimize_content=False)
                            # Update trace metadata with analysis results
                            if processed_trace.trace_metadata:
                                processed_trace.trace_metadata.update(trace_analysis)
                            else:
                                processed_trace.trace_metadata = trace_analysis
                            # flag_modified is required: in-place mutation of a JSON
                            # column is invisible to SQLAlchemy's change tracking
                            flag_modified(processed_trace, "trace_metadata")
                            db.commit()
                            logger.info(f"Added trace characteristics analysis to imported trace {processed_trace.trace_id}")
                        except Exception as e:
                            logger.warning(f"Failed to analyze trace characteristics for imported trace {processed_trace.trace_id}: {str(e)}")
                        # Auto-generate context documents using universal parser (best-effort)
                        try:
                            from backend.services.universal_parser_service import auto_generate_context_documents
                            raw_content = json.dumps(filtered_trace, indent=2, default=str)
                            created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db)
                            if created_docs:
                                logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}")
                        except Exception as e:
                            logger.warning(f"Failed to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}")
                elif cast(str, connection.platform) == "langsmith":
                    langsmith_export = trace.get_full_data()["data"]
                    # Filter/shrink the run export per the preprocessing options
                    filtered_export = filter_langsmith_trace(
                        LangSmithTrace(**langsmith_export),
                        max_char=preprocessing_opts.max_char,
                        topk=preprocessing_opts.topk,
                        raw=preprocessing_opts.raw,
                        hierarchy=preprocessing_opts.hierarchy,
                        replace=preprocessing_opts.replace
                    )
                    processed_trace = save_trace(
                        session=db,
                        content=json.dumps(filtered_export, indent=2, default=str),
                        filename=f"langsmith_trace_{trace_id}",
                        title=f"LangSmith trace {trace_id}",
                        description=f"Imported from LangSmith on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                        trace_type="langsmith",
                        trace_source="langsmith",
                        tags=["imported", "langsmith"]
                    )
                    if processed_trace:
                        imported_count += 1
                        logger.info(f"Successfully imported LangSmith trace {trace_id} as {processed_trace.trace_id}")
                        # Run trace characteristics analysis to generate statistics (best-effort)
                        try:
                            from agentgraph.input.trace_management import analyze_trace_characteristics
                            # Use the original langsmith_export for better analysis results
                            raw_content_for_analysis = json.dumps(langsmith_export, indent=2, default=str)
                            trace_analysis = analyze_trace_characteristics(raw_content_for_analysis, optimize_content=False)
                            # Update trace metadata with analysis results
                            if processed_trace.trace_metadata:
                                processed_trace.trace_metadata.update(trace_analysis)
                            else:
                                processed_trace.trace_metadata = trace_analysis
                            # See note above: flag_modified marks the JSON column dirty
                            flag_modified(processed_trace, "trace_metadata")
                            db.commit()
                            logger.info(f"Added trace characteristics analysis to imported trace {processed_trace.trace_id}")
                        except Exception as e:
                            logger.warning(f"Failed to analyze trace characteristics for imported trace {processed_trace.trace_id}: {str(e)}")
                        # Auto-generate context documents using universal parser (use raw content, not processed)
                        try:
                            from backend.services.universal_parser_service import auto_generate_context_documents
                            # Use the original langsmith_export for better parsing results
                            raw_content = json.dumps(langsmith_export, indent=2, default=str)
                            created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db)
                            if created_docs:
                                logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}")
                        except Exception as e:
                            logger.warning(f"Failed to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}")
            except Exception as e:
                # Record the failure and continue with the remaining trace IDs
                error_msg = f"Failed to import trace {trace_id}: {str(e)}"
                errors.append(error_msg)
                logger.error(error_msg)
        # Update last sync time
        connection.last_sync = cast(Column[datetime], datetime.now())
        db.commit()
        return {
            "imported": imported_count,
            "errors": errors,
            "platform": connection.platform,
            "import_completed_at": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Failed to import traces from connection {connection_id}: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred while importing traces") from e
async def download_trace_by_id(trace_id: str, db: Session = Depends(get_db)):  # noqa: B008
    """Download full trace data by trace ID (platform-agnostic)"""
    fetched = db.query(FetchedTrace).filter(FetchedTrace.trace_id == trace_id).first()
    if fetched is None:
        raise HTTPException(status_code=404, detail="Trace not found")
    return fetched.get_full_data()
async def get_resource_usage():
    """Get resource usage information for the current process."""
    try:
        # NOTE(review): these psutil calls report system-wide CPU/memory, not
        # per-process figures; cpu_percent() with no interval measures since the
        # previous call (the very first call can return 0.0) — confirm intended.
        usage = {
            "cpu_usage": psutil.cpu_percent(),
            "memory_usage": psutil.virtual_memory().percent,
        }
        return usage
    except Exception as e:
        logger.error(f"Error retrieving resource usage: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred while retrieving resource usage") from e
| async def clean_up(session: Session = Depends(get_db)): # noqa: B008 | |
| """Clean up resources by closing database connections and freeing up memory.""" | |
| try: | |
| session.close() | |
| gc.collect() | |
| return {"success": True, "message": "Resources cleaned up successfully"} | |
| except Exception as e: | |
| logger.error(f"Error cleaning up resources: {str(e)}") | |
| raise HTTPException(status_code=500, detail="An internal error occurred while cleaning up resources") from e | |
| async def get_environment(): | |
| """Get environment information and authentication status.""" | |
| env_info = get_environment_info() | |
| return { | |
| "environment": env_info, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def get_usage_summary(request: Request): | |
| """ | |
| Get a summary of recent API usage for monitoring purposes. | |
| This helps track OpenAI API costs and detect potential abuse. | |
| """ | |
| # Only authenticated users can see usage data | |
| user = getattr(request.state, "user", None) | |
| if not user: | |
| raise HTTPException(status_code=401, detail="Authentication required") | |
| # In a production system, you'd query a database or log aggregation service | |
| # For now, we'll return a summary based on recent log entries | |
| return { | |
| "message": "Usage tracking is active", | |
| "tracking_enabled": True, | |
| "openai_endpoints_monitored": [ | |
| "/api/knowledge-graphs/extract", | |
| "/api/knowledge-graphs/analyze", | |
| "/api/methods/", | |
| "/api/traces/analyze", | |
| "/api/causal/", | |
| ], | |
| "current_user": { | |
| "username": user.get("username", "unknown"), | |
| "auth_method": user.get("auth_method", "unknown"), | |
| }, | |
| "note": "Detailed usage logs are available in the application logs for administrator review", | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def health_check(): | |
| """Comprehensive health check for the system.""" | |
| try: | |
| cpu_usage = psutil.cpu_percent() | |
| memory_usage = psutil.virtual_memory().percent | |
| total_tasks = len(task_store.tasks) | |
| pending_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "PENDING"]) | |
| processing_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "PROCESSING"]) | |
| completed_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "COMPLETED"]) | |
| failed_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "FAILED"]) | |
| stuck_tasks = [] | |
| current_time = datetime.now() | |
| for task_id, task in task_store.tasks.items(): | |
| if task.get("status") == "PROCESSING": | |
| updated_at_str = task.get("update_timestamp") or task.get("creation_timestamp") | |
| if updated_at_str: | |
| updated_at = datetime.fromisoformat(updated_at_str.replace("Z", "+00:00")) | |
| if updated_at.tzinfo is None: | |
| updated_at = updated_at.astimezone() | |
| time_diff = (current_time.astimezone() - updated_at).total_seconds() | |
| if time_diff > 3600: | |
| stuck_tasks.append({"task_id": task_id, "stuck_duration": time_diff}) | |
| health_status = "healthy" | |
| issues = [] | |
| if cpu_usage > 90: | |
| health_status = "warning" | |
| issues.append(f"High CPU usage: {cpu_usage}%") | |
| if memory_usage > 90: | |
| health_status = "critical" | |
| issues.append(f"High memory usage: {memory_usage}%") | |
| if stuck_tasks: | |
| health_status = "warning" | |
| issues.append(f"{len(stuck_tasks)} tasks appear stuck") | |
| tasks = { | |
| "total": total_tasks, | |
| "pending": pending_tasks, | |
| "processing": processing_tasks, | |
| "completed": completed_tasks, | |
| "failed": failed_tasks, | |
| "stuck": stuck_tasks | |
| } | |
| return { | |
| "status": health_status, | |
| "issues": issues, | |
| "resources": {"cpu_usage": cpu_usage, "memory_usage": memory_usage}, | |
| "tasks": tasks, | |
| "timestamp": current_time.isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error in health check: {str(e)}") | |
| return JSONResponse(status_code=500, content={"status": "error", "error": str(e)}) | |
| async def cleanup_stuck_tasks(): | |
| """Clean up tasks that have been stuck in processing state for more than 1 hour.""" | |
| try: | |
| current_time = datetime.now() | |
| cleaned_tasks = [] | |
| for task_id, task in list(task_store.tasks.items()): # Iterate over a copy | |
| if task.get("status") == "PROCESSING": | |
| updated_at_str = task.get("update_timestamp") or task.get("creation_timestamp") | |
| if updated_at_str: | |
| updated_at = datetime.fromisoformat(updated_at_str.replace("Z", "+00:00")) | |
| if updated_at.tzinfo is None: | |
| updated_at = updated_at.astimezone() | |
| time_diff = (current_time.astimezone() - updated_at).total_seconds() | |
| if time_diff > 3600: | |
| task_store.update_task(task_id, status="FAILED", error="Task timed out and was cleaned up.") | |
| cleaned_tasks.append(task_id) | |
| gc.collect() | |
| return {"success": True, "cleaned_tasks": cleaned_tasks} | |
| except Exception as e: | |
| logger.error(f"Error cleaning up stuck tasks: {str(e)}") | |
| raise HTTPException(status_code=500, detail="An internal error occurred while cleaning up stuck tasks") from e | |