Spaces:
Sleeping
Sleeping
| """ | |
| SelectQuery - Read operations with access control. | |
| Inherits from BaseQuery for shared filtering logic. | |
| """ | |
| import logging | |
| from typing import TypeVar, Optional, List, Type | |
| from fastapi import HTTPException, status as http_status | |
| from sqlalchemy import Select, select, func | |
| from services.db_service.base_query import BaseQuery | |
| logger = logging.getLogger(__name__) | |
| T = TypeVar('T') | |
| class SelectQuery(BaseQuery): | |
| """ | |
| Handles SELECT operations with automatic filtering. | |
| Inherits filtering logic from BaseQuery: | |
| - User filtering (_apply_user_filter) | |
| - Deleted record filtering (_filter_deleted) | |
| - Admin checks (_check_admin, _is_admin) | |
| """ | |
| async def execute(self, query: Select) -> List[T]: | |
| """ | |
| Execute a query with automatic filtering. | |
| Filtering is applied automatically based on: | |
| - Model type (USER_SCOPED, ADMIN_ONLY) | |
| - User's admin status | |
| - Deleted records (always excluded) | |
| Returns: | |
| List of results | |
| """ | |
| query = self._apply_user_filter(query) | |
| query = self._filter_deleted(query) | |
| result = await self.db.execute(query) | |
| return result.scalars().all() | |
| async def execute_one(self, query: Select) -> Optional[T]: | |
| """ | |
| Execute a query expecting a single result. | |
| Automatic filtering applied. | |
| Returns: | |
| Single result or None | |
| """ | |
| query = self._apply_user_filter(query) | |
| query = self._filter_deleted(query) | |
| result = await self.db.execute(query) | |
| return result.scalar_one_or_none() | |
| async def count(self, query: Select) -> int: | |
| """ | |
| Count query results with automatic filtering. | |
| Returns: | |
| Count of results | |
| """ | |
| query = self._apply_user_filter(query) | |
| query = self._filter_deleted(query) | |
| # Convert to count query | |
| count_query = select(func.count()).select_from(query.alias()) | |
| result = await self.db.execute(count_query) | |
| return result.scalar() or 0 | |
| async def count_deleted(self, model_class: Type[T]) -> int: | |
| """ | |
| Count soft-deleted records for a model. | |
| Only admins can access this. | |
| Returns: | |
| Count of deleted records | |
| Raises: | |
| HTTPException: 403 if non-admin tries to access | |
| """ | |
| if not self.is_admin: | |
| raise HTTPException( | |
| status_code=http_status.HTTP_403_FORBIDDEN, | |
| detail="Only administrators can view deleted records" | |
| ) | |
| delete_col = getattr(model_class, self._config.soft_delete_column) | |
| query = select(func.count()).select_from(model_class).where( | |
| delete_col != None | |
| ) | |
| result = await self.db.execute(query) | |
| return result.scalar() or 0 | |