"""Grammar loading and strict JSON validation for the GODSEED mind.

mind/grammar.gbnf holds TWO standalone llama.cpp grammars separated by a
marker line: the turn grammar and the tiny moderation-judge grammar. This
module splits them once at import and exposes TURN_GRAMMAR /
MODERATION_GRAMMAR.

The parsers here are the same strictness for every backend: llamacpp output
is grammar-constrained but still validated; zerogpu output is validated and
retried; mock output is trusted but parsed identically so the pipeline never
special-cases a backend.
"""

from __future__ import annotations

import json
import re
from pathlib import Path

from .prompts import tool_names

GRAMMAR_PATH = Path(__file__).with_name("grammar.gbnf")
_SPLIT_MARKER = "# ===== GODSEED MODERATION JUDGE ====="

THOUGHT_MAX_CHARS = 160
EPITAPH_MAX_CHARS = 120
# Clamp applied to the moderation verdict's "category" string.
CATEGORY_MAX_CHARS = 40
# Upper bound on the number of calls returned from a one-shot plan.
PLAN_MAX_CALLS = 8

# Matches the opening of a flat {"tool": "...", ... "args": { fragment so that
# individual calls can be scraped out of otherwise malformed plan output.
_SALVAGE_CALL_RE = re.compile(
    r'\{[^{}]*"tool"\s*:\s*"[^"]+"[^{}]*"args"\s*:\s*\{'
)


def _load_grammars() -> tuple[str, str]:
    """Read GRAMMAR_PATH and split it into (turn, moderation) grammars.

    Each half is stripped and terminated with a single newline.

    Raises:
        RuntimeError: if the split marker does not occur in the file.
    """
    text = GRAMMAR_PATH.read_text(encoding="utf-8")
    if _SPLIT_MARKER not in text:
        raise RuntimeError(
            f"grammar split marker missing from {GRAMMAR_PATH}; "
            f"expected a line containing {_SPLIT_MARKER!r}"
        )
    turn, moderation = text.split(_SPLIT_MARKER, 1)
    return turn.strip() + "\n", moderation.strip() + "\n"


TURN_GRAMMAR, MODERATION_GRAMMAR = _load_grammars()

# Backends that cannot enforce a grammar (mock, zerogpu) distinguish the two
# by this substring — it appears only in the moderation grammar.
MODERATION_GRAMMAR_KEY = '"allowed"'


def is_moderation_grammar(grammar: str | None) -> bool:
    """Return True when `grammar` is non-empty and contains the moderation key."""
    return bool(grammar) and MODERATION_GRAMMAR_KEY in grammar


def extract_json_object(raw: str) -> str | None:
    """Return the first balanced top-level {...} in `raw`, or None.

    Brace-scans with string/escape awareness so JSON embedded in stray
    prose (a misbehaving unconstrained backend) is still recoverable.
    The scan starts at the first "{"; if that brace never balances, None is
    returned (later braces are not retried).
    """
    start = raw.find("{")
    if start < 0:
        return None
    depth = 0
    in_string = False
    escaped = False
    for i in range(start, len(raw)):
        ch = raw[i]
        if in_string:
            # Inside a string literal only an unescaped quote matters;
            # braces here must not affect the depth count.
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return raw[start : i + 1]
    return None


def _decode_json(candidate: str) -> tuple[object, str | None]:
    """Decode `candidate` as JSON. Returns (value, None) or (None, error).

    Never raises for bad input: besides ordinary decode errors, pathologically
    deep nesting (json.loads raises RecursionError) is reported as an error
    string too, so callers can keep their "parse error" contract on any
    model output.
    """
    try:
        return json.loads(candidate), None
    except RecursionError:
        return None, "invalid JSON: nesting too deep"
    except (json.JSONDecodeError, ValueError) as exc:
        return None, f"invalid JSON: {getattr(exc, 'msg', exc)}"


def _parse_object(raw: str) -> tuple[dict | None, str | None]:
    """Extract and decode the first JSON object in `raw`.

    Returns (obj, None) when a dict was decoded, otherwise (None, error).
    """
    if not raw or not raw.strip():
        return None, "empty output"
    candidate = extract_json_object(raw)
    if candidate is None:
        return None, "no JSON object found"
    obj, error = _decode_json(candidate)
    if error is not None:
        return None, error
    if not isinstance(obj, dict):
        return None, "top level is not a JSON object"
    return obj, None


def _as_call(item: object) -> dict | None:
    """Return {"tool": str, "args": dict} if `item` has that shape, else None."""
    if not isinstance(item, dict):
        return None
    tool = item.get("tool")
    args = item.get("args")
    if isinstance(tool, str) and isinstance(args, dict):
        return {"tool": tool, "args": args}
    return None


def parse_turn(raw: str) -> tuple[dict | None, str | None]:
    """Parse one turn reply. Returns (normalized_obj, None) or (None, error).

    Normalized shapes:
      {"thought": str, "call": {"tool": str, "args": dict}}
      {"thought": str, "done": True, "epitaph": str}
    Thought/epitaph are stripped and length-clamped. Extra keys are ignored.
    """
    obj, error = _parse_object(raw)
    if error is not None:
        return None, error

    thought = obj.get("thought")
    if not isinstance(thought, str):
        return None, 'missing string "thought"'
    thought = thought.strip()[:THOUGHT_MAX_CHARS]

    call = obj.get("call")
    has_call = call is not None
    # Only a literal JSON true counts as "done"; false/null/truthy values do not.
    has_done = obj.get("done") is True
    if has_call == has_done:
        return None, 'need exactly one of "call" or "done": true'

    if has_done:
        epitaph = obj.get("epitaph")
        if not isinstance(epitaph, str):
            return None, 'done turn missing string "epitaph"'
        return {
            "thought": thought,
            "done": True,
            "epitaph": epitaph.strip()[:EPITAPH_MAX_CHARS],
        }, None

    if not isinstance(call, dict):
        return None, '"call" is not an object'
    tool = call.get("tool")
    if not isinstance(tool, str) or tool not in tool_names():
        return None, f"unknown tool {tool!r}"
    args = call.get("args")
    if not isinstance(args, dict):
        return None, '"args" is not an object'
    return {"thought": thought, "call": {"tool": tool, "args": args}}, None


def parse_plan(raw: str) -> tuple[str, list[dict], str]:
    """Parse a ONE-SHOT plan: {"reading": str, "plan": [{tool,args}...], "epitaph": str}.

    Lenient by design — the live ZeroGPU backend has no grammar, so the
    model's JSON is best-effort. Returns (reading, calls, epitaph); calls is
    a list of at most PLAN_MAX_CALLS {"tool": str, "args": dict} entries
    (tool not validated here — the engine forgives / maps it). Salvages
    individual {"tool":...} objects when the outer JSON is malformed, so a
    messy completion still yields a town rather than nothing.
    """
    text = str(raw or "")
    reading, epitaph = "", ""
    calls: list[dict] = []

    candidate = extract_json_object(text)
    if candidate is not None:
        obj, _ = _decode_json(candidate)
        if isinstance(obj, dict):
            r = obj.get("reading")
            if isinstance(r, str):
                reading = r.strip()
            e = obj.get("epitaph")
            if isinstance(e, str):
                epitaph = e.strip()[:EPITAPH_MAX_CHARS]
            plan = obj.get("plan")
            if isinstance(plan, list):
                for item in plan:
                    call = _as_call(item)
                    if call is not None:
                        calls.append(call)

    # Salvage: if the outer object gave no calls, scrape every
    # {"tool":...,"args":{...}} fragment out of the raw text.
    if not calls:
        for m in _SALVAGE_CALL_RE.finditer(text):
            frag = extract_json_object(text[m.start():])
            if not frag:
                continue
            item, error = _decode_json(frag)
            if error is not None:
                continue
            call = _as_call(item)
            if call is not None:
                calls.append(call)

    return reading, calls[:PLAN_MAX_CALLS], epitaph


def parse_moderation(raw: str) -> tuple[dict | None, str | None]:
    """Parse a moderation verdict. Returns (obj, None) or (None, error).

    `allowed` must be a real JSON boolean — anything fuzzier than that is a
    parse error, and parse errors mean DENY upstream (default-deny).
    The returned "category" is stripped and clamped to CATEGORY_MAX_CHARS;
    a missing or null category becomes "".
    """
    obj, error = _parse_object(raw)
    if error is not None:
        return None, error

    allowed = obj.get("allowed")
    if not isinstance(allowed, bool):
        return None, '"allowed" is not a boolean'
    category = obj.get("category")
    if category is not None and not isinstance(category, str):
        return None, '"category" is not a string'
    return {
        "allowed": allowed,
        "category": (category or "").strip()[:CATEGORY_MAX_CHARS],
    }, None