Spaces:
Running
Running
File size: 5,442 Bytes
cf7f643 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | """
Tracer utility for api_light_hf.
Provides a @customtracer decorator that:
- Logs function calls and results to stdout.
- Optionally exports spans to Cloud Trace when GCP_SERVICE_KEY_FOR_TRACE is set.
- Gracefully degrades to console-only logging when OpenTelemetry is unavailable.
"""
import functools
import inspect
import logging
import os
logger = logging.getLogger("tracer")
if not logger.handlers:
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("%(asctime)s [%(name)s] %(levelname)s: %(message)s"))
logger.addHandler(_h)
logger.setLevel(logging.INFO)
# ---------------------------------------------------------------------------
# Optional OpenTelemetry / Cloud Trace setup
# ---------------------------------------------------------------------------
_tracer = None
_USE_CLOUD_TRACE = False
try:
from opentelemetry import trace, context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
_gcp_key = os.getenv("GCP_SERVICE_KEY_FOR_TRACE")
if _gcp_key:
import tempfile
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
_keyfile = "/tmp/key_path_tracer_hf"
with open(_keyfile, "w") as _f:
_f.write(_gcp_key)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = _keyfile
_provider = TracerProvider()
_provider.add_span_processor(BatchSpanProcessor(CloudTraceSpanExporter()))
trace.set_tracer_provider(_provider)
_USE_CLOUD_TRACE = True
else:
_provider = TracerProvider()
trace.set_tracer_provider(_provider)
_tracer = trace.get_tracer("api_light_hf")
_OTEL_AVAILABLE = True
except Exception:
_OTEL_AVAILABLE = False
# ---------------------------------------------------------------------------
# Decorator
# ---------------------------------------------------------------------------
def customtracer(func):
"""Decorator that traces function invocations (console + optional Cloud Trace)."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
logger.info(f"[{func_name}] start")
if not _OTEL_AVAILABLE or _tracer is None:
# Plain logging fallback
try:
result = func(*args, **kwargs)
logger.info(f"[{func_name}] done")
return result
except Exception as exc:
logger.error(f"[{func_name}] error: {exc}")
raise
# Full OpenTelemetry path
from opentelemetry import trace as _trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
# Extract traceparent from request header if present
carrier = {}
for arg in args:
# Accept fastapi.Request or gradio.Request
headers = getattr(getattr(arg, "headers", None), "items", None)
if headers:
carrier = dict(headers())
break
ctx = TraceContextTextMapPropagator().extract(carrier=carrier) if carrier else None
span_name = f"{os.getenv('SPACE_ID', 'api_light_hf')}.{func_name}"
with _tracer.start_as_current_span(span_name, context=ctx) as span:
try:
span.set_attribute("func", func_name)
span.set_attribute("HF_SPACE_ID", os.getenv("SPACE_ID", ""))
sig = inspect.signature(func)
param_names = list(sig.parameters.keys())
for idx, arg in enumerate(args):
pname = param_names[idx] if idx < len(param_names) else str(idx)
span.set_attribute(f"args[{idx}:{pname}]", str(arg)[:256])
for k, v in kwargs.items():
span.set_attribute(k, str(v)[:256])
result = func(*args, **kwargs)
if isinstance(result, (list, tuple)):
r_dict = {str(i): str(v)[:256] for i, v in enumerate(result)}
elif isinstance(result, dict):
r_dict = {k: str(v)[:256] for k, v in result.items()}
else:
r_dict = {"value": str(result)[:256]}
span.add_event(f"result({type(result).__name__})", r_dict)
span.set_status(Status(StatusCode.OK))
logger.info(f"[{func_name}] done")
return result
except Exception as exc:
span.set_status(Status(StatusCode.ERROR))
span.record_exception(exc)
logger.error(f"[{func_name}] error: {exc}")
raise
return wrapper
# Async variant
def asynctracer(func):
"""Async version of customtracer."""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
func_name = func.__name__
logger.info(f"[{func_name}] start (async)")
try:
result = await func(*args, **kwargs)
logger.info(f"[{func_name}] done (async)")
return result
except Exception as exc:
logger.error(f"[{func_name}] error (async): {exc}")
raise
return wrapper
|