OmniFile-Processor / modules /core /database_manager.py
Dr. Abdulmalek
deploy: OmniFile AI Processor v4.3.0
900df0b
"""
database_manager.py — نظام قاعدة البيانات المحسّن لـ OmniFile Processor
الميزات:
- بصمة الملف (SHA-256 Hash) لتجنب إعادة المعالجة
- البحث النصي الكامل (FTS5) للاسترجاع الفوري
- نسبة الثقة (Confidence Score) لتتبع جودة OCR
- نظام الكاش (Cache) لتسريع المعالجة على الدفعات
الاستخدام:
from modules.core.database_manager import OmniDatabase
db = OmniDatabase("my_archive.db")
db.process_file(file_path, my_ai_engine)
results = db.search_text("كسر عنق الفخذ")
"""
import sqlite3
import hashlib
import os
from datetime import datetime
from typing import Optional, Tuple, List, Dict, Any, Callable
class OmniDatabase:
"""
نظام إدارة قاعدة البيانات لمعالجة الملفات والأرشفة الرقمية.
يجمع بين ثلاثة أنظمة:
1. Hash-based Cache — تجنب إعادة المعالجة
2. Full-Text Search (FTS5) — بحث دلالي فوري
3. Confidence Tracking — تتبع جودة الاستخراج
"""
def __init__(self, db_name: str = "omni_processor.db"):
"""
تهيئة قاعدة البيانات.
Args:
db_name: مسار ملف قاعدة البيانات SQLite
"""
self.conn = sqlite3.connect(db_name, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
self._create_tables()
def _create_tables(self):
"""إنشاء الجداول اللازمة إذا لم تكن موجودة."""
# 1. الجدول الأساسي لتخزين البيانات الوصفية
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS processed_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_hash TEXT UNIQUE NOT NULL,
file_name TEXT NOT NULL,
file_path TEXT,
file_extension TEXT,
file_size INTEGER,
category TEXT,
subcategory TEXT,
tags TEXT,
extracted_text TEXT,
process_date DATETIME,
confidence_score REAL,
ocr_engine TEXT,
language TEXT,
page_count INTEGER,
processing_time REAL
)
''')
# 2. فهرس لتحسين البحث حسب التصنيف
self.cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_category
ON processed_files(category)
''')
# 3. فهرس لتحسين البحث حسب التاريخ
self.cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_date
ON processed_files(process_date)
''')
# 4. فهرس للبحث حسب نسبة الثقة
self.cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_confidence
ON processed_files(confidence_score)
''')
# 5. جدول البحث النصي الكامل (FTS5) — محرك البحث السريع
self.cursor.execute('''
CREATE VIRTUAL TABLE IF NOT EXISTS files_fts
USING fts5(
content,
file_name,
category,
tags,
file_id UNINDEXED
)
''')
# 6. جدول لتخزين تاريخ التصحيحات اليدوية
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS corrections_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_id INTEGER NOT NULL,
original_text TEXT,
corrected_text TEXT,
correction_date DATETIME,
FOREIGN KEY (file_id) REFERENCES processed_files(id)
)
''')
self.conn.commit()
@staticmethod
def calculate_file_hash(file_path: str) -> str:
"""
توليد بصمة فريدة للملف بناءً على محتواه (SHA-256).
Args:
file_path: مسار الملف
Returns:
سلسلة hex تمثل البصمة الفريدة (64 حرف)
"""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
def check_file_exists(self, file_hash: str) -> Optional[Tuple]:
"""
التحقق مما إذا كان الملف قد تمت معالجته سابقاً.
Args:
file_hash: بصمة الملف
Returns:
tuple (category, extracted_text, confidence_score) أو None
"""
self.cursor.execute(
"SELECT category, extracted_text, confidence_score, ocr_engine "
"FROM processed_files WHERE file_hash = ?",
(file_hash,)
)
return self.cursor.fetchone()
def save_record(
self,
file_hash: str,
file_name: str,
file_path: str,
category: str,
text: str,
confidence: float = 0.0,
ocr_engine: str = "unknown",
language: str = "ar",
tags: str = "",
subcategory: str = "",
page_count: int = 1,
processing_time: float = 0.0
) -> bool:
"""
حفظ سجل معالجة جديد في قاعدة البيانات.
Args:
file_hash: بصمة الملف الفريدة
file_name: اسم الملف الأصلي
file_path: مسار الملف
category: التصنيف الرئيسي
text: النص المستخرج
confidence: نسبة الثقة (0.0 - 1.0)
ocr_engine: محرك OCR المستخدم
language: لغة المحتوى
tags: وسوم إضافية (مفصولة بفواصل)
subcategory: التصنيف الفرعي
page_count: عدد الصفحات
processing_time: زمن المعالجة بالثواني
Returns:
True إذا تم الحفظ بنجاح، False إذا كان الملف موجوداً
"""
try:
file_ext = os.path.splitext(file_name)[1].lower()
file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
self.cursor.execute('''
INSERT INTO processed_files
(file_hash, file_name, file_path, file_extension, file_size,
category, subcategory, tags, extracted_text, process_date,
confidence_score, ocr_engine, language, page_count, processing_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
file_hash, file_name, file_path, file_ext, file_size,
category, subcategory, tags, text, datetime.now(),
confidence, ocr_engine, language, page_count, processing_time
))
# إضافة النص إلى محرك البحث السريع
last_id = self.cursor.lastrowid
self.cursor.execute(
"INSERT INTO files_fts(content, file_name, category, tags, file_id) VALUES (?, ?, ?, ?, ?)",
(text, file_name, category, tags, last_id)
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
def search_text(self, query: str, limit: int = 50) -> List[Dict[str, Any]]:
"""
البحث السريع جداً عن أي كلمة أو جملة داخل الملفات المعالجة.
يستخدم محرك FTS5 للاسترجاع الفوري مع تمييز السياق.
Args:
query: كلمة أو جملة البحث
limit: الحد الأقصى للنتائج
Returns:
قائمة من القواميس تحتوي على تفاصيل كل نتيجة
"""
search_query = """
SELECT
p.file_name,
p.file_path,
p.category,
p.confidence_score,
p.process_date,
snippet(files_fts, 0, '<b>', '</b>', '...', 32) as context_snippet
FROM processed_files p
JOIN files_fts f ON p.id = f.file_id
WHERE files_fts MATCH ?
ORDER BY rank
LIMIT ?
"""
try:
self.cursor.execute(search_query, (query, limit))
rows = self.cursor.fetchall()
return [dict(row) for row in rows]
except sqlite3.OperationalError:
return []
def search_by_category(self, category: str, limit: int = 100) -> List[Dict[str, Any]]:
"""
البحث حسب التصنيف الرئيسي.
Args:
category: اسم التصنيف
limit: الحد الأقصى للنتائج
"""
self.cursor.execute(
"SELECT * FROM processed_files WHERE category = ? ORDER BY process_date DESC LIMIT ?",
(category, limit)
)
return [dict(row) for row in self.cursor.fetchall()]
def get_low_confidence_files(self, threshold: float = 0.7, limit: int = 50) -> List[Dict[str, Any]]:
"""
الحصول على الملفات التي حصلت على نسبة ثقة منخفضة — تحتاج مراجعة يدوية.
Args:
threshold: حد الثقة الأدنى
limit: الحد الأقصى للنتائج
"""
self.cursor.execute(
"SELECT file_name, file_path, category, confidence_score, extracted_text "
"FROM processed_files WHERE confidence_score < ? "
"ORDER BY confidence_score ASC LIMIT ?",
(threshold, limit)
)
return [dict(row) for row in self.cursor.fetchall()]
def log_correction(self, file_id: int, original_text: str, corrected_text: str):
"""
تسجيل تصحيح يدوي للنص المستخرج.
Args:
file_id: معرف الملف في قاعدة البيانات
original_text: النص الأصلي (قبل التصحيح)
corrected_text: النص المصحح
"""
self.cursor.execute('''
INSERT INTO corrections_log (file_id, original_text, corrected_text, correction_date)
VALUES (?, ?, ?, ?)
''', (file_id, original_text, corrected_text, datetime.now()))
self.conn.commit()
def get_statistics(self) -> Dict[str, Any]:
"""
الحصول على إحصائيات شاملة عن قاعدة البيانات.
Returns:
قاموس يحتوي على الإحصائيات
"""
stats = {}
# إجمالي الملفات المعالجة
self.cursor.execute("SELECT COUNT(*) as total FROM processed_files")
stats["total_files"] = self.cursor.fetchone()["total"]
# إجمالي الملفات المخزنة مؤقتاً (cache hits)
self.cursor.execute("SELECT COUNT(*) as cached FROM processed_files WHERE processing_time = 0")
stats["cached_files"] = self.cursor.fetchone()["cached"]
# متوسط نسبة الثقة
self.cursor.execute("SELECT AVG(confidence_score) as avg_conf FROM processed_files")
result = self.cursor.fetchone()["avg_conf"]
stats["average_confidence"] = round(result, 4) if result else 0.0
# عدد الملفات حسب التصنيف
self.cursor.execute(
"SELECT category, COUNT(*) as count FROM processed_files "
"GROUP BY category ORDER BY count DESC"
)
stats["categories"] = [dict(row) for row in self.cursor.fetchall()]
# عدد الملفات حسب اللغة
self.cursor.execute(
"SELECT language, COUNT(*) as count FROM processed_files "
"GROUP BY language ORDER BY count DESC"
)
stats["languages"] = [dict(row) for row in self.cursor.fetchall()]
# عدد الملفات حسب المحرك
self.cursor.execute(
"SELECT ocr_engine, COUNT(*) as count FROM processed_files "
"GROUP BY ocr_engine ORDER BY count DESC"
)
stats["engines"] = [dict(row) for row in self.cursor.fetchall()]
# إجمالي التصحيحات اليدوية
self.cursor.execute("SELECT COUNT(*) as total FROM corrections_log")
stats["total_corrections"] = self.cursor.fetchone()["total"]
return stats
def process_file(
self,
file_path: str,
ai_engine: Callable,
force_reprocess: bool = False
) -> Tuple[str, str, float]:
"""
معالجة ملف كامل مع نظام الكاش التلقائي.
هذه هي الدالة الرئيسية التي تربط قاعدة البيانات بمحرك المعالجة.
Args:
file_path: مسار الملف
ai_engine: دالة المعالجة يجب أن ترجع (category, text, confidence, engine, lang)
force_reprocess: إعادة المعالجة حتى لو كان الملف موجوداً في الكاش
Returns:
tuple (category, extracted_text, confidence_score)
"""
import time
# 1. حساب بصمة الملف
file_hash = self.calculate_file_hash(file_path)
# 2. التحقق من الكاش (إلا إذا طلب إعادة المعالجة)
if not force_reprocess:
cached = self.check_file_exists(file_hash)
if cached:
print(f" ✅ كاش: {os.path.basename(file_path)}{cached[0]} ({cached[2]:.0%})")
return cached[0], cached[1], cached[2]
# 3. المعالجة الثقيلة (OCR + NLP)
start_time = time.time()
result = ai_engine(file_path)
processing_time = time.time() - start_time
# 4. حفظ النتيجة
if len(result) >= 5:
category, text, confidence, engine, lang = result[0], result[1], result[2], result[3], result[4]
elif len(result) == 3:
category, text, confidence = result
engine, lang = "unknown", "ar"
else:
category, text = result[0], result[1]
confidence, engine, lang = 0.0, "unknown", "ar"
self.save_record(
file_hash=file_hash,
file_name=os.path.basename(file_path),
file_path=file_path,
category=category,
text=text,
confidence=confidence,
ocr_engine=engine,
language=lang,
processing_time=processing_time
)
print(f" 🆕 جديد: {os.path.basename(file_path)}{category} ({confidence:.0%}) [{processing_time:.1f}s]")
return category, text, confidence
def close(self):
"""إغلاق اتصال قاعدة البيانات."""
self.conn.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()