Spaces:
Build error
Build error
| import os | |
| import time | |
| import json | |
| import logging | |
| import requests | |
| from typing import Optional, Dict, Any, List | |
| from urllib.parse import urlparse | |
| # --- Logging Setup --- | |
| def get_logger(name: str = __name__): | |
| logger = logging.getLogger(name) | |
| if not logger.handlers: | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler('/tmp/sora_video_downloader.log'), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| return logger | |
| logger = get_logger() | |
| # --- Sora API Client --- | |
| class SoraClient: | |
| def __init__(self, api_key: str, base_url: str, api_version: str = "preview"): | |
| self.api_key = api_key | |
| self.base_url = base_url.rstrip('/') | |
| self.api_version = api_version | |
| self.session = requests.Session() | |
| self.session.headers.update({ | |
| 'api-key': self.api_key, | |
| 'Content-Type': 'application/json' | |
| }) | |
| logger.info("SoraClient initialized") | |
| def start_video_job(self, prompt: str, height: int = 1080, width: int = 1080, n_seconds: int = 5, n_variants: int = 1) -> Optional[str]: | |
| url = f"{self.base_url}/openai/v1/video/generations/jobs?api-version={self.api_version}" | |
| payload = { | |
| "model": "sora", | |
| "prompt": prompt, | |
| "height": str(height), | |
| "width": str(width), | |
| "n_seconds": str(n_seconds), | |
| "n_variants": str(n_variants) | |
| } | |
| try: | |
| response = self.session.post(url, json=payload) | |
| if response.status_code in [200, 201, 202]: | |
| result = response.json() | |
| job_id = result.get('id') or result.get('job_id') or result.get('jobId') | |
| return job_id | |
| else: | |
| logger.error(f"Failed to start job: {response.status_code} {response.text}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Exception in start_video_job: {e}") | |
| return None | |
| def get_job_status(self, job_id: str) -> Dict[str, Any]: | |
| url = f"{self.base_url}/openai/v1/video/generations/jobs/{job_id}?api-version={self.api_version}" | |
| try: | |
| response = self.session.get(url) | |
| if response.status_code == 200: | |
| return response.json() | |
| else: | |
| logger.error(f"Failed to get job status: {response.status_code} {response.text}") | |
| return {"status": "error", "error": f"HTTP {response.status_code}"} | |
| except Exception as e: | |
| logger.error(f"Exception in get_job_status: {e}") | |
| return {"status": "error", "error": str(e)} | |
| def wait_for_job(self, job_id: str, max_wait_time: int = 300, poll_interval: int = 10) -> Optional[Dict[str, Any]]: | |
| start_time = time.time() | |
| while time.time() - start_time < max_wait_time: | |
| status_result = self.get_job_status(job_id) | |
| status = status_result.get('status', '').lower() | |
| if status in ['completed', 'succeeded', 'success']: | |
| return status_result | |
| elif status in ['failed', 'error']: | |
| return None | |
| time.sleep(poll_interval) | |
| logger.error(f"Job {job_id} timed out after {max_wait_time}s") | |
| return None | |
| def get_generation_details(self, generation_id: str) -> Dict[str, Any]: | |
| url = f"{self.base_url}/openai/v1/video/generations/{generation_id}?api-version={self.api_version}" | |
| try: | |
| resp = self.session.get(url) | |
| if resp.status_code == 200: | |
| return resp.json() | |
| else: | |
| logger.error(f"Failed to fetch generation details: HTTP {resp.status_code}") | |
| return {} | |
| except Exception as e: | |
| logger.error(f"Exception in get_generation_details: {e}") | |
| return {} | |
| def extract_video_urls(self, job_result: Dict[str, Any]) -> List[str]: | |
| video_urls = [] | |
| generations = job_result.get('generations', []) | |
| if isinstance(generations, list) and generations: | |
| for g in generations: | |
| gen_id = g.get('id') if isinstance(g, dict) else None | |
| if not gen_id: | |
| continue | |
| content_url = f"{self.base_url}/openai/v1/video/generations/{gen_id}/content/video?api-version={self.api_version}" | |
| video_urls.append(content_url) | |
| return video_urls | |
| def download_video(self, video_url: str, output_filename: str) -> bool: | |
| try: | |
| download_session = requests.Session() | |
| download_session.headers.update({'api-key': self.api_key}) | |
| response = download_session.get(video_url, stream=True) | |
| if response.status_code == 200: | |
| with open(output_filename, 'wb') as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| if chunk: | |
| f.write(chunk) | |
| return True | |
| else: | |
| logger.error(f"Failed to download video: {response.status_code} {response.text}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Exception in download_video: {e}") | |
| return False | |
| # --- Video Job Abstraction --- | |
| class VideoJob: | |
| def __init__(self, sora_client: SoraClient, prompt: str, height: int = 1080, width: int = 1080, n_seconds: int = 5, n_variants: int = 1): | |
| self.sora_client = sora_client | |
| self.prompt = prompt | |
| self.height = height | |
| self.width = width | |
| self.n_seconds = n_seconds | |
| self.n_variants = n_variants | |
| self.job_id: Optional[str] = None | |
| self.result: Optional[Dict[str, Any]] = None | |
| self.video_urls: List[str] = [] | |
| def run(self, wait: bool = True) -> bool: | |
| self.job_id = self.sora_client.start_video_job( | |
| self.prompt, self.height, self.width, self.n_seconds, self.n_variants | |
| ) | |
| if not self.job_id: | |
| logger.error("Failed to start video job") | |
| return False | |
| if wait: | |
| self.result = self.sora_client.wait_for_job(self.job_id) | |
| if not self.result: | |
| logger.error("Job failed or timed out") | |
| return False | |
| self.video_urls = self.sora_client.extract_video_urls(self.result) | |
| if not self.video_urls: | |
| logger.error("No video URLs found") | |
| return False | |
| return True | |
| def download_videos(self, output_dir: str) -> List[str]: | |
| saved_files = [] | |
| for i, url in enumerate(self.video_urls): | |
| filename = f"sora_video_{self.job_id}_{i+1}.mp4" | |
| filepath = os.path.join(output_dir, filename) | |
| if self.sora_client.download_video(url, filepath): | |
| saved_files.append(filepath) | |
| return saved_files | |
| # --- Video Storage Handler --- | |
| class VideoStorage: | |
| def __init__(self, storage_dir: str = "/tmp/videos"): | |
| self.storage_dir = storage_dir | |
| os.makedirs(self.storage_dir, exist_ok=True) | |
| def save_video(self, src_path: str, filename: str) -> str: | |
| dst_path = os.path.join(self.storage_dir, filename) | |
| os.rename(src_path, dst_path) | |
| return dst_path | |
| def list_videos(self) -> List[str]: | |
| return [os.path.join(self.storage_dir, f) for f in os.listdir(self.storage_dir) if f.endswith('.mp4')] | |
| # --- Main for CLI usage (optional) --- | |
| def main(): | |
| api_key = os.getenv('AZURE_OPENAI_API_KEY') or os.getenv('AZURE_API_KEY') | |
| base_url = os.getenv('AZURE_OPENAI_ENDPOINT') or "https://levm3-me7f7pgq-eastus2.cognitiveservices.azure.com" | |
| if not api_key or not base_url: | |
| print("Please set your Azure API key and endpoint.") | |
| return | |
| sora = SoraClient(api_key, base_url) | |
| prompt = "A video of a cat playing with a ball of yarn in a sunny room" | |
| job = VideoJob(sora, prompt) | |
| if job.run(): | |
| storage = VideoStorage() | |
| files = job.download_videos(storage.storage_dir) | |
| print(f"Downloaded: {files}") | |
| else: | |
| print("Video generation failed.") | |
| if __name__ == "__main__": | |
| main() | |