File size: 25,409 Bytes
47b17cc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
"""PrologEngine — Singleton wrapper around pyswip for X-bar syntactic analysis.

Thread safety: asyncio.Lock around all Prolog queries (pyswip is not thread-safe).
Loads all Prolog modules at init time. Uses serialize.pl for clean data marshalling
(pyswip returns compound terms as opaque strings; serialize.pl converts them to
nested lists of atoms that pyswip marshals cleanly to Python).
"""

from __future__ import annotations

import asyncio
import logging
from pathlib import Path
from typing import Any

from pyswip import Prolog

logger = logging.getLogger("grammar_engine")

_PROLOG_DIR = Path(__file__).resolve().parent.parent / "prolog"

_MODULES = [
    "lexicon",
    "agreement",
    "subcategorization",
    "binding",
    "movement",
    "isl_grammar",
    "contrastive",
    "xbar",
    "tree_validation",
    "compositional",
    "chomsky_hierarchy",
    "serialize",
]


def _flatten_cons_cells(node: Any) -> list | Any:
    """Recursively flatten pyswip cons cell representation.

    pyswip marshals Prolog lists as nested ['[|]', head, tail] structures.
    This converts them back to flat Python lists.
    """
    if isinstance(node, list):
        if len(node) == 3 and node[0] == '[|]':
            head = _flatten_cons_cells(node[1])
            tail = _flatten_cons_cells(node[2])
            if isinstance(tail, list):
                return [head] + tail
            return [head]
        return [_flatten_cons_cells(item) for item in node]
    return node


def _extract_features(node: Any) -> dict[str, str] | None:
    """Extract feature dict from a Prolog feature list.

    After flattening, feature lists look like:
    [['=', 'person', 1], ['=', 'number', 'sg'], ...]
    """
    if not isinstance(node, list):
        return None
    features: dict[str, str] = {}
    for item in node:
        if isinstance(item, list) and len(item) == 3 and item[0] == '=':
            features[str(item[1])] = str(item[2])
    if features:
        return features
    return None


def _nested_list_to_tree(node: Any) -> dict[str, Any] | None:
    """Convert serialize.pl nested list tree to dict.

    Format from Prolog: [Label, Child1, Child2, ...]
    Handles cons cell representation from pyswip.
    """
    node = _flatten_cons_cells(node)

    if node is None:
        return None

    if isinstance(node, str):
        return {"label": node}

    if isinstance(node, (int, float)):
        return {"label": str(node)}

    if isinstance(node, list) and len(node) >= 1:
        label = str(node[0])
        children: list[dict[str, Any]] = []
        features: dict[str, str] | None = None

        for arg in node[1:]:
            if isinstance(arg, list):
                # Check if it's a feature list [['=', k, v], ...]
                feat = _extract_features(arg)
                if feat:
                    features = feat
                else:
                    child = _nested_list_to_tree(arg)
                    if child:
                        children.append(child)
            elif isinstance(arg, str):
                children.append({"label": arg})
            elif isinstance(arg, (int, float)):
                children.append({"label": str(arg)})

        result: dict[str, Any] = {"label": label}
        if children:
            result["children"] = children
        if features:
            result["features"] = features
        return result

    return None


class PrologEngine:
    """Singleton wrapper around SWI-Prolog via pyswip."""

    _instance: PrologEngine | None = None
    _lock: asyncio.Lock

    def __new__(cls) -> PrologEngine:
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        if self._initialized:
            return
        self._prolog = Prolog()
        self._lock = asyncio.Lock()
        self._load_modules()
        self._initialized = True
        logger.info("PrologEngine initialized with %d modules", len(_MODULES))

    def _load_modules(self) -> None:
        for mod in _MODULES:
            path = _PROLOG_DIR / f"{mod}.pl"
            if not path.exists():
                raise FileNotFoundError(f"Prolog module not found: {path}")
            self._prolog.consult(str(path))
            logger.debug("Loaded Prolog module: %s", mod)

    # ------------------------------------------------------------------
    # Public async methods
    # ------------------------------------------------------------------

    async def validate(self, gesture_ids: list[str]) -> dict[str, Any]:
        async with self._lock:
            return self._validate_sync(gesture_ids)

    async def predict_next(self, gesture_ids: list[str]) -> list[dict[str, Any]]:
        async with self._lock:
            return self._predict_next_sync(gesture_ids)

    async def detect_interference(self, gesture_ids: list[str]) -> list[dict[str, Any]]:
        async with self._lock:
            return self._detect_interference_sync(gesture_ids)

    async def transform_isl_to_english(self, gesture_ids: list[str]) -> dict[str, Any]:
        async with self._lock:
            return self._transform_sync(gesture_ids)

    async def get_parse_tree(self, gesture_ids: list[str]) -> dict[str, Any] | None:
        async with self._lock:
            return self._get_parse_tree_sync(gesture_ids)

    async def compose_semantics(self, gesture_ids: list[str]) -> dict[str, Any]:
        async with self._lock:
            return self._compose_semantics_sync(gesture_ids)

    async def get_grammar_capabilities(self) -> dict[str, Any]:
        async with self._lock:
            return self._get_grammar_capabilities_sync()

    # ------------------------------------------------------------------
    # Synchronous implementations (called under lock)
    # ------------------------------------------------------------------

    def _validate_sync(self, gesture_ids: list[str]) -> dict[str, Any]:
        result: dict[str, Any] = {
            "grammatical": False,
            "parse_tree": None,
            "agreement": None,
            "theta": None,
            "binding_violations": [],
            "tense_resolution": "present",
            "grammaticality_score": 0.0,
        }

        # 1. Parse tree
        parse_tree = self._get_parse_tree_sync(gesture_ids)
        if parse_tree:
            result["parse_tree"] = parse_tree
            result["grammatical"] = True
            score = 1.0
        else:
            score = 0.3

        # 2. Agreement
        result["agreement"] = self._check_agreement_sync(gesture_ids)
        if result["agreement"] and not result["agreement"].get("agrees", True):
            score -= 0.2

        # 3. Theta criterion
        result["theta"] = self._check_theta_sync(gesture_ids)
        if result["theta"] and not result["theta"].get("satisfied", True):
            score -= 0.2
            result["grammatical"] = False

        # 4. Binding violations
        result["binding_violations"] = self._check_binding_sync(gesture_ids)
        if result["binding_violations"]:
            score -= 0.1 * len(result["binding_violations"])
            result["grammatical"] = False

        # 5. Tense
        result["tense_resolution"] = self._resolve_tense(gesture_ids)
        result["grammaticality_score"] = max(0.0, min(1.0, score))

        # 6. Compositional semantics (Frege's Principle)
        result["semantics"] = self._compose_semantics_sync(gesture_ids)

        return result

    def _predict_next_sync(self, gesture_ids: list[str]) -> list[dict[str, Any]]:
        valid_next: list[dict[str, Any]] = []
        categories = self._categorize_ids(gesture_ids)

        if not gesture_ids:
            expected_cat = "d"
        elif all(c == "subj" for c in categories):
            expected_cat = "v"
        elif "verb" in categories and "obj" not in categories:
            expected_cat = "n"
        else:
            return []

        if expected_cat == "d":
            results = list(self._prolog.query(
                "lexicon:lex(ID, Form, d, Feats), member(case=nom, Feats)"
            ))
            # Deduplicate — pyswip may return multiple bindings
            seen = set()
            for r in results:
                gid = str(r.get("ID", ""))
                if gid not in seen:
                    seen.add(gid)
                    valid_next.append(self._build_lex_entry(gid, "d"))
        elif expected_cat == "v":
            results = list(self._prolog.query("lexicon:lex(ID, Form, v, _)"))
            seen = set()
            for r in results:
                gid = str(r.get("ID", ""))
                if gid not in seen:
                    seen.add(gid)
                    valid_next.append(self._build_lex_entry(gid, "v"))
        elif expected_cat == "n":
            results = list(self._prolog.query("lexicon:lex(ID, Form, n, _)"))
            verb_id = self._find_verb(gesture_ids)
            seen = set()
            for r in results:
                gid = str(r.get("ID", ""))
                if gid not in seen:
                    seen.add(gid)
                    entry = self._build_lex_entry(gid, "n")
                    if verb_id:
                        role_results = list(self._prolog.query(
                            f"subcategorization:role_assignment({verb_id}, 2, Role)"
                        ))
                        if role_results:
                            entry["theta_role"] = str(role_results[0].get("Role", ""))
                    valid_next.append(entry)

        return valid_next

    def _detect_interference_sync(self, gesture_ids: list[str]) -> list[dict[str, Any]]:
        ids = self._to_prolog_list(gesture_ids)
        query = f"serialize:serialize_interference({ids}, Patterns)"
        results = list(self._prolog.query(query))

        interferences: list[dict[str, Any]] = []
        if results:
            patterns = results[0].get("Patterns", [])
            for p in patterns:
                if isinstance(p, list) and len(p) >= 3:
                    interferences.append({
                        "type": str(p[0]),
                        "severity": str(p[1]),
                        "description": str(p[2]),
                    })
        return interferences

    def _transform_sync(self, gesture_ids: list[str]) -> dict[str, Any]:
        ids = self._to_prolog_list(gesture_ids)
        query = f"serialize:serialize_transform({ids}, Eng, TType, Ops)"
        results = list(self._prolog.query(query))

        if results:
            r = results[0]
            eng_order = r.get("Eng", gesture_ids)
            if isinstance(eng_order, list):
                eng_list = [str(x) for x in eng_order]
            else:
                eng_list = gesture_ids

            transform_type = str(r.get("TType", "unknown"))
            raw_ops = r.get("Ops", [])
            operations: list[dict[str, str]] = []
            if isinstance(raw_ops, list):
                for op in raw_ops:
                    if isinstance(op, list) and len(op) >= 2:
                        operations.append({
                            "operation": str(op[0]),
                            "description": str(op[1]),
                        })

            return {
                "isl_order": gesture_ids,
                "english_order": eng_list,
                "transform": transform_type,
                "movement_traces": operations,
            }

        return {
            "isl_order": gesture_ids,
            "english_order": gesture_ids,
            "transform": "none",
            "movement_traces": [],
        }

    def _get_parse_tree_sync(self, gesture_ids: list[str]) -> dict[str, Any] | None:
        ids = self._to_prolog_list(gesture_ids)
        query = f"serialize:serialize_tree({ids}, TreeList)"
        results = list(self._prolog.query(query))

        if results:
            tree_list = results[0].get("TreeList")
            if tree_list:
                tree = _nested_list_to_tree(tree_list)
                # Validate tree well-formedness (Partee et al., Ch 16)
                tree_valid = self._validate_tree_sync(gesture_ids)
                if tree and tree_valid is not None:
                    tree["_well_formed"] = tree_valid
                return tree
        return None

    def _validate_tree_sync(self, gesture_ids: list[str]) -> dict[str, Any] | None:
        """Validate parse tree well-formedness using formal conditions.

        Uses the raw Prolog parse tree (not serialized) for validation,
        since well_formed_tree/1 operates on compound terms.
        """
        ids = self._to_prolog_list(gesture_ids)
        try:
            # Query the raw parse tree and validate it directly
            wf_query = (
                f"xbar:parse_sentence({ids}, T, []), "
                f"tree_validation:well_formed_tree(T)"
            )
            wf_results = list(self._prolog.query(wf_query))
            is_well_formed = len(wf_results) > 0

            # Get detailed report from raw tree
            report_query = (
                f"xbar:parse_sentence({ids}, T, []), "
                f"tree_validation:validate_tree_structure(T, Report)"
            )
            report_results = list(self._prolog.query(report_query))

            report: dict[str, Any] = {"well_formed": is_well_formed}
            if report_results:
                raw_report = report_results[0].get("Report", [])
                raw_report = _flatten_cons_cells(raw_report)
                if isinstance(raw_report, list):
                    for item in raw_report:
                        if isinstance(item, list) and len(item) == 2:
                            report[str(item[0])] = str(item[1]) if not isinstance(item[1], (int, float)) else item[1]
            return report
        except Exception as exc:
            logger.warning("Tree validation failed: %s", exc)
            return {"well_formed": False, "error": str(exc)}

    # ------------------------------------------------------------------
    # Agreement, Theta, Binding checks
    # ------------------------------------------------------------------

    def _check_agreement_sync(self, gesture_ids: list[str]) -> dict[str, Any] | None:
        subj_id = None
        verb_id = None
        for gid in gesture_ids:
            cat = self._get_category(gid)
            if cat == "d" and subj_id is None:
                subj_id = gid
            elif cat == "v" and verb_id is None:
                verb_id = gid

        if not subj_id or not verb_id:
            return None

        query = (
            f"serialize:serialize_agreement({subj_id}, {verb_id}, "
            f"Agrees, InflForm, _)"
        )
        results = list(self._prolog.query(query))
        if results:
            r = results[0]
            agrees = str(r.get("Agrees", "no")) == "yes"
            inflected_form = str(r.get("InflForm", "unknown"))
            return {
                "agrees": agrees,
                "inflected_form": inflected_form,
                "subject_id": subj_id,
                "verb_id": verb_id,
            }
        return None

    def _check_theta_sync(self, gesture_ids: list[str]) -> dict[str, Any] | None:
        verb_id = self._find_verb(gesture_ids)
        if not verb_id:
            return None

        args = [gid for gid in gesture_ids if gid != verb_id]
        args_list = self._to_prolog_list(args)
        query = f"subcategorization:check_theta_criterion({verb_id}, {args_list}, Result)"
        results = list(self._prolog.query(query))

        if results:
            result_str = str(results[0].get("Result", ""))
            if result_str == "satisfied":
                role_results = list(self._prolog.query(
                    f"lexicon:theta_grid({verb_id}, Roles)"
                ))
                roles = []
                if role_results:
                    roles_term = role_results[0].get("Roles", [])
                    if isinstance(roles_term, list):
                        roles = [str(r) for r in roles_term]
                return {"satisfied": True, "roles": roles}
            else:
                # Parse violation(type, count) string
                if "missing_args" in result_str:
                    return {"satisfied": False, "violation_type": "missing_args", "missing_count": 1}
                elif "extra_args" in result_str:
                    return {"satisfied": False, "violation_type": "extra_args", "missing_count": 1}
                return {"satisfied": False, "violation_type": "unknown", "missing_count": 0}

        return None

    def _check_binding_sync(self, gesture_ids: list[str]) -> list[dict[str, Any]]:
        ids = self._to_prolog_list(gesture_ids)
        query = f"binding:check_binding({ids}, local, Violations)"
        results = list(self._prolog.query(query))

        violations: list[dict[str, Any]] = []
        if results:
            raw = results[0].get("Violations", [])
            if isinstance(raw, list):
                for v in raw:
                    v_str = str(v)
                    if "principle_" in v_str:
                        # Parse violation(principle_x, id, message) string
                        violations.append({
                            "principle": "principle_b" if "principle_b" in v_str else
                                         "principle_c" if "principle_c" in v_str else
                                         "principle_a",
                            "gesture_id": "",
                            "message": v_str,
                        })
        return violations

    # ------------------------------------------------------------------
    # Compositional Semantics (Partee et al., Ch 13)
    # ------------------------------------------------------------------

    def _compose_semantics_sync(self, gesture_ids: list[str]) -> dict[str, Any]:
        """Compute compositional semantic representation via lambda calculus."""
        ids = self._to_prolog_list(gesture_ids)
        try:
            query = f"compositional:compose_sentence({ids}, Sem, Type)"
            results = list(self._prolog.query(query))

            if results:
                r = results[0]
                sem_raw = r.get("Sem", "unknown")
                type_raw = r.get("Type", "unknown")

                return {
                    "semantic_form": self._term_to_string(sem_raw),
                    "result_type": str(type_raw),
                    "complete": str(type_raw) == "t",
                    "gesture_ids": gesture_ids,
                    "gesture_types": self._get_semantic_types(gesture_ids),
                }

            return {
                "semantic_form": "unknown",
                "result_type": "unknown",
                "complete": False,
                "gesture_ids": gesture_ids,
                "gesture_types": self._get_semantic_types(gesture_ids),
            }
        except Exception as exc:
            logger.warning("Compositional semantics failed: %s", exc)
            return {
                "semantic_form": "error",
                "result_type": "error",
                "complete": False,
                "error": str(exc),
            }

    def _get_semantic_types(self, gesture_ids: list[str]) -> list[dict[str, str]]:
        """Get semantic type for each gesture ID."""
        types: list[dict[str, str]] = []
        for gid in gesture_ids:
            try:
                results = list(self._prolog.query(
                    f"compositional:semantic_type({gid}, Type)"
                ))
                if results:
                    types.append({
                        "gesture_id": gid,
                        "type": str(results[0].get("Type", "unknown")),
                    })
                else:
                    types.append({"gesture_id": gid, "type": "unknown"})
            except Exception:
                types.append({"gesture_id": gid, "type": "error"})
        return types

    def _term_to_string(self, term: Any) -> str:
        """Convert a pyswip term to a readable string.

        Cleans up pyswip's Functor representation and unwraps entity() wrappers.
        """
        import re

        if isinstance(term, (int, float)):
            return str(term)
        if isinstance(term, list):
            flat = _flatten_cons_cells(term)
            if isinstance(flat, list):
                parts = [self._term_to_string(t) for t in flat]
                return f"[{', '.join(parts)}]"
            return self._term_to_string(flat)
        # Handle pyswip Functor objects
        if hasattr(term, 'name') and hasattr(term, 'args'):
            name = str(term.name)
            if name == 'entity' and term.args and len(list(term.args)) == 1:
                return str(list(term.args)[0])
            if term.args:
                args_str = ", ".join(self._term_to_string(a) for a in term.args)
                return f"{name}({args_str})"
            return name
        # String cleanup: pyswip sometimes returns stringified Functor refs
        s = str(term)
        # Clean up Functor(id,arity,name) → name
        s = re.sub(r'Functor\(\d+,\d+,(\w+)\)', r'\1', s)
        return s

    # ------------------------------------------------------------------
    # Chomsky Hierarchy (Partee et al., Ch 16)
    # ------------------------------------------------------------------

    def _get_grammar_capabilities_sync(self) -> dict[str, Any]:
        """Get MLAF's formal grammar classification report."""
        try:
            components: list[dict[str, Any]] = []
            results = list(self._prolog.query(
                "chomsky_hierarchy:grammar_class(Component, Type)"
            ))
            for r in results:
                comp = str(r.get("Component", ""))
                typ = str(r.get("Type", ""))
                level_results = list(self._prolog.query(
                    f"chomsky_hierarchy:chomsky_level({typ}, Level)"
                ))
                level = int(level_results[0].get("Level", -1)) if level_results else -1
                components.append({
                    "component": comp,
                    "chomsky_type": typ,
                    "level": level,
                })

            # Classify by level
            by_level: dict[str, list[str]] = {
                "type_3_regular": [],
                "type_2_context_free": [],
                "type_1_context_sensitive": [],
            }
            for c in components:
                ct = c["chomsky_type"]
                if ct in by_level:
                    by_level[ct].append(c["component"])

            return {
                "components": components,
                "summary": {
                    "total": len(components),
                    "regular_count": len(by_level["type_3_regular"]),
                    "context_free_count": len(by_level["type_2_context_free"]),
                    "context_sensitive_count": len(by_level["type_1_context_sensitive"]),
                },
                "by_level": by_level,
                "overall_power": "mildly_context_sensitive",
                "note": "Natural languages are mildly context-sensitive (Joshi 1985). "
                        "MLAF uses CFG base + context-sensitive feature checking.",
            }
        except Exception as exc:
            logger.warning("Grammar capabilities query failed: %s", exc)
            return {"error": str(exc)}

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _to_prolog_list(self, ids: list[str]) -> str:
        items = ", ".join(ids)
        return f"[{items}]"

    def _get_category(self, gid: str) -> str | None:
        results = list(self._prolog.query(f"lexicon:lex({gid}, _, Cat, _)"))
        if results:
            return str(results[0].get("Cat", ""))
        return None

    def _find_verb(self, gesture_ids: list[str]) -> str | None:
        for gid in gesture_ids:
            if self._get_category(gid) == "v":
                return gid
        return None

    def _build_lex_entry(self, gid: str, cat: str) -> dict[str, Any]:
        results = list(self._prolog.query(f"lexicon:lex({gid}, Form, _, _)"))
        form = str(results[0].get("Form", "")) if results else gid
        return {
            "grammar_id": gid,
            "category": cat,
            "phonological_form": form,
        }

    def _categorize_ids(self, gesture_ids: list[str]) -> list[str]:
        categories = []
        for gid in gesture_ids:
            cat = self._get_category(gid)
            if cat == "d":
                categories.append("subj")
            elif cat == "v":
                categories.append("verb")
            elif cat == "n":
                categories.append("obj")
            else:
                categories.append("unknown")
        return categories

    def _resolve_tense(self, gesture_ids: list[str]) -> str:
        for gid in gesture_ids:
            results = list(self._prolog.query(
                f"lexicon:lex({gid}, _, v, Feats), member(tense=T, Feats)"
            ))
            if results:
                return str(results[0].get("T", "present"))
        return "present"