dippoo's picture
Initial deployment - Content Engine
ed37502
raw
history blame
6.07 kB
"""Local worker that processes generation jobs through ComfyUI.
The local worker takes a generation job, builds a ComfyUI workflow,
submits it to the local ComfyUI instance, waits for completion,
downloads the result, saves it to the output directory, and records
metadata in the catalog.
"""
from __future__ import annotations
import logging
import time
import uuid
from typing import Any
from content_engine.config import settings
from content_engine.services.catalog import CatalogService
from content_engine.services.comfyui_client import ComfyUIClient
from content_engine.services.template_engine import TemplateEngine
from content_engine.services.workflow_builder import WorkflowBuilder
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class LocalWorker:
    """Processes generation jobs on the local GPU via ComfyUI.

    The worker orchestrates four collaborators supplied at construction
    time: a ComfyUI client (submit workflow, download outputs), a workflow
    builder, a template engine (prompt rendering) and the catalog service
    (output path resolution and metadata storage).
    """

    def __init__(
        self,
        comfyui_client: ComfyUIClient,
        workflow_builder: WorkflowBuilder,
        template_engine: TemplateEngine,
        catalog: CatalogService,
    ) -> None:
        """Store the collaborators used by :meth:`process_job`."""
        self.comfyui = comfyui_client
        self.workflow_builder = workflow_builder
        self.template_engine = template_engine
        self.catalog = catalog

    async def process_job(
        self,
        *,
        job_id: str | None = None,
        batch_id: str | None = None,
        character_id: str | None = None,
        template_id: str | None = None,
        content_rating: str = "sfw",
        positive_prompt: str | None = None,
        negative_prompt: str | None = None,
        checkpoint: str | None = None,
        loras: list[dict[str, Any]] | None = None,
        seed: int = -1,
        steps: int | None = None,
        cfg: float | None = None,
        sampler: str | None = None,
        scheduler: str | None = None,
        width: int | None = None,
        height: int | None = None,
        variables: dict[str, str] | None = None,
        denoise: float | None = None,
        reference_image: str | None = None,
    ) -> str:
        """Process a single generation job. Returns the catalog image ID.

        The job is run end to end: prompts/LoRAs are resolved from the
        template (when one is given), unset parameters are filled in from
        ``settings.generation``, a workflow is built and submitted to
        ComfyUI, and every output image is downloaded, written to disk and
        recorded in the catalog.

        Args:
            job_id: Identifier used in log messages and in the output
                filename prefix (first 8 characters). A random UUID4 is
                generated when omitted or empty.
            batch_id: Stored on the catalog record.
            character_id: Used in the filename prefix (``"gen"`` when
                unset), for output path resolution (``"uncategorized"``
                when unset) and stored on the catalog record.
            template_id: Template to render with ``variables``. Also used
                in the filename prefix (``"direct"`` when unset).
            content_rating: Suffix of the workflow template name; also
                used for output path resolution and the catalog record.
            positive_prompt: Explicit prompt; takes precedence over the
                template's rendered prompt. An empty string is sent to the
                workflow builder when neither is available.
            negative_prompt: Same precedence rules as ``positive_prompt``.
            checkpoint: Falls back to ``default_checkpoint`` when falsy.
            loras: Explicit LoRA list; takes precedence over the LoRAs
                rendered by the template.
            seed: Passed unchanged to the workflow builder and the
                catalog. A negative value appears as ``0`` in the filename
                prefix only.
            steps: Falls back to ``default_steps`` when falsy.
            cfg: Falls back to ``default_cfg`` only when ``None``; an
                explicit ``0.0`` is honoured.
            sampler: Falls back to ``default_sampler`` when falsy.
            scheduler: Falls back to ``default_scheduler`` when falsy.
            width: Falls back to ``default_width`` when falsy.
            height: Falls back to ``default_height`` when falsy.
            variables: Template variables; also stored on the catalog
                record.
            denoise: Forwarded to the workflow builder as-is.
            reference_image: Forwarded to the workflow builder; a truthy
                value selects the img2img workflow template.

        Returns:
            The ID the catalog returned for the last image that was
            saved, or ``""`` when ComfyUI produced no images (or the
            catalog returned a falsy ID).
        """
        job_id = job_id or str(uuid.uuid4())
        gen = settings.generation

        # Resolve prompt from template if template_id provided.
        # Explicitly passed prompts/LoRAs always win over rendered ones.
        rendered_loras = loras or []
        if template_id and self.template_engine:
            rendered = self.template_engine.render(template_id, variables or {})
            if not positive_prompt:
                positive_prompt = rendered.positive_prompt
            if not negative_prompt:
                negative_prompt = rendered.negative_prompt
            if not rendered_loras:
                rendered_loras = rendered.loras

        # Apply defaults. ``cfg`` is checked against None so that an
        # explicit 0.0 is not silently replaced by the default; for the
        # remaining parameters a zero/empty value is not meaningful, so any
        # falsy value selects the default.
        checkpoint = checkpoint or gen.default_checkpoint
        steps = steps or gen.default_steps
        cfg = gen.default_cfg if cfg is None else cfg
        sampler = sampler or gen.default_sampler
        scheduler = scheduler or gen.default_scheduler
        width = width or gen.default_width
        height = height or gen.default_height

        # Build filename. Negative seeds are shown as 0 here; the original
        # ``seed`` value is what gets sent to the workflow builder.
        short_id = job_id[:8]
        seed_val = seed if seed >= 0 else 0
        char_prefix = character_id or "gen"
        tmpl_prefix = template_id or "direct"
        filename_prefix = f"{char_prefix}_{tmpl_prefix}_{short_id}_{seed_val}"

        # Select workflow template based on content rating and mode
        if reference_image:
            workflow_template = f"sd15_img2img_{content_rating}"
        else:
            workflow_template = f"sd15_base_{content_rating}"

        # Build ComfyUI workflow
        workflow = self.workflow_builder.build(
            template_name=workflow_template,
            checkpoint=checkpoint,
            positive_prompt=positive_prompt or "",
            negative_prompt=negative_prompt or "",
            loras=rendered_loras,
            seed=seed,
            steps=steps,
            cfg=cfg,
            sampler_name=sampler,
            scheduler=scheduler,
            width=width,
            height=height,
            filename_prefix=filename_prefix,
            denoise=denoise,
            reference_image=reference_image,
        )

        # Submit to ComfyUI and wait. perf_counter() is used for the
        # elapsed time because, unlike time.time(), it cannot jump when
        # the system clock is adjusted mid-generation.
        logger.info("Submitting job %s to ComfyUI", job_id)
        start_time = time.perf_counter()
        result = await self.comfyui.generate(workflow)
        generation_time = time.perf_counter() - start_time

        image_count = len(result.images)
        logger.info(
            "Job %s completed in %.1fs, %d images",
            job_id,
            generation_time,
            image_count,
        )

        if image_count == 0:
            # Nothing to save; make the empty return value visible in logs.
            logger.warning("Job %s produced no output images", job_id)
            return ""

        # Catalog fields that are identical for every image of this job.
        catalog_fields: dict[str, Any] = {
            "character_id": character_id,
            "template_id": template_id,
            "content_rating": content_rating,
            "batch_id": batch_id,
            "positive_prompt": positive_prompt,
            "negative_prompt": negative_prompt,
            "checkpoint": checkpoint,
            "loras": rendered_loras,
            "seed": seed,
            "steps": steps,
            "cfg": cfg,
            "sampler": sampler,
            "scheduler": scheduler,
            "width": width,
            "height": height,
            "generation_backend": "local",
            "comfyui_prompt_id": result.prompt_id,
            "generation_time_seconds": generation_time,
            "variables": variables,
        }

        # Download and save each output image
        image_id = None
        for img_output in result.images:
            image_bytes = await self.comfyui.download_image(img_output)

            # Resolve output path
            output_path = self.catalog.resolve_output_path(
                character_id=character_id or "uncategorized",
                content_rating=content_rating,
                filename=img_output.filename,
            )

            # Save to disk
            output_path.write_bytes(image_bytes)
            logger.info("Saved image to %s", output_path)

            # Record in catalog
            image_id = await self.catalog.insert_image(
                file_path=str(output_path),
                image_bytes=image_bytes,
                **catalog_fields,
            )

        # Only the ID of the last inserted image is reported to the caller.
        return image_id or ""