Spaces:
Sleeping
Sleeping
| """ | |
| Video Generation API endpoints | |
| Handles KIE API integration with SSE support for real-time updates | |
| """ | |
| from fastapi import APIRouter, HTTPException, Request | |
| from fastapi.responses import StreamingResponse, JSONResponse, Response | |
| from pydantic import BaseModel | |
| from typing import List, Optional, Dict, Any | |
| import httpx | |
| import asyncio | |
| import json | |
| import os | |
| from datetime import datetime | |
| from utils.image_processor import compress_and_store_image | |
| from utils.storage import video_results, sse_clients, cleanup_old_results | |
| import asyncio | |
# Router instance for the endpoint functions defined in this module.
router = APIRouter()

# Root URL of the KIE API; the endpoint paths used below are appended to it.
KIE_API_BASE = "https://api.kie.ai"
| # Request/Response Models | |
class VideoGenerationRequest(BaseModel):
    """Request body for starting a video generation task."""

    # Plain string (legacy) or dict/object (structured JSON).
    prompt: Any
    # Each entry is either an http(s) URL or image data that gets hosted first.
    imageUrls: Optional[List[str]] = []
    model: Optional[str] = "veo3_fast"
    aspectRatio: Optional[str] = "9:16"
    # When omitted, the type is derived from whether any images were supplied.
    generationType: Optional[str] = None
    # Seed for consistent lighting/style (e.g., 12005).
    seeds: Optional[int] = None
    # Voice type for audio generation (e.g., "Deep", "Warm", "Crisp", "None").
    voiceType: Optional[str] = None
class VideoExtendRequest(BaseModel):
    """Request body for extending a previously generated video."""

    # Identifier of the existing task whose video is extended.
    taskId: str
    # Plain string or structured JSON.
    prompt: Any
    seeds: Optional[int] = None
    watermark: Optional[str] = None
    # Voice type for audio generation.
    voiceType: Optional[str] = None
class VideoGenerationResponse(BaseModel):
    """Response returned once a generation or extension task has been started."""

    taskId: str
    status: str
class CallbackData(BaseModel):
    """Body received by the KIE callback endpoint."""

    code: int
    msg: str
    # The callback handler reads 'taskId', 'info' and 'fallbackFlag' from this mapping.
    data: Optional[Dict[str, Any]] = None
| # Helper functions | |
def get_kie_api_key():
    """Return the KIE API key read from the ``KIE_API_KEY`` environment variable.

    Raises:
        HTTPException: 500 when the variable is unset or empty.
    """
    configured_key = os.getenv('KIE_API_KEY')
    if configured_key:
        return configured_key
    raise HTTPException(
        status_code=500,
        detail="KIE_API_KEY not configured on server."
    )
async def send_sse_event(task_id: str, data: dict):
    """Queue *data* for the SSE client registered under *task_id*.

    Does nothing when no client is registered for the task.
    """
    if task_id not in sse_clients:
        return
    await sse_clients[task_id].put(data)
| # Endpoints | |
def _kie_response_is_html(response):
    """Return True when *response* carries an HTML page rather than an API payload.

    Such pages (e.g. a 502 Bad Gateway page) are recognised by a ``text/html``
    content type or by a body starting with ``<!DOCTYPE`` / ``<html``.
    """
    content_type = response.headers.get('content-type', '').lower()
    return 'text/html' in content_type or response.text.strip().startswith(('<!DOCTYPE', '<html'))


def _kie_html_error_title(body):
    """Return the stripped ``<title>`` text of an HTML error page, or ``None`` if absent."""
    import re

    if '<title>' not in body:
        return None
    match = re.search(r'<title>(.*?)</title>', body, re.IGNORECASE | re.DOTALL)
    return match.group(1).strip() if match else None


def _kie_json_error_message(response):
    """Return the most specific error message available in a non-HTML error response.

    Uses the first non-empty of ``msg`` / ``message`` / ``detail`` when the body
    is a JSON object, a generic message for any other JSON value, and the first
    200 characters of the raw body when it is not JSON at all.
    """
    try:
        payload = response.json()
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return response.text[:200]
    fallback = f"KIE API error (HTTP {response.status_code})"
    if not isinstance(payload, dict):
        # A JSON list/string/number has no message fields to read.
        return fallback
    return payload.get('msg') or payload.get('message') or payload.get('detail') or fallback


async def generate_video(request: VideoGenerationRequest, req: Request):
    """
    Generate video using KIE Veo 3.1 API
    Supports text-to-video and image-to-video generation

    Args:
        request: Generation parameters. Image entries that are not http(s)
            URLs are passed to ``compress_and_store_image`` and replaced by
            the URL it returns.
        req: Incoming request object; not used by this function.

    Returns:
        VideoGenerationResponse with the task id reported by KIE and status
        ``"processing"``.

    Raises:
        HTTPException: 502 for HTML error pages, transport errors and invalid
            JSON; the upstream HTTP status or KIE result code for API-level
            errors; 500 for a missing API key or any unexpected failure.
    """
    try:
        api_key = get_kie_api_key()

        # Build public URL for callback
        public_url = os.getenv('VITE_API_BASE_URL', f"http://localhost:{os.getenv('SERVER_PORT', 4000)}")
        callback_url = f"{public_url}/api/veo/callback"

        # Process image URLs
        public_image_urls = []
        if request.imageUrls:
            print(f"π· Processing {len(request.imageUrls)} images...")
            for image_url in request.imageUrls:
                if image_url.startswith(('http://', 'https://')):
                    # Already a public URL, use it as-is
                    print(f" Using external URL: {image_url}")
                    public_image_urls.append(image_url)
                else:
                    # Compress and host the data URL
                    hosted_url = await compress_and_store_image(image_url, public_url)
                    print(f" Hosted image: {hosted_url}")
                    public_image_urls.append(hosted_url)

        # Determine generation type
        generation_type = request.generationType or (
            "FIRST_AND_LAST_FRAMES_2_VIDEO" if public_image_urls else "TEXT_2_VIDEO"
        )

        # Log prompt format and seed
        if isinstance(request.prompt, dict):
            print("π Sending structured JSON prompt to Veo 3.1")
        else:
            print("π Sending text prompt to Veo 3.1")
        # Same test as the payload below, so a seed of 0 is logged when it is sent.
        if request.seeds is not None:
            print(f"π² Using seed: {request.seeds} (warm, flattering lighting)")

        payload = {
            "prompt": request.prompt,  # Can be string or structured JSON object
            "imageUrls": public_image_urls,
            "model": request.model,
            "aspectRatio": request.aspectRatio,
            "generationType": generation_type,
            "enableTranslation": True,
            "callBackUrl": callback_url
        }
        # Add optional seed parameter
        if request.seeds is not None:
            payload["seeds"] = request.seeds
        # Add voice type for audio generation (if not "None")
        if request.voiceType and request.voiceType.lower() != "none":
            payload["voiceType"] = request.voiceType
            print(f"π€ Using voice type: {request.voiceType}")
        else:
            print(f"π No voice/audio requested (voiceType: {request.voiceType})")

        # Call KIE API
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{KIE_API_BASE}/api/v1/veo/generate",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json=payload
            )

        # Log raw response for debugging
        print(f"π‘ KIE API response status: {response.status_code}")

        # Check HTTP status first
        if response.status_code != 200:
            if _kie_response_is_html(response):
                # HTML error page (like 502 Bad Gateway): report the KIE API as unavailable
                error_message = f"KIE API service unavailable (HTTP {response.status_code})"
                title = _kie_html_error_title(response.text)
                if title is not None:
                    # Keep just the error part (e.g., "502: Bad gateway" from "kie.ai | 502: Bad gateway")
                    if ':' in title:
                        error_message = f"KIE API error: {title.split('|')[-1].strip()}"
                    else:
                        error_message = f"KIE API error: {title}"
                print(f"β KIE API HTTP error: {response.status_code} - {error_message}")
                raise HTTPException(
                    status_code=502,  # Bad Gateway - the KIE API is down/unavailable
                    detail=error_message
                )

            # Non-HTML error response: forward the upstream status and message
            error_message = _kie_json_error_message(response)
            # str() keeps the log line working when the message field is not a string.
            print(f"β KIE API HTTP error: {response.status_code} - {str(error_message)[:200]}")
            raise HTTPException(
                status_code=response.status_code,
                detail=f"KIE API error: {error_message}"
            )

        result = response.json()
        print(f"π‘ KIE API result code: {result.get('code')}, msg: {result.get('msg')}")
        if result.get('code') != 200:
            raise HTTPException(
                status_code=result.get('code', 500),
                detail=result.get('msg', 'KIE API request failed')
            )

        task_id = result['data']['taskId']
        print(f"β Video generation started: {task_id}")
        return VideoGenerationResponse(
            taskId=task_id,
            status="processing"
        )
    except HTTPException:
        raise
    except httpx.HTTPStatusError as e:
        upstream = e.response
        if _kie_response_is_html(upstream):
            error_msg = f"KIE API service unavailable (HTTP {upstream.status_code})"
            title = _kie_html_error_title(upstream.text)
            if title is not None and ':' in title:
                error_msg = f"KIE API error: {title.split('|')[-1].strip()}"
        else:
            error_msg = _kie_json_error_message(upstream)
        print(f"β {error_msg}")
        declared_html = 'text/html' in upstream.headers.get('content-type', '').lower()
        raise HTTPException(
            status_code=502 if declared_html else upstream.status_code,
            detail=error_msg
        ) from e
    except httpx.RequestError as e:
        error_msg = f"KIE API request error: {type(e).__name__} - {str(e)}"
        print(f"β {error_msg}")
        raise HTTPException(status_code=502, detail=error_msg) from e
    except json.JSONDecodeError as e:
        error_msg = "Invalid JSON response from KIE API. The service may be unavailable."
        print(f"β JSON decode error: {str(e)}")
        raise HTTPException(status_code=502, detail=error_msg) from e
    except Exception as e:
        import traceback
        error_msg = f"{type(e).__name__}: {str(e)}"
        print(f"β Video generation error: {error_msg}")
        traceback.print_exc()
        raise HTTPException(
            status_code=500,
            detail=f"Video generation request failed: {error_msg}"
        ) from e
async def veo_callback(callback_data: CallbackData):
    """
    Callback endpoint for KIE API
    Receives video generation status updates

    The payload is stored in ``video_results`` under its task id and then
    forwarded to the SSE client registered for that task, if any, as a
    ``succeeded`` or ``failed`` event.

    Returns:
        JSONResponse ``{'code': 200, 'msg': 'success'}``.

    Raises:
        HTTPException: 500 when processing the callback fails.
    """
    try:
        data = callback_data.data or {}
        task_id = data.get('taskId')
        info = data.get('info', {})
        fallback_flag = data.get('fallbackFlag')
        print(f"π₯ Callback received for task {task_id}: code={callback_data.code}, msg={callback_data.msg}")

        # Store result
        video_results[task_id] = {
            'code': callback_data.code,
            'msg': callback_data.msg,
            'taskId': task_id,
            'info': info,
            'fallbackFlag': fallback_flag,
            'timestamp': datetime.now().timestamp()
        }

        # Send SSE update to client
        if callback_data.code == 200 and info:
            result_urls = info.get('resultUrls', [])
            event = {
                'status': 'succeeded',
                # An empty or null list must not abort the callback; report no primary URL instead.
                'url': result_urls[0] if result_urls else None,
                'resultUrls': result_urls,
                'originUrls': info.get('originUrls', []),
                'resolution': info.get('resolution'),
                'fallbackFlag': fallback_flag
            }
        else:
            event = {
                'status': 'failed',
                'error': callback_data.msg,
                'code': callback_data.code
            }
        await send_sse_event(task_id, event)

        # Clean up old results
        cleanup_old_results()
        return JSONResponse(
            status_code=200,
            content={'code': 200, 'msg': 'success'}
        )
    except Exception as e:
        print(f"β Callback processing error: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail="Failed to process callback"
        ) from e
async def extend_video(request: VideoExtendRequest):
    """
    Extend an existing video using KIE Veo 3.1 extend API
    Takes an existing taskId and extends it with new prompt

    Returns:
        VideoGenerationResponse with the new task id reported by KIE and
        status ``"processing"``.

    Raises:
        HTTPException: 502 for HTML error pages, transport errors and invalid
            JSON; the KIE result code for API-level errors; 500 for a missing
            API key or any unexpected failure.
    """
    import re

    def looks_like_html(content_type, body):
        """True for HTML pages (e.g. gateway errors) returned instead of JSON."""
        return 'text/html' in content_type or body.strip().startswith(('<!DOCTYPE', '<html'))

    def html_error_message(status_code, body):
        """Build the error message for an HTML page, preferring the part of its <title> after '|'."""
        message = f"KIE API service unavailable (HTTP {status_code})"
        if '<title>' in body:
            title_match = re.search(r'<title>(.*?)</title>', body, re.IGNORECASE | re.DOTALL)
            if title_match:
                title = title_match.group(1).strip()
                if ':' in title:
                    message = f"KIE API error: {title.split('|')[-1].strip()}"
        return message

    try:
        api_key = get_kie_api_key()

        # Build public URL for callback
        public_url = os.getenv('VITE_API_BASE_URL', f"http://localhost:{os.getenv('SERVER_PORT', 4000)}")
        callback_url = f"{public_url}/api/veo/callback"
        print(f"π¬ Extending video from task: {request.taskId}")

        # Log prompt format and seed
        if isinstance(request.prompt, dict):
            print("π Extending with structured JSON prompt")
        else:
            print("π Extending with text prompt")
        # Same test as the payload below, so a seed of 0 is logged when it is sent.
        if request.seeds is not None:
            print(f"π² Using seed: {request.seeds} (consistent lighting)")

        payload = {
            "taskId": request.taskId,
            "prompt": request.prompt,
            "callBackUrl": callback_url
        }
        # Add optional parameters
        if request.seeds is not None:
            payload["seeds"] = request.seeds
        if request.watermark:
            payload["watermark"] = request.watermark
        if request.voiceType and request.voiceType.lower() != "none":
            payload["voiceType"] = request.voiceType
            print(f"π€ Using voice type: {request.voiceType}")
        else:
            print(f"π No voice/audio requested (voiceType: {request.voiceType})")

        # Call KIE extend API
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{KIE_API_BASE}/api/v1/veo/extend",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json=payload
            )

        # Check for HTML error responses; other non-200 bodies fall through
        # to the JSON handling below.
        if response.status_code != 200:
            content_type = response.headers.get('content-type', '').lower()
            if looks_like_html(content_type, response.text):
                raise HTTPException(
                    status_code=502,
                    detail=html_error_message(response.status_code, response.text)
                )

        result = response.json()
        if result.get('code') != 200:
            raise HTTPException(
                status_code=result.get('code', 500),
                detail=result.get('msg', 'KIE extend API request failed')
            )

        new_task_id = result['data']['taskId']
        print(f"β Video extension started: {new_task_id}")
        return VideoGenerationResponse(
            taskId=new_task_id,
            status="processing"
        )
    except HTTPException:
        raise
    except httpx.HTTPStatusError as e:
        upstream = e.response
        error_text = upstream.text
        content_type = upstream.headers.get('content-type', '').lower()
        if looks_like_html(content_type, error_text):
            error_msg = html_error_message(upstream.status_code, error_text)
        else:
            fallback = f"KIE API error (HTTP {upstream.status_code})"
            try:
                error_data = upstream.json()
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                error_msg = error_text[:200]
            else:
                if isinstance(error_data, dict):
                    error_msg = (
                        error_data.get('msg')
                        or error_data.get('message')
                        or error_data.get('detail')
                        or fallback
                    )
                else:
                    # A JSON list/string/number has no message fields to read.
                    error_msg = fallback
        print(f"β {error_msg}")
        raise HTTPException(
            status_code=502 if 'text/html' in content_type else upstream.status_code,
            detail=error_msg
        ) from e
    except httpx.RequestError as e:
        error_msg = f"KIE API request error: {type(e).__name__} - {str(e)}"
        print(f"β {error_msg}")
        raise HTTPException(status_code=502, detail=error_msg) from e
    except json.JSONDecodeError as e:
        error_msg = "Invalid JSON response from KIE API. The service may be unavailable."
        print(f"β JSON decode error: {str(e)}")
        raise HTTPException(status_code=502, detail=error_msg) from e
    except Exception as e:
        import traceback
        error_msg = f"{type(e).__name__}: {str(e)}"
        print(f"β Video extension error: {error_msg}")
        traceback.print_exc()
        raise HTTPException(
            status_code=500,
            detail=f"Video extension error: {error_msg}"
        ) from e
async def sse_events(task_id: str):
    """
    Server-Sent Events endpoint for real-time updates

    Registers a queue for *task_id* in ``sse_clients``, first replays the
    result already stored in ``video_results`` (if any), then streams every
    event put on the queue as a ``data:`` line.

    Returns:
        StreamingResponse with media type ``text/event-stream``.
    """
    async def event_generator():
        # Create queue for this client
        queue = asyncio.Queue()
        sse_clients[task_id] = queue
        print(f"π SSE client connected for task {task_id}")
        try:
            # Check if result already exists
            if task_id in video_results:
                result = video_results[task_id]
                info = result.get('info')
                if result['code'] == 200 and info:
                    result_urls = info.get('resultUrls', [])
                    event_data = {
                        'status': 'succeeded',
                        # An empty or null list must not break the stream; report no primary URL instead.
                        'url': result_urls[0] if result_urls else None,
                        'resultUrls': result_urls,
                        'originUrls': info.get('originUrls', []),
                        'resolution': info.get('resolution'),
                        'fallbackFlag': result.get('fallbackFlag')
                    }
                else:
                    event_data = {
                        'status': 'failed',
                        'error': result['msg'],
                        'code': result['code']
                    }
                yield f"data: {json.dumps(event_data)}\n\n"

            # Stream events
            while True:
                data = await queue.get()
                yield f"data: {json.dumps(data)}\n\n"
        except asyncio.CancelledError:
            print(f"π SSE client disconnected for task {task_id}")
            # Propagate the cancellation instead of swallowing it.
            raise
        finally:
            # Unregister only our own queue: a newer connection for the same
            # task may have replaced it, and must keep receiving events.
            if task_id in sse_clients and sse_clients[task_id] is queue:
                del sse_clients[task_id]

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
async def get_video_status(task_id: str):
    """
    Get video generation status from KIE API

    Returns:
        Dict with ``status`` (``succeeded`` / ``failed`` / ``processing``) and
        the video URL under both ``output`` and ``url`` once the upstream
        status is ``completed`` (``None`` otherwise).

    Raises:
        HTTPException: 502 for HTML error pages, transport errors and invalid
            JSON; the KIE result code for API-level errors; 500 for a missing
            API key or any unexpected failure.
    """
    import re

    def is_html_page(content_type, body):
        """True for HTML pages (e.g. gateway errors) returned instead of JSON."""
        trimmed = body.strip()
        return 'text/html' in content_type or trimmed.startswith('<!DOCTYPE') or trimmed.startswith('<html')

    def describe_html_error(status_code, body):
        """Build the error message for an HTML page, preferring the part of its <title> after '|'."""
        message = f"KIE API service unavailable (HTTP {status_code})"
        if '<title>' not in body:
            return message
        found = re.search(r'<title>(.*?)</title>', body, re.IGNORECASE | re.DOTALL)
        if found:
            page_title = found.group(1).strip()
            if ':' in page_title:
                message = f"KIE API error: {page_title.split('|')[-1].strip()}"
        return message

    try:
        api_key = get_kie_api_key()
        auth_headers = {
            "Authorization": f"Bearer {api_key}"
        }
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                f"{KIE_API_BASE}/api/v1/veo/video/{task_id}",
                headers=auth_headers
            )

        # Only HTML error pages are rejected here; other non-200 bodies are
        # parsed as JSON below.
        if response.status_code != 200:
            body = response.text
            kind = response.headers.get('content-type', '').lower()
            if is_html_page(kind, body):
                raise HTTPException(
                    status_code=502,
                    detail=describe_html_error(response.status_code, body)
                )

        result = response.json()
        if result.get('code') != 200:
            raise HTTPException(
                status_code=result.get('code', 500),
                detail=result.get('msg', 'Failed to get video status')
            )

        # Transform response
        upstream_status = result['data'].get('status')
        video_url = result['data'].get('videoUrl')
        completed = upstream_status == 'completed'
        if completed:
            mapped_status = 'succeeded'
        elif upstream_status == 'failed':
            mapped_status = 'failed'
        else:
            mapped_status = 'processing'
        finished_url = video_url if completed else None
        return {
            'status': mapped_status,
            'output': finished_url,
            'url': finished_url
        }
    except HTTPException:
        raise
    except httpx.HTTPStatusError as e:
        failed = e.response
        body = failed.text
        kind = failed.headers.get('content-type', '').lower()
        if is_html_page(kind, body):
            error_msg = describe_html_error(failed.status_code, body)
        else:
            try:
                error_data = failed.json()
                error_msg = error_data.get('msg') or error_data.get('message') or error_data.get('detail') or f"KIE API error (HTTP {failed.status_code})"
            except (json.JSONDecodeError, ValueError):
                # Not JSON: fall back to the first 200 characters of the body
                error_msg = body[:200]
        print(f"β {error_msg}")
        status_code = 502 if 'text/html' in kind else failed.status_code
        raise HTTPException(status_code=status_code, detail=error_msg)
    except httpx.RequestError as e:
        error_msg = f"KIE API request error: {type(e).__name__} - {str(e)}"
        print(f"β {error_msg}")
        raise HTTPException(status_code=502, detail=error_msg)
    except json.JSONDecodeError as e:
        error_msg = "Invalid JSON response from KIE API. The service may be unavailable."
        print(f"β JSON decode error: {str(e)}")
        raise HTTPException(status_code=502, detail=error_msg)
    except Exception as e:
        import traceback
        error_msg = f"{type(e).__name__}: {str(e)}"
        print(f"β Status check error: {error_msg}")
        traceback.print_exc()
        raise HTTPException(
            status_code=500,
            detail=f"Failed to check video status: {error_msg}"
        )
async def cancel_video_generation(task_id: str):
    """
    Cancel an ongoing video generation task

    Sends a ``cancelled`` event to the SSE client registered for the task (if
    any) and unregisters it, then overwrites an existing stored result with a
    cancellation record.

    Returns:
        JSONResponse confirming the cancellation.

    Raises:
        HTTPException: 500 when any step fails.
    """
    try:
        if task_id in sse_clients:
            # Send cancellation event to the SSE client
            cancel_event = {
                'status': 'cancelled',
                'message': 'Video generation cancelled by user'
            }
            await send_sse_event(task_id, cancel_event)
            # Unregister the client's queue
            del sse_clients[task_id]
            print(f"β Cancelled video generation: {task_id}")

        if task_id in video_results:
            # Mark the stored result as cancelled
            cancelled_record = {
                'code': 499,  # Client Closed Request
                'msg': 'Video generation cancelled by user',
                'taskId': task_id,
                'timestamp': datetime.now().timestamp()
            }
            video_results[task_id] = cancelled_record

        confirmation = {'code': 200, 'msg': 'Video generation cancelled', 'taskId': task_id}
        return JSONResponse(status_code=200, content=confirmation)
    except Exception as e:
        print(f"β Error cancelling video generation: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to cancel video generation: {str(e)}"
        )
async def download_video(url: str):
    """
    Download video from external URL
    Proxies the video stream to avoid CORS issues

    Args:
        url: Absolute http(s) URL of the asset to fetch.

    Returns:
        Response carrying the downloaded bytes as an attachment named
        ``video.mp4``, with the upstream content type (default ``video/mp4``).

    Raises:
        HTTPException: 400 when *url* is missing or not an absolute http(s)
            URL; the upstream status when it is not 200; 500 for any other
            failure.
    """
    from urllib.parse import urlsplit

    if not url:
        raise HTTPException(status_code=400, detail="Missing url query parameter")

    # The URL comes straight from the caller: reject anything that is not an
    # absolute http(s) URL up front instead of failing later with a 500.
    try:
        target = urlsplit(url)
        is_http_url = target.scheme in ('http', 'https') and bool(target.netloc)
    except ValueError:
        is_http_url = False
    if not is_http_url:
        raise HTTPException(status_code=400, detail="url must be an absolute http(s) URL")

    # NOTE(review): any http(s) host is still fetched on the caller's behalf,
    # including internal addresses (SSRF); consider an allow-list of hosts.
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.get(url)

        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail="Failed to download asset"
            )

        # Return the video content directly.
        # NOTE(review): the whole body is held in memory before it is returned.
        return Response(
            content=response.content,
            media_type=response.headers.get('content-type', 'video/mp4'),
            headers={
                'Content-Disposition': 'attachment; filename="video.mp4"',
                'Content-Length': str(len(response.content))
            }
        )
    except HTTPException:
        raise
    except Exception as e:
        print(f"β Download error: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to download asset: {str(e)}"
        ) from e