import os
import smtplib
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText

import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from sqlalchemy.orm import Session

from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, REFRESH_TOKEN_EXPIRE_DAYS, SMTP_SERVER, SMTP_PORT, SMTP_USER, SMTP_PASSWORD
from database import get_db
from models import User, RevokedToken
|
|
# Bearer-token dependency: get_current_user receives the request's token by
# declaring Depends(oauth2_scheme). tokenUrl names the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
|
|
| |
|
|
def hash_password(password: str) -> str:
    """Hash a plaintext password with bcrypt using a freshly generated salt.

    Args:
        password: Plaintext password; it is UTF-8 encoded before hashing.

    Returns:
        The bcrypt hash, decoded from bytes to a UTF-8 string.
    """
    password_bytes = password.encode('utf-8')
    digest = bcrypt.hashpw(password_bytes, bcrypt.gensalt())
    return digest.decode('utf-8')
|
|
def check_password(password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        password: Plaintext password supplied by the caller.
        hashed_password: Stored bcrypt hash as a string.

    Returns:
        The result of ``bcrypt.checkpw``; ``False`` if encoding or the
        comparison raises any exception.
    """
    try:
        candidate = password.encode('utf-8')
        stored = hashed_password.encode('utf-8')
        matches = bcrypt.checkpw(candidate, stored)
    except Exception:
        # Any failure while encoding or comparing is reported as "no match".
        return False
    return matches
|
|
| |
|
|
def make_access_token(user_id: int, email: str) -> str:
    """Create a signed JWT access token for a user.

    The payload carries ``sub`` (the user id as a string), ``email``,
    ``type`` set to ``"access"`` and an ``exp`` claim that lies
    ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes in the future.

    Args:
        user_id: Identifier stored (stringified) in the ``sub`` claim.
        email: Email address stored in the ``email`` claim.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value that is easy to mistake for local
    # time. The resulting expiry instant is the same.
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    data = {"sub": str(user_id), "email": email, "type": "access", "exp": expire}
    return jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM)
|
|
def make_refresh_token(user_id: int) -> str:
    """Create a signed JWT refresh token for a user.

    The payload carries ``sub`` (the user id as a string), ``type`` set to
    ``"refresh"`` and an ``exp`` claim that lies ``REFRESH_TOKEN_EXPIRE_DAYS``
    days in the future.

    Args:
        user_id: Identifier stored (stringified) in the ``sub`` claim.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value that is easy to mistake for local
    # time. The resulting expiry instant is the same.
    expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    data = {"sub": str(user_id), "type": "refresh", "exp": expire}
    return jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM)
|
|
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
    """Resolve the bearer token of the current request to an active, verified user.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``User`` row whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token is listed in ``RevokedToken``, cannot
            be decoded, is not of type ``"access"``, or has a missing or
            non-integer ``sub`` claim; 404 if no user has that id; 400 if the
            user is inactive or not yet verified.
    """
    bearer_headers = {"WWW-Authenticate": "Bearer"}
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers=bearer_headers,
    )

    # Revocation is checked against the raw token string before decoding.
    revoked = db.query(RevokedToken).filter(RevokedToken.token == token).first()
    if revoked:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Session has expired or you logged out. Please sign in again.",
            headers=bearer_headers,
        )

    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise credentials_exception from exc

    if payload.get("type") != "access":
        # Carries the same WWW-Authenticate header as the other 401 responses.
        raise HTTPException(
            status_code=401,
            detail="Invalid token type",
            headers=bearer_headers,
        )

    subject = payload.get("sub")
    if subject is None:
        raise credentials_exception

    # A non-numeric subject is an invalid credential, not a server error:
    # report 401 instead of letting ValueError surface as a 500.
    try:
        user_pk = int(subject)
    except (TypeError, ValueError) as exc:
        raise credentials_exception from exc

    user = db.query(User).filter(User.id == user_pk).first()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    if not user.is_active:
        raise HTTPException(status_code=400, detail="User account is inactive")
    if not user.is_verified:
        raise HTTPException(status_code=400, detail="User email is not verified yet")

    return user
|
|
| |
|
|
def _print_email_preview(recipient_line: str, subject: str, body: str) -> None:
    """Print a boxed console preview of an email that was not delivered via SMTP."""
    border = "ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ"
    print(border)
    print(f"β {recipient_line} β")
    print(f"β Subject: {subject:<44} β")
    print(f"β Code/OTP: {body:<44} β")
    print(f"{border}\n")


def send_email(to_email: str, subject: str, body: str) -> bool:
    """
    Sends an email using built-in smtplib.

    If any of SMTP_SERVER, SMTP_USER or SMTP_PASSWORD is not set, nothing is
    sent; a preview block is printed to the console instead so the flow can
    still be tested. If sending fails, the error and the same preview block
    are printed as a fallback.

    Args:
        to_email: Recipient address, used for the "To" header.
        subject: Subject line of the message.
        body: Plain-text body of the message.

    Returns:
        True if the message was sent or simulated on the console, False if
        building or sending the message raised an exception.
    """
    if not all([SMTP_SERVER, SMTP_USER, SMTP_PASSWORD]):
        print()  # blank line before the box, as the original leading "\n"
        _print_email_preview(f"[SIMULATION] Email Sent To: {to_email:<27}", subject, body)
        return True

    try:
        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = SMTP_USER
        msg["To"] = to_email

        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=5) as server:
            server.starttls()
            server.login(SMTP_USER, SMTP_PASSWORD)
            server.send_message(msg)
        return True
    except Exception as e:
        # Deliberately broad: delivery is best-effort, so any failure is
        # reported on the console and signalled to the caller via False.
        print(f"\n[SMTP ERROR] Sending failed: {e}")
        _print_email_preview(f"[SIMULATION FALLBACK] Email To: {to_email:<22}", subject, body)
        return False
|
|