CPS-API / app.py
Ali2206's picture
Update app.py
3e4fae7 verified
raw
history blame
2.85 kB
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr
from jose import JWTError, jwt
from passlib.context import CryptContext
from motor.motor_asyncio import AsyncIOMotorClient
import certifi
import os
import datetime
# === Environment and config ===
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
MONGO_URI = os.getenv("MONGO_URI")
if not MONGO_URI:
raise RuntimeError("MONGO_URI not set")
# === MongoDB client ===
client = AsyncIOMotorClient(MONGO_URI, tls=True, tlsCAFile=certifi.where())
db = client["cps_db"]
users_collection = db["users"]
# === App setup ===
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Replace with frontend origin in prod
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# === Password hashing ===
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def hash_password(password: str) -> str:
return pwd_context.hash(password)
# === JWT ===
def create_access_token(data: dict, expires_delta: datetime.timedelta = None):
to_encode = data.copy()
expire = datetime.datetime.utcnow() + (expires_delta or datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# === Schemas ===
class SignupForm(BaseModel):
email: EmailStr
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str
# === Routes ===
@app.post("/signup")
async def signup(data: SignupForm):
email = data.email.lower().strip()
existing = await users_collection.find_one({"email": email})
if existing:
raise HTTPException(status_code=409, detail="Email already exists")
hashed_pw = hash_password(data.password)
await users_collection.insert_one({"email": email, "password": hashed_pw})
return {"success": True, "message": "Account created successfully"}
@app.post("/login", response_model=TokenResponse)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
email = form_data.username.lower().strip()
user = await users_collection.find_one({"email": email})
if not user or not verify_password(form_data.password, user["password"]):
raise HTTPException(status_code=401, detail="Invalid credentials")
access_token = create_access_token(data={"sub": user["email"]})
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/")
def root():
return {"message": "✅ Auth-secured FastAPI + MongoDB backend is running"}