Spaces:
Sleeping
Sleeping
File size: 9,484 Bytes
50c20bf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
"""
BaseQuery - Base class for all query types with shared filtering logic.
Uses DBServiceConfig for plug-and-play configuration of models and columns.
"""
import os
import logging
from typing import Type
from fastapi import HTTPException, status as http_status
from sqlalchemy import Select
from sqlalchemy.ext.asyncio import AsyncSession
from core.models import User
from services.db_service.config import DBServiceConfig
logger = logging.getLogger(__name__)
class BaseQuery:
"""
Base class for all query operations.
Uses DBServiceConfig for model scopes and column names.
Configuration must be registered at application startup.
"""
def __init__(self, user: User, db: AsyncSession, is_system: bool = False):
"""Initialize base query with user and database session."""
# Ensure config is registered
DBServiceConfig.assert_registered()
self.user = user
self.db = db
self._is_system = is_system
self._config = DBServiceConfig
@property
def is_admin(self) -> bool:
"""Check if current user is an admin."""
admin_emails_str = os.getenv("ADMIN_EMAILS", "")
if not admin_emails_str:
return False
admin_emails = [email.strip() for email in admin_emails_str.split(",")]
is_admin = self.user.email in admin_emails
if is_admin:
logger.info(f"Admin access granted for {self.user.email}")
return is_admin
def _verify_operation_access(self, model_class: Type, operation: str) -> None:
"""
Check if user has permission for this operation on this model.
Permission hierarchy: SYSTEM > ADMIN > USER
Uses DBServiceConfig for scope checking.
"""
# System operations have highest priority
if self._is_system:
logger.info(f"System operation: {operation} on {model_class.__name__}")
return
# Admins can do anything
if self.is_admin:
return
# Map operation to config scope sets
admin_only_sets = {
'read': self._config.admin_read_only,
'create': self._config.admin_create_only,
'update': self._config.admin_update_only,
'delete': self._config.admin_delete_only,
}
user_allowed_sets = {
'read': self._config.user_read_scoped,
'create': self._config.user_create_scoped,
'update': self._config.user_update_scoped,
'delete': self._config.user_delete_scoped,
}
system_only_sets = {
'read': self._config.system_read_scoped,
'create': self._config.system_create_scoped,
'update': self._config.system_update_scoped,
'delete': self._config.system_delete_scoped,
}
admin_only = admin_only_sets.get(operation, set())
user_allowed = user_allowed_sets.get(operation, set())
system_only = system_only_sets.get(operation, set())
# Check if this is system-only operation
if model_class in system_only and model_class not in user_allowed and model_class not in admin_only:
raise HTTPException(
status_code=http_status.HTTP_403_FORBIDDEN,
detail=f"Only system processes can {operation} {model_class.__name__}"
)
# Check if admin-only
if model_class in admin_only:
raise HTTPException(
status_code=http_status.HTTP_403_FORBIDDEN,
detail=f"Only administrators can {operation} {model_class.__name__}"
)
# Check if user is allowed
if model_class not in user_allowed:
raise HTTPException(
status_code=http_status.HTTP_403_FORBIDDEN,
detail=f"You do not have permission to {operation} {model_class.__name__}"
)
def _apply_ownership_filter(self, stmt, model_class: Type, operation: str):
"""
Shared method to apply ownership filter for UPDATE/DELETE operations.
Uses DBServiceConfig.user_filter_column for filtering.
"""
# Admins can modify all records
if self.is_admin:
logger.info(f"Admin {self.user.email} {operation}ing {model_class.__name__} records")
return stmt
# Non-admins can only modify their own records
filter_column = self._config.user_filter_column
if hasattr(model_class, filter_column):
user_id_col = getattr(model_class, filter_column)
stmt = stmt.where(user_id_col == self.user.id)
logger.info(f"User {self.user.email} {operation}ing own {model_class.__name__} records")
return stmt
def _verify_admin_access(self, query: Select) -> None:
"""
Check if query is for admin-only models (READ operation).
Uses DBServiceConfig.admin_read_only.
"""
if self.is_admin:
return
# Check if querying admin-only models
if hasattr(query, 'column_descriptions'):
for description in query.column_descriptions:
entity = description.get('entity') or description.get('type')
if entity in self._config.admin_read_only:
raise HTTPException(
status_code=http_status.HTTP_403_FORBIDDEN,
detail=f"Only administrators can read {entity.__name__}"
)
# Check froms for admin-only models
for from_clause in query.froms:
table_class = from_clause.entity_namespace.get('__class__')
if table_class in self._config.admin_read_only:
raise HTTPException(
status_code=http_status.HTTP_403_FORBIDDEN,
detail=f"Only administrators can read {table_class.__name__}"
)
def _apply_user_filter(self, query: Select) -> Select:
"""
Automatically apply user_id filter to READ queries.
Uses DBServiceConfig for model scopes and filter column.
"""
# First check if this is an admin-only query
self._verify_admin_access(query)
# Admins see all data
if self.is_admin:
logger.debug(f"Admin query - no user filter applied")
return query
# Get filter column from config
filter_column = self._config.user_filter_column
special_user_model = self._config.special_user_model
user_id_column = self._config.user_id_column
# Detect which model is being queried
if hasattr(query, 'column_descriptions'):
for description in query.column_descriptions:
entity = description.get('entity') or description.get('type')
if entity in self._config.user_read_scoped:
logger.debug(f"Applying user filter to {entity.__name__} query")
# Special handling for User model (uses id instead of user_id)
if entity == special_user_model:
user_col = getattr(entity, user_id_column)
return query.where(user_col == self.user.id)
# Standard user_id filtering
if hasattr(entity, filter_column):
user_col = getattr(entity, filter_column)
return query.where(user_col == self.user.id)
# Check froms
for from_clause in query.froms:
table_class = from_clause.entity_namespace.get('__class__')
if table_class in self._config.user_read_scoped:
logger.debug(f"Applying user filter to {table_class.__name__} query")
# Special handling for User model
if table_class == special_user_model:
user_col = getattr(table_class, user_id_column)
return query.where(user_col == self.user.id)
# Standard user_id filtering
if hasattr(table_class, filter_column):
user_col = getattr(table_class, filter_column)
return query.where(user_col == self.user.id)
return query
def _filter_deleted(self, query: Select) -> Select:
"""
Add filter to exclude soft-deleted records.
Uses DBServiceConfig.soft_delete_column.
"""
delete_column = self._config.soft_delete_column
# Detect which model is being queried
if hasattr(query, 'column_descriptions'):
for description in query.column_descriptions:
entity = description.get('entity') or description.get('type')
if entity and hasattr(entity, delete_column):
deleted_at_col = getattr(entity, delete_column)
return query.where(deleted_at_col == None)
# Check froms
for from_clause in query.froms:
table_class = from_clause.entity_namespace.get('__class__')
if table_class and hasattr(table_class, delete_column):
deleted_at_col = getattr(table_class, delete_column)
return query.where(deleted_at_col == None)
return query
|