Spaces:
Running
Running
| """ | |
| database/crud.py β All database Create/Read/Update/Delete operations. | |
| Each function accepts a session and returns ORM objects or primitives. | |
| """ | |
| from datetime import datetime, timedelta | |
| from sqlalchemy import select, delete, func, desc | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy.orm import selectinload | |
| from database.models import Admin, AdminLog, Folder, Item, User, UserLog | |
| # βββββββββββββββββββββββββββ USER CRUD βββββββββββββββββββββββββββ | |
| async def upsert_user(session: AsyncSession, user_id: int, username: str | None, full_name: str) -> User: | |
| """Create a new user or update their info if they already exist.""" | |
| user = await session.get(User, user_id) | |
| if not user: | |
| user = User(id=user_id, username=username, full_name=full_name) | |
| session.add(user) | |
| else: | |
| user.username = username | |
| user.full_name = full_name | |
| await session.commit() | |
| return user | |
| async def get_user(session: AsyncSession, user_id: int) -> User | None: | |
| return await session.get(User, user_id) | |
| async def get_all_users(session: AsyncSession) -> list[User]: | |
| result = await session.execute(select(User).order_by(User.joined_at.desc())) | |
| return result.scalars().all() | |
| async def log_user_action(session: AsyncSession, user_id: int, action: str): | |
| """Record a user navigation event.""" | |
| log = UserLog(user_id=user_id, action=action) | |
| session.add(log) | |
| await session.commit() | |
| # βββββββββββββββββββββββββββ ADMIN CRUD βββββββββββββββββββββββββββ | |
| async def get_admin(session: AsyncSession, user_id: int) -> Admin | None: | |
| result = await session.execute(select(Admin).where(Admin.user_id == user_id)) | |
| return result.scalar_one_or_none() | |
| async def get_all_admins(session: AsyncSession) -> list[Admin]: | |
| result = await session.execute(select(Admin).order_by(Admin.added_at.desc())) | |
| return result.scalars().all() | |
| async def add_admin( | |
| session: AsyncSession, user_id: int, username: str | None, | |
| full_name: str, added_by: int | |
| ) -> Admin: | |
| """Promote a user to admin. Ensures the user record exists first.""" | |
| # Upsert user record | |
| await upsert_user(session, user_id, username, full_name) | |
| admin = Admin(user_id=user_id, username=username, full_name=full_name, added_by=added_by) | |
| session.add(admin) | |
| await session.commit() | |
| return admin | |
| async def remove_admin(session: AsyncSession, user_id: int) -> bool: | |
| """Remove admin privileges. Returns True if an admin was found and deleted.""" | |
| result = await session.execute(delete(Admin).where(Admin.user_id == user_id)) | |
| await session.commit() | |
| return result.rowcount > 0 | |
| async def log_admin_action(session: AsyncSession, admin_user_id: int, action: str): | |
| """Record an admin's action for the Owner's audit trail.""" | |
| # Find the Admin record (may not exist if it's the Owner acting) | |
| result = await session.execute(select(Admin).where(Admin.user_id == admin_user_id)) | |
| admin = result.scalar_one_or_none() | |
| log = AdminLog( | |
| admin_id=admin.id if admin else None, | |
| admin_user_id=admin_user_id, | |
| action=action | |
| ) | |
| session.add(log) | |
| await session.commit() | |
| # βββββββββββββββββββββββββββ FOLDER CRUD βββββββββββββββββββββββββββ | |
| async def get_root_folders(session: AsyncSession) -> list[Folder]: | |
| """Get all top-level folders (no parent).""" | |
| result = await session.execute( | |
| select(Folder).where(Folder.parent_id.is_(None)).order_by(Folder.name) | |
| ) | |
| return result.scalars().all() | |
| async def get_subfolders(session: AsyncSession, parent_id: int) -> list[Folder]: | |
| """Get direct children of a given folder.""" | |
| result = await session.execute( | |
| select(Folder).where(Folder.parent_id == parent_id).order_by(Folder.name) | |
| ) | |
| return result.scalars().all() | |
| async def get_folder(session: AsyncSession, folder_id: int) -> Folder | None: | |
| return await session.get(Folder, folder_id) | |
| async def create_folder( | |
| session: AsyncSession, name: str, created_by: int, | |
| parent_id: int | None = None, emoji: str = "π" | |
| ) -> Folder: | |
| folder = Folder(name=name, created_by=created_by, parent_id=parent_id, emoji=emoji) | |
| session.add(folder) | |
| await session.commit() | |
| await session.refresh(folder) | |
| return folder | |
| async def update_folder(session: AsyncSession, folder_id: int, name: str, emoji: str) -> Folder | None: | |
| folder = await session.get(Folder, folder_id) | |
| if folder: | |
| folder.name = name | |
| folder.emoji = emoji | |
| await session.commit() | |
| return folder | |
| async def delete_folder(session: AsyncSession, folder_id: int) -> bool: | |
| """Delete a folder and all its children/items (CASCADE handles it in DB).""" | |
| result = await session.execute(delete(Folder).where(Folder.id == folder_id)) | |
| await session.commit() | |
| return result.rowcount > 0 | |
| # βββββββββββββββββββββββββββ ITEM CRUD βββββββββββββββββββββββββββ | |
| async def get_folder_items(session: AsyncSession, folder_id: int) -> list[Item]: | |
| result = await session.execute( | |
| select(Item).where(Item.folder_id == folder_id).order_by(Item.created_at) | |
| ) | |
| return result.scalars().all() | |
| async def get_item(session: AsyncSession, item_id: int) -> Item | None: | |
| return await session.get(Item, item_id) | |
| async def create_item( | |
| session: AsyncSession, folder_id: int, title: str, content_type: str, | |
| created_by: int, file_id: str | None = None, text_content: str | None = None | |
| ) -> Item: | |
| item = Item( | |
| folder_id=folder_id, | |
| title=title, | |
| content_type=content_type, | |
| created_by=created_by, | |
| file_id=file_id, | |
| text_content=text_content | |
| ) | |
| session.add(item) | |
| await session.commit() | |
| await session.refresh(item) | |
| return item | |
| async def update_item( | |
| session: AsyncSession, item_id: int, title: str, | |
| content_type: str | None = None, file_id: str | None = None, | |
| text_content: str | None = None | |
| ) -> Item | None: | |
| item = await session.get(Item, item_id) | |
| if item: | |
| item.title = title | |
| if content_type: | |
| item.content_type = content_type | |
| if file_id is not None: | |
| item.file_id = file_id | |
| if text_content is not None: | |
| item.text_content = text_content | |
| await session.commit() | |
| return item | |
| async def delete_item(session: AsyncSession, item_id: int) -> bool: | |
| result = await session.execute(delete(Item).where(Item.id == item_id)) | |
| await session.commit() | |
| return result.rowcount > 0 | |
| # βββββββββββββββββββββββββββ DASHBOARD STATS βββββββββββββββββββββββββββ | |
| async def get_admin_dashboard_stats(session: AsyncSession) -> dict: | |
| """Aggregate stats for the Admin dashboard.""" | |
| total_users = await session.scalar(select(func.count(User.id))) | |
| total_folders = await session.scalar(select(func.count(Folder.id))) | |
| total_items = await session.scalar(select(func.count(Item.id))) | |
| # Active users in last 7 days | |
| week_ago = datetime.utcnow() - timedelta(days=7) | |
| active_users = await session.scalar( | |
| select(func.count(func.distinct(UserLog.user_id))) | |
| .where(UserLog.timestamp >= week_ago) | |
| ) | |
| # Recent user logs | |
| recent_logs = await session.execute( | |
| select(UserLog, User) | |
| .join(User, UserLog.user_id == User.id) | |
| .order_by(desc(UserLog.timestamp)) | |
| .limit(5) | |
| ) | |
| recent_activity = [ | |
| {"user": r.User.full_name, "action": r.UserLog.action, | |
| "time": r.UserLog.timestamp.strftime("%Y-%m-%d %H:%M")} | |
| for r in recent_logs | |
| ] | |
| return { | |
| "total_users": total_users or 0, | |
| "total_folders": total_folders or 0, | |
| "total_items": total_items or 0, | |
| "active_users_7d": active_users or 0, | |
| "recent_activity": recent_activity, | |
| } | |
| async def get_owner_dashboard_stats(session: AsyncSession) -> dict: | |
| """Aggregate stats for the Owner dashboard (includes admin audit logs).""" | |
| base_stats = await get_admin_dashboard_stats(session) | |
| total_admins = await session.scalar(select(func.count(Admin.id))) | |
| # Recent admin actions | |
| recent_admin_logs = await session.execute( | |
| select(AdminLog).order_by(desc(AdminLog.timestamp)).limit(10) | |
| ) | |
| logs = recent_admin_logs.scalars().all() | |
| admin_activity = [ | |
| {"admin_id": log.admin_user_id, "action": log.action, | |
| "time": log.timestamp.strftime("%Y-%m-%d %H:%M")} | |
| for log in logs | |
| ] | |
| return { | |
| **base_stats, | |
| "total_admins": total_admins or 0, | |
| "admin_activity": admin_activity, | |
| } |