Spaces:
Running
Running
| from fastapi import FastAPI, HTTPException, Depends | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select | |
| from . import schemas, database, models, auth | |
| from passlib.context import CryptContext | |
| import logging | |
| app = FastAPI( | |
| title="Task Service", | |
| root_path="/task", | |
| docs_url="/docs", | |
| openapi_url="/openapi.json" | |
| ) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| async def startup(): | |
| await database.init_db() | |
| async def get_db(): | |
| async with database.AsyncSessionLocal() as session: | |
| yield session | |
| def root(): | |
| return { | |
| "service": "task", | |
| "message": "Task service is running" | |
| } | |
| async def create_task(task: schemas.TaskCreate, db: AsyncSession = Depends(get_db), current_user: dict = Depends(auth.get_current_user)): | |
| logger.info(f"Creating task for user: {current_user['username']}, title: {task.title}") | |
| stmt = select(models.Task).where(models.Task.title == task.title, models.Task.username == task.username) | |
| result = await db.execute(stmt) | |
| if result.scalars().first(): | |
| raise HTTPException(status_code=400, detail="Task with this title already exists for the user") | |
| db_task = models.Task(title=task.title, description=task.description, username=task.username) | |
| db.add(db_task) | |
| await db.commit() | |
| await db.refresh(db_task) | |
| logger.info(f"Task created with ID: {db_task.id}") | |
| return db_task | |
| async def get_tasks(db: AsyncSession = Depends(get_db), current_user: dict = Depends(auth.get_current_user)): | |
| stmt = select(models.Task).where(models.Task.username == current_user['username']) | |
| result = await db.execute(stmt) | |
| return result.scalars().all() | |
| async def get_task(task_id: int, db: AsyncSession = Depends(get_db), current_user: dict = Depends(auth.get_current_user)): | |
| stmt = select(models.Task).where(models.Task.id == task_id, models.Task.username == current_user['username']) | |
| result = await db.execute(stmt) | |
| task = result.scalars().first() | |
| if not task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| return task | |
| async def update_task(task_id: int, task_update: schemas.TaskCreate, db: AsyncSession = Depends(get_db), current_user: dict = Depends(auth.get_current_user)): | |
| stmt = select(models.Task).where(models.Task.id == task_id, models.Task.username == current_user['username']) | |
| result = await db.execute(stmt) | |
| db_task = result.scalars().first() | |
| if not db_task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| db_task.title = task_update.title | |
| db_task.description = task_update.description | |
| await db.commit() | |
| await db.refresh(db_task) | |
| return db_task | |
| async def delete_task(task_id: int, db: AsyncSession = Depends(get_db), current_user: dict = Depends(auth.get_current_user)): | |
| stmt = select(models.Task).where(models.Task.id == task_id, models.Task.username == current_user['username']) | |
| result = await db.execute(stmt) | |
| db_task = result.scalars().first() | |
| if not db_task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| await db.delete(db_task) | |
| await db.commit() | |
| return { | |
| "message": "Task deleted." | |
| } |