Spaces:
Build error
Build error
| """ | |
| Fal.ai Service for video generation. | |
| Python implementation using fal-client SDK. | |
| Uses server-side API key from environment. | |
| """ | |
| import asyncio | |
| import logging | |
| import os | |
| from typing import Optional, Literal | |
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)

# fal.ai model identifiers, keyed by the task they serve.
# Change the value here to switch models without touching the service code.
MODELS = dict(video_generation="fal-ai/veo3.1/fast/image-to-video")

# Accepted values for the aspect-ratio and resolution request arguments.
AspectRatio = Literal["16:9", "9:16", "auto"]
Resolution = Literal["720p", "1080p"]

# Local-testing switch: when the FAL_MOCK_MODE environment variable is "true"
# (case-insensitive) the service returns canned responses instead of calling
# the real API. The variable is read once, at import time.
MOCK_MODE = os.environ.get("FAL_MOCK_MODE", "false").lower() == "true"

# Placeholder video URL handed back by the mock-mode responses.
MOCK_VIDEO_URL = "https://v3b.fal.media/files/mock/mock-video.mp4"
def get_fal_api_key() -> str:
    """Return the Fal.ai API key stored in the ``FAL_KEY`` environment variable.

    Raises:
        ValueError: If ``FAL_KEY`` is unset or empty.
    """
    key = os.environ.get("FAL_KEY")
    if key:
        return key
    raise ValueError("Server Authentication Error: FAL_KEY not configured")
class FalService:
    """Client for Fal.ai image-to-video generation.

    A job is submitted to the fal.ai queue with
    :meth:`start_video_generation`, polled with :meth:`check_video_status`,
    and a finished video can be saved locally with :meth:`download_video`.
    The API key comes from the ``FAL_KEY`` environment variable unless one is
    passed to the constructor.
    """

    # Queue-state strings keyed by the class name of a typed status object.
    # NOTE(review): fal-client is expected to return ``Queued`` /
    # ``InProgress`` / ``Completed`` instances from ``status()``; confirm
    # against the installed SDK version.
    _STATE_BY_CLASS_NAME = {
        "Queued": "IN_QUEUE",
        "InProgress": "IN_PROGRESS",
        "Completed": "COMPLETED",
    }

    def __init__(self, api_key: Optional[str] = None):
        """Store the API key, falling back to the ``FAL_KEY`` env variable.

        Args:
            api_key: Explicit key. When given it is also written to
                ``os.environ["FAL_KEY"]``, because fal_client reads the key
                from that variable. This affects the whole process.

        Raises:
            ValueError: If no key is passed and ``FAL_KEY`` is not set.
        """
        self.api_key = api_key or get_fal_api_key()
        if api_key:
            os.environ["FAL_KEY"] = api_key

    def _handle_api_error(self, error: Exception, context: str):
        """Re-raise ``error``, translating well-known API failures.

        Authentication (401), billing (402) and rate-limit (429) failures are
        recognised by substring match on the error text and re-raised as
        ``ValueError`` with an actionable message. Anything else is re-raised
        unchanged. This method never returns normally.

        Args:
            error: The exception raised by the API call.
            context: Short label (e.g. the model name) included in the message.
        """
        msg = str(error)
        if "401" in msg or "Unauthorized" in msg:
            raise ValueError(
                f"Authentication failed ({context}). Check your FAL_KEY is valid."
            ) from error
        if "402" in msg or "Payment Required" in msg:
            raise ValueError(
                f"Insufficient credits ({context}). Add credits at fal.ai."
            ) from error
        # The needle must be lowercase: it is matched against msg.lower().
        if "429" in msg or "rate limit" in msg.lower():
            raise ValueError(
                f"Rate limit exceeded ({context}). Wait and retry."
            ) from error
        raise error

    @staticmethod
    def _normalize_status(status) -> Optional[str]:
        """Return the queue state of ``status`` as a string, or None if unknown."""
        state = getattr(status, "status", None)
        if isinstance(state, str):
            return state
        return FalService._STATE_BY_CLASS_NAME.get(type(status).__name__)

    @staticmethod
    def _extract_video_url(result) -> Optional[str]:
        """Return ``result.video.url`` from a dict or object result, else None."""
        if isinstance(result, dict):
            video = result.get("video")
        else:
            video = getattr(result, "video", None)
        if isinstance(video, dict):
            return video.get("url")
        return getattr(video, "url", None)

    async def start_video_generation(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str,
        aspect_ratio: AspectRatio = "16:9",
        resolution: Resolution = "720p",
        number_of_videos: int = 1
    ) -> dict:
        """Submit an image-to-video job to Fal.ai.

        The job is queued with ``fal_client.submit()``, which does not wait
        for the video to be rendered. Poll :meth:`check_video_status` with the
        returned ``fal_request_id`` to find out when it is done.

        Args:
            base64_image: Base64-encoded source image (no data-URI prefix).
            mime_type: MIME type of the image, used to build the data URI.
            prompt: Text prompt for the generation.
            aspect_ratio: Requested aspect ratio.
            resolution: Requested output resolution.
            number_of_videos: Accepted for interface compatibility; it is not
                forwarded to the API.

        Returns:
            A dict with ``fal_request_id``, ``done`` and ``status``. For a
            real submission ``done`` is False and ``status`` is
            ``"processing"``. In mock mode the dict is already complete
            (``done`` True, ``status`` ``"completed"``) and also carries
            ``video_url``.

        Raises:
            ValueError: If fal-client is not installed, or for recognised
                authentication, credit and rate-limit failures.
            Exception: Any other error from the API call, unchanged.
        """
        # Mock mode for testing without API credits.
        if MOCK_MODE:
            import uuid

            mock_request_id = f"mock_fal_{uuid.uuid4().hex[:16]}"
            logger.info("[MOCK MODE] Video generation: %s", mock_request_id)
            await asyncio.sleep(2)  # Simulate API delay
            return {
                "fal_request_id": mock_request_id,
                "done": True,
                "status": "completed",
                "video_url": MOCK_VIDEO_URL,
            }

        try:
            import fal_client

            # submit() is a blocking call, so run it in a worker thread to
            # keep the event loop responsive.
            handle = await asyncio.to_thread(
                fal_client.submit,
                MODELS["video_generation"],
                arguments={
                    "prompt": prompt,
                    "image_url": f"data:{mime_type};base64,{base64_image}",
                    "aspect_ratio": aspect_ratio,
                    "resolution": resolution,
                    "generate_audio": True,
                },
            )
            # Fall back to the handle's string form if it has no request_id.
            if hasattr(handle, "request_id"):
                request_id = handle.request_id
            else:
                request_id = str(handle)
            return {
                "fal_request_id": request_id,
                "done": False,
                "status": "processing",
            }
        except ImportError as error:
            raise ValueError(
                "fal-client package not installed. Run: pip install fal-client"
            ) from error
        except Exception as error:
            # Always raises: either a translated ValueError or the original.
            self._handle_api_error(error, MODELS["video_generation"])

    async def check_video_status(self, fal_request_id: str) -> dict:
        """Report the current state of a video generation request.

        Does not wait for the job to finish. Accepts both status objects that
        carry a ``status`` string and typed status objects identified by their
        class name.

        Args:
            fal_request_id: The ID returned by :meth:`start_video_generation`.

        Returns:
            A dict with ``fal_request_id``, ``done`` and ``status``
            (``"processing"``, ``"completed"`` or ``"failed"``). Completed
            results add ``video_url`` (None if the result holds no URL).
            Failed results add ``error``. If the status lookup itself raises,
            the error is logged and a ``"processing"`` dict with an ``error``
            entry is returned, so callers can retry.

        Raises:
            ValueError: If fal-client is not installed.
        """
        # Mock mode: each poll completes with probability ~0.3.
        if MOCK_MODE:
            import random

            if random.random() > 0.7:
                return {
                    "fal_request_id": fal_request_id,
                    "done": True,
                    "status": "completed",
                    "video_url": MOCK_VIDEO_URL,
                }
            return {
                "fal_request_id": fal_request_id,
                "done": False,
                "status": "processing",
            }

        try:
            import fal_client

            status = await asyncio.to_thread(
                fal_client.status,
                MODELS["video_generation"],
                fal_request_id,
                with_logs=False,
            )
            state = self._normalize_status(status)

            if state == "COMPLETED":
                result = await asyncio.to_thread(
                    fal_client.result,
                    MODELS["video_generation"],
                    fal_request_id,
                )
                return {
                    "fal_request_id": fal_request_id,
                    "done": True,
                    "status": "completed",
                    "video_url": self._extract_video_url(result),
                }

            if state == "FAILED":
                return {
                    "fal_request_id": fal_request_id,
                    "done": True,
                    "status": "failed",
                    "error": getattr(status, "error", "Unknown error"),
                }

            # IN_QUEUE, IN_PROGRESS or an unrecognised state: keep polling.
            return {
                "fal_request_id": fal_request_id,
                "done": False,
                "status": "processing",
            }
        except ImportError as error:
            raise ValueError(
                "fal-client package not installed. Run: pip install fal-client"
            ) from error
        except Exception as error:
            # Best effort: a failed poll is reported as still processing.
            logger.error(
                "Error checking status for %s: %s", fal_request_id, error
            )
            return {
                "fal_request_id": fal_request_id,
                "done": False,
                "status": "processing",
                "error": str(error),
            }

    async def download_video(self, video_url: str, request_id: str) -> str:
        """Download a generated video into the local ``downloads`` directory.

        The file is saved as ``<request_id>.mp4`` in a ``downloads`` folder
        one level above this module's directory. The folder is created if
        missing.

        Args:
            video_url: URL of the video to fetch.
            request_id: Used as the file name stem. It must be a plain name
                with no path separators.

        Returns:
            The file name (not the full path) of the saved video.

        Raises:
            ValueError: If ``request_id`` is not a safe file name, or if the
                download or the write fails.
        """
        # Reject anything that could escape the downloads directory. This runs
        # first so bad input causes no side effects.
        if (
            not request_id
            or request_id in (".", "..")
            or os.path.basename(request_id) != request_id
        ):
            raise ValueError(f"Invalid request_id for download: {request_id!r}")

        import httpx

        downloads_dir = os.path.join(
            os.path.dirname(os.path.dirname(__file__)),
            "downloads",
        )
        os.makedirs(downloads_dir, exist_ok=True)
        filename = f"{request_id}.mp4"
        filepath = os.path.join(downloads_dir, filename)

        try:
            async with httpx.AsyncClient(
                timeout=120.0, follow_redirects=True
            ) as client:
                response = await client.get(video_url)
                response.raise_for_status()
                with open(filepath, "wb") as f:
                    f.write(response.content)
            logger.info("Downloaded video to %s", filepath)
            return filename
        except Exception as e:
            logger.error("Failed to download video: %s", e)
            raise ValueError(f"Failed to download video: {e}") from e