Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import os | |
| import asyncio | |
| import time | |
| from collections import deque | |
| from dataclasses import dataclass, field | |
| from json import JSONDecodeError | |
| from typing import Any, Literal | |
| import httpx | |
| from openai import AsyncOpenAI | |
| from mcp import OpenDotaMCPServer | |
# Base URL handed to AsyncOpenAI as ``base_url`` (see _get_openai_client).
HF_ROUTER_BASE_URL = "https://router.huggingface.co/v1"
# Model identifier sent with every chat completion request.
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "Qwen/Qwen3.6-27B:featherless-ai")
# Budget for the sliding-window limiter below; a value <= 0 disables limiting.
OPENAI_CALLS_PER_MINUTE = int(os.getenv("OPENAI_CALLS_PER_MINUTE", "60"))
# When truthy, _create_chat_completion streams first and falls back to a
# non-streaming request only if the stream produced no text.
OPENAI_STREAM = os.getenv("OPENAI_STREAM", "").lower() in {"1", "true", "yes", "on"}
# Value sent as ``protocolVersion`` in the MCP ``initialize`` request.
MCP_PROTOCOL_VERSION = "2025-06-18"
# Default per-string cap applied by _trim_for_prompt.
MAX_PROMPT_VALUE_CHARS = int(os.getenv("MAX_PROMPT_VALUE_CHARS", "4000"))
# ``max_tokens`` for every chat completion request.
OPENAI_MAX_TOKENS = int(os.getenv("OPENAI_MAX_TOKENS", "160"))
# Default ``limit`` of _select_candidate_heroes.
FAST_CANDIDATE_COUNT = int(os.getenv("FAST_CANDIDATE_COUNT", "6"))
# Number of leading candidates that get a ``get_hero_details`` tool call.
FAST_DETAIL_COUNT = int(os.getenv("FAST_DETAIL_COUNT", "2"))
# Number of heroes that get a ``get_hero_matchups`` tool call; the default of
# 0 means no matchup calls are made.
FAST_MATCHUP_COUNT = int(os.getenv("FAST_MATCHUP_COUNT", "0"))
# Timeout applied to the whole MCP prefetch in build_fast_recommendation_context.
MCP_PREFETCH_TIMEOUT_SECONDS = float(os.getenv("MCP_PREFETCH_TIMEOUT_SECONDS", "4"))
# Enables the _debug_log calls scattered through the module.
AGENT_DEBUG = os.getenv("AGENT_DEBUG", "").lower() in {"1", "true", "yes", "on"}
# Fallback hero names used when no explicit pool has been supplied.
DEFAULT_HERO_POOL = [
    "Puck",
    "Pudge",
    "Invoker",
    "Shadow Fiend",
    "Rubick",
    "Earthshaker",
    "Storm Spirit",
    "Ember Spirit",
    "Crystal Maiden",
    "Anti-Mage",
    "Phantom Assassin",
    "Juggernaut",
    "Lina",
    "Lion",
    "Axe",
    "Doom",
    "Batrider",
    "Beastmaster",
    "Mars",
    "Tidehunter",
    "Dark Seer",
    "Chen",
    "Enchantress",
    "Io",
    "Disruptor",
    "Shadow Demon",
]
# De-duplicated pool rebuilt by warm_static_dota_data(); empty until it runs.
STATIC_HERO_POOL: list[str] = []
# Tool results keyed by a JSON string of server URL, tool name and arguments.
_TOOL_RESULT_CACHE: dict[str, Any] = {}
# AsyncOpenAI clients keyed by API key.
_OPENAI_CLIENTS: dict[str, AsyncOpenAI] = {}
# Initialized MCP clients keyed by "<server url>\0<api key>".
_MCP_CLIENTS: dict[str, OpenDotaMCPHttpClient] = {}
# Single lock guarding reads and writes of the three caches above.
_CACHE_LOCK = asyncio.Lock()
class AsyncSlidingWindowRateLimiter:
    """Allow at most ``max_calls`` acquisitions within any ``window_seconds`` span.

    Timestamps of granted calls are kept in a deque; ``acquire`` waits until
    the oldest one falls out of the window. A non-positive ``max_calls``
    turns the limiter into a no-op.
    """

    def __init__(self, max_calls: int, window_seconds: float):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._calls: deque[float] = deque()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Return once a call slot is available, recording the grant time."""
        if self.max_calls <= 0:
            return
        granted = False
        while not granted:
            async with self._lock:
                current = time.monotonic()
                # Forget grants that are older than the window.
                while self._calls and current - self._calls[0] >= self.window_seconds:
                    self._calls.popleft()
                if len(self._calls) < self.max_calls:
                    self._calls.append(current)
                    granted = True
                    continue
                # Time until the oldest grant expires.
                wait_seconds = self.window_seconds - (current - self._calls[0])
            # Sleep without holding the lock; never spin faster than 50 ms.
            await asyncio.sleep(max(wait_seconds, 0.05))
# Process-wide limiter awaited before chat completion requests; the budget
# comes from OPENAI_CALLS_PER_MINUTE over a fixed 60-second window.
OPENAI_RATE_LIMITER = AsyncSlidingWindowRateLimiter(
    max_calls=OPENAI_CALLS_PER_MINUTE,
    window_seconds=60,
)
@dataclass
class DraftTeam:
    """One side of the draft: its identity plus the heroes it has picked and banned.

    The ``@dataclass`` decorator is required: the list fields use
    ``field(default_factory=list)``, which only yields a fresh list per
    instance when the class is processed as a dataclass.
    """

    name: str
    side: str  # presumably "Radiant"/"Dire" -- confirm against the caller
    picks: list[str] = field(default_factory=list)
    bans: list[str] = field(default_factory=list)
    # Not read anywhere in this module except via __dict__ serialization.
    player_ids: list[str] = field(default_factory=list)
@dataclass
class DraftRecommendationRequest:
    """Everything the advisor needs to recommend the next pick or ban.

    Declared as a dataclass so callers can construct it with keyword
    arguments; without the decorator the annotations alone define no
    ``__init__``.
    """

    action: Literal["Pick", "Ban"]
    step_number: int
    phase: str
    slot: int
    you: DraftTeam
    opponent: DraftTeam
    # Heroes that must not be suggested; filtered out during candidate selection.
    unavailable_heroes: list[str]
    # Forwarded to the MCP client; deliberately not included in the model prompt.
    opendota_api_key: str = ""
class OpenDotaMCPHttpClient:
    """Small JSON-RPC client for the OpenDota MCP server over its HTTP transport.

    Use as an async context manager: entering opens an ``httpx.AsyncClient``
    and performs the MCP ``initialize`` handshake; exiting closes the
    connection pool.
    """

    def __init__(self, server: OpenDotaMCPServer | None = None, api_key: str = ""):
        self.server = server or OpenDotaMCPServer()
        self.api_key = api_key
        self._client: httpx.AsyncClient | None = None
        self._session_id = ""
        self._request_id = 0

    async def __aenter__(self) -> OpenDotaMCPHttpClient:
        """Open the HTTP client and run the handshake.

        Raises:
            ValueError: if the configured server is not an HTTP server.
        """
        if not self.server.is_http:
            raise ValueError("Only the OpenDota MCP HTTP transport is supported by this client.")
        self._client = httpx.AsyncClient(timeout=30)
        try:
            await self._initialize()
        except BaseException:
            # ``async with`` never calls __aexit__ when __aenter__ raises, so
            # close the pool here instead of leaking it.
            await self.__aexit__(None, None, None)
            raise
        return self

    async def __aexit__(self, *_: object) -> None:
        """Close the HTTP client; safe to call more than once."""
        client, self._client = self._client, None
        if client is not None:
            await client.aclose()

    async def list_tools(self) -> list[dict[str, Any]]:
        """Return the server's tool descriptors (empty list if none are reported)."""
        result = await self._request("tools/list", {})
        # A response without a "result" member yields None here.
        return (result or {}).get("tools", [])

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
        """Invoke tool ``name`` with ``arguments`` and return the raw JSON-RPC result."""
        return await self._request("tools/call", {"name": name, "arguments": arguments})

    def _require_client(self) -> httpx.AsyncClient:
        """Return the open HTTP client or raise if the context was never entered."""
        if self._client is None:
            raise RuntimeError(
                "OpenDotaMCPHttpClient must be entered with 'async with' before use."
            )
        return self._client

    async def _initialize(self) -> None:
        """Send ``initialize``, remember the session id, then acknowledge."""
        result, headers = await self._post(
            "initialize",
            {
                "protocolVersion": MCP_PROTOCOL_VERSION,
                "capabilities": {"tools": {}},
                "clientInfo": {"name": "dota-draft-agent", "version": "0.1.0"},
            },
            include_session=False,
        )
        self._session_id = headers.get("mcp-session-id", "")
        if not result:
            raise RuntimeError("OpenDota MCP server did not return an initialize result.")
        await self._notify_initialized()

    async def _notify_initialized(self) -> None:
        """Post the ``notifications/initialized`` message.

        The response is intentionally not inspected (best effort).
        """
        client = self._require_client()
        headers = self._headers(include_session=True)
        message = {"jsonrpc": "2.0", "method": "notifications/initialized"}
        await client.post(self.server.url, json=message, headers=headers)

    async def _request(self, method: str, params: dict[str, Any]) -> Any:
        """Send a session-bound JSON-RPC request and return only its result."""
        result, _ = await self._post(method, params, include_session=True)
        return result

    async def _post(
        self,
        method: str,
        params: dict[str, Any],
        include_session: bool,
    ) -> tuple[Any, httpx.Headers]:
        """POST one JSON-RPC request and return ``(result, response headers)``.

        Raises:
            RuntimeError: if the client is not open or the payload carries
                a JSON-RPC ``error`` member.
            httpx.HTTPStatusError: on a non-success HTTP status.
        """
        client = self._require_client()
        self._request_id += 1
        message = {
            "jsonrpc": "2.0",
            "id": self._request_id,
            "method": method,
            "params": params,
        }
        response = await client.post(
            self.server.url,
            json=message,
            headers=self._headers(include_session=include_session),
        )
        response.raise_for_status()
        payload = _decode_mcp_response(response)
        if "error" in payload:
            raise RuntimeError(payload["error"])
        return payload.get("result"), response.headers

    def _headers(self, include_session: bool) -> dict[str, str]:
        """Build request headers, adding session id and API key when present."""
        headers = {
            "Accept": "application/json, text/event-stream",
            "Content-Type": "application/json",
        }
        if include_session and self._session_id:
            headers["Mcp-Session-Id"] = self._session_id
        if self.api_key:
            headers["X-OpenDota-API-Key"] = self.api_key
        return headers
async def recommend_draft_action(request: DraftRecommendationRequest) -> str:
    """Produce a formatted pick/ban recommendation for ``request``.

    Prefetches evidence, asks the model for a decision, and renders it.
    Model failures are reported as a human-readable string, never raised.
    """
    prefetched = await build_fast_recommendation_context(request)
    try:
        decision = await _call_openai_for_recommendation(request, prefetched)
    except Exception as exc:
        return f"Recommendation unavailable: OpenAI chat completions request failed ({exc})"
    if decision:
        evidence = prefetched.get("observations", [])
        return _format_recommendation(request, decision, evidence)
    return "Recommendation unavailable: the agent did not return a final pick or ban."
def warm_static_dota_data(hero_names: list[str] | None = None) -> None:
    """Rebuild ``STATIC_HERO_POOL`` from ``hero_names`` followed by the defaults.

    No network access is involved. Names are de-duplicated by their
    normalized form; the first spelling encountered is kept.
    """
    global STATIC_HERO_POOL
    preferred = hero_names or DEFAULT_HERO_POOL
    known: set[str] = set()
    STATIC_HERO_POOL = pool = []
    for hero in (*preferred, *DEFAULT_HERO_POOL):
        key = _normalize_hero_name(hero)
        if not key or key in known:
            continue
        pool.append(hero)
        known.add(key)
async def build_fast_recommendation_context(
    request: DraftRecommendationRequest,
) -> dict[str, Any]:
    """Collect candidate heroes and prefetched OpenDota evidence for the prompt.

    Prefetch problems (including the overall timeout) are recorded under
    ``prefetch_errors`` instead of being raised, so the caller always gets
    a usable context.
    """
    candidates = _select_candidate_heroes(request)
    errors: list[str] = []
    context: dict[str, Any] = {
        "candidate_heroes": candidates,
        "observations": [],
        "prefetch_errors": errors,
    }
    try:
        mcp_client = await _get_mcp_client(request.opendota_api_key)
        prefetch = _prefetch_draft_observations(mcp_client, request, candidates)
        context["observations"] = await asyncio.wait_for(
            prefetch,
            timeout=MCP_PREFETCH_TIMEOUT_SECONDS,
        )
    except asyncio.TimeoutError:
        errors.append(
            f"OpenDota MCP prefetch timed out after {MCP_PREFETCH_TIMEOUT_SECONDS:g}s"
        )
    except Exception as exc:
        errors.append(str(exc))
    if AGENT_DEBUG:
        summary = {
            "candidates": len(candidates),
            "observations": len(context["observations"]),
            "prefetch_errors": context["prefetch_errors"],
        }
        _debug_log("fast context", summary)
    return context
async def _get_mcp_client(api_key: str) -> OpenDotaMCPHttpClient:
    """Return an initialized MCP client for ``api_key``, reusing a cached one.

    The handshake runs outside the cache lock; if another task cached a
    client for the same key in the meantime, the freshly opened one is
    closed and the cached one is returned.
    """
    candidate = OpenDotaMCPHttpClient(api_key=api_key)
    key = f"{candidate.server.url}\0{api_key}"
    async with _CACHE_LOCK:
        existing = _MCP_CLIENTS.get(key)
    if existing is not None:
        return existing
    try:
        opened = await candidate.__aenter__()
    except Exception:
        await candidate.__aexit__(None, None, None)
        raise
    async with _CACHE_LOCK:
        winner = _MCP_CLIENTS.get(key)
        if winner is None:
            _MCP_CLIENTS[key] = opened
            return opened
        # Lost the race: discard our duplicate connection.
        await opened.__aexit__(None, None, None)
        return winner
def _select_candidate_heroes(
    request: DraftRecommendationRequest,
    limit: int | None = FAST_CANDIDATE_COUNT,
) -> list[str]:
    """Return up to ``limit`` hero names that are still available.

    Names are taken, in order, from the opponent's picks, your picks, and
    the static pool (falling back to the default pool). Anything listed in
    ``request.unavailable_heroes`` and any duplicate (by normalized name)
    is skipped.

    Args:
        request: Current draft state.
        limit: Maximum number of candidates; ``None`` means no cap. A value
            of zero or less yields an empty list.
    """
    if limit is not None and limit <= 0:
        # Previously the cap was only checked after appending, so a limit
        # of 0 still produced one candidate.
        return []
    unavailable = {_normalize_hero_name(hero) for hero in request.unavailable_heroes}
    pool = STATIC_HERO_POOL or DEFAULT_HERO_POOL
    candidates: list[str] = []
    seen: set[str] = set()
    for hero in (*request.opponent.picks, *request.you.picks, *pool):
        normalized = _normalize_hero_name(hero)
        if not normalized or normalized in unavailable or normalized in seen:
            continue
        candidates.append(hero)
        seen.add(normalized)
        if limit is not None and len(candidates) >= limit:
            break
    return candidates
| def _normalize_hero_name(name: str) -> str: | |
| return "".join(char for char in name.lower() if char.isalnum()) | |
async def _prefetch_draft_observations(
    client: OpenDotaMCPHttpClient,
    request: DraftRecommendationRequest,
    candidates: list[str],
) -> list[dict[str, Any]]:
    """Run the hero-detail and matchup tool calls concurrently.

    Returns detail observations first, then matchup observations whose
    results have been reduced to the heroes relevant to this draft.
    """
    detail_fields = (
        "id",
        "localized_name",
        "primary_attr",
        "attack_type",
        "roles",
        "base_health",
        "base_mana",
        "base_armor",
        "base_attack_min",
        "base_attack_max",
        "move_speed",
    )
    your_picks = request.you.picks
    their_picks = request.opponent.picks
    matchup_heroes = [*their_picks, *your_picks, *candidates[:1]][:FAST_MATCHUP_COUNT]

    detail_calls = []
    for hero in candidates[:FAST_DETAIL_COUNT]:
        detail_calls.append(
            _call_tool_observation(
                client,
                "get_hero_details",
                {"hero": hero},
                compact_keys=detail_fields,
            )
        )
    relevant = set(map(_normalize_hero_name, [*your_picks, *their_picks, *candidates]))
    matchup_calls = []
    for hero in matchup_heroes:
        matchup_calls.append(
            _call_tool_observation(client, "get_hero_matchups", {"hero": hero})
        )

    details, matchups = await asyncio.gather(
        asyncio.gather(*detail_calls),
        asyncio.gather(*matchup_calls),
    )
    for entry in matchups:
        entry["result"] = _compact_matchups(entry.get("result"), relevant)
    return [*details, *matchups]
async def _call_tool_observation(
    client: OpenDotaMCPHttpClient,
    name: str,
    arguments: dict[str, Any],
    compact_keys: tuple[str, ...] = (),
) -> dict[str, Any]:
    """Call one tool and wrap the outcome as an observation record.

    Never raises: a failure is reported as ``{"tool_error": <message>}`` in
    the record's ``result``. When ``compact_keys`` is given and the
    normalized result is a dict, only those keys are kept.
    """
    try:
        raw = await _call_tool_cached(client, name, arguments)
        payload = _normalize_tool_result(raw)
        if compact_keys and isinstance(payload, dict):
            payload = {key: payload.get(key) for key in compact_keys if key in payload}
    except Exception as exc:
        payload = {"tool_error": str(exc)}
    return {"tool": name, "arguments": arguments, "result": payload}
async def _call_tool_cached(
    client: OpenDotaMCPHttpClient,
    name: str,
    arguments: dict[str, Any],
) -> Any:
    """Call a tool through the module-level result cache.

    The cache key is a canonical JSON string of server URL, tool name and
    arguments. The network call itself runs without holding the cache lock.
    """
    identity = {
        "server": client.server.url,
        "tool": name,
        "arguments": arguments,
    }
    cache_key = json.dumps(identity, sort_keys=True, ensure_ascii=True)
    async with _CACHE_LOCK:
        hit = _TOOL_RESULT_CACHE.get(cache_key)
    if hit is not None:
        return hit
    fresh = await client.call_tool(name, arguments)
    async with _CACHE_LOCK:
        _TOOL_RESULT_CACHE[cache_key] = fresh
    return fresh
| def _compact_matchups(result: Any, relevant_names: set[str]) -> Any: | |
| if not isinstance(result, list): | |
| return result | |
| compact = [] | |
| for item in result: | |
| if not isinstance(item, dict): | |
| continue | |
| hero_name = item.get("hero_name", "") | |
| if _normalize_hero_name(hero_name) not in relevant_names: | |
| continue | |
| compact.append( | |
| { | |
| "hero_name": hero_name, | |
| "games": item.get("games"), | |
| "win_rate": item.get("win_rate"), | |
| } | |
| ) | |
| return compact[:12] | |
async def _call_openai_for_recommendation(
    request: DraftRecommendationRequest,
    context: dict[str, Any],
) -> dict[str, Any]:
    """Ask the model for a final recommendation and return it as a dict.

    The prompt contains the draft state, candidate heroes, prefetched
    evidence and the expected response schema. If the first reply is not
    parseable JSON, one stricter retry is made. Every upstream request,
    including the retry, first waits on ``OPENAI_RATE_LIMITER``.

    Returns:
        The value under ``"recommendation"`` when present, otherwise the
        whole decoded object.

    Raises:
        RuntimeError: if ``HF_TOKEN`` is not set.
        JSONDecodeError: if the retry reply is not valid JSON either.
    """
    api_key = os.getenv("HF_TOKEN", "")
    if not api_key:
        raise RuntimeError("HF_TOKEN is not set")
    messages = [
        {
            "role": "system",
            "content": (
                "Respond with JSON only. Your first character must be `{` and your last "
                "character must be `}`. Do not include analysis, planning, markdown, "
                "code fences, or explanatory text. You are a Dota 2 Captains Mode draft "
                "advisor. Make a final recommendation now from the supplied draft state "
                "and prefetched OpenDota evidence. Do not request tools. Do not recommend "
                "a hero that has already been picked or banned. Keep rationale under 25 "
                "words and stats_used to at most two short items."
            ),
        },
        {
            "role": "user",
            "content": "/no_think\n"
            + json.dumps(
                {
                    "draft_request": _request_to_json(request),
                    "candidate_heroes": context.get("candidate_heroes", []),
                    "prefetched_evidence": _trim_for_prompt(context.get("observations", [])),
                    "prefetch_errors": context.get("prefetch_errors", []),
                    "response_schema": {
                        "recommendation": {
                            "hero": "Hero name",
                            "action": request.action,
                            "confidence": "low|medium|high",
                            "rationale": "under 25 words",
                            "stats_used": ["up to two brief facts from prefetched evidence"],
                        }
                    },
                },
                ensure_ascii=True,
            ),
        },
    ]
    await OPENAI_RATE_LIMITER.acquire()
    if AGENT_DEBUG:
        _debug_log(
            "final model request",
            {
                "model": OPENAI_MODEL,
                "observations": len(context.get("observations", [])),
                "prompt_chars": sum(len(message["content"]) for message in messages),
            },
        )
    content = await _create_chat_completion(api_key, messages)
    try:
        decision = _parse_json_object(content)
    except JSONDecodeError:
        if AGENT_DEBUG:
            _debug_log("JSON parse failed; retrying final prompt", {"prefix": content[:240]})
        retry_messages = [
            {
                **messages[0],
                "content": (
                    "Return exactly one valid JSON object and nothing else. "
                    "No reasoning. No prose. Start with `{`. End with `}`. "
                    + messages[0]["content"]
                ),
            },
            messages[1],
        ]
        # The retry is a second upstream request, so it has to count
        # against the per-minute budget like the first one.
        await OPENAI_RATE_LIMITER.acquire()
        decision = _parse_json_object(await _create_chat_completion(api_key, retry_messages))
    return decision.get("recommendation", decision)
async def _create_chat_completion(api_key: str, messages: list[dict[str, str]]) -> str:
    """Return the model's reply text for ``messages``.

    Without ``OPENAI_STREAM`` this is a plain non-streaming request. With
    it, the reply is streamed; primary content wins over reasoning text,
    and an entirely empty stream falls back to a non-streaming request.
    """
    if not OPENAI_STREAM:
        return await _create_non_stream_chat_completion(api_key, messages, 0, [])

    primary_pieces: list[str] = []
    reasoning_pieces: list[str] = []
    finish_reasons: list[str] = []
    chunk_count = 0
    client = await _get_openai_client(api_key)
    stream = await client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=messages,
        temperature=0.2,
        max_tokens=OPENAI_MAX_TOKENS,
        stream=True,
        **_model_extra_kwargs(),
    )
    async for chunk in stream:
        chunk_count += 1
        if not chunk.choices:
            continue
        choice = chunk.choices[0]
        if choice.finish_reason:
            finish_reasons.append(choice.finish_reason)
        primary = _extract_primary_text(choice.delta)
        if primary:
            primary_pieces.append(primary)
            continue
        # Only look at reasoning fields when the delta had no content.
        reasoning = _extract_alternate_text(choice.delta)
        if reasoning:
            reasoning_pieces.append(reasoning)

    for pieces in (primary_pieces, reasoning_pieces):
        joined = "".join(pieces)
        if joined:
            return joined

    if AGENT_DEBUG:
        _debug_log(
            "stream response was empty; retrying non-stream",
            {"chunks": chunk_count, "finish_reasons": finish_reasons},
        )
    return await _create_non_stream_chat_completion(
        api_key,
        messages,
        chunk_count,
        finish_reasons,
    )
async def _create_non_stream_chat_completion(
    api_key: str,
    messages: list[dict[str, str]],
    empty_stream_chunks: int,
    empty_stream_finish_reasons: list[str],
) -> str:
    """Issue a non-streaming completion and return its text.

    The two ``empty_stream_*`` arguments only feed the error message when
    this call is a fallback after an empty stream.

    Raises:
        RuntimeError: if the response carries neither content nor
            reasoning text.
    """
    client = await _get_openai_client(api_key)
    response = await client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=messages,
        temperature=0.2,
        max_tokens=OPENAI_MAX_TOKENS,
        stream=False,
        **_model_extra_kwargs(),
    )
    message = response.choices[0].message if response.choices else None
    text = ""
    if message:
        # Prefer the regular content; use reasoning text only as a fallback.
        text = _extract_primary_text(message) or _extract_alternate_text(message)
    if text:
        return text
    reasons = [
        choice.finish_reason
        for choice in response.choices
        if choice.finish_reason
    ]
    raise RuntimeError(
        "model returned an empty response "
        f"(stream chunks={empty_stream_chunks}, "
        f"stream finish_reasons={empty_stream_finish_reasons or ['unknown']}, "
        f"non_stream finish_reasons={reasons or ['unknown']})"
    )
async def _get_openai_client(api_key: str) -> AsyncOpenAI:
    """Return the cached AsyncOpenAI client for ``api_key``, creating it on first use."""
    async with _CACHE_LOCK:
        if _OPENAI_CLIENTS.get(api_key) is None:
            _OPENAI_CLIENTS[api_key] = AsyncOpenAI(
                base_url=HF_ROUTER_BASE_URL,
                api_key=api_key,
                timeout=30,
                max_retries=0,
            )
        return _OPENAI_CLIENTS[api_key]
| def _extract_primary_text(message_part: Any) -> str: | |
| if not message_part: | |
| return "" | |
| for field in ("content", "text"): | |
| value = getattr(message_part, field, None) | |
| if isinstance(value, str) and value: | |
| return value | |
| if isinstance(message_part, dict): | |
| for field in ("content", "text"): | |
| value = message_part.get(field) | |
| if isinstance(value, str) and value: | |
| return value | |
| return "" | |
| def _extract_alternate_text(message_part: Any) -> str: | |
| if not message_part: | |
| return "" | |
| for field in ("reasoning_content", "reasoning"): | |
| value = getattr(message_part, field, None) | |
| if isinstance(value, str) and value: | |
| return value | |
| if isinstance(message_part, dict): | |
| for field in ("reasoning_content", "reasoning"): | |
| value = message_part.get(field) | |
| if isinstance(value, str) and value: | |
| return value | |
| return "" | |
def _model_extra_kwargs() -> dict[str, Any]:
    """Return extra request kwargs; for Qwen models this turns thinking off."""
    is_qwen = "qwen" in OPENAI_MODEL.lower()
    if is_qwen:
        return {"extra_body": {"chat_template_kwargs": {"enable_thinking": False}}}
    return {}
def _trim_for_prompt(value: Any, max_chars: int = MAX_PROMPT_VALUE_CHARS) -> Any:
    """Recursively cap every string inside ``value`` at ``max_chars`` characters.

    Lists and dicts are rebuilt with trimmed members; truncated strings get
    a suffix stating how many characters were dropped. Other values pass
    through unchanged.
    """
    if isinstance(value, dict):
        return {key: _trim_for_prompt(member, max_chars) for key, member in value.items()}
    if isinstance(value, list):
        return [_trim_for_prompt(member, max_chars) for member in value]
    if not isinstance(value, str) or len(value) <= max_chars:
        return value
    dropped = len(value) - max_chars
    return value[:max_chars] + f"... [truncated {dropped} chars]"
def _debug_log(message: str, details: dict[str, Any]) -> None:
    """Print one ``[agent]``-prefixed line with ``details`` as ASCII JSON, flushed immediately."""
    encoded = json.dumps(details, ensure_ascii=True)
    print(f"[agent] {message}: {encoded}", flush=True)
| def _parse_json_object(content: str) -> dict[str, Any]: | |
| text = content.strip() | |
| if text.startswith("```"): | |
| lines = text.splitlines() | |
| if lines and lines[0].startswith("```"): | |
| lines = lines[1:] | |
| if lines and lines[-1].startswith("```"): | |
| lines = lines[:-1] | |
| text = "\n".join(lines).strip() | |
| if not text.startswith("{"): | |
| extracted = _extract_json_object_text(text) | |
| if extracted: | |
| text = extracted | |
| return json.loads(text) | |
| def _extract_json_object_text(text: str) -> str: | |
| start = text.find("{") | |
| if start < 0: | |
| return "" | |
| depth = 0 | |
| in_string = False | |
| escaped = False | |
| for index in range(start, len(text)): | |
| char = text[index] | |
| if escaped: | |
| escaped = False | |
| continue | |
| if char == "\\": | |
| escaped = True | |
| continue | |
| if char == '"': | |
| in_string = not in_string | |
| continue | |
| if in_string: | |
| continue | |
| if char == "{": | |
| depth += 1 | |
| elif char == "}": | |
| depth -= 1 | |
| if depth == 0: | |
| return text[start : index + 1] | |
| return "" | |
| def _decode_mcp_response(response: httpx.Response) -> dict[str, Any]: | |
| text = response.text.strip() | |
| if not text: | |
| return {} | |
| if "text/event-stream" in response.headers.get("content-type", ""): | |
| data_lines = [ | |
| line.removeprefix("data:").strip() | |
| for line in text.splitlines() | |
| if line.startswith("data:") | |
| ] | |
| text = data_lines[-1] if data_lines else "{}" | |
| return json.loads(text) | |
| def _normalize_tool_result(result: Any) -> Any: | |
| if isinstance(result, dict) and "content" in result: | |
| content = result["content"] | |
| if isinstance(content, list): | |
| return [ | |
| item.get("text", item) | |
| if isinstance(item, dict) | |
| else getattr(item, "text", str(item)) | |
| for item in content | |
| ] | |
| return result | |
| def _request_to_json(request: DraftRecommendationRequest) -> dict[str, Any]: | |
| return { | |
| "action": request.action, | |
| "step_number": request.step_number, | |
| "phase": request.phase, | |
| "slot": request.slot, | |
| "you": request.you.__dict__, | |
| "opponent": request.opponent.__dict__, | |
| "unavailable_heroes": request.unavailable_heroes, | |
| } | |
| def _format_recommendation( | |
| request: DraftRecommendationRequest, | |
| recommendation: dict[str, Any], | |
| observations: list[dict[str, Any]], | |
| ) -> str: | |
| hero = recommendation.get("hero", "Unknown hero") | |
| action = recommendation.get("action", request.action) | |
| confidence = recommendation.get("confidence", "unknown") | |
| rationale = recommendation.get("rationale", "No rationale returned.") | |
| stats = recommendation.get("stats_used") or [] | |
| stats_text = "\n".join(f"- {item}" for item in stats) if stats else "- No stat summary returned." | |
| tool_text = ", ".join(item["tool"] for item in observations) or "none" | |
| return ( | |
| f"Recommended {action}: **{hero}**\n\n" | |
| f"Confidence: `{confidence}`\n\n" | |
| f"{rationale}\n\n" | |
| f"Stats used:\n{stats_text}\n\n" | |
| f"OpenDota MCP tools called: {tool_text}" | |
| ) | |
# Import-time side effect: fill STATIC_HERO_POOL from DEFAULT_HERO_POOL so
# candidate selection has a pool before any request arrives.
warm_static_dota_data()