Spaces:
Sleeping
Sleeping
| """ | |
| محرك البحث الشامل (Global Semantic Search Engine) | |
| ===================================================== | |
| محرك بحث متقدم في أرشيف الملفات المعالجة. | |
| الميزات: | |
| - بحث نصي كامل مع سياق (context snippet) | |
| - بحث بالتصنيف واللغة | |
| - بحث بالتاريخ | |
| - تصفية حسب نسبة الثقة | |
| - بحث متقدم (AND, OR, NOT) | |
| - تصدير النتائج | |
| الاستخدام: | |
| from modules.core.search_engine import SearchEngine | |
| engine = SearchEngine(db_path="omni_processor.db") | |
| results = engine.search("كسر عنق الفخذ", limit=20) | |
| for r in results: | |
| print(f"{r['file_name']}: {r['snippet']}") | |
| """ | |
| import logging | |
| import os | |
| import re | |
| import sqlite3 | |
| from datetime import datetime | |
| from typing import Optional, List, Dict, Any, Tuple | |
| logger = logging.getLogger(__name__) | |
| class SearchEngine: | |
| """ | |
| محرك بحث شامل في الأرشيف الرقمي. | |
| يدعم البحث في: | |
| - النصوص المستخرجة (extracted_text) | |
| - أسماء الملفات (file_name) | |
| - التصنيفات (category, tags) | |
| - الملاحظات والتعليقات | |
| """ | |
| def __init__(self, db_path: str = "omni_processor.db"): | |
| """ | |
| تهيئة محرك البحث. | |
| Args: | |
| db_path: مسار قاعدة بيانات OmniDatabase | |
| """ | |
| self.db_path = db_path | |
| self.conn = None | |
| self._connect() | |
| def _connect(self): | |
| """الاتصال بقاعدة البيانات.""" | |
| try: | |
| self.conn = sqlite3.connect(self.db_path, check_same_thread=False) | |
| self.conn.row_factory = sqlite3.Row | |
| logger.info("تم الاتصال بقاعدة البيانات: %s", self.db_path) | |
| except Exception as e: | |
| logger.error("فشل الاتصال بقاعدة البيانات: %s", e) | |
| self.conn = None | |
| def is_connected(self) -> bool: | |
| """هل قاعدة البيانات متصلة؟""" | |
| return self.conn is not None | |
| def search( | |
| self, | |
| query: str, | |
| limit: int = 50, | |
| offset: int = 0, | |
| category: Optional[str] = None, | |
| language: Optional[str] = None, | |
| min_confidence: Optional[float] = None, | |
| date_from: Optional[str] = None, | |
| date_to: Optional[str] = None, | |
| search_mode: str = "standard", | |
| ) -> Dict[str, Any]: | |
| """ | |
| البحث الشامل في الأرشيف. | |
| Args: | |
| query: كلمة أو جملة البحث | |
| limit: الحد الأقصى للنتائج | |
| offset: عدد النتائج المتخطاة (للتصفح) | |
| category: تصفية حسب التصنيف | |
| language: تصفية حسب اللغة | |
| min_confidence: الحد الأدنى لنسبة الثقة | |
| date_from: تاريخ البداية (YYYY-MM-DD) | |
| date_to: تاريخ النهاية (YYYY-MM-DD) | |
| search_mode: نمط البحث ('standard', 'fts5', 'like', 'advanced') | |
| Returns: | |
| قاموس يحتوي على results, total_count, query, filters | |
| """ | |
| if not self.conn: | |
| return {"results": [], "total_count": 0, "query": query, "error": "not connected"} | |
| if not query or not query.strip(): | |
| return {"results": [], "total_count": 0, "query": query} | |
| try: | |
| if search_mode == "fts5": | |
| return self._search_fts5( | |
| query, limit, offset, category, language, | |
| min_confidence, date_from, date_to | |
| ) | |
| elif search_mode == "advanced": | |
| return self._search_advanced( | |
| query, limit, offset, category, language, | |
| min_confidence, date_from, date_to | |
| ) | |
| else: | |
| return self._search_standard( | |
| query, limit, offset, category, language, | |
| min_confidence, date_from, date_to | |
| ) | |
| except Exception as e: | |
| logger.error("خطأ في البحث: %s", e) | |
| return {"results": [], "total_count": 0, "query": query, "error": str(e)} | |
| def _search_standard( | |
| self, query: str, limit: int, offset: int, | |
| category: Optional[str], language: Optional[str], | |
| min_confidence: Optional[float], date_from: Optional[str], | |
| date_to: Optional[str], | |
| ) -> Dict[str, Any]: | |
| """بحث قياسي باستخدام LIKE مع سياق.""" | |
| conditions = [] | |
| params = [] | |
| # شرط البحث النصي | |
| conditions.append( | |
| "(extracted_text LIKE ? OR file_name LIKE ? OR category LIKE ? OR tags LIKE ?)" | |
| ) | |
| search_term = f"%{query}%" | |
| params.extend([search_term, search_term, search_term, search_term]) | |
| # الفلاتر | |
| if category: | |
| conditions.append("category = ?") | |
| params.append(category) | |
| if language: | |
| conditions.append("language = ?") | |
| params.append(language) | |
| if min_confidence is not None: | |
| conditions.append("confidence_score >= ?") | |
| params.append(min_confidence) | |
| if date_from: | |
| conditions.append("process_date >= ?") | |
| params.append(date_from) | |
| if date_to: | |
| conditions.append("process_date <= ?") | |
| params.append(date_to) | |
| where_clause = " AND ".join(conditions) | |
| # عدد النتائج الإجمالي | |
| count_sql = f"SELECT COUNT(*) as cnt FROM processed_files WHERE {where_clause}" | |
| cursor = self.conn.execute(count_sql, params) | |
| total = cursor.fetchone()["cnt"] | |
| # استعلام النتائج | |
| sql = f""" | |
| SELECT | |
| id, file_name, file_path, category, subcategory, tags, | |
| confidence_score, ocr_engine, language, page_count, | |
| process_date, processing_time, | |
| SUBSTR(extracted_text, 1, 500) as preview | |
| FROM processed_files | |
| WHERE {where_clause} | |
| ORDER BY confidence_score DESC, process_date DESC | |
| LIMIT ? OFFSET ? | |
| """ | |
| params.extend([limit, offset]) | |
| cursor = self.conn.execute(sql, params) | |
| results = [] | |
| for row in cursor.fetchall(): | |
| result = dict(row) | |
| # استخراج سياق البحث | |
| snippet = self._extract_context( | |
| result.get("preview", ""), query, context_length=100 | |
| ) | |
| result["snippet"] = snippet | |
| result.pop("preview", None) | |
| results.append(result) | |
| return { | |
| "results": results, | |
| "total_count": total, | |
| "query": query, | |
| "limit": limit, | |
| "offset": offset, | |
| "filters": { | |
| "category": category, | |
| "language": language, | |
| "min_confidence": min_confidence, | |
| "date_from": date_from, | |
| "date_to": date_to, | |
| }, | |
| } | |
| def _search_fts5( | |
| self, query: str, limit: int, offset: int, | |
| category: Optional[str], language: Optional[str], | |
| min_confidence: Optional[float], date_from: Optional[str], | |
| date_to: Optional[str], | |
| ) -> Dict[str, Any]: | |
| """بحث باستخدام محرك FTS5 (أسرع).""" | |
| try: | |
| fts_query = self._to_fts5_query(query) | |
| conditions = ["files_fts MATCH ?"] | |
| params = [fts_query] | |
| if category: | |
| conditions.append("p.category = ?") | |
| params.append(category) | |
| if language: | |
| conditions.append("p.language = ?") | |
| params.append(language) | |
| if min_confidence is not None: | |
| conditions.append("p.confidence_score >= ?") | |
| params.append(min_confidence) | |
| where_clause = " AND ".join(conditions) | |
| # عدد النتائج | |
| count_sql = f""" | |
| SELECT COUNT(*) as cnt | |
| FROM processed_files p JOIN files_fts f ON p.id = f.file_id | |
| WHERE {where_clause} | |
| """ | |
| cursor = self.conn.execute(count_sql, params) | |
| total = cursor.fetchone()["cnt"] | |
| # النتائج مع سياق | |
| sql = f""" | |
| SELECT | |
| p.id, p.file_name, p.file_path, p.category, p.subcategory, | |
| p.confidence_score, p.ocr_engine, p.language, p.process_date, | |
| snippet(files_fts, 0, '>>>', '<<<', '...', 64) as snippet | |
| FROM processed_files p | |
| JOIN files_fts f ON p.id = f.file_id | |
| WHERE {where_clause} | |
| ORDER BY rank | |
| LIMIT ? OFFSET ? | |
| """ | |
| params.extend([limit, offset]) | |
| cursor = self.conn.execute(sql, params) | |
| results = [dict(row) for row in cursor.fetchall()] | |
| return { | |
| "results": results, | |
| "total_count": total, | |
| "query": query, | |
| "search_mode": "fts5", | |
| } | |
| except sqlite3.OperationalError as e: | |
| logger.warning("FTS5 غير متاح، السقوط إلى البحث القياسي: %s", e) | |
| return self._search_standard( | |
| query, limit, offset, category, language, | |
| min_confidence, date_from, date_to | |
| ) | |
| def _search_advanced( | |
| self, query: str, limit: int, offset: int, | |
| category: Optional[str], language: Optional[str], | |
| min_confidence: Optional[float], date_from: Optional[str], | |
| date_to: Optional[str], | |
| ) -> Dict[str, Any]: | |
| """بحث متقدم يدعم عوامل AND, OR, NOT.""" | |
| # تحليل الاستعلام المتقدم | |
| terms = self._parse_advanced_query(query) | |
| conditions = [] | |
| params = [] | |
| for operator, term in terms: | |
| if operator == "AND": | |
| conditions.append("(extracted_text LIKE ? OR file_name LIKE ?)") | |
| params.extend([f"%{term}%", f"%{term}%"]) | |
| elif operator == "OR": | |
| conditions.append("(extracted_text LIKE ? OR file_name LIKE ?)") | |
| params.extend([f"%{term}%", f"%{term}%"]) | |
| elif operator == "NOT": | |
| conditions.append("(extracted_text NOT LIKE ? AND file_name NOT LIKE ?)") | |
| params.extend([f"%{term}%", f"%{term}%"]) | |
| if not conditions: | |
| return {"results": [], "total_count": 0, "query": query} | |
| # دمج شروط OR والأخرى | |
| and_conditions = [] | |
| or_conditions = [] | |
| not_conditions = [] | |
| for i, (op, _) in enumerate(terms): | |
| if op == "NOT": | |
| not_conditions.append(conditions[i]) | |
| elif op == "OR": | |
| or_conditions.append(conditions[i]) | |
| else: | |
| and_conditions.append(conditions[i]) | |
| where_parts = [] | |
| all_params = [] | |
| if and_conditions: | |
| and_sql = " AND ".join(and_conditions) | |
| where_parts.append(f"({and_sql})") | |
| if or_conditions: | |
| or_sql = " OR ".join(or_conditions) | |
| where_parts.append(f"({or_sql})") | |
| if not_conditions: | |
| not_sql = " AND ".join(not_conditions) | |
| where_parts.append(f"({not_sql})") | |
| if not where_parts: | |
| return {"results": [], "total_count": 0, "query": query} | |
| where_clause = " AND ".join(where_parts) | |
| # تجميع المعاملات بالترتيب الصحيح | |
| for i, (_, term) in enumerate(terms): | |
| if terms[i][0] != "NOT": | |
| params_all = [f"%{term}%", f"%{term}%"] | |
| all_params.extend(params_all) | |
| for i, (_, term) in enumerate(terms): | |
| if terms[i][0] == "NOT": | |
| params_all = [f"%{term}%", f"%{term}%"] | |
| all_params.extend(params_all) | |
| # فلاتر إضافية | |
| if category: | |
| where_clause += " AND category = ?" | |
| all_params.append(category) | |
| if language: | |
| where_clause += " AND language = ?" | |
| all_params.append(language) | |
| # تنفيذ الاستعلام | |
| sql = f""" | |
| SELECT | |
| id, file_name, file_path, category, confidence_score, | |
| ocr_engine, language, process_date, | |
| SUBSTR(extracted_text, 1, 500) as preview | |
| FROM processed_files | |
| WHERE {where_clause} | |
| ORDER BY confidence_score DESC | |
| LIMIT ? OFFSET ? | |
| """ | |
| all_params.extend([limit, offset]) | |
| try: | |
| cursor = self.conn.execute(sql, all_params) | |
| results = [] | |
| for row in cursor.fetchall(): | |
| result = dict(row) | |
| snippet = self._extract_context( | |
| result.get("preview", ""), query, context_length=100 | |
| ) | |
| result["snippet"] = snippet | |
| result.pop("preview", None) | |
| results.append(result) | |
| return { | |
| "results": results, | |
| "total_count": len(results), | |
| "query": query, | |
| "search_mode": "advanced", | |
| } | |
| except Exception as e: | |
| logger.error("خطأ في البحث المتقدم: %s", e) | |
| return {"results": [], "total_count": 0, "query": query, "error": str(e)} | |
| def _to_fts5_query(query: str) -> str: | |
| """تحويل استعلام نصي إلى صيغة FTS5.""" | |
| # إزالة الأحرف الخاصة | |
| clean = re.sub(r'[(){}[\]\\^*:"+-]', ' ', query) | |
| # تقسيم إلى كلمات | |
| words = clean.split() | |
| if not words: | |
| return query | |
| # ربط الكلمات بـ AND | |
| return " AND ".join(f'"{w}"' for w in words if w) | |
| def _parse_advanced_query(query: str) -> List[Tuple[str, str]]: | |
| """ | |
| تحليل استعلام متقدم مع عوامل AND, OR, NOT. | |
| أمثلة: | |
| - "كسر AND فخذ" → [("AND", "كسر"), ("AND", "فخذ")] | |
| - "قلب OR عظام" → [("OR", "قلب"), ("OR", "عظام")] | |
| - "جراحة NOT قلب" → [("AND", "جراحة"), ("NOT", "قلب")] | |
| """ | |
| operators = re.findall(r'\b(AND|OR|NOT)\b', query.upper()) | |
| parts = re.split(r'\b(AND|OR|NOT)\b', query, flags=re.IGNORECASE) | |
| result = [] | |
| current_op = "AND" # الافتراضي | |
| for part in parts: | |
| part = part.strip() | |
| if not part: | |
| continue | |
| if part.upper() in ("AND", "OR", "NOT"): | |
| current_op = part.upper() | |
| else: | |
| result.append((current_op, part)) | |
| return result | |
| def _extract_context(text: str, query: str, context_length: int = 100) -> str: | |
| """ | |
| استخراج سياق البحث من النص. | |
| Args: | |
| text: النص الكامل | |
| query: كلمة البحث | |
| context_length: عدد الأحرف قبل وبعد الكلمة | |
| Returns: | |
| مقتطف من النص مع السياق | |
| """ | |
| if not text or not query: | |
| return text[:200] if text else "" | |
| text_lower = text.lower() | |
| query_lower = query.lower() | |
| # البحث عن أول ظهور | |
| idx = text_lower.find(query_lower) | |
| if idx == -1: | |
| # البحث عن أي كلمة من الاستعلام | |
| words = query_lower.split() | |
| for word in words: | |
| idx = text_lower.find(word) | |
| if idx != -1: | |
| break | |
| if idx == -1: | |
| return text[:200] | |
| start = max(0, idx - context_length) | |
| end = min(len(text), idx + len(query) + context_length) | |
| snippet = text[start:end] | |
| if start > 0: | |
| snippet = "..." + snippet | |
| if end < len(text): | |
| snippet = snippet + "..." | |
| return snippet | |
| def search_files( | |
| self, | |
| directory: str, | |
| query: str, | |
| extensions: Optional[List[str]] = None, | |
| recursive: bool = True, | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| بحث مباشر في ملفات .txt في مجلد (بدون قاعدة بيانات). | |
| مفيد للبحث السريع في مجلدات التصدير. | |
| Args: | |
| directory: مسار المجلد | |
| query: كلمة البحث | |
| extensions: قائمة الامتدادات | |
| recursive: البحث في المجلدات الفرعية | |
| Returns: | |
| قائمة نتائج البحث | |
| """ | |
| if extensions is None: | |
| extensions = ['.txt'] | |
| results = [] | |
| query_lower = query.lower() | |
| for root, _, files in os.walk(directory): | |
| for filename in files: | |
| ext = os.path.splitext(filename)[1].lower() | |
| if ext not in extensions: | |
| continue | |
| filepath = os.path.join(root, filename) | |
| try: | |
| with open(filepath, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| if query_lower in content.lower(): | |
| idx = content.lower().find(query_lower) | |
| start = max(0, idx - 60) | |
| end = min(len(content), idx + len(query) + 80) | |
| snippet = f"...{content[start:end]}..." | |
| results.append({ | |
| "file_name": filename, | |
| "file_path": filepath, | |
| "snippet": snippet, | |
| "size": os.path.getsize(filepath), | |
| }) | |
| except (UnicodeDecodeError, PermissionError, OSError): | |
| continue | |
| if not recursive: | |
| break | |
| logger.info( | |
| "بحث ملفات: '%s' في %s → %d نتيجة", | |
| query, directory, len(results) | |
| ) | |
| return results | |
| def get_categories(self) -> List[str]: | |
| """الحصول على قائمة التصنيفات المتاحة في الأرشيف.""" | |
| if not self.conn: | |
| return [] | |
| cursor = self.conn.execute( | |
| "SELECT DISTINCT category FROM processed_files ORDER BY category" | |
| ) | |
| return [row["category"] for row in cursor.fetchall()] | |
| def get_languages(self) -> List[str]: | |
| """الحصول على قائمة اللغات المتاحة في الأرشيف.""" | |
| if not self.conn: | |
| return [] | |
| cursor = self.conn.execute( | |
| "SELECT DISTINCT language FROM processed_files ORDER BY language" | |
| ) | |
| return [row["language"] for row in cursor.fetchall()] | |
| def export_results( | |
| self, | |
| results: List[Dict[str, Any]], | |
| output_path: str, | |
| format_type: str = "json", | |
| ) -> bool: | |
| """ | |
| تصدير نتائج البحث إلى ملف. | |
| Args: | |
| results: نتائج البحث | |
| output_path: مسار الملف | |
| format_type: التنسيق ('json', 'csv', 'txt') | |
| Returns: | |
| True إذا تم التصدير بنجاح | |
| """ | |
| try: | |
| os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) | |
| if format_type == "json": | |
| import json | |
| with open(output_path, "w", encoding="utf-8") as f: | |
| json.dump(results, f, ensure_ascii=False, indent=2, default=str) | |
| elif format_type == "csv": | |
| import csv | |
| if not results: | |
| return False | |
| with open(output_path, "w", encoding="utf-8", newline="") as f: | |
| writer = csv.DictWriter(f, fieldnames=results[0].keys()) | |
| writer.writeheader() | |
| writer.writerows(results) | |
| elif format_type == "txt": | |
| with open(output_path, "w", encoding="utf-8") as f: | |
| for r in results: | |
| f.write(f"--- {r.get('file_name', 'Unknown')} ---\n") | |
| f.write(f" التصنيف: {r.get('category', 'N/A')}\n") | |
| f.write(f" الثقة: {r.get('confidence_score', 'N/A')}\n") | |
| f.write(f" السياق: {r.get('snippet', 'N/A')}\n\n") | |
| logger.info("تم تصدير %d نتيجة إلى %s", len(results), output_path) | |
| return True | |
| except Exception as e: | |
| logger.error("خطأ في تصدير النتائج: %s", e) | |
| return False | |
| def close(self): | |
| """إغلاق الاتصال بقاعدة البيانات.""" | |
| if self.conn: | |
| self.conn.close() | |
| self.conn = None | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| self.close() | |
| def __repr__(self): | |
| status = "متصل" if self.conn else "غير متصل" | |
| return f"SearchEngine(db='{self.db_path}', status={status})" | |