File size: 4,548 Bytes
e7cf451 e510416 e7cf451 e510416 e7cf451 e510416 e7cf451 e510416 e7cf451 e510416 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
from dataclasses import replace
from typing import Dict, List
from uuid import uuid4

from fastapi import FastAPI, HTTPException, Request
from fastapi.staticfiles import StaticFiles

from .domain import EmployeeSchedule, EmployeeScheduleModel
from .converters import (
    schedule_to_model, model_to_schedule
)
from .demo_data import DemoData, generate_demo_data
from .solver import solver_manager, solution_manager
from .score_analysis import ConstraintAnalysisDTO, MatchAnalysisDTO
# In-memory store of submitted schedules, keyed by the job/problem ID string.
data_sets: dict[str, EmployeeSchedule] = {}

# The FastAPI application; interactive API docs are served at /q/swagger-ui.
app = FastAPI(docs_url="/q/swagger-ui")
@app.get("/demo-data")
async def demo_data_list() -> list[DemoData]:
    """Return every entry produced by iterating ``DemoData``."""
    return list(DemoData)
@app.get("/demo-data/{dataset_id}", response_model_exclude_none=True)
async def get_demo_data(dataset_id: str) -> EmployeeScheduleModel:
    """Generate the demo schedule selected by ``dataset_id``.

    Args:
        dataset_id: Attribute name of the wanted ``DemoData`` entry.

    Returns:
        The generated schedule converted with ``schedule_to_model``.

    Raises:
        HTTPException: 404 when ``dataset_id`` does not name a ``DemoData``
            entry.
    """
    # ``dataset_id`` comes straight from the URL. A bare ``getattr`` would
    # either raise ``AttributeError`` (an HTTP 500) for unknown names or hand
    # back an unrelated class attribute such as ``__doc__``. Only accept
    # values that really are ``DemoData`` instances (assumes the entries are
    # instances of ``DemoData``, as ``demo_data_list``'s annotation indicates).
    demo_data = getattr(DemoData, dataset_id, None)
    if not isinstance(demo_data, DemoData):
        raise HTTPException(
            status_code=404,
            detail=f"No demo data found with ID {dataset_id}",
        )
    domain_schedule = generate_demo_data(demo_data)
    return schedule_to_model(domain_schedule)
@app.get("/schedules/{problem_id}", response_model_exclude_none=True)
async def get_timetable(problem_id: str) -> EmployeeScheduleModel:
    """Return the stored schedule for ``problem_id`` with its current solver status.

    Args:
        problem_id: Key of the schedule in ``data_sets``.

    Returns:
        A copy of the stored schedule whose ``solver_status`` is refreshed from
        ``solver_manager``, converted with ``schedule_to_model``.

    Raises:
        HTTPException: 404 when no schedule is stored under ``problem_id``.
    """
    try:
        schedule = data_sets[problem_id]
    except KeyError:
        # An unknown ID is a client error, not an internal failure (HTTP 500).
        raise HTTPException(
            status_code=404,
            detail=f"No schedule found with ID {problem_id}",
        ) from None
    # ``replace`` builds a copy, so the stored schedule itself is not mutated.
    updated_schedule = replace(
        schedule,
        solver_status=solver_manager.get_solver_status(problem_id),
    )
    return schedule_to_model(updated_schedule)
def update_schedule(problem_id: str, schedule: EmployeeSchedule):
    """Store ``schedule`` in ``data_sets`` under ``problem_id``, replacing any previous entry."""
    # Item assignment mutates the module-level dict in place; the name is never
    # rebound here, so no ``global`` declaration is needed.
    data_sets[problem_id] = schedule
@app.post("/schedules")
async def solve_timetable(schedule_model: EmployeeScheduleModel) -> str:
    """Register a submitted schedule and pass it to ``solver_manager.solve_and_listen``.

    The schedule is stored under a freshly generated UUID string, and every
    solution handed to the listener overwrites that stored entry.

    Returns:
        The generated job ID.
    """
    job_id = str(uuid4())
    initial_schedule = model_to_schedule(schedule_model)
    update_schedule(job_id, initial_schedule)

    def store_solution(solution):
        # Keep ``data_sets`` pointing at the latest solution for this job.
        return update_schedule(job_id, solution)

    solver_manager.solve_and_listen(job_id, initial_schedule, store_solution)
    return job_id
@app.get("/schedules")
async def list_schedules() -> List[str]:
    """Return the job IDs of all schedules currently held in ``data_sets``."""
    # Iterating a dict yields its keys.
    return list(data_sets)
@app.get("/schedules/{problem_id}/status")
async def get_status(problem_id: str) -> Dict:
    """Get the schedule status and score for a given job ID.

    Args:
        problem_id: Key of the schedule in ``data_sets``.

    Returns:
        A dict with ``score`` (``hardScore`` / ``softScore``, both 0 when the
        schedule has no score) and ``solverStatus`` (the status name).

    Raises:
        HTTPException: 404 when no schedule is stored under ``problem_id``.
    """
    try:
        schedule = data_sets[problem_id]
    except KeyError:
        # Previously a ValueError, which surfaced to clients as HTTP 500.
        raise HTTPException(
            status_code=404,
            detail=f"No schedule found with ID {problem_id}",
        ) from None
    solver_status = solver_manager.get_solver_status(problem_id)
    score = schedule.score
    return {
        "score": {
            "hardScore": score.hard_score if score else 0,
            "softScore": score.soft_score if score else 0,
        },
        "solverStatus": solver_status.name,
    }
@app.delete("/schedules/{problem_id}")
async def stop_solving(problem_id: str) -> EmployeeScheduleModel:
    """Terminate solving for a given job ID.

    Args:
        problem_id: Key of the schedule in ``data_sets``.

    Returns:
        The stored schedule, as produced by ``get_timetable``.

    Raises:
        HTTPException: 404 when no schedule is stored under ``problem_id``.
    """
    if problem_id not in data_sets:
        # Previously a ValueError, which surfaced to clients as HTTP 500.
        raise HTTPException(
            status_code=404,
            detail=f"No schedule found with ID {problem_id}",
        )
    try:
        solver_manager.terminate_early(problem_id)
    except Exception as e:
        # Deliberately best-effort: a failed termination is reported but must
        # not prevent returning the stored schedule.
        print(f"Warning: terminate_early failed for {problem_id}: {e}")
    return await get_timetable(problem_id)
@app.put("/schedules/analyze")
async def analyze_schedule(request: Request) -> Dict:
    """Submit a schedule to analyze its score.

    Args:
        request: Incoming request whose JSON body is validated against
            ``EmployeeScheduleModel``.

    Returns:
        ``{"constraints": [...]}`` where each entry is a dumped
        ``ConstraintAnalysisDTO`` including its list of match DTOs.

    Raises:
        HTTPException: 422 when the body is not valid JSON or fails model
            validation.
    """
    try:
        json_data = await request.json()
        # Parse the incoming JSON using Pydantic models
        schedule_model = EmployeeScheduleModel.model_validate(json_data)
    except ValueError as exc:
        # ``json.JSONDecodeError`` is a ``ValueError``; pydantic's
        # ``ValidationError`` is expected to derive from it as well (assumes
        # ``EmployeeScheduleModel`` is a pydantic v2 model - confirm). Without
        # this, a bad payload surfaced as HTTP 500.
        raise HTTPException(status_code=422, detail=str(exc)) from exc

    # Convert to domain model for analysis
    domain_schedule = model_to_schedule(schedule_model)
    analysis = solution_manager.analyze(domain_schedule)

    # Convert to proper DTOs for correct serialization
    # Use str() for scores and justification to avoid Java object serialization issues
    constraints = []
    # ``getattr(..., []) or []`` tolerates both a missing attribute and None.
    for constraint in getattr(analysis, 'constraint_analyses', []) or []:
        matches = [
            MatchAnalysisDTO(
                name=str(getattr(getattr(match, 'constraint_ref', None), 'constraint_name', "")),
                score=str(getattr(match, 'score', "0hard/0soft")),
                justification=str(getattr(match, 'justification', "")),
            )
            for match in getattr(constraint, 'matches', []) or []
        ]
        constraints.append(
            ConstraintAnalysisDTO(
                name=str(getattr(constraint, 'constraint_name', "")),
                weight=str(getattr(constraint, 'weight', "0hard/0soft")),
                score=str(getattr(constraint, 'score', "0hard/0soft")),
                matches=matches,
            )
        )

    return {"constraints": [constraint.model_dump() for constraint in constraints]}
# Serve the contents of the "static" directory from the application root.
# NOTE(review): this is registered after all API routes above; a catch-all
# mount at "/" placed earlier would presumably shadow them, since routes are
# matched in registration order - keep it last.
app.mount("/", StaticFiles(directory="static", html=True), name="static")
|