Ruhi_hosting / core /auth.py
Ruhivig65's picture
Upload 11 files
9684770 verified
"""
============================================
RUHI-CORE - Authentication System
============================================
"""
import os
import secrets
from datetime import datetime, timedelta
from typing import Optional
from fastapi import HTTPException, Security, Depends, Request, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from loguru import logger
from core.config import settings
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT Settings
ALGORITHM = "HS256"
security = HTTPBearer(auto_error=False)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token"""
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"jti": secrets.token_hex(16)
})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> dict:
"""Verify and decode JWT token"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token: no subject"
)
return payload
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid token: {str(e)}"
)
def authenticate_admin(username: str, password: str) -> bool:
"""Authenticate admin credentials"""
return (
username == settings.ADMIN_USERNAME and
password == settings.ADMIN_PASSWORD
)
async def get_current_user(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Security(security)
) -> dict:
"""Get current authenticated user from JWT token"""
token = None
# Try Bearer token first
if credentials:
token = credentials.credentials
# Try cookie
if not token:
token = request.cookies.get("access_token")
# Try query parameter (for WebSocket)
if not token:
token = request.query_params.get("token")
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated. Please login.",
headers={"WWW-Authenticate": "Bearer"}
)
payload = verify_token(token)
return {
"username": payload.get("sub"),
"exp": payload.get("exp")
}
class IPWhitelist:
"""IP Whitelisting middleware"""
@staticmethod
def check_ip(request: Request):
if not settings.IP_WHITELIST_ENABLED:
return True
client_ip = request.client.host
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
client_ip = forwarded.split(",")[0].strip()
if client_ip not in settings.WHITELISTED_IPS:
logger.warning(f"🚫 Blocked access from IP: {client_ip}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Access denied. IP {client_ip} is not whitelisted."
)
return True