"""PrologEngine — Singleton wrapper around pyswip for X-bar syntactic analysis. Thread safety: asyncio.Lock around all Prolog queries (pyswip is not thread-safe). Loads all Prolog modules at init time. Uses serialize.pl for clean data marshalling (pyswip returns compound terms as opaque strings; serialize.pl converts them to nested lists of atoms that pyswip marshals cleanly to Python). """ from __future__ import annotations import asyncio import logging from pathlib import Path from typing import Any from pyswip import Prolog logger = logging.getLogger("grammar_engine") _PROLOG_DIR = Path(__file__).resolve().parent.parent / "prolog" _MODULES = [ "lexicon", "agreement", "subcategorization", "binding", "movement", "isl_grammar", "contrastive", "xbar", "tree_validation", "compositional", "chomsky_hierarchy", "serialize", ] def _flatten_cons_cells(node: Any) -> list | Any: """Recursively flatten pyswip cons cell representation. pyswip marshals Prolog lists as nested ['[|]', head, tail] structures. This converts them back to flat Python lists. """ if isinstance(node, list): if len(node) == 3 and node[0] == '[|]': head = _flatten_cons_cells(node[1]) tail = _flatten_cons_cells(node[2]) if isinstance(tail, list): return [head] + tail return [head] return [_flatten_cons_cells(item) for item in node] return node def _extract_features(node: Any) -> dict[str, str] | None: """Extract feature dict from a Prolog feature list. After flattening, feature lists look like: [['=', 'person', 1], ['=', 'number', 'sg'], ...] """ if not isinstance(node, list): return None features: dict[str, str] = {} for item in node: if isinstance(item, list) and len(item) == 3 and item[0] == '=': features[str(item[1])] = str(item[2]) if features: return features return None def _nested_list_to_tree(node: Any) -> dict[str, Any] | None: """Convert serialize.pl nested list tree to dict. Format from Prolog: [Label, Child1, Child2, ...] Handles cons cell representation from pyswip. """ node = _flatten_cons_cells(node) if node is None: return None if isinstance(node, str): return {"label": node} if isinstance(node, (int, float)): return {"label": str(node)} if isinstance(node, list) and len(node) >= 1: label = str(node[0]) children: list[dict[str, Any]] = [] features: dict[str, str] | None = None for arg in node[1:]: if isinstance(arg, list): # Check if it's a feature list [['=', k, v], ...] feat = _extract_features(arg) if feat: features = feat else: child = _nested_list_to_tree(arg) if child: children.append(child) elif isinstance(arg, str): children.append({"label": arg}) elif isinstance(arg, (int, float)): children.append({"label": str(arg)}) result: dict[str, Any] = {"label": label} if children: result["children"] = children if features: result["features"] = features return result return None class PrologEngine: """Singleton wrapper around SWI-Prolog via pyswip.""" _instance: PrologEngine | None = None _lock: asyncio.Lock def __new__(cls) -> PrologEngine: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self) -> None: if self._initialized: return self._prolog = Prolog() self._lock = asyncio.Lock() self._load_modules() self._initialized = True logger.info("PrologEngine initialized with %d modules", len(_MODULES)) def _load_modules(self) -> None: for mod in _MODULES: path = _PROLOG_DIR / f"{mod}.pl" if not path.exists(): raise FileNotFoundError(f"Prolog module not found: {path}") self._prolog.consult(str(path)) logger.debug("Loaded Prolog module: %s", mod) # ------------------------------------------------------------------ # Public async methods # ------------------------------------------------------------------ async def validate(self, gesture_ids: list[str]) -> dict[str, Any]: async with self._lock: return self._validate_sync(gesture_ids) async def predict_next(self, gesture_ids: list[str]) -> list[dict[str, Any]]: async with self._lock: return self._predict_next_sync(gesture_ids) async def detect_interference(self, gesture_ids: list[str]) -> list[dict[str, Any]]: async with self._lock: return self._detect_interference_sync(gesture_ids) async def transform_isl_to_english(self, gesture_ids: list[str]) -> dict[str, Any]: async with self._lock: return self._transform_sync(gesture_ids) async def get_parse_tree(self, gesture_ids: list[str]) -> dict[str, Any] | None: async with self._lock: return self._get_parse_tree_sync(gesture_ids) async def compose_semantics(self, gesture_ids: list[str]) -> dict[str, Any]: async with self._lock: return self._compose_semantics_sync(gesture_ids) async def get_grammar_capabilities(self) -> dict[str, Any]: async with self._lock: return self._get_grammar_capabilities_sync() # ------------------------------------------------------------------ # Synchronous implementations (called under lock) # ------------------------------------------------------------------ def _validate_sync(self, gesture_ids: list[str]) -> dict[str, Any]: result: dict[str, Any] = { "grammatical": False, "parse_tree": None, "agreement": None, "theta": None, "binding_violations": [], "tense_resolution": "present", "grammaticality_score": 0.0, } # 1. Parse tree parse_tree = self._get_parse_tree_sync(gesture_ids) if parse_tree: result["parse_tree"] = parse_tree result["grammatical"] = True score = 1.0 else: score = 0.3 # 2. Agreement result["agreement"] = self._check_agreement_sync(gesture_ids) if result["agreement"] and not result["agreement"].get("agrees", True): score -= 0.2 # 3. Theta criterion result["theta"] = self._check_theta_sync(gesture_ids) if result["theta"] and not result["theta"].get("satisfied", True): score -= 0.2 result["grammatical"] = False # 4. Binding violations result["binding_violations"] = self._check_binding_sync(gesture_ids) if result["binding_violations"]: score -= 0.1 * len(result["binding_violations"]) result["grammatical"] = False # 5. Tense result["tense_resolution"] = self._resolve_tense(gesture_ids) result["grammaticality_score"] = max(0.0, min(1.0, score)) # 6. Compositional semantics (Frege's Principle) result["semantics"] = self._compose_semantics_sync(gesture_ids) return result def _predict_next_sync(self, gesture_ids: list[str]) -> list[dict[str, Any]]: valid_next: list[dict[str, Any]] = [] categories = self._categorize_ids(gesture_ids) if not gesture_ids: expected_cat = "d" elif all(c == "subj" for c in categories): expected_cat = "v" elif "verb" in categories and "obj" not in categories: expected_cat = "n" else: return [] if expected_cat == "d": results = list(self._prolog.query( "lexicon:lex(ID, Form, d, Feats), member(case=nom, Feats)" )) # Deduplicate — pyswip may return multiple bindings seen = set() for r in results: gid = str(r.get("ID", "")) if gid not in seen: seen.add(gid) valid_next.append(self._build_lex_entry(gid, "d")) elif expected_cat == "v": results = list(self._prolog.query("lexicon:lex(ID, Form, v, _)")) seen = set() for r in results: gid = str(r.get("ID", "")) if gid not in seen: seen.add(gid) valid_next.append(self._build_lex_entry(gid, "v")) elif expected_cat == "n": results = list(self._prolog.query("lexicon:lex(ID, Form, n, _)")) verb_id = self._find_verb(gesture_ids) seen = set() for r in results: gid = str(r.get("ID", "")) if gid not in seen: seen.add(gid) entry = self._build_lex_entry(gid, "n") if verb_id: role_results = list(self._prolog.query( f"subcategorization:role_assignment({verb_id}, 2, Role)" )) if role_results: entry["theta_role"] = str(role_results[0].get("Role", "")) valid_next.append(entry) return valid_next def _detect_interference_sync(self, gesture_ids: list[str]) -> list[dict[str, Any]]: ids = self._to_prolog_list(gesture_ids) query = f"serialize:serialize_interference({ids}, Patterns)" results = list(self._prolog.query(query)) interferences: list[dict[str, Any]] = [] if results: patterns = results[0].get("Patterns", []) for p in patterns: if isinstance(p, list) and len(p) >= 3: interferences.append({ "type": str(p[0]), "severity": str(p[1]), "description": str(p[2]), }) return interferences def _transform_sync(self, gesture_ids: list[str]) -> dict[str, Any]: ids = self._to_prolog_list(gesture_ids) query = f"serialize:serialize_transform({ids}, Eng, TType, Ops)" results = list(self._prolog.query(query)) if results: r = results[0] eng_order = r.get("Eng", gesture_ids) if isinstance(eng_order, list): eng_list = [str(x) for x in eng_order] else: eng_list = gesture_ids transform_type = str(r.get("TType", "unknown")) raw_ops = r.get("Ops", []) operations: list[dict[str, str]] = [] if isinstance(raw_ops, list): for op in raw_ops: if isinstance(op, list) and len(op) >= 2: operations.append({ "operation": str(op[0]), "description": str(op[1]), }) return { "isl_order": gesture_ids, "english_order": eng_list, "transform": transform_type, "movement_traces": operations, } return { "isl_order": gesture_ids, "english_order": gesture_ids, "transform": "none", "movement_traces": [], } def _get_parse_tree_sync(self, gesture_ids: list[str]) -> dict[str, Any] | None: ids = self._to_prolog_list(gesture_ids) query = f"serialize:serialize_tree({ids}, TreeList)" results = list(self._prolog.query(query)) if results: tree_list = results[0].get("TreeList") if tree_list: tree = _nested_list_to_tree(tree_list) # Validate tree well-formedness (Partee et al., Ch 16) tree_valid = self._validate_tree_sync(gesture_ids) if tree and tree_valid is not None: tree["_well_formed"] = tree_valid return tree return None def _validate_tree_sync(self, gesture_ids: list[str]) -> dict[str, Any] | None: """Validate parse tree well-formedness using formal conditions. Uses the raw Prolog parse tree (not serialized) for validation, since well_formed_tree/1 operates on compound terms. """ ids = self._to_prolog_list(gesture_ids) try: # Query the raw parse tree and validate it directly wf_query = ( f"xbar:parse_sentence({ids}, T, []), " f"tree_validation:well_formed_tree(T)" ) wf_results = list(self._prolog.query(wf_query)) is_well_formed = len(wf_results) > 0 # Get detailed report from raw tree report_query = ( f"xbar:parse_sentence({ids}, T, []), " f"tree_validation:validate_tree_structure(T, Report)" ) report_results = list(self._prolog.query(report_query)) report: dict[str, Any] = {"well_formed": is_well_formed} if report_results: raw_report = report_results[0].get("Report", []) raw_report = _flatten_cons_cells(raw_report) if isinstance(raw_report, list): for item in raw_report: if isinstance(item, list) and len(item) == 2: report[str(item[0])] = str(item[1]) if not isinstance(item[1], (int, float)) else item[1] return report except Exception as exc: logger.warning("Tree validation failed: %s", exc) return {"well_formed": False, "error": str(exc)} # ------------------------------------------------------------------ # Agreement, Theta, Binding checks # ------------------------------------------------------------------ def _check_agreement_sync(self, gesture_ids: list[str]) -> dict[str, Any] | None: subj_id = None verb_id = None for gid in gesture_ids: cat = self._get_category(gid) if cat == "d" and subj_id is None: subj_id = gid elif cat == "v" and verb_id is None: verb_id = gid if not subj_id or not verb_id: return None query = ( f"serialize:serialize_agreement({subj_id}, {verb_id}, " f"Agrees, InflForm, _)" ) results = list(self._prolog.query(query)) if results: r = results[0] agrees = str(r.get("Agrees", "no")) == "yes" inflected_form = str(r.get("InflForm", "unknown")) return { "agrees": agrees, "inflected_form": inflected_form, "subject_id": subj_id, "verb_id": verb_id, } return None def _check_theta_sync(self, gesture_ids: list[str]) -> dict[str, Any] | None: verb_id = self._find_verb(gesture_ids) if not verb_id: return None args = [gid for gid in gesture_ids if gid != verb_id] args_list = self._to_prolog_list(args) query = f"subcategorization:check_theta_criterion({verb_id}, {args_list}, Result)" results = list(self._prolog.query(query)) if results: result_str = str(results[0].get("Result", "")) if result_str == "satisfied": role_results = list(self._prolog.query( f"lexicon:theta_grid({verb_id}, Roles)" )) roles = [] if role_results: roles_term = role_results[0].get("Roles", []) if isinstance(roles_term, list): roles = [str(r) for r in roles_term] return {"satisfied": True, "roles": roles} else: # Parse violation(type, count) string if "missing_args" in result_str: return {"satisfied": False, "violation_type": "missing_args", "missing_count": 1} elif "extra_args" in result_str: return {"satisfied": False, "violation_type": "extra_args", "missing_count": 1} return {"satisfied": False, "violation_type": "unknown", "missing_count": 0} return None def _check_binding_sync(self, gesture_ids: list[str]) -> list[dict[str, Any]]: ids = self._to_prolog_list(gesture_ids) query = f"binding:check_binding({ids}, local, Violations)" results = list(self._prolog.query(query)) violations: list[dict[str, Any]] = [] if results: raw = results[0].get("Violations", []) if isinstance(raw, list): for v in raw: v_str = str(v) if "principle_" in v_str: # Parse violation(principle_x, id, message) string violations.append({ "principle": "principle_b" if "principle_b" in v_str else "principle_c" if "principle_c" in v_str else "principle_a", "gesture_id": "", "message": v_str, }) return violations # ------------------------------------------------------------------ # Compositional Semantics (Partee et al., Ch 13) # ------------------------------------------------------------------ def _compose_semantics_sync(self, gesture_ids: list[str]) -> dict[str, Any]: """Compute compositional semantic representation via lambda calculus.""" ids = self._to_prolog_list(gesture_ids) try: query = f"compositional:compose_sentence({ids}, Sem, Type)" results = list(self._prolog.query(query)) if results: r = results[0] sem_raw = r.get("Sem", "unknown") type_raw = r.get("Type", "unknown") return { "semantic_form": self._term_to_string(sem_raw), "result_type": str(type_raw), "complete": str(type_raw) == "t", "gesture_ids": gesture_ids, "gesture_types": self._get_semantic_types(gesture_ids), } return { "semantic_form": "unknown", "result_type": "unknown", "complete": False, "gesture_ids": gesture_ids, "gesture_types": self._get_semantic_types(gesture_ids), } except Exception as exc: logger.warning("Compositional semantics failed: %s", exc) return { "semantic_form": "error", "result_type": "error", "complete": False, "error": str(exc), } def _get_semantic_types(self, gesture_ids: list[str]) -> list[dict[str, str]]: """Get semantic type for each gesture ID.""" types: list[dict[str, str]] = [] for gid in gesture_ids: try: results = list(self._prolog.query( f"compositional:semantic_type({gid}, Type)" )) if results: types.append({ "gesture_id": gid, "type": str(results[0].get("Type", "unknown")), }) else: types.append({"gesture_id": gid, "type": "unknown"}) except Exception: types.append({"gesture_id": gid, "type": "error"}) return types def _term_to_string(self, term: Any) -> str: """Convert a pyswip term to a readable string. Cleans up pyswip's Functor representation and unwraps entity() wrappers. """ import re if isinstance(term, (int, float)): return str(term) if isinstance(term, list): flat = _flatten_cons_cells(term) if isinstance(flat, list): parts = [self._term_to_string(t) for t in flat] return f"[{', '.join(parts)}]" return self._term_to_string(flat) # Handle pyswip Functor objects if hasattr(term, 'name') and hasattr(term, 'args'): name = str(term.name) if name == 'entity' and term.args and len(list(term.args)) == 1: return str(list(term.args)[0]) if term.args: args_str = ", ".join(self._term_to_string(a) for a in term.args) return f"{name}({args_str})" return name # String cleanup: pyswip sometimes returns stringified Functor refs s = str(term) # Clean up Functor(id,arity,name) → name s = re.sub(r'Functor\(\d+,\d+,(\w+)\)', r'\1', s) return s # ------------------------------------------------------------------ # Chomsky Hierarchy (Partee et al., Ch 16) # ------------------------------------------------------------------ def _get_grammar_capabilities_sync(self) -> dict[str, Any]: """Get MLAF's formal grammar classification report.""" try: components: list[dict[str, Any]] = [] results = list(self._prolog.query( "chomsky_hierarchy:grammar_class(Component, Type)" )) for r in results: comp = str(r.get("Component", "")) typ = str(r.get("Type", "")) level_results = list(self._prolog.query( f"chomsky_hierarchy:chomsky_level({typ}, Level)" )) level = int(level_results[0].get("Level", -1)) if level_results else -1 components.append({ "component": comp, "chomsky_type": typ, "level": level, }) # Classify by level by_level: dict[str, list[str]] = { "type_3_regular": [], "type_2_context_free": [], "type_1_context_sensitive": [], } for c in components: ct = c["chomsky_type"] if ct in by_level: by_level[ct].append(c["component"]) return { "components": components, "summary": { "total": len(components), "regular_count": len(by_level["type_3_regular"]), "context_free_count": len(by_level["type_2_context_free"]), "context_sensitive_count": len(by_level["type_1_context_sensitive"]), }, "by_level": by_level, "overall_power": "mildly_context_sensitive", "note": "Natural languages are mildly context-sensitive (Joshi 1985). " "MLAF uses CFG base + context-sensitive feature checking.", } except Exception as exc: logger.warning("Grammar capabilities query failed: %s", exc) return {"error": str(exc)} # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _to_prolog_list(self, ids: list[str]) -> str: items = ", ".join(ids) return f"[{items}]" def _get_category(self, gid: str) -> str | None: results = list(self._prolog.query(f"lexicon:lex({gid}, _, Cat, _)")) if results: return str(results[0].get("Cat", "")) return None def _find_verb(self, gesture_ids: list[str]) -> str | None: for gid in gesture_ids: if self._get_category(gid) == "v": return gid return None def _build_lex_entry(self, gid: str, cat: str) -> dict[str, Any]: results = list(self._prolog.query(f"lexicon:lex({gid}, Form, _, _)")) form = str(results[0].get("Form", "")) if results else gid return { "grammar_id": gid, "category": cat, "phonological_form": form, } def _categorize_ids(self, gesture_ids: list[str]) -> list[str]: categories = [] for gid in gesture_ids: cat = self._get_category(gid) if cat == "d": categories.append("subj") elif cat == "v": categories.append("verb") elif cat == "n": categories.append("obj") else: categories.append("unknown") return categories def _resolve_tense(self, gesture_ids: list[str]) -> str: for gid in gesture_ids: results = list(self._prolog.query( f"lexicon:lex({gid}, _, v, Feats), member(tense=T, Feats)" )) if results: return str(results[0].get("T", "present")) return "present"