| |
| """ |
| 📕 TawasoaAi_OCR.py - الإصدار 23.4 (إصلاح حالة المعالجة وتحسين عرض الحالة) |
| النسخة المخصصة لبيئة Hugging Face: تقوم بالزحف وحفظ الملفات في GCS وإدخال السجلات الأساسية في DB. |
| **تخطي جميع عمليات استخلاص النصوص و OCR للملفات، واستخلاص بسيط لصفحات السجلات.** |
| |
| ✅ [إصلاح 412] تعديل gcs_upload_file لاستخدام صلاحيات حساب الخدمة. |
| ✅ [حالة DB] توحيد حالة الملفات والسجلات إلى 'awaiting_processing'. |
| ✅ [تحسين] تعزيز دالة الزحف للتعامل مع أخطاء Requests. |
| ✅ [إصلاح التوقف] إضافة مهلة عامة ومعالجة استثناءات الخيوط. |
| ✅ [تحقق من الروابط] إضافة تحقق من صحة الروابط. |
| ✅ [إصلاح Gradio] تعديل get_status لإرجاع قيمتين. |
| ✅ [إصلاح SSL] زيادة مهلة الاتصال ودعم SSL صريح. |
| ✅ [جديد: إصلاح الحالة] ضمان تسجيل الحالة كـ 'awaiting_processing' عند الرفع الناجح إلى GCS. |
| """ |
|
|
| |
| |
| |
| import os |
| import re |
| import io |
| import json |
| import time |
| import signal |
| import uuid |
| import traceback |
| import logging |
| import threading |
| import sys |
| import zipfile |
| import mimetypes |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from urllib.parse import urljoin, urlparse, unquote |
| from collections import deque |
| from datetime import datetime, timedelta |
| from typing import Optional, List, Dict, Any, Tuple, Set |
| from enum import Enum |
|
|
| import requests |
| from bs4 import BeautifulSoup |
| from tqdm import tqdm |
| import gradio as gr |
|
|
| try: |
| from dotenv import load_dotenv |
| DOTENV_AVAILABLE = True |
| except ImportError: |
| DOTENV_AVAILABLE = False |
|
|
| try: |
| import google.generativeai as genai |
| from google.cloud import storage |
| from google.oauth2 import service_account |
| GCS_GEMINI_AVAILABLE = True |
| except ImportError: |
| GCS_GEMINI_AVAILABLE = False |
|
|
| try: |
| import psycopg2 |
| from psycopg2 import pool, Binary |
| from psycopg2.extras import RealDictCursor, execute_values |
| POSTGRES_AVAILABLE = True |
| except ImportError: |
| POSTGRES_AVAILABLE = False |
|
|
| PDF_AVAILABLE = False |
| DOCX_AVAILABLE = False |
| EXCEL_AVAILABLE = False |
| OCR_AVAILABLE = False |
| MD_AVAILABLE = False |
|
|
| |
| |
| |
| class JobStatus(Enum): |
| IDLE = "⚪ خامل" |
| SCANNING = "⏳ فحص الأرشيف..." |
| CRAWLING = "🔍 يتم الزحف..." |
| PROCESSING = "⚙️ تتم المعالجة (جلب الخام)..." |
| DONE = "✅ اكتمل" |
| GCS_UP_TO_DATE = "✅ GCS محدث" |
| ERROR = "🔴 خطأ" |
|
|
| class CrawlMode(Enum): |
| ALL = "الكل" |
| PDF = "ملفات PDF فقط" |
| DOC = "ملفات DOC/DOCX فقط" |
| XLS = "ملفات XLS/XLSX فقط" |
| TXT = "ملفات TXT/MD فقط" |
| RECORDS = "سجلات HTML فقط" |
|
|
| SUPPORTED_FILE_EXTENSIONS = {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.md'} |
| RECORD_PAGE_INDICATORS = {'/records/', '/company_records/', '/registration/'} |
| REQUEST_HEADERS = {'User-Agent': 'Mozilla/5.0'} |
|
|
| |
| STATUS_TRANSLATIONS = { |
| "awaiting_processing": "جارى المعالجة", |
| "failed": "فشلت المعالجة", |
| "processed": "تمت المعالجة", |
| "N/A": "غير متاح" |
| } |
|
|
| |
| |
| |
| def setup_logging() -> logging.Logger: |
| """إعداد وتكوين نظام تسجيل الأحداث.""" |
| log_format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" |
| date_format = "%Y-%m-%d %H:%M:%S" |
| logger = logging.getLogger("TawasoaAiOCR") |
| if not logger.handlers: |
| logger.setLevel(logging.INFO) |
| file_handler = logging.FileHandler('tawasoa_ai_ocr.log', encoding='utf-8') |
| file_handler.setFormatter(logging.Formatter(log_format, date_format)) |
| logger.addHandler(file_handler) |
| stream_handler = logging.StreamHandler(sys.stdout) |
| stream_handler.setFormatter(logging.Formatter(log_format, date_format)) |
| logger.addHandler(stream_handler) |
| return logger |
|
|
| class Config: |
| """كلاس مركزي لتجميع إعدادات المشروع.""" |
| def __init__(self, logger: logging.Logger): |
| if DOTENV_AVAILABLE: |
| load_dotenv() |
| self.log = logger |
| self.DB_CONFIG: Dict[str, Any] = { |
| "host": os.getenv("DB_HOST", "localhost"), |
| "user": os.getenv("DB_USERNAME", "postgres"), |
| "password": os.getenv("DB_PASSWORD"), |
| "dbname": os.getenv("DB_DATABASE", "tawasoa"), |
| "port": self._get_env_as_int("DB_PORT", 5432), |
| "sslmode": os.getenv("DB_SSLMODE", "prefer"), |
| "connect_timeout": 30 |
| } |
| self.GCS_BUCKET_NAME: Optional[str] = os.getenv("GCS_BUCKET_NAME") |
| self.GOOGLE_API_KEY: Optional[str] = os.getenv("GOOGLE_API_KEY") |
| self.MAX_WORKERS: int = self._get_env_as_int("MAX_WORKERS_DEFAULT", 5) |
|
|
| def _get_env_as_int(self, key: str, default: int) -> int: |
| try: |
| return int(os.getenv(key, str(default))) |
| except (ValueError, TypeError): |
| self.log.warning(f"قيمة متغير البيئة '{key}' غير صالحة. استخدام الافتراضي: {default}") |
| return default |
|
|
| class AppContext: |
| """يحتوي على حالة التطبيق الحية والمكونات المُهيأة.""" |
| def __init__(self): |
| self.connection_pool = None |
| self.storage_client = None |
| self.app_shutdown = threading.Event() |
| self.is_db_ready: bool = False |
| self.is_gcs_ready: bool = False |
| self.is_gemini_ready: bool = False |
|
|
| |
| |
| |
| class JobManager: |
| """فئة لإدارة حالة المهمة التي تعمل في الخلفية.""" |
| def __init__(self): |
| self.lock = threading.Lock() |
| self.job = { |
| "thread": None, |
| "type": None, |
| "status": JobStatus.IDLE.value, |
| "log": ["مرحبًا بك!"], |
| "stop_event": threading.Event() |
| } |
|
|
| def update_log(self, message: str): |
| if not message: return |
| with self.lock: |
| self.job["log"].append(f"[{time.strftime('%H:%M:%S')}] {message}") |
| if len(self.job["log"]) > 200: |
| self.job["log"] = self.job["log"][-200:] |
|
|
| def update_status(self, status: JobStatus): |
| with self.lock: |
| self.job["status"] = status.value |
|
|
| def get_status(self) -> tuple[str, str]: |
| with self.lock: |
| log_text = "\n".join(self.job["log"][-100:]) |
| status = self.job["status"] |
| return log_text, status |
| |
| def start_job(self, target_func, job_type: str, *args): |
| with self.lock: |
| if self.job["thread"] and self.job["thread"].is_alive(): |
| self.update_log("⚠️ مهمة أخرى قيد التشغيل بالفعل.") |
| return |
| self.job["stop_event"].clear() |
| self.job["type"] = job_type |
| self.job["thread"] = threading.Thread(target=target_func, args=args) |
| self.job["thread"].start() |
|
|
| def stop_job(self): |
| with self.lock: |
| if self.job["thread"] and self.job["thread"].is_alive(): |
| self.update_log("🛑 جاري إرسال إشارة الإيقاف...") |
| self.job["stop_event"].set() |
| return "تم إرسال طلب الإيقاف." |
| return "لا توجد مهمة قيد التشغيل." |
| |
| def reset_thread(self): |
| with self.lock: |
| self.job["thread"] = None |
|
|
| |
| |
| |
| def check_pool_health(pool, config: Config, log: logging.Logger) -> bool: |
| """فحص صحة الـ connection pool وإعادة تهيئته إذا لزم الأمر.""" |
| try: |
| test_conn = pool.getconn() |
| with test_conn.cursor() as cursor: |
| cursor.execute("SELECT 1") |
| pool.putconn(test_conn) |
| log.info("✅ فحص صحة الـ connection pool: ناجح.") |
| return True |
| except Exception as e: |
| log.error(f"❌ فشل فحص الـ connection pool: {e}. محاولة إعادة التهيئة...") |
| try: |
| pool.closeall() |
| new_pool = psycopg2.pool.ThreadedConnectionPool( |
| minconn=2, |
| maxconn=config.MAX_WORKERS + 3, |
| **config.DB_CONFIG |
| ) |
| pool = new_pool |
| log.info("✅ تم إعادة تهيئة الـ connection pool بنجاح.") |
| return True |
| except Exception as e: |
| log.error(f"❌ فشل إعادة تهيئة الـ connection pool: {e}.") |
| return False |
|
|
| def db_execute_query(pool, query: str, params=None, fetch_result=False, max_retries=7): |
| """تنفيذ استعلام قاعدة البيانات مع إعادة المحاولة.""" |
| retry_delays = [5, 10, 15, 20, 25, 30, 35] |
| for attempt in range(max_retries): |
| conn = None |
| try: |
| conn = pool.getconn() |
| with conn.cursor() as cursor: |
| cursor.execute(query, params) |
| if fetch_result: |
| result = cursor.fetchall() |
| else: |
| result = True |
| conn.commit() |
| log.info(f"✅ نجاح استعلام قاعدة البيانات في المحاولة {attempt + 1}.") |
| return result |
| except (psycopg2.OperationalError, psycopg2.InterfaceError) as e: |
| log.warning(f"⚠️ خطأ تشغيلي في قاعدة البيانات (محاولة {attempt + 1}/{max_retries}): {e}.") |
| if conn: |
| try: |
| conn.rollback() |
| except: |
| pass |
| pool.putconn(conn, close=True) |
| conn = None |
| if attempt < max_retries - 1: |
| delay = retry_delays[attempt] |
| log.info(f"إعادة المحاولة بعد {delay} ثوانٍ.") |
| time.sleep(delay) |
| continue |
| else: |
| log.error(f"❌ فشلت جميع محاولات الاتصال بعد {max_retries} محاولات.") |
| raise |
| except Exception as e: |
| if conn: |
| try: |
| conn.rollback() |
| except: |
| pass |
| raise |
| finally: |
| if conn: |
| try: |
| pool.putconn(conn) |
| except psycopg2.pool.PoolError as pe: |
| log.error(f"❌ خطأ في إعادة الاتصال إلى التجمع: {pe}. إغلاق الاتصال.") |
| pool.putconn(conn, close=True) |
| |
| return False |
|
|
| def db_check_document_exists(pool, original_path: str) -> bool: |
| """التحقق من وجود مستند.""" |
| results = db_execute_query( |
| pool, |
| "SELECT id FROM documents WHERE original_path = %s;", |
| (original_path,), |
| fetch_result=True |
| ) |
| return bool(results) if results is not False else False |
|
|
| def db_insert_document(pool, doc_data: Dict[str, Any]) -> int: |
| """إدراج سجل مستند جديد.""" |
| conn = pool.getconn() |
| try: |
| with conn.cursor() as cursor: |
| cursor.execute( |
| """INSERT INTO documents (source, filename, original_path, storage_path, cover_image_path, status, summary, full_text_content, created_at, updated_at) |
| VALUES (%(source)s, %(filename)s, %(original_path)s, %(storage_path)s, %(cover_image_path)s, %(status)s, %(summary)s, %(full_text_content)s, NOW(), NOW()) |
| RETURNING id;""", |
| doc_data |
| ) |
| doc_id = cursor.fetchone()[0] |
| conn.commit() |
| log.info(f"✅ تم إدراج المستند '{doc_data['filename']}' بحالة '{doc_data['status']}'.") |
| return doc_id |
| except Exception as e: |
| conn.rollback() |
| log.error(f"❌ فشل إدراج المستند '{doc_data['filename']}': {e}") |
| raise |
| finally: |
| pool.putconn(conn) |
|
|
| def db_log_failed_document(pool, original_path: str, filename: str, error: str): |
| """تسجيل المستندات الفاشلة.""" |
| conn = pool.getconn() |
| try: |
| with conn.cursor() as cursor: |
| cursor.execute( |
| "INSERT INTO documents (source, filename, original_path, status, summary, created_at, updated_at) " |
| "VALUES (%s, %s, %s, 'failed', %s, NOW(), NOW()) ON CONFLICT (original_path) DO NOTHING", |
| (original_path, filename, original_path, f"فشل الرفع: {error}") |
| ) |
| conn.commit() |
| log.info(f"📝 تم تسجيل المستند الفاشل '{filename}' بسبب: {error}") |
| except Exception as e: |
| log.error(f"❌ فشل تسجيل المستند الفاشل '{filename}': {e}") |
| finally: |
| pool.putconn(conn) |
|
|
| def db_search_records(pool, search_term: str) -> List[Dict[str, Any]]: |
| """البحث في المستندات.""" |
| conn = pool.getconn() |
| try: |
| with conn.cursor(cursor_factory=RealDictCursor) as cursor: |
| search_pattern = f"%{search_term}%" |
| cursor.execute( |
| """ |
| SELECT id, filename, original_path, status, summary, created_at, full_text_content |
| FROM documents |
| WHERE filename ILIKE %s OR full_text_content ILIKE %s |
| ORDER BY created_at DESC |
| LIMIT 50; |
| """, |
| (search_pattern, search_pattern) |
| ) |
| return cursor.fetchall() |
| finally: |
| pool.putconn(conn) |
|
|
| |
| |
| |
| def decompose_html_record(content_bytes: bytes) -> str: |
| """استخلاص النص المرئي من HTML.""" |
| try: |
| soup = BeautifulSoup(content_bytes, 'html.parser') |
| for script_or_style in soup(['script', 'style', 'header', 'footer', 'nav']): |
| script_or_style.decompose() |
| return soup.get_text(separator=' ', strip=True) |
| except Exception as e: |
| log.error(f"❌ فشل في تحليل HTML: {e}") |
| return content_bytes.decode('utf-8', errors='ignore') |
|
|
| def is_valid_docx(content_bytes: bytes) -> bool: return True |
| def ocr_image_to_text(image_bytes: bytes) -> str: return "" |
| def generate_page_images_as_binary(content_bytes: bytes) -> List[Dict[str, Any]]: return [] |
| def decompose_pdf(content_bytes: bytes, page_images: List[Dict[str, Any]]) -> List[Dict[str, Any]]: return [] |
| def decompose_docx(content_bytes: bytes) -> List[Dict[str, Any]]: return [] |
| def decompose_excel(content_bytes: bytes) -> List[Dict[str, Any]]: return [] |
| def decompose_text_or_md(content_bytes: bytes, file_ext: str) -> List[Dict[str, Any]]: return [] |
|
|
| |
| |
| |
| def gcs_upload_file(client, bucket_name: str, content_bytes: bytes, blob_name: str, content_type: str) -> str: |
| """رفع ملف إلى GCS.""" |
| bucket = client.bucket(bucket_name) |
| blob = bucket.blob(blob_name) |
| blob.upload_from_string(content_bytes, content_type=content_type) |
| log.info(f"✅ تم رفع الملف '{blob_name}' إلى GCS.") |
| return f"gs://{bucket_name}/{blob_name}" |
|
|
| def gcs_generate_cover_image(client, bucket_name: str, content_bytes: bytes, original_filename: str) -> Optional[str]: |
| """تعطيل إنشاء الغلاف.""" |
| return None |
|
|
| def ai_get_embedding(text: str) -> Optional[List[float]]: |
| """تعطيل التضمين.""" |
| return None |
|
|
| def ai_summarize_and_classify(text: str) -> Dict[str, str]: |
| """تعطيل التلخيص والتصنيف.""" |
| return {"summary": "لم تتم معالجة النص على الخادم.", "category": "غير مصنف"} |
|
|
| |
| |
| |
| def is_valid_url(url: str) -> bool: |
| """التحقق من صحة الرابط.""" |
| try: |
| result = urlparse(url) |
| return all([result.scheme, result.netloc]) |
| except Exception: |
| return False |
|
|
| def process_single_document(link: str, content_type: str, session: requests.Session) -> str: |
| """معالجة مستند واحد.""" |
| safe_filename = unquote(os.path.basename(urlparse(link).path)) |
| try: |
| if db_check_document_exists(app_context.connection_pool, link): |
| return f"🟡 (تخطي) ملف موجود مسبقاً: {safe_filename}" |
| |
| resp = session.get(link, timeout=20, headers=REQUEST_HEADERS) |
| resp.raise_for_status() |
| content_bytes = resp.content |
| full_text_content = "" |
| |
| document_uuid = str(uuid.uuid4()) |
| blob_name = f"documents/{document_uuid}/{safe_filename}" |
| storage_path = gcs_upload_file(app_context.storage_client, app_config.GCS_BUCKET_NAME, content_bytes, blob_name, content_type) |
| cover_image_url = gcs_generate_cover_image(app_context.storage_client, app_config.GCS_BUCKET_NAME, content_bytes, safe_filename) |
|
|
| summary = "تم حفظ الملف الخام في GCS. (جارى انتظار المعالجة النصية محليًا)." |
| status = 'awaiting_processing' |
| |
| doc_data = { |
| "source": link, |
| "filename": safe_filename, |
| "original_path": link, |
| "storage_path": storage_path, |
| "cover_image_path": cover_image_url, |
| "status": status, |
| "summary": summary, |
| "full_text_content": full_text_content, |
| } |
| doc_id = db_insert_document(app_context.connection_pool, doc_data) |
| |
| return f"✅ تم حفظ الملف الخام '{safe_filename}' في GCS و DB (ID: {doc_id}). (الحالة: جارى المعالجة)" |
| except requests.exceptions.RequestException as e: |
| error_msg = f"فشل طلب HTTP: {str(e)}" |
| log.error(f"❌ خطأ فادح أثناء معالجة ملف {safe_filename}: {error_msg}\n{traceback.format_exc()}") |
| db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg) |
| return f"❌ فشلت معالجة ملف {safe_filename} - {error_msg}" |
| except Exception as e: |
| error_msg = f"خطأ غير متوقع: {str(e)}" |
| log.error(f"❌ خطأ فادح أثناء معالجة ملف {safe_filename}: {error_msg}\n{traceback.format_exc()}") |
| db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg) |
| return f"❌ فشلت معالجة ملف {safe_filename} - {error_msg}" |
|
|
| def process_single_record_page(link: str, session: requests.Session) -> str: |
| """معالجة صفحة سجل واحدة (HTML).""" |
| safe_filename = unquote(os.path.basename(urlparse(link).path)) |
| try: |
| if db_check_document_exists(app_context.connection_pool, link): |
| return f"🟡 (تخطي) سجل موجود مسبقاً: {safe_filename}" |
| |
| resp = session.get(link, timeout=20, headers=REQUEST_HEADERS) |
| resp.raise_for_status() |
| content_bytes = resp.content |
| full_text_content = decompose_html_record(content_bytes) |
| |
| summary = "سجل تم استخلاص نصه جزئياً (HTML). (جارى انتظار المعالجة المتبقية محليًا)." |
| status = 'awaiting_processing' |
|
|
| doc_data = { |
| "source": link, |
| "filename": safe_filename, |
| "original_path": link, |
| "storage_path": None, |
| "cover_image_path": None, |
| "status": status, |
| "summary": summary, |
| "full_text_content": full_text_content, |
| } |
| doc_id = db_insert_document(app_context.connection_pool, doc_data) |
|
|
| return f"✅ تم حفظ سجل الصفحة '{safe_filename}' في DB (ID: {doc_id}). (الحالة: جارى المعالجة)" |
| except requests.exceptions.RequestException as e: |
| error_msg = f"فشل طلب HTTP: {str(e)}" |
| log.error(f"❌ خطأ فادح أثناء معالجة سجل {safe_filename}: {error_msg}\n{traceback.format_exc()}") |
| db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg) |
| return f"❌ فشلت معالجة سجل {safe_filename} - {error_msg}" |
| except Exception as e: |
| error_msg = f"خطأ غير متوقع: {str(e)}" |
| log.error(f"❌ خطأ فادح أثناء معالجة سجل {safe_filename}: {error_msg}\n{traceback.format_exc()}") |
| db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg) |
| return f"❌ فشلت معالجة سجل {safe_filename} - {error_msg}" |
|
|
| def crawl_website(url: str, limit: int, mode: CrawlMode): |
| """المحرك الرئيسي لعملية الزحف.""" |
| job_manager.update_status(JobStatus.CRAWLING) |
| session = requests.Session() |
| try: |
| if not is_valid_url(url): |
| job_manager.update_log(f"❌ رابط البداية غير صالح: {url}") |
| job_manager.update_status(JobStatus.ERROR) |
| return |
| |
| if not url.startswith(('http://', 'https://')): |
| url = 'https://' + url |
| base_domain = urlparse(url).netloc |
| visited = set() |
| queue = deque([url]) |
| found_links = set() |
| |
| target_file_extensions = { |
| CrawlMode.PDF: ['.pdf'], CrawlMode.DOC: ['.doc', '.docx'], |
| CrawlMode.XLS: ['.xls', '.xlsx'], CrawlMode.TXT: ['.txt', '.md'], |
| CrawlMode.ALL: SUPPORTED_FILE_EXTENSIONS |
| }.get(mode, set()) |
| |
| is_record_mode = mode == CrawlMode.RECORDS |
| pbar = tqdm(total=limit, desc=f"🔍 جمع ({mode.value})") |
| |
| while queue and len(found_links) < limit: |
| if job_manager.job["stop_event"].is_set(): |
| job_manager.update_log("🛑 تم استلام إشارة الإيقاف أثناء الزحف.") |
| break |
| |
| current_url = queue.popleft() |
| if current_url in visited or not is_valid_url(current_url): |
| continue |
| visited.add(current_url) |
| |
| try: |
| resp = session.get(current_url, timeout=20, headers=REQUEST_HEADERS) |
| resp.raise_for_status() |
| soup = BeautifulSoup(resp.content, "lxml") |
| |
| for a_tag in soup.find_all("a", href=True): |
| href = urljoin(current_url, a_tag["href"]).split("#")[0].strip() |
| if not is_valid_url(href) or urlparse(href).netloc != base_domain: |
| continue |
| |
| is_target_file = any(href.lower().endswith(ext) for ext in target_file_extensions) |
| is_record_page = any(indicator in href for indicator in RECORD_PAGE_INDICATORS) |
| |
| if (is_target_file or (is_record_mode and is_record_page)) and href not in found_links: |
| found_links.add(href) |
| pbar.update(1) |
| job_manager.update_log(f"🔗 + {unquote(os.path.basename(href))}") |
| elif href not in visited: |
| queue.append(href) |
| |
| except requests.exceptions.Timeout: |
| log.warning(f"⚠️ تجاوزت مهلة الزحف {current_url}.") |
| job_manager.update_log(f"⚠️ تجاوزت مهلة الرابط: {current_url}") |
| continue |
| except requests.exceptions.RequestException as e: |
| log.error(f"❌ فشل في زحف {current_url}: {e}") |
| job_manager.update_log(f"❌ فشل الرابط: {current_url} ({type(e).__name__})") |
| continue |
| except Exception as e: |
| log.error(f"❌ خطأ غير متوقع أثناء زحف {current_url}: {e}") |
| job_manager.update_log(f"❌ خطأ غير متوقع: {current_url}") |
| continue |
| |
| pbar.close() |
| job_manager.update_log(f"🔎 تم العثور على {len(found_links)} رابط. بدء المعالجة...") |
| job_manager.update_status(JobStatus.PROCESSING) |
| |
| with ThreadPoolExecutor(max_workers=app_config.MAX_WORKERS) as executor: |
| future_to_link = {} |
| for link in found_links: |
| if not is_valid_url(link): |
| job_manager.update_log(f"❌ تخطي رابط غير صالح: {link}") |
| continue |
| is_file = any(link.lower().endswith(ext) for ext in SUPPORTED_FILE_EXTENSIONS) |
| if is_file: |
| content_type = mimetypes.guess_type(link)[0] or 'application/octet-stream' |
| future_to_link[executor.submit(process_single_document, link, content_type, session)] = link |
| else: |
| future_to_link[executor.submit(process_single_record_page, link, session)] = link |
| |
| timeout = 600 |
| start_time = time.time() |
| for future in tqdm(as_completed(future_to_link, timeout=timeout), total=len(future_to_link), desc="⚙️ معالجة الأهداف"): |
| if job_manager.job["stop_event"].is_set(): |
| job_manager.update_log("🛑 توقف المعالجة بناءً على طلب الإيقاف.") |
| break |
| if time.time() - start_time > timeout: |
| job_manager.update_log("🛑 توقف المعالجة بسبب انتهاء المهلة الزمنية.") |
| break |
| try: |
| if len(future_to_link) % 10 == 0: |
| if not check_pool_health(app_context.connection_pool, app_config, log): |
| job_manager.update_log("❌ فشل فحص الـ connection pool. إيقاف المعالجة.") |
| break |
| job_manager.update_log(future.result()) |
| except Exception as e: |
| link = future_to_link[future] |
| job_manager.update_log(f"❌ فشل معالجة {link}: {str(e)}") |
| log.error(f"❌ خطأ في معالجة {link}: {str(e)}\n{traceback.format_exc()}") |
| |
| job_manager.update_log(f"✅ اكتملت المعالجة. تم حفظ {len(future_to_link)} هدف في DB/GCS.") |
| job_manager.update_status(JobStatus.DONE) |
| except Exception as e: |
| log.error(f"❌ حدث خطأ فادح في عملية الزحف: {e}") |
| job_manager.update_status(JobStatus.ERROR) |
| finally: |
| session.close() |
| job_manager.reset_thread() |
|
|
| |
| |
| |
| def initialize_app(config: Config, log: logging.Logger) -> Optional[AppContext]: |
| """تهيئة جميع المكونات.""" |
| log.info("🔧 بدء تهيئة مكونات التطبيق...") |
| context = AppContext() |
|
|
| if POSTGRES_AVAILABLE: |
| try: |
| context.connection_pool = psycopg2.pool.ThreadedConnectionPool( |
| minconn=2, |
| maxconn=config.MAX_WORKERS + 3, |
| **config.DB_CONFIG, |
| options="-c tcp_keepalives_idle=300 -c tcp_keepalives_interval=10 -c tcp_keepalives_count=5" |
| ) |
| if not check_pool_health(context.connection_pool, config, log): |
| log.error("❌ فشل فحص الـ connection pool الأولي. لا يمكن المتابعة.") |
| return None |
| context.is_db_ready = True |
| log.info("✅ تم إنشاء مجمع اتصالات قاعدة البيانات بنجاح.") |
| except Exception as e: |
| log.error(f"❌ فشل فادح في تهيئة قاعدة البيانات: {e}.") |
| return None |
| else: |
| log.error("❌ مكتبة 'psycopg2' غير مثبتة.") |
| return None |
|
|
| if GCS_GEMINI_AVAILABLE: |
| try: |
| if config.GCS_BUCKET_NAME: |
| gcs_credentials_json = os.getenv("GCS_CREDENTIALS") |
| if not gcs_credentials_json: |
| log.warning("⚠️ لم يتم العثور على GCS_CREDENTIALS.") |
| context.is_gcs_ready = False |
| else: |
| credentials_info = json.loads(gcs_credentials_json) |
| credentials = service_account.Credentials.from_service_account_info(credentials_info) |
| context.storage_client = storage.Client(credentials=credentials) |
| context.is_gcs_ready = True |
| log.info("✅ تم الاتصال بـ Google Cloud Storage.") |
| except Exception as e: |
| log.error(f"❌ فشل تهيئة Google Cloud Storage: {e}") |
| |
| if config.GOOGLE_API_KEY: |
| try: |
| genai.configure(api_key=config.GOOGLE_API_KEY) |
| context.is_gemini_ready = True |
| log.info("✅ تم إعداد Google Gemini API.") |
| except Exception as e: |
| log.error(f"❌ فشل تهيئة Google Gemini API: {e}") |
| |
| log.info("👍 اكتملت تهيئة المكونات.") |
| return context |
|
|
| |
| |
| |
| def create_gradio_interface(): |
| """إنشاء واجهة المستخدم باستخدام Gradio.""" |
| with gr.Blocks(theme=gr.themes.Soft(), title="TawasoaAI OCR & Crawler") as demo: |
| gr.Markdown("# 🚀 لوحة تحكم الزحف والمعالجة - TawasoaAI") |
| |
| with gr.Tabs(): |
| with gr.TabItem("🕷️ الزحف والتجميع"): |
| with gr.Row(): |
| with gr.Column(scale=2): |
| gr.Markdown("### 1. إعدادات الزحف") |
| url_input = gr.Textbox(label="🔗 رابط الموقع للبدء", placeholder="example.com") |
| limit_input = gr.Slider(minimum=1, maximum=5000, value=100, step=10, label="🔢 الحد الأقصى للأهداف") |
| with gr.Row(): |
| crawl_pdf_btn = gr.Button("📄 زحف PDF فقط", variant="secondary") |
| crawl_records_btn = gr.Button("📝 زحف السجلات/الصفحات (HTML)", variant="primary") |
| with gr.Row(): |
| crawl_doc_btn = gr.Button("📑 زحف DOC/DOCX فقط", variant="secondary") |
| crawl_xls_btn = gr.Button("📊 زحف XLS/XLSX فقط", variant="secondary") |
| crawl_all_btn = gr.Button("🌍 زحف كل الملفات المدعومة", variant="secondary") |
| stop_btn = gr.Button("🛑 إيقاف المهمة الحالية", variant="stop") |
| with gr.Column(scale=3): |
| gr.Markdown("### 2. مراقبة الحالة") |
| status_output = gr.Textbox(label="الحالة الحالية", value=JobStatus.IDLE.value, interactive=False) |
| log_output = gr.Textbox(label="📝 سجل الأحداث", lines=15, max_lines=15, interactive=False, autoscroll=True) |
|
|
| def start_crawl_job(url, limit, mode): |
| job_manager.start_job(crawl_website, "crawl", url, limit, mode) |
| return "بدء مهمة الزحف...", JobStatus.CRAWLING.value |
| |
| crawl_records_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.RECORDS), inputs=[url_input, limit_input], outputs=[log_output, status_output]) |
| crawl_pdf_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.PDF), inputs=[url_input, limit_input], outputs=[log_output, status_output]) |
| crawl_doc_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.DOC), inputs=[url_input, limit_input], outputs=[log_output, status_output]) |
| crawl_xls_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.XLS), inputs=[url_input, limit_input], outputs=[log_output, status_output]) |
| crawl_all_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.ALL), inputs=[url_input, limit_input], outputs=[log_output, status_output]) |
| stop_btn.click(fn=job_manager.stop_job, inputs=[], outputs=[log_output]) |
| demo.load(fn=job_manager.get_status, inputs=[], outputs=[log_output, status_output]) |
|
|
| with gr.TabItem("📄 البحث عن السجلات"): |
| gr.Markdown("### 🔍 البحث عن الشركات والرخص") |
| gr.Markdown("*(نتائج البحث ستشمل النصوص المستخلصة من صفحات السجلات (HTML) والبيانات الأساسية للملفات.)*") |
| search_input = gr.Textbox(label="اسم الشركة / رقم الترخيص / كلمات مفتاحية", placeholder="أدخل اسم شركة أو رقم رخصة...") |
| search_btn = gr.Button("ابحث", variant="primary") |
| search_results_output = gr.Dataframe( |
| headers=["اسم الملف", "الحالة", "الخلاصة", "تاريخ الإضافة", "نص المحتوى (مقتطع)"], |
| datatype=["str", "str", "str", "str", "str"], |
| row_count=5, |
| col_count=5, |
| label="نتائج البحث" |
| ) |
|
|
| def run_search(term: str) -> List[List[str]]: |
| if not app_context.is_db_ready: |
| return [["❌ فشل الاتصال بقاعدة البيانات", "", "", "", ""]] |
| results = db_search_records(app_context.connection_pool, term) |
| formatted_results = [] |
| for res in results: |
| full_text_preview = res.get('full_text_content', 'N/A') |
| if full_text_preview: |
| full_text_preview = full_text_preview[:150] + "..." if len(full_text_preview) > 150 else full_text_preview |
| display_status = STATUS_TRANSLATIONS.get(res.get('status', 'N/A'), "غير متاح") |
| formatted_results.append([ |
| res.get('filename', 'N/A'), |
| display_status, |
| res.get('summary', 'N/A'), |
| res.get('created_at', datetime.now()).strftime("%Y-%m-%d %H:%M:%S"), |
| full_text_preview |
| ]) |
| return formatted_results |
|
|
| search_btn.click(fn=run_search, inputs=[search_input], outputs=[search_results_output]) |
| |
| return demo |
|
|
| |
| |
| |
| if __name__ == "__main__": |
| log = setup_logging() |
| app_config = Config(logger=log) |
| job_manager = JobManager() |
| app_context = initialize_app(config=app_config, log=log) |
|
|
| if app_context and app_context.is_db_ready: |
| log.info("🚀 التطبيق جاهز للبدء.") |
| gradio_app = create_gradio_interface() |
| gradio_app.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False) |
| else: |
| log.critical("🔥 فشلت تهيئة التطبيق. سيتم إيقاف التشغيل.") |
| sys.exit(1) |