Spaces:
Running
Running
| from __future__ import annotations | |
| import copy | |
| import math | |
| import random | |
| import uuid | |
| from typing import Dict, List, Optional, Sequence, Tuple | |
| try: | |
| from openenv.core.env_server import Environment | |
| except Exception: # pragma: no cover | |
| class Environment: | |
| pass | |
| try: | |
| from ..models import DesignGymAction, DesignGymObservation, DesignGymState | |
| except Exception: # pragma: no cover | |
| from models import DesignGymAction, DesignGymObservation, DesignGymState | |
| try: | |
| from .briefs import choose_brief | |
| from .phases import get_phase, allowed_actions_for_phase, phase_score_for_action | |
| from .rewards import instruction_score, critic_feedback, compose_reward | |
| except Exception: # pragma: no cover | |
| from server.briefs import choose_brief | |
| from server.phases import get_phase, allowed_actions_for_phase, phase_score_for_action | |
| from server.rewards import instruction_score, critic_feedback, compose_reward | |
| EPS = 1e-9 | |
| def _clamp(v: float, lo: float, hi: float) -> float: | |
| return max(lo, min(hi, v)) | |
| def _safe_exp(value: float) -> float: | |
| return math.exp(max(-50.0, min(50.0, value))) | |
| def _area(box: Sequence[float]) -> float: | |
| return max(0.0, box[2]) * max(0.0, box[3]) | |
| def _intersect(a: Sequence[float], b: Sequence[float]) -> float: | |
| left = max(a[0], b[0]) | |
| top = max(a[1], b[1]) | |
| right = min(a[0] + a[2], b[0] + b[2]) | |
| bottom = min(a[1] + a[3], b[1] + b[3]) | |
| return max(0.0, right - left) * max(0.0, bottom - top) | |
| def _center(box: Sequence[float]) -> Tuple[float, float]: | |
| return (box[0] + box[2] / 2.0, box[1] + box[3] / 2.0) | |
| def _anchors(box: Sequence[float]) -> Dict[str, float]: | |
| x, y, w, h = box | |
| return { | |
| "left": x, | |
| "center": x + w / 2.0, | |
| "right": x + w, | |
| "top": y, | |
| "middle": y + h / 2.0, | |
| "bottom": y + h, | |
| } | |
| def _mean(values: Sequence[float]) -> float: | |
| return sum(values) / len(values) if values else 0.0 | |
| def _std(values: Sequence[float]) -> float: | |
| if len(values) < 2: | |
| return 0.0 | |
| m = _mean(values) | |
| return math.sqrt(sum((v - m) ** 2 for v in values) / len(values)) | |
| def _rank(values: Sequence[float]) -> List[float]: | |
| indexed = list(enumerate(values)) | |
| indexed.sort(key=lambda item: item[1]) | |
| ranks = [0.0] * len(values) | |
| i = 0 | |
| while i < len(indexed): | |
| j = i | |
| while j + 1 < len(indexed) and indexed[j + 1][1] == indexed[i][1]: | |
| j += 1 | |
| avg_rank = (i + j + 2) / 2.0 | |
| for k in range(i, j + 1): | |
| ranks[indexed[k][0]] = avg_rank | |
| i = j + 1 | |
| return ranks | |
| def _spearman(a: Sequence[float], b: Sequence[float]) -> float: | |
| if len(a) != len(b) or len(a) < 2: | |
| return 0.0 | |
| ra, rb = _rank(a), _rank(b) | |
| ma, mb = _mean(ra), _mean(rb) | |
| num = sum((x - ma) * (y - mb) for x, y in zip(ra, rb)) | |
| den_a = math.sqrt(sum((x - ma) ** 2 for x in ra)) | |
| den_b = math.sqrt(sum((y - mb) ** 2 for y in rb)) | |
| if den_a <= EPS or den_b <= EPS: | |
| return 0.0 | |
| return num / (den_a * den_b) | |
| def _deepcopy_elements(elements: List[Dict[str, object]]) -> List[Dict[str, object]]: | |
| return [copy.deepcopy(e) for e in elements] | |
| def _element_map(elements: List[Dict[str, object]]) -> Dict[str, Dict[str, object]]: | |
| return {str(e["id"]): e for e in elements} | |
| def _el( | |
| element_id: str, | |
| role: str, | |
| typ: str, | |
| importance: float, | |
| group: str, | |
| content_len: int, | |
| min_size: List[float], | |
| max_size: List[float], | |
| aspect_ratio: Optional[float], | |
| precedence: int, | |
| ) -> Dict[str, object]: | |
| return { | |
| "id": element_id, | |
| "role": role, | |
| "type": typ, | |
| "importance": importance, | |
| "group": group, | |
| "content_len": content_len, | |
| "min_size": min_size, | |
| "max_size": max_size, | |
| "aspect_ratio": aspect_ratio, | |
| "precedence": precedence, | |
| "movable": True, | |
| "resizable": True, | |
| } | |
| TASKS: Dict[str, Dict[str, object]] = { | |
| "poster_basic_v1": { | |
| "instance_id": "poster_basic_001", | |
| "max_steps": 7, | |
| "occupancy_target": 0.58, | |
| "occupancy_tolerance": 0.20, | |
| "init_noise": 0.020, | |
| "text_density_target": 0.62, | |
| "intent_regions": { | |
| "title": "top_band", | |
| "subtitle": "top_band", | |
| "hero_image": "hero_center", | |
| "cta": "safe_lower_right", | |
| "logo": "top_right", | |
| "badge": "right_column", | |
| }, | |
| "weights": { | |
| "overlap": 0.14, | |
| "alignment": 0.12, | |
| "spacing": 0.09, | |
| "balance": 0.08, | |
| "hierarchy": 0.14, | |
| "grouping": 0.07, | |
| "reading_order": 0.08, | |
| "aspect_ratio": 0.08, | |
| "occupancy": 0.08, | |
| "text_fit": 0.04, | |
| "negative_space": 0.04, | |
| "intent_fit": 0.04, | |
| }, | |
| "templates": { | |
| "hero": { | |
| "title": [0.08, 0.07, 0.68, 0.13], | |
| "subtitle": [0.08, 0.21, 0.56, 0.08], | |
| "hero_image": [0.08, 0.33, 0.64, 0.40], | |
| "cta": [0.08, 0.79, 0.28, 0.10], | |
| "logo": [0.78, 0.08, 0.14, 0.14], | |
| "badge": [0.74, 0.72, 0.18, 0.12], | |
| }, | |
| "split": { | |
| "title": [0.06, 0.08, 0.42, 0.12], | |
| "subtitle": [0.06, 0.22, 0.36, 0.08], | |
| "hero_image": [0.52, 0.08, 0.40, 0.58], | |
| "cta": [0.06, 0.74, 0.26, 0.10], | |
| "logo": [0.06, 0.88, 0.12, 0.08], | |
| "badge": [0.66, 0.70, 0.18, 0.12], | |
| }, | |
| "draft": { | |
| "title": [0.10, 0.08, 0.50, 0.12], | |
| "subtitle": [0.12, 0.24, 0.44, 0.08], | |
| "hero_image": [0.22, 0.34, 0.58, 0.36], | |
| "cta": [0.10, 0.76, 0.30, 0.10], | |
| "logo": [0.72, 0.10, 0.14, 0.14], | |
| "badge": [0.68, 0.72, 0.20, 0.14], | |
| }, | |
| }, | |
| "canvas": {"width": 1.0, "height": 1.0, "safe_margin": [0.04, 0.04, 0.04, 0.04], "forbidden_regions": []}, | |
| "reading_order": [["title", "subtitle"], ["subtitle", "cta"]], | |
| "elements": [ | |
| _el("title", "title", "text", 1.0, "headline", 46, [0.22, 0.08], [0.82, 0.18], None, 1), | |
| _el("subtitle", "subtitle", "text", 0.78, "headline", 74, [0.20, 0.06], [0.72, 0.12], None, 2), | |
| _el("hero_image", "image", "image", 0.92, "hero", 0, [0.30, 0.24], [0.82, 0.58], 1.6, 3), | |
| _el("cta", "cta", "text", 0.86, "footer", 18, [0.18, 0.08], [0.40, 0.14], None, 4), | |
| _el("logo", "logo", "image", 0.55, "brand", 0, [0.10, 0.08], [0.18, 0.18], 1.0, 5), | |
| _el("badge", "badge", "shape", 0.62, "support", 10, [0.12, 0.08], [0.24, 0.18], None, 6), | |
| ], | |
| }, | |
| "editorial_cover_v1": { | |
| "instance_id": "editorial_cover_001", | |
| "max_steps": 9, | |
| "occupancy_target": 0.62, | |
| "occupancy_tolerance": 0.18, | |
| "init_noise": 0.018, | |
| "text_density_target": 0.70, | |
| "intent_regions": { | |
| "masthead": "top_band", | |
| "hero_image": "hero_center", | |
| "headline_1": "lower_left", | |
| "headline_2": "lower_left", | |
| "headline_3": "lower_left", | |
| "teaser": "right_column", | |
| "barcode": "footer_strip", | |
| "logo": "footer_left", | |
| }, | |
| "weights": { | |
| "overlap": 0.12, | |
| "alignment": 0.11, | |
| "spacing": 0.09, | |
| "balance": 0.07, | |
| "hierarchy": 0.12, | |
| "grouping": 0.08, | |
| "reading_order": 0.13, | |
| "aspect_ratio": 0.06, | |
| "occupancy": 0.06, | |
| "text_fit": 0.05, | |
| "negative_space": 0.05, | |
| "intent_fit": 0.06, | |
| }, | |
| "templates": { | |
| "editorial": { | |
| "masthead": [0.08, 0.05, 0.72, 0.10], | |
| "hero_image": [0.10, 0.18, 0.78, 0.44], | |
| "headline_1": [0.12, 0.66, 0.56, 0.10], | |
| "headline_2": [0.12, 0.77, 0.52, 0.08], | |
| "headline_3": [0.12, 0.86, 0.46, 0.06], | |
| "teaser": [0.72, 0.67, 0.16, 0.12], | |
| "barcode": [0.80, 0.88, 0.10, 0.08], | |
| "logo": [0.08, 0.88, 0.12, 0.08], | |
| }, | |
| "grid": { | |
| "masthead": [0.08, 0.06, 0.70, 0.09], | |
| "hero_image": [0.08, 0.20, 0.44, 0.50], | |
| "headline_1": [0.56, 0.22, 0.30, 0.12], | |
| "headline_2": [0.56, 0.37, 0.28, 0.10], | |
| "headline_3": [0.56, 0.50, 0.26, 0.08], | |
| "teaser": [0.56, 0.64, 0.24, 0.12], | |
| "barcode": [0.78, 0.88, 0.12, 0.08], | |
| "logo": [0.08, 0.88, 0.12, 0.08], | |
| }, | |
| "draft": { | |
| "masthead": [0.10, 0.06, 0.62, 0.10], | |
| "hero_image": [0.14, 0.22, 0.68, 0.38], | |
| "headline_1": [0.12, 0.63, 0.54, 0.10], | |
| "headline_2": [0.16, 0.76, 0.46, 0.08], | |
| "headline_3": [0.18, 0.86, 0.40, 0.06], | |
| "teaser": [0.72, 0.66, 0.16, 0.12], | |
| "barcode": [0.78, 0.88, 0.12, 0.08], | |
| "logo": [0.10, 0.88, 0.12, 0.08], | |
| }, | |
| }, | |
| "canvas": {"width": 1.0, "height": 1.0, "safe_margin": [0.04, 0.04, 0.04, 0.04], "forbidden_regions": []}, | |
| "reading_order": [["masthead", "headline_1"], ["headline_1", "headline_2"], ["headline_2", "headline_3"]], | |
| "elements": [ | |
| _el("masthead", "title", "text", 1.0, "header", 24, [0.40, 0.07], [0.82, 0.14], None, 1), | |
| _el("hero_image", "image", "image", 0.94, "hero", 0, [0.32, 0.28], [0.82, 0.58], None, 2), | |
| _el("headline_1", "title", "text", 0.88, "stories", 38, [0.28, 0.08], [0.64, 0.14], None, 3), | |
| _el("headline_2", "subtitle", "text", 0.78, "stories", 34, [0.26, 0.06], [0.56, 0.12], None, 4), | |
| _el("headline_3", "subtitle", "text", 0.68, "stories", 28, [0.20, 0.05], [0.48, 0.10], None, 5), | |
| _el("teaser", "badge", "text", 0.55, "support", 18, [0.12, 0.08], [0.28, 0.16], None, 6), | |
| _el("barcode", "caption", "shape", 0.25, "footer", 0, [0.08, 0.06], [0.16, 0.12], 1.5, 7), | |
| _el("logo", "logo", "image", 0.48, "brand", 0, [0.10, 0.06], [0.18, 0.12], 1.5, 8), | |
| ], | |
| }, | |
| "dense_flyer_v1": { | |
| "instance_id": "dense_flyer_001", | |
| "max_steps": 10, | |
| "occupancy_target": 0.70, | |
| "occupancy_tolerance": 0.16, | |
| "init_noise": 0.016, | |
| "text_density_target": 0.76, | |
| "intent_regions": { | |
| "title": "top_band", | |
| "image_left": "left_column", | |
| "image_right": "right_column", | |
| "price_badge": "upper_right", | |
| "cta": "safe_lower_right", | |
| "details": "middle_band", | |
| "caption_1": "lower_left", | |
| "caption_2": "lower_right", | |
| "sponsor_strip": "footer_strip", | |
| }, | |
| "weights": { | |
| "overlap": 0.12, | |
| "alignment": 0.11, | |
| "spacing": 0.11, | |
| "balance": 0.05, | |
| "hierarchy": 0.09, | |
| "grouping": 0.10, | |
| "reading_order": 0.09, | |
| "aspect_ratio": 0.05, | |
| "occupancy": 0.10, | |
| "text_fit": 0.06, | |
| "negative_space": 0.05, | |
| "intent_fit": 0.07, | |
| }, | |
| "templates": { | |
| "grid": { | |
| "title": [0.06, 0.06, 0.60, 0.10], | |
| "image_left": [0.06, 0.20, 0.28, 0.24], | |
| "image_right": [0.38, 0.20, 0.28, 0.24], | |
| "price_badge": [0.72, 0.20, 0.18, 0.12], | |
| "cta": [0.72, 0.36, 0.18, 0.10], | |
| "details": [0.06, 0.50, 0.60, 0.16], | |
| "caption_1": [0.06, 0.70, 0.26, 0.10], | |
| "caption_2": [0.36, 0.70, 0.26, 0.10], | |
| "sponsor_strip": [0.06, 0.86, 0.84, 0.08], | |
| }, | |
| "hero": { | |
| "title": [0.08, 0.06, 0.64, 0.11], | |
| "image_left": [0.08, 0.22, 0.36, 0.30], | |
| "image_right": [0.48, 0.22, 0.28, 0.22], | |
| "price_badge": [0.78, 0.22, 0.14, 0.12], | |
| "cta": [0.78, 0.38, 0.14, 0.10], | |
| "details": [0.08, 0.56, 0.56, 0.18], | |
| "caption_1": [0.08, 0.78, 0.24, 0.10], | |
| "caption_2": [0.36, 0.78, 0.24, 0.10], | |
| "sponsor_strip": [0.08, 0.90, 0.82, 0.06], | |
| }, | |
| "draft": { | |
| "title": [0.08, 0.08, 0.56, 0.10], | |
| "image_left": [0.10, 0.24, 0.30, 0.22], | |
| "image_right": [0.42, 0.26, 0.30, 0.22], | |
| "price_badge": [0.74, 0.24, 0.16, 0.12], | |
| "cta": [0.70, 0.40, 0.20, 0.10], | |
| "details": [0.12, 0.52, 0.62, 0.18], | |
| "caption_1": [0.10, 0.74, 0.22, 0.10], | |
| "caption_2": [0.38, 0.74, 0.22, 0.10], | |
| "sponsor_strip": [0.10, 0.88, 0.78, 0.07], | |
| }, | |
| }, | |
| "canvas": {"width": 1.0, "height": 1.0, "safe_margin": [0.04, 0.04, 0.04, 0.04], "forbidden_regions": []}, | |
| "reading_order": [["title", "details"], ["details", "cta"], ["cta", "sponsor_strip"]], | |
| "elements": [ | |
| _el("title", "title", "text", 1.0, "headline", 42, [0.24, 0.08], [0.74, 0.14], None, 1), | |
| _el("image_left", "image", "image", 0.84, "visuals", 0, [0.20, 0.18], [0.42, 0.34], 1.2, 2), | |
| _el("image_right", "image", "image", 0.76, "visuals", 0, [0.20, 0.18], [0.40, 0.34], 1.2, 3), | |
| _el("price_badge", "badge", "shape", 0.82, "conversion", 10, [0.12, 0.08], [0.22, 0.16], None, 4), | |
| _el("cta", "cta", "text", 0.90, "conversion", 16, [0.14, 0.08], [0.26, 0.14], None, 5), | |
| _el("details", "body", "text", 0.72, "details", 160, [0.34, 0.12], [0.72, 0.24], None, 6), | |
| _el("caption_1", "caption", "text", 0.44, "support", 24, [0.18, 0.06], [0.30, 0.12], None, 7), | |
| _el("caption_2", "caption", "text", 0.40, "support", 22, [0.18, 0.06], [0.30, 0.12], None, 8), | |
| _el("sponsor_strip", "caption", "shape", 0.30, "footer", 0, [0.46, 0.05], [0.90, 0.12], None, 9), | |
| ], | |
| }, | |
| } | |
| class DesignGymEnvironment(Environment): | |
| def __init__(self): | |
| super().__init__() | |
| self._state = DesignGymState() | |
| self._task_spec: Dict[str, object] = {} | |
| def _ensure_task_spec(self) -> None: | |
| task_id = getattr(self._state, "task_id", "") or "poster_basic_v1" | |
| if "templates" not in self._task_spec: | |
| self._task_spec = copy.deepcopy(TASKS.get(task_id, TASKS["poster_basic_v1"])) | |
| def _refresh_round2_context(self, phase_score_value: float = 1.0) -> None: | |
| phase = get_phase( | |
| step_count=int(self._state.step_count), | |
| max_steps=int(self._state.max_steps), | |
| current_score=float(self._state.current_score), | |
| done=bool(self._state.done), | |
| ) | |
| instr = instruction_score(self._state.elements, self._state.brief) | |
| self._state.phase = phase | |
| self._state.allowed_actions = allowed_actions_for_phase(phase) | |
| self._state.instruction_score = instr | |
| self._state.phase_score = phase_score_value | |
| self._state.critic_feedback = critic_feedback( | |
| self._state.metrics, | |
| self._state.elements, | |
| self._state.brief, | |
| instr, | |
| phase, | |
| ) | |
| memory = dict(self._state.memory or {}) | |
| phase_history = list(memory.get("phase_history", [])) | |
| if not phase_history or phase_history[-1] != phase: | |
| phase_history.append(phase) | |
| memory["phase_history"] = phase_history[-8:] | |
| memory["last_phase"] = phase | |
| self._state.memory = memory | |
| def _early_finalize_penalty(self, action: DesignGymAction) -> float: | |
| if action.action_type != "finalize": | |
| return 0.0 | |
| too_early = self._state.step_count < max(3, int(0.70 * self._state.max_steps)) | |
| not_ready = self._state.current_score < 0.75 or self._state.instruction_score < 0.65 | |
| return 0.20 if too_early and not_ready else 0.0 | |
| def _final_success_bonus(self, action: DesignGymAction) -> float: | |
| if action.action_type != "finalize": | |
| return 0.0 | |
| if self._state.current_score >= 0.75 and self._state.instruction_score >= 0.65: | |
| return 1.0 | |
| return 0.0 | |
| def reset( | |
| self, | |
| seed: Optional[int] = None, | |
| episode_id: Optional[str] = None, | |
| task_id: Optional[str] = None, | |
| **kwargs, | |
| ) -> DesignGymObservation: | |
| selected_task = task_id or kwargs.get("task_id") or "poster_basic_v1" | |
| if selected_task not in TASKS: | |
| selected_task = "poster_basic_v1" | |
| self._task_spec = copy.deepcopy(TASKS[selected_task]) | |
| local_seed = int(seed if seed is not None else kwargs.get("seed", 0) or 0) | |
| rng = random.Random(local_seed) | |
| initial_template = str(kwargs.get("template_id") or "draft") | |
| if initial_template not in self._task_spec["templates"]: | |
| initial_template = "draft" | |
| elements = self._build_initial_elements(self._task_spec, initial_template) | |
| elements = self._apply_seeded_imperfections(elements, rng) | |
| brief = choose_brief(selected_task, local_seed) | |
| self._state = DesignGymState( | |
| episode_id=episode_id or str(uuid.uuid4()), | |
| seed=local_seed, | |
| step_count=0, | |
| task_id=selected_task, | |
| instance_id=str(self._task_spec["instance_id"]), | |
| max_steps=int(self._task_spec["max_steps"]), | |
| done=False, | |
| total_reward=0.0, | |
| last_reward=0.0, | |
| current_score=0.0, | |
| current_utility=0.0, | |
| best_score_so_far=0.0, | |
| last_action_error=None, | |
| invalid_actions=0, | |
| no_progress_steps=0, | |
| canvas=copy.deepcopy(self._task_spec["canvas"]), | |
| constraints={ | |
| "reading_order": copy.deepcopy(self._task_spec["reading_order"]), | |
| "occupancy_target": self._task_spec["occupancy_target"], | |
| "required_elements": [e["id"] for e in self._task_spec["elements"]], | |
| "templates": list(self._task_spec["templates"].keys()), | |
| "intent_regions": copy.deepcopy(self._task_spec["intent_regions"]), | |
| }, | |
| metrics={}, | |
| previous_metrics={}, | |
| metric_deltas={}, | |
| elements=elements, | |
| action_history=[], | |
| brief=brief, | |
| phase="structure", | |
| allowed_actions=[], | |
| instruction_score=0.0, | |
| phase_score=1.0, | |
| reward_components={}, | |
| memory={ | |
| "selected_template": initial_template, | |
| "phase_history": ["structure"], | |
| "brief_id": brief.get("brief_id"), | |
| }, | |
| critic_feedback=[], | |
| ) | |
| score_info = self._score_layout(self._state.elements) | |
| self._state.metrics = score_info["metrics"] | |
| self._state.previous_metrics = dict(score_info["metrics"]) | |
| self._state.metric_deltas = {k: 0.0 for k in score_info["metrics"]} | |
| self._state.current_utility = float(score_info["utility"]) | |
| self._state.current_score = float(score_info["score"]) | |
| self._state.best_score_so_far = float(score_info["utility"]) | |
| self._refresh_round2_context(phase_score_value=1.0) | |
| return self._observation(message=f"Ready: {selected_task}") | |
| def state(self) -> DesignGymState: | |
| return self._state | |
| def step(self, action: DesignGymAction, timeout_s: Optional[int] = None, **kwargs) -> DesignGymObservation: | |
| self._ensure_task_spec() | |
| if self._state.done: | |
| self._state.last_reward = 0.0 | |
| self._state.last_action_error = "episode_already_done" | |
| return self._observation(message="Episode already finished.") | |
| canonical_action = action.canonical() if hasattr(action, "canonical") else str(action.action_type) | |
| proposed_elements = _deepcopy_elements(self._state.elements) | |
| if action.action_type == "finalize": | |
| phase_value = phase_score_for_action(action.action_type, self._state.phase) | |
| early_penalty = self._early_finalize_penalty(action) | |
| final_bonus = self._final_success_bonus(action) | |
| components = compose_reward( | |
| layout_delta=0.0, | |
| best_score_delta=0.0, | |
| instruction_progress=0.0, | |
| phase_correctness=phase_value, | |
| validity_score=1.0, | |
| final_success_bonus=final_bonus, | |
| no_op_penalty=0.0, | |
| oscillation_penalty=0.0, | |
| early_finalize_penalty=early_penalty, | |
| ) | |
| self._state.done = True | |
| self._state.last_reward = float(components["total"]) | |
| self._state.total_reward = _clamp(self._state.total_reward + self._state.last_reward, 0.0, 1.0) | |
| self._state.last_action_error = None | |
| self._state.action_history.append(canonical_action) | |
| self._state.reward_components = components | |
| self._refresh_round2_context(phase_score_value=phase_value) | |
| return self._observation(message="Layout finalized.") | |
| ok, error = self._apply_action(proposed_elements, action) | |
| self._state.step_count += 1 | |
| self._state.action_history.append(canonical_action) | |
| if not ok: | |
| self._state.invalid_actions += 1 | |
| self._state.no_progress_steps += 1 | |
| self._state.last_reward = 0.0 | |
| self._state.last_action_error = error | |
| if self._state.step_count >= self._state.max_steps: | |
| self._state.done = True | |
| return self._observation(message="Action rejected.") | |
| hard_valid, hard_error = self._check_hard_constraints(proposed_elements) | |
| if not hard_valid: | |
| self._state.invalid_actions += 1 | |
| self._state.no_progress_steps += 1 | |
| self._state.last_reward = 0.0 | |
| self._state.last_action_error = hard_error | |
| if self._state.step_count >= self._state.max_steps: | |
| self._state.done = True | |
| return self._observation(message="Constraint violation; reverted.") | |
| prev_score = float(self._state.current_score) | |
| prev_utility = float(self._state.current_utility) | |
| prev_best = float(self._state.best_score_so_far) | |
| prev_metrics = dict(self._state.metrics) | |
| score_info = self._score_layout(proposed_elements) | |
| curr_score = float(score_info["score"]) | |
| curr_utility = float(score_info["utility"]) | |
| curr_metrics = dict(score_info["metrics"]) | |
| neighborhood = self._neighborhood_utilities( | |
| base_elements=self._state.elements, | |
| focus_metrics=self._worst_metrics(prev_metrics, k=2), | |
| ) | |
| pref_rank = self._percentile_rank(curr_utility, neighborhood) | |
| step_gain = max(0.0, curr_score - prev_score) | |
| best_gain = max(0.0, curr_utility - prev_best) | |
| frontier_keys = self._worst_metrics(prev_metrics, k=2) | |
| frontier_gain = _mean([max(0.0, curr_metrics.get(k, 0.0) - prev_metrics.get(k, 0.0)) for k in frontier_keys]) | |
| oscillation_penalty = self._oscillation_penalty(action) | |
| waste_penalty = 0.03 if step_gain <= 1e-6 and best_gain <= 1e-6 and frontier_gain <= 1e-6 else 0.0 | |
| prev_instruction = float(self._state.instruction_score or 0.0) | |
| new_instruction = instruction_score(proposed_elements, self._state.brief) | |
| instruction_progress = max(0.0, new_instruction - prev_instruction) | |
| current_phase = get_phase( | |
| step_count=int(self._state.step_count), | |
| max_steps=int(self._state.max_steps), | |
| current_score=curr_score, | |
| done=False, | |
| ) | |
| phase_value = phase_score_for_action(action.action_type, current_phase) | |
| validity_score = 1.0 | |
| early_finalize_penalty = 0.0 | |
| final_success_bonus = 0.0 | |
| no_op_penalty = waste_penalty | |
| components = compose_reward( | |
| layout_delta=step_gain, | |
| best_score_delta=best_gain, | |
| instruction_progress=instruction_progress, | |
| phase_correctness=phase_value, | |
| validity_score=validity_score, | |
| final_success_bonus=final_success_bonus, | |
| no_op_penalty=no_op_penalty, | |
| oscillation_penalty=oscillation_penalty, | |
| early_finalize_penalty=early_finalize_penalty, | |
| ) | |
| reward = float(components["total"]) | |
| if step_gain <= 1e-6 and best_gain <= 1e-6: | |
| self._state.no_progress_steps += 1 | |
| else: | |
| self._state.no_progress_steps = 0 | |
| self._state.previous_metrics = prev_metrics | |
| self._state.metric_deltas = { | |
| key: round(curr_metrics.get(key, 0.0) - prev_metrics.get(key, 0.0), 6) | |
| for key in curr_metrics | |
| } | |
| self._state.best_score_so_far = max(prev_best, curr_utility) | |
| self._state.elements = proposed_elements | |
| self._state.metrics = curr_metrics | |
| self._state.current_utility = curr_utility | |
| efficiency = max(0.70, 1.0 - 0.05 * self._state.invalid_actions - 0.02 * self._state.no_progress_steps) | |
| self._state.current_score = _clamp(curr_utility * efficiency, 0.0, 1.0) | |
| self._state.reward_components = components | |
| self._state.total_reward = _clamp(self._state.total_reward + reward, 0.0, 1.0) | |
| self._state.last_reward = reward | |
| self._state.last_action_error = None | |
| self._refresh_round2_context(phase_score_value=phase_value) | |
| if self._state.step_count >= self._state.max_steps: | |
| self._state.done = True | |
| return self._observation(message="Max steps reached.") | |
| return self._observation(message="Action applied.") | |
| def _build_initial_elements(self, task_spec: Dict[str, object], template_name: str) -> List[Dict[str, object]]: | |
| template = task_spec["templates"][template_name] | |
| elements: List[Dict[str, object]] = [] | |
| for z, base in enumerate(task_spec["elements"], start=1): | |
| bbox = list(template[base["id"]]) | |
| elements.append( | |
| { | |
| "id": base["id"], | |
| "role": base["role"], | |
| "type": base["type"], | |
| "importance": float(base["importance"]), | |
| "group": base["group"], | |
| "content_len": int(base.get("content_len", 0)), | |
| "bbox": bbox, | |
| "z": z, | |
| "min_size": list(base["min_size"]), | |
| "max_size": list(base["max_size"]), | |
| "aspect_ratio": base["aspect_ratio"], | |
| "precedence": int(base["precedence"]), | |
| "movable": bool(base["movable"]), | |
| "resizable": bool(base["resizable"]), | |
| "placed": True, | |
| } | |
| ) | |
| return elements | |
| def _apply_seeded_imperfections(self, elements: List[Dict[str, object]], rng: random.Random) -> List[Dict[str, object]]: | |
| intensity = float(self._task_spec.get("init_noise", 0.018)) | |
| trial = _deepcopy_elements(elements) | |
| by_id = _element_map(trial) | |
| for e in trial: | |
| if not e.get("movable", False): | |
| continue | |
| e["bbox"][0] += rng.uniform(-intensity, intensity) | |
| e["bbox"][1] += rng.uniform(-intensity, intensity) | |
| task_id = str(self._task_spec["instance_id"]) | |
| if "poster_basic" in task_id: | |
| by_id["subtitle"]["bbox"][0] += 0.045 | |
| by_id["cta"]["bbox"][0] -= 0.035 | |
| by_id["hero_image"]["bbox"][2] -= 0.06 | |
| by_id["badge"]["bbox"][1] -= 0.03 | |
| elif "editorial" in task_id: | |
| by_id["headline_2"]["bbox"][0] += 0.05 | |
| by_id["headline_3"]["bbox"][0] += 0.07 | |
| by_id["teaser"]["bbox"][1] += 0.03 | |
| by_id["masthead"]["bbox"][2] -= 0.06 | |
| else: | |
| by_id["caption_1"]["bbox"][1] -= 0.04 | |
| by_id["caption_2"]["bbox"][1] -= 0.01 | |
| by_id["cta"]["bbox"][0] -= 0.04 | |
| by_id["details"]["bbox"][0] += 0.04 | |
| by_id["details"]["bbox"][2] -= 0.10 | |
| self._repair_layout_in_place(trial) | |
| return trial | |
| def _repair_layout_in_place(self, elements: List[Dict[str, object]]) -> None: | |
| left_m, top_m, right_m, bottom_m = [float(v) for v in self._task_spec["canvas"]["safe_margin"]] | |
| for e in elements: | |
| x, y, w, h = [float(v) for v in e["bbox"]] | |
| min_w, min_h = [float(v) for v in e["min_size"]] | |
| max_w, max_h = [float(v) for v in e["max_size"]] | |
| w = _clamp(w, min_w, max_w) | |
| h = _clamp(h, min_h, max_h) | |
| ar = e.get("aspect_ratio") | |
| if ar: | |
| target_w = _clamp(h * float(ar), min_w, max_w) | |
| target_h = _clamp(w / float(ar), min_h, max_h) | |
| if abs(target_w - w) <= abs(target_h - h) * float(ar): | |
| w = target_w | |
| else: | |
| h = target_h | |
| x = _clamp(x, left_m, 1.0 - right_m - w) | |
| y = _clamp(y, top_m, 1.0 - bottom_m - h) | |
| e["bbox"] = [x, y, w, h] | |
| def _observation(self, message: str) -> DesignGymObservation: | |
| summary_lines = [] | |
| for e in sorted(self._state.elements, key=lambda item: item["z"]): | |
| x, y, w, h = e["bbox"] | |
| summary_lines.append(f"{e['id']}@({x:.2f},{y:.2f},{w:.2f},{h:.2f})") | |
| blame = self._element_blame(self._state.elements) | |
| focus = [k for k, _ in sorted(blame.items(), key=lambda item: item[1], reverse=True)[:3]] | |
| warnings = self._constraint_warnings(self._state.elements) | |
| worst = self._worst_metrics(self._state.metrics, k=3) | |
| return DesignGymObservation( | |
| message=message, | |
| task_id=self._state.task_id, | |
| step_count=self._state.step_count, | |
| max_steps=self._state.max_steps, | |
| done=self._state.done, | |
| reward=_clamp(self._state.last_reward, 0.0, 1.0), | |
| current_score=_clamp(self._state.current_score, 0.0, 1.0), | |
| best_score_so_far=_clamp(self._state.best_score_so_far, 0.0, 1.0), | |
| last_action_error=self._state.last_action_error, | |
| legal_actions=[ | |
| "apply_template(template_id)", | |
| "move(element_id, dx, dy)", | |
| "resize(element_id, dw, dh, anchor)", | |
| "align(element_ids, axis, mode)", | |
| "distribute(element_ids, axis)", | |
| "swap_z(element_ids[0], element_ids[1])", | |
| "snap(element_id, grid)", | |
| "promote(element_id, strength)", | |
| "reflow_group(group_id, pattern)", | |
| "anchor_to_region(element_id, region_id, mode)", | |
| "finalize()", | |
| ], | |
| layout_summary="; ".join(summary_lines), | |
| metrics={k: round(float(v), 4) for k, v in self._state.metrics.items()}, | |
| metric_deltas={k: round(float(v), 4) for k, v in self._state.metric_deltas.items()}, | |
| worst_metrics=worst, | |
| focus_elements=focus, | |
| element_blame={k: round(float(v), 4) for k, v in blame.items()}, | |
| constraint_warnings=warnings, | |
| suggested_edits=self._suggested_edits(worst, focus), | |
| brief=self._state.brief, | |
| phase=self._state.phase, | |
| allowed_actions=self._state.allowed_actions, | |
| instruction_score=round(float(self._state.instruction_score), 4), | |
| phase_score=round(float(self._state.phase_score), 4), | |
| reward_components={k: round(float(v), 4) for k, v in self._state.reward_components.items()}, | |
| memory=self._state.memory, | |
| critic_feedback=self._state.critic_feedback, | |
| ) | |
| def _apply_action(self, elements: List[Dict[str, object]], action: DesignGymAction) -> Tuple[bool, Optional[str]]: | |
| by_id = _element_map(elements) | |
| if action.action_type == "apply_template": | |
| template_id = action.template_id or "draft" | |
| templates = self._task_spec["templates"] | |
| if template_id not in templates: | |
| return False, "unknown_template" | |
| for e in elements: | |
| e["bbox"] = list(templates[template_id][e["id"]]) | |
| return True, None | |
| if action.action_type == "move": | |
| if not action.element_id or action.element_id not in by_id: | |
| return False, "unknown_element" | |
| e = by_id[action.element_id] | |
| if not e["movable"]: | |
| return False, "element_not_movable" | |
| x, y, w, h = e["bbox"] | |
| e["bbox"] = [x + action.dx, y + action.dy, w, h] | |
| self._repair_layout_in_place(elements) | |
| return True, None | |
| if action.action_type == "resize": | |
| if not action.element_id or action.element_id not in by_id: | |
| return False, "unknown_element" | |
| e = by_id[action.element_id] | |
| if not e["resizable"]: | |
| return False, "element_not_resizable" | |
| x, y, w, h = e["bbox"] | |
| new_w = w + action.dw | |
| new_h = h + action.dh | |
| if action.anchor == "center": | |
| x -= action.dw / 2.0 | |
| y -= action.dh / 2.0 | |
| elif action.anchor == "east": | |
| y -= action.dh / 2.0 | |
| elif action.anchor == "south": | |
| x -= action.dw / 2.0 | |
| elif action.anchor == "ne": | |
| y -= action.dh | |
| elif action.anchor == "nw": | |
| x -= action.dw | |
| y -= action.dh | |
| elif action.anchor == "sw": | |
| x -= action.dw | |
| elif action.anchor == "north": | |
| x -= action.dw / 2.0 | |
| y -= action.dh | |
| elif action.anchor == "west": | |
| x -= action.dw | |
| y -= action.dh / 2.0 | |
| e["bbox"] = [x, y, new_w, new_h] | |
| self._repair_layout_in_place(elements) | |
| return True, None | |
| if action.action_type == "align": | |
| ids = [i for i in action.element_ids if i in by_id] | |
| if len(ids) < 2: | |
| return False, "align_needs_two_or_more_elements" | |
| boxes = [by_id[i]["bbox"] for i in ids] | |
| if action.axis == "x": | |
| if action.mode == "left": | |
| target = min(b[0] for b in boxes) | |
| for i in ids: | |
| by_id[i]["bbox"][0] = target | |
| elif action.mode == "center": | |
| target = _mean([b[0] + b[2] / 2.0 for b in boxes]) | |
| for i in ids: | |
| by_id[i]["bbox"][0] = target - by_id[i]["bbox"][2] / 2.0 | |
| elif action.mode == "right": | |
| target = max(b[0] + b[2] for b in boxes) | |
| for i in ids: | |
| by_id[i]["bbox"][0] = target - by_id[i]["bbox"][2] | |
| else: | |
| return False, "invalid_align_mode" | |
| elif action.axis == "y": | |
| if action.mode == "top": | |
| target = min(b[1] for b in boxes) | |
| for i in ids: | |
| by_id[i]["bbox"][1] = target | |
| elif action.mode == "middle": | |
| target = _mean([b[1] + b[3] / 2.0 for b in boxes]) | |
| for i in ids: | |
| by_id[i]["bbox"][1] = target - by_id[i]["bbox"][3] / 2.0 | |
| elif action.mode == "bottom": | |
| target = max(b[1] + b[3] for b in boxes) | |
| for i in ids: | |
| by_id[i]["bbox"][1] = target - by_id[i]["bbox"][3] | |
| else: | |
| return False, "invalid_align_mode" | |
| else: | |
| return False, "invalid_axis" | |
| self._repair_layout_in_place(elements) | |
| return True, None | |
| if action.action_type == "distribute": | |
| ids = [i for i in action.element_ids if i in by_id] | |
| if len(ids) < 3: | |
| return False, "distribute_needs_three_or_more_elements" | |
| if action.axis == "x": | |
| ids.sort(key=lambda i: by_id[i]["bbox"][0]) | |
| left = by_id[ids[0]]["bbox"][0] | |
| right = by_id[ids[-1]]["bbox"][0] + by_id[ids[-1]]["bbox"][2] | |
| total_w = sum(by_id[i]["bbox"][2] for i in ids) | |
| gap = (right - left - total_w) / (len(ids) - 1) | |
| if gap < -EPS: | |
| return False, "negative_distribution_gap" | |
| cursor = left | |
| for i in ids: | |
| by_id[i]["bbox"][0] = cursor | |
| cursor += by_id[i]["bbox"][2] + gap | |
| elif action.axis == "y": | |
| ids.sort(key=lambda i: by_id[i]["bbox"][1]) | |
| top = by_id[ids[0]]["bbox"][1] | |
| bottom = by_id[ids[-1]]["bbox"][1] + by_id[ids[-1]]["bbox"][3] | |
| total_h = sum(by_id[i]["bbox"][3] for i in ids) | |
| gap = (bottom - top - total_h) / (len(ids) - 1) | |
| if gap < -EPS: | |
| return False, "negative_distribution_gap" | |
| cursor = top | |
| for i in ids: | |
| by_id[i]["bbox"][1] = cursor | |
| cursor += by_id[i]["bbox"][3] + gap | |
| else: | |
| return False, "invalid_axis" | |
| self._repair_layout_in_place(elements) | |
| return True, None | |
| if action.action_type == "swap_z": | |
| ids = [i for i in action.element_ids if i in by_id] | |
| if len(ids) != 2: | |
| return False, "swap_z_needs_exactly_two_elements" | |
| by_id[ids[0]]["z"], by_id[ids[1]]["z"] = by_id[ids[1]]["z"], by_id[ids[0]]["z"] | |
| return True, None | |
| if action.action_type == "snap": | |
| if not action.element_id or action.element_id not in by_id: | |
| return False, "unknown_element" | |
| grid = int(action.grid) | |
| if grid <= 0: | |
| return False, "grid_must_be_positive" | |
| e = by_id[action.element_id] | |
| x, y, w, h = e["bbox"] | |
| e["bbox"] = [round(x * grid) / grid, round(y * grid) / grid, round(w * grid) / grid, round(h * grid) / grid] | |
| self._repair_layout_in_place(elements) | |
| return True, None | |
| if action.action_type == "promote": | |
| if not action.element_id or action.element_id not in by_id: | |
| return False, "unknown_element" | |
| e = by_id[action.element_id] | |
| strength = action.strength if abs(action.strength) > EPS else 0.06 | |
| x, y, w, h = [float(v) for v in e["bbox"]] | |
| grow = abs(strength) | |
| if e["type"] == "text": | |
| e["bbox"] = [x - 0.5 * grow, y - 0.25 * grow, w + grow, h + 0.5 * grow] | |
| else: | |
| e["bbox"] = [x - 0.4 * grow, y - 0.4 * grow, w + 0.8 * grow, h + 0.8 * grow] | |
| e["z"] = max(int(item["z"]) for item in elements) | |
| self._repair_layout_in_place(elements) | |
| return True, None | |
| if action.action_type == "reflow_group": | |
| if not action.group_id: | |
| return False, "missing_group_id" | |
| members = [e for e in elements if str(e["group"]) == action.group_id] | |
| if len(members) < 2: | |
| return False, "group_not_found_or_too_small" | |
| pattern = action.pattern or "stack" | |
| xs = [e["bbox"][0] for e in members] | |
| ys = [e["bbox"][1] for e in members] | |
| rights = [e["bbox"][0] + e["bbox"][2] for e in members] | |
| bottoms = [e["bbox"][1] + e["bbox"][3] for e in members] | |
| left, top = min(xs), min(ys) | |
| width, height = max(rights) - left, max(bottoms) - top | |
| ordered = sorted(members, key=lambda e: (e["precedence"], e["id"])) | |
| if pattern == "stack": | |
| gap = max(0.012, (height - sum(e["bbox"][3] for e in ordered)) / max(1, len(ordered) - 1)) | |
| cursor = top | |
| for e in ordered: | |
| e["bbox"][0] = left | |
| e["bbox"][1] = cursor | |
| cursor += e["bbox"][3] + gap | |
| elif pattern == "row": | |
| gap = max(0.012, (width - sum(e["bbox"][2] for e in ordered)) / max(1, len(ordered) - 1)) | |
| cursor = left | |
| for e in ordered: | |
| e["bbox"][0] = cursor | |
| e["bbox"][1] = top | |
| cursor += e["bbox"][2] + gap | |
| elif pattern == "grid2": | |
| col_w = max(e["bbox"][2] for e in ordered) | |
| row_h = max(e["bbox"][3] for e in ordered) | |
| for idx, e in enumerate(ordered): | |
| row = idx // 2 | |
| col = idx % 2 | |
| e["bbox"][0] = left + col * (col_w + 0.018) | |
| e["bbox"][1] = top + row * (row_h + 0.018) | |
| elif pattern == "sidebar": | |
| col_x = max(0.52, left) | |
| cursor = top | |
| for e in ordered: | |
| e["bbox"][0] = col_x | |
| e["bbox"][1] = cursor | |
| cursor += e["bbox"][3] + 0.016 | |
| else: | |
| return False, "unknown_reflow_pattern" | |
| self._repair_layout_in_place(elements) | |
| return True, None | |
| if action.action_type == "anchor_to_region": | |
| if not action.element_id or action.element_id not in by_id: | |
| return False, "unknown_element" | |
| if not action.region_id: | |
| return False, "missing_region_id" | |
| region = self._region_boxes().get(action.region_id) | |
| if region is None: | |
| return False, "unknown_region" | |
| e = by_id[action.element_id] | |
| ex, ey, ew, eh = [float(v) for v in e["bbox"]] | |
| rx, ry, rw, rh = region | |
| mode = action.mode or "center" | |
| if mode == "fill": | |
| ew = min(ew, rw) | |
| eh = min(eh, rh) | |
| ex = rx + (rw - ew) / 2.0 | |
| ey = ry + (rh - eh) / 2.0 | |
| elif mode == "start": | |
| ex = rx | |
| ey = ry | |
| elif mode == "end": | |
| ex = rx + rw - ew | |
| ey = ry + rh - eh | |
| else: | |
| ex = rx + (rw - ew) / 2.0 | |
| ey = ry + (rh - eh) / 2.0 | |
| e["bbox"] = [ex, ey, ew, eh] | |
| self._repair_layout_in_place(elements) | |
| return True, None | |
| return False, "unknown_action_type" | |
| def _check_hard_constraints(self, elements: List[Dict[str, object]]) -> Tuple[bool, Optional[str]]: | |
| left_m, top_m, right_m, bottom_m = [float(v) for v in self._task_spec["canvas"]["safe_margin"]] | |
| for e in elements: | |
| x, y, w, h = [float(v) for v in e["bbox"]] | |
| min_w, min_h = [float(v) for v in e["min_size"]] | |
| max_w, max_h = [float(v) for v in e["max_size"]] | |
| if w < min_w - EPS or h < min_h - EPS: | |
| return False, f"min_size_violation:{e['id']}" | |
| if w > max_w + EPS or h > max_h + EPS: | |
| return False, f"max_size_violation:{e['id']}" | |
| if x < left_m - EPS or y < top_m - EPS: | |
| return False, f"outside_safe_region:{e['id']}" | |
| if x + w > 1.0 - right_m + EPS or y + h > 1.0 - bottom_m + EPS: | |
| return False, f"outside_safe_region:{e['id']}" | |
| ar = e.get("aspect_ratio") | |
| if ar: | |
| ratio = w / max(h, EPS) | |
| if abs(math.log(ratio / float(ar))) > 0.18: | |
| return False, f"aspect_ratio_violation:{e['id']}" | |
| for region in self._task_spec["canvas"].get("forbidden_regions", []): | |
| for e in elements: | |
| if _intersect(e["bbox"], region) > EPS: | |
| return False, f"forbidden_region_overlap:{e['id']}" | |
| return True, None | |
| def _score_layout(self, elements: List[Dict[str, object]]) -> Dict[str, object]: | |
| hard_valid, _ = self._check_hard_constraints(elements) | |
| metrics = { | |
| "overlap": self._metric_overlap(elements), | |
| "alignment": self._metric_alignment(elements), | |
| "spacing": self._metric_spacing(elements), | |
| "balance": self._metric_balance(elements), | |
| "hierarchy": self._metric_hierarchy(elements), | |
| "grouping": self._metric_grouping(elements), | |
| "reading_order": self._metric_reading_order(elements), | |
| "aspect_ratio": self._metric_aspect_ratio(elements), | |
| "occupancy": self._metric_occupancy(elements), | |
| "text_fit": self._metric_text_fit(elements), | |
| "negative_space": self._metric_negative_space(elements), | |
| "intent_fit": self._metric_intent_fit(elements), | |
| } | |
| utility = 0.0 | |
| for key, weight in self._task_spec["weights"].items(): | |
| utility += float(weight) * float(metrics[key]) | |
| utility = _clamp(utility, 0.0, 1.0) | |
| score = utility if hard_valid else 0.0 | |
| return {"utility": utility, "score": score, "metrics": metrics} | |
| def _metric_overlap(self, elements: List[Dict[str, object]]) -> float: | |
| total_overlap = 0.0 | |
| total_area = 0.0 | |
| for i, a in enumerate(elements): | |
| total_area += _area(a["bbox"]) | |
| for b in elements[i + 1:]: | |
| total_overlap += _intersect(a["bbox"], b["bbox"]) | |
| return _clamp(_safe_exp(-(total_overlap / (total_area + EPS))), 0.0, 1.0) | |
| def _metric_alignment(self, elements: List[Dict[str, object]]) -> float: | |
| if len(elements) < 2: | |
| return 0.5 | |
| canvas_guides_x = [0.04, 0.50, 0.96] | |
| canvas_guides_y = [0.04, 0.50, 0.96] | |
| distances: List[float] = [] | |
| for i, e in enumerate(elements): | |
| anchors = _anchors(e["bbox"]) | |
| other_x = canvas_guides_x[:] | |
| other_y = canvas_guides_y[:] | |
| for j, o in enumerate(elements): | |
| if i == j: | |
| continue | |
| oa = _anchors(o["bbox"]) | |
| other_x.extend([oa["left"], oa["center"], oa["right"]]) | |
| other_y.extend([oa["top"], oa["middle"], oa["bottom"]]) | |
| for name in ("left", "center", "right"): | |
| distances.append(min(abs(anchors[name] - g) for g in other_x)) | |
| for name in ("top", "middle", "bottom"): | |
| distances.append(min(abs(anchors[name] - g) for g in other_y)) | |
| return _clamp(_mean([_safe_exp(-d / 0.055) for d in distances]), 0.0, 1.0) | |
| def _metric_spacing(self, elements: List[Dict[str, object]]) -> float: | |
| gaps: List[float] = [] | |
| xs = sorted(elements, key=lambda e: e["bbox"][0]) | |
| ys = sorted(elements, key=lambda e: e["bbox"][1]) | |
| for items, axis in ((xs, "x"), (ys, "y")): | |
| for a, b in zip(items, items[1:]): | |
| if axis == "x": | |
| gap = b["bbox"][0] - (a["bbox"][0] + a["bbox"][2]) | |
| overlap_other = min(a["bbox"][1] + a["bbox"][3], b["bbox"][1] + b["bbox"][3]) - max(a["bbox"][1], b["bbox"][1]) | |
| else: | |
| gap = b["bbox"][1] - (a["bbox"][1] + a["bbox"][3]) | |
| overlap_other = min(a["bbox"][0] + a["bbox"][2], b["bbox"][0] + b["bbox"][2]) - max(a["bbox"][0], b["bbox"][0]) | |
| if gap > 0 and overlap_other > 0: | |
| gaps.append(gap) | |
| if len(gaps) < 2: | |
| return 0.5 | |
| cv = _std(gaps) / (_mean(gaps) + EPS) | |
| return _clamp(_safe_exp(-(cv / 0.70)), 0.0, 1.0) | |
| def _metric_balance(self, elements: List[Dict[str, object]]) -> float: | |
| masses = [] | |
| centers = [] | |
| for e in elements: | |
| a = _area(e["bbox"]) | |
| masses.append(a * float(e["importance"])) | |
| centers.append(_center(e["bbox"])) | |
| total_mass = sum(masses) | |
| if total_mass <= EPS: | |
| return 0.0 | |
| cx = sum(m * c[0] for m, c in zip(masses, centers)) / total_mass | |
| cy = sum(m * c[1] for m, c in zip(masses, centers)) / total_mass | |
| dist = math.sqrt((cx - 0.5) ** 2 + (cy - 0.5) ** 2) | |
| return _clamp(_safe_exp(-(dist / 0.22)), 0.0, 1.0) | |
| def _metric_hierarchy(self, elements: List[Dict[str, object]]) -> float: | |
| importance = [] | |
| salience = [] | |
| for e in elements: | |
| x, y, w, h = e["bbox"] | |
| a = _area(e["bbox"]) | |
| focus_x = 1.0 - abs((x + w / 2.0) - 0.5) / 0.5 | |
| zeta = 0.55 * math.log(a + 1e-4) - 0.22 * y + 0.18 * focus_x + 0.10 * e["z"] / max(1, len(elements)) | |
| importance.append(float(e["importance"])) | |
| salience.append(zeta) | |
| rho = _spearman(importance, salience) | |
| return _clamp((1.0 + rho) / 2.0, 0.0, 1.0) | |
| def _metric_grouping(self, elements: List[Dict[str, object]]) -> float: | |
| groups: Dict[str, List[Tuple[float, float]]] = {} | |
| for e in elements: | |
| groups.setdefault(str(e["group"]), []).append(_center(e["bbox"])) | |
| if len(groups) < 2: | |
| return 0.5 | |
| within = [] | |
| group_centers: List[Tuple[float, float]] = [] | |
| for centers in groups.values(): | |
| gx = _mean([c[0] for c in centers]) | |
| gy = _mean([c[1] for c in centers]) | |
| group_centers.append((gx, gy)) | |
| within.append(_mean([math.dist(c, (gx, gy)) for c in centers])) | |
| between = [] | |
| for i, c1 in enumerate(group_centers): | |
| for c2 in group_centers[i + 1:]: | |
| between.append(math.dist(c1, c2)) | |
| within_term = _safe_exp(-(_mean(within) / 0.22)) | |
| between_term = 1.0 - _safe_exp(-(_mean(between) / 0.28)) | |
| return _clamp(within_term * between_term, 0.0, 1.0) | |
| def _metric_reading_order(self, elements: List[Dict[str, object]]) -> float: | |
| if not self._task_spec.get("reading_order"): | |
| return 0.5 | |
| by_id = _element_map(elements) | |
| good = 0 | |
| total = 0 | |
| for first_id, second_id in self._task_spec["reading_order"]: | |
| if first_id not in by_id or second_id not in by_id: | |
| continue | |
| total += 1 | |
| a = by_id[first_id]["bbox"] | |
| b = by_id[second_id]["bbox"] | |
| if abs(a[1] - b[1]) <= 0.05: | |
| ok = a[0] <= b[0] | |
| else: | |
| ok = a[1] <= b[1] | |
| good += 1 if ok else 0 | |
| return _clamp(good / total if total else 0.5, 0.0, 1.0) | |
| def _metric_aspect_ratio(self, elements: List[Dict[str, object]]) -> float: | |
| locked = [e for e in elements if e.get("aspect_ratio")] | |
| if not locked: | |
| return 1.0 | |
| penalties = [] | |
| for e in locked: | |
| w, h = e["bbox"][2], e["bbox"][3] | |
| penalties.append(abs(math.log((w / max(h, EPS)) / float(e["aspect_ratio"])))) | |
| return _clamp(_safe_exp(-_mean(penalties) / 0.9), 0.0, 1.0) | |
| def _metric_occupancy(self, elements: List[Dict[str, object]]) -> float: | |
| occ = sum(_area(e["bbox"]) for e in elements) | |
| target = float(self._task_spec["occupancy_target"]) | |
| tol = float(self._task_spec["occupancy_tolerance"]) | |
| return _clamp(max(0.0, 1.0 - abs(occ - target) / max(tol, EPS)), 0.0, 1.0) | |
| def _metric_text_fit(self, elements: List[Dict[str, object]]) -> float: | |
| penalties = [] | |
| target = float(self._task_spec.get("text_density_target", 0.68)) | |
| for e in elements: | |
| if e["type"] != "text": | |
| continue | |
| w, h = float(e["bbox"][2]), float(e["bbox"][3]) | |
| capacity = max(EPS, w * h * 900.0) | |
| demand = float(e.get("content_len", 0)) | |
| ratio = demand / capacity | |
| penalties.append(abs(ratio - target)) | |
| if not penalties: | |
| return 1.0 | |
| return _clamp(_safe_exp(-_mean(penalties) / 0.45), 0.0, 1.0) | |
| def _metric_negative_space(self, elements: List[Dict[str, object]]) -> float: | |
| occ = sum(_area(e["bbox"]) for e in elements) | |
| whitespace = max(0.0, 1.0 - occ) | |
| xs = sorted([e["bbox"][0] for e in elements] + [e["bbox"][0] + e["bbox"][2] for e in elements]) | |
| ys = sorted([e["bbox"][1] for e in elements] + [e["bbox"][1] + e["bbox"][3] for e in elements]) | |
| x_gaps = [max(0.0, b - a) for a, b in zip(xs, xs[1:])] | |
| y_gaps = [max(0.0, b - a) for a, b in zip(ys, ys[1:])] | |
| rhythm = 1.0 - min(1.0, (_std(x_gaps) + _std(y_gaps)) / 0.18) if x_gaps and y_gaps else 0.5 | |
| whitespace_term = 1.0 - abs(whitespace - (1.0 - float(self._task_spec["occupancy_target"]))) / 0.30 | |
| return _clamp(0.6 * rhythm + 0.4 * max(0.0, whitespace_term), 0.0, 1.0) | |
| def _metric_intent_fit(self, elements: List[Dict[str, object]]) -> float: | |
| regions = self._region_boxes() | |
| intent_regions = self._task_spec.get("intent_regions", {}) | |
| scores = [] | |
| for e in elements: | |
| region_id = intent_regions.get(e["id"]) | |
| if not region_id or region_id not in regions: | |
| continue | |
| rx, ry, rw, rh = regions[region_id] | |
| cx, cy = _center(e["bbox"]) | |
| tx, ty = rx + rw / 2.0, ry + rh / 2.0 | |
| dist = math.dist((cx, cy), (tx, ty)) | |
| diag = math.sqrt(rw * rw + rh * rh) + EPS | |
| scores.append(_safe_exp(-(dist / diag) / 0.65)) | |
| return _clamp(_mean(scores) if scores else 0.5, 0.0, 1.0) | |
| def _region_boxes(self) -> Dict[str, List[float]]: | |
| left_m, top_m, right_m, bottom_m = [float(v) for v in self._task_spec["canvas"]["safe_margin"]] | |
| usable_x = left_m | |
| usable_y = top_m | |
| usable_w = 1.0 - left_m - right_m | |
| usable_h = 1.0 - top_m - bottom_m | |
| return { | |
| "top_band": [usable_x, usable_y, usable_w, usable_h * 0.18], | |
| "hero_center": [usable_x + usable_w * 0.12, usable_y + usable_h * 0.18, usable_w * 0.58, usable_h * 0.46], | |
| "left_column": [usable_x, usable_y + usable_h * 0.18, usable_w * 0.40, usable_h * 0.58], | |
| "right_column": [usable_x + usable_w * 0.60, usable_y + usable_h * 0.18, usable_w * 0.28, usable_h * 0.58], | |
| "upper_right": [usable_x + usable_w * 0.68, usable_y + usable_h * 0.12, usable_w * 0.24, usable_h * 0.18], | |
| "middle_band": [usable_x + usable_w * 0.08, usable_y + usable_h * 0.45, usable_w * 0.60, usable_h * 0.20], | |
| "lower_left": [usable_x + usable_w * 0.08, usable_y + usable_h * 0.60, usable_w * 0.44, usable_h * 0.22], | |
| "lower_right": [usable_x + usable_w * 0.54, usable_y + usable_h * 0.60, usable_w * 0.30, usable_h * 0.22], | |
| "footer_strip": [usable_x, usable_y + usable_h * 0.86, usable_w, usable_h * 0.10], | |
| "footer_left": [usable_x, usable_y + usable_h * 0.84, usable_w * 0.24, usable_h * 0.12], | |
| "top_right": [usable_x + usable_w * 0.72, usable_y, usable_w * 0.20, usable_h * 0.18], | |
| "safe_lower_right": [usable_x + usable_w * 0.64, usable_y + usable_h * 0.66, usable_w * 0.24, usable_h * 0.20], | |
| } | |
| def _worst_metrics(self, metrics: Dict[str, float], k: int = 3) -> List[str]: | |
| return [name for name, _ in sorted(metrics.items(), key=lambda item: item[1])[:k]] | |
| def _element_blame(self, elements: List[Dict[str, object]]) -> Dict[str, float]: | |
| by_id = _element_map(elements) | |
| blame = {str(e["id"]): 0.0 for e in elements} | |
| for i, a in enumerate(elements): | |
| for b in elements[i + 1:]: | |
| overlap = _intersect(a["bbox"], b["bbox"]) | |
| if overlap > EPS: | |
| norm = overlap / max(EPS, min(_area(a["bbox"]), _area(b["bbox"]))) | |
| blame[str(a["id"])] += norm | |
| blame[str(b["id"])] += norm | |
| for e in elements: | |
| ea = _anchors(e["bbox"]) | |
| dx = [] | |
| dy = [] | |
| for o in elements: | |
| if e["id"] == o["id"]: | |
| continue | |
| oa = _anchors(o["bbox"]) | |
| dx.extend([abs(ea["left"] - oa["left"]), abs(ea["center"] - oa["center"]), abs(ea["right"] - oa["right"])]) | |
| dy.extend([abs(ea["top"] - oa["top"]), abs(ea["middle"] - oa["middle"]), abs(ea["bottom"] - oa["bottom"])]) | |
| align_bad = min(dx) + min(dy) if dx and dy else 0.0 | |
| blame[str(e["id"])] += align_bad * 1.4 | |
| for first_id, second_id in self._task_spec.get("reading_order", []): | |
| if first_id not in by_id or second_id not in by_id: | |
| continue | |
| a = by_id[first_id]["bbox"] | |
| b = by_id[second_id]["bbox"] | |
| if not (a[1] <= b[1] or (abs(a[1] - b[1]) <= 0.05 and a[0] <= b[0])): | |
| blame[str(first_id)] += 0.15 | |
| blame[str(second_id)] += 0.15 | |
| importance = [float(e["importance"]) for e in elements] | |
| salience = [] | |
| for e in elements: | |
| x, y, w, h = e["bbox"] | |
| salience.append(0.55 * math.log(_area(e["bbox"]) + 1e-4) - 0.22 * y + 0.18 * (1.0 - abs((x + w / 2.0) - 0.5) / 0.5)) | |
| imp_r = _rank(importance) | |
| sal_r = _rank(salience) | |
| for e, ri, rs in zip(elements, imp_r, sal_r): | |
| blame[str(e["id"])] += abs(ri - rs) / max(1.0, len(elements)) | |
| max_blame = max(blame.values()) if blame else 1.0 | |
| if max_blame <= EPS: | |
| return blame | |
| return {k: _clamp(v / max_blame, 0.0, 1.0) for k, v in blame.items()} | |
| def _constraint_warnings(self, elements: List[Dict[str, object]]) -> List[str]: | |
| warnings: List[str] = [] | |
| for e in elements: | |
| x, y, w, h = [float(v) for v in e["bbox"]] | |
| min_w, min_h = [float(v) for v in e["min_size"]] | |
| max_w, max_h = [float(v) for v in e["max_size"]] | |
| if w - min_w < 0.02 or h - min_h < 0.02: | |
| warnings.append(f"{e['id']}:near_min_size") | |
| if max_w - w < 0.02 or max_h - h < 0.02: | |
| warnings.append(f"{e['id']}:near_max_size") | |
| return warnings[:6] | |
| def _suggested_edits(self, worst: List[str], focus: List[str]) -> List[str]: | |
| suggestions: List[str] = [] | |
| for metric in worst: | |
| if metric == "alignment": | |
| suggestions.append("align related elements on x or y") | |
| elif metric == "spacing": | |
| suggestions.append("distribute a crowded group") | |
| elif metric == "hierarchy": | |
| suggestions.append("promote the focal element") | |
| elif metric == "intent_fit": | |
| suggestions.append("anchor an important element to its semantic region") | |
| elif metric == "reading_order": | |
| suggestions.append("reflow a story group or vertical stack") | |
| elif metric == "occupancy": | |
| suggestions.append("resize the hero or body block toward target fill") | |
| elif metric == "text_fit": | |
| suggestions.append("resize text blocks to improve copy fit") | |
| if focus: | |
| suggestions.append(f"inspect focus elements: {', '.join(focus[:2])}") | |
| out = [] | |
| seen = set() | |
| for item in suggestions: | |
| if item not in seen: | |
| seen.add(item) | |
| out.append(item) | |
| return out[:5] | |
| def _neighborhood_utilities(self, base_elements: List[Dict[str, object]], focus_metrics: List[str]) -> List[float]: | |
| candidates: List[List[Dict[str, object]]] = [] | |
| by_group: Dict[str, List[str]] = {} | |
| for e in base_elements: | |
| by_group.setdefault(str(e["group"]), []).append(str(e["id"])) | |
| if "alignment" in focus_metrics or "spacing" in focus_metrics: | |
| headline_ids = [e["id"] for e in base_elements if e["group"] in {"headline", "header", "stories"}] | |
| if len(headline_ids) >= 2: | |
| cand = _deepcopy_elements(base_elements) | |
| self._apply_action(cand, DesignGymAction(action_type="align", element_ids=headline_ids[:3], axis="x", mode="left")) | |
| candidates.append(cand) | |
| if "hierarchy" in focus_metrics or "occupancy" in focus_metrics: | |
| important = max(base_elements, key=lambda e: float(e["importance"])) | |
| cand = _deepcopy_elements(base_elements) | |
| self._apply_action(cand, DesignGymAction(action_type="promote", element_id=str(important["id"]), strength=0.05)) | |
| candidates.append(cand) | |
| if "intent_fit" in focus_metrics or "reading_order" in focus_metrics: | |
| for element_id, region_id in self._task_spec.get("intent_regions", {}).items(): | |
| if any(str(e["id"]) == element_id for e in base_elements): | |
| cand = _deepcopy_elements(base_elements) | |
| self._apply_action(cand, DesignGymAction(action_type="anchor_to_region", element_id=element_id, region_id=str(region_id), mode="center")) | |
| candidates.append(cand) | |
| break | |
| for group_id, ids in by_group.items(): | |
| if len(ids) >= 3: | |
| cand = _deepcopy_elements(base_elements) | |
| self._apply_action(cand, DesignGymAction(action_type="reflow_group", group_id=group_id, pattern="stack")) | |
| candidates.append(cand) | |
| break | |
| utilities = [] | |
| for cand in candidates[:4]: | |
| utilities.append(float(self._score_layout(cand)["utility"])) | |
| return utilities | |
| def _percentile_rank(self, utility: float, neighborhood: List[float]) -> float: | |
| if not neighborhood: | |
| return 0.5 | |
| wins = sum(1 for value in neighborhood if utility >= value - 1e-9) | |
| return _clamp(wins / len(neighborhood), 0.0, 1.0) | |
| def _oscillation_penalty(self, action: DesignGymAction) -> float: | |
| history = self._state.action_history[-2:] | |
| if len(history) < 2: | |
| return 0.0 | |
| prev = history[-1] | |
| current = action.action_type | |
| if current == "move" and '"action_type":"move"' in prev: | |
| return 0.01 | |
| if current == "apply_template" and '"action_type":"apply_template"' in prev: | |
| return 0.02 | |
| return 0.0 |