Spaces:
Running
Running
| """ | |
| Tracer utility for api_light_hf. | |
| Provides a @customtracer decorator that: | |
| - Logs each call's start, completion, and errors to the console (stderr, via logging.StreamHandler). | |
| - Optionally exports spans to Cloud Trace when GCP_SERVICE_KEY_FOR_TRACE is set. | |
| - Gracefully degrades to console-only logging when OpenTelemetry is unavailable. | |
| """ | |
| import functools | |
| import inspect | |
| import logging | |
| import os | |
# Module-wide logger shared by the tracing decorators in this file.
logger = logging.getLogger("tracer")
logger.setLevel(logging.INFO)
if not logger.handlers:
    # Attach a stream handler only once so that re-importing the module
    # does not stack duplicate handlers on the same named logger.
    _h = logging.StreamHandler()
    _h.setFormatter(
        logging.Formatter(fmt="%(asctime)s [%(name)s] %(levelname)s: %(message)s")
    )
    logger.addHandler(_h)
| # --------------------------------------------------------------------------- | |
| # Optional OpenTelemetry / Cloud Trace setup | |
| # --------------------------------------------------------------------------- | |
# Module-level tracing state read by the decorators below:
#   _tracer           -- OpenTelemetry tracer, or None when tracing is disabled
#   _USE_CLOUD_TRACE  -- True only when spans are exported to Cloud Trace
#   _OTEL_AVAILABLE   -- True only when the whole setup below succeeded
_tracer = None
_USE_CLOUD_TRACE = False
_OTEL_AVAILABLE = False
try:
    from opentelemetry import trace, context
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
    from opentelemetry.trace import Status, StatusCode
    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

    _provider = TracerProvider()
    _gcp_key = os.getenv("GCP_SERVICE_KEY_FOR_TRACE")
    if _gcp_key:
        import tempfile

        from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter

        # The env var holds the service-account key *content*; the Google
        # client libraries expect a *path* in GOOGLE_APPLICATION_CREDENTIALS.
        # NamedTemporaryFile creates an unpredictable, owner-only file instead
        # of a fixed path in the shared temp directory. delete=False keeps the
        # file on disk after the `with` block so it can be read later.
        with tempfile.NamedTemporaryFile(
            mode="w", prefix="key_path_tracer_hf_", suffix=".json", delete=False
        ) as _f:
            _f.write(_gcp_key)
            _keyfile = _f.name
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = _keyfile
        _provider.add_span_processor(BatchSpanProcessor(CloudTraceSpanExporter()))
    # Without a key the provider has no span processor: spans are created
    # but not exported anywhere.
    trace.set_tracer_provider(_provider)
    _tracer = trace.get_tracer("api_light_hf")
    # Flip the flags last so they are never left True after a partial setup.
    _USE_CLOUD_TRACE = bool(_gcp_key)
    _OTEL_AVAILABLE = True
except ImportError as _exc:
    # Expected when OpenTelemetry (or the Cloud Trace exporter) is not
    # installed: fall back to console-only logging, but say so.
    _tracer = None
    _USE_CLOUD_TRACE = False
    _OTEL_AVAILABLE = False
    logging.getLogger("tracer").info(
        "OpenTelemetry tracing disabled (missing package): %s", _exc
    )
except Exception:
    # Any other failure (bad credentials, unwritable temp dir, ...) must not
    # break the importing application, but should be visible in the logs.
    _tracer = None
    _USE_CLOUD_TRACE = False
    _OTEL_AVAILABLE = False
    logging.getLogger("tracer").warning(
        "OpenTelemetry setup failed; falling back to console-only logging",
        exc_info=True,
    )
| # --------------------------------------------------------------------------- | |
| # Decorator | |
| # --------------------------------------------------------------------------- | |
def _trace_carrier(args, kwargs):
    """Return the headers of the first request-like argument as a dict, or {}.

    An argument is request-like when it has a ``headers`` attribute exposing
    ``items()``. Positional and keyword arguments are both inspected, and
    header names are lower-cased because HTTP header names are
    case-insensitive while the propagator looks up ``traceparent`` in a
    plain dict.
    """
    for candidate in (*args, *kwargs.values()):
        items = getattr(getattr(candidate, "headers", None), "items", None)
        if callable(items):
            return {str(name).lower(): value for name, value in items()}
    return {}


def _result_attributes(result):
    """Flatten a return value into a ``{str: str}`` mapping for a span event."""
    if isinstance(result, (list, tuple)):
        return {str(i): str(v)[:256] for i, v in enumerate(result)}
    if isinstance(result, dict):
        # Event attribute keys must be strings; result dicts may use any key.
        return {str(k): str(v)[:256] for k, v in result.items()}
    return {"value": str(result)[:256]}


def customtracer(func):
    """Decorate a synchronous callable with call logging and optional tracing.

    Every call logs ``[name] start`` and then ``[name] done`` or
    ``[name] error: ...`` through the module logger. When the module-level
    OpenTelemetry setup succeeded, the call additionally runs inside a span
    named ``<SPACE_ID or 'api_light_hf'>.<name>`` that carries the arguments
    (each truncated to 256 characters) as attributes and the return value as
    an event. If a request-like argument carries trace-context headers, the
    span is created in that extracted context.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper with the same call signature and metadata as ``func``.

    Raises:
        Whatever ``func`` raises; exceptions are logged and re-raised.
    """
    func_name = getattr(func, "__name__", repr(func))
    # Resolve parameter names once at decoration time rather than per call.
    try:
        param_names = list(inspect.signature(func).parameters)
    except (TypeError, ValueError):
        param_names = []

    # wraps() keeps __name__/__doc__ and exposes the real signature to
    # frameworks that introspect the decorated function.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info("[%s] start", func_name)

        if not _OTEL_AVAILABLE or _tracer is None:
            # Plain logging fallback
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                logger.error("[%s] error: %s", func_name, exc)
                raise
            logger.info("[%s] done", func_name)
            return result

        # Full OpenTelemetry path. Imported lazily so this module still
        # imports when OpenTelemetry is not installed.
        from opentelemetry.trace import Status, StatusCode
        from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

        carrier = _trace_carrier(args, kwargs)
        ctx = TraceContextTextMapPropagator().extract(carrier=carrier) if carrier else None
        span_name = f"{os.getenv('SPACE_ID', 'api_light_hf')}.{func_name}"

        # The context manager records the exception and sets ERROR status
        # when the body raises, so the except clause below only logs;
        # recording it there as well would duplicate the exception event.
        with _tracer.start_as_current_span(
            span_name,
            context=ctx,
            record_exception=True,
            set_status_on_exception=True,
        ) as span:
            try:
                span.set_attribute("func", func_name)
                span.set_attribute("HF_SPACE_ID", os.getenv("SPACE_ID", ""))
                for idx, arg in enumerate(args):
                    pname = param_names[idx] if idx < len(param_names) else str(idx)
                    span.set_attribute(f"args[{idx}:{pname}]", str(arg)[:256])
                for key, value in kwargs.items():
                    span.set_attribute(key, str(value)[:256])

                result = func(*args, **kwargs)

                span.add_event(
                    f"result({type(result).__name__})", _result_attributes(result)
                )
                span.set_status(Status(StatusCode.OK))
            except Exception as exc:
                logger.error("[%s] error: %s", func_name, exc)
                raise
            logger.info("[%s] done", func_name)
            return result

    return wrapper
| # Async variant | |
def asynctracer(func):
    """Decorate a coroutine function with start/done/error logging.

    Unlike ``customtracer``, this variant only logs through the module
    logger; it does not create OpenTelemetry spans.

    Args:
        func: The coroutine function to wrap.

    Returns:
        A coroutine function with the same signature and metadata as
        ``func``.

    Raises:
        Whatever awaiting ``func`` raises; exceptions are logged and
        re-raised.
    """
    func_name = getattr(func, "__name__", repr(func))

    # wraps() keeps __name__/__doc__ and exposes the real signature to
    # frameworks that introspect the decorated coroutine.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        logger.info("[%s] start (async)", func_name)
        try:
            result = await func(*args, **kwargs)
        except Exception as exc:
            logger.error("[%s] error (async): %s", func_name, exc)
            raise
        logger.info("[%s] done (async)", func_name)
        return result

    return wrapper