Spaces:
Sleeping
Sleeping
File size: 2,942 Bytes
ba5110e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
"""
LangSmith tracing configuration for agent observability.
Provides full tracking of all agent and tool calls.
"""
import os
from typing import Optional
from functools import wraps
import asyncio
# LangSmith environment variables
LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY")
LANGSMITH_PROJECT = os.getenv("LANGSMITH_PROJECT", "algebra-chatbot")
LANGSMITH_TRACING = os.getenv("LANGSMITH_TRACING", "true").lower() == "true"
def setup_langsmith():
"""
Configure LangSmith tracing.
Call this at application startup.
"""
if not LANGSMITH_API_KEY:
print("⚠️ LANGSMITH_API_KEY not set - tracing disabled")
return False
# Set environment variables for LangChain tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true" if LANGSMITH_TRACING else "false"
os.environ["LANGCHAIN_API_KEY"] = LANGSMITH_API_KEY
os.environ["LANGCHAIN_PROJECT"] = LANGSMITH_PROJECT
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
print(f"✅ LangSmith tracing enabled for project: {LANGSMITH_PROJECT}")
return True
def get_langsmith_client():
"""Get LangSmith client for custom tracing if needed."""
if not LANGSMITH_API_KEY:
return None
try:
from langsmith import Client
return Client(api_key=LANGSMITH_API_KEY)
except ImportError:
print("⚠️ langsmith package not installed")
return None
def get_tracer_callbacks():
"""
Get LangSmith tracer callbacks for use with LangChain/LangGraph.
Returns empty list if LangSmith not configured.
"""
if not LANGSMITH_API_KEY or not LANGSMITH_TRACING:
return []
try:
from langchain_core.tracers import LangChainTracer
tracer = LangChainTracer(project_name=LANGSMITH_PROJECT)
return [tracer]
except Exception as e:
print(f"⚠️ Could not create LangSmith tracer: {e}")
return []
def create_run_config(session_id: str, user_id: Optional[str] = None):
"""
Create a run configuration dict with metadata for tracing.
Args:
session_id: Conversation session ID
user_id: Optional user identifier
Returns:
Dict with callbacks and metadata for agent invocation
"""
callbacks = get_tracer_callbacks()
config = {
"callbacks": callbacks,
"metadata": {
"session_id": session_id,
"user_id": user_id or "anonymous",
},
"tags": ["algebra-chatbot", f"session:{session_id}"],
}
# Add run name for easy identification in LangSmith
config["run_name"] = f"chat-{session_id[:8]}"
return config
def get_tracing_status() -> dict:
"""Get current LangSmith tracing status."""
return {
"enabled": LANGSMITH_TRACING and bool(LANGSMITH_API_KEY),
"project": LANGSMITH_PROJECT,
"api_key_set": bool(LANGSMITH_API_KEY),
}
|