Spaces:
Sleeping
Sleeping
ONE-SHOT plan generation for ZeroGPU: whole wish in a single @spaces.GPU call (ends the multi-call NVML crash, far faster) + deterministic town fallback so a wish never builds nothing
6ccf62b verified | """Grammar loading and strict JSON validation for the GODSEED mind. | |
| mind/grammar.gbnf holds TWO standalone llama.cpp grammars separated by a | |
| marker line: the turn grammar and the tiny moderation-judge grammar. This | |
| module splits them once at import and exposes TURN_GRAMMAR / MODERATION_GRAMMAR. | |
| The parsers here apply the same strictness to every backend: llamacpp output | |
| is grammar-constrained but still validated; zerogpu output is validated and | |
| retried; mock output is trusted but parsed identically so the pipeline never | |
| special-cases a backend. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| from pathlib import Path | |
| from .prompts import tool_names | |
# The grammar file lives next to this module and holds both grammars.
GRAMMAR_PATH = Path(__file__).parent / "grammar.gbnf"

# Marker line that separates the turn grammar from the moderation-judge grammar.
_SPLIT_MARKER = "# ===== GODSEED MODERATION JUDGE ====="

# Length clamps applied to the model-written "thought" and "epitaph" fields.
THOUGHT_MAX_CHARS = 160
EPITAPH_MAX_CHARS = 120
def _load_grammars() -> tuple[str, str]:
    """Read the grammar file and split it into (turn, moderation) grammars.

    The file is split at the first occurrence of the split marker. Each half
    is stripped of surrounding whitespace and terminated with one newline.

    Raises:
        RuntimeError: the split marker does not occur in the grammar file.
    """
    source = GRAMMAR_PATH.read_text(encoding="utf-8")
    head, marker, tail = source.partition(_SPLIT_MARKER)
    if not marker:
        # partition() yields an empty separator when the marker is absent.
        raise RuntimeError(
            f"grammar split marker missing from {GRAMMAR_PATH}; "
            f"expected a line containing {_SPLIT_MARKER!r}"
        )
    return f"{head.strip()}\n", f"{tail.strip()}\n"


# Split once at import time; both grammars are exposed as module constants.
TURN_GRAMMAR, MODERATION_GRAMMAR = _load_grammars()
# Backends that cannot enforce a grammar (mock, zerogpu) tell the two grammars
# apart by this substring, which appears only in the moderation grammar.
MODERATION_GRAMMAR_KEY = '"allowed"'


def is_moderation_grammar(grammar: str | None) -> bool:
    """Return True when `grammar` contains the moderation key substring.

    A missing (None) or empty grammar is never the moderation grammar.
    """
    if not grammar:
        return False
    return MODERATION_GRAMMAR_KEY in grammar
def extract_json_object(raw: str) -> str | None:
    """Return the balanced {...} that starts at the first "{" in `raw`, or None.

    The scan tracks JSON string literals and backslash escapes, so braces
    inside strings do not affect nesting. This lets JSON wrapped in stray
    prose (a misbehaving unconstrained backend) still be recovered. None is
    returned when `raw` has no "{" or when the object opened by the first "{"
    is never closed; later "{" characters are not retried.
    """
    opening = raw.find("{")
    if opening == -1:
        return None

    nesting = 0
    inside_string = False
    skip_next = False  # True right after a backslash inside a string
    for offset, char in enumerate(raw[opening:], start=opening):
        if inside_string:
            if skip_next:
                skip_next = False
            elif char == "\\":
                skip_next = True
            elif char == '"':
                inside_string = False
        elif char == '"':
            inside_string = True
        elif char == "{":
            nesting += 1
        elif char == "}":
            nesting -= 1
            if nesting == 0:
                return raw[opening : offset + 1]
    return None
def parse_turn(raw: str) -> tuple[dict | None, str | None]:
    """Parse one turn reply. Returns (normalized_obj, None) or (None, error).

    Normalized shapes:
        {"thought": str, "call": {"tool": str, "args": dict}}
        {"thought": str, "done": True, "epitaph": str}

    Thought/epitaph are stripped and length-clamped. Extra keys are ignored.
    Exactly one of a non-null "call" or a literal `"done": true` must be
    present. Every failure, including JSON nested too deeply for the decoder,
    is reported through the error string rather than raised.
    """
    if not raw or not raw.strip():
        return None, "empty output"

    candidate = extract_json_object(raw)
    if candidate is None:
        return None, "no JSON object found"

    try:
        obj = json.loads(candidate)
    except RecursionError:
        # Brace-balanced but pathologically nested input overflows the
        # decoder. That is a parse failure, not a crash of the caller.
        return None, "invalid JSON: nesting too deep"
    except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
        return None, f"invalid JSON: {getattr(exc, 'msg', exc)}"

    if not isinstance(obj, dict):
        return None, "top level is not a JSON object"

    thought = obj.get("thought")
    if not isinstance(thought, str):
        return None, 'missing string "thought"'
    thought = thought.strip()[:THOUGHT_MAX_CHARS]

    call = obj.get("call")
    has_call = call is not None
    # Only a real JSON `true` counts as done; truthy strings/numbers do not.
    has_done = obj.get("done") is True
    if has_call == has_done:
        return None, 'need exactly one of "call" or "done": true'

    if has_done:
        epitaph = obj.get("epitaph")
        if not isinstance(epitaph, str):
            return None, 'done turn missing string "epitaph"'
        return {
            "thought": thought,
            "done": True,
            "epitaph": epitaph.strip()[:EPITAPH_MAX_CHARS],
        }, None

    if not isinstance(call, dict):
        return None, '"call" is not an object'
    tool = call.get("tool")
    if not isinstance(tool, str) or tool not in tool_names():
        return None, f"unknown tool {tool!r}"
    args = call.get("args")
    if not isinstance(args, dict):
        return None, '"args" is not an object'
    return {"thought": thought, "call": {"tool": tool, "args": args}}, None
# Upper bound on the number of calls returned from a single plan.
_PLAN_MAX_CALLS = 8

# Opening of a flat {"tool": "...", ..., "args": { fragment, used by the salvage pass.
_CALL_FRAGMENT_RE = re.compile(
    r'\{[^{}]*"tool"\s*:\s*"[^"]+"[^{}]*"args"\s*:\s*\{'
)


def _loads_or_none(text: str):
    """Decode JSON, returning None for malformed or too deeply nested input."""
    try:
        return json.loads(text)
    except (ValueError, RecursionError):
        # JSONDecodeError is a ValueError. RecursionError comes from
        # pathologically nested input and is treated the same way.
        return None


def _as_call(item) -> dict | None:
    """Return {"tool", "args"} when `item` has a str tool and dict args, else None."""
    if (
        isinstance(item, dict)
        and isinstance(item.get("tool"), str)
        and isinstance(item.get("args"), dict)
    ):
        return {"tool": item["tool"], "args": item["args"]}
    return None


def parse_plan(raw: str) -> tuple[str, list[dict], str]:
    """Parse a ONE-SHOT plan: {"reading": str, "plan": [{tool,args}...], "epitaph": str}.

    Lenient by design: the live ZeroGPU backend has no grammar, so the model's
    JSON is best-effort. Returns (reading, calls, epitaph). `calls` is a list
    of {"tool": str, "args": dict} capped at 8 entries. The tool name is not
    validated here. `reading` and `epitaph` default to "" when absent or not
    strings, and the epitaph is length-clamped.

    When the outer object yields no calls, individual {"tool":...,"args":{...}}
    fragments are scraped from the raw text so a messy completion still
    produces calls. This function never raises on malformed model output,
    including JSON nested too deeply for the decoder.
    """
    text = str(raw or "")
    reading, epitaph = "", ""
    calls: list[dict] = []

    candidate = extract_json_object(text)
    obj = _loads_or_none(candidate) if candidate is not None else None
    if isinstance(obj, dict):
        r = obj.get("reading")
        if isinstance(r, str):
            reading = r.strip()
        e = obj.get("epitaph")
        if isinstance(e, str):
            epitaph = e.strip()[:EPITAPH_MAX_CHARS]
        plan = obj.get("plan")
        if isinstance(plan, list):
            for item in plan:
                call = _as_call(item)
                if call is not None:
                    calls.append(call)

    # Salvage: if the outer object gave no calls, scrape every
    # {"tool":...,"args":{...}} fragment directly from the text.
    if not calls:
        for m in _CALL_FRAGMENT_RE.finditer(text):
            frag = extract_json_object(text[m.start():])
            if not frag:
                continue
            call = _as_call(_loads_or_none(frag))
            if call is not None:
                calls.append(call)

    return reading, calls[:_PLAN_MAX_CALLS], epitaph
def parse_moderation(raw: str) -> tuple[dict | None, str | None]:
    """Parse a moderation verdict. Returns (obj, None) or (None, error).

    `allowed` must be a real JSON boolean. Anything fuzzier than that is a
    parse error, and parse errors mean DENY upstream (default-deny). JSON
    nested too deeply for the decoder is also reported as a parse error
    rather than raised, so it takes the same deny path.

    On success the result is {"allowed": bool, "category": str}, where
    category is stripped, clamped to 40 characters, and "" when absent/null.
    """
    if not raw or not raw.strip():
        return None, "empty output"

    candidate = extract_json_object(raw)
    if candidate is None:
        return None, "no JSON object found"

    try:
        obj = json.loads(candidate)
    except RecursionError:
        # Brace-balanced but pathologically nested input overflows the
        # decoder. Surface it as a parse error so the verdict is a deny.
        return None, "invalid JSON: nesting too deep"
    except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
        return None, f"invalid JSON: {getattr(exc, 'msg', exc)}"

    if not isinstance(obj, dict):
        return None, "top level is not a JSON object"

    allowed = obj.get("allowed")
    if not isinstance(allowed, bool):
        return None, '"allowed" is not a boolean'

    category = obj.get("category")
    if category is not None and not isinstance(category, str):
        return None, '"category" is not a string'

    return {"allowed": allowed, "category": (category or "").strip()[:40]}, None