# ipns-poc-eval/src/controllers/_proposal_detailed_analysis_controller.py
# Aryan Jain
# update apis
# b2f266f
from fastapi import APIRouter, HTTPException, Query, Path
from pydantic import BaseModel
from typing import List, Optional
from uuid import UUID
from datetime import datetime
from src.config import logger
from src.models import OperationCriteria, ProposalDetailAnalysis
from src.services import ProposalDetailedAnalysisService
class ProposalDetailedAnalysis(BaseModel):
    """One detailed-analysis record as exposed by the API.

    The controller in this module builds an instance from each mapping the
    service returns (``ProposalDetailedAnalysis(**row)``), so every field
    below must be present in those rows.
    """

    # Identifiers: the record itself and the proposal it is attached to.
    id: UUID
    proposal_id: UUID

    # Assessment payload for a single operation criterion.
    ai_assessment_result: str
    confidence: float
    operation_criteria: OperationCriteria

    # Record timestamps.
    created_at: datetime
    updated_at: datetime
class ProposalDetailedAnalysisResponse(BaseModel):
    """Response envelope carrying a list of analysis records."""

    # Outcome marker; the handlers in this module always set "success".
    status: str

    # Nullable but declared without a default, so under Pydantic v2 (implied
    # by the `model_dump` calls in this file) callers must pass it explicitly.
    data: Optional[List[ProposalDetailedAnalysis]]
class OperationCriteriaResult(BaseModel):
    """Assessment outcome for one criterion: a confidence and a result text.

    Used as the value type of every field of
    ``ProposalCompleteDetailedAnalysis``.
    """

    confidence: float
    ai_assessment_result: str
class ProposalCompleteDetailedAnalysis(BaseModel):
    """Per-proposal bundle of criterion results.

    Each of the four criteria is optional and defaults to ``None``, so a
    payload may carry any subset of them.
    """

    mission_alignment: Optional[OperationCriteriaResult] = None
    technical_feasibility: Optional[OperationCriteriaResult] = None
    cost_efficiency: Optional[OperationCriteriaResult] = None
    risk_assessment: Optional[OperationCriteriaResult] = None
class CreateProposalDetailedAnalysisRequest(BaseModel):
    """Request body for the proposal-scoped create (POST) and update (PUT).

    Both handlers for ``/proposals/{proposal_id}/proposal-detailed-analysis``
    accept this model; the proposal id itself comes from the URL path.
    """

    # The criterion results to store for the proposal.
    data: ProposalCompleteDetailedAnalysis
class CompleteAnalysisResponse(BaseModel):
    """Response envelope for the proposal-scoped analysis endpoints."""

    # Outcome marker; the handlers in this module always set "success".
    status: str

    # Nullable but declared without a default, so under Pydantic v2 (implied
    # by the `model_dump` calls in this file) callers must pass it explicitly.
    data: Optional[ProposalCompleteDetailedAnalysis]
class DeleteResponse(BaseModel):
    """Response envelope for the delete endpoints; carries only a status."""

    # Outcome marker; the delete handlers in this module set "success".
    status: str
class CreateDetailedAnalysisRequest(BaseModel):
    """Request body for ``POST /proposal-detailed-analysis``.

    All four fields are required; the record id and timestamps are not part
    of the request.
    """

    # Proposal the new analysis record belongs to.
    proposal_id: UUID

    # Assessment payload for one operation criterion.
    confidence: float
    ai_assessment_result: str
    operation_criteria: OperationCriteria
class UpdateDetailedAnalysisRequest(BaseModel):
    """Request body for ``PUT /proposal-detailed-analysis/{id}``.

    Every field is optional. The update handler dumps this model with
    ``exclude_unset=True``, so fields the client omits are not forwarded to
    the service.
    """

    proposal_id: Optional[UUID] = None
    confidence: Optional[float] = None
    ai_assessment_result: Optional[str] = None
    operation_criteria: Optional[OperationCriteria] = None
class ProposalDetailedController:
    """HTTP controller exposing CRUD routes for proposal detailed analyses.

    The controller owns an ``APIRouter`` (``self.router``) on which all
    endpoints are registered at construction time. Every handler opens the
    analysis service as an async context manager, delegates to it, and wraps
    the outcome in one of the response envelopes defined in this module.

    Any exception raised while handling a request is logged and converted to
    an HTTP 500 with a short, operation-specific detail message.
    """

    def __init__(self) -> None:
        """Create the router and register every endpoint on it."""
        # The service *class* is stored, not an instance: each handler
        # instantiates it per call via ``async with self.service()``.
        self.service = ProposalDetailedAnalysisService
        self.router = APIRouter()

        collection = "/proposal-detailed-analysis"
        item = "/proposal-detailed-analysis/{id}"
        by_proposal = "/proposals/{proposal_id}/proposal-detailed-analysis"
        by_proposal_item = (
            "/proposals/{proposal_id}/proposal-detailed-analysis/{id}"
        )

        tag_all = "Proposal Detailed Analysis"
        tag_id = "Proposal Detailed Analysis by ID"
        tag_proposal = "Proposal Detailed Analysis by Proposal ID"
        tag_proposal_id = "Proposal Detailed Analysis by Proposal and ID"

        list_model = ProposalDetailedAnalysisResponse
        complete_model = CompleteAnalysisResponse

        # (path, endpoint, HTTP method, tag, response model).
        # Registration order is kept exactly as it was when the routes were
        # registered one call at a time.
        routes = (
            (collection, self.get_proposal_analysis, "GET", tag_all, list_model),
            (item, self.get_proposal_analysis_by_id, "GET", tag_id, list_model),
            (
                by_proposal,
                self.get_proposal_analysis_by_proposal_id,
                "GET",
                tag_proposal,
                complete_model,
            ),
            (collection, self.create_proposal_analysis, "POST", tag_all, list_model),
            (
                by_proposal,
                self.create_complete_proposal_analysis,
                "POST",
                tag_proposal,
                complete_model,
            ),
            (item, self.update_proposal_analysis, "PUT", tag_id, list_model),
            (
                by_proposal,
                self.update_complete_proposal_analysis,
                "PUT",
                tag_proposal,
                complete_model,
            ),
            (item, self.delete_proposal_analysis, "DELETE", tag_id, DeleteResponse),
            (
                by_proposal,
                self.delete_proposal_analysis_by_proposal_id,
                "DELETE",
                tag_proposal,
                DeleteResponse,
            ),
            (
                by_proposal_item,
                self.delete_proposal_analysis_by_proposal_and_id,
                "DELETE",
                tag_proposal_id,
                DeleteResponse,
            ),
        )
        for path, endpoint, method, tag, response_model in routes:
            self.router.add_api_route(
                path,
                endpoint,
                methods=[method],
                tags=[tag],
                response_model=response_model,
            )

    @staticmethod
    def _as_list_response(rows) -> ProposalDetailedAnalysisResponse:
        """Wrap an iterable of row mappings in the list-shaped success envelope.

        Each row is expanded with ``**`` into ``ProposalDetailedAnalysis``, so
        it must be a mapping whose keys match that model's fields.
        """
        return ProposalDetailedAnalysisResponse(
            status="success",
            data=[ProposalDetailedAnalysis(**row) for row in rows],
        )

    async def get_proposal_analysis(self) -> ProposalDetailedAnalysisResponse:
        """Return every analysis record the service yields.

        A falsy service result (``None`` or empty) produces a success
        response with an empty list.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        async with self.service() as service:
            try:
                result = await service.get_proposal_analysis()
                return self._as_list_response(result or [])
            except Exception as e:
                logger.exception(e)
                raise HTTPException(
                    status_code=500, detail="Failed to get proposal analysis."
                ) from e

    async def get_proposal_analysis_by_id(
        self, id: str = Path(...)
    ) -> ProposalDetailedAnalysisResponse:
        """Return the analysis record(s) matching the path parameter ``id``.

        A falsy service result (``None`` or empty) produces a success
        response with an empty list, matching ``get_proposal_analysis``
        instead of failing with a 500 while iterating ``None``.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        async with self.service() as service:
            try:
                result = await service.get_proposal_analysis(id=id)
                return self._as_list_response(result or [])
            except Exception as e:
                logger.exception(e)
                raise HTTPException(
                    status_code=500, detail="Failed to get proposal analysis."
                ) from e

    async def get_proposal_analysis_by_proposal_id(
        self, proposal_id: str = Path(...)
    ) -> CompleteAnalysisResponse:
        """Return the complete analysis for the proposal named in the path.

        The service result is passed through as ``data`` unchanged.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        async with self.service() as service:
            try:
                result = await service.get_proposal_analysis_by_proposal_id(
                    proposal_id=proposal_id
                )
                return CompleteAnalysisResponse(status="success", data=result)
            except Exception as e:
                logger.exception(e)
                raise HTTPException(
                    status_code=500, detail="Failed to get proposal analysis."
                ) from e

    async def create_proposal_analysis(
        self, analysis: CreateDetailedAnalysisRequest
    ) -> ProposalDetailedAnalysisResponse:
        """Create an analysis record from the request body.

        The body is dumped in JSON mode with unset fields excluded and handed
        to the service; the rows it returns are wrapped in the list envelope.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        async with self.service() as service:
            try:
                result = await service.create_proposal_analysis(
                    data=analysis.model_dump(mode="json", exclude_unset=True)
                )
                return self._as_list_response(result)
            except Exception as e:
                logger.exception(e)
                raise HTTPException(
                    status_code=500, detail="Failed to create proposal analysis."
                ) from e

    async def create_complete_proposal_analysis(
        self,
        analysis: CreateProposalDetailedAnalysisRequest,
        proposal_id: UUID = Path(...),
    ) -> CompleteAnalysisResponse:
        """Create the complete analysis for the proposal named in the path.

        The dumped request body is extended with ``proposal_id`` before being
        passed to the service; the service result is returned as ``data``.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        async with self.service() as service:
            try:
                data = analysis.model_dump(mode="json", exclude_unset=True)
                # NOTE(review): the dump is JSON-mode but this value stays a
                # UUID object -- confirm the service accepts both forms.
                data["proposal_id"] = proposal_id
                result = await service.create_complete_proposal_analysis(data=data)
                return CompleteAnalysisResponse(status="success", data=result)
            except Exception as e:
                logger.exception(e)
                raise HTTPException(
                    status_code=500, detail="Failed to create proposal analysis."
                ) from e

    async def update_proposal_analysis(
        self, analysis: UpdateDetailedAnalysisRequest, id: UUID = Path(...)
    ) -> ProposalDetailedAnalysisResponse:
        """Update the analysis record identified by the path parameter ``id``.

        Only fields the client actually sent are forwarded
        (``exclude_unset=True``); ``id`` is added to the payload.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        async with self.service() as service:
            try:
                data = analysis.model_dump(mode="json", exclude_unset=True)
                # NOTE(review): the dump is JSON-mode but this value stays a
                # UUID object -- confirm the service accepts both forms.
                data["id"] = id
                result = await service.update_proposal_analysis(data=data)
                return self._as_list_response(result)
            except Exception as e:
                logger.exception(e)
                raise HTTPException(
                    status_code=500, detail="Failed to update proposal analysis."
                ) from e

    async def update_complete_proposal_analysis(
        self,
        analysis: CreateProposalDetailedAnalysisRequest,
        proposal_id: UUID = Path(...),
    ) -> CompleteAnalysisResponse:
        """Update the complete analysis for the proposal named in the path.

        The dumped request body is extended with ``proposal_id`` before being
        passed to the service; the service result is returned as ``data``.

        Raises:
            HTTPException: 500 if the service call or response building fails.
        """
        async with self.service() as service:
            try:
                data = analysis.model_dump(mode="json", exclude_unset=True)
                # NOTE(review): the dump is JSON-mode but this value stays a
                # UUID object -- confirm the service accepts both forms.
                data["proposal_id"] = proposal_id
                result = await service.update_complete_proposal_analysis(data=data)
                return CompleteAnalysisResponse(status="success", data=result)
            except Exception as e:
                logger.exception(e)
                raise HTTPException(
                    status_code=500, detail="Failed to update proposal analysis."
                ) from e

    async def delete_proposal_analysis(
        self, id: str = Path(...)
    ) -> DeleteResponse:
        """Delete the analysis record identified by the path parameter ``id``.

        The service's return value is not inspected; reaching the return
        statement without an exception is reported as success.

        Raises:
            HTTPException: 500 if the service call fails.
        """
        async with self.service() as service:
            try:
                await service.delete_proposal_analysis(id=id)
                return DeleteResponse(status="success")
            except Exception as e:
                logger.exception(e)
                raise HTTPException(
                    status_code=500, detail="Failed to delete proposal analysis."
                ) from e

    async def delete_proposal_analysis_by_proposal_id(
        self, proposal_id: str = Path(...)
    ) -> DeleteResponse:
        """Delete analyses selected by the path parameter ``proposal_id``.

        The service's return value is not inspected; reaching the return
        statement without an exception is reported as success.

        Raises:
            HTTPException: 500 if the service call fails.
        """
        async with self.service() as service:
            try:
                await service.delete_proposal_analysis(proposal_id=proposal_id)
                return DeleteResponse(status="success")
            except Exception as e:
                logger.exception(e)
                raise HTTPException(
                    status_code=500, detail="Failed to delete proposal analysis."
                ) from e

    async def delete_proposal_analysis_by_proposal_and_id(
        self, proposal_id: str = Path(...), id: str = Path(...)
    ) -> DeleteResponse:
        """Delete the analysis selected by both ``proposal_id`` and ``id``.

        The service's return value is not inspected; reaching the return
        statement without an exception is reported as success.

        Raises:
            HTTPException: 500 if the service call fails.
        """
        async with self.service() as service:
            try:
                await service.delete_proposal_analysis(
                    proposal_id=proposal_id, id=id
                )
                return DeleteResponse(status="success")
            except Exception as e:
                logger.exception(e)
                raise HTTPException(
                    status_code=500, detail="Failed to delete proposal analysis."
                ) from e