SeaWolf-AI's picture
feat: real emergence — anti-imitation system prompt + LLM tournament selection (structural-invention judge replaces heuristic top-1)
3b19377 verified
"""Genesis Engine — 5-stage orchestrator with parallel LLM calls."""
from __future__ import annotations
import logging
import random
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from pydantic import BaseModel, Field
from aether_ad.blending.spaces import BlendingSpaceSet, build_spaces
from aether_ad.core.collision import ALL_RULES, CollisionRule, sample_rules
from aether_ad.core.creative_constraints import (
EMERGENT_AD_REFS,
CreativeConstraints,
pick_constraints,
)
from aether_ad.core.narrative import SPINE_15S, SPINE_30S, Beat, PixarSpineRenderer
from aether_ad.core.ohaeng import BeatSlot, pick_script, script_to_prompt
from aether_ad.core.wow_anchors import pick_anchor
from aether_ad.core.wow_filter import CreativityScore, WowFilter
if TYPE_CHECKING:
from aether_ad.core.context import ContextMatrix, TensionArchetype
from aether_ad.core.genome import ProductGenome
from aether_ad.llm.base import LLMBackend
log = logging.getLogger("aether_ad.engine")
class BeatModel(BaseModel):
time_range: tuple[int, int]
beat: str
purpose: str
content: str
element: str | None = None # "wood" | "fire" | "earth" | "metal" | "water"
element_emoji: str | None = None
element_kr: str | None = None
relation_in: str | None = None # "start" | "sheng" | "ke"
relation_reason: str | None = None
@classmethod
def from_beat(cls, b: Beat, slot: BeatSlot | None = None) -> "BeatModel":
base: dict = {
"time_range": b.time_range,
"beat": b.beat.value,
"purpose": b.purpose,
"content": b.content,
}
if slot is not None:
from aether_ad.core.ohaeng import ELEMENTS
el = ELEMENTS[slot.element]
base.update({
"element": slot.element,
"element_emoji": el.emoji,
"element_kr": el.kr,
"relation_in": slot.relation_in,
"relation_reason": slot.relation_reason,
})
return cls(**base)
class AdSeed(BaseModel):
"""One emergent ad-idea candidate before + after narrative render."""
seed_id: str
product_id: str
tension_id: str
persona_summary: str
rules_applied: list[str]
concept: str
scene_summary: str | None = None # 10-char Korean headline above concept
duration: int = 15
beats: list[BeatModel] = Field(default_factory=list)
raw_collision: str | None = None
raw_spine: str | None = None
wow_anchor: str | None = None
ohaeng: list[dict] = Field(default_factory=list) # per-beat element + relation
creative_constraints: dict | None = None # the 6 hard constraints injected into prompt
aether_critique: dict | None = None # AETHER 5-element metacognitive audit (Wood/Fire/Earth/Metal/Water + verdict)
tournament: dict | None = None # LLM judge verdict: structural invention + why this won the round
class ScoredSeed(BaseModel):
seed: AdSeed
score: CreativityScore
@dataclass
class GenesisEngine:
llm: "LLMBackend | None"
context: "ContextMatrix"
corpus_texts: list[str] = field(default_factory=list)
seed_count_per_run: int = 15
keep_top_k: int = 5
rng_seed: int | None = None
max_workers: int = 4
def __post_init__(self) -> None:
self.rng = random.Random(self.rng_seed)
def generate(
self,
product: "ProductGenome",
tension_id: str,
persona_text: str,
duration: Literal[15, 30] = 15,
rules_per_seed: tuple[int, int] = (1, 3),
) -> list[ScoredSeed]:
tension = self.context.get_tension(tension_id)
log.info("stage=corpus_ingestion product=%s tension=%s", product.product_id, tension_id)
spaces = build_spaces(product, tension, persona_text)
log.info("stage=encoding inputs=%d generic=%d", len(spaces.inputs), len(spaces.generic))
plans = self._build_diverse_plans(product, rules_per_seed, duration=duration)
log.info("stage=incubation_planning count=%d", len(plans))
raw_seeds = self._parallel_build_concepts(
plans, spaces, product, tension, persona_text, duration
)
log.info("stage=incubation count=%d", len(raw_seeds))
# With Director's Treatment, the same LLM call already produces beats.
# Only run the legacy spine renderer for the template (no-LLM) path
# OR if the treatment came back with empty beats.
rendered: list[AdSeed]
if self.llm is None or any(
not any((b.content or "").strip() for b in s.beats) for s in raw_seeds
):
slots_by_id = {
f"{product.product_id}_{tension.id}_{p['index']:03d}": p["ohaeng_slots"]
for p in plans
}
blueprint_by_id = {
f"{product.product_id}_{tension.id}_{p['index']:03d}": p["ohaeng_blueprint"]
for p in plans
}
rendered = self._parallel_render_spines(raw_seeds, duration, slots_by_id, blueprint_by_id)
else:
rendered = raw_seeds # treatment already has beats
log.info("stage=emergence rendered=%d", len(rendered))
wow = WowFilter(
llm=self.llm,
forbidden_zones=self.context.cultural_context.forbidden_zones,
corpus_texts=self.corpus_texts,
)
scored = self._parallel_score(rendered, wow)
log.info("stage=filtering scored=%d", len(scored))
# ── Tournament: LLM judge picks the most structurally emergent ───
# When the backend supports it AND we have multiple candidates, the
# judge override beats the heuristic wow_filter score for the top slot.
if (
self.llm is not None
and len(scored) > 1
and hasattr(self.llm, "tournament_select")
and self.keep_top_k <= 5 # only run tournament for tight selections
):
try:
cands = []
for s in scored:
seed = s.seed
raw_concept = seed.concept or ""
# Pull KEY_VISUAL / HERO_SHOT directly from concept_text format.
kv = ""
hs = ""
for line in raw_concept.split("\n\n"):
if line.startswith("KEY VISUAL — "): kv = line[len("KEY VISUAL — "):].strip()
elif line.startswith("HERO SHOT — "): hs = line[len("HERO SHOT — "):].strip()
beats_summary = " | ".join(
(b.content or "").replace("\n", " ")[:80] for b in seed.beats[:3]
)
cands.append({
"seed_id": seed.seed_id,
"logline": seed.scene_summary or "",
"key_visual": kv,
"hero_shot": hs,
"beats_summary": beats_summary,
})
verdict = self.llm.tournament_select(
cands, product_id=product.product_id, brand=product.brand,
)
widx = verdict.get("winner_index", 0)
log.info(
"stage=tournament winner=%d invention=%r",
widx, (verdict.get("structural_invention") or "")[:80],
)
# Reorder: winner first, then by wow_filter score for the rest
if 0 <= widx < len(scored):
winner = scored[widx]
winner.seed.tournament = verdict
rest = [s for i, s in enumerate(scored) if i != widx]
rest.sort(key=lambda x: x.score.final, reverse=True)
scored = [winner] + rest
else:
scored.sort(key=lambda x: x.score.final, reverse=True)
except Exception as e:
log.warning("tournament_select skipped: %s", e)
scored.sort(key=lambda x: x.score.final, reverse=True)
else:
scored.sort(key=lambda x: x.score.final, reverse=True)
return scored[: self.keep_top_k]
def _build_diverse_plans(
self,
product: "ProductGenome",
rules_per_seed: tuple[int, int],
duration: int = 15,
) -> list[dict]:
"""Build per-seed plans with forced diversity along 4 axes:
- wow_anchor: each seed gets a different anchor
- atom: atoms distributed as evenly as possible
- rule combination: no two seeds share the same sorted rule_id tuple
- ohaeng script: different 5-element scripts across seeds for narrative variety
"""
atoms = product.functional_atoms
atom_use: Counter[str] = Counter()
seen_combos: set[tuple[str, ...]] = set()
plans: list[dict] = []
for i in range(self.seed_count_per_run):
min_use = min(atom_use.get(a.id, 0) for a in atoms)
atom_candidates = [a for a in atoms if atom_use.get(a.id, 0) == min_use]
atom = self.rng.choice(atom_candidates)
atom_use[atom.id] += 1
potential = self.rng.choice(atom.abstract_potentials)
rules = sample_rules(self.rng.randint(*rules_per_seed), self.rng)
for _ in range(12):
key = tuple(sorted(r.rule_id for r in rules))
if key not in seen_combos:
seen_combos.add(key)
break
rules = sample_rules(self.rng.randint(*rules_per_seed), self.rng)
anchor = pick_anchor(i, self.rng)
# Ohaeng script — each seed gets its own 5-element spine
slots = pick_script(duration, self.rng)
blueprint = script_to_prompt(slots)
# Creative constraints — 6 concrete imperatives per seed
constraints = pick_constraints(self.rng)
plans.append(
{
"index": i,
"rules": rules,
"atom": atom,
"potential": potential,
"wow_anchor": anchor,
"ohaeng_slots": slots,
"ohaeng_blueprint": blueprint,
"constraints": constraints,
}
)
return plans
def _parallel_build_concepts(
self,
plans: list[dict],
spaces: BlendingSpaceSet,
product: "ProductGenome",
tension: "TensionArchetype",
persona_text: str,
duration: int,
) -> list[AdSeed]:
def build_one(plan: dict) -> AdSeed:
i = plan["index"]
rules = plan["rules"]
atom = plan["atom"]
potential = plan["potential"]
anchor = plan["wow_anchor"]
blueprint = plan["ohaeng_blueprint"]
slots = plan["ohaeng_slots"]
constraints: CreativeConstraints = plan["constraints"]
beat_count = 5 if duration == 15 else 6
logline, concept, beats_list, raw, critique = self._apply_rules(
spaces, rules, product, tension, persona_text,
atom, potential, anchor, blueprint, constraints,
beat_count=beat_count, duration=duration,
)
# Build BeatModel list from director treatment beats — time_range
# comes from the spine layout; cinematic content from the LLM.
spine = SPINE_15S if duration == 15 else SPINE_30S
beat_models = []
for idx, (b_enum, t_range, purpose) in enumerate(spine):
src = beats_list[idx] if idx < len(beats_list) else {}
beat_models.append(BeatModel(
time_range=t_range,
beat=b_enum.value,
purpose=purpose,
content=src.get("content") or "",
element=None,
element_emoji=None,
element_kr=None,
relation_in=None,
relation_reason=None,
))
seed_obj = AdSeed(
seed_id=f"{product.product_id}_{tension.id}_{i:03d}",
product_id=product.product_id,
tension_id=tension.id,
persona_summary=persona_text,
rules_applied=[r.rule_id for r in rules],
concept=concept,
scene_summary=logline or None,
duration=duration,
raw_collision=raw,
wow_anchor=anchor,
creative_constraints=constraints.as_summary(),
aether_critique=critique,
beats=beat_models,
raw_spine=raw, # treatment includes beats; reuse raw
ohaeng=[
{
"index": s.index,
"beat_name": s.beat_name,
"element": s.element,
"emoji": s.relation_reason.split()[0] if s.relation_in == "start" else "",
"relation_in": s.relation_in,
"relation_reason": s.relation_reason,
}
for s in slots
],
)
return seed_obj
if self.llm is None or len(plans) <= 1:
return [build_one(p) for p in plans]
results: list[AdSeed | None] = [None] * len(plans)
with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
futs = {ex.submit(build_one, p): p["index"] for p in plans}
for fut in as_completed(futs):
idx = futs[fut]
try:
results[idx] = fut.result()
except Exception as e:
log.warning("build_one %d failed: %s", idx, e)
return [r for r in results if r is not None]
def _parallel_render_spines(
self,
seeds: list[AdSeed],
duration: int,
slots_by_id: dict[str, list[BeatSlot]] | None = None,
blueprint_by_id: dict[str, str] | None = None,
) -> list[AdSeed]:
renderer = PixarSpineRenderer(duration=duration, llm=self.llm)
slots_by_id = slots_by_id or {}
blueprint_by_id = blueprint_by_id or {}
def render_one(s: AdSeed) -> AdSeed:
blueprint = blueprint_by_id.get(s.seed_id, "")
slots = slots_by_id.get(s.seed_id, [])
beats, raw = renderer.render(s, ohaeng_blueprint=blueprint)
s.beats = [
BeatModel.from_beat(b, slots[i] if i < len(slots) else None)
for i, b in enumerate(beats)
]
if raw is not None:
s.raw_spine = raw
return s
if self.llm is None or len(seeds) <= 1:
return [render_one(s) for s in seeds]
results: list[AdSeed | None] = [None] * len(seeds)
with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
futs = {ex.submit(render_one, s): i for i, s in enumerate(seeds)}
for fut in as_completed(futs):
idx = futs[fut]
try:
results[idx] = fut.result()
except Exception as e:
log.warning("render_one %d failed: %s", idx, e)
results[idx] = seeds[idx]
return [r for r in results if r is not None]
def _parallel_score(self, seeds: list[AdSeed], wow: WowFilter) -> list[ScoredSeed]:
def score_one(s: AdSeed) -> ScoredSeed:
return ScoredSeed(seed=s, score=wow.score(s))
if self.llm is None or len(seeds) <= 1:
return [score_one(s) for s in seeds]
results: list[ScoredSeed | None] = [None] * len(seeds)
with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
futs = {ex.submit(score_one, s): i for i, s in enumerate(seeds)}
for fut in as_completed(futs):
idx = futs[fut]
try:
results[idx] = fut.result()
except Exception as e:
log.warning("score_one %d failed: %s", idx, e)
return [r for r in results if r is not None]
def _apply_rules(
self,
spaces: BlendingSpaceSet,
rules: list[CollisionRule],
product: "ProductGenome",
tension: "TensionArchetype",
persona_text: str,
atom,
potential: str,
wow_anchor: str,
ohaeng_blueprint: str,
constraints: CreativeConstraints,
beat_count: int = 5,
duration: int = 15,
) -> tuple[str, str, list[dict], str | None, dict | None]:
"""Return (logline, concept_text, beats_list, raw, critique).
critique is the AETHER 5-element metacognitive audit (Wood/Fire/Earth/
Metal/Water + verdict + revision_brief). None if backend doesn't
support metacognition or if the call failed.
"""
if self.llm is not None:
# Prefer metacognition path (draft → critique → revise) when backend exposes it.
if hasattr(self.llm, "apply_collision_with_metacognition"):
try:
logline, concept, beats_list, raw, critique = (
self.llm.apply_collision_with_metacognition(
atom=atom,
potential=potential,
rules=rules,
tension=tension,
persona=persona_text,
product=product,
wow_anchor=wow_anchor,
ohaeng_blueprint=ohaeng_blueprint,
creative_constraints=constraints.as_prompt_block(),
emergent_refs=EMERGENT_AD_REFS,
beat_count=beat_count,
duration=duration,
)
)
return logline, concept, beats_list, raw, critique
except Exception as e:
log.warning("llm.apply_collision_with_metacognition failed: %s", e)
try:
logline, concept, beats_list, raw = self.llm.apply_collision(
atom=atom,
potential=potential,
rules=rules,
tension=tension,
persona=persona_text,
product=product,
wow_anchor=wow_anchor,
ohaeng_blueprint=ohaeng_blueprint,
creative_constraints=constraints.as_prompt_block(),
emergent_refs=EMERGENT_AD_REFS,
beat_count=beat_count,
duration=duration,
)
return logline, concept, beats_list, raw, None
except Exception as e:
log.warning("llm.apply_collision failed: %s", e)
# Template fallback (no LLM): produce a placeholder treatment
rule_fragments = " | ".join(r.llm_prompt_fragment for r in rules)
placeholder_beats = [
{"shot": "", "action": "", "sound": "", "duration": "", "content": ""}
for _ in range(beat_count)
]
return (
"",
f"[{atom.name}] '{potential}' → "
f"{tension.greimas_square.S1} vs {tension.greimas_square.S2} 충돌 "
f"({persona_text}). 앵커: {wow_anchor}. 규칙: {rule_fragments}",
placeholder_beats,
None,
None,
)
def slai_feedback(self, approved_seeds: list[ScoredSeed]) -> str | None:
if self.llm is None or not approved_seeds:
return None
try:
return self.llm.propose_new_rule(approved_seeds)
except Exception as e:
log.warning("slai_feedback failed: %s", e)
return None
__all__ = ["AdSeed", "BeatModel", "ScoredSeed", "GenesisEngine"]