"""
Gemini AI Service for image and video generation.

Python port of the TypeScript geminiService.ts.
Uses the server-side API key from the environment.
"""

import asyncio
import base64
import contextlib
import logging
import os
import uuid
from typing import Any, Literal, NoReturn, Optional, Union

import httpx
from google import genai
from google.genai import types

logger = logging.getLogger(__name__)

# Model names - easily configurable
MODELS = {
    "text_generation": "gemini-2.5-flash",
    "image_edit": "gemini-2.5-flash-image",
    "video_generation": "veo-3.1-generate-preview",
}

# Type aliases
AspectRatio = Literal["16:9", "9:16"]
Resolution = Literal["720p", "1080p"]

# Video downloads directory: "downloads" inside the parent of this module's directory.
DOWNLOADS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "downloads")

# Ensure downloads directory exists
os.makedirs(DOWNLOADS_DIR, exist_ok=True)


def _env_float(name: str, default: float) -> float:
    """Return environment variable *name* parsed as a float, or *default*.

    Falls back to *default* (logging a warning) when the variable is unset or
    not a valid number, so a typo cannot crash the module at import time.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return float(raw)
    except ValueError:
        logger.warning("Invalid %s=%r; using default %s", name, raw, default)
        return default


# Mock mode for local testing (set GEMINI_MOCK_MODE=true to skip real API calls)
MOCK_MODE = os.getenv("GEMINI_MOCK_MODE", "false").lower() == "true"
# Simulated API latency in seconds for mock mode. Parsed as a float because
# asyncio.sleep() rejects strings.
MOCK_MODE_SLEEP_TIME = _env_float("GEMINI_MOCK_MODE_SLEEP_TIME", 0.5)

# Sample video URL for mock mode (a public test video)
MOCK_VIDEO_URL = "https://video.twimg.com/amplify_video/1994083297756848128/vid/avc1/576x576/ue31qU0xts8L9tXD.mp4?tag=21"

# Concurrency limits from environment (defaults)
MAX_CONCURRENT_VIDEOS = int(os.getenv("MAX_CONCURRENT_VIDEOS", "2"))
MAX_CONCURRENT_IMAGES = int(os.getenv("MAX_CONCURRENT_IMAGES", "5"))
MAX_CONCURRENT_TEXT = int(os.getenv("MAX_CONCURRENT_TEXT", "10"))

# Substrings in an exception message that identify a "not found" API error.
_NOT_FOUND_MARKERS = ("404", "NOT_FOUND", "Requested entity was not found")

# Semaphores for concurrency control; created lazily on first use, not at import time.
_video_semaphore: Optional[asyncio.Semaphore] = None
_image_semaphore: Optional[asyncio.Semaphore] = None
_text_semaphore: Optional[asyncio.Semaphore] = None


def get_video_semaphore() -> asyncio.Semaphore:
    """Get or create the semaphore limiting concurrent video-generation requests."""
    global _video_semaphore
    if _video_semaphore is None:
        _video_semaphore = asyncio.Semaphore(MAX_CONCURRENT_VIDEOS)
        logger.info("Video semaphore initialized with limit: %s", MAX_CONCURRENT_VIDEOS)
    return _video_semaphore


def get_image_semaphore() -> asyncio.Semaphore:
    """Get or create the semaphore limiting concurrent image-edit requests."""
    global _image_semaphore
    if _image_semaphore is None:
        _image_semaphore = asyncio.Semaphore(MAX_CONCURRENT_IMAGES)
        logger.info("Image semaphore initialized with limit: %s", MAX_CONCURRENT_IMAGES)
    return _image_semaphore


def get_text_semaphore() -> asyncio.Semaphore:
    """Get or create the semaphore limiting concurrent text-model requests."""
    global _text_semaphore
    if _text_semaphore is None:
        _text_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TEXT)
        logger.info("Text semaphore initialized with limit: %s", MAX_CONCURRENT_TEXT)
    return _text_semaphore


def get_gemini_api_key() -> str:
    """Return the Gemini API key from the GEMINI_API_KEY environment variable.

    Raises:
        ValueError: if the variable is unset or empty.
    """
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        raise ValueError("Server Authentication Error with GEMINI")
    return api_key


def _is_not_found_message(msg: str) -> bool:
    """Return True if an exception message looks like a "not found" API error."""
    return any(marker in msg for marker in _NOT_FOUND_MARKERS)


def _to_image_bytes(image_data: Union[str, bytes]) -> bytes:
    """Return raw image bytes from raw bytes or a base64-encoded string.

    ``types.Part.from_bytes`` and ``types.Image(image_bytes=...)`` take raw
    bytes and base64-encode them for the wire themselves. Passing a base64
    *string* would be encoded a second time. A leading
    ``data:<mime>;base64,`` prefix is tolerated. Raw bytes pass through unchanged.

    Raises:
        ValueError: (binascii.Error) if the string is not valid base64.
    """
    if isinstance(image_data, (bytes, bytearray)):
        return bytes(image_data)
    if image_data.startswith("data:"):
        _, _, image_data = image_data.partition(",")
    # Drop any whitespace/newlines, then decode strictly so garbage input fails loudly.
    return base64.b64decode("".join(image_data.split()), validate=True)


def _encode_image_data(data: Union[str, bytes]) -> str:
    """Return *data* as base64 text, suitable for a ``data:`` URI.

    The SDK exposes ``inline_data.data`` as raw bytes. Interpolating bytes into
    an f-string would yield ``b'...'``, so bytes are base64-encoded here.
    Strings are assumed to be base64 already and pass through.
    """
    if isinstance(data, (bytes, bytearray)):
        return base64.b64encode(data).decode("ascii")
    return data


def _operation_error_message(error: Any) -> str:
    """Extract a human-readable message from an operation's ``error`` field.

    Handles a dict (reading its "message" key), an object with a ``message``
    attribute, or anything else (stringified). Returns "Unknown error" when
    the message is empty.
    """
    if isinstance(error, dict):
        message = error.get("message") or error
    else:
        message = getattr(error, "message", error)
    text = "" if message is None else str(message)
    return text or "Unknown error"


def _with_api_key(uri: str, api_key: str) -> str:
    """Append ``key=<api_key>`` to *uri*, joining with ``?`` or ``&`` as needed."""
    separator = "&" if "?" in uri else "?"
    return f"{uri}{separator}key={api_key}"


class GeminiService:
    """
    Gemini AI Service for text, image, and video generation.

    Uses the server-side API key from the environment (or one passed in).
    Blocking SDK calls run in worker threads via ``asyncio.to_thread``, gated by
    the module-level semaphores.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the Gemini client with the provided API key or GEMINI_API_KEY.

        Raises:
            ValueError: if no key is provided and GEMINI_API_KEY is unset.
        """
        self.api_key = api_key or get_gemini_api_key()
        self.client = genai.Client(api_key=self.api_key)

    def _handle_api_error(self, error: Exception, context: str) -> NoReturn:
        """Re-raise an API error, translating "model not found" into a clearer ValueError.

        Args:
            error: The exception raised by the SDK call.
            context: Model name included in the translated message.

        Raises:
            ValueError: for not-found style errors (chained to *error*).
            Exception: *error* itself, unchanged, for anything else.
        """
        msg = str(error)
        # "[5," presumably matches a serialized gRPC status with code 5 (NOT_FOUND).
        if _is_not_found_message(msg) or "[5," in msg:
            raise ValueError(
                f"Model not found ({context}). Ensure your API key project has access to this model. "
                "Veo requires a paid account."
            ) from error
        raise error

    @staticmethod
    def _image_prompt_contents(
        base64_image: Union[str, bytes], mime_type: str, text: str
    ) -> types.Content:
        """Build a single Content made of an inline image part followed by a text part."""
        return types.Content(
            parts=[
                types.Part.from_bytes(data=_to_image_bytes(base64_image), mime_type=mime_type),
                types.Part.from_text(text=text),
            ]
        )

    async def generate_animation_prompt(
        self,
        base64_image: str,
        mime_type: str,
        custom_prompt: Optional[str] = None,
    ) -> str:
        """
        Analyze the image to generate a suitable animation prompt.

        Args:
            base64_image: Image as a base64 string (optionally a data URI) or raw bytes.
            mime_type: MIME type of the image.
            custom_prompt: Instruction to send instead of the default one.

        Returns:
            The model's text, or "Cinematic subtle movement" if it returned none.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info("[MOCK MODE] Generating animation prompt")
            await asyncio.sleep(MOCK_MODE_SLEEP_TIME)  # Simulate API delay
            return (
                "A gentle breeze rustles through the scene as soft light dances across the surface. "
                "The camera slowly zooms in with a subtle parallax effect, creating depth and movement."
            )

        instruction = custom_prompt or "Describe how this image could be subtly animated with cinematic movement."
        # Decode before taking a semaphore slot so bad input fails fast.
        contents = self._image_prompt_contents(base64_image, mime_type, instruction)

        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["text_generation"],
                    contents=contents,
                )
                return response.text or "Cinematic subtle movement"
            except Exception as error:
                self._handle_api_error(error, MODELS["text_generation"])

    async def edit_image(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str,
    ) -> str:
        """
        Edit an image using the Gemini image model.

        Args:
            base64_image: Image as a base64 string (optionally a data URI) or raw bytes.
            mime_type: MIME type of the input image.
            prompt: Edit instruction; "Enhance this image" is used when empty.

        Returns:
            A ``data:<mime>;base64,<data>`` URI of the first image part in the response.

        Raises:
            ValueError: if the response has no candidates or no image data,
                or the model is not found.
        """
        # Mock mode for testing - return a sample image
        if MOCK_MODE:
            logger.info("[MOCK MODE] Editing image with prompt: %s", prompt)
            await asyncio.sleep(1)  # Simulate API delay
            # Return a small red placeholder image (1x1 pixel)
            return "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="

        contents = self._image_prompt_contents(base64_image, mime_type, prompt or "Enhance this image")

        async with get_image_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["image_edit"],
                    contents=contents,
                )

                candidates = response.candidates
                if not candidates:
                    raise ValueError("No candidates returned from Gemini.")

                # content / parts may be None (e.g. a blocked response); treat that as "no parts".
                content = candidates[0].content
                parts = (content.parts if content is not None else None) or []
                for part in parts:
                    inline = getattr(part, "inline_data", None)
                    if inline and inline.data:
                        result_mime = inline.mime_type or "image/png"
                        return f"data:{result_mime};base64,{_encode_image_data(inline.data)}"

                raise ValueError("No image data found in the response.")
            except Exception as error:
                self._handle_api_error(error, MODELS["image_edit"])

    async def start_video_generation(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str,
        aspect_ratio: AspectRatio = "16:9",
        resolution: Resolution = "720p",
        number_of_videos: int = 1,
    ) -> dict[str, Any]:
        """
        Start video generation using the Veo model.

        Returns:
            Operation details for polling: ``gemini_operation_name``, ``done`` and
            ``status`` ("completed" or "pending").
        """
        # Mock mode for testing without API credits
        if MOCK_MODE:
            mock_operation_name = f"mock_operation_{uuid.uuid4().hex[:16]}"
            logger.info("[MOCK MODE] Starting video generation: %s", mock_operation_name)
            return {
                "gemini_operation_name": mock_operation_name,
                "done": False,
                "status": "pending",
            }

        image = types.Image(image_bytes=_to_image_bytes(base64_image), mime_type=mime_type)

        # NOTE: the semaphore only bounds concurrent *start* requests; it is
        # released once the operation is created, not when the video finishes.
        async with get_video_semaphore():
            try:
                operation = await asyncio.to_thread(
                    self.client.models.generate_videos,
                    model=MODELS["video_generation"],
                    prompt=prompt,
                    image=image,
                    config=types.GenerateVideosConfig(
                        number_of_videos=number_of_videos,
                        resolution=resolution,
                        aspect_ratio=aspect_ratio,
                    ),
                )
                return {
                    "gemini_operation_name": operation.name,
                    "done": operation.done,
                    "status": "completed" if operation.done else "pending",
                }
            except Exception as error:
                self._handle_api_error(error, MODELS["video_generation"])

    async def check_video_status(self, gemini_operation_name: str) -> dict[str, Any]:
        """
        Check the status of a video generation operation.

        Returns:
            A dict with ``gemini_operation_name``, ``done`` and ``status``
            ("pending", "completed" or "failed"). It also carries ``video_url``
            on completion and ``error`` on failure. A not-found operation is
            reported as "failed" rather than raised.
        """
        # Mock mode for testing without API credits
        if MOCK_MODE:
            # Always reports completion with the mock video URL.
            logger.info("[MOCK MODE] Checking video status: %s", gemini_operation_name)
            await asyncio.sleep(2)  # Simulate API delay
            return {
                "gemini_operation_name": gemini_operation_name,
                "done": True,
                "status": "completed",
                "video_url": MOCK_VIDEO_URL,
            }

        try:
            # The SDK polls using an operation object, so rebuild one from its name.
            operation = await asyncio.to_thread(
                self.client.operations.get,
                types.GenerateVideosOperation(name=gemini_operation_name, done=False),
            )
        except Exception as error:
            if _is_not_found_message(str(error)):
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": True,
                    "status": "failed",
                    "error": "Operation not found (404). It may have expired.",
                }
            raise

        if not operation.done:
            return {
                "gemini_operation_name": gemini_operation_name,
                "done": False,
                "status": "pending",
            }

        if operation.error:
            return {
                "gemini_operation_name": gemini_operation_name,
                "done": True,
                "status": "failed",
                "error": _operation_error_message(operation.error),
            }

        # Extract the first generated video's URI, guarding every optional level.
        result = operation.result
        generated_videos = getattr(result, "generated_videos", None) if result else None
        if generated_videos:
            video_file = getattr(generated_videos[0], "video", None)
            video_uri = getattr(video_file, "uri", None) if video_file else None
            if video_uri:
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": True,
                    "status": "completed",
                    # NOTE(review): this embeds the server API key in a URL handed
                    # to callers; confirm it never reaches untrusted clients.
                    "video_url": _with_api_key(video_uri, self.api_key),
                }

        return {
            "gemini_operation_name": gemini_operation_name,
            "done": True,
            "status": "failed",
            "error": "No video URI returned. May be due to safety filters.",
        }

    async def download_video(self, video_url: str, operation_id: str) -> str:
        """
        Download a video to DOWNLOADS_DIR as ``<operation_id>.mp4``.

        The body is streamed to a temporary ``.part`` file and renamed into place
        only after a complete download, so a failure never leaves a truncated video.

        Returns:
            The local filename (not the full path).

        Raises:
            ValueError: if *operation_id* would escape DOWNLOADS_DIR, or the download fails.
        """
        filename = f"{operation_id}.mp4"
        downloads_root = os.path.abspath(DOWNLOADS_DIR)
        filepath = os.path.abspath(os.path.join(downloads_root, filename))
        # Reject ids containing path separators or ".." that would escape the directory.
        if os.path.dirname(filepath) != downloads_root:
            raise ValueError(f"Failed to download video: invalid operation id {operation_id!r}")

        tmp_path = f"{filepath}.part"
        try:
            # follow_redirects=True is required as Gemini returns 302 redirects
            async with httpx.AsyncClient(timeout=120.0, follow_redirects=True) as client:
                async with client.stream("GET", video_url) as response:
                    response.raise_for_status()
                    with open(tmp_path, "wb") as f:
                        async for chunk in response.aiter_bytes():
                            f.write(chunk)
            os.replace(tmp_path, filepath)
        except Exception as e:
            with contextlib.suppress(OSError):
                os.remove(tmp_path)
            # httpx error messages may include the request URL, which can carry the API key.
            detail = str(e).replace(self.api_key, "***") if self.api_key else str(e)
            logger.error("Failed to download video: %s", detail)
            raise ValueError(f"Failed to download video: {detail}") from e

        logger.info("Downloaded video to %s", filepath)
        return filename

    async def generate_text(
        self,
        prompt: str,
        model: Optional[str] = None,
    ) -> str:
        """
        Simple text generation with Gemini.

        Args:
            prompt: The user prompt.
            model: Model name; defaults to MODELS["text_generation"].

        Returns:
            The response text, or "" if the model returned none.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info("[MOCK MODE] Generating text for prompt: %s...", prompt[:50])
            await asyncio.sleep(MOCK_MODE_SLEEP_TIME)  # Simulate API delay
            return f"This is a mock response for your prompt: '{prompt[:100]}...'. In production, this would be generated by Gemini AI."

        model_name = model or MODELS["text_generation"]

        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=model_name,
                    contents=types.Content(parts=[types.Part.from_text(text=prompt)]),
                )
                return response.text or ""
            except Exception as error:
                self._handle_api_error(error, model_name)

    async def analyze_image(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str,
    ) -> str:
        """
        Analyze an image with a custom prompt.

        Args:
            base64_image: Image as a base64 string (optionally a data URI) or raw bytes.
            mime_type: MIME type of the image.
            prompt: Question or instruction about the image.

        Returns:
            The response text, or "" if the model returned none.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info("[MOCK MODE] Analyzing image with prompt: %s...", prompt[:50])
            await asyncio.sleep(MOCK_MODE_SLEEP_TIME)  # Simulate API delay
            return f"Mock analysis result: The image appears to show a scene that matches your query '{prompt[:50]}...'. This is placeholder content for testing."

        contents = self._image_prompt_contents(base64_image, mime_type, prompt)

        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["text_generation"],
                    contents=contents,
                )
                return response.text or ""
            except Exception as error:
                self._handle_api_error(error, MODELS["text_generation"])