from collections.abc import Callable from contextlib import contextmanager from typing import Any from langfuse import propagate_attributes from gaia_agent.config import settings def langfuse_enabled() -> bool: return bool(settings.langfuse_public_key and settings.langfuse_secret_key) @contextmanager def trace_agent_run( question: str, *, session_id: str | None = None, user_id: str | None = None, task_id: str | None = None, ): """Create a Langfuse trace when credentials are configured. This keeps local development and HF Space startup working before secrets are set. """ if not langfuse_enabled(): yield None return from langfuse import Langfuse client = Langfuse( public_key=settings.langfuse_public_key, secret_key=settings.langfuse_secret_key, host=settings.langfuse_host, ) with propagate_attributes( trace_name="gaia-agent-run", user_id=user_id, session_id=session_id, metadata={"task_id": task_id} if task_id else None, tags=["gaia", "final-assignment"], ): with client.start_as_current_observation( name="gaia-agent-run", as_type="agent", input={"question": question}, metadata={"component": "GaiaAgent", "task_id": task_id}, ) as observation: try: yield observation finally: client.flush() def traced_step(trace: Any, name: str, fn: Callable[[], dict[str, Any]]) -> dict[str, Any]: if trace is None: return fn() span = trace.start_observation(name=name, as_type="span") try: output = fn() span.update(output=output) span.end() return output except Exception as exc: span.update(level="ERROR", status_message=str(exc)) span.end() raise