# Page header captured from the hosting site's file view (not part of the module):
#   author: chrisjcc
#   commit: "Evaluation Pipeline and Langfuse Monitoring (#7)" (35042ea)
#   file size: 2.1 kB
import os
import base64
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
def setup_telemetry():
    """
    Configure OpenTelemetry to export traces to Langfuse over OTLP/HTTP.

    Environment variables read:
        LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY: both required. If either is
            missing or blank, a warning is printed and telemetry stays off.
        LANGFUSE_BASE_URL: optional; falls back to https://cloud.langfuse.com
            when unset or blank.

    On success a TracerProvider with a BatchSpanProcessor/OTLPSpanExporter is
    installed as the global tracer provider.

    Returns:
        None in every case; progress is reported via print().

    Dependencies: langfuse, opentelemetry-sdk, opentelemetry-exporter-otlp
    """
    default_base_url = "https://cloud.langfuse.com"

    # Strip surrounding whitespace: values pasted into .env files or secret
    # stores often carry a trailing newline, which would otherwise end up
    # inside the Basic auth header and be rejected by the server.
    secret_key = (os.environ.get("LANGFUSE_SECRET_KEY") or "").strip()
    public_key = (os.environ.get("LANGFUSE_PUBLIC_KEY") or "").strip()

    # `or` (rather than a .get default) so that a variable that is set but
    # empty still falls back to the cloud URL instead of yielding a relative
    # endpoint such as "/api/public/otel/v1/traces".
    base_url = (os.environ.get("LANGFUSE_BASE_URL") or "").strip() or default_base_url

    if not (secret_key and public_key):
        print("⚠ Langfuse credentials not found. Telemetry disabled.")
        return

    print(f"Initializing Langfuse Telemetry at {base_url}...")

    # Auth header for Langfuse (HTTP Basic: base64("public:secret")).
    # UTF-8 encodes ASCII keys identically to the ASCII codec but does not
    # raise UnicodeEncodeError if a key contains a non-ASCII character.
    auth_bytes = f"{public_key}:{secret_key}".encode("utf-8")
    base64_auth = base64.b64encode(auth_bytes).decode("ascii")

    # Langfuse OTLP HTTP endpoint: <base>/api/public/otel/v1/traces.
    # The full path is spelled out because OTLPSpanExporter (HTTP) does NOT
    # automatically append /v1/traces in all versions, or when given a full URL.
    clean_base_url = base_url.rstrip("/")
    endpoint = f"{clean_base_url}/api/public/otel/v1/traces"

    exporter = OTLPSpanExporter(
        endpoint=endpoint,
        headers={"Authorization": f"Basic {base64_auth}"},
    )

    # Resource attributes attached to every span emitted by this process.
    resource = Resource.create(attributes={
        "service.name": "fraud_model_explainability_assistant",
        "service.version": "1.0.0",
    })

    # Provider with a batching processor feeding the OTLP exporter.
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(exporter)
    provider.add_span_processor(processor)

    # Install as the global tracer provider.
    trace.set_tracer_provider(provider)
    print("✅ Langfuse Telemetry configured.")