import asyncio
import uuid
from typing import Optional

import httpx

import config


class PixVerseService:
    """Async client for the PixVerse image-to-video HTTP API.

    Every request goes through :meth:`_request_with_retry`, which retries on
    timeouts, 5xx/429 responses, and HTML error pages returned in place of
    the expected payload.
    """

    # HTTP methods that _request_with_retry knows how to dispatch.
    _SUPPORTED_METHODS = ("GET", "POST")

    # Number of leading body bytes inspected when sniffing for an HTML page.
    _HTML_SNIFF_BYTES = 512

    def __init__(self):
        self.base_url = config.PIXVERSE_BASE_URL
        self.api_key = config.PIXVERSE_API_KEY
        self.default_timeout = 120.0  # 2 minutes timeout
        self.max_retries = 3
        self.retry_delay = 3  # seconds between retries

    def _get_headers(self) -> dict:
        """Return the auth headers plus a fresh random trace id."""
        return {
            "API-KEY": self.api_key,
            "Ai-trace-id": str(uuid.uuid4()),
        }

    def _is_html_response(self, response: httpx.Response) -> bool:
        """Check if response is HTML (error page) instead of JSON.

        Returns True when the Content-Type header mentions ``text/html`` or
        when the body starts with an HTML doctype / ``<html`` tag.
        """
        content_type = response.headers.get("content-type", "")
        if "text/html" in content_type:
            return True

        # Sniff only the first bytes of the raw body: decoding the whole
        # payload as text would be wasteful for large binary downloads.
        try:
            head = response.content[: self._HTML_SNIFF_BYTES]
        except Exception:
            # Best-effort check: an unreadable body is not treated as HTML.
            return False
        return head.lstrip().lower().startswith((b"<!doctype", b"<html"))

    async def _log_and_backoff(self, message: str, attempt: int, retries: int) -> None:
        """Log a failed attempt and, if retries remain, sleep before the next one."""
        print(f"[PixVerse] {message} (attempt {attempt + 1}/{retries + 1})")
        if attempt < retries:
            print(f"[PixVerse] Retrying in {self.retry_delay} seconds...")
            await asyncio.sleep(self.retry_delay)

    async def _request_with_retry(
        self,
        method: str,
        url: str,
        retries: Optional[int] = None,
        **kwargs,
    ) -> httpx.Response:
        """
        Make HTTP request with automatic retry on failure or HTML error response.

        Args:
            method: HTTP method (GET, POST)
            url: Full URL to request
            retries: Number of retries (default: self.max_retries)
            **kwargs: Additional arguments for httpx request

        Returns:
            httpx.Response object

        Raises:
            ValueError: if ``method`` is not GET or POST (raised immediately,
                without retrying).
            httpx.HTTPStatusError: on a 4xx response other than 429 (raised
                immediately, without retrying).
            Exception: if all retries are exhausted.
        """
        if retries is None:
            retries = self.max_retries
        # A negative count would skip the loop entirely and never send a request.
        retries = max(retries, 0)

        # Validate up front: an unsupported method is a caller error and
        # retrying it can never succeed.
        verb = method.upper()
        if verb not in self._SUPPORTED_METHODS:
            raise ValueError(f"Unsupported HTTP method: {method}")

        last_error = None

        for attempt in range(retries + 1):
            try:
                async with httpx.AsyncClient(timeout=self.default_timeout) as client:
                    if verb == "GET":
                        response = await client.get(url, **kwargs)
                    else:
                        response = await client.post(url, **kwargs)

                # Check for HTML error response (Hugging Face/upstream error).
                # This runs before the status check, so an HTML page is
                # retried regardless of its status code.
                if self._is_html_response(response):
                    last_error = Exception(
                        "Upstream service returned HTML error. Please try again later."
                    )
                    await self._log_and_backoff(
                        "Received HTML error response from upstream", attempt, retries
                    )
                    continue

                # Raise for HTTP errors
                response.raise_for_status()
                return response

            except httpx.TimeoutException as e:
                last_error = e
                await self._log_and_backoff("Request timeout", attempt, retries)

            except httpx.HTTPStatusError as e:
                last_error = e
                status_code = e.response.status_code
                # Don't retry on 4xx client errors (except 429 rate limit)
                if 400 <= status_code < 500 and status_code != 429:
                    raise
                await self._log_and_backoff(f"HTTP error {status_code}", attempt, retries)

            except Exception as e:
                # Deliberately broad: any other failure is treated as
                # transient and retried on a best-effort basis.
                last_error = e
                await self._log_and_backoff(
                    f"Request failed: {str(e)[:100]}", attempt, retries
                )

        # All retries exhausted
        raise Exception(
            f"Request failed after {retries + 1} attempts: {str(last_error)}"
        ) from last_error

    async def upload_image(self, image_content: bytes, content_type: str) -> dict:
        """Upload image to PixVerse with retry logic.

        Args:
            image_content: Raw image bytes.
            content_type: MIME type of the image, e.g. ``image/png``.

        Returns:
            The decoded JSON body of the upload response.
        """
        headers = self._get_headers()

        # Derive a file extension from the MIME subtype, ignoring any
        # parameters such as "; charset=...".
        mime_type = content_type.split(";", 1)[0].strip()
        extension = mime_type.split("/")[-1]
        if extension == "jpeg":
            extension = "jpg"
        filename = f"image.{extension}"

        files = {"image": (filename, image_content, content_type)}

        response = await self._request_with_retry(
            method="POST",
            url=f"{self.base_url}/image/upload",
            headers=headers,
            files=files,
        )
        return response.json()

    async def generate_video(
        self,
        img_id: int,
        prompt: str,
        duration: int = 5,
        model: str = "v4.5",
        motion_mode: str = "normal",
        quality: str = "540p",
        negative_prompt: Optional[str] = None,
        seed: Optional[int] = None,
    ) -> dict:
        """Generate video from image with retry logic.

        ``negative_prompt`` is sent only when non-empty and ``seed`` only
        when it is not None; all other arguments are always included in the
        request payload.

        Returns:
            The decoded JSON body of the generate response.
        """
        headers = self._get_headers()
        headers["Content-Type"] = "application/json"

        payload = {
            "duration": duration,
            "img_id": img_id,
            "model": model,
            "motion_mode": motion_mode,
            "prompt": prompt,
            "quality": quality,
        }
        if negative_prompt:
            payload["negative_prompt"] = negative_prompt
        if seed is not None:
            payload["seed"] = seed

        response = await self._request_with_retry(
            method="POST",
            url=f"{self.base_url}/video/img/generate",
            headers=headers,
            json=payload,
        )
        return response.json()

    async def get_video_status(self, video_id: int) -> dict:
        """Get video generation status with retry logic.

        Returns:
            The decoded JSON body of the status response.
        """
        headers = self._get_headers()
        response = await self._request_with_retry(
            method="GET",
            url=f"{self.base_url}/video/result/{video_id}",
            headers=headers,
        )
        return response.json()

    async def wait_for_video(self, video_id: int, max_attempts: int = 60, delay: int = 5) -> dict:
        """Wait for video generation to complete.

        Polls :meth:`get_video_status` up to ``max_attempts`` times, sleeping
        ``delay`` seconds between polls.

        Returns:
            The full status payload once the status reports success.

        Raises:
            Exception: if the API reports a non-zero ``ErrCode``, if
                moderation or generation failed, or if the video is not
                ready after ``max_attempts`` polls.
        """
        for attempt in range(max_attempts):
            result = await self.get_video_status(video_id)

            if result.get("ErrCode") != 0:
                raise Exception(f"PixVerse API error: {result.get('ErrMsg')}")

            # "Resp" may be absent or null; treat both as "no status yet".
            status = (result.get("Resp") or {}).get("status")

            # Status codes: 1=success, 5=in progress, 7=moderation failed, 8=generation failed
            if status == 1:
                return result
            if status == 7:
                raise Exception("Content moderation failed. Please modify your prompt and try again.")
            if status == 8:
                raise Exception("Video generation failed. Please try again.")

            # In progress (5) or any unrecognised status: keep polling, but
            # don't sleep after the final poll just to time out.
            if attempt < max_attempts - 1:
                await asyncio.sleep(delay)

        raise Exception("Video generation timed out. Please try again later.")

    async def download_video(self, video_url: str) -> bytes:
        """Download generated video with retry logic.

        Returns:
            The raw bytes of the response body.
        """
        response = await self._request_with_retry(
            method="GET",
            url=video_url,
            retries=3,  # Explicit retry count for downloads
        )
        return response.content


pixverse_service = PixVerseService()