"""
BaseQuery - Base class for all query types with shared filtering logic.

Uses DBServiceConfig for plug-and-play configuration of models and columns.
"""

import os
import logging
from typing import Any, Iterator, Type

from fastapi import HTTPException, status as http_status
from sqlalchemy import Select
from sqlalchemy.ext.asyncio import AsyncSession

from core.models import User
from services.db_service.config import DBServiceConfig

logger = logging.getLogger(__name__)


class BaseQuery:
    """
    Base class for all query operations.

    Uses DBServiceConfig for model scopes and column names.
    Configuration must be registered at application startup;
    ``__init__`` calls ``DBServiceConfig.assert_registered()`` to enforce it.
    """

    def __init__(self, user: User, db: AsyncSession, is_system: bool = False):
        """
        Initialize base query with user and database session.

        Args:
            user: The user on whose behalf queries are executed.
            db: Async database session stored on the instance.
            is_system: When True, ``_verify_operation_access`` skips all
                permission checks for this instance.
        """
        # Ensure config is registered
        DBServiceConfig.assert_registered()

        self.user = user
        self.db = db
        self._is_system = is_system
        self._config = DBServiceConfig

    @property
    def is_admin(self) -> bool:
        """
        Check if current user is an admin.

        The admin list is read from the comma-separated ``ADMIN_EMAILS``
        environment variable on every access. Entries are stripped of
        surrounding whitespace and compared to ``self.user.email`` exactly
        (case-sensitive).
        """
        raw_admin_emails = os.getenv("ADMIN_EMAILS", "")

        # Drop blank entries (e.g. from a trailing comma) so that a user with
        # an empty email can never match the admin list by accident.
        admin_emails = {
            entry.strip()
            for entry in raw_admin_emails.split(",")
            if entry.strip()
        }
        if not admin_emails:
            return False

        granted = self.user.email in admin_emails
        if granted:
            logger.info("Admin access granted for %s", self.user.email)
        return granted

    def _verify_operation_access(self, model_class: Type, operation: str) -> None:
        """
        Check if user has permission for this operation on this model.

        Permission hierarchy: SYSTEM > ADMIN > USER
        Uses DBServiceConfig for scope checking.

        Args:
            model_class: The model class the operation targets.
            operation: One of 'read', 'create', 'update', 'delete'. Any other
                value maps to empty scope sets and is therefore rejected for
                non-system, non-admin callers.

        Raises:
            HTTPException: 403 when the model is system-only, admin-only, or
                not in the user-allowed scope for this operation.
        """
        # System operations have highest priority
        if self._is_system:
            logger.info(
                "System operation: %s on %s", operation, model_class.__name__
            )
            return

        # Admins can do anything
        if self.is_admin:
            return

        # Map operation to config scope sets
        admin_only_sets = {
            'read': self._config.admin_read_only,
            'create': self._config.admin_create_only,
            'update': self._config.admin_update_only,
            'delete': self._config.admin_delete_only,
        }
        user_allowed_sets = {
            'read': self._config.user_read_scoped,
            'create': self._config.user_create_scoped,
            'update': self._config.user_update_scoped,
            'delete': self._config.user_delete_scoped,
        }
        system_only_sets = {
            'read': self._config.system_read_scoped,
            'create': self._config.system_create_scoped,
            'update': self._config.system_update_scoped,
            'delete': self._config.system_delete_scoped,
        }

        admin_only = admin_only_sets.get(operation, set())
        user_allowed = user_allowed_sets.get(operation, set())
        system_only = system_only_sets.get(operation, set())

        # System-only means: listed for system and for nobody else
        if (
            model_class in system_only
            and model_class not in user_allowed
            and model_class not in admin_only
        ):
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"Only system processes can {operation} {model_class.__name__}"
            )

        # Check if admin-only
        if model_class in admin_only:
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"Only administrators can {operation} {model_class.__name__}"
            )

        # Check if user is allowed
        if model_class not in user_allowed:
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"You do not have permission to {operation} {model_class.__name__}"
            )

    def _apply_ownership_filter(self, stmt, model_class: Type, operation: str):
        """
        Shared method to apply ownership filter for UPDATE/DELETE operations.

        Uses DBServiceConfig.user_filter_column for filtering.

        Args:
            stmt: The statement to restrict.
            model_class: The model class the statement targets.
            operation: Operation name, used only for log messages.

        Returns:
            ``stmt`` unchanged for admins or when ``model_class`` has no
            attribute named by the configured filter column; otherwise
            ``stmt`` with a ``column == self.user.id`` condition added.
        """
        # Admins can modify all records
        if self.is_admin:
            logger.info(
                "Admin %s %sing %s records",
                self.user.email, operation, model_class.__name__,
            )
            return stmt

        # Non-admins can only modify their own records
        filter_column = self._config.user_filter_column
        if hasattr(model_class, filter_column):
            user_id_col = getattr(model_class, filter_column)
            stmt = stmt.where(user_id_col == self.user.id)
            logger.info(
                "User %s %sing own %s records",
                self.user.email, operation, model_class.__name__,
            )
        else:
            # The statement is returned unrestricted here; make that visible
            # instead of logging that the user touched only their own rows.
            logger.warning(
                "%s has no %r column; %s statement for user %s is not "
                "ownership-filtered",
                model_class.__name__, filter_column, operation,
                self.user.email,
            )

        return stmt

    def _iter_query_entities(self, query: Select) -> Iterator[Any]:
        """
        Lazily yield the candidate entities referenced by ``query``.

        Candidates come first from ``query.column_descriptions`` (the
        'entity' value, falling back to 'type'), then from the query's FROM
        clauses. Values may be None; callers decide how to treat them.
        Being a generator, the FROM clauses are only inspected if the caller
        keeps iterating past the column descriptions.
        """
        if hasattr(query, 'column_descriptions'):
            for description in query.column_descriptions:
                yield description.get('entity') or description.get('type')

        # Select.froms is deprecated in favour of get_final_froms();
        # fall back to the attribute on releases that lack the method.
        get_final_froms = getattr(query, 'get_final_froms', None)
        from_clauses = (
            get_final_froms() if callable(get_final_froms) else query.froms
        )
        for from_clause in from_clauses:
            # NOTE(review): assumes entity_namespace exposes .get() and maps
            # '__class__' to the model class -- confirm against the installed
            # SQLAlchemy version.
            yield from_clause.entity_namespace.get('__class__')

    def _verify_admin_access(self, query: Select) -> None:
        """
        Check if query is for admin-only models (READ operation).

        Uses DBServiceConfig.admin_read_only.

        NOTE(review): unlike ``_verify_operation_access``, this check does not
        consult ``self._is_system`` -- confirm that is intended.

        Raises:
            HTTPException: 403 when a non-admin queries a model listed in
                ``admin_read_only``.
        """
        if self.is_admin:
            return

        admin_read_only = self._config.admin_read_only
        for entity in self._iter_query_entities(query):
            if entity in admin_read_only:
                raise HTTPException(
                    status_code=http_status.HTTP_403_FORBIDDEN,
                    detail=f"Only administrators can read {entity.__name__}"
                )

    def _apply_user_filter(self, query: Select) -> Select:
        """
        Automatically apply user_id filter to READ queries.

        Uses DBServiceConfig for model scopes and filter column.

        Returns:
            The query unchanged for admins, or when no entity in the query is
            both listed in ``user_read_scoped`` and filterable; otherwise the
            query restricted to rows belonging to ``self.user.id``, based on
            the first matching entity.

        Raises:
            HTTPException: 403 via ``_verify_admin_access`` for admin-only
                models.
        """
        # First check if this is an admin-only query
        self._verify_admin_access(query)

        # Admins see all data
        if self.is_admin:
            logger.debug("Admin query - no user filter applied")
            return query

        # Get filter column from config
        filter_column = self._config.user_filter_column
        special_user_model = self._config.special_user_model
        user_id_column = self._config.user_id_column
        user_read_scoped = self._config.user_read_scoped

        # Detect which model is being queried
        for entity in self._iter_query_entities(query):
            if entity not in user_read_scoped:
                continue

            logger.debug("Applying user filter to %s query", entity.__name__)

            # Special handling for User model (uses id instead of user_id)
            if entity == special_user_model:
                user_col = getattr(entity, user_id_column)
                return query.where(user_col == self.user.id)

            # Standard user_id filtering
            if hasattr(entity, filter_column):
                user_col = getattr(entity, filter_column)
                return query.where(user_col == self.user.id)

        return query

    def _filter_deleted(self, query: Select) -> Select:
        """
        Add filter to exclude soft-deleted records.

        Uses DBServiceConfig.soft_delete_column.

        Returns:
            The query with an ``IS NULL`` condition on the soft-delete column
            of the first entity that has one, or the query unchanged when no
            entity has that column.
        """
        delete_column = self._config.soft_delete_column

        # Detect which model is being queried
        for entity in self._iter_query_entities(query):
            if entity and hasattr(entity, delete_column):
                deleted_at_col = getattr(entity, delete_column)
                # Explicit IS NULL rather than relying on `== None`.
                return query.where(deleted_at_col.is_(None))

        return query