honcho-api / src /llm /api.py
rrizwan98
Honcho self-hosted deployment for HF Spaces
66227af
Raw
History Blame Contribute Delete
13.2 kB
"""Public LLM entrypoint: `honcho_llm_call`.
Orchestrates:
- Runtime config resolution from ConfiguredModelSettings → ModelConfig.
- Per-attempt planning (primary vs fallback selection).
- Retry with exponential backoff via tenacity.
- Tool-loop delegation when tools are supplied.
- Single-call delegation to the executor otherwise.
- Reasoning-trace telemetry emission.
"""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator, Callable
from typing import Any, Literal, TypeVar, cast, overload
from pydantic import BaseModel
from sentry_sdk.ai.monitoring import ai_track
from tenacity import retry, stop_after_attempt, wait_exponential
from src.config import ConfiguredModelSettings, ModelConfig
from src.exceptions import ValidationException
from src.telemetry.logging import conditional_observe
from src.telemetry.reasoning_traces import log_reasoning_trace
from .executor import honcho_llm_call_inner
from .runtime import (
AttemptPlan,
current_attempt,
effective_temperature,
plan_attempt,
resolve_runtime_model_config,
update_current_langfuse_observation,
)
from .tool_loop import execute_tool_loop
from .types import (
HonchoLLMCallResponse,
HonchoLLMCallStreamChunk,
IterationCallback,
ReasoningEffortType,
StreamingResponseWithMetadata,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Type variable for the pydantic model class passed as `response_model`; it
# links that argument to the `HonchoLLMCallResponse[M]` return annotation of
# the structured-output overload of `honcho_llm_call`.
M = TypeVar("M", bound=BaseModel)
# Typing overload 1 of 3: non-streaming call (`stream` is Literal[False]) with
# a pydantic model class supplied as `response_model`. The return annotation
# is parameterised by that same model type, HonchoLLMCallResponse[M].
# This stub carries no runtime logic; the implementation is the undecorated
# `honcho_llm_call` definition in this module.
@overload
async def honcho_llm_call(
    *,
    model_config: ModelConfig | ConfiguredModelSettings,
    prompt: str,
    max_tokens: int,
    track_name: str | None = None,
    response_model: type[M],
    json_mode: bool = False,
    temperature: float | None = None,
    stop_seqs: list[str] | None = None,
    reasoning_effort: ReasoningEffortType = None,
    verbosity: Literal["low", "medium", "high"] | None = None,
    thinking_budget_tokens: int | None = None,
    enable_retry: bool = True,
    retry_attempts: int = 3,
    stream: Literal[False] = False,
    stream_final_only: bool = False,
    tools: list[dict[str, Any]] | None = None,
    tool_choice: str | dict[str, Any] | None = None,
    tool_executor: Callable[[str, dict[str, Any]], Any] | None = None,
    max_tool_iterations: int = 10,
    messages: list[dict[str, Any]] | None = None,
    max_input_tokens: int | None = None,
    trace_name: str | None = None,
    iteration_callback: IterationCallback | None = None,
) -> HonchoLLMCallResponse[M]: ...
# Typing overload 2 of 3: non-streaming call (`stream` is Literal[False]) with
# no `response_model` (it is None or omitted). The return annotation is
# HonchoLLMCallResponse[str].
# This stub carries no runtime logic; the implementation is the undecorated
# `honcho_llm_call` definition in this module.
@overload
async def honcho_llm_call(
    *,
    model_config: ModelConfig | ConfiguredModelSettings,
    prompt: str,
    max_tokens: int,
    track_name: str | None = None,
    response_model: None = None,
    json_mode: bool = False,
    temperature: float | None = None,
    stop_seqs: list[str] | None = None,
    reasoning_effort: ReasoningEffortType = None,
    verbosity: Literal["low", "medium", "high"] | None = None,
    thinking_budget_tokens: int | None = None,
    enable_retry: bool = True,
    retry_attempts: int = 3,
    stream: Literal[False] = False,
    stream_final_only: bool = False,
    tools: list[dict[str, Any]] | None = None,
    tool_choice: str | dict[str, Any] | None = None,
    tool_executor: Callable[[str, dict[str, Any]], Any] | None = None,
    max_tool_iterations: int = 10,
    messages: list[dict[str, Any]] | None = None,
    max_input_tokens: int | None = None,
    trace_name: str | None = None,
    iteration_callback: IterationCallback | None = None,
) -> HonchoLLMCallResponse[str]: ...
# Typing overload 3 of 3: streaming call (`stream` is Literal[True]). The
# return annotation is either an async iterator of HonchoLLMCallStreamChunk or
# a StreamingResponseWithMetadata; `response_model` may be a model class or
# None.
# This stub carries no runtime logic; the implementation is the undecorated
# `honcho_llm_call` definition in this module.
# NOTE(review): `stream` keeps a `...` default here, so a call that omits
# `stream` but does not match either earlier overload (e.g. `response_model`
# typed as `type[BaseModel] | None`) may resolve to this streaming signature
# under a type checker — confirm that is intended.
@overload
async def honcho_llm_call(
    *,
    model_config: ModelConfig | ConfiguredModelSettings,
    prompt: str,
    max_tokens: int,
    track_name: str | None = None,
    response_model: type[BaseModel] | None = None,
    json_mode: bool = False,
    temperature: float | None = None,
    stop_seqs: list[str] | None = None,
    reasoning_effort: ReasoningEffortType = None,
    verbosity: Literal["low", "medium", "high"] | None = None,
    thinking_budget_tokens: int | None = None,
    enable_retry: bool = True,
    retry_attempts: int = 3,
    stream: Literal[True] = ...,
    stream_final_only: bool = False,
    tools: list[dict[str, Any]] | None = None,
    tool_choice: str | dict[str, Any] | None = None,
    tool_executor: Callable[[str, dict[str, Any]], Any] | None = None,
    max_tool_iterations: int = 10,
    messages: list[dict[str, Any]] | None = None,
    max_input_tokens: int | None = None,
    trace_name: str | None = None,
    iteration_callback: IterationCallback | None = None,
) -> AsyncIterator[HonchoLLMCallStreamChunk] | StreamingResponseWithMetadata: ...
@conditional_observe(name="LLM Call")
async def honcho_llm_call(
    *,
    model_config: ModelConfig | ConfiguredModelSettings,
    prompt: str,
    max_tokens: int,
    track_name: str | None = None,
    response_model: type[BaseModel] | None = None,
    json_mode: bool = False,
    temperature: float | None = None,
    stop_seqs: list[str] | None = None,
    reasoning_effort: ReasoningEffortType = None,
    verbosity: Literal["low", "medium", "high"] | None = None,
    thinking_budget_tokens: int | None = None,
    enable_retry: bool = True,
    retry_attempts: int = 3,
    stream: bool = False,
    stream_final_only: bool = False,
    tools: list[dict[str, Any]] | None = None,
    tool_choice: str | dict[str, Any] | None = None,
    tool_executor: Callable[[str, dict[str, Any]], Any] | None = None,
    max_tool_iterations: int = 10,
    messages: list[dict[str, Any]] | None = None,
    max_input_tokens: int | None = None,
    trace_name: str | None = None,
    iteration_callback: IterationCallback | None = None,
) -> (
    HonchoLLMCallResponse[Any]
    | AsyncIterator[HonchoLLMCallStreamChunk]
    | StreamingResponseWithMetadata
):
    """Run one LLM request, with optional retry, backup failover and tool loop.

    The provider/model for each attempt comes from `plan_attempt`. When the
    primary ModelConfig has a `fallback` configured, that backup is used on
    the final retry attempt (attempt 3 with the default `retry_attempts`).

    Two execution paths exist:

    * Both `tools` and `tool_executor` are truthy: the request is handed to
      `execute_tool_loop`, which receives the retry settings and the
      per-attempt planner and applies them itself.
    * Otherwise: a single `honcho_llm_call_inner` call is made, wrapped in a
      tenacity retry when `enable_retry` is true.

    Args:
        model_config: Primary model settings; normalised through
            `resolve_runtime_model_config` before use.
        prompt: Prompt text forwarded to the executor or tool loop.
        max_tokens: Output token limit forwarded unchanged.
        track_name: When set, names the Langfuse observation and wraps the
            single-call path with `ai_track(track_name)`.
        response_model: Optional pydantic model class forwarded for
            structured output.
        json_mode: Forwarded unchanged.
        temperature: Caller override. The single-call path passes it through
            `effective_temperature`; the tool loop receives the raw value.
        stop_seqs: Caller-supplied stop sequences, forwarded unchanged.
        reasoning_effort: Call-level value handed to `plan_attempt`; the
            plan's resulting value is what reaches the executor.
        verbosity: Forwarded unchanged.
        thinking_budget_tokens: Call-level value handed to `plan_attempt`;
            the plan's resulting value is what reaches the executor.
        enable_retry: Wrap the single call in a tenacity retry; also
            forwarded to the tool loop.
        retry_attempts: Attempt limit for the retry wrapper and the value
            `plan_attempt` uses to identify the final attempt.
        stream: Request a streaming response from the executor.
        stream_final_only: Forwarded to the tool loop as `stream_final`; also
            lifts the streaming-with-tools restriction.
        tools: Tool definitions forwarded to the executor or tool loop.
        tool_choice: Forwarded unchanged.
        tool_executor: Callable handed to the tool loop for running tools.
        max_tool_iterations: Forwarded to the tool loop only.
        messages: Forwarded to the tool loop and the reasoning trace only;
            the single-call executor is not given it.
        max_input_tokens: Forwarded to the tool loop only.
        trace_name: When set and the result is a `HonchoLLMCallResponse`, a
            reasoning trace is logged under this task type.
        iteration_callback: Forwarded to the tool loop only.

    Returns:
        Whatever the executor or tool loop produced: a
        `HonchoLLMCallResponse`, or a streaming object when streaming was
        requested.

    Raises:
        ValidationException: If streaming and tool calling are combined
            without `stream_final_only=True`.
    """
    runtime_model_config = resolve_runtime_model_config(model_config)

    # The caller's None-valued kwargs are intentionally NOT back-filled from
    # runtime_model_config here. effective_config_for_call resolves them
    # downstream against whichever ModelConfig wins the attempt (primary or
    # fallback); defaulting now would overwrite a fallback config's own
    # temperature/thinking params on the final retry.

    if stream and tools and not stream_final_only:
        raise ValidationException(
            "Streaming is not supported with tool calling. "
            "Set stream=False when using tools, or use stream_final_only=True "
            "to stream only the final response after tool calls."
        )

    # tenacity counts attempts from 1, so start the shared counter there.
    current_attempt.set(1)

    def _get_attempt_plan() -> AttemptPlan:
        """Plan the current attempt and tag the Langfuse observation with it."""
        attempt_plan = plan_attempt(
            runtime_model_config=runtime_model_config,
            attempt=current_attempt.get(),
            retry_attempts=retry_attempts,
            call_thinking_budget_tokens=thinking_budget_tokens,
            call_reasoning_effort=reasoning_effort,
        )
        update_current_langfuse_observation(
            attempt_plan.provider,
            attempt_plan.model,
            name=track_name,
        )
        return attempt_plan

    async def _call_with_provider_selection() -> (
        HonchoLLMCallResponse[Any] | AsyncIterator[HonchoLLMCallStreamChunk]
    ):
        """Plan the attempt, then issue exactly one executor call.

        tenacity wraps this closure, so planning is redone on every attempt
        and the fallback is picked up on the final one without extra wiring.
        """
        selected = _get_attempt_plan()
        # The two calls differ only in the literal `stream` flag; they are
        # kept separate so each call site passes a literal True/False.
        if not stream:
            return await honcho_llm_call_inner(
                selected.provider,
                selected.model,
                prompt,
                max_tokens,
                response_model,
                json_mode,
                effective_temperature(temperature),
                stop_seqs,
                selected.reasoning_effort,
                verbosity,
                selected.thinking_budget_tokens,
                stream=False,
                client_override=selected.client,
                tools=tools,
                tool_choice=tool_choice,
                selected_config=selected.selected_config,
            )
        return await honcho_llm_call_inner(
            selected.provider,
            selected.model,
            prompt,
            max_tokens,
            response_model,
            json_mode,
            effective_temperature(temperature),
            stop_seqs,
            selected.reasoning_effort,
            verbosity,
            selected.thinking_budget_tokens,
            stream=True,
            client_override=selected.client,
            tools=tools,
            tool_choice=tool_choice,
            selected_config=selected.selected_config,
        )

    def before_retry_callback(retry_state: Any) -> None:
        """Advance the attempt counter and log the failure that caused a retry.

        Used as tenacity's `before_sleep` hook, which runs after a failed
        attempt and before the back-off sleep, so the counter is moved on to
        the attempt that is about to run.
        """
        failed_attempt = retry_state.attempt_number
        upcoming_attempt = failed_attempt + 1
        current_attempt.set(upcoming_attempt)

        outcome = retry_state.outcome
        failure = outcome.exception() if outcome else None
        if not failure:
            return
        logger.warning(
            f"Error on attempt {failed_attempt}/{retry_attempts} with "
            + f"{runtime_model_config.transport}/{runtime_model_config.model}: {failure}"
        )
        logger.info(f"Will retry with attempt {upcoming_attempt}/{retry_attempts}")

    # Build the single-call pipeline: optional Sentry AI tracking innermost,
    # optional tenacity retry outermost.
    llm_call = _call_with_provider_selection
    if track_name:
        llm_call = ai_track(track_name)(llm_call)
    if enable_retry:
        llm_call = retry(
            stop=stop_after_attempt(retry_attempts),
            wait=wait_exponential(multiplier=1, min=4, max=10),
            before_sleep=before_retry_callback,
        )(llm_call)

    def _record_trace(outcome: object) -> None:
        """Log a reasoning trace for non-streaming results when tracing is on."""
        if not trace_name or not isinstance(outcome, HonchoLLMCallResponse):
            return

        # The trace should show what was applied, so each unset caller kwarg
        # falls back to the runtime config's value.
        traced_budget = thinking_budget_tokens
        if traced_budget is None:
            traced_budget = runtime_model_config.thinking_budget_tokens

        traced_effort: ReasoningEffortType = reasoning_effort
        if traced_effort is None:
            configured_effort = runtime_model_config.thinking_effort
            traced_effort = (
                cast(ReasoningEffortType, configured_effort)
                if configured_effort
                else None
            )

        traced_stops = stop_seqs
        if traced_stops is None:
            traced_stops = runtime_model_config.stop_sequences

        log_reasoning_trace(
            task_type=trace_name,
            model_config=runtime_model_config,
            prompt=prompt,
            response=outcome,
            max_tokens=max_tokens,
            thinking_budget_tokens=traced_budget,
            reasoning_effort=traced_effort,
            json_mode=json_mode,
            stop_seqs=traced_stops,
            messages=messages,
        )

    result: (
        HonchoLLMCallResponse[Any]
        | AsyncIterator[HonchoLLMCallStreamChunk]
        | StreamingResponseWithMetadata
    )
    if tools and tool_executor:
        # execute_tool_loop raises ValidationException on out-of-range
        # max_tool_iterations; failing fast is cheaper than silently clamping.
        result = await execute_tool_loop(
            prompt=prompt,
            max_tokens=max_tokens,
            messages=messages,
            tools=tools,
            tool_choice=tool_choice,
            tool_executor=tool_executor,
            max_tool_iterations=max_tool_iterations,
            response_model=response_model,
            json_mode=json_mode,
            temperature=temperature,
            stop_seqs=stop_seqs,
            verbosity=verbosity,
            enable_retry=enable_retry,
            retry_attempts=retry_attempts,
            max_input_tokens=max_input_tokens,
            get_attempt_plan=_get_attempt_plan,
            before_retry_callback=before_retry_callback,
            stream_final=stream_final_only,
            iteration_callback=iteration_callback,
        )
    else:
        # Tool-less path: one (possibly retried) executor call.
        result = await llm_call()

    _record_trace(result)
    return result
# Public API of this module: only `honcho_llm_call` is exported by a star-import.
__all__ = ["honcho_llm_call"]