SanadLLM / app /auth /routes.py
Hydra-Bolt
added
7b10350
from fastapi import APIRouter, HTTPException, status, Depends, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from supabase import create_client, Client
from datetime import datetime, timedelta
from typing import Optional
import bcrypt
import logging
from app.config.settings import settings
from app.db.models import (
LoginRequest,
RegisterRequest,
AuthResponse,
TokenRefreshRequest,
User,
UserSession,
UserRole
)
from app.middleware import auth_middleware, get_user_ip
router = APIRouter(prefix="/auth", tags=["authentication"])
security = HTTPBearer()
logger = logging.getLogger(__name__)
# Initialize Supabase client
def get_supabase_client() -> Client:
"""Get Supabase client instance."""
if not settings.SUPABASE_URL or not settings.SUPABASE_SERVICE_KEY:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Supabase configuration is missing"
)
return create_client(settings.SUPABASE_URL, settings.SUPABASE_SERVICE_KEY)
def hash_password(password: str) -> str:
"""Hash password using bcrypt."""
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def verify_password(password: str, hashed_password: str) -> bool:
"""Verify password against hash."""
return bcrypt.checkpw(password.encode('utf-8'), hashed_password.encode('utf-8'))
async def create_user_session(
user_id: str,
access_token: str,
refresh_token: str,
request: Request
) -> UserSession:
"""Create user session record."""
supabase = get_supabase_client()
session_data = {
"user_id": user_id,
"access_token": access_token,
"refresh_token": refresh_token,
"expires_at": (datetime.utcnow() + timedelta(minutes=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)).isoformat(),
"created_at": datetime.utcnow().isoformat(),
"last_used": datetime.utcnow().isoformat(),
"user_agent": request.headers.get("user-agent", ""),
"ip_address": get_user_ip(request)
}
try:
response = supabase.table("user_sessions").insert(session_data).execute()
if response.data:
return UserSession(**response.data[0])
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create session"
)
except Exception as e:
logger.error(f"Error creating user session: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create session"
)
@router.post("/register", response_model=AuthResponse)
async def register(request: Request, user_data: RegisterRequest):
"""Register a new user."""
supabase = get_supabase_client()
logger.info(f"Registration request received for email={user_data.email}, username={user_data.username}")
logger.debug(f"Request source ip={get_user_ip(request)}, user_agent={request.headers.get('user-agent','')}")
try:
# Check if user already exists in our custom table
existing_user = supabase.table("users").select("id").eq("email", user_data.email).execute()
logger.debug(f"Existing user check result for email={user_data.email}: {existing_user.data}")
if existing_user.data:
logger.warning(f"Registration attempt for already existing email={user_data.email}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User with this email already exists"
)
# Create user using Supabase Auth
logger.info(f"Creating auth user in Supabase for email={user_data.email}")
auth_response = supabase.auth.sign_up({
"email": user_data.email,
"password": user_data.password,
"options": {
"data": {
"username": user_data.username,
"full_name": user_data.full_name
}
}
})
logger.debug(f"Supabase auth sign_up response for email={user_data.email}: user_present={bool(getattr(auth_response, 'user', None))}")
if not auth_response.user:
logger.error(f"Supabase auth failed to create user for email={user_data.email}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Failed to create user"
)
logger.info(f"Supabase auth created user id={auth_response.user.id} for email={user_data.email}")
# Create user record in our custom table with error handling for duplicates
user_record = {
"id": auth_response.user.id,
"email": user_data.email,
"username": user_data.username,
"full_name": user_data.full_name,
"role": UserRole.USER.value,
"is_active": True,
"created_at": datetime.utcnow().isoformat()
}
logger.debug(f"Prepared user_record for db insert (id and metadata only): id={user_record['id']}, email={user_record['email']}, username={user_record['username']}")
try:
db_response = supabase.table("users").insert(user_record).execute()
logger.debug(f"DB insert response for user id={user_record['id']}: {db_response.data}")
if not db_response.data:
logger.error(f"DB did not return data after insert for user id={user_record['id']}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create user record"
)
user = User(**db_response.data[0])
logger.info(f"User record created in DB for id={user.id}, email={user.email}")
except Exception as db_error:
logger.warning(f"DB insert error for user id={user_record['id']}: {db_error}")
# Check if this is a duplicate key error (user already exists)
if "duplicate key" in str(db_error) or "23505" in str(db_error):
logger.info(f"Duplicate key detected when inserting user id={user_record['id']}, attempting to fetch existing record")
# User already exists in our table, fetch existing user
existing_user_response = supabase.table("users").select("*").eq("id", auth_response.user.id).execute()
logger.debug(f"Existing user fetch result for id={auth_response.user.id}: {existing_user_response.data}")
if existing_user_response.data:
user = User(**existing_user_response.data[0])
logger.info(f"Fetched existing user record for id={user.id}")
else:
logger.error(f"Duplicate key error but failed to fetch existing user id={auth_response.user.id}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="User creation failed"
)
else:
logger.exception("Unexpected database error during user creation")
raise db_error
# Create JWT tokens
if not user.id:
logger.error("User ID missing after creation")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="User ID is missing"
)
access_token = auth_middleware.create_access_token(data={"sub": user.id})
refresh_token = auth_middleware.create_refresh_token(data={"sub": user.id})
logger.info(f"Generated JWT tokens for user id={user.id} (not logging token values)")
# Create session
await create_user_session(user.id, access_token, refresh_token, request)
logger.info(f"Session created for user id={user.id}")
return AuthResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60,
user=user
)
except HTTPException:
logger.debug("Reraising HTTPException from register flow")
raise
except Exception as e:
logger.exception(f"Registration error for email={user_data.email}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Registration failed"
)
@router.post("/login", response_model=AuthResponse)
async def login(request: Request, credentials: LoginRequest):
"""Authenticate user and return tokens."""
supabase = get_supabase_client()
try:
# Authenticate with Supabase
auth_response = supabase.auth.sign_in_with_password({
"email": credentials.email,
"password": credentials.password
})
if not auth_response.user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password"
)
# Get user from our custom table
user_response = supabase.table("users").select("*").eq("id", auth_response.user.id).execute()
if not user_response.data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User record not found"
)
user = User(**user_response.data[0])
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Account is disabled"
)
# Update last login
supabase.table("users").update({
"last_login": datetime.utcnow().isoformat()
}).eq("id", user.id).execute()
# Create JWT tokens
if not user.id:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="User ID is missing"
)
access_token = auth_middleware.create_access_token(data={"sub": user.id})
refresh_token = auth_middleware.create_refresh_token(data={"sub": user.id})
# Create session
await create_user_session(user.id, access_token, refresh_token, request)
return AuthResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60,
user=user
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Login error: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication failed"
)
@router.post("/refresh", response_model=AuthResponse)
async def refresh_token(request: Request, token_data: TokenRefreshRequest):
"""Refresh access token using refresh token."""
try:
# Verify refresh token
payload = auth_middleware.verify_token(token_data.refresh_token)
if payload.get("type") != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type"
)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
# Get user
user = await auth_middleware.get_user_from_supabase(user_id)
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive"
)
# Create new tokens
if not user.id:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="User ID is missing"
)
new_access_token = auth_middleware.create_access_token(data={"sub": user.id})
new_refresh_token = auth_middleware.create_refresh_token(data={"sub": user.id})
# Create new session
await create_user_session(user.id, new_access_token, new_refresh_token, request)
return AuthResponse(
access_token=new_access_token,
refresh_token=new_refresh_token,
token_type="bearer",
expires_in=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60,
user=user
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Token refresh error: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token refresh failed"
)
@router.post("/logout")
async def logout(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Logout user and blacklist token."""
try:
token = credentials.credentials
payload = auth_middleware.verify_token(token)
# Update session as ended
user_id = payload.get("sub")
if user_id:
supabase = get_supabase_client()
supabase.table("user_sessions").update({
"ended_at": datetime.utcnow().isoformat()
}).eq("user_id", user_id).eq("access_token", token).execute()
return {"message": "Successfully logged out"}
except Exception as e:
logger.error(f"Logout error: {e}")
return {"message": "Logout completed"}
@router.get("/me", response_model=User)
async def get_current_user_info(current_user: User = Depends(auth_middleware.get_current_user)):
"""Get current user information."""
return current_user
@router.get("/sessions")
async def get_user_sessions(current_user: User = Depends(auth_middleware.get_current_user)):
"""Get user's active sessions."""
supabase = get_supabase_client()
try:
response = supabase.table("user_sessions").select("*").eq("user_id", current_user.id).is_("ended_at", "null").execute()
return {"sessions": response.data}
except Exception as e:
logger.error(f"Error fetching sessions: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to fetch sessions"
)