Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, Body, HTTPException, Query, Path | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| from uuid import UUID | |
| from datetime import datetime | |
| from src.config import logger | |
| from src.models import AnalysisType, AnalysisStatus | |
| from src.services import AnalysisService | |
| class Analysis(BaseModel): | |
| id: UUID | |
| evaluation_id: UUID | |
| insights: str | |
| analysis_type: AnalysisType | |
| status: AnalysisStatus | |
| created_at: datetime | |
| updated_at: datetime | |
| class Response(BaseModel): | |
| status: str | |
| data: Optional[List[Analysis]] = None | |
| class CreateAnalysis(BaseModel): | |
| insights: str | |
| analysis_type: AnalysisType | |
| status: Optional[AnalysisStatus] = None | |
| class CreateAnalysisRequest(BaseModel): | |
| data: List[CreateAnalysis] | |
| analysis_score_type: Optional[str] = None | |
| class UpdateAnalysis(BaseModel): | |
| insights: Optional[str] = None | |
| analysis_type: Optional[AnalysisType] = None | |
| status: Optional[AnalysisStatus] = None | |
| class Deleteresponse(BaseModel): | |
| status: str | |
| class AnalysisController: | |
| def __init__(self): | |
| self.service = AnalysisService | |
| self.router = APIRouter() | |
| self.router.add_api_route( | |
| "/rfps/{rfp_id}/proposals/{proposal_id}/evaluations/{evaluation_id}/analysis", | |
| self.get_analysis_by_proposal_and_rfp_id_and_evaluation_id, | |
| methods=["GET"], | |
| response_model=Response, | |
| tags=["Analysis by Proposal and RFP ID and Evaluation ID"], | |
| ) | |
| self.router.add_api_route( | |
| "/rfps/{rfp_id}/proposals/{proposal_id}/evaluations/{evaluation_id}/analysis/{id}", | |
| self.get_analysis_by_proposal_and_rfp_id_and_evaluation_id_and_id, | |
| methods=["GET"], | |
| response_model=Response, | |
| tags=["Analysis by Proposal and RFP ID and Evaluation ID and ID"], | |
| ) | |
| self.router.add_api_route( | |
| "/rfps/{rfp_id}/proposals/{proposal_id}/evaluations/{evaluation_id}/analysis", | |
| self.create_analysis_by_proposal_and_rfp_id_and_evaluation_id, | |
| methods=["POST"], | |
| response_model=Response, | |
| tags=["Analysis by Proposal and RFP ID and Evaluation ID"], | |
| ) | |
| self.router.add_api_route( | |
| "/rfps/{rfp_id}/proposals/{proposal_id}/evaluations/{evaluation_id}/analysis", | |
| self.update_analysis_by_proposal_and_rfp_id_and_evaluation_id, | |
| methods=["PUT"], | |
| response_model=Response, | |
| tags=["Analysis by Proposal and RFP ID and Evaluation ID"], | |
| ) | |
| self.router.add_api_route( | |
| "/rfps/{rfp_id}/proposals/{proposal_id}/evaluations/{evaluation_id}/analysis/{id}", | |
| self.update_analysis_by_proposal_and_rfp_id_and_evaluation_id_and_id, | |
| methods=["PUT"], | |
| response_model=Response, | |
| tags=["Analysis by Proposal and RFP ID and Evaluation ID and ID"], | |
| ) | |
| self.router.add_api_route( | |
| "/rfps/{rfp_id}/proposals/{proposal_id}/evaluations/{evaluation_id}/analysis", | |
| self.delete_analysis_by_proposal_and_rfp_id_and_evaluation_id, | |
| methods=["DELETE"], | |
| response_model=Deleteresponse, | |
| tags=["Analysis by Proposal and RFP ID and Evaluation ID"], | |
| ) | |
| self.router.add_api_route( | |
| "/rfps/{rfp_id}/proposals/{proposal_id}/evaluations/{evaluation_id}/analysis/{id}", | |
| self.delete_analysis_by_proposal_and_rfp_id_and_evaluation_id_and_id, | |
| methods=["DELETE"], | |
| response_model=Deleteresponse, | |
| tags=["Analysis by Proposal and RFP ID and Evaluation ID and ID"], | |
| ) | |
| async def get_analysis_by_proposal_and_rfp_id_and_evaluation_id( | |
| self, | |
| rfp_id: str = Path(...), | |
| proposal_id: str = Path(...), | |
| evaluation_id: str = Path(...), | |
| analysis_type: AnalysisType = Query(None), | |
| ): | |
| try: | |
| async with self.service() as service: | |
| results = await service.get_analysis( | |
| evaluation_id=evaluation_id, analysis_type=analysis_type | |
| ) | |
| return Response( | |
| status="success", | |
| data=[Analysis(**result) for result in results], | |
| ) | |
| except Exception as e: | |
| logger.error(e) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_analysis_by_proposal_and_rfp_id_and_evaluation_id_and_id( | |
| self, | |
| rfp_id: str = Path(...), | |
| proposal_id: str = Path(...), | |
| evaluation_id: str = Path(...), | |
| id: str = Path(...), | |
| ): | |
| try: | |
| async with self.service() as service: | |
| results = await service.get_analysis(evaluation_id=evaluation_id, id=id) | |
| return Response( | |
| status="success", | |
| data=[Analysis(**result) for result in results], | |
| ) | |
| except Exception as e: | |
| logger.error(e) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def create_analysis_by_proposal_and_rfp_id_and_evaluation_id( | |
| self, | |
| rfp_id: str = Path(...), | |
| proposal_id: str = Path(...), | |
| evaluation_id: str = Path(...), | |
| analysis: CreateAnalysisRequest = Body(...), | |
| ): | |
| try: | |
| async with self.service() as service: | |
| evaluation_analysis = analysis.model_dump( | |
| mode="json", exclude_unset=True | |
| ) | |
| results = await service.create_analysis( | |
| evaluation_id=evaluation_id, | |
| evaluation_analysis=evaluation_analysis["data"], | |
| analysis_score_type=( | |
| evaluation_analysis["analysis_score_type"] | |
| if "analysis_score_type" in evaluation_analysis | |
| else None | |
| ), | |
| ) | |
| return Response( | |
| status="success", | |
| data=[Analysis(**result) for result in results], | |
| ) | |
| except Exception as e: | |
| logger.error(e) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def update_analysis_by_proposal_and_rfp_id_and_evaluation_id( | |
| self, | |
| rfp_id: str = Path(...), | |
| proposal_id: str = Path(...), | |
| evaluation_id: str = Path(...), | |
| analysis: CreateAnalysisRequest = Body(...), | |
| ): | |
| try: | |
| async with self.service() as service: | |
| results = await service.update_analysis( | |
| evaluation_id=evaluation_id, | |
| proposal_analysis=analysis.model_dump( | |
| mode="json", exclude_unset=True | |
| )["data"], | |
| ) | |
| return Response( | |
| status="success", | |
| data=[Analysis(**result) for result in results], | |
| ) | |
| except Exception as e: | |
| logger.error(e) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def update_analysis_by_proposal_and_rfp_id_and_evaluation_id_and_id( | |
| self, | |
| rfp_id: str = Path(...), | |
| proposal_id: str = Path(...), | |
| evaluation_id: str = Path(...), | |
| id: str = Path(...), | |
| analysis: UpdateAnalysis = Body(...), | |
| ): | |
| try: | |
| async with self.service() as service: | |
| results = await service.update_analysis( | |
| evaluation_id=evaluation_id, | |
| id=id, | |
| proposal_analysis=analysis.model_dump( | |
| mode="json", exclude_unset=True | |
| ), | |
| ) | |
| return Response( | |
| status="success", | |
| data=[Analysis(**result) for result in results], | |
| ) | |
| except Exception as e: | |
| logger.error(e) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def delete_analysis_by_proposal_and_rfp_id_and_evaluation_id( | |
| self, | |
| rfp_id: str = Path(...), | |
| proposal_id: str = Path(...), | |
| evaluation_id: str = Path(...), | |
| ): | |
| try: | |
| async with self.service() as service: | |
| results = await service.delete_analysis(evaluation_id=evaluation_id) | |
| return Deleteresponse(status="success") | |
| except Exception as e: | |
| logger.error(e) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def delete_analysis_by_proposal_and_rfp_id_and_evaluation_id_and_id( | |
| self, | |
| rfp_id: str = Path(...), | |
| proposal_id: str = Path(...), | |
| evaluation_id: str = Path(...), | |
| id: str = Path(...), | |
| ): | |
| try: | |
| async with self.service() as service: | |
| results = await service.delete_analysis( | |
| evaluation_id=evaluation_id, id=id | |
| ) | |
| return Deleteresponse(status="success") | |
| except Exception as e: | |
| logger.error(e) | |
| raise HTTPException(status_code=500, detail=str(e)) | |