Spotix-API / backend /app /api /auth_routes.py
Anish-530
Added email code verif
c041d0d
from fastapi import APIRouter, Depends, HTTPException, Request, status, BackgroundTasks, Header
from sqlalchemy.orm import Session
from app.schemas.auth_schema import LoginRequest, PasswordResetRequest, PasswordResetConfirm, VerifyEmailConfirm, ResendVerificationRequest
from app.services.auth_service import login_user, request_password_reset, confirm_password_reset, confirm_email_verification, resend_verification
from app.db.session import get_db
from app.auth.oauth import oauth
from app.services.oauth_service import handle_oauth_login
from app.services.email_service import send_reset_password_email, send_verification_email
from app.security.turnstile import verify_turnstile_token
from app.core.auth_dependancy import get_current_user
from app.utils.jwt_handler import create_access_token
# Router for the authentication endpoints; every route declared on it is
# served under the "/auth" prefix and grouped under the "Auth" tag.
router = APIRouter(prefix="/auth", tags=["Auth"])
def _login_error_to_http(error_msg: str) -> HTTPException:
    """Map the message of a ``ValueError`` raised during login to an HTTP error.

    Recognised messages:

    * ``locked_<minutes>``  -> 403 with the remaining lock duration.
    * ``attempt_<count>``   -> 401 with the remaining attempts before lockout.
    * ``unverified``        -> 403 asking the user to verify their email.

    Anything else, including a ``locked_``/``attempt_`` message whose counter
    is not an integer, is reported as a generic 400 carrying the raw message.
    """
    if error_msg.startswith(("locked_", "attempt_")):
        count = error_msg.split("_")[1]
        try:
            singular = int(count) <= 1
        except ValueError:
            # Malformed counter: do not let int() blow up inside the error
            # handler (that would surface as a 500); use the generic 400 below.
            singular = None

        if singular is not None:
            if error_msg.startswith("locked_"):
                min_str = "minute" if singular else "minutes"
                return HTTPException(
                    status_code=403,
                    detail=f"Account locked due to too many failed attempts. Try again in {count} {min_str}.",
                )
            att_str = "attempt" if singular else "attempts"
            return HTTPException(
                status_code=401,
                detail=f"Invalid Credentials. {count} {att_str} remaining before lockout.",
            )
    elif error_msg == "unverified":
        return HTTPException(status_code=403, detail="Please verify your email before logging in.")

    return HTTPException(status_code=400, detail=error_msg)


@router.post("/login")
async def login(data: LoginRequest, request: Request, cf_turnstile_response: str = Header(None, alias="cf-turnstile-response"), db: Session = Depends(get_db)):
    """Authenticate a user and return an access token.

    The Turnstile token from the ``cf-turnstile-response`` header is verified
    first, then ``login_user`` is called with the submitted identifier and
    password.

    Returns:
        ``{"access_token": <token>}`` on success.

    Raises:
        HTTPException: 401 when ``login_user`` returns a falsy token; otherwise
            the status produced by ``_login_error_to_http`` for any
            ``ValueError`` raised by the Turnstile check or ``login_user``.
    """
    try:
        # request.client is None when the server cannot determine the peer
        # address; guard it so the lookup does not raise AttributeError.
        # NOTE(review): confirm verify_turnstile_token accepts a None IP.
        client_ip = request.client.host if request.client else None
        await verify_turnstile_token(cf_turnstile_response, client_ip)
        token = login_user(db, data.identifier, data.password, request)
    except ValueError as e:
        raise _login_error_to_http(str(e)) from e

    if not token:
        raise HTTPException(status_code=401, detail="Invalid Credentials")
    return {"access_token": token}
@router.get("/google/login")
async def login_google(request: Request):
    """Start the Google OAuth flow by redirecting to Google's consent page.

    The callback URL sent to Google is the ``GOOGLE_REDIRECT_URI`` environment
    variable when it is set; otherwise it is the URL of the
    ``auth_google_callback`` route resolved against the current request.
    """
    import os

    callback_url = str(request.url_for('auth_google_callback'))
    target_url = os.getenv("GOOGLE_REDIRECT_URI", callback_url)
    return await oauth.google.authorize_redirect(request, target_url)
def _write_oauth_debug(message: str) -> None:
    """Append one line to the OAuth debug log on a best-effort basis.

    The log lives at ``app_logs/oauth_debug.txt``. A failure to write it
    (missing directory, permissions, ...) must never replace the HTTP error
    the caller is about to raise, so I/O errors are reported through the
    ``logging`` module instead of propagating.
    """
    import logging
    import os

    log_path = "app_logs/oauth_debug.txt"
    try:
        os.makedirs(os.path.dirname(log_path), exist_ok=True)
        # Explicit UTF-8 so exception text with non-ASCII characters cannot
        # raise an encoding error on platforms with a narrower default.
        with open(log_path, "a", encoding="utf-8") as f:
            f.write(f"{message}\n")
    except OSError:
        logging.getLogger(__name__).warning(
            "Could not write OAuth debug log entry: %s", message, exc_info=True
        )


@router.get("/google/callback")
async def auth_google_callback(request: Request, db: Session = Depends(get_db)):
    """Complete the Google OAuth flow and redirect to the frontend with a JWT.

    Steps:
        1. Exchange the authorization response for a token.
        2. Read the user profile from the token's ``userinfo`` entry, falling
           back to parsing the ID token when that entry is missing.
        3. Pass the profile to ``handle_oauth_login`` to obtain the JWT.
        4. Redirect to ``<FRONTEND_URL>/login?access_token=<jwt>``
           (``FRONTEND_URL`` defaults to ``http://localhost:3000``).

    Raises:
        HTTPException: 400 when the token exchange fails, when no profile can
            be obtained, or when ``handle_oauth_login`` raises.
    """
    from fastapi.responses import RedirectResponse
    import os

    try:
        token = await oauth.google.authorize_access_token(request)
    except Exception as e:
        _write_oauth_debug(f"Access Token Error: {str(e)}")
        # NOTE(review): the raw exception text is returned to the client; kept
        # for compatibility, but consider a generic message.
        raise HTTPException(status_code=400, detail=f"Google Auth Failed: {str(e)}") from e

    user_info = token.get("userinfo")
    if not user_info:
        try:
            user_info = await oauth.google.parse_id_token(request, token)
        except Exception as e:
            # Deliberately not re-raised: the missing profile is reported by
            # the check just below.
            _write_oauth_debug(f"Parse ID Token Error: {str(e)}")

    if not user_info:
        _write_oauth_debug("No userinfo found in token.")
        raise HTTPException(status_code=400, detail="Could not fetch Google profile")

    try:
        system_jwt = handle_oauth_login(db, user_info)
    except Exception as e:
        _write_oauth_debug(f"Handle Login Error: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Strip every trailing slash so the joined URL never contains "//login".
    frontend_url = os.getenv("FRONTEND_URL", "http://localhost:3000").rstrip("/")

    # NOTE(review): the JWT travels in the query string, where it can end up
    # in server logs and browser history; changing this requires a matching
    # frontend change, so the contract is left as-is here.
    return RedirectResponse(url=f"{frontend_url}/login?access_token={system_jwt}")
@router.post("/request-reset")
def request_reset(
    data: PasswordResetRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db)
):
    """Begin a password reset for the submitted email address.

    ``request_password_reset`` is asked for a reset token; when it returns a
    truthy one, sending the reset email is queued as a background task. The
    response message is the same either way.
    """
    reset_token = request_password_reset(db, data.email)
    if reset_token:
        background_tasks.add_task(
            send_reset_password_email,
            data.email,
            reset_token,
        )

    response = {"message": "If that email exists, a reset link has been sent."}
    return response
@router.post("/reset-password")
def reset_password(data: PasswordResetConfirm, request: Request, db: Session = Depends(get_db)):
    """Set a new password using a previously issued reset token.

    Returns a confirmation message when ``confirm_password_reset`` reports
    success.

    Raises:
        HTTPException: 400 when ``confirm_password_reset`` returns a falsy
            result.
    """
    if confirm_password_reset(db, data.token, data.new_password, request):
        return {"message": "Password successfully reset."}

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid or expired reset token"
    )
@router.post("/verify-email")
def verify_email(data: VerifyEmailConfirm, db: Session = Depends(get_db)):
    """Confirm an email verification code and issue an access token.

    Returns a success message together with an access token whose payload is
    ``{"user_id": <id of the verified user>}``.

    Raises:
        HTTPException: 400 when ``confirm_email_verification`` returns a falsy
            result.
    """
    verified_user = confirm_email_verification(db, data.email, data.code)
    if verified_user:
        access_token = create_access_token({"user_id": verified_user.id})
        return {
            "message": "Email successfully verified!",
            "access_token": access_token,
        }

    raise HTTPException(status_code=400, detail="Invalid or expired verification code")
@router.post("/resend-verification")
def resend_verification_endpoint(
    data: ResendVerificationRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db)
):
    """Request a fresh email verification code for the submitted address.

    When ``resend_verification`` returns a truthy value, sending the
    verification email is queued as a background task. The response message
    is the same either way.
    """
    verification_code = resend_verification(db, data.email)
    if verification_code:
        background_tasks.add_task(
            send_verification_email,
            data.email,
            verification_code,
        )

    response = {"message": "If the account exists and is unverified, a verification code has been sent."}
    return response
@router.get("/me")
def get_current_active_user(
    current_user = Depends(get_current_user)
):
    """Return the profile of the user resolved by ``get_current_user``.

    ``id`` and ``email`` are read directly; the remaining fields are read
    defensively and fall back to a default when the attribute is absent.
    ``created_at`` is rendered with ``isoformat()`` when present and truthy.
    """
    # Name falls back to the email, then to the literal 'User', only when the
    # attribute is missing altogether.
    # NOTE(review): a username that exists but is None is returned as None.
    fallback_name = getattr(current_user, 'email', 'User')
    display_name = getattr(current_user, 'username', fallback_name)

    profile = {
        "id": current_user.id,
        "email": current_user.email,
        "name": display_name,
        "is_admin": getattr(current_user, 'is_admin', False),
        "avatar_url": getattr(current_user, 'avatar_url', None),
        "google_avatar_url": getattr(current_user, 'google_avatar_url', None),
    }

    if getattr(current_user, 'created_at', None):
        profile["created_at"] = current_user.created_at.isoformat()
    else:
        profile["created_at"] = None
    return profile