Spaces:
Running
Running
feat: real emergence — anti-imitation system prompt + LLM tournament selection (structural-invention judge replaces heuristic top-1)
3b19377 verified | """Genesis Engine — 5-stage orchestrator with parallel LLM calls.""" | |
| from __future__ import annotations | |
| import logging | |
| import random | |
| from collections import Counter | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from dataclasses import dataclass, field | |
| from typing import TYPE_CHECKING, Literal | |
| from pydantic import BaseModel, Field | |
| from aether_ad.blending.spaces import BlendingSpaceSet, build_spaces | |
| from aether_ad.core.collision import ALL_RULES, CollisionRule, sample_rules | |
| from aether_ad.core.creative_constraints import ( | |
| EMERGENT_AD_REFS, | |
| CreativeConstraints, | |
| pick_constraints, | |
| ) | |
| from aether_ad.core.narrative import SPINE_15S, SPINE_30S, Beat, PixarSpineRenderer | |
| from aether_ad.core.ohaeng import BeatSlot, pick_script, script_to_prompt | |
| from aether_ad.core.wow_anchors import pick_anchor | |
| from aether_ad.core.wow_filter import CreativityScore, WowFilter | |
| if TYPE_CHECKING: | |
| from aether_ad.core.context import ContextMatrix, TensionArchetype | |
| from aether_ad.core.genome import ProductGenome | |
| from aether_ad.llm.base import LLMBackend | |
| log = logging.getLogger("aether_ad.engine") | |
class BeatModel(BaseModel):
    """One narrative beat of an ad, optionally annotated with an ohaeng element."""

    time_range: tuple[int, int]
    beat: str
    purpose: str
    content: str
    element: str | None = None  # "wood" | "fire" | "earth" | "metal" | "water"
    element_emoji: str | None = None
    element_kr: str | None = None
    relation_in: str | None = None  # "start" | "sheng" | "ke"
    relation_reason: str | None = None

    @classmethod
    def from_beat(cls, b: Beat, slot: BeatSlot | None = None) -> "BeatModel":
        """Build a model from a rendered ``Beat``.

        Alternate constructor: callers invoke it on the class
        (``BeatModel.from_beat(beat, slot)``), so it must be a classmethod.

        Args:
            b: Rendered beat; its ``time_range``, ``beat.value``, ``purpose``
                and ``content`` are copied.
            slot: Optional ohaeng slot. When given, the element id, its emoji
                and Korean name (looked up in ``ELEMENTS``), and the incoming
                relation are copied as well; otherwise those fields stay
                ``None``.

        Returns:
            A new instance of ``cls``.
        """
        base: dict = {
            "time_range": b.time_range,
            "beat": b.beat.value,
            "purpose": b.purpose,
            "content": b.content,
        }
        if slot is not None:
            # Kept as a function-scope import, as in the original code.
            from aether_ad.core.ohaeng import ELEMENTS

            el = ELEMENTS[slot.element]
            base.update(
                {
                    "element": slot.element,
                    "element_emoji": el.emoji,
                    "element_kr": el.kr,
                    "relation_in": slot.relation_in,
                    "relation_reason": slot.relation_reason,
                }
            )
        return cls(**base)
class AdSeed(BaseModel):
    """One emergent ad-idea candidate before + after narrative render."""

    # --- identity / provenance -------------------------------------------
    seed_id: str
    product_id: str
    tension_id: str
    persona_summary: str
    rules_applied: list[str]

    # --- creative payload ------------------------------------------------
    concept: str
    # 10-char Korean headline shown above the concept.
    scene_summary: str | None = None
    duration: int = 15
    beats: list[BeatModel] = Field(default_factory=list)

    # --- raw backend output ----------------------------------------------
    raw_collision: str | None = None
    raw_spine: str | None = None

    # --- generation metadata ---------------------------------------------
    wow_anchor: str | None = None
    # Per-beat element + relation.
    ohaeng: list[dict] = Field(default_factory=list)
    # The 6 hard constraints injected into the prompt.
    creative_constraints: dict | None = None
    # AETHER 5-element metacognitive audit
    # (Wood/Fire/Earth/Metal/Water + verdict).
    aether_critique: dict | None = None
    # LLM judge verdict: structural invention + why this won the round.
    tournament: dict | None = None
class ScoredSeed(BaseModel):
    """An ``AdSeed`` paired with the ``CreativityScore`` computed for it."""

    seed: AdSeed
    score: CreativityScore
@dataclass
class GenesisEngine:
    """Orchestrates ad-seed generation: plan, build, render, score, select.

    Declared as a dataclass: the fields below form the constructor, and
    ``__post_init__`` creates the random generator used for plan sampling.
    """

    # LLM backend; ``None`` selects the template (no-LLM) path everywhere.
    llm: "LLMBackend | None"
    # Supplies tension archetypes and the cultural forbidden zones.
    context: "ContextMatrix"
    # Reference texts handed to the WowFilter.
    corpus_texts: list[str] = field(default_factory=list)
    # Number of seed plans built per ``generate`` call.
    seed_count_per_run: int = 15
    # Number of scored seeds returned by ``generate``.
    keep_top_k: int = 5
    # Seed for ``self.rng``; ``None`` lets ``random.Random`` self-seed.
    rng_seed: int | None = None
    # Thread-pool size for the parallel build/render/score stages.
    max_workers: int = 4

    def __post_init__(self) -> None:
        """Create the engine-local random generator from ``rng_seed``."""
        self.rng = random.Random(self.rng_seed)

    def generate(
        self,
        product: "ProductGenome",
        tension_id: str,
        persona_text: str,
        duration: Literal[15, 30] = 15,
        rules_per_seed: tuple[int, int] = (1, 3),
    ) -> list[ScoredSeed]:
        """Run the full pipeline and return the best ``keep_top_k`` seeds.

        Args:
            product: Product whose functional atoms seed the concepts.
            tension_id: Id resolved through ``self.context.get_tension``.
            persona_text: Persona description passed to every stage.
            duration: Ad length in seconds (15 or 30).
            rules_per_seed: Inclusive ``(min, max)`` number of collision
                rules sampled per seed.

        Returns:
            Scored seeds ordered by ``score.final`` descending. When the
            tournament judge ran and returned a valid index, its winner is
            placed first regardless of score.
        """
        tension = self.context.get_tension(tension_id)
        log.info("stage=corpus_ingestion product=%s tension=%s", product.product_id, tension_id)

        spaces = build_spaces(product, tension, persona_text)
        log.info("stage=encoding inputs=%d generic=%d", len(spaces.inputs), len(spaces.generic))

        plans = self._build_diverse_plans(product, rules_per_seed, duration=duration)
        log.info("stage=incubation_planning count=%d", len(plans))

        raw_seeds = self._parallel_build_concepts(
            plans, spaces, product, tension, persona_text, duration
        )
        log.info("stage=incubation count=%d", len(raw_seeds))

        # With Director's Treatment, the same LLM call already produces beats.
        # Only run the legacy spine renderer for the template (no-LLM) path
        # OR if the treatment came back with empty beats.
        # NOTE: a single seed without beat content re-renders ALL seeds.
        rendered: list[AdSeed]
        if self.llm is None or any(
            not any((b.content or "").strip() for b in s.beats) for s in raw_seeds
        ):
            slots_by_id: dict = {}
            blueprint_by_id: dict = {}
            for p in plans:
                # Same format as the seed_id assigned in _parallel_build_concepts.
                sid = f"{product.product_id}_{tension.id}_{p['index']:03d}"
                slots_by_id[sid] = p["ohaeng_slots"]
                blueprint_by_id[sid] = p["ohaeng_blueprint"]
            rendered = self._parallel_render_spines(
                raw_seeds, duration, slots_by_id, blueprint_by_id
            )
        else:
            rendered = raw_seeds  # treatment already has beats
        log.info("stage=emergence rendered=%d", len(rendered))

        wow = WowFilter(
            llm=self.llm,
            forbidden_zones=self.context.cultural_context.forbidden_zones,
            corpus_texts=self.corpus_texts,
        )
        scored = self._parallel_score(rendered, wow)
        log.info("stage=filtering scored=%d", len(scored))

        # Tournament: the judge's pick overrides the heuristic score for the
        # top slot only; everything else is ranked by wow_filter score.
        winner_idx = self._run_tournament(scored, product)
        if winner_idx is None:
            ranked = sorted(scored, key=lambda x: x.score.final, reverse=True)
        else:
            rest = [s for i, s in enumerate(scored) if i != winner_idx]
            rest.sort(key=lambda x: x.score.final, reverse=True)
            ranked = [scored[winner_idx]] + rest
        return ranked[: self.keep_top_k]

    def _run_tournament(
        self, scored: list[ScoredSeed], product: "ProductGenome"
    ) -> int | None:
        """Ask the LLM judge for a winner; return its index in ``scored``.

        Returns ``None`` when the tournament does not apply (no LLM, fewer
        than two candidates, backend lacks ``tournament_select``, or
        ``keep_top_k`` above 5), when the judge returns an out-of-range
        index, or when anything in the call fails (logged as a warning).
        On success the verdict is stored on the winning seed's
        ``tournament`` field.
        """
        if (
            self.llm is None
            or len(scored) <= 1
            or not hasattr(self.llm, "tournament_select")
            or self.keep_top_k > 5  # only run tournament for tight selections
        ):
            return None
        try:
            cands = [self._tournament_candidate(s.seed) for s in scored]
            verdict = self.llm.tournament_select(
                cands, product_id=product.product_id, brand=product.brand,
            )
            widx = verdict.get("winner_index", 0)
            log.info(
                "stage=tournament winner=%d invention=%r",
                widx, (verdict.get("structural_invention") or "")[:80],
            )
            if 0 <= widx < len(scored):
                scored[widx].seed.tournament = verdict
                return widx
            return None
        except Exception as e:
            # Best-effort stage: any failure falls back to score ordering.
            log.warning("tournament_select skipped: %s", e)
            return None

    @staticmethod
    def _tournament_candidate(seed: AdSeed) -> dict:
        """Summarise one seed as the dict handed to the tournament judge."""
        key_visual = ""
        hero_shot = ""
        # KEY VISUAL / HERO SHOT are read from blank-line-separated
        # paragraphs of the concept text.
        for para in (seed.concept or "").split("\n\n"):
            if para.startswith("KEY VISUAL — "):
                key_visual = para[len("KEY VISUAL — "):].strip()
            elif para.startswith("HERO SHOT — "):
                hero_shot = para[len("HERO SHOT — "):].strip()
        # First three beats, newlines flattened, 80 chars each.
        beats_summary = " | ".join(
            (b.content or "").replace("\n", " ")[:80] for b in seed.beats[:3]
        )
        return {
            "seed_id": seed.seed_id,
            "logline": seed.scene_summary or "",
            "key_visual": key_visual,
            "hero_shot": hero_shot,
            "beats_summary": beats_summary,
        }

    def _fan_out(self, fn, items: list, label: str, keep_on_error: bool = False) -> list:
        """Apply ``fn`` to every item, threaded when an LLM backend is set.

        Without an LLM, or with at most one item, ``fn`` runs sequentially
        and exceptions propagate. Otherwise items run on a thread pool of
        ``max_workers``; a failing item is logged as ``"<label> <pos> failed"``
        and either dropped or, with ``keep_on_error``, replaced by the input
        item. Results keep input order; ``None`` results are filtered out.
        """
        if self.llm is None or len(items) <= 1:
            return [fn(item) for item in items]
        results: list = [None] * len(items)
        with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
            futs = {ex.submit(fn, item): pos for pos, item in enumerate(items)}
            for fut in as_completed(futs):
                pos = futs[fut]
                try:
                    results[pos] = fut.result()
                except Exception as e:
                    log.warning("%s %d failed: %s", label, pos, e)
                    if keep_on_error:
                        results[pos] = items[pos]
        return [r for r in results if r is not None]

    def _build_diverse_plans(
        self,
        product: "ProductGenome",
        rules_per_seed: tuple[int, int],
        duration: int = 15,
    ) -> list[dict]:
        """Build ``seed_count_per_run`` per-seed plans, varied along 4 axes.

        - wow_anchor: chosen by ``pick_anchor`` from the seed index
        - atom: always drawn from the least-used atoms, so usage stays even
        - rule combination: resampled up to 12 times to avoid repeating a
          sorted rule_id tuple (best effort; a repeat is kept after that)
        - ohaeng script: an independently picked script per seed

        Each plan is a dict with keys ``index``, ``rules``, ``atom``,
        ``potential``, ``wow_anchor``, ``ohaeng_slots``, ``ohaeng_blueprint``
        and ``constraints``. The order of ``self.rng`` draws is significant
        for reproducibility under a fixed ``rng_seed``.
        """
        atoms = product.functional_atoms
        atom_use: Counter[str] = Counter()
        seen_combos: set[tuple[str, ...]] = set()
        plans: list[dict] = []
        for i in range(self.seed_count_per_run):
            min_use = min(atom_use.get(a.id, 0) for a in atoms)
            atom_candidates = [a for a in atoms if atom_use.get(a.id, 0) == min_use]
            atom = self.rng.choice(atom_candidates)
            atom_use[atom.id] += 1
            potential = self.rng.choice(atom.abstract_potentials)

            rules = sample_rules(self.rng.randint(*rules_per_seed), self.rng)
            for _ in range(12):
                key = tuple(sorted(r.rule_id for r in rules))
                if key not in seen_combos:
                    seen_combos.add(key)
                    break
                rules = sample_rules(self.rng.randint(*rules_per_seed), self.rng)

            anchor = pick_anchor(i, self.rng)
            # Ohaeng script — each seed gets its own 5-element spine
            slots = pick_script(duration, self.rng)
            blueprint = script_to_prompt(slots)
            # Creative constraints — 6 concrete imperatives per seed
            constraints = pick_constraints(self.rng)
            plans.append(
                {
                    "index": i,
                    "rules": rules,
                    "atom": atom,
                    "potential": potential,
                    "wow_anchor": anchor,
                    "ohaeng_slots": slots,
                    "ohaeng_blueprint": blueprint,
                    "constraints": constraints,
                }
            )
        return plans

    def _parallel_build_concepts(
        self,
        plans: list[dict],
        spaces: BlendingSpaceSet,
        product: "ProductGenome",
        tension: "TensionArchetype",
        persona_text: str,
        duration: int,
    ) -> list[AdSeed]:
        """Turn each plan into an ``AdSeed`` (threaded when an LLM is set).

        In the threaded path a plan whose build raises is logged and dropped.
        """

        def build_one(plan: dict) -> AdSeed:
            i = plan["index"]
            rules = plan["rules"]
            anchor = plan["wow_anchor"]
            slots = plan["ohaeng_slots"]
            constraints: CreativeConstraints = plan["constraints"]
            beat_count = 5 if duration == 15 else 6
            logline, concept, beats_list, raw, critique = self._apply_rules(
                spaces, rules, product, tension, persona_text,
                plan["atom"], plan["potential"], anchor,
                plan["ohaeng_blueprint"], constraints,
                beat_count=beat_count, duration=duration,
            )
            # Build BeatModel list from director treatment beats — time_range
            # comes from the spine layout; cinematic content from the LLM.
            spine = SPINE_15S if duration == 15 else SPINE_30S
            beat_models = []
            for idx, (b_enum, t_range, purpose) in enumerate(spine):
                # Spine entries beyond the treatment's beats get empty content.
                src = beats_list[idx] if idx < len(beats_list) else {}
                beat_models.append(BeatModel(
                    time_range=t_range,
                    beat=b_enum.value,
                    purpose=purpose,
                    content=src.get("content") or "",
                    element=None,
                    element_emoji=None,
                    element_kr=None,
                    relation_in=None,
                    relation_reason=None,
                ))
            return AdSeed(
                seed_id=f"{product.product_id}_{tension.id}_{i:03d}",
                product_id=product.product_id,
                tension_id=tension.id,
                persona_summary=persona_text,
                rules_applied=[r.rule_id for r in rules],
                concept=concept,
                scene_summary=logline or None,
                duration=duration,
                raw_collision=raw,
                wow_anchor=anchor,
                creative_constraints=constraints.as_summary(),
                aether_critique=critique,
                beats=beat_models,
                raw_spine=raw,  # treatment includes beats; reuse raw
                ohaeng=[
                    {
                        "index": s.index,
                        "beat_name": s.beat_name,
                        "element": s.element,
                        # NOTE(review): presumably a "start" slot's reason
                        # text begins with the element emoji — confirm
                        # against pick_script.
                        "emoji": s.relation_reason.split()[0] if s.relation_in == "start" else "",
                        "relation_in": s.relation_in,
                        "relation_reason": s.relation_reason,
                    }
                    for s in slots
                ],
            )

        return self._fan_out(build_one, plans, "build_one")

    def _parallel_render_spines(
        self,
        seeds: list[AdSeed],
        duration: int,
        slots_by_id: dict[str, list[BeatSlot]] | None = None,
        blueprint_by_id: dict[str, str] | None = None,
    ) -> list[AdSeed]:
        """Render beats for each seed with ``PixarSpineRenderer``, in place.

        ``slots_by_id`` / ``blueprint_by_id`` are keyed by ``seed_id``. In the
        threaded path a seed whose render raises is logged and returned
        unchanged.
        """
        renderer = PixarSpineRenderer(duration=duration, llm=self.llm)
        slots_by_id = slots_by_id or {}
        blueprint_by_id = blueprint_by_id or {}

        def render_one(s: AdSeed) -> AdSeed:
            blueprint = blueprint_by_id.get(s.seed_id, "")
            slots = slots_by_id.get(s.seed_id, [])
            beats, raw = renderer.render(s, ohaeng_blueprint=blueprint)
            # Beats beyond the available slots get no element annotation.
            s.beats = [
                BeatModel.from_beat(b, slots[i] if i < len(slots) else None)
                for i, b in enumerate(beats)
            ]
            if raw is not None:
                s.raw_spine = raw
            return s

        return self._fan_out(render_one, seeds, "render_one", keep_on_error=True)

    def _parallel_score(self, seeds: list[AdSeed], wow: WowFilter) -> list[ScoredSeed]:
        """Score each seed with ``wow.score`` (threaded when an LLM is set).

        In the threaded path a seed whose scoring raises is logged and dropped.
        """

        def score_one(s: AdSeed) -> ScoredSeed:
            return ScoredSeed(seed=s, score=wow.score(s))

        return self._fan_out(score_one, seeds, "score_one")

    def _apply_rules(
        self,
        spaces: BlendingSpaceSet,
        rules: list[CollisionRule],
        product: "ProductGenome",
        tension: "TensionArchetype",
        persona_text: str,
        atom,
        potential: str,
        wow_anchor: str,
        ohaeng_blueprint: str,
        constraints: CreativeConstraints,
        beat_count: int = 5,
        duration: int = 15,
    ) -> tuple[str, str, list[dict], str | None, dict | None]:
        """Return (logline, concept_text, beats_list, raw, critique).

        Tries, in order: the backend's metacognition call (if it exposes
        one), the plain collision call, then a template placeholder. Each
        LLM failure is logged and falls through to the next option.

        critique is the AETHER 5-element metacognitive audit (Wood/Fire/Earth/
        Metal/Water + verdict + revision_brief). None if backend doesn't
        support metacognition or if the call failed.
        """

        def llm_kwargs() -> dict:
            # Built inside each attempt so a failure while preparing the
            # prompt block is handled by that attempt's except clause.
            return {
                "atom": atom,
                "potential": potential,
                "rules": rules,
                "tension": tension,
                "persona": persona_text,
                "product": product,
                "wow_anchor": wow_anchor,
                "ohaeng_blueprint": ohaeng_blueprint,
                "creative_constraints": constraints.as_prompt_block(),
                "emergent_refs": EMERGENT_AD_REFS,
                "beat_count": beat_count,
                "duration": duration,
            }

        if self.llm is not None:
            # Prefer metacognition path (draft → critique → revise) when backend exposes it.
            if hasattr(self.llm, "apply_collision_with_metacognition"):
                try:
                    logline, concept, beats_list, raw, critique = (
                        self.llm.apply_collision_with_metacognition(**llm_kwargs())
                    )
                    return logline, concept, beats_list, raw, critique
                except Exception as e:
                    log.warning("llm.apply_collision_with_metacognition failed: %s", e)
            try:
                logline, concept, beats_list, raw = self.llm.apply_collision(**llm_kwargs())
                return logline, concept, beats_list, raw, None
            except Exception as e:
                log.warning("llm.apply_collision failed: %s", e)

        # Template fallback (no LLM): produce a placeholder treatment
        rule_fragments = " | ".join(r.llm_prompt_fragment for r in rules)
        placeholder_beats = [
            {"shot": "", "action": "", "sound": "", "duration": "", "content": ""}
            for _ in range(beat_count)
        ]
        return (
            "",
            f"[{atom.name}] '{potential}' → "
            f"{tension.greimas_square.S1} vs {tension.greimas_square.S2} 충돌 "
            f"({persona_text}). 앵커: {wow_anchor}. 규칙: {rule_fragments}",
            placeholder_beats,
            None,
            None,
        )

    def slai_feedback(self, approved_seeds: list[ScoredSeed]) -> str | None:
        """Ask the LLM to propose a new rule from approved seeds.

        Returns ``None`` without calling the backend when there is no LLM or
        no approved seeds, and ``None`` (with a warning) if the call raises.
        """
        if self.llm is None or not approved_seeds:
            return None
        try:
            return self.llm.propose_new_rule(approved_seeds)
        except Exception as e:
            log.warning("slai_feedback failed: %s", e)
            return None
| __all__ = ["AdSeed", "BeatModel", "ScoredSeed", "GenesisEngine"] | |