Spaces:
Sleeping
Sleeping
| """ | |
| Core Myco gameplay — LLM is the primary game engine. | |
| """ | |
| import os | |
| import re | |
| import json | |
| import random | |
| import threading | |
| import traceback | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| # Local imports | |
| from game.catalog import load_mushrooms | |
| from game.state import collection_contains, welcome_history | |
| # Global singletons | |
| DEFAULT_MODEL_ID = "google/gemma-3-1b-it" | |
| _model = None | |
| _tokenizer = None | |
| _lock = threading.Lock() | |
| # Define device (CPU by default) | |
| DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
| print(f"[Myco] Engine initializing on: {DEVICE.upper()}") | |
| def _get_model_and_tokenizer(): | |
| global _model, _tokenizer | |
| if _model is not None and _tokenizer is not None: | |
| return _model, _tokenizer | |
| with _lock: | |
| if _model is not None and _tokenizer is not None: | |
| return _model, _tokenizer | |
| model_id = os.getenv("MYCO_MODEL_ID", DEFAULT_MODEL_ID) | |
| token = os.getenv("HF_BUILD_SMALL_HACKATHON_TOKEN") | |
| try: | |
| print(f"[Myco] Loading model: {model_id}...") | |
| _tokenizer = AutoTokenizer.from_pretrained(model_id, token=token) | |
| # Load with CPU-safe settings | |
| _model = AutoModelForCausalLM.from_pretrained( | |
| model_id, | |
| token=token, | |
| dtype=torch.float16 if DEVICE == "cuda" else torch.float32, | |
| device_map=DEVICE, | |
| trust_remote_code=True | |
| ) | |
| return _model, _tokenizer | |
| except Exception as exc: | |
| print(f"[Myco] Load error: {exc}") | |
| return None, None | |
| def _get_pipeline(): | |
| """Bridge function to resolve NameError for the pipeline.""" | |
| model, tokenizer = _get_model_and_tokenizer() | |
| return model | |
| def _run_pipeline(pipe_ignored, messages): | |
| """Handles inference on the active device (CPU or GPU).""" | |
| model, tokenizer = _get_model_and_tokenizer() | |
| if model is None: | |
| return "The forest is silent. (Model loading failed)" | |
| formatted_prompt = tokenizer.apply_chat_template( | |
| messages, tokenize=False, add_generation_prompt=True | |
| ) | |
| inputs = tokenizer(formatted_prompt, return_tensors="pt").to(DEVICE) | |
| with torch.no_grad(): | |
| outputs = model.generate( | |
| **inputs, | |
| max_new_tokens=256, | |
| do_sample=True, | |
| temperature=0.7, | |
| pad_token_id=tokenizer.eos_token_id | |
| ) | |
| input_length = inputs.input_ids.shape[1] | |
| generated_tokens = outputs[0][input_length:] | |
| decoded = tokenizer.decode(generated_tokens, skip_special_tokens=True) | |
| _set_debug(decoded) | |
| return decoded | |
| # --- Ensure you have this global list accessible in engine.py --- | |
| POISONOUS_MUSHROOM_NAMES = { | |
| "Ghost Gill", "Pepper Pixie", "Ruby Knuckle", "Clockwork Chanterelle", | |
| } | |
| _LAST_REPLY = {"text": "🍄 Myco is listening..."} | |
| # Running log of everything Myco says — narrative, action, and chat replies — | |
| # so the front end can show the full live history, not just the latest line. | |
| _REPLY_LOG: list[str] = [] | |
| _REPLY_LOG_LOCK = threading.Lock() | |
| MAX_REPLY_LOG_ENTRIES = 30 | |
| def _set_reply(text: str, tag: str = "chat"): | |
| """Append a tagged reply (narrative/action/chat) to the running Myco log.""" | |
| entry = f"[{tag.upper()}] {text}" | |
| with _REPLY_LOG_LOCK: | |
| _REPLY_LOG.append(entry) | |
| if len(_REPLY_LOG) > MAX_REPLY_LOG_ENTRIES: | |
| del _REPLY_LOG[: len(_REPLY_LOG) - MAX_REPLY_LOG_ENTRIES] | |
| _LAST_REPLY["text"] = "\n\n".join(_REPLY_LOG) | |
| def get_myco_log() -> str: | |
| """Return the full running Myco log (narrative/action/chat) — non-blocking, | |
| intended to be polled by a gr.Timer rather than streamed from an infinite | |
| generator (which can starve the Gradio queue for other events).""" | |
| return _LAST_REPLY["text"] | |
| # Shared store for the most recent raw LLM output, so the front end can | |
| # display it live for debugging (server-side prints aren't visible once deployed). | |
| _LAST_DEBUG = {"text": "🍄 Waiting for Myco's first thought..."} | |
| def _set_debug(text: str): | |
| _LAST_DEBUG["text"] = str(text) | |
| def get_myco_debug() -> str: | |
| """Return the most recent raw LLM output (for a live debug panel).""" | |
| return _LAST_DEBUG["text"] | |
| # --------------------------------------------------------------------------- | |
| # Template Helper (Fixes the Gemma TemplateError) | |
| # --------------------------------------------------------------------------- | |
| def _format_messages_for_template(system_prompt: str, user_prompt: str, history_turns: list = None) -> list: | |
| """ | |
| Formats messages securely for models with strict chat templates (like Gemma). | |
| Handles both Gradio's default tuple history and OpenAI-style dict history. | |
| """ | |
| # Gemma doesn't support the 'system' role, so we inject it as the first 'user' prompt | |
| raw_turns = [{"role": "user", "content": system_prompt}] | |
| if history_turns: | |
| for entry in history_turns: | |
| # Handle Gradio style history: [[user_msg, bot_msg], ...] | |
| if isinstance(entry, (list, tuple)) and len(entry) == 2: | |
| user_msg, bot_msg = entry | |
| if user_msg: | |
| raw_turns.append({"role": "user", "content": str(user_msg)}) | |
| if bot_msg: | |
| raw_turns.append({"role": "model", "content": str(bot_msg)}) | |
| # Handle standard dictionary style history (just in case) | |
| elif isinstance(entry, dict): | |
| role = entry.get("role", "assistant") | |
| if role == "assistant": | |
| role = "model" | |
| elif role == "system": | |
| continue # Strip extraneous system tags | |
| content = entry.get("content", "") | |
| if isinstance(content, str) and content.strip(): | |
| raw_turns.append({"role": role, "content": content}) | |
| # Add the current payload action | |
| raw_turns.append({"role": "user", "content": user_prompt}) | |
| # Deduplicate consecutive roles (guarantees perfect strict user/model rotation) | |
| alternated_turns = [] | |
| for turn in raw_turns: | |
| if alternated_turns and alternated_turns[-1]["role"] == turn["role"]: | |
| alternated_turns[-1]["content"] += f"\n\n{turn['content']}" | |
| else: | |
| alternated_turns.append(turn) | |
| return alternated_turns | |
| # --------------------------------------------------------------------------- | |
| # Game Constants | |
| # --------------------------------------------------------------------------- | |
| JSON_PROMPT_SUFFIX = ( | |
| "\n\nRespond with a single JSON object only. No prose before or after it. " | |
| 'Example: {"action":"pick","target":"Ruby Knuckle","thought":"It seems safe."}' | |
| ) | |
| RARITY_WEIGHTS = {"Common": 64, "Rare": 24, "Legendary": 8} | |
| RARITY_SCORE = {"Common": 10, "Rare": 35, "Legendary": 100} | |
| PLAYER_HEALTH = 3 | |
| POISON_PENALTY = -25 | |
| POISONOUS = POISONOUS_MUSHROOM_NAMES | |
| SYSTEM_PROMPT = """You are Myco, a tiny sentient mushroom companion and forest guide. | |
| You are curious, warm, slightly anxious about poisonous mushrooms, and deeply connected | |
| to the forest mystery. You speak in short, vivid sentences. You never break character. | |
| You react emotionally to discoveries — with awe for Legendary mushrooms, caution for | |
| poisonous ones, and gentle wonder for Common ones. You hint at the deeper mystery of | |
| the vanished forest and the MycoDex that seems to remember things it shouldn't.""" | |
| FOREST_EVENTS = [ | |
| {"title": "A Quiet Clearing", "emoji": "🌿", "mood": "calm"}, | |
| {"title": "Wind Between Trees", "emoji": "🕯️", "mood": "afraid"}, | |
| ] | |
| MYSTERY_CHAPTERS = [ | |
| {"threshold": 0, "title": "The Wrong Memory", "clue": "Myco recognises the path before you move."}, | |
| {"threshold": 1, "title": "The Traveler's Song", "clue": "A stranger's lullaby appears in the MycoDex margin."}, | |
| {"threshold": 2, "title": "The Door Under Roots","clue": "Rare spores point to a door nobody built."}, | |
| {"threshold": 3, "title": "The MycoDex Seed", "clue": "The MycoDex grows warm like a living cap."}, | |
| {"threshold": 5, "title": "The Impossible Bloom","clue": "The Impossible Mushroom was never outside the book."}, | |
| ] | |
| RARITY_CLUES = { | |
| "Common": "The cap leans toward the path, like it wants to be remembered.", | |
| "Rare": "Silver spores circle it in a pattern only old forest stories describe.", | |
| "Legendary": "The whole clearing goes quiet — this mushroom hides part of the Elder Map.", | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Utility: extract JSON or return text | |
| # --------------------------------------------------------------------------- | |
| def _extract_json_or_text(generated_text: str) -> str | None: | |
| if not generated_text: | |
| return None | |
| text = str(generated_text).strip() | |
| # Strip markdown code fences (```json ... ```) | |
| text = re.sub(r"^```(?:json)?\s*", "", text, flags=re.MULTILINE) | |
| text = re.sub(r"\s*```$", "", text, flags=re.MULTILINE) | |
| text = text.strip() | |
| # Try to parse as JSON and extract human-readable field | |
| simple_matches = re.findall(r"\{.*?\}", text, flags=re.DOTALL) | |
| for candidate in reversed(simple_matches): | |
| try: | |
| parsed = json.loads(candidate) | |
| if isinstance(parsed, dict): | |
| for key in ("thought", "content", "text", "reply", "message"): | |
| if key in parsed and isinstance(parsed[key], str) and parsed[key].strip(): | |
| return parsed[key].strip() | |
| for v in parsed.values(): | |
| if isinstance(v, str) and v.strip(): | |
| return v.strip() | |
| except Exception: | |
| continue | |
| return text or None | |
| # --------------------------------------------------------------------------- | |
| # Status | |
| # --------------------------------------------------------------------------- | |
| def companion_model_id(): | |
| return os.getenv("MYCO_MODEL_ID", DEFAULT_MODEL_ID) | |
| def companion_status(): | |
| pipe = _get_pipeline() | |
| model = companion_model_id() | |
| if pipe: | |
| return f"🧠 Myco AI active ({model})" | |
| return f"⚠️ Myco AI fallback mode ({model} failed)" | |
| def hf_companion_status(): | |
| return companion_status() | |
| # --------------------------------------------------------------------------- | |
| # LLM call — single-turn | |
| # --------------------------------------------------------------------------- | |
| def _llm(prompt: str, context: dict | None = None, tag: str = "action") -> str | None: | |
| pipe = _get_pipeline() | |
| if not pipe: | |
| return None | |
| ctx = context or {} | |
| mushroom_line = "" | |
| if ctx.get("name"): | |
| poison_flag = " ⚠️ POISONOUS" if ctx.get("name") in POISONOUS else "" | |
| mushroom_line = ( | |
| f"Current mushroom: {ctx['name']} ({ctx.get('rarity','?')} rarity){poison_flag}. " | |
| f"Habitat: {ctx.get('habitat','?')}. Lore: {ctx.get('lore','?')}. " | |
| f"Edible: {ctx.get('edible','Unknown')}. Magic: {ctx.get('magic','Unknown')}. " | |
| f"Danger: {ctx.get('danger','Unknown')}." | |
| ) | |
| collection_line = f"MycoDex entries: {ctx.get('collection_count', 0)}." | |
| mystery_line = f"Active mystery chapter: {ctx.get('mystery_title', 'The Wrong Memory')}." | |
| score_line = f"Player score: {ctx.get('score', 0)} spores. Health: {ctx.get('health', 3)}/3." | |
| system = f"{SYSTEM_PROMPT}\n\n{mushroom_line}\n{collection_line}\n{mystery_line}\n{score_line}" | |
| # Safe layout handling for strict model templates | |
| messages = _format_messages_for_template(system, prompt + JSON_PROMPT_SUFFIX) | |
| try: | |
| outputs = _run_pipeline(pipe, messages) | |
| print("========== MYCO OUTPUT ==========") | |
| print(outputs) | |
| print("=================================") | |
| if isinstance(outputs, list) and outputs: | |
| first = outputs[0] | |
| generated = first.get("generated_text", "") if isinstance(first, dict) else str(first) | |
| else: | |
| generated = str(outputs) | |
| if isinstance(generated, list): | |
| last = generated[-1] | |
| text = last.get("content") if isinstance(last, dict) else str(last) | |
| else: | |
| text = str(generated) | |
| result = _extract_json_or_text(text) | |
| if result: | |
| _set_reply(result, tag) | |
| return result | |
| except Exception as exc: | |
| print(f"[Myco] Inference error: {exc}") | |
| traceback.print_exc() | |
| return None | |
| # --------------------------------------------------------------------------- | |
| # LLM call — multi-turn chat | |
| # --------------------------------------------------------------------------- | |
| def _llm_with_history(history: list, user_message: str, context: dict) -> str | None: | |
| """Chat function with strict ANTI-JSON guardrails.""" | |
| pipe = _get_pipeline() | |
| if not pipe: | |
| return None | |
| ctx = context or {} | |
| mushroom_line = "" | |
| if ctx.get("name"): | |
| poison_flag = " ⚠️ POISONOUS" if ctx.get("name") in POISONOUS_MUSHROOM_NAMES else "" | |
| mushroom_line = ( | |
| f"Current mushroom: {ctx['name']} ({ctx.get('rarity','?')}){poison_flag}. " | |
| f"Lore: {ctx.get('lore','?')}. " | |
| f"Edible: {ctx.get('edible','Unknown')}. Magic: {ctx.get('magic','Unknown')}." | |
| ) | |
| # The new strict anti-JSON prompt | |
| system = ( | |
| f"{SYSTEM_PROMPT}\n\n" | |
| f"{mushroom_line}\n" | |
| f"MycoDex entries: {ctx.get('collection_count', 0)}. " | |
| f"Mystery: {ctx.get('mystery_title', 'The Wrong Memory')}. " | |
| f"Score: {ctx.get('score', 0)} spores. Health: {ctx.get('health', 3)}/3.\n\n" | |
| "--- CRITICAL CHAT INSTRUCTION ---\n" | |
| "You are chatting organically with the player. Speak ONLY in regular text dialogue.\n" | |
| "❌ DO NOT output any JSON structures, game actions, keys, or thoughts blocks (like {\"action\": ...}).\n" | |
| "❌ NEVER wrap your dialogue in a ```json markdown block.\n" | |
| "Just talk to them like a supportive mushroom companion!" | |
| ) | |
| messages = _format_messages_for_template(system, user_message, history[-6:]) | |
| try: | |
| reply = _run_pipeline(pipe, messages) | |
| print("========== MYCO OUTPUT ==========") | |
| print(reply) | |
| print("=================================") | |
| if reply: | |
| _set_reply(reply, "chat") | |
| return reply | |
| except Exception as exc: | |
| print(f"[Myco] Inference error: {exc}") | |
| traceback.print_exc() | |
| return None | |
| # --------------------------------------------------------------------------- | |
| # Context builder | |
| # --------------------------------------------------------------------------- | |
| def _ctx(current: dict | None, collection: list) -> dict: | |
| count = len(collection) | |
| chapter = MYSTERY_CHAPTERS[0] | |
| for c in MYSTERY_CHAPTERS: | |
| if count >= c["threshold"]: | |
| chapter = c | |
| score = _score_collection(collection) | |
| health = _health(current, collection) | |
| ctx: dict = { | |
| "collection_count": count, | |
| "mystery_title": chapter["title"], | |
| "mystery_clue": chapter["clue"], | |
| "score": score, | |
| "health": health, | |
| } | |
| if current: | |
| ctx.update({ | |
| "name": current.get("name", ""), | |
| "rarity": current.get("rarity", "Common"), | |
| "habitat": current.get("habitat", ""), | |
| "lore": current.get("lore", ""), | |
| "edible": current.get("edible", "Unknown"), | |
| "magic": current.get("magic", "Unknown"), | |
| "danger": current.get("danger", "Unknown"), | |
| }) | |
| return ctx | |
| # --------------------------------------------------------------------------- | |
| # Fallbacks | |
| # --------------------------------------------------------------------------- | |
| def _fallback_discover(current: dict) -> str: | |
| name = current.get("name", "something") | |
| rarity = current.get("rarity", "Common") | |
| poison = current.get("name", "") in POISONOUS | |
| if poison: | |
| return f"Wait — {name}! I've seen this before... something feels very wrong. Don't touch it yet." | |
| if rarity == "Legendary": | |
| return f"Oh! Oh! A {name}! The whole clearing just went silent. This is from the Elder Map!" | |
| if rarity == "Rare": | |
| return f"A {name}... I can feel it humming. Something rare is here — maybe magical." | |
| return f"A {name}! Found near {current.get('habitat','the forest')}. Let me sense it first." | |
| def _fallback_pick(current: dict) -> str: | |
| if _is_poisonous(current): | |
| return "💀 That was poisonous! I tried to stop you... the forest goes dark." | |
| return f"Got {current.get('name','it')}! +{RARITY_SCORE.get(current.get('rarity','Common'),10)} spores!" | |
| def _fallback_study(current: dict) -> str: | |
| return f"I studied it carefully. Magic field updated. The clue: {RARITY_CLUES.get(current.get('rarity','Common'), '')}" | |
| def _fallback_collect(current: dict) -> str: | |
| return f"Added {current.get('name','it')} to the MycoDex! The pages feel warmer." | |
| def _fallback_whisper(current: dict) -> str: | |
| return "I followed the whisper... and remembered a path I've never walked. The mystery deepens." | |
| def _fallback_chat(current: dict | None) -> str: | |
| if current: | |
| return f"I feel something strange about {current.get('name','this')}... stay close to me." | |
| return "The forest is full of secrets. Move to a clearing and search — I'll watch for danger." | |
| # --------------------------------------------------------------------------- | |
| # Mushroom helpers | |
| # --------------------------------------------------------------------------- | |
| def _choose_mushroom(catalog=None): | |
| if catalog is None: | |
| catalog = list(load_mushrooms()) | |
| # Handle both list and dict catalog formats | |
| if isinstance(catalog, dict): | |
| mushrooms = list(catalog.values()) | |
| else: | |
| mushrooms = list(catalog) | |
| if not mushrooms: | |
| print("CRITICAL: Mushroom catalog is empty! Using fallback.") | |
| class FallbackMushroom: | |
| name = "Common Moss Cap" | |
| rarity = "Common" | |
| habitat = "Glow Undergrowth" | |
| lore = "A simple mushroom." | |
| edible = magic = danger = "Unknown" | |
| def to_dict(self): | |
| return {k: getattr(self, k) for k in | |
| ["name","rarity","habitat","lore","edible","magic","danger"]} | |
| return FallbackMushroom() | |
| weights = [RARITY_WEIGHTS.get(getattr(m, "rarity", "Common"), 12) for m in mushrooms] | |
| try: | |
| return random.choices(mushrooms, weights=weights, k=1)[0] | |
| except Exception as e: | |
| print(f"Error in random.choices: {e}") | |
| return mushrooms[0] | |
| def _is_poisonous(current: dict) -> bool: | |
| return current.get("name", "") in POISONOUS or current.get("danger") == "Poisonous" | |
| def _score_value(current: dict) -> int: | |
| return RARITY_SCORE.get(current.get("rarity", "Common"), 10) | |
| def _score_collection(collection: list) -> int: | |
| total = 0 | |
| for e in collection: | |
| if e.get("game_over") == "Yes": | |
| continue | |
| total += int(e.get("score_delta") or _score_value(e)) | |
| return max(0, total) | |
| def _health(current: dict | None, collection: list) -> int: | |
| if current and current.get("game_over") == "Yes": | |
| return 0 | |
| deaths = sum(1 for e in collection if e.get("game_over") == "Yes") | |
| return max(0, PLAYER_HEALTH - deaths) | |
| def _mystery_state(count: int) -> dict: | |
| chapter, next_ch = MYSTERY_CHAPTERS[0], None | |
| for c in MYSTERY_CHAPTERS: | |
| if count >= c["threshold"]: | |
| chapter = c | |
| elif next_ch is None: | |
| next_ch = c | |
| next_line = ( | |
| f"{next_ch['threshold'] - count} discoveries until {next_ch['title']}." | |
| if next_ch else "The Impossible Bloom is near. Follow the whisper." | |
| ) | |
| return { | |
| "mystery_title": chapter["title"], | |
| "mystery_clue": chapter["clue"], | |
| "mystery_next": next_line, | |
| } | |
| def _story_event(count: int) -> dict: | |
| return FOREST_EVENTS[count % len(FOREST_EVENTS)] | |
| def _build_current(mushroom, collection: list) -> dict: | |
| count = len(collection) | |
| current = mushroom.to_dict() | |
| current["poison"] = "Yes" if mushroom.name in POISONOUS else "No" | |
| current["score_total"] = str(_score_collection(collection)) | |
| current["health"] = str(_health(current, collection)) | |
| current["score_delta"] = "0" | |
| current["clue"] = RARITY_CLUES.get(mushroom.rarity, RARITY_CLUES["Common"]) | |
| if count == 0: | |
| current["clue"] = f"First clue: {mushroom.name} marks the beginning of the Spore Door trail." | |
| event = _story_event(count) | |
| current.update({ | |
| "event_title": event["title"], | |
| "event_emoji": event["emoji"], | |
| "myco_mood": event["mood"], | |
| "reward_text": "Discover, then pick or collect.", | |
| }) | |
| current.update(_mystery_state(count)) | |
| return current | |
| def _append(history: list, role: str, content: str) -> list: | |
| return [*history, {"role": role, "content": content}] | |
| def _safe_history(h) -> list: | |
| return list(h or welcome_history()) | |
| def _safe_collection(c) -> list: | |
| return list(c or []) | |
| # --------------------------------------------------------------------------- | |
| # Public game actions | |
| # --------------------------------------------------------------------------- | |
| def discover_mushroom(catalog): | |
| """Instantly spawns 2-4 mushrooms with robust fallback fields to prevent UI crashes.""" | |
| num_mushrooms = random.randint(2, 4) | |
| active_mushrooms = [] | |
| used_tiles = set() | |
| for _ in range(num_mushrooms): | |
| mushroom = _choose_mushroom(catalog) | |
| # FIX: Define 'name' by extracting it from the mushroom object | |
| name = getattr(mushroom, "name", "Unknown Mushroom") | |
| mush_x, mush_y = random.randint(0, 2), random.randint(0, 2) | |
| while (mush_x, mush_y) in used_tiles: | |
| mush_x, mush_y = random.randint(0, 2), random.randint(0, 2) | |
| used_tiles.add((mush_x, mush_y)) | |
| # We fill EVERY possible key that dex_markdown or status bars might read | |
| active_mushrooms.append({ | |
| "name": name, # This will now work correctly! | |
| "rarity": getattr(mushroom, "rarity", "Common"), # Adjusted assuming 'field' is a typo or missing utility | |
| "habitat": getattr(mushroom, "habitat", "Glow Undergrowth"), | |
| "lore": getattr(mushroom, "lore", "A rapid micro-sprout spawned during a bloom."), | |
| "edible": getattr(mushroom, "edible", "Unknown"), | |
| "magic": getattr(mushroom, "magic", "Unknown"), | |
| "danger": getattr(mushroom, "danger", "Unknown"), | |
| "poison": "Yes" if name in POISONOUS_MUSHROOM_NAMES else "No", | |
| "mush_x": mush_x, | |
| "mush_y": mush_y, | |
| "picked": "No", | |
| "clue": "Harvested instantly!", | |
| "studied": "No" | |
| }) | |
| return active_mushrooms | |
| def myco_reply(message=None, history=None, current=None, collection=None, position=None): | |
| """Player chats with Myco. LLM responds in character with full context.""" | |
| print("MYCO_REPLY CALLED, message:", repr(message)) | |
| hist = _safe_history(history) | |
| coll = _safe_collection(collection) | |
| clean = (message or "").strip() | |
| if not clean: | |
| return "", hist | |
| ctx = _ctx(current, coll) | |
| reply = _llm_with_history(hist, clean, ctx) | |
| if not reply: | |
| reply = _fallback_chat(current) | |
| return "", _append(hist, "user", clean) + [{"role": "assistant", "content": reply}] | |
| companion_reply = myco_reply | |
| def collect_current(current=None, collection=None, history=None): | |
| """Collect mushroom into MycoDex. LLM narrates the entry.""" | |
| coll = _safe_collection(collection) | |
| hist = _safe_history(history) | |
| if current is None: | |
| return coll, _append(hist, "assistant", "We need to find a mushroom first!") | |
| if collection_contains(coll, current["name"]): | |
| return coll, _append(hist, "assistant", f"{current['name']} is already in the MycoDex!") | |
| score_delta = _score_value(current) | |
| score_total = _score_collection(coll) + score_delta | |
| collected = { | |
| **current, | |
| "score_delta": str(score_delta), | |
| "score_total": str(score_total), | |
| "health": str(_health(current, coll)), | |
| "reward_text": f"+{score_delta} spores", | |
| } | |
| updated_coll = [*coll, collected] | |
| ctx = _ctx(current, coll) | |
| prompt = ( | |
| f"The player just added {current['name']} ({current.get('rarity','Common')}) to the MycoDex! " | |
| f"+{score_delta} spores. Total score: {score_total}. " | |
| f"MycoDex now has {len(updated_coll)} entries. " | |
| "Celebrate this moment. Add a small lore detail or mystery hint." | |
| ) | |
| reply = _llm(prompt, ctx) or _fallback_collect(current) | |
| return updated_coll, _append(hist, "assistant", reply) | |
| def pick_current(current=None, collection=None, history=None): | |
| """Pick mushroom as game item. Poison = game over. LLM narrates dramatically.""" | |
| coll = _safe_collection(collection) | |
| hist = _safe_history(history) | |
| if current is None: | |
| return coll, None, _append(hist, "assistant", "Find a mushroom first before picking!") | |
| ctx = _ctx(current, coll) | |
| if _is_poisonous(current): | |
| score_total = max(0, _score_collection(coll) + POISON_PENALTY) | |
| game_over = { | |
| **current, | |
| "danger": "Poisonous", | |
| "game_over": "Yes", | |
| "health": "0", | |
| "score_delta": str(POISON_PENALTY), | |
| "score_total": str(score_total), | |
| "reward_text": f"Poison! {POISON_PENALTY} spores · Game Over", | |
| } | |
| prompt = ( | |
| f"DRAMATIC MOMENT: The player picked {current['name']} which is POISONOUS! " | |
| f"Game Over! Score drops by 25 to {score_total}. Health → 0. " | |
| "React with shock, sadness, and a dramatic farewell. Make it memorable." | |
| ) | |
| reply = _llm(prompt, ctx) or _fallback_pick(current) | |
| return coll, game_over, _append(hist, "assistant", f"💀 {reply}") | |
| score_delta = _score_value(current) | |
| score_total = _score_collection(coll) + score_delta | |
| picked = { | |
| **current, | |
| "picked": "Yes", | |
| "danger": "Safe", | |
| "score_delta": str(score_delta), | |
| "score_total": str(score_total), | |
| "health": str(_health(current, coll)), | |
| "reward_text": f"+{score_delta} spores", | |
| } | |
| if collection_contains(coll, picked["name"]): | |
| return coll, picked, _append(hist, "assistant", f"{picked['name']} already picked!") | |
| updated_coll = [*coll, picked] | |
| prompt = ( | |
| f"The player safely picked {current['name']} ({current.get('rarity','Common')})! " | |
| f"+{score_delta} spores. Total: {score_total}. " | |
| "Celebrate! Make it feel like a platformer power-up moment." | |
| ) | |
| reply = _llm(prompt, ctx) or _fallback_pick(current) | |
| return updated_coll, picked, _append(hist, "assistant", f"🍄 {reply}") | |
| def follow_whisper(current=None, collection=None, history=None): | |
| """Follow the forest whisper. LLM reveals mystery fragments.""" | |
| coll = _safe_collection(collection) | |
| hist = _safe_history(history) | |
| if current is None: | |
| return None, _append(hist, "assistant", | |
| "Myco cups one ear. The forest only whispers near mushrooms — search a clearing first.") | |
| ctx = _ctx(current, coll) | |
| if _is_poisonous(current) and current.get("studied") != "Yes": | |
| game_over = { | |
| **current, | |
| "danger": "Poisonous", | |
| "game_over": "Yes", | |
| "health": "0", | |
| "score_total": str(max(0, _score_collection(coll) + POISON_PENALTY)), | |
| } | |
| prompt = ( | |
| f"The player followed a whisper but it led to POISON from {current['name']}! Game Over! " | |
| "React with horror and a haunting mystery revelation." | |
| ) | |
| reply = _llm(prompt, ctx) or "💀 The whisper belonged to poison... Myco screams." | |
| return game_over, _append(hist, "assistant", reply) | |
| mystery = _mystery_state(len(coll) + 1) | |
| prompt = ( | |
| f"The player followed a forest whisper near {current.get('name','a mushroom')}. " | |
| f"Mystery chapter revealed: {mystery['mystery_title']}. Clue: {mystery['mystery_clue']}. " | |
| "Reveal this mystery fragment dramatically. " | |
| "Make Myco gasp or tremble. Hint that the MycoDex is alive and regrowing the lost forest." | |
| ) | |
| reply = _llm(prompt, ctx) or _fallback_whisper(current) | |
| revealed = {**current, **mystery, "whisper_followed": "Yes"} | |
| return revealed, _append(hist, "assistant", f"🌌 {reply}") | |
| def study_current(current=None, history=None): | |
| """Study mushroom. LLM gives a careful field observation.""" | |
| hist = _safe_history(history) | |
| if current is None: | |
| return None, _append(hist, "assistant", "Nothing to study yet — find a mushroom first!") | |
| prompt = ( | |
| f"Study the mushroom {current.get('name','unknown')} carefully. " | |
| "Provide a concise field observation and one hint about its magic or danger." | |
| ) | |
| reply = _llm(prompt, _ctx(current, [])) or _fallback_study(current) | |
| return reply, _append(hist, "assistant", reply) | |
| def eat_current(current=None, collection=None, history=None): | |
| """Eating mushrooms raw is always blocked. This dynamic response is now fully driven by Myco AI.""" | |
| coll = _safe_collection(collection) | |
| hist = _safe_history(history) | |
| if current is None: | |
| return coll, _append(hist, "assistant", "There's nothing here to eat!") | |
| ctx = _ctx(current, coll) | |
| prompt = ( | |
| f"The player is attempting to directly EAT the raw mushroom: {current.get('name','unknown')}. " | |
| "This is highly forbidden, unsafe, and unidentified! React as Myco with absolute dynamic panic, " | |
| "gently scold them in character for trying something so dangerous, and assertively tell them " | |
| "they need to Study it instead of eating it!" | |
| ) | |
| # Classic static text fallback just in case inference drops | |
| fallback_reply = ( | |
| f"No no no! {current.get('name','That')} could be dangerous raw! " | |
| "I'll never let you eat an unidentified mushroom. Study it first!" | |
| ) | |
| reply = _llm(prompt, ctx) or fallback_reply | |
| return coll, _append(hist, "assistant", reply) | |
| def check_auto_pickup(new_pos_x, new_pos_y, current, collection, active_mushrooms=None): | |
| """ | |
| Check if Myco stepped on any mushroom in active_mushrooms. | |
| Returns (current, collection, active_mushrooms) — all three updated. | |
| """ | |
| if isinstance(current, str): | |
| try: | |
| current = json.loads(current) | |
| except json.JSONDecodeError: | |
| current = {} | |
| elif current is None: | |
| current = {} | |
| if active_mushrooms is None: | |
| # Fallback: try reading from current dict (legacy path) | |
| active_mushrooms = (current or {}).get("active_mushrooms", []) | |
| if not active_mushrooms: | |
| return current, collection, [] | |
| if current and (current.get("game_over") == "Yes" or current.get("picked") == "Yes"): | |
| return current, collection, active_mushrooms | |
| updated_mushrooms = list(active_mushrooms) | |
| hit = None | |
| for m in updated_mushrooms: | |
| if m.get("picked") == "Yes": | |
| continue | |
| if int(m.get("mush_x", -1)) == int(new_pos_x) and \ | |
| int(m.get("mush_y", -1)) == int(new_pos_y): | |
| hit = m | |
| break | |
| if hit is None: | |
| return current, collection, updated_mushrooms | |
| # Myco stepped on a mushroom | |
| if hit.get("poison") == "Yes": | |
| # Game over | |
| updated_current = dict(current or {}) | |
| updated_current["game_over"] = "Yes" | |
| updated_current["health"] = "0" | |
| updated_current["encounter"] = f"💀 Myco stepped on toxic {hit['name']}! Game Over!" | |
| updated_current["event_title"] = "Poison Shock!" | |
| updated_current["reward_text"] = f"Poison! {POISON_PENALTY} spores · Game Over" | |
| updated_current["score_total"] = str(max(0, _score_collection(collection) + POISON_PENALTY)) | |
| # Mark it picked so it disappears from scene | |
| updated_mushrooms = [ | |
| {**m, "picked": "Yes"} if m.get("name") == hit.get("name") else m | |
| for m in updated_mushrooms | |
| ] | |
| return updated_current, collection, updated_mushrooms | |
| # Safe mushroom — collect it | |
| score_delta = RARITY_SCORE.get(hit.get("rarity", "Common"), 10) | |
| new_score = _score_collection(collection) + score_delta | |
| updated_mushrooms = [ | |
| {**m, "picked": "Yes"} if m.get("name") == hit.get("name") else m | |
| for m in updated_mushrooms | |
| ] | |
| collection = list(collection) + [hit] | |
| updated_current = dict(current or {}) | |
| updated_current["score_total"] = str(new_score) | |
| updated_current["encounter"] = f"💥 Collected {hit['name']}! +{score_delta} spores!" | |
| updated_current["reward_text"] = f"+{score_delta} spores · {hit.get('rarity','Common')}" | |
| updated_current["event_title"] = "Pickup!" | |
| # Check if all mushrooms are now picked | |
| remaining = [m for m in updated_mushrooms if m.get("picked") != "Yes"] | |
| if not remaining: | |
| updated_current["picked"] = "Yes" | |
| updated_current["encounter"] = "Area clear! Search the next clearing!" | |
| updated_current["event_title"] = "Clearing Purified" | |
| return updated_current, collection, updated_mushrooms | |
| # Myco narrative | |
| NARRATIVE_SYSTEM = """ | |
| You are Myco Poet, a living forest spirit. | |
| You ONLY output natural language. | |
| You NEVER output JSON, actions, keys, or structured data. | |
| You describe moments, sensations, and emotions in 1–2 flowing sentences. | |
| """ | |
| def get_myco_narrative(current=None): | |
| prompt = ( | |
| "Describe a living moment in the forest as Myco experiences it.\n" | |
| "Make it feel poetic, natural, and immersive.\n" | |
| ) | |
| context = None | |
| if current: | |
| context = { | |
| "name": current.get("name", "mystery"), | |
| "rarity": current.get("rarity", "common") | |
| } | |
| # IMPORTANT: force narrative system mode here | |
| reply = _llm(NARRATIVE_SYSTEM + "\n\n" + prompt, context=context, tag="narrative") | |
| if not reply: | |
| return "The forest feels deep today... ✨" | |
| def strip_noise(text: str) -> str: | |
| text = re.sub(r'\{[^{}]*\}', '', text) | |
| text = re.sub(r'\[[^\[\]]*\]', '', text) | |
| text = text.replace('"', '') | |
| return text.strip() | |
| cleaned = strip_noise(reply) | |
| # word-based check (better than char length) | |
| if len(cleaned.split()) < 10: | |
| return "The forest hums softly with unseen life... ✨" | |
| return cleaned | |
| def myco_router(mode, *args, **kwargs): | |
| if mode == "narrative": | |
| return get_myco_narrative(*args, **kwargs) | |
| if mode == "action": | |
| return _llm_action(*args, **kwargs) | |
| if mode == "chat": | |
| return _llm_with_history(*args, **kwargs) | |
| def _llm_action(*args, **kwargs): | |
| """Action-mode LLM call. Delegates to the JSON-aware single-turn _llm().""" | |
| return _llm(*args, **kwargs) | |