Spaces:
Runtime error
Runtime error
| """Serverless-compatible FastAPI application for Vercel deployment.""" | |
| import os | |
| import sys | |
| from pathlib import Path | |
| # Add src to path for imports | |
| src_path = Path(__file__).parent.parent / "src" | |
| sys.path.insert(0, str(src_path)) | |
| from contextlib import asynccontextmanager | |
| from datetime import datetime | |
| from typing import Optional | |
| from fastapi import FastAPI, Depends, HTTPException, Query | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from sqlalchemy.orm import Session | |
| # Import components - delay heavy imports until needed to avoid initialization issues | |
| def get_database_components(): | |
| from database import get_db, engine, Base | |
| return get_db, engine, Base | |
| def get_models(): | |
| from models.todo import Todo | |
| from models.user import User | |
| return Todo, User | |
| def get_schemas(): | |
| from schemas.todo import TodoCreate, TodoUpdate, TodoResponse, TodoListResponse | |
| return TodoCreate, TodoUpdate, TodoResponse, TodoListResponse | |
| def get_services(): | |
| from services.todo_service import TodoService | |
| return TodoService | |
| def get_routers(): | |
| from api.routes.auth import router as auth_router | |
| from api.chat_router import router as chat_router | |
| from api.task_router import router as task_router | |
| return auth_router, chat_router, task_router | |
| def get_middlewares(): | |
| from middleware.auth import get_current_user | |
| return get_current_user | |
| app = FastAPI( | |
| title="Todo API", | |
| description="REST API for managing todo items with extended features", | |
| version="2.0.0" | |
| ) | |
| # Configure CORS for production - allow your frontend domain | |
| # For production, replace with your actual frontend URL | |
| frontend_url = os.getenv("FRONTEND_URL", "https://localhost:3000") | |
| allow_origins = [frontend_url] | |
| # Add localhost origins for development if in development mode | |
| environment = os.getenv("ENVIRONMENT", "development") | |
| if environment.lower() != "production": | |
| allow_origins.extend([ | |
| "http://localhost:3000", | |
| "http://localhost:3001", | |
| "http://localhost:3002", | |
| "http://localhost:3006", | |
| "http://localhost:3007" | |
| ]) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=allow_origins, | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Include routers | |
| auth_router, chat_router, task_router = get_routers() | |
| app.include_router(auth_router, prefix="/api/v1") | |
| app.include_router(chat_router) | |
| app.include_router(task_router, prefix="/api/v1") | |
| def _calculate_overdue(todo, datetime_module=datetime): | |
| """Calculate if a todo is overdue.""" | |
| if todo.due_date is None or todo.completed: | |
| return False | |
| return todo.due_date < datetime_module.utcnow() | |
| def _todo_to_response(todo): | |
| """Convert Todo model to TodoResponse schema.""" | |
| TodoResponse = get_schemas()[2] # Get TodoResponse from schemas | |
| response_data = { | |
| "id": todo.id, | |
| "user_id": todo.user_id, | |
| "title": todo.title, | |
| "description": todo.description, | |
| "completed": todo.completed, | |
| "priority": todo.priority, | |
| "tags": todo.tags, | |
| "due_date": todo.due_date, | |
| "recurrence": todo.recurrence, | |
| "created_at": todo.created_at, | |
| "updated_at": todo.updated_at, | |
| "overdue": _calculate_overdue(todo) | |
| } | |
| return TodoResponse(**response_data) | |
| # Will set response_model dynamically | |
| def get_todos( | |
| db: Session = Depends(lambda: get_database_components()[0]()), | |
| current_user: object = Depends(lambda: get_middlewares()[0]()), | |
| search: Optional[str] = Query(None, description="Search in title and description"), | |
| status: Optional[str] = Query(None, pattern="^(completed|pending)$", description="Filter by status"), | |
| priority: Optional[str] = Query(None, pattern="^(low|medium|high)$", description="Filter by priority"), | |
| due_before: Optional[datetime] = Query(None, description="Filter todos due before this date"), | |
| due_after: Optional[datetime] = Query(None, description="Filter todos due after this date"), | |
| tag: Optional[str] = Query(None, description="Filter by specific tag"), | |
| sort_by: Optional[str] = Query("created_at", pattern="^(created_at|due_date|priority|title)$", description="Sort field"), | |
| sort_order: Optional[str] = Query("desc", pattern="^(asc|desc)$", description="Sort order"), | |
| ): | |
| """Get all todos with optional search, filtering, and sorting.""" | |
| TodoListResponse = get_schemas()[3] # Get TodoListResponse from schemas | |
| app.router.routes[-1].response_model = TodoListResponse # Set response model dynamically | |
| TodoService = get_services()[0] # Get TodoService from services | |
| service = TodoService(db, user_id=current_user.id) | |
| todos = service.get_all( | |
| search=search, | |
| status=status, | |
| priority=priority, | |
| due_before=due_before, | |
| due_after=due_after, | |
| tag=tag, | |
| sort_by=sort_by, | |
| sort_order=sort_order | |
| ) | |
| # Convert to response schema with overdue calculation | |
| todos_response = [_todo_to_response(todo) for todo in todos] | |
| return { | |
| "todos": todos_response, | |
| "count": len(todos_response), | |
| "has_more": False # Pagination not implemented yet | |
| } | |
| def create_todo( | |
| todo_data: object, # Will be validated dynamically | |
| db: Session = Depends(lambda: get_database_components()[0]()), | |
| current_user: object = Depends(lambda: get_middlewares()[0]()) | |
| ): | |
| """Create a new todo with optional extended fields.""" | |
| TodoResponse = get_schemas()[2] # Get TodoResponse from schemas | |
| app.router.routes[-1].response_model = TodoResponse # Set response model dynamically | |
| try: | |
| TodoService = get_services()[0] # Get TodoService from services | |
| service = TodoService(db, user_id=current_user.id) | |
| todo = service.create(todo_data) | |
| return _todo_to_response(todo) | |
| except ValueError as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| def get_todo( | |
| todo_id: int, | |
| db: Session = Depends(lambda: get_database_components()[0]()), | |
| current_user: object = Depends(lambda: get_middlewares()[0]()) | |
| ): | |
| """Get a single todo by ID.""" | |
| TodoResponse = get_schemas()[2] # Get TodoResponse from schemas | |
| app.router.routes[-1].response_model = TodoResponse # Set response model dynamically | |
| TodoService = get_services()[0] # Get TodoService from services | |
| service = TodoService(db, user_id=current_user.id) | |
| todo = service.get_by_id(todo_id) | |
| if not todo: | |
| raise HTTPException(status_code=404, detail="Todo not found") | |
| return _todo_to_response(todo) | |
| def update_todo( | |
| todo_id: int, | |
| todo_data: object, # Will be validated dynamically | |
| db: Session = Depends(lambda: get_database_components()[0]()), | |
| current_user: object = Depends(lambda: get_middlewares()[0]()) | |
| ): | |
| """Update a todo with optional extended fields.""" | |
| TodoResponse = get_schemas()[2] # Get TodoResponse from schemas | |
| app.router.routes[-1].response_model = TodoResponse # Set response model dynamically | |
| try: | |
| TodoService = get_services()[0] # Get TodoService from services | |
| service = TodoService(db, user_id=current_user.id) | |
| todo = service.update(todo_id, todo_data) | |
| if not todo: | |
| raise HTTPException(status_code=404, detail="Todo not found") | |
| return _todo_to_response(todo) | |
| except ValueError as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| def delete_todo( | |
| todo_id: int, | |
| db: Session = Depends(lambda: get_database_components()[0]()), | |
| current_user: object = Depends(lambda: get_middlewares()[0]()) | |
| ): | |
| """Delete a todo.""" | |
| TodoService = get_services()[0] # Get TodoService from services | |
| service = TodoService(db, user_id=current_user.id) | |
| if not service.delete(todo_id): | |
| raise HTTPException(status_code=404, detail="Todo not found") | |
| return {"message": "Todo deleted successfully"} | |
| def mark_todo_complete( | |
| todo_id: int, | |
| db: Session = Depends(lambda: get_database_components()[0]()), | |
| current_user: object = Depends(lambda: get_middlewares()[0]()) | |
| ): | |
| """ | |
| Mark a todo as complete. | |
| If the todo has a recurrence pattern, automatically creates the next instance. | |
| Returns both the completed task and the next task (if applicable). | |
| """ | |
| TodoService = get_services()[0] # Get TodoService from services | |
| service = TodoService(db, user_id=current_user.id) | |
| result = service.mark_complete(todo_id) | |
| if result is None: | |
| raise HTTPException(status_code=404, detail="Todo not found") | |
| Todo, _ = get_models() # Get Todo model | |
| TodoResponse = get_schemas()[2] # Get TodoResponse from schemas | |
| completed_task, next_task = result | |
| response = { | |
| "completed_task": _todo_to_response(completed_task) | |
| } | |
| if next_task: | |
| response["next_task"] = _todo_to_response(next_task) | |
| return response | |
| def get_todos_due_soon( | |
| hours: int = Query(1, ge=1, le=24, description="Number of hours to look ahead"), | |
| db: Session = Depends(lambda: get_database_components()[0]()), | |
| current_user: object = Depends(lambda: get_middlewares()[0]()) | |
| ): | |
| """Get todos due within the specified hours for reminder notifications.""" | |
| TodoService = get_services()[0] # Get TodoService from services | |
| service = TodoService(db, user_id=current_user.id) | |
| todos = service.get_todos_due_soon(hours=hours) | |
| todos_response = [_todo_to_response(todo) for todo in todos] | |
| return {"todos": todos_response, "count": len(todos_response)} | |
| def root(): | |
| """Root endpoint for health check.""" | |
| return {"message": "Todo API is running", "status": "healthy"} | |
| def health_check(): | |
| """Health check endpoint.""" | |
| return {"status": "healthy", "timestamp": datetime.now()} |