# Source snapshot: commit 3c56e03 ("refac") by jebin2.
"""
Gemini AI Service for image and video generation.
Python port of the TypeScript geminiService.ts
Uses server-side API key from environment.
"""
import asyncio
import base64
import logging
import os
import uuid
from typing import Literal, NoReturn, Optional

import httpx
from google import genai
from google.genai import types
logger = logging.getLogger(__name__)

# Model names - easily configurable
MODELS = {
    "text_generation": "gemini-2.5-flash",
    "image_edit": "gemini-2.5-flash-image",
    "video_generation": "veo-3.1-generate-preview",
}

# Type aliases
AspectRatio = Literal["16:9", "9:16"]
Resolution = Literal["720p", "1080p"]

# Video downloads directory: "downloads" inside the parent of this module's directory.
DOWNLOADS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "downloads")
# Ensure downloads directory exists (created as an import-time side effect).
os.makedirs(DOWNLOADS_DIR, exist_ok=True)

# Mock mode for local testing (set GEMINI_MOCK_MODE=true to skip real API calls)
MOCK_MODE = os.getenv("GEMINI_MOCK_MODE", "false").lower() == "true"
# Simulated API latency (seconds) used by mock mode. Parsed to float here because
# asyncio.sleep() rejects strings; the raw env value used to be kept as a str.
MOCK_MODE_SLEEP_TIME = float(os.getenv("GEMINI_MOCK_MODE_SLEEP_TIME", "0.5"))
# Sample video URL for mock mode (a public test video)
MOCK_VIDEO_URL = "https://video.twimg.com/amplify_video/1994083297756848128/vid/avc1/576x576/ue31qU0xts8L9tXD.mp4?tag=21"

# Concurrency limits from environment (defaults)
MAX_CONCURRENT_VIDEOS = int(os.getenv("MAX_CONCURRENT_VIDEOS", "2"))
MAX_CONCURRENT_IMAGES = int(os.getenv("MAX_CONCURRENT_IMAGES", "5"))
MAX_CONCURRENT_TEXT = int(os.getenv("MAX_CONCURRENT_TEXT", "10"))

# Semaphores for concurrency control; created lazily by the get_*_semaphore() helpers.
_video_semaphore: Optional[asyncio.Semaphore] = None
_image_semaphore: Optional[asyncio.Semaphore] = None
_text_semaphore: Optional[asyncio.Semaphore] = None
def get_video_semaphore() -> asyncio.Semaphore:
    """Return the module-level semaphore that caps concurrent video jobs.

    The semaphore is created on the first call with MAX_CONCURRENT_VIDEOS
    permits, and the same object is returned on every later call.
    """
    global _video_semaphore
    if _video_semaphore is not None:
        return _video_semaphore
    limit = MAX_CONCURRENT_VIDEOS
    _video_semaphore = asyncio.Semaphore(limit)
    logger.info(f"Video semaphore initialized with limit: {limit}")
    return _video_semaphore
def get_image_semaphore() -> asyncio.Semaphore:
    """Return the shared image-edit semaphore, building it on first use.

    The permit count comes from MAX_CONCURRENT_IMAGES at the time of the first
    call; later calls reuse the cached instance.
    """
    global _image_semaphore
    if _image_semaphore is None:
        permits = MAX_CONCURRENT_IMAGES
        created = asyncio.Semaphore(permits)
        _image_semaphore = created
        logger.info(f"Image semaphore initialized with limit: {permits}")
    return _image_semaphore
def get_text_semaphore() -> asyncio.Semaphore:
    """Return the cached semaphore limiting concurrent text/analysis calls.

    Lazily instantiated with MAX_CONCURRENT_TEXT permits on the first call.
    """
    global _text_semaphore
    already_built = _text_semaphore is not None
    if not already_built:
        _text_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TEXT)
        logger.info(f"Text semaphore initialized with limit: {MAX_CONCURRENT_TEXT}")
    return _text_semaphore
def get_gemini_api_key() -> str:
    """Read the Gemini API key from the GEMINI_API_KEY environment variable.

    Returns:
        The key string.

    Raises:
        ValueError: if the variable is unset or empty. The message deliberately
            does not name the variable.
    """
    key = os.getenv("GEMINI_API_KEY")
    if key:
        return key
    raise ValueError("Server Authentication Error with GEMINI")
class GeminiService:
    """
    Gemini AI Service for text, image, and video generation.

    Uses the server-side API key from the environment unless one is passed in.
    Blocking SDK calls are run with ``asyncio.to_thread`` and throttled by the
    module-level per-category semaphores. When ``MOCK_MODE`` is enabled, the
    generation methods return canned data without calling the API.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the Gemini client with the given key or GEMINI_API_KEY.

        Raises:
            ValueError: if no key is given and GEMINI_API_KEY is unset or empty.
        """
        self.api_key = api_key or get_gemini_api_key()
        self.client = genai.Client(api_key=self.api_key)

    @staticmethod
    async def _mock_sleep() -> None:
        """Simulate API latency in mock mode using MOCK_MODE_SLEEP_TIME seconds."""
        # float() accepts both a str and a float form of the setting; asyncio.sleep rejects str.
        await asyncio.sleep(float(MOCK_MODE_SLEEP_TIME))

    @staticmethod
    def _to_base64_text(data) -> str:
        """Return ``data`` as base64 text: raw bytes are encoded, anything else is str()-ed."""
        if isinstance(data, (bytes, bytearray)):
            return base64.b64encode(data).decode("ascii")
        return str(data)

    @staticmethod
    def _describe_operation_error(error) -> str:
        """Extract a human-readable message from a long-running operation error.

        Handles dict-shaped errors (uses the "message" key when present),
        objects exposing a ``message`` attribute, and plain values like strings.
        """
        if isinstance(error, dict):
            message = error.get("message") or error
        else:
            message = getattr(error, "message", error)
        return str(message) if message else "Unknown error"

    def _handle_api_error(self, error: Exception, context: str) -> NoReturn:
        """Translate "not found" API errors into a descriptive ValueError.

        Always raises; never returns.

        Args:
            error: Exception raised by the SDK call.
            context: Model name included in the message.

        Raises:
            ValueError: if the error text looks like a not-found error
                (chained to ``error``).
            Exception: ``error`` itself, re-raised unchanged, otherwise.
        """
        msg = str(error)
        # "[5," presumably matches a gRPC status code 5 (NOT_FOUND) in the error
        # text — TODO confirm against actual SDK error formats.
        if "404" in msg or "NOT_FOUND" in msg or "Requested entity was not found" in msg or "[5," in msg:
            raise ValueError(
                f"Model not found ({context}). Ensure your API key project has access to this model. "
                "Veo requires a paid account."
            ) from error
        raise error

    async def generate_animation_prompt(
        self,
        base64_image: str,
        mime_type: str,
        custom_prompt: Optional[str] = None,
    ) -> str:
        """
        Analyze an image and return a suitable animation prompt.

        Args:
            base64_image: Image payload passed as ``data`` to ``types.Part.from_bytes``.
                NOTE(review): the SDK documents ``data`` as raw bytes; confirm whether
                callers pass decoded bytes or a base64 string.
            mime_type: MIME type of the image.
            custom_prompt: Instruction sent with the image; a default
                cinematic-movement instruction is used when omitted or empty.

        Returns:
            The model's text, or "Cinematic subtle movement" when the text is empty.
        """
        if MOCK_MODE:
            logger.info("[MOCK MODE] Generating animation prompt")
            await self._mock_sleep()  # Simulate API delay
            return "A gentle breeze rustles through the scene as soft light dances across the surface. The camera slowly zooms in with a subtle parallax effect, creating depth and movement."

        default_prompt = custom_prompt or "Describe how this image could be subtly animated with cinematic movement."
        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["text_generation"],
                    contents=types.Content(
                        parts=[
                            types.Part.from_bytes(data=base64_image, mime_type=mime_type),
                            types.Part.from_text(text=default_prompt),
                        ]
                    ),
                )
                return response.text or "Cinematic subtle movement"
            except Exception as error:
                self._handle_api_error(error, MODELS["text_generation"])

    async def edit_image(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str,
    ) -> str:
        """
        Edit an image using the Gemini image model.

        Args:
            base64_image: Image payload passed as ``data`` to ``types.Part.from_bytes``
                (see the NOTE on generate_animation_prompt about bytes vs base64).
            mime_type: MIME type of the input image.
            prompt: Edit instruction; "Enhance this image" is used when empty.

        Returns:
            A ``data:<mime>;base64,<payload>`` URI of the first inline image in the
            response. The SDK returns raw bytes, which are base64-encoded here.

        Raises:
            ValueError: when there are no candidates or no inline image data, or
                for not-found API errors (via _handle_api_error).
        """
        if MOCK_MODE:
            logger.info(f"[MOCK MODE] Editing image with prompt: {prompt}")
            await asyncio.sleep(1)  # Simulate API delay
            # Tiny 1x1 placeholder PNG.
            return "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="

        async with get_image_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["image_edit"],
                    contents=types.Content(
                        parts=[
                            types.Part.from_bytes(data=base64_image, mime_type=mime_type),
                            types.Part.from_text(text=prompt or "Enhance this image"),
                        ]
                    ),
                )
                candidates = response.candidates
                if not candidates:
                    raise ValueError("No candidates returned from Gemini.")
                content = candidates[0].content
                # content/parts may be None (presumably e.g. a blocked response);
                # treat that as "no image" instead of crashing on iteration.
                parts = (content.parts if content is not None else None) or []
                for part in parts:
                    inline = getattr(part, "inline_data", None)
                    if inline and inline.data:
                        result_mime = inline.mime_type or 'image/png'
                        return f"data:{result_mime};base64,{self._to_base64_text(inline.data)}"
                raise ValueError("No image data found in the response.")
            except Exception as error:
                self._handle_api_error(error, MODELS["image_edit"])

    async def start_video_generation(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str,
        aspect_ratio: AspectRatio = "16:9",
        resolution: Resolution = "720p",
        number_of_videos: int = 1,
    ) -> dict:
        """
        Start video generation using the Veo model.

        Args:
            base64_image: Passed as ``image_bytes`` to ``types.Image``.
                NOTE(review): confirm callers pass raw bytes, not base64 text.
            mime_type: MIME type of the source image.
            prompt: Text prompt for the video.
            aspect_ratio: "16:9" or "9:16".
            resolution: "720p" or "1080p".
            number_of_videos: Number of videos requested.

        Returns:
            Dict with "gemini_operation_name", "done" and "status"
            ("completed" or "pending"), for later polling via check_video_status.
        """
        if MOCK_MODE:
            mock_operation_name = f"mock_operation_{uuid.uuid4().hex[:16]}"
            logger.info(f"[MOCK MODE] Starting video generation: {mock_operation_name}")
            return {
                "gemini_operation_name": mock_operation_name,
                "done": False,
                "status": "pending",
            }

        async with get_video_semaphore():
            try:
                operation = await asyncio.to_thread(
                    self.client.models.generate_videos,
                    model=MODELS["video_generation"],
                    prompt=prompt,
                    image=types.Image(image_bytes=base64_image, mime_type=mime_type),
                    config=types.GenerateVideosConfig(
                        number_of_videos=number_of_videos,
                        resolution=resolution,
                        aspect_ratio=aspect_ratio,
                    ),
                )
                return {
                    "gemini_operation_name": operation.name,
                    "done": operation.done,
                    "status": "completed" if operation.done else "pending",
                }
            except Exception as error:
                self._handle_api_error(error, MODELS["video_generation"])

    async def check_video_status(self, gemini_operation_name: str) -> dict:
        """
        Check the status of a video generation operation.

        Returns:
            Dict with "gemini_operation_name", "done" and "status" ("pending",
            "completed" or "failed"), plus "video_url" when completed or
            "error" when failed. A not-found error is reported as "failed"
            rather than raised.
        """
        if MOCK_MODE:
            # Always report completion with the sample video URL.
            logger.info(f"[MOCK MODE] Checking video status: {gemini_operation_name}")
            await asyncio.sleep(2)  # Simulate API delay
            return {
                "gemini_operation_name": gemini_operation_name,
                "done": True,
                "status": "completed",
                "video_url": MOCK_VIDEO_URL,
            }

        try:
            # Recreate an operation handle from its name so the SDK can poll it.
            operation = await asyncio.to_thread(
                self.client.operations.get,
                types.GenerateVideosOperation(name=gemini_operation_name, done=False),
            )
            if not operation.done:
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": False,
                    "status": "pending",
                }

            if operation.error:
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": True,
                    "status": "failed",
                    "error": self._describe_operation_error(operation.error),
                }

            result = operation.result
            generated = getattr(result, "generated_videos", None) if result else None
            if generated:
                video_file = getattr(generated[0], "video", None)
                video_uri = getattr(video_file, "uri", None) if video_file else None
                if video_uri:
                    # Append the key with the right separator for the URI's query string.
                    # NOTE(review): this URL embeds the server API key; make sure it is
                    # not forwarded to untrusted clients.
                    separator = "&" if "?" in video_uri else "?"
                    return {
                        "gemini_operation_name": gemini_operation_name,
                        "done": True,
                        "status": "completed",
                        "video_url": f"{video_uri}{separator}key={self.api_key}",
                    }

            return {
                "gemini_operation_name": gemini_operation_name,
                "done": True,
                "status": "failed",
                "error": "No video URI returned. May be due to safety filters.",
            }
        except Exception as error:
            msg = str(error)
            if "404" in msg or "NOT_FOUND" in msg or "Requested entity was not found" in msg:
                return {
                    "gemini_operation_name": gemini_operation_name,
                    "done": True,
                    "status": "failed",
                    "error": "Operation not found (404). It may have expired.",
                }
            raise

    async def download_video(self, video_url: str, operation_id: str) -> str:
        """
        Download a video into DOWNLOADS_DIR as "<operation_id>.mp4".

        The body is streamed to disk in chunks rather than buffered in memory.
        If a failure happens after the file was opened, the partial file is removed.

        Args:
            video_url: URL to fetch.
            operation_id: File stem for the saved video.
                NOTE(review): not sanitized; path separators or untrusted input
                could escape DOWNLOADS_DIR. Confirm what callers pass.

        Returns:
            The local filename (not the full path).

        Raises:
            ValueError: wrapping any network, HTTP-status or file-system error.
        """
        filename = f"{operation_id}.mp4"
        filepath = os.path.join(DOWNLOADS_DIR, filename)
        file_opened = False
        try:
            # follow_redirects=True is required as Gemini returns 302 redirects
            async with httpx.AsyncClient(timeout=120.0, follow_redirects=True) as client:
                async with client.stream("GET", video_url) as response:
                    response.raise_for_status()
                    with open(filepath, "wb") as f:
                        file_opened = True
                        async for chunk in response.aiter_bytes():
                            f.write(chunk)
            logger.info(f"Downloaded video to {filepath}")
            return filename
        except Exception as e:
            logger.error(f"Failed to download video: {e}")
            if file_opened:
                # Best-effort cleanup of a truncated file; the original error is what matters.
                try:
                    os.remove(filepath)
                except OSError:
                    pass
            raise ValueError(f"Failed to download video: {e}") from e

    async def generate_text(
        self,
        prompt: str,
        model: Optional[str] = None,
    ) -> str:
        """
        Simple text generation with Gemini.

        Args:
            prompt: Prompt text.
            model: Model name; defaults to MODELS["text_generation"].

        Returns:
            The response text, or "" when the model returns none.
        """
        if MOCK_MODE:
            logger.info(f"[MOCK MODE] Generating text for prompt: {prompt[:50]}...")
            await self._mock_sleep()  # Simulate API delay
            return f"This is a mock response for your prompt: '{prompt[:100]}...'. In production, this would be generated by Gemini AI."

        model_name = model or MODELS["text_generation"]
        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=model_name,
                    contents=types.Content(parts=[types.Part.from_text(text=prompt)]),
                )
                return response.text or ""
            except Exception as error:
                self._handle_api_error(error, model_name)

    async def analyze_image(
        self,
        base64_image: str,
        mime_type: str,
        prompt: str,
    ) -> str:
        """
        Analyze an image with a custom prompt.

        Args:
            base64_image: Image payload passed as ``data`` to ``types.Part.from_bytes``
                (see the NOTE on generate_animation_prompt about bytes vs base64).
            mime_type: MIME type of the image.
            prompt: Question or instruction about the image.

        Returns:
            The response text, or "" when the model returns none.
        """
        if MOCK_MODE:
            logger.info(f"[MOCK MODE] Analyzing image with prompt: {prompt[:50]}...")
            await self._mock_sleep()  # Simulate API delay
            return f"Mock analysis result: The image appears to show a scene that matches your query '{prompt[:50]}...'. This is placeholder content for testing."

        async with get_text_semaphore():
            try:
                response = await asyncio.to_thread(
                    self.client.models.generate_content,
                    model=MODELS["text_generation"],
                    contents=types.Content(
                        parts=[
                            types.Part.from_bytes(data=base64_image, mime_type=mime_type),
                            types.Part.from_text(text=prompt),
                        ]
                    ),
                )
                return response.text or ""
            except Exception as error:
                self._handle_api_error(error, MODELS["text_generation"])