AidMateAccount / app.py
TahaFawzyElshrif
modified error in schema
c9818d0
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import bcrypt
import os
import ssl
# Database setup.
# The connection URL comes from the DATABASE_URL environment variable;
# os.getenv returns None when the variable is unset.
DATABASE_URL = os.getenv("DATABASE_URL")
# CA certificate data read from MYSQL_CA_CERT and handed to the SSL context
# below as `cadata`.
ssl_cert = os.getenv("MYSQL_CA_CERT")
# NOTE(review): ssl_context is built here but is not passed to create_engine
# in this section — confirm whether it should be supplied through the
# driver's connect arguments.
ssl_context = ssl.create_default_context(cadata=ssl_cert)
engine = create_engine(DATABASE_URL)
# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()
# Session factory bound to the engine, with autoflush and autocommit disabled.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
# User model
class User(Base):
    """ORM mapping for a single account row in the ``Users`` table."""

    __tablename__ = "Users"

    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)

    # Required name, up to 100 characters.
    name = Column(String(100), nullable=False)

    # Required email, up to 100 characters; must be unique across rows.
    email = Column(String(100), unique=True, nullable=False)

    # Required password hash, stored as text (up to 255 characters).
    hashed_password = Column(String(255), nullable=False)
# Emit table creation for the models registered on Base, using the engine.
# This statement executes when the module is imported.
Base.metadata.create_all(bind=engine)
class UserCreate(BaseModel):
    """Request body for ``POST /register``."""

    # Name chosen by the user.
    username: str

    # Email address for the new account.
    email: str

    # Plain-text password as submitted by the client.
    password: str
class UserLogin(BaseModel):
    """Request body for ``POST /login``."""

    # Email address identifying the account.
    email: str

    # Plain-text password to verify.
    password: str
# FastAPI application instance; the route decorators in this module register
# their handlers on it.
app = FastAPI()
@app.post("/register")
def register(user: UserCreate):
    """Create a new account from the submitted username, email and password.

    The plain-text password is hashed with bcrypt before it is stored, and
    the request's ``username`` is saved in the ``name`` column.

    Args:
        user: Registration payload (``username``, ``email``, ``password``).

    Returns:
        A dict with a confirmation ``message`` and the stored ``username``.

    Raises:
        HTTPException: 400 when the email is already registered, or when
            bcrypt refuses to hash the supplied password.
    """
    # Imported locally so this handler does not depend on edits to the
    # module's import block.
    from sqlalchemy.exc import IntegrityError

    db = SessionLocal()
    try:
        # Fast path: reject an email that is already taken.
        existing_user = db.query(User).filter(User.email == user.email).first()
        if existing_user:
            raise HTTPException(status_code=400, detail="Email already registered")

        # Hash the password (the client sends it in plain text).
        try:
            hashed_password = bcrypt.hashpw(
                user.password.encode("utf-8"), bcrypt.gensalt()
            )
        except ValueError as exc:
            # bcrypt raises ValueError for inputs it will not hash (recent
            # releases reject passwords longer than 72 bytes); report a
            # client error instead of an unhandled 500.
            raise HTTPException(status_code=400, detail="Invalid password") from exc

        # Map the request's username onto the model's name column.
        new_user = User(
            name=user.username,
            email=user.email,
            hashed_password=hashed_password.decode("utf-8"),
        )
        db.add(new_user)
        try:
            db.commit()
        except IntegrityError as exc:
            # A concurrent request can insert the same email between the
            # check above and this commit; the unique constraint on email
            # then rejects the row. Roll back so the session stays usable.
            db.rollback()
            raise HTTPException(
                status_code=400, detail="Email already registered"
            ) from exc
        db.refresh(new_user)
        return {"message": "User created successfully", "username": new_user.name}
    finally:
        db.close()
@app.post("/login")
def login(user: UserLogin):
    """Verify an email/password pair against the stored bcrypt hash.

    Args:
        user: Login payload (``email``, ``password``).

    Returns:
        A dict with a confirmation ``message`` and the account's ``username``.

    Raises:
        HTTPException: 400 with "User not found" when no row matches the
            email, or 400 with "Incorrect password" when verification fails.
    """
    db = SessionLocal()
    try:
        # Look the account up by email.
        db_user = db.query(User).filter(User.email == user.email).first()
        if not db_user:
            raise HTTPException(status_code=400, detail="User not found")

        # Compare the submitted plain-text password with the stored hash.
        try:
            password_ok = bcrypt.checkpw(
                user.password.encode("utf-8"),
                db_user.hashed_password.encode("utf-8"),
            )
        except ValueError:
            # bcrypt raises ValueError when the stored hash is malformed or
            # the input is one it refuses to process; treat that as a failed
            # verification rather than letting it surface as a 500.
            password_ok = False

        if not password_ok:
            raise HTTPException(status_code=400, detail="Incorrect password")

        # NOTE(review): the distinct "User not found" / "Incorrect password"
        # responses reveal whether an email is registered. They are kept
        # unchanged for client compatibility — consider unifying them.
        return {"message": "Login successful", "username": db_user.name}
    finally:
        db.close()