| from __future__ import annotations |
|
|
| import json |
| import os |
| import asyncio |
| import time |
| from collections import deque |
| from dataclasses import dataclass, field |
| from json import JSONDecodeError |
| from typing import Any, Literal |
|
|
| import httpx |
| from openai import AsyncOpenAI |
|
|
| from mcp import OpenDotaMCPServer |
|
|
|
|
# --- Runtime configuration ---------------------------------------------------
# Everything below is resolved once, at import time. Values that come from the
# process environment fall back to the literal given as the second argument.

# Fixed endpoints / protocol identifiers.
HF_ROUTER_BASE_URL = "https://router.huggingface.co/v1"  # base_url for AsyncOpenAI
MCP_PROTOCOL_VERSION = "2025-06-18"  # sent in the MCP "initialize" request

# Chat-completion settings.
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "Qwen/Qwen3.6-27B:featherless-ai")
OPENAI_CALLS_PER_MINUTE = int(os.environ.get("OPENAI_CALLS_PER_MINUTE", "60"))
OPENAI_MAX_TOKENS = int(os.environ.get("OPENAI_MAX_TOKENS", "160"))
# Boolean flags are enabled by any of: 1 / true / yes / on (case-insensitive).
OPENAI_STREAM = os.environ.get("OPENAI_STREAM", "").lower() in {"1", "true", "yes", "on"}

# Prompt sizing: longest string value forwarded to the model before truncation.
MAX_PROMPT_VALUE_CHARS = int(os.environ.get("MAX_PROMPT_VALUE_CHARS", "4000"))

# Prefetch sizing: how many candidates to consider, and how many detail and
# matchup tool calls to issue per recommendation.
FAST_CANDIDATE_COUNT = int(os.environ.get("FAST_CANDIDATE_COUNT", "6"))
FAST_DETAIL_COUNT = int(os.environ.get("FAST_DETAIL_COUNT", "2"))
FAST_MATCHUP_COUNT = int(os.environ.get("FAST_MATCHUP_COUNT", "0"))
MCP_PREFETCH_TIMEOUT_SECONDS = float(os.environ.get("MCP_PREFETCH_TIMEOUT_SECONDS", "4"))

# When set, diagnostic lines are printed through _debug_log.
AGENT_DEBUG = os.environ.get("AGENT_DEBUG", "").lower() in {"1", "true", "yes", "on"}
|
|
|
|
# Built-in hero names used when no static pool has been seeded. Order matters:
# candidate selection walks this list front to back.
DEFAULT_HERO_POOL = [
    "Puck", "Pudge", "Invoker", "Shadow Fiend", "Rubick",
    "Earthshaker", "Storm Spirit", "Ember Spirit", "Crystal Maiden", "Anti-Mage",
    "Phantom Assassin", "Juggernaut", "Lina", "Lion", "Axe",
    "Doom", "Batrider", "Beastmaster", "Mars", "Tidehunter",
    "Dark Seer", "Chen", "Enchantress", "Io", "Disruptor",
    "Shadow Demon",
]
|
|
# --- Process-wide mutable state ----------------------------------------------
# Hero pool filled in by warm_static_dota_data(); empty until that runs.
STATIC_HERO_POOL: list[str] = []

# Memoised MCP tool results, keyed by a JSON string of server/tool/arguments.
_TOOL_RESULT_CACHE: dict[str, Any] = {}
# One AsyncOpenAI client per API key.
_OPENAI_CLIENTS: dict[str, AsyncOpenAI] = {}
# One initialised MCP client per "<server url>\0<api key>" pair.
_MCP_CLIENTS: dict[str, OpenDotaMCPHttpClient] = {}

# Single lock guarding all three dictionaries above.
_CACHE_LOCK = asyncio.Lock()
|
|
|
|
class AsyncSlidingWindowRateLimiter:
    """Allow at most ``max_calls`` acquisitions per rolling ``window_seconds``.

    Granted calls are remembered as monotonic timestamps. A caller that finds
    the window full sleeps until the oldest timestamp ages out and then tries
    again. A ``max_calls`` of zero or less disables limiting altogether.
    """

    def __init__(self, max_calls: int, window_seconds: float):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        # Timestamps of granted calls, oldest on the left.
        self._calls: deque[float] = deque()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Block until a slot in the current window is free, then claim it."""
        if self.max_calls <= 0:
            return
        while True:
            async with self._lock:
                current = time.monotonic()
                self._drop_expired(current)
                if len(self._calls) < self.max_calls:
                    self._calls.append(current)
                    return
                # Window is full: time left until the oldest call expires.
                delay = self.window_seconds - (current - self._calls[0])
            # Sleep with the lock released; never spin faster than 50 ms.
            await asyncio.sleep(max(delay, 0.05))

    def _drop_expired(self, current: float) -> None:
        """Discard timestamps that have left the window ending at ``current``."""
        history = self._calls
        while history and current - history[0] >= self.window_seconds:
            history.popleft()
|
|
|
|
# Shared limiter for chat-completion calls: OPENAI_CALLS_PER_MINUTE per 60 s.
OPENAI_RATE_LIMITER = AsyncSlidingWindowRateLimiter(OPENAI_CALLS_PER_MINUTE, 60)
|
|
|
|
@dataclass(frozen=True)
class DraftTeam:
    """One side of the draft as supplied by the caller.

    The dataclass is frozen, so attributes cannot be rebound after
    construction; the list fields themselves are ordinary mutable lists.
    """

    name: str
    side: str
    # Each list defaults to its own fresh, empty list per instance.
    picks: list[str] = field(default_factory=list)
    bans: list[str] = field(default_factory=list)
    player_ids: list[str] = field(default_factory=list)
|
|
|
|
@dataclass(frozen=True)
class DraftRecommendationRequest:
    """Everything needed to ask for a single pick or ban recommendation."""

    # Which kind of draft action is being decided.
    action: Literal["Pick", "Ban"]
    step_number: int
    phase: str
    slot: int
    # The requesting team and the team it is drafting against.
    you: DraftTeam
    opponent: DraftTeam
    # Heroes filtered out during candidate selection.
    unavailable_heroes: list[str]
    # Optional key forwarded to the MCP server; empty means "send none".
    opendota_api_key: str = ""
|
|
|
|
class OpenDotaMCPHttpClient:
    """Small JSON-RPC 2.0 client for the OpenDota MCP server over HTTP.

    Intended to be used as an async context manager: entering opens an
    ``httpx.AsyncClient`` and performs the MCP ``initialize`` handshake,
    exiting closes the HTTP client again.
    """

    def __init__(self, server: OpenDotaMCPServer | None = None, api_key: str = ""):
        self.server = server or OpenDotaMCPServer()
        self.api_key = api_key
        self._client: httpx.AsyncClient | None = None
        # Session id returned by the server during ``initialize`` (may stay empty).
        self._session_id = ""
        # Monotonically increasing JSON-RPC request id.
        self._request_id = 0

    async def __aenter__(self) -> OpenDotaMCPHttpClient:
        """Open the HTTP client and run the MCP handshake.

        Raises:
            ValueError: the configured server is not an HTTP transport.
            Exception: whatever the handshake raised; the HTTP client is
                closed before the error propagates.
        """
        if not self.server.is_http:
            raise ValueError("Only the OpenDota MCP HTTP transport is supported by this client.")
        self._client = httpx.AsyncClient(timeout=30)
        try:
            await self._initialize()
        except BaseException:
            # ``async with`` never calls __aexit__ when __aenter__ raises, so
            # the connection pool must be released here or it leaks.
            await self._close()
            raise
        return self

    async def __aexit__(self, *_: object) -> None:
        await self._close()

    async def _close(self) -> None:
        """Close the HTTP client if one is open; safe to call repeatedly."""
        client, self._client = self._client, None
        if client is not None:
            await client.aclose()

    def _require_client(self) -> httpx.AsyncClient:
        """Return the open HTTP client.

        An explicit check is used instead of ``assert`` so the guard is still
        active when Python runs with ``-O``.

        Raises:
            RuntimeError: the client has not been opened (or was closed).
        """
        if self._client is None:
            raise RuntimeError("OpenDota MCP client is not open; enter the async context first.")
        return self._client

    async def list_tools(self) -> list[dict[str, Any]]:
        """Return the server's tool descriptors, or ``[]`` when none are reported."""
        result = await self._request("tools/list", {})
        # An empty response body decodes to a payload without "result".
        if not isinstance(result, dict):
            return []
        return result.get("tools", [])

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
        """Invoke tool ``name`` with ``arguments`` and return the raw result."""
        return await self._request("tools/call", {"name": name, "arguments": arguments})

    async def _initialize(self) -> None:
        """Send ``initialize``, remember the session id, then notify the server."""
        result, headers = await self._post(
            "initialize",
            {
                "protocolVersion": MCP_PROTOCOL_VERSION,
                "capabilities": {"tools": {}},
                "clientInfo": {"name": "dota-draft-agent", "version": "0.1.0"},
            },
            include_session=False,
        )
        self._session_id = headers.get("mcp-session-id", "")
        if not result:
            raise RuntimeError("OpenDota MCP server did not return an initialize result.")
        await self._notify_initialized()

    async def _notify_initialized(self) -> None:
        """Send the ``notifications/initialized`` notification.

        The HTTP status of this notification is intentionally not checked
        (best effort), matching the JSON-RPC notion of a fire-and-forget
        message.
        """
        client = self._require_client()
        headers = self._headers(include_session=True)
        message = {"jsonrpc": "2.0", "method": "notifications/initialized"}
        await client.post(self.server.url, json=message, headers=headers)

    async def _request(self, method: str, params: dict[str, Any]) -> Any:
        """Issue a session-bound request and return only its ``result``."""
        result, _ = await self._post(method, params, include_session=True)
        return result

    async def _post(
        self,
        method: str,
        params: dict[str, Any],
        include_session: bool,
    ) -> tuple[Any, httpx.Headers]:
        """POST one JSON-RPC request and return ``(result, response headers)``.

        Raises:
            RuntimeError: the client is not open, or the payload carries a
                JSON-RPC ``error`` member.
            httpx.HTTPStatusError: the server answered with a 4xx/5xx status.
        """
        client = self._require_client()
        self._request_id += 1
        message = {
            "jsonrpc": "2.0",
            "id": self._request_id,
            "method": method,
            "params": params,
        }
        response = await client.post(
            self.server.url,
            json=message,
            headers=self._headers(include_session=include_session),
        )
        response.raise_for_status()
        payload = _decode_mcp_response(response)
        if "error" in payload:
            raise RuntimeError(payload["error"])
        return payload.get("result"), response.headers

    def _headers(self, include_session: bool) -> dict[str, str]:
        """Build request headers, adding session id and API key when present."""
        headers = {
            "Accept": "application/json, text/event-stream",
            "Content-Type": "application/json",
        }
        if include_session and self._session_id:
            headers["Mcp-Session-Id"] = self._session_id
        if self.api_key:
            headers["X-OpenDota-API-Key"] = self.api_key
        return headers
|
|
|
|
async def recommend_draft_action(request: DraftRecommendationRequest) -> str:
    """Return a Markdown recommendation (or an explanatory failure message).

    Prefetched context is gathered first, then the model is asked for a final
    decision. Any exception raised by the model call is reported in the
    returned text rather than propagated.
    """
    context = await build_fast_recommendation_context(request)
    try:
        recommendation = await _call_openai_for_recommendation(request, context)
    except Exception as exc:
        return f"Recommendation unavailable: OpenAI chat completions request failed ({exc})"
    if recommendation:
        return _format_recommendation(request, recommendation, context.get("observations", []))
    return "Recommendation unavailable: the agent did not return a final pick or ban."
|
|
|
|
def warm_static_dota_data(hero_names: list[str] | None = None) -> None:
    """Seed static hero data at app startup without network calls.

    Rebuilds ``STATIC_HERO_POOL`` from ``hero_names`` (or the defaults when
    none are given) followed by ``DEFAULT_HERO_POOL``, keeping the first
    spelling of every hero whose normalized name has not been seen yet.
    """
    global STATIC_HERO_POOL
    preferred = hero_names if hero_names else DEFAULT_HERO_POOL
    STATIC_HERO_POOL = pool = []
    known: set[str] = set()
    for hero in (*preferred, *DEFAULT_HERO_POOL):
        key = _normalize_hero_name(hero)
        # Skip names with no alphanumerics and anything already recorded.
        if not key or key in known:
            continue
        known.add(key)
        pool.append(hero)
|
|
|
|
async def build_fast_recommendation_context(
    request: DraftRecommendationRequest,
) -> dict[str, Any]:
    """Collect candidate heroes plus whatever MCP evidence arrives in time.

    Returns a dict with ``candidate_heroes``, ``observations`` and
    ``prefetch_errors``. Prefetch failures never raise: a timeout or any other
    exception is recorded in ``prefetch_errors`` and ``observations`` stays
    empty.
    """
    candidates = _select_candidate_heroes(request)
    context: dict[str, Any] = {
        "candidate_heroes": candidates,
        "observations": [],
        "prefetch_errors": [],
    }
    errors = context["prefetch_errors"]
    try:
        # Only the prefetch itself is bounded by the timeout; obtaining the
        # client happens before the clock starts.
        client = await _get_mcp_client(request.opendota_api_key)
        prefetch = _prefetch_draft_observations(client, request, candidates)
        context["observations"] = await asyncio.wait_for(
            prefetch,
            timeout=MCP_PREFETCH_TIMEOUT_SECONDS,
        )
    except asyncio.TimeoutError:
        errors.append(
            f"OpenDota MCP prefetch timed out after {MCP_PREFETCH_TIMEOUT_SECONDS:g}s"
        )
    except Exception as exc:
        errors.append(str(exc))

    if AGENT_DEBUG:
        summary = {
            "candidates": len(candidates),
            "observations": len(context["observations"]),
            "prefetch_errors": context["prefetch_errors"],
        }
        _debug_log("fast context", summary)
    return context
|
|
|
|
async def _get_mcp_client(api_key: str) -> OpenDotaMCPHttpClient:
    """Return the shared, initialised MCP client for ``api_key``.

    The handshake runs without holding the cache lock, so two callers may
    initialise concurrently; the first one stored wins and the other closes
    its duplicate.
    """
    candidate = OpenDotaMCPHttpClient(api_key=api_key)
    key = f"{candidate.server.url}\0{api_key}"

    async with _CACHE_LOCK:
        existing = _MCP_CLIENTS.get(key)
    if existing is not None:
        return existing

    try:
        opened = await candidate.__aenter__()
    except Exception:
        # Release whatever was opened before the handshake failed.
        await candidate.__aexit__(None, None, None)
        raise

    async with _CACHE_LOCK:
        winner = _MCP_CLIENTS.setdefault(key, opened)
        if winner is not opened:
            # Someone else finished first; discard our connection.
            await opened.__aexit__(None, None, None)
    return winner
|
|
|
|
def _select_candidate_heroes(
    request: DraftRecommendationRequest,
    limit: int | None = FAST_CANDIDATE_COUNT,
) -> list[str]:
    """Return up to ``limit`` distinct heroes that are not marked unavailable.

    Names are considered in this order: opponent picks, own picks, then the
    static pool (falling back to ``DEFAULT_HERO_POOL`` when it is empty).
    Comparison uses normalized names, and the first spelling encountered is
    the one returned.

    Args:
        request: Draft state; ``unavailable_heroes`` is the exclusion list.
        limit: Maximum number of heroes to return. ``None`` means no cap;
            zero or a negative number yields an empty list.
    """
    # Previously the cap was only checked after appending, so a limit of 0
    # (e.g. FAST_CANDIDATE_COUNT=0) still produced one candidate.
    if limit is not None and limit <= 0:
        return []

    unavailable = {_normalize_hero_name(hero) for hero in request.unavailable_heroes}
    pool = STATIC_HERO_POOL or DEFAULT_HERO_POOL
    # NOTE(review): picks are walked first, so they only survive when the
    # caller did not list them in unavailable_heroes - confirm that is intended.
    ordered_names = [*request.opponent.picks, *request.you.picks, *pool]

    candidates: list[str] = []
    seen: set[str] = set()
    for hero in ordered_names:
        normalized = _normalize_hero_name(hero)
        if not normalized or normalized in unavailable or normalized in seen:
            continue
        candidates.append(hero)
        seen.add(normalized)
        if limit is not None and len(candidates) >= limit:
            break
    return candidates
|
|
|
|
| def _normalize_hero_name(name: str) -> str: |
| return "".join(char for char in name.lower() if char.isalnum()) |
|
|
|
|
async def _prefetch_draft_observations(
    client: OpenDotaMCPHttpClient,
    request: DraftRecommendationRequest,
    candidates: list[str],
) -> list[dict[str, Any]]:
    """Run the hero-detail and matchup tool calls concurrently.

    Detail lookups cover the first ``FAST_DETAIL_COUNT`` candidates. Matchup
    lookups cover the first ``FAST_MATCHUP_COUNT`` names from opponent picks,
    own picks and the top candidate. Detail observations come first in the
    returned list, followed by matchup observations whose results have been
    reduced to heroes relevant to this draft.
    """
    detail_fields = (
        "id",
        "localized_name",
        "primary_attr",
        "attack_type",
        "roles",
        "base_health",
        "base_mana",
        "base_armor",
        "base_attack_min",
        "base_attack_max",
        "move_speed",
    )
    your_picks = request.you.picks
    enemy_picks = request.opponent.picks

    detail_heroes = candidates[:FAST_DETAIL_COUNT]
    matchup_heroes = [*enemy_picks, *your_picks, *candidates[:1]][:FAST_MATCHUP_COUNT]

    detail_calls = [
        _call_tool_observation(
            client,
            "get_hero_details",
            {"hero": hero},
            compact_keys=detail_fields,
        )
        for hero in detail_heroes
    ]
    matchup_calls = [
        _call_tool_observation(client, "get_hero_matchups", {"hero": hero})
        for hero in matchup_heroes
    ]

    # Normalized names of every hero that matters for this draft step.
    relevant = {
        _normalize_hero_name(hero)
        for hero in (*your_picks, *enemy_picks, *candidates)
    }

    detail_results, matchup_results = await asyncio.gather(
        asyncio.gather(*detail_calls),
        asyncio.gather(*matchup_calls),
    )
    for entry in matchup_results:
        entry["result"] = _compact_matchups(entry.get("result"), relevant)
    return [*detail_results, *matchup_results]
|
|
|
|
async def _call_tool_observation(
    client: OpenDotaMCPHttpClient,
    name: str,
    arguments: dict[str, Any],
    compact_keys: tuple[str, ...] = (),
) -> dict[str, Any]:
    """Call one MCP tool and wrap the outcome as an observation record.

    The record always has ``tool``, ``arguments`` and ``result``. Failures do
    not raise; ``result`` becomes ``{"tool_error": <message>}`` instead.
    When ``compact_keys`` is given and the normalized result is a dict, only
    those keys (where present) are kept.
    """
    try:
        payload = _normalize_tool_result(await _call_tool_cached(client, name, arguments))
        # NOTE(review): _normalize_tool_result turns {"content": [...]} into a
        # list, so this filter only fires for results that remain dicts -
        # confirm the server's result shape.
        if compact_keys and isinstance(payload, dict):
            payload = {key: payload[key] for key in compact_keys if key in payload}
    except Exception as exc:
        payload = {"tool_error": str(exc)}
    return {"tool": name, "arguments": arguments, "result": payload}
|
|
|
|
async def _call_tool_cached(
    client: OpenDotaMCPHttpClient,
    name: str,
    arguments: dict[str, Any],
) -> Any:
    """Call an MCP tool, reusing a stored result for identical requests.

    The cache key is a canonical JSON string of server URL, tool name and
    arguments. ``None`` results are never treated as cache hits. The lock is
    released while the tool call is in flight, so concurrent identical calls
    may each reach the server.
    """
    key_fields = {
        "server": client.server.url,
        "tool": name,
        "arguments": arguments,
    }
    cache_key = json.dumps(key_fields, sort_keys=True, ensure_ascii=True)

    async with _CACHE_LOCK:
        hit = _TOOL_RESULT_CACHE.get(cache_key)
    if hit is not None:
        return hit

    fresh = await client.call_tool(name, arguments)
    async with _CACHE_LOCK:
        _TOOL_RESULT_CACHE[cache_key] = fresh
    return fresh
|
|
|
|
def _compact_matchups(result: Any, relevant_names: set[str]) -> Any:
    """Reduce a matchup list to the heroes that matter for this draft.

    Args:
        result: Tool output. Anything that is not a list is returned as is.
        relevant_names: Normalized hero names to keep.

    Returns:
        At most 12 dicts with ``hero_name``, ``games`` and ``win_rate``, in
        the order they appeared in ``result``.
    """
    if not isinstance(result, list):
        return result
    compact = []
    for item in result:
        if not isinstance(item, dict):
            continue
        hero_name = item.get("hero_name", "")
        # A null or otherwise non-string name cannot be normalized; skip the
        # entry instead of letting the error abort the whole prefetch.
        if not isinstance(hero_name, str):
            continue
        if _normalize_hero_name(hero_name) not in relevant_names:
            continue
        compact.append(
            {
                "hero_name": hero_name,
                "games": item.get("games"),
                "win_rate": item.get("win_rate"),
            }
        )
    return compact[:12]
|
|
|
|
async def _call_openai_for_recommendation(
    request: DraftRecommendationRequest,
    context: dict[str, Any],
) -> dict[str, Any]:
    """Ask the chat model for a final recommendation and return it as a dict.

    The prompt contains the draft state, candidate heroes and prefetched
    evidence. If the first reply cannot be parsed as JSON, the request is
    retried once with a stricter system prompt.

    Returns:
        The ``recommendation`` object from the reply, or the whole reply when
        it has no such key. An empty dict is returned when the model supplied
        an empty/null recommendation.

    Raises:
        RuntimeError: ``HF_TOKEN`` is unset, or the reply (or its
            ``recommendation`` member) is not a JSON object.
        JSONDecodeError: the retry reply could not be parsed either.
    """
    api_key = os.getenv("HF_TOKEN", "")
    if not api_key:
        raise RuntimeError("HF_TOKEN is not set")

    messages = [
        {
            "role": "system",
            "content": (
                "Respond with JSON only. Your first character must be `{` and your last "
                "character must be `}`. Do not include analysis, planning, markdown, "
                "code fences, or explanatory text. You are a Dota 2 Captains Mode draft "
                "advisor. Make a final recommendation now from the supplied draft state "
                "and prefetched OpenDota evidence. Do not request tools. Do not recommend "
                "a hero that has already been picked or banned. Keep rationale under 25 "
                "words and stats_used to at most two short items."
            ),
        },
        {
            "role": "user",
            "content": "/no_think\n"
            + json.dumps(
                {
                    "draft_request": _request_to_json(request),
                    "candidate_heroes": context.get("candidate_heroes", []),
                    "prefetched_evidence": _trim_for_prompt(context.get("observations", [])),
                    "prefetch_errors": context.get("prefetch_errors", []),
                    "response_schema": {
                        "recommendation": {
                            "hero": "Hero name",
                            "action": request.action,
                            "confidence": "low|medium|high",
                            "rationale": "under 25 words",
                            "stats_used": ["up to two brief facts from prefetched evidence"],
                        }
                    },
                },
                ensure_ascii=True,
            ),
        },
    ]
    await OPENAI_RATE_LIMITER.acquire()
    if AGENT_DEBUG:
        _debug_log(
            "final model request",
            {
                "model": OPENAI_MODEL,
                "observations": len(context.get("observations", [])),
                "prompt_chars": sum(len(message["content"]) for message in messages),
            },
        )
    content = await _create_chat_completion(api_key, messages)
    try:
        decision = _parse_json_object(content)
    except JSONDecodeError:
        if AGENT_DEBUG:
            _debug_log("JSON parse failed; retrying final prompt", {"prefix": content[:240]})
        retry_messages = [
            {
                **messages[0],
                "content": (
                    "Return exactly one valid JSON object and nothing else. "
                    "No reasoning. No prose. Start with `{`. End with `}`. "
                    + messages[0]["content"]
                ),
            },
            messages[1],
        ]
        # The retry is a second upstream call, so it must take its own slot
        # from the limiter; otherwise retries silently exceed the budget.
        await OPENAI_RATE_LIMITER.acquire()
        decision = _parse_json_object(await _create_chat_completion(api_key, retry_messages))

    if not isinstance(decision, dict):
        raise RuntimeError("model response was not a JSON object")
    recommendation = decision.get("recommendation", decision)
    if not recommendation:
        # Null/empty recommendation: let the caller report "no final pick".
        return {}
    if not isinstance(recommendation, dict):
        # A bare string or list here would otherwise crash the formatter.
        raise RuntimeError("model returned a recommendation that is not a JSON object")
    return recommendation
|
|
|
|
async def _create_chat_completion(api_key: str, messages: list[dict[str, str]]) -> str:
    """Return the model's reply text, streaming when ``OPENAI_STREAM`` is set.

    In streaming mode the primary text of all chunks is concatenated. If no
    primary text arrived, the concatenated reasoning text is returned instead;
    if that is empty as well, the request is repeated without streaming.
    """
    if not OPENAI_STREAM:
        return await _create_non_stream_chat_completion(api_key, messages, 0, [])

    client = await _get_openai_client(api_key)
    stream = await client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=messages,
        temperature=0.2,
        max_tokens=OPENAI_MAX_TOKENS,
        stream=True,
        **_model_extra_kwargs(),
    )

    primary_parts: list[str] = []
    reasoning_parts: list[str] = []
    finish_reasons: list[str] = []
    chunk_count = 0
    async for chunk in stream:
        chunk_count += 1
        if not chunk.choices:
            continue
        choice = chunk.choices[0]
        if choice.finish_reason:
            finish_reasons.append(choice.finish_reason)
        piece = _extract_primary_text(choice.delta)
        if piece:
            primary_parts.append(piece)
            continue
        # Reasoning text is only collected for chunks without primary text.
        fallback = _extract_alternate_text(choice.delta)
        if fallback:
            reasoning_parts.append(fallback)

    answer = "".join(primary_parts) or "".join(reasoning_parts)
    if answer:
        return answer

    if AGENT_DEBUG:
        _debug_log(
            "stream response was empty; retrying non-stream",
            {"chunks": chunk_count, "finish_reasons": finish_reasons},
        )
    return await _create_non_stream_chat_completion(
        api_key,
        messages,
        chunk_count,
        finish_reasons,
    )
|
|
|
|
async def _create_non_stream_chat_completion(
    api_key: str,
    messages: list[dict[str, str]],
    empty_stream_chunks: int,
    empty_stream_finish_reasons: list[str],
) -> str:
    """Request a single, non-streamed completion and return its text.

    The two ``empty_stream_*`` arguments describe a preceding streamed attempt
    and are only used to enrich the error message.

    Raises:
        RuntimeError: the first choice carries neither primary nor reasoning
            text (or there are no choices at all).
    """
    client = await _get_openai_client(api_key)
    response = await client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=messages,
        temperature=0.2,
        max_tokens=OPENAI_MAX_TOKENS,
        stream=False,
        **_model_extra_kwargs(),
    )

    choices = response.choices
    message = choices[0].message if choices else None
    text = ""
    if message:
        # Prefer the normal content; fall back to reasoning fields.
        text = _extract_primary_text(message) or _extract_alternate_text(message)
    if text:
        return text

    finish_reasons = [choice.finish_reason for choice in choices if choice.finish_reason]
    raise RuntimeError(
        "model returned an empty response "
        f"(stream chunks={empty_stream_chunks}, "
        f"stream finish_reasons={empty_stream_finish_reasons or ['unknown']}, "
        f"non_stream finish_reasons={finish_reasons or ['unknown']})"
    )
|
|
|
|
async def _get_openai_client(api_key: str) -> AsyncOpenAI:
    """Return the cached ``AsyncOpenAI`` client for ``api_key``, creating it once."""
    async with _CACHE_LOCK:
        cached = _OPENAI_CLIENTS.get(api_key)
        if cached is not None:
            return cached
        # Retries are disabled in the SDK client (max_retries=0).
        created = AsyncOpenAI(
            base_url=HF_ROUTER_BASE_URL,
            api_key=api_key,
            timeout=30,
            max_retries=0,
        )
        _OPENAI_CLIENTS[api_key] = created
        return created
|
|
|
|
| def _extract_primary_text(message_part: Any) -> str: |
| if not message_part: |
| return "" |
| for field in ("content", "text"): |
| value = getattr(message_part, field, None) |
| if isinstance(value, str) and value: |
| return value |
| if isinstance(message_part, dict): |
| for field in ("content", "text"): |
| value = message_part.get(field) |
| if isinstance(value, str) and value: |
| return value |
| return "" |
|
|
|
|
| def _extract_alternate_text(message_part: Any) -> str: |
| if not message_part: |
| return "" |
| for field in ("reasoning_content", "reasoning"): |
| value = getattr(message_part, field, None) |
| if isinstance(value, str) and value: |
| return value |
| if isinstance(message_part, dict): |
| for field in ("reasoning_content", "reasoning"): |
| value = message_part.get(field) |
| if isinstance(value, str) and value: |
| return value |
| return "" |
|
|
|
|
def _model_extra_kwargs() -> dict[str, Any]:
    """Return extra request kwargs for the configured model.

    For model names containing "qwen" (case-insensitive) the chat template is
    asked to disable thinking; every other model gets no extras.
    """
    is_qwen = "qwen" in OPENAI_MODEL.lower()
    if is_qwen:
        return {"extra_body": {"chat_template_kwargs": {"enable_thinking": False}}}
    return {}
|
|
|
|
def _trim_for_prompt(value: Any, max_chars: int = MAX_PROMPT_VALUE_CHARS) -> Any:
    """Recursively shorten every string inside ``value`` to ``max_chars``.

    Lists and dicts are rebuilt with their items trimmed; shortened strings
    get a ``... [truncated N chars]`` suffix. Any other value is returned
    untouched. The limit applies per string, not to the structure as a whole.
    """
    if isinstance(value, dict):
        return {key: _trim_for_prompt(item, max_chars) for key, item in value.items()}
    if isinstance(value, list):
        return [_trim_for_prompt(item, max_chars) for item in value]
    if not isinstance(value, str):
        return value
    overflow = len(value) - max_chars
    if overflow <= 0:
        return value
    return value[:max_chars] + f"... [truncated {overflow} chars]"
|
|
|
|
| def _debug_log(message: str, details: dict[str, Any]) -> None: |
| print(f"[agent] {message}: {json.dumps(details, ensure_ascii=True)}", flush=True) |
|
|
|
|
def _parse_json_object(content: str) -> dict[str, Any]:
    """Parse a model reply into a JSON object, tolerating common wrappers.

    Handles Markdown code fences, prose before the object, and prose after
    it. The latter used to fail with "Extra data" because extraction was only
    attempted when the text did not start with ``{``.

    Raises:
        JSONDecodeError: no JSON object could be parsed from ``content``, or
            the parsed value is not an object.
    """
    text = content.strip()
    if text.startswith("```"):
        # Drop the opening fence line and, when present, the closing one.
        lines = text.splitlines()
        if lines and lines[0].startswith("```"):
            lines = lines[1:]
        if lines and lines[-1].startswith("```"):
            lines = lines[:-1]
        text = "\n".join(lines).strip()
    if not text.startswith("{"):
        extracted = _extract_json_object_text(text)
        if extracted:
            text = extracted
    try:
        parsed = json.loads(text)
    except JSONDecodeError:
        # Second chance: the text may be an object followed by trailing prose.
        extracted = _extract_json_object_text(text)
        if not extracted or extracted == text:
            raise
        parsed = json.loads(extracted)
    if not isinstance(parsed, dict):
        # Callers rely on ``.get``; report other JSON types as a parse failure
        # so the existing retry path handles them.
        raise JSONDecodeError("Expected a JSON object", text, 0)
    return parsed
|
|
|
|
| def _extract_json_object_text(text: str) -> str: |
| start = text.find("{") |
| if start < 0: |
| return "" |
| depth = 0 |
| in_string = False |
| escaped = False |
| for index in range(start, len(text)): |
| char = text[index] |
| if escaped: |
| escaped = False |
| continue |
| if char == "\\": |
| escaped = True |
| continue |
| if char == '"': |
| in_string = not in_string |
| continue |
| if in_string: |
| continue |
| if char == "{": |
| depth += 1 |
| elif char == "}": |
| depth -= 1 |
| if depth == 0: |
| return text[start : index + 1] |
| return "" |
|
|
|
|
| def _decode_mcp_response(response: httpx.Response) -> dict[str, Any]: |
| text = response.text.strip() |
| if not text: |
| return {} |
| if "text/event-stream" in response.headers.get("content-type", ""): |
| data_lines = [ |
| line.removeprefix("data:").strip() |
| for line in text.splitlines() |
| if line.startswith("data:") |
| ] |
| text = data_lines[-1] if data_lines else "{}" |
| return json.loads(text) |
|
|
|
|
| def _normalize_tool_result(result: Any) -> Any: |
| if isinstance(result, dict) and "content" in result: |
| content = result["content"] |
| if isinstance(content, list): |
| return [ |
| item.get("text", item) |
| if isinstance(item, dict) |
| else getattr(item, "text", str(item)) |
| for item in content |
| ] |
| return result |
|
|
|
|
| def _request_to_json(request: DraftRecommendationRequest) -> dict[str, Any]: |
| return { |
| "action": request.action, |
| "step_number": request.step_number, |
| "phase": request.phase, |
| "slot": request.slot, |
| "you": request.you.__dict__, |
| "opponent": request.opponent.__dict__, |
| "unavailable_heroes": request.unavailable_heroes, |
| } |
|
|
|
|
| def _format_recommendation( |
| request: DraftRecommendationRequest, |
| recommendation: dict[str, Any], |
| observations: list[dict[str, Any]], |
| ) -> str: |
| hero = recommendation.get("hero", "Unknown hero") |
| action = recommendation.get("action", request.action) |
| confidence = recommendation.get("confidence", "unknown") |
| rationale = recommendation.get("rationale", "No rationale returned.") |
| stats = recommendation.get("stats_used") or [] |
| stats_text = "\n".join(f"- {item}" for item in stats) if stats else "- No stat summary returned." |
| tool_text = ", ".join(item["tool"] for item in observations) or "none" |
| return ( |
| f"Recommended {action}: **{hero}**\n\n" |
| f"Confidence: `{confidence}`\n\n" |
| f"{rationale}\n\n" |
| f"Stats used:\n{stats_text}\n\n" |
| f"OpenDota MCP tools called: {tool_text}" |
| ) |
|
|
|
|
# Seed STATIC_HERO_POOL from the built-in defaults as soon as the module loads.
warm_static_dota_data(hero_names=None)
|
|