File size: 8,115 Bytes
53bec59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
"""

Authentication and Authorization

JWT tokens, API keys, password hashing

"""

import secrets
from datetime import datetime, timedelta
from typing import Optional, Union
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader
from sqlalchemy.orm import Session

from src.core.config import settings
from src.core.exceptions import AuthenticationError, AuthorizationError
from src.db.models import User, APIKey, get_db


# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Security schemes
bearer_scheme = HTTPBearer()
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify password against hash"""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Generate password hash"""
    return pwd_context.hash(password)


def create_access_token(

    data: dict,

    expires_delta: Optional[timedelta] = None

) -> str:
    """Create JWT access token"""
    to_encode = data.copy()
    
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return encoded_jwt


def create_refresh_token(

    data: dict,

    expires_delta: Optional[timedelta] = None

) -> str:
    """Create JWT refresh token"""
    to_encode = data.copy()
    
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    
    to_encode.update({"exp": expire, "type": "refresh"})
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return encoded_jwt


def decode_token(token: str) -> dict:
    """Decode and validate JWT token"""
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        return payload
    except JWTError:
        raise AuthenticationError("Invalid or expired token")


def generate_api_key() -> str:
    """Generate secure API key"""
    return secrets.token_urlsafe(32)


# Dependency: Get current user from JWT token
async def get_current_user(

    credentials: HTTPAuthorizationCredentials = Security(bearer_scheme),

    db: Session = Depends(get_db)

) -> User:
    """Get current authenticated user from JWT token"""
    
    try:
        token = credentials.credentials
        payload = decode_token(token)
        
        user_id: int = payload.get("sub")
        if user_id is None:
            raise AuthenticationError("Invalid token payload")
        
    except JWTError:
        raise AuthenticationError("Could not validate credentials")
    
    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        raise AuthenticationError("User not found")
    
    if not user.is_active:
        raise AuthenticationError("User account is inactive")
    
    return user


# Dependency: Get current user from API key
async def get_current_user_from_api_key(

    api_key: Optional[str] = Security(api_key_header),

    db: Session = Depends(get_db)

) -> Optional[User]:
    """Get current user from API key"""
    
    if not api_key:
        return None
    
    # Find API key in database
    api_key_obj = db.query(APIKey).filter(
        APIKey.key == api_key,
        APIKey.is_active == True
    ).first()
    
    if not api_key_obj:
        raise AuthenticationError("Invalid API key")
    
    # Check expiration
    if api_key_obj.expires_at and api_key_obj.expires_at < datetime.utcnow():
        raise AuthenticationError("API key has expired")
    
    # Update last used timestamp
    api_key_obj.last_used_at = datetime.utcnow()
    db.commit()
    
    # Get user
    user = db.query(User).filter(User.id == api_key_obj.user_id).first()
    
    if not user or not user.is_active:
        raise AuthenticationError("User not found or inactive")
    
    return user


# Dependency: Get current user (try JWT first, then API key)
async def get_current_user_flexible(

    bearer: Optional[HTTPAuthorizationCredentials] = Security(bearer_scheme, auto_error=False),

    api_key: Optional[str] = Security(api_key_header),

    db: Session = Depends(get_db)

) -> User:
    """Get current user from JWT or API key"""
    
    # Try JWT token first
    if bearer:
        try:
            token = bearer.credentials
            payload = decode_token(token)
            user_id: int = payload.get("sub")
            
            user = db.query(User).filter(User.id == user_id).first()
            if user and user.is_active:
                return user
        except:
            pass
    
    # Try API key
    if api_key:
        user = await get_current_user_from_api_key(api_key, db)
        if user:
            return user
    
    raise AuthenticationError("Authentication required")


# Dependency: Require superuser
async def get_current_superuser(

    current_user: User = Depends(get_current_user_flexible)

) -> User:
    """Require superuser privileges"""
    
    if not current_user.is_superuser:
        raise AuthorizationError("Superuser privileges required")
    
    return current_user


# Helper: Authenticate user
def authenticate_user(

    db: Session,

    email: str,

    password: str

) -> Optional[User]:
    """Authenticate user with email and password"""
    
    user = db.query(User).filter(User.email == email).first()
    if not user:
        return None
    
    if not verify_password(password, user.hashed_password):
        return None
    
    return user


# Helper: Create user
def create_user(

    db: Session,

    email: str,

    password: str,

    full_name: Optional[str] = None,

    is_superuser: bool = False

) -> User:
    """Create new user"""
    
    # Check if user exists
    existing_user = db.query(User).filter(User.email == email).first()
    if existing_user:
        raise ValueError("User with this email already exists")
    
    # Create user
    user = User(
        email=email,
        hashed_password=get_password_hash(password),
        full_name=full_name,
        is_superuser=is_superuser,
        is_active=True
    )
    
    db.add(user)
    db.commit()
    db.refresh(user)
    
    return user


# Helper: Create API key
def create_api_key_for_user(

    db: Session,

    user_id: int,

    name: Optional[str] = None,

    expires_days: Optional[int] = None

) -> APIKey:
    """Create API key for user"""
    
    key = generate_api_key()
    
    api_key = APIKey(
        key=key,
        name=name or "API Key",
        user_id=user_id,
        is_active=True,
        rate_limit_per_minute=settings.RATE_LIMIT_PER_MINUTE,
        rate_limit_per_hour=settings.RATE_LIMIT_PER_HOUR,
        expires_at=datetime.utcnow() + timedelta(days=expires_days) if expires_days else None
    )
    
    db.add(api_key)
    db.commit()
    db.refresh(api_key)
    
    return api_key


if __name__ == "__main__":
    # Test password hashing
    password = "test_password_123"
    hashed = get_password_hash(password)
    print(f"Hashed: {hashed}")
    print(f"Verified: {verify_password(password, hashed)}")
    
    # Test JWT token creation
    token = create_access_token({"sub": 1, "email": "test@example.com"})
    print(f"Token: {token}")
    
    payload = decode_token(token)
    print(f"Decoded: {payload}")
    
    # Test API key generation
    api_key = generate_api_key()
    print(f"API Key: {api_key}")