# AgentGraph — backend/routers/observability.py
# Commit bcbd2ec by wu981526092: "Security: Fix critical vulnerabilities before public release"
"""
AI Observability Platform Integration Router
Handles connections to external AI observability platforms like Langfuse and LangSmith.
Provides endpoints for:
- Platform connection management
- Trace fetching and importing
- Automated synchronization
"""
import base64
import gc
import json
import logging
import time
import uuid
from datetime import datetime
from typing import Dict, List, Optional, cast
import psutil
from utils.environment import get_environment_info, debug_environment
import requests
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from langsmith import Client as LangsmithClient
from pydantic import BaseModel
from sqlalchemy import Column
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified
from agentgraph.input.text_processing.trace_preprocessor import filter_langfuse_session, filter_langsmith_trace
from backend.database import get_db
from backend.database.models import FetchedTrace, ObservabilityConnection
from backend.database.utils import save_trace
from backend.routers.observe_models import LangFuseSession, LangSmithRun, LangSmithTrace
from backend.services.platform.langfuse_downloader import LangfuseDownloader
from backend.services.task_store_service import task_store
logger = logging.getLogger("agent_monitoring_server.routers.observability")
router = APIRouter(prefix="/api/observability", tags=["observability"])
def truncate_long_strings(obj, max_string_length=500):
    """
    Recursively walk a JSON-like object and truncate overly long strings.

    Dicts and lists are traversed in full (no depth or size limit); every
    key and item is preserved. Only string values longer than
    ``max_string_length`` are shortened, with a ``...(N chars)`` suffix
    recording the original length. All other values pass through unchanged.
    """
    if isinstance(obj, dict):
        # Rebuild the mapping with each value processed recursively
        return {key: truncate_long_strings(value, max_string_length) for key, value in obj.items()}
    if isinstance(obj, list):
        # Rebuild the sequence with each element processed recursively
        return [truncate_long_strings(item, max_string_length) for item in obj]
    if isinstance(obj, str) and len(obj) > max_string_length:
        return f"{obj[:max_string_length]}...({len(obj)} chars)"
    return obj
# Helper Functions for Common Operations
def get_langfuse_projects(public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]:
    """Fetch the list of projects from the Langfuse API.

    Uses HTTP Basic auth (public:secret). On any failure — or when the
    workspace reports no projects — a single default project entry is
    returned instead of raising.
    """
    fallback = [{"name": "Default", "id": "default", "description": "Langfuse workspace"}]
    try:
        # Basic auth credential: base64("public:secret")
        credentials = base64.b64encode(f"{public_key}:{secret_key}".encode('ascii')).decode('ascii')
        base_url = host or "https://cloud.langfuse.com"
        response = requests.get(
            f"{base_url}/api/public/projects",
            headers={
                'Authorization': f'Basic {credentials}',
                'Content-Type': 'application/json',
            },
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()
        # Normalize the API records into plain dicts
        projects_info = [
            {
                "id": entry.get('id', ''),
                "name": entry.get('name', ''),
                "description": entry.get('description', ''),
                "created_at": entry.get('createdAt', None),
            }
            for entry in payload.get('data', [])
        ]
        if not projects_info:
            # Fallback to default project if no projects found
            projects_info = fallback
        logger.info(f"Successfully fetched {len(projects_info)} Langfuse projects")
        return projects_info
    except Exception as e:
        logger.warning(f"Failed to fetch Langfuse projects: {str(e)}, using default project")
        return fallback
def get_langsmith_projects(api_key: str) -> List[Dict]:
    """Fetch projects from the LangSmith API.

    Returns a list of plain-dict project records; re-raises any client
    error after logging it.
    """
    try:
        projects = list(LangsmithClient(api_key=api_key).list_projects())
        logger.info(f"Successfully fetched {len(projects)} LangSmith projects")
        # Normalize SDK project objects into plain dicts; description and
        # created_at may be absent on some SDK versions, hence getattr.
        return [
            {
                "id": str(project.id),
                "name": project.name,
                "description": getattr(project, 'description', ''),
                "created_at": getattr(project, 'created_at', None),
            }
            for project in projects
        ]
    except Exception as e:
        logger.error(f"Failed to fetch LangSmith projects: {str(e)}")
        raise
def test_langfuse_connection(public_key: str, secret_key: str, host: Optional[str]) -> bool:
    """Verify Langfuse credentials by downloading a single trace.

    Returns True on success; raises HTTPException(400) on failure.
    """
    try:
        probe = LangfuseDownloader(
            secret_key=secret_key,
            public_key=public_key,
            host=host or "https://cloud.langfuse.com",
        )
        # A one-trace download is enough to prove the credentials work
        test_traces = probe.download_recent_traces(limit=1)
        logger.info(f"Successfully tested Langfuse connection, found {len(test_traces)} traces")
        return True
    except Exception as e:
        logger.error(f"Failed to connect to Langfuse: {str(e)}")
        raise HTTPException(status_code=400, detail="Failed to connect to Langfuse") from e
def test_langsmith_connection(api_key: str) -> bool:
    """Verify a LangSmith API token by listing projects.

    Returns True on success; raises HTTPException(400) on failure.
    """
    try:
        available_projects = list(LangsmithClient(api_key=api_key).list_projects())
        logger.info(f"Successfully tested LangSmith connection, found {len(available_projects)} projects")
        return True
    except Exception as e:
        logger.error(f"Failed to connect to LangSmith: {str(e)}")
        raise HTTPException(status_code=400, detail="Failed to connect to LangSmith") from e
def get_connection_projects(platform: str, public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]:
    """Validate credentials for a platform and return its project list.

    Raises HTTPException(400) for an unsupported platform, a missing
    LangSmith token, or a failed connectivity check.
    """
    normalized = platform.lower()
    if normalized == "langfuse":
        test_langfuse_connection(public_key, secret_key, host)
        return get_langfuse_projects(public_key, secret_key, host)
    if normalized == "langsmith":
        # LangSmith uses a single API token, carried in the public_key field
        if not public_key:
            raise HTTPException(status_code=400, detail="LangSmith requires an API token")
        test_langsmith_connection(public_key)
        return get_langsmith_projects(public_key)
    raise HTTPException(status_code=400, detail=f"Unsupported platform: {normalized}")
def get_last_fetch_time(db: Session, connection_id: str, platform: str, project_name: Optional[str] = None) -> Optional[datetime]:
    """Return the most recent fetch timestamp for a connection/platform.

    When ``project_name`` is given, the search is narrowed to that project.
    Returns None when nothing has been fetched yet.
    """
    criteria = [
        FetchedTrace.connection_id == connection_id,
        FetchedTrace.platform == platform,
    ]
    if project_name:
        criteria.append(FetchedTrace.project_name == project_name)
    newest = (
        db.query(FetchedTrace)
        .filter(*criteria)
        .order_by(FetchedTrace.fetched_at.desc())
        .first()
    )
    return cast(datetime, newest.fetched_at) if newest else None
def create_fetched_trace(trace_id: str, name: str, platform: str, connection_id: str,
                         data: Dict, project_name: Optional[str] = None) -> FetchedTrace:
    """Build an (unpersisted) FetchedTrace record from the given fields."""
    record_fields = {
        "trace_id": trace_id,
        "name": name,
        "platform": platform,
        "connection_id": connection_id,
        "project_name": project_name,
        "data": data,
    }
    return FetchedTrace(**record_fields)
def fetch_langfuse_sessions(connection: ObservabilityConnection, db: Session, project_name: str, limit: int = 50) -> List[Dict]:
    """Fetch sessions from Langfuse and persist new ones as FetchedTrace rows.

    Lists sessions (incrementally, from the last fetch time for this project
    when one exists), then for each session not yet stored downloads every
    trace in full and saves the bundle as one FetchedTrace. Returns the list
    of session summaries seen (including ones already stored).
    """
    downloader = LangfuseDownloader(
        secret_key=cast(str, connection.secret_key),
        public_key=cast(str, connection.public_key),
        host=cast(str, connection.host)
    )
    # Get last fetched time for this specific project (incremental fetch)
    from_timestamp = get_last_fetch_time(db, cast(str, connection.connection_id), "langfuse", project_name)
    if from_timestamp:
        logger.info(f"Fetching sessions for project {project_name} from {from_timestamp} onwards")
    else:
        logger.info(f"No previous fetches found for project {project_name}, fetching all sessions")
    # List sessions to get session IDs; only pass from_timestamp when set
    if from_timestamp:
        sessions_response = downloader.client.api.sessions.list(
            limit=limit,
            from_timestamp=from_timestamp
        )
    else:
        sessions_response = downloader.client.api.sessions.list(limit=limit)
    # Handle different response formats (object with .data vs. bare iterable)
    if hasattr(sessions_response, 'data'):
        sessions = [downloader._convert_to_dict(session) for session in sessions_response.data]
    else:
        sessions = [downloader._convert_to_dict(session) for session in sessions_response]
    logger.info(f"Found {len(sessions)} sessions")
    # Store each session as a fetched trace
    for session in sessions:
        session_id = session['id']
        # Skip sessions already stored for this connection
        existing_session = db.query(FetchedTrace).filter(
            FetchedTrace.trace_id == session_id,
            FetchedTrace.connection_id == connection.connection_id
        ).first()
        if not existing_session:
            try:
                traces_response = downloader.client.api.trace.list(session_id=session_id)
                # Same dual response-format handling as above
                if hasattr(traces_response, 'data'):
                    session_traces = [downloader._convert_to_dict(trace) for trace in traces_response.data]
                else:
                    session_traces = [downloader._convert_to_dict(trace) for trace in traces_response]
                # Get detailed trace data for each trace
                detailed_traces = []
                for i, trace_summary in enumerate(session_traces):
                    trace_id = trace_summary['id']
                    # 1s pause between detail requests (crude rate limiting)
                    if i > 0:
                        time.sleep(1)
                    detailed_trace = downloader.client.api.trace.get(trace_id)
                    trace_data = downloader._convert_to_dict(detailed_trace)
                    detailed_traces.append(trace_data)
                    logger.info(f"Downloaded detailed trace: {trace_id} ({i+1}/{len(session_traces)})")
                # Bundle all traces of the session into one LangFuseSession record
                session_data = LangFuseSession(
                    session_id=session_id,
                    session_name=session_id,
                    project_name=project_name,
                    export_timestamp=datetime.now().isoformat(),
                    total_traces=len(detailed_traces),
                    traces=detailed_traces
                )
                # Convert to JSON-serializable format for the DB column
                data_json = session_data.model_dump()
                fetched_trace = create_fetched_trace(
                    trace_id=session_id,
                    name=session_id,
                    platform="langfuse",
                    connection_id=cast(str, connection.connection_id),
                    data=data_json,
                    project_name=project_name
                )
                db.add(fetched_trace)
                logger.info(f"Stored session {session_id} with {len(detailed_traces)} traces")
            except Exception as e:
                # Best-effort: a failed session is logged and skipped, not fatal
                logger.error(f"Error processing session {session_id}: {e}")
                continue
    # Single commit for all newly added sessions
    db.commit()
    logger.info(f"Fetched {len(sessions)} sessions from Langfuse")
    return sessions
def fetch_langsmith_traces(connection: ObservabilityConnection, db: Session, project_name: str, limit: int = 50) -> List[Dict]:
    """Fetch top-level runs (and their nested children) from a LangSmith project.

    Fetches incrementally from the last stored fetch time for this project,
    skips runs already persisted, batches the new FetchedTrace inserts, and
    returns the list of newly exported trace payloads (dicts).
    """
    try:
        # NOTE(review): the LangSmith API token appears to be stored in
        # public_key (see get_connection_projects) — confirm against the model.
        client = LangsmithClient(api_key=cast(str, connection.public_key))
        logger.info("Connected to LangSmith successfully")
        # Get all projects (connectivity/permission check; result unused below)
        try:
            projects = list(client.list_projects())
            logger.info(f"Found {len(projects)} projects")
        except Exception as e:
            logger.error(f"Error listing projects: {e}")
            raise HTTPException(status_code=500, detail="An internal error occurred while listing projects") from e
        # Export runs from specific project only
        all_traces = []
        total_limit = limit
        # Get existing trace IDs to avoid duplicates
        existing_traces = db.query(FetchedTrace).filter(
            FetchedTrace.connection_id == connection.connection_id,
            FetchedTrace.platform == "langsmith",
            FetchedTrace.project_name == project_name
        ).all()
        existing_trace_ids = {cast(str, trace.trace_id) for trace in existing_traces}
        logger.info(f"Exporting specific project: {project_name}")
        # Get last fetched time for this specific project (incremental fetch)
        project_from_timestamp = get_last_fetch_time(
            db, cast(str, connection.connection_id), "langsmith", project_name
        )
        if project_from_timestamp:
            logger.info(f"Fetching {project_name} runs from {project_from_timestamp} onwards")
        else:
            logger.info(f"No previous fetches found for {project_name}, fetching all runs")
        # Get all runs (top-level runs only) - same as langsmith_exporter.py
        list_runs_kwargs = {
            "project_name": project_name,
            "is_root": True,
            "limit": limit
        }
        # Add start_time filter if we have a project-specific timestamp
        if project_from_timestamp:
            list_runs_kwargs["start_time"] = project_from_timestamp
        runs = list(client.list_runs(**list_runs_kwargs))
        logger.info(f"Found {len(runs)} runs in project {project_name}")
        # Process runs in batch
        new_traces_to_add = []
        for run in runs:
            run_name = getattr(run, 'name', 'unnamed')
            run_id = str(run.id)
            # Composite key so identically named runs stay distinct
            unique_trace_id = f"{run_name}_{run_id}"
            # Skip if already exists
            if unique_trace_id in existing_trace_ids:
                logger.debug(f"Skipping existing trace: {unique_trace_id}")
                continue
            # Get all traces for this run (including nested children) - same as langsmith_exporter.py
            all_runs: List[LangSmithRun] = []
            try:
                # Get the root run and all its children
                trace_runs = client.list_runs(project_name=project_name, trace_id=run.trace_id)
                run_list = list(trace_runs)
                # Sort runs by start_time ASCENDING (earliest first); runs
                # lacking a start_time sort first via the datetime.min fallback.
                sorted_runs = sorted(run_list, key=lambda t: getattr(t, 'start_time', None) or datetime.min)
                for run_item in sorted_runs:
                    run_data = run_item.dict() if hasattr(run_item, 'dict') else dict(run_item)
                    all_runs.append(LangSmithRun(**run_data))
            except Exception as e:
                logger.warning(f"Could not get child traces for run {run_id}: {e}")
                # Fallback to just the main run
                run_data = run.dict() if hasattr(run, 'dict') else dict(run)
                all_runs = [LangSmithRun(**run_data)]
            # Create run export structure - same format as langsmith_exporter.py
            run_export = LangSmithTrace(
                trace_id=run_id,
                trace_name=run_name,
                project_name=project_name,
                export_time=datetime.now().isoformat(),
                total_runs=len(all_runs),
                runs=all_runs
            )
            # Prepare for batch database insert
            try:
                clean_data = run_export.model_dump()
                fetched_trace = create_fetched_trace(
                    trace_id=unique_trace_id,
                    name=f"{run_name}_{run_id[:8]}",
                    platform="langsmith",
                    connection_id=cast(str, connection.connection_id),
                    data=clean_data,
                    project_name=project_name
                )
                new_traces_to_add.append(fetched_trace)
                all_traces.append(clean_data)
                existing_trace_ids.add(unique_trace_id)
            except Exception as e:
                logger.error(f"Error preparing trace {unique_trace_id}: {e}")
                continue
            # Stop if we've reached the limit of NEW traces
            if len(all_traces) >= total_limit:
                break
        # Batch insert new traces
        if new_traces_to_add:
            db.add_all(new_traces_to_add)
            logger.info(f"Added {len(new_traces_to_add)} new traces from project {project_name}")
        # Single commit for all operations
        db.commit()
        logger.info(f"Fetched {len(all_traces)} traces from LangSmith")
        return all_traces
    except Exception as e:
        logger.error(f"Error fetching LangSmith traces: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred while fetching traces") from e
# Request/Response Models
class ConnectionRequest(BaseModel):
    """Request body for creating or updating a platform connection."""
    platform: str  # "langfuse" or "langsmith" (case-insensitive; handlers lowercase it)
    publicKey: str  # Langfuse public key; for LangSmith this carries the API token
    secretKey: str  # Langfuse secret key (not used on the LangSmith path)
    host: Optional[str] = None  # self-hosted URL; defaults to https://cloud.langfuse.com when unset
class ConnectionResponse(BaseModel):
    """Response payload returned by the /connect endpoint."""
    status: str  # "success" on a completed connection
    message: str  # human-readable confirmation
    connection_id: str  # UUID assigned to the stored connection
class TraceFetchRequest(BaseModel):
    """Request body for fetching traces from a connected platform."""
    limit: int = 50  # maximum number of sessions/runs to fetch
    start_date: Optional[str] = None  # NOTE(review): declared but not read by the fetch handlers in this module
    end_date: Optional[str] = None  # NOTE(review): declared but not read by the fetch handlers in this module
    project_name: str  # platform project to fetch from (required)
class PreprocessingOptions(BaseModel):
    """Preprocessing options for trace filtering (passed to filter_langfuse_session / filter_langsmith_trace)."""
    max_char: Optional[int] = 1000  # per-field character cap applied by the filters
    topk: int = 10  # top-k selection parameter forwarded to the filters
    raw: bool = False  # pass content through unfiltered
    hierarchy: bool = False  # preserve hierarchical structure
    replace: bool = False  # replace mode flag forwarded to the filters
    truncate_enabled: bool = False  # NOTE(review): not forwarded to the filter calls in this module
class TraceImportRequest(BaseModel):
    """Request body for importing previously fetched traces into the local DB."""
    trace_ids: List[str]  # FetchedTrace.trace_id values to import
    # NOTE(review): a model-instance default is deep-copied per request by
    # Pydantic, so this is not a shared-mutable hazard — confirm Pydantic version.
    preprocessing: Optional[PreprocessingOptions] = PreprocessingOptions()
@router.post("/connect", response_model=ConnectionResponse)
async def connect_platform(request: ConnectionRequest, db: Session = Depends(get_db)):  # noqa: B008
    """Connect to an AI observability platform"""
    try:
        platform = request.platform.lower()
        # Validate the credentials and discover projects before persisting anything
        projects_info = get_connection_projects(platform, request.publicKey, request.secretKey, request.host)
        # Persist the validated connection under a fresh UUID
        connection_id = str(uuid.uuid4())
        db_connection = ObservabilityConnection(
            connection_id=connection_id,
            platform=platform,
            public_key=request.publicKey,
            secret_key=request.secretKey,
            host=request.host,
            projects=projects_info,
            status="connected",
        )
        db.add(db_connection)
        db.commit()
        db.refresh(db_connection)
        logger.info(f"Successfully connected to {platform} with connection ID: {connection_id}")
        return ConnectionResponse(
            status="success",
            message=f"Successfully connected to {platform.title()}",
            connection_id=connection_id,
        )
    except HTTPException:
        # Re-raise validation errors (4xx) untouched
        raise
    except Exception as e:
        logger.error(f"Unexpected error connecting to platform: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error") from e
@router.get("/connections")
async def get_connections(db: Session = Depends(get_db)):  # noqa: B008
    """Get all active platform connections"""
    stored = db.query(ObservabilityConnection).all()
    serialized = [record.to_dict() for record in stored]
    return {"connections": serialized}
@router.put("/connections/{connection_id}")
async def update_connection(
    connection_id: str,
    request: ConnectionRequest,
    db: Session = Depends(get_db)  # noqa: B008
):
    """Update an existing platform connection"""
    try:
        # Look up the connection being updated
        connection = db.query(ObservabilityConnection).filter(
            ObservabilityConnection.connection_id == connection_id
        ).first()
        if connection is None:
            raise HTTPException(status_code=404, detail="Connection not found")
        platform = request.platform.lower()
        # Re-validate the new credentials and refresh the project list
        projects_info = get_connection_projects(platform, request.publicKey, request.secretKey, request.host)
        # Apply the new credentials/metadata to the stored record
        connection.public_key = cast(Column[str], request.publicKey)
        connection.secret_key = cast(Column[str], request.secretKey)
        connection.host = cast(Column[str], request.host)
        connection.projects = cast(Column[List[Dict]], projects_info)
        connection.status = cast(Column[str], "connected")
        db.commit()
        db.refresh(connection)
        logger.info(f"Successfully updated {platform} connection: {connection_id}")
        return {
            "status": "success",
            "message": f"Successfully updated {platform.title()} connection"
        }
    except HTTPException:
        # Keep 4xx errors (not found, bad credentials) intact
        raise
    except Exception as e:
        logger.error(f"Unexpected error updating connection: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error") from e
@router.delete("/connections/{connection_id}")
async def disconnect_platform(connection_id: str, db: Session = Depends(get_db)):  # noqa: B008
    """Disconnect from a platform"""
    connection = db.query(ObservabilityConnection).filter(
        ObservabilityConnection.connection_id == connection_id
    ).first()
    if connection is None:
        raise HTTPException(status_code=404, detail="Connection not found")
    platform = connection.platform
    # Collect and remove every trace fetched through this connection
    stale_traces = db.query(FetchedTrace).filter(
        FetchedTrace.connection_id == connection_id
    ).all()
    deleted_traces_count = len(stale_traces)
    for stale in stale_traces:
        db.delete(stale)
    # Drop the connection itself, then commit both deletions together
    db.delete(connection)
    db.commit()
    logger.info(f"Disconnected from {platform} (connection ID: {connection_id})")
    logger.info(f"Deleted {deleted_traces_count} fetched traces for connection {connection_id}")
    return {
        "status": "success",
        "message": f"Disconnected from {platform.title()}",
        "deleted_fetched_traces": deleted_traces_count,
        "disconnected_at": datetime.now().isoformat()
    }
# Connection-specific routes (required by frontend)
@router.get("/connections/{connection_id}/fetched-traces")
async def get_fetched_traces_by_connection(connection_id: str, db: Session = Depends(get_db)):  # noqa: B008
    """Get all fetched traces for a specific connection"""
    # Only "connected" connections are visible here
    connection = db.query(ObservabilityConnection).filter(
        ObservabilityConnection.connection_id == connection_id,
        ObservabilityConnection.status == "connected"
    ).first()
    if connection is None:
        raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}")
    # Newest fetches first
    fetched = (
        db.query(FetchedTrace)
        .filter(FetchedTrace.connection_id == connection_id)
        .order_by(FetchedTrace.fetched_at.desc())
        .all()
    )
    trace_dicts = [record.to_dict() for record in fetched]
    return {
        "traces": trace_dicts,
        "total": len(trace_dicts),
        "platform": connection.platform
    }
@router.post("/connections/{connection_id}/fetch")
async def fetch_traces_by_connection(
    connection_id: str,
    request: TraceFetchRequest,
    db: Session = Depends(get_db)  # noqa: B008
):
    """Fetch traces from a specific connection.

    Runs the blocking platform fetch in a thread-pool executor (with its own
    DB session) so the event loop is never blocked, then reports how many
    traces were fetched.

    Raises:
        HTTPException 404 if no active connection matches ``connection_id``.
        HTTPException 400 if the connection's platform is unsupported.
        HTTPException 500 on any other failure.
    """
    # Validate the connection exists before dispatching work to a thread
    connection = db.query(ObservabilityConnection).filter(
        ObservabilityConnection.connection_id == connection_id,
        ObservabilityConnection.status == "connected"
    ).first()
    if not connection:
        raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}")
    try:
        import asyncio
        # BUGFIX: get_event_loop() is deprecated inside a coroutine and can
        # misbehave; get_running_loop() is the correct call here.
        loop = asyncio.get_running_loop()

        def sync_fetch():
            # Create a new db session for the worker thread; Session objects
            # are not safe to share across threads.
            from backend.database import get_db
            thread_db = next(get_db())
            try:
                # BUGFIX: re-load the connection in THIS session. Previously the
                # request-session instance was mutated but committed via
                # thread_db, so the last_sync update was never persisted.
                thread_connection = thread_db.query(ObservabilityConnection).filter(
                    ObservabilityConnection.connection_id == connection_id
                ).first()
                if not thread_connection:
                    raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}")
                project_name = request.project_name
                platform = cast(str, thread_connection.platform)
                if platform == "langfuse":
                    traces = fetch_langfuse_sessions(thread_connection, thread_db, project_name, request.limit)
                elif platform == "langsmith":
                    traces = fetch_langsmith_traces(thread_connection, thread_db, project_name, request.limit)
                else:
                    raise HTTPException(status_code=400, detail=f"Unsupported platform: {platform}")
                # Record the successful sync on the thread-session instance
                thread_connection.last_sync = cast(Column[datetime], datetime.now())
                thread_db.commit()
                return traces
            finally:
                thread_db.close()

        # Execute in thread pool to avoid blocking the event loop
        traces = await loop.run_in_executor(None, sync_fetch)
        return {
            "status": "success",
            "message": f"Successfully fetched {len(traces)} traces from {connection.platform}",
            "platform": connection.platform,
            "traces_count": len(traces),
            "completed_at": datetime.now().isoformat()
        }
    except HTTPException:
        # BUGFIX: previously swallowed by the broad handler below, turning
        # deliberate 400/404 responses into generic 500s.
        raise
    except Exception as e:
        logger.error(f"Failed to fetch traces from connection {connection_id}: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred while fetching traces") from e
@router.post("/connections/{connection_id}/import")
async def import_traces_by_connection(
    connection_id: str,
    request: TraceImportRequest,
    db: Session = Depends(get_db)  # noqa: B008
):
    """Import specific traces from a connection to local database.

    For each requested trace ID: load the previously fetched payload, filter
    it with the requested preprocessing options, persist it via save_trace,
    then best-effort enrich it (characteristics analysis + auto-generated
    context documents). Per-trace failures are collected in ``errors`` rather
    than aborting the batch.
    """
    # Only "connected" connections may import
    connection = db.query(ObservabilityConnection).filter(
        ObservabilityConnection.connection_id == connection_id,
        ObservabilityConnection.status == "connected"
    ).first()
    if not connection:
        raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}")
    try:
        imported_count = 0
        errors = []
        for trace_id in request.trace_ids:
            try:
                # Get trace from fetched_traces table
                trace = db.query(FetchedTrace).filter(
                    FetchedTrace.trace_id == trace_id,
                    FetchedTrace.connection_id == connection_id
                ).first()
                if not trace:
                    errors.append(f"Trace {trace_id} not found in fetched traces for connection {connection_id}")
                    continue
                # Process based on platform
                preprocessing_opts = request.preprocessing or PreprocessingOptions()
                if cast(str, connection.platform) == "langfuse":
                    trace_data = trace.get_full_data()["data"]
                    # Apply the requested filtering to the stored session payload
                    filtered_trace = filter_langfuse_session(
                        LangFuseSession(**trace_data),
                        max_char=preprocessing_opts.max_char,
                        topk=preprocessing_opts.topk,
                        raw=preprocessing_opts.raw,
                        hierarchy=preprocessing_opts.hierarchy,
                        replace=preprocessing_opts.replace
                    )
                    # Persist the filtered trace as a first-class local trace
                    processed_trace = save_trace(
                        session=db,
                        content=json.dumps(filtered_trace, indent=2, default=str),
                        filename=f"langfuse_trace_{trace_id}",
                        title=f"Langfuse trace {trace_id}",
                        description=f"Imported from Langfuse on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                        trace_type="langfuse",
                        trace_source="langfuse",
                        tags=["imported", "langfuse"]
                    )
                    if processed_trace:
                        imported_count += 1
                        logger.info(f"Successfully imported Langfuse trace {trace_id} as {processed_trace.trace_id}")
                        # Best-effort: run trace characteristics analysis to generate statistics
                        try:
                            from agentgraph.input.trace_management import analyze_trace_characteristics
                            raw_content_for_analysis = json.dumps(filtered_trace, indent=2, default=str)
                            trace_analysis = analyze_trace_characteristics(raw_content_for_analysis, optimize_content=False)
                            # Merge the analysis into the trace metadata
                            if processed_trace.trace_metadata:
                                processed_trace.trace_metadata.update(trace_analysis)
                            else:
                                processed_trace.trace_metadata = trace_analysis
                            # flag_modified: JSON columns do not auto-detect in-place mutation
                            flag_modified(processed_trace, "trace_metadata")
                            db.commit()
                            logger.info(f"Added trace characteristics analysis to imported trace {processed_trace.trace_id}")
                        except Exception as e:
                            logger.warning(f"Failed to analyze trace characteristics for imported trace {processed_trace.trace_id}: {str(e)}")
                        # Best-effort: auto-generate context documents using universal parser
                        try:
                            from backend.services.universal_parser_service import auto_generate_context_documents
                            raw_content = json.dumps(filtered_trace, indent=2, default=str)
                            created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db)
                            if created_docs:
                                logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}")
                        except Exception as e:
                            logger.warning(f"Failed to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}")
                elif cast(str, connection.platform) == "langsmith":
                    langsmith_export = trace.get_full_data()["data"]
                    # Apply the requested filtering to the stored export payload
                    filtered_export = filter_langsmith_trace(
                        LangSmithTrace(**langsmith_export),
                        max_char=preprocessing_opts.max_char,
                        topk=preprocessing_opts.topk,
                        raw=preprocessing_opts.raw,
                        hierarchy=preprocessing_opts.hierarchy,
                        replace=preprocessing_opts.replace
                    )
                    processed_trace = save_trace(
                        session=db,
                        content=json.dumps(filtered_export, indent=2, default=str),
                        filename=f"langsmith_trace_{trace_id}",
                        title=f"LangSmith trace {trace_id}",
                        description=f"Imported from LangSmith on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                        trace_type="langsmith",
                        trace_source="langsmith",
                        tags=["imported", "langsmith"]
                    )
                    if processed_trace:
                        imported_count += 1
                        logger.info(f"Successfully imported LangSmith trace {trace_id} as {processed_trace.trace_id}")
                        # Best-effort: run trace characteristics analysis to generate statistics
                        try:
                            from agentgraph.input.trace_management import analyze_trace_characteristics
                            # Use the original (unfiltered) langsmith_export for better analysis results
                            raw_content_for_analysis = json.dumps(langsmith_export, indent=2, default=str)
                            trace_analysis = analyze_trace_characteristics(raw_content_for_analysis, optimize_content=False)
                            # Merge the analysis into the trace metadata
                            if processed_trace.trace_metadata:
                                processed_trace.trace_metadata.update(trace_analysis)
                            else:
                                processed_trace.trace_metadata = trace_analysis
                            # flag_modified: JSON columns do not auto-detect in-place mutation
                            flag_modified(processed_trace, "trace_metadata")
                            db.commit()
                            logger.info(f"Added trace characteristics analysis to imported trace {processed_trace.trace_id}")
                        except Exception as e:
                            logger.warning(f"Failed to analyze trace characteristics for imported trace {processed_trace.trace_id}: {str(e)}")
                        # Best-effort: auto-generate context documents (use raw export, not processed)
                        try:
                            from backend.services.universal_parser_service import auto_generate_context_documents
                            # Use the original langsmith_export for better parsing results
                            raw_content = json.dumps(langsmith_export, indent=2, default=str)
                            created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db)
                            if created_docs:
                                logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}")
                        except Exception as e:
                            logger.warning(f"Failed to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}")
            except Exception as e:
                # Per-trace failures are recorded, not fatal for the batch
                error_msg = f"Failed to import trace {trace_id}: {str(e)}"
                errors.append(error_msg)
                logger.error(error_msg)
        # Update last sync time
        connection.last_sync = cast(Column[datetime], datetime.now())
        db.commit()
        return {
            "imported": imported_count,
            "errors": errors,
            "platform": connection.platform,
            "import_completed_at": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Failed to import traces from connection {connection_id}: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred while importing traces") from e
@router.get("/traces/{trace_id}/download")
async def download_trace_by_id(trace_id: str, db: Session = Depends(get_db)):  # noqa: B008
    """Download full trace data by trace ID (platform-agnostic)"""
    matching = db.query(FetchedTrace).filter(
        FetchedTrace.trace_id == trace_id
    ).first()
    if matching is None:
        raise HTTPException(status_code=404, detail="Trace not found")
    return matching.get_full_data()
@router.get("/resource-usage")
async def get_resource_usage():
    """Get resource usage information for the current process."""
    try:
        # System-wide CPU and memory percentages via psutil
        return {
            "cpu_usage": psutil.cpu_percent(),
            "memory_usage": psutil.virtual_memory().percent,
        }
    except Exception as e:
        logger.error(f"Error retrieving resource usage: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred while retrieving resource usage") from e
@router.post("/clean-up")
async def clean_up(session: Session = Depends(get_db)):  # noqa: B008
    """Clean up resources by closing database connections and freeing up memory."""
    try:
        # Release the request-scoped DB connection, then force a GC pass
        session.close()
        gc.collect()
        return {"success": True, "message": "Resources cleaned up successfully"}
    except Exception as e:
        logger.error(f"Error cleaning up resources: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred while cleaning up resources") from e
@router.get("/environment")
async def get_environment():
    """Get environment information and authentication status."""
    # get_environment_info() gathers the runtime/auth details
    return {
        "environment": get_environment_info(),
        "timestamp": datetime.now().isoformat(),
    }
@router.get("/usage-summary")
async def get_usage_summary(request: Request):
    """
    Get a summary of recent API usage for monitoring purposes.
    This helps track OpenAI API costs and detect potential abuse.
    """
    # Usage data is restricted to authenticated users
    user = getattr(request.state, "user", None)
    if not user:
        raise HTTPException(status_code=401, detail="Authentication required")
    monitored_endpoints = [
        "/api/knowledge-graphs/extract",
        "/api/knowledge-graphs/analyze",
        "/api/methods/",
        "/api/traces/analyze",
        "/api/causal/",
    ]
    # In a production system this would query a database or log aggregation
    # service; for now return a static summary plus the caller's identity.
    return {
        "message": "Usage tracking is active",
        "tracking_enabled": True,
        "openai_endpoints_monitored": monitored_endpoints,
        "current_user": {
            "username": user.get("username", "unknown"),
            "auth_method": user.get("auth_method", "unknown"),
        },
        "note": "Detailed usage logs are available in the application logs for administrator review",
        "timestamp": datetime.now().isoformat()
    }
@router.get("/health-check")
async def health_check():
    """Comprehensive health check for the system.

    Reports CPU/memory usage, task-store counts by status, and any tasks
    stuck in PROCESSING for over an hour. Overall status is the worst of:
    healthy < warning (high CPU / stuck tasks) < critical (high memory).

    Returns a JSONResponse with status 500 and the error message if the
    check itself fails.
    """
    try:
        cpu_usage = psutil.cpu_percent()
        memory_usage = psutil.virtual_memory().percent
        # Tally task counts by status in one pass over a snapshot
        task_values = list(task_store.tasks.values())
        total_tasks = len(task_values)
        pending_tasks = len([t for t in task_values if t.get("status") == "PENDING"])
        processing_tasks = len([t for t in task_values if t.get("status") == "PROCESSING"])
        completed_tasks = len([t for t in task_values if t.get("status") == "COMPLETED"])
        failed_tasks = len([t for t in task_values if t.get("status") == "FAILED"])
        # Detect tasks stuck in PROCESSING for more than an hour
        stuck_tasks = []
        current_time = datetime.now()
        for task_id, task in task_store.tasks.items():
            if task.get("status") == "PROCESSING":
                updated_at_str = task.get("update_timestamp") or task.get("creation_timestamp")
                if updated_at_str:
                    updated_at = datetime.fromisoformat(updated_at_str.replace("Z", "+00:00"))
                    if updated_at.tzinfo is None:
                        # Treat naive timestamps as local time for comparison
                        updated_at = updated_at.astimezone()
                    time_diff = (current_time.astimezone() - updated_at).total_seconds()
                    if time_diff > 3600:
                        stuck_tasks.append({"task_id": task_id, "stuck_duration": time_diff})
        # BUGFIX: escalate status by severity instead of overwriting it —
        # previously stuck tasks downgraded an already-"critical" status
        # back to "warning".
        severity = {"healthy": 0, "warning": 1, "critical": 2}
        health_status = "healthy"
        issues = []
        if cpu_usage > 90:
            health_status = max(health_status, "warning", key=severity.get)
            issues.append(f"High CPU usage: {cpu_usage}%")
        if memory_usage > 90:
            health_status = max(health_status, "critical", key=severity.get)
            issues.append(f"High memory usage: {memory_usage}%")
        if stuck_tasks:
            health_status = max(health_status, "warning", key=severity.get)
            issues.append(f"{len(stuck_tasks)} tasks appear stuck")
        tasks = {
            "total": total_tasks,
            "pending": pending_tasks,
            "processing": processing_tasks,
            "completed": completed_tasks,
            "failed": failed_tasks,
            "stuck": stuck_tasks
        }
        return {
            "status": health_status,
            "issues": issues,
            "resources": {"cpu_usage": cpu_usage, "memory_usage": memory_usage},
            "tasks": tasks,
            "timestamp": current_time.isoformat()
        }
    except Exception as e:
        logger.error(f"Error in health check: {str(e)}")
        return JSONResponse(status_code=500, content={"status": "error", "error": str(e)})
@router.post("/cleanup-stuck-tasks")
async def cleanup_stuck_tasks():
    """Clean up tasks that have been stuck in processing state for more than 1 hour."""
    try:
        now = datetime.now()
        cleaned_tasks = []
        # Snapshot the items so update_task() can safely mutate the store mid-iteration
        for task_id, task in list(task_store.tasks.items()):
            if task.get("status") != "PROCESSING":
                continue
            timestamp_str = task.get("update_timestamp") or task.get("creation_timestamp")
            if not timestamp_str:
                continue
            last_update = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
            if last_update.tzinfo is None:
                # Treat naive timestamps as local time for comparison
                last_update = last_update.astimezone()
            elapsed = (now.astimezone() - last_update).total_seconds()
            if elapsed > 3600:
                task_store.update_task(task_id, status="FAILED", error="Task timed out and was cleaned up.")
                cleaned_tasks.append(task_id)
        gc.collect()
        return {"success": True, "cleaned_tasks": cleaned_tasks}
    except Exception as e:
        logger.error(f"Error cleaning up stuck tasks: {str(e)}")
        raise HTTPException(status_code=500, detail="An internal error occurred while cleaning up stuck tasks") from e