Spaces:
Running
Running
| from fastapi import FastAPI, HTTPException, Depends | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select | |
| from . import schemas, database, models, auth | |
| from passlib.context import CryptContext | |
| import logging | |
# Logging is configured once at import time; handlers in this module log
# through the module-level ``logger``.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Shared passlib context used to hash and verify user passwords with bcrypt.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# The application is mounted under the "/auth" root path; the interactive
# docs and the OpenAPI schema are exposed at "/docs" and "/openapi.json".
app = FastAPI(
    title="Auth Service",
    root_path="/auth",
    docs_url="/docs",
    openapi_url="/openapi.json",
)

# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy. Confirm whether credentialed cross-origin
# requests are really required and, if so, list the allowed origins
# explicitly instead of "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def startup():
    """Initialise the database by awaiting ``database.init_db()``.

    NOTE(review): no ``@app.on_event("startup")`` decorator or lifespan
    registration is visible in this view; confirm this hook is actually
    registered with ``app``.
    """
    await database.init_db()
async def get_db():
    """Yield a session created by ``database.AsyncSessionLocal``.

    The session is opened with ``async with`` so it is closed once the
    consumer of this generator is finished with it. The handlers in this
    module use it through ``Depends(get_db)``.
    """
    session_factory = database.AsyncSessionLocal
    async with session_factory() as db_session:
        yield db_session
def root():
    """Return a static payload identifying this service and its status."""
    payload = dict(
        service="auth",
        message="Auth service is running",
    )
    return payload
async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_db)):
    """Create a new user whose password is stored as a bcrypt hash.

    Args:
        user: Payload carrying the requested ``username`` and ``password``.
        db: Async database session supplied by ``get_db``.

    Returns:
        The newly persisted ``models.User`` instance, refreshed from the
        database after the commit.

    Raises:
        HTTPException: 400 when the username is already taken, either
            detected by the up-front lookup or reported by the database
            at commit time.
    """
    # Imported locally so this block does not depend on a change to the
    # file-level import list.
    from sqlalchemy.exc import IntegrityError

    lookup = await db.execute(
        select(models.User).where(models.User.username == user.username)
    )
    if lookup.scalars().first():
        raise HTTPException(status_code=400, detail="Username already exists")

    db_user = models.User(
        username=user.username,
        hashed_password=pwd_context.hash(user.password),
    )
    db.add(db_user)
    try:
        await db.commit()
    except IntegrityError as exc:
        # The lookup above and this insert are not atomic: a concurrent
        # request can create the same username in between. Roll back so the
        # session stays usable and report the same 400 as the normal path
        # instead of an unhandled 500.
        # NOTE(review): assumes the only constraint that can fail here is
        # uniqueness of ``username`` -- confirm against the model definition.
        await db.rollback()
        raise HTTPException(
            status_code=400, detail="Username already exists"
        ) from exc
    await db.refresh(db_user)
    return db_user
async def get_users(db: AsyncSession = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_user)):
    """Return every ``models.User`` row.

    ``current_user`` is not read in the body; it is declared so that
    ``auth.get_current_user`` runs as a dependency of this handler.
    """
    rows = await db.execute(select(models.User))
    all_users = rows.scalars().all()
    return all_users
async def get_user(username: str, db: AsyncSession = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_user)):
    """Look up a single user by username.

    ``current_user`` is not read in the body; it is declared so that
    ``auth.get_current_user`` runs as a dependency of this handler.

    Raises:
        HTTPException: 404 when no user has the given username.
    """
    query = select(models.User).where(models.User.username == username)
    found = (await db.execute(query)).scalars().first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")
async def update_user(username: str, user_update: schemas.UserCreate, db: AsyncSession = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_user)):
    """Replace the stored password hash of the user named ``username``.

    Only ``user_update.password`` is used; any other field of the payload is
    ignored.

    Args:
        username: Account whose password should be changed.
        user_update: Payload carrying the new plain-text password.
        db: Async database session supplied by ``get_db``.
        current_user: Authenticated caller resolved by
            ``auth.get_current_user``.

    Returns:
        The updated ``models.User`` instance, refreshed after the commit.

    Raises:
        HTTPException: 403 when the caller tries to modify another account;
            404 when the target user does not exist.
    """
    # Without this check any authenticated account could overwrite another
    # user's password and take the account over.
    # NOTE(review): assumes the object returned by auth.get_current_user
    # exposes ``username`` -- confirm. Extend the check if an admin role
    # should be allowed to edit other accounts.
    if current_user.username != username:
        raise HTTPException(status_code=403, detail="Not authorized to modify this user")

    stmt = select(models.User).where(models.User.username == username)
    result = await db.execute(stmt)
    db_user = result.scalars().first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")

    db_user.hashed_password = pwd_context.hash(user_update.password)
    await db.commit()
    await db.refresh(db_user)
    return db_user
async def delete_user(username: str, db: AsyncSession = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_user)):
    """Delete the user named ``username``.

    Args:
        username: Account to remove.
        db: Async database session supplied by ``get_db``.
        current_user: Authenticated caller resolved by
            ``auth.get_current_user``.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 403 when the caller tries to delete another account;
            404 when the target user does not exist.
    """
    # Without this check any authenticated account could delete any other
    # user.
    # NOTE(review): assumes the object returned by auth.get_current_user
    # exposes ``username`` -- confirm. Extend the check if an admin role
    # should be allowed to delete other accounts.
    if current_user.username != username:
        raise HTTPException(status_code=403, detail="Not authorized to delete this user")

    stmt = select(models.User).where(models.User.username == username)
    result = await db.execute(stmt)
    db_user = result.scalars().first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")

    await db.delete(db_user)
    await db.commit()
    return {
        "message": "User deleted."
    }
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db)):
    """Validate username/password credentials and issue a bearer token.

    Args:
        form_data: OAuth2 password-flow form carrying ``username`` and
            ``password``.
        db: Async database session supplied by ``get_db``.

    Returns:
        A dict with ``access_token`` (built by ``auth.create_access_token``
        with the username as ``sub``) and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the username is unknown or the password does not match.
    """
    stmt = select(models.User).where(models.User.username == form_data.username)
    result = await db.execute(stmt)
    user = result.scalars().first()

    if not user:
        # Spend roughly the time of a real hash check so response timing does
        # not reveal whether the username exists.
        pwd_context.dummy_verify()
        password_ok = False
    else:
        password_ok = pwd_context.verify(form_data.password, user.hashed_password)

    if not password_ok:
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            # A 401 response should tell the client which auth scheme to use.
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = auth.create_access_token(data={"sub": user.username})
    return {"access_token": access_token, "token_type": "bearer"}