# srivatsavdamaraju's picture
# Upload 39 files
# fd21f0c verified
# auth.py
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
import os
import sys
from pathlib import Path
# Put the project root (agent/) at the front of sys.path so that sibling
# packages such as "s3" can be imported from this module.
# parents[2] is the directory two levels above this file's own directory.
project_root = Path(__file__).resolve().parents[2]  # -> ...\openai_agents\agent
if sys.path.count(str(project_root)) == 0:
    sys.path.insert(0, str(project_root))
from . import models
from . import database
from dotenv import load_dotenv
load_dotenv()
# ============================================================
# JWT Configuration
# ============================================================
import warnings

# Key used to sign and verify JWTs. Read from the environment (which
# load_dotenv() above may have populated from a .env file).
_INSECURE_DEFAULT_SECRET = "fallback_value"
SECRET_KEY = os.getenv("SECRET_KEY", _INSECURE_DEFAULT_SECRET)
if SECRET_KEY == _INSECURE_DEFAULT_SECRET:
    # The fallback is a publicly visible constant: anyone who knows it can
    # forge tokens. Keep the old value for backward compatibility, but make
    # the misconfiguration loud instead of silent.
    warnings.warn(
        "SECRET_KEY is not set; JWTs are being signed with a hard-coded "
        "fallback value. Set SECRET_KEY in the environment or .env file.",
        RuntimeWarning,
    )

# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = os.getenv("ALGORITHM", "HS256")

# Default access-token lifetime in minutes; int() raises ValueError at import
# time if the environment value is not a valid integer.
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "60"))
# ============================================================
# Password Hashing Context
# ============================================================
# Passlib context shared by the hashing/verification helpers in this module;
# bcrypt is the only configured scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)

# ============================================================
# OAuth2 Scheme
# ============================================================
# Bearer-token dependency; tokenUrl is the login route registered on the
# router below (prefix "/auth" + "/login").
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/auth/login",
)

# ============================================================
# FastAPI Router
# ============================================================
# All endpoints in this module are mounted under /auth.
router = APIRouter(
    prefix="/auth",
    tags=["Authentication"],
)
# ============================================================
# Helper Functions
# ============================================================
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns its verdict.

    Args:
        plain_password: The password as supplied by the user.
        hashed_password: The previously stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Produce a hash of ``password`` using the module-level ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string returned by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Generate a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is shallow-copied, so
            the caller's object is not modified.
        expires_delta: Token lifetime. When ``None``, the module-level
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` is used. Any explicit value,
            including ``timedelta(0)``, is honoured as given.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Compare against None explicitly: timedelta(0) is falsy, so the previous
    # `expires_delta or default` silently replaced a zero lifetime with the
    # default one.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and
    # returns a naive value.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(database.get_db)
):
    """Resolve the bearer token to a ``models.User`` row.

    The token is decoded with ``SECRET_KEY`` / ``ALGORITHM``, its ``sub``
    claim is read as the username, and the matching user is looked up.

    Args:
        token: Bearer token supplied through ``oauth2_scheme``.
        db: Database session supplied through ``database.get_db``.

    Returns:
        The first ``models.User`` whose ``username`` equals the ``sub`` claim.

    Raises:
        HTTPException: 401 when decoding raises ``JWTError``, when the
            ``sub`` claim is missing, or when no user matches.
    """
    # One shared 401 response for every failure path below.
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized

    account = (
        db.query(models.User)
        .filter(models.User.username == subject)
        .first()
    )
    if account is None:
        raise unauthorized
    return account
# ============================================================
# 🧠 LOGIN ENDPOINT
# ============================================================
@router.post("/login")
def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(database.get_db)
):
    """Authenticate a user and return a JWT access token.

    Args:
        form_data: Submitted login form; its ``username`` and ``password``
            fields are used.
        db: Database session supplied through ``database.get_db``.

    Returns:
        A dict with ``access_token`` (the encoded JWT whose ``sub`` claim is
        the username) and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 when no user has the given username or the
            password does not verify against the stored hash.
    """
    user = db.query(models.User).filter(models.User.username == form_data.username).first()

    if not user:
        # No such user: still spend the time of a hash check so response
        # timing does not reveal which usernames exist.
        pwd_context.dummy_verify()
        authenticated = False
    else:
        authenticated = verify_password(form_data.password, user.password)

    if not authenticated:
        # Same response for "unknown user" and "wrong password".
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
# ============================================================
# 🚪 LOGOUT (Optional)
# ============================================================
@router.post("/logout")
def logout():
    """
    Acknowledge a logout request.

    Nothing is changed on the server here: the handler only returns a
    message asking the client to discard its token.
    """
    response_body = {
        "message": "Successfully logged out. Please delete your token on the client side."
    }
    return response_body