Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, Body, HTTPException, Query, Path | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| from uuid import UUID | |
| from datetime import datetime | |
| from src.config import logger | |
| from src.services import GateEvaluationService | |
class GateEvaluation(BaseModel):
    """A single gate-evaluation record as returned by this API."""

    # Identifier of the evaluation itself.
    id: UUID
    # Identifier of the proposal this evaluation is stored under.
    proposal_id: UUID
    # Evaluation outcome, carried as plain text.
    result: str
    # Creation and last-update timestamps.
    created_at: datetime
    updated_at: datetime
class GateEvaluationResponse(BaseModel):
    """Response envelope carrying a list of gate evaluations."""

    # Outcome marker for the request; the handlers in this file set "success".
    status: str
    # Zero or more evaluation records.
    data: List[GateEvaluation]
class GateEvaluationSingleResponse(BaseModel):
    """Response envelope carrying at most one gate evaluation.

    ``data`` is nullable because the single-item GET and PATCH handlers in
    this file construct this model with ``data=None`` when the service
    returns nothing.  With a non-optional ``GateEvaluation`` field that
    construction fails validation, and the handlers' blanket ``except``
    converts the failure into an HTTP 500 instead of the intended empty
    success envelope.
    """

    # Outcome marker for the request; the handlers in this file set "success".
    status: str
    # The matching evaluation, or None when nothing was found/updated.
    data: Optional[GateEvaluation]
class GateEvaluationRequest(BaseModel):
    """Request body for the collection-level POST and PUT routes."""

    # List of result strings; the handlers forward it to the service unchanged.
    result: List[str]
class GateEvaluationUpdateRequest(BaseModel):
    """Request body for the single-evaluation PATCH route."""

    # New result text for the targeted evaluation.
    result: str
class DeleteResponse(BaseModel):
    """Response envelope for the delete routes; it declares only a status."""

    # Outcome marker for the request; the handlers in this file set "success".
    status: str
class GateEvaluationController:
    """Registers the gate-evaluation routes nested under an RFP's proposal.

    Each handler opens the service as an async context manager, delegates the
    work to it and wraps the outcome in a response model.  Any exception
    raised inside the handler's ``try`` block is logged and replaced by an
    HTTP 500 with a route-specific message.

    NOTE(review): ``rfp_id`` is accepted by every route but never forwarded to
    the service, and the by-id routes do not forward ``proposal_id`` either —
    confirm whether the service is expected to enforce that scoping.
    """

    def __init__(self):
        # The service *class* is stored; handlers instantiate it per request.
        self.gate_evaluation_service = GateEvaluationService
        self.router = APIRouter()

        collection_path = "/rfps/{rfp_id}/proposals/{proposal_id}/gate-evaluations"
        item_path = "/rfps/{rfp_id}/proposals/{proposal_id}/gate-evaluations/{id}"
        collection_tag = "Gate Evaluation by Proposal and RFP ID"
        item_tag = "Gate Evaluation by Proposal and RFP ID and ID"

        # (path, handler, HTTP method, response model, tag), in registration order.
        route_table = (
            (
                collection_path,
                self.get_gate_evaluation_by_proposal_and_rfp_id,
                "GET",
                GateEvaluationResponse,
                collection_tag,
            ),
            (
                item_path,
                self.get_gate_evaluation_by_proposal_and_rfp_id_and_id,
                "GET",
                GateEvaluationSingleResponse,
                item_tag,
            ),
            (
                collection_path,
                self.create_gate_evaluation_by_proposal_and_rfp_id,
                "POST",
                GateEvaluationResponse,
                collection_tag,
            ),
            (
                collection_path,
                self.update_gate_evaluation_by_proposal_and_rfp_id,
                "PUT",
                GateEvaluationResponse,
                collection_tag,
            ),
            (
                item_path,
                self.update_gate_evaluation_by_proposal_and_rfp_id_and_id,
                "PATCH",
                GateEvaluationSingleResponse,
                item_tag,
            ),
            (
                collection_path,
                self.delete_gate_evaluation_by_proposal_and_rfp_id,
                "DELETE",
                DeleteResponse,
                collection_tag,
            ),
            (
                item_path,
                self.delete_gate_evaluation_by_proposal_and_rfp_id_and_id,
                "DELETE",
                DeleteResponse,
                item_tag,
            ),
        )
        for path, handler, method, model, tag in route_table:
            self.router.add_api_route(
                path,
                handler,
                methods=[method],
                response_model=model,
                tags=[tag],
            )

    async def get_gate_evaluation_by_proposal_and_rfp_id(
        self, rfp_id: str = Path(...), proposal_id: str = Path(...)
    ):
        """Return every gate evaluation the service yields for ``proposal_id``.

        Raises:
            HTTPException: 500 if the lookup or response construction fails.
        """
        async with self.gate_evaluation_service() as svc:
            try:
                rows = await svc.get_gate_evaluation(proposal_id=proposal_id)
                evaluations = [GateEvaluation(**row) for row in rows]
                return GateEvaluationResponse(status="success", data=evaluations)
            except Exception as exc:
                logger.exception(exc)
                raise HTTPException(
                    status_code=500, detail="Failed to get gate evaluation."
                )

    async def get_gate_evaluation_by_proposal_and_rfp_id_and_id(
        self, rfp_id: str = Path(...), proposal_id: str = Path(...), id: str = Path(...)
    ):
        """Return the first gate evaluation the service yields for ``id``.

        When the service returns nothing, a success envelope with
        ``data=None`` is built; that relies on
        ``GateEvaluationSingleResponse`` accepting a null ``data``.

        Raises:
            HTTPException: 500 if the lookup or response construction fails.
        """
        async with self.gate_evaluation_service() as svc:
            try:
                rows = await svc.get_gate_evaluation(id=id)
                if not rows:
                    return GateEvaluationSingleResponse(status="success", data=None)
                # Only the first returned row is exposed.
                return GateEvaluationSingleResponse(
                    status="success", data=GateEvaluation(**rows[0])
                )
            except Exception as exc:
                logger.exception(exc)
                raise HTTPException(
                    status_code=500, detail="Failed to get gate evaluation."
                )

    async def create_gate_evaluation_by_proposal_and_rfp_id(
        self,
        rfp_id: str = Path(...),
        proposal_id: str = Path(...),
        gate_evaluation: GateEvaluationRequest = Body(...),
    ):
        """Create gate evaluations for ``proposal_id`` from the request's results.

        Raises:
            HTTPException: 500 if creation or response construction fails.
        """
        async with self.gate_evaluation_service() as svc:
            try:
                payload = gate_evaluation.model_dump(exclude_unset=True, mode="json")
                rows = await svc.create_gate_evaluation(
                    proposal_id=proposal_id, gate_evaluation=payload["result"]
                )
                created = [GateEvaluation(**row) for row in rows]
                return GateEvaluationResponse(status="success", data=created)
            except Exception as exc:
                logger.exception(exc)
                raise HTTPException(
                    status_code=500, detail="Failed to create gate evaluation."
                )

    async def update_gate_evaluation_by_proposal_and_rfp_id(
        self,
        rfp_id: str = Path(...),
        proposal_id: str = Path(...),
        gate_evaluation: GateEvaluationRequest = Body(...),
    ):
        """Update the gate evaluations of ``proposal_id`` with the request's results.

        Raises:
            HTTPException: 500 if the update or response construction fails.
        """
        async with self.gate_evaluation_service() as svc:
            try:
                payload = gate_evaluation.model_dump(exclude_unset=True, mode="json")
                rows = await svc.update_gate_evaluation(
                    proposal_id=proposal_id, gate_evaluation=payload["result"]
                )
                updated = [GateEvaluation(**row) for row in rows]
                return GateEvaluationResponse(status="success", data=updated)
            except Exception as exc:
                logger.exception(exc)
                raise HTTPException(
                    status_code=500, detail="Failed to update gate evaluation."
                )

    async def update_gate_evaluation_by_proposal_and_rfp_id_and_id(
        self,
        rfp_id: str = Path(...),
        proposal_id: str = Path(...),
        id: str = Path(...),
        gate_evaluation: GateEvaluationUpdateRequest = Body(...),
    ):
        """Update the gate evaluation identified by ``id``.

        The dumped request body is forwarded to the service as a dict.  A
        falsy service result yields a success envelope with ``data=None``;
        that relies on ``GateEvaluationSingleResponse`` accepting a null
        ``data``.

        Raises:
            HTTPException: 500 if the update or response construction fails.
        """
        async with self.gate_evaluation_service() as svc:
            try:
                payload = gate_evaluation.model_dump(exclude_unset=True, mode="json")
                updated = await svc.update_gate_evaluation(
                    id=id, gate_evaluation=payload
                )
                if not updated:
                    return GateEvaluationSingleResponse(status="success", data=None)
                # NOTE(review): unlike the GET-by-id path, the service result is
                # unpacked directly here rather than indexed — presumably a
                # single mapping; confirm against the service.
                return GateEvaluationSingleResponse(
                    status="success", data=GateEvaluation(**updated)
                )
            except Exception as exc:
                logger.exception(exc)
                raise HTTPException(
                    status_code=500, detail="Failed to update gate evaluation."
                )

    async def delete_gate_evaluation_by_proposal_and_rfp_id(
        self, rfp_id: str = Path(...), proposal_id: str = Path(...)
    ):
        """Delete the gate evaluations of ``proposal_id``.

        Raises:
            HTTPException: 500 if deletion or response construction fails.
        """
        async with self.gate_evaluation_service() as svc:
            try:
                outcome = await svc.delete_gate_evaluation(proposal_id=proposal_id)
                # NOTE(review): DeleteResponse declares only `status`; confirm
                # whether the service result is meant to reach the client.
                return DeleteResponse(status="success", data=outcome)
            except Exception as exc:
                logger.exception(exc)
                raise HTTPException(
                    status_code=500, detail="Failed to delete gate evaluation."
                )

    async def delete_gate_evaluation_by_proposal_and_rfp_id_and_id(
        self, rfp_id: str = Path(...), proposal_id: str = Path(...), id: str = Path(...)
    ):
        """Delete the gate evaluation identified by ``id``.

        Raises:
            HTTPException: 500 if deletion or response construction fails.
        """
        async with self.gate_evaluation_service() as svc:
            try:
                outcome = await svc.delete_gate_evaluation(id=id)
                # NOTE(review): DeleteResponse declares only `status`; confirm
                # whether the service result is meant to reach the client.
                return DeleteResponse(status="success", data=outcome)
            except Exception as exc:
                logger.exception(exc)
                raise HTTPException(
                    status_code=500, detail="Failed to delete gate evaluation."
                )