diff --git a/app/core/__init__.py b/app/core/__init__.py deleted file mode 100644 index 55b7c2e5385d09661a9e55736ec5ba58ed6df432..0000000000000000000000000000000000000000 --- a/app/core/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Package initialization file for Stable Diffusion Image Generator.""" diff --git a/app/generator.py b/app/generator.py deleted file mode 100644 index a664d95f8781fecd427d5ace9865377651ce6d66..0000000000000000000000000000000000000000 --- a/app/generator.py +++ /dev/null @@ -1,83 +0,0 @@ -"""Image generation wrapper around a loaded StableDiffusionPipeline. - -Provides: -- generate_image(...) -> (PIL.Image, metadata) -- deterministic seed handling -""" - -import time -from typing import Any, Dict, Optional - -import torch - -from app.utils.logger import get_logger - -logger = get_logger(__name__) - - -def _validate_resolution(width: int, height: int): - # clamp and snap to multiples of 64 (SD requirement) - width = max(256, min(width, 768)) - height = max(256, min(height, 768)) - width = (width // 64) * 64 - height = (height // 64) * 64 - return int(width), int(height) - - -def generate_image( - pipe, - prompt: str, - negative_prompt: Optional[str] = None, - steps: int = 30, - guidance_scale: float = 7.5, - width: int = 512, - height: int = 512, - seed: Optional[int] = None, - device: str = "cuda", -): - """Generate a single image and return (PIL.Image, metadata dict).""" - start = time.time() - width, height = _validate_resolution(width, height) - - # Generator for reproducibility - if seed is None: - # create a new seed and use it - seed = int(torch.seed() & ((1 << 63) - 1)) - gen = torch.Generator(device if device != "cpu" else "cpu").manual_seed(int(seed)) - - logger.info( - ( - f"Generating: steps={steps}, cfg={guidance_scale},\ - res={width}x{height}, seed={seed}" - ) - ) - - # Use autocast for speed/precision management - device_type = "cuda" if device != "cpu" else "cpu" - with torch.autocast(device_type=device_type): - result = pipe( 
- prompt=prompt, - negative_prompt=negative_prompt if negative_prompt else None, - num_inference_steps=int(steps), - guidance_scale=float(guidance_scale), - width=width, - height=height, - generator=gen, - ) - - img = result.images[0] # PIL image - elapsed = time.time() - start - - metadata: Dict[str, Any] = { - "prompt": prompt, - "negative_prompt": negative_prompt, - "steps": steps, - "guidance_scale": guidance_scale, - "width": width, - "height": height, - "seed": int(seed), - "elapsed_seconds": elapsed, - } - - logger.info(f"Generation finished in {elapsed:.2f}s") - return img, metadata diff --git a/app/img2img.py b/app/img2img.py deleted file mode 100644 index 0de570f36b45b55ae62600ea857612eb0d6c8ae9..0000000000000000000000000000000000000000 --- a/app/img2img.py +++ /dev/null @@ -1,175 +0,0 @@ -"""Image-to-image generation using Stable Diffusion. - -This module provides: -- prepare_img2img_pipeline: build an Img2Img pipeline from an existing txt2img pipe. -- generate_img2img: run image-to-image generation and return (PIL.Image, metadata). 
-""" - -from __future__ import annotations - -import time -from pathlib import Path -from typing import Any, Dict, Optional, Union - -import torch -from diffusers import StableDiffusionImg2ImgPipeline -from PIL import Image - -from app.utils.logger import get_logger - -logger = get_logger(__name__) - - -def _validate_resolution(width: int, height: int) -> tuple[int, int]: - """Clamp resolution to a safe range and snap to multiples of 64.""" - width = max(256, min(width, 768)) - height = max(256, min(height, 768)) - width = (width // 64) * 64 - height = (height // 64) * 64 - return int(width), int(height) - - -def _load_init_image( - image: Union[Image.Image, str, Path], - width: int, - height: int, -) -> Image.Image: - """Load and preprocess the init image for img2img.""" - if isinstance(image, (str, Path)): - image = Image.open(image) - - if not isinstance(image, Image.Image): - raise TypeError("init_image must be a PIL.Image or a valid image path.") - - image = image.convert("RGB") - image = image.resize((width, height), resample=Image.LANCZOS) - return image - - -def prepare_img2img_pipeline( - base_pipe, - model_id: str = "runwayml/stable-diffusion-v1-5", -) -> StableDiffusionImg2ImgPipeline: - """Create an Img2Img pipeline that shares weights with the base txt2img pipe. 
- - Tries to use StableDiffusionImg2ImgPipeline.from_pipe to reuse: - - UNet - - VAE - - text encoder - - tokenizer - - scheduler - """ - try: - img2img_pipe = StableDiffusionImg2ImgPipeline.from_pipe(base_pipe) - logger.info("Created Img2Img pipeline from existing base pipeline.") - except Exception as err: - logger.info("from_pipe failed (%s); falling back to from_pretrained.", err) - img2img_pipe = StableDiffusionImg2ImgPipeline.from_pretrained( - model_id, - torch_dtype=base_pipe.unet.dtype, - safety_checker=None, - ) - device = next(base_pipe.unet.parameters()).device - img2img_pipe = img2img_pipe.to(device) - - # memory optimizations similar to txt2img pipeline - try: - img2img_pipe.enable_attention_slicing() - logger.info("Enabled attention slicing on Img2Img pipeline.") - except Exception: - logger.info("Attention slicing not available on Img2Img pipeline.") - - try: - if hasattr(img2img_pipe.vae, "enable_tiling"): - img2img_pipe.vae.enable_tiling() - logger.info("Enabled VAE tiling on Img2Img pipeline.") - except Exception: - pass - - return img2img_pipe - - -def generate_img2img( - pipe: StableDiffusionImg2ImgPipeline, - init_image: Union[Image.Image, str, Path], - prompt: str, - negative_prompt: Optional[str] = None, - strength: float = 0.7, - steps: int = 30, - guidance_scale: float = 7.5, - width: int = 512, - height: int = 512, - seed: Optional[int] = None, - device: str = "cuda", -) -> tuple[Image.Image, Dict[str, Any]]: - """Run image-to-image generation. - - Args: - pipe: A StableDiffusionImg2ImgPipeline. - init_image: Base image (PIL or path). - prompt: Text prompt to guide the transformation. - negative_prompt: What to avoid in the output. - strength: How strong the transformation is (0-1). - steps: Number of inference steps. - guidance_scale: Prompt adherence strength. - width: Target width (snapped to 64 multiple). - height: Target height (snapped to 64 multiple). - seed: Optional random seed for reproducibility. - device: "cuda" or "cpu". 
- - Returns: - (PIL.Image, metadata dict) - """ - if not (0.0 < strength <= 1.0): - raise ValueError("strength must be in (0, 1].") - - start = time.time() - width, height = _validate_resolution(width, height) - init_image = _load_init_image(init_image, width, height) - - # Seed handling - if seed is None: - seed = int(torch.seed() & ((1 << 63) - 1)) - - gen = torch.Generator(device if device != "cpu" else "cpu").manual_seed(int(seed)) - - logger.info( - "Img2Img: steps=%s cfg=%s strength=%.2f res=%sx%s seed=%s", - steps, - guidance_scale, - strength, - width, - height, - seed, - ) - - device_type = "cuda" if device != "cpu" else "cpu" - with torch.autocast(device_type=device_type): - result = pipe( - prompt=prompt, - negative_prompt=negative_prompt if negative_prompt else None, - image=init_image, - strength=float(strength), - num_inference_steps=int(steps), - guidance_scale=float(guidance_scale), - generator=gen, - ) - - out_image = result.images[0] - elapsed = time.time() - start - - metadata: Dict[str, Any] = { - "mode": "img2img", - "prompt": prompt, - "negative_prompt": negative_prompt, - "steps": steps, - "guidance_scale": guidance_scale, - "width": width, - "height": height, - "seed": int(seed), - "strength": float(strength), - "elapsed_seconds": elapsed, - } - - logger.info("Img2Img finished in %.2fs", elapsed) - return out_image, metadata diff --git a/app/models/__init__.py b/app/models/__init__.py deleted file mode 100644 index 55b7c2e5385d09661a9e55736ec5ba58ed6df432..0000000000000000000000000000000000000000 --- a/app/models/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Package initialization file for Stable Diffusion Image Generator.""" diff --git a/app/models/metadata.py b/app/models/metadata.py deleted file mode 100644 index aec96b53702ff65149d5ad82672b5dd4fd0fba6f..0000000000000000000000000000000000000000 --- a/app/models/metadata.py +++ /dev/null @@ -1 +0,0 @@ -"""Auto-generated placeholder module for Stable Diffusion Image Generator.""" diff --git 
a/app/presets/__init__.py b/app/presets/__init__.py deleted file mode 100644 index 55b7c2e5385d09661a9e55736ec5ba58ed6df432..0000000000000000000000000000000000000000 --- a/app/presets/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Package initialization file for Stable Diffusion Image Generator.""" diff --git a/app/presets/styles.py b/app/presets/styles.py deleted file mode 100644 index aec96b53702ff65149d5ad82672b5dd4fd0fba6f..0000000000000000000000000000000000000000 --- a/app/presets/styles.py +++ /dev/null @@ -1 +0,0 @@ -"""Auto-generated placeholder module for Stable Diffusion Image Generator.""" diff --git a/app/ui.py b/app/ui.py deleted file mode 100644 index aec96b53702ff65149d5ad82672b5dd4fd0fba6f..0000000000000000000000000000000000000000 --- a/app/ui.py +++ /dev/null @@ -1 +0,0 @@ -"""Auto-generated placeholder module for Stable Diffusion Image Generator.""" diff --git a/app/upscaler/__init__.py b/app/upscaler/__init__.py deleted file mode 100644 index 55b7c2e5385d09661a9e55736ec5ba58ed6df432..0000000000000000000000000000000000000000 --- a/app/upscaler/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Package initialization file for Stable Diffusion Image Generator.""" diff --git a/app/upscaler/realesrgan.py b/app/upscaler/realesrgan.py deleted file mode 100644 index c25486243fdd9627dba2b1e17b7e26e2664049af..0000000000000000000000000000000000000000 --- a/app/upscaler/realesrgan.py +++ /dev/null @@ -1,55 +0,0 @@ -"""NCNN RealESRGAN upscaler wrapper. - -This module exposes: -- NCNNUpscaler: provides lightweight 2x/4x super-resolution via realesrgan-ncnn-py. -""" - -from __future__ import annotations - -from PIL import Image -from realesrgan_ncnn_py import Realesrgan - -from app.utils.logger import get_logger - -logger = get_logger(__name__) - -# Supported scales mapped to internal model indices -SCALE_TO_MODEL = { - 2.0: 3, # realesrgan-x2plus - 4.0: 0, # realesrgan-x4plus -} - - -class NCNNUpscaler: - """Lightweight NCNN RealESRGAN engine using realesrgan-ncnn-py. 
- - Args: - scale (float): Supported values = 2.0 or 4.0. - """ - - def __init__(self, scale: float = 2.0): - """Initialize the NCNN upscaler.""" - if scale not in SCALE_TO_MODEL: - raise ValueError("Only 2.0x and 4.0x supported for your NCNN build") - - self.scale = scale - self.model_index = SCALE_TO_MODEL[scale] - - logger.info( - f"[NCNN] Loading RealESRGAN model index={self.model_index} \ - for scale={scale}x" - ) - - self.model = Realesrgan(model=self.model_index) - - def upscale(self, image: Image.Image) -> Image.Image: - """Upscale a PIL image using NCNN RealESRGAN.""" - if not isinstance(image, Image.Image): - raise TypeError("Input must be a PIL.Image") - - logger.info( - f"[NCNN] Upscaling ({image.width}x{image.height}) " - f"by {self.scale}x using model={self.model_index}" - ) - - return self.model.process_pil(image) diff --git a/app/upscaler/upscaler.py b/app/upscaler/upscaler.py deleted file mode 100644 index 8cbd7546c79fd83aaed8e83eb45ebcaaf0d8deb6..0000000000000000000000000000000000000000 --- a/app/upscaler/upscaler.py +++ /dev/null @@ -1,39 +0,0 @@ -"""Unified upscaler interface. 
- -Chooses between: -- NCNN RealESRGAN (fastest, works on NVIDIA/AMD/Intel) -- Future SD-upscaler backend -""" - -from __future__ import annotations - -from PIL import Image - -from app.upscaler.realesrgan import NCNNUpscaler -from app.utils.logger import get_logger - -logger = get_logger(__name__) - - -class Upscaler: - """Unified high-level upscaling wrapper.""" - - def __init__(self, scale: float = 2.0, prefer: str = "ncnn"): - """Initialize the upscaler with given backend preference.""" - logger.info(f"Upscaler initializing (prefer={prefer}, scale={scale})") - - self.engine = None - - if prefer in ("ncnn", "auto"): - try: - self.engine = NCNNUpscaler(scale=scale) - logger.info("Using NCNN RealESRGAN engine.") - return - except Exception as err: - logger.warning(f"NCNN RealESRGAN init failed: {err}") - - raise RuntimeError("No valid upscaler engine available.") - - def upscale(self, image: Image.Image) -> Image.Image: - """Upscale the given image.""" - return self.engine.upscale(image) diff --git a/app/utils/__init__.py b/app/utils/__init__.py deleted file mode 100644 index 55b7c2e5385d09661a9e55736ec5ba58ed6df432..0000000000000000000000000000000000000000 --- a/app/utils/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Package initialization file for Stable Diffusion Image Generator.""" diff --git a/app/utils/history.py b/app/utils/history.py deleted file mode 100644 index aec96b53702ff65149d5ad82672b5dd4fd0fba6f..0000000000000000000000000000000000000000 --- a/app/utils/history.py +++ /dev/null @@ -1 +0,0 @@ -"""Auto-generated placeholder module for Stable Diffusion Image Generator.""" diff --git a/app/utils/logger.py b/app/utils/logger.py deleted file mode 100644 index 72d94fb09b0d57b54a5c8ba720d6913c23883471..0000000000000000000000000000000000000000 --- a/app/utils/logger.py +++ /dev/null @@ -1,51 +0,0 @@ -"""Centralized logging utility for the project. 
- -Features: -- Colored console logs -- File logs (logs/app.log) -- Timestamped + module-aware output -""" - -import logging -import os -from logging.handlers import RotatingFileHandler - -LOG_DIR = "logs" -LOG_FILE = os.path.join(LOG_DIR, "app.log") - -os.makedirs(LOG_DIR, exist_ok=True) - - -def get_logger(name: str = "app", level=logging.INFO) -> logging.Logger: - """Returns a configured logger instance. - - Safe to call from any module. - """ - logger = logging.getLogger(name) - logger.setLevel(level) - - if logger.hasHandlers(): - return logger - - # Console handler - console_handler = logging.StreamHandler() - console_format = ( - "\033[36m[%(asctime)s] [%(name)s] \ - [%(levelname)s]\033[0m " - "%(message)s" - ) - console_handler.setFormatter(logging.Formatter(console_format, "%Y-%m-%d %H:%M:%S")) - - # File handler - file_handler = RotatingFileHandler( - LOG_FILE, - maxBytes=5_000_000, - backupCount=3, - ) - file_format = "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s" - file_handler.setFormatter(logging.Formatter(file_format, "%Y-%m-%d %H:%M:%S")) - - logger.addHandler(console_handler) - logger.addHandler(file_handler) - - return logger diff --git a/app/utils/seed.py b/app/utils/seed.py deleted file mode 100644 index aec96b53702ff65149d5ad82672b5dd4fd0fba6f..0000000000000000000000000000000000000000 --- a/app/utils/seed.py +++ /dev/null @@ -1 +0,0 @@ -"""Auto-generated placeholder module for Stable Diffusion Image Generator.""" diff --git a/assets/__init__.py b/assets/__init__.py deleted file mode 100644 index 55b7c2e5385d09661a9e55736ec5ba58ed6df432..0000000000000000000000000000000000000000 --- a/assets/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Package initialization file for Stable Diffusion Image Generator.""" diff --git a/assets/lora/__init__.py b/assets/lora/__init__.py deleted file mode 100644 index 55b7c2e5385d09661a9e55736ec5ba58ed6df432..0000000000000000000000000000000000000000 --- a/assets/lora/__init__.py +++ /dev/null @@ -1 +0,0 @@ 
-"""Package initialization file for Stable Diffusion Image Generator.""" diff --git a/main.py b/main.py deleted file mode 100644 index aec96b53702ff65149d5ad82672b5dd4fd0fba6f..0000000000000000000000000000000000000000 --- a/main.py +++ /dev/null @@ -1 +0,0 @@ -"""Auto-generated placeholder module for Stable Diffusion Image Generator.""" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000000000000000000000000000000000..f0c4f8df38e7e0aadbb6ae2298db619d6f8c011e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,12 @@ +[project] +name = "sdgen" +version = "0.0.0" +requires-python = ">=3.10" +dependencies = [] + +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project.scripts] +sdgen = "sdgen.main:main" diff --git a/requirements.txt b/requirements.txt index 6b77aa024c24b57f1e201a91a4750680f11573bb..76a7764b05a93af6d49b3eebeb3ab92a720b0008 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,7 +16,7 @@ safetensors==0.4.2 # UI FRAMEWORK -gradio==4.29.0 +gradio==3.50.2 # IMAGE PROCESSING & UTILITIES @@ -36,3 +36,5 @@ realesrgan-ncnn-py==2.0.0 black==24.3.0 ruff==0.3.5 pre-commit==3.7.0 + +-e . 
\ No newline at end of file diff --git a/src/assets/history/entries/30517a2b-2b3f-468d-a22c-0365852e9fd4.json b/src/assets/history/entries/30517a2b-2b3f-468d-a22c-0365852e9fd4.json new file mode 100644 index 0000000000000000000000000000000000000000..b56b5af5d6675984b15856d0159e8aaaa909e278 --- /dev/null +++ b/src/assets/history/entries/30517a2b-2b3f-468d-a22c-0365852e9fd4.json @@ -0,0 +1,15 @@ +{ + "mode": "txt2img", + "prompt": "oil painting, impasto brush strokes, classical lighting, Rembrandt style", + "negative_prompt": "blurry, cartoonish, digital artifacts", + "steps": 40, + "guidance_scale": 8.5, + "width": 512, + "height": 768, + "seed": 7008176382479260353, + "elapsed_seconds": 20.270400285720825, + "timestamp": "2025-12-04T10:18:33.634636", + "id": "30517a2b-2b3f-468d-a22c-0365852e9fd4", + "thumbnail": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/thumbnails/30517a2b-2b3f-468d-a22c-0365852e9fd4.png", + "full_image": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/full/30517a2b-2b3f-468d-a22c-0365852e9fd4.png" +} \ No newline at end of file diff --git a/src/assets/history/entries/6c2372b4-ad89-4f9a-845d-729447fbfc42.json b/src/assets/history/entries/6c2372b4-ad89-4f9a-845d-729447fbfc42.json new file mode 100644 index 0000000000000000000000000000000000000000..b69db2c7c64d0a5d5597832d7a343eeefe8d0e46 --- /dev/null +++ b/src/assets/history/entries/6c2372b4-ad89-4f9a-845d-729447fbfc42.json @@ -0,0 +1,15 @@ +{ + "mode": "txt2img", + "prompt": "oil painting, impasto brush strokes, classical lighting, Rembrandt style", + "negative_prompt": "blurry, cartoonish, digital artifacts", + "steps": 40, + "guidance_scale": 8.5, + "width": 512, + "height": 768, + "seed": 8697126389267085321, + "elapsed_seconds": 18.847933292388916, + "timestamp": "2025-12-04T07:59:00.004141", + "id": "6c2372b4-ad89-4f9a-845d-729447fbfc42", + "thumbnail": 
"/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/thumbnails/6c2372b4-ad89-4f9a-845d-729447fbfc42.png", + "full_image": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/full/6c2372b4-ad89-4f9a-845d-729447fbfc42.png" +} \ No newline at end of file diff --git a/src/assets/history/entries/763d106c-d607-4a42-a4c7-4264c54d0033.json b/src/assets/history/entries/763d106c-d607-4a42-a4c7-4264c54d0033.json new file mode 100644 index 0000000000000000000000000000000000000000..8fd9d0b3619a7aec4c6df1acb653d0819cfb8a00 --- /dev/null +++ b/src/assets/history/entries/763d106c-d607-4a42-a4c7-4264c54d0033.json @@ -0,0 +1,15 @@ +{ + "mode": "txt2img", + "prompt": "ultra realistic, 35mm photography, photorealistic, cinematic lighting", + "negative_prompt": "low quality, blurry, deformed, extra limbs", + "steps": 28, + "guidance_scale": 7.5, + "width": 512, + "height": 512, + "seed": 7647575900507438056, + "elapsed_seconds": 8.190003871917725, + "timestamp": "2025-12-04T07:58:10.667954", + "id": "763d106c-d607-4a42-a4c7-4264c54d0033", + "thumbnail": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/thumbnails/763d106c-d607-4a42-a4c7-4264c54d0033.png", + "full_image": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/full/763d106c-d607-4a42-a4c7-4264c54d0033.png" +} \ No newline at end of file diff --git a/src/assets/history/entries/a6861b54-0afb-4b32-bb33-cacdadd7e639.json b/src/assets/history/entries/a6861b54-0afb-4b32-bb33-cacdadd7e639.json new file mode 100644 index 0000000000000000000000000000000000000000..b81d1072f36740f1f95f5307caa2972ff631a969 --- /dev/null +++ b/src/assets/history/entries/a6861b54-0afb-4b32-bb33-cacdadd7e639.json @@ -0,0 +1,15 @@ +{ + "mode": "txt2img", + "prompt": "dramatic cinematic lighting, moody, film grain, Kodak Portra, filmic color 
grading", + "negative_prompt": "oversaturated, low detail, flat lighting", + "steps": 30, + "guidance_scale": 7.0, + "width": 768, + "height": 512, + "seed": 2005184672833822731, + "elapsed_seconds": 16.372806072235107, + "timestamp": "2025-12-04T10:26:46.533003", + "id": "a6861b54-0afb-4b32-bb33-cacdadd7e639", + "thumbnail": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/thumbnails/a6861b54-0afb-4b32-bb33-cacdadd7e639.png", + "full_image": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/full/a6861b54-0afb-4b32-bb33-cacdadd7e639.png" +} \ No newline at end of file diff --git a/src/assets/history/entries/dbc7d811-d607-4432-a10f-94245b06a629.json b/src/assets/history/entries/dbc7d811-d607-4432-a10f-94245b06a629.json new file mode 100644 index 0000000000000000000000000000000000000000..f58d105ac18d63989f61559642e4dea2225aaa8c --- /dev/null +++ b/src/assets/history/entries/dbc7d811-d607-4432-a10f-94245b06a629.json @@ -0,0 +1,15 @@ +{ + "mode": "txt2img", + "prompt": "cyberpunk city, neon reflections, wet streets, high detail, synthwave", + "negative_prompt": "low detail, daytime, blurry", + "steps": 50, + "guidance_scale": 15.0, + "width": 768, + "height": 768, + "seed": 1759688396546594556, + "elapsed_seconds": 43.618977308273315, + "timestamp": "2025-12-04T09:47:30.607670", + "id": "dbc7d811-d607-4432-a10f-94245b06a629", + "thumbnail": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/thumbnails/dbc7d811-d607-4432-a10f-94245b06a629.png", + "full_image": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/full/dbc7d811-d607-4432-a10f-94245b06a629.png" +} \ No newline at end of file diff --git a/src/assets/history/index.json b/src/assets/history/index.json new file mode 100644 index 
0000000000000000000000000000000000000000..88cbd9302269e5a674277959e7de5b064295e21c --- /dev/null +++ b/src/assets/history/index.json @@ -0,0 +1,52 @@ +[ + { + "id": "a6861b54-0afb-4b32-bb33-cacdadd7e639", + "prompt": "dramatic cinematic lighting, moody, film grain, Kodak Portra, filmic color grading", + "mode": "txt2img", + "seed": 2005184672833822731, + "width": 768, + "height": 512, + "timestamp": "2025-12-04T10:26:46.533003", + "thumbnail": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/thumbnails/a6861b54-0afb-4b32-bb33-cacdadd7e639.png" + }, + { + "id": "30517a2b-2b3f-468d-a22c-0365852e9fd4", + "prompt": "oil painting, impasto brush strokes, classical lighting, Rembrandt style", + "mode": "txt2img", + "seed": 7008176382479260353, + "width": 512, + "height": 768, + "timestamp": "2025-12-04T10:18:33.634636", + "thumbnail": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/thumbnails/30517a2b-2b3f-468d-a22c-0365852e9fd4.png" + }, + { + "id": "dbc7d811-d607-4432-a10f-94245b06a629", + "prompt": "cyberpunk city, neon reflections, wet streets, high detail, synthwave", + "mode": "txt2img", + "seed": 1759688396546594556, + "width": 768, + "height": 768, + "timestamp": "2025-12-04T09:47:30.607670", + "thumbnail": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/thumbnails/dbc7d811-d607-4432-a10f-94245b06a629.png" + }, + { + "id": "6c2372b4-ad89-4f9a-845d-729447fbfc42", + "prompt": "oil painting, impasto brush strokes, classical lighting, Rembrandt style", + "mode": "txt2img", + "seed": 8697126389267085321, + "width": 512, + "height": 768, + "timestamp": "2025-12-04T07:59:00.004141", + "thumbnail": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/thumbnails/6c2372b4-ad89-4f9a-845d-729447fbfc42.png" + }, + { + "id": 
"763d106c-d607-4a42-a4c7-4264c54d0033", + "prompt": "ultra realistic, 35mm photography, photorealistic, cinematic lighting", + "mode": "txt2img", + "seed": 7647575900507438056, + "width": 512, + "height": 512, + "timestamp": "2025-12-04T07:58:10.667954", + "thumbnail": "/home/sanskar-modi/current_working_personal_projects/stable-diffusion-image-generator/src/assets/history/thumbnails/763d106c-d607-4a42-a4c7-4264c54d0033.png" + } +] \ No newline at end of file diff --git a/src/sdgen/__init__.py b/src/sdgen/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..d1ff4eea818bfbcd9bf3ec8c3ece1ce387c9fc44 --- /dev/null +++ b/src/sdgen/__init__.py @@ -0,0 +1,5 @@ +from __future__ import annotations + +from .main import main + +__all__ = ["main"] diff --git a/src/sdgen/config/__init__.py b/src/sdgen/config/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1634f7b8e873da224060588313e812f20ad786ae --- /dev/null +++ b/src/sdgen/config/__init__.py @@ -0,0 +1,29 @@ +"""Configuration exports for the sdgen package. + +This module re-exports commonly used configuration paths and settings +so they can be imported directly from `sdgen.config`. +""" + +from __future__ import annotations + +from .paths import ( + ASSETS_ROOT, + HISTORY_ENTRIES_DIR, + HISTORY_FULL_DIR, + HISTORY_ROOT, + HISTORY_THUMBS_DIR, + LOGS_ROOT, + PROJECT_ROOT, +) +from .settings import AppSettings + +__all__ = [ + "AppSettings", + "PROJECT_ROOT", + "ASSETS_ROOT", + "HISTORY_ROOT", + "HISTORY_ENTRIES_DIR", + "HISTORY_THUMBS_DIR", + "HISTORY_FULL_DIR", + "LOGS_ROOT", +] diff --git a/src/sdgen/config/paths.py b/src/sdgen/config/paths.py new file mode 100644 index 0000000000000000000000000000000000000000..7efeeab585b984b782d4495d851305250e13f4c8 --- /dev/null +++ b/src/sdgen/config/paths.py @@ -0,0 +1,44 @@ +"""Path configuration for sdgen. + +All filesystem paths are resolved relative to the project root. 
+The project root is detected by walking upward until a marker +file (e.g., `pyproject.toml` or `.git`) is found. +""" + +from __future__ import annotations + +from pathlib import Path + + +def _detect_project_root() -> Path: + """Return the project root by scanning upward for a marker file.""" + current = Path(__file__).resolve() + + for parent in current.parents: + if (parent / "pyproject.toml").exists() or (parent / ".git").exists(): + return parent + + # Fallback: use the last resolved parent + return current.parents[-1] + + +PROJECT_ROOT: Path = _detect_project_root() + +ASSETS_ROOT: Path = PROJECT_ROOT / "src" / "assets" +ASSETS_ROOT.mkdir(parents=True, exist_ok=True) + +HISTORY_ROOT: Path = ASSETS_ROOT / "history" +HISTORY_ENTRIES_DIR: Path = HISTORY_ROOT / "entries" +HISTORY_THUMBS_DIR: Path = HISTORY_ROOT / "thumbnails" +HISTORY_FULL_DIR: Path = HISTORY_ROOT / "full" + +for p in [ + HISTORY_ROOT, + HISTORY_ENTRIES_DIR, + HISTORY_THUMBS_DIR, + HISTORY_FULL_DIR, +]: + p.mkdir(parents=True, exist_ok=True) + +LOGS_ROOT: Path = PROJECT_ROOT / "logs" +LOGS_ROOT.mkdir(parents=True, exist_ok=True) diff --git a/src/sdgen/config/settings.py b/src/sdgen/config/settings.py new file mode 100644 index 0000000000000000000000000000000000000000..bf486365065b86cc5bdcf3400647f78305be7f91 --- /dev/null +++ b/src/sdgen/config/settings.py @@ -0,0 +1,31 @@ +"""Application runtime settings for sdgen. + +AppSettings reads configuration values from environment variables at +process start and exposes them as strongly typed attributes. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass + + +@dataclass +class AppSettings: + """Config values for the Stable Diffusion app. 
+ + Supported environment variables: + - MODEL_ID: HuggingFace model name + - XFORMERS: 1/0 to enable xformers + - WARMUP: 1/0 to warm up CUDA kernels + - PORT: server port for Gradio + - HOST: server host address + - SHARE: enable Gradio public sharing link + """ + + model_id: str = os.getenv("MODEL_ID", "runwayml/stable-diffusion-v1-5") + enable_xformers: bool = bool(int(os.getenv("XFORMERS", "0"))) + warmup: bool = bool(int(os.getenv("WARMUP", "1"))) + server_port: int = int(os.getenv("PORT", "7860")) + server_host: str = os.getenv("HOST", "0.0.0.0") + share: bool = bool(int(os.getenv("SHARE", "1"))) diff --git a/src/sdgen/main.py b/src/sdgen/main.py new file mode 100644 index 0000000000000000000000000000000000000000..c9968b674ee5df3a722fc0573b289231143b8784 --- /dev/null +++ b/src/sdgen/main.py @@ -0,0 +1,65 @@ +"""Main entrypoint for the Stable Diffusion application. + +This module initializes the text-to-image and image-to-image pipelines, +sets up the UI, and launches the Gradio interface. +""" + +from __future__ import annotations + +import torch +from dotenv import load_dotenv + +from sdgen.config import AppSettings +from sdgen.sd.img2img import prepare_img2img_pipeline +from sdgen.sd.pipeline import load_pipeline, warmup_pipeline +from sdgen.ui import build_ui +from sdgen.utils.logger import get_logger + +logger = get_logger(__name__) +load_dotenv() + + +def detect_device() -> str: + """Return `"cuda"` if a GPU is available, otherwise `"cpu"`. + + Returns: + The selected device string. 
+ """ + if torch.cuda.is_available(): + logger.info("CUDA available → using GPU") + return "cuda" + + logger.warning("CUDA not detected → falling back to CPU") + return "cpu" + + +def main() -> None: + """Start the Stable Diffusion UI and initialize inference pipelines.""" + settings = AppSettings() + model_id = settings.model_id + + device = detect_device() + + logger.info("Loading pipeline %s", model_id) + pipe = load_pipeline( + model_id=model_id, + device=device, + use_fp16=device == "cuda", + enable_xformers=settings.enable_xformers, + ) + + if device == "cuda" and settings.warmup: + warmup_pipeline(pipe) + + img2img_pipe = prepare_img2img_pipeline(pipe) + + demo = build_ui(pipe, img2img_pipe) + demo.launch( + server_name=settings.server_host, + server_port=settings.server_port, + share=settings.share, + ) + + +if __name__ == "__main__": + main() diff --git a/src/sdgen/presets/__init__.py b/src/sdgen/presets/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..cc7835593c1a32ba00bd1957bd8d48d126ac6c74 --- /dev/null +++ b/src/sdgen/presets/__init__.py @@ -0,0 +1,5 @@ +from __future__ import annotations + +from .styles import get_preset, list_presets + +__all__ = ["get_preset", "list_presets"] diff --git a/src/sdgen/presets/styles.py b/src/sdgen/presets/styles.py new file mode 100644 index 0000000000000000000000000000000000000000..5f08a97d7b021572c2f3025e2890184fde494c2b --- /dev/null +++ b/src/sdgen/presets/styles.py @@ -0,0 +1,95 @@ +"""Preset configurations for text-to-image generation. + +This module defines a collection of named presets including prompt, +negative prompt, sampler parameters, and recommended resolutions. 
+""" + +from __future__ import annotations + +from typing import Any, Dict, List + +# Global preset registry: {preset_name: parameters} +PRESETS: Dict[str, Dict[str, Any]] = { + "Realistic Photo": { + "prompt": ( + "ultra realistic, 35mm photography, \ + photorealistic, " + "cinematic lighting" + ), + "negative_prompt": "low quality, blurry, deformed, extra limbs", + "steps": 28, + "guidance_scale": 7.5, + "width": 512, + "height": 512, + "note": "Natural lighting, sharp details, realistic skin texture", + "tags": ["realistic", "photo"], + }, + "Anime": { + "prompt": ( + "high quality anime, clean lines, vibrant colors, \ + soft rim lighting, " + "studio lighting" + ), + "negative_prompt": "blurry, low detail, mutation, deformed", + "steps": 30, + "guidance_scale": 8.0, + "width": 512, + "height": 512, + "note": "Use for anime-style character generation", + "tags": ["anime", "stylized"], + }, + "Cinematic / Moody": { + "prompt": ( + "dramatic cinematic lighting, moody, film grain, \ + Kodak Portra, " + "filmic color grading" + ), + "negative_prompt": "oversaturated, low detail, flat lighting", + "steps": 30, + "guidance_scale": 7.0, + "width": 768, + "height": 512, + "note": "Wider aspect ratio for cinematic feel", + "tags": ["cinematic", "moody"], + }, + "Oil Painting / Classic Art": { + "prompt": ( + "oil painting, impasto brush strokes, classical \ + lighting, " + "Rembrandt style" + ), + "negative_prompt": "blurry, cartoonish, digital artifacts", + "steps": 40, + "guidance_scale": 8.5, + "width": 512, + "height": 768, + "note": "Painterly aesthetic reminiscent of classical oil art", + "tags": ["art", "oil", "painterly"], + }, + "Cyberpunk / Neon": { + "prompt": ( + "cyberpunk city, neon reflections, wet streets, \ + high detail, " + "synthwave aesthetic" + ), + "negative_prompt": "low detail, daytime, blurry", + "steps": 30, + "guidance_scale": 7.5, + "width": 512, + "height": 768, + "note": "Vibrant neon-lit futuristic look", + "tags": ["cyberpunk", "neon"], + 
}, +} + + +def get_preset(name: str) -> Dict[str, Any] | None: + """Return a shallow copy of a preset by name.""" + data = PRESETS.get(name) + return dict(data) if data else None + + +def list_presets() -> List[str]: + """List preset names in a stable UI order.""" + # Avoid unexpected reordering: use insertion order + return list(PRESETS.keys()) diff --git a/src/sdgen/sd/__init__.py b/src/sdgen/sd/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..cbcfc8a2030ae1431149cdba2f9474f94c2827af --- /dev/null +++ b/src/sdgen/sd/__init__.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from .generator import generate_image +from .img2img import generate_img2img, prepare_img2img_pipeline +from .models import GenerationMetadata, HistorySummary, Img2ImgConfig, Txt2ImgConfig +from .pipeline import load_pipeline, warmup_pipeline + +__all__ = [ + "Txt2ImgConfig", + "Img2ImgConfig", + "GenerationMetadata", + "HistorySummary", + "generate_image", + "generate_img2img", + "prepare_img2img_pipeline", + "load_pipeline", + "warmup_pipeline", +] diff --git a/src/sdgen/sd/generator.py b/src/sdgen/sd/generator.py new file mode 100644 index 0000000000000000000000000000000000000000..480c09b2fc304f2d534168ecef4f2e964e24171f --- /dev/null +++ b/src/sdgen/sd/generator.py @@ -0,0 +1,76 @@ +"""Text-to-image generation with clean metadata output.""" + +from __future__ import annotations + +import time +from typing import Tuple + +import torch +from PIL import Image + +from sdgen.sd.models import GenerationMetadata, Txt2ImgConfig +from sdgen.utils.common import validate_resolution +from sdgen.utils.logger import get_logger + +logger = get_logger(__name__) + + +def generate_image( + pipe: any, + cfg: Txt2ImgConfig, +) -> Tuple[Image.Image, GenerationMetadata]: + """Generate an image from text using a Stable Diffusion pipeline. + + Args: + pipe: A diffusers StableDiffusionPipeline instance. + cfg: Structured configuration for text-to-image generation. 
+ + Returns: + A tuple of (PIL image, GenerationMetadata). + """ + width, height = validate_resolution(cfg.width, cfg.height) + start = time.time() + + seed = cfg.seed + if seed is None: + seed = int(torch.seed() & ((1 << 63) - 1)) + + device = cfg.device + gen = torch.Generator("cpu" if device == "cpu" else device).manual_seed(int(seed)) + + logger.info( + "txt2img: steps=%s cfg=%s res=%sx%s seed=%s", + cfg.steps, + cfg.guidance_scale, + width, + height, + seed, + ) + + autocast_device = device if device == "cuda" else "cpu" + with torch.autocast(device_type=autocast_device): + out = pipe( + prompt=cfg.prompt, + negative_prompt=cfg.negative_prompt or None, + width=width, + height=height, + num_inference_steps=int(cfg.steps), + guidance_scale=float(cfg.guidance_scale), + generator=gen, + ) + + img = out.images[0] + elapsed = time.time() - start + + meta = GenerationMetadata( + mode="txt2img", + prompt=cfg.prompt, + negative_prompt=cfg.negative_prompt or "", + steps=int(cfg.steps), + guidance_scale=float(cfg.guidance_scale), + width=width, + height=height, + seed=int(seed), + elapsed_seconds=float(elapsed), + ) + return img, meta diff --git a/src/sdgen/sd/img2img.py b/src/sdgen/sd/img2img.py new file mode 100644 index 0000000000000000000000000000000000000000..697bee3248ed387ee54811b9f4e4ae6a29a8ebed --- /dev/null +++ b/src/sdgen/sd/img2img.py @@ -0,0 +1,136 @@ +"""Img2Img pipeline setup and generation utilities.""" + +from __future__ import annotations + +import time + +import torch +from diffusers import StableDiffusionImg2ImgPipeline +from PIL import Image + +from sdgen.sd.models import GenerationMetadata, Img2ImgConfig +from sdgen.utils.common import validate_resolution +from sdgen.utils.logger import get_logger + +logger = get_logger(__name__) + + +def prepare_img2img_pipeline( + base_pipe: StableDiffusionImg2ImgPipeline, + model_id: str = "runwayml/stable-diffusion-v1-5", +) -> StableDiffusionImg2ImgPipeline: + """Create an Img2Img pipeline using an existing 
base pipeline. + + Attempts `from_pipe` first for efficiency, then falls back to + a clean `from_pretrained` load if necessary. + + Args: + base_pipe: Loaded text-to-image Stable Diffusion pipeline. + model_id: Fallback Hugging Face model ID. + + Returns: + Configured `StableDiffusionImg2ImgPipeline`. + """ + try: + pipe = StableDiffusionImg2ImgPipeline.from_pipe(base_pipe) + logger.info("Img2Img pipeline created via from_pipe().") + except Exception as exc: + logger.warning("from_pipe() failed: %s → falling back.", exc) + pipe = StableDiffusionImg2ImgPipeline.from_pretrained( + model_id, + torch_dtype=base_pipe.unet.dtype, + safety_checker=None, + ) + device = next(base_pipe.unet.parameters()).device + pipe = pipe.to(device) + + # Optimizations + try: + pipe.enable_attention_slicing() + except Exception: + pass + + try: + if hasattr(pipe.vae, "enable_tiling"): + pipe.vae.enable_tiling() + except Exception: + pass + + return pipe + + +def generate_img2img( + pipe: StableDiffusionImg2ImgPipeline, + cfg: Img2ImgConfig, + init_image: Image.Image, +) -> tuple[Image.Image, GenerationMetadata]: + """Run Img2Img generation using the configured pipeline and metadata config. + + Args: + pipe: Stable Diffusion Img2Img pipeline. + cfg: Img2Img inference settings (prompt, steps, etc.). + init_image: The source image to transform. + + Raises: + ValueError: If strength is outside (0, 1]. + + Returns: + A tuple of `(output_image, metadata)`. 
+
+    """
+    if not (0.0 < cfg.strength <= 1.0):
+        raise ValueError("strength must be in (0, 1].")
+
+    width, height = validate_resolution(cfg.width, cfg.height)
+    start = time.time()
+
+    # Deterministic seed
+    seed = cfg.seed
+    if seed is None:
+        seed = int(torch.seed() & ((1 << 63) - 1))
+
+    # Resize input
+    init = init_image.convert("RGB").resize((width, height), Image.LANCZOS)
+
+    # Correct generator device
+    device = cfg.device if cfg.device in ("cuda", "cpu") else "cuda"
+    generator = torch.Generator(device).manual_seed(int(seed))
+
+    logger.info(
+        "img2img: steps=%s cfg=%s strength=%.2f res=%sx%s seed=%s",
+        cfg.steps,
+        cfg.guidance_scale,
+        cfg.strength,
+        width,
+        height,
+        seed,
+    )
+
+    # Autocast context
+    autocast_device = "cuda" if device == "cuda" else "cpu"
+    with torch.autocast(device_type=autocast_device):
+        out = pipe(
+            prompt=cfg.prompt,
+            negative_prompt=cfg.negative_prompt or None,
+            image=init,
+            strength=float(cfg.strength),
+            num_inference_steps=int(cfg.steps),
+            guidance_scale=float(cfg.guidance_scale),
+            generator=generator,
+        )
+
+    img = out.images[0]
+    elapsed = time.time() - start
+
+    meta = GenerationMetadata(
+        mode="img2img",
+        prompt=cfg.prompt,
+        negative_prompt=cfg.negative_prompt or "",
+        steps=int(cfg.steps),
+        guidance_scale=float(cfg.guidance_scale),
+        width=width,
+        height=height,
+        seed=int(seed),
+        strength=float(cfg.strength),
+        elapsed_seconds=float(elapsed),
+    )
+    return img, meta
diff --git a/src/sdgen/sd/models.py b/src/sdgen/sd/models.py
new file mode 100644
index 0000000000000000000000000000000000000000..7c4b80d8ee1ec9d564183d0e63d4bec0b0ed1140
--- /dev/null
+++ b/src/sdgen/sd/models.py
@@ -0,0 +1,121 @@
+"""Configuration dataclasses for Stable Diffusion execution and history storage."""
+
+from __future__ import annotations
+
+from dataclasses import asdict, dataclass, field
+from datetime import datetime
+from typing import Any, Dict, Optional
+
+
+@dataclass
+class Txt2ImgConfig:
+    """Configuration for text-to-image 
generation. + + Attributes: + prompt: Positive prompt text. + negative_prompt: Negative prompt text. + steps: Number of diffusion steps. + guidance_scale: Classifier-free guidance scale. + width: Requested image width. + height: Requested image height. + seed: Optional random seed. + device: Target torch device ("cuda" or "cpu"). + """ + + prompt: str + negative_prompt: str = "" + steps: int = 30 + guidance_scale: float = 7.5 + width: int = 512 + height: int = 512 + seed: Optional[int] = None + device: str = "cuda" + + +@dataclass +class Img2ImgConfig: + """Configuration for image-to-image generation. + + Attributes: + prompt: Positive prompt text. + init_image_path: Optional file path to source image. + negative_prompt: Negative prompt text. + strength: Img2Img blend strength in (0, 1]. + steps: Number of diffusion steps. + guidance_scale: CFG scale. + width: Requested image width. + height: Requested image height. + seed: Optional random seed. + device: Target device. + """ + + prompt: str + init_image_path: Optional[str] = None + negative_prompt: str = "" + strength: float = 0.7 + steps: int = 30 + guidance_scale: float = 7.5 + width: int = 512 + height: int = 512 + seed: Optional[int] = None + device: str = "cuda" + + +@dataclass +class GenerationMetadata: + """Output metadata for a generated image. + + Attributes: + mode: Generation mode ("txt2img", "img2img", "upscale", ...). + prompt: Prompt text. + negative_prompt: Negative prompt text. + steps: Number of diffusion steps. + guidance_scale: CFG scale. + width: Output width. + height: Output height. + seed: Resolved random seed. + strength: Img2Img strength; None for Txt2Img. + elapsed_seconds: Wall-clock runtime. + timestamp: UTC timestamp. + id: Unique entry ID. + thumbnail: Local thumbnail path. + full_image: Local full-size image path. 
+ """ + + mode: str + prompt: str + negative_prompt: str = "" + steps: int = 30 + guidance_scale: float = 7.5 + width: int = 512 + height: int = 512 + seed: Optional[int] = None + strength: Optional[float] = None + elapsed_seconds: float = 0.0 + timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat()) + id: Optional[str] = None + thumbnail: Optional[str] = None + full_image: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Return a dict representation excluding None values.""" + data = asdict(self) + return {key: value for key, value in data.items() if value is not None} + + +@dataclass +class HistorySummary: + """Minimal entry used for UI history lists.""" + + id: str + prompt: str + mode: str + seed: Optional[int] + width: int + height: int + timestamp: str + thumbnail: str + + def to_dict(self) -> Dict[str, Any]: + """Return a serializable dict representation.""" + return asdict(self) diff --git a/app/pipeline.py b/src/sdgen/sd/pipeline.py similarity index 58% rename from app/pipeline.py rename to src/sdgen/sd/pipeline.py index 78a4d0c553cd98d33754079b3599b1dd49dd4617..747800a75709654c40f96d27ea7a6a114e0b7030 100644 --- a/app/pipeline.py +++ b/src/sdgen/sd/pipeline.py @@ -1,33 +1,26 @@ -"""Model pipeline loader for Stable Diffusion (HuggingFace Diffusers). +"""Stable Diffusion pipeline loading and warmup helpers.""" -load_pipeline(...) returns a GPU-ready pipeline with memory optimizations. 
-""" +from __future__ import annotations import os from typing import Optional import torch -from diffusers import ( - DPMSolverMultistepScheduler, - StableDiffusionPipeline, -) -from dotenv import load_dotenv +from diffusers import DPMSolverMultistepScheduler, StableDiffusionPipeline -from app.utils.logger import get_logger +from sdgen.utils.logger import get_logger logger = get_logger(__name__) -load_dotenv() -def _try_enable_xformers(pipe): +def _try_enable_xformers(pipe: StableDiffusionPipeline) -> None: + """Enable xFormers memory-efficient attention if available.""" try: if hasattr(pipe, "enable_xformers_memory_efficient_attention"): pipe.enable_xformers_memory_efficient_attention() logger.info("Enabled xFormers memory-efficient attention.") - else: - logger.info("xFormers not available via API; skipping.") - except Exception as err: - logger.info(f"xFormers not enabled: {err}") + except Exception as exc: + logger.info("xFormers not enabled: %s", exc) def load_pipeline( @@ -36,9 +29,21 @@ def load_pipeline( use_fp16: bool = True, enable_xformers: bool = False, torch_dtype: Optional[torch.dtype] = None, - scheduler=None, -): - """Load and return an optimized StableDiffusionPipeline.""" + scheduler: Optional[DPMSolverMultistepScheduler] = None, +) -> StableDiffusionPipeline: + """Load the Stable Diffusion pipeline with optional scheduler and xFormers. + + Args: + model_id: HuggingFace model ID. + device: Execution device ("cuda" or "cpu"). + use_fp16: Enable float16 precision on CUDA. + enable_xformers: Whether to enable xFormers attention. + torch_dtype: Explicit dtype override. + scheduler: Optional preconfigured scheduler. + + Returns: + A configured `StableDiffusionPipeline` instance. 
+ """ if torch_dtype is None: torch_dtype = torch.float16 if use_fp16 and device == "cuda" else torch.float32 @@ -51,7 +56,12 @@ def load_pipeline( except Exception: scheduler = None - logger.info(f"Loading pipeline {model_id} " f"dtype={torch_dtype} on {device} ...") + logger.info( + "Loading pipeline %s dtype=%s on %s", + model_id, + torch_dtype, + device, + ) pipe = StableDiffusionPipeline.from_pretrained( model_id, @@ -59,9 +69,7 @@ def load_pipeline( safety_checker=None, scheduler=scheduler, use_auth_token=os.getenv("HUGGINGFACE_HUB_TOKEN"), - ) - - pipe = pipe.to(device) + ).to(device) try: pipe.enable_attention_slicing() @@ -87,36 +95,37 @@ def load_pipeline( def warmup_pipeline( - pipe, + pipe: StableDiffusionPipeline, prompt: str = "A photo of a cat", height: int = 512, width: int = 512, -): - """Run a quick inference to allocate CUDA kernels and memory.""" +) -> None: + """Run a one-step warmup pass to initialize CUDA kernels.""" try: if hasattr(pipe, "parameters"): device = next(pipe.parameters()).device else: device = "cuda" - except Exception: device = "cuda" try: - gen = torch.Generator(device if device != "cpu" else "cpu").manual_seed(0) + gen_device = "cpu" if str(device) == "cpu" else device + generator = torch.Generator(gen_device).manual_seed(0) logger.info("Warmup: running one-step inference to initialize kernels.") - - _ = pipe( + pipe( prompt=prompt, num_inference_steps=1, guidance_scale=1.0, height=height, width=width, - generator=gen, + generator=generator, ) - torch.cuda.empty_cache() + if device == "cuda": + torch.cuda.empty_cache() + logger.info("Warmup complete.") - except Exception as err: - logger.warning(f"Warmup failed: {err}") + except Exception as exc: + logger.warning("Warmup failed: %s", exc) diff --git a/src/sdgen/ui/__init__.py b/src/sdgen/ui/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..605d66d4b4fdef233faabe72f1e843faea7768dc --- /dev/null +++ b/src/sdgen/ui/__init__.py @@ -0,0 +1,5 @@ +from 
__future__ import annotations + +from sdgen.ui.layout import build_ui + +__all__ = ["build_ui"] diff --git a/src/sdgen/ui/layout.py b/src/sdgen/ui/layout.py new file mode 100644 index 0000000000000000000000000000000000000000..756d0834bea351864de2ecae586326b726681900 --- /dev/null +++ b/src/sdgen/ui/layout.py @@ -0,0 +1,184 @@ +"""UI layout builder for the Stable Diffusion Gradio app.""" + +from __future__ import annotations + +from typing import Any, Dict, Tuple + +import gradio as gr + +from sdgen.sd.generator import generate_image +from sdgen.sd.img2img import generate_img2img +from sdgen.sd.models import Img2ImgConfig, Txt2ImgConfig +from sdgen.ui.tabs import ( + build_history_tab, + build_img2img_tab, + build_presets_tab, + build_txt2img_tab, + build_upscaler_tab, +) +from sdgen.ui.tabs.img2img_tab import Img2ImgControls +from sdgen.ui.tabs.txt2img_tab import Txt2ImgControls +from sdgen.upscaler.upscaler import Upscaler +from sdgen.utils.common import pretty_json, to_pil +from sdgen.utils.history import save_history_entry +from sdgen.utils.logger import get_logger + +logger = get_logger(__name__) + + +def _resolve_seed(value: Any) -> int | None: + """Return integer seed if valid, otherwise None.""" + if value is None: + return None + if isinstance(value, int): + return value + text = str(value).strip() + if not text: + return None + try: + return int(text) + except ValueError: + logger.warning("Invalid seed input: %s", value) + return None + + +def _txt2img_handler( + pipe: Any, + prompt: str, + negative: str, + steps: int, + guidance: float, + width: int, + height: int, + seed: Any, +) -> Tuple[Any, str]: + """Run text-to-image generation.""" + cfg = Txt2ImgConfig( + prompt=prompt or "", + negative_prompt=negative or "", + steps=int(steps), + guidance_scale=float(guidance), + width=int(width), + height=int(height), + seed=_resolve_seed(seed), + device=pipe.device.type, + ) + + image, meta = generate_image(pipe, cfg) + + try: + save_history_entry(meta, image) + 
except Exception as exc: # noqa: BLE001 + logger.exception("Failed to save history entry: %s", exc) + + return image, pretty_json(meta.to_dict()) + + +def _img2img_handler( + pipe: Any, + input_image: Any, + prompt: str, + negative: str, + strength: float, + steps: int, + guidance: float, + seed: Any, +) -> Tuple[Any, str]: + """Run image-to-image generation.""" + if input_image is None: + raise gr.Error("Upload an image to continue.") + + pil_image = to_pil(input_image) + + cfg = Img2ImgConfig( + prompt=prompt or "", + negative_prompt=negative or "", + strength=float(strength), + steps=int(steps), + guidance_scale=float(guidance), + width=pil_image.width, + height=pil_image.height, + seed=_resolve_seed(seed), + device=pipe.device.type, + ) + + image, meta = generate_img2img(pipe, cfg, pil_image) + + try: + save_history_entry(meta, image) + except Exception as exc: # noqa: BLE001 + logger.exception("Failed to save history entry: %s", exc) + + return image, pretty_json(meta.to_dict()) + + +def _upscale_handler( + input_image: Any, + scale: str, +) -> Tuple[Any, str]: + """Run image upscaling.""" + if input_image is None: + raise gr.Error("Upload an image to continue.") + + pil_image = to_pil(input_image) + + # scale is str → convert to int + try: + scale_int = int(float(scale)) + except Exception as exc: # noqa: BLE001 + raise gr.Error("Scale must be numeric (2 or 4).") from exc + + upscaler = Upscaler(scale=scale_int, prefer="ncnn") + out_image = upscaler.upscale(pil_image) + + meta: Dict[str, Any] = { + "mode": "upscale", + "scale": scale_int, + "width": out_image.width, + "height": out_image.height, + } + + try: + save_history_entry(meta, out_image) + except Exception as exc: # noqa: BLE001 + logger.exception("Failed to save history entry: %s", exc) + + return out_image, pretty_json(meta) + + +def build_ui(txt2img_pipe: Any, img2img_pipe: Any) -> gr.Blocks: + """Build the entire Gradio UI.""" + with gr.Blocks() as demo: + gr.Markdown( + "# Stable Diffusion 
Generator\n" + "Clean, local Stable \ + Diffusion toolkit." + ) + + txt_controls: Txt2ImgControls = build_txt2img_tab( + handler=lambda *args: _txt2img_handler(txt2img_pipe, *args), + ) + + img_controls: Img2ImgControls = build_img2img_tab( + handler=lambda *args: _img2img_handler(img2img_pipe, *args), + ) + + build_upscaler_tab( + handler=_upscale_handler, + ) + + build_presets_tab( + txt_controls=txt_controls, + img_controls=img_controls, + ) + + build_history_tab() + + gr.Markdown( + "### Notes\n" + "- Seeds left blank will be randomized.\n" + "- Use **History → Refresh History** if new thumbnails do not appear.\n" + "- Presets apply to both **Text → Image** and **Image → Image** tabs.\n" + ) + + return demo diff --git a/src/sdgen/ui/tabs/__init__.py b/src/sdgen/ui/tabs/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..f7b8b88d03b6d889f8f6f323155e49d69a036e33 --- /dev/null +++ b/src/sdgen/ui/tabs/__init__.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from .history_tab import build_history_tab +from .img2img_tab import build_img2img_tab +from .presets_tab import build_presets_tab +from .txt2img_tab import build_txt2img_tab +from .upscaler_tab import build_upscaler_tab + +__all__ = [ + "build_txt2img_tab", + "build_img2img_tab", + "build_upscaler_tab", + "build_presets_tab", + "build_history_tab", +] diff --git a/src/sdgen/ui/tabs/history_tab.py b/src/sdgen/ui/tabs/history_tab.py new file mode 100644 index 0000000000000000000000000000000000000000..6df5b1c1b8021713e098e550163cadab6f6d7b43 --- /dev/null +++ b/src/sdgen/ui/tabs/history_tab.py @@ -0,0 +1,162 @@ +"""UI for History section.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Tuple + +import gradio as gr +from PIL import Image + +from sdgen.utils.common import pretty_json, short_prompt +from sdgen.utils.history import ( + delete_history_entry, + list_history, + load_entry, +) +from sdgen.utils.logger import get_logger + 
+logger = get_logger(__name__) + + +# Internal helpers + + +def _label(entry: Dict[str, Any]) -> str: + """Human-readable dropdown label.""" + ts = entry.get("timestamp", "")[:19].replace("T", " ") + mode = entry.get("mode", "unknown") + prompt = short_prompt(entry.get("prompt", ""), 60) + return f"{ts} — {mode} — {prompt}" if prompt else f"{ts} — {mode}" + + +def _build_index(limit: int = 500) -> Tuple[List[str], List[str], List[Dict[str, Any]]]: + """Load history index → (ids, labels, raw entries).""" + entries = list_history(limit) + ids = [e.get("id", "") for e in entries] + labels = [_label(e) for e in entries] + return ids, labels, entries + + +def _id_from_label(label: str, entries: List[Dict[str, Any]]) -> Optional[str]: + """Resolve entry ID from label text.""" + for e in entries: + if _label(e) == label: + return e.get("id") + return None + + +# Operations + + +def load_from_dropdown(selected_label: str, entries: List[Dict[str, Any]]): + """Load a history entry from dropdown.""" + if not selected_label: + raise gr.Error("No entry selected.") + + entry_id = _id_from_label(selected_label, entries) + if not entry_id: + raise gr.Error("Entry not found.") + + data = load_entry(entry_id) + if not data: + raise gr.Error("Entry JSON missing.") + + thumb_path = data.get("thumbnail") + img = Image.open(thumb_path) if thumb_path else None + + # pretty_json returns string → JSON component will parse it + return img, pretty_json(data) + + +def refresh_history(): + """Refresh dropdown + state. + + Clear output. 
+ """ + _, labels, entries = _build_index() + if labels: + dd = gr.update(choices=labels, value=labels[0]) + else: + dd = gr.update(choices=[], value=None) + + return dd, entries, None, "" + + +def delete_entry(selected_label: str, entries: List[Dict[str, Any]]): + """Delete and refresh UI.""" + if not selected_label: + raise gr.Error("Select an entry first.") + + entry_id = _id_from_label(selected_label, entries) + if not entry_id: + raise gr.Error("Entry not found.") + + ok = delete_history_entry(entry_id) + if not ok: + raise gr.Error("Delete failed.") + + _, labels, new_entries = _build_index() + + if labels: + dd = gr.update(choices=labels, value=labels[0]) + else: + dd = gr.update(choices=[], value=None) + + return None, "", dd, new_entries + + +# UI + + +def build_history_tab() -> None: + """History tab: dropdown, load button, delete, refresh.""" + _, labels, entries = _build_index() + initial = labels[0] if labels else None + + with gr.Tab("History"): + with gr.Row(): + # Left panel: controls + with gr.Column(scale=1): + dropdown = gr.Dropdown( + label="History entries", + choices=labels, + value=initial, + interactive=True, + ) + + load_btn = gr.Button("Load entry") + refresh_btn = gr.Button("Refresh") + delete_btn = gr.Button("Delete selected", variant="stop") + + # Right panel: output + with gr.Column(scale=2): + thumb = gr.Image( + label="Thumbnail", + show_label=True, + type="pil", + ) + meta = gr.JSON( + label="Metadata", + ) + + state = gr.State(entries) + + # Events + + load_btn.click( + fn=load_from_dropdown, + inputs=[dropdown, state], + outputs=[thumb, meta], + ) + + refresh_btn.click( + fn=refresh_history, + inputs=None, + outputs=[dropdown, state, thumb, meta], + ) + + delete_btn.click( + fn=delete_entry, + inputs=[dropdown, state], + outputs=[thumb, meta, dropdown, state], + ) diff --git a/src/sdgen/ui/tabs/img2img_tab.py b/src/sdgen/ui/tabs/img2img_tab.py new file mode 100644 index 
0000000000000000000000000000000000000000..06dd34e491c5c3e269b3392d84c5c63cc56a3ad8 --- /dev/null +++ b/src/sdgen/ui/tabs/img2img_tab.py @@ -0,0 +1,122 @@ +"""UI for image to image generation section.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Callable, Tuple + +import gradio as gr + + +@dataclass +class Img2ImgControls: + """References to Image → Image controls used by the presets tab.""" + + input_image: gr.Image + prompt: gr.Textbox + negative: gr.Textbox + strength: gr.Slider + steps: gr.Slider + guidance: gr.Slider + seed: gr.Textbox + + +def build_img2img_tab(handler: Callable[..., Tuple[Any, dict]]) -> Img2ImgControls: + """Build the Image → Image tab and connect it to the provided handler. + + Args: + handler: A callable accepting the UI inputs and returning: + (output_image, metadata_dict) + + Returns: + Img2ImgControls: A container with references to UI components. + """ + with gr.Tab("Image → Image"): + with gr.Row(): + # Left: Controls + with gr.Column(scale=1): + input_image = gr.Image( + label="Input Image", + type="numpy", + tool="editor", + ) + + prompt = gr.Textbox( + label="Prompt", + placeholder="Describe desired changes...", + ) + + negative = gr.Textbox( + label="Negative Prompt", + placeholder="Artifacts to avoid...", + ) + + strength = gr.Slider( + minimum=0.1, + maximum=1.0, + value=0.6, + step=0.05, + label="Strength", + ) + gr.Markdown( + "Controls how strongly the prompt \ + alters the original image." 
+ ) + + steps = gr.Slider( + minimum=10, + maximum=50, + value=25, + step=1, + label="Steps", + ) + + guidance = gr.Slider( + minimum=1, + maximum=15, + value=7.0, + step=0.5, + label="Guidance Scale", + ) + + seed = gr.Textbox( + label="Seed", + value="", + placeholder="Leave empty for random", + ) + + generate_button = gr.Button("Generate") + + # Right: Output preview + with gr.Column(scale=2): + out_image = gr.Image( + label="Output", + type="pil", + ) + out_metadata = gr.JSON( + label="Metadata", + ) + + generate_button.click( + fn=handler, + inputs=[ + input_image, + prompt, + negative, + strength, + steps, + guidance, + seed, + ], + outputs=[out_image, out_metadata], + ) + + return Img2ImgControls( + input_image=input_image, + prompt=prompt, + negative=negative, + strength=strength, + steps=steps, + guidance=guidance, + seed=seed, + ) diff --git a/src/sdgen/ui/tabs/presets_tab.py b/src/sdgen/ui/tabs/presets_tab.py new file mode 100644 index 0000000000000000000000000000000000000000..1e722e14be04017b2c306ce73e890b03a5ff7859 --- /dev/null +++ b/src/sdgen/ui/tabs/presets_tab.py @@ -0,0 +1,119 @@ +"""UI for presets section.""" + +from __future__ import annotations + +from typing import Any, Tuple + +import gradio as gr + +from sdgen.presets.styles import get_preset, list_presets +from sdgen.ui.tabs.img2img_tab import Img2ImgControls +from sdgen.ui.tabs.txt2img_tab import Txt2ImgControls + + +def apply_preset(preset_name: Any) -> Tuple[Any, ...]: + """Return values to populate txt2img and img2img controls. + + Args: + preset_name: A string or a one-element list representing the preset key. + + Returns: + A tuple with values mapped to Text→Image and Image→Image UI controls. 
+ """ + # unwrap dropdown list behavior + if isinstance(preset_name, (list, tuple)): + preset_name = preset_name[0] if preset_name else None + + if not preset_name: + raise gr.Error("Select a preset first.") + + preset = get_preset(str(preset_name)) + if preset is None: + raise gr.Error("Invalid preset selected.") + + prompt = preset.get("prompt", "") + negative = preset.get("negative_prompt", "") + + steps = int(preset.get("steps", 30)) + guidance = float(preset.get("guidance_scale", 7.5)) + width = int(preset.get("width", 512)) + height = int(preset.get("height", 512)) + + # For Img2Img: + img_steps = max(10, steps) + img_guidance = guidance + img_strength = 0.6 # neutral default + img_seed = "" + + # only return data; UI wiring chooses what to set + status_msg = f"Applied preset: {preset_name}" + + return ( + # txt2img + prompt, + negative, + steps, + guidance, + width, + height, + # img2img + prompt, + negative, + img_steps, + img_guidance, + img_strength, + img_seed, + # status + status_msg, + ) + + +def build_presets_tab( + txt_controls: Txt2ImgControls, + img_controls: Img2ImgControls, +) -> None: + """Construct the Presets tab and link values to both txt2img and img2img controls. + + Args: + txt_controls: References to Text→Image input controls. + img_controls: References to Image→Image input controls. 
+ """ + with gr.Tab("Presets"): + with gr.Row(): + with gr.Column(): + preset_name = gr.Dropdown( + choices=list_presets(), + label="Select style", + ) + apply_button = gr.Button("Apply Preset") + status_box = gr.Markdown("") + + with gr.Column(): + gr.Markdown( + "Applying a preset fills prompt, negative prompt, steps, " + "guidance, and resolution for both **Text → Image** " + "and **Image → Image** tabs.", + ) + + apply_button.click( + fn=apply_preset, + inputs=[preset_name], + outputs=[ + # txt2img + txt_controls.prompt, + txt_controls.negative, + txt_controls.steps, + txt_controls.guidance, + txt_controls.width, + txt_controls.height, + # img2img + img_controls.prompt, + img_controls.negative, + img_controls.steps, + img_controls.guidance, + img_controls.strength, + img_controls.seed, + # status markdown + status_box, + ], + ) diff --git a/src/sdgen/ui/tabs/txt2img_tab.py b/src/sdgen/ui/tabs/txt2img_tab.py new file mode 100644 index 0000000000000000000000000000000000000000..091a36b69efae6c53e9a43464fa8783a9245c4bc --- /dev/null +++ b/src/sdgen/ui/tabs/txt2img_tab.py @@ -0,0 +1,112 @@ +"""UI for text to image generation section.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Callable, Tuple + +import gradio as gr + + +@dataclass +class Txt2ImgControls: + """UI element references for the Text → Image tab. + + These allow the Presets tab to populate the fields programmatically. + """ + + prompt: gr.components.Textbox + negative: gr.components.Textbox + steps: gr.components.Slider + guidance: gr.components.Slider + width: gr.components.Slider + height: gr.components.Slider + seed: gr.components.Textbox + + +def build_txt2img_tab(handler: Callable[..., Tuple]) -> Txt2ImgControls: + """Construct the Text → Image tab and bind the Generate button. + + Args: + handler: Function that performs txt2img and returns (image, metadata). + + Returns: + A Txt2ImgControls instance containing references to all UI controls. 
+ """ + with gr.Tab("Text → Image"): + with gr.Row(): + with gr.Column(): + prompt = gr.Textbox( + label="Prompt", + placeholder="A futuristic city at dusk, cinematic lighting", + ) + negative = gr.Textbox( + label="Negative prompt", + placeholder="low quality, blurry, extra limbs", + ) + + steps = gr.Slider( + minimum=10, + maximum=50, + value=30, + step=1, + label="Steps", + ) + gr.Markdown( + "More steps → finer detail, slower runtime. 20–40 is typical.", + ) + + guidance = gr.Slider( + minimum=1, + maximum=15, + value=7.5, + step=0.5, + label="Guidance Scale (CFG)", + ) + gr.Markdown( + "Higher values make generation match the prompt more strictly. " + "7–9 is a common range.", + ) + + width = gr.Slider( + minimum=256, + maximum=768, + value=512, + step=64, + label="Width", + ) + height = gr.Slider( + minimum=256, + maximum=768, + value=512, + step=64, + label="Height", + ) + + seed = gr.Textbox( + label="Seed (optional)", + value="", + placeholder="Leave empty for random", + ) + + generate_button = gr.Button("Generate") + + with gr.Column(): + out_image = gr.Image(label="Output") + out_meta = gr.JSON(label="Metadata (JSON)") + + generate_button.click( + fn=handler, + inputs=[prompt, negative, steps, guidance, width, height, seed], + outputs=[out_image, out_meta], + ) + + return Txt2ImgControls( + prompt=prompt, + negative=negative, + steps=steps, + guidance=guidance, + width=width, + height=height, + seed=seed, + ) diff --git a/src/sdgen/ui/tabs/upscaler_tab.py b/src/sdgen/ui/tabs/upscaler_tab.py new file mode 100644 index 0000000000000000000000000000000000000000..34a6f54072556478b62779dcde9a9be5d4e5c47f --- /dev/null +++ b/src/sdgen/ui/tabs/upscaler_tab.py @@ -0,0 +1,36 @@ +"""UI for upscaler section.""" + +from __future__ import annotations + +from typing import Callable + +import gradio as gr + + +def build_upscaler_tab(handler: Callable[..., tuple]) -> None: + """Build the Upscaler tab and wire it to the given handler.""" + with gr.Tab("Upscaler"): + with 
gr.Row(): + with gr.Column(): + input_image = gr.Image( + label="Upload Image to Upscale", + type="numpy", + ) + scale = gr.Radio( + choices=["2.0", "4.0"], + value="2.0", + label="Upscale Factor", + ) + upscale_button = gr.Button("Upscale") + + with gr.Column(): + out_image = gr.Image(label="Upscaled Image") + out_meta = gr.JSON( + label="Metadata (JSON)", + ) + + upscale_button.click( + fn=handler, + inputs=[input_image, scale], + outputs=[out_image, out_meta], + ) diff --git a/app/__init__.py b/src/sdgen/upscaler/__init__.py similarity index 100% rename from app/__init__.py rename to src/sdgen/upscaler/__init__.py diff --git a/src/sdgen/upscaler/realesrgan.py b/src/sdgen/upscaler/realesrgan.py new file mode 100644 index 0000000000000000000000000000000000000000..9741152ec4cd1fc82d71db56c06efd25b3c5e338 --- /dev/null +++ b/src/sdgen/upscaler/realesrgan.py @@ -0,0 +1,85 @@ +"""NCNN RealESRGAN upscaler wrapper. + +This module exposes: + - NCNNUpscaler: lightweight RealESRGAN upscaling (2× or 4×) + backed by realesrgan-ncnn-py. +""" + +from __future__ import annotations + +from typing import Final + +from PIL import Image +from realesrgan_ncnn_py import Realesrgan + +from sdgen.utils.logger import get_logger + +logger = get_logger(__name__) + +# Map scale → realesrgan-ncnn model index +_SCALE_MODEL_MAP: Final[dict[int, int]] = { + 2: 3, # realesrgan-x2plus + 4: 0, # realesrgan-x4plus +} + + +class NCNNUpscaler: + """NCNN RealESRGAN engine using realesrgan-ncnn-py. + + This class provides 2× or 4× super-resolution on CPU/GPU + without requiring the full PyTorch RealESRGAN stack. + + Args: + scale: Target scale factor. Valid values: 2 or 4. + + Raises: + ValueError: If an unsupported scale is provided. + RuntimeError: If the model cannot be loaded. 
+ """ + + def __init__(self, scale: int = 2) -> None: + """Initialize realesrgan.""" + if scale not in _SCALE_MODEL_MAP: + msg = "Scale must be 2 or 4 for NCNN RealESRGAN, got: %s" + raise ValueError(msg % scale) + + self.scale: int = scale + model_index = _SCALE_MODEL_MAP[scale] + + logger.info( + "Initializing NCNN RealESRGAN (scale=%s, model_index=%s)", + scale, + model_index, + ) + + try: + self.model = Realesrgan(model=model_index) + except Exception as exc: # noqa: BLE001 + msg = "Failed to initialize Realesrgan engine: %s" + logger.error(msg, exc) + raise RuntimeError(msg % exc) from exc + + def upscale(self, image: Image.Image) -> Image.Image: + """Upscale a PIL image using the NCNN RealESRGAN engine. + + Args: + image: A PIL.Image instance. + + Returns: + The upscaled PIL.Image. + + Raises: + TypeError: If the input is not a PIL.Image. + """ + if not isinstance(image, Image.Image): + msg = "Input must be a PIL.Image, got: %s" + raise TypeError(msg % type(image).__name__) + + logger.info( + "Upscaling image (%sx%s) by %sx", + image.width, + image.height, + self.scale, + ) + + return self.model.process_pil(image) diff --git a/src/sdgen/upscaler/upscaler.py b/src/sdgen/upscaler/upscaler.py new file mode 100644 index 0000000000000000000000000000000000000000..f316b15922a6d797f5755ee8565078cda0c180a6 --- /dev/null +++ b/src/sdgen/upscaler/upscaler.py @@ -0,0 +1,95 @@ +"""Unified interface for image upscaling. + +This module selects an upscaling backend at runtime. +Currently supported: +- NCNN RealESRGAN (recommended) + +Planned: +- Stable Diffusion-based upscaler +""" + +from __future__ import annotations + +from typing import Optional + +from PIL import Image + +from sdgen.upscaler.realesrgan import NCNNUpscaler +from sdgen.utils.logger import get_logger + +logger = get_logger(__name__) + + +class Upscaler: + """Unified high-level upscaler wrapper. + + Args: + scale: Target scale factor. Typically 2 or 4. 
class Upscaler:
    """Unified high-level upscaler wrapper.

    Args:
        scale: Target scale factor. Typically 2 or 4.
        prefer: Preferred backend name:
            - "ncnn": NCNN RealESRGAN (local, fast)
            - "auto": Try known engines in order

    Raises:
        RuntimeError: If no backend could be initialized.
        ValueError: Invalid scale value given.
    """

    _VALID_SCALES = {2, 4}
    _BACKENDS_ORDER = ("ncnn",)

    def __init__(self, scale: float = 2.0, prefer: str = "ncnn") -> None:
        """Initialize upscaler class."""
        # Guard clause: only integer scales 2 and 4 are supported.
        if int(scale) not in self._VALID_SCALES:
            raise ValueError(
                "Scale must be 2 or 4 for RealESRGAN. Got: %s" % scale
            )

        self.scale = int(scale)
        self.engine: Optional[object] = None

        logger.info("Upscaler init (prefer=%s, scale=%s)", prefer, self.scale)

        if prefer == "auto":
            self._init_auto()
        elif prefer == "ncnn":
            self._init_ncnn()
        else:
            raise ValueError("Unknown upscaler backend: %s" % prefer)

        # Backend initializers swallow their own errors; fail loudly here
        # if none of them managed to produce a working engine.
        if self.engine is None:
            raise RuntimeError("No valid upscaler engine available.")

    def _init_auto(self) -> None:
        """Try available engines in priority order."""
        for backend_name in self._BACKENDS_ORDER:
            try:
                if backend_name == "ncnn":
                    self._init_ncnn()
                    return
            except Exception as err:  # noqa: BLE001
                logger.warning(
                    "Upscaler init failed (%s): %s", backend_name, err
                )

    def _init_ncnn(self) -> None:
        """Initialize RealESRGAN NCNN backend."""
        try:
            self.engine = NCNNUpscaler(scale=self.scale)
            logger.info("Using NCNN RealESRGAN engine.")
        except Exception as err:  # noqa: BLE001
            logger.warning("NCNN RealESRGAN init failed: %s", err)
            self.engine = None

    def upscale(self, image: Image.Image) -> Image.Image:
        """Upscale the given image.

        Args:
            image: Input PIL image.

        Returns:
            The upscaled PIL image.

        Raises:
            RuntimeError: If the engine is not initialized.
        """
        if self.engine is None:
            raise RuntimeError("Upscaler is not initialized.")
        return self.engine.upscale(image)
+ """ + if self.engine is None: + raise RuntimeError("Upscaler is not initialized.") + return self.engine.upscale(image) diff --git a/src/sdgen/utils/__init__.py b/src/sdgen/utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/sdgen/utils/common.py b/src/sdgen/utils/common.py new file mode 100644 index 0000000000000000000000000000000000000000..1797c923adb788e70432b6627bb5faccb247cbe6 --- /dev/null +++ b/src/sdgen/utils/common.py @@ -0,0 +1,112 @@ +"""Utility helpers for image conversion, resolution validation, and formatting.""" + +from __future__ import annotations + +import json +from typing import Any + +import numpy as np +from PIL import Image + + +def validate_resolution(width: int, height: int) -> tuple[int, int]: + """Clamp and align the resolution to multiples of 64 within the SD range. + + Stable Diffusion models expect spatial dimensions that are multiples of 64. + The allowed range is clamped to [256, 768] to avoid excessive memory use. + + Args: + width: Requested width in pixels. + height: Requested height in pixels. + + Returns: + A (width, height) tuple aligned to the valid grid. + """ + width = (max(256, min(width, 768)) // 64) * 64 + height = (max(256, min(height, 768)) // 64) * 64 + return width, height + + +def to_pil(image: Any) -> Image.Image: + """Convert a numpy array to a PIL image, or return the existing PIL image. + + Supports: + - uint8 arrays in shape (H, W) or (H, W, C) + - float arrays assumed to be normalized in [0, 1] + - PIL.Image is returned unchanged + + Args: + image: Input image data, either PIL.Image or numpy.ndarray. + + Returns: + A PIL.Image instance. + + Raises: + TypeError: If the input type is unsupported. 
+ """ + if isinstance(image, Image.Image): + return image + + if isinstance(image, np.ndarray): + arr = image + + # Normalize floats to uint8 safely + if np.issubdtype(arr.dtype, np.floating): + # Clip first to avoid wraparound + arr = np.clip(arr, 0.0, 1.0) + arr = (arr * 255.0).astype("uint8") + elif arr.dtype != np.uint8: + arr = arr.astype("uint8") + + # Grayscale → RGB + if arr.ndim == 2: + arr = np.stack([arr] * 3, axis=-1) + + # Drop alpha channel if present + if arr.ndim == 3 and arr.shape[2] == 4: + arr = arr[..., :3] + + return Image.fromarray(arr) + + raise TypeError( + f"Expected PIL.Image or numpy.ndarray for 'image', got {type(image).__name__!r}" + ) + + +def pretty_json(data: Any) -> str: + """Return a pretty-printed JSON string representation of data. + + Args: + data: Any JSON-serializable object. + + Returns: + A formatted JSON string. If serialization fails, a best-effort string + representation is returned. + """ + try: + return json.dumps(data, ensure_ascii=False, indent=2) + except Exception: + return str(data) + + +def short_prompt(text: str | None, max_len: int = 50) -> str: + """Return a compact single-line prompt suitable for labels. + + Removes newlines and truncates with an ellipsis if longer than max_len. + + Args: + text: The full text prompt. + max_len: Maximum number of characters including ellipsis. + + Returns: + A short display string. + """ + if not text: + return "" + + text = text.replace("\n", " ") + if len(text) <= max_len: + return text + + # Reserve 1 char for ellipsis + return text[: max_len - 1] + "…" diff --git a/src/sdgen/utils/history.py b/src/sdgen/utils/history.py new file mode 100644 index 0000000000000000000000000000000000000000..ce1215f6e35783fcebfc5892f42e4192d08133e3 --- /dev/null +++ b/src/sdgen/utils/history.py @@ -0,0 +1,230 @@ +"""History storage and indexing utilities for generated images. 
+ +This module handles: +- Writing a GenerationMetadata entry (JSON + images) +- Maintaining a compact index.json for fast history listing +- Atomic writes to avoid corruption on crash +- Optional deletion of individual history entries +""" + +from __future__ import annotations + +import json +import tempfile +import uuid +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from PIL import Image + +from sdgen.config import ( + HISTORY_ENTRIES_DIR, + HISTORY_FULL_DIR, + HISTORY_ROOT, + HISTORY_THUMBS_DIR, +) +from sdgen.sd.models import GenerationMetadata, HistorySummary +from sdgen.utils.logger import get_logger + +logger = get_logger(__name__) + +# Ensure directories exist early +for _path in ( + HISTORY_ROOT, + HISTORY_ENTRIES_DIR, + HISTORY_THUMBS_DIR, + HISTORY_FULL_DIR, +): + _path.mkdir(parents=True, exist_ok=True) + +INDEX_FILE = HISTORY_ROOT / "index.json" + + +# Internal helpers +def _atomic_write(path: Path, data: bytes) -> None: + """Write bytes atomically to avoid partial writes on crash.""" + with tempfile.NamedTemporaryFile(dir=str(path.parent), delete=False) as tmp: + tmp.write(data) + tmp_path = Path(tmp.name) + tmp_path.replace(path) + + +def _read_index() -> List[Dict[str, Any]]: + """Return list of summary dicts from index.json.""" + if not INDEX_FILE.exists(): + return [] + try: + with INDEX_FILE.open("r", encoding="utf-8") as handle: + return json.load(handle) + except Exception as exc: # noqa: BLE001 + logger.warning("Failed to read history index: %s", exc) + return [] + + +def _write_index(index: List[Dict[str, Any]]) -> None: + """Persist index.json safely.""" + try: + payload = json.dumps( + index, + ensure_ascii=False, + indent=2, + ).encode("utf-8") + _atomic_write(INDEX_FILE, payload) + except Exception as exc: # noqa: BLE001 + logger.exception("Failed to write history index: %s", exc) + + +def _save_images( + entry_id: str, + image: Image.Image, + thumb_max_size: int = 256, +) 
def _save_images(
    entry_id: str,
    image: Image.Image,
    thumb_max_size: int = 256,
) -> Tuple[str, str]:
    """Save full PNG and resized thumbnail for given entry ID.

    Args:
        entry_id: Unique entry identifier, used as the file stem.
        image: Source PIL image.
        thumb_max_size: Longest-side bound for the thumbnail, in pixels.

    Returns:
        Tuple of (full_image_path, thumbnail_path) as strings.
    """
    full_path = HISTORY_FULL_DIR / f"{entry_id}.png"
    thumb_path = HISTORY_THUMBS_DIR / f"{entry_id}.png"

    image.save(full_path, format="PNG")

    # Image.thumbnail mutates in place, so resize a copy of the original.
    thumb = image.copy()
    thumb.thumbnail((thumb_max_size, thumb_max_size), Image.LANCZOS)
    thumb.save(thumb_path, format="PNG")

    return str(full_path), str(thumb_path)


def _unlink_quietly(path: Path) -> None:
    """Delete a file, ignoring missing files and filesystem errors."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        # Best-effort cleanup: a stale file is preferable to a crash here.
        pass


# Public API
def save_history_entry(
    metadata: GenerationMetadata,
    image: Image.Image,
) -> GenerationMetadata:
    """Write a new history entry: images, metadata, and update index.json.

    Args:
        metadata: Populated GenerationMetadata (without paths or id)
        image: PIL image to save

    Returns:
        The metadata object, updated with id, timestamp, and image paths.
    """
    # Local import keeps the module-level import block untouched.
    from datetime import timezone

    entry_id = metadata.id or str(uuid.uuid4())
    full_path, thumb_path = _save_images(entry_id, image)

    # Update metadata object
    metadata.id = entry_id
    metadata.full_image = full_path
    metadata.thumbnail = thumb_path
    if not metadata.timestamp:
        # Fix: datetime.utcnow() is deprecated (Python 3.12) and yields a
        # naive value; record an explicit UTC-aware ISO-8601 timestamp.
        metadata.timestamp = datetime.now(timezone.utc).isoformat()

    # Write metadata JSON
    entry_file = HISTORY_ENTRIES_DIR / f"{entry_id}.json"
    try:
        payload = json.dumps(
            metadata.to_dict(),
            ensure_ascii=False,
            indent=2,
        ).encode("utf-8")
        _atomic_write(entry_file, payload)
    except Exception as exc:  # noqa: BLE001
        logger.exception("Failed to write metadata file: %s", exc)

    # Insert at top of index
    try:
        index = _read_index()
        summary = HistorySummary(
            id=entry_id,
            prompt=metadata.prompt,
            mode=metadata.mode,
            seed=metadata.seed,
            width=metadata.width,
            height=metadata.height,
            timestamp=metadata.timestamp,
            thumbnail=thumb_path,
        )
        # Newest first; drop any stale entry with the same id (de-dupe).
        index = [summary.to_dict()] + [e for e in index if e.get("id") != entry_id]
        # Cap history length to keep index.json small.
        index = index[:500]
        _write_index(index)
    except Exception as exc:  # noqa: BLE001
        logger.exception("Failed to update history index: %s", exc)

    logger.info("Saved history entry %s", entry_id)
    return metadata


def list_history(n: int = 50) -> List[Dict[str, Any]]:
    """Return newest history summary dicts, up to n."""
    return _read_index()[:n]


def load_entry(entry_id: str) -> Optional[Dict[str, Any]]:
    """Return the full metadata dict for a specific entry_id, or None."""
    path = HISTORY_ENTRIES_DIR / f"{entry_id}.json"
    if not path.exists():
        return None
    try:
        with path.open("r", encoding="utf-8") as handle:
            return json.load(handle)
    except Exception as exc:  # noqa: BLE001
        logger.exception("Failed to load entry %s: %s", entry_id, exc)
        return None


def delete_history_entry(entry_id: str) -> bool:
    """Delete a history entry JSON + images and update index.json.

    Args:
        entry_id: History entry ID to delete.

    Returns:
        True if an entry was removed, False if not found.
    """
    index = _read_index()
    kept = [item for item in index if item.get("id") != entry_id]
    if len(kept) == len(index):
        # Nothing to remove; leave index and files untouched.
        return False

    # Thumbnail path comes from the index summary (if recorded).
    for item in index:
        if item.get("id") == entry_id and item.get("thumbnail"):
            _unlink_quietly(Path(item["thumbnail"]))

    # Full image: prefer the path recorded in the metadata file, falling
    # back to the deterministic name used by _save_images so the image is
    # removed even when the metadata JSON is missing or unreadable.
    entry = load_entry(entry_id)
    full = (entry or {}).get("full_image")
    _unlink_quietly(Path(full) if full else HISTORY_FULL_DIR / f"{entry_id}.png")

    # Metadata file last, since load_entry above still needed it.
    _unlink_quietly(HISTORY_ENTRIES_DIR / f"{entry_id}.json")

    _write_index(kept)
    logger.info("Deleted history entry %s", entry_id)
    return True
+++ b/src/sdgen/utils/logger.py @@ -0,0 +1,72 @@ +"""Lightweight logger factory for the SDGen application. + +This module centralizes logger configuration to ensure consistent formatting, +file rotation, and prevention of duplicate handlers during repeated imports. +""" + +from __future__ import annotations + +import logging +from logging import Handler, Logger +from logging.handlers import RotatingFileHandler + +from sdgen.config import LOGS_ROOT + +# Ensure logs directory exists +LOGS_ROOT.mkdir(parents=True, exist_ok=True) + +# Cache prevents repeated handler installation for the same logger name +_LOGGER_CACHE: dict[str, Logger] = {} + + +def _build_handler() -> Handler: + """Return a rotating file handler with unified log formatting. + + The handler writes to `app.log` under LOGS_ROOT and uses log rotation + to cap file size and maintain up to 3 backups. + """ + log_file = LOGS_ROOT / "app.log" + handler = RotatingFileHandler( + filename=log_file, + maxBytes=5_000_000, # ~5 MB + backupCount=3, + ) + fmt = "%(asctime)s [%(name)s] [%(levelname)s] %(message)s" + handler.setFormatter(logging.Formatter(fmt)) + return handler + + +def get_logger(name: str) -> Logger: + """Return a configured logger with rotating file and console handlers. + + The returned logger: + - uses INFO level by default + - writes to both stdout and a rotating log file + - does not propagate to root logger + - never duplicates handlers for the same name + + Args: + name: Distinct logger name, generally the module name. + + Returns: + A configured `logging.Logger` instance. 
+ """ + if name in _LOGGER_CACHE: + return _LOGGER_CACHE[name] + + logger = logging.getLogger(name) + logger.setLevel(logging.INFO) + + # Guard against accidentally adding handlers multiple times + if not logger.handlers: + logger.addHandler(_build_handler()) + + stream = logging.StreamHandler() + stream.setFormatter( + logging.Formatter("%(asctime)s [%(name)s]" + "[%(levelname)s] %(message)s") + ) + logger.addHandler(stream) + + logger.propagate = False + _LOGGER_CACHE[name] = logger + return logger