"""Content Engine — FastAPI application entry point. Run with: cd "D:\AI automation\content_engine" uvicorn content_engine.main:app --host 0.0.0.0 --port 8000 --reload Or: python -m content_engine.main """ from __future__ import annotations import logging import os from contextlib import asynccontextmanager from pathlib import Path import yaml from dotenv import load_dotenv from fastapi import FastAPI from content_engine.config import settings from content_engine.models.database import init_db from content_engine.services.catalog import CatalogService from content_engine.services.comfyui_client import ComfyUIClient from content_engine.services.template_engine import TemplateEngine from content_engine.services.variation_engine import CharacterProfile, VariationEngine from content_engine.services.workflow_builder import WorkflowBuilder from content_engine.workers.local_worker import LocalWorker from content_engine.api import routes_catalog, routes_generation, routes_pod, routes_system, routes_training, routes_ui, routes_video # Load .env file for API keys import os IS_HF_SPACES = os.environ.get("HF_SPACES") == "1" or os.environ.get("SPACE_ID") is not None if IS_HF_SPACES: load_dotenv(Path("/app/.env")) else: load_dotenv(Path("D:/AI automation/content_engine/.env")) logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) logger = logging.getLogger(__name__) # Service instances (created at startup) comfyui_client: ComfyUIClient | None = None def load_character_profiles() -> dict[str, CharacterProfile]: """Load all character YAML profiles from config/characters/.""" if IS_HF_SPACES: characters_dir = Path("/app/config/characters") else: characters_dir = Path("D:/AI automation/content_engine/config/characters") profiles: dict[str, CharacterProfile] = {} if not characters_dir.exists(): logger.warning("Characters directory not found: %s", characters_dir) return profiles for path in characters_dir.glob("*.yaml"): try: with open(path) as f: data = yaml.safe_load(f) profile = CharacterProfile( id=data["id"], name=data.get("name", data["id"]), trigger_word=data["trigger_word"], lora_filename=data["lora_filename"], lora_strength=data.get("lora_strength", 0.85), default_checkpoint=data.get("default_checkpoint"), style_loras=data.get("style_loras", []), description=data.get("description", ""), physical_traits=data.get("physical_traits", {}), ) profiles[profile.id] = profile logger.info("Loaded character: %s (%s)", profile.name, profile.id) except Exception: logger.error("Failed to load character %s", path, exc_info=True) return profiles @asynccontextmanager async def lifespan(app: FastAPI): """Startup and shutdown lifecycle.""" global comfyui_client logger.info("Starting Content Engine...") # Initialize database await init_db() logger.info("Database initialized") # Create service instances comfyui_client = ComfyUIClient(settings.comfyui.url) workflow_builder = WorkflowBuilder() template_engine = TemplateEngine() template_engine.load_all() catalog = CatalogService() character_profiles = load_character_profiles() variation_engine = VariationEngine(template_engine) local_worker = LocalWorker( comfyui_client=comfyui_client, workflow_builder=workflow_builder, template_engine=template_engine, catalog=catalog, ) # Check ComfyUI connection if await comfyui_client.is_available(): logger.info("ComfyUI connected at %s", settings.comfyui.url) else: logger.warning( "ComfyUI not available at %s — generation will fail until connected", settings.comfyui.url, ) # Initialize WaveSpeed cloud provider if API key is set wavespeed_provider = None wavespeed_key = os.environ.get("WAVESPEED_API_KEY") if wavespeed_key: from content_engine.services.cloud_providers.wavespeed_provider import WaveSpeedProvider wavespeed_provider = WaveSpeedProvider(api_key=wavespeed_key) logger.info("WaveSpeed cloud provider initialized (NanoBanana, SeeDream)") else: logger.info("WaveSpeed not configured — cloud generation disabled") # Initialize Higgsfield cloud provider if API key is set higgsfield_provider = None higgsfield_key = os.environ.get("HIGGSFIELD_API_KEY") or os.environ.get("HF_API_KEY") if higgsfield_key: try: from content_engine.services.cloud_providers.higgsfield_provider import HiggsFieldProvider higgsfield_provider = HiggsFieldProvider() logger.info("Higgsfield cloud provider initialized (Kling 3.0, Sora 2, Veo 3.1)") except Exception as e: logger.warning("Failed to initialize Higgsfield provider: %s", e) else: logger.info("Higgsfield not configured — set HIGGSFIELD_API_KEY for Kling 3.0/Sora 2") # Initialize route dependencies routes_generation.init_routes( local_worker, template_engine, variation_engine, character_profiles, wavespeed_provider=wavespeed_provider, catalog=catalog, comfyui_client=comfyui_client, ) routes_catalog.init_routes(catalog) routes_system.init_routes(comfyui_client, catalog, template_engine, character_profiles) # Initialize video routes with cloud providers if wavespeed_provider: routes_video.init_wavespeed(wavespeed_provider) if higgsfield_provider: routes_video.init_higgsfield(higgsfield_provider) # Initialize LoRA trainer (local) from content_engine.services.lora_trainer import LoRATrainer lora_trainer = LoRATrainer() logger.info("LoRA trainer initialized (sd-scripts %s)", "ready" if lora_trainer.sd_scripts_installed else "not installed — install via UI") # Initialize RunPod cloud trainer if API key is set runpod_trainer = None runpod_provider = None runpod_key = os.environ.get("RUNPOD_API_KEY") runpod_endpoint_id = os.environ.get("RUNPOD_ENDPOINT_ID") if runpod_key: from content_engine.services.runpod_trainer import RunPodTrainer runpod_trainer = RunPodTrainer(api_key=runpod_key) logger.info("RunPod cloud trainer initialized — cloud LoRA training available") # Initialize RunPod generation provider if endpoint ID is set if runpod_endpoint_id: from content_engine.services.cloud_providers.runpod_provider import RunPodProvider runpod_provider = RunPodProvider(api_key=runpod_key, endpoint_id=runpod_endpoint_id) logger.info("RunPod generation provider initialized (endpoint: %s)", runpod_endpoint_id) else: logger.info("RunPod endpoint not configured — set RUNPOD_ENDPOINT_ID for cloud generation") else: logger.info("RunPod not configured — set RUNPOD_API_KEY for cloud training/generation") routes_training.init_routes(lora_trainer, runpod_trainer=runpod_trainer) # Update generation routes with RunPod provider routes_generation.set_runpod_provider(runpod_provider) logger.info( "Content Engine ready — %d templates, %d characters", len(template_engine.list_templates()), len(character_profiles), ) yield # Shutdown if comfyui_client: await comfyui_client.close() logger.info("Content Engine stopped") # Create the FastAPI app app = FastAPI( title="Content Engine", description="Automated content generation system using ComfyUI", version="0.1.0", lifespan=lifespan, ) # Register route modules app.include_router(routes_ui.router) # UI at / (must be first) app.include_router(routes_generation.router) app.include_router(routes_catalog.router) app.include_router(routes_system.router) app.include_router(routes_training.router) app.include_router(routes_pod.router) app.include_router(routes_video.router) if __name__ == "__main__": import uvicorn uvicorn.run( "content_engine.main:app", host="0.0.0.0", port=8000, reload=True, )