Spaces:
Running
Running
| """ | |
| AdventureAgent β generates Blockly / Time Challenge story arcs. | |
| Model priority chain (tries each in order, falls to next on failure): | |
| 1. YOUR GGUF fine-tune (faaizashiq/adventure-agent-qwen-gguf) | |
| β llama-cpp-python on HF Spaces free CPU (16GB RAM) | |
| β Slow first load (~60s), fast after warm (~30s) | |
| β FREE FOREVER, uses your actual fine-tuned weights | |
| 2. Gemini 2.0 Flash fallback | |
| β Used automatically if GGUF fails or is still loading | |
| β Fast (~3-5s), free tier (1500 req/day) | |
| β Requires GEMINI_API_KEY | |
| 3. Static fallback levels | |
| β Used if both AI paths fail (no internet, no keys, etc.) | |
| Environment variables (set as HF Space Secrets): | |
| GEMINI_API_KEY β for fallback path 2 | |
| ADVENTURE_TUNED_MODEL β your HF repo: faaizashiq/adventure-agent-qwen-gguf | |
| ADVENTURE_GGUF_FILE β specific .gguf filename (auto-detected if blank) | |
| HF_TOKEN β only needed if your GGUF repo is private | |
| """ | |
| import google.generativeai as genai | |
| import json | |
| import os | |
| import random | |
| import time | |
| import csv | |
| import sys | |
| import threading | |
| from typing import Dict, Any, Optional, List | |
| csv.field_size_limit(sys.maxsize) | |
| STUDENTS_DATASET_ID = "majeedkazemi/students-coding-questions-from-ai-assistant" | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GGUF SINGLETON LOADER | |
| # Loads once on first request, stays in memory for all subsequent requests. | |
| # Thread-safe: uses a lock so concurrent requests don't double-load. | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _gguf_llm = None | |
| _gguf_load_lock = threading.Lock() | |
| _gguf_load_attempted = False | |
| _gguf_load_failed = False | |
| def _load_gguf_model(repo_id: str, gguf_filename: Optional[str] = None): | |
| """ | |
| Download and load a GGUF model from HF Hub via llama-cpp-python. | |
| Returns the Llama instance, or None if loading fails. | |
| Subsequent calls return the cached instance immediately. | |
| """ | |
| global _gguf_llm, _gguf_load_attempted, _gguf_load_failed | |
| # Fast path β already loaded or already failed | |
| if _gguf_load_attempted: | |
| return _gguf_llm | |
| with _gguf_load_lock: | |
| # Double-check inside lock | |
| if _gguf_load_attempted: | |
| return _gguf_llm | |
| _gguf_load_attempted = True | |
| try: | |
| # Check if llama-cpp-python is installed before attempting anything | |
| try: | |
| from llama_cpp import Llama # type: ignore | |
| except ImportError: | |
| print("[AdventureAgent] β llama-cpp-python not installed β GGUF unavailable. Using Gemini fallback.") | |
| _gguf_load_failed = True | |
| return None | |
| from huggingface_hub import hf_hub_download, list_repo_files | |
| hf_token = os.getenv("HF_TOKEN", "").strip() or None | |
| # Auto-detect the best GGUF file if not pinned | |
| if not gguf_filename: | |
| print(f"[AdventureAgent] Scanning {repo_id} for GGUF files...") | |
| all_files = list(list_repo_files(repo_id, token=hf_token)) | |
| gguf_files = [f for f in all_files if f.endswith(".gguf")] | |
| if not gguf_files: | |
| raise FileNotFoundError(f"No .gguf files found in {repo_id}") | |
| # Prefer Q4_K_M β best quality/speed balance on CPU | |
| # Then Q4_0, then Q5_K_M, then whatever is smallest | |
| def _rank(name: str) -> int: | |
| n = name.lower() | |
| if "q4_k_m" in n: return 0 | |
| if "q4_0" in n: return 1 | |
| if "q5_k_m" in n: return 2 | |
| if "q3_k_m" in n: return 3 | |
| return 99 | |
| gguf_files.sort(key=_rank) | |
| gguf_filename = gguf_files[0] | |
| print(f"[AdventureAgent] Auto-selected GGUF: {gguf_filename}") | |
| cache_dir = os.path.join(os.getenv("HF_HOME", "/tmp"), "gguf_models") | |
| print(f"[AdventureAgent] Downloading {repo_id}/{gguf_filename} ...") | |
| model_path = hf_hub_download( | |
| repo_id=repo_id, | |
| filename=gguf_filename, | |
| token=hf_token, | |
| local_dir=cache_dir, | |
| ) | |
| print(f"[AdventureAgent] Loading GGUF from {model_path} ...") | |
| _gguf_llm = Llama( | |
| model_path=model_path, | |
| n_ctx=2048, # context window | |
| n_threads=2, # HF Spaces free tier: 2 vCPU | |
| n_gpu_layers=0, # CPU only β no GPU on free tier | |
| verbose=False, | |
| ) | |
| print("[AdventureAgent] β GGUF model loaded and ready on CPU.") | |
| except ImportError: | |
| print("[AdventureAgent] β llama-cpp-python not installed.") | |
| _gguf_load_failed = True | |
| except Exception as e: | |
| print(f"[AdventureAgent] β GGUF load failed: {e}") | |
| _gguf_load_failed = True | |
| return _gguf_llm | |
| def _data_dir() -> str: | |
| return os.path.join(os.path.dirname(__file__), '..', 'data') | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # AGENT | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class AdventureAgent: | |
| _dataset_cache: List[Dict[str, str]] = [] | |
| _students_cache: List[Dict[str, Any]] = [] | |
| _few_shot_cache: Optional[Dict[str, Any]] = None | |
| def __init__(self, api_key: Optional[str] = None): | |
| self.api_key = api_key or os.getenv("GEMINI_API_KEY") | |
| self.tuned_model_env = os.getenv("ADVENTURE_TUNED_MODEL", "").strip() | |
| self.gguf_file_env = os.getenv("ADVENTURE_GGUF_FILE", "").strip() or None | |
| # Decide which primary path to use | |
| self.use_gguf = ( | |
| bool(self.tuned_model_env) | |
| and "/" in self.tuned_model_env | |
| and not self.tuned_model_env.startswith("tunedModels/") | |
| ) | |
| # Gemini client β used as primary (if no GGUF) or fallback (if GGUF fails) | |
| self.gemini_client = None | |
| gemini_model_id = "gemini-2.0-flash" | |
| if self.tuned_model_env.startswith("tunedModels/"): | |
| gemini_model_id = self.tuned_model_env # Gemini tuned model | |
| if self.api_key: | |
| genai.configure(api_key=self.api_key) | |
| self.gemini_client = genai.GenerativeModel(gemini_model_id) | |
| if self.use_gguf: | |
| print(f"[AdventureAgent] Primary: GGUF ({self.tuned_model_env})") | |
| print(f"[AdventureAgent] Fallback: {'Gemini ' + gemini_model_id if self.gemini_client else 'Static levels'}") | |
| # Warm up GGUF model in background thread during application startup | |
| threading.Thread( | |
| target=_load_gguf_model, | |
| args=(self.tuned_model_env, self.gguf_file_env), | |
| daemon=True | |
| ).start() | |
| elif self.gemini_client: | |
| print(f"[AdventureAgent] Primary: Gemini ({gemini_model_id})") | |
| else: | |
| print("[AdventureAgent] WARNING: No model configured β static fallback only.") | |
| self.cache: Dict[str, Any] = {} | |
| self.cache_duration = 3600 | |
| self.high_rated_levels: List[Dict] = [] | |
| self._load_dataset_lazy() | |
| self._load_students_dataset_lazy() | |
| self._load_few_shot_examples() | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # DATA LOADING | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _load_dataset_lazy(self): | |
| if AdventureAgent._dataset_cache: | |
| return | |
| file_path = os.path.join(_data_dir(), 'train.csv') | |
| if os.path.exists(file_path): | |
| try: | |
| with open(file_path, "r", encoding="utf-8") as f: | |
| reader = csv.DictReader(f) | |
| for i, row in enumerate(reader): | |
| if i >= 500: | |
| break | |
| if row.get('question'): | |
| AdventureAgent._dataset_cache.append(row) | |
| print(f"[AdventureAgent] Loaded {len(AdventureAgent._dataset_cache)} rows from train.csv") | |
| except Exception as e: | |
| print(f"[AdventureAgent] train.csv load error: {e}") | |
| def _load_students_dataset_lazy(self): | |
| if AdventureAgent._students_cache: | |
| return | |
| jsonl_path = os.path.join(_data_dir(), 'students_coding_questions.jsonl') | |
| if os.path.isfile(jsonl_path): | |
| try: | |
| with open(jsonl_path, "r", encoding="utf-8") as f: | |
| for line in f: | |
| line = line.strip() | |
| if line: | |
| AdventureAgent._students_cache.append(json.loads(line)) | |
| print(f"[AdventureAgent] Loaded {len(AdventureAgent._students_cache)} CodeAid rows") | |
| return | |
| except Exception as e: | |
| print(f"[AdventureAgent] students jsonl error: {e}") | |
| if os.getenv("ADVENTURE_LOAD_HF_DATASET", "").lower() in ("1", "true", "yes"): | |
| try: | |
| from datasets import load_dataset | |
| ds = load_dataset(STUDENTS_DATASET_ID, split="train") | |
| for i, row in enumerate(ds): | |
| if i >= 800: | |
| break | |
| stem = (row.get("input_task_description") | |
| or row.get("input_intention") | |
| or row.get("input_question")) | |
| if stem and str(stem).strip().lower() not in ("null", "none", ""): | |
| AdventureAgent._students_cache.append({ | |
| "problem_stem": str(stem)[:500], | |
| "feature_type": row.get("feature_type"), | |
| }) | |
| print(f"[AdventureAgent] Loaded {len(AdventureAgent._students_cache)} rows from HF") | |
| except Exception as e: | |
| print(f"[AdventureAgent] HF dataset skip: {e}") | |
| def _load_few_shot_examples(self): | |
| if AdventureAgent._few_shot_cache is not None: | |
| return | |
| path = os.path.join(_data_dir(), 'adventure_few_shot_examples.json') | |
| if os.path.isfile(path): | |
| try: | |
| with open(path, "r", encoding="utf-8") as f: | |
| AdventureAgent._few_shot_cache = json.load(f) | |
| except Exception as e: | |
| print(f"[AdventureAgent] few-shot load error: {e}") | |
| AdventureAgent._few_shot_cache = {} | |
| else: | |
| AdventureAgent._few_shot_cache = {} | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # PROMPT HELPERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _pick_student_problem(self) -> Optional[Dict[str, Any]]: | |
| if not AdventureAgent._students_cache: | |
| return None | |
| weighted = [r for r in AdventureAgent._students_cache if r.get("task_description")] | |
| pool = weighted if weighted else AdventureAgent._students_cache | |
| row = random.choice(pool) | |
| return { | |
| "question": row.get("problem_stem") or row.get("task_description") or row.get("question", ""), | |
| "feature_type": row.get("feature_type", ""), | |
| } | |
| def _few_shot_block(self, mode: str, difficulty: str) -> str: | |
| if not AdventureAgent._few_shot_cache: | |
| return "" | |
| mode_key = "blockly" if mode == "blockly" else mode | |
| examples = (AdventureAgent._few_shot_cache.get(mode_key) or {}).get(difficulty) | |
| if not examples: | |
| return "" | |
| return ( | |
| "\nQUALITY REFERENCE (match this tone and JSON shape β do not copy verbatim):\n" | |
| f"{json.dumps(examples, indent=2)[:3000]}\n" | |
| ) | |
| def learn_from_feedback(self, level_data: Dict[str, Any], rating: int, | |
| developer_feedback: Optional[str] = None): | |
| if developer_feedback and str(developer_feedback).strip(): | |
| level_data["CRITICAL_DEVELOPER_TUNING_NOTE"] = str(developer_feedback).strip() | |
| if rating == 5 or (developer_feedback and str(developer_feedback).strip()): | |
| if len(self.high_rated_levels) > 50: | |
| self.high_rated_levels.pop(0) | |
| self.high_rated_levels.append(level_data) | |
| def _build_prompt(self, mode: str, difficulty: str, | |
| dataset_problem: Optional[Dict[str, str]] = None, | |
| is_gguf: bool = False) -> str: | |
| if difficulty == "Easy": | |
| diff_rules = "EASY: Simple If/Else or basic sequencing. No complex loops." | |
| concepts = '["if", "print"]' | |
| elif difficulty == "Intermediate": | |
| diff_rules = "INTERMEDIATE: MUST use a loop (For/While) or basic Array iteration." | |
| concepts = '["loop", "variable"]' | |
| else: | |
| diff_rules = "EXPERT: Nested loops, complex logic, or algorithmic thinking." | |
| concepts = '["loop", "nested_loop"]' | |
| dataset_block = "" | |
| if dataset_problem: | |
| q = dataset_problem.get('question') or dataset_problem.get('problem_stem', '') | |
| ft = dataset_problem.get('feature_type', '') | |
| dataset_block = ( | |
| f"\nREAL STUDENT PROBLEM (adapt for ages 10-14, sci-fi story):\n" | |
| f"Type: {ft}\nContext: {q}\n" | |
| "Do NOT use C pointers, malloc, or university-level topics.\n" | |
| ) | |
| prompt = ( | |
| "You are the Lead Game Designer for 'CodeCracker', an educational coding game for ages 10-14.\n" | |
| "Generate a UNIQUE STORY ARC of EXACTLY 5 interconnected game levels.\n\n" | |
| f"Mode: {mode} | Difficulty: {difficulty}\n" | |
| f"{diff_rules}\n" | |
| f"{dataset_block}" | |
| "\nSCAFFOLDING RULES:\n" | |
| "- Never give the full solution code.\n" | |
| "- Teach one concept per level; build across the arc.\n" | |
| "- Hints guide structure first, logic second β never paste the answer.\n" | |
| ) | |
| if not is_gguf: | |
| prompt += f"{self._few_shot_block(mode, difficulty)}" | |
| prompt += ( | |
| "\nOUTPUT: RAW JSON ONLY. No markdown. No explanation.\n" | |
| 'Top-level keys: "story_arc_title", "levels" (array of exactly 5).\n' | |
| ) | |
| if mode == "blockly": | |
| prompt += ( | |
| f'\nEach level: {{"type":"blockly","level_id":"id","title":"...","story":"...",' | |
| f'"concept_tutorial":"...","problem":"...","toolbox_categories":["Math","Loops","Variables","Text","Logic"],' | |
| f'"initial_code":"","validation_rules":{{"required_concepts":{concepts},"expected_output":"..."}},' | |
| f'"hint_1":"...","hint_2":"..."}}' | |
| ) | |
| elif mode == "time_challenge": | |
| prompt += ( | |
| '\nEach level: {"type":"time_challenge","level_id":"id","title":"...","story":"...",' | |
| '"concept_tutorial":"...","timer_seconds":60,"buggy_code":"...","task":"...","solution_patch":"..."}' | |
| ) | |
| if not is_gguf and self.high_rated_levels: | |
| prompt += ( | |
| f"\n\nDEVELOPER-APPROVED EXAMPLES (match this quality):\n" | |
| f"{json.dumps(self.high_rated_levels[-2:], indent=2)[:1500]}" | |
| ) | |
| return prompt | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GGUF INFERENCE | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _call_gguf(self, prompt: str) -> str: | |
| llm = _load_gguf_model(self.tuned_model_env, self.gguf_file_env) | |
| if llm is None: | |
| raise RuntimeError("GGUF model unavailable") | |
| print("[AdventureAgent] Running GGUF inference on CPU...") | |
| response_chunks = [] | |
| try: | |
| for chunk in llm.create_chat_completion( | |
| messages=[ | |
| { | |
| "role": "system", | |
| "content": ( | |
| "You are the Lead Game Designer for CodeCracker, an educational " | |
| "coding game for ages 10-14. Always respond with valid JSON only. " | |
| "No markdown, no explanation." | |
| ), | |
| }, | |
| {"role": "user", "content": prompt}, | |
| ], | |
| max_tokens=1500, | |
| temperature=0.7, | |
| top_p=0.9, | |
| stop=["<|im_end|>", "<|endoftext|>"], | |
| stream=True | |
| ): | |
| token = chunk["choices"][0]["delta"].get("content", "") | |
| if token: | |
| response_chunks.append(token) | |
| print(token, end="", flush=True) | |
| except Exception as e: | |
| print(f"\n[AdventureAgent] GGUF streaming error: {e}") | |
| raise e | |
| print() # New line after streaming completes | |
| text = "".join(response_chunks) | |
| print("[AdventureAgent] GGUF inference complete.") | |
| return text | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GEMINI INFERENCE | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _call_gemini(self, prompt: str) -> str: | |
| if not self.gemini_client: | |
| raise RuntimeError("Gemini client not configured (no API key)") | |
| print("[AdventureAgent] Calling Gemini API...") | |
| response = self.gemini_client.generate_content(prompt) | |
| return response.text or "" | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # JSON EXTRACTION | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _extract_json(text: str) -> Dict[str, Any]: | |
| """Strip markdown fences and extract the first JSON object.""" | |
| if "```json" in text: | |
| text = text.split("```json")[1].split("```")[0] | |
| elif "```" in text: | |
| text = text.split("```")[1].split("```")[0] | |
| # Find the opening brace | |
| start = text.find("{") | |
| if start != -1: | |
| text = text[start:] | |
| return json.loads(text.strip()) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # PUBLIC ENTRY POINT | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def generate_level(self, mode: str, difficulty: str, | |
| specific_topic: Optional[str] = None) -> Dict[str, Any]: | |
| if difficulty in ["Tutorial", "Onboarding"]: | |
| return self._get_onboarding_level(mode) | |
| # Pick a student problem for context | |
| dataset_prob: Optional[Dict[str, Any]] = None | |
| if specific_topic: | |
| dataset_prob = {"question": specific_topic} | |
| elif mode == "blockly": | |
| dataset_prob = self._pick_student_problem() | |
| if not dataset_prob and AdventureAgent._dataset_cache: | |
| row = random.choice(AdventureAgent._dataset_cache) | |
| dataset_prob = {"question": (row.get("question") or "")[:600]} | |
| # Cache key β avoids hammering the model for identical requests | |
| cache_key = ( | |
| f"{mode}_{difficulty}_" | |
| f"{specific_topic or (dataset_prob or {}).get('question', '')[:80]}" | |
| ) | |
| if cache_key in self.cache: | |
| entry = self.cache[cache_key] | |
| if time.time() - entry['timestamp'] < self.cache_duration: | |
| print("[AdventureAgent] Serving from cache.") | |
| return entry['data'] | |
| # ββ Try GGUF first, then Gemini, then static fallback ββββββββββββββ | |
| text = None | |
| if self.use_gguf: | |
| try: | |
| gguf_prompt = self._build_prompt(mode, difficulty, dataset_prob, is_gguf=True) | |
| text = self._call_gguf(gguf_prompt) | |
| except Exception as e: | |
| import traceback | |
| print(f"[AdventureAgent] GGUF failed: {e}. Trying Gemini fallback...") | |
| traceback.print_exc() | |
| if text is None: | |
| try: | |
| gemini_prompt = self._build_prompt(mode, difficulty, dataset_prob, is_gguf=False) | |
| text = self._call_gemini(gemini_prompt) | |
| except Exception as e: | |
| import traceback | |
| print(f"[AdventureAgent] Gemini failed: {e}. Using static fallback.") | |
| traceback.print_exc() | |
| return self._get_fallback_level(mode, difficulty) | |
| try: | |
| level_data = self._extract_json(text) | |
| self.cache[cache_key] = {"data": level_data, "timestamp": time.time()} | |
| return level_data | |
| except Exception as e: | |
| import traceback | |
| print(f"[AdventureAgent] JSON parse failed: {e}. Using static fallback.") | |
| traceback.print_exc() | |
| return self._get_fallback_level(mode, difficulty) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # STATIC LEVELS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _get_onboarding_level(self, mode: str) -> Dict[str, Any]: | |
| return { | |
| "story_arc_title": "Welcome to CodeCracker!", | |
| "levels": [{ | |
| "type": "blockly", | |
| "level_id": "tutorial_001", | |
| "title": "Welcome to Coding!", | |
| "story": "Let's learn how to speak to the computer. We need to say 'Hello'.", | |
| "concept_tutorial": "Drag & Drop: You build code by snapping blocks together, just like LEGOs.", | |
| "problem": "Print 'Hello World'.", | |
| "toolbox_categories": ["Text", "Variables"], | |
| "initial_code": "", | |
| "validation_rules": { | |
| "required_concepts": ["print"], | |
| "expected_output": "Hello World", | |
| }, | |
| "hint_1": "Find the 'print' block in the Text category.", | |
| "hint_2": "Type 'Hello World' exactly β capital H and W.", | |
| }], | |
| } | |
| def _get_fallback_level(self, mode: str, difficulty: str = "Easy") -> Dict[str, Any]: | |
| if mode == "blockly": | |
| return { | |
| "story_arc_title": "Logic Repair Arc", | |
| "levels": [ | |
| { | |
| "type": "blockly", | |
| "level_id": f"fallback_00{i + 1}", | |
| "title": f"Logic Repair Part {i}", | |
| "story": "The automated systems are down. We need manual logic configuration.", | |
| "concept_tutorial": "Loops: Use loops to repeat actions without writing code over and over.", | |
| "problem": "Create a loop that counts to 3.", | |
| "toolbox_categories": ["Loops", "Math", "Variables", "Text"], | |
| "initial_code": "", | |
| "validation_rules": { | |
| "required_concepts": ["loop"], | |
| "expected_output": "1\n2\n3", | |
| }, | |
| "hint_1": "Use the 'repeat' block.", | |
| "hint_2": "Set the number to 3.", | |
| } | |
| for i in range(1, 6) | |
| ], | |
| } | |
| if mode == "time_challenge": | |
| return { | |
| "story_arc_title": "System Reboot Arc", | |
| "levels": [ | |
| { | |
| "type": "time_challenge", | |
| "level_id": f"fallback_tc_{i}", | |
| "title": f"System Reboot Part {i}", | |
| "story": "Critical Error! Debug the startup sequence immediately.", | |
| "concept_tutorial": "Syntax: Code must follow strict grammar rules.", | |
| "timer_seconds": 45, | |
| "buggy_code": "print('System Ready'\nstart_engine()", | |
| "task": "Fix the syntax error.", | |
| "solution_patch": "print('System Ready')\nstart_engine()", | |
| } | |
| for i in range(1, 6) | |
| ], | |
| } | |
| return {"story_arc_title": "Error Arc", "levels": []} | |
| if __name__ == "__main__": | |
| agent = AdventureAgent() | |
| print("Testing Adventure Generation...") | |
| print(json.dumps(agent.generate_level("blockly", "Easy"), indent=2)) | |