#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ أَمْر بَصَر — PERCEPTION ENGINE بِسْمِ اللَّهِ الرَّحْمَٰنِ الرَّحِيمِ Root: ب-ص-ر — to see, perceive, discern Q67:4 يَنقَلِبْ إِلَيْكَ ٱلْبَصَرُ — the vision returns to you The بَصَر perceives. It understands what the user MEANS. Input layer of the أَمْر AI. Takes raw user input and produces structured intent that the عَقْل can reason about and the نُطْق can articulate. Functions: perceive(user_input) → structured intent + parameters decompose(complex_query) → ordered sub-queries detect_root(word, lang) → root_id via DB + shift reversal track_context(history) → current focus root/topic classify_input(text) → input type classification """ import sys import os import re from collections import defaultdict sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) try: from uslap_db_connect import connect as _connect _HAS_DB = True except ImportError: _HAS_DB = False try: from amr_aql import ( deduce_meaning, reverse_trace, expand_root, relate_roots, hypothesise, verify_candidate, extract_consonants ) _HAS_AQL = True except ImportError: _HAS_AQL = False from amr_alphabet import ABJAD # ═══════════════════════════════════════════════════════════════════════ # INPUT TYPE CLASSIFICATION # ═══════════════════════════════════════════════════════════════════════ # Intent patterns — what the user is asking for INTENT_PATTERNS = { # Root operations 'explain_root': [ r'explain\s+(?:root\s+)?([A-Z]\d+|[\u0621-\u064A][\-\u0621-\u064A]+)', r'what\s+(?:does|is)\s+(?:root\s+)?([A-Z]\d+|[\u0621-\u064A][\-\u0621-\u064A]+)', r'tell\s+me\s+about\s+(?:root\s+)?([A-Z]\d+|[\u0621-\u064A][\-\u0621-\u064A]+)', ], 'trace_word': [ r'trace\s+(?:the\s+word\s+)?["\']?(\w+)["\']?', r'where\s+does\s+["\']?(\w+)["\']?\s+come\s+from', r'root\s+of\s+["\']?(\w+)["\']?', r'find\s+root\s+(?:for\s+)?["\']?(\w+)["\']?', ], 'compare_roots': [ r'compare\s+([\u0621-\u064A][\-\u0621-\u064A]+)\s+(?:and|vs|with|to)\s+([\u0621-\u064A][\-\u0621-\u064A]+)', r'relate\s+([\u0621-\u064A][\-\u0621-\u064A]+)\s+(?:and|to|with)\s+([\u0621-\u064A][\-\u0621-\u064A]+)', r'([\u0621-\u064A][\-\u0621-\u064A]+)\s+vs\s+([\u0621-\u064A][\-\u0621-\u064A]+)', ], 'search_lattice': [ r'search\s+(?:for\s+)?["\']?(.+?)["\']?$', r'find\s+(?:entry\s+)?["\']?(.+?)["\']?$', r'look\s+up\s+["\']?(.+?)["\']?$', ], 'get_entry': [ r'(?:show|get|display)\s+entry\s+([A-Z]{2}\d+)', r'entry\s+([A-Z]{2}\d+)', ], 'lattice_state': [ r'(?:show\s+)?(?:lattice\s+)?state', r'(?:show\s+)?(?:lattice\s+)?summary', r'how\s+many', r'current\s+state', ], 'report': [ r'(?:generate|create|make)\s+(?:a\s+)?report\s+(?:for\s+)?([A-Z]\d+|[\u0621-\u064A][\-\u0621-\u064A]+)', r'intelligence\s+(?:on|for|about)\s+([A-Z]\d+|[\u0621-\u064A][\-\u0621-\u064A]+)', ], # Computational fabric — full cross-domain root scan 'fabric_root': [ r'fabric\s+(.+)', r'computational[_\s]?fabric\s+(.+)', ], # Domain reasoning intents 'explain_body': [ r'(?:what\s+(?:does|is)\s+)?(?:root\s+)?(.+?)\s+(?:in|for)\s+(?:the\s+)?body', r'body\s+(?:of|for)\s+(.+)', r'(?:which|what)\s+(?:root|organ|system)\s+governs?\s+(?:the\s+)?(.+)', r'(?:heal|cure|therapy)\s+(?:for\s+)?(.+)', ], 'body_system': [ r'(?:show|explain|describe)\s+(?:the\s+)?(?:body\s+)?(heart|nafs|sensory|skeletal|nutrition|prayer|lifecycle|therapy|architecture|diagnostic)\s*(?:system)?', r'(heart|nafs|sensory|skeletal|nutrition|prayer|lifecycle)\s+(?:system|lattice|map)', ], 'explain_formula': [ r'(?:what\s+)?formula(?:s)?\s+(?:for|of|using)\s+(.+)', r'(?:show|explain)\s+formula\s+(.+)', r'ratio(?:s)?\s+(?:for|of|in)\s+(.+)', ], 'explain_history': [ r'(?:when|how)\s+was\s+(.+?)\s+deployed', r'timeline\s+(?:of|for)\s+(.+)', r'(?:show|explain)\s+era\s+(\d+)', r'(?:deployment|history)\s+(?:of|for)\s+(.+)', ], 'naming_op': [ r'(?:how\s+was\s+)?(.+?)\s+renamed', r'naming\s+(?:operation|inversion)\s+(?:of|for)\s+(.+)', r'(?:original|real)\s+name\s+(?:of|for)\s+(.+)', ], 'explain_intel': [ r'(?:what\s+)?(?:intelligence|intel)\s+(?:on|for|about)?\s*(.+)', r'confession(?:s)?\s+(?:about|for|on)?\s*(.+)', r'extraction\s+(?:of|for|in)?\s*(.+)', r'(?:who|what)\s+confessed\s+(?:about\s+)?(.+)', ], 'batch_operation': [ r'batch\s+(.+)', r'process\s+all\s+(.+)', ], # QUF operations 'quf_validate': [ r'quf\s+(?:validate\s+)?(?:entry\s+)?(\d+)', r'validate\s+(?:entry\s+)?(\d+)', r'quf\s+(\w+)\s+(\d+)', r'quf\s+([\w_]+)', ], 'quf_status': [ r'quf\s+status', r'quf\s+coverage', r'coverage', ], # Detection patterns 'explain_detection': [ r'(?:what\s+is\s+)?(?:detection\s+pattern\s+)?(DP\d+)', r'(?:explain\s+)?(DP\d+)', r'detection\s+(?:pattern\s+)?(.+)', ], # Keywords 'explain_keyword': [ r'keyword\s+([\u0621-\u064A]+)', r'(?:explain\s+)?keyword\s+(\w+)', ], # Bitig tasrif MUST come before tasrif (longer prefix match) 'bitig_tasrif': [ r'bitig\s+tasrif\s+status', r'bitig\s+tasrif\s+pattern\s+(\w+)', r'bitig\s+tasrif\s+harmony\s+(.+)', r'bitig\s+tasrif\s+compound\s+(.+)', r'bitig\s+tasrif\s+analyze\s+(.+)', r'bitig\s+tasrif\s+(\w+)', ], # AA Tasrif (must NOT match "bitig tasrif") 'tasrif': [ r'(? 50 else 'MEDIUM' else: result['confidence'] = 'LOW' return result def _detect_language(word): """Auto-detect language from script.""" if re.match(r'^[\u0621-\u064A\u0640-\u065F]+$', word): return 'ar' if re.match(r'^[а-яА-ЯёЁ]+$', word): return 'ru' if re.match(r'^[\u0600-\u06FF]+$', word): return 'fa' # Could also be Arabic — FA has same script range return 'en' # ═══════════════════════════════════════════════════════════════════════ # CONTEXT TRACKER — maintain focus across a session # ═══════════════════════════════════════════════════════════════════════ class ContextTracker: """Tracks the user's focus root/topic across a session. Maintains: - Current focus root(s) - Recent queries - Related roots discovered - Pending operations """ def __init__(self): self.focus_roots = [] # stack of root_ids/letters currently in focus self.recent_queries = [] # last N queries self.related_roots = set() # roots discovered during session self.pending_ops = [] # operations waiting for user confirmation self.max_history = 20 def update(self, perception_result): """Update context with a new perception result. Args: perception_result: dict from perceive() """ self.recent_queries.append({ 'input': perception_result['raw_input'], 'intent': perception_result['intent'], 'params': perception_result['params'], }) if len(self.recent_queries) > self.max_history: self.recent_queries.pop(0) # Update focus roots enriched = perception_result.get('enriched', {}) root_id = enriched.get('root_id') root_letters = enriched.get('root_letters') if root_id: if root_id not in self.focus_roots: self.focus_roots.append(root_id) if len(self.focus_roots) > 5: self.focus_roots.pop(0) if root_letters: self.related_roots.add(root_letters) def get_current_focus(self): """Get the current focus root.""" return self.focus_roots[-1] if self.focus_roots else None def get_context_summary(self): """Get a summary of current context.""" return { 'focus_root': self.get_current_focus(), 'focus_history': list(self.focus_roots), 'query_count': len(self.recent_queries), 'related_roots': list(self.related_roots), 'pending_ops': len(self.pending_ops), } def suggest_next(self): """Suggest what the user might want to do next. Based on current context, suggest relevant follow-up operations. """ suggestions = [] focus = self.get_current_focus() if focus: suggestions.append(f"expand {focus} — view full downstream tree") suggestions.append(f"report {focus} — generate intelligence report") if len(self.focus_roots) >= 2: suggestions.append( f"compare {self.focus_roots[-1]} {self.focus_roots[-2]} — structural comparison" ) if not focus: suggestions.append("Type any word to trace its root") suggestions.append("Type a root (e.g. ك-ف-ر) to explain it") return suggestions # Global tracker instance _tracker = ContextTracker() def track_context(perception_result): """Update the global context tracker.""" _tracker.update(perception_result) return _tracker.get_context_summary() def get_context(): """Get current context state.""" return _tracker.get_context_summary() def suggest_next(): """Get suggestions for next action.""" return _tracker.suggest_next() # ═══════════════════════════════════════════════════════════════════════ # CLI INTERFACE # ═══════════════════════════════════════════════════════════════════════ def main(): if len(sys.argv) < 2: print("أَمْر بَصَر — Perception Engine") print() print("Usage:") print(" python3 amr_basar.py perceive 'explain ك-ف-ر' # full perception") print(" python3 amr_basar.py classify 'cover' # classify input") print(" python3 amr_basar.py detect cover # detect root") print(" python3 amr_basar.py detect cover en # detect with language") print(" python3 amr_basar.py decompose 'trace X and Y' # decompose query") sys.exit(0) cmd = sys.argv[1] arg = ' '.join(sys.argv[2:]) if len(sys.argv) > 2 else '' if cmd == 'perceive': result = perceive(arg) print(f"\nPERCEPTION: '{arg}'") print(f" INTENT: {result['intent']}") print(f" PARAMS: {result['params']}") print(f" CONFIDENCE: {result['confidence']}") if result['enriched']: print(f" ENRICHED: {result['enriched']}") if result['sub_queries']: print(f" SUB-QUERIES: {len(result['sub_queries'])}") elif cmd == 'classify': result = classify_input(arg) print(f"\nCLASSIFICATION: '{arg}'") print(f" INTENT: {result['intent']}") print(f" PARAMS: {result['params']}") print(f" CONFIDENCE: {result['confidence']}") elif cmd == 'detect': parts = arg.split() word = parts[0] if parts else '' lang = parts[1] if len(parts) > 1 else 'auto' result = detect_root(word, lang) print(f"\nROOT DETECTION: '{word}' ({result['language']})") print(f" ROOT_ID: {result['root_id']}") print(f" ROOT: {result['root_letters']}") print(f" SOURCE: {result['source']}") print(f" CONFIDENCE: {result['confidence']}") if result['shift_chain']: print(f" CHAIN: {' | '.join(str(s) for s in result['shift_chain'])}") elif cmd == 'decompose': results = decompose(arg) print(f"\nDECOMPOSITION: '{arg}'") print(f" SUB-QUERIES: {len(results)}") for i, sq in enumerate(results): print(f" [{i+1}] {sq['intent']} ({sq['confidence']}): {sq['params']}") else: print(f"Unknown command: {cmd}") sys.exit(1) # ═══════════════════════════════════════════════════════════════════════ # QUF GATE — Called by amr_quf.py router # ═══════════════════════════════════════════════════════════════════════ def detection_quf(data: dict) -> dict: """ DETECTION QUF — L9. Handles multiple table schemas: - qv_translation_register: ROOT, CORRUPTION_TYPE, CORRECT_TRANSLATION, COMMON_MISTRANSLATION - dp_register: dp_code, name, class, mechanism, qur_anchor - disputed_words: various columns - contamination_blacklist: contaminated_term, contaminated_translation - phonetic_reversal: various columns """ GRADE_ORDER = {'HIGH': 4, 'MEDIUM': 3, 'LOW': 2, 'FAIL': 1, 'PENDING': 0} # Unified field extraction across table schemas root = (data.get('ROOT', '') or data.get('root', '') or data.get('root_letters', '') or '') corruption_type = (data.get('CORRUPTION_TYPE', '') or data.get('corruption_type', '') or data.get('class', '') or '') correct = (data.get('CORRECT_TRANSLATION', '') or data.get('correct_form', '') or data.get('mechanism', '') or '') wrong = (data.get('COMMON_MISTRANSLATION', '') or data.get('corrupted_form', '') or data.get('contaminated_translation', '') or '') ayat_count = data.get('AYAT_COUNT', 0) or data.get('ayat_count', 0) or 0 dp_id = (data.get('dp_id', '') or data.get('DP_ID', '') or data.get('dp_code', '') or '') qur_anchor = (data.get('qur_anchor', '') or data.get('qur_ref', '') or '') name = data.get('name', '') or data.get('contaminated_term', '') or '' status = data.get('status', '') or '' example = data.get('example', '') or '' # Q: evidence counted q_items = sum([bool(root) or bool(name), bool(corruption_type), int(ayat_count) > 0 or bool(qur_anchor), bool(dp_id)]) q = 'HIGH' if q_items >= 3 else ('MEDIUM' if q_items >= 2 else ('LOW' if q_items >= 1 else 'FAIL')) q_ev = [f'root/name={bool(root or name)}, type={str(corruption_type)[:20]}, qur={bool(qur_anchor)}, dp={dp_id}'] # U: pattern documented with examples/mechanism valid_types = {'ROOT_FLATTENED', 'ACTION_TO_ETHNIC', 'ATTRIBUTE_TO_GENERIC', 'SCOPE_NARROWED', 'ROOT_REPLACED', 'ROOT_INVERTED', 'LINGUISTIC', 'CIVILISATION', 'COVENANTAL'} type_valid = any(vt in str(corruption_type).upper() for vt in valid_types) if corruption_type else False has_mechanism = bool(correct) or bool(example) confirmed = str(status).upper() == 'CONFIRMED' if (type_valid and has_mechanism) or confirmed: u = 'HIGH' elif type_valid or has_mechanism: u = 'MEDIUM' else: u = 'LOW' u_ev = [f'Valid type: {type_valid}, mechanism: {has_mechanism}, confirmed: {confirmed}'] # F: verifiable — qur_anchor or washed≠corrupted or distinct_from documented distinct = data.get('distinct_from', '') or '' if qur_anchor and (correct or distinct): f = 'HIGH' f_ev = [f'Quranic anchor + mechanism/distinction documented'] elif correct and wrong and str(correct).strip() != str(wrong).strip(): f = 'HIGH' f_ev = [f'Washed ({str(correct)[:20]}) != corrupted ({str(wrong)[:20]})'] elif qur_anchor or correct: f = 'MEDIUM' f_ev = [f'Partial: qur_anchor={bool(qur_anchor)}, mechanism={bool(correct)}'] else: f = 'LOW' f_ev = [f'No Quranic anchor or mechanism documented'] passes = all(GRADE_ORDER.get(g, 0) >= 3 for g in [q, u, f]) return { 'q': q, 'u': u, 'f': f, 'pass': passes, 'q_evidence': q_ev, 'u_evidence': u_ev, 'f_evidence': f_ev, } def blacklist_quf(data: dict) -> dict: """QUF for contamination_blacklist — L9. These entries ARE the contamination register. They CONTAIN banned terms by design. Self-auditing: verifies the correction exists and the contamination is documented.""" GRADE_ORDER = {'HIGH': 4, 'MEDIUM': 3, 'LOW': 2, 'FAIL': 1, 'PENDING': 0} term = data.get('contaminated_term', '') or '' wrong = data.get('contaminated_translation', '') or '' correct = data.get('correct_translation', '') or '' source = data.get('source_of_correction', '') or '' why = data.get('why_contaminated', '') or '' # Q: contaminated term + both translations documented q = 'HIGH' if (term and wrong and correct) else ('MEDIUM' if (term and correct) else 'LOW') q_ev = [f'term={bool(term)}, wrong={bool(wrong)}, correct={bool(correct)}'] # U: correction source documented u = 'HIGH' if source else ('MEDIUM' if correct else 'LOW') u_ev = [f'Source of correction: {bool(source)}'] # F: WHY it's contaminated is documented (falsifiable claim) f = 'HIGH' if (why and correct) else ('MEDIUM' if why else 'LOW') f_ev = [f'Why contaminated: {bool(why)}, correction: {bool(correct)}'] passes = all(GRADE_ORDER.get(g, 0) >= 3 for g in [q, u, f]) return { 'q': q, 'u': u, 'f': f, 'pass': passes, 'q_evidence': q_ev, 'u_evidence': u_ev, 'f_evidence': f_ev, } def reversal_quf(data: dict) -> dict: """QUF for phonetic_reversal — L9. These entries document phonetic shift patterns. They CONTAIN reversed forms by design. Self-auditing: verifies shift is attested in actual entries.""" GRADE_ORDER = {'HIGH': 4, 'MEDIUM': 3, 'LOW': 2, 'FAIL': 1, 'PENDING': 0} shift_code = data.get('shift_code', '') or '' from_modern = data.get('from_modern', '') or '' to_orig = data.get('to_orig', '') or '' mechanism = data.get('mechanism', '') or '' example = data.get('attested_example', '') or '' reliability = data.get('reliability', '') or '' status = data.get('status', '') or '' # Q: shift code + both endpoints documented q = 'HIGH' if (shift_code and from_modern and to_orig) else ('MEDIUM' if shift_code else 'LOW') q_ev = [f'shift={shift_code}, from={from_modern}, to_orig={bool(to_orig)}'] # U: mechanism + attested example (cross-references actual data) u = 'HIGH' if (mechanism and example) else ('MEDIUM' if mechanism else 'LOW') u_ev = [f'mechanism={bool(mechanism)}, example={bool(example)}'] # F: reliability assessed + status confirmed confirmed = str(status).upper() == 'CONFIRMED' reliable = str(reliability).upper() in ('HIGH', 'MEDIUM') f = 'HIGH' if (confirmed and reliable) else ('MEDIUM' if confirmed or reliable else 'LOW') f_ev = [f'reliability={reliability}, status={status}'] passes = all(GRADE_ORDER.get(g, 0) >= 3 for g in [q, u, f]) return { 'q': q, 'u': u, 'f': f, 'pass': passes, 'q_evidence': q_ev, 'u_evidence': u_ev, 'f_evidence': f_ev, } if __name__ == "__main__": main()