# NOTE: Hugging Face file-page scaffolding stripped from this header.
# Provenance: commit a4583f2 — "Add Higgsfield provider for Kling 3.0, Sora 2, Veo 3.1"
"""Higgsfield.ai cloud provider — access to Kling 3.0, Sora 2, Veo 3.1, and more.
Higgsfield provides a unified API for multiple AI video and image generation models
including Kling 3.0 (Kuaishou), Sora 2 (OpenAI), Veo 3.1 (Google), WAN 2.5 (Alibaba),
and their own Higgsfield Soul for character consistency.
SDK: pip install higgsfield-client
Docs: https://cloud.higgsfield.ai/
"""
from __future__ import annotations
import asyncio
import base64
import logging
import os
import time
import uuid
from typing import Any
import httpx
from content_engine.services.cloud_providers.base import CloudGenerationResult, CloudProvider
logger = logging.getLogger(__name__)
# Model IDs understood by the Higgsfield platform, keyed by short friendly
# names used throughout this codebase. Raw ID format: provider/model/task.
# Each mapping carries a "default" entry used when no model is requested.

# Text-to-image models.
TEXT_TO_IMAGE_MODELS: dict[str, str] = {
    "seedream-4": "bytedance/seedream/v4/text-to-image",        # ByteDance Seedream v4
    "seedream-4.5": "bytedance/seedream/v4.5/text-to-image",    # ByteDance Seedream v4.5
    "nano-banana-pro": "google/nano-banana-pro/text-to-image",  # Google Nano Banana Pro
    "flux-2": "black-forest-labs/flux-2/text-to-image",         # Black Forest Labs FLUX 2
    "gpt-image": "openai/gpt-image/text-to-image",              # OpenAI GPT Image
    "default": "bytedance/seedream/v4/text-to-image",
}

# Text-to-video models.
TEXT_TO_VIDEO_MODELS: dict[str, str] = {
    "kling-3.0": "kuaishou/kling/v3.0/text-to-video",           # Kuaishou Kling 3.0
    "kling-3.0-pro": "kuaishou/kling/v3.0-pro/text-to-video",   # Kuaishou Kling 3.0 Pro
    "sora-2": "openai/sora-2/text-to-video",                    # OpenAI Sora 2
    "veo-3.1": "google/veo-3.1/text-to-video",                  # Google Veo 3.1
    "wan-2.5": "alibaba/wan-2.5/text-to-video",                 # Alibaba WAN 2.5
    "seedance-pro": "bytedance/seedance/pro/text-to-video",     # ByteDance Seedance Pro
    "default": "kuaishou/kling/v3.0/text-to-video",
}

# Image-to-video models (animate a reference image).
IMAGE_TO_VIDEO_MODELS: dict[str, str] = {
    "kling-3.0": "kuaishou/kling/v3.0/image-to-video",          # Kuaishou Kling 3.0
    "kling-3.0-pro": "kuaishou/kling/v3.0-pro/image-to-video",  # Kuaishou Kling 3.0 Pro
    "sora-2": "openai/sora-2/image-to-video",                   # OpenAI Sora 2
    "veo-3.1": "google/veo-3.1/image-to-video",                 # Google Veo 3.1
    "wan-2.5": "alibaba/wan-2.5/image-to-video",                # Alibaba WAN 2.5
    "higgsfield-dop": "higgsfield/dop/image-to-video",          # Higgsfield DOP
    "default": "kuaishou/kling/v3.0/image-to-video",
}

# Image-editing models.
IMAGE_EDIT_MODELS: dict[str, str] = {
    "higgsfield-soul": "higgsfield/soul/image-to-image",        # Higgsfield Soul (character consistency)
    "seedream-4-edit": "bytedance/seedream/v4/edit",            # ByteDance Seedream v4 edit
    "default": "higgsfield/soul/image-to-image",
}
class HiggsFieldProvider(CloudProvider):
    """Cloud provider using Higgsfield.ai for Kling 3.0, Sora 2, Veo 3.1, etc.

    All generation paths currently require the ``higgsfield-client`` SDK;
    httpx is only used to download finished images. A direct HTTP fallback
    is not implemented yet.
    """

    def __init__(self, api_key: str | None = None, api_secret: str | None = None):
        """Initialize with Higgsfield credentials.

        Credentials are resolved in this order:
        - explicit ``api_key`` / ``api_secret`` arguments
        - ``HIGGSFIELD_API_KEY`` / ``HIGGSFIELD_API_SECRET`` env vars
        - ``HF_API_KEY`` / ``HF_API_SECRET`` env vars
        - a combined key in ``HF_KEY`` (consumed by the SDK's no-arg constructor)
        """
        self._api_key = api_key or os.getenv("HIGGSFIELD_API_KEY") or os.getenv("HF_API_KEY")
        self._api_secret = api_secret or os.getenv("HIGGSFIELD_API_SECRET") or os.getenv("HF_API_SECRET")
        self._combined_key = os.getenv("HF_KEY")
        # Generous timeout: video jobs can take several minutes to complete.
        self._http_client = httpx.AsyncClient(timeout=300)
        self._client = None
        # Single-slot cache for the most recent submit_generation() result,
        # served back through check_status()/get_result().
        self._last_result: dict[str, Any] | None = None
        # Initialize the SDK when installed and credentials are present.
        try:
            from higgsfield_client import HiggsFieldClient
            if self._combined_key:
                self._client = HiggsFieldClient()
            elif self._api_key and self._api_secret:
                self._client = HiggsFieldClient(api_key=self._api_key, api_secret=self._api_secret)
            # Only log success when a client was actually constructed
            # (the original logged unconditionally, even with no credentials).
            if self._client:
                logger.info("Higgsfield SDK initialized")
        except ImportError:
            logger.warning("higgsfield-client not installed, using direct API")
        except Exception as e:
            logger.warning("Failed to init Higgsfield SDK: %s", e)

    @property
    def name(self) -> str:
        """Provider identifier used for registry/lookup."""
        return "higgsfield"

    async def is_available(self) -> bool:
        """Check if Higgsfield API is configured.

        NOTE(review): this returns True when only an API key is set, but every
        generation call currently requires the SDK client and will raise —
        confirm whether a direct-API fallback is still planned.
        """
        return bool(self._client or self._api_key)

    async def aclose(self) -> None:
        """Close the internal httpx client (fixes a connection-pool leak)."""
        await self._http_client.aclose()

    @staticmethod
    def _resolve_model(mapping: dict[str, str], model_name: str | None) -> str:
        """Map a friendly name to a Higgsfield model ID via *mapping*.

        Unknown names pass through unchanged so callers may supply raw
        platform IDs; ``None`` selects the mapping's "default" entry.
        """
        if model_name:
            return mapping.get(model_name, model_name)
        return mapping["default"]

    def _resolve_t2i_model(self, model_name: str | None) -> str:
        """Resolve friendly name to Higgsfield model ID for text-to-image."""
        return self._resolve_model(TEXT_TO_IMAGE_MODELS, model_name)

    def _resolve_t2v_model(self, model_name: str | None) -> str:
        """Resolve friendly name to Higgsfield model ID for text-to-video."""
        return self._resolve_model(TEXT_TO_VIDEO_MODELS, model_name)

    def _resolve_i2v_model(self, model_name: str | None) -> str:
        """Resolve friendly name to Higgsfield model ID for image-to-video."""
        return self._resolve_model(IMAGE_TO_VIDEO_MODELS, model_name)

    def _resolve_edit_model(self, model_name: str | None) -> str:
        """Resolve friendly name to Higgsfield model ID for image editing."""
        return self._resolve_model(IMAGE_EDIT_MODELS, model_name)

    async def generate_image(
        self,
        prompt: str,
        model: str | None = None,
        resolution: str = "2K",
        aspect_ratio: str = "16:9",
        **kwargs,
    ) -> CloudGenerationResult:
        """Generate an image using Higgsfield text-to-image models.

        Args:
            prompt: Text description of the desired image.
            model: Friendly name (see TEXT_TO_IMAGE_MODELS) or raw model ID.
            resolution: Output resolution label (e.g. "2K").
            aspect_ratio: Output aspect ratio (e.g. "16:9").
            **kwargs: Extra model-specific parameters, passed through verbatim.

        Returns:
            CloudGenerationResult with the downloaded image bytes.

        Raises:
            RuntimeError: If the SDK is not initialized or the response
                contains no usable image URL.
            httpx.HTTPStatusError: If the image download fails.
        """
        if not self._client:
            raise RuntimeError("Higgsfield SDK not initialized")
        start = time.time()
        model_id = self._resolve_t2i_model(model)
        try:
            # NOTE(review): subscribe() appears synchronous and may block the
            # event loop while the job runs — confirm against the SDK docs.
            result = self._client.subscribe(
                model_id,
                {
                    "prompt": prompt,
                    "resolution": resolution,
                    "aspect_ratio": aspect_ratio,
                    **kwargs,
                },
            )
            images = result.get("images", [])
            if not images:
                raise RuntimeError(f"No images in Higgsfield response: {result}")
            first = images[0]
            image_url = first.get("url") if isinstance(first, dict) else first
            # Guard: a dict entry without "url" would otherwise send None to httpx.
            if not image_url:
                raise RuntimeError(f"No image URL in Higgsfield response: {result}")
            resp = await self._http_client.get(image_url)
            resp.raise_for_status()
            return CloudGenerationResult(
                job_id=str(uuid.uuid4()),
                image_bytes=resp.content,
                generation_time_seconds=time.time() - start,
            )
        except Exception as e:
            logger.error("Higgsfield image generation failed: %s", e)
            raise

    async def generate_video(
        self,
        prompt: str,
        model: str | None = None,
        duration: int = 5,
        resolution: str = "720p",
        aspect_ratio: str = "16:9",
        enable_audio: bool = False,
        image_url: str | None = None,
        **kwargs,
    ) -> dict:
        """Generate a video using Higgsfield models (Kling 3.0, Sora 2, Veo 3.1, etc.).

        Args:
            prompt: Text description of desired video.
            model: Model to use (kling-3.0, sora-2, veo-3.1, etc.) or raw model ID.
            duration: Video duration in seconds (3-15 for Kling 3.0).
            resolution: Output resolution (720p, 1080p).
            aspect_ratio: Aspect ratio (16:9, 9:16, 1:1).
            enable_audio: Enable audio generation (Kling 3.0 supports this).
            image_url: Reference image URL; when set, an image-to-video model is used.
            **kwargs: Extra model-specific parameters, passed through verbatim.

        Returns:
            Dict with job_id, status, video_url, and generation_time.

        Raises:
            RuntimeError: If the SDK is not initialized or no video URL is returned.
        """
        if not self._client:
            raise RuntimeError("Higgsfield SDK not initialized")
        start = time.time()
        # Presence of a reference image selects the image-to-video family.
        if image_url:
            model_id = self._resolve_i2v_model(model)
        else:
            model_id = self._resolve_t2v_model(model)
        try:
            payload: dict[str, Any] = {
                "prompt": prompt,
                "resolution": resolution,
                "aspect_ratio": aspect_ratio,
                "duration": duration,
            }
            if enable_audio:
                payload["enable_audio"] = True
            if image_url:
                payload["image"] = image_url
            payload.update(kwargs)
            # NOTE(review): subscribe() appears to block until the job finishes;
            # if synchronous, it will stall the event loop — confirm.
            result = self._client.subscribe(model_id, payload)
            # Response shapes vary by model: probe known keys in order, skipping
            # empty values (the original stopped at a present-but-empty "video").
            video_url = None
            for key in ("video", "outputs", "output"):
                value = result.get(key)
                if value:
                    video_url = value[0] if key == "outputs" else value
                    break
            if not video_url:
                raise RuntimeError(f"No video URL in Higgsfield response: {result}")
            return {
                "job_id": str(uuid.uuid4()),
                "status": "completed",
                "video_url": video_url,
                "generation_time": time.time() - start,
            }
        except Exception as e:
            logger.error("Higgsfield video generation failed: %s", e)
            raise

    async def submit_generation(
        self,
        positive_prompt: str,
        negative_prompt: str = "",
        model: str | None = None,
        width: int = 1024,
        height: int = 1024,
        seed: int = -1,
        **kwargs,
    ) -> str:
        """Submit an image generation job. Returns job ID.

        Runs the generation synchronously under the hood and caches the
        finished result so check_status()/get_result() can serve it.

        NOTE(review): negative_prompt, width, height, and seed are currently
        dropped — confirm whether the Higgsfield API accepts equivalents.
        """
        result = await self.generate_image(
            prompt=positive_prompt,
            model=model,
            **kwargs,
        )
        # Cache result for get_result (single-job memory; see __init__).
        self._last_result = {
            "job_id": result.job_id,
            "result": result,
            "timestamp": time.time(),
        }
        return result.job_id

    async def check_status(self, job_id: str) -> str:
        """Return "completed" for the most recently cached job, else "unknown"."""
        if self._last_result and self._last_result.get("job_id") == job_id:
            return "completed"
        return "unknown"

    async def get_result(self, job_id: str) -> CloudGenerationResult:
        """Get the cached generation result for *job_id*.

        Raises:
            RuntimeError: If *job_id* is not the most recently submitted job.
        """
        if not self._last_result or self._last_result.get("job_id") != job_id:
            raise RuntimeError(f"No cached result for job {job_id}")
        return self._last_result["result"]

    async def generate(
        self,
        positive_prompt: str,
        negative_prompt: str = "",
        model: str | None = None,
        width: int = 1024,
        height: int = 1024,
        seed: int = -1,
        **kwargs,
    ) -> CloudGenerationResult:
        """Convenience method: generate an image in one call.

        NOTE(review): negative_prompt, width, height, and seed are ignored —
        same gap as submit_generation().
        """
        return await self.generate_image(
            prompt=positive_prompt,
            model=model,
            **kwargs,
        )