book / backend /api /auth.py
Ammar Ahmed Khan
Add Physical AI Humanoid Book Platform
e2eff86
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from datetime import timedelta
import logging
import sys
import os
# Add the backend directory to the path to allow absolute imports
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from database import get_db
from models.user import User, UserCreate, UserResponse, UserUpdate
from utils.hash import hash_password, verify_password
from utils.token import create_access_token
from api.schemas import ChatbotQueryResponse
from middleware.auth import get_current_user
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/auth", tags=["auth"])
@router.post("/signup", response_model=UserResponse)
async def signup_user(
user_data: UserCreate,
db: Session = Depends(get_db)
):
"""
Register a new user account
"""
# Check if user with email already exists
existing_user = db.query(User).filter(User.email == user_data.email).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create new user
hashed_password = hash_password(user_data.password)
new_user = User(
email=user_data.email,
password_hash=hashed_password,
first_name=user_data.first_name,
last_name=user_data.last_name
)
# Add to database
db.add(new_user)
db.commit()
db.refresh(new_user)
logger.info(f"New user registered: {new_user.email}")
# Create and return access token
access_token_expires = timedelta(minutes=30)
access_token = create_access_token(
data={"user_id": new_user.id}, expires_delta=access_token_expires
)
# Return user data and token
return UserResponse(
id=new_user.id,
email=new_user.email,
first_name=new_user.first_name,
last_name=new_user.last_name,
is_active=new_user.is_active,
created_at=new_user.created_at.isoformat()
)
@router.post("/register", response_model=UserResponse)
async def register_user(
user_data: UserCreate,
db: Session = Depends(get_db)
):
"""
Register a new user account (duplicate endpoint for compatibility)
"""
# Check if user with email already exists
existing_user = db.query(User).filter(User.email == user_data.email).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create new user
hashed_password = hash_password(user_data.password)
new_user = User(
email=user_data.email,
password_hash=hashed_password,
first_name=user_data.first_name,
last_name=user_data.last_name
)
# Add to database
db.add(new_user)
db.commit()
db.refresh(new_user)
logger.info(f"New user registered: {new_user.email}")
# Create and return access token
access_token_expires = timedelta(minutes=30)
access_token = create_access_token(
data={"user_id": new_user.id}, expires_delta=access_token_expires
)
# Return user data and token
return UserResponse(
id=new_user.id,
email=new_user.email,
first_name=new_user.first_name,
last_name=new_user.last_name,
is_active=new_user.is_active,
created_at=new_user.created_at.isoformat()
)
@router.post("/login")
async def login_user(
user_data: UserCreate, # Using the same schema for login (email, password)
db: Session = Depends(get_db)
):
"""
Authenticate user and return JWT token
"""
# Find user by email
user = db.query(User).filter(User.email == user_data.email).first()
if not user or not verify_password(user_data.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Inactive account",
headers={"WWW-Authenticate": "Bearer"},
)
# Create and return access token
access_token_expires = timedelta(minutes=30)
access_token = create_access_token(
data={"user_id": user.id}, expires_delta=access_token_expires
)
logger.info(f"User logged in: {user.email}")
return {
"access_token": access_token,
"token_type": "bearer",
"user": {
"id": user.id,
"email": user.email,
"first_name": user.first_name
}
}
@router.post("/logout")
async def logout_user():
"""
Revoke the current user's JWT token
"""
# In a real implementation, this would invalidate the token
# For now, we'll just return a success message
return {"message": "Successfully logged out"}
@router.get("/profile", response_model=UserResponse)
async def get_profile(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Get current user profile information
"""
return UserResponse(
id=current_user.id,
email=current_user.email,
first_name=current_user.first_name,
last_name=current_user.last_name,
is_active=current_user.is_active,
created_at=current_user.created_at.isoformat()
)
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Get authenticated user details
"""
return UserResponse(
id=current_user.id,
email=current_user.email,
first_name=current_user.first_name,
last_name=current_user.last_name,
is_active=current_user.is_active,
created_at=current_user.created_at.isoformat()
)
@router.put("/profile", response_model=UserResponse)
async def update_profile(
user_update: UserUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Update user profile information
"""
# Update user fields if they are provided
if user_update.first_name is not None:
current_user.first_name = user_update.first_name
if user_update.last_name is not None:
current_user.last_name = user_update.last_name
# Commit changes to database
db.commit()
db.refresh(current_user)
logger.info(f"User profile updated: {current_user.email}")
return UserResponse(
id=current_user.id,
email=current_user.email,
first_name=current_user.first_name,
last_name=current_user.last_name,
is_active=current_user.is_active,
created_at=current_user.created_at.isoformat()
)