"""Genesis Engine — 5-stage orchestrator with parallel LLM calls.""" from __future__ import annotations import logging import random from collections import Counter from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from typing import TYPE_CHECKING, Literal from pydantic import BaseModel, Field from aether_ad.blending.spaces import BlendingSpaceSet, build_spaces from aether_ad.core.collision import ALL_RULES, CollisionRule, sample_rules from aether_ad.core.creative_constraints import ( EMERGENT_AD_REFS, CreativeConstraints, pick_constraints, ) from aether_ad.core.narrative import SPINE_15S, SPINE_30S, Beat, PixarSpineRenderer from aether_ad.core.ohaeng import BeatSlot, pick_script, script_to_prompt from aether_ad.core.wow_anchors import pick_anchor from aether_ad.core.wow_filter import CreativityScore, WowFilter if TYPE_CHECKING: from aether_ad.core.context import ContextMatrix, TensionArchetype from aether_ad.core.genome import ProductGenome from aether_ad.llm.base import LLMBackend log = logging.getLogger("aether_ad.engine") class BeatModel(BaseModel): time_range: tuple[int, int] beat: str purpose: str content: str element: str | None = None # "wood" | "fire" | "earth" | "metal" | "water" element_emoji: str | None = None element_kr: str | None = None relation_in: str | None = None # "start" | "sheng" | "ke" relation_reason: str | None = None @classmethod def from_beat(cls, b: Beat, slot: BeatSlot | None = None) -> "BeatModel": base: dict = { "time_range": b.time_range, "beat": b.beat.value, "purpose": b.purpose, "content": b.content, } if slot is not None: from aether_ad.core.ohaeng import ELEMENTS el = ELEMENTS[slot.element] base.update({ "element": slot.element, "element_emoji": el.emoji, "element_kr": el.kr, "relation_in": slot.relation_in, "relation_reason": slot.relation_reason, }) return cls(**base) class AdSeed(BaseModel): """One emergent ad-idea candidate before + after narrative render.""" seed_id: str product_id: str tension_id: str persona_summary: str rules_applied: list[str] concept: str scene_summary: str | None = None # 10-char Korean headline above concept duration: int = 15 beats: list[BeatModel] = Field(default_factory=list) raw_collision: str | None = None raw_spine: str | None = None wow_anchor: str | None = None ohaeng: list[dict] = Field(default_factory=list) # per-beat element + relation creative_constraints: dict | None = None # the 6 hard constraints injected into prompt aether_critique: dict | None = None # AETHER 5-element metacognitive audit (Wood/Fire/Earth/Metal/Water + verdict) tournament: dict | None = None # LLM judge verdict: structural invention + why this won the round class ScoredSeed(BaseModel): seed: AdSeed score: CreativityScore @dataclass class GenesisEngine: llm: "LLMBackend | None" context: "ContextMatrix" corpus_texts: list[str] = field(default_factory=list) seed_count_per_run: int = 15 keep_top_k: int = 5 rng_seed: int | None = None max_workers: int = 4 def __post_init__(self) -> None: self.rng = random.Random(self.rng_seed) def generate( self, product: "ProductGenome", tension_id: str, persona_text: str, duration: Literal[15, 30] = 15, rules_per_seed: tuple[int, int] = (1, 3), ) -> list[ScoredSeed]: tension = self.context.get_tension(tension_id) log.info("stage=corpus_ingestion product=%s tension=%s", product.product_id, tension_id) spaces = build_spaces(product, tension, persona_text) log.info("stage=encoding inputs=%d generic=%d", len(spaces.inputs), len(spaces.generic)) plans = self._build_diverse_plans(product, rules_per_seed, duration=duration) log.info("stage=incubation_planning count=%d", len(plans)) raw_seeds = self._parallel_build_concepts( plans, spaces, product, tension, persona_text, duration ) log.info("stage=incubation count=%d", len(raw_seeds)) # With Director's Treatment, the same LLM call already produces beats. # Only run the legacy spine renderer for the template (no-LLM) path # OR if the treatment came back with empty beats. rendered: list[AdSeed] if self.llm is None or any( not any((b.content or "").strip() for b in s.beats) for s in raw_seeds ): slots_by_id = { f"{product.product_id}_{tension.id}_{p['index']:03d}": p["ohaeng_slots"] for p in plans } blueprint_by_id = { f"{product.product_id}_{tension.id}_{p['index']:03d}": p["ohaeng_blueprint"] for p in plans } rendered = self._parallel_render_spines(raw_seeds, duration, slots_by_id, blueprint_by_id) else: rendered = raw_seeds # treatment already has beats log.info("stage=emergence rendered=%d", len(rendered)) wow = WowFilter( llm=self.llm, forbidden_zones=self.context.cultural_context.forbidden_zones, corpus_texts=self.corpus_texts, ) scored = self._parallel_score(rendered, wow) log.info("stage=filtering scored=%d", len(scored)) # ── Tournament: LLM judge picks the most structurally emergent ─── # When the backend supports it AND we have multiple candidates, the # judge override beats the heuristic wow_filter score for the top slot. if ( self.llm is not None and len(scored) > 1 and hasattr(self.llm, "tournament_select") and self.keep_top_k <= 5 # only run tournament for tight selections ): try: cands = [] for s in scored: seed = s.seed raw_concept = seed.concept or "" # Pull KEY_VISUAL / HERO_SHOT directly from concept_text format. kv = "" hs = "" for line in raw_concept.split("\n\n"): if line.startswith("KEY VISUAL — "): kv = line[len("KEY VISUAL — "):].strip() elif line.startswith("HERO SHOT — "): hs = line[len("HERO SHOT — "):].strip() beats_summary = " | ".join( (b.content or "").replace("\n", " ")[:80] for b in seed.beats[:3] ) cands.append({ "seed_id": seed.seed_id, "logline": seed.scene_summary or "", "key_visual": kv, "hero_shot": hs, "beats_summary": beats_summary, }) verdict = self.llm.tournament_select( cands, product_id=product.product_id, brand=product.brand, ) widx = verdict.get("winner_index", 0) log.info( "stage=tournament winner=%d invention=%r", widx, (verdict.get("structural_invention") or "")[:80], ) # Reorder: winner first, then by wow_filter score for the rest if 0 <= widx < len(scored): winner = scored[widx] winner.seed.tournament = verdict rest = [s for i, s in enumerate(scored) if i != widx] rest.sort(key=lambda x: x.score.final, reverse=True) scored = [winner] + rest else: scored.sort(key=lambda x: x.score.final, reverse=True) except Exception as e: log.warning("tournament_select skipped: %s", e) scored.sort(key=lambda x: x.score.final, reverse=True) else: scored.sort(key=lambda x: x.score.final, reverse=True) return scored[: self.keep_top_k] def _build_diverse_plans( self, product: "ProductGenome", rules_per_seed: tuple[int, int], duration: int = 15, ) -> list[dict]: """Build per-seed plans with forced diversity along 4 axes: - wow_anchor: each seed gets a different anchor - atom: atoms distributed as evenly as possible - rule combination: no two seeds share the same sorted rule_id tuple - ohaeng script: different 5-element scripts across seeds for narrative variety """ atoms = product.functional_atoms atom_use: Counter[str] = Counter() seen_combos: set[tuple[str, ...]] = set() plans: list[dict] = [] for i in range(self.seed_count_per_run): min_use = min(atom_use.get(a.id, 0) for a in atoms) atom_candidates = [a for a in atoms if atom_use.get(a.id, 0) == min_use] atom = self.rng.choice(atom_candidates) atom_use[atom.id] += 1 potential = self.rng.choice(atom.abstract_potentials) rules = sample_rules(self.rng.randint(*rules_per_seed), self.rng) for _ in range(12): key = tuple(sorted(r.rule_id for r in rules)) if key not in seen_combos: seen_combos.add(key) break rules = sample_rules(self.rng.randint(*rules_per_seed), self.rng) anchor = pick_anchor(i, self.rng) # Ohaeng script — each seed gets its own 5-element spine slots = pick_script(duration, self.rng) blueprint = script_to_prompt(slots) # Creative constraints — 6 concrete imperatives per seed constraints = pick_constraints(self.rng) plans.append( { "index": i, "rules": rules, "atom": atom, "potential": potential, "wow_anchor": anchor, "ohaeng_slots": slots, "ohaeng_blueprint": blueprint, "constraints": constraints, } ) return plans def _parallel_build_concepts( self, plans: list[dict], spaces: BlendingSpaceSet, product: "ProductGenome", tension: "TensionArchetype", persona_text: str, duration: int, ) -> list[AdSeed]: def build_one(plan: dict) -> AdSeed: i = plan["index"] rules = plan["rules"] atom = plan["atom"] potential = plan["potential"] anchor = plan["wow_anchor"] blueprint = plan["ohaeng_blueprint"] slots = plan["ohaeng_slots"] constraints: CreativeConstraints = plan["constraints"] beat_count = 5 if duration == 15 else 6 logline, concept, beats_list, raw, critique = self._apply_rules( spaces, rules, product, tension, persona_text, atom, potential, anchor, blueprint, constraints, beat_count=beat_count, duration=duration, ) # Build BeatModel list from director treatment beats — time_range # comes from the spine layout; cinematic content from the LLM. spine = SPINE_15S if duration == 15 else SPINE_30S beat_models = [] for idx, (b_enum, t_range, purpose) in enumerate(spine): src = beats_list[idx] if idx < len(beats_list) else {} beat_models.append(BeatModel( time_range=t_range, beat=b_enum.value, purpose=purpose, content=src.get("content") or "", element=None, element_emoji=None, element_kr=None, relation_in=None, relation_reason=None, )) seed_obj = AdSeed( seed_id=f"{product.product_id}_{tension.id}_{i:03d}", product_id=product.product_id, tension_id=tension.id, persona_summary=persona_text, rules_applied=[r.rule_id for r in rules], concept=concept, scene_summary=logline or None, duration=duration, raw_collision=raw, wow_anchor=anchor, creative_constraints=constraints.as_summary(), aether_critique=critique, beats=beat_models, raw_spine=raw, # treatment includes beats; reuse raw ohaeng=[ { "index": s.index, "beat_name": s.beat_name, "element": s.element, "emoji": s.relation_reason.split()[0] if s.relation_in == "start" else "", "relation_in": s.relation_in, "relation_reason": s.relation_reason, } for s in slots ], ) return seed_obj if self.llm is None or len(plans) <= 1: return [build_one(p) for p in plans] results: list[AdSeed | None] = [None] * len(plans) with ThreadPoolExecutor(max_workers=self.max_workers) as ex: futs = {ex.submit(build_one, p): p["index"] for p in plans} for fut in as_completed(futs): idx = futs[fut] try: results[idx] = fut.result() except Exception as e: log.warning("build_one %d failed: %s", idx, e) return [r for r in results if r is not None] def _parallel_render_spines( self, seeds: list[AdSeed], duration: int, slots_by_id: dict[str, list[BeatSlot]] | None = None, blueprint_by_id: dict[str, str] | None = None, ) -> list[AdSeed]: renderer = PixarSpineRenderer(duration=duration, llm=self.llm) slots_by_id = slots_by_id or {} blueprint_by_id = blueprint_by_id or {} def render_one(s: AdSeed) -> AdSeed: blueprint = blueprint_by_id.get(s.seed_id, "") slots = slots_by_id.get(s.seed_id, []) beats, raw = renderer.render(s, ohaeng_blueprint=blueprint) s.beats = [ BeatModel.from_beat(b, slots[i] if i < len(slots) else None) for i, b in enumerate(beats) ] if raw is not None: s.raw_spine = raw return s if self.llm is None or len(seeds) <= 1: return [render_one(s) for s in seeds] results: list[AdSeed | None] = [None] * len(seeds) with ThreadPoolExecutor(max_workers=self.max_workers) as ex: futs = {ex.submit(render_one, s): i for i, s in enumerate(seeds)} for fut in as_completed(futs): idx = futs[fut] try: results[idx] = fut.result() except Exception as e: log.warning("render_one %d failed: %s", idx, e) results[idx] = seeds[idx] return [r for r in results if r is not None] def _parallel_score(self, seeds: list[AdSeed], wow: WowFilter) -> list[ScoredSeed]: def score_one(s: AdSeed) -> ScoredSeed: return ScoredSeed(seed=s, score=wow.score(s)) if self.llm is None or len(seeds) <= 1: return [score_one(s) for s in seeds] results: list[ScoredSeed | None] = [None] * len(seeds) with ThreadPoolExecutor(max_workers=self.max_workers) as ex: futs = {ex.submit(score_one, s): i for i, s in enumerate(seeds)} for fut in as_completed(futs): idx = futs[fut] try: results[idx] = fut.result() except Exception as e: log.warning("score_one %d failed: %s", idx, e) return [r for r in results if r is not None] def _apply_rules( self, spaces: BlendingSpaceSet, rules: list[CollisionRule], product: "ProductGenome", tension: "TensionArchetype", persona_text: str, atom, potential: str, wow_anchor: str, ohaeng_blueprint: str, constraints: CreativeConstraints, beat_count: int = 5, duration: int = 15, ) -> tuple[str, str, list[dict], str | None, dict | None]: """Return (logline, concept_text, beats_list, raw, critique). critique is the AETHER 5-element metacognitive audit (Wood/Fire/Earth/ Metal/Water + verdict + revision_brief). None if backend doesn't support metacognition or if the call failed. """ if self.llm is not None: # Prefer metacognition path (draft → critique → revise) when backend exposes it. if hasattr(self.llm, "apply_collision_with_metacognition"): try: logline, concept, beats_list, raw, critique = ( self.llm.apply_collision_with_metacognition( atom=atom, potential=potential, rules=rules, tension=tension, persona=persona_text, product=product, wow_anchor=wow_anchor, ohaeng_blueprint=ohaeng_blueprint, creative_constraints=constraints.as_prompt_block(), emergent_refs=EMERGENT_AD_REFS, beat_count=beat_count, duration=duration, ) ) return logline, concept, beats_list, raw, critique except Exception as e: log.warning("llm.apply_collision_with_metacognition failed: %s", e) try: logline, concept, beats_list, raw = self.llm.apply_collision( atom=atom, potential=potential, rules=rules, tension=tension, persona=persona_text, product=product, wow_anchor=wow_anchor, ohaeng_blueprint=ohaeng_blueprint, creative_constraints=constraints.as_prompt_block(), emergent_refs=EMERGENT_AD_REFS, beat_count=beat_count, duration=duration, ) return logline, concept, beats_list, raw, None except Exception as e: log.warning("llm.apply_collision failed: %s", e) # Template fallback (no LLM): produce a placeholder treatment rule_fragments = " | ".join(r.llm_prompt_fragment for r in rules) placeholder_beats = [ {"shot": "", "action": "", "sound": "", "duration": "", "content": ""} for _ in range(beat_count) ] return ( "", f"[{atom.name}] '{potential}' → " f"{tension.greimas_square.S1} vs {tension.greimas_square.S2} 충돌 " f"({persona_text}). 앵커: {wow_anchor}. 규칙: {rule_fragments}", placeholder_beats, None, None, ) def slai_feedback(self, approved_seeds: list[ScoredSeed]) -> str | None: if self.llm is None or not approved_seeds: return None try: return self.llm.propose_new_rule(approved_seeds) except Exception as e: log.warning("slai_feedback failed: %s", e) return None __all__ = ["AdSeed", "BeatModel", "ScoredSeed", "GenesisEngine"]