# Shaankar39's picture
# Initial deploy: MLAF grammar engine
# 47b17cc verified
"""MLAF Grammar Engine — FastAPI application (port 8300).
Prolog-based X-bar syntactic analysis service. Replaces imperative JS grammar
logic with declarative Prolog queries grounded in Chomskyan generative syntax.
Run:
uvicorn app.main:app --host 0.0.0.0 --port 8300
"""
from __future__ import annotations

import csv
import fcntl
import json
import logging
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncIterator

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import ORJSONResponse
from pydantic import BaseModel, Field

from .engine import PrologEngine
from .schemas import (
    AgreementInfo,
    BindingViolation,
    GestureSequence,
    InterferencePattern,
    InterferenceResponse,
    MovementTrace,
    ParseTreeNode,
    PredictNextResponse,
    SemanticInterpretation,
    ThetaInfo,
    TransformResponse,
    TreeWellFormedness,
    ValidNextEntry,
    ValidationResponse,
)
# Module-wide logger for the grammar engine service.
logger = logging.getLogger("grammar_engine")

# ---------------------------------------------------------------------------
# Gesture recording storage — sharded so the dataset can grow to 25GB+
# ---------------------------------------------------------------------------
# On-disk layout:
#   data/custom/landmarks/{gesture_id}.csv   one append-only shard per gesture
#   data/custom/recording_stats.json         cached frame counts
#
# Every shard shares one header: gesture_id, lm_0_x, lm_0_y, lm_0_z, ..., lm_20_z.
# The gesture id is already implied by the shard filename; it is repeated as
# the first column for backward compatibility with preprocess.py's _load_webcam.
_CUSTOM_DIR = Path(__file__).resolve().parent.parent.joinpath("data", "custom")
_LANDMARKS_DIR = _CUSTOM_DIR.joinpath("landmarks")
_STATS_JSON = _CUSTOM_DIR.joinpath("recording_stats.json")

# Import-time side effect: make sure the shard directory (and parents) exists.
_LANDMARKS_DIR.mkdir(parents=True, exist_ok=True)

# 1 id column + 21 landmarks x 3 coordinates = 64 columns.
_CSV_HEADER = ["gesture_id"]
_CSV_HEADER.extend(f"lm_{idx}_{axis}" for idx in range(21) for axis in "xyz")
def _read_stats() -> dict:
    """Load the cached recording stats from ``_STATS_JSON``.

    Returns:
        The parsed stats dict, guaranteed to contain a ``"total"`` key and a
        dict under ``"per_gesture"``. When the cache file is missing,
        unreadable, not valid JSON, or does not have that shape, a fresh
        ``{"total": 0, "per_gesture": {}}`` is returned instead.
    """
    try:
        # json.dumps emits ASCII by default, so UTF-8 decoding is always safe
        # for files produced by _write_stats.
        stats = json.loads(_STATS_JSON.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # Nothing has been recorded yet — not an error.
        stats = None
    except (ValueError, OSError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Ignoring unreadable stats cache %s: %s", _STATS_JSON, exc)
        stats = None

    # Callers index stats["per_gesture"] directly, so reject any other shape
    # (e.g. a JSON list or an object without that key).
    if not isinstance(stats, dict) or not isinstance(stats.get("per_gesture"), dict):
        return {"total": 0, "per_gesture": {}}
    stats.setdefault("total", 0)
    return stats
def _write_stats(stats: dict) -> None:
    """Write the stats cache via a temp file that is then swapped into place.

    Args:
        stats: JSON-serialisable stats dict to persist to ``_STATS_JSON``.

    The temp file sits next to the target (same name, ``.tmp`` suffix) so the
    final swap stays on one filesystem. ``Path.replace`` is used rather than
    ``Path.rename`` because it overwrites an existing target on every
    platform.
    """
    tmp = _STATS_JSON.with_suffix(".tmp")
    tmp.write_text(json.dumps(stats), encoding="utf-8")
    tmp.replace(_STATS_JSON)
class GestureRecordingRequest(BaseModel):
    """Request body carrying recorded landmark frames for one gesture."""

    # Identifier of the gesture all frames in this batch belong to.
    gesture_id: str

    # One inner list of floats per recorded frame; at least one frame required.
    frames: list[list[float]] = Field(..., min_length=1)
# ---------------------------------------------------------------------------
# Gesture ID mapping: frontend UPPERCASE → Prolog lowercase
# ---------------------------------------------------------------------------
# Frontend gesture ID -> Prolog gesture ID. Declaration order is kept
# (pronouns, then verbs, then objects).
GESTURE_ID_MAP: dict[str, str] = dict(
    # Pronouns
    SUBJECT_I="subject_i",
    SUBJECT_YOU="subject_you",
    SUBJECT_HE="subject_he",
    SUBJECT_SHE="subject_she",
    SUBJECT_WE="subject_we",
    SUBJECT_THEY="subject_they",
    # Verbs
    WANT="verb_want",
    EAT="verb_eat",
    SEE="verb_see",
    GRAB="verb_grab",
    DRINK="verb_drink",
    GO="verb_go",
    STOP="verb_stop",
    # Objects
    FOOD="object_food",
    WATER="object_water",
    BOOK="object_book",
    APPLE="object_apple",
    BALL="object_ball",
    HOUSE="object_house",
)

# Inverse lookup: Prolog gesture ID -> frontend gesture ID.
GESTURE_ID_MAP_REVERSE: dict[str, str] = dict(
    zip(GESTURE_ID_MAP.values(), GESTURE_ID_MAP.keys())
)
def _resolve_gesture_ids(raw: list[str]) -> list[str]:
    """Translate frontend gesture IDs into Prolog IDs.

    IDs present in ``GESTURE_ID_MAP`` are replaced by their mapped value; any
    other ID is passed through lowercased. Order and length are preserved.
    """
    return [
        GESTURE_ID_MAP[name] if name in GESTURE_ID_MAP else name.lower()
        for name in raw
    ]
# ---------------------------------------------------------------------------
# Lifespan: instantiate PrologEngine singleton
# ---------------------------------------------------------------------------
@asynccontextmanager
async def _lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Create the ``PrologEngine`` singleton and publish it on ``app.state``.

    If construction fails the application still starts, with
    ``app.state.engine`` set to ``None``; the failure is logged together with
    its traceback so the root cause is visible in the logs.
    """
    try:
        app.state.engine = PrologEngine()
    except Exception as exc:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Failed to initialize PrologEngine: %s", exc)
        app.state.engine = None
    else:
        logger.info("Grammar engine ready — Prolog modules loaded")
    yield
# ---------------------------------------------------------------------------
# Application
# ---------------------------------------------------------------------------
# Browser origins allowed by CORS: localhost ports 5173 and 3000, plus the
# Vercel-hosted frontend URL.
_ALLOWED_ORIGINS = [
    "http://localhost:5173",
    "http://127.0.0.1:5173",
    "http://localhost:3000",
    "http://127.0.0.1:3000",
    "https://multi-modal-gesture-grammar.vercel.app",
]

# Service description shown in the generated API docs.
_APP_DESCRIPTION = (
    "Prolog-based X-bar syntactic analysis service. "
    "Provides grammaticality judgments, ISL interference detection, "
    "and parse tree generation grounded in Chomskyan generative syntax."
)

app = FastAPI(
    title="MLAF Grammar Engine",
    version="1.0.0",
    description=_APP_DESCRIPTION,
    lifespan=_lifespan,
    default_response_class=ORJSONResponse,
)

# Any method and header is accepted from the allowed origins, with credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Middleware: latency header
# ---------------------------------------------------------------------------
@app.middleware("http")
async def _latency_header(request: Request, call_next):
    """Time the downstream handler and expose it as ``X-Grammar-Latency-Us``."""
    started_ns = time.perf_counter_ns()
    response = await call_next(request)
    elapsed_ns = time.perf_counter_ns() - started_ns
    # Header carries whole microseconds; the fractional part is truncated.
    response.headers["X-Grammar-Latency-Us"] = str(int(elapsed_ns / 1_000))
    return response
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/health")
async def health() -> dict:
    """Report service status and whether ``app.state.engine`` is set."""
    engine_loaded = getattr(app.state, "engine", None) is not None
    return {"status": "ok", "engine_loaded": engine_loaded}
@app.post("/validate", response_model=ValidationResponse)
async def validate(req: GestureSequence) -> ValidationResponse:
    """Grammaticality judgment + X-bar parse tree + agreement info.

    Resolves the request's gesture IDs, runs ``engine.validate`` on them and
    maps the resulting dict onto a ``ValidationResponse``.

    Raises:
        HTTPException: 503 when no engine is available (startup stores
            ``None`` on ``app.state.engine`` if the engine failed to load).
    """
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        # Without this guard the call below fails with AttributeError (HTTP 500).
        raise HTTPException(status_code=503, detail="Grammar engine is not available")

    result = await engine.validate(_resolve_gesture_ids(req.gestures))
    parse_tree_data = result.get("parse_tree")

    # Tree well-formedness travels as "_well_formed" metadata on the parse tree.
    tree_wf = None
    if parse_tree_data and "_well_formed" in parse_tree_data:
        wf_data = parse_tree_data["_well_formed"]
        tree_wf = TreeWellFormedness(
            well_formed=wf_data.get("well_formed", False),
            node_count=wf_data.get("node_count"),
            depth=wf_data.get("depth"),
        )

    # Compositional semantics are optional in the engine result.
    sem = None
    sem_data = result.get("semantics")
    if sem_data:
        sem = SemanticInterpretation(
            semantic_form=sem_data.get("semantic_form", "unknown"),
            result_type=sem_data.get("result_type", "unknown"),
            complete=sem_data.get("complete", False),
            gesture_types=sem_data.get("gesture_types", []),
        )

    return ValidationResponse(
        grammatical=result["grammatical"],
        parse_tree=_dict_to_tree_node(parse_tree_data),
        agreement=_dict_to_agreement(result.get("agreement")),
        theta=_dict_to_theta(result.get("theta")),
        # "or []" also tolerates an explicit null from the engine.
        binding_violations=[
            BindingViolation(**violation)
            for violation in result.get("binding_violations") or []
        ],
        tense_resolution=result.get("tense_resolution", "present"),
        grammaticality_score=result.get("grammaticality_score", 0.0),
        semantics=sem,
        tree_well_formedness=tree_wf,
    )
@app.post("/predict-next", response_model=PredictNextResponse)
async def predict_next(req: GestureSequence) -> PredictNextResponse:
    """Valid next gestures with feature constraints.

    Returns the entries reported by ``engine.predict_next`` together with a
    coarse parse state:

    * empty input                -> state ``"START"``, progress ``"incomplete"``
    * no valid continuation      -> state ``"END"``, progress ``"complete"``
    * otherwise                  -> state ``"VP"`` if the last gesture is a
      verb, else ``"NP"``; progress ``"incomplete"``

    Raises:
        HTTPException: 503 when no engine is available (startup stores
            ``None`` on ``app.state.engine`` if the engine failed to load).
    """
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        # Without this guard the call below fails with AttributeError (HTTP 500).
        raise HTTPException(status_code=503, detail="Grammar engine is not available")

    gesture_ids = _resolve_gesture_ids(req.gestures)
    valid_next = await engine.predict_next(gesture_ids)

    entries = [
        ValidNextEntry(
            grammar_id=entry["grammar_id"],
            category=entry["category"],
            phonological_form=entry["phonological_form"],
            # Feature constraints are not forwarded; the field is always None.
            features=None,
            theta_role=entry.get("theta_role"),
        )
        for entry in valid_next
    ]

    # Determine parse progress
    if not gesture_ids:
        progress, state = "incomplete", "START"
    elif not valid_next:
        progress, state = "complete", "END"
    else:
        progress = "incomplete"
        # Only the most recent gesture decides the state: verbs map to VP,
        # everything else (subjects, objects, unknown IDs) maps to NP.
        state = "VP" if gesture_ids[-1].startswith("verb") else "NP"

    return PredictNextResponse(
        valid_next=entries,
        current_state=state,
        parse_progress=progress,
    )
@app.post("/interference", response_model=InterferenceResponse)
async def interference(req: GestureSequence) -> InterferenceResponse:
    """ISL transfer error detection + transform suggestion.

    The overall severity is ``"error"`` if any pattern is an error,
    ``"warning"`` if any is a warning, otherwise ``"none"``. When at least one
    pattern is found, the ISL-to-English transform is queried and, unless it
    reports ``"none"``, its English order is offered as a suggestion.

    Raises:
        HTTPException: 503 when no engine is available (startup stores
            ``None`` on ``app.state.engine`` if the engine failed to load).
    """
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        # Without this guard the call below fails with AttributeError (HTTP 500).
        raise HTTPException(status_code=503, detail="Grammar engine is not available")

    gesture_ids = _resolve_gesture_ids(req.gestures)
    interferences = await engine.detect_interference(gesture_ids)
    patterns = [
        InterferencePattern(
            type=item["type"],
            severity=item["severity"],
            description=item["description"],
        )
        for item in interferences
    ]
    has_interference = bool(patterns)

    # Worst severity wins.
    found = {p.severity for p in patterns}
    if "error" in found:
        severity = "error"
    elif "warning" in found:
        severity = "warning"
    else:
        severity = "none"

    # Get transform suggestion
    transform_suggestion = None
    if has_interference:
        transform = await engine.transform_isl_to_english(gesture_ids)
        if transform["transform"] != "none":
            english_order = transform["english_order"]
            transform_suggestion = f"Suggested order: {' '.join(english_order)}"

    return InterferenceResponse(
        has_interference=has_interference,
        patterns=patterns,
        severity=severity,
        transform_suggestion=transform_suggestion,
    )
@app.post("/parse-tree")
async def parse_tree(req: GestureSequence) -> dict:
    """Full X-bar tree + theta satisfaction + binding violations.

    Returns the raw dict produced by ``engine.validate`` for the resolved
    gesture IDs, without mapping it onto a response model.

    Raises:
        HTTPException: 503 when no engine is available (startup stores
            ``None`` on ``app.state.engine`` if the engine failed to load).
    """
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        # Without this guard the call below fails with AttributeError (HTTP 500).
        raise HTTPException(status_code=503, detail="Grammar engine is not available")

    return await engine.validate(_resolve_gesture_ids(req.gestures))
@app.post("/compose-semantics")
async def compose_semantics(req: GestureSequence) -> dict:
    """Compositional semantic interpretation via lambda calculus (Partee Ch 13).

    Returns whatever ``engine.compose_semantics`` yields for the resolved
    gesture IDs.

    Raises:
        HTTPException: 503 when no engine is available (startup stores
            ``None`` on ``app.state.engine`` if the engine failed to load).
    """
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        # Without this guard the call below fails with AttributeError (HTTP 500).
        raise HTTPException(status_code=503, detail="Grammar engine is not available")

    return await engine.compose_semantics(_resolve_gesture_ids(req.gestures))
@app.get("/grammar-capabilities")
async def grammar_capabilities() -> dict:
    """Chomsky Hierarchy classification of MLAF's grammar components (Partee Ch 16).

    Returns whatever ``engine.get_grammar_capabilities`` yields.

    Raises:
        HTTPException: 503 when no engine is available (startup stores
            ``None`` on ``app.state.engine`` if the engine failed to load).
    """
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        # Without this guard the call below fails with AttributeError (HTTP 500).
        raise HTTPException(status_code=503, detail="Grammar engine is not available")

    return await engine.get_grammar_capabilities()
@app.post("/transform/isl-to-english", response_model=TransformResponse)
async def transform_isl_to_english(req: GestureSequence) -> TransformResponse:
    """Reordered sequence + movement traces.

    Maps the dict returned by ``engine.transform_isl_to_english`` onto a
    ``TransformResponse``; a missing ``movement_traces`` key yields an empty
    list.

    Raises:
        HTTPException: 503 when no engine is available (startup stores
            ``None`` on ``app.state.engine`` if the engine failed to load).
    """
    engine: PrologEngine | None = getattr(app.state, "engine", None)
    if engine is None:
        # Without this guard the call below fails with AttributeError (HTTP 500).
        raise HTTPException(status_code=503, detail="Grammar engine is not available")

    result = await engine.transform_isl_to_english(_resolve_gesture_ids(req.gestures))
    traces = [
        MovementTrace(operation=trace["operation"], description=trace["description"])
        for trace in result.get("movement_traces", [])
    ]
    return TransformResponse(
        isl_order=result["isl_order"],
        english_order=result["english_order"],
        transform=result["transform"],
        movement_traces=traces,
    )
# ---------------------------------------------------------------------------
# Gesture Recording endpoints
# ---------------------------------------------------------------------------
@app.post("/gesture-recordings/save")
async def save_gesture_recording(req: GestureRecordingRequest) -> dict:
    """Append recorded landmark frames to a per-gesture shard CSV.

    Sharding strategy: one CSV per gesture_id under data/custom/landmarks/.
    Rows are appended while holding an exclusive ``fcntl`` lock on the shard;
    the header decision and the flush both happen inside the lock so
    concurrent writers can neither duplicate the header nor interleave rows.

    Returns:
        ``{"saved": n, "gesture_id": ..., "shard": ...}`` on success, or
        ``{"saved": 0, "error": ...}`` when the gesture id or a frame is
        rejected (nothing is written in that case).
    """
    gesture_id = req.gesture_id
    frames = req.frames

    # gesture_id comes straight from the request and becomes a filename:
    # refuse anything that could address a path outside the shard directory.
    if not gesture_id or any(ch in gesture_id for ch in ("/", "\\", "\x00")):
        return {
            "saved": 0,
            "error": "Invalid gesture_id: must be non-empty and contain no path separators",
        }

    # Validate frame dimensions (21 landmarks × 3 coords = 63 floats)
    for i, frame in enumerate(frames):
        if len(frame) != 63:
            return {"saved": 0, "error": f"Frame {i} has {len(frame)} values, expected 63"}

    shard_path = _LANDMARKS_DIR / f"{gesture_id}.csv"

    with open(shard_path, "a", newline="") as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        try:
            # Decide about the header only once the lock is held: another
            # writer may have created/filled the shard while we were waiting.
            f.seek(0, 2)  # 2 == SEEK_END
            writer = csv.writer(f)
            if f.tell() == 0:
                writer.writerow(_CSV_HEADER)
            writer.writerows(
                [gesture_id, *(f"{v:.6f}" for v in frame)] for frame in frames
            )
            # Push buffered rows to the file before the lock is released;
            # otherwise they would only be written on close, after unlock.
            f.flush()
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)

    # Update the cached stats. NOTE: this read-modify-write is not covered by
    # the shard lock, so concurrent saves can lose an update; the
    # rebuild-stats endpoint recomputes the counts from the shards.
    stats = _read_stats()
    per_gesture = stats.setdefault("per_gesture", {})
    per_gesture[gesture_id] = per_gesture.get(gesture_id, 0) + len(frames)
    stats["total"] = stats.get("total", 0) + len(frames)
    _write_stats(stats)

    logger.info(
        "Saved %d frames for '%s' (shard: %s, total: %d)",
        len(frames), gesture_id, shard_path.name, stats["total"],
    )
    return {"saved": len(frames), "gesture_id": gesture_id, "shard": shard_path.name}
@app.get("/gesture-recordings/stats")
async def gesture_recording_stats() -> dict:
    """Serve the cached per-gesture frame counts without scanning any shard."""
    cached = _read_stats()
    return cached
@app.post("/gesture-recordings/rebuild-stats")
async def rebuild_recording_stats() -> dict:
    """Recount every shard CSV and overwrite the stats cache with the result.

    Intended for repairing a cache that no longer matches the shards. Each
    shard's first row is treated as the header and excluded from the count.
    """
    per_gesture: dict[str, int] = {}
    for shard in _LANDMARKS_DIR.glob("*.csv"):
        with open(shard, "r") as handle:
            rows = csv.reader(handle)
            next(rows, None)  # drop the header row, if any
            # The shard's file stem is the gesture id.
            per_gesture[shard.stem] = sum(1 for _ in rows)

    total = sum(per_gesture.values())
    stats = {"total": total, "per_gesture": per_gesture}
    _write_stats(stats)
    logger.info("Rebuilt recording stats: %d total frames across %d gestures", total, len(per_gesture))
    return stats
# ---------------------------------------------------------------------------
# Response converters
# ---------------------------------------------------------------------------
def _dict_to_tree_node(d: dict | None) -> ParseTreeNode | None:
    """Recursively convert a parse-tree dict into a ``ParseTreeNode``.

    A falsy input yields ``None``. Falsy entries in ``children`` are skipped,
    and a missing or empty ``children`` value becomes ``None``.
    """
    if not d:
        return None

    raw_children = d.get("children")
    children = None
    if raw_children:
        children = [_dict_to_tree_node(child) for child in raw_children if child]

    # These fields are copied through unchanged (None when absent).
    passthrough = {
        key: d.get(key) for key in ("features", "trace", "phonological_form")
    }
    return ParseTreeNode(label=d.get("label", ""), children=children, **passthrough)
def _dict_to_agreement(d: dict | None) -> AgreementInfo | None:
    """Build an ``AgreementInfo`` from the engine dict; falsy input gives ``None``."""
    if d:
        return AgreementInfo(
            agrees=d.get("agrees", False),
            inflected_form=d.get("inflected_form"),
            reason=d.get("reason"),
        )
    return None
def _dict_to_theta(d: dict | None) -> ThetaInfo | None:
    """Build a ``ThetaInfo`` from the engine dict; falsy input gives ``None``."""
    if d:
        return ThetaInfo(
            satisfied=d.get("satisfied", False),
            roles=d.get("roles"),
            violation_type=d.get("violation_type"),
            missing_count=d.get("missing_count", 0),
        )
    return None