Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, HTTPException, Query, Path | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| from uuid import UUID | |
| from datetime import datetime | |
| from src.config import logger | |
| from src.services import LetterService | |
| from src.models import LetterType | |
# Letter record as serialized in API responses. The handlers in this module
# build instances from the rows returned by the letter service (Letter(**row)).
class Letter(BaseModel):
    id: UUID
    proposal_id: UUID  # proposal the letter is attached to
    letter: str
    letter_type: LetterType
    created_at: datetime
    updated_at: datetime
# Envelope used as the response model of the read, create and update routes:
# a status string (the handlers set it to "success") plus a list of letters.
class Response(BaseModel):
    status: str
    data: List[Letter]
# Request body of the flat create handler (LetterController.create_letter);
# unlike CreateLetterRequest, the proposal id is carried in the body itself.
class LetterRequest(BaseModel):
    proposal_id: UUID
    letter: str
    letter_type: LetterType
# Request body of the proposal-scoped create route. It has no proposal_id
# field: the handler fills that in from the URL path before calling the service.
class CreateLetterRequest(BaseModel):
    letter: str
    letter_type: LetterType
# Partial-update body of the flat update handler (LetterController.update_letter).
# Every field is optional; the handler dumps the model with exclude_unset=True,
# so only the fields the client actually sent are forwarded to the service.
class UpdateLetterRequest(BaseModel):
    proposal_id: Optional[UUID] = None
    letter: Optional[str] = None
    letter_type: Optional[LetterType] = None
# Partial-update body of the proposal-scoped update route. proposal_id is not
# part of the body: the handler sets it from the URL path. Fields left unset
# by the client are dropped by the handler (exclude_unset=True).
class UpdateLetterRequestByProposalAndId(BaseModel):
    letter: Optional[str] = None
    letter_type: Optional[LetterType] = None
# Response model of the delete routes; carries only a status string
# (the handlers set it to "success").
class DeleteResponse(BaseModel):
    status: str
class LetterController:
    """Expose letter CRUD endpoints nested under an RFP's proposal.

    Every handler opens a fresh ``LetterService`` with ``async with``,
    delegates the work to it, and wraps the returned rows in a ``Response``
    (or returns a ``DeleteResponse`` for deletions). Any exception raised
    while doing so is logged and reported to the client as an HTTP 500.

    Only the ``/rfps/{rfp_id}/proposals/{proposal_id}/letters`` routes are
    registered. The flat ``/letters`` registrations are commented out, so
    ``get_letters``, ``get_letters_by_id``, ``create_letter``,
    ``update_letter`` and ``delete_letter`` are not reachable through
    ``self.router`` at the moment.
    """

    def __init__(self):
        """Create the router and register the proposal-scoped letter routes."""
        # The class itself is stored, not an instance: each request
        # instantiates its own service inside an ``async with`` block.
        self.letter_service = LetterService
        self.router = APIRouter()
        # self.router.add_api_route(
        #     "/letters",
        #     self.get_letters,
        #     methods=["GET"],
        #     response_model=Response,
        #     tags=["Letters"],
        # )
        # self.router.add_api_route(
        #     "/letters/{id}",
        #     self.get_letters_by_id,
        #     methods=["GET"],
        #     response_model=Response,
        #     tags=["Letters by ID"],
        # )
        self.router.add_api_route(
            "/rfps/{rfp_id}/proposals/{proposal_id}/letters",
            self.get_letters_by_proposal_id,
            methods=["GET"],
            response_model=Response,
            tags=["Letters by Proposal ID"],
        )
        self.router.add_api_route(
            "/rfps/{rfp_id}/proposals/{proposal_id}/letters/{id}",
            self.get_letters_by_proposal_and_id,
            methods=["GET"],
            response_model=Response,
            tags=["Letters by Proposal and ID"],
        )
        # self.router.add_api_route(
        #     "/letters",
        #     self.create_letter,
        #     methods=["POST"],
        #     response_model=Response,
        #     tags=["Letters"],
        # )
        self.router.add_api_route(
            "/rfps/{rfp_id}/proposals/{proposal_id}/letters",
            self.create_letters_by_proposal_id,
            methods=["POST"],
            response_model=Response,
            tags=["Letters by Proposal ID"],
        )
        # self.router.add_api_route(
        #     "/letters/{id}",
        #     self.update_letter,
        #     methods=["PUT"],
        #     response_model=Response,
        #     tags=["Letters by ID"],
        # )
        self.router.add_api_route(
            "/rfps/{rfp_id}/proposals/{proposal_id}/letters/{id}",
            self.update_letters_by_proposal_and_id,
            methods=["PUT"],
            response_model=Response,
            tags=["Letters by Proposal and ID"],
        )
        # self.router.add_api_route(
        #     "/letters/{id}",
        #     self.delete_letter,
        #     methods=["DELETE"],
        #     response_model=DeleteResponse,
        #     tags=["Letters by ID"],
        # )
        self.router.add_api_route(
            "/rfps/{rfp_id}/proposals/{proposal_id}/letters",
            self.delete_letters_by_proposal_id,
            methods=["DELETE"],
            response_model=DeleteResponse,
            tags=["Letters by Proposal ID"],
        )
        self.router.add_api_route(
            "/rfps/{rfp_id}/proposals/{proposal_id}/letters/{id}",
            self.delete_letters_by_proposal_and_id,
            methods=["DELETE"],
            response_model=DeleteResponse,
            tags=["Letters by Proposal and ID"],
        )

    @staticmethod
    def _to_response(rows) -> Response:
        """Wrap service rows (an iterable of mappings) in a success ``Response``."""
        return Response(status="success", data=[Letter(**row) for row in rows])

    async def get_letters(self):
        """Return every letter the service yields.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.letter_service() as service:
                rows = await service.get_letters()
                return self._to_response(rows)
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error fetching letters"
            ) from exc

    async def get_letters_by_id(self, id: str = Path(...)):
        """Return the letters matching ``id``.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.letter_service() as service:
                rows = await service.get_letters(id=id)
                return self._to_response(rows)
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error fetching letter"
            ) from exc

    async def get_letters_by_proposal_id(
        self,
        proposal_id: str = Path(...),
        letter_type: Optional[LetterType] = Query(None),
        rfp_id: str = Path(...),
    ):
        """Return the letters of a proposal, optionally filtered by type.

        Args:
            proposal_id: Proposal id taken from the URL path.
            letter_type: Optional query filter; ``None`` when not supplied.
            rfp_id: Present because it is part of the route path; it is not
                forwarded to the service.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.letter_service() as service:
                rows = await service.get_letters(
                    proposal_id=proposal_id, letter_type=letter_type
                )
                return self._to_response(rows)
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error fetching letter"
            ) from exc

    async def get_letters_by_proposal_and_id(
        self, proposal_id: str = Path(...), id: str = Path(...), rfp_id: str = Path(...)
    ):
        """Return the letters matching both ``proposal_id`` and ``id``.

        ``rfp_id`` is part of the route path but is not forwarded to the
        service.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.letter_service() as service:
                rows = await service.get_letters(proposal_id=proposal_id, id=id)
                return self._to_response(rows)
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error fetching letter"
            ) from exc

    async def create_letter(self, letter: LetterRequest):
        """Create a letter from a body that carries its own ``proposal_id``.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.letter_service() as service:
                rows = await service.create_letter(
                    letter.model_dump(exclude_unset=True, mode="json")
                )
                return self._to_response(rows)
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error creating letter"
            ) from exc

    async def create_letters_by_proposal_id(
        self,
        letter: CreateLetterRequest,
        proposal_id: str = Path(...),
        rfp_id: str = Path(...),
    ):
        """Create a letter for the proposal named in the URL path.

        The request body is dumped to a JSON-compatible dict and
        ``proposal_id`` from the path is added to it before it is handed to
        the service. ``rfp_id`` is part of the route path but is not
        forwarded to the service.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.letter_service() as service:
                payload = letter.model_dump(exclude_unset=True, mode="json")
                payload["proposal_id"] = proposal_id
                rows = await service.create_letter(letter=payload)
                return self._to_response(rows)
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error creating letter"
            ) from exc

    async def update_letter(self, letter: UpdateLetterRequest, id: str = Path(...)):
        """Update the letter ``id`` with the fields present in the body.

        Only fields the client explicitly set are forwarded
        (``exclude_unset=True``).

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.letter_service() as service:
                rows = await service.update_letter(
                    id=id, letter=letter.model_dump(exclude_unset=True, mode="json")
                )
                return self._to_response(rows)
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error updating letter"
            ) from exc

    async def update_letters_by_proposal_and_id(
        self,
        letter: UpdateLetterRequestByProposalAndId,
        proposal_id: str = Path(...),
        id: str = Path(...),
        rfp_id: str = Path(...),
    ):
        """Update the letter ``id`` under the proposal named in the URL path.

        Only fields the client explicitly set are kept from the body
        (``exclude_unset=True``); ``proposal_id`` from the path is then added
        to the update payload. ``rfp_id`` is part of the route path but is
        not forwarded to the service.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.letter_service() as service:
                payload = letter.model_dump(exclude_unset=True, mode="json")
                payload["proposal_id"] = proposal_id
                rows = await service.update_letter(id=id, letter=payload)
                return self._to_response(rows)
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error updating letter"
            ) from exc

    async def delete_letter(self, id: str = Path(...)):
        """Delete the letter ``id``.

        The service's return value is not used; success is reported whenever
        the call does not raise.

        Raises:
            HTTPException: 500 if the service call fails.
        """
        try:
            async with self.letter_service() as service:
                await service.delete_letter(id=id)
                return DeleteResponse(status="success")
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error deleting letter"
            ) from exc

    async def delete_letters_by_proposal_id(
        self, proposal_id: str = Path(...), rfp_id: str = Path(...)
    ):
        """Ask the service to delete by ``proposal_id`` alone.

        The service's return value is not used; success is reported whenever
        the call does not raise. ``rfp_id`` is part of the route path but is
        not forwarded to the service.

        Raises:
            HTTPException: 500 if the service call fails.
        """
        try:
            async with self.letter_service() as service:
                await service.delete_letter(proposal_id=proposal_id)
                return DeleteResponse(status="success")
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error deleting letter"
            ) from exc

    async def delete_letters_by_proposal_and_id(
        self, proposal_id: str = Path(...), id: str = Path(...), rfp_id: str = Path(...)
    ):
        """Ask the service to delete by both ``proposal_id`` and ``id``.

        The service's return value is not used; success is reported whenever
        the call does not raise. ``rfp_id`` is part of the route path but is
        not forwarded to the service.

        Raises:
            HTTPException: 500 if the service call fails.
        """
        try:
            async with self.letter_service() as service:
                await service.delete_letter(proposal_id=proposal_id, id=id)
                return DeleteResponse(status="success")
        except Exception as exc:
            logger.error(exc)
            raise HTTPException(
                status_code=500, detail="Error deleting letter"
            ) from exc