"""Local worker that processes generation jobs through ComfyUI. The local worker takes a generation job, builds a ComfyUI workflow, submits it to the local ComfyUI instance, waits for completion, downloads the result, saves it to the output directory, and records metadata in the catalog. """ from __future__ import annotations import logging import time import uuid from typing import Any from content_engine.config import settings from content_engine.services.catalog import CatalogService from content_engine.services.comfyui_client import ComfyUIClient from content_engine.services.template_engine import TemplateEngine from content_engine.services.workflow_builder import WorkflowBuilder logger = logging.getLogger(__name__) class LocalWorker: """Processes generation jobs on the local GPU via ComfyUI.""" def __init__( self, comfyui_client: ComfyUIClient, workflow_builder: WorkflowBuilder, template_engine: TemplateEngine, catalog: CatalogService, ): self.comfyui = comfyui_client self.workflow_builder = workflow_builder self.template_engine = template_engine self.catalog = catalog async def process_job( self, *, job_id: str | None = None, batch_id: str | None = None, character_id: str | None = None, template_id: str | None = None, content_rating: str = "sfw", positive_prompt: str | None = None, negative_prompt: str | None = None, checkpoint: str | None = None, loras: list[dict[str, Any]] | None = None, seed: int = -1, steps: int | None = None, cfg: float | None = None, sampler: str | None = None, scheduler: str | None = None, width: int | None = None, height: int | None = None, variables: dict[str, str] | None = None, denoise: float | None = None, reference_image: str | None = None, ) -> str: """Process a single generation job. Returns the catalog image ID.""" job_id = job_id or str(uuid.uuid4()) gen = settings.generation # Resolve prompt from template if template_id provided rendered_loras = loras or [] if template_id and self.template_engine: rendered = self.template_engine.render(template_id, variables or {}) if not positive_prompt: positive_prompt = rendered.positive_prompt if not negative_prompt: negative_prompt = rendered.negative_prompt if not rendered_loras: rendered_loras = rendered.loras # Apply defaults checkpoint = checkpoint or gen.default_checkpoint steps = steps or gen.default_steps cfg = cfg or gen.default_cfg sampler = sampler or gen.default_sampler scheduler = scheduler or gen.default_scheduler width = width or gen.default_width height = height or gen.default_height # Build filename short_id = job_id[:8] seed_val = seed if seed >= 0 else 0 char_prefix = character_id or "gen" tmpl_prefix = template_id or "direct" filename_prefix = f"{char_prefix}_{tmpl_prefix}_{short_id}_{seed_val}" # Select workflow template based on content rating and mode if reference_image: workflow_template = f"sd15_img2img_{content_rating}" else: workflow_template = f"sd15_base_{content_rating}" # Build ComfyUI workflow workflow = self.workflow_builder.build( template_name=workflow_template, checkpoint=checkpoint, positive_prompt=positive_prompt or "", negative_prompt=negative_prompt or "", loras=rendered_loras, seed=seed, steps=steps, cfg=cfg, sampler_name=sampler, scheduler=scheduler, width=width, height=height, filename_prefix=filename_prefix, denoise=denoise, reference_image=reference_image, ) # Submit to ComfyUI and wait logger.info("Submitting job %s to ComfyUI", job_id) start_time = time.time() result = await self.comfyui.generate(workflow) generation_time = time.time() - start_time logger.info( "Job %s completed in %.1fs, %d images", job_id, generation_time, len(result.images), ) # Download and save each output image image_id = None for img_output in result.images: image_bytes = await self.comfyui.download_image(img_output) # Resolve output path output_path = self.catalog.resolve_output_path( character_id=character_id or "uncategorized", content_rating=content_rating, filename=img_output.filename, ) # Save to disk output_path.write_bytes(image_bytes) logger.info("Saved image to %s", output_path) # Record in catalog image_id = await self.catalog.insert_image( file_path=str(output_path), image_bytes=image_bytes, character_id=character_id, template_id=template_id, content_rating=content_rating, batch_id=batch_id, positive_prompt=positive_prompt, negative_prompt=negative_prompt, checkpoint=checkpoint, loras=rendered_loras, seed=seed, steps=steps, cfg=cfg, sampler=sampler, scheduler=scheduler, width=width, height=height, generation_backend="local", comfyui_prompt_id=result.prompt_id, generation_time_seconds=generation_time, variables=variables, ) return image_id or ""