""" Gemini Job Worker - Specific implementation using the modular PriorityWorkerPool. This file shows how to use the modular PriorityWorkerPool with Gemini-specific job processing logic. """ import logging from datetime import datetime, timedelta from typing import Optional from sqlalchemy.ext.asyncio import AsyncSession from core.database import DATABASE_URL from core.models import GeminiJob from services.priority_worker_pool import ( PriorityWorkerPool, JobProcessor, WorkerConfig, get_interval_for_priority ) from services.gemini_service.api_client import GeminiService from services.drive_service import DriveService import asyncio logger = logging.getLogger(__name__) # Job type to priority mapping for Gemini jobs JOB_PRIORITY_MAP = { "text": "fast", "analyze": "fast", "animation_prompt": "fast", "image": "medium", "edit_image": "medium", "video": "slow" } def get_priority_for_job_type(job_type: str) -> str: """Get the priority tier for a Gemini job type.""" return JOB_PRIORITY_MAP.get(job_type, "fast") class GeminiJobProcessor(JobProcessor[GeminiJob]): """Processes Gemini AI jobs (text, image, video generation) with round-robin API keys.""" def __init__(self): self.drive_service = DriveService() async def _get_service_with_key(self, session: AsyncSession) -> tuple: """Get a GeminiService with the least-used API key.""" from services.api_key_manager import get_least_used_key key_index, api_key = await get_least_used_key(session) return key_index, GeminiService(api_key=api_key) async def _record_usage(self, session: AsyncSession, key_index: int, success: bool, error_message: Optional[str] = None): """Record API key usage after request.""" from services.api_key_manager import record_usage await record_usage(session, key_index, success, error_message) def _handle_error(self, job: GeminiJob, error: Exception, reset_to_queued: bool = False) -> tuple[bool, str]: """ Handle job errors with retry logic. Args: job: The job object error: The exception raised reset_to_queued: Whether to reset status to 'queued' on retry (for process()) Returns: Tuple of (success, error_message) success is False (since it's an error) error_message is the formatted error string """ error_str = str(error) is_retryable = False log_msg = "" # Check for Rate Limit (429) if "429" in error_str or "ResourceExhausted" in error_str: is_retryable = True log_msg = f"Rate limit hit for job {job.job_id}" # Check for Auth/Billing errors (401, 403, API key not found, API key not valid, FAILED_PRECONDITION) elif "401" in error_str or "403" in error_str or "Unauthenticated" in error_str or "PermissionDenied" in error_str or "API key not found" in error_str or "API key not valid" in error_str or "FAILED_PRECONDITION" in error_str: is_retryable = True log_msg = f"Auth/Billing error for job {job.job_id}: {error_str}. Rescheduling to try different key." # Check for Server errors (500, 503, 504) elif "500" in error_str or "503" in error_str or "504" in error_str or "INTERNAL" in error_str or "UNAVAILABLE" in error_str or "DEADLINE_EXCEEDED" in error_str: is_retryable = True log_msg = f"Server error for job {job.job_id}: {error_str}" # Try to parse JSON error details if present try: import json import re # Look for JSON-like structure in error string json_match = re.search(r"(\{.*\})", error_str) if json_match: job.api_response = json.loads(json_match.group(1)) else: job.api_response = {"error": error_str} except Exception: job.api_response = {"error": error_str} if is_retryable: logger.warning(f"{log_msg}. Rescheduling.") job.retry_count += 1 config = WorkerConfig.from_env() # Use a longer delay for these errors (e.g., 30s) interval = 30 job.next_process_at = datetime.utcnow() + timedelta(seconds=interval) if reset_to_queued: job.status = "queued" return False, f"Retryable error: {error_str}" else: logger.error(f"Error processing job {job.job_id}: {error}") job.status = "failed" job.error_message = str(error) job.completed_at = datetime.utcnow() return False, str(error) async def process(self, job: GeminiJob, session: AsyncSession) -> GeminiJob: """Start processing a new job with round-robin API key.""" key_index, service = await self._get_service_with_key(session) input_data = job.input_data or {} success = False error_msg = None try: if job.job_type == "video": job = await self._start_video(job, session, service, input_data) success = True elif job.job_type == "image": job = await self._process_image(job, service, input_data) success = True elif job.job_type == "text": job = await self._process_text(job, service, input_data) success = True elif job.job_type == "analyze": job = await self._process_analyze(job, service, input_data) success = True elif job.job_type == "animation_prompt": job = await self._process_animation_prompt(job, service, input_data) success = True else: job.status = "failed" job.error_message = f"Unknown job type: {job.job_type}" job.completed_at = datetime.utcnow() error_msg = job.error_message except Exception as e: # Use helper for error handling # reset_to_queued=True because if we fail to start, we want to try starting again from scratch success, error_msg = self._handle_error(job, e, reset_to_queued=True) # Record usage await self._record_usage(session, key_index, success, error_msg) return job async def check_status(self, job: GeminiJob, session: AsyncSession) -> GeminiJob: """Check status of an in-progress job (video generation).""" if job.job_type != "video" or not job.third_party_id: job.status = "failed" job.error_message = "Invalid job state for status check" job.completed_at = datetime.utcnow() return job # Use round-robin key for status check key_index, service = await self._get_service_with_key(session) success = False error_msg = None try: status_result = await service.check_video_status(job.third_party_id) # Save raw response job.api_response = status_result if status_result.get("done"): if status_result.get("status") == "completed": video_url = status_result.get("video_url") if video_url: # Store video URL - download will happen on-demand when client requests job.status = "completed" job.output_data = {"video_url": video_url} job.error_message = None # Clear any previous error job.completed_at = datetime.utcnow() success = True # Sync DB on success from services.backup_service import get_backup_service backup_service = get_backup_service() await backup_service.backup_async() else: job.status = "failed" job.error_message = "No video URL returned" job.completed_at = datetime.utcnow() error_msg = job.error_message else: job.status = "failed" job.error_message = status_result.get("error", "Unknown error") job.completed_at = datetime.utcnow() error_msg = job.error_message else: # Not done - reschedule job.retry_count += 1 config = WorkerConfig.from_env() interval = get_interval_for_priority(job.priority, config) job.next_process_at = datetime.utcnow() + timedelta(seconds=interval) success = True # Status check succeeded even if video not ready except Exception as e: # Use helper for error handling # reset_to_queued=False because we want to continue checking status, not restart success, error_msg = self._handle_error(job, e, reset_to_queued=False) # Record usage await self._record_usage(session, key_index, success, error_msg) return job # Record usage await self._record_usage(session, key_index, success, error_msg) return job async def _start_video(self, job: GeminiJob, session: AsyncSession, service: GeminiService, input_data: dict) -> GeminiJob: """Start async video generation.""" prompt = input_data.get("prompt", "") # If prompt is missing, generate one using the animation template if not prompt: try: import os template_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "prompt", "animation.md") if os.path.exists(template_path): with open(template_path, "r") as f: template_prompt = f.read().strip() logger.info(f"Generating auto-prompt for job {job.job_id} using template") prompt = await service.generate_animation_prompt( base64_image=input_data.get("base64_image", ""), mime_type=input_data.get("mime_type", "image/jpeg"), custom_prompt=template_prompt ) logger.info(f"Generated prompt for job {job.job_id}: {prompt}") # Update input data with generated prompt for reference # Create a new dictionary to ensure SQLAlchemy detects the change new_input_data = dict(input_data) new_input_data["prompt"] = prompt job.input_data = new_input_data # We need to commit this change to DB so it persists # But session commit happens outside this method usually? # Actually process() calls this, and process() returns job, # but doesn't explicitly commit job changes until later? # The worker loop commits after process() returns. else: logger.warning(f"Animation prompt template not found at {template_path}") except Exception as e: logger.error(f"Failed to generate auto-prompt: {e}") # Fallback to empty prompt or error? # Let's let it proceed with empty prompt which might fail at API level or use API default result = await service.start_video_generation( base64_image=input_data.get("base64_image", ""), mime_type=input_data.get("mime_type", "image/jpeg"), prompt=prompt, aspect_ratio=input_data.get("aspect_ratio", "16:9"), resolution=input_data.get("resolution", "720p"), number_of_videos=input_data.get("number_of_videos", 1) ) job.third_party_id = result.get("gemini_operation_name") job.api_response = result # Schedule first status check config = WorkerConfig.from_env() interval = get_interval_for_priority(job.priority, config) job.next_process_at = datetime.utcnow() + timedelta(seconds=interval) return job async def _process_image(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob: """Process image edit (synchronous).""" result = await service.edit_image( base64_image=input_data.get("base64_image", ""), mime_type=input_data.get("mime_type", "image/jpeg"), prompt=input_data.get("prompt", "") ) job.status = "completed" job.output_data = {"image": result} # Don't save full base64 image to api_response job.api_response = {"status": "success", "type": "image_edit"} job.completed_at = datetime.utcnow() # Sync DB on success from services.backup_service import get_backup_service backup_service = get_backup_service() await backup_service.backup_async() return job async def _process_text(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob: """Process text generation (synchronous).""" result = await service.generate_text( prompt=input_data.get("prompt", ""), model=input_data.get("model") ) job.status = "completed" job.output_data = {"text": result} job.api_response = {"result": result} job.completed_at = datetime.utcnow() # Sync DB on success from services.backup_service import get_backup_service backup_service = get_backup_service() await backup_service.backup_async() return job async def _process_analyze(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob: """Process image analysis (synchronous).""" result = await service.analyze_image( base64_image=input_data.get("base64_image", ""), mime_type=input_data.get("mime_type", "image/jpeg"), prompt=input_data.get("prompt", "") ) job.status = "completed" job.output_data = {"analysis": result} job.api_response = {"result": result} job.completed_at = datetime.utcnow() # Sync DB on success from services.backup_service import get_backup_service backup_service = get_backup_service() await backup_service.backup_async() return job async def _process_animation_prompt(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob: """Process animation prompt generation (synchronous).""" result = await service.generate_animation_prompt( base64_image=input_data.get("base64_image", ""), mime_type=input_data.get("mime_type", "image/jpeg"), custom_prompt=input_data.get("custom_prompt") ) job.status = "completed" job.output_data = {"prompt": result} job.api_response = {"result": result} job.completed_at = datetime.utcnow() # Sync DB on success from services.backup_service import get_backup_service backup_service = get_backup_service() await backup_service.backup_async() return job # Singleton pool instance _pool: Optional[PriorityWorkerPool] = None def get_pool() -> PriorityWorkerPool: """Get the global Gemini worker pool instance.""" global _pool if _pool is None: _pool = PriorityWorkerPool( database_url=DATABASE_URL, job_model=GeminiJob, job_processor=GeminiJobProcessor(), config=WorkerConfig.from_env() ) return _pool async def start_worker(): """Start the Gemini job worker pool.""" pool = get_pool() await pool.start() async def stop_worker(): """Stop the Gemini job worker pool.""" pool = get_pool() await pool.stop()