Spaces:
Running
Running
| from fastapi import APIRouter, Depends, HTTPException, Request, status, BackgroundTasks, Header | |
| from sqlalchemy.orm import Session | |
| from app.schemas.auth_schema import LoginRequest, PasswordResetRequest, PasswordResetConfirm, VerifyEmailConfirm, ResendVerificationRequest | |
| from app.services.auth_service import login_user, request_password_reset, confirm_password_reset, confirm_email_verification, resend_verification | |
| from app.db.session import get_db | |
| from app.auth.oauth import oauth | |
| from app.services.oauth_service import handle_oauth_login | |
| from app.services.email_service import send_reset_password_email, send_verification_email | |
| from app.security.turnstile import verify_turnstile_token | |
| from app.core.auth_dependancy import get_current_user | |
| from app.utils.jwt_handler import create_access_token | |
| router = APIRouter(prefix="/auth", tags=["Auth"]) | |
| async def login(data: LoginRequest, request: Request, cf_turnstile_response: str = Header(None, alias="cf-turnstile-response"), db: Session = Depends(get_db)): | |
| try: | |
| client_ip = request.client.host | |
| await verify_turnstile_token(cf_turnstile_response, client_ip) | |
| token = login_user(db, data.identifier, data.password, request) | |
| if not token: | |
| raise HTTPException(status_code=401, detail="Invalid Credentials") | |
| return {"access_token": token} | |
| except ValueError as e: | |
| error_msg = str(e) | |
| if error_msg.startswith("locked_"): | |
| minutes = error_msg.split("_")[1] | |
| min_str = "minute" if int(minutes) <= 1 else "minutes" | |
| raise HTTPException( | |
| status_code=403, | |
| detail=f"Account locked due to too many failed attempts. Try again in {minutes} {min_str}." | |
| ) | |
| elif error_msg.startswith("attempt_"): | |
| left = error_msg.split("_")[1] | |
| att_str = "attempt" if int(left) <= 1 else "attempts" | |
| raise HTTPException( | |
| status_code=401, | |
| detail=f"Invalid Credentials. {left} {att_str} remaining before lockout." | |
| ) | |
| elif error_msg == "unverified": | |
| raise HTTPException(status_code=403, detail="Please verify your email before logging in.") | |
| raise HTTPException(status_code=400, detail=error_msg) | |
| async def login_google(request: Request): | |
| import os | |
| redirect_url = os.getenv("GOOGLE_REDIRECT_URI", str(request.url_for('auth_google_callback'))) | |
| return await oauth.google.authorize_redirect(request, redirect_url) | |
| async def auth_google_callback(request: Request, db: Session = Depends(get_db)): | |
| try: | |
| token = await oauth.google.authorize_access_token(request) | |
| except Exception as e: | |
| with open("app_logs/oauth_debug.txt", "a") as f: | |
| f.write(f"Access Token Error: {str(e)}\n") | |
| raise HTTPException(status_code=400, detail=f"Google Auth Failed: {str(e)}") | |
| user_info = token.get("userinfo") | |
| if not user_info: | |
| try: | |
| user_info = await oauth.google.parse_id_token(request, token) | |
| except Exception as e: | |
| with open("app_logs/oauth_debug.txt", "a") as f: | |
| f.write(f"Parse ID Token Error: {str(e)}\n") | |
| if not user_info: | |
| with open("app_logs/oauth_debug.txt", "a") as f: | |
| f.write("No userinfo found in token.\n") | |
| raise HTTPException(status_code=400, detail="Could not fetch Google profile") | |
| try: | |
| system_jwt = handle_oauth_login(db, user_info) | |
| except Exception as e: | |
| with open("app_logs/oauth_debug.txt", "a") as f: | |
| f.write(f"Handle Login Error: {str(e)}\n") | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| from fastapi.responses import RedirectResponse | |
| import os | |
| frontend_url = os.getenv("FRONTEND_URL", "http://localhost:3000") | |
| # Ensure no trailing slash | |
| if frontend_url.endswith("/"): | |
| frontend_url = frontend_url[:-1] | |
| return RedirectResponse(url=f"{frontend_url}/login?access_token={system_jwt}") | |
| def request_reset( | |
| data: PasswordResetRequest, | |
| background_tasks: BackgroundTasks, | |
| db: Session = Depends(get_db) | |
| ): | |
| raw_token = request_password_reset(db, data.email) | |
| if raw_token: | |
| background_tasks.add_task(send_reset_password_email, data.email, raw_token) | |
| return {"message": "If that email exists, a reset link has been sent."} | |
| def reset_password(data: PasswordResetConfirm, request: Request, db: Session = Depends(get_db)): | |
| success = confirm_password_reset(db, data.token, data.new_password, request) | |
| if not success: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid or expired reset token" | |
| ) | |
| return {"message": "Password successfully reset."} | |
| def verify_email(data: VerifyEmailConfirm, db: Session = Depends(get_db)): | |
| user = confirm_email_verification(db, data.email, data.code) | |
| if not user: | |
| raise HTTPException(status_code=400, detail="Invalid or expired verification code") | |
| token = create_access_token({"user_id": user.id}) | |
| return {"message": "Email successfully verified!", "access_token": token} | |
| def resend_verification_endpoint( | |
| data: ResendVerificationRequest, | |
| background_tasks: BackgroundTasks, | |
| db: Session = Depends(get_db) | |
| ): | |
| raw_token = resend_verification(db, data.email) | |
| if raw_token: | |
| background_tasks.add_task(send_verification_email, data.email, raw_token) | |
| return {"message": "If the account exists and is unverified, a verification code has been sent."} | |
| def get_current_active_user( | |
| current_user = Depends(get_current_user) | |
| ): | |
| return { | |
| "id": current_user.id, | |
| "email": current_user.email, | |
| "name": getattr(current_user, 'username', getattr(current_user, 'email', 'User')), | |
| "is_admin": getattr(current_user, 'is_admin', False), | |
| "avatar_url": getattr(current_user, 'avatar_url', None), | |
| "google_avatar_url": getattr(current_user, 'google_avatar_url', None), | |
| "created_at": current_user.created_at.isoformat() if getattr(current_user, 'created_at', None) else None | |
| } |