Spaces:
Sleeping
Sleeping
| """ | |
| app/routes/auth.py | |
| Auth endpoints: OTP request, OTP verify, logout, profile. | |
| Wraps app/services/auth.py logic. | |
| """ | |
| import logging | |
| from fastapi import APIRouter, Request, Form, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from app.services.auth import ( | |
| send_email_otp, verify_email_otp, | |
| create_session, revoke_session, get_user_from_token, | |
| ) | |
| from app.models.db import db_conn | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/auth", tags=["auth"]) | |
# NOTE(review): no @router.<method>(...) decorator is visible above this
# handler in the reviewed text -- confirm the route is actually registered.
async def request_otp(email: str = Form(...)):
    """Send a one-time password to the given email address.

    Args:
        email: Destination address, read from the ``email`` form field and
            passed unchanged to ``send_email_otp``.

    Returns:
        JSONResponse with ``sent``, ``message`` and ``_dev_otp`` (the value
        returned by ``send_email_otp``).

    Raises:
        HTTPException: 500 when ``send_email_otp`` raises any exception.
    """
    # Keep the try body down to the single call that is expected to fail.
    try:
        otp = send_email_otp(email)
    except Exception as exc:
        # logger.exception keeps the traceback; plain logger.error("%s", exc)
        # recorded only the message, which made delivery failures hard to debug.
        logger.exception("OTP request failed: %s", exc)
        # Chain the cause so the original error stays attached to the 500.
        raise HTTPException(status_code=500, detail="Could not send OTP") from exc

    return JSONResponse({
        "sent": True,
        "message": "OTP sent. Check your email.",
        # SECURITY NOTE(review): echoing the OTP to the caller lets anyone log
        # in as any email address. Kept only because existing dev/HF testing
        # relies on it -- remove this key before production use.
        "_dev_otp": otp,
    })
# NOTE(review): no @router.<method>(...) decorator is visible above this
# handler in the reviewed text -- confirm the route is actually registered.
async def verify_otp(
    request: Request,
    email: str = Form(...),
    otp: str = Form(...),
):
    """Check an email/OTP pair and open a session when it is valid.

    Args:
        request: Incoming request; only its ``user-agent`` header is read.
        email: Address the OTP was issued for (form field).
        otp: Code supplied by the user (form field).

    Returns:
        JSONResponse carrying the session ``token``, ``user_id``, ``email``,
        ``is_pro`` and a ``message``.

    Raises:
        HTTPException: 401 when ``verify_email_otp`` returns a falsy value.
    """
    account = verify_email_otp(email, otp)
    if not account:
        raise HTTPException(status_code=401, detail="Invalid or expired OTP")

    # Only the first 100 characters of the user agent are handed to
    # create_session as the device hint.
    user_agent = request.headers.get("user-agent", "")
    session_token = create_session(account["id"], user_agent[:100])

    payload = {
        "token": session_token,
        "user_id": account["id"],
        "email": account.get("email", ""),
        "is_pro": bool(account.get("is_pro", 0)),
        "message": "Login successful",
    }
    return JSONResponse(payload)
# NOTE(review): no @router.<method>(...) decorator is visible above this
# handler in the reviewed text -- confirm the route is actually registered.
async def logout(request: Request):
    """Revoke the session named by the request's Bearer token, if any.

    Args:
        request: Incoming request; only its ``Authorization`` header is read.

    Returns:
        JSONResponse ``{"logged_out": True}``. The same response is sent
        whether or not a token was present.
    """
    header = request.headers.get("Authorization", "")

    # Only a header starting with the exact prefix "Bearer " is honoured.
    if header.startswith("Bearer "):
        bearer = header.removeprefix("Bearer ").strip()
        # A header that is only the prefix yields an empty token; skip it.
        if bearer:
            revoke_session(bearer)

    return JSONResponse({"logged_out": True})
# NOTE(review): no @router.<method>(...) decorator is visible above this
# handler in the reviewed text -- confirm the route is actually registered.
async def get_me(request: Request):
    """Return the profile of the user identified by the Bearer token.

    Args:
        request: Incoming request; only its ``Authorization`` header is read.

    Returns:
        JSONResponse with ``user_id``, ``email``, ``name``, ``is_pro``,
        ``streak_days``, ``persona`` and ``language``. Fields missing from
        the user record fall back to the defaults given below.

    Raises:
        HTTPException: 401 when the header is missing or malformed, or when
            ``get_user_from_token`` returns a falsy value.
    """
    header = request.headers.get("Authorization", "")

    profile = None
    if header.startswith("Bearer "):
        bearer = header.removeprefix("Bearer ").strip()
        # An empty token is never looked up.
        if bearer:
            profile = get_user_from_token(bearer)

    if not profile:
        raise HTTPException(status_code=401, detail="Not authenticated")

    body = {
        "user_id": profile["id"],
        "email": profile.get("email", ""),
        "name": profile.get("name", ""),
        "is_pro": bool(profile.get("is_pro", 0)),
        "streak_days": profile.get("streak_days", 0),
        "persona": profile.get("persona", "General Adult"),
        "language": profile.get("language", "en"),
    }
    return JSONResponse(body)
# NOTE(review): no @router.<method>(...) decorator is visible above this
# handler in the reviewed text -- confirm the route is actually registered.
async def update_profile(
    request: Request,
    name: str = Form(""),
    persona: str = Form("General Adult"),
    language: str = Form("en"),
    tdee: float = Form(2000),
):
    """Overwrite the authenticated user's profile fields.

    Args:
        request: Incoming request; only its ``Authorization`` header is read.
        name: New value for the ``name`` column.
        persona: New value for the ``persona`` column.
        language: New value for the ``language`` column.
        tdee: New value for the ``tdee`` column; must be a finite number.

    Returns:
        JSONResponse ``{"updated": True}``.

    Raises:
        HTTPException: 401 when no valid Bearer token is supplied;
            422 when ``tdee`` is NaN or infinite.
    """
    import math  # local import: this block is the only user of math

    auth = request.headers.get("Authorization", "")
    token = auth.removeprefix("Bearer ").strip() if auth.startswith("Bearer ") else None
    user = get_user_from_token(token) if token else None
    if not user:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # float coercion accepts "nan" / "inf"; reject them before they reach the
    # database, since non-finite floats cannot be serialised back out as JSON.
    # Checked after authentication so anonymous callers still receive 401.
    if not math.isfinite(tdee):
        raise HTTPException(status_code=422, detail="tdee must be a finite number")

    # NOTE(review): all four columns are always written, so a field omitted
    # from the form is stored as its Form default above (e.g. name becomes "").
    # Confirm clients always send every field, or switch to a partial update.
    # NOTE(review): presumably db_conn() commits on exit -- confirm.
    with db_conn() as conn:
        conn.execute(
            "UPDATE users SET name=?, persona=?, language=?, tdee=? WHERE id=?",
            (name, persona, language, tdee, user["id"]),
        )
    return JSONResponse({"updated": True})