Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| USLaP Database Access Layer | |
| Python module for accessing the USLaP SQLite lattice database. | |
| This module provides: | |
| 1. Database connection management with context managers | |
| 2. CRUD operations for all major tables | |
| 3. Advanced queries for cluster expansion using word_fingerprints | |
| 4. Session and engine queue management | |
| 5. Utility functions for working with the database | |
| بِسْمِ اللَّهِ الرَّحْمَٰنِ الرَّحِيمِ | |
| """ | |
| import sqlite3 | |
| import json | |
| from datetime import datetime | |
| from typing import Optional, List, Dict, Any, Tuple, Union | |
| from pathlib import Path | |
| import logging | |
# Module-wide logging setup: INFO level, with timestamp, logger name and
# level prepended to every message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
| # ============================================================================ | |
| # DATABASE CONNECTION MANAGEMENT | |
| # ============================================================================ | |
DEFAULT_DB_PATH = "Code_files/uslap_lattice.db"


class DatabaseConnection:
    """Context manager for database connections with automatic cleanup.

    Entering the context opens the SQLite file at ``db_path``, installs
    ``sqlite3.Row`` as the row factory, enables foreign-key enforcement and
    registers the ``extract_consonants`` SQL function.  Leaving the context
    commits on a clean exit, rolls back when an exception is propagating,
    and closes the connection in every case.

    ``conn`` and ``cursor`` are ``None`` until the context is entered.
    """

    def __init__(self, db_path: str = DEFAULT_DB_PATH):
        self.db_path = db_path
        self.conn = None
        self.cursor = None

    def __enter__(self):
        self.conn = sqlite3.connect(self.db_path)
        try:
            # sqlite3.Row supports access by column name and dict(row).
            self.conn.row_factory = sqlite3.Row
            self.cursor = self.conn.cursor()
            # Enable foreign keys
            self.cursor.execute("PRAGMA foreign_keys = ON")
            # Register extract_consonants UDF
            self._register_udfs()
        except Exception:
            # __exit__ is not called when __enter__ raises, so the
            # half-initialised connection has to be closed here.
            self.conn.close()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn is None:
            return
        try:
            if exc_type is None:
                self.conn.commit()
            else:
                self.conn.rollback()
        finally:
            # Close even if commit/rollback itself fails.
            self.conn.close()

    def _register_udfs(self):
        """Register Python UDFs with SQLite."""

        def extract_consonants(word: str) -> str:
            """
            Python UDF for SQLite: Extract consonant skeleton from a word.

            The word is lower-cased and scanned left to right.  The
            two-character sequences sh, ch, gh, th, ph, wh and qu are kept
            as a unit; any other character that is not one of ``aeiou`` is
            kept as is; vowels are dropped.  An empty or NULL input gives
            an empty string.

            Must match the implementation in migrate_to_sqlite.py and
            USLaP_Engine.py.
            """
            if not word:
                return ""
            vowels = set('aeiou')
            digraphs = ('sh', 'ch', 'gh', 'th', 'ph', 'wh', 'qu')
            kept = []
            text = word.lower()
            pos = 0
            while pos < len(text):
                # At the last character the slice has length 1 and so can
                # never equal one of the two-character digraphs.
                pair = text[pos:pos + 2]
                if pair in digraphs:
                    kept.append(pair)
                    pos += 2
                    continue
                if text[pos] not in vowels:
                    kept.append(text[pos])
                pos += 1
            return ''.join(kept)

        self.conn.create_function("extract_consonants", 1, extract_consonants)
def get_connection(db_path: str = DEFAULT_DB_PATH) -> DatabaseConnection:
    """Get a database connection context manager.

    Returns a new ``DatabaseConnection`` for ``db_path``.  Nothing is opened
    here; the SQLite connection is created when the returned object is
    entered with a ``with`` statement.
    """
    return DatabaseConnection(db_path)
| # ============================================================================ | |
| # CORE CRUD OPERATIONS | |
| # ============================================================================ | |
class EntryOperations:
    """Operations for the entries table.

    All methods are static and work on the cursor held by an entered
    ``DatabaseConnection`` (``conn.cursor``).  They can be called on the
    class or on an instance.  Rows are returned as plain dictionaries.
    """

    # Column order for create_entry; must stay aligned with the INSERT below.
    _INSERT_COLUMNS = (
        'score', 'en_term', 'ru_term', 'fa_term', 'ar_word', 'root_id',
        'root_letters', 'qur_meaning', 'qur_refs', 'pattern',
        'inversion_type', 'allah_name_id', 'network_id', 'phonetic_chain',
        'source_form', 'ds_corridor', 'decay_level', 'dp_codes',
        'ops_applied', 'foundation_refs', 'notes',
    )

    @staticmethod
    def get_entry(entry_id: int, conn: "DatabaseConnection") -> Optional[Dict]:
        """Get a single entry by ID, or None when no such entry exists."""
        # sqlite3 cursors are not context managers, so the shared cursor is
        # used directly.
        cursor = conn.cursor
        cursor.execute("SELECT * FROM entries WHERE entry_id = ?", (entry_id,))
        row = cursor.fetchone()
        return dict(row) if row else None

    @staticmethod
    def get_entries_by_root(root_id: str, conn: "DatabaseConnection", limit: int = 100) -> List[Dict]:
        """Get up to ``limit`` entries for a root, highest score first."""
        cursor = conn.cursor
        cursor.execute("""
            SELECT * FROM entries
            WHERE root_id = ?
            ORDER BY score DESC, entry_id
            LIMIT ?
        """, (root_id, limit))
        return [dict(row) for row in cursor.fetchall()]

    @staticmethod
    def search_entries(search_term: str, conn: "DatabaseConnection", limit: int = 50) -> List[Dict]:
        """Search entries using full-text search.

        ``search_term`` is handed to ``MATCH`` on the ``entries_fts`` table
        unchanged, so it is interpreted as a full-text query expression.
        """
        cursor = conn.cursor
        cursor.execute("""
            SELECT e.*
            FROM entries_fts fts
            JOIN entries e ON fts.entry_id = e.entry_id
            WHERE entries_fts MATCH ?
            ORDER BY rank
            LIMIT ?
        """, (search_term, limit))
        return [dict(row) for row in cursor.fetchall()]

    @staticmethod
    def get_high_score_entries(conn: "DatabaseConnection", min_score: int = 8, limit: int = 20) -> List[Dict]:
        """Get up to ``limit`` entries whose score is at least ``min_score``."""
        cursor = conn.cursor
        cursor.execute("""
            SELECT * FROM entries
            WHERE score >= ?
            ORDER BY score DESC, entry_id
            LIMIT ?
        """, (min_score, limit))
        return [dict(row) for row in cursor.fetchall()]

    @staticmethod
    def create_entry(entry_data: Dict, conn: "DatabaseConnection") -> int:
        """Create a new entry and return its row id.

        ``score``, ``en_term``, ``ar_word`` and ``root_letters`` must be
        present as keys; every other column is optional and stored as NULL
        when absent.

        Raises:
            ValueError: if one of the required keys is missing.
        """
        required_fields = ['score', 'en_term', 'ar_word', 'root_letters']
        for field in required_fields:
            if field not in entry_data:
                raise ValueError(f"Missing required field: {field}")
        values = tuple(entry_data.get(column) for column in EntryOperations._INSERT_COLUMNS)
        cursor = conn.cursor
        cursor.execute("""
            INSERT INTO entries (
                score, en_term, ru_term, fa_term, ar_word, root_id, root_letters,
                qur_meaning, qur_refs, pattern, inversion_type, allah_name_id,
                network_id, phonetic_chain, source_form, ds_corridor, decay_level,
                dp_codes, ops_applied, foundation_refs, notes
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, values)
        return cursor.lastrowid
class RootOperations:
    """Operations for the roots table.

    All methods are static and work on the cursor held by an entered
    ``DatabaseConnection`` (``conn.cursor``).  They can be called on the
    class or on an instance.  Rows are returned as plain dictionaries.
    """

    @staticmethod
    def get_root(root_id: str, conn: "DatabaseConnection") -> Optional[Dict]:
        """Get a single root by ID, or None when it does not exist."""
        # sqlite3 cursors are not context managers, so the shared cursor is
        # used directly.
        cursor = conn.cursor
        cursor.execute("SELECT * FROM roots WHERE root_id = ?", (root_id,))
        row = cursor.fetchone()
        return dict(row) if row else None

    @staticmethod
    def get_root_by_letters(root_letters: str, conn: "DatabaseConnection") -> Optional[Dict]:
        """Get the first root whose ``root_letters`` equals the argument, or None."""
        cursor = conn.cursor
        cursor.execute("SELECT * FROM roots WHERE root_letters = ?", (root_letters,))
        row = cursor.fetchone()
        return dict(row) if row else None

    @staticmethod
    def get_all_roots(conn: "DatabaseConnection", limit: int = 1000) -> List[Dict]:
        """Get up to ``limit`` roots ordered by ``root_id``."""
        cursor = conn.cursor
        cursor.execute("""
            SELECT * FROM roots
            ORDER BY root_id
            LIMIT ?
        """, (limit,))
        return [dict(row) for row in cursor.fetchall()]

    @staticmethod
    def get_roots_with_entries(conn: "DatabaseConnection", min_entries: int = 1) -> List[Dict]:
        """Get roots that have at least ``min_entries`` entries.

        Each returned dictionary carries the root columns plus an
        ``entry_count`` key; the list is ordered by that count, descending.
        """
        cursor = conn.cursor
        cursor.execute("""
            SELECT r.*, COUNT(e.entry_id) as entry_count
            FROM roots r
            LEFT JOIN entries e ON r.root_id = e.root_id
            GROUP BY r.root_id
            HAVING COUNT(e.entry_id) >= ?
            ORDER BY entry_count DESC
        """, (min_entries,))
        return [dict(row) for row in cursor.fetchall()]
class ChildEntryOperations:
    """Operations for the CHILD schema tables.

    All methods are static and work on the cursor held by an entered
    ``DatabaseConnection`` (``conn.cursor``).  They can be called on the
    class or on an instance.  Rows are returned as plain dictionaries.
    """

    @staticmethod
    def get_child_entry(child_id: str, conn: "DatabaseConnection") -> Optional[Dict]:
        """Get a single child entry by ID, or None when it does not exist."""
        # sqlite3 cursors are not context managers, so the shared cursor is
        # used directly.
        cursor = conn.cursor
        cursor.execute("SELECT * FROM child_entries WHERE child_id = ?", (child_id,))
        row = cursor.fetchone()
        return dict(row) if row else None

    @staticmethod
    def get_child_entries_by_operation(parent_op: str, conn: "DatabaseConnection") -> List[Dict]:
        """Get child entries by parent operation, ordered by ``child_id``."""
        cursor = conn.cursor
        cursor.execute("""
            SELECT * FROM child_entries
            WHERE parent_op = ?
            ORDER BY child_id
        """, (parent_op,))
        return [dict(row) for row in cursor.fetchall()]

    @staticmethod
    def get_child_entry_links(child_id: str, conn: "DatabaseConnection") -> List[Dict]:
        """Get all links for a child entry, highest confidence first.

        Each dictionary holds the ``child_entry_links`` columns plus the
        ``en_term`` and ``ar_word`` of the linked entry.  Links whose
        ``entry_id`` has no matching row in ``entries`` are not returned
        (inner join).
        """
        cursor = conn.cursor
        cursor.execute("""
            SELECT l.*, e.en_term, e.ar_word
            FROM child_entry_links l
            JOIN entries e ON l.entry_id = e.entry_id
            WHERE l.child_id = ?
            ORDER BY l.confidence DESC
        """, (child_id,))
        return [dict(row) for row in cursor.fetchall()]
| # ============================================================================ | |
| # ADVANCED SEARCH OPERATIONS (CRITICAL FOR CLUSTER EXPANSION) | |
| # ============================================================================ | |
class PhoneticSearchOperations:
    """Operations for phonetic search using word_fingerprints table.

    All methods are static and work on the cursor held by an entered
    ``DatabaseConnection`` (``conn.cursor``).  They can be called on the
    class or on an instance.  Rows are returned as plain dictionaries.
    """

    @staticmethod
    def find_similar_words(word: str, conn: "DatabaseConnection", language: str = 'en',
                           max_distance: int = 1, limit: int = 20) -> List[Dict]:
        """
        Find words with the same consonant skeleton as ``word``.

        The skeleton is computed with the ``extract_consonants`` SQL
        function, which therefore must be registered on the connection.
        Rows of ``word_fingerprints`` with that skeleton and ``language``
        are returned, left-joined to ``entries`` and ordered by entry score
        (descending).  Fingerprints without a matching entry are included
        with NULL entry columns.

        ``max_distance`` is accepted for interface compatibility but is not
        used yet: only exact skeleton matches are returned.

        Returns an empty list when the skeleton is empty.
        """
        # sqlite3 cursors are not context managers, so the shared cursor is
        # used directly.
        cursor = conn.cursor
        # First, extract consonant skeleton from the input word
        cursor.execute("SELECT extract_consonants(?) as skeleton", (word,))
        result = cursor.fetchone()
        if not result or not result['skeleton']:
            return []
        skeleton = result['skeleton']
        # Exact skeleton matches only (see the max_distance note above).
        cursor.execute("""
            SELECT wf.*,
                   e.en_term, e.ru_term, e.fa_term, e.ar_word,
                   e.entry_id, e.score, e.root_id, e.root_letters
            FROM word_fingerprints wf
            LEFT JOIN entries e ON wf.entry_id = e.entry_id
            WHERE wf.consonant_skeleton = ? AND wf.language = ?
            ORDER BY e.score DESC
            LIMIT ?
        """, (skeleton, language, limit))
        return [dict(row) for row in cursor.fetchall()]

    @staticmethod
    def expand_cluster_by_root(root_id: str, conn: "DatabaseConnection") -> List[Dict]:
        """
        Expand a cluster by finding all entries with the same root.

        Returns the entries of ``root_id`` joined to their root row
        (adding ``quran_tokens``), ordered by score, descending.
        """
        cursor = conn.cursor
        cursor.execute("""
            SELECT e.*, r.root_letters, r.quran_tokens
            FROM entries e
            JOIN roots r ON e.root_id = r.root_id
            WHERE e.root_id = ?
            ORDER BY e.score DESC
        """, (root_id,))
        return [dict(row) for row in cursor.fetchall()]

    @staticmethod
    def phonetic_cluster_expansion(word: str, conn: "DatabaseConnection",
                                   similarity_threshold: float = 0.7) -> List[Dict]:
        """
        Phonetic cluster expansion combining two strategies.

        1. Exact consonant skeleton matches for ``word`` (English
           fingerprints, at most 50).
        2. Entries linked to the best exact match through ``cross_refs``
           rows of type SAME_ROOT, NETWORK or PHONETIC, in either direction.

        The combined list is de-duplicated by ``entry_id``; rows without an
        ``entry_id`` are dropped.  ``similarity_threshold`` is accepted for
        interface compatibility but is not used.
        """
        results = []
        # Strategy 1: Exact skeleton match.  Arguments are passed by keyword
        # so the connection and the language cannot be transposed.
        exact_matches = PhoneticSearchOperations.find_similar_words(
            word, conn, language='en', max_distance=0, limit=50
        )
        results.extend(exact_matches)
        # Strategy 2: Find via network connections if we have an entry
        if exact_matches and exact_matches[0].get('entry_id'):
            anchor_id = exact_matches[0]['entry_id']
            cursor = conn.cursor
            cursor.execute("""
                SELECT e2.*
                FROM entries e1
                JOIN cross_refs cr ON e1.entry_id = cr.from_entry_id
                JOIN entries e2 ON cr.to_entry_id = e2.entry_id
                WHERE e1.entry_id = ? AND cr.link_type IN ('SAME_ROOT', 'NETWORK', 'PHONETIC')
                UNION
                SELECT e2.*
                FROM entries e1
                JOIN cross_refs cr ON e1.entry_id = cr.to_entry_id
                JOIN entries e2 ON cr.from_entry_id = e2.entry_id
                WHERE e1.entry_id = ? AND cr.link_type IN ('SAME_ROOT', 'NETWORK', 'PHONETIC')
            """, (anchor_id, anchor_id))
            results.extend(dict(row) for row in cursor.fetchall())
        # Deduplicate by entry_id, keeping the first occurrence
        seen = set()
        deduplicated = []
        for item in results:
            entry_id = item.get('entry_id')
            if entry_id and entry_id not in seen:
                seen.add(entry_id)
                deduplicated.append(item)
        return deduplicated
| # ============================================================================ | |
| # ENGINE CONTROL OPERATIONS | |
| # ============================================================================ | |
class EngineQueueOperations:
    """Operations for the engine_queue table.

    All methods are static and work on the cursor held by an entered
    ``DatabaseConnection`` (``conn.cursor``).  They can be called on the
    class or on an instance.
    """

    @staticmethod
    def add_to_queue(operation_type: str, payload: Dict, conn: "DatabaseConnection", source: str = 'engine',
                     session_id: Optional[str] = None) -> int:
        """Add an operation to the engine queue and return its row id.

        ``payload`` is stored as JSON text.  The new row is inserted with
        status ``PENDING`` and priority 5.
        """
        # sqlite3 cursors are not context managers, so the shared cursor is
        # used directly.
        cursor = conn.cursor
        cursor.execute("""
            INSERT INTO engine_queue (
                operation_type, payload, source, session_id, status, priority
            ) VALUES (?, ?, ?, ?, 'PENDING', 5)
        """, (operation_type, json.dumps(payload), source, session_id))
        return cursor.lastrowid

    @staticmethod
    def get_pending_queue_items(conn: "DatabaseConnection", limit: int = 100) -> List[Dict]:
        """Get up to ``limit`` pending items, highest priority first, oldest first within a priority."""
        cursor = conn.cursor
        cursor.execute("""
            SELECT * FROM engine_queue
            WHERE status = 'PENDING'
            ORDER BY priority DESC, created_at
            LIMIT ?
        """, (limit,))
        return [dict(row) for row in cursor.fetchall()]

    @staticmethod
    def process_queue_item(queue_id: int, status: str = 'APPROVED',
                           resolution_notes: Optional[str] = None,
                           conn: Optional["DatabaseConnection"] = None) -> bool:
        """Mark a queue item as processed.

        Sets ``status``, ``resolution_notes`` and ``processed_at`` (current
        timestamp) on the row with ``queue_id``.

        ``conn`` is required.  It carries a ``None`` default only because a
        parameter without a default may not follow defaulted ones; it can be
        passed positionally (fourth) or by keyword.

        Returns:
            True when a row was updated, False when ``queue_id`` is unknown.

        Raises:
            TypeError: if ``conn`` is not supplied.
        """
        if conn is None:
            raise TypeError("process_queue_item() missing required argument: 'conn'")
        cursor = conn.cursor
        cursor.execute("""
            UPDATE engine_queue
            SET status = ?, processed_at = CURRENT_TIMESTAMP, resolution_notes = ?
            WHERE queue_id = ?
        """, (status, resolution_notes, queue_id))
        return cursor.rowcount > 0
class SessionOperations:
    """Operations for the session_index table.

    All methods are static and work on the cursor held by an entered
    ``DatabaseConnection`` (``conn.cursor``).  They can be called on the
    class or on an instance.
    """

    @staticmethod
    def create_session(conn: "DatabaseConnection", session_id: Optional[str] = None,
                       excel_version: Optional[str] = None,
                       initiated_by: str = 'engine') -> str:
        """Create a new engine session and return its id.

        A random UUID4 string is generated when ``session_id`` is empty or
        not given.
        """
        if not session_id:
            import uuid
            session_id = str(uuid.uuid4())
        # sqlite3 cursors are not context managers, so the shared cursor is
        # used directly.
        cursor = conn.cursor
        cursor.execute("""
            INSERT INTO session_index (session_id, excel_version, initiated_by)
            VALUES (?, ?, ?)
        """, (session_id, excel_version, initiated_by))
        return session_id

    @staticmethod
    def update_session(session_id: str, conn: "DatabaseConnection", status: str = 'COMPLETED',
                       entries_processed: int = 0, queries_executed: int = 0,
                       error_log: Optional[str] = None) -> bool:
        """Update session status and metrics.

        ``end_time`` is set to the current timestamp on every call,
        whatever ``status`` is.

        Returns:
            True when a session row was updated, False when ``session_id``
            is unknown.
        """
        cursor = conn.cursor
        cursor.execute("""
            UPDATE session_index
            SET status = ?, end_time = CURRENT_TIMESTAMP,
                entries_processed = ?, queries_executed = ?,
                error_log = ?
            WHERE session_id = ?
        """, (status, entries_processed, queries_executed, error_log, session_id))
        return cursor.rowcount > 0
| # ============================================================================ | |
| # ANALYTICS AND STATISTICS | |
| # ============================================================================ | |
class AnalyticsOperations:
    """Operations for database analytics and statistics.

    All methods are static and work on the cursor held by an entered
    ``DatabaseConnection`` (``conn.cursor``).  They can be called on the
    class or on an instance.
    """

    # Tables counted by get_database_stats.  The names are interpolated into
    # SQL, which is safe only because this is a fixed, code-owned list.
    _COUNTED_TABLES = (
        'entries', 'roots', 'child_entries', 'derivatives', 'cross_refs',
        'quran_refs', 'networks', 'scholars', 'detection_patterns',
        'word_fingerprints', 'engine_queue', 'session_index',
    )

    @staticmethod
    def get_database_stats(conn: "DatabaseConnection") -> Dict:
        """Get comprehensive database statistics.

        The returned dictionary contains:
          - ``<table>_count`` for every table in ``_COUNTED_TABLES``
          - ``avg_entries_per_root`` / ``max_entries_per_root`` (roots
            without entries count as zero)
          - ``score_distribution``: ``{'score', 'count'}`` rows, highest
            score first
          - ``top_roots``: the ten roots with the most entries
        """
        stats = {}
        # sqlite3 cursors are not context managers, so the shared cursor is
        # used directly.
        cursor = conn.cursor
        # Table counts
        for table in AnalyticsOperations._COUNTED_TABLES:
            cursor.execute(f"SELECT COUNT(*) as count FROM {table}")
            result = cursor.fetchone()
            stats[f"{table}_count"] = result['count'] if result else 0
        # Root productivity
        cursor.execute("""
            SELECT AVG(entry_count) as avg_entries_per_root,
                   MAX(entry_count) as max_entries_per_root
            FROM (
                SELECT r.root_id, COUNT(e.entry_id) as entry_count
                FROM roots r
                LEFT JOIN entries e ON r.root_id = e.root_id
                GROUP BY r.root_id
            )
        """)
        root_stats = cursor.fetchone()
        if root_stats:
            stats.update(dict(root_stats))
        # Score distribution
        cursor.execute("""
            SELECT score, COUNT(*) as count
            FROM entries
            GROUP BY score
            ORDER BY score DESC
        """)
        stats['score_distribution'] = [dict(row) for row in cursor.fetchall()]
        # Most productive roots
        cursor.execute("""
            SELECT r.root_id, r.root_letters, COUNT(e.entry_id) as entry_count
            FROM roots r
            JOIN entries e ON r.root_id = e.root_id
            GROUP BY r.root_id
            ORDER BY entry_count DESC
            LIMIT 10
        """)
        stats['top_roots'] = [dict(row) for row in cursor.fetchall()]
        return stats

    @staticmethod
    def get_cluster_analysis(root_id: str, conn: "DatabaseConnection") -> Dict:
        """Get detailed analysis of a root cluster.

        The returned dictionary contains:
          - ``root``: the root row (key absent when ``root_id`` is unknown)
          - ``entries``: entries of the root, highest score first
          - ``derivatives``: derivatives of those entries
          - ``cross_references``: cross-references in which either end
            belongs to the root, with ``from_term`` / ``to_term`` added
        """
        analysis = {}
        cursor = conn.cursor
        # Get root info
        cursor.execute("SELECT * FROM roots WHERE root_id = ?", (root_id,))
        root = cursor.fetchone()
        if root:
            analysis['root'] = dict(root)
        # Get entries
        cursor.execute("""
            SELECT entry_id, en_term, ar_word, score, pattern, network_id
            FROM entries
            WHERE root_id = ?
            ORDER BY score DESC
        """, (root_id,))
        analysis['entries'] = [dict(row) for row in cursor.fetchall()]
        # Get derivatives
        cursor.execute("""
            SELECT d.*
            FROM derivatives d
            JOIN entries e ON d.entry_id = e.entry_id
            WHERE e.root_id = ?
            ORDER BY d.derivative_id
        """, (root_id,))
        analysis['derivatives'] = [dict(row) for row in cursor.fetchall()]
        # Get cross-references
        cursor.execute("""
            SELECT cr.*, e1.en_term as from_term, e2.en_term as to_term
            FROM cross_refs cr
            JOIN entries e1 ON cr.from_entry_id = e1.entry_id
            JOIN entries e2 ON cr.to_entry_id = e2.entry_id
            WHERE e1.root_id = ? OR e2.root_id = ?
        """, (root_id, root_id))
        analysis['cross_references'] = [dict(row) for row in cursor.fetchall()]
        return analysis
| # ============================================================================ | |
| # HIGH-LEVEL API FUNCTIONS | |
| # ============================================================================ | |
def search_word(word: str, db_path: str = DEFAULT_DB_PATH) -> Dict:
    """
    High-level function to search for a word and return comprehensive results.

    Opens its own connection to ``db_path`` for the duration of the call.

    Returns:
        Dictionary with:
        - word: The word that was searched
        - exact_matches: Fingerprint rows with the same consonant skeleton
          (English, at most 10)
        - similar_words: Rows from a second lookup with ``max_distance=1``
          (at most 20), minus anything already in ``exact_matches``
        - cluster_expansion: Entries sharing the root of the first exact
          match, followed by phonetic/network expansion results that are
          not already present
        - root_info: The root row of the first exact match ({} when there
          is none)
    """
    results = {
        'word': word,
        'exact_matches': [],
        'similar_words': [],
        'cluster_expansion': [],
        'root_info': {}
    }
    with get_connection(db_path) as conn:
        # find_similar_words takes (word, conn, language, ...); the language
        # is passed by keyword so it cannot be confused with the connection.
        exact_matches = PhoneticSearchOperations.find_similar_words(
            word, conn, language='en', max_distance=0, limit=10
        )
        results['exact_matches'] = exact_matches
        # Get similar words (allow some distance)
        similar_words = PhoneticSearchOperations.find_similar_words(
            word, conn, language='en', max_distance=1, limit=20
        )
        # Filter out exact matches from similar words
        exact_entry_ids = {m['entry_id'] for m in exact_matches if 'entry_id' in m}
        results['similar_words'] = [
            w for w in similar_words
            if w.get('entry_id') not in exact_entry_ids
        ]
        # Cluster expansion is anchored on the first exact match.
        if exact_matches:
            first_match = exact_matches[0]
            if first_match.get('root_id'):
                root_id = first_match['root_id']
                # Called on the class: the operation classes hold no state.
                root_info = RootOperations.get_root(root_id, conn)
                if root_info:
                    results['root_info'] = root_info
                # Get cluster expansion
                cluster = PhoneticSearchOperations.expand_cluster_by_root(root_id, conn)
                results['cluster_expansion'] = cluster
            # Also try phonetic cluster expansion
            phonetic_cluster = PhoneticSearchOperations.phonetic_cluster_expansion(word, conn)
            # Merge with existing results, avoiding duplicates
            seen_ids = {item['entry_id'] for item in results['cluster_expansion'] if 'entry_id' in item}
            for item in phonetic_cluster:
                if item.get('entry_id') and item['entry_id'] not in seen_ids:
                    results['cluster_expansion'].append(item)
                    seen_ids.add(item['entry_id'])
    return results
def add_new_entry(entry_data: Dict, db_path: str = DEFAULT_DB_PATH,
                  session_id: Optional[str] = None) -> Tuple[Optional[int], Optional[int]]:
    """
    High-level function to add a new entry with proper queue management.

    The route is chosen from ``entry_data['source']`` (default ``'user'``):
    an ``'engine'`` source together with a ``session_id`` inserts directly
    into ``entries``; anything else is queued as a ``PROPOSE_ENTRY``
    operation for review.

    Returns:
        ``(entry_id, None)`` when the entry was inserted directly, or
        ``(None, queue_id)`` when it was queued.

    Raises:
        ValueError: on the direct path, when a required entry field is
            missing (raised by ``EntryOperations.create_entry``).
    """
    # First, check if we should use queue (based on source)
    source = entry_data.get('source', 'user')
    if source == 'engine' and session_id:
        # Engine operations during session go directly to database
        with get_connection(db_path) as conn:
            entry_id = EntryOperations.create_entry(entry_data, conn)
        return entry_id, None
    # User operations go through queue for review
    with get_connection(db_path) as conn:
        # Called on the class: an instance call would bind the instance to
        # the first parameter and clash with the keyword arguments.
        queue_id = EngineQueueOperations.add_to_queue(
            operation_type='PROPOSE_ENTRY',
            payload=entry_data,
            source=source,
            session_id=session_id,
            conn=conn
        )
    return None, queue_id
def run_engine_session(excel_version: Optional[str] = None,
                       db_path: str = DEFAULT_DB_PATH) -> str:
    """
    High-level function to run an engine session.

    Opens a connection to ``db_path``, records a new session row initiated
    by ``'engine'`` and logs the session start at INFO level.

    Returns:
        Session ID for tracking
    """
    with get_connection(db_path) as conn:
        # Called on the class: an instance call would bind the instance to
        # ``conn`` and then clash with the ``conn=`` keyword.
        session_id = SessionOperations.create_session(
            excel_version=excel_version,
            initiated_by='engine',
            conn=conn
        )
        # Log session start
        logger.info("Engine session started: %s", session_id)
    return session_id
| # ============================================================================ | |
| # MAIN FUNCTION FOR COMMAND LINE USE | |
| # ============================================================================ | |
def main():
    """Command-line interface for database operations.

    Exactly one action runs per invocation, checked in this order:
    ``--search WORD``, ``--stats``, ``--cluster ROOT_ID``, ``--test``.
    With no option the help text is printed.  All actions use the default
    database path.
    """
    import argparse
    parser = argparse.ArgumentParser(description='USLaP Database Access Layer')
    parser.add_argument('--search', type=str, help='Search for a word')
    parser.add_argument('--stats', action='store_true', help='Get database statistics')
    parser.add_argument('--cluster', type=str, help='Analyze a root cluster (e.g., R001)')
    parser.add_argument('--test', action='store_true', help='Run basic tests')
    args = parser.parse_args()
    if args.search:
        print(f"Searching for: {args.search}")
        results = search_word(args.search)
        print(f"\nExact matches: {len(results['exact_matches'])}")
        for match in results['exact_matches'][:5]:
            print(f" - {match.get('en_term', 'N/A')} (score: {match.get('score', 'N/A')})")
        print(f"\nSimilar words: {len(results['similar_words'])}")
        for word in results['similar_words'][:5]:
            print(f" - {word.get('raw_word', 'N/A')}")
        print(f"\nCluster expansion: {len(results['cluster_expansion'])} entries")
        if results['root_info']:
            print(f"Root: {results['root_info'].get('root_letters', 'N/A')}")
    elif args.stats:
        with get_connection() as conn:
            # Called on the class: an instance call would pass the instance
            # in place of the connection.
            stats = AnalyticsOperations.get_database_stats(conn)
        print("Database Statistics:")
        print("=" * 40)
        for key, value in stats.items():
            if isinstance(value, list):
                print(f"\n{key}:")
                for item in value[:5]:  # Show top 5
                    print(f" - {item}")
            else:
                print(f"{key}: {value}")
    elif args.cluster:
        print(f"Analyzing cluster: {args.cluster}")
        with get_connection() as conn:
            analysis = AnalyticsOperations.get_cluster_analysis(args.cluster, conn)
        if 'root' in analysis:
            root = analysis['root']
            print(f"Root: {root.get('root_letters', 'N/A')}")
            print(f"Type: {root.get('root_type', 'N/A')}")
            print(f"Quran tokens: {root.get('quran_tokens', 0)}")
        print(f"\nEntries: {len(analysis.get('entries', []))}")
        for entry in analysis.get('entries', [])[:10]:
            print(f" - {entry.get('en_term', 'N/A')} (score: {entry.get('score', 'N/A')})")
        print(f"\nDerivatives: {len(analysis.get('derivatives', []))}")
    elif args.test:
        print("Running basic tests...")
        with get_connection() as conn:
            # Test 1: Count entries
            conn.cursor.execute("SELECT COUNT(*) as count FROM entries")
            count = conn.cursor.fetchone()['count']
            print(f"✓ Database has {count} entries")
            # Test 2: Test UDF
            conn.cursor.execute("SELECT extract_consonants('example') as skeleton")
            skeleton = conn.cursor.fetchone()['skeleton']
            print(f"✓ UDF test: 'example' -> '{skeleton}'")
            # Test 3: Test word_fingerprints
            conn.cursor.execute("SELECT COUNT(*) as count FROM word_fingerprints")
            fp_count = conn.cursor.fetchone()['count']
            print(f"✓ Word fingerprints: {fp_count}")
        print("\nAll tests passed!")
    else:
        parser.print_help()


if __name__ == "__main__":
    main()