anyonehomep1mane
Task CRUD Changes
18d6976
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from . import schemas, database, models, auth
from passlib.context import CryptContext
import logging
app = FastAPI(
title="Auth Service",
root_path="/auth",
docs_url="/docs",
openapi_url="/openapi.json"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@app.on_event("startup")
async def startup():
await database.init_db()
async def get_db():
async with database.AsyncSessionLocal() as session:
yield session
@app.get("/")
def root():
return {
"service": "auth",
"message": "Auth service is running"
}
@app.post("/users/", response_model=schemas.User)
async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_db)):
stmt = select(models.User).where(models.User.username == user.username)
result = await db.execute(stmt)
if result.scalars().first():
raise HTTPException(status_code=400, detail="Username already exists")
hashed_password = pwd_context.hash(user.password)
db_user = models.User(username=user.username, hashed_password=hashed_password)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return db_user
@app.get("/users/", response_model=list[schemas.User])
async def get_users(db: AsyncSession = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_user)):
stmt = select(models.User)
result = await db.execute(stmt)
return result.scalars().all()
@app.get("/users/{username}", response_model=schemas.User)
async def get_user(username: str, db: AsyncSession = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_user)):
stmt = select(models.User).where(models.User.username == username)
result = await db.execute(stmt)
user = result.scalars().first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.put("/users/{username}", response_model=schemas.User)
async def update_user(username: str, user_update: schemas.UserCreate, db: AsyncSession = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_user)):
stmt = select(models.User).where(models.User.username == username)
result = await db.execute(stmt)
db_user = result.scalars().first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
db_user.hashed_password = pwd_context.hash(user_update.password)
await db.commit()
await db.refresh(db_user)
return db_user
@app.delete("/users/{username}", status_code=204)
async def delete_user(username: str, db: AsyncSession = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_user)):
stmt = select(models.User).where(models.User.username == username)
result = await db.execute(stmt)
db_user = result.scalars().first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
await db.delete(db_user)
await db.commit()
return {
"message": "User deleted."
}
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db)):
stmt = select(models.User).where(models.User.username == form_data.username)
result = await db.execute(stmt)
user = result.scalars().first()
if not user or not pwd_context.verify(form_data.password, user.hashed_password):
raise HTTPException(status_code=401, detail="Incorrect username or password")
access_token = auth.create_access_token(data={"sub": user.username})
return {"access_token": access_token, "token_type": "bearer"}