Spaces:
Sleeping
Sleeping
File size: 4,702 Bytes
91d209c 8e0b1d0 91d209c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
"""
Authentication endpoints for user login and access control
"""
from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Optional
import os
import hashlib
from jose import JWTError, jwt
router = APIRouter()
security = HTTPBearer()

# JWT configuration.
# The signing secret is read from the JWT_SECRET_KEY environment variable.
# The fallback below is hard-coded in this file, so anyone who can read the
# source can forge tokens while it is in use. Keep the fallback for local
# development, but warn loudly, in the same way load_allowed_users() warns
# about the default admin/admin credentials.
_DEFAULT_SECRET_KEY = "Adgenesis"
SECRET_KEY = os.getenv("JWT_SECRET_KEY", _DEFAULT_SECRET_KEY)
if SECRET_KEY == _DEFAULT_SECRET_KEY:
    print("⚠️ Using default JWT secret. Set JWT_SECRET_KEY env var for production.")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24 * 7  # 7 days

# User credentials (in production, use a database).
# Mapping: username -> hash_password() digest of that user's password.
# The real content is assigned further down from load_allowed_users(), which
# parses the ALLOWED_USERS environment variable ("user1:pass1,user2:pass2").
ALLOWED_USERS = {}
def load_allowed_users():
    """Build the username -> password-hash table from the environment.

    Reads the ``ALLOWED_USERS`` environment variable, a comma-separated list
    of ``username:password`` pairs. Only the first ``:`` in an entry separates
    the two parts, so a password may itself contain ``:`` (but not ``,``).
    Surrounding whitespace is stripped from both parts.

    Entries with no ``:``, an empty username, or an empty password are
    skipped with a warning instead of creating an account that has a blank
    name or can be logged into with a blank password. Empty fragments (for
    example from a trailing comma) are ignored silently.

    Returns:
        dict: username mapped to ``hash_password(password)``. When
        ``ALLOWED_USERS`` is unset or empty, a single development account
        ``admin`` / ``admin`` is returned and a warning is printed.
    """
    users_str = os.getenv("ALLOWED_USERS", "")
    if not users_str:
        # Default user for development (username: admin, password: admin).
        # In production, always set the ALLOWED_USERS env var.
        print("⚠️ Using default credentials (admin/admin). Set ALLOWED_USERS env var for production.")
        return {"admin": hash_password("admin")}

    users = {}
    for user_entry in users_str.split(","):
        username, separator, password = user_entry.partition(":")
        username = username.strip()
        password = password.strip()
        if not separator or not username or not password:
            if user_entry.strip():
                # Do not echo the entry itself: it may contain a password.
                print("⚠️ Skipping malformed ALLOWED_USERS entry (expected username:password).")
            continue
        users[username] = hash_password(password)
    print(f"✅ Loaded {len(users)} user(s) from ALLOWED_USERS")
    return users
def hash_password(password: str) -> str:
    """Return the SHA-256 digest of ``password`` as a lowercase hex string.

    The password is encoded with ``str.encode()`` (UTF-8 by default) before
    hashing. The digest is unsalted and uses a single round, so it is only
    suitable for the simple, basic authentication this module provides.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return hasher.hexdigest()
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored ``hash_password`` digest.

    Args:
        plain_password: Password supplied by the caller.
        hashed_password: Stored digest to compare against.

    Returns:
        True when ``hash_password(plain_password)`` equals ``hashed_password``,
        otherwise False. A non-string ``hashed_password`` never matches.
    """
    # Local import so this function does not depend on the module header.
    import hmac

    if not isinstance(hashed_password, str):
        return False
    candidate = hash_password(plain_password)
    # compare_digest takes time independent of where the inputs first differ,
    # unlike ``==``, which stops at the first mismatching character.
    # Comparing bytes avoids its ASCII-only restriction on str arguments.
    return hmac.compare_digest(candidate.encode(), hashed_password.encode())
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Lifetime of the token. When None, the token lives for
            ``ACCESS_TOKEN_EXPIRE_HOURS`` hours. An explicit zero or negative
            delta is honoured rather than replaced by the default.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    # ``is None`` rather than truthiness: timedelta(0) is falsy and would
    # otherwise silently turn into the 7-day default.
    if expires_delta is None:
        expires_delta = timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)

    to_encode = data.copy()
    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated and naive.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Dependency that validates a bearer JWT and returns its subject.

    Decodes ``credentials.credentials`` with ``SECRET_KEY`` and ``ALGORITHM``
    and returns the value of the ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            decoding raises ``JWTError`` or the token has no ``sub`` claim.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
        subject = claims.get("sub")
        if subject is not None:
            return subject
        # Token decoded correctly but does not identify a user.
        raise unauthorized
    except JWTError:
        raise unauthorized
# Load users on module import. The table is built once, here, by calling
# load_allowed_users(); later changes to the ALLOWED_USERS environment
# variable are not picked up unless this assignment runs again.
ALLOWED_USERS = load_allowed_users()
# Request/Response Models
class LoginRequest(BaseModel):
    # Request body for POST /auth/login.
    # Account name; looked up in ALLOWED_USERS by the login endpoint.
    username: str
    # Plain-text password; checked with verify_password() by the login endpoint.
    password: str
class LoginResponse(BaseModel):
    # Response body returned by POST /auth/login on success.
    # Signed JWT produced by create_access_token().
    access_token: str
    # Token scheme; defaults to "bearer".
    token_type: str = "bearer"
    # Name of the user the token was issued for.
    username: str
class VerifyResponse(BaseModel):
    # Response body shared by GET /auth/verify and GET /auth/me.
    # Whether the presented token was accepted.
    authenticated: bool
    # Subject of the token; optional and defaults to None.
    username: Optional[str] = None
@router.post("/auth/login", response_model=LoginResponse)
async def login(request: LoginRequest):
    """Login endpoint - returns a JWT for valid credentials.

    Args:
        request: Body carrying ``username`` and ``password``.

    Returns:
        LoginResponse with the access token, the token type ``"bearer"`` and
        the username.

    Raises:
        HTTPException: 401 "Incorrect username or password" when the user is
            unknown or the password does not match. Both cases produce the
            same response.
    """
    username = request.username
    stored_hash = ALLOWED_USERS.get(username)

    # Always run the password check, even for unknown usernames, so that a
    # missing account and a wrong password do the same amount of work and
    # cannot be told apart by response time. The placeholder is only used
    # to keep that work uniform; the ``stored_hash is None`` test below
    # rejects unknown users regardless of the comparison result.
    placeholder_hash = "0" * 64
    password_ok = verify_password(
        request.password,
        stored_hash if stored_hash is not None else placeholder_hash,
    )
    if stored_hash is None or not password_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
        )

    # Credentials are valid: issue a token whose subject is the username.
    access_token = create_access_token(data={"sub": username})
    return LoginResponse(
        access_token=access_token,
        token_type="bearer",
        username=username,
    )
@router.get("/auth/verify", response_model=VerifyResponse)
async def verify_token_endpoint(username: str = Depends(verify_token)):
    """Report that the presented token is valid.

    The ``verify_token`` dependency does the validation and raises 401 on
    failure, so this body only runs for an accepted token.
    """
    result = VerifyResponse(username=username, authenticated=True)
    return result
@router.get("/auth/me", response_model=VerifyResponse)
async def get_current_user(username: str = Depends(verify_token)):
    """Return the user identified by the presented token.

    The username comes from the ``verify_token`` dependency, which raises
    401 before this body runs when the token is rejected.
    """
    current_user = VerifyResponse(username=username, authenticated=True)
    return current_user
|