Spaces:
Sleeping
Sleeping
# ─────────────────────────────────────────────────────────────────────────────
# Maria Learning Service | app.py
# FastAPI + ZeroGPU (Qwen3.5-2B int4) + FAISS RAG + gTTS
# ─────────────────────────────────────────────────────────────────────────────
| import os | |
| import gc | |
| import json | |
| import base64 | |
| import hashlib | |
| import logging | |
| import copy | |
| from io import BytesIO | |
| from typing import List, Any | |
| import httpx | |
| import numpy as np | |
| import pandas as pd | |
| import faiss | |
| import gradio as gr | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| from huggingface_hub import hf_hub_download | |
| from gtts import gTTS | |
# ── ZeroGPU: import spaces only when running inside HF Spaces ────────────────
try:
    import spaces as _spaces
    _ZEROGPU = True
except ImportError:
    # Running locally — provide a no-op decorator so the rest of the code
    # works unchanged without modifying anything.
    import types  # noqa: F401  (not used below; retained from the original fallback)

    class _spaces:  # noqa: N801
        """Local stand-in for the ``spaces`` package exposing a no-op ``GPU``."""

        @staticmethod
        def GPU(fn=None, **_options):  # noqa: N802
            """Return ``fn`` unchanged.

            Supports both usages:

            * ``_spaces.GPU(fn)`` returns ``fn`` itself.
            * ``_spaces.GPU(duration=...)`` (keyword options only) returns a
              pass-through decorator; the options are ignored.

            Declared as a static method so the call also works on an instance
            of the stand-in class, not only on the class object.
            """
            if fn is None:
                return lambda wrapped: wrapped
            return fn

    _ZEROGPU = False
# Root logger setup: timestamp, left-padded level name, then the message.
_LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
log = logging.getLogger(__name__)

# ─────────────────────────────────────────────────────────────────────────────
# Config / Secrets
# ─────────────────────────────────────────────────────────────────────────────
# Both secrets default to the empty string when the environment variable is
# not set.
HASH_VALUE = os.getenv("HASH_VALUE", "")
CF_SECRET_KEY = os.getenv("CF_SECRET_KEY", "")

# Hugging Face identifiers used by the dataset loader and the LLM loader.
HF_REPO_ID = "digifreely/Maria"
LLM_MODEL_ID = "Qwen/Qwen3.5-2B"
# ─────────────────────────────────────────────────────────────────────────────
# Embedding model (CPU, loaded once per container lifetime)
# ─────────────────────────────────────────────────────────────────────────────
# Most recently returned embedding model (kept so existing references to the
# module-level name keep working).
_emb_model = None
# Cache of loaded models keyed by model name. A single-slot cache would hand
# back the first model loaded even when a different name is requested.
_emb_models = {}


def _get_emb_model(name: str = "sentence-transformers/all-MiniLM-L6-v2"):
    """Return the SentenceTransformer for ``name``, loading it on first use.

    Args:
        name: Model identifier passed to ``SentenceTransformer``.

    Returns:
        The cached model instance for ``name``. Each distinct name is loaded
        at most once per process.
    """
    global _emb_model
    model = _emb_models.get(name)
    if model is None:
        # Imported lazily so the dependency is only needed when a model is
        # actually loaded.
        from sentence_transformers import SentenceTransformer

        log.info("Loading embedding model: %s", name)
        model = SentenceTransformer(name)
        _emb_models[name] = model
    _emb_model = model
    return model
# ─────────────────────────────────────────────────────────────────────────────
# Security helpers
# ─────────────────────────────────────────────────────────────────────────────
def _check_auth_code(code: str) -> bool:
    """Return True when the SHA-256 hex digest of ``code`` equals ``HASH_VALUE``.

    Always returns False when ``HASH_VALUE`` is empty, so an unconfigured
    deployment never authenticates anyone.

    Args:
        code: The auth code presented by the caller.
    """
    import hmac

    if not HASH_VALUE:
        return False
    digest = hashlib.sha256(code.encode()).hexdigest()
    # Constant-time comparison: a plain ``==`` can leak, through timing, how
    # many leading characters matched. Both sides are encoded to bytes because
    # compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(digest.encode(), HASH_VALUE.encode())
async def _check_turnstile(token: str) -> bool:
    """Verify a Cloudflare Turnstile token against the siteverify endpoint.

    Returns the ``success`` field of the JSON reply (False when absent).
    Returns False when ``CF_SECRET_KEY`` is empty or when the request or the
    reply parsing raises; the error is logged in the latter case.
    """
    if CF_SECRET_KEY:
        verify_url = "https://challenges.cloudflare.com/turnstile/v0/siteverify"
        form = {"secret": CF_SECRET_KEY, "response": token}
        try:
            async with httpx.AsyncClient(timeout=8.0) as client:
                reply = await client.post(verify_url, data=form)
                verdict = reply.json()
                return verdict.get("success", False)
        except Exception as exc:
            log.error("Turnstile verification error: %s", exc)
    return False
# ─────────────────────────────────────────────────────────────────────────────
# Dataset loading (called per request — no pre-loading)
# ─────────────────────────────────────────────────────────────────────────────
def _load_dataset(board: str, cls: str, subject: str):
    """Download config / FAISS index / metadata from HF Hub and return them.

    Args:
        board: First path segment under ``knowledgebase/``.
        cls: Second path segment.
        subject: Third path segment.

    Returns:
        ``(config, index, metadata)``: the parsed ``config.json``, the index
        read by ``faiss.read_index`` and the ``metadata.parquet`` DataFrame.

    Raises:
        ValueError: If any segment contains a ``..`` path component.
        Exception: Whatever the download or the file readers raise.
    """
    # The segments are interpolated into a repository path, so refuse
    # parent-directory components before doing any I/O.
    for label, value in (("board", board), ("class", cls), ("subject", subject)):
        if ".." in str(value).replace("\\", "/").split("/"):
            raise ValueError(f"Invalid {label} path segment: {value!r}")

    prefix = f"knowledgebase/{board}/{cls}/{subject}"
    log.info("Fetching dataset: %s", prefix)

    config_path, faiss_path, meta_path = (
        hf_hub_download(
            repo_id=HF_REPO_ID,
            filename=f"{prefix}/{file_name}",
            repo_type="dataset",
        )
        for file_name in ("config.json", "faiss_index.bin", "metadata.parquet")
    )

    # Explicit encoding: JSON is UTF-8, the platform default may not be.
    with open(config_path, encoding="utf-8") as fh:
        config = json.load(fh)
    index = faiss.read_index(faiss_path)
    metadata = pd.read_parquet(meta_path)
    return config, index, metadata
def _rag_search(
    query: str,
    config: dict,
    index,
    metadata: pd.DataFrame,
    k: int = 3,
) -> List[str]:
    """Embed ``query``, search the FAISS index and return up to ``k`` text chunks.

    The embedding model name comes from ``config["embedding_model"]`` (with a
    MiniLM default). For each returned row id that lies inside ``metadata``,
    the first non-null value among a fixed list of candidate text columns is
    taken, converted to ``str`` and truncated to 800 characters. Rows with no
    usable column contribute nothing.
    """
    model_name = config.get(
        "embedding_model", "sentence-transformers/all-MiniLM-L6-v2"
    )
    encoder = _get_emb_model(model_name)
    query_vec = encoder.encode([query], normalize_embeddings=True).astype(np.float32)
    _, neighbours = index.search(query_vec, k)

    # Common column names used when building the index, in priority order.
    candidate_cols = ["text", "content", "chunk", "passage", "answer", "description"]
    row_count = len(metadata)

    hits: List[str] = []
    for row_id in neighbours[0]:
        # Ids outside the metadata range (including negatives) are skipped.
        if not (0 <= row_id < row_count):
            continue
        record = metadata.iloc[row_id]
        snippet = next(
            (
                str(record[col])[:800]
                for col in candidate_cols
                if col in metadata.columns and pd.notna(record[col])
            ),
            None,
        )
        if snippet is not None:
            hits.append(snippet)
    return hits
# ─────────────────────────────────────────────────────────────────────────────
# LLM inference — decorated with @spaces.GPU so GPU is only held during call
# ─────────────────────────────────────────────────────────────────────────────
def _model_generate(system_prompt: str, user_prompt: str) -> str:
    """Load the LLM in 4-bit NF4, generate one reply, release the model.

    Kept as a plain function so the spaces.GPU decorator can wrap it cleanly.

    Args:
        system_prompt: Content of the ``system`` chat message.
        user_prompt: Content of the ``user`` chat message.

    Returns:
        The decoded newly generated tokens (prompt excluded), stripped.

    Raises:
        Exception: Anything raised while loading or generating. The model and
            tensors are released in that case as well.
    """
    import torch
    from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

    log.info("Loading %s (4-bit NF4)…", LLM_MODEL_ID)
    quant = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
    )

    # Pre-bind every name that may hold model weights or tensors so the
    # ``finally`` clause can drop them no matter where a failure occurs.
    tok = model = inputs = out_ids = new_tokens = None
    try:
        tok = AutoTokenizer.from_pretrained(LLM_MODEL_ID, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(
            LLM_MODEL_ID,
            quantization_config=quant,
            device_map="auto",
            trust_remote_code=True,
        )
        model.eval()

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        # enable_thinking=False is passed explicitly to the chat template.
        text = tok.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
            enable_thinking=False,
        )
        inputs = tok([text], return_tensors="pt").to(model.device)

        with torch.no_grad():
            out_ids = model.generate(
                **inputs,
                max_new_tokens=600,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                repetition_penalty=1.1,
                pad_token_id=tok.eos_token_id,
            )

        # Keep only the tokens produced after the prompt.
        new_tokens = out_ids[0][inputs.input_ids.shape[1]:]
        result = tok.decode(new_tokens, skip_special_tokens=True).strip()
    finally:
        # Release GPU memory before returning, on success and on error alike.
        del model, tok, inputs, out_ids, new_tokens
        gc.collect()
        torch.cuda.empty_cache()

    log.info("Inference complete. Output length: %d chars", len(result))
    return result


# Apply ZeroGPU decorator
run_inference = _spaces.GPU(_model_generate)
# ─────────────────────────────────────────────────────────────────────────────
# Text-to-Speech
# ─────────────────────────────────────────────────────────────────────────────
def _tts_to_b64(text: str) -> str:
    """Synthesize ``text`` with gTTS and return the audio as a base64 string.

    Only the first 3000 characters are spoken. On any error the problem is
    logged and an empty string is returned.
    """
    try:
        speech = gTTS(text=text[:3000], lang="en", tld="co.uk", slow=False)
        audio = BytesIO()
        speech.write_to_fp(audio)
        # getvalue() yields the whole buffer regardless of the stream position.
        encoded = base64.b64encode(audio.getvalue())
        return encoded.decode("utf-8")
    except Exception as exc:
        log.error("TTS error: %s", exc)
        return ""
# ─────────────────────────────────────────────────────────────────────────────
# Prompt builder
# ─────────────────────────────────────────────────────────────────────────────
def _build_system_prompt(lp: dict, rag_chunks: List[str]) -> str:
    """Build the system prompt for one tutoring turn.

    Args:
        lp: Learning-path state. Reads ``teacher_persona``, ``student_name``,
            ``chat_history`` (last 6 entries), ``scratchpad`` (last 3 entries)
            and ``assessment_stages.current_learning``. Keys that are missing,
            ``None`` or of the wrong container type are treated as empty, and
            non-dict history/scratchpad entries are skipped, so malformed
            client state cannot crash prompt construction.
        rag_chunks: Retrieved knowledge-base passages; may be empty.

    Returns:
        The full system prompt, ending with the JSON response schema.
    """
    persona = lp.get("teacher_persona", "A friendly and patient teacher")
    student = lp.get("student_name", "Student")

    def _recent_dicts(value: Any, limit: int) -> list:
        # Last ``limit`` entries of a list, keeping only dict entries.
        if not isinstance(value, list):
            return []
        return [entry for entry in value[-limit:] if isinstance(entry, dict)]

    chat_history = _recent_dicts(lp.get("chat_history"), 6)   # last 6 turns
    scratchpad = _recent_dicts(lp.get("scratchpad"), 3)       # last 3 entries

    stages = lp.get("assessment_stages")
    current_learning = stages.get("current_learning", []) if isinstance(stages, dict) else []

    history_block = "\n".join(
        f'Student: {h.get("user_input","")}\nTeacher: {h.get("system_output","")}'
        for h in chat_history
    ) or "No conversation history yet."

    scratch_block = "\n".join(
        f'[id={s.get("chat_id","")}] Thought: {s.get("thought","")} | '
        f'Action: {s.get("action","")} | Obs: {s.get("observation","")}'
        for s in scratchpad
    ) or "Empty."

    rag_block = "\n---\n".join(rag_chunks) if rag_chunks else "No relevant content found in the knowledge base."
    cl_block = json.dumps(current_learning, indent=2) if current_learning else "[]"

    return f"""You are {persona}. You are teaching {student}, a child aged 6 to 12 years old.
Always use simple and clear English. Do not use emojis. Be warm, patient, and encouraging.
STUDENT NAME: {student}
CURRENT LEARNING OBJECTIVES:
{cl_block}
KNOWLEDGE BASE (use this to teach or answer questions):
{rag_block}
RECENT CONVERSATION:
{history_block}
INTERNAL NOTES (scratchpad):
{scratch_block}
YOUR TASK:
Step 1 — Decide the intent of the student message: block, questions, curriculum, or chitchat.
Step 2 — Respond to the student following the rules for that intent.
Step 3 — Return ONLY a valid JSON object. Nothing before or after the JSON.
INTENT RULES:
"block"
The student said something rude, disrespectful, or inappropriate for a child aged 6 to 12.
Check the recent conversation to decide if this is a repeated pattern.
First occurrence: politely discourage and redirect to current learning.
Repeated pattern: gently but firmly end the conversation.
Never use harsh or unkind language.
"questions"
The student asked a general question that is not about the current learning topic.
Search the knowledge base for an answer.
If found: answer briefly in simple language, then redirect to current learning.
If not found: say you do not know and redirect to current learning.
"curriculum"
The student is engaging with the current learning topic.
For each goal in current_learning, follow these stages IN ORDER:
1. teach — Explain the goal using the knowledge base. Mark teach=complete.
2. re_teach — Ask one question to check understanding.
If the answer is wrong, re-explain clearly. Mark re_teach=complete.
3. show_and_tell — Ask a similar but different question. Mark show_and_tell=complete.
4. assess — Decide pass or fail.
Pass: mark assess=complete and congratulate.
Fail: explain the mistake kindly and set assess=Not_Complete so it retries next turn.
Only advance to the next stage when the current one is complete.
"chitchat"
Casual conversation such as greetings, sharing something personal, or general chat.
Respond warmly and naturally, then gently bring up the current learning topic.
RESPONSE FORMAT — return ONLY this JSON object, nothing else:
{{
"intent": "<block|questions|curriculum|chitchat>",
"response": "<your response to the student in plain English>",
"stage_updates": [
{{
"topic": "<exact topic string from current_learning>",
"goal": "<exact goal string from learning_objectives>",
"teach": "<complete|Not_Complete>",
"re_teach": "<complete|Not_Complete>",
"show_and_tell": "<complete|Not_Complete>",
"assess": "<complete|Not_Complete>"
}}
],
"thought": "<your internal reasoning>",
"action": "<teach|re_teach|show_and_tell|assess|answer|redirect|discourage|end|chitchat>",
"observation": "<what you observed about the student>"
}}"""
# ─────────────────────────────────────────────────────────────────────────────
# JSON parser (robust — handles markdown fences, partial JSON, etc.)
# ─────────────────────────────────────────────────────────────────────────────
def _parse_llm_output(raw: str) -> dict:
    """Extract the JSON object from the model's raw output.

    Candidates are tried in this order, and the first one that parses to a
    ``dict`` wins:

    1. each segment between Markdown code fences (a leading ``json`` tag is
       removed),
    2. the whole stripped text,
    3. the span from the first ``{`` to the last ``}``,
    4. the first complete JSON value starting at the first ``{`` (this
       tolerates trailing text that itself contains braces).

    JSON that parses to something other than an object (list, number,
    string) is rejected, so callers can rely on ``.get``.

    Returns:
        The parsed object, or a fallback dict whose ``response`` is ``raw``
        and whose ``observation`` is ``"json_parse_failed"``.
    """
    text = raw.strip()

    candidates = []
    # Strip markdown code fences if present
    if "```" in text:
        for part in text.split("```"):
            part = part.strip()
            if part.startswith("json"):
                part = part[4:].strip()
            candidates.append(part)
    # Direct parse
    candidates.append(text)
    # Locate first { ... last } block
    start = text.find("{")
    end = text.rfind("}") + 1
    if start != -1 and end > start:
        candidates.append(text[start:end])

    for candidate in candidates:
        try:
            value = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(value, dict):
            return value

    # Last resort: decode one JSON value beginning at the first "{" and
    # ignore whatever follows it.
    if start != -1:
        try:
            value, _ = json.JSONDecoder().raw_decode(text, start)
        except json.JSONDecodeError:
            value = None
        if isinstance(value, dict):
            return value

    log.warning("Could not parse JSON from model output. Using raw text as response.")
    return {
        "intent": "questions",
        "response": raw,
        "stage_updates": [],
        "thought": "",
        "action": "answer",
        "observation": "json_parse_failed",
    }
# ─────────────────────────────────────────────────────────────────────────────
# State updater
# ─────────────────────────────────────────────────────────────────────────────
def _apply_state_updates(
    lp: dict,
    parsed: dict,
    user_msg: str,
    ai_msg: str,
) -> dict:
    """Return a deep copy of ``lp`` with this turn recorded.

    Three things are updated on the copy; the input ``lp`` is not modified:

    * ``chat_history`` gets a new entry with the next ``chat_id``.
    * ``scratchpad`` gets the model's thought / action / observation.
    * Objectives in ``assessment_stages.current_learning`` whose topic and
      goal match an entry of ``parsed["stage_updates"]`` have their stage
      fields set, but only to ``"complete"`` or ``"Not_Complete"``.

    Both ``lp`` (client supplied) and ``parsed`` (model output) are treated as
    untrusted. Containers of the wrong type are replaced by empty ones,
    malformed entries are skipped, and a missing or non-integer ``chat_id`` on
    the last history entry falls back to ``len(history) + 1``.
    """
    lp = copy.deepcopy(lp)

    # Chat history — append new turn
    history = lp.get("chat_history")
    if not isinstance(history, list):
        history = []
        lp["chat_history"] = history
    last_id = None
    if history and isinstance(history[-1], dict):
        last_id = history[-1].get("chat_id")
    if isinstance(last_id, int) and not isinstance(last_id, bool):
        new_id = last_id + 1
    else:
        new_id = len(history) + 1
    history.append({
        "chat_id": new_id,
        "user_input": user_msg,
        "system_output": ai_msg,
    })

    # Scratchpad — append new entry
    scratch = lp.get("scratchpad")
    if not isinstance(scratch, list):
        scratch = []
        lp["scratchpad"] = scratch
    scratch.append({
        "chat_id": new_id,
        "thought": parsed.get("thought", ""),
        "action": parsed.get("action", ""),
        "action_input": user_msg,
        "observation": parsed.get("observation", ""),
    })

    # Assessment stages — apply stage_updates from model
    stages = lp.get("assessment_stages")
    if not isinstance(stages, dict):
        stages = {}
        lp["assessment_stages"] = stages
    current_learning = stages.get("current_learning", [])
    updates = parsed.get("stage_updates")
    valid_statuses = {"complete", "Not_Complete"}

    if isinstance(updates, list) and isinstance(current_learning, list):
        for upd in updates:
            if not isinstance(upd, dict):
                continue
            for item in current_learning:
                if not isinstance(item, dict) or item.get("topic") != upd.get("topic"):
                    continue
                objectives = item.get("learning_objectives")
                if not isinstance(objectives, list):
                    continue
                for obj in objectives:
                    if not isinstance(obj, dict) or obj.get("goal") != upd.get("goal"):
                        continue
                    for stage in ("teach", "re_teach", "show_and_tell", "assess"):
                        val = upd.get(stage)
                        # The str check keeps unhashable values (lists,
                        # dicts) away from the set membership test.
                        if isinstance(val, str) and val in valid_statuses:
                            obj[stage] = val

    stages["current_learning"] = current_learning
    return lp
# ─────────────────────────────────────────────────────────────────────────────
# FastAPI application
# ─────────────────────────────────────────────────────────────────────────────
# The FastAPI application object; interactive docs are served at /docs and
# /redoc.
_fastapi = FastAPI(
    docs_url="/docs",
    redoc_url="/redoc",
    version="1.0.0",
    title="Maria Learning Service",
    description="AI tutoring API powered by Qwen3.5-2B with ZeroGPU.",
)


class ChatRequest(BaseModel):
    # Request body of the chat handler. Both fields are free-form JSON
    # objects; their contents are validated by the handler, not the model.
    learning_path: dict[str, Any]
    query: dict[str, Any]
# Registered on the app so the handler is actually reachable; without the
# decorator the function is defined but never served.
@_fastapi.get("/health")
async def health():
    """Health check: report status, the LLM model id and whether ZeroGPU is active."""
    return {"status": "ok", "model": LLM_MODEL_ID, "zerogpu": _ZEROGPU}
# Registered on the app so the handler is actually reachable; without the
# decorator the function is defined but never served.
@_fastapi.post("/chat")
async def chat(request: Request, body: ChatRequest):
    """Main tutoring endpoint.

    Pipeline: authenticate, validate the body, load the dataset, retrieve
    context, run the LLM, parse its JSON, synthesize speech, update the
    learning-path state, respond.

    Raises:
        HTTPException(403): Neither the ``auth_code`` header nor the
            ``cf-turnstile-token`` header authenticates the caller.
        HTTPException(422): ``request_message`` is empty, or ``board`` /
            ``class`` / ``subject`` is missing from ``learning_path``.
        HTTPException(500): Dataset loading or inference failed.
    """

    def _as_text(value: Any) -> str:
        # Body fields are free-form JSON: accept strings and plain integers
        # (e.g. class 5); anything else counts as missing rather than
        # crashing on ``.strip()``.
        if isinstance(value, str):
            return value.strip()
        if isinstance(value, int) and not isinstance(value, bool):
            return str(value)
        return ""

    # ── 1. Authentication ────────────────────────────────────────────────────
    auth_code = request.headers.get("auth_code")
    cf_token = request.headers.get("cf-turnstile-token")
    authenticated = False
    if auth_code:
        authenticated = _check_auth_code(auth_code)
    elif cf_token:
        authenticated = await _check_turnstile(cf_token)
    if not authenticated:
        raise HTTPException(status_code=403, detail="Forbidden")

    # ── 2. Validate request body ─────────────────────────────────────────────
    lp = body.learning_path
    msg = _as_text(body.query.get("request_message"))
    if not msg:
        raise HTTPException(status_code=422, detail="request_message must not be empty")
    board = _as_text(lp.get("board"))
    cls = _as_text(lp.get("class"))
    subject = _as_text(lp.get("subject"))
    if not all([board, cls, subject]):
        raise HTTPException(
            status_code=422,
            detail="learning_path must contain board, class, and subject",
        )

    # ── 3. Load dataset files from HF Hub ────────────────────────────────────
    try:
        config, faiss_index, metadata = _load_dataset(board, cls, subject)
    except Exception as exc:
        log.error("Dataset load error: %s", exc)
        raise HTTPException(
            status_code=500,
            detail=f"Could not load dataset for {board}/{cls}/{subject}: {exc}",
        ) from exc

    # ── 4. RAG retrieval ─────────────────────────────────────────────────────
    # Best effort: a retrieval failure degrades to an answer without context.
    try:
        rag_chunks = _rag_search(msg, config, faiss_index, metadata)
    except Exception as exc:
        log.warning("RAG search failed (%s) — continuing without context", exc)
        rag_chunks = []

    # ── 5. Build prompt and run LLM ──────────────────────────────────────────
    system_prompt = _build_system_prompt(lp, rag_chunks)
    user_prompt = f"Student message: {msg}"
    try:
        raw_output = run_inference(system_prompt, user_prompt)
    except Exception as exc:
        log.error("Inference error: %s", exc)
        raise HTTPException(status_code=500, detail=f"Inference failed: {exc}") from exc

    # ── 6. Parse structured output ───────────────────────────────────────────
    parsed = _parse_llm_output(raw_output)
    if not isinstance(parsed, dict):
        # Model emitted valid JSON that is not an object; treat as unparsed.
        parsed = {}
    reply = parsed.get("response", raw_output)
    # A non-string "response" (null, object, number) falls back to the raw
    # model text instead of failing on ``.strip()``.
    ai_text = reply.strip() if isinstance(reply, str) else raw_output.strip()

    # ── 7. Text-to-speech ────────────────────────────────────────────────────
    audio_b64 = _tts_to_b64(ai_text)

    # ── 8. Update learning path state ────────────────────────────────────────
    updated_lp = _apply_state_updates(lp, parsed, msg, ai_text)

    # ── 9. Return response ───────────────────────────────────────────────────
    return JSONResponse({
        "learning_path": updated_lp,
        "query": {
            "response_message": {
                "text": ai_text,
                "visual": "No",
                "visual_content": "",
                "audio_output": audio_b64,
            }
        },
    })
# ─────────────────────────────────────────────────────────────────────────────
# Gradio shim
# Required so the HF Spaces Gradio SDK runner detects a live Gradio app and
# ZeroGPU's @spaces.GPU decorator registers correctly.
# All actual functionality is in the FastAPI routes above.
# ─────────────────────────────────────────────────────────────────────────────
# Landing-page text. Kept flush-left so the rendering does not depend on any
# dedenting, and with blank lines around the table so the closing sentence is
# rendered as a paragraph rather than as an extra table row.
_UI_MARKDOWN = """
## Maria Learning Service

This Space exposes a **REST API** — it is not a chat UI.

| Endpoint | Method | Description |
|---|---|---|
| `/chat` | POST | Main tutoring endpoint |
| `/health` | GET | Health check |
| `/docs` | GET | Swagger UI |

Authenticate via `auth_code` header or `cf-turnstile-token` header.
"""

with gr.Blocks(title="Maria Learning Service") as _gradio_ui:
    gr.Markdown(_UI_MARKDOWN)

# Mount Gradio UI at /ui — keeps FastAPI routes at root level
app = gr.mount_gradio_app(_fastapi, _gradio_ui, path="/ui")
# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# HF Spaces runs `python app.py` which triggers this block.
# uvicorn starts on 0.0.0.0:7860 (the port HF Spaces expects).
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import uvicorn

    # Server settings gathered in one place; the app is referenced by its
    # import string "app:app".
    server_options = {
        "host": "0.0.0.0",
        "port": 7860,
        "log_level": "info",
        "workers": 1,  # Single worker — ZeroGPU requires this
    }
    uvicorn.run("app:app", **server_options)