"""Higgsfield.ai cloud provider — access to Kling 3.0, Sora 2, Veo 3.1, and more. Higgsfield provides a unified API for multiple AI video and image generation models including Kling 3.0 (Kuaishou), Sora 2 (OpenAI), Veo 3.1 (Google), WAN 2.5 (Alibaba), and their own Higgsfield Soul for character consistency. SDK: pip install higgsfield-client Docs: https://cloud.higgsfield.ai/ """ from __future__ import annotations import asyncio import base64 import logging import os import time import uuid from typing import Any import httpx from content_engine.services.cloud_providers.base import CloudGenerationResult, CloudProvider logger = logging.getLogger(__name__) # Model IDs for Higgsfield platform # Format: provider/model/task TEXT_TO_IMAGE_MODELS = { "seedream-4": "bytedance/seedream/v4/text-to-image", "seedream-4.5": "bytedance/seedream/v4.5/text-to-image", "nano-banana-pro": "google/nano-banana-pro/text-to-image", "flux-2": "black-forest-labs/flux-2/text-to-image", "gpt-image": "openai/gpt-image/text-to-image", "default": "bytedance/seedream/v4/text-to-image", } TEXT_TO_VIDEO_MODELS = { "kling-3.0": "kuaishou/kling/v3.0/text-to-video", "kling-3.0-pro": "kuaishou/kling/v3.0-pro/text-to-video", "sora-2": "openai/sora-2/text-to-video", "veo-3.1": "google/veo-3.1/text-to-video", "wan-2.5": "alibaba/wan-2.5/text-to-video", "seedance-pro": "bytedance/seedance/pro/text-to-video", "default": "kuaishou/kling/v3.0/text-to-video", } IMAGE_TO_VIDEO_MODELS = { "kling-3.0": "kuaishou/kling/v3.0/image-to-video", "kling-3.0-pro": "kuaishou/kling/v3.0-pro/image-to-video", "sora-2": "openai/sora-2/image-to-video", "veo-3.1": "google/veo-3.1/image-to-video", "wan-2.5": "alibaba/wan-2.5/image-to-video", "higgsfield-dop": "higgsfield/dop/image-to-video", "default": "kuaishou/kling/v3.0/image-to-video", } IMAGE_EDIT_MODELS = { "higgsfield-soul": "higgsfield/soul/image-to-image", "seedream-4-edit": "bytedance/seedream/v4/edit", "default": "higgsfield/soul/image-to-image", } class HiggsFieldProvider(CloudProvider): """Cloud provider using Higgsfield.ai for Kling 3.0, Sora 2, Veo 3.1, etc.""" def __init__(self, api_key: str = None, api_secret: str = None): """Initialize with Higgsfield credentials. Can use either: - Combined key (HF_KEY env var) - Separate key/secret (HF_API_KEY/HF_API_SECRET env vars) """ self._api_key = api_key or os.getenv("HIGGSFIELD_API_KEY") or os.getenv("HF_API_KEY") self._api_secret = api_secret or os.getenv("HIGGSFIELD_API_SECRET") or os.getenv("HF_API_SECRET") self._combined_key = os.getenv("HF_KEY") self._http_client = httpx.AsyncClient(timeout=300) self._client = None # Try to initialize SDK if available try: from higgsfield_client import HiggsFieldClient if self._combined_key: self._client = HiggsFieldClient() elif self._api_key and self._api_secret: self._client = HiggsFieldClient(api_key=self._api_key, api_secret=self._api_secret) logger.info("Higgsfield SDK initialized") except ImportError: logger.warning("higgsfield-client not installed, using direct API") except Exception as e: logger.warning("Failed to init Higgsfield SDK: %s", e) @property def name(self) -> str: return "higgsfield" async def is_available(self) -> bool: """Check if Higgsfield API is configured.""" return bool(self._client or self._api_key) def _resolve_t2i_model(self, model_name: str | None) -> str: """Resolve friendly name to Higgsfield model ID for text-to-image.""" if model_name and model_name in TEXT_TO_IMAGE_MODELS: return TEXT_TO_IMAGE_MODELS[model_name] if model_name: return model_name return TEXT_TO_IMAGE_MODELS["default"] def _resolve_t2v_model(self, model_name: str | None) -> str: """Resolve friendly name to Higgsfield model ID for text-to-video.""" if model_name and model_name in TEXT_TO_VIDEO_MODELS: return TEXT_TO_VIDEO_MODELS[model_name] if model_name: return model_name return TEXT_TO_VIDEO_MODELS["default"] def _resolve_i2v_model(self, model_name: str | None) -> str: """Resolve friendly name to Higgsfield model ID for image-to-video.""" if model_name and model_name in IMAGE_TO_VIDEO_MODELS: return IMAGE_TO_VIDEO_MODELS[model_name] if model_name: return model_name return IMAGE_TO_VIDEO_MODELS["default"] def _resolve_edit_model(self, model_name: str | None) -> str: """Resolve friendly name to Higgsfield model ID for image editing.""" if model_name and model_name in IMAGE_EDIT_MODELS: return IMAGE_EDIT_MODELS[model_name] if model_name: return model_name return IMAGE_EDIT_MODELS["default"] async def generate_image( self, prompt: str, model: str | None = None, resolution: str = "2K", aspect_ratio: str = "16:9", **kwargs, ) -> CloudGenerationResult: """Generate an image using Higgsfield text-to-image models.""" start = time.time() model_id = self._resolve_t2i_model(model) if self._client: # Use SDK try: result = self._client.subscribe( model_id, { "prompt": prompt, "resolution": resolution, "aspect_ratio": aspect_ratio, **kwargs, } ) # Extract image URL images = result.get("images", []) if not images: raise RuntimeError(f"No images in Higgsfield response: {result}") image_url = images[0].get("url") if isinstance(images[0], dict) else images[0] # Download image resp = await self._http_client.get(image_url) resp.raise_for_status() return CloudGenerationResult( job_id=str(uuid.uuid4()), image_bytes=resp.content, generation_time_seconds=time.time() - start, ) except Exception as e: logger.error("Higgsfield image generation failed: %s", e) raise else: raise RuntimeError("Higgsfield SDK not initialized") async def generate_video( self, prompt: str, model: str | None = None, duration: int = 5, resolution: str = "720p", aspect_ratio: str = "16:9", enable_audio: bool = False, image_url: str | None = None, **kwargs, ) -> dict: """Generate a video using Higgsfield models (Kling 3.0, Sora 2, Veo 3.1, etc.). Args: prompt: Text description of desired video model: Model to use (kling-3.0, sora-2, veo-3.1, etc.) duration: Video duration in seconds (3-15 for Kling 3.0) resolution: Output resolution (720p, 1080p) aspect_ratio: Aspect ratio (16:9, 9:16, 1:1) enable_audio: Enable audio generation (Kling 3.0 supports this) image_url: Reference image URL for image-to-video Returns: Dict with job_id, status, and video_url when complete """ start = time.time() # Choose model based on whether we have an image if image_url: model_id = self._resolve_i2v_model(model) else: model_id = self._resolve_t2v_model(model) if self._client: try: payload = { "prompt": prompt, "resolution": resolution, "aspect_ratio": aspect_ratio, "duration": duration, } if enable_audio: payload["enable_audio"] = True if image_url: payload["image"] = image_url payload.update(kwargs) # Submit and wait for result result = self._client.subscribe(model_id, payload) # Extract video URL video_url = None if "video" in result: video_url = result["video"] elif "outputs" in result and result["outputs"]: video_url = result["outputs"][0] elif "output" in result: video_url = result["output"] if not video_url: raise RuntimeError(f"No video URL in Higgsfield response: {result}") return { "job_id": str(uuid.uuid4()), "status": "completed", "video_url": video_url, "generation_time": time.time() - start, } except Exception as e: logger.error("Higgsfield video generation failed: %s", e) raise else: raise RuntimeError("Higgsfield SDK not initialized") async def submit_generation( self, positive_prompt: str, negative_prompt: str = "", model: str | None = None, width: int = 1024, height: int = 1024, seed: int = -1, **kwargs, ) -> str: """Submit an image generation job. Returns job ID.""" # For now, use synchronous generation result = await self.generate_image( prompt=positive_prompt, model=model, **kwargs, ) # Cache result for get_result self._last_result = { "job_id": result.job_id, "result": result, "timestamp": time.time(), } return result.job_id async def check_status(self, job_id: str) -> str: """Check job status.""" if hasattr(self, '_last_result') and self._last_result.get("job_id") == job_id: return "completed" return "unknown" async def get_result(self, job_id: str) -> CloudGenerationResult: """Get the generation result.""" if not hasattr(self, '_last_result') or self._last_result.get("job_id") != job_id: raise RuntimeError(f"No cached result for job {job_id}") return self._last_result["result"] async def generate( self, positive_prompt: str, negative_prompt: str = "", model: str | None = None, width: int = 1024, height: int = 1024, seed: int = -1, **kwargs, ) -> CloudGenerationResult: """Convenience method: generate image in one call.""" return await self.generate_image( prompt=positive_prompt, model=model, **kwargs, )