# Copyright (c) 2026 CtrlAltWin Team
"""
Deterministic Grader — Scores packing quality from 0.0 to 1.0.
Scoring formula:
score = 0.4 * validity + 0.3 * efficiency + 0.2 * constraints + 0.1 * neatness
Each component:
validity — food placed in type-compatible container?
efficiency — space utilization vs total capacity used
constraints — temperature separation, fragility, flavor isolation
neatness — all items packed? nothing dropped?
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from .tasks import TaskConfig
from .simulation.engine import is_type_compatible
def grade(
    packing_log: List[Dict[str, Any]],
    task_config: TaskConfig,
) -> float:
    """
    Compute the overall score of one packing episode.

    The score is the weighted sum
    ``0.4 * validity + 0.3 * efficiency + 0.2 * constraints + 0.1 * neatness``
    clamped to the range [0.0, 1.0].

    Args:
        packing_log: Placement records, one dict per logged placement.
        task_config: Task configuration; the length of its ``food_items``
            is used as the number of items to pack.

    Returns:
        The clamped score rounded to 4 decimal places, or 0.0 when the
        task has no food items.
    """
    item_count = len(task_config.food_items)
    if item_count == 0:
        return 0.0

    # Terms are listed in the order of the scoring formula:
    # validity (40%), efficiency (30%), constraints (20%), neatness (10%).
    weighted_total = (
        0.4 * _score_validity(packing_log, item_count)
        + 0.3 * _score_efficiency(packing_log, task_config)
        + 0.2 * _score_constraints(packing_log, task_config)
        + 0.1 * _score_neatness(packing_log, item_count)
    )

    capped = min(1.0, weighted_total)
    return round(max(0.0, capped), 4)
def grade_detailed(
    packing_log: List[Dict[str, Any]],
    task_config: TaskConfig,
) -> Dict[str, Any]:
    """
    Grade with full breakdown for debugging.

    Applies the same weighted formula as ``grade`` and additionally
    reports every component score.

    Args:
        packing_log: Placement records, one dict per logged placement.
        task_config: Task configuration; the length of its ``food_items``
            is used as the number of items to pack.

    Returns:
        A dict with the keys ``final_score``, ``validity``, ``efficiency``,
        ``constraints``, ``neatness`` (all rounded to 4 decimal places),
        ``items_packed`` (number of log entries), ``total_items`` and
        ``weights`` (the weight applied to each component).
    """
    weights = {
        "validity": 0.4,
        "efficiency": 0.3,
        "constraints": 0.2,
        "neatness": 0.1,
    }
    total_items = len(task_config.food_items)

    if total_items == 0:
        # grade() scores a task without food items as 0.0; report the same
        # here so both entry points always agree on the final score.
        return {
            "final_score": 0.0,
            "validity": 0.0,
            "efficiency": 0.0,
            "constraints": 0.0,
            "neatness": 0.0,
            "items_packed": len(packing_log),
            "total_items": total_items,
            "weights": weights,
        }

    validity = _score_validity(packing_log, total_items)
    efficiency = _score_efficiency(packing_log, task_config)
    constraints = _score_constraints(packing_log, task_config)
    neatness = _score_neatness(packing_log, total_items)

    score = 0.4 * validity + 0.3 * efficiency + 0.2 * constraints + 0.1 * neatness
    score = round(max(0.0, min(1.0, score)), 4)

    return {
        "final_score": score,
        "validity": round(validity, 4),
        "efficiency": round(efficiency, 4),
        "constraints": round(constraints, 4),
        "neatness": round(neatness, 4),
        "items_packed": len(packing_log),
        "total_items": total_items,
        "weights": weights,
    }
# -----------------------------------------------------------------------
# Component scorers
# -----------------------------------------------------------------------
def _score_validity(packing_log: List[Dict], total_items: int) -> float:
    """Score: food placed in type-compatible container? (0-1)

    Args:
        packing_log: Placement records; an entry counts as correct when its
            ``type_compatible`` value is truthy (missing means not correct).
        total_items: Number of items in the task; values below 1 are
            treated as 1 to avoid dividing by zero.

    Returns:
        Correct placements divided by ``total_items``, capped at 1.0.
        0.0 when the log is empty.
    """
    if not packing_log:
        return 0.0
    correct = sum(1 for entry in packing_log if entry.get("type_compatible", False))
    # The log can hold more entries than the task has items; cap the ratio
    # so the component stays inside its documented 0-1 range.
    return min(1.0, correct / max(total_items, 1))
def _score_efficiency(packing_log: List[Dict], task_config: TaskConfig) -> float:
    """Score: fraction of the used containers' capacity filled with food. (0-1)

    Only containers referenced by at least one log entry contribute their
    ``capacity_ml``. Each entry flagged ``overflow`` lowers the ratio by a
    further 20%, but the multiplier never drops below 0.3.

    Returns:
        The (possibly penalized) utilization capped at 1.0; 0.0 when the
        log is empty or the used containers have no capacity.
    """
    if not packing_log:
        return 0.0

    packed_volume = sum(record.get("food_volume", 0) for record in packing_log)

    # Capacity is counted only for containers that actually received food.
    touched_ids = {record.get("container_id") for record in packing_log}
    capacity_in_use = sum(
        box.capacity_ml for box in task_config.containers if box.id in touched_ids
    )
    if capacity_in_use == 0:
        return 0.0

    fill_ratio = packed_volume / capacity_in_use

    # Overflow penalty: 20% per overflowing placement, floored at 0.3.
    spills = len([record for record in packing_log if record.get("overflow", False)])
    if spills > 0:
        fill_ratio = fill_ratio * max(0.3, 1.0 - 0.2 * spills)

    return fill_ratio if fill_ratio < 1.0 else 1.0
def _score_constraints(packing_log: List[Dict], task_config: TaskConfig) -> float:
    """Score: average satisfaction of the task's active constraints. (0-1)

    Recognized constraint names are ``temperature_separation``,
    ``fragility_ordering``, ``flavor_isolation``, ``no_overflow`` and
    ``type_match``; any other name in ``task_config.constraints`` is ignored.

    Returns:
        The mean of the per-constraint scores, 1.0 when no recognized
        constraint is active, and 0.0 when the log is empty.
    """
    if not packing_log:
        return 0.0

    def _overflow_score() -> float:
        # Each overflowing placement costs 0.3, bottoming out at 0.0.
        spills = len([rec for rec in packing_log if rec.get("overflow", False)])
        if spills == 0:
            return 1.0
        return max(0.0, 1.0 - 0.3 * spills)

    def _type_match_score() -> float:
        # Share of logged placements that are type-compatible.
        matched = len([rec for rec in packing_log if rec.get("type_compatible", False)])
        return matched / max(len(packing_log), 1)

    # Ordered so that scores are collected in a fixed sequence; the checks
    # are wrapped in callables so only the active ones are evaluated.
    available_checks = (
        ("temperature_separation", lambda: _check_temperature(packing_log)),
        ("fragility_ordering", lambda: _check_fragility(packing_log)),
        ("flavor_isolation", lambda: _check_flavor_isolation(packing_log)),
        ("no_overflow", _overflow_score),
        ("type_match", _type_match_score),
    )

    enabled = set(task_config.constraints)
    results = [run_check() for key, run_check in available_checks if key in enabled]

    if not results:
        return 1.0  # no constraints to violate
    return sum(results) / len(results)
def _check_temperature(packing_log: List[Dict]) -> float:
    """Score how well hot and cold items are kept in separate containers.

    A container counts as a violation when it holds at least one entry with
    ``food_temperature`` equal to ``"hot"`` and one equal to ``"cold"``.
    Entries without a temperature are treated as ``"room"``.

    Returns:
        1.0 minus the share of used containers that mix hot and cold;
        1.0 when the log is empty.
    """
    # Collect the temperatures seen in each container.
    temps_by_container: Dict[int, List[str]] = {}
    for record in packing_log:
        bucket = temps_by_container.setdefault(record.get("container_id"), [])
        bucket.append(record.get("food_temperature", "room"))

    container_count = len(temps_by_container)
    if container_count == 0:
        return 1.0

    mixed = 0
    for seen in temps_by_container.values():
        if "hot" in seen and "cold" in seen:
            mixed += 1

    return max(0.0, 1.0 - mixed / container_count)
def _check_fragility(packing_log: List[Dict]) -> float:
    """Score whether sturdy items avoid landing directly after fragile ones.

    Within each container, every pair of consecutive placements is checked.
    A pair is a violation when the later item has ``food_fragility`` below
    0.4 and the item placed just before it has a value above 0.6. Entries
    without a fragility value default to 0.5.

    Returns:
        1.0 minus the share of consecutive pairs that are violations;
        1.0 when there are no pairs to check.
    """
    # Fragility values per container, in placement order.
    sequence_by_container: Dict[int, List[float]] = {}
    for record in packing_log:
        sequence = sequence_by_container.setdefault(record.get("container_id"), [])
        sequence.append(record.get("food_fragility", 0.5))

    pair_count = 0
    crushed = 0
    for sequence in sequence_by_container.values():
        # Walk the placements two at a time: (earlier item, next item).
        for earlier, later in zip(sequence, sequence[1:]):
            pair_count += 1
            if later < 0.4 and earlier > 0.6:
                crushed += 1

    if pair_count == 0:
        return 1.0
    return max(0.0, 1.0 - crushed / pair_count)
def _check_flavor_isolation(packing_log: List[Dict]) -> float:
    """Score whether strong-flavor items (pickle, chutney) sit apart from other food.

    Only containers holding at least one item whose ``food_name`` is exactly
    ``"pickle"`` or ``"chutney"`` are considered. Such a container is a
    violation when it also holds any other item. Entries without a name are
    treated as the empty string, i.e. as an ordinary item.

    Returns:
        1.0 minus the share of strong-flavor containers that are shared
        with other food; 1.0 when no container holds a strong-flavor item.
    """
    pungent = {"pickle", "chutney"}

    # Names of everything placed in each container.
    names_by_container: Dict[int, List[str]] = {}
    for record in packing_log:
        names = names_by_container.setdefault(record.get("container_id"), [])
        names.append(record.get("food_name", ""))

    relevant = 0
    shared = 0
    for names in names_by_container.values():
        if not any(name in pungent for name in names):
            continue  # no strong flavor here, nothing to isolate
        relevant += 1
        if any(name not in pungent for name in names):
            shared += 1

    if relevant == 0:
        return 1.0
    return max(0.0, 1.0 - shared / relevant)
def _score_neatness(packing_log: List[Dict], total_items: int) -> float:
    """Score: fraction of items successfully packed. (0-1)

    Args:
        packing_log: Placement records; each entry counts as one packed item.
        total_items: Number of items in the task.

    Returns:
        Number of log entries divided by ``total_items``, capped at 1.0.
        0.0 when ``total_items`` is zero or negative.
    """
    if total_items <= 0:
        return 0.0
    # The log can hold more entries than the task has items; cap the ratio
    # so the component stays inside its documented 0-1 range.
    return min(1.0, len(packing_log) / total_items)
|