Spaces:
Sleeping
Sleeping
File size: 8,850 Bytes
177c40c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
"""
REST API endpoints.
Provides a standard API for:
- Listing and fetching demo data
- Starting/stopping solving
- Getting solution status
- Analyzing scores
"""
import os
from dataclasses import replace
from typing import Dict, List
from uuid import uuid4

from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles

from .domain import Schedule, ScheduleModel, Resource, Task, ConstraintWeightsModel
from .demo_data import DemoData, generate_demo_data
from .solver import solver_manager, solution_manager
from . import constraints  # For setting global weights
# Application instance that every route decorator in this module registers on.
app = FastAPI(
    title="SolverForge Quickstart",
    description="Constraint optimization API",
    # Path at which the interactive Swagger UI documentation is served.
    docs_url='/q/swagger-ui'
)
# In-memory storage for solving jobs: maps a job ID to the most recent
# Schedule stored for that job. No code visible in this module removes
# entries, so the mapping grows for the lifetime of the process.
data_sets: dict[str, Schedule] = {}
# =============================================================================
# DEMO DATA ENDPOINTS
# =============================================================================
@app.get("/demo-data")
async def demo_data_list() -> list[DemoData]:
    """Return every entry produced by iterating ``DemoData``."""
    return list(DemoData)
@app.get("/demo-data/{dataset_id}", response_model_exclude_none=True)
async def get_demo_data(dataset_id: str) -> ScheduleModel:
    """Generate the demo dataset named by ``dataset_id`` and return it as a model.

    Args:
        dataset_id: Name of a ``DemoData`` member, taken from the URL path.

    Returns:
        The generated schedule converted to a ``ScheduleModel``.

    Raises:
        HTTPException: 404 when ``dataset_id`` does not name a ``DemoData`` member.
    """
    # Look the member up by name instead of using getattr(): the value comes
    # straight from the URL, and getattr() would also resolve non-member
    # attributes (e.g. "__class__") and raise AttributeError (an HTTP 500)
    # for unknown names.
    try:
        demo_data = DemoData[dataset_id]
    except KeyError:
        raise HTTPException(
            status_code=404,
            detail=f"No demo dataset found with ID {dataset_id}",
        ) from None
    return _schedule_to_model(generate_demo_data(demo_data))
# =============================================================================
# SOLVING ENDPOINTS
# =============================================================================
@app.post("/schedules")
async def solve(schedule_model: ScheduleModel) -> str:
    """
    Submit a schedule for solving and return the ID of the new job.

    The job ID identifies the job in the other ``/schedules`` endpoints.
    When the request carries ``constraint_weights`` they replace the
    module-level weights in ``constraints``; otherwise the defaults below
    are restored.
    """
    job_id = str(uuid4())

    # NOTE(review): the weights live in a module-level attribute, so they are
    # shared by the whole process rather than scoped to this job -- confirm
    # how the solver reads them if overlapping jobs are expected.
    requested = schedule_model.constraint_weights
    if requested:
        active_weights = {
            'required_skill': requested.required_skill,
            'resource_capacity': requested.resource_capacity,
            'minimize_duration': requested.minimize_duration,
            'balance_load': requested.balance_load,
        }
    else:
        # No overrides supplied: fall back to the default weights.
        active_weights = {
            'required_skill': 100,
            'resource_capacity': 100,
            'minimize_duration': 50,
            'balance_load': 50,
        }
    constraints.CONSTRAINT_WEIGHTS = active_weights

    problem = _model_to_schedule(schedule_model)
    # Store the unsolved problem right away so the job can be queried
    # before the solver has reported anything.
    data_sets[job_id] = problem

    def on_solution(solution):
        return _update_schedule(job_id, solution)

    solver_manager.solve_and_listen(job_id, problem, on_solution)
    return job_id
@app.get("/schedules")
async def list_schedules() -> List[str]:
    """Return the IDs of all jobs currently held in ``data_sets``."""
    return list(data_sets)
@app.get("/schedules/{job_id}", response_model_exclude_none=True)
async def get_schedule(job_id: str) -> ScheduleModel:
    """Return the latest stored schedule for a job, stamped with its solver status.

    Args:
        job_id: ID previously returned by ``POST /schedules``.

    Returns:
        The stored schedule as a ``ScheduleModel`` whose solver status is the
        value currently reported by ``solver_manager``.

    Raises:
        HTTPException: 404 when no schedule is stored under ``job_id``.
    """
    try:
        schedule = data_sets[job_id]
    except KeyError:
        # An unknown ID is a client error; a bare ValueError would be
        # reported by FastAPI as a 500 Internal Server Error.
        raise HTTPException(
            status_code=404,
            detail=f"No schedule found with ID {job_id}",
        ) from None
    # replace() builds a copy, so the stored schedule itself is not modified.
    updated = replace(schedule, solver_status=solver_manager.get_solver_status(job_id))
    return _schedule_to_model(updated)
@app.get("/schedules/{job_id}/status")
async def get_status(job_id: str) -> Dict:
    """Return the score and solver status of a job.

    Args:
        job_id: ID previously returned by ``POST /schedules``.

    Returns:
        A dict with ``score`` (``hardScore`` and ``softScore``, both 0 while
        the stored schedule has no score) and ``solverStatus`` (the name of
        the status reported by ``solver_manager``).

    Raises:
        HTTPException: 404 when no schedule is stored under ``job_id``.
    """
    try:
        schedule = data_sets[job_id]
    except KeyError:
        # An unknown ID is a client error; a bare ValueError would be
        # reported by FastAPI as a 500 Internal Server Error.
        raise HTTPException(
            status_code=404,
            detail=f"No schedule found with ID {job_id}",
        ) from None
    solver_status = solver_manager.get_solver_status(job_id)
    return {
        "score": {
            "hardScore": schedule.score.hard_score if schedule.score else 0,
            "softScore": schedule.score.soft_score if schedule.score else 0,
        },
        "solverStatus": solver_status.name,
    }
@app.delete("/schedules/{job_id}")
async def stop_solving(job_id: str) -> ScheduleModel:
    """Ask the solver to stop a job early and return its current solution.

    Args:
        job_id: ID previously returned by ``POST /schedules``.

    Returns:
        The schedule currently stored for the job, as produced by
        ``get_schedule``.

    Raises:
        HTTPException: 404 when no schedule is stored under ``job_id``.
    """
    if job_id not in data_sets:
        # An unknown ID is a client error; a bare ValueError would be
        # reported by FastAPI as a 500 Internal Server Error.
        raise HTTPException(
            status_code=404,
            detail=f"No schedule found with ID {job_id}",
        )
    # Termination is deliberately best-effort: a failure here is reported
    # but must not prevent returning the solution stored so far.
    try:
        solver_manager.terminate_early(job_id)
    except Exception as e:
        print(f"Warning: terminate_early failed for {job_id}: {e}")
    return await get_schedule(job_id)
@app.put("/schedules/analyze")
async def analyze(schedule_model: ScheduleModel) -> Dict:
    """Return a per-constraint score breakdown for the submitted schedule.

    Every field is read defensively with ``getattr`` and a fallback, then
    stringified, so missing attributes on the analysis objects yield
    placeholder values instead of errors.
    """
    analysis = solution_manager.analyze(_model_to_schedule(schedule_model))

    # Named "breakdown" so the list does not shadow the imported
    # ``constraints`` module inside this function.
    breakdown = []
    for entry in getattr(analysis, 'constraint_analyses', []) or []:
        match_rows = []
        for match in getattr(entry, 'matches', []) or []:
            constraint_ref = getattr(match, 'constraint_ref', None)
            match_rows.append({
                "name": str(getattr(constraint_ref, 'constraint_name', "")),
                "score": str(getattr(match, 'score', "0hard/0soft")),
                "justification": str(getattr(match, 'justification', "")),
            })
        breakdown.append({
            "name": str(getattr(entry, 'constraint_name', "")),
            "weight": str(getattr(entry, 'weight', "0hard/0soft")),
            "score": str(getattr(entry, 'score', "0hard/0soft")),
            "matches": match_rows,
        })
    return {"constraints": breakdown}
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def _update_schedule(job_id: str, schedule: Schedule):
    """Store ``schedule`` as the latest solution for ``job_id``.

    Passed to the solver as the listener for new solutions.
    """
    # Item assignment mutates the module-level dict in place, so no
    # ``global`` declaration is required.
    data_sets[job_id] = schedule
def _schedule_to_model(schedule: Schedule) -> ScheduleModel:
    """Build the Pydantic ``ScheduleModel`` for a domain ``Schedule``.

    Resources are emitted with their skills as a list, and each task refers
    to its assigned resource by name (``None`` when unassigned). Score and
    solver status are included only when set on the schedule.
    """
    from .domain import ResourceModel, TaskModel, ScheduleModel

    resource_models = []
    for res in schedule.resources:
        resource_models.append(
            ResourceModel(
                name=res.name,
                capacity=res.capacity,
                skills=list(res.skills),
            )
        )

    task_models = []
    for task in schedule.tasks:
        if task.resource:
            assigned_name = task.resource.name
        else:
            assigned_name = None
        task_models.append(
            TaskModel(
                id=task.id,
                name=task.name,
                duration=task.duration,
                requiredSkill=task.required_skill,
                resource=assigned_name,
            )
        )

    score_text = str(schedule.score) if schedule.score else None
    status_name = schedule.solver_status.name if schedule.solver_status else None
    return ScheduleModel(
        resources=resource_models,
        tasks=task_models,
        score=score_text,
        solverStatus=status_name,
    )
def _model_to_schedule(model: ScheduleModel) -> Schedule:
    """Build a domain ``Schedule`` from the Pydantic ``ScheduleModel``.

    Resources are indexed by name so that each task's resource reference (a
    name in the model) can be resolved to the matching ``Resource`` object.
    A task whose resource is empty, or names no known resource, ends up
    unassigned.
    """
    resources_by_name = {}
    for entry in model.resources:
        resources_by_name[entry.name] = Resource(
            name=entry.name,
            capacity=entry.capacity,
            skills=set(entry.skills),
        )

    domain_tasks = []
    for item in model.tasks:
        if item.resource:
            assigned = resources_by_name.get(item.resource)
        else:
            assigned = None
        domain_tasks.append(
            Task(
                id=item.id,
                name=item.name,
                duration=item.duration,
                # A missing required skill is stored as an empty string.
                required_skill=item.required_skill or "",
                resource=assigned,
            )
        )

    return Schedule(
        resources=list(resources_by_name.values()),
        tasks=domain_tasks,
    )
# =============================================================================
# SOURCE CODE VIEWER ENDPOINTS
# =============================================================================
# Whitelist of files that can be viewed.
# Maps the name exposed through the /source-code endpoints to the path that
# is opened on disk; only names listed here are ever read. The paths are
# relative, so they resolve against the process's current working directory.
SOURCE_FILES = {
    'domain.py': 'src/my_quickstart/domain.py',
    'constraints.py': 'src/my_quickstart/constraints.py',
    'solver.py': 'src/my_quickstart/solver.py',
    'rest_api.py': 'src/my_quickstart/rest_api.py',
    'demo_data.py': 'src/my_quickstart/demo_data.py',
    'index.html': 'static/index.html',
    'app.js': 'static/app.js',
    'app.css': 'static/app.css',
}
@app.get("/source-code")
async def list_source_files() -> List[str]:
    """Return the names of the whitelisted files the code viewer may request."""
    return list(SOURCE_FILES)
@app.get("/source-code/{filename}")
async def get_source_code(filename: str) -> Dict:
    """Return the text of one whitelisted source file.

    Args:
        filename: A key of ``SOURCE_FILES``, taken from the URL path.

    Returns:
        A dict with the requested ``filename`` and the file's ``content``.

    Raises:
        HTTPException: 404 when ``filename`` is not whitelisted, or when the
            whitelisted file does not exist on disk.
    """
    # Only the whitelist is consulted, so the user-supplied name never
    # becomes part of a filesystem path.
    filepath = SOURCE_FILES.get(filename)
    if filepath is None:
        raise HTTPException(status_code=404, detail=f"File not available: {filename}")
    try:
        # Explicit UTF-8 keeps decoding independent of the platform's
        # default encoding.
        with open(filepath, 'r', encoding="utf-8") as f:
            content = f.read()
    except FileNotFoundError:
        # Handling the failed open avoids the check-then-open race of a
        # separate existence test.
        raise HTTPException(status_code=404, detail=f"File not found: {filepath}") from None
    return {"filename": filename, "content": content}
# =============================================================================
# STATIC FILES (optional web UI)
# =============================================================================
# Serve the optional web UI from the "static" directory at the site root,
# but only when that path exists relative to the working directory.
_static_dir = "static"
if os.path.exists(_static_dir):
    app.mount("/", StaticFiles(directory=_static_dir, html=True), name="static")
|