# Original listing metadata: file size 8,448 bytes
import httpx
import uuid
import asyncio
from typing import Optional
import config
class PixVerseService:
    """Async client for the PixVerse image-to-video HTTP API.

    All network calls go through ``_request_with_retry``, which retries on
    timeouts, HTTP 5xx/429 responses, other request failures, and HTML pages
    returned where a normal API response was expected.
    """

    # HTTP methods accepted by ``_request_with_retry``.
    _SUPPORTED_METHODS = ("GET", "POST")
    # Only the head of the body is inspected when sniffing for an HTML page,
    # so large binary payloads (e.g. downloaded videos) are never decoded.
    _HTML_SNIFF_BYTES = 512
    _HTML_PREFIXES = (b"<!doctype", b"<html")

    def __init__(self):
        """Read connection settings from ``config`` and set retry defaults."""
        self.base_url = config.PIXVERSE_BASE_URL
        self.api_key = config.PIXVERSE_API_KEY
        self.default_timeout = 120.0  # 2 minutes timeout
        self.max_retries = 3
        self.retry_delay = 3  # seconds between retries

    def _get_headers(self) -> dict:
        """Return request headers: the API key plus a fresh random trace id."""
        return {
            "API-KEY": self.api_key,
            "Ai-trace-id": str(uuid.uuid4()),
        }

    def _is_html_response(self, response: httpx.Response) -> bool:
        """Return True if ``response`` looks like an HTML page.

        The check is case-insensitive: it matches a ``text/html`` content
        type, or a body whose first non-whitespace bytes are ``<!doctype``
        or ``<html``.
        """
        content_type = response.headers.get("content-type", "")
        if "text/html" in content_type.lower():
            return True
        try:
            head = response.content[: self._HTML_SNIFF_BYTES]
        except httpx.ResponseNotRead:
            # Body of a streamed response is not loaded; nothing to sniff.
            return False
        return head.lstrip().lower().startswith(self._HTML_PREFIXES)

    async def _request_with_retry(
        self,
        method: str,
        url: str,
        retries: Optional[int] = None,
        **kwargs
    ) -> httpx.Response:
        """Make an HTTP request, retrying on failure or HTML error response.

        Args:
            method: HTTP method, ``GET`` or ``POST`` (case-insensitive).
            url: Full URL to request.
            retries: Number of retries after the first attempt
                (default: ``self.max_retries``).
            **kwargs: Additional arguments forwarded to the httpx request.

        Returns:
            The ``httpx.Response`` of the first successful attempt.

        Raises:
            ValueError: If ``method`` is not supported. Raised immediately,
                without any retry.
            httpx.HTTPStatusError: On a 4xx response other than 429; these
                are not retried.
            Exception: If all attempts are exhausted; the last underlying
                error is attached as the cause.
        """
        verb = method.upper()
        if verb not in self._SUPPORTED_METHODS:
            # A caller bug, not a transient failure: never worth retrying.
            raise ValueError(f"Unsupported HTTP method: {method}")

        if retries is None:
            retries = self.max_retries
        total_attempts = retries + 1
        last_error = None

        for attempt in range(total_attempts):
            progress = f"(attempt {attempt + 1}/{total_attempts})"
            try:
                async with httpx.AsyncClient(timeout=self.default_timeout) as client:
                    response = await client.request(verb, url, **kwargs)

                if not self._is_html_response(response):
                    response.raise_for_status()
                    return response

                # HTML error response (Hugging Face/upstream error).
                print(f"[PixVerse] Received HTML error response from upstream {progress}")
                last_error = Exception(
                    "Upstream service returned HTML error. Please try again later."
                )
            except httpx.TimeoutException as exc:
                last_error = exc
                print(f"[PixVerse] Request timeout {progress}")
            except httpx.HTTPStatusError as exc:
                status = exc.response.status_code
                # Don't retry on 4xx client errors (except 429 rate limit).
                if 400 <= status < 500 and status != 429:
                    raise
                last_error = exc
                print(f"[PixVerse] HTTP error {status} {progress}")
            except Exception as exc:
                # Best-effort boundary: any other failure is treated as
                # transient and retried.
                last_error = exc
                print(f"[PixVerse] Request failed: {str(exc)[:100]} {progress}")

            if attempt < retries:
                print(f"[PixVerse] Retrying in {self.retry_delay} seconds...")
                await asyncio.sleep(self.retry_delay)

        # All retries exhausted
        raise Exception(
            f"Request failed after {total_attempts} attempts: {last_error}"
        ) from last_error

    @staticmethod
    def _upload_filename(content_type: str) -> str:
        """Build the upload filename ``image.<ext>`` from a MIME content type.

        Parameters after ``;`` are ignored, the subtype is lower-cased, and
        ``jpeg`` is mapped to ``jpg``.
        """
        mime = content_type.split(";", 1)[0].strip()
        extension = mime.rsplit("/", 1)[-1].lower()
        if extension == "jpeg":
            extension = "jpg"
        return f"image.{extension}"

    async def upload_image(self, image_content: bytes, content_type: str) -> dict:
        """Upload image to PixVerse with retry logic.

        Args:
            image_content: Raw image bytes.
            content_type: MIME type of the image, e.g. ``image/png``.

        Returns:
            The decoded JSON body of the upload response.
        """
        filename = self._upload_filename(content_type)
        response = await self._request_with_retry(
            method="POST",
            url=f"{self.base_url}/image/upload",
            headers=self._get_headers(),
            files={"image": (filename, image_content, content_type)},
        )
        return response.json()

    async def generate_video(
        self,
        img_id: int,
        prompt: str,
        duration: int = 5,
        model: str = "v4.5",
        motion_mode: str = "normal",
        quality: str = "540p",
        negative_prompt: Optional[str] = None,
        seed: Optional[int] = None
    ) -> dict:
        """Generate video from image with retry logic.

        ``negative_prompt`` is sent only when non-empty and ``seed`` only
        when it is not None; all other arguments are always included in the
        JSON payload.

        Returns:
            The decoded JSON body of the generate response.
        """
        headers = self._get_headers()
        headers["Content-Type"] = "application/json"
        payload = {
            "duration": duration,
            "img_id": img_id,
            "model": model,
            "motion_mode": motion_mode,
            "prompt": prompt,
            "quality": quality,
        }
        if negative_prompt:
            payload["negative_prompt"] = negative_prompt
        if seed is not None:
            payload["seed"] = seed
        response = await self._request_with_retry(
            method="POST",
            url=f"{self.base_url}/video/img/generate",
            headers=headers,
            json=payload,
        )
        return response.json()

    async def get_video_status(self, video_id: int) -> dict:
        """Get video generation status with retry logic.

        Returns:
            The decoded JSON body of the result endpoint for ``video_id``.
        """
        response = await self._request_with_retry(
            method="GET",
            url=f"{self.base_url}/video/result/{video_id}",
            headers=self._get_headers(),
        )
        return response.json()

    async def wait_for_video(self, video_id: int, max_attempts: int = 60, delay: int = 5) -> dict:
        """Poll the status endpoint until the video is ready or fails.

        Args:
            video_id: Id of the video to poll.
            max_attempts: Maximum number of status polls.
            delay: Seconds to wait between polls.

        Returns:
            The full status payload once the status is 1 (success).

        Raises:
            Exception: If the API reports a non-zero ``ErrCode``, if the
                status is 7 (moderation failed) or 8 (generation failed),
                or if ``max_attempts`` polls pass without success.
        """
        for attempt in range(max_attempts):
            result = await self.get_video_status(video_id)
            if result.get("ErrCode") != 0:
                raise Exception(f"PixVerse API error: {result.get('ErrMsg')}")

            # "Resp" may be absent or null; treat both as "no status yet".
            status = (result.get("Resp") or {}).get("status")

            # Status codes: 1=success, 5=in progress, 7=moderation failed, 8=generation failed
            if status == 1:
                return result
            if status == 7:
                raise Exception("Content moderation failed. Please modify your prompt and try again.")
            if status == 8:
                raise Exception("Video generation failed. Please try again.")

            # In progress (5) or any unrecognised status: keep polling, but
            # don't sleep after the final poll just to report a timeout.
            if attempt < max_attempts - 1:
                await asyncio.sleep(delay)

        raise Exception("Video generation timed out. Please try again later.")

    async def download_video(self, video_url: str) -> bytes:
        """Download generated video with retry logic.

        No API headers are sent; ``video_url`` is requested as given.

        Returns:
            The raw bytes of the response body.
        """
        response = await self._request_with_retry(
            method="GET",
            url=video_url,
            retries=3,  # pinned explicitly, independent of self.max_retries
        )
        return response.content
# Module-level instance, created at import time (its __init__ reads config).
# Presumably imported by other modules as the service entry point; confirm against callers.
pixverse_service = PixVerseService()
|