Spaces:
Running
Running
| """MLAF Grammar Engine — FastAPI application (port 8300). | |
| Prolog-based X-bar syntactic analysis service. Replaces imperative JS grammar | |
| logic with declarative Prolog queries grounded in Chomskyan generative syntax. | |
| Run: | |
| uvicorn app.main:app --host 0.0.0.0 --port 8300 | |
| """ | |
from __future__ import annotations

import csv
import fcntl
import json
import logging
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncIterator

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import ORJSONResponse
from pydantic import BaseModel, Field

from .engine import PrologEngine
from .schemas import (
    AgreementInfo,
    BindingViolation,
    GestureSequence,
    InterferencePattern,
    InterferenceResponse,
    MovementTrace,
    ParseTreeNode,
    PredictNextResponse,
    SemanticInterpretation,
    ThetaInfo,
    TransformResponse,
    TreeWellFormedness,
    ValidNextEntry,
    ValidationResponse,
)
logger = logging.getLogger("grammar_engine")

# ---------------------------------------------------------------------------
# Gesture recording — sharded storage for 25GB+ datasets
# ---------------------------------------------------------------------------
# On-disk layout:
#   data/custom/landmarks/{gesture_id}.csv  <- one append-only shard per gesture
#   data/custom/recording_stats.json        <- cached frame counts
#
# Every shard carries the same header: gesture_id, lm_0_x, lm_0_y, lm_0_z, ...,
# lm_20_z. The gesture_id is already implied by the filename, but it is repeated
# as the first column for backward compatibility with preprocess.py's
# _load_webcam.
_CUSTOM_DIR = Path(__file__).resolve().parents[1] / "data" / "custom"
_LANDMARKS_DIR = _CUSTOM_DIR / "landmarks"
_STATS_JSON = _CUSTOM_DIR / "recording_stats.json"

# Created at import time so the shard directory exists before any request.
_LANDMARKS_DIR.mkdir(parents=True, exist_ok=True)

# CSV header: gesture_id + 63 landmark columns (21 landmarks x 3 axes).
_CSV_HEADER = [
    "gesture_id",
    *(f"lm_{idx}_{axis}" for idx in range(21) for axis in ("x", "y", "z")),
]
def _read_stats() -> dict:
    """Load the cached recording stats, falling back to an empty record.

    Reading is best-effort: a missing, unreadable or corrupt cache file yields
    the empty record instead of raising.

    Returns:
        A fresh dict that always has an integer ``"total"`` and a dict
        ``"per_gesture"``, so callers can index both keys without checking.
        Any other keys found in the file are passed through unchanged.
    """
    try:
        loaded = json.loads(_STATS_JSON.read_text())
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; json.JSONDecodeError and
        # UnicodeDecodeError are both ValueError subclasses.
        return {"total": 0, "per_gesture": {}}

    # Valid JSON is not necessarily a valid stats record (e.g. `[]` or `{}`).
    if not isinstance(loaded, dict):
        return {"total": 0, "per_gesture": {}}
    if not isinstance(loaded.get("per_gesture"), dict):
        loaded["per_gesture"] = {}
    if not isinstance(loaded.get("total"), int):
        loaded["total"] = 0
    return loaded
def _write_stats(stats: dict) -> None:
    """Persist *stats* as JSON via a sibling temp file that is then renamed.

    The payload is written to ``recording_stats.tmp`` next to the real file
    and renamed over it, so readers never observe a half-written JSON file.
    """
    serialized = json.dumps(stats)
    staging = _STATS_JSON.with_suffix(".tmp")
    staging.write_text(serialized)
    staging.rename(_STATS_JSON)
class GestureRecordingRequest(BaseModel):
    """Request body carrying one batch of landmark frames for one gesture."""

    # Label the whole batch is recorded under.
    gesture_id: str
    # One inner list of floats per frame; an empty batch fails validation.
    frames: list[list[float]] = Field(..., min_length=1)
| # --------------------------------------------------------------------------- | |
| # Gesture ID mapping: frontend UPPERCASE → Prolog lowercase | |
| # --------------------------------------------------------------------------- | |
# Frontend IDs are UPPERCASE; Prolog IDs are lowercase with a category prefix.
# Pronouns already carry "SUBJECT_" on the frontend side, while verbs and
# objects gain their "verb_" / "object_" prefix only on the Prolog side.
GESTURE_ID_MAP: dict[str, str] = {
    # Pronouns
    **{
        f"SUBJECT_{pronoun}": f"subject_{pronoun.lower()}"
        for pronoun in ("I", "YOU", "HE", "SHE", "WE", "THEY")
    },
    # Verbs
    **{
        verb: f"verb_{verb.lower()}"
        for verb in ("WANT", "EAT", "SEE", "GRAB", "DRINK", "GO", "STOP")
    },
    # Objects
    **{
        noun: f"object_{noun.lower()}"
        for noun in ("FOOD", "WATER", "BOOK", "APPLE", "BALL", "HOUSE")
    },
}

# Inverse lookup: Prolog ID -> frontend ID.
GESTURE_ID_MAP_REVERSE: dict[str, str] = dict(
    zip(GESTURE_ID_MAP.values(), GESTURE_ID_MAP.keys())
)
def _resolve_gesture_ids(raw: list[str]) -> list[str]:
    """Translate frontend gesture IDs into Prolog IDs.

    IDs present in ``GESTURE_ID_MAP`` use their mapped value; any other ID is
    simply lowercased. Input order is preserved.
    """
    return [GESTURE_ID_MAP.get(gid, gid.lower()) for gid in raw]
| # --------------------------------------------------------------------------- | |
| # Lifespan: instantiate PrologEngine singleton | |
| # --------------------------------------------------------------------------- | |
@asynccontextmanager
async def _lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Create the PrologEngine singleton at startup and store it on app.state.

    The function is wrapped with ``asynccontextmanager`` so it can be used as
    the application's ``lifespan=`` context manager rather than being passed
    as a bare async generator.

    If the engine cannot be constructed the failure is logged with its
    traceback and ``app.state.engine`` is set to ``None``; startup still
    completes, which lets ``health()`` report ``engine_loaded: False``.
    """
    try:
        app.state.engine = PrologEngine()
    except Exception as exc:
        # Startup boundary: record the full traceback but keep the app serving.
        logger.exception("Failed to initialize PrologEngine: %s", exc)
        app.state.engine = None
    else:
        logger.info("Grammar engine ready — Prolog modules loaded")
    yield
| # --------------------------------------------------------------------------- | |
| # Application | |
| # --------------------------------------------------------------------------- | |
# The ASGI application. Responses default to ORJSONResponse, and the engine
# is created/attached by the _lifespan handler.
app = FastAPI(
    title="MLAF Grammar Engine",
    description=(
        "Prolog-based X-bar syntactic analysis service. "
        "Provides grammaticality judgments, ISL interference detection, "
        "and parse tree generation grounded in Chomskyan generative syntax."
    ),
    version="1.0.0",
    default_response_class=ORJSONResponse,
    lifespan=_lifespan,
)

# CORS: only the listed browser origins may call the API (with credentials);
# any method and any request header is accepted from them.
# NOTE(review): the localhost/127.0.0.1 entries are presumably local frontend
# dev servers — confirm the ports are still current.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:5173",
        "http://127.0.0.1:5173",
        "http://localhost:3000",
        "http://127.0.0.1:3000",
        "https://multi-modal-gesture-grammar.vercel.app",
    ],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
| # --------------------------------------------------------------------------- | |
| # Middleware: latency header | |
| # --------------------------------------------------------------------------- | |
# NOTE(review): no registration decorator is visible on this function in this
# view of the file — confirm it is installed as HTTP middleware.
async def _latency_header(request: Request, call_next):
    """Time the downstream handler and expose the duration as a header.

    Sets ``X-Grammar-Latency-Us`` on the response to the elapsed time in whole
    microseconds (truncated), measured with ``time.perf_counter_ns``.
    """
    started_ns = time.perf_counter_ns()
    response = await call_next(request)
    micros = int((time.perf_counter_ns() - started_ns) / 1_000)
    response.headers["X-Grammar-Latency-Us"] = str(micros)
    return response
| # --------------------------------------------------------------------------- | |
| # Routes | |
| # --------------------------------------------------------------------------- | |
async def health() -> dict:
    """Liveness probe: always ``status: ok`` plus whether an engine is loaded.

    ``engine_loaded`` is False when ``app.state`` has no ``engine`` attribute
    or when it is ``None``.
    """
    loaded = getattr(app.state, "engine", None) is not None
    return {"status": "ok", "engine_loaded": loaded}
async def validate(req: GestureSequence) -> ValidationResponse:
    """Grammaticality judgment + X-bar parse tree + agreement info.

    Resolves the request's gesture IDs to Prolog IDs, runs
    ``engine.validate`` and repackages the resulting dict into a
    ``ValidationResponse``.

    Raises:
        HTTPException: 503 when no engine is loaded (startup stores ``None``
            on ``app.state.engine`` if PrologEngine construction fails).
    """
    # Read defensively, like health(): the engine may be missing or None.
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        raise HTTPException(status_code=503, detail="Grammar engine is not loaded")

    gesture_ids = _resolve_gesture_ids(req.gestures)
    result = await engine.validate(gesture_ids)

    # Tree well-formedness travels as "_well_formed" metadata on the parse tree.
    parse = result.get("parse_tree")
    tree_wf = None
    if parse and "_well_formed" in parse:
        wf_data = parse["_well_formed"]
        tree_wf = TreeWellFormedness(
            well_formed=wf_data.get("well_formed", False),
            node_count=wf_data.get("node_count"),
            depth=wf_data.get("depth"),
        )

    # Compositional semantics are optional in the engine result.
    sem = None
    sem_data = result.get("semantics")
    if sem_data:
        sem = SemanticInterpretation(
            semantic_form=sem_data.get("semantic_form", "unknown"),
            result_type=sem_data.get("result_type", "unknown"),
            complete=sem_data.get("complete", False),
            gesture_types=sem_data.get("gesture_types", []),
        )

    # `or []` also covers an explicit None, not just a missing key.
    violations = result.get("binding_violations") or []

    return ValidationResponse(
        grammatical=result["grammatical"],
        parse_tree=_dict_to_tree_node(parse),
        agreement=_dict_to_agreement(result.get("agreement")),
        theta=_dict_to_theta(result.get("theta")),
        binding_violations=[BindingViolation(**v) for v in violations],
        tense_resolution=result.get("tense_resolution", "present"),
        grammaticality_score=result.get("grammaticality_score", 0.0),
        semantics=sem,
        tree_well_formedness=tree_wf,
    )
async def predict_next(req: GestureSequence) -> PredictNextResponse:
    """Valid next gestures with feature constraints.

    Returns the engine's candidate continuations together with a coarse parse
    state:

    * no gestures yet          -> state ``"START"``, progress ``"incomplete"``
    * no valid continuation    -> state ``"END"``, progress ``"complete"``
    * otherwise                -> progress ``"incomplete"`` and the state is
      the category of the last gesture: ``"VP"`` for IDs starting with
      ``"verb"``, ``"NP"`` for everything else.

    Raises:
        HTTPException: 503 when no engine is loaded (startup stores ``None``
            on ``app.state.engine`` if PrologEngine construction fails).
    """
    # Read defensively, like health(): the engine may be missing or None.
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        raise HTTPException(status_code=503, detail="Grammar engine is not loaded")

    gesture_ids = _resolve_gesture_ids(req.gestures)
    valid_next = await engine.predict_next(gesture_ids)

    # NOTE(review): features is always sent as None even if the engine entry
    # carries feature data — confirm this is intentional.
    entries = [
        ValidNextEntry(
            grammar_id=entry["grammar_id"],
            category=entry["category"],
            phonological_form=entry["phonological_form"],
            features=None,
            theta_role=entry.get("theta_role"),
        )
        for entry in valid_next
    ]

    # Determine parse progress
    if not gesture_ids:
        progress, state = "incomplete", "START"
    elif not valid_next:
        progress, state = "complete", "END"
    else:
        progress = "incomplete"
        # Only the most recent gesture determines the current state.
        state = "VP" if gesture_ids[-1].startswith("verb") else "NP"

    return PredictNextResponse(
        valid_next=entries,
        current_state=state,
        parse_progress=progress,
    )
async def interference(req: GestureSequence) -> InterferenceResponse:
    """ISL transfer error detection + transform suggestion.

    The overall severity is ``"error"`` if any detected pattern is an error,
    else ``"warning"`` if any is a warning, else ``"none"``. When at least one
    pattern is found, the engine is also asked for an ISL-to-English transform
    and, unless it reports ``"none"``, the reordered sequence is returned as a
    human-readable suggestion.

    Raises:
        HTTPException: 503 when no engine is loaded (startup stores ``None``
            on ``app.state.engine`` if PrologEngine construction fails).
    """
    # Read defensively, like health(): the engine may be missing or None.
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        raise HTTPException(status_code=503, detail="Grammar engine is not loaded")

    gesture_ids = _resolve_gesture_ids(req.gestures)
    interferences = await engine.detect_interference(gesture_ids)
    patterns = [
        InterferencePattern(
            type=i["type"],
            severity=i["severity"],
            description=i["description"],
        )
        for i in interferences
    ]
    has_interference = bool(patterns)

    # Worst severity wins.
    severity = "none"
    if any(p.severity == "error" for p in patterns):
        severity = "error"
    elif any(p.severity == "warning" for p in patterns):
        severity = "warning"

    # Only ask for a reordering when there is something to correct.
    transform_suggestion = None
    if has_interference:
        transform = await engine.transform_isl_to_english(gesture_ids)
        if transform["transform"] != "none":
            eng = transform["english_order"]
            transform_suggestion = f"Suggested order: {' '.join(eng)}"

    return InterferenceResponse(
        has_interference=has_interference,
        patterns=patterns,
        severity=severity,
        transform_suggestion=transform_suggestion,
    )
async def parse_tree(req: GestureSequence) -> dict:
    """Full X-bar tree + theta satisfaction + binding violations.

    Returns the raw dict produced by ``engine.validate`` without repackaging
    it into response models.

    Raises:
        HTTPException: 503 when no engine is loaded (startup stores ``None``
            on ``app.state.engine`` if PrologEngine construction fails).
    """
    # Read defensively, like health(): the engine may be missing or None.
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        raise HTTPException(status_code=503, detail="Grammar engine is not loaded")

    gesture_ids = _resolve_gesture_ids(req.gestures)
    return await engine.validate(gesture_ids)
async def compose_semantics(req: GestureSequence) -> dict:
    """Compositional semantic interpretation via lambda calculus (Partee Ch 13).

    Returns whatever ``engine.compose_semantics`` produces for the resolved
    gesture IDs.

    Raises:
        HTTPException: 503 when no engine is loaded (startup stores ``None``
            on ``app.state.engine`` if PrologEngine construction fails).
    """
    # Read defensively, like health(): the engine may be missing or None.
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        raise HTTPException(status_code=503, detail="Grammar engine is not loaded")

    gesture_ids = _resolve_gesture_ids(req.gestures)
    return await engine.compose_semantics(gesture_ids)
async def grammar_capabilities() -> dict:
    """Chomsky Hierarchy classification of MLAF's grammar components (Partee Ch 16).

    Returns whatever ``engine.get_grammar_capabilities`` produces.

    Raises:
        HTTPException: 503 when no engine is loaded (startup stores ``None``
            on ``app.state.engine`` if PrologEngine construction fails).
    """
    # Read defensively, like health(): the engine may be missing or None.
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        raise HTTPException(status_code=503, detail="Grammar engine is not loaded")

    return await engine.get_grammar_capabilities()
async def transform_isl_to_english(req: GestureSequence) -> TransformResponse:
    """Reordered sequence + movement traces.

    Wraps the dict from ``engine.transform_isl_to_english`` in a
    ``TransformResponse``; a result without ``movement_traces`` yields an
    empty trace list.

    Raises:
        HTTPException: 503 when no engine is loaded (startup stores ``None``
            on ``app.state.engine`` if PrologEngine construction fails).
    """
    # Read defensively, like health(): the engine may be missing or None.
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        raise HTTPException(status_code=503, detail="Grammar engine is not loaded")

    gesture_ids = _resolve_gesture_ids(req.gestures)
    result = await engine.transform_isl_to_english(gesture_ids)
    traces = [
        MovementTrace(operation=mt["operation"], description=mt["description"])
        for mt in result.get("movement_traces", [])
    ]
    return TransformResponse(
        isl_order=result["isl_order"],
        english_order=result["english_order"],
        transform=result["transform"],
        movement_traces=traces,
    )
| # --------------------------------------------------------------------------- | |
| # Gesture Recording endpoints | |
| # --------------------------------------------------------------------------- | |
async def save_gesture_recording(req: GestureRecordingRequest) -> dict:
    """Append recorded landmark frames to a per-gesture shard CSV.

    Sharding strategy: one CSV per gesture_id under data/custom/landmarks/.
    Rows are appended while holding an exclusive ``fcntl.flock`` on the shard;
    the header decision and the flush both happen inside that lock so
    concurrent writers cannot duplicate the header or interleave rows.

    Returns:
        ``{"saved": n, "gesture_id": ..., "shard": ...}`` on success, or
        ``{"saved": 0, "error": ...}`` when the gesture_id is not a plain file
        name or a frame does not have exactly 63 values. Nothing is written
        in the error case.
    """
    gesture_id = req.gesture_id
    frames = req.frames

    # gesture_id comes from the client and becomes a file name: refuse
    # anything that would resolve outside the landmarks directory
    # ("../x", "a/b", absolute paths) or cannot be a file name at all.
    shard_name = f"{gesture_id}.csv"
    shard_path = _LANDMARKS_DIR / shard_name
    if (
        not gesture_id
        or "\x00" in gesture_id
        or shard_path.parent != _LANDMARKS_DIR
        or shard_path.name != shard_name
    ):
        return {"saved": 0, "error": f"Invalid gesture_id: {gesture_id!r}"}

    # Validate frame dimensions (21 landmarks × 3 coords = 63 floats)
    for i, frame in enumerate(frames):
        if len(frame) != 63:
            return {"saved": 0, "error": f"Frame {i} has {len(frame)} values, expected 63"}

    with open(shard_path, "a", newline="") as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        try:
            writer = csv.writer(f)
            # Check emptiness only once the lock is held; checking earlier
            # lets two first-time writers both emit the header.
            if shard_path.stat().st_size == 0:
                writer.writerow(_CSV_HEADER)
            writer.writerows(
                [gesture_id, *(f"{v:.6f}" for v in frame)] for frame in frames
            )
            # Push buffered rows to the file before unlocking; otherwise they
            # would only be written at close(), after the lock is released.
            f.flush()
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)

    # Update the cached counts.
    # NOTE(review): this read-modify-write is not covered by the shard lock,
    # so writers for different gestures can overwrite each other's update;
    # rebuild_recording_stats() recomputes the counts from the shards.
    stats = _read_stats()
    per_gesture = stats.setdefault("per_gesture", {})
    per_gesture[gesture_id] = per_gesture.get(gesture_id, 0) + len(frames)
    stats["total"] = stats.get("total", 0) + len(frames)
    _write_stats(stats)

    logger.info(
        "Saved %d frames for '%s' (shard: %s, total: %d)",
        len(frames), gesture_id, shard_path.name, stats["total"],
    )
    return {"saved": len(frames), "gesture_id": gesture_id, "shard": shard_path.name}
async def gesture_recording_stats() -> dict:
    """Return the cached per-gesture frame counts without scanning any shard."""
    cached = _read_stats()
    return cached
async def rebuild_recording_stats() -> dict:
    """Rebuild stats cache by scanning all shard CSVs. Use if stats drift.

    Every ``*.csv`` under the landmarks directory is read in full; the file
    stem is used as the gesture id and each non-empty row after the header
    counts as one frame. The result is written to the stats cache and
    returned.
    """
    per_gesture: dict[str, int] = {}
    # Sorted so the rebuilt cache has a stable key order across runs.
    for shard in sorted(_LANDMARKS_DIR.glob("*.csv")):
        # newline="" is what the csv module expects for files it parses.
        with open(shard, "r", newline="") as f:
            rows = csv.reader(f)
            next(rows, None)  # skip header
            # Blank lines come back as empty rows; they are not frames.
            per_gesture[shard.stem] = sum(1 for row in rows if row)

    total = sum(per_gesture.values())
    stats = {"total": total, "per_gesture": per_gesture}
    _write_stats(stats)
    logger.info("Rebuilt recording stats: %d total frames across %d gestures", total, len(per_gesture))
    return stats
| # --------------------------------------------------------------------------- | |
| # Response converters | |
| # --------------------------------------------------------------------------- | |
def _dict_to_tree_node(d: dict | None) -> ParseTreeNode | None:
    """Recursively convert a parse-tree dict into a ``ParseTreeNode``.

    ``None`` or an empty dict yields ``None``. Falsy entries in ``children``
    are dropped, and a missing or empty ``children`` list becomes ``None``
    rather than ``[]``.
    """
    if not d:
        return None

    raw_children = d.get("children")
    child_nodes = None
    if raw_children:
        child_nodes = [_dict_to_tree_node(child) for child in raw_children if child]

    return ParseTreeNode(
        label=d.get("label", ""),
        children=child_nodes,
        features=d.get("features"),
        trace=d.get("trace"),
        phonological_form=d.get("phonological_form"),
    )
def _dict_to_agreement(d: dict | None) -> AgreementInfo | None:
    """Convert an agreement dict into ``AgreementInfo``.

    ``None`` or an empty dict yields ``None``; a missing ``agrees`` key
    defaults to False and the other fields default to ``None``.
    """
    if d:
        return AgreementInfo(
            agrees=d.get("agrees", False),
            inflected_form=d.get("inflected_form"),
            reason=d.get("reason"),
        )
    return None
def _dict_to_theta(d: dict | None) -> ThetaInfo | None:
    """Convert a theta-role dict into ``ThetaInfo``.

    ``None`` or an empty dict yields ``None``; ``satisfied`` defaults to
    False, ``missing_count`` to 0 and the remaining fields to ``None``.
    """
    if d:
        return ThetaInfo(
            satisfied=d.get("satisfied", False),
            roles=d.get("roles"),
            violation_type=d.get("violation_type"),
            missing_count=d.get("missing_count", 0),
        )
    return None