# composer / full_pipeline.py
# factorstudios's picture
# Upload 13 files
# 4e7bafb verified
# Raw
# History Blame Contribute Delete
# 22 kB
"""
Full Pipeline: Prompt → Manifest → Images → Selection → Composition
Orchestrates the complete workflow from user prompt to final MP4 video
"""
import requests
import json
import os
from pathlib import Path
from PIL import Image
from io import BytesIO
import asyncio
import subprocess
import sys
from datetime import datetime
# ─────────────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────────────
# Remote services the pipeline talks to.
MANIFEST_SERVER = "https://factorstudios-content-gen.hf.space"
IMAGE_SERVER = "https://factorstudios-pinteresting.hf.space"

# Working directories; every one of them lives next to this file.
PIPELINE_DIR = Path(__file__).parent
CANDIDATES_DIR, SELECTED_DIR, RENDERS_DIR = (
    PIPELINE_DIR / sub_dir for sub_dir in ("candidates", "selected", "renders")
)
# ─────────────────────────────────────────────────────────────────────────
# Step 1: Generate Manifest from Prompt
# ─────────────────────────────────────────────────────────────────────────
async def step_generate_manifest(prompt: str, output_dir: Path = PIPELINE_DIR) -> dict:
    """
    Call content-gen server to generate manifest from prompt.

    POSTs ``{"prompt": prompt}`` to ``{MANIFEST_SERVER}/generate``, saves the
    decoded JSON reply to ``output_dir / "manifest_response.json"`` and prints
    a one-line summary per scene.

    NOTE: the request is made with the blocking ``requests`` library, so the
    event loop does not run other tasks while the call is in flight.

    Args:
        prompt (str): User prompt describing video content
        output_dir (Path): Directory to save manifest (created if missing)

    Returns:
        dict: Manifest exactly as returned by the server; the rest of this
            file reads its "title" and "scenes" keys.

    Raises:
        Exception: Any HTTP, JSON-decoding, validation or file error is
            printed and then re-raised unchanged.
    """
    print("\n" + "=" * 70)
    print("[STEP 1] Generating Manifest from Prompt")
    print("=" * 70)
    print(f"Prompt: {prompt[:80]}...")
    try:
        # Call manifest generation server
        payload = {"prompt": prompt}
        print(f"Calling {MANIFEST_SERVER}/generate...")
        response = requests.post(
            f"{MANIFEST_SERVER}/generate",
            json=payload,
            timeout=60,
        )
        response.raise_for_status()
        manifest = response.json()
        # Every later step calls manifest.get(...); fail here with a clear
        # message instead of an AttributeError further down the pipeline.
        if not isinstance(manifest, dict):
            raise ValueError(
                f"Manifest server returned {type(manifest).__name__}, expected a JSON object"
            )

        # Save manifest to file (the target directory may not exist yet when
        # a custom output_dir is passed)
        output_dir.mkdir(parents=True, exist_ok=True)
        manifest_path = output_dir / "manifest_response.json"
        with open(manifest_path, "w", encoding="utf-8") as f:
            json.dump(manifest, f, indent=2)

        # `or []` / `or ""` also cover keys that are present but null.
        scenes = manifest.get("scenes") or []
        print(f"✓ Generated manifest with {len(scenes)} scenes")
        print(f"✓ Saved to {manifest_path.name}")

        # Print scene details
        for idx, scene in enumerate(scenes):
            label = scene.get("label") or f"Scene {idx}"
            query = scene.get("image_query") or ""
            print(f" Scene {idx}: {label} (query: '{query[:30]}...')")
        return manifest
    except Exception as e:
        print(f"✗ Failed to generate manifest: {e}")
        raise
# ─────────────────────────────────────────────────────────────────────────
# Step 2: Download Images for Each Scene
# ─────────────────────────────────────────────────────────────────────────
def _image_url_of(entry):
    """Return the download URL of one /scrape result entry, or None if it has none.

    The per-scene code reads entries as dicts carrying a "url" key; a bare URL
    string is accepted as well so either response shape can be downloaded.
    """
    if isinstance(entry, dict):
        return entry.get("url") or None
    if isinstance(entry, str):
        return entry or None
    return None


def _download_scene_candidates(query: str, scene_dir: Path, images_per_scene: int) -> int:
    """Scrape images for one query into scene_dir and return how many were saved.

    Best-effort: request failures, API errors and individual bad images are
    printed and skipped, never raised.
    """
    scene_dir.mkdir(parents=True, exist_ok=True)

    # Fetch image list from pinteresting API
    try:
        payload = {"keyword": query, "count": images_per_scene}
        print(f" Calling {IMAGE_SERVER}/scrape...")
        response = requests.post(f"{IMAGE_SERVER}/scrape", json=payload, timeout=60)
        response.raise_for_status()
        data = response.json()
    except Exception as e:
        print(f" ✗ Request failed: {e}")
        return 0

    if not isinstance(data, dict) or not data.get("success"):
        message = data.get("message") if isinstance(data, dict) else None
        print(f" ✗ API Error: {message}")
        return 0

    images = data.get("images") or []
    print(f" ✓ Found {len(images)} images")

    saved = 0
    for img_idx, entry in enumerate(images):
        img_url = _image_url_of(entry)
        if not img_url:
            continue
        try:
            img_response = requests.get(img_url, timeout=15)
            img_response.raise_for_status()
            content = img_response.content

            # Reject payloads Pillow cannot identify (HTML error pages etc.)
            with Image.open(BytesIO(content)) as img:
                img.verify()

            # The bytes are stored as received under a .jpg name; the
            # selection step only looks for "*.jpg" files.
            file_name = f"candidate_{img_idx:02d}.jpg"
            (scene_dir / file_name).write_bytes(content)

            size_kb = len(content) / 1024
            meta = entry if isinstance(entry, dict) else {}
            dims = f"{meta.get('width', '?')}x{meta.get('height', '?')}"
            print(f" ✓ {file_name} ({dims}, {size_kb:.0f}KB)")
            saved += 1
        except Exception as e:
            print(f" ✗ Image {img_idx} failed: {e}")
    return saved


async def step_download_images(
    manifest: dict,
    output_dir: Path = CANDIDATES_DIR,
    images_per_scene: int = 5
) -> int:
    """
    Download candidate images from the pinteresting server for the whole manifest.

    IMPORTANT: Downloads images for the TITLE (stored as scene_0) plus every
    entry of manifest["scenes"] (stored as scene_1, scene_2, ...). Title and
    scenes go through the same helper, so both read the /scrape response the
    same way, check its "success" flag and validate every image.

    ``output_dir`` is deleted and recreated first, so candidates from earlier
    runs never leak into this one.

    NOTE: all HTTP calls use the blocking ``requests`` library.

    Args:
        manifest (dict): Manifest with title and scenes
        output_dir (Path): Base directory; one "scene_<n>" folder is created per query
        images_per_scene (int): Number of images requested per scene

    Returns:
        int: Total number of images downloaded
    """
    import shutil  # local import: the module's import block does not bring it in

    print("\n" + "=" * 70)
    print("[STEP 2] Downloading Images (Title + Scenes)")
    print("=" * 70)

    # Clear and recreate candidates directory
    if output_dir.exists():
        shutil.rmtree(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    total_downloaded = 0

    # STEP 2.0: Download images for TITLE (becomes scene_0)
    title = manifest.get("title", "")
    if title:
        print(f"\n[Scene 0] {title} (TITLE/INTRO)")
        print(f" Query: {title}")
        total_downloaded += _download_scene_candidates(
            title, output_dir / "scene_0", images_per_scene
        )

    # STEP 2.1: Download images for each content scene (scene_1, scene_2, ...)
    for actual_idx, scene in enumerate(manifest.get("scenes") or [], start=1):
        scene_label = scene.get("label", f"Scene {actual_idx}")
        image_query = scene.get("image_query", "")
        if not image_query:
            print(f"\n[Scene {actual_idx}] ⚠ No image query found, skipping...")
            continue

        print(f"\n[Scene {actual_idx}] {scene_label}")
        print(f" Query: {image_query}")
        total_downloaded += _download_scene_candidates(
            image_query, output_dir / f"scene_{actual_idx}", images_per_scene
        )

    print(f"\n✓ Downloaded {total_downloaded} images total")
    return total_downloaded
# ─────────────────────────────────────────────────────────────────────────
# Step 3: Select Best Image from Each Scene's Candidates
# ─────────────────────────────────────────────────────────────────────────
async def step_select_scenes(manifest: dict, candidates_dir: Path = CANDIDATES_DIR) -> dict:
    """
    Select best image from each scene's candidate folder.

    IMPORTANT: Selects from TITLE (scene_0) + all scenes in manifest.scenes.
    "Best" means the largest ``*.jpg`` file in ``candidates_dir/scene_<n>``;
    it is copied to ``SELECTED_DIR/scene_<nn>.jpg``.

    Existing ``scene_*.jpg`` files in ``SELECTED_DIR`` are removed first. The
    compose step counts those files, so leftovers from a previous run with
    more scenes would otherwise cause an image-count mismatch.

    Args:
        manifest (dict): Manifest; only the number of scenes is used
        candidates_dir (Path): Directory with candidate images

    Returns:
        dict: {"status": "success", "selected": <copied>, "total": <expected>}.
            "status" is always "success"; callers compare "selected" with
            "total" to detect scenes that had no usable candidate.
    """
    import shutil  # local import: the module's import block does not bring it in

    print("\n" + "=" * 70)
    print("[STEP 3] Selecting Best Images from Candidates")
    print("=" * 70)

    # Ensure selected directory exists and holds nothing from earlier runs
    SELECTED_DIR.mkdir(parents=True, exist_ok=True)
    for stale in SELECTED_DIR.glob("scene_*.jpg"):
        stale.unlink()

    scenes = manifest.get("scenes") or []
    selected_count = 0

    # scene_0 is the title, scene_1..scene_N are the content scenes
    total_scene_count = len(scenes) + 1
    for scene_idx in range(total_scene_count):
        title_note = " (TITLE)" if scene_idx == 0 else ""
        scene_folder = candidates_dir / f"scene_{scene_idx}"
        if not scene_folder.exists():
            print(f"[Scene {scene_idx}] ✗ No candidates found{title_note}")
            continue

        # Sorted so that equal-sized files resolve to the same pick every run
        images = sorted(scene_folder.glob("*.jpg"))
        if not images:
            print(f"[Scene {scene_idx}] ✗ No JPEG images found{title_note}")
            continue

        # Find largest image (best quality)
        best_img = max(images, key=lambda p: p.stat().st_size)
        size_kb = best_img.stat().st_size / 1024

        # Copy to selected folder
        selected_path = SELECTED_DIR / f"scene_{scene_idx:02d}.jpg"
        shutil.copy2(best_img, selected_path)

        title_tag = " [TITLE]" if scene_idx == 0 else ""
        print(f"[Scene {scene_idx}] ✓ Selected {best_img.name} ({size_kb:.0f}KB){title_tag}")
        selected_count += 1

    print(
        f"\n✓ Selected {selected_count} images "
        f"({total_scene_count} total: title + {len(scenes)} scenes)"
    )
    return {
        "status": "success",
        "selected": selected_count,
        "total": total_scene_count,
    }
# ─────────────────────────────────────────────────────────────────────────
# Step 4: Compose Video with Selected Images and Manifest
# ─────────────────────────────────────────────────────────────────────────
async def step_compose_video(
    manifest: dict,
    compose_url: str = "http://localhost:7860/compose",
) -> dict:
    """
    Compose final video using selected images and manifest labels.

    Checks that ``SELECTED_DIR`` holds exactly one ``scene_*.jpg`` per scene
    plus one for the title, then POSTs the title and the scenes'
    label/image_query pairs to the compose endpoint. A ``video/*`` response
    is written to ``PIPELINE_DIR / "output_video.mp4"``; any other response
    is decoded as JSON and returned as-is when its "status" is "success".

    NOTE: the request uses the blocking ``requests`` library.

    Args:
        manifest (dict): Manifest with title and scenes
        compose_url (str): URL of the /compose endpoint; defaults to the
            local server this pipeline was written against

    Returns:
        dict: For a video response: "status", "video_path", "output_path"
            (same value as "video_path"; that is the key the pipeline
            orchestrator reads), "size_mb" and "scenes". For a JSON
            response: the server's JSON object unchanged.

    Raises:
        Exception: On an image-count mismatch, a non-200 reply, or a JSON
            reply whose status is not "success". Failures after the request
            is started are printed and re-raised.
    """
    print("\n" + "=" * 70)
    print("[STEP 4] Composing Video from Selected Images")
    print("=" * 70)

    scenes = manifest.get("scenes") or []
    selected_images = sorted(SELECTED_DIR.glob("scene_*.jpg"))
    # Expected: title + all scenes
    expected_images = len(scenes) + 1

    print(f"Manifest title: {manifest.get('title', 'Untitled')}")
    print(f"Selected images: {len(selected_images)}")
    print(f"Required images: {expected_images} (title + {len(scenes)} scenes)")

    if len(selected_images) != expected_images:
        raise RuntimeError(
            f"Image count mismatch: expected {expected_images}, "
            f"found {len(selected_images)}"
        )

    # Call the FastAPI /compose endpoint
    print("\nCalling /compose endpoint...")
    try:
        payload = {
            "title": manifest.get("title", "Untitled"),
            "scenes": [
                {
                    "label": s.get("label", f"Scene {idx}"),
                    "image_query": s.get("image_query", ""),
                }
                for idx, s in enumerate(scenes)
            ],
        }
        response = requests.post(compose_url, json=payload, timeout=300)

        # Prefix match: servers commonly send "application/json; charset=utf-8"
        content_type = response.headers.get("content-type", "").lower()

        if response.status_code != 200:
            if content_type.startswith("application/json"):
                error_data = response.json()
            else:
                error_data = response.text
            print(f"✗ Server returned {response.status_code}: {error_data}")
            raise RuntimeError(f"Compose endpoint failed: {error_data}")

        # Binary response: the body is the rendered video itself
        if content_type.startswith("video"):
            output_path = PIPELINE_DIR / "output_video.mp4"
            with open(output_path, "wb") as f:
                f.write(response.content)
            size_mb = output_path.stat().st_size / (1024 * 1024)
            print(f"✓ Video saved: {output_path.name} ({size_mb:.2f}MB)")
            return {
                "status": "success",
                "video_path": str(output_path),
                # Same value under the key the orchestrator looks up
                "output_path": str(output_path),
                "size_mb": size_mb,
                "scenes": len(scenes) + 1,
            }

        # Otherwise the response is JSON (might be error or status)
        data = response.json()
        if data.get("status") == "success":
            print("✓ Compose completed successfully")
            return data
        raise RuntimeError(f"Compose failed: {data.get('message', 'Unknown error')}")
    except Exception as e:
        print(f"✗ Composition failed: {e}")
        raise
# ─────────────────────────────────────────────────────────────────────────
# Main Pipeline Orchestrator
# ─────────────────────────────────────────────────────────────────────────
async def generate_video_from_prompt(prompt: str) -> dict:
    """
    Complete pipeline: Prompt → Manifest → Images → Selection → Video

    Runs the four steps in order and stops at the first failure. No exception
    escapes: every error is printed and reported through the returned dict.

    Args:
        prompt (str): User prompt describing video content

    Returns:
        dict: On success: "status" ("success"), "message", "title", "scenes",
            "output_path" and "size_mb" (the last two are None when the
            compose step did not report them). On failure: "status"
            ("error"), "message" and "output_path" (None).
    """
    try:
        # Step 1: Generate manifest from prompt
        manifest = await step_generate_manifest(prompt)

        # Step 2: Download images for each scene
        downloaded = await step_download_images(manifest)
        if downloaded == 0:
            raise RuntimeError("No images were downloaded")

        # Step 3: Select best images from candidates
        selection = await step_select_scenes(manifest)
        if selection["selected"] != selection["total"]:
            raise RuntimeError(
                f"Selection incomplete: {selection['selected']}/{selection['total']}"
            )

        # Step 4: Compose final video
        composition = await step_compose_video(manifest)

        # The compose step stores the file location under "video_path" when
        # it saves the video itself, and may return the server's own JSON
        # otherwise, so neither key nor "size_mb" is guaranteed to exist.
        output_path = composition.get("output_path") or composition.get("video_path")
        size_mb = composition.get("size_mb")
        scene_count = len(manifest.get("scenes") or [])

        # Success!
        print("\n" + "=" * 70)
        print("[SUCCESS] Pipeline Complete!")
        print("=" * 70)
        print(f"Title: {manifest.get('title', 'Untitled')}")
        print(f"Scenes: {scene_count}")
        print(f"Video: {output_path}")
        if isinstance(size_mb, (int, float)):
            print(f"Size: {size_mb:.1f}MB")
        print("=" * 70)
        return {
            "status": "success",
            "message": "Video generated successfully",
            "title": manifest.get("title"),
            "scenes": scene_count,
            "output_path": output_path,
            "size_mb": size_mb,
        }
    except Exception as e:
        print("\n" + "=" * 70)
        print(f"[ERROR] Pipeline Failed: {e}")
        print("=" * 70)
        return {
            "status": "error",
            "message": str(e),
            "output_path": None,
        }
# ─────────────────────────────────────────────────────────────────────────
# Local Testing
# ─────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Command-line entry point: every argument is joined into one prompt;
    # with no arguments a built-in sample prompt is used. `sys` and `asyncio`
    # come from the module-level imports.
    if len(sys.argv) > 1:
        prompt = " ".join(sys.argv[1:])
    else:
        prompt = "A motivational video about personal growth and success"

    # Ensure directories exist
    PIPELINE_DIR.mkdir(exist_ok=True)
    RENDERS_DIR.mkdir(exist_ok=True)

    # Run pipeline
    result = asyncio.run(generate_video_from_prompt(prompt))

    # Print final status; the exit code mirrors it (0 = success, 1 = error)
    if result["status"] == "success":
        print(f"\n✓ Video saved to: {result['output_path']}")
        sys.exit(0)
    else:
        print(f"\n✗ Error: {result['message']}")
        sys.exit(1)