Spaces:
Running
Running
| """Local worker that processes generation jobs through ComfyUI. | |
| The local worker takes a generation job, builds a ComfyUI workflow, | |
| submits it to the local ComfyUI instance, waits for completion, | |
| downloads the result, saves it to the output directory, and records | |
| metadata in the catalog. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import time | |
| import uuid | |
| from typing import Any | |
| from content_engine.config import settings | |
| from content_engine.services.catalog import CatalogService | |
| from content_engine.services.comfyui_client import ComfyUIClient | |
| from content_engine.services.template_engine import TemplateEngine | |
| from content_engine.services.workflow_builder import WorkflowBuilder | |
| logger = logging.getLogger(__name__) | |
class LocalWorker:
    """Processes generation jobs on the local GPU via ComfyUI."""

    def __init__(
        self,
        comfyui_client: ComfyUIClient,
        workflow_builder: WorkflowBuilder,
        template_engine: TemplateEngine,
        catalog: CatalogService,
    ) -> None:
        """Store the collaborating services used by ``process_job``.

        Args:
            comfyui_client: Client used to submit workflows and download
                the resulting images.
            workflow_builder: Builds the ComfyUI workflow from job parameters.
            template_engine: Renders prompt templates; consulted only when a
                job supplies a ``template_id``.
            catalog: Resolves output paths and records image metadata.
        """
        self.comfyui = comfyui_client
        self.workflow_builder = workflow_builder
        self.template_engine = template_engine
        self.catalog = catalog

    async def process_job(
        self,
        *,
        job_id: str | None = None,
        batch_id: str | None = None,
        character_id: str | None = None,
        template_id: str | None = None,
        content_rating: str = "sfw",
        positive_prompt: str | None = None,
        negative_prompt: str | None = None,
        checkpoint: str | None = None,
        loras: list[dict[str, Any]] | None = None,
        seed: int = -1,
        steps: int | None = None,
        cfg: float | None = None,
        sampler: str | None = None,
        scheduler: str | None = None,
        width: int | None = None,
        height: int | None = None,
        variables: dict[str, str] | None = None,
        denoise: float | None = None,
        reference_image: str | None = None,
    ) -> str:
        """Process a single generation job. Returns the catalog image ID.

        The job is turned into a ComfyUI workflow, submitted, and every image
        in the result is downloaded, written to the path chosen by the
        catalog, and recorded in the catalog.

        Args:
            job_id: Identifier for the job; a random UUID4 string is generated
                when omitted or empty.
            batch_id: Recorded in the catalog with each image.
            character_id: Used as the filename prefix (``"gen"`` when unset)
                and for output-path resolution (``"uncategorized"`` when
                unset); recorded in the catalog as given.
            template_id: When set (and a template engine is configured), the
                template is rendered with ``variables`` and fills in any of
                positive prompt, negative prompt and LoRAs that the caller
                left empty.
            content_rating: Suffix of the workflow template name; also passed
                to output-path resolution and the catalog.
            positive_prompt: Explicit positive prompt; takes precedence over
                the rendered template.
            negative_prompt: Explicit negative prompt; takes precedence over
                the rendered template.
            checkpoint: Falls back to ``settings.generation.default_checkpoint``.
            loras: Explicit LoRA list; takes precedence over the rendered
                template's LoRAs.
            seed: Passed to the workflow builder and catalog unchanged. A
                negative seed appears as ``0`` in the filename prefix only.
            steps: Falls back to ``settings.generation.default_steps``.
            cfg: Falls back to ``settings.generation.default_cfg``.
            sampler: Falls back to ``settings.generation.default_sampler``.
            scheduler: Falls back to ``settings.generation.default_scheduler``.
            width: Falls back to ``settings.generation.default_width``.
            height: Falls back to ``settings.generation.default_height``.
            variables: Template variables; also recorded in the catalog.
            denoise: Forwarded to the workflow builder as given.
            reference_image: When truthy, selects the img2img workflow
                template; forwarded to the workflow builder.

        Returns:
            The catalog ID returned for the last image inserted, or ``""``
            when the result contained no images (or that ID was falsy).
        """
        job_id = job_id or str(uuid.uuid4())
        gen = settings.generation

        # Resolve prompt from template if template_id provided.  Values the
        # caller passed explicitly always win over the rendered template.
        rendered_loras = loras or []
        if template_id and self.template_engine:
            rendered = self.template_engine.render(template_id, variables or {})
            if not positive_prompt:
                positive_prompt = rendered.positive_prompt
            if not negative_prompt:
                negative_prompt = rendered.negative_prompt
            if not rendered_loras:
                rendered_loras = rendered.loras

        # Apply defaults.
        # NOTE: `or` treats every falsy value (None, 0, 0.0, "") as "unset",
        # so an explicit cfg=0.0 or steps=0 is replaced by the default too.
        checkpoint = checkpoint or gen.default_checkpoint
        steps = steps or gen.default_steps
        cfg = cfg or gen.default_cfg
        sampler = sampler or gen.default_sampler
        scheduler = scheduler or gen.default_scheduler
        width = width or gen.default_width
        height = height or gen.default_height

        # Build filename
        short_id = job_id[:8]
        # Only the filename is normalised; the workflow and the catalog below
        # still receive the original (possibly negative) seed.
        seed_val = seed if seed >= 0 else 0
        char_prefix = character_id or "gen"
        tmpl_prefix = template_id or "direct"
        filename_prefix = f"{char_prefix}_{tmpl_prefix}_{short_id}_{seed_val}"

        # Select workflow template based on content rating and mode
        if reference_image:
            workflow_template = f"sd15_img2img_{content_rating}"
        else:
            workflow_template = f"sd15_base_{content_rating}"

        # Build ComfyUI workflow
        workflow = self.workflow_builder.build(
            template_name=workflow_template,
            checkpoint=checkpoint,
            positive_prompt=positive_prompt or "",
            negative_prompt=negative_prompt or "",
            loras=rendered_loras,
            seed=seed,
            steps=steps,
            cfg=cfg,
            sampler_name=sampler,
            scheduler=scheduler,
            width=width,
            height=height,
            filename_prefix=filename_prefix,
            denoise=denoise,
            reference_image=reference_image,
        )

        # Submit to ComfyUI and wait.  A monotonic clock is used for the
        # elapsed time so that system clock adjustments during a long
        # generation cannot produce a wrong or negative duration.
        logger.info("Submitting job %s to ComfyUI", job_id)
        start_time = time.monotonic()
        result = await self.comfyui.generate(workflow)
        generation_time = time.monotonic() - start_time

        image_count = len(result.images)
        logger.info(
            "Job %s completed in %.1fs, %d images",
            job_id,
            generation_time,
            image_count,
        )
        if image_count == 0:
            # Make the empty-result case visible instead of silently
            # returning an empty ID.
            logger.warning("Job %s produced no images", job_id)

        # Download and save each output image
        image_id = None
        for img_output in result.images:
            image_bytes = await self.comfyui.download_image(img_output)

            # Resolve output path
            output_path = self.catalog.resolve_output_path(
                character_id=character_id or "uncategorized",
                content_rating=content_rating,
                filename=img_output.filename,
            )

            # Save to disk
            # NOTE(review): this is a synchronous call inside a coroutine;
            # presumably output_path is a pathlib.Path, in which case the
            # event loop is blocked for the duration of the write - confirm.
            output_path.write_bytes(image_bytes)
            logger.info("Saved image to %s", output_path)

            # Record in catalog.  image_id is overwritten on every iteration,
            # so only the last image's ID is returned to the caller.
            image_id = await self.catalog.insert_image(
                file_path=str(output_path),
                image_bytes=image_bytes,
                character_id=character_id,
                template_id=template_id,
                content_rating=content_rating,
                batch_id=batch_id,
                positive_prompt=positive_prompt,
                negative_prompt=negative_prompt,
                checkpoint=checkpoint,
                loras=rendered_loras,
                seed=seed,
                steps=steps,
                cfg=cfg,
                sampler=sampler,
                scheduler=scheduler,
                width=width,
                height=height,
                generation_backend="local",
                comfyui_prompt_id=result.prompt_id,
                generation_time_seconds=generation_time,
                variables=variables,
            )

        return image_id or ""