""" Full Pipeline: Prompt → Manifest → Images → Selection → Composition Orchestrates the complete workflow from user prompt to final MP4 video """ import requests import json import os from pathlib import Path from PIL import Image from io import BytesIO import asyncio import subprocess import sys from datetime import datetime # ───────────────────────────────────────────────────────────────────────── # Configuration # ───────────────────────────────────────────────────────────────────────── MANIFEST_SERVER = "https://factorstudios-content-gen.hf.space" IMAGE_SERVER = "https://factorstudios-pinteresting.hf.space" PIPELINE_DIR = Path(__file__).parent CANDIDATES_DIR = PIPELINE_DIR / "candidates" SELECTED_DIR = PIPELINE_DIR / "selected" RENDERS_DIR = PIPELINE_DIR / "renders" # ───────────────────────────────────────────────────────────────────────── # Step 1: Generate Manifest from Prompt # ───────────────────────────────────────────────────────────────────────── async def step_generate_manifest(prompt: str, output_dir: Path = PIPELINE_DIR) -> dict: """ Call content-gen server to generate manifest from prompt. Saves manifest to manifest_response.json Args: prompt (str): User prompt describing video content output_dir (Path): Directory to save manifest Returns: dict: Manifest with title and scenes """ print("\n" + "="*70) print(f"[STEP 1] Generating Manifest from Prompt") print("="*70) print(f"Prompt: {prompt[:80]}...") try: # Call manifest generation server payload = {"prompt": prompt} print(f"Calling {MANIFEST_SERVER}/generate...") response = requests.post( f"{MANIFEST_SERVER}/generate", json=payload, timeout=60 ) response.raise_for_status() manifest = response.json() # Save manifest to file manifest_path = output_dir / "manifest_response.json" with open(manifest_path, "w") as f: json.dump(manifest, f, indent=2) scenes = manifest.get("scenes", []) print(f"✓ Generated manifest with {len(scenes)} scenes") print(f"✓ Saved to {manifest_path.name}") # Print scene details for idx, scene in enumerate(scenes): label = scene.get("label", f"Scene {idx}") query = scene.get("image_query", "") print(f" Scene {idx}: {label} (query: '{query[:30]}...')") return manifest except Exception as e: print(f"✗ Failed to generate manifest: {e}") raise # ───────────────────────────────────────────────────────────────────────── # Step 2: Download Images for Each Scene # ───────────────────────────────────────────────────────────────────────── async def step_download_images( manifest: dict, output_dir: Path = CANDIDATES_DIR, images_per_scene: int = 5 ) -> int: """ Download images from pinteresting server for each scene in manifest. IMPORTANT: Downloads image for TITLE (scene 0) + all scenes in manifest.scenes Follows the pattern from test_api.py Args: manifest (dict): Manifest with title and scenes output_dir (Path): Base directory to organize images images_per_scene (int): Number of images per scene Returns: int: Total number of images downloaded """ print("\n" + "="*70) print(f"[STEP 2] Downloading Images (Title + Scenes)") print("="*70) # Clear and recreate candidates directory if output_dir.exists(): import shutil shutil.rmtree(output_dir) output_dir.mkdir(parents=True, exist_ok=True) total_downloaded = 0 # STEP 2.0: Download image for TITLE (becomes scene_0) title = manifest.get("title", "") if title: scene_dir = output_dir / "scene_0" scene_dir.mkdir(parents=True, exist_ok=True) print(f"\n[Scene 0] {title} (TITLE/INTRO)") print(f" Query: {title}") try: payload = { "keyword": title, "count": images_per_scene } print(f" Calling {IMAGE_SERVER}/scrape...") response = requests.post( f"{IMAGE_SERVER}/scrape", json=payload, timeout=60 ) response.raise_for_status() data = response.json() images = data.get("images", []) print(f" Downloaded {len(images)} images") for img_idx, img_url in enumerate(images): try: img_response = requests.get(img_url, timeout=30) if img_response.status_code == 200: img_path = scene_dir / f"candidate_{img_idx:02d}.jpg" with open(img_path, "wb") as f: f.write(img_response.content) total_downloaded += 1 except Exception as e: print(f" ⚠ Failed to save image {img_idx}: {e}") except Exception as e: print(f" ⚠ Error downloading images for title: {e}") # STEP 2.1: Download images for each content scene (becomes scene_1, scene_2, etc) scenes = manifest.get("scenes", []) for scene_idx, scene in enumerate(scenes): actual_idx = scene_idx + 1 # scene_1, scene_2, etc (title is scene_0) scene_label = scene.get("label", f"Scene {actual_idx}") image_query = scene.get("image_query", "") if not image_query: print(f"\n[Scene {actual_idx}] ⚠ No image query found, skipping...") continue # Create scene-specific folder scene_dir = output_dir / f"scene_{actual_idx}" scene_dir.mkdir(parents=True, exist_ok=True) print(f"\n[Scene {actual_idx}] {scene_label}") print(f" Query: {image_query}") # Fetch images from pinteresting API try: payload = { "keyword": image_query, "count": images_per_scene } print(f" Calling {IMAGE_SERVER}/scrape...") response = requests.post( f"{IMAGE_SERVER}/scrape", json=payload, timeout=60 ) response.raise_for_status() data = response.json() if data.get("success"): images = data.get("images", []) print(f" ✓ Found {len(images)} images") # Download each image for img_idx, img_data in enumerate(images): img_url = img_data.get("url") if not img_url: continue try: # Download image img_response = requests.get(img_url, timeout=15) img_response.raise_for_status() # Verify it's a valid image img = Image.open(BytesIO(img_response.content)) # Save image file_name = f"candidate_{img_idx:02d}.jpg" file_path = scene_dir / file_name with open(file_path, "wb") as f: f.write(img_response.content) size_kb = len(img_response.content) / 1024 dims = f"{img_data.get('width', '?')}x{img_data.get('height', '?')}" print(f" ✓ {file_name} ({dims}, {size_kb:.0f}KB)") total_downloaded += 1 except Exception as e: print(f" ✗ Image {img_idx} failed: {e}") else: print(f" ✗ API Error: {data.get('message')}") except Exception as e: print(f" ✗ Request failed: {e}") print(f"\n✓ Downloaded {total_downloaded} images total") return total_downloaded # ───────────────────────────────────────────────────────────────────────── # Step 3: Select Best Image from Each Scene's Candidates # ───────────────────────────────────────────────────────────────────────── async def step_select_scenes(manifest: dict, candidates_dir: Path = CANDIDATES_DIR) -> dict: """ Select best image from each scene's candidate folder. IMPORTANT: Selects from TITLE (scene_0) + all scenes in manifest.scenes Evaluates by file size (largest = best quality). Args: manifest (dict): Manifest with scene count candidates_dir (Path): Directory with candidate images Returns: dict: Selection results """ print("\n" + "="*70) print(f"[STEP 3] Selecting Best Images from Candidates") print("="*70) # Ensure selected directory exists SELECTED_DIR.mkdir(parents=True, exist_ok=True) scenes = manifest.get("scenes", []) selected_count = 0 # Select from scene_0 (title) through scene_N (content scenes) # Total scenes = len(scenes) + 1 (for title as scene_0) total_scene_count = len(scenes) + 1 for scene_idx in range(total_scene_count): scene_folder = candidates_dir / f"scene_{scene_idx}" if not scene_folder.exists(): if scene_idx == 0: print(f"[Scene {scene_idx}] ✗ No candidates found (TITLE)") else: print(f"[Scene {scene_idx}] ✗ No candidates found") continue # Find largest image (best quality) images = list(scene_folder.glob("*.jpg")) if not images: if scene_idx == 0: print(f"[Scene {scene_idx}] ✗ No JPEG images found (TITLE)") else: print(f"[Scene {scene_idx}] ✗ No JPEG images found") continue best_img = max(images, key=lambda p: p.stat().st_size) size_kb = best_img.stat().st_size / 1024 # Copy to selected folder selected_path = SELECTED_DIR / f"scene_{scene_idx:02d}.jpg" import shutil shutil.copy2(best_img, selected_path) if scene_idx == 0: print(f"[Scene {scene_idx}] ✓ Selected {best_img.name} ({size_kb:.0f}KB) [TITLE]") else: print(f"[Scene {scene_idx}] ✓ Selected {best_img.name} ({size_kb:.0f}KB)") selected_count += 1 print(f"\n✓ Selected {selected_count} images ({total_scene_count} total: title + {len(scenes)} scenes)") return { "status": "success", "selected": selected_count, "total": total_scene_count } # ───────────────────────────────────────────────────────────────────────── # Step 4: Compose Video with Selected Images and Manifest # ───────────────────────────────────────────────────────────────────────── async def step_compose_video(manifest: dict) -> dict: """ Compose final video using selected images and manifest labels. Calls the FastAPI /compose endpoint which handles scene config generation. Args: manifest (dict): Manifest with title and scenes Returns: dict: Composition results with video path and metadata """ print("\n" + "="*70) print(f"[STEP 4] Composing Video from Selected Images") print("="*70) scenes = manifest.get("scenes", []) selected_images = sorted(SELECTED_DIR.glob("scene_*.jpg")) print(f"Manifest title: {manifest.get('title', 'Untitled')}") print(f"Selected images: {len(selected_images)}") print(f"Required images: {len(scenes) + 1} (title + {len(scenes)} scenes)") # Expected: title + all scenes expected_images = len(scenes) + 1 if len(selected_images) != expected_images: raise Exception( f"Image count mismatch: expected {expected_images}, " f"found {len(selected_images)}" ) # Call the FastAPI /compose endpoint print(f"\nCalling /compose endpoint...") try: payload = { "title": manifest.get("title", "Untitled"), "scenes": [ { "label": s.get("label", f"Scene {idx}"), "image_query": s.get("image_query", "") } for idx, s in enumerate(scenes) ] } response = requests.post( f"http://localhost:7860/compose", json=payload, timeout=300 ) if response.status_code != 200: error_data = response.json() if response.headers.get("content-type") == "application/json" else response.text print(f"✗ Server returned {response.status_code}: {error_data}") raise Exception(f"Compose endpoint failed: {error_data}") # Check if response is binary (video file) or JSON if response.headers.get("content-type", "").startswith("video"): # Save video file output_path = PIPELINE_DIR / "output_video.mp4" with open(output_path, "wb") as f: f.write(response.content) size_mb = output_path.stat().st_size / (1024 * 1024) print(f"✓ Video saved: {output_path.name} ({size_mb:.2f}MB)") return { "status": "success", "video_path": str(output_path), "size_mb": size_mb, "scenes": len(scenes) + 1 } else: # Response is JSON (might be error or status) data = response.json() if data.get("status") == "success": print(f"✓ Compose completed successfully") return data else: raise Exception(f"Compose failed: {data.get('message', 'Unknown error')}") except Exception as e: print(f"✗ Composition failed: {e}") raise # Generate dynamic SCENE_CONFIG from manifest print(f"\nGenerating scene configuration...") try: payload = { "title": manifest.get("title", "Untitled"), "scenes": [ { "label": s.get("label", f"Scene {idx}"), "image_query": s.get("image_query", "") } for idx, s in enumerate(scenes) ] } response = requests.post( f"http://localhost:7860/compose", json=payload, timeout=300 ) if response.status_code != 200: error_data = response.json() if response.headers.get("content-type") == "application/json" else response.text print(f"✗ Server returned {response.status_code}: {error_data}") raise Exception(f"Compose endpoint failed: {error_data}") # Check if response is binary (video file) or JSON if response.headers.get("content-type", "").startswith("video"): # Save video file output_path = PIPELINE_DIR / "output_video.mp4" with open(output_path, "wb") as f: f.write(response.content) size_mb = output_path.stat().st_size / (1024 * 1024) print(f"✓ Video saved: {output_path.name} ({size_mb:.2f}MB)") return { "status": "success", "video_path": str(output_path), "size_mb": size_mb, "scenes": len(scenes) + 1 } else: # Response is JSON (might be error or status) data = response.json() if data.get("status") == "success": print(f"✓ Compose completed successfully") return data else: raise Exception(f"Compose failed: {data.get('message', 'Unknown error')}") except Exception as e: print(f"✗ Composition failed: {e}") raise # ───────────────────────────────────────────────────────────────────────── # Main Pipeline Orchestrator # ───────────────────────────────────────────────────────────────────────── async def generate_video_from_prompt(prompt: str) -> dict: """ Complete pipeline: Prompt → Manifest → Images → Selection → Video Args: prompt (str): User prompt describing video content Returns: dict: Final result with video path or error """ try: # Step 1: Generate manifest from prompt manifest = await step_generate_manifest(prompt) # Step 2: Download images for each scene downloaded = await step_download_images(manifest) if downloaded == 0: raise Exception("No images were downloaded") # Step 3: Select best images from candidates selection = await step_select_scenes(manifest) if selection["selected"] != selection["total"]: raise Exception( f"Selection incomplete: {selection['selected']}/{selection['total']}" ) # Step 4: Compose final video composition = await step_compose_video(manifest) # Success! print("\n" + "="*70) print("[SUCCESS] Pipeline Complete!") print("="*70) print(f"Title: {manifest.get('title', 'Untitled')}") print(f"Scenes: {len(manifest.get('scenes', []))}") print(f"Video: {composition['output_path']}") print(f"Size: {composition['size_mb']:.1f}MB") print("="*70) return { "status": "success", "message": "Video generated successfully", "title": manifest.get("title"), "scenes": len(manifest.get("scenes", [])), "output_path": composition["output_path"], "size_mb": composition["size_mb"], } except Exception as e: print("\n" + "="*70) print(f"[ERROR] Pipeline Failed: {e}") print("="*70) return { "status": "error", "message": str(e), "output_path": None, } # ───────────────────────────────────────────────────────────────────────── # Local Testing # ───────────────────────────────────────────────────────────────────────── if __name__ == "__main__": import sys if len(sys.argv) > 1: prompt = " ".join(sys.argv[1:]) else: prompt = "A motivational video about personal growth and success" # Ensure directories exist PIPELINE_DIR.mkdir(exist_ok=True) RENDERS_DIR.mkdir(exist_ok=True) # Run pipeline result = asyncio.run(generate_video_from_prompt(prompt)) # Print final status if result["status"] == "success": print(f"\n✓ Video saved to: {result['output_path']}") sys.exit(0) else: print(f"\n✗ Error: {result['message']}") sys.exit(1)