File size: 8,958 Bytes
693e4e3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
"""
Fal.ai Service for video generation.
Python implementation using fal-client SDK.
Uses server-side API key from environment.
"""
import asyncio
import logging
import os
from typing import Optional, Literal

logger = logging.getLogger(__name__)

# Model names - easily configurable
MODELS = {
    "video_generation": "fal-ai/veo3.1/fast/image-to-video"
}

# Type aliases
AspectRatio = Literal["16:9", "9:16", "auto"]
Resolution = Literal["720p", "1080p"]

# Mock mode for local testing (set FAL_MOCK_MODE=true to skip real API calls)
MOCK_MODE = os.getenv("FAL_MOCK_MODE", "false").lower() == "true"

# Sample video URL for mock mode
MOCK_VIDEO_URL = "https://v3b.fal.media/files/mock/mock-video.mp4"


def get_fal_api_key() -> str:
    """Get Fal.ai API key from environment."""
    api_key = os.getenv("FAL_KEY")
    if not api_key:
        raise ValueError("Server Authentication Error: FAL_KEY not configured")
    return api_key


class FalService:
    """
    Fal.ai Service for video generation.
    Uses server-side API key from environment (FAL_KEY).
    """

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the Fal.ai client with API key from env or provided."""
        self.api_key = api_key or get_fal_api_key()
        # fal_client reads from FAL_KEY env var by default
        # Set it explicitly if a custom key is provided
        if api_key:
            os.environ["FAL_KEY"] = api_key

    def _handle_api_error(self, error: Exception, context: str):
        """Handle API errors with descriptive messages."""
        msg = str(error)
        if "401" in msg or "Unauthorized" in msg:
            raise ValueError(
                f"Authentication failed ({context}). Check your FAL_KEY is valid."
            )
        if "402" in msg or "Payment Required" in msg:
            raise ValueError(
                f"Insufficient credits ({context}). Add credits at fal.ai."
            )
        if "429" in msg or "Rate limit" in msg.lower():
            raise ValueError(
                f"Rate limit exceeded ({context}). Wait and retry."
            )
        raise error

    async def start_video_generation(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str,
        aspect_ratio: AspectRatio = "16:9",
        resolution: Resolution = "720p",
        number_of_videos: int = 1
    ) -> dict:
        """
        Start video generation using Fal.ai Veo 3.1 model.
        Unlike Gemini, fal.ai subscribe() handles polling internally,
        so this returns the completed video directly.
        
        Returns dict with:
            - fal_request_id: Request ID for reference
            - done: Always True (fal.ai waits for completion)
            - status: "completed" or "failed"
            - video_url: URL to the generated video
        """
        # Mock mode for testing without API credits
        if MOCK_MODE:
            import uuid
            mock_request_id = f"mock_fal_{uuid.uuid4().hex[:16]}"
            logger.info(f"[MOCK MODE] Video generation: {mock_request_id}")
            await asyncio.sleep(2)  # Simulate API delay
            return {
                "fal_request_id": mock_request_id,
                "done": True,
                "status": "completed",
                "video_url": MOCK_VIDEO_URL
            }

        try:
            import fal_client

            # Use submit() instead of subscribe() - returns immediately without waiting
            # This starts the job and returns a request_id for status checking
            handle = await asyncio.to_thread(
                fal_client.submit,
                MODELS["video_generation"],
                arguments={
                    "prompt": prompt,
                    "image_url": f"data:{mime_type};base64,{base64_image}",
                    "aspect_ratio": aspect_ratio,
                    "resolution": resolution,
                    "generate_audio": True,
                },
            )

            # Get the request ID from the handle
            request_id = handle.request_id if hasattr(handle, 'request_id') else str(handle)

            return {
                "fal_request_id": request_id,
                "done": False,
                "status": "processing",
            }

        except ImportError:
            raise ValueError(
                "fal-client package not installed. Run: pip install fal-client"
            )
        except Exception as error:
            self._handle_api_error(error, MODELS["video_generation"])

    async def check_video_status(self, fal_request_id: str) -> dict:
        """
        Check the status of a video generation request.
        Returns immediately with current status (does not wait).
        """
        # Mock mode for testing
        if MOCK_MODE:
            import random
            # Simulate completion after a few checks
            if random.random() > 0.7:
                return {
                    "fal_request_id": fal_request_id,
                    "done": True,
                    "status": "completed",
                    "video_url": MOCK_VIDEO_URL
                }
            return {
                "fal_request_id": fal_request_id,
                "done": False,
                "status": "processing"
            }

        try:
            import fal_client

            # Get status without waiting
            status = await asyncio.to_thread(
                fal_client.status,
                MODELS["video_generation"],
                fal_request_id,
                with_logs=False
            )

            # Check if completed
            if hasattr(status, 'status'):
                if status.status == "COMPLETED":
                    # Get the result
                    result = await asyncio.to_thread(
                        fal_client.result,
                        MODELS["video_generation"],
                        fal_request_id
                    )
                    
                    # Extract video URL
                    video_url = None
                    if isinstance(result, dict) and "video" in result:
                        video_url = result["video"].get("url")
                    elif hasattr(result, "video") and hasattr(result.video, "url"):
                        video_url = result.video.url

                    return {
                        "fal_request_id": fal_request_id,
                        "done": True,
                        "status": "completed",
                        "video_url": video_url
                    }
                elif status.status == "FAILED":
                    return {
                        "fal_request_id": fal_request_id,
                        "done": True,
                        "status": "failed",
                        "error": getattr(status, 'error', 'Unknown error')
                    }
                else:
                    # Still processing (IN_QUEUE, IN_PROGRESS)
                    return {
                        "fal_request_id": fal_request_id,
                        "done": False,
                        "status": "processing"
                    }
            
            # Fallback - assume still processing
            return {
                "fal_request_id": fal_request_id,
                "done": False,
                "status": "processing"
            }

        except ImportError:
            raise ValueError(
                "fal-client package not installed. Run: pip install fal-client"
            )
        except Exception as error:
            logger.error(f"Error checking status for {fal_request_id}: {error}")
            return {
                "fal_request_id": fal_request_id,
                "done": False,
                "status": "processing",
                "error": str(error)
            }

    async def download_video(self, video_url: str, request_id: str) -> str:
        """
        Download video from fal.ai to local storage.
        Returns the local filename.
        """
        import httpx
        
        # Use same downloads directory as Gemini service
        downloads_dir = os.path.join(
            os.path.dirname(os.path.dirname(__file__)), 
            "downloads"
        )
        os.makedirs(downloads_dir, exist_ok=True)
        
        filename = f"{request_id}.mp4"
        filepath = os.path.join(downloads_dir, filename)

        try:
            async with httpx.AsyncClient(timeout=120.0, follow_redirects=True) as client:
                response = await client.get(video_url)
                response.raise_for_status()

                with open(filepath, 'wb') as f:
                    f.write(response.content)

                logger.info(f"Downloaded video to {filepath}")
                return filename
        except Exception as e:
            logger.error(f"Failed to download video: {e}")
            raise ValueError(f"Failed to download video: {e}")