from typing import Dict, List
from uuid import uuid4

from fastapi import FastAPI, HTTPException, Request
from fastapi.staticfiles import StaticFiles

from .domain import MeetingSchedule
from .converters import MeetingScheduleModel, schedule_to_model, model_to_schedule
from .demo_data import generate_demo_data
from .solver import solver_manager, solution_manager
from .score_analysis import ConstraintAnalysisDTO, MatchAnalysisDTO
|
|
# ASGI application; the interactive Swagger UI is served at /q/swagger-ui.
app = FastAPI(docs_url="/q/swagger-ui")

# In-memory store of submitted schedules, keyed by job ID. Entries are added
# by solve_schedule() and overwritten by update_schedule().
data_sets: Dict[str, MeetingSchedule] = dict()
|
|
|
|
@app.get("/demo-data")
async def list_demo_data() -> List[str]:
    """Return the identifiers of the demo data sets advertised by the API.

    Returns:
        A new list containing ``"SMALL"``, ``"MEDIUM"`` and ``"LARGE"``.
    """
    available_ids = ["SMALL", "MEDIUM", "LARGE"]
    return available_ids
|
|
|
|
@app.get("/demo-data/{dataset_id}")
async def get_demo_data(dataset_id: str) -> MeetingScheduleModel:
    """Generate a demo schedule and return it as an API model.

    Args:
        dataset_id: Identifier taken from the URL path.
            NOTE(review): this value is not consulted; every ID yields the
            output of ``generate_demo_data()``.

    Returns:
        The generated schedule converted with ``schedule_to_model``.
    """
    return schedule_to_model(generate_demo_data())
|
|
|
|
@app.get("/schedules")
async def list_schedules() -> List[str]:
    """Return the job IDs of every schedule currently held in ``data_sets``."""
    return [job_id for job_id in data_sets]
|
|
|
|
@app.get("/schedules/{schedule_id}")
async def get_schedule(schedule_id: str) -> MeetingScheduleModel:
    """Return the stored schedule for a job ID, tagged with its solver status.

    Args:
        schedule_id: Job ID under which the schedule was stored.

    Returns:
        The stored schedule converted with ``schedule_to_model``.

    Raises:
        HTTPException: 404 when no schedule is stored under ``schedule_id``.
            (A bare ``ValueError`` here would surface to clients as a 500.)
    """
    schedule = data_sets.get(schedule_id)
    if schedule is None:
        raise HTTPException(
            status_code=404,
            detail=f"No schedule found with ID {schedule_id}",
        )

    # The status is written onto the stored object itself before conversion,
    # so the entry in data_sets carries the most recently queried status.
    schedule.solver_status = solver_manager.get_solver_status(schedule_id)
    return schedule_to_model(schedule)
|
|
|
|
@app.get("/schedules/{problem_id}/status")
async def get_status(problem_id: str) -> Dict:
    """Return the score components and solver status for a job ID.

    Args:
        problem_id: Job ID under which the schedule was stored.

    Returns:
        A dict with a ``"score"`` mapping (``hardScore``, ``mediumScore``,
        ``softScore``; each 0 while the schedule has no score) and
        ``"solverStatus"``, the name of the solver status.

    Raises:
        HTTPException: 404 when no schedule is stored under ``problem_id``.
            (A bare ``ValueError`` here would surface to clients as a 500.)
    """
    schedule = data_sets.get(problem_id)
    if schedule is None:
        raise HTTPException(
            status_code=404,
            detail=f"No schedule found with ID {problem_id}",
        )

    solver_status = solver_manager.get_solver_status(problem_id)

    # Read the score once so all three components come from the same object.
    score = schedule.score
    return {
        "score": {
            "hardScore": score.hard_score if score else 0,
            "mediumScore": score.medium_score if score else 0,
            "softScore": score.soft_score if score else 0,
        },
        "solverStatus": solver_status.name,
    }
|
|
|
|
@app.delete("/schedules/{problem_id}")
async def terminate_solving(problem_id: str) -> MeetingScheduleModel:
    """Ask the solver to stop early for a job ID and return its schedule.

    Args:
        problem_id: Job ID under which the schedule was stored.

    Returns:
        The result of ``get_schedule(problem_id)`` after the termination
        request has been issued.

    Raises:
        HTTPException: 404 when no schedule is stored under ``problem_id``.
            (A bare ``ValueError`` here would surface to clients as a 500.)
    """
    if problem_id not in data_sets:
        raise HTTPException(
            status_code=404,
            detail=f"No schedule found with ID {problem_id}",
        )

    try:
        solver_manager.terminate_early(problem_id)
    except Exception as exc:
        # Deliberately best effort: a failed termination request is reported
        # but must not prevent returning the stored schedule.
        print(f"Warning: terminate_early failed for {problem_id}: {exc}")

    return await get_schedule(problem_id)
|
|
|
|
def update_schedule(problem_id: str, schedule: MeetingSchedule) -> None:
    """Store ``schedule`` in ``data_sets`` under ``problem_id``.

    Any schedule previously stored under the same ID is replaced.

    Args:
        problem_id: Job ID to store the schedule under.
        schedule: Schedule to store.
    """
    # Item assignment mutates the module-level dict in place; no ``global``
    # declaration is needed because the name itself is never rebound.
    data_sets[problem_id] = schedule
|
|
|
|
@app.post("/schedules")
async def solve_schedule(request: Request) -> str:
    """Accept a schedule as JSON, start solving it, and return its job ID.

    The request body is validated as a ``MeetingScheduleModel``, converted to
    the domain representation, stored in ``data_sets`` under a freshly
    generated UUID, and handed to ``solver_manager.solve_and_listen``.

    Args:
        request: Incoming request whose JSON body describes the schedule.

    Returns:
        The generated job ID.
    """
    payload = await request.json()
    new_job_id = str(uuid4())

    domain_schedule = model_to_schedule(MeetingScheduleModel.model_validate(payload))
    data_sets[new_job_id] = domain_schedule

    def store_solution(solution):
        # Callback given to the solver manager: keeps data_sets pointing at
        # whatever solution the solver hands back for this job.
        update_schedule(new_job_id, solution)

    solver_manager.solve_and_listen(new_job_id, domain_schedule, store_solution)
    return new_job_id
|
|
|
|
def _serialize_justification(justification):
    """Convert a match justification into a JSON-friendly dict of its facts.

    Returns ``None`` when ``justification`` is ``None``. Otherwise returns
    ``{'facts': [...]}`` with one dict per entry of ``justification.facts``
    (a missing ``facts`` attribute is treated as empty).
    """
    if justification is None:
        return None

    serialized = []
    for fact in getattr(justification, 'facts', []):
        entry = {'id': getattr(fact, 'id', None)}
        if hasattr(fact, 'meeting'):
            # Facts exposing a ``meeting`` attribute are reported as assignments.
            entry['type'] = 'assignment'
            entry['meeting'] = getattr(fact.meeting, 'id', None) if fact.meeting else None
        elif hasattr(fact, 'person') and hasattr(fact, 'meeting_id'):
            # Facts exposing ``person`` and ``meeting_id`` are reported as attendances.
            entry['type'] = 'attendance'
            entry['personId'] = getattr(fact.person, 'id', None) if fact.person else None
            entry['meetingId'] = fact.meeting_id
        # Facts matching neither shape keep only their ``id``.
        serialized.append(entry)
    return {'facts': serialized}


@app.put("/schedules/analyze")
async def analyze_schedule(request: Request) -> Dict:
    """Analyze the score of a submitted schedule, constraint by constraint.

    The request body is validated as a ``MeetingScheduleModel``, converted to
    the domain representation and passed to ``solution_manager.analyze``.

    Args:
        request: Incoming request whose JSON body describes the schedule.

    Returns:
        ``{"constraints": [...]}`` with one dumped ``ConstraintAnalysisDTO``
        per constraint analysis, each carrying its match DTOs.
    """
    payload = await request.json()
    domain_schedule = model_to_schedule(MeetingScheduleModel.model_validate(payload))
    analysis = solution_manager.analyze(domain_schedule)

    constraint_dtos = [
        ConstraintAnalysisDTO(
            name=constraint.constraint_name,
            weight=constraint.weight,
            score=constraint.score,
            matches=[
                MatchAnalysisDTO(
                    name=match.constraint_ref.constraint_name,
                    score=match.score,
                    justification=_serialize_justification(match.justification),
                )
                for match in constraint.matches
            ],
        )
        for constraint in analysis.constraint_analyses
    ]

    return {"constraints": [dto.model_dump() for dto in constraint_dtos]}
|
|
|
|
| |
# Serve the front-end from the "static" directory at the site root.
# NOTE(review): presumably registered last so the API routes declared above
# are matched before this catch-all mount — confirm before moving it.
app.mount(
    "/",
    StaticFiles(directory="static", html=True),
    name="static",
)
|
|