# Source path: AIstudioProxyAPI/api_utils/utils_ext/function_calling_orchestrator.py
# Hugging Face Spaces page header captured together with the file (not part of the module):
#   uploaded by peijun1 - "Deploy AI Studio Proxy API to Hugging Face Spaces" (commit a5784e9), 30.7 kB
"""
Function Calling Orchestrator
Central coordinator that routes function calling between native and emulated modes.
Integrates SchemaConverter, ResponseFormatter, and browser automation into the request flow.
Implements Phase 3 of ADR-001: Native Function Calling Architecture.
Includes caching to skip redundant UI operations for subsequent requests with same tools.
"""
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from browser_utils.page_controller import PageController
from logging_utils.fc_debug import FCModule, get_fc_logger
# FC debug logger for orchestrator-level events
fc_logger = get_fc_logger()
# Import directly from the module file to avoid circular imports through __init__.py
# fmt: off
from api_utils.utils_ext.function_calling import ( # noqa: E501
FunctionCallingConfig,
FunctionCallingMode,
ParsedFunctionCall,
ResponseFormatter,
SchemaConversionError,
SchemaConverter,
get_finish_reason,
)
from api_utils.utils_ext.function_calling_cache import FunctionCallingCache
from config.settings import (
FUNCTION_CALLING_CLEAR_BETWEEN_REQUESTS,
FUNCTION_CALLING_DEBUG,
FUNCTION_CALLING_MODE,
)
from models import ClientDisconnectedError
# fmt: on
class NativeFunctionCallingError(Exception):
    """Signal that a native function calling attempt failed.

    Callers may react to this error by falling back to emulated mode.
    """
@dataclass
class FunctionCallingState:
    """Per-request record of how function calling was set up.

    Only ``mode`` is required; every other field starts at a "nothing
    happened yet" default and is filled in while the request is prepared.
    """

    # Effective mode used for this request.
    mode: FunctionCallingMode
    # True once native mode was successfully enabled.
    native_enabled: bool = False
    # True once the tools were configured in native mode.
    tools_configured: bool = False
    # True when the request fell back to emulated mode.
    fallback_used: bool = False
    # Error text recorded from a failed native mode attempt, if any.
    error_message: Optional[str] = None
    # Digest of the tool definitions, used for caching.
    tools_digest: Optional[str] = None
    # True when the cache allowed UI operations to be skipped.
    cache_hit: bool = False
class FunctionCallingOrchestrator:
    """Coordinate function calling between native (UI-driven) and emulated modes.

    Handles mode selection, tool configuration through the page controller,
    and formatting of function-call data into response structures.

    Usage:
        orchestrator = FunctionCallingOrchestrator()

        # Before sending prompt
        state = await orchestrator.prepare_request(
            tools=request.tools,
            tool_choice=request.tool_choice,
            page_controller=page_controller,
            check_client_disconnected=check_fn,
            req_id=req_id,
        )

        # After receiving response (for non-streaming)
        message, finish_reason = orchestrator.format_function_calls_for_response(
            functions=function_data,
            content=response_content,
        )
    """

    def __init__(self, logger: Optional[logging.Logger] = None):
        """Initialize the orchestrator.

        Args:
            logger: Optional logger instance. If None, the
                "AIStudioProxyServer" logger is used.
        """
        self.logger = logger or logging.getLogger("AIStudioProxyServer")
        self._config = FunctionCallingConfig.from_settings()
        self._schema_converter = SchemaConverter()
        self._response_formatter = ResponseFormatter()
        self._cache = FunctionCallingCache.get_instance(self.logger)

    @property
    def config(self) -> FunctionCallingConfig:
        """Get the current function calling configuration."""
        return self._config

    @property
    def response_formatter(self) -> ResponseFormatter:
        """Get the response formatter instance."""
        return self._response_formatter

    @property
    def cache(self) -> FunctionCallingCache:
        """Get the function calling cache instance."""
        return self._cache

    async def _ensure_fc_disabled_when_no_tools(
        self,
        page_controller: PageController,
        check_client_disconnected: Callable,
        req_id: str,
    ) -> None:
        """Ensure the function calling toggle is disabled when no tools are provided.

        This handles the edge case where a previous request enabled FC,
        but the current request (e.g., XML-based tools) doesn't use
        OpenAI-format tools. The toggle is disabled to prevent interference.

        Failures are non-fatal: a client disconnect is ignored and any other
        ordinary exception is logged (when debug is on) and swallowed.

        Args:
            page_controller: PageController instance for browser automation.
            check_client_disconnected: Callback to check client connection.
            req_id: Request ID for logging.

        Raises:
            asyncio.CancelledError: Propagated so task cancellation is honoured.
        """
        # Only check/disable in native or auto mode. In emulated mode the
        # toggle should never have been enabled by this orchestrator.
        if self._config.mode == FunctionCallingMode.EMULATED:
            return

        try:
            is_enabled = await page_controller.is_function_calling_enabled(
                check_client_disconnected, use_cache=True
            )
            if not is_enabled:
                return

            if FUNCTION_CALLING_DEBUG:
                self.logger.info(
                    f"[{req_id}] [FC] No tools in request but FC toggle is enabled - disabling for clean state"
                )
            start_time = time.perf_counter()
            success = await page_controller.disable_function_calling(
                check_client_disconnected
            )
            elapsed = time.perf_counter() - start_time

            if success:
                # The UI state changed, so any cached setup is stale.
                self._cache.invalidate(reason="no_tools_cleanup", req_id=req_id)
                if FUNCTION_CALLING_DEBUG:
                    self.logger.info(
                        f"[{req_id}] [FC:Perf] FC toggle disabled in {elapsed:.2f}s"
                    )
            elif FUNCTION_CALLING_DEBUG:
                self.logger.warning(
                    f"[{req_id}] [FC] Failed to disable FC toggle - may affect response"
                )
        except ClientDisconnectedError:
            # Client gone, nothing to do
            pass
        except asyncio.CancelledError:
            # Swallowing cancellation would let a cancelled request keep
            # running; propagate it like prepare_request does elsewhere.
            raise
        except Exception as e:
            # Non-fatal: the request can still proceed, just with FC
            # potentially still enabled.
            if FUNCTION_CALLING_DEBUG:
                self.logger.warning(
                    f"[{req_id}] [FC] Error checking/disabling FC toggle: {e}"
                )

    def should_use_native_mode(
        self,
        tools: Optional[List[Dict[str, Any]]],
        tool_choice: Optional[Union[str, Dict[str, Any]]],
    ) -> bool:
        """Determine if native mode should be attempted for this request.

        Args:
            tools: List of tool definitions from the request.
            tool_choice: Tool choice parameter from the request. It is
                accepted for interface compatibility but not consulted.

        Returns:
            True if native mode should be attempted, False otherwise.
        """
        # No tools = no need for native mode
        if not tools:
            return False

        mode = self._config.mode

        # Emulated mode always uses text injection
        if mode == FunctionCallingMode.EMULATED:
            if self._config.debug:
                self.logger.debug("FC: Mode is EMULATED, using prompt injection")
            return False

        # Native and auto modes should try native
        if mode in (FunctionCallingMode.NATIVE, FunctionCallingMode.AUTO):
            if self._config.debug:
                self.logger.debug(
                    f"FC: Mode is {mode.value}, attempting native UI automation"
                )
            return True

        if self._config.debug:
            self.logger.debug(f"FC: Mode {mode} unknown, defaulting to False")
        return False

    def get_effective_mode(
        self,
        tools: Optional[List[Dict[str, Any]]],
    ) -> FunctionCallingMode:
        """Get the effective function calling mode for a request.

        Args:
            tools: List of tool definitions from the request.

        Returns:
            EMULATED when no tools are given, otherwise the configured mode.
        """
        if not tools:
            return FunctionCallingMode.EMULATED
        return self._config.mode

    async def _try_reuse_cached_setup(
        self,
        state: FunctionCallingState,
        tools_digest: str,
        model_name: Optional[str],
        page_controller: PageController,
        check_client_disconnected: Callable,
        req_id: str,
        total_start: float,
    ) -> bool:
        """Reuse a cached native setup if it is valid and the UI toggle is on.

        Returns True (and marks ``state`` as a cache hit) when the cached
        setup can be used; returns False when full configuration is needed.
        """
        if not self._cache.is_cache_valid(tools_digest, model_name, req_id=req_id):
            return False

        cached_state = self._cache.get_cached_state()
        if not (
            cached_state
            and cached_state.declarations_set
            and cached_state.toggle_enabled
        ):
            return False

        # Cache HIT - but the UI toggle may have been reset by new_chat.
        # Verify and re-enable the toggle before trusting the cache.
        try:
            # Check the actual UI toggle state (bypass instance cache)
            toggle_enabled = await page_controller.is_function_calling_enabled(
                check_client_disconnected, use_cache=False
            )
            if toggle_enabled:
                hit_detail = "skipping native FC setup"
            else:
                if FUNCTION_CALLING_DEBUG:
                    self.logger.warning(
                        f"[{req_id}] [FC:Cache] HIT but UI toggle disabled - re-enabling"
                    )
                enable_success = await page_controller.enable_function_calling(
                    check_client_disconnected
                )
                if not enable_success:
                    if FUNCTION_CALLING_DEBUG:
                        self.logger.warning(
                            f"[{req_id}] [FC:Cache] Failed to re-enable toggle, "
                            "falling through to full setup"
                        )
                    return False
                hit_detail = "toggle re-enabled"

            elapsed = time.perf_counter() - total_start
            if FUNCTION_CALLING_DEBUG:
                self.logger.info(
                    f"[{req_id}] [FC:Cache] HIT - {hit_detail} "
                    f"(digest={tools_digest[:8]}..., checked in {elapsed:.3f}s)"
                )
            state.native_enabled = True
            state.tools_configured = True
            state.cache_hit = True
            return True
        except ClientDisconnectedError:
            raise
        except Exception as e:
            if FUNCTION_CALLING_DEBUG:
                self.logger.warning(
                    f"[{req_id}] [FC:Cache] Toggle verification failed: {e}, "
                    "falling through to full setup"
                )
            return False

    def _log_forced_tool_choice(
        self,
        tool_choice: Optional[Union[str, Dict[str, Any]]],
        req_id: str,
    ) -> None:
        """Log the tool name when ``tool_choice`` forces a specific tool."""
        if not (FUNCTION_CALLING_DEBUG and tool_choice):
            return

        if isinstance(tool_choice, dict):
            # Guard against a non-dict "function" value so that a purely
            # informational log line can never fail the request.
            fn_spec = tool_choice.get("function")
            nested_name = fn_spec.get("name") if isinstance(fn_spec, dict) else None
            forced_fn = nested_name or tool_choice.get("name")
            if forced_fn:
                self.logger.info(
                    f"[{req_id}] [FC] Tool choice: FORCING specific tool '{forced_fn}'"
                )
        elif isinstance(tool_choice, str) and tool_choice.lower() not in (
            "auto",
            "none",
            "required",
        ):
            self.logger.info(
                f"[{req_id}] [FC] Tool choice: FORCING specific tool '{tool_choice}'"
            )

    async def _configure_native_with_retry(
        self,
        state: FunctionCallingState,
        tools: List[Dict[str, Any]],
        gemini_declarations: Any,
        tools_digest: str,
        model_name: Optional[str],
        page_controller: PageController,
        check_client_disconnected: Callable,
        req_id: str,
        total_start: float,
    ) -> None:
        """Set the declarations via the UI, retrying up to the configured count.

        Marks ``state`` as natively configured on success and raises
        NativeFunctionCallingError once every attempt has failed.
        """
        max_attempts = self._config.native_retry_count
        last_error: Optional[Exception] = None

        for attempt in range(1, max_attempts + 1):
            try:
                if attempt > 1 and FUNCTION_CALLING_DEBUG:
                    self.logger.warning(
                        f"[{req_id}] [FC:UI] Retry attempt {attempt}/{max_attempts}"
                    )
                check_client_disconnected(f"FC prepare attempt {attempt}")

                # Check if function calling is available
                fc_available = await page_controller.is_function_calling_available(
                    check_client_disconnected
                )
                if not fc_available:
                    raise NativeFunctionCallingError(
                        "Function calling UI not available for this model"
                    )

                # Set function declarations via UI (with caching support)
                success = await page_controller.set_function_declarations(
                    gemini_declarations,
                    check_client_disconnected,
                    tools_digest=tools_digest,
                    model_name=model_name,
                    tools=tools,
                )
                if not success:
                    raise NativeFunctionCallingError(
                        "Failed to set function declarations in UI"
                    )

                state.native_enabled = True
                state.tools_configured = True
                total_elapsed = time.perf_counter() - total_start
                if FUNCTION_CALLING_DEBUG:
                    self.logger.info(
                        f"[{req_id}] [FC:Perf] Native function calling configured "
                        f"in {total_elapsed:.2f}s"
                    )
                    fc_logger.info(
                        FCModule.ORCHESTRATOR,
                        f"Native FC configured successfully in {total_elapsed:.2f}s",
                        req_id=req_id,
                    )
                return
            except ClientDisconnectedError:
                raise
            except asyncio.CancelledError:
                raise
            except Exception as e:
                last_error = e
                if FUNCTION_CALLING_DEBUG:
                    self.logger.warning(
                        f"[{req_id}] [FC] Native FC attempt {attempt}/{max_attempts} failed: {e}"
                    )
                if attempt < max_attempts:
                    await asyncio.sleep(0.5)

        # All retries failed
        raise NativeFunctionCallingError(
            f"Native function calling failed after {max_attempts} attempts: {last_error}"
        ) from last_error

    async def prepare_request(
        self,
        tools: Optional[List[Dict[str, Any]]],
        tool_choice: Optional[Union[str, Dict[str, Any]]],
        page_controller: PageController,
        check_client_disconnected: Callable,
        req_id: str,
        model_name: Optional[str] = None,
    ) -> FunctionCallingState:
        """Prepare a request for function calling based on the configured mode.

        This method:
        1. Computes tool digest and checks cache for existing state
        2. If cache valid: skips UI automation (cache hit)
        3. If cache miss: converts tools and configures browser UI
        4. Handles fallback if native mode fails and fallback is enabled
        5. If no tools provided but FC was previously enabled, disables it

        Args:
            tools: List of OpenAI-format tool definitions.
            tool_choice: Tool choice parameter (auto, none, required, or
                specific). Only used for debug logging here.
            page_controller: PageController instance for browser automation.
            check_client_disconnected: Callback to check client connection.
            req_id: Request ID for logging.
            model_name: Optional model name for cache validation.

        Returns:
            FunctionCallingState with the configuration result.

        Raises:
            SchemaConversionError: Tool conversion failed in NATIVE mode.
            NativeFunctionCallingError: Native setup failed in NATIVE mode.
            ClientDisconnectedError: The client disconnected during setup.
            asyncio.CancelledError: The task was cancelled during setup.
            Exception: Any other setup error in NATIVE mode is re-raised.
        """
        total_start = time.perf_counter()
        state = FunctionCallingState(mode=self.get_effective_mode(tools))

        # No tools provided - ensure FC toggle is disabled if it was
        # previously enabled.
        if not tools:
            await self._ensure_fc_disabled_when_no_tools(
                page_controller=page_controller,
                check_client_disconnected=check_client_disconnected,
                req_id=req_id,
            )
            if self._config.debug:
                self.logger.debug(
                    f"[{req_id}] [FC] No tools in request, FC setup skipped/disabled"
                )
            return state

        if state.mode == FunctionCallingMode.EMULATED:
            if self._config.debug:
                self.logger.debug(
                    f"[{req_id}] [FC] Using emulated mode, skipping native FC setup"
                )
            if FUNCTION_CALLING_DEBUG:
                fc_logger.log_mode_selection(req_id, "emulated", "configured mode")
            return state

        # Native or Auto mode - compute digest and check cache
        tools_digest = self._cache.compute_tools_digest(tools)
        state.tools_digest = tools_digest

        if await self._try_reuse_cached_setup(
            state=state,
            tools_digest=tools_digest,
            model_name=model_name,
            page_controller=page_controller,
            check_client_disconnected=check_client_disconnected,
            req_id=req_id,
            total_start=total_start,
        ):
            return state

        # Cache miss (or unusable cache) - proceed with native configuration
        if FUNCTION_CALLING_DEBUG:
            self.logger.info(
                f"[{req_id}] [FC] Configuring native function calling with {len(tools)} tool(s) "
                f"(digest={tools_digest[:8]}...)"
            )
            fc_logger.log_mode_selection(
                req_id, "native", f"cache_miss, {len(tools)} tools"
            )
        self._log_forced_tool_choice(tool_choice, req_id)

        try:
            # Convert OpenAI tools to Gemini format
            convert_start = time.perf_counter()
            gemini_declarations = self._schema_converter.convert_tools(tools)
            convert_elapsed = time.perf_counter() - convert_start
            if self._config.debug:
                self.logger.debug(
                    f"[{req_id}] [FC:Perf] Converted {len(tools)} tools to Gemini format "
                    f"in {convert_elapsed:.3f}s"
                )
            if FUNCTION_CALLING_DEBUG:
                fc_logger.log_schema_conversion(
                    req_id, tool_count=len(tools), elapsed_ms=convert_elapsed * 1000
                )

            await self._configure_native_with_retry(
                state=state,
                tools=tools,
                gemini_declarations=gemini_declarations,
                tools_digest=tools_digest,
                model_name=model_name,
                page_controller=page_controller,
                check_client_disconnected=check_client_disconnected,
                req_id=req_id,
                total_start=total_start,
            )
            return state
        except SchemaConversionError as e:
            state.error_message = f"Schema conversion error: {e}"
            if FUNCTION_CALLING_DEBUG:
                self.logger.error(f"[{req_id}] [FC] {state.error_message}")
            # In NATIVE mode a schema error is surfaced to the caller.
            if state.mode == FunctionCallingMode.NATIVE:
                raise
            # Auto mode with schema error - fall back to emulated
            state.fallback_used = True
            state.mode = FunctionCallingMode.EMULATED
            if FUNCTION_CALLING_DEBUG:
                self.logger.warning(
                    f"[{req_id}] [FC] Falling back to emulated mode due to schema error"
                )
                fc_logger.log_mode_selection(
                    req_id, "emulated", "fallback_schema_error"
                )
            return state
        except NativeFunctionCallingError as e:
            state.error_message = str(e)
            if FUNCTION_CALLING_DEBUG:
                self.logger.warning(
                    f"[{req_id}] [FC] Native function calling failed: {e}"
                )
            # Check if fallback is allowed
            if state.mode == FunctionCallingMode.AUTO and self._config.native_fallback:
                state.fallback_used = True
                state.mode = FunctionCallingMode.EMULATED
                if FUNCTION_CALLING_DEBUG:
                    self.logger.info(
                        f"[{req_id}] [FC] Falling back to emulated mode for function calling"
                    )
                    fc_logger.log_mode_selection(
                        req_id, "emulated", "fallback_native_error"
                    )
                return state
            elif state.mode == FunctionCallingMode.NATIVE:
                # Native mode with no fallback - raise error
                raise
        except ClientDisconnectedError:
            raise
        except asyncio.CancelledError:
            raise
        except Exception as e:
            state.error_message = f"Unexpected error: {e}"
            if FUNCTION_CALLING_DEBUG:
                self.logger.error(
                    f"[{req_id}] [FC] Unexpected error in FC prepare: {e}"
                )
            if state.mode == FunctionCallingMode.AUTO and self._config.native_fallback:
                state.fallback_used = True
                state.mode = FunctionCallingMode.EMULATED
                return state
            elif state.mode == FunctionCallingMode.NATIVE:
                raise

        # Reached when native setup failed in AUTO mode with fallback
        # disabled: the state carries error_message and no native flags.
        return state

    async def cleanup_after_request(
        self,
        state: FunctionCallingState,
        page_controller: PageController,
        check_client_disconnected: Callable,
        req_id: str,
        preserve_cache: bool = False,
    ) -> None:
        """Clean up function calling state after a request completes.

        If configured to clear between requests and native mode configured
        tools for this request (not via a cache hit), this clears the
        function declarations from the UI. Client disconnects are ignored
        and other ordinary failures are logged when debug is on.

        Args:
            state: The function calling state from prepare_request.
            page_controller: PageController instance for browser automation.
            check_client_disconnected: Callback to check client connection.
            req_id: Request ID for logging.
            preserve_cache: If True, don't invalidate cache (for same-tool
                sequences).

        Raises:
            asyncio.CancelledError: Propagated so task cancellation is honoured.
        """
        if not FUNCTION_CALLING_CLEAR_BETWEEN_REQUESTS:
            if self._config.debug:
                self.logger.debug(
                    f"[{req_id}] [FC] Skipping cleanup (CLEAR_BETWEEN_REQUESTS=False)"
                )
            return

        if not state.tools_configured:
            return

        # If this was a cache hit, we didn't actually change anything
        if state.cache_hit:
            if self._config.debug:
                self.logger.debug(
                    f"[{req_id}] [FC] Skipping cleanup (cache hit, no UI changes made)"
                )
            return

        try:
            start_time = time.perf_counter()
            # invalidate_cache=not preserve_cache controls cache behavior
            await page_controller.clear_function_declarations(
                check_client_disconnected,
                invalidate_cache=not preserve_cache,
            )
            elapsed = time.perf_counter() - start_time
            if self._config.debug:
                self.logger.debug(
                    f"[{req_id}] [FC:Perf] Cleared function declarations in {elapsed:.2f}s"
                )
        except ClientDisconnectedError:
            pass  # Client gone, nothing to clean up
        except asyncio.CancelledError:
            # Cleanup is abandoned, but the cancellation itself must still
            # reach the caller instead of being silently consumed.
            raise
        except Exception as e:
            if FUNCTION_CALLING_DEBUG:
                self.logger.warning(
                    f"[{req_id}] [FC] Failed to clear function declarations: {e}"
                )

    @staticmethod
    def _parse_function_entries(
        functions: Optional[List[Dict[str, Any]]],
    ) -> List[ParsedFunctionCall]:
        """Convert raw entries into ParsedFunctionCall objects.

        Entries that are not dicts or that lack a non-empty 'name' are skipped.
        """
        parsed_calls: List[ParsedFunctionCall] = []
        for func_data in functions or []:
            if not isinstance(func_data, dict):
                continue
            name = func_data.get("name", "")
            if not name:
                continue
            parsed_calls.append(
                ParsedFunctionCall(name=name, arguments=func_data.get("params", {}))
            )
        return parsed_calls

    def format_function_calls_for_response(
        self,
        functions: List[Dict[str, Any]],
        content: Optional[str] = None,
    ) -> Tuple[Dict[str, Any], str]:
        """Format function call data from AI Studio into OpenAI response format.

        Args:
            functions: List of function call data from AI Studio.
                Each item should have 'name' and 'params' keys.
            content: Text used for the plain assistant message that is
                returned when there are no valid function calls.

        Returns:
            Tuple of (message_dict, finish_reason). Without valid function
            calls this is an assistant message with ``content`` and "stop".
        """
        parsed_calls = self._parse_function_entries(functions)
        if not parsed_calls:
            return {"role": "assistant", "content": content or ""}, "stop"

        # NOTE(review): `content` is not forwarded when tool calls are
        # present (the formatter receives content=None) - confirm intended.
        message = self._response_formatter.format_non_streaming_response(
            parsed_calls, content=None
        )
        finish_reason = get_finish_reason(True)
        return message, finish_reason

    def format_streaming_tool_calls(
        self,
        functions: List[Dict[str, Any]],
        chunk_size: int = 50,
    ) -> List[Dict[str, Any]]:
        """Format function calls for streaming response.

        Generates all the delta chunks needed to stream function calls.
        Indices are assigned over the valid entries only, so they stay
        contiguous from 0 even when invalid entries are skipped.

        Args:
            functions: List of function call data.
            chunk_size: Size of each arguments chunk.

        Returns:
            List of delta objects for streaming.
        """
        all_chunks: List[Dict[str, Any]] = []
        for idx, parsed in enumerate(self._parse_function_entries(functions)):
            all_chunks.extend(
                self._response_formatter.format_streaming_chunks(
                    index=idx, parsed_call=parsed, chunk_size=chunk_size
                )
            )
        return all_chunks
# Shared orchestrator instance for the module; created on first request.
_orchestrator: Optional[FunctionCallingOrchestrator] = None


def get_function_calling_orchestrator() -> FunctionCallingOrchestrator:
    """Return the module-wide orchestrator, building it on the first call."""
    global _orchestrator
    if _orchestrator is not None:
        return _orchestrator
    _orchestrator = FunctionCallingOrchestrator()
    return _orchestrator
def reset_orchestrator() -> None:
    """Drop the module-wide orchestrator so the next lookup builds a new one.

    Mainly useful for tests.
    """
    global _orchestrator
    _orchestrator = None
# =============================================================================
# Convenience Functions for Request Processing Integration
# =============================================================================
def should_skip_tool_injection(
    tools: Optional[List[Dict[str, Any]]],
    fc_state: Optional[FunctionCallingState] = None,
) -> bool:
    """Determine if tool catalog injection into the prompt should be skipped.

    In native mode, the tool catalog is configured via UI automation,
    so it should not also be injected into the prompt text.

    When fc_state is provided (from prepare_request), its resolved values
    decide the outcome, which handles AUTO mode fallback correctly.

    Args:
        tools: List of tool definitions from the request.
        fc_state: Optional dynamic state from FunctionCallingOrchestrator.

    Returns:
        True if tool injection should be skipped (no tools, or native mode
        successfully configured), False if the tool catalog should be
        injected (emulated mode, fallback, or no dynamic state available).
    """
    if not tools:
        return True  # No tools, nothing to inject anyway

    if fc_state is None:
        # Without dynamic state there is no proof that native mode is really
        # in use, so tools are injected for safety whatever mode is
        # configured (emulated, native and auto all resolve to False here).
        return False

    # Only skip injection if native mode was successfully configured
    if fc_state.native_enabled and fc_state.tools_configured:
        return True

    # If fallback was used, the tools must be injected
    if fc_state.fallback_used:
        return False

    # Mode is EMULATED (either configured or after fallback)
    if fc_state.mode == FunctionCallingMode.EMULATED:
        return False

    # Native/Auto mode attempted but tools not configured - inject as fallback
    if not fc_state.tools_configured:
        return False

    return True
def get_effective_function_calling_mode() -> FunctionCallingMode:
    """Resolve the configured mode string to a FunctionCallingMode member.

    Returns:
        The member matching the lower-cased FUNCTION_CALLING_MODE setting,
        or FunctionCallingMode.EMULATED when the value is not a valid mode.
    """
    normalized = FUNCTION_CALLING_MODE.lower()
    try:
        resolved = FunctionCallingMode(normalized)
    except ValueError:
        # Unrecognized setting value: use emulated mode.
        resolved = FunctionCallingMode.EMULATED
    return resolved
# Names exported by a star-import of this module.
__all__ = [
    "FunctionCallingOrchestrator",
    "FunctionCallingState",
    "NativeFunctionCallingError",
    "get_function_calling_orchestrator",
    "reset_orchestrator",
    "should_skip_tool_injection",
    "get_effective_function_calling_mode",
]