"""
============================================
RUHI-CORE - Authentication System
============================================

JWT issuing/verification, admin credential checking, the FastAPI dependency
that resolves the current user, and an optional IP whitelist check.
"""

import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import HTTPException, Security, Depends, Request, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from loguru import logger

from core.config import settings

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# JWT Settings
ALGORITHM = "HS256"

# auto_error=False so a missing Authorization header does not fail outright;
# get_current_user falls back to the cookie / query parameter instead.
security = HTTPBearer(auto_error=False)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Token lifetime. Defaults to
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes when omitted/falsy.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY`` using HS256.
    """
    to_encode = data.copy()
    # One timezone-aware timestamp keeps "iat" and "exp" consistent and avoids
    # the deprecated, naive datetime.utcnow().
    now = datetime.now(timezone.utc)
    lifetime = expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({
        "exp": now + lifetime,
        "iat": now,
        "jti": secrets.token_hex(16),  # random per-token identifier
    })
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)


def verify_token(token: str) -> dict:
    """Verify and decode a JWT token.

    Args:
        token: The encoded JWT.

    Returns:
        The decoded claims.

    Raises:
        HTTPException: 401 if decoding/validation fails or the token carries
            no ``sub`` claim.
    """
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token: {str(e)}"
        ) from e

    if payload.get("sub") is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token: no subject"
        )
    return payload


def _constant_time_equals(supplied: object, expected: object) -> bool:
    """Compare two strings without leaking the mismatch position via timing."""
    if not isinstance(supplied, str) or not isinstance(expected, str):
        return False
    # Encode to bytes: compare_digest rejects non-ASCII str arguments.
    return secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


def authenticate_admin(username: str, password: str) -> bool:
    """Authenticate admin credentials.

    Args:
        username: Supplied user name.
        password: Supplied password.

    Returns:
        True only when both values match ``settings.ADMIN_USERNAME`` and
        ``settings.ADMIN_PASSWORD``.
    """
    # Evaluate both comparisons unconditionally (no short-circuit) so response
    # time does not reveal which of the two fields was wrong.
    username_ok = _constant_time_equals(username, settings.ADMIN_USERNAME)
    password_ok = _constant_time_equals(password, settings.ADMIN_PASSWORD)
    return username_ok and password_ok


async def get_current_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Security(security)
) -> dict:
    """Get the current authenticated user from a JWT token.

    The token is looked up, in order, in the Bearer Authorization header, the
    ``access_token`` cookie, and the ``token`` query parameter.

    Returns:
        A dict with the token's ``sub`` as ``"username"`` and its ``"exp"``.

    Raises:
        HTTPException: 401 when no token is supplied or verification fails.
    """
    token = None

    # Try Bearer token first
    if credentials:
        token = credentials.credentials

    # Try cookie
    if not token:
        token = request.cookies.get("access_token")

    # Try query parameter (for WebSocket)
    if not token:
        token = request.query_params.get("token")

    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated. Please login.",
            headers={"WWW-Authenticate": "Bearer"}
        )

    payload = verify_token(token)
    return {
        "username": payload.get("sub"),
        "exp": payload.get("exp")
    }


class IPWhitelist:
    """IP Whitelisting middleware"""

    @staticmethod
    def check_ip(request: Request):
        """Allow the request only if its client IP is whitelisted.

        Returns:
            True when whitelisting is disabled or the IP is in
            ``settings.WHITELISTED_IPS``.

        Raises:
            HTTPException: 403 when the IP is not whitelisted or cannot be
                determined.
        """
        if not settings.IP_WHITELIST_ENABLED:
            return True

        # request.client may be None, so do not dereference it blindly.
        client = request.client
        client_ip = client.host if client is not None else None

        # NOTE(review): X-Forwarded-For is client-controlled. Unless a trusted
        # reverse proxy overwrites it, a caller can spoof a whitelisted IP and
        # bypass this check. Confirm the deployment topology.
        forwarded = request.headers.get("X-Forwarded-For")
        if forwarded:
            client_ip = forwarded.split(",")[0].strip()

        if client_ip is None or client_ip not in settings.WHITELISTED_IPS:
            logger.warning(f"🚫 Blocked access from IP: {client_ip}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Access denied. IP {client_ip} is not whitelisted."
            )
        return True