apigateway / services /db_service /select_query.py
jebin2's picture
db services
50c20bf
"""
SelectQuery - Read operations with access control.
Inherits from BaseQuery for shared filtering logic.
"""
import logging
from typing import TypeVar, Optional, List, Type
from fastapi import HTTPException, status as http_status
from sqlalchemy import Select, select, func
from services.db_service.base_query import BaseQuery
logger = logging.getLogger(__name__)
T = TypeVar('T')
class SelectQuery(BaseQuery):
"""
Handles SELECT operations with automatic filtering.
Inherits filtering logic from BaseQuery:
- User filtering (_apply_user_filter)
- Deleted record filtering (_filter_deleted)
- Admin checks (_check_admin, _is_admin)
"""
async def execute(self, query: Select) -> List[T]:
"""
Execute a query with automatic filtering.
Filtering is applied automatically based on:
- Model type (USER_SCOPED, ADMIN_ONLY)
- User's admin status
- Deleted records (always excluded)
Returns:
List of results
"""
query = self._apply_user_filter(query)
query = self._filter_deleted(query)
result = await self.db.execute(query)
return result.scalars().all()
async def execute_one(self, query: Select) -> Optional[T]:
"""
Execute a query expecting a single result.
Automatic filtering applied.
Returns:
Single result or None
"""
query = self._apply_user_filter(query)
query = self._filter_deleted(query)
result = await self.db.execute(query)
return result.scalar_one_or_none()
async def count(self, query: Select) -> int:
"""
Count query results with automatic filtering.
Returns:
Count of results
"""
query = self._apply_user_filter(query)
query = self._filter_deleted(query)
# Convert to count query
count_query = select(func.count()).select_from(query.alias())
result = await self.db.execute(count_query)
return result.scalar() or 0
async def count_deleted(self, model_class: Type[T]) -> int:
"""
Count soft-deleted records for a model.
Only admins can access this.
Returns:
Count of deleted records
Raises:
HTTPException: 403 if non-admin tries to access
"""
if not self.is_admin:
raise HTTPException(
status_code=http_status.HTTP_403_FORBIDDEN,
detail="Only administrators can view deleted records"
)
delete_col = getattr(model_class, self._config.soft_delete_column)
query = select(func.count()).select_from(model_class).where(
delete_col != None
)
result = await self.db.execute(query)
return result.scalar() or 0