Spaces:
Sleeping
Sleeping
| """ | |
| Full Pipeline: Prompt β Manifest β Images β Selection β Composition | |
| Orchestrates the complete workflow from user prompt to final MP4 video | |
| """ | |
| import requests | |
| import json | |
| import os | |
| from pathlib import Path | |
| from PIL import Image | |
| from io import BytesIO | |
| import asyncio | |
| import subprocess | |
| import sys | |
| from datetime import datetime | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Configuration | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| MANIFEST_SERVER = "https://factorstudios-content-gen.hf.space" | |
| IMAGE_SERVER = "https://factorstudios-pinteresting.hf.space" | |
| PIPELINE_DIR = Path(__file__).parent | |
| CANDIDATES_DIR = PIPELINE_DIR / "candidates" | |
| SELECTED_DIR = PIPELINE_DIR / "selected" | |
| RENDERS_DIR = PIPELINE_DIR / "renders" | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 1: Generate Manifest from Prompt | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def step_generate_manifest(prompt: str, output_dir: Path = PIPELINE_DIR) -> dict: | |
| """ | |
| Call content-gen server to generate manifest from prompt. | |
| Saves manifest to manifest_response.json | |
| Args: | |
| prompt (str): User prompt describing video content | |
| output_dir (Path): Directory to save manifest | |
| Returns: | |
| dict: Manifest with title and scenes | |
| """ | |
| print("\n" + "="*70) | |
| print(f"[STEP 1] Generating Manifest from Prompt") | |
| print("="*70) | |
| print(f"Prompt: {prompt[:80]}...") | |
| try: | |
| # Call manifest generation server | |
| payload = {"prompt": prompt} | |
| print(f"Calling {MANIFEST_SERVER}/generate...") | |
| response = requests.post( | |
| f"{MANIFEST_SERVER}/generate", | |
| json=payload, | |
| timeout=60 | |
| ) | |
| response.raise_for_status() | |
| manifest = response.json() | |
| # Save manifest to file | |
| manifest_path = output_dir / "manifest_response.json" | |
| with open(manifest_path, "w") as f: | |
| json.dump(manifest, f, indent=2) | |
| scenes = manifest.get("scenes", []) | |
| print(f"β Generated manifest with {len(scenes)} scenes") | |
| print(f"β Saved to {manifest_path.name}") | |
| # Print scene details | |
| for idx, scene in enumerate(scenes): | |
| label = scene.get("label", f"Scene {idx}") | |
| query = scene.get("image_query", "") | |
| print(f" Scene {idx}: {label} (query: '{query[:30]}...')") | |
| return manifest | |
| except Exception as e: | |
| print(f"β Failed to generate manifest: {e}") | |
| raise | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 2: Download Images for Each Scene | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def step_download_images( | |
| manifest: dict, | |
| output_dir: Path = CANDIDATES_DIR, | |
| images_per_scene: int = 5 | |
| ) -> int: | |
| """ | |
| Download images from pinteresting server for each scene in manifest. | |
| IMPORTANT: Downloads image for TITLE (scene 0) + all scenes in manifest.scenes | |
| Follows the pattern from test_api.py | |
| Args: | |
| manifest (dict): Manifest with title and scenes | |
| output_dir (Path): Base directory to organize images | |
| images_per_scene (int): Number of images per scene | |
| Returns: | |
| int: Total number of images downloaded | |
| """ | |
| print("\n" + "="*70) | |
| print(f"[STEP 2] Downloading Images (Title + Scenes)") | |
| print("="*70) | |
| # Clear and recreate candidates directory | |
| if output_dir.exists(): | |
| import shutil | |
| shutil.rmtree(output_dir) | |
| output_dir.mkdir(parents=True, exist_ok=True) | |
| total_downloaded = 0 | |
| # STEP 2.0: Download image for TITLE (becomes scene_0) | |
| title = manifest.get("title", "") | |
| if title: | |
| scene_dir = output_dir / "scene_0" | |
| scene_dir.mkdir(parents=True, exist_ok=True) | |
| print(f"\n[Scene 0] {title} (TITLE/INTRO)") | |
| print(f" Query: {title}") | |
| try: | |
| payload = { | |
| "keyword": title, | |
| "count": images_per_scene | |
| } | |
| print(f" Calling {IMAGE_SERVER}/scrape...") | |
| response = requests.post( | |
| f"{IMAGE_SERVER}/scrape", | |
| json=payload, | |
| timeout=60 | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| images = data.get("images", []) | |
| print(f" Downloaded {len(images)} images") | |
| for img_idx, img_url in enumerate(images): | |
| try: | |
| img_response = requests.get(img_url, timeout=30) | |
| if img_response.status_code == 200: | |
| img_path = scene_dir / f"candidate_{img_idx:02d}.jpg" | |
| with open(img_path, "wb") as f: | |
| f.write(img_response.content) | |
| total_downloaded += 1 | |
| except Exception as e: | |
| print(f" β Failed to save image {img_idx}: {e}") | |
| except Exception as e: | |
| print(f" β Error downloading images for title: {e}") | |
| # STEP 2.1: Download images for each content scene (becomes scene_1, scene_2, etc) | |
| scenes = manifest.get("scenes", []) | |
| for scene_idx, scene in enumerate(scenes): | |
| actual_idx = scene_idx + 1 # scene_1, scene_2, etc (title is scene_0) | |
| scene_label = scene.get("label", f"Scene {actual_idx}") | |
| image_query = scene.get("image_query", "") | |
| if not image_query: | |
| print(f"\n[Scene {actual_idx}] β No image query found, skipping...") | |
| continue | |
| # Create scene-specific folder | |
| scene_dir = output_dir / f"scene_{actual_idx}" | |
| scene_dir.mkdir(parents=True, exist_ok=True) | |
| print(f"\n[Scene {actual_idx}] {scene_label}") | |
| print(f" Query: {image_query}") | |
| # Fetch images from pinteresting API | |
| try: | |
| payload = { | |
| "keyword": image_query, | |
| "count": images_per_scene | |
| } | |
| print(f" Calling {IMAGE_SERVER}/scrape...") | |
| response = requests.post( | |
| f"{IMAGE_SERVER}/scrape", | |
| json=payload, | |
| timeout=60 | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get("success"): | |
| images = data.get("images", []) | |
| print(f" β Found {len(images)} images") | |
| # Download each image | |
| for img_idx, img_data in enumerate(images): | |
| img_url = img_data.get("url") | |
| if not img_url: | |
| continue | |
| try: | |
| # Download image | |
| img_response = requests.get(img_url, timeout=15) | |
| img_response.raise_for_status() | |
| # Verify it's a valid image | |
| img = Image.open(BytesIO(img_response.content)) | |
| # Save image | |
| file_name = f"candidate_{img_idx:02d}.jpg" | |
| file_path = scene_dir / file_name | |
| with open(file_path, "wb") as f: | |
| f.write(img_response.content) | |
| size_kb = len(img_response.content) / 1024 | |
| dims = f"{img_data.get('width', '?')}x{img_data.get('height', '?')}" | |
| print(f" β {file_name} ({dims}, {size_kb:.0f}KB)") | |
| total_downloaded += 1 | |
| except Exception as e: | |
| print(f" β Image {img_idx} failed: {e}") | |
| else: | |
| print(f" β API Error: {data.get('message')}") | |
| except Exception as e: | |
| print(f" β Request failed: {e}") | |
| print(f"\nβ Downloaded {total_downloaded} images total") | |
| return total_downloaded | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 3: Select Best Image from Each Scene's Candidates | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def step_select_scenes(manifest: dict, candidates_dir: Path = CANDIDATES_DIR) -> dict: | |
| """ | |
| Select best image from each scene's candidate folder. | |
| IMPORTANT: Selects from TITLE (scene_0) + all scenes in manifest.scenes | |
| Evaluates by file size (largest = best quality). | |
| Args: | |
| manifest (dict): Manifest with scene count | |
| candidates_dir (Path): Directory with candidate images | |
| Returns: | |
| dict: Selection results | |
| """ | |
| print("\n" + "="*70) | |
| print(f"[STEP 3] Selecting Best Images from Candidates") | |
| print("="*70) | |
| # Ensure selected directory exists | |
| SELECTED_DIR.mkdir(parents=True, exist_ok=True) | |
| scenes = manifest.get("scenes", []) | |
| selected_count = 0 | |
| # Select from scene_0 (title) through scene_N (content scenes) | |
| # Total scenes = len(scenes) + 1 (for title as scene_0) | |
| total_scene_count = len(scenes) + 1 | |
| for scene_idx in range(total_scene_count): | |
| scene_folder = candidates_dir / f"scene_{scene_idx}" | |
| if not scene_folder.exists(): | |
| if scene_idx == 0: | |
| print(f"[Scene {scene_idx}] β No candidates found (TITLE)") | |
| else: | |
| print(f"[Scene {scene_idx}] β No candidates found") | |
| continue | |
| # Find largest image (best quality) | |
| images = list(scene_folder.glob("*.jpg")) | |
| if not images: | |
| if scene_idx == 0: | |
| print(f"[Scene {scene_idx}] β No JPEG images found (TITLE)") | |
| else: | |
| print(f"[Scene {scene_idx}] β No JPEG images found") | |
| continue | |
| best_img = max(images, key=lambda p: p.stat().st_size) | |
| size_kb = best_img.stat().st_size / 1024 | |
| # Copy to selected folder | |
| selected_path = SELECTED_DIR / f"scene_{scene_idx:02d}.jpg" | |
| import shutil | |
| shutil.copy2(best_img, selected_path) | |
| if scene_idx == 0: | |
| print(f"[Scene {scene_idx}] β Selected {best_img.name} ({size_kb:.0f}KB) [TITLE]") | |
| else: | |
| print(f"[Scene {scene_idx}] β Selected {best_img.name} ({size_kb:.0f}KB)") | |
| selected_count += 1 | |
| print(f"\nβ Selected {selected_count} images ({total_scene_count} total: title + {len(scenes)} scenes)") | |
| return { | |
| "status": "success", | |
| "selected": selected_count, | |
| "total": total_scene_count | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 4: Compose Video with Selected Images and Manifest | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def step_compose_video(manifest: dict) -> dict: | |
| """ | |
| Compose final video using selected images and manifest labels. | |
| Calls the FastAPI /compose endpoint which handles scene config generation. | |
| Args: | |
| manifest (dict): Manifest with title and scenes | |
| Returns: | |
| dict: Composition results with video path and metadata | |
| """ | |
| print("\n" + "="*70) | |
| print(f"[STEP 4] Composing Video from Selected Images") | |
| print("="*70) | |
| scenes = manifest.get("scenes", []) | |
| selected_images = sorted(SELECTED_DIR.glob("scene_*.jpg")) | |
| print(f"Manifest title: {manifest.get('title', 'Untitled')}") | |
| print(f"Selected images: {len(selected_images)}") | |
| print(f"Required images: {len(scenes) + 1} (title + {len(scenes)} scenes)") | |
| # Expected: title + all scenes | |
| expected_images = len(scenes) + 1 | |
| if len(selected_images) != expected_images: | |
| raise Exception( | |
| f"Image count mismatch: expected {expected_images}, " | |
| f"found {len(selected_images)}" | |
| ) | |
| # Call the FastAPI /compose endpoint | |
| print(f"\nCalling /compose endpoint...") | |
| try: | |
| payload = { | |
| "title": manifest.get("title", "Untitled"), | |
| "scenes": [ | |
| { | |
| "label": s.get("label", f"Scene {idx}"), | |
| "image_query": s.get("image_query", "") | |
| } | |
| for idx, s in enumerate(scenes) | |
| ] | |
| } | |
| response = requests.post( | |
| f"http://localhost:7860/compose", | |
| json=payload, | |
| timeout=300 | |
| ) | |
| if response.status_code != 200: | |
| error_data = response.json() if response.headers.get("content-type") == "application/json" else response.text | |
| print(f"β Server returned {response.status_code}: {error_data}") | |
| raise Exception(f"Compose endpoint failed: {error_data}") | |
| # Check if response is binary (video file) or JSON | |
| if response.headers.get("content-type", "").startswith("video"): | |
| # Save video file | |
| output_path = PIPELINE_DIR / "output_video.mp4" | |
| with open(output_path, "wb") as f: | |
| f.write(response.content) | |
| size_mb = output_path.stat().st_size / (1024 * 1024) | |
| print(f"β Video saved: {output_path.name} ({size_mb:.2f}MB)") | |
| return { | |
| "status": "success", | |
| "video_path": str(output_path), | |
| "size_mb": size_mb, | |
| "scenes": len(scenes) + 1 | |
| } | |
| else: | |
| # Response is JSON (might be error or status) | |
| data = response.json() | |
| if data.get("status") == "success": | |
| print(f"β Compose completed successfully") | |
| return data | |
| else: | |
| raise Exception(f"Compose failed: {data.get('message', 'Unknown error')}") | |
| except Exception as e: | |
| print(f"β Composition failed: {e}") | |
| raise | |
| # Generate dynamic SCENE_CONFIG from manifest | |
| print(f"\nGenerating scene configuration...") | |
| try: | |
| payload = { | |
| "title": manifest.get("title", "Untitled"), | |
| "scenes": [ | |
| { | |
| "label": s.get("label", f"Scene {idx}"), | |
| "image_query": s.get("image_query", "") | |
| } | |
| for idx, s in enumerate(scenes) | |
| ] | |
| } | |
| response = requests.post( | |
| f"http://localhost:7860/compose", | |
| json=payload, | |
| timeout=300 | |
| ) | |
| if response.status_code != 200: | |
| error_data = response.json() if response.headers.get("content-type") == "application/json" else response.text | |
| print(f"β Server returned {response.status_code}: {error_data}") | |
| raise Exception(f"Compose endpoint failed: {error_data}") | |
| # Check if response is binary (video file) or JSON | |
| if response.headers.get("content-type", "").startswith("video"): | |
| # Save video file | |
| output_path = PIPELINE_DIR / "output_video.mp4" | |
| with open(output_path, "wb") as f: | |
| f.write(response.content) | |
| size_mb = output_path.stat().st_size / (1024 * 1024) | |
| print(f"β Video saved: {output_path.name} ({size_mb:.2f}MB)") | |
| return { | |
| "status": "success", | |
| "video_path": str(output_path), | |
| "size_mb": size_mb, | |
| "scenes": len(scenes) + 1 | |
| } | |
| else: | |
| # Response is JSON (might be error or status) | |
| data = response.json() | |
| if data.get("status") == "success": | |
| print(f"β Compose completed successfully") | |
| return data | |
| else: | |
| raise Exception(f"Compose failed: {data.get('message', 'Unknown error')}") | |
| except Exception as e: | |
| print(f"β Composition failed: {e}") | |
| raise | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Main Pipeline Orchestrator | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def generate_video_from_prompt(prompt: str) -> dict: | |
| """ | |
| Complete pipeline: Prompt β Manifest β Images β Selection β Video | |
| Args: | |
| prompt (str): User prompt describing video content | |
| Returns: | |
| dict: Final result with video path or error | |
| """ | |
| try: | |
| # Step 1: Generate manifest from prompt | |
| manifest = await step_generate_manifest(prompt) | |
| # Step 2: Download images for each scene | |
| downloaded = await step_download_images(manifest) | |
| if downloaded == 0: | |
| raise Exception("No images were downloaded") | |
| # Step 3: Select best images from candidates | |
| selection = await step_select_scenes(manifest) | |
| if selection["selected"] != selection["total"]: | |
| raise Exception( | |
| f"Selection incomplete: {selection['selected']}/{selection['total']}" | |
| ) | |
| # Step 4: Compose final video | |
| composition = await step_compose_video(manifest) | |
| # Success! | |
| print("\n" + "="*70) | |
| print("[SUCCESS] Pipeline Complete!") | |
| print("="*70) | |
| print(f"Title: {manifest.get('title', 'Untitled')}") | |
| print(f"Scenes: {len(manifest.get('scenes', []))}") | |
| print(f"Video: {composition['output_path']}") | |
| print(f"Size: {composition['size_mb']:.1f}MB") | |
| print("="*70) | |
| return { | |
| "status": "success", | |
| "message": "Video generated successfully", | |
| "title": manifest.get("title"), | |
| "scenes": len(manifest.get("scenes", [])), | |
| "output_path": composition["output_path"], | |
| "size_mb": composition["size_mb"], | |
| } | |
| except Exception as e: | |
| print("\n" + "="*70) | |
| print(f"[ERROR] Pipeline Failed: {e}") | |
| print("="*70) | |
| return { | |
| "status": "error", | |
| "message": str(e), | |
| "output_path": None, | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Local Testing | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| if __name__ == "__main__": | |
| import sys | |
| if len(sys.argv) > 1: | |
| prompt = " ".join(sys.argv[1:]) | |
| else: | |
| prompt = "A motivational video about personal growth and success" | |
| # Ensure directories exist | |
| PIPELINE_DIR.mkdir(exist_ok=True) | |
| RENDERS_DIR.mkdir(exist_ok=True) | |
| # Run pipeline | |
| result = asyncio.run(generate_video_from_prompt(prompt)) | |
| # Print final status | |
| if result["status"] == "success": | |
| print(f"\nβ Video saved to: {result['output_path']}") | |
| sys.exit(0) | |
| else: | |
| print(f"\nβ Error: {result['message']}") | |
| sys.exit(1) | |