File size: 9,251 Bytes
238cf71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
"""
Authentication and authorization dependencies for Silver Table Assistant.
Provides JWT verification and user role management using Supabase Auth.
"""

import os
import jwt
from typing import Optional, Dict, Any, Callable
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from supabase import create_client, Client
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_session
from models import Profile


# Initialize Supabase client.
# Both settings are read from the process environment once, at import time.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

# Fail fast: importing this module raises ValueError when either variable is
# unset or empty, so a misconfigured deployment cannot start serving requests.
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
    raise ValueError("SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY environment variables are required")

# Module-wide client shared by every dependency in this file.
# NOTE(review): this is built with the service-role key, presumably a
# privileged credential -- confirm it is never exposed to API clients.
supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)

# Security scheme used as a FastAPI dependency to obtain the bearer
# credentials consumed by get_current_user.
security = HTTPBearer()


class AuthenticationError(HTTPException):
    """HTTP 401 error raised when the caller's credentials cannot be validated.

    The response always carries a ``WWW-Authenticate: Bearer`` header.
    """

    def __init__(self, detail: str = "Could not validate credentials"):
        # Challenge header sent with every 401 produced by this class.
        challenge_headers = {"WWW-Authenticate": "Bearer"}
        unauthorized = status.HTTP_401_UNAUTHORIZED
        super().__init__(
            status_code=unauthorized,
            detail=detail,
            headers=challenge_headers,
        )


class AuthorizationError(HTTPException):
    """HTTP 403 error raised when an authenticated user lacks permission."""

    def __init__(self, detail: str = "Not enough permissions"):
        forbidden = status.HTTP_403_FORBIDDEN
        super().__init__(status_code=forbidden, detail=detail)


class User:
    """Plain container describing an authenticated user.

    Attributes are set directly from the constructor arguments. A missing or
    empty ``role`` becomes ``"family"``; missing or empty ``metadata`` and
    ``raw_user`` each become a fresh empty dict.
    """

    def __init__(
        self,
        user_id: str,
        email: Optional[str] = None,
        role: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        raw_user: Optional[Dict[str, Any]] = None
    ):
        self.user_id = user_id
        self.email = email

        # Any falsy role (None or "") falls back to the default role.
        self.role = role if role else "family"

        # Falsy inputs get their own new dict so instances never share one.
        self.metadata = metadata if metadata else {}
        self.raw_user = raw_user if raw_user else {}


async def verify_jwt_token(token: str) -> Dict[str, Any]:
    """
    Verify JWT token using Supabase Auth.

    Args:
        token: JWT token to verify

    Returns:
        Dict with the keys ``user_id``, ``email``, ``role`` (``"user"`` when
        the metadata carries no ``role`` key), ``metadata`` and ``raw_user``

    Raises:
        AuthenticationError: If Supabase returns no user for the token, or if
            the lookup fails for any other reason
    """
    try:
        # Verify the token with Supabase.
        # NOTE(review): this call is not awaited, so it presumably performs a
        # blocking request on the event loop -- consider offloading it.
        response = supabase.auth.get_user(token)

        # Treat "no response" and "response without a user" the same way.
        user = getattr(response, "user", None)
        if user is None:
            raise AuthenticationError("Invalid token")

        # user_metadata may be missing/None; normalise so .get() is safe.
        metadata = user.user_metadata or {}

        # NOTE(security): Supabase documents user_metadata as editable by the
        # end user. If this role gates access it should presumably come from
        # app_metadata -- confirm how roles are provisioned before changing.
        return {
            "user_id": user.id,
            "email": user.email,
            "role": metadata.get("role", "user"),
            "metadata": metadata,
            "raw_user": user.__dict__
        }
    except AuthenticationError:
        # Already the right error; do not wrap it in a second message.
        raise
    except Exception as e:
        raise AuthenticationError(f"Token verification failed: {str(e)}") from e


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
    """
    Resolve the bearer credentials of a request into a User.

    Args:
        credentials: HTTP authorization credentials supplied by the
            ``security`` dependency (or passed in directly by a caller)

    Returns:
        User built from the data returned by ``verify_jwt_token``

    Raises:
        AuthenticationError: If no credentials are given, the token fails
            verification, or any other error occurs while building the user
    """
    if not credentials:
        raise AuthenticationError("No credentials provided")

    try:
        # The bearer token is the credentials part of the Authorization header.
        verified = await verify_jwt_token(credentials.credentials)

        user_id = verified["user_id"]
        email = verified["email"]
        role = verified["role"]
        metadata = verified["metadata"]
        raw_user = verified["raw_user"]

        return User(
            user_id=user_id,
            email=email,
            role=role,
            metadata=metadata,
            raw_user=raw_user,
        )
    except AuthenticationError:
        # Authentication failures pass through untouched.
        raise
    except Exception as exc:
        # Anything unexpected is still reported to the client as a 401.
        raise AuthenticationError(f"Authentication failed: {str(exc)}")


async def get_optional_user(
    request: Request
) -> Optional[User]:
    """
    Get current user if authenticated, otherwise return None.

    This is best-effort: a missing header, a non-bearer scheme, an empty
    token, or any failure while verifying the token all yield None instead of
    an error response.

    Args:
        request: FastAPI request object

    Returns:
        User object or None
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return None

    # Split "<scheme> <token>" once. HTTP auth scheme names are
    # case-insensitive, so "bearer" and "Bearer" must be treated alike.
    scheme, _, token = auth_header.partition(" ")
    if scheme.lower() != "bearer" or not token:
        # An empty token can never verify; skip the verification round-trip.
        return None

    try:
        credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
        return await get_current_user(credentials)
    except Exception:
        # Deliberately broad (covers AuthenticationError too): optional
        # authentication must never fail the request.
        return None


def require_role(required_role: str) -> Callable:
    """
    Build a dependency that only admits users holding one specific role.

    Users whose role is ``"admin"`` are admitted regardless of
    ``required_role``.

    Args:
        required_role: Role the authenticated user must have

    Returns:
        Async dependency that returns the user, or raises AuthorizationError
        when the role check fails
    """
    async def role_checker(user: User = Depends(get_current_user)) -> User:
        # Exact match first, then the admin override.
        if user.role == required_role or user.role == "admin":
            return user
        raise AuthorizationError(
            f"Required role: {required_role}, your role: {user.role}"
        )

    return role_checker


def require_roles(allowed_roles: list[str]) -> Callable:
    """
    Build a dependency that admits users holding any of several roles.

    Users whose role is ``"admin"`` are admitted even when ``"admin"`` is not
    listed in ``allowed_roles``.

    Args:
        allowed_roles: Roles that are permitted to pass the check

    Returns:
        Async dependency that returns the user, or raises AuthorizationError
        when the role check fails
    """
    async def roles_checker(user: User = Depends(get_current_user)) -> User:
        # Membership test first, then the admin override.
        if user.role in allowed_roles or user.role == "admin":
            return user
        raise AuthorizationError(
            f"Required roles: {allowed_roles}, your role: {user.role}"
        )

    return roles_checker


async def get_user_profile(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_session)
) -> Optional[Profile]:
    """
    Fetch the first Profile row belonging to the user (legacy/helper).

    Args:
        user: Authenticated user; ``user.user_id`` is parsed as a UUID
        db: Async database session

    Returns:
        The first matching Profile, or None when the user has none

    Raises:
        HTTPException: 500 for any error raised while parsing the id or
            running the query (the error text is included in the detail)
    """
    try:
        from sqlmodel import select
        from uuid import UUID

        owner_id = UUID(user.user_id)
        query = select(Profile).where(Profile.user_id == owner_id)
        rows = await db.execute(query)
        return rows.scalars().first()
    except Exception as e:
        failure_detail = f"Failed to fetch user profile: {str(e)}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )


async def get_or_create_user_profile(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_session)
) -> Profile:
    """
    Return the user's profile, creating a generic one when none exists.

    Args:
        user: Authenticated user
        db: Async database session

    Returns:
        The existing profile, or a newly committed and refreshed one filled
        with fixed default values
    """
    existing = await get_user_profile(user, db)
    if existing is not None:
        return existing

    # Function-local imports, retained from the original implementation.
    from models import Profile
    from uuid import UUID, uuid4

    owner_id = UUID(user.user_id)

    # Name the profile after the part of the e-mail before the first '@'.
    if user.email:
        display_name = user.email.split('@')[0]
    else:
        display_name = "User"

    # Fixed defaults matching the new schema.
    # NOTE(review): these look like placeholders rather than real user data.
    created = Profile(
        id=uuid4(),
        user_id=owner_id,
        name=display_name,
        age=70,
        gender="male",
        height=165.0,
        weight=60.0,
        chronic_diseases=[],
        dietary_restrictions=[],
        chewing_ability="normal",
    )

    db.add(created)
    await db.commit()
    await db.refresh(created)
    return created


def get_user_metadata(user: User = Depends(get_current_user)) -> Dict[str, Any]:
    """
    Get user metadata for use in AI prompts and recommendations.

    The result contains ``user_id``, ``email`` and ``role`` taken from the
    authenticated user, plus every key of ``user.metadata``. When the
    metadata contains a key with one of those three names, the value from the
    authenticated user wins.

    Args:
        user: Authenticated user

    Returns:
        User metadata dictionary
    """
    identity = {
        "user_id": user.user_id,
        "email": user.email,
        "role": user.role,
    }
    # Identity first to keep the original key order, metadata in the middle,
    # identity again last so free-form metadata cannot overwrite these fields.
    return {**identity, **user.metadata, **identity}


def get_admin_user(
    admin_only: User = Depends(require_role("admin"))
) -> User:
    """
    Dependency for admin-only endpoints.

    The role check is done by the ``require_role("admin")`` dependency before
    this function runs; this function only hands the resolved user back.

    Args:
        admin_only: User that passed the ``require_role("admin")`` check

    Returns:
        The same user object, unchanged
    """
    return admin_only


# Utility functions for role-based access control

def is_admin(user: User) -> bool:
    """Return True only when the user's role is exactly ``"admin"``."""
    role = user.role
    return role == "admin"


def is_staff(user: User) -> bool:
    """Return True when the user's role is ``"staff"`` or ``"admin"``."""
    staff_roles = ("staff", "admin")
    return user.role in staff_roles


def can_access_health_data(user: User) -> bool:
    """Return True when the role is ``"admin"``, ``"staff"`` or ``"user"``."""
    # NOTE(review): the User class defaults its role to "family", which is
    # not in this list -- confirm that exclusion is intended.
    health_roles = ("admin", "staff", "user")
    return user.role in health_roles


def can_manage_orders(user: User) -> bool:
    """Return True when the user's role is ``"admin"`` or ``"staff"``."""
    order_roles = ("admin", "staff")
    return user.role in order_roles


def can_view_analytics(user: User) -> bool:
    """Return True only when the user's role is exactly ``"admin"``."""
    role = user.role
    return role == "admin"


# CORS and API configuration utilities

def get_allowed_origins() -> list[str]:
    """Return the allowed CORS origins held in ``settings.cors_origins``."""
    # Imported on each call rather than at module import time.
    from config import settings

    origins = settings.cors_origins
    return origins


def get_api_version() -> str:
    """Return the API version held in ``settings.api_version``."""
    # Imported on each call rather than at module import time.
    from config import settings

    version = settings.api_version
    return version