Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from fastapi.staticfiles import StaticFiles | |
| from uuid import uuid4 | |
| from typing import Dict, List | |
| from dataclasses import asdict | |
| from enum import Enum | |
| import logging | |
| from .domain import VMPlacementPlan, VMPlacementPlanModel | |
| from .converters import plan_to_model, model_to_plan | |
| from .demo_data import generate_demo_data, generate_custom_data, DemoData | |
| from .solver import solver_manager, solution_manager | |
| from pydantic import BaseModel, Field | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The FastAPI application. The interactive Swagger UI is served at
# /q/swagger-ui rather than at FastAPI's default docs URL.
app = FastAPI(docs_url='/q/swagger-ui')

# In-memory store of placement plans, keyed by job ID. It lives only as long
# as the process does and entries are never removed in this module.
data_sets: Dict[str, VMPlacementPlan] = {}
class ConstraintMatchDTO(BaseModel):
    """Serializable description of a single constraint match.

    Every field is a plain string so the object can be dumped to JSON
    without any custom encoders.
    """

    # Name of the constraint that produced this match.
    name: str
    # Score impact of this match, rendered as text.
    score: str
    # Text rendering of the justification attached to the match.
    justification: str
class ConstraintAnalysisDTO(BaseModel):
    """Serializable per-constraint summary of a score analysis.

    Holds the constraint's name, weight and score as strings, together with
    the individual matches that contributed to it.
    """

    # Name of the analysed constraint.
    name: str
    # Weight of the constraint, rendered as text.
    weight: str
    # Score attributed to the constraint, rendered as text.
    score: str
    # Individual matches for this constraint (may be empty).
    matches: List[ConstraintMatchDTO]
async def get_demo_data():
    """List the available demo data sets.

    Returns:
        The member names of ``DemoData``, in the enum's iteration order.
    """
    available = [member.name for member in DemoData]
    return available
async def get_demo_data_by_name(demo_name: str) -> VMPlacementPlanModel:
    """Get a specific demo data set.

    Args:
        demo_name: Name of a ``DemoData`` member (looked up by member name).

    Returns:
        The generated demo plan converted to its API model.

    Raises:
        HTTPException: 404 when ``demo_name`` is not a ``DemoData`` member.
    """
    # Guard only the enum lookup. A KeyError raised while generating or
    # converting the plan is a server-side fault and must not be disguised
    # as a "not found" response.
    try:
        demo_data = DemoData[demo_name]
    except KeyError:
        raise HTTPException(
            status_code=404, detail=f"Demo data '{demo_name}' not found"
        ) from None
    return plan_to_model(generate_demo_data(demo_data))
class CustomDataRequest(BaseModel):
    """Request body describing the size of a custom demo data set.

    Each field carries a default and an inclusive lower/upper bound that is
    enforced by validation.
    """

    # Racks to generate: 1 to 8, default 3.
    rack_count: int = Field(
        default=3,
        ge=1,
        le=8,
        description="Number of racks",
    )
    # Servers in each rack: 2 to 10, default 4.
    servers_per_rack: int = Field(
        default=4,
        ge=2,
        le=10,
        description="Servers per rack",
    )
    # Virtual machines to generate: 5 to 200, default 20.
    vm_count: int = Field(
        default=20,
        ge=5,
        le=200,
        description="Number of VMs",
    )
async def generate_custom_demo_data(request: CustomDataRequest) -> VMPlacementPlanModel:
    """Generate a custom demo data set sized by the request.

    Args:
        request: Validated rack, server-per-rack and VM counts.

    Returns:
        The generated plan converted to its API model.
    """
    sizing = {
        "rack_count": request.rack_count,
        "servers_per_rack": request.servers_per_rack,
        "vm_count": request.vm_count,
    }
    return plan_to_model(generate_custom_data(**sizing))
async def get_placement(problem_id: str) -> VMPlacementPlanModel:
    """Get the current VM placement solution for a given job ID.

    Args:
        problem_id: Job ID under which the plan is stored in ``data_sets``.

    Returns:
        The stored plan, stamped with the solver's current status and
        converted to its API model.

    Raises:
        HTTPException: 404 when no plan is stored under ``problem_id``.
    """
    placement = data_sets.get(problem_id)
    # Explicit None check: only a missing entry means "not found"; a stored
    # plan must never be rejected merely because it evaluates as falsy.
    if placement is None:
        raise HTTPException(status_code=404, detail="Placement plan not found")
    # The status is refreshed on the stored object itself, so the update is
    # visible to later readers of ``data_sets`` as well.
    placement.solver_status = solver_manager.get_solver_status(problem_id)
    return plan_to_model(placement)
async def solve_placement(plan_model: VMPlacementPlanModel) -> str:
    """Start solving a VM placement problem and return its job ID.

    The submitted plan is stored under a freshly generated UUID before the
    solver is started, so the job can be looked up immediately.

    Args:
        plan_model: The problem to solve, as received from the API.

    Returns:
        The generated job ID.
    """
    job_id = str(uuid4())
    initial_plan = model_to_plan(plan_model)
    data_sets[job_id] = initial_plan

    def store_solution(solution):
        # Listener handed to the solver: replaces the stored plan for this
        # job with the solution it is called with.
        data_sets[job_id] = solution

    solver_manager.solve_and_listen(job_id, initial_plan, store_solution)
    return job_id
async def analyze_placement(plan_model: VMPlacementPlanModel) -> dict:
    """Analyze the constraints of a given VM placement solution.

    Attributes are read defensively from the analysis result: anything that
    is missing falls back to an empty string or to ``"0hard/0soft"``, and a
    missing or ``None`` collection is treated as empty.

    Args:
        plan_model: The placement solution to analyze.

    Returns:
        ``{"constraints": [...]}`` where each entry is a dumped
        ``ConstraintAnalysisDTO``.
    """
    analysis = solution_manager.analyze(model_to_plan(plan_model))

    def to_match_dto(match) -> ConstraintMatchDTO:
        # The constraint name lives on the match's constraint reference,
        # which may itself be absent.
        ref = getattr(match, 'constraint_ref', None)
        return ConstraintMatchDTO(
            name=str(getattr(ref, 'constraint_name', "")),
            score=str(getattr(match, 'score', "0hard/0soft")),
            justification=str(getattr(match, 'justification', "")),
        )

    dumped = []
    for constraint in getattr(analysis, 'constraint_analyses', []) or []:
        match_dtos = [
            to_match_dto(match)
            for match in getattr(constraint, 'matches', []) or []
        ]
        summary = ConstraintAnalysisDTO(
            name=str(getattr(constraint, 'constraint_name', "")),
            weight=str(getattr(constraint, 'weight', "0hard/0soft")),
            score=str(getattr(constraint, 'score', "0hard/0soft")),
            matches=match_dtos,
        )
        dumped.append(summary.model_dump())
    return {"constraints": dumped}
async def list_placements() -> List[str]:
    """List the job IDs of all submitted placement problems.

    Returns:
        A new list holding every key currently present in ``data_sets``.
    """
    # Iterating a dict yields its keys, so no explicit ``.keys()`` is needed.
    return list(data_sets)
async def get_placement_status(problem_id: str) -> dict:
    """Get the placement status and score for a given job ID.

    Args:
        problem_id: Job ID under which the plan is stored in ``data_sets``.

    Returns:
        A dict with the plan's name, its score as a string (``None`` when no
        score is set), the solver status name (``None`` when the solver
        reports no status), and the plan's ``active_servers`` and
        ``unassigned_vms`` values.

    Raises:
        HTTPException: 404 when no plan is stored under ``problem_id``.
    """
    placement = data_sets.get(problem_id)
    if placement is None:
        raise HTTPException(status_code=404, detail="Placement plan not found")
    solver_status = solver_manager.get_solver_status(problem_id)
    # Compare against None explicitly: a present-but-falsy value (such as a
    # zero-valued score or an int-backed status equal to 0) is still a real
    # value and must be reported rather than collapsed to null.
    score = placement.score
    return {
        "name": placement.name,
        "score": str(score) if score is not None else None,
        "solverStatus": solver_status.name if solver_status is not None else None,
        "activeServers": placement.active_servers,
        "unassignedVms": placement.unassigned_vms,
    }
async def stop_solving(problem_id: str) -> VMPlacementPlanModel:
    """Terminate solving for a given job ID and return the best solution so far.

    Args:
        problem_id: Job ID under which the plan is stored in ``data_sets``.

    Returns:
        The stored plan after termination was requested, stamped with the
        solver's current status and converted to its API model.

    Raises:
        HTTPException: 404 when no plan is stored under ``problem_id``.
    """
    # Validate the job ID first so the solver is never asked to terminate a
    # job this service does not know about.
    if data_sets.get(problem_id) is None:
        raise HTTPException(status_code=404, detail="Placement plan not found")
    solver_manager.terminate_early(problem_id)
    # Read the plan only after requesting termination: the entry in
    # ``data_sets`` may have been replaced with a newer solution meanwhile.
    placement = data_sets[problem_id]
    placement.solver_status = solver_manager.get_solver_status(problem_id)
    return plan_to_model(placement)
# Serve the front-end from the "static" directory at the site root. The path
# is relative, so it resolves against the process's working directory.
# html=True enables HTML mode (e.g. index.html is served for directories).
app.mount(
    "/",
    StaticFiles(directory="static", html=True),
    name="static",
)