# hf-hub-query / monty_api/query_entrypoints.py
# evalstate's picture
# evalstate HF Staff
# Deploy hf-hub-query with current fast-agent and Monty
# 06ea0aa verified
from __future__ import annotations
import argparse
import asyncio
import inspect
import json
import os
import sys
import time
from typing import Any, Callable
from .constants import (
DEFAULT_MAX_CALLS,
DEFAULT_MONTY_MAX_ALLOCATIONS,
DEFAULT_MONTY_MAX_MEMORY,
DEFAULT_MONTY_MAX_RECURSION_DEPTH,
DEFAULT_TIMEOUT_SEC,
INTERNAL_STRICT_MODE,
MAX_CALLS_LIMIT,
)
from .runtime_context import build_runtime_helper_environment
from .validation import (
_coerce_jsonish_python_literals,
_compact_result_metadata,
_summarize_limit_hit,
_truncate_result_payload,
_validate_generated_code,
_wrap_raw_result,
)
class MontyExecutionError(RuntimeError):
    """Failure raised while running a generated Monty script.

    Besides the message, the exception carries the bookkeeping the raiser
    passes in, so that callers can still report it after the failure.

    Attributes:
        api_calls: Call count supplied by the raiser.
        trace: Trace entries supplied by the raiser.
    """

    def __init__(self, message: str, api_calls: int, trace: list[dict[str, Any]]):
        super().__init__(message)
        self.api_calls, self.trace = api_calls, trace
def _query_debug_enabled() -> bool:
    """Report whether query-level debug output is switched on.

    The ``MONTY_DEBUG_QUERY`` environment variable enables it when its value,
    trimmed and lower-cased, is one of ``1``, ``true``, ``yes`` or ``on``.
    An unset variable counts as disabled.
    """
    raw_flag = os.environ.get("MONTY_DEBUG_QUERY", "")
    normalized_flag = raw_flag.strip().lower()
    return normalized_flag in ("1", "true", "yes", "on")
def _execution_debug_enabled() -> bool:
    """Report whether execution-level debug logging is switched on.

    True when ``MONTY_DEBUG_EXECUTION`` (trimmed, lower-cased) is ``1``,
    ``true``, ``yes`` or ``on``. Otherwise the answer is whatever
    ``_query_debug_enabled()`` reports, so enabling query debugging also
    enables execution debugging.
    """
    execution_flag = os.environ.get("MONTY_DEBUG_EXECUTION", "").strip().lower()
    return execution_flag in {"1", "true", "yes", "on"} or _query_debug_enabled()
def _debug_log(*parts: Any) -> None:
    """Write a ``[monty-debug]`` line to stderr when execution debugging is on.

    Args:
        *parts: Values printed after the prefix, space-separated by ``print``.
    """
    if _execution_debug_enabled():
        print("[monty-debug]", *parts, file=sys.stderr)
        # Flush right away so the line is not left sitting in a buffer.
        sys.stderr.flush()
def _log_generated_query(
    *, query: str | None, code: str, max_calls: int | None, timeout_sec: int | None
) -> None:
    """Dump the query, limits and generated code to stderr for debugging.

    Nothing is written unless ``_query_debug_enabled()`` is true. The query
    section is skipped when ``query`` is empty or ``None``.

    Args:
        query: Optional query text to echo.
        code: Code text to echo.
        max_calls: Call budget to echo.
        timeout_sec: Timeout to echo.
    """
    if not _query_debug_enabled():
        return

    def _emit(*fields: Any) -> None:
        print(*fields, file=sys.stderr)

    if query:
        _emit("[monty-debug] query:")
        _emit(query)

    labelled_limits = (
        ("[monty-debug] max_calls:", max_calls),
        ("[monty-debug] timeout_sec:", timeout_sec),
    )
    for label, value in labelled_limits:
        _emit(label, value)

    _emit("[monty-debug] code:")
    _emit(code)
    sys.stderr.flush()
def _introspect_helper_signatures() -> dict[str, set[str]]:
    """Map each runtime helper name to the set of its parameter names.

    A helper environment is built with the default call budget, strict-mode
    setting and timeout, and every entry of its ``helper_functions`` mapping
    is inspected with ``inspect.signature``.

    Returns:
        ``{helper_name: {parameter_name, ...}}`` for every helper function.
    """
    env = build_runtime_helper_environment(
        max_calls=DEFAULT_MAX_CALLS,
        strict_mode=INTERNAL_STRICT_MODE,
        timeout_sec=DEFAULT_TIMEOUT_SEC,
    )
    parameter_names: dict[str, set[str]] = {}
    for helper_name, helper in env.helper_functions.items():
        # The keys of ``Signature.parameters`` are the parameter names.
        parameter_names[helper_name] = set(inspect.signature(helper).parameters)
    return parameter_names
async def _run_with_monty(
    *,
    code: str,
    query: str | None,
    max_calls: int,
    strict_mode: bool,
    timeout_sec: int,
) -> dict[str, Any]:
    """Run prepared code under Monty and decide whether its result is usable.

    The helper environment from ``build_runtime_helper_environment`` supplies
    the external functions. Each one is wrapped so that its timing is
    debug-logged and any limit summary is collected on the environment
    (at most 20 are kept).

    Args:
        code: Script source handed to ``pydantic_monty.Monty``.
        query: Optional query text; passed to the script as ``""`` when falsy.
        max_calls: Call budget given to the helper environment and the script.
        strict_mode: Strict-mode flag given to the helper environment.
        timeout_sec: Timeout given to the helper environment and used as the
            Monty ``max_duration_secs`` limit.

    Returns:
        A dict with the keys ``output`` (the truncated payload), ``api_calls``,
        ``trace`` and ``limit_summaries``.

    Raises:
        RuntimeError: ``pydantic_monty`` could not be imported.
        MontyExecutionError: the run itself raised, or it finished without an
            API call / without a successful trace step and none of the
            accepted fallbacks applied.

    Exceptions raised while building the environment or constructing the
    ``Monty`` program happen before the guarded run and propagate unchanged.
    """
    _debug_log(
        "run_monty:start",
        f"max_calls={max_calls}",
        f"timeout_sec={timeout_sec}",
        f"strict_mode={strict_mode}",
    )

    try:
        import pydantic_monty
    except Exception as import_error:
        raise RuntimeError(
            "pydantic_monty is not installed. Install with `uv pip install pydantic-monty`."
        ) from import_error

    env = build_runtime_helper_environment(
        max_calls=max_calls,
        strict_mode=strict_mode,
        timeout_sec=timeout_sec,
    )
    program = pydantic_monty.Monty(
        code,
        inputs=["query", "max_calls"],
        script_name="monty_agent.py",
        type_check=False,
    )

    def _instrument(helper_name: str, fn: Callable[..., Any]) -> Callable[..., Any]:
        """Wrap one helper with debug timing and limit-summary collection."""

        async def wrapped(*args: Any, **kwargs: Any) -> Any:
            began = time.perf_counter()
            _debug_log(
                "helper:start",
                helper_name,
                f"args={len(args)}",
                f"kwargs={sorted(kwargs)}",
            )
            outcome = await fn(*args, **kwargs)
            limit_note = _summarize_limit_hit(helper_name, outcome)
            # Keep the collected summaries bounded.
            if limit_note is not None and len(env.limit_summaries) < 20:
                env.limit_summaries.append(limit_note)
            took_ms = round((time.perf_counter() - began) * 1000, 2)
            _debug_log(
                "helper:end",
                helper_name,
                f"elapsed_ms={took_ms}",
                f"api_calls={env.call_count['n']}",
            )
            return outcome

        return wrapped

    limits: pydantic_monty.ResourceLimits = {
        "max_duration_secs": float(timeout_sec),
        "max_memory": DEFAULT_MONTY_MAX_MEMORY,
        "max_allocations": DEFAULT_MONTY_MAX_ALLOCATIONS,
        "max_recursion_depth": DEFAULT_MONTY_MAX_RECURSION_DEPTH,
    }

    try:
        _debug_log("run_monty:invoke")
        instrumented_helpers = {
            name: _instrument(name, fn) for name, fn in env.helper_functions.items()
        }
        result = await pydantic_monty.run_monty_async(
            program,
            inputs={"query": query or "", "max_calls": max_calls},
            external_functions=instrumented_helpers,
            limits=limits,
        )
        _debug_log("run_monty:return", f"api_calls={env.call_count['n']}")
    except Exception as run_error:
        _debug_log("run_monty:error", type(run_error).__name__, str(run_error))
        raise MontyExecutionError(
            str(run_error), env.call_count["n"], env.trace
        ) from run_error

    def _envelope(payload: Any) -> dict[str, Any]:
        """Build the return dict around ``payload`` from the environment state."""
        return {
            "output": _truncate_result_payload(payload),
            "api_calls": env.call_count["n"],
            "trace": env.trace,
            "limit_summaries": env.limit_summaries,
        }

    def _is_error_envelope() -> bool:
        """True when the script returned ``{"ok": False, "error": <str>, ...}``."""
        return (
            isinstance(result, dict)
            and result.get("ok") is False
            and isinstance(result.get("error"), str)
        )

    if env.call_count["n"] == 0:
        # No counted API call: accept the result only in the cases below.
        if env.internal_helper_used["used"]:
            return _envelope(result)

        if isinstance(result, dict) and result.get("ok") is True:
            meta = result.get("meta")
            source = meta.get("source") if isinstance(meta, dict) else None
            if isinstance(source, str) and source.startswith("internal://"):
                return _envelope(result)

        # A recorded helper error takes precedence over the script's own result.
        helper_error = env.latest_helper_error_box.get("value")
        if helper_error is not None:
            return _envelope(helper_error)

        if _is_error_envelope():
            return _envelope(result)

        raise MontyExecutionError(
            "Code completed without calling any external API function",
            env.call_count["n"],
            env.trace,
        )

    # Calls were made: require one successful trace step, or an explicit
    # error envelope from the script.
    if any(step.get("ok") is True for step in env.trace) or _is_error_envelope():
        return _envelope(result)

    raise MontyExecutionError(
        "Code completed without a successful API call; refusing non-live fallback result",
        env.call_count["n"],
        env.trace,
    )
def _prepare_query_inputs(
*,
query: str | None,
code: str,
max_calls: int | None,
timeout_sec: int | None,
) -> tuple[str, str, int, int]:
if not code or not code.strip():
raise ValueError("code is required")
normalized_query = str(query or "").strip()
resolved_max_calls = DEFAULT_MAX_CALLS if max_calls is None else max_calls
resolved_timeout_sec = DEFAULT_TIMEOUT_SEC if timeout_sec is None else timeout_sec
normalized_max_calls = max(1, min(int(resolved_max_calls), MAX_CALLS_LIMIT))
normalized_timeout_sec = int(resolved_timeout_sec)
normalized_code = _coerce_jsonish_python_literals(code.strip())
_validate_generated_code(normalized_code)
return normalized_query, normalized_code, normalized_max_calls, normalized_timeout_sec
async def _execute_query(
    *,
    query: str | None,
    code: str,
    max_calls: int | None,
    timeout_sec: int | None,
) -> dict[str, Any]:
    """Prepare the inputs, log them when debugging, and run the code in Monty.

    Args:
        query: Optional query text.
        code: Script source to run.
        max_calls: Call budget, or ``None`` for the default.
        timeout_sec: Timeout, or ``None`` for the default.

    Returns:
        The dict produced by ``_run_with_monty``.

    Exceptions from ``_prepare_query_inputs`` and ``_run_with_monty``
    propagate to the caller.
    """
    _debug_log("execute_query:start")

    prepared = _prepare_query_inputs(
        query=query,
        code=code,
        max_calls=max_calls,
        timeout_sec=timeout_sec,
    )
    clean_query, clean_code, call_budget, timeout = prepared

    _debug_log(
        "execute_query:prepared",
        f"query_len={len(clean_query)}",
        f"code_len={len(clean_code)}",
        f"max_calls={call_budget}",
        f"timeout_sec={timeout}",
    )
    _log_generated_query(
        query=clean_query,
        code=clean_code,
        max_calls=call_budget,
        timeout_sec=timeout,
    )

    # Strict mode always comes from the module-level constant here.
    return await _run_with_monty(
        code=clean_code,
        query=clean_query,
        max_calls=call_budget,
        strict_mode=INTERNAL_STRICT_MODE,
        timeout_sec=timeout,
    )
async def hf_hub_query(
    code: str,
    query: str | None = None,
    max_calls: int | None = DEFAULT_MAX_CALLS,
    timeout_sec: int | None = DEFAULT_TIMEOUT_SEC,
) -> dict[str, Any]:
    """Use natural-language queries to explore the Hugging Face Hub.

    Best for read-only Hub discovery, lookup, ranking, and relationship questions
    across users, organizations, repositories, activity, followers, likes,
    discussions, and collections.
    """
    # NOTE(review): the docstring wording is kept as-is in case it is surfaced
    # to callers as a tool description — confirm before rewording it.
    #
    # Returns a dict with the keys "ok", "data", "error" and "api_calls".
    # No exception derived from ``Exception`` escapes: every failure is
    # reported as ``ok=False`` with the message in "error".
    try:
        run = await _execute_query(
            query=query,
            code=code,
            max_calls=max_calls,
            timeout_sec=timeout_sec,
        )
        compacted = _compact_result_metadata(run["output"])
        return {
            "ok": True,
            "data": compacted,
            "error": None,
            "api_calls": run["api_calls"],
        }
    except Exception as exc:
        # Only MontyExecutionError carries a call count; anything else is
        # reported with zero calls.
        calls_made = exc.api_calls if isinstance(exc, MontyExecutionError) else 0
        return {
            "ok": False,
            "data": None,
            "error": str(exc),
            "api_calls": calls_made,
        }
async def hf_hub_query_raw(
    code: str,
    query: str | None = None,
    max_calls: int | None = DEFAULT_MAX_CALLS,
    timeout_sec: int | None = DEFAULT_TIMEOUT_SEC,
) -> Any:
    """Use natural-language queries to explore the Hugging Face Hub in raw mode.

    Best for read-only Hub discovery, lookup, ranking, and relationship
    questions when the caller wants a runtime-owned raw envelope:
    ``result`` contains the generated script's final `result` value and ``meta`` contains
    execution details such as timing, call counts, and limit summaries.
    """
    # NOTE(review): the docstring wording is kept as-is in case it is surfaced
    # to callers as a tool description — confirm before rewording it.
    #
    # Every outcome, success or failure, is passed through
    # ``_wrap_raw_result`` together with the wall-clock time in whole
    # milliseconds measured from the start of this call.
    started = time.perf_counter()

    def _elapsed_ms() -> int:
        return int((time.perf_counter() - started) * 1000)

    try:
        run = await _execute_query(
            query=query,
            code=code,
            max_calls=max_calls,
            timeout_sec=timeout_sec,
        )
        elapsed_ms = _elapsed_ms()
        return _wrap_raw_result(
            _compact_result_metadata(run["output"]),
            ok=True,
            api_calls=run["api_calls"],
            elapsed_ms=elapsed_ms,
            limit_summaries=run.get("limit_summaries"),
        )
    except Exception as exc:
        elapsed_ms = _elapsed_ms()
        # Only MontyExecutionError carries a call count; anything else is
        # reported with zero calls.
        calls_made = exc.api_calls if isinstance(exc, MontyExecutionError) else 0
        return _wrap_raw_result(
            None,
            ok=False,
            api_calls=calls_made,
            elapsed_ms=elapsed_ms,
            error=str(exc),
        )
def _arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the Monty query tool.

    Options: ``--query``, ``--code``, ``--code-file``, ``--max-calls``
    (int, defaults to ``DEFAULT_MAX_CALLS``) and ``--timeout`` (int, defaults
    to ``DEFAULT_TIMEOUT_SEC``).
    """
    parser = argparse.ArgumentParser(
        description="Monty-backed API chaining tool (v3)"
    )

    # Free-text inputs: all optional at the parser level.
    parser.add_argument(
        "--query", default=None, help="Optional natural language query/context"
    )
    parser.add_argument("--code", default=None, help="Inline Monty code to execute")
    parser.add_argument(
        "--code-file", default=None, help="Path to .py file with Monty code to execute"
    )

    # Numeric limits.
    parser.add_argument(
        "--max-calls",
        type=int,
        default=DEFAULT_MAX_CALLS,
        help="Max external API/helper calls",
    )
    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_SEC)

    return parser
def main() -> int:
    """Command-line entry point: run one Monty script and print a JSON result.

    The code comes from ``--code``, or from ``--code-file`` when that option
    is given (the file wins if both are present). Exactly one JSON object is
    printed to stdout, for failures as well as for success.

    Returns:
        ``0`` when the query result has a truthy ``"ok"``, otherwise ``1``.
    """
    args = _arg_parser().parse_args()

    code = args.code
    if args.code_file:
        try:
            with open(args.code_file, encoding="utf-8") as handle:
                code = handle.read()
        except (OSError, UnicodeDecodeError) as exc:
            # A missing, unreadable or non-UTF-8 file is reported in the same
            # JSON shape as every other failure instead of as a traceback.
            print(
                json.dumps(
                    {"ok": False, "error": f"Could not read --code-file: {exc}"},
                    ensure_ascii=False,
                )
            )
            return 1

    if not code:
        print(
            json.dumps(
                {"ok": False, "error": "Either --code or --code-file is required"},
                ensure_ascii=False,
            )
        )
        return 1

    try:
        out = asyncio.run(
            hf_hub_query(
                code=code,
                query=args.query,
                max_calls=args.max_calls,
                timeout_sec=args.timeout,
            )
        )
        print(json.dumps(out, ensure_ascii=False))
        return 0 if out.get("ok") else 1
    except Exception as exc:
        # Covers event-loop failures and results that cannot be serialized.
        print(json.dumps({"ok": False, "error": str(exc)}, ensure_ascii=False))
        return 1