# anime-gen-api / test_full_episode.py
# (uploaded via huggingface_hub by AswinMathew — commit 7190fd0, verified)
"""Full episode pipeline test — all 10 cuts from Chapter 1 storyboard.
Generates portraits, images, TTS, video, muxes audio, and assembles final episode.
Everything caches — safe to re-run if interrupted.
Estimated cost: ~0.45 pollen (portraits + images + video)
Estimated wall time: 30-45 minutes (mostly video generation)
"""
import asyncio
import json
import math
import os
import sys
import time
from pathlib import Path
sys.path.insert(0, ".")
from app.utils.prompt_builder import build_image_prompt, _build_identity_anchor
from app.services.pollinations import (
generate_image as poll_generate_image,
generate_video,
upload_media,
VISION_MODELS,
)
from app.services.tts import generate_tts
from app.services.ffmpeg import mux_audio, get_duration, concat_clips
# Per-stage artifact directories — everything lives under OUTPUT_DIR so one
# folder holds the full cached pipeline state and is safe to delete wholesale.
OUTPUT_DIR = Path("test_episode_output")
PORTRAIT_DIR = OUTPUT_DIR / "portraits"  # Stage 1: character reference portraits
IMAGE_DIR = OUTPUT_DIR / "images"        # Stage 2: keyframe images
AUDIO_DIR = OUTPUT_DIR / "audio"         # Stage 3: TTS audio tracks
VIDEO_DIR = OUTPUT_DIR / "video"         # Stage 4: silent video clips
CLIP_DIR = OUTPUT_DIR / "clips"          # Stage 5: muxed (video+audio) clips
# ============================================================
# Character data
# ============================================================
# Canonical character roster. Each entry's visual_prompt is the identity
# anchor reused for both portrait generation (Stage 1) and per-cut image
# prompts (Stage 2) so a character looks consistent across cuts.
CHARACTERS = {
    "Ye Chen": {
        "visual_prompt": (
            "young adult male, jet black short messy hair with side-swept bangs, "
            "deep amber eyes, fair skin, athletic build, "
            "wearing white inner sect disciple robes with silver trim, "
            "determined intense expression"
        ),
        "role": "protagonist",
    },
    "Gu Changge": {
        "visual_prompt": (
            "young adult male, long flowing silver-white hair, "
            "cold piercing violet eyes, pale ivory skin, tall elegant build, "
            "wearing luxurious dark purple and gold noble robes with intricate embroidery, "
            "calm indifferent expression"
        ),
        "role": "antagonist",
    },
    "Taixuan Holy Lord": {
        "visual_prompt": (
            "middle-aged male, dark brown hair tied in a topknot, "
            "golden glowing eyes, warm bronze skin, imposing muscular build, "
            "wearing ornate golden and white holy lord ceremonial robes with dragon motifs, "
            "stern authoritative expression"
        ),
        "role": "supporting",
    },
    "Holy Maiden": {
        "visual_prompt": (
            "young adult female, long flowing black hair with jade hairpin, "
            "autumn water-like gentle brown eyes, porcelain skin, slim graceful build, "
            "wearing elegant green silk dress with fluttering sleeves, "
            "ethereal calm expression like a fairy"
        ),
        "role": "supporting",
    },
}
# Character name -> visual_prompt (flat dict for build_image_prompt)
CHAR_VISUALS = {name: data["visual_prompt"] for name, data in CHARACTERS.items()}
# ============================================================
# Voice configs (Edge TTS)
# ============================================================
# Per-speaker Edge TTS settings; any speaker not listed here falls back to
# "Narrator" (see generate_all_tts).
VOICE_CONFIGS = {
    "Ye Chen": {"voice_name": "en-US-AndrewNeural", "rate": "+5%", "pitch": "+2Hz"},
    "Narrator": {"voice_name": "en-US-GuyNeural", "rate": "-5%", "pitch": "-3Hz"},
    "Taixuan Holy Lord": {"voice_name": "en-US-RogerNeural", "rate": "-10%", "pitch": "-8Hz"},
}
# ============================================================
# Model config
# ============================================================
PORTRAIT_MODEL = "klein-large"
PORTRAIT_SEED = 42  # fixed seeds keep re-runs reproducible
IMAGE_SEED = 42
CHARACTER_IMAGE_MODEL = "klein-large"  # For cuts with focal character + portrait ref
GENERIC_IMAGE_MODEL = "grok-imagine"  # For cuts without characters (wide/establishing)
VIDEO_MODEL = "grok-video"
API_DELAY = 7  # seconds between Pollinations API calls (rate limit safety)
# ============================================================
# Portrait prompt template
# ============================================================
# NOTE(review): mid-file import — could be merged into the top-of-file
# import from app.utils.prompt_builder.
from app.utils.prompt_builder import MANHWA_STYLE_PREFIX
# Stage-1 reference portrait template; {style_prefix} and {visual_prompt}
# are filled per character in generate_portraits.
PORTRAIT_PROMPT = (
    "{style_prefix}, character portrait sheet, front-facing bust shot, "
    "{visual_prompt}, clean white background, reference sheet style, "
    "sharp details, no background elements, studio lighting, "
    "high detail face and eyes, character design reference"
)
# Track costs — running pollen spend across all pipeline stages.
total_cost = 0.0


def log_cost(label: str, amount: float):
    """Add *amount* to the module-wide pollen spend and print the running total."""
    global total_cost
    updated = total_cost + amount
    total_cost = updated
    print(f" [COST] +{amount:.4f} pollen ({label}) | Running total: {total_cost:.4f}")
# ============================================================
# Stage 1: Portrait Generation
# ============================================================
async def generate_portraits(storyboard: dict) -> dict[str, str]:
    """Generate reference portraits for characters that appear as focal.

    Only characters that are both listed in CHARACTERS and named as a cut's
    focal_character get a portrait. Both the rendered PNG and its uploaded
    URL are cached on disk (a sibling ``.url`` file), so re-runs skip work.

    Returns {character_name: portrait_url}.
    """
    print("\n" + "=" * 70)
    print("STAGE 1: CHARACTER PORTRAITS")
    print("=" * 70)
    # Find which characters appear as focal in any cut
    focal_chars = set()
    for scene in storyboard.get("scenes", []):
        # Storyboards may call the shot list "cuts" or "shots"
        for cut in scene.get("cuts", scene.get("shots", [])):
            focal = cut.get("focal_character")
            if focal and focal in CHARACTERS:
                focal_chars.add(focal)
    print(f" Focal characters needing portraits: {sorted(focal_chars)}")
    portrait_urls = {}
    PORTRAIT_DIR.mkdir(parents=True, exist_ok=True)
    for i, name in enumerate(sorted(focal_chars)):
        portrait_path = str(PORTRAIT_DIR / f"{name.replace(' ', '_')}_portrait.png")
        url_cache_path = portrait_path + ".url"
        # Check cache: a .url file means both render and upload already happened
        if Path(url_cache_path).exists():
            url = Path(url_cache_path).read_text().strip()
            portrait_urls[name] = url
            print(f"\n [{i+1}/{len(focal_chars)}] {name}: CACHED ({url[:60]}...)")
            continue
        print(f"\n [{i+1}/{len(focal_chars)}] {name}:")
        char_data = CHARACTERS[name]
        # Generate portrait image (skipped if the PNG already exists —
        # e.g. a previous run rendered it but died before uploading)
        if Path(portrait_path).exists():
            print(f" Image CACHED: {portrait_path}")
        else:
            prompt = PORTRAIT_PROMPT.format(
                style_prefix=MANHWA_STYLE_PREFIX,
                visual_prompt=char_data["visual_prompt"],
            )
            print(f" Generating portrait ({PORTRAIT_MODEL}, seed={PORTRAIT_SEED})...")
            t0 = time.time()
            await poll_generate_image(
                prompt=prompt,
                output_path=portrait_path,
                model=PORTRAIT_MODEL,
                width=768,
                height=1024,
                seed=PORTRAIT_SEED,
            )
            elapsed = time.time() - t0
            print(f" OK - portrait saved ({elapsed:.1f}s)")
            log_cost(f"portrait {name}", 0.012)
            # Rate-limit pause only after an actual API call
            await asyncio.sleep(API_DELAY)
        # Upload to get permanent URL
        print(f" Uploading to media.pollinations.ai...")
        t0 = time.time()
        url = await upload_media(portrait_path)
        elapsed = time.time() - t0
        print(f" URL: {url[:60]}... ({elapsed:.1f}s)")
        # Cache the URL so the next run skips both render and upload
        Path(url_cache_path).write_text(url)
        portrait_urls[name] = url
    print(f"\n Portraits done: {len(portrait_urls)} characters")
    return portrait_urls
# ============================================================
# Stage 2: Image Generation
# ============================================================
async def generate_images(
    all_cuts: list[dict],
    portrait_urls: dict[str, str],
) -> None:
    """Generate keyframe images for all cuts.

    Model choice per cut: a cut with a focal character (and a non-wide shot
    type) uses CHARACTER_IMAGE_MODEL plus that character's portrait URL as a
    reference image; otherwise GENERIC_IMAGE_MODEL with no reference. On
    failure, retries once with a fallback model. Records the image path on
    each cut as ``_image_path`` (None if both attempts fail).
    """
    print("\n" + "=" * 70)
    print("STAGE 2: IMAGE GENERATION")
    print("=" * 70)
    IMAGE_DIR.mkdir(parents=True, exist_ok=True)
    total = len(all_cuts)
    for i, cut in enumerate(all_cuts):
        cut_id = cut["cut_id"]
        image_path = str(IMAGE_DIR / f"{cut_id}.png")
        cut["_image_path"] = image_path
        if Path(image_path).exists():
            print(f"\n [{i+1}/{total}] {cut_id}: CACHED")
            continue
        # Determine model based on focal character and shot type
        focal = cut.get("focal_character")
        shot_type = cut.get("shot_type", "medium")
        ref_url = None
        if focal and focal in portrait_urls and shot_type not in ("establishing", "wide", "birds_eye"):
            model = CHARACTER_IMAGE_MODEL
            ref_url = portrait_urls[focal]
            model_label = f"{model} + portrait ref"
        else:
            model = GENERIC_IMAGE_MODEL
            model_label = model
        # Build prompt from the cut plus the flat character-visual map
        prompt = build_image_prompt(cut, CHAR_VISUALS)
        print(f"\n [{i+1}/{total}] {cut_id} ({shot_type}, {model_label}):")
        print(f" Prompt ({len(prompt)} chars): {prompt[:100]}...")
        t0 = time.time()
        try:
            await poll_generate_image(
                prompt=prompt,
                output_path=image_path,
                model=model,
                width=1024,
                height=768,
                seed=IMAGE_SEED,
                reference_image_url=ref_url,
            )
            elapsed = time.time() - t0
            size_kb = Path(image_path).stat().st_size // 1024
            print(f" OK - {size_kb}KB ({elapsed:.1f}s)")
            cost = 0.012 if model == CHARACTER_IMAGE_MODEL else 0.0025
            log_cost(f"image {cut_id}", cost)
        except Exception as e:
            print(f" FAILED: {e}")
            # Try fallback model — note the retry drops the reference image
            # (presumably the fallback models don't take refs; TODO confirm)
            fallback = "grok-imagine" if model != "grok-imagine" else "flux"
            print(f" Retrying with {fallback}...")
            try:
                await asyncio.sleep(3)
                await poll_generate_image(
                    prompt=prompt,
                    output_path=image_path,
                    model=fallback,
                    width=1024,
                    height=768,
                    seed=IMAGE_SEED,
                )
                # t0 is from before the first attempt, so this elapsed
                # includes the failed try — rough but intentional logging
                elapsed = time.time() - t0
                print(f" OK (fallback) - ({elapsed:.1f}s)")
                cost = 0.0025 if fallback == "grok-imagine" else 0.0
                log_cost(f"image {cut_id} fallback", cost)
            except Exception as e2:
                print(f" FALLBACK ALSO FAILED: {e2}")
                cut["_image_path"] = None
        # Rate-limit pause between cuts
        await asyncio.sleep(API_DELAY)
# ============================================================
# Stage 3: TTS Generation
# ============================================================
async def generate_all_tts(all_cuts: list[dict]) -> None:
    """Generate TTS audio for all cuts.

    Speaker voice settings come from VOICE_CONFIGS (unknown speakers fall
    back to "Narrator"). Records ``_audio_path`` (None when the cut has no
    dialogue or TTS fails) and ``_tts_duration`` (seconds) on each cut;
    later stages size video clips from ``_tts_duration``.
    """
    print("\n" + "=" * 70)
    print("STAGE 3: TTS GENERATION (Edge TTS - FREE)")
    print("=" * 70)
    AUDIO_DIR.mkdir(parents=True, exist_ok=True)
    total = len(all_cuts)
    for i, cut in enumerate(all_cuts):
        cut_id = cut["cut_id"]
        audio_path = str(AUDIO_DIR / f"{cut_id}.mp3")
        cut["_audio_path"] = audio_path
        dialogue = cut.get("dialogue", {})
        # `or ""` also covers an explicit null text field
        text = dialogue.get("text") or ""
        if not text.strip():
            print(f" [{i+1}/{total}] {cut_id}: No dialogue, skipping TTS")
            cut["_audio_path"] = None
            cut["_tts_duration"] = 0.0
            continue
        if Path(audio_path).exists():
            # Cached audio: re-measure duration since it isn't persisted
            dur = await get_duration(audio_path)
            cut["_tts_duration"] = dur
            print(f" [{i+1}/{total}] {cut_id}: CACHED ({dur:.2f}s)")
            continue
        speaker = dialogue.get("speaker", "Narrator")
        voice_config = VOICE_CONFIGS.get(speaker, VOICE_CONFIGS["Narrator"])
        emotion = dialogue.get("emotion", "neutral")
        print(f" [{i+1}/{total}] {cut_id}: {speaker} ({emotion}) - \"{text[:60]}...\"")
        t0 = time.time()
        try:
            result = await generate_tts(
                text=text,
                output_path=audio_path,
                voice_name=voice_config["voice_name"],
                rate=voice_config.get("rate", "+0%"),
                pitch=voice_config.get("pitch", "+0Hz"),
                emotion=emotion,
            )
            dur = result["duration_sec"]
            # Fallback: measure with ffprobe if timestamps failed
            if dur < 0.1:
                dur = await get_duration(audio_path)
            cut["_tts_duration"] = dur
            elapsed = time.time() - t0
            print(f" OK - {dur:.2f}s ({elapsed:.1f}s)")
        except Exception as e:
            print(f" FAILED: {e}")
            cut["_audio_path"] = None
            cut["_tts_duration"] = 0.0
# ============================================================
# Stage 4: Video Generation (img2vid)
# ============================================================
async def generate_all_videos(all_cuts: list[dict]) -> None:
    """Generate video clips for all cuts via grok-video img2vid.

    For each cut: uploads the Stage-2 keyframe, requests a clip sized to the
    TTS duration (clamped to 1-10s), and saves it under VIDEO_DIR. Records
    the path on the cut as ``_silent_video_path`` (None when the keyframe is
    missing or any step fails). Finished clips are cached.
    """
    print("\n" + "=" * 70)
    print("STAGE 4: VIDEO GENERATION (grok-video img2vid)")
    print("This will take 20-30 minutes. Each clip needs 2-3 min to generate.")
    print("=" * 70)
    VIDEO_DIR.mkdir(parents=True, exist_ok=True)
    total = len(all_cuts)
    # We need to upload each image first, then generate video
    for i, cut in enumerate(all_cuts):
        cut_id = cut["cut_id"]
        image_path = cut.get("_image_path")
        silent_path = str(VIDEO_DIR / f"{cut_id}_silent.mp4")
        cut["_silent_video_path"] = silent_path
        if not image_path or not Path(image_path).exists():
            print(f"\n [{i+1}/{total}] {cut_id}: SKIPPED (no keyframe image)")
            cut["_silent_video_path"] = None
            continue
        if Path(silent_path).exists():
            dur = await get_duration(silent_path)
            print(f"\n [{i+1}/{total}] {cut_id}: CACHED ({dur:.2f}s)")
            continue
        # Calculate video duration from TTS
        tts_dur = cut.get("_tts_duration", 0.0)
        storyboard_dur = cut.get("duration_sec", 3.0)
        # Use TTS duration if available, otherwise storyboard estimate
        target_dur = tts_dur if tts_dur > 0.5 else storyboard_dur
        # Clamp to grok-video limits (1-10s)
        video_dur = max(1, min(math.ceil(target_dur), 10))
        # Build video prompt from the cut's motion hints
        video_prompt_parts = []
        if cut.get("video_prompt"):
            video_prompt_parts.append(cut["video_prompt"])
        if cut.get("action_description"):
            video_prompt_parts.append(cut["action_description"])
        video_prompt = ", ".join(video_prompt_parts) if video_prompt_parts else "subtle idle animation"
        print(f"\n [{i+1}/{total}] {cut_id} (requesting {video_dur}s, TTS={tts_dur:.1f}s):")
        print(f" Video prompt: {video_prompt[:100]}...")
        # Upload keyframe so the video API can reference it by URL
        t0 = time.time()
        print(f" Uploading keyframe...")
        try:
            image_url = await upload_media(image_path)
        except Exception as e:
            print(f" Upload FAILED: {e}")
            cut["_silent_video_path"] = None
            continue
        # Generate video
        print(f" Generating video (this takes 2-3 minutes)...")
        try:
            await generate_video(
                prompt=video_prompt,
                output_path=silent_path,
                model=VIDEO_MODEL,
                duration=video_dur,
                image_url=image_url,
            )
            elapsed = time.time() - t0
            file_size = Path(silent_path).stat().st_size
            actual_dur = await get_duration(silent_path)
            print(f" OK - {file_size // 1024}KB, {actual_dur:.2f}s actual ({elapsed:.1f}s)")
            log_cost(f"video {cut_id} ({video_dur}s)", 0.003 * video_dur)
        except Exception as e:
            print(f" FAILED: {e}")
            cut["_silent_video_path"] = None
        # Rate-limit pause between cuts
        await asyncio.sleep(API_DELAY)
# ============================================================
# Stage 5: Audio Mux
# ============================================================
async def mux_all_audio(all_cuts: list[dict]) -> None:
    """Mux TTS audio into each video clip.

    Combines each cut's silent video (``_silent_video_path``) with its TTS
    track (``_audio_path``) into ``CLIP_DIR/<cut_id>.mp4`` and records the
    result on the cut as ``_clip_path`` (None on skip/failure). Finished
    clips are cached and skipped on re-run. The actual duration reconciling
    is done inside mux_audio; the strategy computed here is log-only.
    """
    print("\n" + "=" * 70)
    print("STAGE 5: AUDIO MUX (FFmpeg)")
    print("=" * 70)
    CLIP_DIR.mkdir(parents=True, exist_ok=True)
    total = len(all_cuts)
    for i, cut in enumerate(all_cuts):
        cut_id = cut["cut_id"]
        silent_path = cut.get("_silent_video_path")
        audio_path = cut.get("_audio_path")
        clip_path = str(CLIP_DIR / f"{cut_id}.mp4")
        cut["_clip_path"] = clip_path
        if Path(clip_path).exists():
            dur = await get_duration(clip_path)
            print(f" [{i+1}/{total}] {cut_id}: CACHED ({dur:.2f}s)")
            continue
        if not silent_path or not Path(silent_path).exists():
            print(f" [{i+1}/{total}] {cut_id}: SKIPPED (no video)")
            cut["_clip_path"] = None
            continue
        video_dur = await get_duration(silent_path)
        audio_dur = await get_duration(audio_path) if audio_path and Path(audio_path).exists() else 0.0
        # Describe (for the log line only) how the durations will be reconciled
        strategy = "simple remux"
        # Guard video_dur > 0: a corrupt/zero-length clip must not crash the
        # log computation with a ZeroDivisionError before mux_audio runs.
        if video_dur > 0 and audio_dur > video_dur + 0.5:
            slowdown = min(audio_dur / video_dur, 3.0)
            strategy = f"slow-mo {slowdown:.1f}x"
            if audio_dur > video_dur * 3.0 + 0.3:
                strategy += f" + freeze {audio_dur - video_dur * 3.0:.1f}s"
        elif video_dur > audio_dur + 0.5:
            strategy = "trim to audio (-shortest)"
        print(f" [{i+1}/{total}] {cut_id}: video={video_dur:.2f}s, audio={audio_dur:.2f}s -> {strategy}")
        try:
            tts_dur = cut.get("_tts_duration", 0.0)
            await mux_audio(
                video_path=silent_path,
                audio_path=audio_path,
                output_path=clip_path,
                # None lets mux_audio pick the duration when there is no TTS
                duration_sec=tts_dur if tts_dur > 0 else None,
            )
            final_dur = await get_duration(clip_path)
            print(f" OK - {final_dur:.2f}s final")
        except Exception as e:
            print(f" FAILED: {e}")
            cut["_clip_path"] = None
# ============================================================
# Stage 6: Assembly
# ============================================================
async def assemble_episode(all_cuts: list[dict], storyboard: dict) -> str:
    """Concatenate all clips into the final episode.

    Clips that failed earlier stages are dropped. The transition before each
    clip is the previous cut's ``transition_out``; the very first boundary
    uses the first scene's ``transition_in``. If transition-based assembly
    (xfade) fails, falls back to a plain concat.

    Returns the episode path, or "" if nothing could be assembled.
    """
    print("\n" + "=" * 70)
    print("STAGE 6: FINAL ASSEMBLY")
    print("=" * 70)
    episode_path = str(OUTPUT_DIR / "episode_final.mp4")
    # Collect valid clips (and each one's outgoing transition, kept aligned)
    clip_paths = []
    transitions = []
    for cut in all_cuts:
        clip_path = cut.get("_clip_path")
        if clip_path and Path(clip_path).exists():
            clip_paths.append(clip_path)
            # Use cut's transition_out for next boundary
            trans_out = cut.get("transition_out", "cut")
            transitions.append(trans_out)
    if not clip_paths:
        print(" ERROR: No clips to assemble!")
        return ""
    print(f" Clips: {len(clip_paths)}/{len(all_cuts)}")
    for cp in clip_paths:
        dur = await get_duration(cp)
        print(f" {Path(cp).stem}: {dur:.2f}s")
    # Transitions: first clip has no transition before it
    # transitions[i] = transition AFTER clip i (before clip i+1)
    # For concat_clips, transitions[i] = transition BEFORE clip i
    # Shift: transitions_for_concat[0] = scene's transition_in, rest follow transition_out of previous
    scene_trans_in = storyboard["scenes"][0].get("transition_in", "fade_black")
    trans_for_concat = [scene_trans_in]  # Before first clip
    for j in range(len(clip_paths) - 1):
        trans_for_concat.append(transitions[j])  # transition_out of clip j = transition before clip j+1
    print(f"\n Transitions: {trans_for_concat}")
    print(f" Assembling...")
    t0 = time.time()
    try:
        await concat_clips(clip_paths, episode_path, trans_for_concat)
        elapsed = time.time() - t0
        final_dur = await get_duration(episode_path)
        final_size = Path(episode_path).stat().st_size
        # True division here: the old `final_size // (1024*1024)` floored to
        # a whole number of MB, so ":.1f" always printed a bogus ".0" digit.
        print(f" OK - {final_dur:.2f}s, {final_size / (1024*1024):.1f}MB ({elapsed:.1f}s)")
    except Exception as e:
        print(f" xfade failed: {e}")
        print(f" Falling back to simple concat...")
        try:
            from app.services.ffmpeg import _concat_simple
            await _concat_simple(clip_paths, episode_path)
            final_dur = await get_duration(episode_path)
            final_size = Path(episode_path).stat().st_size
            # Same fix as above: `/` not `//` for a fractional MB readout
            print(f" OK (simple) - {final_dur:.2f}s, {final_size / (1024*1024):.1f}MB")
        except Exception as e2:
            print(f" Simple concat also failed: {e2}")
            return ""
    return episode_path
# ============================================================
# Main
# ============================================================
async def main():
    """Run the full episode pipeline end-to-end.

    Loads the storyboard JSON produced by test_storyboard.py, prints a plan
    summary, then runs stages 1-6 in order. Every stage caches to disk, so
    re-running after an interruption resumes where it left off.
    """
    global total_cost
    overall_start = time.time()
    # Create output dirs
    for d in [OUTPUT_DIR, PORTRAIT_DIR, IMAGE_DIR, AUDIO_DIR, VIDEO_DIR, CLIP_DIR]:
        d.mkdir(parents=True, exist_ok=True)
    print("=" * 70)
    print("FULL EPISODE PIPELINE TEST")
    print(f"Output: {OUTPUT_DIR.resolve()}")
    print("=" * 70)
    # Load storyboard — hard prerequisite produced by a separate script
    storyboard_path = Path("test_storyboard_output.json")
    if not storyboard_path.exists():
        print("ERROR: Run test_storyboard.py first to generate the storyboard")
        return
    with open(storyboard_path, "r", encoding="utf-8") as f:
        storyboard = json.load(f)
    print(f"\nEpisode: {storyboard.get('episode_title', 'Unknown')}")
    print(f"Arc: {storyboard.get('emotional_arc', '')[:80]}...")
    # Extract all cuts (scenes may name the list "cuts" or "shots")
    all_cuts = []
    for scene in storyboard.get("scenes", []):
        for cut in scene.get("cuts", scene.get("shots", [])):
            all_cuts.append(cut)
    print(f"Total cuts: {len(all_cuts)}")
    total_storyboard_dur = sum(c.get("duration_sec", 3.0) for c in all_cuts)
    print(f"Storyboard duration: {total_storyboard_dur:.1f}s")
    # Summary table — mirrors the model-selection rule in generate_images
    print(f"\n {'Cut':<6} {'Type':<18} {'Focal':<20} {'Model':<15} {'Dur':>5}")
    print(f" {'-'*68}")
    for cut in all_cuts:
        cut_id = cut["cut_id"]
        shot_type = cut.get("shot_type", "?")
        focal = cut.get("focal_character") or "-"
        model = CHARACTER_IMAGE_MODEL if focal != "-" and shot_type not in ("wide", "establishing", "birds_eye") else GENERIC_IMAGE_MODEL
        dur = cut.get("duration_sec", 0)
        print(f" {cut_id:<6} {shot_type:<18} {focal:<20} {model:<15} {dur:>4.1f}s")
    # ---- Run pipeline stages ----
    # Stage 1: Portraits
    portrait_urls = await generate_portraits(storyboard)
    # Stage 2: Images
    await generate_images(all_cuts, portrait_urls)
    # Stage 3: TTS (fast, free)
    await generate_all_tts(all_cuts)
    # Print TTS duration summary
    print(f"\n TTS Duration Summary:")
    total_tts_dur = 0.0
    for cut in all_cuts:
        tts_dur = cut.get("_tts_duration", 0.0)
        sb_dur = cut.get("duration_sec", 0.0)
        total_tts_dur += tts_dur
        ratio = tts_dur / sb_dur if sb_dur > 0 else 0
        print(f" {cut['cut_id']}: TTS={tts_dur:.2f}s vs storyboard={sb_dur:.1f}s (ratio={ratio:.1f}x)")
    print(f" Total TTS: {total_tts_dur:.2f}s vs storyboard {total_storyboard_dur:.1f}s")
    # Stage 4: Video (slow — 20-30 min)
    await generate_all_videos(all_cuts)
    # Stage 5: Audio mux
    await mux_all_audio(all_cuts)
    # Stage 6: Assembly
    episode_path = await assemble_episode(all_cuts, storyboard)
    # ---- Final Summary ----
    overall_elapsed = time.time() - overall_start
    print(f"\n{'=' * 70}")
    print("FINAL SUMMARY")
    print(f"{'=' * 70}")
    print(f"\n Episode: {storyboard.get('episode_title', 'Unknown')}")
    print(f" Cuts: {len(all_cuts)}")
    # Clip durations
    successful_clips = 0
    total_clip_dur = 0.0
    for cut in all_cuts:
        clip_path = cut.get("_clip_path")
        if clip_path and Path(clip_path).exists():
            dur = await get_duration(clip_path)
            total_clip_dur += dur
            successful_clips += 1
    print(f" Successful clips: {successful_clips}/{len(all_cuts)}")
    print(f" Total clip duration: {total_clip_dur:.2f}s ({total_clip_dur/60:.1f} min)")
    if episode_path and Path(episode_path).exists():
        ep_dur = await get_duration(episode_path)
        ep_size = Path(episode_path).stat().st_size
        print(f"\n Final episode: {episode_path}")
        print(f" Duration: {ep_dur:.2f}s ({ep_dur/60:.1f} min)")
        print(f" File size: {ep_size / (1024*1024):.1f}MB")
    print(f"\n Total pollen spent: {total_cost:.4f}")
    print(f" Wall time: {overall_elapsed:.0f}s ({overall_elapsed/60:.1f} min)")
    print(f"\n Output directory: {OUTPUT_DIR.resolve()}")
    # Failed cuts — re-running the script retries only the missing pieces
    failed = [cut["cut_id"] for cut in all_cuts if not cut.get("_clip_path") or not Path(cut.get("_clip_path", "")).exists()]
    if failed:
        print(f"\n FAILED CUTS: {failed}")
        print(f" Re-run this script to retry failed cuts (cached steps will be skipped)")
    if episode_path:
        print(f"\n Play the final episode:")
        print(f" {Path(episode_path).resolve()}")
# Script entry point: drive the whole pipeline under a single event loop.
if __name__ == "__main__":
    asyncio.run(main())