dippoo's picture
Add Higgsfield provider for Kling 3.0, Sora 2, Veo 3.1
a4583f2
raw
history blame
8.43 kB
"""Content Engine — FastAPI application entry point.
Run with:
cd "D:\AI automation\content_engine"
uvicorn content_engine.main:app --host 0.0.0.0 --port 8000 --reload
Or:
python -m content_engine.main
"""
from __future__ import annotations
import logging
import os
from contextlib import asynccontextmanager
from pathlib import Path
import yaml
from dotenv import load_dotenv
from fastapi import FastAPI
from content_engine.config import settings
from content_engine.models.database import init_db
from content_engine.services.catalog import CatalogService
from content_engine.services.comfyui_client import ComfyUIClient
from content_engine.services.template_engine import TemplateEngine
from content_engine.services.variation_engine import CharacterProfile, VariationEngine
from content_engine.services.workflow_builder import WorkflowBuilder
from content_engine.workers.local_worker import LocalWorker
from content_engine.api import routes_catalog, routes_generation, routes_pod, routes_system, routes_training, routes_ui, routes_video
# Load .env file for API keys
import os
IS_HF_SPACES = os.environ.get("HF_SPACES") == "1" or os.environ.get("SPACE_ID") is not None
if IS_HF_SPACES:
load_dotenv(Path("/app/.env"))
else:
load_dotenv(Path("D:/AI automation/content_engine/.env"))
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
# Service instances (created at startup)
comfyui_client: ComfyUIClient | None = None
def load_character_profiles() -> dict[str, CharacterProfile]:
"""Load all character YAML profiles from config/characters/."""
if IS_HF_SPACES:
characters_dir = Path("/app/config/characters")
else:
characters_dir = Path("D:/AI automation/content_engine/config/characters")
profiles: dict[str, CharacterProfile] = {}
if not characters_dir.exists():
logger.warning("Characters directory not found: %s", characters_dir)
return profiles
for path in characters_dir.glob("*.yaml"):
try:
with open(path) as f:
data = yaml.safe_load(f)
profile = CharacterProfile(
id=data["id"],
name=data.get("name", data["id"]),
trigger_word=data["trigger_word"],
lora_filename=data["lora_filename"],
lora_strength=data.get("lora_strength", 0.85),
default_checkpoint=data.get("default_checkpoint"),
style_loras=data.get("style_loras", []),
description=data.get("description", ""),
physical_traits=data.get("physical_traits", {}),
)
profiles[profile.id] = profile
logger.info("Loaded character: %s (%s)", profile.name, profile.id)
except Exception:
logger.error("Failed to load character %s", path, exc_info=True)
return profiles
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown lifecycle."""
global comfyui_client
logger.info("Starting Content Engine...")
# Initialize database
await init_db()
logger.info("Database initialized")
# Create service instances
comfyui_client = ComfyUIClient(settings.comfyui.url)
workflow_builder = WorkflowBuilder()
template_engine = TemplateEngine()
template_engine.load_all()
catalog = CatalogService()
character_profiles = load_character_profiles()
variation_engine = VariationEngine(template_engine)
local_worker = LocalWorker(
comfyui_client=comfyui_client,
workflow_builder=workflow_builder,
template_engine=template_engine,
catalog=catalog,
)
# Check ComfyUI connection
if await comfyui_client.is_available():
logger.info("ComfyUI connected at %s", settings.comfyui.url)
else:
logger.warning(
"ComfyUI not available at %s — generation will fail until connected",
settings.comfyui.url,
)
# Initialize WaveSpeed cloud provider if API key is set
wavespeed_provider = None
wavespeed_key = os.environ.get("WAVESPEED_API_KEY")
if wavespeed_key:
from content_engine.services.cloud_providers.wavespeed_provider import WaveSpeedProvider
wavespeed_provider = WaveSpeedProvider(api_key=wavespeed_key)
logger.info("WaveSpeed cloud provider initialized (NanoBanana, SeeDream)")
else:
logger.info("WaveSpeed not configured — cloud generation disabled")
# Initialize Higgsfield cloud provider if API key is set
higgsfield_provider = None
higgsfield_key = os.environ.get("HIGGSFIELD_API_KEY") or os.environ.get("HF_API_KEY")
if higgsfield_key:
try:
from content_engine.services.cloud_providers.higgsfield_provider import HiggsFieldProvider
higgsfield_provider = HiggsFieldProvider()
logger.info("Higgsfield cloud provider initialized (Kling 3.0, Sora 2, Veo 3.1)")
except Exception as e:
logger.warning("Failed to initialize Higgsfield provider: %s", e)
else:
logger.info("Higgsfield not configured — set HIGGSFIELD_API_KEY for Kling 3.0/Sora 2")
# Initialize route dependencies
routes_generation.init_routes(
local_worker, template_engine, variation_engine, character_profiles,
wavespeed_provider=wavespeed_provider, catalog=catalog,
comfyui_client=comfyui_client,
)
routes_catalog.init_routes(catalog)
routes_system.init_routes(comfyui_client, catalog, template_engine, character_profiles)
# Initialize video routes with cloud providers
if wavespeed_provider:
routes_video.init_wavespeed(wavespeed_provider)
if higgsfield_provider:
routes_video.init_higgsfield(higgsfield_provider)
# Initialize LoRA trainer (local)
from content_engine.services.lora_trainer import LoRATrainer
lora_trainer = LoRATrainer()
logger.info("LoRA trainer initialized (sd-scripts %s)",
"ready" if lora_trainer.sd_scripts_installed else "not installed — install via UI")
# Initialize RunPod cloud trainer if API key is set
runpod_trainer = None
runpod_provider = None
runpod_key = os.environ.get("RUNPOD_API_KEY")
runpod_endpoint_id = os.environ.get("RUNPOD_ENDPOINT_ID")
if runpod_key:
from content_engine.services.runpod_trainer import RunPodTrainer
runpod_trainer = RunPodTrainer(api_key=runpod_key)
logger.info("RunPod cloud trainer initialized — cloud LoRA training available")
# Initialize RunPod generation provider if endpoint ID is set
if runpod_endpoint_id:
from content_engine.services.cloud_providers.runpod_provider import RunPodProvider
runpod_provider = RunPodProvider(api_key=runpod_key, endpoint_id=runpod_endpoint_id)
logger.info("RunPod generation provider initialized (endpoint: %s)", runpod_endpoint_id)
else:
logger.info("RunPod endpoint not configured — set RUNPOD_ENDPOINT_ID for cloud generation")
else:
logger.info("RunPod not configured — set RUNPOD_API_KEY for cloud training/generation")
routes_training.init_routes(lora_trainer, runpod_trainer=runpod_trainer)
# Update generation routes with RunPod provider
routes_generation.set_runpod_provider(runpod_provider)
logger.info(
"Content Engine ready — %d templates, %d characters",
len(template_engine.list_templates()),
len(character_profiles),
)
yield
# Shutdown
if comfyui_client:
await comfyui_client.close()
logger.info("Content Engine stopped")
# Create the FastAPI app
app = FastAPI(
title="Content Engine",
description="Automated content generation system using ComfyUI",
version="0.1.0",
lifespan=lifespan,
)
# Register route modules
app.include_router(routes_ui.router) # UI at / (must be first)
app.include_router(routes_generation.router)
app.include_router(routes_catalog.router)
app.include_router(routes_system.router)
app.include_router(routes_training.router)
app.include_router(routes_pod.router)
app.include_router(routes_video.router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"content_engine.main:app",
host="0.0.0.0",
port=8000,
reload=True,
)