FRA / app.py
TawasoaAi's picture
Update app.py
37ad086 verified
# -*- coding: utf-8 -*-
"""
📕 TawasoaAi_OCR.py - الإصدار 23.4 (إصلاح حالة المعالجة وتحسين عرض الحالة)
النسخة المخصصة لبيئة Hugging Face: تقوم بالزحف وحفظ الملفات في GCS وإدخال السجلات الأساسية في DB.
**تخطي جميع عمليات استخلاص النصوص و OCR للملفات، واستخلاص بسيط لصفحات السجلات.**
✅ [إصلاح 412] تعديل gcs_upload_file لاستخدام صلاحيات حساب الخدمة.
✅ [حالة DB] توحيد حالة الملفات والسجلات إلى 'awaiting_processing'.
✅ [تحسين] تعزيز دالة الزحف للتعامل مع أخطاء Requests.
✅ [إصلاح التوقف] إضافة مهلة عامة ومعالجة استثناءات الخيوط.
✅ [تحقق من الروابط] إضافة تحقق من صحة الروابط.
✅ [إصلاح Gradio] تعديل get_status لإرجاع قيمتين.
✅ [إصلاح SSL] زيادة مهلة الاتصال ودعم SSL صريح.
✅ [جديد: إصلاح الحالة] ضمان تسجيل الحالة كـ 'awaiting_processing' عند الرفع الناجح إلى GCS.
"""
# ==============================================================================
# 0. IMPORTS AND GLOBAL SETUP
# ==============================================================================
import os
import re
import io
import json
import time
import signal
import uuid
import traceback
import logging
import threading
import sys
import zipfile
import mimetypes
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin, urlparse, unquote
from collections import deque
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Tuple, Set
from enum import Enum
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
import gradio as gr
try:
from dotenv import load_dotenv
DOTENV_AVAILABLE = True
except ImportError:
DOTENV_AVAILABLE = False
try:
import google.generativeai as genai
from google.cloud import storage
from google.oauth2 import service_account
GCS_GEMINI_AVAILABLE = True
except ImportError:
GCS_GEMINI_AVAILABLE = False
try:
import psycopg2
from psycopg2 import pool, Binary
from psycopg2.extras import RealDictCursor, execute_values
POSTGRES_AVAILABLE = True
except ImportError:
POSTGRES_AVAILABLE = False
PDF_AVAILABLE = False
DOCX_AVAILABLE = False
EXCEL_AVAILABLE = False
OCR_AVAILABLE = False
MD_AVAILABLE = False
# ==============================================================================
# 1. CONSTANTS
# ==============================================================================
class JobStatus(Enum):
IDLE = "⚪ خامل"
SCANNING = "⏳ فحص الأرشيف..."
CRAWLING = "🔍 يتم الزحف..."
PROCESSING = "⚙️ تتم المعالجة (جلب الخام)..."
DONE = "✅ اكتمل"
GCS_UP_TO_DATE = "✅ GCS محدث"
ERROR = "🔴 خطأ"
class CrawlMode(Enum):
ALL = "الكل"
PDF = "ملفات PDF فقط"
DOC = "ملفات DOC/DOCX فقط"
XLS = "ملفات XLS/XLSX فقط"
TXT = "ملفات TXT/MD فقط"
RECORDS = "سجلات HTML فقط"
SUPPORTED_FILE_EXTENSIONS = {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.md'}
RECORD_PAGE_INDICATORS = {'/records/', '/company_records/', '/registration/'}
REQUEST_HEADERS = {'User-Agent': 'Mozilla/5.0'}
# قاموس ترجمة الحالات لعرضها في واجهة Gradio
STATUS_TRANSLATIONS = {
"awaiting_processing": "جارى المعالجة",
"failed": "فشلت المعالجة",
"processed": "تمت المعالجة",
"N/A": "غير متاح"
}
# ==============================================================================
# 2. CONFIGURATION AND CONTEXT
# ==============================================================================
def setup_logging() -> logging.Logger:
"""إعداد وتكوين نظام تسجيل الأحداث."""
log_format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
date_format = "%Y-%m-%d %H:%M:%S"
logger = logging.getLogger("TawasoaAiOCR")
if not logger.handlers:
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler('tawasoa_ai_ocr.log', encoding='utf-8')
file_handler.setFormatter(logging.Formatter(log_format, date_format))
logger.addHandler(file_handler)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(logging.Formatter(log_format, date_format))
logger.addHandler(stream_handler)
return logger
class Config:
"""كلاس مركزي لتجميع إعدادات المشروع."""
def __init__(self, logger: logging.Logger):
if DOTENV_AVAILABLE:
load_dotenv()
self.log = logger
self.DB_CONFIG: Dict[str, Any] = {
"host": os.getenv("DB_HOST", "localhost"),
"user": os.getenv("DB_USERNAME", "postgres"),
"password": os.getenv("DB_PASSWORD"),
"dbname": os.getenv("DB_DATABASE", "tawasoa"),
"port": self._get_env_as_int("DB_PORT", 5432),
"sslmode": os.getenv("DB_SSLMODE", "prefer"),
"connect_timeout": 30
}
self.GCS_BUCKET_NAME: Optional[str] = os.getenv("GCS_BUCKET_NAME")
self.GOOGLE_API_KEY: Optional[str] = os.getenv("GOOGLE_API_KEY")
self.MAX_WORKERS: int = self._get_env_as_int("MAX_WORKERS_DEFAULT", 5)
def _get_env_as_int(self, key: str, default: int) -> int:
try:
return int(os.getenv(key, str(default)))
except (ValueError, TypeError):
self.log.warning(f"قيمة متغير البيئة '{key}' غير صالحة. استخدام الافتراضي: {default}")
return default
class AppContext:
"""يحتوي على حالة التطبيق الحية والمكونات المُهيأة."""
def __init__(self):
self.connection_pool = None
self.storage_client = None
self.app_shutdown = threading.Event()
self.is_db_ready: bool = False
self.is_gcs_ready: bool = False
self.is_gemini_ready: bool = False
# ==============================================================================
# 3. STATE MANAGEMENT
# ==============================================================================
class JobManager:
"""فئة لإدارة حالة المهمة التي تعمل في الخلفية."""
def __init__(self):
self.lock = threading.Lock()
self.job = {
"thread": None,
"type": None,
"status": JobStatus.IDLE.value,
"log": ["مرحبًا بك!"],
"stop_event": threading.Event()
}
def update_log(self, message: str):
if not message: return
with self.lock:
self.job["log"].append(f"[{time.strftime('%H:%M:%S')}] {message}")
if len(self.job["log"]) > 200:
self.job["log"] = self.job["log"][-200:]
def update_status(self, status: JobStatus):
with self.lock:
self.job["status"] = status.value
def get_status(self) -> tuple[str, str]:
with self.lock:
log_text = "\n".join(self.job["log"][-100:])
status = self.job["status"]
return log_text, status
def start_job(self, target_func, job_type: str, *args):
with self.lock:
if self.job["thread"] and self.job["thread"].is_alive():
self.update_log("⚠️ مهمة أخرى قيد التشغيل بالفعل.")
return
self.job["stop_event"].clear()
self.job["type"] = job_type
self.job["thread"] = threading.Thread(target=target_func, args=args)
self.job["thread"].start()
def stop_job(self):
with self.lock:
if self.job["thread"] and self.job["thread"].is_alive():
self.update_log("🛑 جاري إرسال إشارة الإيقاف...")
self.job["stop_event"].set()
return "تم إرسال طلب الإيقاف."
return "لا توجد مهمة قيد التشغيل."
def reset_thread(self):
with self.lock:
self.job["thread"] = None
# ==============================================================================
# 4. DATABASE OPERATIONS
# ==============================================================================
def check_pool_health(pool, config: Config, log: logging.Logger) -> bool:
"""فحص صحة الـ connection pool وإعادة تهيئته إذا لزم الأمر."""
try:
test_conn = pool.getconn()
with test_conn.cursor() as cursor:
cursor.execute("SELECT 1")
pool.putconn(test_conn)
log.info("✅ فحص صحة الـ connection pool: ناجح.")
return True
except Exception as e:
log.error(f"❌ فشل فحص الـ connection pool: {e}. محاولة إعادة التهيئة...")
try:
pool.closeall()
new_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=2,
maxconn=config.MAX_WORKERS + 3,
**config.DB_CONFIG
)
pool = new_pool
log.info("✅ تم إعادة تهيئة الـ connection pool بنجاح.")
return True
except Exception as e:
log.error(f"❌ فشل إعادة تهيئة الـ connection pool: {e}.")
return False
def db_execute_query(pool, query: str, params=None, fetch_result=False, max_retries=7):
"""تنفيذ استعلام قاعدة البيانات مع إعادة المحاولة."""
retry_delays = [5, 10, 15, 20, 25, 30, 35]
for attempt in range(max_retries):
conn = None
try:
conn = pool.getconn()
with conn.cursor() as cursor:
cursor.execute(query, params)
if fetch_result:
result = cursor.fetchall()
else:
result = True
conn.commit()
log.info(f"✅ نجاح استعلام قاعدة البيانات في المحاولة {attempt + 1}.")
return result
except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
log.warning(f"⚠️ خطأ تشغيلي في قاعدة البيانات (محاولة {attempt + 1}/{max_retries}): {e}.")
if conn:
try:
conn.rollback()
except:
pass
pool.putconn(conn, close=True)
conn = None
if attempt < max_retries - 1:
delay = retry_delays[attempt]
log.info(f"إعادة المحاولة بعد {delay} ثوانٍ.")
time.sleep(delay)
continue
else:
log.error(f"❌ فشلت جميع محاولات الاتصال بعد {max_retries} محاولات.")
raise
except Exception as e:
if conn:
try:
conn.rollback()
except:
pass
raise
finally:
if conn:
try:
pool.putconn(conn)
except psycopg2.pool.PoolError as pe:
log.error(f"❌ خطأ في إعادة الاتصال إلى التجمع: {pe}. إغلاق الاتصال.")
pool.putconn(conn, close=True)
return False
def db_check_document_exists(pool, original_path: str) -> bool:
"""التحقق من وجود مستند."""
results = db_execute_query(
pool,
"SELECT id FROM documents WHERE original_path = %s;",
(original_path,),
fetch_result=True
)
return bool(results) if results is not False else False
def db_insert_document(pool, doc_data: Dict[str, Any]) -> int:
"""إدراج سجل مستند جديد."""
conn = pool.getconn()
try:
with conn.cursor() as cursor:
cursor.execute(
"""INSERT INTO documents (source, filename, original_path, storage_path, cover_image_path, status, summary, full_text_content, created_at, updated_at)
VALUES (%(source)s, %(filename)s, %(original_path)s, %(storage_path)s, %(cover_image_path)s, %(status)s, %(summary)s, %(full_text_content)s, NOW(), NOW())
RETURNING id;""",
doc_data
)
doc_id = cursor.fetchone()[0]
conn.commit()
log.info(f"✅ تم إدراج المستند '{doc_data['filename']}' بحالة '{doc_data['status']}'.")
return doc_id
except Exception as e:
conn.rollback()
log.error(f"❌ فشل إدراج المستند '{doc_data['filename']}': {e}")
raise
finally:
pool.putconn(conn)
def db_log_failed_document(pool, original_path: str, filename: str, error: str):
"""تسجيل المستندات الفاشلة."""
conn = pool.getconn()
try:
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO documents (source, filename, original_path, status, summary, created_at, updated_at) "
"VALUES (%s, %s, %s, 'failed', %s, NOW(), NOW()) ON CONFLICT (original_path) DO NOTHING",
(original_path, filename, original_path, f"فشل الرفع: {error}")
)
conn.commit()
log.info(f"📝 تم تسجيل المستند الفاشل '{filename}' بسبب: {error}")
except Exception as e:
log.error(f"❌ فشل تسجيل المستند الفاشل '{filename}': {e}")
finally:
pool.putconn(conn)
def db_search_records(pool, search_term: str) -> List[Dict[str, Any]]:
"""البحث في المستندات."""
conn = pool.getconn()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
search_pattern = f"%{search_term}%"
cursor.execute(
"""
SELECT id, filename, original_path, status, summary, created_at, full_text_content
FROM documents
WHERE filename ILIKE %s OR full_text_content ILIKE %s
ORDER BY created_at DESC
LIMIT 50;
""",
(search_pattern, search_pattern)
)
return cursor.fetchall()
finally:
pool.putconn(conn)
# ==============================================================================
# 5. FILE PARSING AND OCR
# ==============================================================================
def decompose_html_record(content_bytes: bytes) -> str:
"""استخلاص النص المرئي من HTML."""
try:
soup = BeautifulSoup(content_bytes, 'html.parser')
for script_or_style in soup(['script', 'style', 'header', 'footer', 'nav']):
script_or_style.decompose()
return soup.get_text(separator=' ', strip=True)
except Exception as e:
log.error(f"❌ فشل في تحليل HTML: {e}")
return content_bytes.decode('utf-8', errors='ignore')
def is_valid_docx(content_bytes: bytes) -> bool: return True
def ocr_image_to_text(image_bytes: bytes) -> str: return ""
def generate_page_images_as_binary(content_bytes: bytes) -> List[Dict[str, Any]]: return []
def decompose_pdf(content_bytes: bytes, page_images: List[Dict[str, Any]]) -> List[Dict[str, Any]]: return []
def decompose_docx(content_bytes: bytes) -> List[Dict[str, Any]]: return []
def decompose_excel(content_bytes: bytes) -> List[Dict[str, Any]]: return []
def decompose_text_or_md(content_bytes: bytes, file_ext: str) -> List[Dict[str, Any]]: return []
# ==============================================================================
# 6. CLOUD AND AI SERVICES
# ==============================================================================
def gcs_upload_file(client, bucket_name: str, content_bytes: bytes, blob_name: str, content_type: str) -> str:
"""رفع ملف إلى GCS."""
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(content_bytes, content_type=content_type)
log.info(f"✅ تم رفع الملف '{blob_name}' إلى GCS.")
return f"gs://{bucket_name}/{blob_name}"
def gcs_generate_cover_image(client, bucket_name: str, content_bytes: bytes, original_filename: str) -> Optional[str]:
"""تعطيل إنشاء الغلاف."""
return None
def ai_get_embedding(text: str) -> Optional[List[float]]:
"""تعطيل التضمين."""
return None
def ai_summarize_and_classify(text: str) -> Dict[str, str]:
"""تعطيل التلخيص والتصنيف."""
return {"summary": "لم تتم معالجة النص على الخادم.", "category": "غير مصنف"}
# ==============================================================================
# 7. CORE LOGIC / WORKERS
# ==============================================================================
def is_valid_url(url: str) -> bool:
"""التحقق من صحة الرابط."""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception:
return False
def process_single_document(link: str, content_type: str, session: requests.Session) -> str:
"""معالجة مستند واحد."""
safe_filename = unquote(os.path.basename(urlparse(link).path))
try:
if db_check_document_exists(app_context.connection_pool, link):
return f"🟡 (تخطي) ملف موجود مسبقاً: {safe_filename}"
resp = session.get(link, timeout=20, headers=REQUEST_HEADERS)
resp.raise_for_status()
content_bytes = resp.content
full_text_content = ""
document_uuid = str(uuid.uuid4())
blob_name = f"documents/{document_uuid}/{safe_filename}"
storage_path = gcs_upload_file(app_context.storage_client, app_config.GCS_BUCKET_NAME, content_bytes, blob_name, content_type)
cover_image_url = gcs_generate_cover_image(app_context.storage_client, app_config.GCS_BUCKET_NAME, content_bytes, safe_filename)
summary = "تم حفظ الملف الخام في GCS. (جارى انتظار المعالجة النصية محليًا)."
status = 'awaiting_processing'
doc_data = {
"source": link,
"filename": safe_filename,
"original_path": link,
"storage_path": storage_path,
"cover_image_path": cover_image_url,
"status": status,
"summary": summary,
"full_text_content": full_text_content,
}
doc_id = db_insert_document(app_context.connection_pool, doc_data)
return f"✅ تم حفظ الملف الخام '{safe_filename}' في GCS و DB (ID: {doc_id}). (الحالة: جارى المعالجة)"
except requests.exceptions.RequestException as e:
error_msg = f"فشل طلب HTTP: {str(e)}"
log.error(f"❌ خطأ فادح أثناء معالجة ملف {safe_filename}: {error_msg}\n{traceback.format_exc()}")
db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg)
return f"❌ فشلت معالجة ملف {safe_filename} - {error_msg}"
except Exception as e:
error_msg = f"خطأ غير متوقع: {str(e)}"
log.error(f"❌ خطأ فادح أثناء معالجة ملف {safe_filename}: {error_msg}\n{traceback.format_exc()}")
db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg)
return f"❌ فشلت معالجة ملف {safe_filename} - {error_msg}"
def process_single_record_page(link: str, session: requests.Session) -> str:
"""معالجة صفحة سجل واحدة (HTML)."""
safe_filename = unquote(os.path.basename(urlparse(link).path))
try:
if db_check_document_exists(app_context.connection_pool, link):
return f"🟡 (تخطي) سجل موجود مسبقاً: {safe_filename}"
resp = session.get(link, timeout=20, headers=REQUEST_HEADERS)
resp.raise_for_status()
content_bytes = resp.content
full_text_content = decompose_html_record(content_bytes)
summary = "سجل تم استخلاص نصه جزئياً (HTML). (جارى انتظار المعالجة المتبقية محليًا)."
status = 'awaiting_processing'
doc_data = {
"source": link,
"filename": safe_filename,
"original_path": link,
"storage_path": None,
"cover_image_path": None,
"status": status,
"summary": summary,
"full_text_content": full_text_content,
}
doc_id = db_insert_document(app_context.connection_pool, doc_data)
return f"✅ تم حفظ سجل الصفحة '{safe_filename}' في DB (ID: {doc_id}). (الحالة: جارى المعالجة)"
except requests.exceptions.RequestException as e:
error_msg = f"فشل طلب HTTP: {str(e)}"
log.error(f"❌ خطأ فادح أثناء معالجة سجل {safe_filename}: {error_msg}\n{traceback.format_exc()}")
db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg)
return f"❌ فشلت معالجة سجل {safe_filename} - {error_msg}"
except Exception as e:
error_msg = f"خطأ غير متوقع: {str(e)}"
log.error(f"❌ خطأ فادح أثناء معالجة سجل {safe_filename}: {error_msg}\n{traceback.format_exc()}")
db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg)
return f"❌ فشلت معالجة سجل {safe_filename} - {error_msg}"
def crawl_website(url: str, limit: int, mode: CrawlMode):
"""المحرك الرئيسي لعملية الزحف."""
job_manager.update_status(JobStatus.CRAWLING)
session = requests.Session()
try:
if not is_valid_url(url):
job_manager.update_log(f"❌ رابط البداية غير صالح: {url}")
job_manager.update_status(JobStatus.ERROR)
return
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
base_domain = urlparse(url).netloc
visited = set()
queue = deque([url])
found_links = set()
target_file_extensions = {
CrawlMode.PDF: ['.pdf'], CrawlMode.DOC: ['.doc', '.docx'],
CrawlMode.XLS: ['.xls', '.xlsx'], CrawlMode.TXT: ['.txt', '.md'],
CrawlMode.ALL: SUPPORTED_FILE_EXTENSIONS
}.get(mode, set())
is_record_mode = mode == CrawlMode.RECORDS
pbar = tqdm(total=limit, desc=f"🔍 جمع ({mode.value})")
while queue and len(found_links) < limit:
if job_manager.job["stop_event"].is_set():
job_manager.update_log("🛑 تم استلام إشارة الإيقاف أثناء الزحف.")
break
current_url = queue.popleft()
if current_url in visited or not is_valid_url(current_url):
continue
visited.add(current_url)
try:
resp = session.get(current_url, timeout=20, headers=REQUEST_HEADERS)
resp.raise_for_status()
soup = BeautifulSoup(resp.content, "lxml")
for a_tag in soup.find_all("a", href=True):
href = urljoin(current_url, a_tag["href"]).split("#")[0].strip()
if not is_valid_url(href) or urlparse(href).netloc != base_domain:
continue
is_target_file = any(href.lower().endswith(ext) for ext in target_file_extensions)
is_record_page = any(indicator in href for indicator in RECORD_PAGE_INDICATORS)
if (is_target_file or (is_record_mode and is_record_page)) and href not in found_links:
found_links.add(href)
pbar.update(1)
job_manager.update_log(f"🔗 + {unquote(os.path.basename(href))}")
elif href not in visited:
queue.append(href)
except requests.exceptions.Timeout:
log.warning(f"⚠️ تجاوزت مهلة الزحف {current_url}.")
job_manager.update_log(f"⚠️ تجاوزت مهلة الرابط: {current_url}")
continue
except requests.exceptions.RequestException as e:
log.error(f"❌ فشل في زحف {current_url}: {e}")
job_manager.update_log(f"❌ فشل الرابط: {current_url} ({type(e).__name__})")
continue
except Exception as e:
log.error(f"❌ خطأ غير متوقع أثناء زحف {current_url}: {e}")
job_manager.update_log(f"❌ خطأ غير متوقع: {current_url}")
continue
pbar.close()
job_manager.update_log(f"🔎 تم العثور على {len(found_links)} رابط. بدء المعالجة...")
job_manager.update_status(JobStatus.PROCESSING)
with ThreadPoolExecutor(max_workers=app_config.MAX_WORKERS) as executor:
future_to_link = {}
for link in found_links:
if not is_valid_url(link):
job_manager.update_log(f"❌ تخطي رابط غير صالح: {link}")
continue
is_file = any(link.lower().endswith(ext) for ext in SUPPORTED_FILE_EXTENSIONS)
if is_file:
content_type = mimetypes.guess_type(link)[0] or 'application/octet-stream'
future_to_link[executor.submit(process_single_document, link, content_type, session)] = link
else:
future_to_link[executor.submit(process_single_record_page, link, session)] = link
timeout = 600
start_time = time.time()
for future in tqdm(as_completed(future_to_link, timeout=timeout), total=len(future_to_link), desc="⚙️ معالجة الأهداف"):
if job_manager.job["stop_event"].is_set():
job_manager.update_log("🛑 توقف المعالجة بناءً على طلب الإيقاف.")
break
if time.time() - start_time > timeout:
job_manager.update_log("🛑 توقف المعالجة بسبب انتهاء المهلة الزمنية.")
break
try:
if len(future_to_link) % 10 == 0:
if not check_pool_health(app_context.connection_pool, app_config, log):
job_manager.update_log("❌ فشل فحص الـ connection pool. إيقاف المعالجة.")
break
job_manager.update_log(future.result())
except Exception as e:
link = future_to_link[future]
job_manager.update_log(f"❌ فشل معالجة {link}: {str(e)}")
log.error(f"❌ خطأ في معالجة {link}: {str(e)}\n{traceback.format_exc()}")
job_manager.update_log(f"✅ اكتملت المعالجة. تم حفظ {len(future_to_link)} هدف في DB/GCS.")
job_manager.update_status(JobStatus.DONE)
except Exception as e:
log.error(f"❌ حدث خطأ فادح في عملية الزحف: {e}")
job_manager.update_status(JobStatus.ERROR)
finally:
session.close()
job_manager.reset_thread()
# ==============================================================================
# 8. APPLICATION INITIALIZATION
# ==============================================================================
def initialize_app(config: Config, log: logging.Logger) -> Optional[AppContext]:
"""تهيئة جميع المكونات."""
log.info("🔧 بدء تهيئة مكونات التطبيق...")
context = AppContext()
if POSTGRES_AVAILABLE:
try:
context.connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=2,
maxconn=config.MAX_WORKERS + 3,
**config.DB_CONFIG,
options="-c tcp_keepalives_idle=300 -c tcp_keepalives_interval=10 -c tcp_keepalives_count=5"
)
if not check_pool_health(context.connection_pool, config, log):
log.error("❌ فشل فحص الـ connection pool الأولي. لا يمكن المتابعة.")
return None
context.is_db_ready = True
log.info("✅ تم إنشاء مجمع اتصالات قاعدة البيانات بنجاح.")
except Exception as e:
log.error(f"❌ فشل فادح في تهيئة قاعدة البيانات: {e}.")
return None
else:
log.error("❌ مكتبة 'psycopg2' غير مثبتة.")
return None
if GCS_GEMINI_AVAILABLE:
try:
if config.GCS_BUCKET_NAME:
gcs_credentials_json = os.getenv("GCS_CREDENTIALS")
if not gcs_credentials_json:
log.warning("⚠️ لم يتم العثور على GCS_CREDENTIALS.")
context.is_gcs_ready = False
else:
credentials_info = json.loads(gcs_credentials_json)
credentials = service_account.Credentials.from_service_account_info(credentials_info)
context.storage_client = storage.Client(credentials=credentials)
context.is_gcs_ready = True
log.info("✅ تم الاتصال بـ Google Cloud Storage.")
except Exception as e:
log.error(f"❌ فشل تهيئة Google Cloud Storage: {e}")
if config.GOOGLE_API_KEY:
try:
genai.configure(api_key=config.GOOGLE_API_KEY)
context.is_gemini_ready = True
log.info("✅ تم إعداد Google Gemini API.")
except Exception as e:
log.error(f"❌ فشل تهيئة Google Gemini API: {e}")
log.info("👍 اكتملت تهيئة المكونات.")
return context
# ==============================================================================
# 9. GRADIO UI DEFINITION
# ==============================================================================
def create_gradio_interface():
"""إنشاء واجهة المستخدم باستخدام Gradio."""
with gr.Blocks(theme=gr.themes.Soft(), title="TawasoaAI OCR & Crawler") as demo:
gr.Markdown("# 🚀 لوحة تحكم الزحف والمعالجة - TawasoaAI")
with gr.Tabs():
with gr.TabItem("🕷️ الزحف والتجميع"):
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("### 1. إعدادات الزحف")
url_input = gr.Textbox(label="🔗 رابط الموقع للبدء", placeholder="example.com")
limit_input = gr.Slider(minimum=1, maximum=5000, value=100, step=10, label="🔢 الحد الأقصى للأهداف")
with gr.Row():
crawl_pdf_btn = gr.Button("📄 زحف PDF فقط", variant="secondary")
crawl_records_btn = gr.Button("📝 زحف السجلات/الصفحات (HTML)", variant="primary")
with gr.Row():
crawl_doc_btn = gr.Button("📑 زحف DOC/DOCX فقط", variant="secondary")
crawl_xls_btn = gr.Button("📊 زحف XLS/XLSX فقط", variant="secondary")
crawl_all_btn = gr.Button("🌍 زحف كل الملفات المدعومة", variant="secondary")
stop_btn = gr.Button("🛑 إيقاف المهمة الحالية", variant="stop")
with gr.Column(scale=3):
gr.Markdown("### 2. مراقبة الحالة")
status_output = gr.Textbox(label="الحالة الحالية", value=JobStatus.IDLE.value, interactive=False)
log_output = gr.Textbox(label="📝 سجل الأحداث", lines=15, max_lines=15, interactive=False, autoscroll=True)
def start_crawl_job(url, limit, mode):
job_manager.start_job(crawl_website, "crawl", url, limit, mode)
return "بدء مهمة الزحف...", JobStatus.CRAWLING.value
crawl_records_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.RECORDS), inputs=[url_input, limit_input], outputs=[log_output, status_output])
crawl_pdf_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.PDF), inputs=[url_input, limit_input], outputs=[log_output, status_output])
crawl_doc_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.DOC), inputs=[url_input, limit_input], outputs=[log_output, status_output])
crawl_xls_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.XLS), inputs=[url_input, limit_input], outputs=[log_output, status_output])
crawl_all_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.ALL), inputs=[url_input, limit_input], outputs=[log_output, status_output])
stop_btn.click(fn=job_manager.stop_job, inputs=[], outputs=[log_output])
demo.load(fn=job_manager.get_status, inputs=[], outputs=[log_output, status_output])
with gr.TabItem("📄 البحث عن السجلات"):
gr.Markdown("### 🔍 البحث عن الشركات والرخص")
gr.Markdown("*(نتائج البحث ستشمل النصوص المستخلصة من صفحات السجلات (HTML) والبيانات الأساسية للملفات.)*")
search_input = gr.Textbox(label="اسم الشركة / رقم الترخيص / كلمات مفتاحية", placeholder="أدخل اسم شركة أو رقم رخصة...")
search_btn = gr.Button("ابحث", variant="primary")
search_results_output = gr.Dataframe(
headers=["اسم الملف", "الحالة", "الخلاصة", "تاريخ الإضافة", "نص المحتوى (مقتطع)"],
datatype=["str", "str", "str", "str", "str"],
row_count=5,
col_count=5,
label="نتائج البحث"
)
def run_search(term: str) -> List[List[str]]:
if not app_context.is_db_ready:
return [["❌ فشل الاتصال بقاعدة البيانات", "", "", "", ""]]
results = db_search_records(app_context.connection_pool, term)
formatted_results = []
for res in results:
full_text_preview = res.get('full_text_content', 'N/A')
if full_text_preview:
full_text_preview = full_text_preview[:150] + "..." if len(full_text_preview) > 150 else full_text_preview
display_status = STATUS_TRANSLATIONS.get(res.get('status', 'N/A'), "غير متاح")
formatted_results.append([
res.get('filename', 'N/A'),
display_status,
res.get('summary', 'N/A'),
res.get('created_at', datetime.now()).strftime("%Y-%m-%d %H:%M:%S"),
full_text_preview
])
return formatted_results
search_btn.click(fn=run_search, inputs=[search_input], outputs=[search_results_output])
return demo
# ==============================================================================
# 10. MAIN APPLICATION ENTRY POINT
# ==============================================================================
if __name__ == "__main__":
log = setup_logging()
app_config = Config(logger=log)
job_manager = JobManager()
app_context = initialize_app(config=app_config, log=log)
if app_context and app_context.is_db_ready:
log.info("🚀 التطبيق جاهز للبدء.")
gradio_app = create_gradio_interface()
gradio_app.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)
else:
log.critical("🔥 فشلت تهيئة التطبيق. سيتم إيقاف التشغيل.")
sys.exit(1)