File size: 9,484 Bytes
50c20bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""
BaseQuery - Base class for all query types with shared filtering logic.

Uses DBServiceConfig for plug-and-play configuration of models and columns.
"""

import os
import logging
from typing import Type
from fastapi import HTTPException, status as http_status
from sqlalchemy import Select
from sqlalchemy.ext.asyncio import AsyncSession

from core.models import User
from services.db_service.config import DBServiceConfig

logger = logging.getLogger(__name__)


class BaseQuery:
    """
    Base class for all query operations.
    
    Uses DBServiceConfig for model scopes and column names.
    Configuration must be registered at application startup.
    """
    
    def __init__(self, user: User, db: AsyncSession, is_system: bool = False):
        """Initialize base query with user and database session."""
        # Ensure config is registered
        DBServiceConfig.assert_registered()
        
        self.user = user
        self.db = db
        self._is_system = is_system
        self._config = DBServiceConfig
    
    @property
    def is_admin(self) -> bool:
        """Check if current user is an admin."""
        admin_emails_str = os.getenv("ADMIN_EMAILS", "")
        if not admin_emails_str:
            return False
        
        admin_emails = [email.strip() for email in admin_emails_str.split(",")]
        is_admin = self.user.email in admin_emails
        
        if is_admin:
            logger.info(f"Admin access granted for {self.user.email}")
        
        return is_admin
    
    def _verify_operation_access(self, model_class: Type, operation: str) -> None:
        """
        Check if user has permission for this operation on this model.
        
        Permission hierarchy: SYSTEM > ADMIN > USER
        Uses DBServiceConfig for scope checking.
        """
        # System operations have highest priority
        if self._is_system:
            logger.info(f"System operation: {operation} on {model_class.__name__}")
            return
        
        # Admins can do anything
        if self.is_admin:
            return
        
        # Map operation to config scope sets
        admin_only_sets = {
            'read': self._config.admin_read_only,
            'create': self._config.admin_create_only,
            'update': self._config.admin_update_only,
            'delete': self._config.admin_delete_only,
        }
        
        user_allowed_sets = {
            'read': self._config.user_read_scoped,
            'create': self._config.user_create_scoped,
            'update': self._config.user_update_scoped,
            'delete': self._config.user_delete_scoped,
        }
        
        system_only_sets = {
            'read': self._config.system_read_scoped,
            'create': self._config.system_create_scoped,
            'update': self._config.system_update_scoped,
            'delete': self._config.system_delete_scoped,
        }
        
        admin_only = admin_only_sets.get(operation, set())
        user_allowed = user_allowed_sets.get(operation, set())
        system_only = system_only_sets.get(operation, set())
        
        # Check if this is system-only operation
        if model_class in system_only and model_class not in user_allowed and model_class not in admin_only:
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"Only system processes can {operation} {model_class.__name__}"
            )
        
        # Check if admin-only
        if model_class in admin_only:
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"Only administrators can {operation} {model_class.__name__}"
            )
        
        # Check if user is allowed
        if model_class not in user_allowed:
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"You do not have permission to {operation} {model_class.__name__}"
            )
    
    def _apply_ownership_filter(self, stmt, model_class: Type, operation: str):
        """
        Shared method to apply ownership filter for UPDATE/DELETE operations.
        Uses DBServiceConfig.user_filter_column for filtering.
        """
        # Admins can modify all records
        if self.is_admin:
            logger.info(f"Admin {self.user.email} {operation}ing {model_class.__name__} records")
            return stmt
        
        # Non-admins can only modify their own records
        filter_column = self._config.user_filter_column
        if hasattr(model_class, filter_column):
            user_id_col = getattr(model_class, filter_column)
            stmt = stmt.where(user_id_col == self.user.id)
            logger.info(f"User {self.user.email} {operation}ing own {model_class.__name__} records")
        
        return stmt
    
    def _verify_admin_access(self, query: Select) -> None:
        """
        Check if query is for admin-only models (READ operation).
        Uses DBServiceConfig.admin_read_only.
        """
        if self.is_admin:
            return
        
        # Check if querying admin-only models
        if hasattr(query, 'column_descriptions'):
            for description in query.column_descriptions:
                entity = description.get('entity') or description.get('type')
                if entity in self._config.admin_read_only:
                    raise HTTPException(
                        status_code=http_status.HTTP_403_FORBIDDEN,
                        detail=f"Only administrators can read {entity.__name__}"
                    )
        
        # Check froms for admin-only models
        for from_clause in query.froms:
            table_class = from_clause.entity_namespace.get('__class__')
            if table_class in self._config.admin_read_only:
                raise HTTPException(
                    status_code=http_status.HTTP_403_FORBIDDEN,
                    detail=f"Only administrators can read {table_class.__name__}"
                )
    
    def _apply_user_filter(self, query: Select) -> Select:
        """
        Automatically apply user_id filter to READ queries.
        Uses DBServiceConfig for model scopes and filter column.
        """
        # First check if this is an admin-only query
        self._verify_admin_access(query)
        
        # Admins see all data
        if self.is_admin:
            logger.debug(f"Admin query - no user filter applied")
            return query
        
        # Get filter column from config
        filter_column = self._config.user_filter_column
        special_user_model = self._config.special_user_model
        user_id_column = self._config.user_id_column
        
        # Detect which model is being queried
        if hasattr(query, 'column_descriptions'):
            for description in query.column_descriptions:
                entity = description.get('entity') or description.get('type')
                if entity in self._config.user_read_scoped:
                    logger.debug(f"Applying user filter to {entity.__name__} query")
                    # Special handling for User model (uses id instead of user_id)
                    if entity == special_user_model:
                        user_col = getattr(entity, user_id_column)
                        return query.where(user_col == self.user.id)
                    # Standard user_id filtering
                    if hasattr(entity, filter_column):
                        user_col = getattr(entity, filter_column)
                        return query.where(user_col == self.user.id)
        
        # Check froms
        for from_clause in query.froms:
            table_class = from_clause.entity_namespace.get('__class__')
            if table_class in self._config.user_read_scoped:
                logger.debug(f"Applying user filter to {table_class.__name__} query")
                # Special handling for User model
                if table_class == special_user_model:
                    user_col = getattr(table_class, user_id_column)
                    return query.where(user_col == self.user.id)
                # Standard user_id filtering
                if hasattr(table_class, filter_column):
                    user_col = getattr(table_class, filter_column)
                    return query.where(user_col == self.user.id)
        
        return query
    
    def _filter_deleted(self, query: Select) -> Select:
        """
        Add filter to exclude soft-deleted records.
        Uses DBServiceConfig.soft_delete_column.
        """
        delete_column = self._config.soft_delete_column
        
        # Detect which model is being queried
        if hasattr(query, 'column_descriptions'):
            for description in query.column_descriptions:
                entity = description.get('entity') or description.get('type')
                if entity and hasattr(entity, delete_column):
                    deleted_at_col = getattr(entity, delete_column)
                    return query.where(deleted_at_col == None)
        
        # Check froms
        for from_clause in query.froms:
            table_class = from_clause.entity_namespace.get('__class__')
            if table_class and hasattr(table_class, delete_column):
                deleted_at_col = getattr(table_class, delete_column)
                return query.where(deleted_at_col == None)
        
        return query