sushilideaclan01's picture
Update JWT secret key in auth.py for development
8e0b1d0
"""
Authentication endpoints for user login and access control
"""
from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Optional
import os
import hashlib
from jose import JWTError, jwt
router = APIRouter()
security = HTTPBearer()
# JWT Configuration
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "Adgenesis")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24 * 7 # 7 days
# User credentials (in production, use a database)
# Format: username -> hashed_password
# You can set these via environment variables or use a simple hash
ALLOWED_USERS = {}
def load_allowed_users():
"""Load allowed users from environment variables"""
users_str = os.getenv("ALLOWED_USERS", "")
if not users_str:
# Default user for development (username: admin, password: admin)
# In production, always set ALLOWED_USERS env var
print("⚠️ Using default credentials (admin/admin). Set ALLOWED_USERS env var for production.")
return {"admin": hash_password("admin")}
users = {}
for user_entry in users_str.split(","):
if ":" in user_entry:
username, password = user_entry.split(":", 1)
users[username.strip()] = hash_password(password.strip())
print(f"✅ Loaded {len(users)} user(s) from ALLOWED_USERS")
return users
def hash_password(password: str) -> str:
"""Hash password using SHA256 (simple, for basic auth)"""
return hashlib.sha256(password.encode()).hexdigest()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash"""
return hash_password(plain_password) == hashed_password
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Create JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify JWT token"""
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return username
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Load users on module import
ALLOWED_USERS = load_allowed_users()
# Request/Response Models
class LoginRequest(BaseModel):
username: str
password: str
class LoginResponse(BaseModel):
access_token: str
token_type: str = "bearer"
username: str
class VerifyResponse(BaseModel):
authenticated: bool
username: Optional[str] = None
@router.post("/auth/login", response_model=LoginResponse)
async def login(request: LoginRequest):
"""Login endpoint - returns JWT token"""
username = request.username
password = request.password
# Check if user exists
if username not in ALLOWED_USERS:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password"
)
# Verify password
if not verify_password(password, ALLOWED_USERS[username]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password"
)
# Create access token
access_token = create_access_token(data={"sub": username})
return LoginResponse(
access_token=access_token,
token_type="bearer",
username=username
)
@router.get("/auth/verify", response_model=VerifyResponse)
async def verify_token_endpoint(username: str = Depends(verify_token)):
"""Verify if token is valid"""
return VerifyResponse(authenticated=True, username=username)
@router.get("/auth/me", response_model=VerifyResponse)
async def get_current_user(username: str = Depends(verify_token)):
"""Get current authenticated user"""
return VerifyResponse(authenticated=True, username=username)