""" Authentication endpoints for user login and access control """ from fastapi import APIRouter, HTTPException, Depends, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel from datetime import datetime, timedelta from typing import Optional import os import hashlib from jose import JWTError, jwt router = APIRouter() security = HTTPBearer() # JWT Configuration SECRET_KEY = os.getenv("JWT_SECRET_KEY", "Adgenesis") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_HOURS = 24 * 7 # 7 days # User credentials (in production, use a database) # Format: username -> hashed_password # You can set these via environment variables or use a simple hash ALLOWED_USERS = {} def load_allowed_users(): """Load allowed users from environment variables""" users_str = os.getenv("ALLOWED_USERS", "") if not users_str: # Default user for development (username: admin, password: admin) # In production, always set ALLOWED_USERS env var print("⚠️ Using default credentials (admin/admin). Set ALLOWED_USERS env var for production.") return {"admin": hash_password("admin")} users = {} for user_entry in users_str.split(","): if ":" in user_entry: username, password = user_entry.split(":", 1) users[username.strip()] = hash_password(password.strip()) print(f"✅ Loaded {len(users)} user(s) from ALLOWED_USERS") return users def hash_password(password: str) -> str: """Hash password using SHA256 (simple, for basic auth)""" return hashlib.sha256(password.encode()).hexdigest() def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify password against hash""" return hash_password(plain_password) == hashed_password def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): """Create JWT access token""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): """Verify JWT token""" token = credentials.credentials try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) return username except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) # Load users on module import ALLOWED_USERS = load_allowed_users() # Request/Response Models class LoginRequest(BaseModel): username: str password: str class LoginResponse(BaseModel): access_token: str token_type: str = "bearer" username: str class VerifyResponse(BaseModel): authenticated: bool username: Optional[str] = None @router.post("/auth/login", response_model=LoginResponse) async def login(request: LoginRequest): """Login endpoint - returns JWT token""" username = request.username password = request.password # Check if user exists if username not in ALLOWED_USERS: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password" ) # Verify password if not verify_password(password, ALLOWED_USERS[username]): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password" ) # Create access token access_token = create_access_token(data={"sub": username}) return LoginResponse( access_token=access_token, token_type="bearer", username=username ) @router.get("/auth/verify", response_model=VerifyResponse) async def verify_token_endpoint(username: str = Depends(verify_token)): """Verify if token is valid""" return VerifyResponse(authenticated=True, username=username) @router.get("/auth/me", response_model=VerifyResponse) async def get_current_user(username: str = Depends(verify_token)): """Get current authenticated user""" return VerifyResponse(authenticated=True, username=username)