anyonehomep1mane
Task CRUD Changes
18d6976
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from . import schemas, database, models, auth
from passlib.context import CryptContext
import logging
app = FastAPI(
title="Task Service",
root_path="/task",
docs_url="/docs",
openapi_url="/openapi.json"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@app.on_event("startup")
async def startup():
await database.init_db()
async def get_db():
async with database.AsyncSessionLocal() as session:
yield session
@app.get("/")
def root():
return {
"service": "task",
"message": "Task service is running"
}
@app.post("/tasks/", response_model=schemas.Task)
async def create_task(task: schemas.TaskCreate, db: AsyncSession = Depends(get_db), current_user: dict = Depends(auth.get_current_user)):
logger.info(f"Creating task for user: {current_user['username']}, title: {task.title}")
stmt = select(models.Task).where(models.Task.title == task.title, models.Task.username == task.username)
result = await db.execute(stmt)
if result.scalars().first():
raise HTTPException(status_code=400, detail="Task with this title already exists for the user")
db_task = models.Task(title=task.title, description=task.description, username=task.username)
db.add(db_task)
await db.commit()
await db.refresh(db_task)
logger.info(f"Task created with ID: {db_task.id}")
return db_task
@app.get("/tasks/", response_model=list[schemas.Task])
async def get_tasks(db: AsyncSession = Depends(get_db), current_user: dict = Depends(auth.get_current_user)):
stmt = select(models.Task).where(models.Task.username == current_user['username'])
result = await db.execute(stmt)
return result.scalars().all()
@app.get("/tasks/{task_id}", response_model=schemas.Task)
async def get_task(task_id: int, db: AsyncSession = Depends(get_db), current_user: dict = Depends(auth.get_current_user)):
stmt = select(models.Task).where(models.Task.id == task_id, models.Task.username == current_user['username'])
result = await db.execute(stmt)
task = result.scalars().first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.put("/tasks/{task_id}", response_model=schemas.Task)
async def update_task(task_id: int, task_update: schemas.TaskCreate, db: AsyncSession = Depends(get_db), current_user: dict = Depends(auth.get_current_user)):
stmt = select(models.Task).where(models.Task.id == task_id, models.Task.username == current_user['username'])
result = await db.execute(stmt)
db_task = result.scalars().first()
if not db_task:
raise HTTPException(status_code=404, detail="Task not found")
db_task.title = task_update.title
db_task.description = task_update.description
await db.commit()
await db.refresh(db_task)
return db_task
@app.delete("/tasks/{task_id}", status_code=204)
async def delete_task(task_id: int, db: AsyncSession = Depends(get_db), current_user: dict = Depends(auth.get_current_user)):
stmt = select(models.Task).where(models.Task.id == task_id, models.Task.username == current_user['username'])
result = await db.execute(stmt)
db_task = result.scalars().first()
if not db_task:
raise HTTPException(status_code=404, detail="Task not found")
await db.delete(db_task)
await db.commit()
return {
"message": "Task deleted."
}