File size: 14,915 Bytes
cd7c282 809ad60 cd7c282 809ad60 cd7c282 809ad60 cd7c282 3572ba0 cd7c282 809ad60 cd7c282 8c0ec2b cd7c282 a4327d1 cd7c282 3572ba0 ae5413a 3572ba0 a4327d1 cd7c282 3f60cec cd7c282 2b44c5a cd7c282 809ad60 2b44c5a 809ad60 2b44c5a 809ad60 2b44c5a 809ad60 2b44c5a 809ad60 cd7c282 8c0ec2b 809ad60 8c0ec2b cd7c282 8c0ec2b cd7c282 8c0ec2b cd7c282 8c0ec2b cd7c282 8c0ec2b cd7c282 8c0ec2b cd7c282 8c0ec2b cd7c282 8c0ec2b cd7c282 8c0ec2b cd7c282 ecaa2e8 cd7c282 ecaa2e8 cd7c282 ecaa2e8 809ad60 ecaa2e8 cd7c282 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
"""HuggingFace Chat Client adapter for Microsoft Agent Framework.
This client enables the use of HuggingFace Inference API (including the free tier)
as a backend for the agent framework, allowing "Advanced Mode" to work without
an OpenAI API key.
"""
import asyncio
import json
from collections.abc import AsyncIterable, MutableSequence
from functools import partial
from typing import Any, cast
import structlog
from agent_framework import (
BaseChatClient,
ChatMessage,
ChatOptions,
ChatResponse,
ChatResponseUpdate,
FinishReason,
Role,
)
from agent_framework._middleware import use_chat_middleware
from agent_framework._tools import use_function_invocation
from agent_framework._types import FunctionCallContent, FunctionResultContent
from agent_framework.observability import use_observability
from huggingface_hub import InferenceClient
from src.middleware import RetryMiddleware, TokenTrackingMiddleware
from src.utils.config import settings
# Module-level structlog logger used by the client below for info/warning/error events.
logger = structlog.get_logger()
@use_function_invocation
@use_observability
@use_chat_middleware
class HuggingFaceChatClient(BaseChatClient):  # type: ignore[misc]
    """Adapter for HuggingFace Inference API with full function calling support.

    Framework messages and tools are translated to the OpenAI-style chat
    format accepted by ``InferenceClient.chat_completion``. The blocking HF
    client is always driven from worker threads (``asyncio.to_thread``) so the
    event loop stays responsive.
    """

    def __init__(
        self,
        model_id: str | None = None,
        api_key: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the HuggingFace chat client.

        Args:
            model_id: The HuggingFace model ID (default: configured value or Qwen2.5-7B).
            api_key: HF_TOKEN (optional, defaults to the configured token).
            **kwargs: Additional arguments passed to BaseChatClient.
        """
        # Retry and token tracking are installed as chat middleware.
        middleware = [
            RetryMiddleware(max_attempts=3, min_wait=1.0, max_wait=10.0),
            TokenTrackingMiddleware(),
        ]
        super().__init__(middleware=middleware, **kwargs)  # type: ignore[arg-type]

        # NOTE: The 7B model is the fallback so requests stay on HuggingFace
        # native infrastructure (avoids Novita 500s).
        self.model_id = model_id or settings.huggingface_model or "Qwen/Qwen2.5-7B-Instruct"
        self.api_key = api_key or settings.hf_token

        # timeout=60 prevents premature timeouts on long reasoning responses.
        self._client = InferenceClient(
            model=self.model_id,
            token=self.api_key,
            timeout=60,
        )
        logger.info("Initialized HuggingFaceChatClient", model=self.model_id)

    @staticmethod
    def _tool_result_message(
        item: Any,
        call_id_to_name: dict[str, str],
    ) -> dict[str, Any]:
        """Build one ``role="tool"`` message dict from a FunctionResultContent.

        Args:
            item: The FunctionResultContent to convert.
            call_id_to_name: Mapping of call IDs to tool names gathered from
                FunctionCallContent items seen so far.

        Returns:
            A dict with ``role`` and ``content``, plus ``tool_call_id`` and
            ``name`` when they are known.
        """
        # HuggingFace/OpenAI expects string content, so structured results are
        # JSON-encoded.
        if item.result is None:
            content = ""
        elif isinstance(item.result, str):
            content = item.result
        else:
            # default=str is deliberate: a result that is not JSON-serializable
            # is stringified rather than aborting the whole request.
            content = json.dumps(item.result, default=str)

        message: dict[str, Any] = {"role": "tool", "content": content}
        if item.call_id:
            message["tool_call_id"] = item.call_id
        # Add the name field if it was tracked (required by some APIs).
        tool_name = call_id_to_name.get(item.call_id)
        if tool_name:
            message["name"] = tool_name
        return message

    def _convert_messages(self, messages: MutableSequence[ChatMessage]) -> list[dict[str, Any]]:
        """Convert framework messages to HuggingFace format.

        Each FunctionResultContent becomes its own ``role="tool"`` message, so
        a framework message that carries several results keeps every result and
        its ``tool_call_id`` instead of only the last one.

        Args:
            messages: Framework chat messages, in conversation order.

        Returns:
            A list of OpenAI-style message dicts.
        """
        hf_messages: list[dict[str, Any]] = []
        # call_id -> tool name; assistant messages with tool calls come before
        # the tool result messages that refer to them.
        call_id_to_name: dict[str, str] = {}

        for msg in messages:
            # msg.role can be a string or an enum; use .value when present.
            role_str = str(getattr(msg.role, "value", msg.role))

            tool_calls: list[dict[str, Any]] = []
            tool_results: list[dict[str, Any]] = []
            for item in msg.contents or ():
                if isinstance(item, FunctionCallContent):
                    # Assistant message invoking a tool.
                    call_id_to_name[item.call_id] = item.name
                    arguments = (
                        item.arguments
                        if isinstance(item.arguments, str)
                        else json.dumps(item.arguments)
                    )
                    tool_calls.append(
                        {
                            "id": item.call_id,
                            "type": "function",
                            "function": {"name": item.name, "arguments": arguments},
                        }
                    )
                elif isinstance(item, FunctionResultContent):
                    tool_results.append(self._tool_result_message(item, call_id_to_name))

            # A message that holds only tool results is fully represented by
            # the tool messages below; everything else is emitted under its
            # own role.
            if tool_calls or not tool_results:
                message_dict: dict[str, Any] = {"role": role_str, "content": msg.text or ""}
                if tool_calls:
                    message_dict["tool_calls"] = tool_calls
                hf_messages.append(message_dict)
            hf_messages.extend(tool_results)

        return hf_messages

    def _convert_tools(self, tools: list[Any] | None) -> list[dict[str, Any]] | None:
        """Convert AIFunction objects to OpenAI-compatible tool definitions.

        AIFunction.to_dict() returns:
            {'type': 'ai_function', 'name': '...', 'input_model': {...}}
        OpenAI/HuggingFace expects:
            {'type': 'function', 'function': {'name': '...', 'parameters': {...}}}

        Args:
            tools: Tool objects exposing ``to_dict()``, or ready-made dicts.

        Returns:
            The converted tool definitions, or None when there is nothing usable.
        """
        if not tools:
            return None

        json_tools: list[dict[str, Any]] = []
        for tool in tools:
            if hasattr(tool, "to_dict"):
                try:
                    t_dict = tool.to_dict()
                    json_tools.append(
                        {
                            "type": "function",
                            "function": {
                                "name": t_dict["name"],
                                "description": t_dict.get("description", ""),
                                "parameters": t_dict["input_model"],
                            },
                        }
                    )
                except (KeyError, TypeError) as e:
                    # Best effort: a malformed tool is skipped, not fatal.
                    logger.warning("Failed to convert tool", tool=str(tool), error=str(e))
            elif isinstance(tool, dict):
                # Already a dict - assume correct format.
                json_tools.append(tool)
            else:
                logger.warning("Skipping non-serializable tool", tool_type=str(type(tool)))

        return json_tools or None

    def _parse_tool_calls(self, message: Any) -> list[FunctionCallContent]:
        """Parse HuggingFace tool_calls into framework FunctionCallContent.

        Args:
            message: A response message that may carry a ``tool_calls`` attribute.

        Returns:
            One FunctionCallContent per tool call that could be parsed; calls
            that fail to parse are logged and skipped.
        """
        contents: list[FunctionCallContent] = []
        for tc in getattr(message, "tool_calls", None) or ():
            try:
                contents.append(
                    FunctionCallContent(
                        call_id=tc.id,
                        name=tc.function.name,
                        arguments=tc.function.arguments,  # JSON string or dict
                    )
                )
            except (AttributeError, TypeError) as e:
                logger.warning("Failed to parse tool call", error=str(e))
        return contents

    @staticmethod
    def _resolve_tool_choice(tools: list[dict[str, Any]] | None, tool_choice: Any) -> str | None:
        """Map the framework tool choice to the HF ``tool_choice`` value.

        Only "auto" is forwarded; every other mode is left as None.

        Args:
            tools: Converted tool definitions (None when no tools are sent).
            tool_choice: The framework's tool choice (enum-like object, dict or str).

        Returns:
            ``"auto"`` or None.
        """
        if not tools or tool_choice is None:
            return None
        # NOTE(review): assumes the framework's ToolMode exposes the mode as a
        # ``mode`` attribute (or "mode" key for dicts) - confirm against
        # agent_framework. The substring check is kept as the fallback.
        if isinstance(tool_choice, dict):
            mode = tool_choice.get("mode")
        else:
            mode = getattr(tool_choice, "mode", tool_choice)
        if str(mode).strip().lower() == "auto" or "AUTO" in str(tool_choice):
            return "auto"
        return None

    def _build_completion_call(
        self,
        messages: MutableSequence[ChatMessage],
        chat_options: ChatOptions,
        *,
        stream: bool,
    ) -> Any:
        """Return a zero-argument callable that performs the chat_completion request.

        Shared by the streaming and non-streaming paths so both send identical
        parameters. A ``partial`` is used because ``asyncio.to_thread`` is
        handed a ready-to-call function.
        """
        tools = self._convert_tools(chat_options.tools or None)
        # Explicit None checks - 'or' would treat 0 / 0.0 as missing, and
        # temperature=0.0 is valid (deterministic output).
        max_tokens = chat_options.max_tokens if chat_options.max_tokens is not None else 2048
        temperature = chat_options.temperature if chat_options.temperature is not None else 0.7
        return partial(
            self._client.chat_completion,
            messages=self._convert_messages(messages),
            tools=tools,
            tool_choice=self._resolve_tool_choice(tools, chat_options.tool_choice),
            max_tokens=max_tokens,
            temperature=temperature,
            stream=stream,
        )

    async def _inner_get_response(
        self,
        *,
        messages: MutableSequence[ChatMessage],
        chat_options: ChatOptions,
        **kwargs: Any,
    ) -> ChatResponse:
        """Non-streaming response generation using chat_completion.

        Args:
            messages: Conversation so far.
            chat_options: Tools, tool choice, max_tokens and temperature.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            A ChatResponse with one message (text plus any tool calls), or an
            empty response with id "error-no-choices" when HF returns no choices.

        Raises:
            Exception: Any error from the HF call or response parsing is logged
                and re-raised unchanged.
        """
        call_fn = self._build_completion_call(messages, chat_options, stream=False)
        try:
            # The HF client is blocking, so run it in a worker thread.
            response = await asyncio.to_thread(call_fn)

            choices = response.choices
            if not choices:
                return ChatResponse(messages=[], response_id="error-no-choices")

            message = choices[0].message
            tool_call_contents = self._parse_tool_calls(message)

            response_msg = ChatMessage(
                role=cast(Any, message.role),
                text=message.content or "",
                contents=tool_call_contents if tool_call_contents else None,
            )
            return ChatResponse(
                messages=[response_msg],
                response_id=response.id or "hf-response",
            )
        except Exception as e:
            logger.error("HuggingFace API error", error=str(e))
            raise

    async def _inner_get_streaming_response(
        self,
        *,
        messages: MutableSequence[ChatMessage],
        chat_options: ChatOptions,
        **kwargs: Any,
    ) -> AsyncIterable[ChatResponseUpdate]:
        """Streaming response generation.

        Text deltas are yielded as they arrive. Tool-call deltas are partial,
        so they are accumulated per index and yielded once, as a single update,
        after the stream ends.

        Args:
            messages: Conversation so far.
            chat_options: Tools, tool choice, max_tokens and temperature.
            **kwargs: Accepted for interface compatibility; unused here.

        Yields:
            ChatResponseUpdate objects for text chunks, then (if any) one
            update carrying the completed tool calls.

        Raises:
            Exception: Any error from the HF call or chunk handling is logged
                and re-raised unchanged.
        """
        call_fn = self._build_completion_call(messages, chat_options, stream=True)
        try:
            stream = await asyncio.to_thread(call_fn)
            chunk_iter = iter(stream)
            # Sentinel returned by next() at exhaustion, so StopIteration never
            # has to cross the thread boundary.
            stream_end = object()

            # index -> partially assembled tool call
            tool_call_accumulator: dict[int, dict[str, Any]] = {}

            while True:
                # Fetching the next chunk blocks on network I/O, so it runs in
                # a worker thread instead of stalling the event loop.
                chunk = await asyncio.to_thread(next, chunk_iter, stream_end)
                if chunk is stream_end:
                    break
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta

                # 1. Text content is forwarded immediately.
                if delta.content:
                    yield ChatResponseUpdate(
                        role=cast(Any, delta.role) if delta.role else None,
                        text=delta.content,
                    )

                # 2. Tool-call fragments are merged into the accumulator.
                for tc in getattr(delta, "tool_calls", None) or ():
                    slot = tool_call_accumulator.setdefault(
                        tc.index,
                        {"id": "", "name": "", "arguments": ""},
                    )
                    if tc.id:
                        slot["id"] += tc.id
                    if tc.function:
                        if tc.function.name:
                            slot["name"] += tc.function.name
                        if tc.function.arguments:
                            slot["arguments"] += tc.function.arguments

            # 3. Emit the completed tool calls in index order. ID and name are
            # required by FunctionCallContent, so incomplete entries are dropped.
            contents: list[FunctionCallContent] = [
                FunctionCallContent(
                    call_id=tc_data["id"],
                    name=tc_data["name"],
                    arguments=tc_data["arguments"],
                )
                for _, tc_data in sorted(tool_call_accumulator.items())
                if tc_data["id"] and tc_data["name"]
            ]
            if contents:
                yield ChatResponseUpdate(
                    contents=contents,
                    role=Role.ASSISTANT,
                    finish_reason=FinishReason.TOOL_CALLS,
                )
        except Exception as e:
            logger.error("HuggingFace Streaming error", error=str(e))
            raise
|