"""Higgsfield.ai cloud provider — access to Kling 3.0, Sora 2, Veo 3.1, and more.

Higgsfield provides a unified API for multiple AI video and image generation models
including Kling 3.0 (Kuaishou), Sora 2 (OpenAI), Veo 3.1 (Google), WAN 2.5 (Alibaba),
and their own Higgsfield Soul for character consistency.

SDK: pip install higgsfield-client
Docs: https://cloud.higgsfield.ai/
"""
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import logging | |
| import os | |
| import time | |
| import uuid | |
| from typing import Any | |
| import httpx | |
| from content_engine.services.cloud_providers.base import CloudGenerationResult, CloudProvider | |
| logger = logging.getLogger(__name__) | |
# Model IDs for Higgsfield platform
# Format: provider/model/task
# Each map translates a short "friendly" name (used by callers of this module)
# to the full Higgsfield model ID. Every map carries a "default" entry that the
# _resolve_* helpers fall back to when no model name is supplied.

# Text-to-image models (prompt -> still image).
TEXT_TO_IMAGE_MODELS = {
    "seedream-4": "bytedance/seedream/v4/text-to-image",
    "seedream-4.5": "bytedance/seedream/v4.5/text-to-image",
    "nano-banana-pro": "google/nano-banana-pro/text-to-image",
    "flux-2": "black-forest-labs/flux-2/text-to-image",
    "gpt-image": "openai/gpt-image/text-to-image",
    "default": "bytedance/seedream/v4/text-to-image",
}

# Text-to-video models (prompt -> video clip).
TEXT_TO_VIDEO_MODELS = {
    "kling-3.0": "kuaishou/kling/v3.0/text-to-video",
    "kling-3.0-pro": "kuaishou/kling/v3.0-pro/text-to-video",
    "sora-2": "openai/sora-2/text-to-video",
    "veo-3.1": "google/veo-3.1/text-to-video",
    "wan-2.5": "alibaba/wan-2.5/text-to-video",
    "seedance-pro": "bytedance/seedance/pro/text-to-video",
    "default": "kuaishou/kling/v3.0/text-to-video",
}

# Image-to-video models (reference image + prompt -> video clip).
IMAGE_TO_VIDEO_MODELS = {
    "kling-3.0": "kuaishou/kling/v3.0/image-to-video",
    "kling-3.0-pro": "kuaishou/kling/v3.0-pro/image-to-video",
    "sora-2": "openai/sora-2/image-to-video",
    "veo-3.1": "google/veo-3.1/image-to-video",
    "wan-2.5": "alibaba/wan-2.5/image-to-video",
    "higgsfield-dop": "higgsfield/dop/image-to-video",
    "default": "kuaishou/kling/v3.0/image-to-video",
}

# Image editing / image-to-image models.
IMAGE_EDIT_MODELS = {
    "higgsfield-soul": "higgsfield/soul/image-to-image",
    "seedream-4-edit": "bytedance/seedream/v4/edit",
    "default": "higgsfield/soul/image-to-image",
}
class HiggsFieldProvider(CloudProvider):
    """Cloud provider using Higgsfield.ai for Kling 3.0, Sora 2, Veo 3.1, etc."""

    def __init__(self, api_key: str | None = None, api_secret: str | None = None):
        """Initialize with Higgsfield credentials.

        Credentials are resolved in this order:
        - Explicit ``api_key`` / ``api_secret`` arguments
        - HIGGSFIELD_API_KEY / HIGGSFIELD_API_SECRET env vars
        - HF_API_KEY / HF_API_SECRET env vars
        - Combined key via the HF_KEY env var (read by the SDK itself)
        """
        self._api_key = api_key or os.getenv("HIGGSFIELD_API_KEY") or os.getenv("HF_API_KEY")
        self._api_secret = api_secret or os.getenv("HIGGSFIELD_API_SECRET") or os.getenv("HF_API_SECRET")
        self._combined_key = os.getenv("HF_KEY")
        # Long timeout: image/video generation downloads can take minutes.
        self._http_client = httpx.AsyncClient(timeout=300)
        self._client = None
        # Most recent submit_generation() result, consumed by check_status() /
        # get_result(). Initialized here so those methods never need hasattr().
        self._last_result: dict[str, Any] | None = None
        # Try to initialize SDK if available
        try:
            from higgsfield_client import HiggsFieldClient

            if self._combined_key:
                # SDK picks up HF_KEY from the environment on its own.
                self._client = HiggsFieldClient()
            elif self._api_key and self._api_secret:
                self._client = HiggsFieldClient(api_key=self._api_key, api_secret=self._api_secret)
            # Only claim success when a client was actually constructed
            # (the original logged unconditionally even without credentials).
            if self._client is not None:
                logger.info("Higgsfield SDK initialized")
        except ImportError:
            logger.warning("higgsfield-client not installed, using direct API")
        except Exception as e:
            logger.warning("Failed to init Higgsfield SDK: %s", e)

    def name(self) -> str:
        """Provider identifier used by the provider registry."""
        return "higgsfield"

    async def is_available(self) -> bool:
        """Check if Higgsfield API is configured."""
        return bool(self._client or self._api_key)

    async def aclose(self) -> None:
        """Release the underlying HTTP client.

        The AsyncClient created in __init__ is otherwise never closed and
        leaks its connection pool; call this when discarding the provider.
        """
        await self._http_client.aclose()

    @staticmethod
    def _resolve_model(mapping: dict[str, str], model_name: str | None) -> str:
        """Resolve a friendly model name through *mapping*.

        Unknown non-empty names pass through unchanged (assumed to already be
        a full Higgsfield model ID); None/empty falls back to the map default.
        """
        if model_name and model_name in mapping:
            return mapping[model_name]
        if model_name:
            return model_name
        return mapping["default"]

    def _resolve_t2i_model(self, model_name: str | None) -> str:
        """Resolve friendly name to Higgsfield model ID for text-to-image."""
        return self._resolve_model(TEXT_TO_IMAGE_MODELS, model_name)

    def _resolve_t2v_model(self, model_name: str | None) -> str:
        """Resolve friendly name to Higgsfield model ID for text-to-video."""
        return self._resolve_model(TEXT_TO_VIDEO_MODELS, model_name)

    def _resolve_i2v_model(self, model_name: str | None) -> str:
        """Resolve friendly name to Higgsfield model ID for image-to-video."""
        return self._resolve_model(IMAGE_TO_VIDEO_MODELS, model_name)

    def _resolve_edit_model(self, model_name: str | None) -> str:
        """Resolve friendly name to Higgsfield model ID for image editing."""
        return self._resolve_model(IMAGE_EDIT_MODELS, model_name)

    async def generate_image(
        self,
        prompt: str,
        model: str | None = None,
        resolution: str = "2K",
        aspect_ratio: str = "16:9",
        **kwargs,
    ) -> CloudGenerationResult:
        """Generate an image using Higgsfield text-to-image models.

        Args:
            prompt: Text description of the desired image.
            model: Friendly name (see TEXT_TO_IMAGE_MODELS) or full model ID.
            resolution: Output resolution string passed to the model.
            aspect_ratio: Output aspect ratio passed to the model.
            **kwargs: Extra model-specific parameters forwarded verbatim.

        Returns:
            CloudGenerationResult with the downloaded image bytes.

        Raises:
            RuntimeError: If the SDK is not initialized or no image is returned.
        """
        start = time.time()
        model_id = self._resolve_t2i_model(model)
        if not self._client:
            raise RuntimeError("Higgsfield SDK not initialized")
        try:
            # NOTE(review): subscribe() looks like a blocking SDK call inside an
            # async method; if so it stalls the event loop — consider wrapping
            # it in asyncio.to_thread. Confirm against the SDK docs.
            result = self._client.subscribe(
                model_id,
                {
                    "prompt": prompt,
                    "resolution": resolution,
                    "aspect_ratio": aspect_ratio,
                    **kwargs,
                },
            )
            # Extract image URL; entries may be dicts ({"url": ...}) or bare URLs.
            images = result.get("images", [])
            if not images:
                raise RuntimeError(f"No images in Higgsfield response: {result}")
            image_url = images[0].get("url") if isinstance(images[0], dict) else images[0]
            # Download image bytes so callers get content, not a URL.
            resp = await self._http_client.get(image_url)
            resp.raise_for_status()
            return CloudGenerationResult(
                job_id=str(uuid.uuid4()),
                image_bytes=resp.content,
                generation_time_seconds=time.time() - start,
            )
        except Exception as e:
            logger.error("Higgsfield image generation failed: %s", e)
            raise

    async def generate_video(
        self,
        prompt: str,
        model: str | None = None,
        duration: int = 5,
        resolution: str = "720p",
        aspect_ratio: str = "16:9",
        enable_audio: bool = False,
        image_url: str | None = None,
        **kwargs,
    ) -> dict[str, Any]:
        """Generate a video using Higgsfield models (Kling 3.0, Sora 2, Veo 3.1, etc.).

        Args:
            prompt: Text description of desired video
            model: Model to use (kling-3.0, sora-2, veo-3.1, etc.)
            duration: Video duration in seconds (3-15 for Kling 3.0)
            resolution: Output resolution (720p, 1080p)
            aspect_ratio: Aspect ratio (16:9, 9:16, 1:1)
            enable_audio: Enable audio generation (Kling 3.0 supports this)
            image_url: Reference image URL for image-to-video

        Returns:
            Dict with job_id, status, and video_url when complete

        Raises:
            RuntimeError: If the SDK is not initialized or no video URL is returned.
        """
        start = time.time()
        # Presence of a reference image selects the image-to-video model family.
        if image_url:
            model_id = self._resolve_i2v_model(model)
        else:
            model_id = self._resolve_t2v_model(model)
        if not self._client:
            raise RuntimeError("Higgsfield SDK not initialized")
        try:
            payload: dict[str, Any] = {
                "prompt": prompt,
                "resolution": resolution,
                "aspect_ratio": aspect_ratio,
                "duration": duration,
            }
            if enable_audio:
                payload["enable_audio"] = True
            if image_url:
                payload["image"] = image_url
            payload.update(kwargs)
            # Submit and wait for result (see blocking-call note in generate_image).
            result = self._client.subscribe(model_id, payload)
            # Extract video URL — response shape varies by model, so probe the
            # known keys in order of preference.
            video_url = None
            if "video" in result:
                video_url = result["video"]
            elif "outputs" in result and result["outputs"]:
                video_url = result["outputs"][0]
            elif "output" in result:
                video_url = result["output"]
            if not video_url:
                raise RuntimeError(f"No video URL in Higgsfield response: {result}")
            return {
                "job_id": str(uuid.uuid4()),
                "status": "completed",
                "video_url": video_url,
                "generation_time": time.time() - start,
            }
        except Exception as e:
            logger.error("Higgsfield video generation failed: %s", e)
            raise

    async def submit_generation(
        self,
        positive_prompt: str,
        negative_prompt: str = "",
        model: str | None = None,
        width: int = 1024,
        height: int = 1024,
        seed: int = -1,
        **kwargs,
    ) -> str:
        """Submit an image generation job. Returns job ID.

        Currently runs synchronously via generate_image() and caches the
        result for get_result(). negative_prompt, width, height and seed are
        accepted for interface compatibility but not yet forwarded.
        """
        result = await self.generate_image(
            prompt=positive_prompt,
            model=model,
            **kwargs,
        )
        # Cache result for get_result
        self._last_result = {
            "job_id": result.job_id,
            "result": result,
            "timestamp": time.time(),
        }
        return result.job_id

    async def check_status(self, job_id: str) -> str:
        """Check job status: 'completed' for the cached job, else 'unknown'."""
        # _last_result is always defined (set to None in __init__), so no
        # hasattr() probing is needed.
        if self._last_result and self._last_result.get("job_id") == job_id:
            return "completed"
        return "unknown"

    async def get_result(self, job_id: str) -> CloudGenerationResult:
        """Get the generation result.

        Raises:
            RuntimeError: If *job_id* does not match the cached job.
        """
        if not self._last_result or self._last_result.get("job_id") != job_id:
            raise RuntimeError(f"No cached result for job {job_id}")
        return self._last_result["result"]

    async def generate(
        self,
        positive_prompt: str,
        negative_prompt: str = "",
        model: str | None = None,
        width: int = 1024,
        height: int = 1024,
        seed: int = -1,
        **kwargs,
    ) -> CloudGenerationResult:
        """Convenience method: generate image in one call."""
        return await self.generate_image(
            prompt=positive_prompt,
            model=model,
            **kwargs,
        )