Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, HTTPException, Query, Path | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| from uuid import UUID | |
| from datetime import datetime | |
| from src.config import logger | |
| from src.models import OperationCriteria, ProposalDetailAnalysis | |
| from src.services import ProposalDetailedAnalysisService | |
| class ProposalDetailedAnalysis(BaseModel): | |
| id: UUID | |
| proposal_id: UUID | |
| ai_assessment_result: str | |
| confidence: float | |
| operation_criteria: OperationCriteria | |
| created_at: datetime | |
| updated_at: datetime | |
| class ProposalDetailedAnalysisResponse(BaseModel): | |
| status: str | |
| data: Optional[List[ProposalDetailedAnalysis]] | |
| class OperationCriteriaResult(BaseModel): | |
| confidence: float | |
| ai_assessment_result: str | |
| class ProposalCompleteDetailedAnalysis(BaseModel): | |
| mission_alignment: Optional[OperationCriteriaResult] = None | |
| technical_feasibility: Optional[OperationCriteriaResult] = None | |
| cost_efficiency: Optional[OperationCriteriaResult] = None | |
| risk_assessment: Optional[OperationCriteriaResult] = None | |
| class CreateProposalDetailedAnalysisRequest(BaseModel): | |
| data: ProposalCompleteDetailedAnalysis | |
| class CompleteAnalysisResponse(BaseModel): | |
| status: str | |
| data: Optional[ProposalCompleteDetailedAnalysis] | |
| class DeleteResponse(BaseModel): | |
| status: str | |
| class CreateDetailedAnalysisRequest(BaseModel): | |
| proposal_id: UUID | |
| confidence: float | |
| ai_assessment_result: str | |
| operation_criteria: OperationCriteria | |
| class UpdateDetailedAnalysisRequest(BaseModel): | |
| proposal_id: Optional[UUID] = None | |
| confidence: Optional[float] = None | |
| ai_assessment_result: Optional[str] = None | |
| operation_criteria: Optional[OperationCriteria] = None | |
| class ProposalDetailedController: | |
| def __init__(self): | |
| self.service = ProposalDetailedAnalysisService | |
| self.router = APIRouter() | |
| self.router.add_api_route( | |
| "/proposal-detailed-analysis", | |
| self.get_proposal_analysis, | |
| methods=["GET"], | |
| tags=["Proposal Detailed Analysis"], | |
| response_model=ProposalDetailedAnalysisResponse, | |
| ) | |
| self.router.add_api_route( | |
| "/proposal-detailed-analysis/{id}", | |
| self.get_proposal_analysis_by_id, | |
| methods=["GET"], | |
| tags=["Proposal Detailed Analysis by ID"], | |
| response_model=ProposalDetailedAnalysisResponse, | |
| ) | |
| self.router.add_api_route( | |
| "/proposals/{proposal_id}/proposal-detailed-analysis", | |
| self.get_proposal_analysis_by_proposal_id, | |
| methods=["GET"], | |
| tags=["Proposal Detailed Analysis by Proposal ID"], | |
| response_model=CompleteAnalysisResponse, | |
| ) | |
| self.router.add_api_route( | |
| "/proposal-detailed-analysis", | |
| self.create_proposal_analysis, | |
| methods=["POST"], | |
| tags=["Proposal Detailed Analysis"], | |
| response_model=ProposalDetailedAnalysisResponse, | |
| ) | |
| self.router.add_api_route( | |
| "/proposals/{proposal_id}/proposal-detailed-analysis", | |
| self.create_complete_proposal_analysis, | |
| methods=["POST"], | |
| tags=["Proposal Detailed Analysis by Proposal ID"], | |
| response_model=CompleteAnalysisResponse, | |
| ) | |
| self.router.add_api_route( | |
| "/proposal-detailed-analysis/{id}", | |
| self.update_proposal_analysis, | |
| methods=["PUT"], | |
| tags=["Proposal Detailed Analysis by ID"], | |
| response_model=ProposalDetailedAnalysisResponse, | |
| ) | |
| self.router.add_api_route( | |
| "/proposals/{proposal_id}/proposal-detailed-analysis", | |
| self.update_complete_proposal_analysis, | |
| methods=["PUT"], | |
| tags=["Proposal Detailed Analysis by Proposal ID"], | |
| response_model=CompleteAnalysisResponse, | |
| ) | |
| self.router.add_api_route( | |
| "/proposal-detailed-analysis/{id}", | |
| self.delete_proposal_analysis, | |
| methods=["DELETE"], | |
| tags=["Proposal Detailed Analysis by ID"], | |
| response_model=DeleteResponse, | |
| ) | |
| self.router.add_api_route( | |
| "/proposals/{proposal_id}/proposal-detailed-analysis", | |
| self.delete_proposal_analysis_by_proposal_id, | |
| methods=["DELETE"], | |
| tags=["Proposal Detailed Analysis by Proposal ID"], | |
| response_model=DeleteResponse, | |
| ) | |
| self.router.add_api_route( | |
| "/proposals/{proposal_id}/proposal-detailed-analysis/{id}", | |
| self.delete_proposal_analysis_by_proposal_and_id, | |
| methods=["DELETE"], | |
| tags=["Proposal Detailed Analysis by Proposal and ID"], | |
| response_model=DeleteResponse, | |
| ) | |
| async def get_proposal_analysis(self): | |
| async with self.service() as service: | |
| try: | |
| result = await service.get_proposal_analysis() | |
| if not result: | |
| return ProposalDetailedAnalysisResponse(status="success", data=[]) | |
| return ProposalDetailedAnalysisResponse( | |
| status="success", | |
| data=[ProposalDetailedAnalysis(**r) for r in result], | |
| ) | |
| except Exception as e: | |
| logger.exception(e) | |
| raise HTTPException( | |
| status_code=500, detail="Failed to get proposal analysis." | |
| ) | |
| async def get_proposal_analysis_by_id(self, id: str = Path(...)): | |
| async with self.service() as service: | |
| try: | |
| result = await service.get_proposal_analysis(id=id) | |
| return ProposalDetailedAnalysisResponse( | |
| status="success", | |
| data=[ProposalDetailedAnalysis(**r) for r in result], | |
| ) | |
| except Exception as e: | |
| logger.exception(e) | |
| raise HTTPException( | |
| status_code=500, detail="Failed to get proposal analysis." | |
| ) | |
| async def get_proposal_analysis_by_proposal_id(self, proposal_id: str = Path(...)): | |
| async with self.service() as service: | |
| try: | |
| result = await service.get_proposal_analysis_by_proposal_id( | |
| proposal_id=proposal_id | |
| ) | |
| return CompleteAnalysisResponse(status="success", data=result) | |
| except Exception as e: | |
| logger.exception(e) | |
| raise HTTPException( | |
| status_code=500, detail="Failed to get proposal analysis." | |
| ) | |
| async def create_proposal_analysis(self, analysis: CreateDetailedAnalysisRequest): | |
| async with self.service() as service: | |
| try: | |
| result = await service.create_proposal_analysis( | |
| data=analysis.model_dump(mode="json", exclude_unset=True) | |
| ) | |
| return ProposalDetailedAnalysisResponse( | |
| status="success", | |
| data=[ProposalDetailedAnalysis(**r) for r in result], | |
| ) | |
| except Exception as e: | |
| logger.exception(e) | |
| raise HTTPException( | |
| status_code=500, detail="Failed to create proposal analysis." | |
| ) | |
| async def create_complete_proposal_analysis( | |
| self, | |
| analysis: CreateProposalDetailedAnalysisRequest, | |
| proposal_id: UUID = Path(...), | |
| ): | |
| async with self.service() as service: | |
| try: | |
| data = analysis.model_dump(mode="json", exclude_unset=True) | |
| data["proposal_id"] = proposal_id | |
| result = await service.create_complete_proposal_analysis(data=data) | |
| return CompleteAnalysisResponse(status="success", data=result) | |
| except Exception as e: | |
| logger.exception(e) | |
| raise HTTPException( | |
| status_code=500, detail="Failed to create proposal analysis." | |
| ) | |
| async def update_proposal_analysis( | |
| self, analysis: UpdateDetailedAnalysisRequest, id: UUID = Path(...) | |
| ): | |
| async with self.service() as service: | |
| try: | |
| data = analysis.model_dump(mode="json", exclude_unset=True) | |
| data["id"] = id | |
| result = await service.update_proposal_analysis(data=data) | |
| return ProposalDetailedAnalysisResponse( | |
| status="success", | |
| data=[ProposalDetailedAnalysis(**r) for r in result], | |
| ) | |
| except Exception as e: | |
| logger.exception(e) | |
| raise HTTPException( | |
| status_code=500, detail="Failed to update proposal analysis." | |
| ) | |
| async def update_complete_proposal_analysis( | |
| self, | |
| analysis: CreateProposalDetailedAnalysisRequest, | |
| proposal_id: UUID = Path(...), | |
| ): | |
| async with self.service() as service: | |
| try: | |
| data = analysis.model_dump(mode="json", exclude_unset=True) | |
| data["proposal_id"] = proposal_id | |
| result = await service.update_complete_proposal_analysis(data=data) | |
| return CompleteAnalysisResponse(status="success", data=result) | |
| except Exception as e: | |
| logger.exception(e) | |
| raise HTTPException( | |
| status_code=500, detail="Failed to update proposal analysis." | |
| ) | |
| async def delete_proposal_analysis(self, id: str = Path(...)): | |
| async with self.service() as service: | |
| try: | |
| result = await service.delete_proposal_analysis(id=id) | |
| return DeleteResponse(status="success") | |
| except Exception as e: | |
| logger.exception(e) | |
| raise HTTPException( | |
| status_code=500, detail="Failed to delete proposal analysis." | |
| ) | |
| async def delete_proposal_analysis_by_proposal_id( | |
| self, proposal_id: str = Path(...) | |
| ): | |
| async with self.service() as service: | |
| try: | |
| result = await service.delete_proposal_analysis(proposal_id=proposal_id) | |
| return DeleteResponse(status="success") | |
| except Exception as e: | |
| logger.exception(e) | |
| raise HTTPException( | |
| status_code=500, detail="Failed to delete proposal analysis." | |
| ) | |
| async def delete_proposal_analysis_by_proposal_and_id( | |
| self, proposal_id: str = Path(...), id: str = Path(...) | |
| ): | |
| async with self.service() as service: | |
| try: | |
| result = await service.delete_proposal_analysis( | |
| proposal_id=proposal_id, id=id | |
| ) | |
| return DeleteResponse(status="success") | |
| except Exception as e: | |
| logger.exception(e) | |
| raise HTTPException( | |
| status_code=500, detail="Failed to delete proposal analysis." | |
| ) | |