from fastapi import FastAPI, HTTPException from pydantic import BaseModel from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker import bcrypt import os import ssl # Database setup DATABASE_URL = os.getenv("DATABASE_URL") ssl_cert = os.getenv("MYSQL_CA_CERT") ssl_context = ssl.create_default_context(cadata=ssl_cert) engine = create_engine(DATABASE_URL) Base = declarative_base() SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) # User model class User(Base): __tablename__ = "Users" id = Column(Integer, primary_key=True, index=True) name = Column(String(100), nullable=False) email = Column(String(100), unique=True, nullable=False) hashed_password = Column(String(255), nullable=False) Base.metadata.create_all(bind=engine) class UserCreate(BaseModel): username: str email: str password: str class UserLogin(BaseModel): email: str password: str # FastAPI app app = FastAPI() @app.post("/register") def register(user: UserCreate): db = SessionLocal() try: # Check if email already exists existing_user = db.query(User).filter(User.email == user.email).first() if existing_user: raise HTTPException(status_code=400, detail="Email already registered") # Hash password (user sends plain password, we hash it) hashed_password = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt()) # Create new user (map username to name field in database) new_user = User( name=user.username, # Map username to name email=user.email, hashed_password=hashed_password.decode('utf-8') ) db.add(new_user) db.commit() db.refresh(new_user) return {"message": "User created successfully", "username": new_user.name} finally: db.close() @app.post("/login") def login(user: UserLogin): db = SessionLocal() try: # Find user by email db_user = db.query(User).filter(User.email == user.email).first() if not db_user: raise HTTPException(status_code=400, detail="User not found") # Verify password (user sends plain password, we check against hash) if not bcrypt.checkpw(user.password.encode('utf-8'), db_user.hashed_password.encode('utf-8')): raise HTTPException(status_code=400, detail="Incorrect password") return {"message": "Login successful", "username": db_user.name} finally: db.close()