"""
AI Observability Platform Integration Router
Handles connections to external AI observability platforms like Langfuse and LangSmith.
Provides endpoints for:
- Platform connection management
- Trace fetching and importing
- Automated synchronization
"""
import base64
import gc
import json
import logging
import time
import uuid
from datetime import datetime
from typing import Dict, List, Optional, cast

import psutil
import requests
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from langsmith import Client as LangsmithClient
from pydantic import BaseModel
from sqlalchemy import Column
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified

from agentgraph.input.text_processing.trace_preprocessor import filter_langfuse_session, filter_langsmith_trace
from backend.database import get_db
from backend.database.models import FetchedTrace, ObservabilityConnection
from backend.database.utils import save_trace
from backend.routers.observe_models import LangFuseSession, LangSmithRun, LangSmithTrace
from backend.services.platform.langfuse_downloader import LangfuseDownloader
from backend.services.task_store_service import task_store
from utils.environment import get_environment_info
logger = logging.getLogger("agent_monitoring_server.routers.observability")
router = APIRouter(prefix="/api/observability", tags=["observability"])
def truncate_long_strings(obj, max_string_length=500):
"""
Recursively process JSON object to truncate very long strings
No depth limit - all keys and array items are preserved
Only truncates string values that are too long
"""
if isinstance(obj, dict):
truncated = {}
# Process ALL keys, no limit on key count or depth
for key, value in obj.items():
truncated[key] = truncate_long_strings(value, max_string_length)
return truncated
elif isinstance(obj, list):
truncated = []
# Process ALL items, no limit on item count or depth
for item in obj:
truncated.append(truncate_long_strings(item, max_string_length))
return truncated
elif isinstance(obj, str) and len(obj) > max_string_length:
return f"{obj[:max_string_length]}...({len(obj)} chars)"
return obj
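# Illustrative behaviour (doctest-style sketch; not exercised elsewhere in this module):
# >>> truncate_long_strings({"msg": "x" * 600, "items": [1, 2]}, max_string_length=5)
# {'msg': 'xxxxx...(600 chars)', 'items': [1, 2]}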
# Helper Functions for Common Operations
def get_langfuse_projects(public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]:
"""Fetch projects from Langfuse API"""
try:
# Create Basic Auth header
auth_string = f"{public_key}:{secret_key}"
auth_bytes = auth_string.encode('ascii')
auth_b64 = base64.b64encode(auth_bytes).decode('ascii')
headers = {
'Authorization': f'Basic {auth_b64}',
'Content-Type': 'application/json'
}
# Get projects from Langfuse API
host_url = host or "https://cloud.langfuse.com"
projects_url = f"{host_url}/api/public/projects"
response = requests.get(projects_url, headers=headers, timeout=10)
response.raise_for_status()
projects_data = response.json()
projects_info = []
# Extract project information
if 'data' in projects_data:
for project in projects_data['data']:
project_info = {
"id": project.get('id', ''),
"name": project.get('name', ''),
"description": project.get('description', ''),
"created_at": project.get('createdAt', None)
}
projects_info.append(project_info)
if not projects_info:
# Fallback to default project if no projects found
projects_info = [{"name": "Default", "id": "default", "description": "Langfuse workspace"}]
logger.info(f"Successfully fetched {len(projects_info)} Langfuse projects")
return projects_info
except Exception as e:
logger.warning(f"Failed to fetch Langfuse projects: {str(e)}, using default project")
return [{"name": "Default", "id": "default", "description": "Langfuse workspace"}]
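# The parser above assumes a Langfuse projects response shaped roughly like (illustrative):
# {"data": [{"id": "proj-1", "name": "My Project", "description": "", "createdAt": "2024-01-01T00:00:00Z"}]}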
def get_langsmith_projects(api_key: str) -> List[Dict]:
"""Fetch projects from LangSmith API"""
try:
client = LangsmithClient(api_key=api_key)
projects = list(client.list_projects())
logger.info(f"Successfully fetched {len(projects)} LangSmith projects")
# Extract project information
projects_info = []
for project in projects:
project_info = {
"id": str(project.id),
"name": project.name,
"description": getattr(project, 'description', ''),
"created_at": getattr(project, 'created_at', None)
}
projects_info.append(project_info)
return projects_info
except Exception as e:
logger.error(f"Failed to fetch LangSmith projects: {str(e)}")
raise
def test_langfuse_connection(public_key: str, secret_key: str, host: Optional[str]) -> bool:
"""Test Langfuse connection by fetching traces"""
try:
downloader = LangfuseDownloader(
secret_key=secret_key,
public_key=public_key,
host=host or "https://cloud.langfuse.com"
)
# Test connection by fetching a small number of traces
test_traces = downloader.download_recent_traces(limit=1)
logger.info(f"Successfully tested Langfuse connection, found {len(test_traces)} traces")
return True
except Exception as e:
logger.error(f"Failed to connect to Langfuse: {str(e)}")
raise HTTPException(status_code=400, detail="Failed to connect to Langfuse") from e
def test_langsmith_connection(api_key: str) -> bool:
"""Test LangSmith connection by listing projects"""
try:
client = LangsmithClient(api_key=api_key)
projects = list(client.list_projects())
logger.info(f"Successfully tested LangSmith connection, found {len(projects)} projects")
return True
except Exception as e:
logger.error(f"Failed to connect to LangSmith: {str(e)}")
raise HTTPException(status_code=400, detail="Failed to connect to LangSmith") from e
def get_connection_projects(platform: str, public_key: str, secret_key: str, host: Optional[str]) -> List[Dict]:
"""Get projects for a platform connection"""
platform = platform.lower()
if platform == "langfuse":
test_langfuse_connection(public_key, secret_key, host)
return get_langfuse_projects(public_key, secret_key, host)
elif platform == "langsmith":
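        # LangSmith uses a single API key, carried here in the publicKey field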
if not public_key:
raise HTTPException(status_code=400, detail="LangSmith requires an API token")
test_langsmith_connection(public_key)
return get_langsmith_projects(public_key)
else:
raise HTTPException(status_code=400, detail=f"Unsupported platform: {platform}")
def get_last_fetch_time(db: Session, connection_id: str, platform: str, project_name: Optional[str] = None) -> Optional[datetime]:
"""Get last fetch time for a connection and optionally a specific project"""
query = db.query(FetchedTrace).filter(
FetchedTrace.connection_id == connection_id,
FetchedTrace.platform == platform
)
if project_name:
query = query.filter(FetchedTrace.project_name == project_name)
last_trace = query.order_by(FetchedTrace.fetched_at.desc()).first()
return cast(datetime, last_trace.fetched_at) if last_trace else None
def create_fetched_trace(trace_id: str, name: str, platform: str, connection_id: str,
data: Dict, project_name: Optional[str] = None) -> FetchedTrace:
"""Create a FetchedTrace object"""
return FetchedTrace(
trace_id=trace_id,
name=name,
platform=platform,
connection_id=connection_id,
project_name=project_name,
data=data
)
def fetch_langfuse_sessions(connection: ObservabilityConnection, db: Session, project_name: str, limit: int = 50) -> List[Dict]:
"""Fetch sessions from Langfuse"""
downloader = LangfuseDownloader(
secret_key=cast(str, connection.secret_key),
public_key=cast(str, connection.public_key),
host=cast(str, connection.host)
)
# Get last fetched time for this specific project
from_timestamp = get_last_fetch_time(db, cast(str, connection.connection_id), "langfuse", project_name)
if from_timestamp:
logger.info(f"Fetching sessions for project {project_name} from {from_timestamp} onwards")
else:
logger.info(f"No previous fetches found for project {project_name}, fetching all sessions")
# List sessions to get session IDs
if from_timestamp:
sessions_response = downloader.client.api.sessions.list(
limit=limit,
from_timestamp=from_timestamp
)
else:
sessions_response = downloader.client.api.sessions.list(limit=limit)
# Handle different response formats
if hasattr(sessions_response, 'data'):
sessions = [downloader._convert_to_dict(session) for session in sessions_response.data]
else:
sessions = [downloader._convert_to_dict(session) for session in sessions_response]
logger.info(f"Found {len(sessions)} sessions")
# Store each session as a fetched trace
for session in sessions:
session_id = session['id']
# Check if session already exists
existing_session = db.query(FetchedTrace).filter(
FetchedTrace.trace_id == session_id,
FetchedTrace.connection_id == connection.connection_id
).first()
if not existing_session:
try:
traces_response = downloader.client.api.trace.list(session_id=session_id)
if hasattr(traces_response, 'data'):
session_traces = [downloader._convert_to_dict(trace) for trace in traces_response.data]
else:
session_traces = [downloader._convert_to_dict(trace) for trace in traces_response]
# Get detailed trace data for each trace
detailed_traces = []
for i, trace_summary in enumerate(session_traces):
trace_id = trace_summary['id']
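                    # Throttle successive detail requests (simple client-side rate limiting)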
if i > 0:
time.sleep(1)
detailed_trace = downloader.client.api.trace.get(trace_id)
trace_data = downloader._convert_to_dict(detailed_trace)
detailed_traces.append(trace_data)
logger.info(f"Downloaded detailed trace: {trace_id} ({i+1}/{len(session_traces)})")
                    # Assemble the session payload in the LangFuseSession schema
session_data = LangFuseSession(
session_id=session_id,
session_name=session_id,
project_name=project_name,
export_timestamp=datetime.now().isoformat(),
total_traces=len(detailed_traces),
traces=detailed_traces
)
# Convert to JSON-serializable format
data_json = session_data.model_dump()
fetched_trace = create_fetched_trace(
trace_id=session_id,
name=session_id,
platform="langfuse",
connection_id=cast(str, connection.connection_id),
data=data_json,
project_name=project_name
)
db.add(fetched_trace)
logger.info(f"Stored session {session_id} with {len(detailed_traces)} traces")
except Exception as e:
logger.error(f"Error processing session {session_id}: {e}")
continue
db.commit()
logger.info(f"Fetched {len(sessions)} sessions from Langfuse")
return sessions
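# The stored session payload (see LangFuseSession above) looks roughly like this sketch:
# {"session_id": "sess-1", "session_name": "sess-1", "project_name": "default",
#  "export_timestamp": "2024-01-01T00:00:00", "total_traces": 2, "traces": [...]}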
def fetch_langsmith_traces(connection: ObservabilityConnection, db: Session, project_name: str, limit: int = 50) -> List[Dict]:
"""Fetch traces from LangSmith"""
try:
client = LangsmithClient(api_key=cast(str, connection.public_key))
logger.info("Connected to LangSmith successfully")
# Get all projects
try:
projects = list(client.list_projects())
logger.info(f"Found {len(projects)} projects")
except Exception as e:
logger.error(f"Error listing projects: {e}")
raise HTTPException(status_code=500, detail="An internal error occurred while listing projects") from e
# Export runs from specific project only
all_traces = []
total_limit = limit
# Get existing trace IDs to avoid duplicates
existing_traces = db.query(FetchedTrace).filter(
FetchedTrace.connection_id == connection.connection_id,
FetchedTrace.platform == "langsmith",
FetchedTrace.project_name == project_name
).all()
existing_trace_ids = {cast(str, trace.trace_id) for trace in existing_traces}
logger.info(f"Exporting specific project: {project_name}")
# Get last fetched time for this specific project
project_from_timestamp = get_last_fetch_time(
db, cast(str, connection.connection_id), "langsmith", project_name
)
if project_from_timestamp:
logger.info(f"Fetching {project_name} runs from {project_from_timestamp} onwards")
else:
logger.info(f"No previous fetches found for {project_name}, fetching all runs")
        # Fetch top-level (root) runs only - same approach as langsmith_exporter.py
list_runs_kwargs = {
"project_name": project_name,
"is_root": True,
"limit": limit
}
# Add start_time filter if we have a project-specific timestamp
if project_from_timestamp:
list_runs_kwargs["start_time"] = project_from_timestamp
runs = list(client.list_runs(**list_runs_kwargs))
logger.info(f"Found {len(runs)} runs in project {project_name}")
# Process runs in batch
new_traces_to_add = []
for run in runs:
run_name = getattr(run, 'name', 'unnamed')
run_id = str(run.id)
unique_trace_id = f"{run_name}_{run_id}"
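            # e.g. "my_chain_<run-uuid>": the run name keeps the ID readable, the UUID keeps it unique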
# Skip if already exists
if unique_trace_id in existing_trace_ids:
logger.debug(f"Skipping existing trace: {unique_trace_id}")
continue
# Get all traces for this run (including nested children) - same as langsmith_exporter.py
all_runs: List[LangSmithRun] = []
try:
# Get the root run and all its children
trace_runs = client.list_runs(project_name=project_name, trace_id=run.trace_id)
run_list = list(trace_runs)
                # Sort runs by start_time ascending (earliest first); runs lacking
                # a start_time sort to the front via datetime.min
                sorted_runs = sorted(run_list, key=lambda t: getattr(t, 'start_time', None) or datetime.min)
for run_item in sorted_runs:
run_data = run_item.dict() if hasattr(run_item, 'dict') else dict(run_item)
all_runs.append(LangSmithRun(**run_data))
except Exception as e:
logger.warning(f"Could not get child traces for run {run_id}: {e}")
# Fallback to just the main run
run_data = run.dict() if hasattr(run, 'dict') else dict(run)
all_runs = [LangSmithRun(**run_data)]
# Create run export structure - same format as langsmith_exporter.py
run_export = LangSmithTrace(
trace_id=run_id,
trace_name=run_name,
project_name=project_name,
export_time=datetime.now().isoformat(),
total_runs=len(all_runs),
runs=all_runs
)
# Prepare for batch database insert
try:
clean_data = run_export.model_dump()
fetched_trace = create_fetched_trace(
trace_id=unique_trace_id,
name=f"{run_name}_{run_id[:8]}",
platform="langsmith",
connection_id=cast(str, connection.connection_id),
data=clean_data,
project_name=project_name
)
new_traces_to_add.append(fetched_trace)
all_traces.append(clean_data)
existing_trace_ids.add(unique_trace_id)
except Exception as e:
logger.error(f"Error preparing trace {unique_trace_id}: {e}")
continue
# Stop if we've reached the limit
if len(all_traces) >= total_limit:
break
# Batch insert new traces
if new_traces_to_add:
db.add_all(new_traces_to_add)
logger.info(f"Added {len(new_traces_to_add)} new traces from project {project_name}")
# Single commit for all operations
db.commit()
logger.info(f"Fetched {len(all_traces)} traces from LangSmith")
return all_traces
except Exception as e:
logger.error(f"Error fetching LangSmith traces: {str(e)}")
raise HTTPException(status_code=500, detail="An internal error occurred while fetching traces") from e
# Request/Response Models
class ConnectionRequest(BaseModel):
platform: str
publicKey: str
secretKey: str
host: Optional[str] = None
class ConnectionResponse(BaseModel):
status: str
message: str
connection_id: str
class TraceFetchRequest(BaseModel):
limit: int = 50
start_date: Optional[str] = None
end_date: Optional[str] = None
project_name: str
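# Illustrative fetch request body (fields from TraceFetchRequest; values are placeholders):
# {"project_name": "my-project", "limit": 50}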
class PreprocessingOptions(BaseModel):
"""Preprocessing options for trace filtering"""
max_char: Optional[int] = 1000
topk: int = 10
raw: bool = False
hierarchy: bool = False
replace: bool = False
truncate_enabled: bool = False
class TraceImportRequest(BaseModel):
trace_ids: List[str]
preprocessing: Optional[PreprocessingOptions] = PreprocessingOptions()
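# Illustrative import request body (fields from TraceImportRequest and PreprocessingOptions):
# {"trace_ids": ["sess-1"], "preprocessing": {"max_char": 1000, "topk": 10, "raw": false,
#  "hierarchy": false, "replace": false, "truncate_enabled": false}}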
@router.post("/connect", response_model=ConnectionResponse)
async def connect_platform(request: ConnectionRequest, db: Session = Depends(get_db)): # noqa: B008
"""Connect to an AI observability platform"""
try:
platform = request.platform.lower()
public_key = request.publicKey
secret_key = request.secretKey
# Get projects and test connection
projects_info = get_connection_projects(platform, public_key, secret_key, request.host)
# Store connection info in database
connection_id = str(uuid.uuid4())
db_connection = ObservabilityConnection(
connection_id=connection_id,
platform=platform,
public_key=public_key,
secret_key=secret_key,
host=request.host,
projects=projects_info,
status="connected"
)
db.add(db_connection)
db.commit()
db.refresh(db_connection)
logger.info(f"Successfully connected to {platform} with connection ID: {connection_id}")
return ConnectionResponse(
status="success",
message=f"Successfully connected to {platform.title()}",
connection_id=connection_id
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error connecting to platform: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error") from e
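# Illustrative connection payload for POST /api/observability/connect
# (keys match ConnectionRequest; the credentials shown are placeholders):
# {"platform": "langfuse", "publicKey": "pk-lf-...", "secretKey": "sk-lf-...",
#  "host": "https://cloud.langfuse.com"}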
@router.get("/connections")
async def get_connections(db: Session = Depends(get_db)): # noqa: B008
"""Get all active platform connections"""
connections = db.query(ObservabilityConnection).all()
return {"connections": [conn.to_dict() for conn in connections]}
@router.put("/connections/{connection_id}")
async def update_connection(
connection_id: str,
request: ConnectionRequest,
db: Session = Depends(get_db) # noqa: B008
):
"""Update an existing platform connection"""
try:
# Find existing connection
connection = db.query(ObservabilityConnection).filter(
ObservabilityConnection.connection_id == connection_id
).first()
if not connection:
raise HTTPException(status_code=404, detail="Connection not found")
platform = request.platform.lower()
public_key = request.publicKey
secret_key = request.secretKey
# Test connection and get projects
projects_info = get_connection_projects(platform, public_key, secret_key, request.host)
# Update connection in database
connection.public_key = cast(Column[str], public_key)
connection.secret_key = cast(Column[str], secret_key)
connection.host = cast(Column[str], request.host)
connection.projects = cast(Column[List[Dict]], projects_info)
connection.status = cast(Column[str], "connected")
db.commit()
db.refresh(connection)
logger.info(f"Successfully updated {platform} connection: {connection_id}")
return {
"status": "success",
"message": f"Successfully updated {platform.title()} connection"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error updating connection: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error") from e
@router.delete("/connections/{connection_id}")
async def disconnect_platform(connection_id: str, db: Session = Depends(get_db)): # noqa: B008
"""Disconnect from a platform"""
connection = db.query(ObservabilityConnection).filter(
ObservabilityConnection.connection_id == connection_id
).first()
if not connection:
raise HTTPException(status_code=404, detail="Connection not found")
platform = connection.platform
# Delete all fetched traces for this connection
fetched_traces = db.query(FetchedTrace).filter(
FetchedTrace.connection_id == connection_id
).all()
deleted_traces_count = len(fetched_traces)
# Delete fetched traces
for fetched_trace in fetched_traces:
db.delete(fetched_trace)
# Remove connection from database
db.delete(connection)
db.commit()
logger.info(f"Disconnected from {platform} (connection ID: {connection_id})")
logger.info(f"Deleted {deleted_traces_count} fetched traces for connection {connection_id}")
return {
"status": "success",
"message": f"Disconnected from {platform.title()}",
"deleted_fetched_traces": deleted_traces_count,
"disconnected_at": datetime.now().isoformat()
}
# Connection-specific routes (required by frontend)
@router.get("/connections/{connection_id}/fetched-traces")
async def get_fetched_traces_by_connection(connection_id: str, db: Session = Depends(get_db)): # noqa: B008
"""Get all fetched traces for a specific connection"""
# Get connection
connection = db.query(ObservabilityConnection).filter(
ObservabilityConnection.connection_id == connection_id,
ObservabilityConnection.status == "connected"
).first()
if not connection:
raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}")
# Get all fetched traces for this connection
fetched_traces = db.query(FetchedTrace).filter(
FetchedTrace.connection_id == connection_id
).order_by(FetchedTrace.fetched_at.desc()).all()
return {
"traces": [trace.to_dict() for trace in fetched_traces],
"total": len(fetched_traces),
"platform": connection.platform
}
@router.post("/connections/{connection_id}/fetch")
async def fetch_traces_by_connection(
connection_id: str,
request: TraceFetchRequest,
db: Session = Depends(get_db) # noqa: B008
):
"""Fetch traces from a specific connection"""
# Get connection
connection = db.query(ObservabilityConnection).filter(
ObservabilityConnection.connection_id == connection_id,
ObservabilityConnection.status == "connected"
).first()
if not connection:
raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}")
try:
        import asyncio
        # Run blocking operations in an executor to avoid blocking the event loop
        loop = asyncio.get_running_loop()
def sync_fetch():
            # Create a fresh DB session for this worker thread; the request-scoped
            # session must not be shared across threads
            thread_db = next(get_db())
try:
project_name = request.project_name
if cast(str, connection.platform) == "langfuse":
traces = fetch_langfuse_sessions(connection, thread_db, project_name, request.limit)
elif cast(str, connection.platform) == "langsmith":
traces = fetch_langsmith_traces(connection, thread_db, project_name, request.limit)
else:
raise HTTPException(status_code=400, detail=f"Unsupported platform: {connection.platform}")
                # Update last sync time via this thread's session; `connection` is bound to the request session
                thread_connection = thread_db.query(ObservabilityConnection).filter(
                    ObservabilityConnection.connection_id == connection.connection_id
                ).first()
                thread_connection.last_sync = cast(Column[datetime], datetime.now())
                thread_db.commit()
return traces
finally:
thread_db.close()
# Execute in thread pool to avoid blocking
traces = await loop.run_in_executor(None, sync_fetch)
return {
"status": "success",
"message": f"Successfully fetched {len(traces)} traces from {connection.platform}",
"platform": connection.platform,
"traces_count": len(traces),
"completed_at": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Failed to fetch traces from connection {connection_id}: {str(e)}")
raise HTTPException(status_code=500, detail="An internal error occurred while fetching traces") from e
@router.post("/connections/{connection_id}/import")
async def import_traces_by_connection(
connection_id: str,
request: TraceImportRequest,
db: Session = Depends(get_db) # noqa: B008
):
"""Import specific traces from a connection to local database"""
# Get connection
connection = db.query(ObservabilityConnection).filter(
ObservabilityConnection.connection_id == connection_id,
ObservabilityConnection.status == "connected"
).first()
if not connection:
raise HTTPException(status_code=404, detail=f"No active connection found with ID {connection_id}")
try:
imported_count = 0
errors = []
for trace_id in request.trace_ids:
try:
# Get trace from fetched_traces table
trace = db.query(FetchedTrace).filter(
FetchedTrace.trace_id == trace_id,
FetchedTrace.connection_id == connection_id
).first()
if not trace:
errors.append(f"Trace {trace_id} not found in fetched traces for connection {connection_id}")
continue
# Process based on platform
preprocessing_opts = request.preprocessing or PreprocessingOptions()
if cast(str, connection.platform) == "langfuse":
trace_data = trace.get_full_data()["data"]
filtered_trace = filter_langfuse_session(
LangFuseSession(**trace_data),
max_char=preprocessing_opts.max_char,
topk=preprocessing_opts.topk,
raw=preprocessing_opts.raw,
hierarchy=preprocessing_opts.hierarchy,
replace=preprocessing_opts.replace
)
processed_trace = save_trace(
session=db,
content=json.dumps(filtered_trace, indent=2, default=str),
filename=f"langfuse_trace_{trace_id}",
title=f"Langfuse trace {trace_id}",
description=f"Imported from Langfuse on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
trace_type="langfuse",
trace_source="langfuse",
tags=["imported", "langfuse"]
)
if processed_trace:
imported_count += 1
logger.info(f"Successfully imported Langfuse trace {trace_id} as {processed_trace.trace_id}")
# Run trace characteristics analysis to generate statistics
try:
from agentgraph.input.trace_management import analyze_trace_characteristics
raw_content_for_analysis = json.dumps(filtered_trace, indent=2, default=str)
trace_analysis = analyze_trace_characteristics(raw_content_for_analysis, optimize_content=False)
# Update trace metadata with analysis results
if processed_trace.trace_metadata:
processed_trace.trace_metadata.update(trace_analysis)
else:
processed_trace.trace_metadata = trace_analysis
flag_modified(processed_trace, "trace_metadata")
db.commit()
logger.info(f"Added trace characteristics analysis to imported trace {processed_trace.trace_id}")
except Exception as e:
logger.warning(f"Failed to analyze trace characteristics for imported trace {processed_trace.trace_id}: {str(e)}")
# Auto-generate context documents using universal parser
try:
from backend.services.universal_parser_service import auto_generate_context_documents
raw_content = json.dumps(filtered_trace, indent=2, default=str)
created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db)
if created_docs:
logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}")
except Exception as e:
logger.warning(f"Failed to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}")
elif cast(str, connection.platform) == "langsmith":
langsmith_export = trace.get_full_data()["data"]
filtered_export = filter_langsmith_trace(
LangSmithTrace(**langsmith_export),
max_char=preprocessing_opts.max_char,
topk=preprocessing_opts.topk,
raw=preprocessing_opts.raw,
hierarchy=preprocessing_opts.hierarchy,
replace=preprocessing_opts.replace
)
processed_trace = save_trace(
session=db,
content=json.dumps(filtered_export, indent=2, default=str),
filename=f"langsmith_trace_{trace_id}",
title=f"LangSmith trace {trace_id}",
description=f"Imported from LangSmith on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
trace_type="langsmith",
trace_source="langsmith",
tags=["imported", "langsmith"]
)
if processed_trace:
imported_count += 1
logger.info(f"Successfully imported LangSmith trace {trace_id} as {processed_trace.trace_id}")
# Run trace characteristics analysis to generate statistics
try:
from agentgraph.input.trace_management import analyze_trace_characteristics
# Use the original langsmith_export for better analysis results
raw_content_for_analysis = json.dumps(langsmith_export, indent=2, default=str)
trace_analysis = analyze_trace_characteristics(raw_content_for_analysis, optimize_content=False)
# Update trace metadata with analysis results
if processed_trace.trace_metadata:
processed_trace.trace_metadata.update(trace_analysis)
else:
processed_trace.trace_metadata = trace_analysis
flag_modified(processed_trace, "trace_metadata")
db.commit()
logger.info(f"Added trace characteristics analysis to imported trace {processed_trace.trace_id}")
except Exception as e:
logger.warning(f"Failed to analyze trace characteristics for imported trace {processed_trace.trace_id}: {str(e)}")
# Auto-generate context documents using universal parser (use raw content, not processed)
try:
from backend.services.universal_parser_service import auto_generate_context_documents
# Use the original langsmith_export for better parsing results
raw_content = json.dumps(langsmith_export, indent=2, default=str)
created_docs = auto_generate_context_documents(cast(str, processed_trace.trace_id), raw_content, db)
if created_docs:
logger.info(f"Auto-generated {len(created_docs)} context documents for processed trace {processed_trace.trace_id}")
except Exception as e:
logger.warning(f"Failed to auto-generate context documents for processed trace {processed_trace.trace_id}: {str(e)}")
except Exception as e:
error_msg = f"Failed to import trace {trace_id}: {str(e)}"
errors.append(error_msg)
logger.error(error_msg)
# Update last sync time
connection.last_sync = cast(Column[datetime], datetime.now())
db.commit()
return {
"imported": imported_count,
"errors": errors,
"platform": connection.platform,
"import_completed_at": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Failed to import traces from connection {connection_id}: {str(e)}")
raise HTTPException(status_code=500, detail="An internal error occurred while importing traces") from e
@router.get("/traces/{trace_id}/download")
async def download_trace_by_id(trace_id: str, db: Session = Depends(get_db)): # noqa: B008
"""Download full trace data by trace ID (platform-agnostic)"""
trace = db.query(FetchedTrace).filter(
FetchedTrace.trace_id == trace_id
).first()
if not trace:
raise HTTPException(status_code=404, detail="Trace not found")
return trace.get_full_data()
@router.get("/resource-usage")
async def get_resource_usage():
"""Get resource usage information for the current process."""
try:
cpu_usage = psutil.cpu_percent()
memory_usage = psutil.virtual_memory().percent
return {"cpu_usage": cpu_usage, "memory_usage": memory_usage}
except Exception as e:
logger.error(f"Error retrieving resource usage: {str(e)}")
raise HTTPException(status_code=500, detail="An internal error occurred while retrieving resource usage") from e
@router.post("/clean-up")
async def clean_up(session: Session = Depends(get_db)): # noqa: B008
"""Clean up resources by closing database connections and freeing up memory."""
try:
session.close()
gc.collect()
return {"success": True, "message": "Resources cleaned up successfully"}
except Exception as e:
logger.error(f"Error cleaning up resources: {str(e)}")
raise HTTPException(status_code=500, detail="An internal error occurred while cleaning up resources") from e
@router.get("/environment")
async def get_environment():
"""Get environment information and authentication status."""
env_info = get_environment_info()
return {
"environment": env_info,
"timestamp": datetime.now().isoformat()
}
@router.get("/usage-summary")
async def get_usage_summary(request: Request):
"""
Get a summary of recent API usage for monitoring purposes.
This helps track OpenAI API costs and detect potential abuse.
"""
# Only authenticated users can see usage data
user = getattr(request.state, "user", None)
if not user:
raise HTTPException(status_code=401, detail="Authentication required")
    # In a production system this would query a database or log-aggregation service;
    # for now, return a static summary and point to the application logs for detail.
return {
"message": "Usage tracking is active",
"tracking_enabled": True,
"openai_endpoints_monitored": [
"/api/knowledge-graphs/extract",
"/api/knowledge-graphs/analyze",
"/api/methods/",
"/api/traces/analyze",
"/api/causal/",
],
"current_user": {
"username": user.get("username", "unknown"),
"auth_method": user.get("auth_method", "unknown"),
},
"note": "Detailed usage logs are available in the application logs for administrator review",
"timestamp": datetime.now().isoformat()
}
@router.get("/health-check")
async def health_check():
"""Comprehensive health check for the system."""
try:
cpu_usage = psutil.cpu_percent()
memory_usage = psutil.virtual_memory().percent
total_tasks = len(task_store.tasks)
pending_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "PENDING"])
processing_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "PROCESSING"])
completed_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "COMPLETED"])
failed_tasks = len([t for t in task_store.tasks.values() if t.get("status") == "FAILED"])
stuck_tasks = []
current_time = datetime.now()
for task_id, task in task_store.tasks.items():
if task.get("status") == "PROCESSING":
updated_at_str = task.get("update_timestamp") or task.get("creation_timestamp")
if updated_at_str:
updated_at = datetime.fromisoformat(updated_at_str.replace("Z", "+00:00"))
if updated_at.tzinfo is None:
updated_at = updated_at.astimezone()
time_diff = (current_time.astimezone() - updated_at).total_seconds()
if time_diff > 3600:
stuck_tasks.append({"task_id": task_id, "stuck_duration": time_diff})
health_status = "healthy"
issues = []
if cpu_usage > 90:
health_status = "warning"
issues.append(f"High CPU usage: {cpu_usage}%")
if memory_usage > 90:
health_status = "critical"
issues.append(f"High memory usage: {memory_usage}%")
if stuck_tasks:
health_status = "warning"
issues.append(f"{len(stuck_tasks)} tasks appear stuck")
tasks = {
"total": total_tasks,
"pending": pending_tasks,
"processing": processing_tasks,
"completed": completed_tasks,
"failed": failed_tasks,
"stuck": stuck_tasks
}
return {
"status": health_status,
"issues": issues,
"resources": {"cpu_usage": cpu_usage, "memory_usage": memory_usage},
"tasks": tasks,
"timestamp": current_time.isoformat()
}
except Exception as e:
logger.error(f"Error in health check: {str(e)}")
return JSONResponse(status_code=500, content={"status": "error", "error": str(e)})
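# Illustrative healthy response (shape mirrors the handler above; values are examples):
# {"status": "healthy", "issues": [], "resources": {"cpu_usage": 12.5, "memory_usage": 48.0},
#  "tasks": {"total": 3, "pending": 0, "processing": 1, "completed": 2, "failed": 0, "stuck": []},
#  "timestamp": "2024-01-01T00:00:00"}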
@router.post("/cleanup-stuck-tasks")
async def cleanup_stuck_tasks():
"""Clean up tasks that have been stuck in processing state for more than 1 hour."""
try:
current_time = datetime.now()
cleaned_tasks = []
for task_id, task in list(task_store.tasks.items()): # Iterate over a copy
if task.get("status") == "PROCESSING":
updated_at_str = task.get("update_timestamp") or task.get("creation_timestamp")
if updated_at_str:
updated_at = datetime.fromisoformat(updated_at_str.replace("Z", "+00:00"))
if updated_at.tzinfo is None:
updated_at = updated_at.astimezone()
time_diff = (current_time.astimezone() - updated_at).total_seconds()
if time_diff > 3600:
task_store.update_task(task_id, status="FAILED", error="Task timed out and was cleaned up.")
cleaned_tasks.append(task_id)
gc.collect()
return {"success": True, "cleaned_tasks": cleaned_tasks}
except Exception as e:
logger.error(f"Error cleaning up stuck tasks: {str(e)}")
raise HTTPException(status_code=500, detail="An internal error occurred while cleaning up stuck tasks") from e