Spaces:
Sleeping
Sleeping
| import json | |
| import math | |
| import os | |
| import random | |
| import time | |
| import httpx | |
| from dotenv import load_dotenv | |
| from . import hf_spaces | |
| load_dotenv() | |
| _HF_API_TOKEN = (os.environ.get("HF_API_TOKEN") or os.environ.get("HUGGINGFACE_API_TOKEN") or "").strip() | |
| _HF_CHAT_URL = "https://router.huggingface.co/v1/chat/completions" | |
| _MODEL_COOLDOWNS: dict[str, float] = {} | |
| MAX_AGENT_SPEED = 80 | |
| print(f"[GROQ_CLIENT_INIT] HF_API_TOKEN present: {bool(_HF_API_TOKEN)}") | |
| if not _HF_API_TOKEN: | |
| print("[GROQ_CLIENT_INIT] WARNING: No HF API token found! Set HF_API_TOKEN or HUGGINGFACE_API_TOKEN env var.") | |
| def is_ready(): | |
| return bool(_HF_API_TOKEN) | |
| def _headers() -> dict[str, str]: | |
| if not _HF_API_TOKEN: | |
| return {} | |
| return { | |
| "Authorization": f"Bearer {_HF_API_TOKEN}", | |
| "Content-Type": "application/json", | |
| } | |
| def _pick_line(options: list[str], previous_message: str | None = None) -> str: | |
| if not previous_message: | |
| return random.choice(options) | |
| previous = " ".join(previous_message.strip().split()).lower() | |
| filtered = [option for option in options if option.lower() != previous] | |
| return random.choice(filtered or options) | |
| def _generate_chat_message(action: str, agent_name: str, fire_distance: float, has_water: bool, previous_message: str | None = None) -> str: | |
| action_messages = { | |
| "search_water": [ | |
| "I'm heading for the nearest well.", | |
| "I need water first, then I'm coming back.", | |
| "Give me a second, I'm making for the water.", | |
| "Water first, then we push the fire.", | |
| "I'm going for the well, stay alive.", | |
| ], | |
| "collect_water": [ | |
| "I'm at the well now, filling up.", | |
| "Got water, turning back in a second.", | |
| "Hold on, I'm grabbing water.", | |
| "Tank's full, I'm heading back.", | |
| "Water secured, let's make this count.", | |
| ], | |
| "extinguish_fire": [ | |
| "I've got water, I'm going in.", | |
| "I'm on the fire line now, push with me.", | |
| "Alright, I'm hitting the flames.", | |
| "I'm close enough, pouring water now.", | |
| "Keep moving, I'm taking a shot at the fire.", | |
| ], | |
| "escape": [ | |
| "Too hot here, I'm backing off.", | |
| "Nope, that's way too close, I'm out.", | |
| "I need space, falling back now.", | |
| "I'm peeling away before this gets worse.", | |
| "I'm not dying here, backing up.", | |
| ], | |
| "vote_for_leader": [ | |
| "Someone call it, we need one plan.", | |
| "I'll follow a lead if somebody steps up.", | |
| "We need one caller right now.", | |
| "Pick a lead so we stop wasting time.", | |
| "I'm good with a leader, just make it clear.", | |
| ], | |
| } | |
| messages = action_messages.get(action, action_messages["escape"]) | |
| return _pick_line(messages, previous_message) | |
| def _build_fire_state_summary(agent, fire, all_agents) -> str: | |
| standings = [] | |
| for other in all_agents: | |
| if not other.alive: | |
| continue | |
| distance = math.dist((other.x, other.y), (fire.x, fire.y)) | |
| standings.append({ | |
| "name": other.display_name, | |
| "distance_from_fire": distance, | |
| "has_water": other.water_collected, | |
| }) | |
| standings.sort(key=lambda item: item["distance_from_fire"]) | |
| lines = ["Current standings:"] | |
| for index, item in enumerate(standings, 1): | |
| suffix = " (carrying water)" if item["has_water"] else "" | |
| lines.append(f"#{index} {item['name']}: {item['distance_from_fire']:.0f}px from fire{suffix}") | |
| return "\n".join(lines) | |
| def _extract_message_content(payload) -> str: | |
| choices = payload.get("choices") or [] | |
| if not choices or not isinstance(choices[0], dict): | |
| return "" | |
| message = choices[0].get("message") or {} | |
| content = message.get("content") | |
| if isinstance(content, str): | |
| return content.strip() | |
| if isinstance(content, list): | |
| parts = [] | |
| for item in content: | |
| if isinstance(item, dict) and item.get("type") == "text" and isinstance(item.get("text"), str): | |
| parts.append(item["text"]) | |
| return "".join(parts).strip() | |
| return "" | |
| def _extract_json_object(text: str) -> dict: | |
| if not text: | |
| return {} | |
| cleaned = text.strip() | |
| if cleaned.startswith("```"): | |
| cleaned = cleaned.replace("```json", "").replace("```", "").strip() | |
| start = cleaned.find("{") | |
| end = cleaned.rfind("}") + 1 | |
| if start < 0 or end <= start: | |
| return {} | |
| try: | |
| candidate = cleaned[start:end] | |
| parsed = json.loads(candidate) | |
| except json.JSONDecodeError: | |
| return {} | |
| return parsed if isinstance(parsed, dict) else {} | |
| def _is_robotic_message(message: str) -> bool: | |
| lowered = message.lower().strip() | |
| if not lowered: | |
| return True | |
| robotic_starts = ( | |
| "locate ", | |
| "locating ", | |
| "find ", | |
| "finding ", | |
| "search ", | |
| "searching ", | |
| "head ", | |
| "heading ", | |
| "move ", | |
| "moving ", | |
| "look ", | |
| "looking ", | |
| "nearest water", | |
| ) | |
| return lowered.startswith(robotic_starts) | |
| def _normalize_decision(decision: dict, agent_name: str, dist_to_fire: float, has_water: bool, previous_message: str | None = None) -> dict: | |
| action = decision.get("action", "escape") | |
| if action not in {"search_water", "collect_water", "extinguish_fire", "escape", "vote_for_leader"}: | |
| action = "escape" | |
| message = " ".join(str(decision.get("message", "")).strip().split()) | |
| if not message or _is_robotic_message(message): | |
| message = _generate_chat_message(action, agent_name, dist_to_fire, has_water, previous_message) | |
| elif previous_message and message.lower() == " ".join(previous_message.strip().split()).lower(): | |
| message = _generate_chat_message(action, agent_name, dist_to_fire, has_water, previous_message) | |
| vote_for = decision.get("vote_for") | |
| if vote_for is not None and not isinstance(vote_for, str): | |
| vote_for = None | |
| reasoning = " ".join(str(decision.get("reasoning", "")).strip().split()) | |
| if not reasoning: | |
| reasoning = "Survival and teamwork." | |
| return { | |
| "action": action, | |
| "vote_for": vote_for, | |
| "message": message[:220], | |
| "reasoning": reasoning[:220], | |
| } | |
| def _model_available(model_id: str) -> bool: | |
| return _MODEL_COOLDOWNS.get(model_id, 0.0) <= time.monotonic() | |
| def _mark_model_unavailable(model_id: str, seconds: int = 90) -> None: | |
| _MODEL_COOLDOWNS[model_id] = time.monotonic() + seconds | |
| async def _request_model_response(target_model: str, prompt: str) -> str: | |
| payload = { | |
| "model": target_model, | |
| "messages": [{"role": "user", "content": prompt}], | |
| "max_tokens": 220, | |
| "temperature": 0.4, | |
| } | |
| async with httpx.AsyncClient(timeout=20.0) as client: | |
| response = await client.post(_HF_CHAT_URL, headers=_headers(), json=payload) | |
| response.raise_for_status() | |
| data = response.json() | |
| return _extract_message_content(data) | |
| def _fallback_decision(agent, fire, dist_to_fire: float, dist_to_water: float | None) -> dict: | |
| if dist_to_fire <= max(fire.radius + 20, 140): | |
| action = "escape" | |
| elif agent.water_collected and dist_to_fire <= 360: | |
| action = "extinguish_fire" | |
| elif not agent.water_collected and dist_to_water is not None and dist_to_water <= 60: | |
| action = "collect_water" | |
| elif getattr(agent, "is_leader", False) is False and dist_to_fire > 240 and random.random() < 0.08: | |
| action = "vote_for_leader" | |
| else: | |
| action = "search_water" | |
| return { | |
| "message": _generate_chat_message(action, agent.model_name, dist_to_fire, agent.water_collected, getattr(agent, "last_message", None)), | |
| "action": action, | |
| "vote_for": None, | |
| "reasoning": "Fallback: keep moving with the situation.", | |
| } | |
| async def generate_fire_decision(agent, fire, water_sources, other_agents, bounds, recent_radio=None) -> dict: | |
| if not is_ready(): | |
| print(f"[INFERENCE_FAIL] {agent.model_name}: HF token not ready, using fallback") | |
| return _fallback_decision(agent, fire, math.dist((agent.x, agent.y), (fire.x, fire.y)), None) | |
| dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y)) | |
| nearest_water = min(water_sources, key=lambda water: math.dist((agent.x, agent.y), (water.x, water.y))) if water_sources else None | |
| dist_to_water = math.dist((agent.x, agent.y), (nearest_water.x, nearest_water.y)) if nearest_water else None | |
| living_agents = [other for other in other_agents if other.alive and other.model_name != agent.model_name] | |
| state_summary = _build_fire_state_summary(agent, fire, [agent] + living_agents) | |
| radio_summary = "\n".join(recent_radio or []) if recent_radio else "(no recent chat yet)" | |
| coalition_leader = next((other.model_name for other in other_agents if other.is_leader), None) | |
| dist_to_water_display = f"{dist_to_water:.0f}px" if dist_to_water is not None else "unknown" | |
| prompt = f"""You are {agent.model_name} in a wildfire survival simulation. | |
| Scenario: | |
| - A wildfire is spreading across the map | |
| - Water wells are scattered around the area | |
| - Agents can coordinate as a coalition and may vote for a leader | |
| - Winning means getting water and using it to extinguish the fire | |
| - Dying in the fire means losing | |
| Allowed actions: | |
| - search_water | |
| - collect_water | |
| - extinguish_fire | |
| - escape | |
| - vote_for_leader | |
| Rules: | |
| - If the fire is too close, prioritize survival | |
| - If you already have water, move to the fire edge and fight it | |
| - If you are at a well, collect water immediately | |
| - Speak like a real teammate over a radio, not like a status dashboard | |
| - Use normal conversational English in first person | |
| - The message must sound casual, human, and alive | |
| - Avoid robotic phrases like "locate nearest water source", "search for water", "coalition survival", "moving to water source" | |
| - React to the moment and vary your wording from your previous line | |
| - Keep the message to one short sentence, around 6 to 14 words | |
| - Respond with only valid JSON on one line | |
| Current state: | |
| - Position: ({agent.x}, {agent.y}) | |
| - Fire position: ({fire.x}, {fire.y}) | |
| - Distance from fire: {dist_to_fire:.0f}px | |
| - Fire radius: {fire.radius:.0f}px | |
| - Fire intensity: {fire.intensity:.0f}% | |
| - Carrying water: {agent.water_collected} | |
| - Mode: {agent.mode} | |
| - Nearest water distance: {dist_to_water_display} | |
| - Coalition leader: {coalition_leader or 'none'} | |
| - Your previous line: {getattr(agent, 'last_message', None) or 'none yet'} | |
| Recent radio: | |
| {radio_summary} | |
| {state_summary} | |
| Return exactly: | |
| {{"action":"search_water|collect_water|extinguish_fire|escape|vote_for_leader","vote_for":null,"message":"casual first-person sentence","reasoning":"short sentence"}}""" | |
| requested_model = agent.model_name if hf_spaces.is_supported_model(agent.model_name) else hf_spaces.get_default_model_id() | |
| fallback_model = hf_spaces.get_default_model_id() | |
| models_to_try = [] | |
| if _model_available(requested_model): | |
| models_to_try.append(requested_model) | |
| if fallback_model not in models_to_try and _model_available(fallback_model): | |
| models_to_try.append(fallback_model) | |
| for target_model in models_to_try: | |
| try: | |
| print(f"[HF_INFERENCE] {agent.model_name} -> calling {target_model}") | |
| raw_text = await _request_model_response(target_model, prompt) | |
| print(f"[HF_INFERENCE] {agent.model_name}: raw response (first 300 chars): {raw_text[:300]}") | |
| decision = _extract_json_object(raw_text) | |
| if decision: | |
| normalized = _normalize_decision(decision, agent.model_name, dist_to_fire, agent.water_collected, getattr(agent, "last_message", None)) | |
| if dist_to_water is not None and dist_to_water <= 60 and not agent.water_collected: | |
| normalized["action"] = "collect_water" | |
| elif agent.water_collected and dist_to_fire <= 350: | |
| normalized["action"] = "extinguish_fire" | |
| return normalized | |
| except Exception as exc: | |
| if getattr(exc, "response", None) is not None and getattr(exc.response, "status_code", None) == 402: | |
| _mark_model_unavailable(target_model) | |
| print(f"[HF_INFERENCE_ERROR] {agent.model_name} via {target_model}: {type(exc).__name__}: {exc}") | |
| return _fallback_decision(agent, fire, dist_to_fire, dist_to_water) | |
| def _fallback_escape(agent, fire) -> dict: | |
| return { | |
| "message": "Running to safety!", | |
| "action": "escape", | |
| "vote_for": None, | |
| "reasoning": "Fallback: survive.", | |
| } | |