| import httpx |
| import uuid |
| import asyncio |
| from typing import Optional |
| import config |
|
|
|
|
| class PixVerseService: |
| def __init__(self): |
| self.base_url = config.PIXVERSE_BASE_URL |
| self.api_key = config.PIXVERSE_API_KEY |
| self.default_timeout = 120.0 |
| self.max_retries = 3 |
| self.retry_delay = 3 |
|
|
| def _get_headers(self) -> dict: |
| return { |
| "API-KEY": self.api_key, |
| "Ai-trace-id": str(uuid.uuid4()) |
| } |
|
|
| def _is_html_response(self, response: httpx.Response) -> bool: |
| """Check if response is HTML (error page) instead of JSON""" |
| content_type = response.headers.get("content-type", "") |
| if "text/html" in content_type: |
| return True |
| |
| try: |
| text = response.text |
| if text.strip().startswith("<!DOCTYPE") or text.strip().startswith("<html"): |
| return True |
| except: |
| pass |
| return False |
|
|
| async def _request_with_retry( |
| self, |
| method: str, |
| url: str, |
| retries: int = None, |
| **kwargs |
| ) -> httpx.Response: |
| """ |
| Make HTTP request with automatic retry on failure or HTML error response. |
| |
| Args: |
| method: HTTP method (GET, POST) |
| url: Full URL to request |
| retries: Number of retries (default: self.max_retries) |
| **kwargs: Additional arguments for httpx request |
| |
| Returns: |
| httpx.Response object |
| |
| Raises: |
| Exception if all retries exhausted |
| """ |
| if retries is None: |
| retries = self.max_retries |
| |
| last_error = None |
| |
| for attempt in range(retries + 1): |
| try: |
| async with httpx.AsyncClient(timeout=self.default_timeout) as client: |
| if method.upper() == "GET": |
| response = await client.get(url, **kwargs) |
| elif method.upper() == "POST": |
| response = await client.post(url, **kwargs) |
| else: |
| raise ValueError(f"Unsupported HTTP method: {method}") |
| |
| |
| if self._is_html_response(response): |
| error_msg = f"Received HTML error response from upstream (attempt {attempt + 1}/{retries + 1})" |
| print(f"[PixVerse] {error_msg}") |
| |
| if attempt < retries: |
| print(f"[PixVerse] Retrying in {self.retry_delay} seconds...") |
| await asyncio.sleep(self.retry_delay) |
| continue |
| else: |
| raise Exception("Upstream service returned HTML error. Please try again later.") |
| |
| |
| response.raise_for_status() |
| return response |
| |
| except httpx.TimeoutException as e: |
| last_error = e |
| error_msg = f"Request timeout (attempt {attempt + 1}/{retries + 1})" |
| print(f"[PixVerse] {error_msg}") |
| |
| if attempt < retries: |
| print(f"[PixVerse] Retrying in {self.retry_delay} seconds...") |
| await asyncio.sleep(self.retry_delay) |
| continue |
| |
| except httpx.HTTPStatusError as e: |
| last_error = e |
| |
| if 400 <= e.response.status_code < 500 and e.response.status_code != 429: |
| raise |
| |
| error_msg = f"HTTP error {e.response.status_code} (attempt {attempt + 1}/{retries + 1})" |
| print(f"[PixVerse] {error_msg}") |
| |
| if attempt < retries: |
| print(f"[PixVerse] Retrying in {self.retry_delay} seconds...") |
| await asyncio.sleep(self.retry_delay) |
| continue |
| |
| except Exception as e: |
| last_error = e |
| error_msg = f"Request failed: {str(e)[:100]} (attempt {attempt + 1}/{retries + 1})" |
| print(f"[PixVerse] {error_msg}") |
| |
| if attempt < retries: |
| print(f"[PixVerse] Retrying in {self.retry_delay} seconds...") |
| await asyncio.sleep(self.retry_delay) |
| continue |
| |
| |
| raise Exception(f"Request failed after {retries + 1} attempts: {str(last_error)}") |
|
|
| async def upload_image(self, image_content: bytes, content_type: str) -> dict: |
| """Upload image to PixVerse with retry logic""" |
| headers = self._get_headers() |
| |
| extension = content_type.split('/')[-1] |
| if extension == 'jpeg': |
| extension = 'jpg' |
| filename = f"image.{extension}" |
| |
| files = {"image": (filename, image_content, content_type)} |
| |
| response = await self._request_with_retry( |
| method="POST", |
| url=f"{self.base_url}/image/upload", |
| headers=headers, |
| files=files |
| ) |
| return response.json() |
|
|
| async def generate_video( |
| self, |
| img_id: int, |
| prompt: str, |
| duration: int = 5, |
| model: str = "v4.5", |
| motion_mode: str = "normal", |
| quality: str = "540p", |
| negative_prompt: Optional[str] = None, |
| seed: Optional[int] = None |
| ) -> dict: |
| """Generate video from image with retry logic""" |
| headers = self._get_headers() |
| headers["Content-Type"] = "application/json" |
| |
| payload = { |
| "duration": duration, |
| "img_id": img_id, |
| "model": model, |
| "motion_mode": motion_mode, |
| "prompt": prompt, |
| "quality": quality |
| } |
| |
| if negative_prompt: |
| payload["negative_prompt"] = negative_prompt |
| if seed is not None: |
| payload["seed"] = seed |
| |
| response = await self._request_with_retry( |
| method="POST", |
| url=f"{self.base_url}/video/img/generate", |
| headers=headers, |
| json=payload |
| ) |
| return response.json() |
|
|
| async def get_video_status(self, video_id: int) -> dict: |
| """Get video generation status with retry logic""" |
| headers = self._get_headers() |
| |
| response = await self._request_with_retry( |
| method="GET", |
| url=f"{self.base_url}/video/result/{video_id}", |
| headers=headers |
| ) |
| return response.json() |
|
|
| async def wait_for_video(self, video_id: int, max_attempts: int = 60, delay: int = 5) -> dict: |
| """Wait for video generation to complete""" |
| for attempt in range(max_attempts): |
| result = await self.get_video_status(video_id) |
| |
| if result.get("ErrCode") != 0: |
| raise Exception(f"PixVerse API error: {result.get('ErrMsg')}") |
| |
| status = result.get("Resp", {}).get("status") |
| |
| |
| if status == 1: |
| return result |
| elif status == 7: |
| raise Exception("Content moderation failed. Please modify your prompt and try again.") |
| elif status == 8: |
| raise Exception("Video generation failed. Please try again.") |
| elif status == 5: |
| await asyncio.sleep(delay) |
| else: |
| await asyncio.sleep(delay) |
| |
| raise Exception("Video generation timed out. Please try again later.") |
|
|
| async def download_video(self, video_url: str) -> bytes: |
| """Download generated video with retry logic""" |
| response = await self._request_with_retry( |
| method="GET", |
| url=video_url, |
| retries=3 |
| ) |
| return response.content |
|
|
|
|
| pixverse_service = PixVerseService() |
|
|