Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, HTTPException, Query, Path | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| from uuid import UUID | |
| from datetime import datetime | |
| from src.config import logger | |
| from src.services import EvaluationService | |
| from src.models import EvaluationCriteriaType | |
class EvaluationCriteria(BaseModel):
    """One evaluation criterion record as exposed in API responses."""

    # Identifier of the criterion itself.
    id: UUID
    # Identifier of the RFP this criterion is attached to.
    rfp_id: UUID
    # Free-text body of the criterion.
    evaluation_criteria: str
    # Category of the criterion (enum declared in src.models).
    evaluation_criteria_type: EvaluationCriteriaType
    # Timestamps carried on the record.
    created_at: datetime
    updated_at: datetime
class EvaluationResponse(BaseModel):
    """Response envelope: a status string plus a list of criteria."""

    # Outcome marker for the request.
    status: str
    # Zero or more criteria; always a list, even for single-item lookups.
    data: List[EvaluationCriteria]
class EvaluationRequest(BaseModel):
    """Creation payload that carries the owning RFP id in the body."""

    # RFP the new criterion belongs to.
    rfp_id: UUID
    # Free-text body of the criterion.
    evaluation_criteria: str
    # Category of the criterion.
    evaluation_criteria_type: EvaluationCriteriaType
class CreateEvaluationRequest(BaseModel):
    """Creation payload without an RFP id (no ``rfp_id`` field is declared)."""

    # Free-text body of the criterion.
    evaluation_criteria: str
    # Category of the criterion.
    evaluation_criteria_type: EvaluationCriteriaType
class UpdateEvaluationRequest(BaseModel):
    """Partial-update payload; every field is optional and defaults to None."""

    # New owning RFP, if it should change.
    rfp_id: Optional[UUID] = None
    # New criterion text, if it should change.
    evaluation_criteria: Optional[str] = None
    # New criterion category, if it should change.
    evaluation_criteria_type: Optional[EvaluationCriteriaType] = None
class UpdateEvaluationCriteriaByRFPRequest(BaseModel):
    """Partial-update payload with no ``rfp_id`` field; all fields optional."""

    # New criterion text, if it should change.
    evaluation_criteria: Optional[str] = None
    # New criterion category, if it should change.
    evaluation_criteria_type: Optional[EvaluationCriteriaType] = None
class DeleteResponse(BaseModel):
    """Response envelope for delete operations; carries only a status."""

    # Outcome marker for the request.
    status: str
class EvaluationController:
    """HTTP controller for evaluation criteria.

    Builds an ``APIRouter`` and registers the RFP-scoped CRUD endpoints on it.
    Every handler opens a fresh ``EvaluationService`` as an async context
    manager, delegates to it, and converts any failure into an HTTP 500 with a
    fixed detail message.

    Attributes:
        evaluation_service: The service *class* (not an instance); handlers
            call it to obtain a per-request service.
        router: The ``APIRouter`` carrying the registered routes.
    """

    def __init__(self):
        """Create the router and register the active routes in order."""
        self.evaluation_service = EvaluationService
        self.router = APIRouter()

        # (path, endpoint, HTTP method, response model, OpenAPI tag).
        # Commented-out rows are the non-RFP-scoped routes, which are
        # currently disabled; their handlers are still defined below.
        routes = [
            # ("/evaluation-criteria", self.get_criteria, "GET",
            #  EvaluationResponse, "Evaluation Criteria"),
            # ("/evaluation-criteria/{id}", self.get_criteria_by_id, "GET",
            #  EvaluationResponse, "Evaluation Criteria by ID"),
            (
                "/rfps/{rfp_id}/evaluation-criteria",
                self.get_criteria_by_rfp_id,
                "GET",
                EvaluationResponse,
                "Evaluation Criteria by RFP ID",
            ),
            (
                "/rfps/{rfp_id}/evaluation-criteria/{id}",
                self.get_criteria_by_proposal_and_rfp_id,
                "GET",
                EvaluationResponse,
                "Evaluation Criteria by Proposal and RFP ID",
            ),
            # ("/evaluation-criteria", self.create_criteria, "POST",
            #  EvaluationResponse, "Evaluation Criteria"),
            (
                "/rfps/{rfp_id}/evaluation-criteria",
                self.create_criteria_by_rfp_id,
                "POST",
                EvaluationResponse,
                "Evaluation Criteria by RFP ID",
            ),
            # ("/evaluation-criteria/{id}", self.update_criteria, "PUT",
            #  EvaluationResponse, "Evaluation Criteria by ID"),
            (
                "/rfps/{rfp_id}/evaluation-criteria/{id}",
                self.update_criteria_by_proposal_and_rfp_id,
                "PUT",
                EvaluationResponse,
                "Evaluation Criteria by Proposal and RFP ID",
            ),
            # ("/evaluation-criteria/{id}", self.delete_criteria, "DELETE",
            #  DeleteResponse, "Evaluation Criteria by ID"),
            (
                "/rfps/{rfp_id}/evaluation-criteria",
                self.delete_criteria_by_rfp_id,
                "DELETE",
                DeleteResponse,
                "Evaluation Criteria by RFP ID",
            ),
            (
                "/rfps/{rfp_id}/evaluation-criteria/{id}",
                self.delete_criteria_by_proposal_and_rfp_id,
                "DELETE",
                DeleteResponse,
                "Evaluation Criteria by Proposal and RFP ID",
            ),
        ]
        for path, endpoint, method, response_model, tag in routes:
            self.router.add_api_route(
                path,
                endpoint,
                methods=[method],
                response_model=response_model,
                tags=[tag],
            )

    # ------------------------------------------------------------------
    # Private helpers shared by all handlers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_response(rows) -> EvaluationResponse:
        """Wrap service rows in a success envelope.

        Each row is unpacked as keyword arguments into ``EvaluationCriteria``,
        so rows must be mappings keyed by that model's field names.
        """
        return EvaluationResponse(
            status="success", data=[EvaluationCriteria(**row) for row in rows]
        )

    @staticmethod
    def _server_error(detail: str) -> HTTPException:
        """Log the in-flight exception and build the matching HTTP 500.

        Must be called from inside an ``except`` block: ``logger.exception``
        records the traceback of the exception currently being handled.
        The caller raises the returned object with ``from`` to keep the cause.
        """
        logger.exception(detail)
        return HTTPException(status_code=500, detail=detail)

    # ------------------------------------------------------------------
    # Read handlers
    # ------------------------------------------------------------------

    async def get_criteria(self) -> EvaluationResponse:
        """Return the criteria the service yields when called with no filters.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.evaluation_service() as service:
                return self._as_response(await service.get_criteria())
        except Exception as e:
            raise self._server_error("Error fetching evaluation criteria") from e

    # NOTE: the parameter is named ``id`` (shadowing the builtin) because
    # FastAPI binds it to the ``{id}`` placeholder in the route path.
    async def get_criteria_by_id(self, id: str = Path(...)) -> EvaluationResponse:
        """Return the criteria matching a criterion id.

        Args:
            id: Criterion identifier taken from the URL path.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.evaluation_service() as service:
                return self._as_response(await service.get_criteria(id=id))
        except Exception as e:
            raise self._server_error("Error fetching evaluation criteria") from e

    async def get_criteria_by_rfp_id(
        self,
        rfp_id: str = Path(...),
        criteria_type: Optional[EvaluationCriteriaType] = Query(None),
    ) -> EvaluationResponse:
        """Return the criteria of one RFP, optionally filtered by type.

        Args:
            rfp_id: RFP identifier taken from the URL path.
            criteria_type: Optional query parameter forwarded to the service
                as ``evaluation_criteria_type`` (``None`` when omitted).

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.evaluation_service() as service:
                rows = await service.get_criteria(
                    rfp_id=rfp_id, evaluation_criteria_type=criteria_type
                )
                return self._as_response(rows)
        except Exception as e:
            raise self._server_error("Error fetching evaluation criteria") from e

    async def get_criteria_by_proposal_and_rfp_id(
        self, rfp_id: str = Path(...), id: str = Path(...)
    ) -> EvaluationResponse:
        """Return the criteria matching both an RFP id and a criterion id.

        Args:
            rfp_id: RFP identifier taken from the URL path.
            id: Identifier taken from the ``{id}`` path segment and forwarded
                to the service as ``id``.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.evaluation_service() as service:
                rows = await service.get_criteria(rfp_id=rfp_id, id=id)
                return self._as_response(rows)
        except Exception as e:
            raise self._server_error("Error fetching evaluation criteria") from e

    # ------------------------------------------------------------------
    # Create handlers
    # ------------------------------------------------------------------

    async def create_criteria(self, criteria: EvaluationRequest) -> EvaluationResponse:
        """Create a criterion from a body that already contains the RFP id.

        Args:
            criteria: Request body; dumped in JSON mode with unset fields
                excluded before being handed to the service.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.evaluation_service() as service:
                rows = await service.create_criteria(
                    criteria.model_dump(exclude_unset=True, mode="json")
                )
                return self._as_response(rows)
        except Exception as e:
            raise self._server_error("Error creating evaluation criteria") from e

    async def create_criteria_by_rfp_id(
        self, criteria: CreateEvaluationRequest, rfp_id: str = Path(...)
    ) -> EvaluationResponse:
        """Create a criterion under the RFP named in the URL path.

        Args:
            criteria: Request body (has no ``rfp_id`` field).
            rfp_id: RFP identifier taken from the URL path.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.evaluation_service() as service:
                payload = criteria.model_dump(exclude_unset=True, mode="json")
                # The body model carries no RFP id; inject the path value.
                payload["rfp_id"] = rfp_id
                rows = await service.create_criteria(evaluation_criteria=payload)
                return self._as_response(rows)
        except Exception as e:
            raise self._server_error("Error creating evaluation criteria") from e

    # ------------------------------------------------------------------
    # Update handlers
    # ------------------------------------------------------------------

    async def update_criteria(
        self, criteria: UpdateEvaluationRequest, id: str = Path(...)
    ) -> EvaluationResponse:
        """Apply a partial update to the criterion with the given id.

        Args:
            criteria: Request body; only fields the client actually sent are
                forwarded (``exclude_unset=True``).
            id: Criterion identifier taken from the URL path.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.evaluation_service() as service:
                rows = await service.update_criteria(
                    id=id,
                    evaluation_criteria=criteria.model_dump(
                        exclude_unset=True, mode="json"
                    ),
                )
                return self._as_response(rows)
        except Exception as e:
            raise self._server_error("Error updating evaluation criteria") from e

    async def update_criteria_by_proposal_and_rfp_id(
        self,
        criteria: UpdateEvaluationCriteriaByRFPRequest,
        rfp_id: str = Path(...),
        id: str = Path(...),
    ) -> EvaluationResponse:
        """Apply a partial update to a criterion addressed via its RFP.

        Args:
            criteria: Request body; only fields the client actually sent are
                forwarded (``exclude_unset=True``).
            rfp_id: RFP identifier taken from the URL path.
            id: Identifier taken from the ``{id}`` path segment.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        try:
            async with self.evaluation_service() as service:
                payload = criteria.model_dump(exclude_unset=True, mode="json")
                # NOTE(review): rfp_id travels inside the payload, not as a
                # lookup filter; the service is addressed by ``id`` only.
                # Confirm the service handles it as intended.
                payload["rfp_id"] = rfp_id
                rows = await service.update_criteria(
                    id=id, evaluation_criteria=payload
                )
                return self._as_response(rows)
        except Exception as e:
            raise self._server_error("Error updating evaluation criteria") from e

    # ------------------------------------------------------------------
    # Delete handlers
    # ------------------------------------------------------------------

    async def delete_criteria(self, id: str = Path(...)) -> DeleteResponse:
        """Delete by criterion id.

        Args:
            id: Criterion identifier taken from the URL path.

        Raises:
            HTTPException: 500 if the service call fails.
        """
        try:
            async with self.evaluation_service() as service:
                await service.delete_criteria(id=id)
                return DeleteResponse(status="success")
        except Exception as e:
            raise self._server_error("Error deleting evaluation criteria") from e

    async def delete_criteria_by_rfp_id(
        self, rfp_id: str = Path(...)
    ) -> DeleteResponse:
        """Delete by RFP id (only ``rfp_id`` is passed to the service).

        Args:
            rfp_id: RFP identifier taken from the URL path.

        Raises:
            HTTPException: 500 if the service call fails.
        """
        try:
            async with self.evaluation_service() as service:
                await service.delete_criteria(rfp_id=rfp_id)
                return DeleteResponse(status="success")
        except Exception as e:
            raise self._server_error("Error deleting evaluation criteria") from e

    async def delete_criteria_by_proposal_and_rfp_id(
        self, rfp_id: str = Path(...), id: str = Path(...)
    ) -> DeleteResponse:
        """Delete by both RFP id and the ``{id}`` path segment.

        Args:
            rfp_id: RFP identifier taken from the URL path.
            id: Identifier taken from the ``{id}`` path segment.

        Raises:
            HTTPException: 500 if the service call fails.
        """
        try:
            async with self.evaluation_service() as service:
                await service.delete_criteria(rfp_id=rfp_id, id=id)
                return DeleteResponse(status="success")
        except Exception as e:
            raise self._server_error("Error deleting evaluation criteria") from e