"""Runtime context for helper execution.

Defines :class:`RuntimeContext`, which meters, traces and dispatches the API
calls made on behalf of registered helpers, and
:func:`build_runtime_helper_environment`, which wires a fresh context to every
helper registration function imported below.
"""

from __future__ import annotations

import os
import sys
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Callable, NamedTuple, cast

import httpx

from .constants import MAX_CALLS_LIMIT
from .helpers.activity import register_activity_helpers
from .helpers.collections import register_collection_helpers
from .helpers.introspection import register_introspection_helpers
from .helpers.profiles import register_profile_helpers
from .helpers.repos import register_repo_helpers
from .http_runtime import (
    _as_int,
    _author_from_any,
    _canonical_repo_type,
    _clamp_int,
    _coerce_str_list,
    _dt_to_str,
    _extract_author_names,
    _extract_num_params,
    _extract_profile_name,
    _load_token,
    _normalize_collection_repo_item,
    _normalize_daily_paper_row,
    _normalize_repo_detail_row,
    _normalize_repo_search_row,
    _normalize_repo_sort_key,
    _normalize_trending_row,
    _optional_str_list,
    _repo_detail_call,
    _repo_list_call,
    _repo_web_url,
    _sort_repo_rows,
    call_api_host,
)
from .registry import PAGINATION_POLICY
from .runtime_envelopes import (
    _build_exhaustive_meta,
    _build_exhaustive_result_meta,
    _derive_can_request_more,
    _derive_limit_metadata,
    _derive_more_available,
    _derive_next_request_hint,
    _derive_truncated_by,
    _helper_error,
    _helper_meta,
    _helper_success,
    _overview_count_only_success,
    _resolve_exhaustive_limits,
)
from .runtime_filtering import (
    _apply_where,
    _helper_item,
    _item_matches_where,
    _normalize_where,
    _overview_count,
    _project_activity_items,
    _project_actor_items,
    _project_collection_items,
    _project_discussion_detail_items,
    _project_discussion_items,
    _project_daily_paper_items,
    _project_items,
    _project_repo_items,
    _project_user_items,
    _project_user_like_items,
)
from .validation import _resolve_helper_functions

if TYPE_CHECKING:
    from huggingface_hub import HfApi


class RuntimeHelperEnvironment(NamedTuple):
    """Bundle returned by :func:`build_runtime_helper_environment`.

    Every field except ``helper_functions`` is the very same object held by
    ``context`` (not a copy), so mutations made through the context are
    visible through this tuple and vice versa.
    """

    context: "RuntimeContext"
    call_count: dict[str, int]
    trace: list[dict[str, Any]]
    limit_summaries: list[dict[str, Any]]
    latest_helper_error_box: dict[str, dict[str, Any] | None]
    internal_helper_used: dict[str, bool]
    helper_functions: dict[str, Callable[..., Any]]


def _execution_debug_enabled() -> bool:
    """Return True when either debug environment variable holds a truthy value.

    ``MONTY_DEBUG_EXECUTION`` and ``MONTY_DEBUG_QUERY`` are checked; the
    accepted values (case-insensitive, surrounding whitespace ignored) are
    ``1``, ``true``, ``yes`` and ``on``.
    """
    return any(
        os.environ.get(key, "").strip().lower() in {"1", "true", "yes", "on"}
        for key in ("MONTY_DEBUG_EXECUTION", "MONTY_DEBUG_QUERY")
    )


def _debug_log(*parts: Any) -> None:
    """Print ``parts`` to stderr with a ``[monty-debug]`` prefix if debugging is on.

    The environment is re-read on every call, so debugging can be toggled
    while the process is running.
    """
    if not _execution_debug_enabled():
        return
    print("[monty-debug]", *parts, file=sys.stderr, flush=True)


def _hf_call_timeout_default() -> int:
    """Return the timeout in seconds from ``MONTY_HF_CALL_TIMEOUT_SEC``.

    Falls back to 20 when the variable is unset or not an integer literal,
    and never returns less than 1.
    """
    raw = os.environ.get("MONTY_HF_CALL_TIMEOUT_SEC", "20").strip()
    try:
        return max(1, int(raw))
    except ValueError:
        # ``raw`` is always a str, so ValueError is the only failure mode.
        return 20


def _effective_hf_timeout(timeout_sec: int) -> int:
    """Clamp ``timeout_sec`` to the range ``[1, _hf_call_timeout_default()]``.

    Shared by client construction and call logging so both always agree on
    how the effective timeout is derived.
    """
    return max(1, min(timeout_sec, _hf_call_timeout_default()))


# Timeout most recently installed via ``_configure_hf_client_factory``;
# ``None`` until the factory has been configured for the first time.
_HF_CLIENT_TIMEOUT_SEC: int | None = None


def _configure_hf_client_factory(timeout_sec: int) -> None:
    """Install an httpx client factory for huggingface_hub using ``timeout_sec``.

    The factory is process-global. The call is a no-op when the same timeout
    is already installed; a different timeout replaces the previous factory.

    Args:
        timeout_sec: Timeout, in seconds, passed to every ``httpx.Client``
            the factory creates.
    """
    global _HF_CLIENT_TIMEOUT_SEC
    if _HF_CLIENT_TIMEOUT_SEC == timeout_sec:
        return

    # Imported lazily so huggingface_hub is only loaded when actually needed.
    # NOTE(review): this reaches into a private huggingface_hub module.
    from huggingface_hub.utils._http import hf_request_event_hook, set_client_factory

    set_client_factory(
        lambda: httpx.Client(
            event_hooks={"request": [hf_request_event_hook]},
            follow_redirects=True,
            timeout=float(timeout_sec),
        )
    )
    _HF_CLIENT_TIMEOUT_SEC = timeout_sec


@dataclass(slots=True)
class RuntimeContext:
    """Per-run state: call budget, call trace and the helper registry.

    Counters and flags are stored in single-key dicts (``{"n": 0}``,
    ``{"value": None}``, ``{"used": False}``) so the same mutable object can
    be handed out to other holders and observed after mutation.
    """

    max_calls: int
    strict_mode: bool
    timeout_sec: int
    # ``call_count["n"]`` is the number of API calls consumed so far.
    call_count: dict[str, int] = field(default_factory=lambda: {"n": 0})
    # One entry per API call, appended by ``_trace_ok`` / ``_trace_err``.
    trace: list[dict[str, Any]] = field(default_factory=list)
    limit_summaries: list[dict[str, Any]] = field(default_factory=list)
    latest_helper_error_box: dict[str, dict[str, Any] | None] = field(
        default_factory=lambda: {"value": None}
    )
    internal_helper_used: dict[str, bool] = field(
        default_factory=lambda: {"used": False}
    )
    # Helper name -> callable; looked up by ``call_helper``.
    helper_registry: dict[str, Callable[..., Any]] = field(default_factory=dict)
    # Lazily created by ``_get_hf_api_client``.
    _hf_api_client: "HfApi | None" = field(default=None, init=False, repr=False)

    def _budget_remaining(self) -> int:
        """Return how many API calls may still be made (never negative)."""
        return max(0, self.max_calls - self.call_count["n"])

    def _policy_int(self, helper_name: str, key: str, default: int) -> int:
        """Return the integer ``PAGINATION_POLICY[helper_name][key]``.

        Falls back to ``int(default)`` when the helper has no policy entry,
        the key is missing, or the stored value cannot be converted.
        """
        cfg = PAGINATION_POLICY.get(helper_name) or {}
        try:
            return int(cfg.get(key, default))
        except (AttributeError, TypeError, ValueError, OverflowError):
            # Malformed policy entry: best-effort fallback to the default.
            return int(default)

    def _consume_call(self, endpoint: str, method: str = "GET") -> int:
        """Spend one unit of the call budget and return the 1-based call index.

        ``endpoint`` and ``method`` are accepted for call-site symmetry but
        do not influence the budget check.

        Raises:
            RuntimeError: If ``max_calls`` calls have already been consumed.
        """
        if self.call_count["n"] >= self.max_calls:
            raise RuntimeError(f"Max API calls exceeded ({self.max_calls})")
        self.call_count["n"] += 1
        return self.call_count["n"]

    def _trace_ok(
        self, idx: int, endpoint: str, method: str = "GET", status: int = 200
    ) -> None:
        """Append a success record for call ``idx`` to ``trace``."""
        self.trace.append(
            {
                "call_index": idx,
                "depth": idx,
                "method": method,
                "endpoint": endpoint,
                "ok": True,
                "status": status,
            }
        )

    def _trace_err(
        self, idx: int, endpoint: str, err: Any, method: str = "GET", status: int = 0
    ) -> None:
        """Append a failure record for call ``idx`` to ``trace``.

        ``err`` is stored as ``str(err)`` under the ``"error"`` key.
        """
        self.trace.append(
            {
                "call_index": idx,
                "depth": idx,
                "method": method,
                "endpoint": endpoint,
                "ok": False,
                "status": status,
                "error": str(err),
            }
        )

    def _host_raw_call(
        self,
        endpoint: str,
        *,
        params: dict[str, Any] | None = None,
        method: str = "GET",
        json_body: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Perform one budgeted, traced request through ``call_api_host``.

        Args:
            endpoint: Endpoint forwarded to ``call_api_host``.
            params: Optional query parameters forwarded as-is.
            method: HTTP method name.
            json_body: Optional JSON body forwarded as-is.

        Returns:
            The response dict from ``call_api_host``. A response whose
            ``"ok"`` entry is falsy is traced as an error but still returned,
            not raised.

        Raises:
            RuntimeError: If the call budget is exhausted.
            Exception: Anything raised while performing or tracing the call
                is recorded in ``trace`` and re-raised unchanged.
        """
        idx = self._consume_call(endpoint, method)
        started = time.perf_counter()
        _debug_log("host_raw:start", f"call={idx}", method, endpoint)
        try:
            resp = call_api_host(
                endpoint,
                method=method,
                params=params,
                json_body=json_body,
                timeout_sec=self.timeout_sec,
                strict_mode=self.strict_mode,
            )
            if resp.get("ok"):
                self._trace_ok(
                    idx, endpoint, method=method, status=int(resp.get("status") or 200)
                )
            else:
                self._trace_err(
                    idx,
                    endpoint,
                    resp.get("error"),
                    method=method,
                    status=int(resp.get("status") or 0),
                )
            elapsed_ms = round((time.perf_counter() - started) * 1000, 2)
            _debug_log(
                "host_raw:end",
                f"call={idx}",
                method,
                endpoint,
                f"ok={bool(resp.get('ok'))}",
                f"status={resp.get('status')}",
                f"elapsed_ms={elapsed_ms}",
            )
            return resp
        except Exception as exc:
            self._trace_err(idx, endpoint, exc, method=method, status=0)
            elapsed_ms = round((time.perf_counter() - started) * 1000, 2)
            _debug_log(
                "host_raw:error",
                f"call={idx}",
                method,
                endpoint,
                type(exc).__name__,
                str(exc),
                f"elapsed_ms={elapsed_ms}",
            )
            raise

    def _get_hf_api_client(self) -> "HfApi":
        """Return the cached ``HfApi`` client, creating it on first use.

        On creation the process-global client factory is configured with the
        effective timeout, and the client is pointed at ``HF_ENDPOINT``
        (default ``https://huggingface.co``, trailing slashes removed) with
        the token returned by ``_load_token()``.
        """
        if self._hf_api_client is None:
            from huggingface_hub import HfApi

            endpoint = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
            _configure_hf_client_factory(_effective_hf_timeout(self.timeout_sec))
            self._hf_api_client = HfApi(endpoint=endpoint, token=_load_token())
        return self._hf_api_client

    def _host_hf_call(self, endpoint: str, fn: Callable[[], Any]) -> Any:
        """Run ``fn`` as one budgeted, traced call recorded as a GET.

        Args:
            endpoint: Label stored in the trace for this call.
            fn: Zero-argument callable that performs the actual work.

        Returns:
            Whatever ``fn`` returns.

        Raises:
            RuntimeError: If the call budget is exhausted.
            Exception: Anything raised by ``fn`` is traced and re-raised.
        """
        idx = self._consume_call(endpoint, "GET")
        started = time.perf_counter()
        # Computed for the debug log only; it is not applied to ``fn`` here.
        timeout_sec = _effective_hf_timeout(self.timeout_sec)
        _debug_log(
            "host_hf:start",
            f"call={idx}",
            endpoint,
            f"timeout_sec={timeout_sec}",
        )
        try:
            out = fn()
            self._trace_ok(idx, endpoint, method="GET", status=200)
            elapsed_ms = round((time.perf_counter() - started) * 1000, 2)
            _debug_log("host_hf:end", f"call={idx}", endpoint, f"elapsed_ms={elapsed_ms}")
            return out
        except Exception as exc:
            self._trace_err(idx, endpoint, exc, method="GET", status=0)
            elapsed_ms = round((time.perf_counter() - started) * 1000, 2)
            _debug_log(
                "host_hf:error",
                f"call={idx}",
                endpoint,
                type(exc).__name__,
                str(exc),
                f"elapsed_ms={elapsed_ms}",
            )
            raise

    async def call_helper(self, helper_name: str, /, *args: Any, **kwargs: Any) -> Any:
        """Look up ``helper_name`` in the registry and await it.

        Args:
            helper_name: Registry key (positional-only so helpers may accept
                their own ``helper_name`` keyword).
            *args: Positional arguments forwarded to the helper.
            **kwargs: Keyword arguments forwarded to the helper.

        Returns:
            The awaited result of the helper.

        Raises:
            RuntimeError: If no callable is registered under ``helper_name``.
            Exception: Anything raised by the helper is logged (when
                debugging is enabled) and re-raised.
        """
        fn = self.helper_registry.get(helper_name)
        if not callable(fn):
            raise RuntimeError(f"Helper '{helper_name}' is not registered")
        started = time.perf_counter()
        _debug_log(
            "runtime_helper:start",
            helper_name,
            f"args={len(args)}",
            f"kwargs={sorted(kwargs)}",
            f"budget_remaining={self._budget_remaining()}",
        )
        try:
            result = await cast(Callable[..., Any], fn)(*args, **kwargs)
        except Exception as exc:
            elapsed_ms = round((time.perf_counter() - started) * 1000, 2)
            _debug_log(
                "runtime_helper:error",
                helper_name,
                type(exc).__name__,
                str(exc),
                f"elapsed_ms={elapsed_ms}",
            )
            raise
        elapsed_ms = round((time.perf_counter() - started) * 1000, 2)
        # Only dict results carry an "ok" flag worth logging.
        ok = result.get("ok") if isinstance(result, dict) else None
        _debug_log(
            "runtime_helper:end",
            helper_name,
            f"ok={ok}",
            f"elapsed_ms={elapsed_ms}",
            f"budget_remaining={self._budget_remaining()}",
        )
        return result


# Attach shared utilities to RuntimeContext as class attributes so helpers can
# reach them through the context. Plain functions become ordinary methods (the
# instance is passed as the first argument); functions wrapped in
# ``staticmethod`` are called without the instance.
for name, value in {
    "_helper_meta": _helper_meta,
    "_derive_limit_metadata": _derive_limit_metadata,
    "_derive_more_available": _derive_more_available,
    "_derive_truncated_by": _derive_truncated_by,
    "_derive_can_request_more": _derive_can_request_more,
    "_derive_next_request_hint": _derive_next_request_hint,
    "_resolve_exhaustive_limits": _resolve_exhaustive_limits,
    "_build_exhaustive_meta": _build_exhaustive_meta,
    "_overview_count_only_success": _overview_count_only_success,
    "_build_exhaustive_result_meta": _build_exhaustive_result_meta,
    "_helper_success": _helper_success,
    "_helper_error": _helper_error,
    "_project_items": _project_items,
    "_project_repo_items": _project_repo_items,
    "_project_collection_items": _project_collection_items,
    "_project_discussion_items": _project_discussion_items,
    "_project_discussion_detail_items": _project_discussion_detail_items,
    "_project_daily_paper_items": _project_daily_paper_items,
    "_project_user_items": _project_user_items,
    "_project_actor_items": _project_actor_items,
    "_project_user_like_items": _project_user_like_items,
    "_project_activity_items": _project_activity_items,
    "_normalize_where": _normalize_where,
    "_item_matches_where": _item_matches_where,
    "_apply_where": _apply_where,
    "_helper_item": _helper_item,
    "_overview_count": _overview_count,
    "_as_int": staticmethod(_as_int),
    "_author_from_any": staticmethod(_author_from_any),
    "_canonical_repo_type": staticmethod(_canonical_repo_type),
    "_clamp_int": staticmethod(_clamp_int),
    "_coerce_str_list": staticmethod(_coerce_str_list),
    "_dt_to_str": staticmethod(_dt_to_str),
    "_extract_author_names": staticmethod(_extract_author_names),
    "_extract_num_params": staticmethod(_extract_num_params),
    "_extract_profile_name": staticmethod(_extract_profile_name),
    "_load_token": staticmethod(_load_token),
    "_normalize_collection_repo_item": staticmethod(_normalize_collection_repo_item),
    "_normalize_daily_paper_row": staticmethod(_normalize_daily_paper_row),
    "_normalize_repo_detail_row": staticmethod(_normalize_repo_detail_row),
    "_normalize_repo_search_row": staticmethod(_normalize_repo_search_row),
    "_normalize_repo_sort_key": staticmethod(_normalize_repo_sort_key),
    "_normalize_trending_row": staticmethod(_normalize_trending_row),
    "_optional_str_list": staticmethod(_optional_str_list),
    "_repo_detail_call": staticmethod(_repo_detail_call),
    "_repo_list_call": staticmethod(_repo_list_call),
    "_repo_web_url": staticmethod(_repo_web_url),
    "_sort_repo_rows": staticmethod(_sort_repo_rows),
}.items():
    setattr(RuntimeContext, name, value)


def build_runtime_helper_environment(
    *,
    max_calls: int,
    strict_mode: bool,
    timeout_sec: int,
) -> RuntimeHelperEnvironment:
    """Create a fresh context, register every helper and bundle the result.

    Args:
        max_calls: Requested call budget; clamped to ``[1, MAX_CALLS_LIMIT]``.
        strict_mode: Stored on the context and forwarded to ``call_api_host``.
        timeout_sec: Per-call timeout in seconds stored on the context.

    Returns:
        A :class:`RuntimeHelperEnvironment` whose state fields are the same
        objects held by the new context.
    """
    ctx = RuntimeContext(
        max_calls=max(1, min(int(max_calls), MAX_CALLS_LIMIT)),
        strict_mode=strict_mode,
        timeout_sec=timeout_sec,
    )
    # Registrations run in this order; a later one overwrites an earlier
    # helper registered under the same name.
    for registration in (
        register_profile_helpers,
        register_repo_helpers,
        register_activity_helpers,
        register_collection_helpers,
        register_introspection_helpers,
    ):
        ctx.helper_registry.update(registration(ctx))
    helper_functions = _resolve_helper_functions(ctx.helper_registry)
    return RuntimeHelperEnvironment(
        context=ctx,
        call_count=ctx.call_count,
        trace=ctx.trace,
        limit_summaries=ctx.limit_summaries,
        latest_helper_error_box=ctx.latest_helper_error_box,
        internal_helper_used=ctx.internal_helper_used,
        helper_functions=helper_functions,
    )