mgpt / api /database.py
Mark-Lasfar
Fix ChunkedIteratorResult in SQLAlchemyUserDatabase and toggleBtn null error
77b0d42
raw
history blame
4.91 kB
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
import os
import logging
from datetime import datetime
from typing import AsyncGenerator
from sqlalchemy import Column, String, Integer, ForeignKey, DateTime, Boolean, Text
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from fastapi_users.db import SQLAlchemyBaseUserTable, SQLAlchemyUserDatabase
from fastapi import Depends
import aiosqlite # مهم جداً: حتى يضمن استخدام aiosqlite وليس pysqlite
# إعداد اللوج
logger = logging.getLogger(__name__)
# ✅ استخدم القيمة مباشرة إذا لم يكن هناك متغير بيئة
SQLALCHEMY_DATABASE_URL = os.environ.get(
"SQLALCHEMY_DATABASE_URL"
) or "sqlite+aiosqlite:///./data/mgzon_users.db"
# ✅ تأكد أن الدرايفر async
if "aiosqlite" not in SQLALCHEMY_DATABASE_URL:
raise ValueError("Database URL must use 'sqlite+aiosqlite' for async support")
# إنشاء محرك async
async_engine = create_async_engine(
SQLALCHEMY_DATABASE_URL,
echo=True,
connect_args={"check_same_thread": False}
)
# إعداد الجلسة async
AsyncSessionLocal = async_sessionmaker(
async_engine,
expire_on_commit=False,
class_=AsyncSession
)
# القاعدة الأساسية للنماذج
Base = declarative_base()
# النماذج (Models)
class OAuthAccount(Base):
__tablename__ = "oauth_account"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
oauth_name = Column(String, nullable=False)
access_token = Column(String, nullable=False)
expires_at = Column(Integer, nullable=True)
refresh_token = Column(String, nullable=True)
account_id = Column(String, index=True, nullable=False)
account_email = Column(String, nullable=False)
user = relationship("User", back_populates="oauth_accounts", lazy="selectin")
class User(SQLAlchemyBaseUserTable[int], Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
is_verified = Column(Boolean, default=False)
display_name = Column(String, nullable=True)
preferred_model = Column(String, nullable=True)
job_title = Column(String, nullable=True)
education = Column(String, nullable=True)
interests = Column(String, nullable=True)
additional_info = Column(Text, nullable=True)
conversation_style = Column(String, nullable=True)
oauth_accounts = relationship("OAuthAccount", back_populates="user", cascade="all, delete-orphan")
conversations = relationship("Conversation", back_populates="user", cascade="all, delete-orphan")
class Conversation(Base):
__tablename__ = "conversation"
id = Column(Integer, primary_key=True, index=True)
conversation_id = Column(String, unique=True, index=True, nullable=False)
user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
title = Column(String, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
user = relationship("User", back_populates="conversations")
messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
class Message(Base):
__tablename__ = "message"
id = Column(Integer, primary_key=True, index=True)
conversation_id = Column(Integer, ForeignKey("conversation.id"), nullable=False)
role = Column(String, nullable=False)
content = Column(Text, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
conversation = relationship("Conversation", back_populates="messages")
# دالة لجلب الجلسة async
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
# دالة لجلب قاعدة بيانات المستخدمين لـ fastapi-users
async def get_user_db(session: AsyncSession = Depends(get_db)) -> AsyncGenerator[SQLAlchemyUserDatabase, None]:
yield SQLAlchemyUserDatabase(session, User, OAuthAccount)
# (اختياري) دالة لإنشاء الجداول
async def init_db():
try:
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
logger.info("Database tables created successfully")
except Exception as e:
logger.error(f"Error creating database tables: {e}")
raise