Spaces:
Sleeping
Sleeping
| """ | |
| Gemini AI Service for image and video generation. | |
| Python port of the TypeScript geminiService.ts | |
| Uses server-side API key from environment. | |
| """ | |
| import asyncio | |
| import logging | |
| import os | |
| import uuid | |
| import httpx | |
| from typing import Optional, Literal | |
| from google import genai | |
| from google.genai import types | |
logger = logging.getLogger(__name__)

# Model names - easily configurable
MODELS = {
    "text_generation": "gemini-2.5-flash",
    "image_edit": "gemini-2.5-flash-image",
    "video_generation": "veo-3.1-generate-preview",
}

# Type aliases
AspectRatio = Literal["16:9", "9:16"]
Resolution = Literal["720p", "1080p"]

# Video downloads directory: "<parent of this file's directory>/downloads"
DOWNLOADS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "downloads")

# Ensure downloads directory exists (runs at import time)
os.makedirs(DOWNLOADS_DIR, exist_ok=True)

# Mock mode for local testing (set GEMINI_MOCK_MODE=true to skip real API calls)
MOCK_MODE = os.getenv("GEMINI_MOCK_MODE", "false").lower() == "true"

# Simulated API latency in seconds while in mock mode.
# os.getenv() always yields a string, and asyncio.sleep() rejects strings, so
# the value is parsed to a float here (same pattern as the int() limits below).
MOCK_MODE_SLEEP_TIME = float(os.getenv("GEMINI_MOCK_MODE_SLEEP_TIME", "0.5"))

# Sample video URL for mock mode (a public test video)
MOCK_VIDEO_URL = "https://video.twimg.com/amplify_video/1994083297756848128/vid/avc1/576x576/ue31qU0xts8L9tXD.mp4?tag=21"

# Concurrency limits from environment (defaults)
MAX_CONCURRENT_VIDEOS = int(os.getenv("MAX_CONCURRENT_VIDEOS", "2"))
MAX_CONCURRENT_IMAGES = int(os.getenv("MAX_CONCURRENT_IMAGES", "5"))
MAX_CONCURRENT_TEXT = int(os.getenv("MAX_CONCURRENT_TEXT", "10"))

# Semaphores for concurrency control. They start as None and are created
# lazily by the get_*_semaphore() accessors.
_video_semaphore: Optional[asyncio.Semaphore] = None
_image_semaphore: Optional[asyncio.Semaphore] = None
_text_semaphore: Optional[asyncio.Semaphore] = None
def get_video_semaphore() -> asyncio.Semaphore:
    """Return the module-wide video semaphore, creating it on first call.

    The semaphore is sized by ``MAX_CONCURRENT_VIDEOS`` and cached in the
    module-level ``_video_semaphore`` variable.
    """
    global _video_semaphore
    # Fast path: already created by an earlier call.
    if _video_semaphore is not None:
        return _video_semaphore
    _video_semaphore = asyncio.Semaphore(MAX_CONCURRENT_VIDEOS)
    logger.info(f"Video semaphore initialized with limit: {MAX_CONCURRENT_VIDEOS}")
    return _video_semaphore
def get_image_semaphore() -> asyncio.Semaphore:
    """Return the module-wide image semaphore, creating it on first call.

    The semaphore is sized by ``MAX_CONCURRENT_IMAGES`` and cached in the
    module-level ``_image_semaphore`` variable.
    """
    global _image_semaphore
    # Fast path: already created by an earlier call.
    if _image_semaphore is not None:
        return _image_semaphore
    _image_semaphore = asyncio.Semaphore(MAX_CONCURRENT_IMAGES)
    logger.info(f"Image semaphore initialized with limit: {MAX_CONCURRENT_IMAGES}")
    return _image_semaphore
def get_text_semaphore() -> asyncio.Semaphore:
    """Return the module-wide text semaphore, creating it on first call.

    The semaphore is sized by ``MAX_CONCURRENT_TEXT`` and cached in the
    module-level ``_text_semaphore`` variable.
    """
    global _text_semaphore
    # Fast path: already created by an earlier call.
    if _text_semaphore is not None:
        return _text_semaphore
    _text_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TEXT)
    logger.info(f"Text semaphore initialized with limit: {MAX_CONCURRENT_TEXT}")
    return _text_semaphore
def get_gemini_api_key() -> str:
    """Read the Gemini API key from the ``GEMINI_API_KEY`` environment variable.

    Returns:
        The key as found in the environment.

    Raises:
        ValueError: If the variable is unset or empty.
    """
    key = os.environ.get("GEMINI_API_KEY")
    if key:
        return key
    raise ValueError("Server Authentication Error with GEMINI")
class GeminiService:
    """
    Gemini AI Service for text, image, and video generation.
    Uses server-side API key from environment.

    Blocking SDK calls are dispatched with ``asyncio.to_thread`` and throttled
    by the module-level semaphores. When ``MOCK_MODE`` is true the generation
    methods return canned data without calling the API.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the Gemini client with API key from env or provided.

        Args:
            api_key: Explicit key; when falsy, ``get_gemini_api_key()`` is used.

        Raises:
            ValueError: If no key is given and none is found in the environment.
        """
        self.api_key = api_key or get_gemini_api_key()
        self.client = genai.Client(api_key=self.api_key)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _mock_delay() -> float:
        """Return the mock-mode delay in seconds as a number."""
        # The setting originates from an environment variable and may still be
        # a string; asyncio.sleep() only accepts numbers.
        return float(MOCK_MODE_SLEEP_TIME)

    @staticmethod
    def _build_image_contents(base64_image: str, mime_type: str, text: str):
        """Build a two-part request (image followed by text) for generate_content."""
        return types.Content(
            parts=[
                types.Part.from_bytes(data=base64_image, mime_type=mime_type),
                types.Part.from_text(text=text),
            ]
        )

    @staticmethod
    def _to_data_uri(data, mime_type: Optional[str]) -> str:
        """Render inline image data as a ``data:<mime>;base64,<payload>`` URI."""
        import base64  # local import keeps this block self-contained

        if isinstance(data, (bytes, bytearray)):
            # Raw bytes must be base64-encoded; formatting them directly would
            # embed the Python repr (b'...') in the URI.
            payload = base64.b64encode(data).decode("ascii")
        else:
            # Anything else is passed through as text, as before.
            payload = str(data)
        return f"data:{mime_type or 'image/png'};base64,{payload}"

    def _with_api_key(self, uri: str) -> str:
        """Append the API key as a query parameter to *uri*."""
        # Use '?' when the URI has no query string yet, '&' otherwise.
        separator = "&" if "?" in uri else "?"
        return f"{uri}{separator}key={self.api_key}"

    def _redact_key(self, text: str) -> str:
        """Replace any occurrence of the API key in *text* with a placeholder."""
        key = getattr(self, "api_key", None)
        return text.replace(key, "[REDACTED]") if key else text

    def _handle_api_error(self, error: Exception, context: str):
        """Handle API errors with descriptive messages.

        Never returns normally.

        Args:
            error: The exception caught from an API call.
            context: Text inserted into the message (callers pass the model name).

        Raises:
            ValueError: If the error text looks like a "not found" response.
            Exception: Otherwise, the original ``error`` is re-raised.
        """
        msg = str(error)
        not_found_markers = ("404", "NOT_FOUND", "Requested entity was not found", "[5,")
        if any(marker in msg for marker in not_found_markers):
            raise ValueError(
                f"Model not found ({context}). Ensure your API key project has access to this model. "
                "Veo requires a paid account."
            ) from error
        raise error

    # ------------------------------------------------------------------
    # Text / image
    # ------------------------------------------------------------------

    async def generate_animation_prompt(
        self,
        base64_image: str,
        mime_type: str,
        custom_prompt: Optional[str] = None
    ) -> str:
        """
        Analyzes the image to generate a suitable animation prompt.

        Args:
            base64_image: Image payload handed to ``types.Part.from_bytes``.
            mime_type: MIME type of the image.
            custom_prompt: Instruction to use instead of the built-in default.

        Returns:
            The model's text, or ``"Cinematic subtle movement"`` if it is empty.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info("[MOCK MODE] Generating animation prompt")
            await asyncio.sleep(self._mock_delay())  # Simulate API delay
            return "A gentle breeze rustles through the scene as soft light dances across the surface. The camera slowly zooms in with a subtle parallax effect, creating depth and movement."

        default_prompt = custom_prompt or "Describe how this image could be subtly animated with cinematic movement."
        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["text_generation"],
                    contents=self._build_image_contents(base64_image, mime_type, default_prompt),
                )
                return response.text or "Cinematic subtle movement"
            except Exception as error:
                self._handle_api_error(error, MODELS["text_generation"])

    async def edit_image(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str
    ) -> str:
        """
        Edit an image using Gemini image model.
        Returns base64 data URI of the edited image.

        Args:
            base64_image: Image payload handed to ``types.Part.from_bytes``.
            mime_type: MIME type of the image.
            prompt: Edit instruction; ``"Enhance this image"`` is used when empty.

        Raises:
            ValueError: If the response has no candidates or no inline image
                data, or if the model is reported as not found.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info(f"[MOCK MODE] Editing image with prompt: {prompt}")
            await asyncio.sleep(1)  # Simulate API delay
            # NOTE(review): the original comment promised a 1x1 red placeholder
            # image, but the value returned here is an empty string - confirm
            # whether a data URI was intended.
            return ""

        async with get_image_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["image_edit"],
                    contents=self._build_image_contents(
                        base64_image, mime_type, prompt or "Enhance this image"
                    ),
                )
                candidates = response.candidates
                if not candidates:
                    raise ValueError("No candidates returned from Gemini.")

                # A candidate may come back without content; treat that the
                # same as "no image" instead of failing on attribute access.
                content = candidates[0].content
                parts = (content.parts if content else None) or []
                for part in parts:
                    blob = getattr(part, "inline_data", None)
                    if blob and blob.data:
                        return self._to_data_uri(blob.data, blob.mime_type)
                raise ValueError("No image data found in the response.")
            except Exception as error:
                self._handle_api_error(error, MODELS["image_edit"])

    # ------------------------------------------------------------------
    # Video
    # ------------------------------------------------------------------

    async def start_video_generation(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str,
        aspect_ratio: AspectRatio = "16:9",
        resolution: Resolution = "720p",
        number_of_videos: int = 1
    ) -> dict:
        """
        Start video generation using Veo model.
        Returns operation details for polling.

        The video semaphore is held only while the start request is in flight,
        not for the lifetime of the generation job.

        Returns:
            A dict with ``gemini_operation_name``, ``done`` and ``status``
            (``"pending"`` or ``"completed"``).
        """
        # Mock mode for testing without API credits
        if MOCK_MODE:
            mock_operation_name = f"mock_operation_{uuid.uuid4().hex[:16]}"
            logger.info(f"[MOCK MODE] Starting video generation: {mock_operation_name}")
            return {
                "gemini_operation_name": mock_operation_name,
                "done": False,
                "status": "pending"
            }

        async with get_video_semaphore():
            try:
                operation = await asyncio.to_thread(
                    self.client.models.generate_videos,
                    model=MODELS["video_generation"],
                    prompt=prompt,
                    image=types.Image(
                        image_bytes=base64_image,
                        mime_type=mime_type
                    ),
                    config=types.GenerateVideosConfig(
                        number_of_videos=number_of_videos,
                        resolution=resolution,
                        aspect_ratio=aspect_ratio
                    )
                )
                return {
                    "gemini_operation_name": operation.name,
                    "done": operation.done,
                    "status": "completed" if operation.done else "pending"
                }
            except Exception as error:
                self._handle_api_error(error, MODELS["video_generation"])

    async def check_video_status(self, gemini_operation_name: str) -> dict:
        """
        Check the status of a video generation operation.
        Returns status and video URL if complete.

        Returns:
            A dict with ``gemini_operation_name``, ``done`` and ``status``
            (``"pending"``, ``"completed"`` or ``"failed"``), plus ``video_url``
            on success or ``error`` on failure.
        """
        # Mock mode for testing without API credits
        if MOCK_MODE:
            # For simplicity, always report completion with the mock video URL
            logger.info(f"[MOCK MODE] Checking video status: {gemini_operation_name}")
            await asyncio.sleep(2)  # Simulate API delay
            return {
                "gemini_operation_name": gemini_operation_name,
                "done": True,
                "status": "completed",
                "video_url": MOCK_VIDEO_URL
            }

        try:
            # operations.get takes an operation object, so rebuild one from the name
            operation = await asyncio.to_thread(
                self.client.operations.get,
                types.GenerateVideosOperation(name=gemini_operation_name, done=False)
            )

            if not operation.done:
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": False,
                    "status": "pending"
                }

            # The error may be a plain value, a dict, or an object with .message
            if operation.error:
                err = operation.error
                if isinstance(err, dict):
                    detail = err.get("message") or err
                else:
                    detail = getattr(err, "message", None) or err
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": True,
                    "status": "failed",
                    "error": str(detail) or "Unknown error"
                }

            # Walk result -> generated_videos[0] -> video -> uri, tolerating gaps
            result = operation.result
            videos = getattr(result, "generated_videos", None) if result else None
            video = getattr(videos[0], "video", None) if videos else None
            video_uri = getattr(video, "uri", None) if video else None
            if video_uri:
                # NOTE(review): the returned URL embeds the API key; make sure
                # it is never forwarded to clients or written to logs.
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": True,
                    "status": "completed",
                    "video_url": self._with_api_key(video_uri)
                }

            return {
                "gemini_operation_name": gemini_operation_name,
                "done": True,
                "status": "failed",
                "error": "No video URI returned. May be due to safety filters."
            }
        except Exception as error:
            msg = str(error)
            if "404" in msg or "NOT_FOUND" in msg or "Requested entity was not found" in msg:
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": True,
                    "status": "failed",
                    "error": "Operation not found (404). It may have expired."
                }
            raise

    async def download_video(self, video_url: str, operation_id: str) -> str:
        """
        Download video from Gemini to local storage.
        Returns the local filename.

        The body is streamed to ``<operation_id>.mp4.part`` inside
        ``DOWNLOADS_DIR`` and renamed on success, so a failed download does not
        leave a truncated ``.mp4`` behind.

        Raises:
            ValueError: If ``operation_id`` contains a path separator, or if
                the download fails for any reason.
        """
        filename = f"{operation_id}.mp4"
        # Refuse ids that would place the file outside DOWNLOADS_DIR.
        if os.path.basename(filename) != filename:
            raise ValueError(f"Failed to download video: invalid operation id {operation_id!r}")

        filepath = os.path.join(DOWNLOADS_DIR, filename)
        partial_path = f"{filepath}.part"
        try:
            # follow_redirects=True is required as Gemini returns 302 redirects
            async with httpx.AsyncClient(timeout=120.0, follow_redirects=True) as client:
                async with client.stream("GET", video_url) as response:
                    response.raise_for_status()
                    # Write chunk by chunk instead of buffering the whole video
                    with open(partial_path, "wb") as f:
                        async for chunk in response.aiter_bytes():
                            f.write(chunk)
            os.replace(partial_path, filepath)
            logger.info(f"Downloaded video to {filepath}")
            return filename
        except Exception as e:
            # Best-effort cleanup of the partial file; a cleanup failure must
            # not mask the download error.
            try:
                if os.path.exists(partial_path):
                    os.remove(partial_path)
            except OSError:
                pass
            # HTTP error text can include the request URL, which carries the
            # API key - scrub it before logging or re-raising.
            reason = self._redact_key(str(e))
            logger.error(f"Failed to download video: {reason}")
            # "from None" keeps the unredacted original out of the traceback.
            raise ValueError(f"Failed to download video: {reason}") from None

    # ------------------------------------------------------------------
    # Generic text / analysis
    # ------------------------------------------------------------------

    async def generate_text(
        self,
        prompt: str,
        model: Optional[str] = None
    ) -> str:
        """
        Simple text generation with Gemini.

        Args:
            prompt: Text prompt.
            model: Model name; defaults to ``MODELS["text_generation"]``.

        Returns:
            The response text, or ``""`` when the response has none.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info(f"[MOCK MODE] Generating text for prompt: {prompt[:50]}...")
            await asyncio.sleep(self._mock_delay())  # Simulate API delay
            return f"This is a mock response for your prompt: '{prompt[:100]}...'. In production, this would be generated by Gemini AI."

        model_name = model or MODELS["text_generation"]
        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=model_name,
                    contents=types.Content(
                        parts=[types.Part.from_text(text=prompt)]
                    )
                )
                return response.text or ""
            except Exception as error:
                self._handle_api_error(error, model_name)

    async def analyze_image(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str
    ) -> str:
        """
        Analyze image with custom prompt.

        Args:
            base64_image: Image payload handed to ``types.Part.from_bytes``.
            mime_type: MIME type of the image.
            prompt: Question or instruction about the image.

        Returns:
            The response text, or ``""`` when the response has none.
        """
        # Mock mode for testing
        if MOCK_MODE:
            logger.info(f"[MOCK MODE] Analyzing image with prompt: {prompt[:50]}...")
            await asyncio.sleep(self._mock_delay())  # Simulate API delay
            return f"Mock analysis result: The image appears to show a scene that matches your query '{prompt[:50]}...'. This is placeholder content for testing."

        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["text_generation"],
                    contents=self._build_image_contents(base64_image, mime_type, prompt),
                )
                return response.text or ""
            except Exception as error:
                self._handle_api_error(error, MODELS["text_generation"])