""" Tracer utility for api_light_hf. Provides a @customtracer decorator that: - Logs function calls and results to stdout. - Optionally exports spans to Cloud Trace when GCP_SERVICE_KEY_FOR_TRACE is set. - Gracefully degrades to console-only logging when OpenTelemetry is unavailable. """ import functools import inspect import logging import os logger = logging.getLogger("tracer") if not logger.handlers: _h = logging.StreamHandler() _h.setFormatter(logging.Formatter("%(asctime)s [%(name)s] %(levelname)s: %(message)s")) logger.addHandler(_h) logger.setLevel(logging.INFO) # --------------------------------------------------------------------------- # Optional OpenTelemetry / Cloud Trace setup # --------------------------------------------------------------------------- _tracer = None _USE_CLOUD_TRACE = False try: from opentelemetry import trace, context from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.trace import Status, StatusCode from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator _gcp_key = os.getenv("GCP_SERVICE_KEY_FOR_TRACE") if _gcp_key: import tempfile from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter _keyfile = "/tmp/key_path_tracer_hf" with open(_keyfile, "w") as _f: _f.write(_gcp_key) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = _keyfile _provider = TracerProvider() _provider.add_span_processor(BatchSpanProcessor(CloudTraceSpanExporter())) trace.set_tracer_provider(_provider) _USE_CLOUD_TRACE = True else: _provider = TracerProvider() trace.set_tracer_provider(_provider) _tracer = trace.get_tracer("api_light_hf") _OTEL_AVAILABLE = True except Exception: _OTEL_AVAILABLE = False # --------------------------------------------------------------------------- # Decorator # --------------------------------------------------------------------------- def customtracer(func): """Decorator that traces function invocations (console + optional Cloud Trace).""" @functools.wraps(func) def wrapper(*args, **kwargs): func_name = func.__name__ logger.info(f"[{func_name}] start") if not _OTEL_AVAILABLE or _tracer is None: # Plain logging fallback try: result = func(*args, **kwargs) logger.info(f"[{func_name}] done") return result except Exception as exc: logger.error(f"[{func_name}] error: {exc}") raise # Full OpenTelemetry path from opentelemetry import trace as _trace from opentelemetry.trace import Status, StatusCode from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator # Extract traceparent from request header if present carrier = {} for arg in args: # Accept fastapi.Request or gradio.Request headers = getattr(getattr(arg, "headers", None), "items", None) if headers: carrier = dict(headers()) break ctx = TraceContextTextMapPropagator().extract(carrier=carrier) if carrier else None span_name = f"{os.getenv('SPACE_ID', 'api_light_hf')}.{func_name}" with _tracer.start_as_current_span(span_name, context=ctx) as span: try: span.set_attribute("func", func_name) span.set_attribute("HF_SPACE_ID", os.getenv("SPACE_ID", "")) sig = inspect.signature(func) param_names = list(sig.parameters.keys()) for idx, arg in enumerate(args): pname = param_names[idx] if idx < len(param_names) else str(idx) span.set_attribute(f"args[{idx}:{pname}]", str(arg)[:256]) for k, v in kwargs.items(): span.set_attribute(k, str(v)[:256]) result = func(*args, **kwargs) if isinstance(result, (list, tuple)): r_dict = {str(i): str(v)[:256] for i, v in enumerate(result)} elif isinstance(result, dict): r_dict = {k: str(v)[:256] for k, v in result.items()} else: r_dict = {"value": str(result)[:256]} span.add_event(f"result({type(result).__name__})", r_dict) span.set_status(Status(StatusCode.OK)) logger.info(f"[{func_name}] done") return result except Exception as exc: span.set_status(Status(StatusCode.ERROR)) span.record_exception(exc) logger.error(f"[{func_name}] error: {exc}") raise return wrapper # Async variant def asynctracer(func): """Async version of customtracer.""" @functools.wraps(func) async def wrapper(*args, **kwargs): func_name = func.__name__ logger.info(f"[{func_name}] start (async)") try: result = await func(*args, **kwargs) logger.info(f"[{func_name}] done (async)") return result except Exception as exc: logger.error(f"[{func_name}] error (async): {exc}") raise return wrapper