Spaces:
Paused
Paused
| """ | |
| وحدة الاتصال بقاعدة البيانات | |
| """ | |
| import os | |
| import sqlite3 | |
| from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, DateTime, ForeignKey, Boolean, Text | |
| from sqlalchemy.ext.declarative import declarative_base | |
| from sqlalchemy.orm import sessionmaker, relationship | |
| from contextlib import contextmanager | |
| from datetime import datetime | |
| import config | |
| # إنشاء قاعدة البيانات الأساسية | |
| Base = declarative_base() | |
| class DatabaseConnector: | |
| """الفئة المسؤولة عن إدارة الاتصال بقاعدة البيانات""" | |
| def __init__(self): | |
| """تهيئة موصل قاعدة البيانات""" | |
| db_path = config.DB_PATH | |
| db_dir = os.path.dirname(db_path) | |
| # التأكد من وجود المجلد | |
| if not os.path.exists(db_dir): | |
| os.makedirs(db_dir) | |
| # إنشاء محرك قاعدة البيانات بناءاً على نوع قاعدة البيانات | |
| if config.DB_TYPE == "sqlite": | |
| self.engine = create_engine(f'sqlite:///{db_path}', echo=config.DEBUG_MODE) | |
| elif config.DB_TYPE == "mysql": | |
| # في حالة استخدام MySQL (يتطلب إضافة تفاصيل الاتصال في ملف config.py) | |
| self.engine = create_engine( | |
| f'mysql+pymysql://{config.DB_USER}:{config.DB_PASSWORD}@{config.DB_HOST}:{config.DB_PORT}/{config.DB_NAME}', | |
| echo=config.DEBUG_MODE | |
| ) | |
| elif config.DB_TYPE == "postgresql": | |
| # في حالة استخدام PostgreSQL (يتطلب إضافة تفاصيل الاتصال في ملف config.py) | |
| self.engine = create_engine( | |
| f'postgresql+psycopg2://{config.DB_USER}:{config.DB_PASSWORD}@{config.DB_HOST}:{config.DB_PORT}/{config.DB_NAME}', | |
| echo=config.DEBUG_MODE | |
| ) | |
| else: | |
| raise ValueError(f"نوع قاعدة البيانات غير مدعوم: {config.DB_TYPE}") | |
| # إنشاء جلسة للتعامل مع قاعدة البيانات | |
| self.Session = sessionmaker(bind=self.engine) | |
| def create_tables(self): | |
| """إنشاء جداول قاعدة البيانات""" | |
| Base.metadata.create_all(self.engine) | |
| def session_scope(self): | |
| """ | |
| توفير نطاق جلسة للتعامل مع قاعدة البيانات | |
| يتم استخدامه مع 'with' لضمان إغلاق الجلسة بشكل صحيح | |
| """ | |
| session = self.Session() | |
| try: | |
| yield session | |
| session.commit() | |
| except Exception as e: | |
| session.rollback() | |
| raise e | |
| finally: | |
| session.close() | |
| def execute_query(self, query): | |
| """ | |
| تنفيذ استعلام SQL مباشر | |
| المعلمات: | |
| query: استعلام SQL | |
| الإرجاع: | |
| نتائج الاستعلام | |
| """ | |
| with self.engine.connect() as connection: | |
| result = connection.execute(query) | |
| return result.fetchall() | |
| def backup_database(self, backup_path=None): | |
| """ | |
| إنشاء نسخة احتياطية من قاعدة البيانات | |
| المعلمات: | |
| backup_path: مسار النسخة الاحتياطية (اختياري) | |
| الإرجاع: | |
| مسار ملف النسخة الاحتياطية | |
| """ | |
| if config.DB_TYPE != "sqlite": | |
| raise NotImplementedError("النسخ الاحتياطي مدعوم فقط لقواعد بيانات SQLite حاليًا") | |
| # تحديد مسار النسخة الاحتياطية إذا لم يتم تحديده | |
| if backup_path is None: | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| backup_path = os.path.join( | |
| os.path.dirname(config.DB_PATH), | |
| f"backup_{timestamp}.sqlite" | |
| ) | |
| # إنشاء النسخة الاحتياطية | |
| try: | |
| # فتح اتصال مع قاعدة البيانات الأصلية | |
| source_conn = sqlite3.connect(config.DB_PATH) | |
| # فتح اتصال مع قاعدة البيانات الاحتياطية | |
| dest_conn = sqlite3.connect(backup_path) | |
| # نسخ البيانات | |
| source_conn.backup(dest_conn) | |
| # إغلاق الاتصالات | |
| source_conn.close() | |
| dest_conn.close() | |
| return backup_path | |
| except Exception as e: | |
| raise RuntimeError(f"فشل في إنشاء النسخة الاحتياطية: {str(e)}") | |
| def restore_database(self, backup_path): | |
| """ | |
| استعادة قاعدة البيانات من نسخة احتياطية | |
| المعلمات: | |
| backup_path: مسار ملف النسخة الاحتياطية | |
| الإرجاع: | |
| True في حالة نجاح الاستعادة | |
| """ | |
| if config.DB_TYPE != "sqlite": | |
| raise NotImplementedError("استعادة النسخ الاحتياطي مدعومة فقط لقواعد بيانات SQLite حاليًا") | |
| if not os.path.exists(backup_path): | |
| raise FileNotFoundError(f"ملف النسخة الاحتياطية غير موجود: {backup_path}") | |
| try: | |
| # فتح اتصال مع قاعدة البيانات الاحتياطية | |
| source_conn = sqlite3.connect(backup_path) | |
| # فتح اتصال مع قاعدة البيانات الهدف | |
| dest_conn = sqlite3.connect(config.DB_PATH) | |
| # استعادة البيانات | |
| source_conn.backup(dest_conn) | |
| # إغلاق الاتصالات | |
| source_conn.close() | |
| dest_conn.close() | |
| return True | |
| except Exception as e: | |
| raise RuntimeError(f"فشل في استعادة قاعدة البيانات: {str(e)}") | |
| # إنشاء نموذج الاتصال بقاعدة البيانات | |
| db_connector = DatabaseConnector() | |
| # الحصول على جلسة للتعامل مع قاعدة البيانات | |
| def get_db_session(): | |
| """ | |
| الحصول على جلسة للتعامل مع قاعدة البيانات | |
| الإرجاع: | |
| كائن الجلسة | |
| """ | |
| return db_connector.Session() | |
| # سياق الجلسة للتعامل مع قاعدة البيانات | |
| def session_scope(): | |
| """ | |
| توفير نطاق جلسة للتعامل مع قاعدة البيانات | |
| استخدام: | |
| with session_scope() as session: | |
| # عمليات قاعدة البيانات | |
| """ | |
| return db_connector.session_scope() |