File size: 15,968 Bytes
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b49d66f
1bd7131
 
 
 
 
 
 
 
 
 
 
 
7b9d520
 
ec0e527
7b9d520
 
 
 
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b49d66f
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16eca32
 
 
ec0e527
16eca32
 
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16eca32
 
 
 
 
 
 
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7b9d520
 
 
 
 
 
 
 
 
 
 
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7b9d520
 
 
 
 
 
 
 
 
 
 
 
 
1bd7131
8c4055f
 
 
 
1bd7131
 
8c4055f
1bd7131
 
 
 
 
 
 
 
 
8c4055f
1bd7131
8c4055f
 
 
1bd7131
 
 
 
8c4055f
1bd7131
 
8c4055f
 
 
 
 
 
 
 
 
 
 
 
1bd7131
 
 
 
8c4055f
 
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c4055f
 
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16eca32
 
 
ec0e527
16eca32
 
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16eca32
 
 
ec0e527
16eca32
 
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
"""
Gemini AI Service for image and video generation.
Python port of the TypeScript geminiService.ts
Uses server-side API key from environment.
"""
import asyncio
import base64
import logging
import os
import uuid
from typing import Optional, Literal

import httpx
from google import genai
from google.genai import types

logger = logging.getLogger(__name__)

# Gemini model identifier used for each kind of request. Change a value here
# to switch models without touching the service code.
MODELS = dict(
    text_generation="gemini-2.5-flash",
    image_edit="gemini-2.5-flash-image",
    video_generation="veo-3.1-generate-preview",
)

# Accepted option values for video generation requests.
AspectRatio = Literal["16:9", "9:16"]
Resolution = Literal["720p", "1080p"]

# Downloaded videos are stored in a "downloads" folder that sits one directory
# above the directory containing this module.
DOWNLOADS_DIR = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "downloads",
)

# Create the folder at import time; an already existing folder is not an error.
os.makedirs(DOWNLOADS_DIR, exist_ok=True)

# Mock mode for local testing (set GEMINI_MOCK_MODE=true to skip real API calls)
MOCK_MODE = os.getenv("GEMINI_MOCK_MODE", "false").lower() == "true"
# Seconds to sleep when simulating API latency in mock mode. The value is
# parsed to a float here because it is passed to asyncio.sleep(), which needs
# a number rather than the raw environment string.
MOCK_MODE_SLEEP_TIME = float(os.getenv("GEMINI_MOCK_MODE_SLEEP_TIME", "0.5"))

# Publicly hosted sample clip that mock mode hands back instead of a real
# generated video.
MOCK_VIDEO_URL = (
    "https://video.twimg.com/amplify_video/1994083297756848128"
    "/vid/avc1/576x576/ue31qU0xts8L9tXD.mp4?tag=21"
)

# Per-category concurrency caps, overridable through environment variables of
# the same name (defaults: 2 videos, 5 images, 10 text requests).
MAX_CONCURRENT_VIDEOS = int(os.environ.get("MAX_CONCURRENT_VIDEOS", "2"))
MAX_CONCURRENT_IMAGES = int(os.environ.get("MAX_CONCURRENT_IMAGES", "5"))
MAX_CONCURRENT_TEXT = int(os.environ.get("MAX_CONCURRENT_TEXT", "10"))

# Slots for the semaphores that enforce the caps above. They start empty and
# are filled on first use by the get_*_semaphore() functions.
_video_semaphore: Optional[asyncio.Semaphore] = None
_image_semaphore: Optional[asyncio.Semaphore] = None
_text_semaphore: Optional[asyncio.Semaphore] = None


def get_video_semaphore() -> asyncio.Semaphore:
    """Return the shared video semaphore, building it on the first call.

    The semaphore is created with MAX_CONCURRENT_VIDEOS permits and stored in
    the module-level ``_video_semaphore`` so that later calls reuse it.
    """
    global _video_semaphore
    if _video_semaphore is not None:
        return _video_semaphore
    _video_semaphore = asyncio.Semaphore(MAX_CONCURRENT_VIDEOS)
    logger.info(f"Video semaphore initialized with limit: {MAX_CONCURRENT_VIDEOS}")
    return _video_semaphore


def get_image_semaphore() -> asyncio.Semaphore:
    """Return the shared image semaphore, building it on the first call.

    The semaphore is created with MAX_CONCURRENT_IMAGES permits and stored in
    the module-level ``_image_semaphore`` so that later calls reuse it.
    """
    global _image_semaphore
    if _image_semaphore is not None:
        return _image_semaphore
    _image_semaphore = asyncio.Semaphore(MAX_CONCURRENT_IMAGES)
    logger.info(f"Image semaphore initialized with limit: {MAX_CONCURRENT_IMAGES}")
    return _image_semaphore


def get_text_semaphore() -> asyncio.Semaphore:
    """Return the shared text semaphore, building it on the first call.

    The semaphore is created with MAX_CONCURRENT_TEXT permits and stored in
    the module-level ``_text_semaphore`` so that later calls reuse it.
    """
    global _text_semaphore
    if _text_semaphore is not None:
        return _text_semaphore
    _text_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TEXT)
    logger.info(f"Text semaphore initialized with limit: {MAX_CONCURRENT_TEXT}")
    return _text_semaphore


def get_gemini_api_key() -> str:
    """Read the Gemini API key from the GEMINI_API_KEY environment variable.

    Raises:
        ValueError: If the variable is unset or empty.
    """
    key = os.environ.get("GEMINI_API_KEY")
    if key:
        return key
    raise ValueError("Server Authentication Error with GEMINI")


class GeminiService:
    """
    Gemini AI Service for text, image, and video generation.
    Uses server-side API key from environment.

    Image inputs may be base64 text (optionally a full ``data:`` URI) or raw
    bytes; they are decoded to bytes before being handed to the SDK.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the Gemini client with API key from env or provided.

        Args:
            api_key: Explicit key; when omitted, GEMINI_API_KEY is read from
                the environment.

        Raises:
            ValueError: If no key is passed and the environment has none.
        """
        self.api_key = api_key or get_gemini_api_key()
        self.client = genai.Client(api_key=self.api_key)

    @staticmethod
    def _decode_image_data(image_data) -> bytes:
        """Return raw image bytes for base64 text, a data URI, or bytes.

        Raises:
            binascii.Error: If the text is not valid base64 (a ValueError
                subclass).
        """
        if isinstance(image_data, (bytes, bytearray, memoryview)):
            return bytes(image_data)
        payload = image_data.strip()
        if payload.startswith("data:"):
            # Keep only the payload after the "data:<mime>;base64," header.
            payload = payload.partition(",")[2]
        return base64.b64decode(payload)

    @staticmethod
    def _encode_image_data(data) -> str:
        """Return base64 text for image bytes; text is passed through as is."""
        if isinstance(data, str):
            return data
        return base64.b64encode(bytes(data)).decode("ascii")

    @staticmethod
    def _with_api_key(uri: str, api_key: str) -> str:
        """Append ``key=<api_key>`` to *uri* using the correct separator."""
        separator = "&" if "?" in uri else "?"
        return f"{uri}{separator}key={api_key}"

    def _handle_api_error(self, error: Exception, context: str):
        """Re-raise an API error, translating "not found" into a ValueError.

        This method never returns normally.

        Args:
            error: The exception caught around the SDK call.
            context: Model name used in the translated message.

        Raises:
            ValueError: If the error text looks like a missing-model error.
            Exception: Otherwise the original *error* is raised unchanged.
        """
        msg = str(error)
        # NOTE(review): "[5," presumably matches a numeric NOT_FOUND status in
        # some error renderings - confirm before relying on it.
        not_found_markers = ("404", "NOT_FOUND", "Requested entity was not found", "[5,")
        if any(marker in msg for marker in not_found_markers):
            raise ValueError(
                f"Model not found ({context}). Ensure your API key project has access to this model. "
                "Veo requires a paid account."
            ) from error
        raise error

    async def generate_animation_prompt(
        self,
        base64_image: str,
        mime_type: str,
        custom_prompt: Optional[str] = None
    ) -> str:
        """
        Analyzes the image to generate a suitable animation prompt.

        Args:
            base64_image: Image as base64 text (or raw bytes).
            mime_type: MIME type of the image.
            custom_prompt: Instruction sent with the image; a default
                instruction is used when omitted.

        Returns:
            The model's text, or "Cinematic subtle movement" when the
            response has no text.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info("[MOCK MODE] Generating animation prompt")
            # float() keeps this working even if the constant holds the raw
            # environment string.
            await asyncio.sleep(float(MOCK_MODE_SLEEP_TIME))  # Simulate API delay
            return "A gentle breeze rustles through the scene as soft light dances across the surface. The camera slowly zooms in with a subtle parallax effect, creating depth and movement."

        default_prompt = custom_prompt or "Describe how this image could be subtly animated with cinematic movement."
        image_bytes = self._decode_image_data(base64_image)
        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["text_generation"],
                    contents=types.Content(
                        parts=[
                            types.Part.from_bytes(
                                data=image_bytes,
                                mime_type=mime_type
                            ),
                            types.Part.from_text(text=default_prompt)
                        ]
                    )
                )
                return response.text or "Cinematic subtle movement"
            except Exception as error:
                self._handle_api_error(error, MODELS["text_generation"])

    async def edit_image(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str
    ) -> str:
        """
        Edit an image using Gemini image model.
        Returns base64 data URI of the edited image.

        Args:
            base64_image: Source image as base64 text (or raw bytes).
            mime_type: MIME type of the source image.
            prompt: Edit instruction; "Enhance this image" is used when empty.

        Raises:
            ValueError: If the response has no candidates or no image data.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info(f"[MOCK MODE] Editing image with prompt: {prompt}")
            await asyncio.sleep(1)  # Simulate API delay
            # NOTE(review): no placeholder image is present here; mock mode
            # currently returns an empty string.
            return ""

        image_bytes = self._decode_image_data(base64_image)
        async with get_image_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["image_edit"],
                    contents=types.Content(
                        parts=[
                            types.Part.from_bytes(
                                data=image_bytes,
                                mime_type=mime_type
                            ),
                            types.Part.from_text(text=prompt or "Enhance this image")
                        ]
                    )
                )

                candidates = response.candidates
                if not candidates:
                    raise ValueError("No candidates returned from Gemini.")

                # A candidate can come back without content; treat that as
                # "no image" instead of failing on attribute access.
                content = candidates[0].content
                for part in (content.parts if content else None) or []:
                    inline = getattr(part, "inline_data", None)
                    if inline and inline.data:
                        result_mime = inline.mime_type or 'image/png'
                        # Bytes must be base64-encoded before being embedded;
                        # formatting them directly would emit "b'...'".
                        encoded = self._encode_image_data(inline.data)
                        return f"data:{result_mime};base64,{encoded}"

                raise ValueError("No image data found in the response.")
            except Exception as error:
                self._handle_api_error(error, MODELS["image_edit"])

    async def start_video_generation(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str,
        aspect_ratio: "AspectRatio" = "16:9",
        resolution: "Resolution" = "720p",
        number_of_videos: int = 1
    ) -> dict:
        """
        Start video generation using Veo model.
        Returns operation details for polling.

        Args:
            base64_image: Source image as base64 text (or raw bytes).
            mime_type: MIME type of the source image.
            prompt: Text prompt for the video.
            aspect_ratio: Output aspect ratio.
            resolution: Output resolution.
            number_of_videos: How many videos to request.

        Returns:
            Dict with "gemini_operation_name", "done" and "status"
            ("pending" or "completed").
        """
        # Mock mode for testing without API credits
        if MOCK_MODE:
            mock_operation_name = f"mock_operation_{uuid.uuid4().hex[:16]}"
            logger.info(f"[MOCK MODE] Starting video generation: {mock_operation_name}")
            return {
                "gemini_operation_name": mock_operation_name,
                "done": False,
                "status": "pending"
            }

        image_bytes = self._decode_image_data(base64_image)
        async with get_video_semaphore():
            try:
                # Start video generation
                operation = await asyncio.to_thread(
                    self.client.models.generate_videos,
                    model=MODELS["video_generation"],
                    prompt=prompt,
                    image=types.Image(
                        image_bytes=image_bytes,
                        mime_type=mime_type
                    ),
                    config=types.GenerateVideosConfig(
                        number_of_videos=number_of_videos,
                        resolution=resolution,
                        aspect_ratio=aspect_ratio
                    )
                )

                # Return operation details
                return {
                    "gemini_operation_name": operation.name,
                    "done": operation.done,
                    "status": "completed" if operation.done else "pending"
                }
            except Exception as error:
                self._handle_api_error(error, MODELS["video_generation"])

    async def check_video_status(self, gemini_operation_name: str) -> dict:
        """
        Check the status of a video generation operation.
        Returns status and video URL if complete.

        Returns:
            Dict with "gemini_operation_name", "done" and "status"
            ("pending", "completed" or "failed"), plus "video_url" on success
            or "error" on failure.

        SECURITY: the returned "video_url" embeds the API key as a query
        parameter, so it must not be exposed to clients or written to logs.
        """
        # Mock mode for testing without API credits
        if MOCK_MODE:
            # Always reports completion with the sample video URL.
            logger.info(f"[MOCK MODE] Checking video status: {gemini_operation_name}")
            await asyncio.sleep(2)  # Simulate API delay
            return {
                "gemini_operation_name": gemini_operation_name,
                "done": True,
                "status": "completed",
                "video_url": MOCK_VIDEO_URL
            }

        try:
            # Rebuild an operation object from its name so it can be refreshed.
            operation = await asyncio.to_thread(
                self.client.operations.get,
                types.GenerateVideosOperation(name=gemini_operation_name, done=False)
            )

            if not operation.done:
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": False,
                    "status": "pending"
                }

            # Check for error - handle string, mapping and object shapes
            if operation.error:
                failure = operation.error
                if isinstance(failure, dict):
                    error_msg = failure.get("message", failure)
                else:
                    error_msg = getattr(failure, "message", failure)
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": True,
                    "status": "failed",
                    "error": str(error_msg) or "Unknown error"
                }

            # Extract video URI from result
            result = operation.result
            generated = getattr(result, "generated_videos", None) if result else None
            if generated:
                video_file = getattr(generated[0], "video", None)
                video_uri = getattr(video_file, "uri", None) if video_file else None
                # Only build a URL when a URI is really present.
                if video_uri:
                    return {
                        "gemini_operation_name": gemini_operation_name,
                        "done": True,
                        "status": "completed",
                        "video_url": self._with_api_key(video_uri, self.api_key)
                    }

            return {
                "gemini_operation_name": gemini_operation_name,
                "done": True,
                "status": "failed",
                "error": "No video URI returned. May be due to safety filters."
            }

        except Exception as error:
            msg = str(error)
            if "404" in msg or "NOT_FOUND" in msg or "Requested entity was not found" in msg:
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": True,
                    "status": "failed",
                    "error": "Operation not found (404). It may have expired."
                }
            raise

    async def download_video(self, video_url: str, operation_id: str) -> str:
        """
        Download video from Gemini to local storage.
        Returns the local filename.

        The body is streamed to a temporary ".part" file and renamed on
        success, so a failed download never leaves a truncated ".mp4".

        Raises:
            ValueError: If the download or the file write fails.
        """
        filename = f"{operation_id}.mp4"
        filepath = os.path.join(DOWNLOADS_DIR, filename)
        partial_path = f"{filepath}.part"

        try:
            # follow_redirects=True is required as Gemini returns 302 redirects
            async with httpx.AsyncClient(timeout=120.0, follow_redirects=True) as client:
                async with client.stream("GET", video_url) as response:
                    response.raise_for_status()
                    with open(partial_path, 'wb') as f:
                        async for chunk in response.aiter_bytes():
                            f.write(chunk)

            os.replace(partial_path, filepath)
            logger.info(f"Downloaded video to {filepath}")
            return filename
        except Exception as e:
            # Best-effort cleanup of the partial file; a cleanup failure must
            # not hide the original error.
            try:
                os.remove(partial_path)
            except OSError:
                pass
            # The URL may carry the API key, and HTTP errors include the URL
            # in their text, so redact the key before logging or raising.
            detail = str(e)
            api_key = getattr(self, "api_key", None)
            if api_key:
                detail = detail.replace(api_key, "[REDACTED]")
            logger.error(f"Failed to download video: {detail}")
            raise ValueError(f"Failed to download video: {detail}") from e

    async def generate_text(
        self,
        prompt: str,
        model: Optional[str] = None
    ) -> str:
        """
        Simple text generation with Gemini.

        Args:
            prompt: Text prompt.
            model: Model name; the configured text model is used when omitted.

        Returns:
            The model's text, or "" when the response has no text.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info(f"[MOCK MODE] Generating text for prompt: {prompt[:50]}...")
            # float() keeps this working even if the constant holds the raw
            # environment string.
            await asyncio.sleep(float(MOCK_MODE_SLEEP_TIME))  # Simulate API delay
            return f"This is a mock response for your prompt: '{prompt[:100]}...'. In production, this would be generated by Gemini AI."

        model_name = model or MODELS["text_generation"]
        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=model_name,
                    contents=types.Content(
                        parts=[types.Part.from_text(text=prompt)]
                    )
                )
                return response.text or ""
            except Exception as error:
                self._handle_api_error(error, model_name)

    async def analyze_image(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str
    ) -> str:
        """
        Analyze image with custom prompt.

        Args:
            base64_image: Image as base64 text (or raw bytes).
            mime_type: MIME type of the image.
            prompt: Question or instruction about the image.

        Returns:
            The model's text, or "" when the response has no text.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info(f"[MOCK MODE] Analyzing image with prompt: {prompt[:50]}...")
            # float() keeps this working even if the constant holds the raw
            # environment string.
            await asyncio.sleep(float(MOCK_MODE_SLEEP_TIME))  # Simulate API delay
            return f"Mock analysis result: The image appears to show a scene that matches your query '{prompt[:50]}...'. This is placeholder content for testing."

        image_bytes = self._decode_image_data(base64_image)
        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["text_generation"],
                    contents=types.Content(
                        parts=[
                            types.Part.from_bytes(
                                data=image_bytes,
                                mime_type=mime_type
                            ),
                            types.Part.from_text(text=prompt)
                        ]
                    )
                )
                return response.text or ""
            except Exception as error:
                self._handle_api_error(error, MODELS["text_generation"])