salu_Image_Editter / server /editing_stack.py
Raghava Pulugu
Optimize HuggingFace CPU backend: Default to InstructPix2Pix with DPM scheduler, bypass custom model, and set CPU step bounds
7fdfb1a
Raw
History Blame Contribute Delete
20.6 kB
"""
editing_stack.py - Intelligent natural language editing orchestrator.
Parses natural prompts into advanced visual editing pipelines, executing them
instantly using the high-performance CV Engine. Safeguards outputs to prevent
degenerate colorful noise patterns.
"""
from __future__ import annotations
import re
import numpy as np
from dataclasses import dataclass, field
from typing import Any, Optional
from PIL import Image, ImageDraw, ImageEnhance, ImageFilter
from server.cv_engine import CVEditingEngine
# OpenCV is an optional dependency: when it cannot be imported the name is
# bound to ``None`` and code in this module checks ``cv2 is None`` before use.
try:
    import cv2
except ImportError:
    cv2 = None

# Namespace holding the resampling constants (e.g. ``LANCZOS``): the
# ``Image.Resampling`` attribute when the installed Pillow has one, otherwise
# the ``Image`` module itself.
_RESAMPLING = getattr(Image, "Resampling", Image)
@dataclass
class ParsedInstruction:
    """Structured description of what a text prompt asks the editor to do.

    Only the two prompt strings are required. Every optional adjustment
    defaults to ``None`` (or ``False``), which the rest of this module treats
    as "not requested".
    """

    # Prompt text the instruction was built from.
    raw_prompt: str
    # Form of the prompt used for keyword matching.
    normalized_prompt: str
    # Labels of the operations detected in the prompt, in detection order.
    operations: list[str] = field(default_factory=list)
    # Artistic style name (the parser emits "watercolor" or "oil_painting").
    style_preset: Optional[str] = None
    # Colour-grade name handed to the CV engine's colour grading.
    color_grade: Optional[str] = None
    # Vignette parameter handed to the CV engine's vignette operation.
    vignette_scale: Optional[float] = None
    # Bloom/glow parameter handed to the CV engine's bloom operation.
    bloom_intensity: Optional[float] = None
    # Tilt-shift focus parameter handed to the CV engine.
    tilt_shift_focus: Optional[float] = None
    # True when the prompt asks for face/portrait retouching.
    retouch_faces: bool = False
    # Kelvin scaling factor; the parser uses values above 1 for a warm shift
    # and below 1 for a cool shift.
    white_balance: Optional[float] = None
    # Tone-curve preset name handed to the CV engine.
    curves_preset: Optional[str] = None
    # Clarity / local-contrast amount handed to the CV engine.
    local_contrast_amount: Optional[float] = None
    # True when the prompt looks like a structural edit (object/background).
    broad_edit: bool = False
    # When True the planner may restore original face pixels after editing.
    preserve_identity: bool = True
    # True when the prompt should be routed through the diffusion backend.
    requires_foundation_model: bool = False
@dataclass
class ImageUnderstanding:
    """Measurements taken from an input image before an edit is planned."""

    # Pixel dimensions of the analysed image.
    width: int
    height: int
    # Mean grayscale level scaled to the 0..1 range.
    brightness: float
    # Sharpness indicator; its scale depends on whether OpenCV is installed.
    detail_score: float
    # True when at least one face box was detected.
    has_face: bool
    # Detected faces as (x, y, w, h) boxes in the analysed image's pixels.
    face_boxes: list[tuple[int, int, int, int]] = field(default_factory=list)
    # True for images at least as tall as wide, or containing a face.
    is_portrait: bool = False
@dataclass
class EditPlan:
    """Execution plan handed from the planner to the orchestrator."""

    # Prompt text associated with the plan (the planner stores the raw prompt).
    backend_prompt: str
    # Operation labels; the orchestrator appends "diffusion" here when the
    # diffusion backend ran, and reports this list as the executed steps.
    cv_pipeline: list[str]
    # When True, original face regions are blended back over the edit.
    preserve_faces: bool
    # Output post-processing flags populated by the planner.
    upscale_output: bool
    refine_output: bool
    # Free-form notes attached to the plan.
    notes: list[str] = field(default_factory=list)
@dataclass
class PipelineOutcome:
    """Final result returned by the orchestrator for an edit or a generation."""

    # The resulting picture.
    image: Image.Image
    # Human-readable status line describing what was done.
    message: str
    # Fallback indicator; the orchestrator in this module always reports False.
    used_fallback: bool = False
    # Labels of the steps that were reported for this run.
    steps: list[str] = field(default_factory=list)
    # Extra key/value details (this module stores the run "mode" here).
    metadata: dict[str, Any] = field(default_factory=dict)
class InstructionParserStage:
    """
    Keyword-driven prompt parsing stage. Maps natural instructions to
    computer vision & image processing operations.

    Keywords are matched at the *start* of a word rather than as raw
    substrings. Inflections still match ("sharp" matches "sharpen", "detail"
    matches "details"), but a keyword buried inside an unrelated word no
    longer does: "nice" is not "ice", "waterfall" is not "fall", "highlight"
    is not the "light" modifier, and "stop" is not "top".
    """

    # Each table row is (field value, operation label, trigger keywords).
    # Rows are tried in order and the first matching row wins.
    _COLOR_GRADES = (
        ("teal_orange", "teal-orange-lut",
         ("teal", "orange", "cinema", "cinematic", "hollywood")),
        ("vintage", "vintage-lut",
         ("vintage", "retro", "classic", "analog", "antique", "film", "kodak")),
        ("cyberpunk", "cyberpunk-lut",
         ("cyberpunk", "neon", "tokyo", "futuristic", "synthwave")),
        ("noir", "noir-lut",
         ("noir", "dramatic black", "high contrast black", "monochrome")),
    )
    _STYLES = (
        ("watercolor", "watercolor-style",
         ("watercolor", "watercolour", "water color")),
        ("oil_painting", "oil-painting-style",
         ("oil painting", "oilpaint", "van gogh", "canvas paint", "artistic paint")),
    )
    _WHITE_BALANCE = (
        (1.15, "warm-balance",
         ("warm", "sunset", "amber", "golden hour", "autumn", "fall")),
        (0.85, "cool-balance",
         ("cool", "blue", "winter", "ice", "frost", "snow")),
    )
    _CURVES = (
        ("matte", "matte-curves", ("matte", "faded", "flat")),
        ("dramatic", "dramatic-contrast-curves",
         ("dramatic contrast", "high contrast", "s-curve")),
        ("lift", "lift-shadows-curves",
         ("lift shadows", "brighten dark", "shadow recover")),
    )

    _VIGNETTE_WORDS = ("vignette", "dark corners")
    _BLOOM_WORDS = ("bloom", "glow", "dreamy", "ethereal", "soft light", "highlight glow")
    _TILT_SHIFT_WORDS = ("tilt shift", "miniature", "macro focus", "toy model")
    _RETOUCH_WORDS = ("retouch", "face", "portrait", "skin", "smooth skin", "make up", "beautify")
    # "unsharp" is listed explicitly because word-start matching would
    # otherwise stop "unsharp mask" from reaching the "sharp" keyword.
    _CLARITY_WORDS = ("clarity", "detail", "sharp", "unsharp", "structure",
                      "local contrast", "definition")

    # Whole tokens that mark a structural/generative edit (background swap,
    # object add/remove) which needs the diffusion backend.
    _BROAD_WORDS = frozenset({
        "background", "replace", "remove", "add", "swap", "clothing", "bikini",
        "shirt", "pants", "dress", "car", "dog", "cat",
    })
    # Tokens that describe a plain enhancement request and therefore do not
    # count as descriptive content on their own.
    _ENHANCEMENT_WORDS = frozenset({
        "enhance", "improve", "auto", "better", "photo", "image", "clean", "fix",
        "retouch", "beautify", "portrait", "face", "skin", "smooth",
    })

    def __init__(self, engine: Optional[Any] = None):
        # Stored for callers; parsing itself never touches the engine.
        self.engine = engine

    @staticmethod
    def _mentions(text: str, keywords) -> bool:
        """Return True if any keyword begins at a word start inside ``text``.

        ``text`` is expected to be lower-case. A keyword matches only when the
        character before it is not an ASCII letter or digit, so "ice" matches
        "ice" and "iced" but not "nice". Multi-word keywords are matched
        literally.
        """
        if not keywords:
            return False
        alternatives = "|".join(re.escape(word) for word in keywords)
        return re.search(r"(?<![a-z0-9])(?:" + alternatives + ")", text) is not None

    @classmethod
    def _first_match(cls, text: str, table):
        """Return ``(value, op_label)`` of the first matching row, else ``(None, None)``."""
        for value, op_label, keywords in table:
            if cls._mentions(text, keywords):
                return value, op_label
        return None, None

    @classmethod
    def _strength(cls, text: str, default, strong, weak):
        """Pick a parameter value from optional intensity modifiers.

        ``strong`` and ``weak`` are ``(keywords, value)`` pairs. ``strong`` is
        checked first; ``default`` is returned when neither pair matches.
        """
        strong_words, strong_value = strong
        weak_words, weak_value = weak
        if cls._mentions(text, strong_words):
            return strong_value
        if cls._mentions(text, weak_words):
            return weak_value
        return default

    def parse(self, prompt: str) -> ParsedInstruction:
        """Translate ``prompt`` into a :class:`ParsedInstruction`.

        An empty or ``None`` prompt is treated as ``"enhance"``. The prompt is
        lower-cased and whitespace-collapsed before keyword matching. Operation
        labels are appended in a fixed order: colour grade, style, vignette,
        bloom, tilt-shift, retouch, white balance, curves, clarity. When
        nothing matches, the single label ``"general-enhancement"`` is used.
        """
        raw = prompt or "enhance"
        normalized = " ".join(raw.lower().strip().split())
        tokens = set(re.findall(r"[a-z0-9']+", normalized))
        ops: list[str] = []

        # --- Color Grades & LUTs ---
        color_grade, op_label = self._first_match(normalized, self._COLOR_GRADES)
        if op_label is not None:
            ops.append(op_label)

        # --- Artistic Styles ---
        style_preset, op_label = self._first_match(normalized, self._STYLES)
        if op_label is not None:
            ops.append(op_label)

        # --- Vignette ---
        vignette_scale = None
        if self._mentions(normalized, self._VIGNETTE_WORDS):
            vignette_scale = self._strength(
                normalized, 0.7,
                strong=(("heavy", "strong"), 0.5),
                weak=(("light", "subtle"), 0.85),
            )
            ops.append("vignette")

        # --- Bloom & Glow ---
        bloom_intensity = None
        if self._mentions(normalized, self._BLOOM_WORDS):
            bloom_intensity = self._strength(
                normalized, 0.45,
                strong=(("strong", "intense"), 0.7),
                weak=(("subtle", "dreamy soft"), 0.25),
            )
            ops.append("bloom")

        # --- Tilt Shift ---
        tilt_shift_focus = None
        if self._mentions(normalized, self._TILT_SHIFT_WORDS):
            tilt_shift_focus = self._strength(
                normalized, 0.5,
                strong=(("top",), 0.25),
                weak=(("bottom",), 0.75),
            )
            ops.append("tilt-shift")

        # --- Face & Portrait retouch ---
        retouch_faces = self._mentions(normalized, self._RETOUCH_WORDS)
        if retouch_faces:
            ops.append("portrait-retouch")

        # --- White Balance / Kelvin ---
        white_balance, op_label = self._first_match(normalized, self._WHITE_BALANCE)
        if op_label is not None:
            ops.append(op_label)

        # --- Tone Curves & Contrast ---
        curves_preset, op_label = self._first_match(normalized, self._CURVES)
        if op_label is not None:
            ops.append(op_label)

        # --- Clarity / Local Contrast ---
        local_contrast_amount = None
        if self._mentions(normalized, self._CLARITY_WORDS):
            local_contrast_amount = self._strength(
                normalized, 1.25,
                strong=(("heavy", "extreme"), 1.5),
                weak=(("soft", "mild"), 1.1),
            )
            ops.append("clarity-enhancement")

        # --- Fallback to Basic Enhancements if empty ---
        if not ops:
            ops.append("general-enhancement")

        # Structural/generative edits are detected on whole tokens.
        broad_edit = any(word in tokens for word in self._BROAD_WORDS)

        # The diffusion model is also needed when no CV operation matched but
        # the prompt contains a token (longer than two characters) that is not
        # a plain enhancement word.
        has_descriptive_terms = any(
            token not in self._ENHANCEMENT_WORDS for token in tokens if len(token) > 2
        )
        requires_foundation = broad_edit or (
            "general-enhancement" in ops and has_descriptive_terms
        )

        return ParsedInstruction(
            raw_prompt=raw,
            normalized_prompt=normalized,
            operations=ops,
            style_preset=style_preset,
            color_grade=color_grade,
            vignette_scale=vignette_scale,
            bloom_intensity=bloom_intensity,
            tilt_shift_focus=tilt_shift_focus,
            retouch_faces=retouch_faces,
            white_balance=white_balance,
            curves_preset=curves_preset,
            local_contrast_amount=local_contrast_amount,
            broad_edit=broad_edit,
            preserve_identity=True,
            requires_foundation_model=requires_foundation,
        )
class ImageUnderstandingStage:
    """Measure basic image statistics and locate faces."""

    def analyze(self, image: Image.Image) -> ImageUnderstanding:
        """Build an :class:`ImageUnderstanding` for ``image``.

        The image is converted to RGB and then to 8-bit grayscale; brightness,
        the detail score and face detection are all computed from that
        grayscale array.
        """
        rgb = image.convert("RGB")
        luma = np.asarray(rgb.convert("L"), dtype=np.uint8)

        mean_level = float(luma.mean() / 255.0)
        sharpness = self._detail_score(luma)
        boxes = self._detect_faces(luma)
        found_face = len(boxes) > 0

        return ImageUnderstanding(
            width=rgb.width,
            height=rgb.height,
            brightness=mean_level,
            detail_score=sharpness,
            has_face=found_face,
            face_boxes=boxes,
            # Square and tall images count as portrait, as does any image
            # in which a face was found.
            is_portrait=rgb.height >= rgb.width or found_face,
        )

    def _detail_score(self, gray: np.ndarray) -> float:
        """Return a sharpness indicator for a grayscale array.

        With OpenCV this is the variance of the Laplacian. Without it, the
        score is the sum of the mean absolute gradients along both axes. The
        two variants are on different scales.
        """
        if cv2 is None:
            grad_y, grad_x = np.gradient(gray.astype(np.float32))
            return float(np.mean(np.abs(grad_x)) + np.mean(np.abs(grad_y)))
        laplacian = cv2.Laplacian(gray, cv2.CV_32F)
        return float(laplacian.var())

    def _detect_faces(self, gray: np.ndarray) -> list[tuple[int, int, int, int]]:
        """Return detected faces as ``(x, y, w, h)`` integer boxes.

        Uses OpenCV's bundled frontal-face Haar cascade. An empty list is
        returned when OpenCV is not installed or when detection raises for
        any reason (best-effort behaviour).
        """
        if cv2 is None:
            return []
        try:
            cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
            detector = cv2.CascadeClassifier(cascade_path)
            detections = detector.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(24, 24),
            )
            return [
                (int(fx), int(fy), int(fw), int(fh))
                for fx, fy, fw, fh in detections
            ]
        except Exception:
            return []
class EditingPlannerStage:
    """Formulate optimal visual processing strategy combining diffusion and CV engine."""

    def plan(
        self,
        parsed: ParsedInstruction,
        understanding: ImageUnderstanding,
        backend,
    ) -> EditPlan:
        """Build the :class:`EditPlan` for one edit request.

        Args:
            parsed: Parsed prompt; its operations seed the CV pipeline.
            understanding: Image analysis; only ``has_face`` is consulted.
            backend: Accepted for interface compatibility; not used here.

        Returns:
            A plan whose ``cv_pipeline`` is a copy of ``parsed.operations``
            and whose ``notes`` is a separate copy of the same labels.
        """
        cv_pipeline = list(parsed.operations)

        # Original face pixels are only restored when a face exists, identity
        # preservation is requested, and the user did not ask for retouching
        # (restoring the face would undo the retouch).
        preserve_faces = understanding.has_face and parsed.preserve_identity and not parsed.retouch_faces

        return EditPlan(
            backend_prompt=parsed.raw_prompt,
            cv_pipeline=cv_pipeline,
            preserve_faces=preserve_faces,
            upscale_output=True,
            refine_output=True,
            # Separate list: the pipeline is appended to later (e.g. with
            # "diffusion"), and that must not silently rewrite the notes.
            notes=list(cv_pipeline),
        )
class FaceIdentityPreservationStage:
    """Blend the original face regions back over an edited image."""

    def apply(
        self,
        original: Image.Image,
        edited: Image.Image,
        understanding: ImageUnderstanding,
        plan: EditPlan,
    ) -> Image.Image:
        """Return ``edited`` as RGB with original faces softly restored.

        When the plan does not ask for face preservation, or no face boxes
        are available, the edited image is returned converted to RGB and
        otherwise unchanged. Face boxes are given in the coordinates of the
        analysed image and are rescaled to the edited image's size.
        """
        if not (plan.preserve_faces and understanding.face_boxes):
            return edited.convert("RGB")

        canvas = edited.convert("RGB").copy()
        reference = original.convert("RGB").resize(canvas.size, _RESAMPLING.LANCZOS)

        # Guard against a zero-sized analysis result when computing the ratio.
        ratio_x = canvas.width / max(understanding.width, 1)
        ratio_y = canvas.height / max(understanding.height, 1)

        for face_x, face_y, face_w, face_h in understanding.face_boxes:
            sx = int(face_x * ratio_x)
            sy = int(face_y * ratio_y)
            sw = int(face_w * ratio_x)
            sh = int(face_h * ratio_y)

            # Grow the box so the blend covers more than the detected box,
            # then clamp it to the canvas.
            pad_x = int(sw * 0.35)
            pad_y = int(sh * 0.45)
            left = max(0, sx - pad_x)
            top = max(0, sy - pad_y)
            right = min(canvas.width, sx + sw + pad_x)
            bottom = min(canvas.height, sy + sh + pad_y)
            if right <= left or bottom <= top:
                continue
            region_box = (left, top, right, bottom)

            edited_patch = canvas.crop(region_box)
            source_patch = reference.crop(region_box)
            # 70% original pixels, 30% edited pixels.
            mixed_patch = Image.blend(edited_patch, source_patch, alpha=0.7)

            # Feathered elliptical mask so the restored face fades into the
            # surrounding edited pixels instead of showing a hard rectangle.
            patch_w, patch_h = edited_patch.size
            feather = Image.new("L", edited_patch.size, 0)
            ImageDraw.Draw(feather).ellipse(
                (
                    int(patch_w * 0.08),
                    int(patch_h * 0.03),
                    int(patch_w * 0.92),
                    int(patch_h * 0.97),
                ),
                fill=255,
            )
            feather_radius = max(6, int(min(patch_w, patch_h) * 0.12))
            feather = feather.filter(ImageFilter.GaussianBlur(radius=feather_radius))

            canvas.paste(Image.composite(mixed_patch, edited_patch, feather), region_box)

        return canvas
class EditingOrchestrator:
    """The master coordinator managing the entire image transformation lifecycle."""

    def __init__(self, engine: Optional[Any] = None):
        """Create the pipeline stages.

        Args:
            engine: CV engine used for prepended steps and procedural
                generation. When ``None`` a ``CVEngine`` is created; the
                import is deferred so an injected engine does not require it.
        """
        self.parser = InstructionParserStage()
        self.understanding = ImageUnderstandingStage()
        self.planner = EditingPlannerStage()
        self.identity = FaceIdentityPreservationStage()
        if engine is None:
            from server.cv_engine import CVEngine
            engine = CVEngine()
        self.engine = engine

    @staticmethod
    def _apply_cv_operations(
        edited: Image.Image,
        parsed: ParsedInstruction,
        plan: EditPlan,
    ) -> Image.Image:
        """Run every CV adjustment requested by ``parsed`` in a fixed order.

        Order: white balance, curves, colour grade, artistic style, portrait
        retouch, local contrast, bloom, tilt-shift, vignette, and finally the
        general enhancement when the plan lists it. Exceptions propagate to
        the caller, which decides on the fallback.
        """
        if parsed.white_balance is not None:
            edited = CVEditingEngine.apply_white_balance(edited, parsed.white_balance)
        if parsed.curves_preset is not None:
            edited = CVEditingEngine.adjust_curves(edited, parsed.curves_preset)
        if parsed.color_grade is not None:
            edited = CVEditingEngine.apply_color_grade(edited, parsed.color_grade)
        if parsed.style_preset == "watercolor":
            edited = CVEditingEngine.apply_watercolor(edited)
        elif parsed.style_preset == "oil_painting":
            edited = CVEditingEngine.apply_oil_painting(edited)
        if parsed.retouch_faces:
            edited = CVEditingEngine.enhance_portrait_features(edited)
        if parsed.local_contrast_amount is not None:
            edited = CVEditingEngine.apply_local_contrast(edited, parsed.local_contrast_amount)
        if parsed.bloom_intensity is not None:
            edited = CVEditingEngine.apply_bloom_glow(edited, parsed.bloom_intensity)
        if parsed.tilt_shift_focus is not None:
            edited = CVEditingEngine.apply_tilt_shift(edited, parsed.tilt_shift_focus)
        if parsed.vignette_scale is not None:
            edited = CVEditingEngine.apply_vignette(edited, parsed.vignette_scale)
        # General enhancement fallbacks
        if "general-enhancement" in plan.cv_pipeline:
            # Subtle saturation, contrast and sharpening boost.
            edited = ImageEnhance.Color(edited).enhance(1.08)
            edited = ImageEnhance.Contrast(edited).enhance(1.05)
            edited = edited.filter(ImageFilter.UnsharpMask(radius=1.0, percent=80, threshold=2))
        return edited

    def run(
        self,
        backend,
        fallback,
        image: Image.Image,
        prompt: str,
        num_steps: int,
        text_guidance_scale: float,
        image_guidance_scale: float,
        seed: Optional[int],
        background_image: Optional[Image.Image] = None,
        reference_image: Optional[Image.Image] = None,
        mask: Optional[Image.Image] = None,
        prepend_steps: Optional[list[Any]] = None,
        disable_diffusion: bool = False,
        **kwargs,
    ) -> PipelineOutcome:
        """Edit ``image`` according to ``prompt``.

        Stages, in order: parse the prompt, analyse the image, plan, run any
        ``prepend_steps`` through the CV engine, optionally call the diffusion
        ``backend``, apply the CV adjustments, then restore face regions.

        Args:
            backend: Diffusion backend exposing ``edit(...)``, or ``None``.
            fallback: Accepted for interface compatibility; not used here.
            image: Source image.
            prompt: Natural-language instruction.
            num_steps, text_guidance_scale, image_guidance_scale, seed:
                Forwarded unchanged to ``backend.edit``.
            background_image, reference_image, mask: Forwarded to the
                prepended steps' context and to ``backend.edit``.
            prepend_steps: Engine steps to run before anything else.
            disable_diffusion: When True the backend is never called.
            **kwargs: Ignored.

        Returns:
            A :class:`PipelineOutcome` holding the edited RGB image.
        """
        parsed = self.parser.parse(prompt)
        understanding = self.understanding.analyze(image)
        plan = self.planner.plan(parsed, understanding, backend)

        # Base Image setup
        edited = image.convert("RGB")

        # 1. Execute any prepended steps (e.g. background replacement or style reference)
        if prepend_steps:
            from server.cv_engine import OperationContext
            ctx = OperationContext(
                mask=mask,
                reference_image=reference_image,
                background_image=background_image,
                prompt=prompt,
                seed=seed,
            )
            res = self.engine.execute_pipeline(edited, prepend_steps, ctx)
            edited = res.image

        # 2. Run the diffusion model backend if enabled and available, and if
        #    the instruction requires it. Failures are non-fatal.
        if not disable_diffusion and backend is not None and parsed.requires_foundation_model:
            try:
                res = backend.edit(
                    image=edited,
                    prompt=prompt,
                    num_steps=num_steps,
                    text_guidance_scale=text_guidance_scale,
                    image_guidance_scale=image_guidance_scale,
                    seed=seed,
                    mask=mask,
                    reference_image=reference_image,
                    background_image=background_image,
                )
                edited = res.image
                if "diffusion" not in plan.cv_pipeline:
                    plan.cv_pipeline.append("diffusion")
            except Exception as e:
                print(f"Diffusion backend error: {e}, falling back to CV operations only.")

        # 3. Process the pipeline through the CV engine. Keep a reference to
        #    the image as it stood before the CV adjustments, so that a CV
        #    failure does not throw away successful prepend/diffusion output.
        before_cv = edited
        try:
            edited = self._apply_cv_operations(edited, parsed, plan)
        except Exception as e:
            # Fallback to pure PIL operations if any advanced OpenCV failure
            print(f"CV Engine error: {e}, using PIL fallbacks")
            edited = ImageEnhance.Color(before_cv.convert("RGB")).enhance(1.1)
            edited = ImageEnhance.Contrast(edited).enhance(1.08)

        # Apply identity restoration (keep faces natural)
        edited = self.identity.apply(image, edited, understanding, plan)

        # Construct the status log message
        pipeline_log = " → ".join(plan.cv_pipeline)
        msg = f"Successfully orchestrated instant high-fidelity CV edit via [{pipeline_log}]"
        return PipelineOutcome(
            image=edited,
            message=msg,
            used_fallback=False,
            steps=plan.cv_pipeline,
            metadata={"mode": "orchestrated_edit"},
        )

    def run_generate(
        self,
        backend,
        prompt: str,
        width: int,
        height: int,
        num_steps: int,
        text_guidance_scale: float,
        image_guidance_scale: float,
        seed: Optional[int],
        **kwargs,
    ) -> PipelineOutcome:
        """Generate an image from ``prompt`` via diffusion or procedurally.

        The diffusion ``backend`` is used when it is not ``None`` and reports
        ``supports_generation``. If it raises, or yields no image, the CV
        engine's procedural generator is used instead. Colour grade, bloom
        and vignette parsed from the prompt are then applied on a best-effort
        basis. ``**kwargs`` is ignored.
        """
        generated = None
        steps: list[str] = []

        if backend is not None and getattr(backend, "supports_generation", False):
            try:
                res = backend.generate(
                    prompt=prompt,
                    width=width,
                    height=height,
                    num_steps=num_steps,
                    text_guidance_scale=text_guidance_scale,
                    image_guidance_scale=image_guidance_scale,
                    seed=seed,
                )
                generated = res.image
                steps = ["diffusion_generate"]
            except Exception as e:
                print(f"Diffusion generation error: {e}, falling back to procedural generation.")
                generated = None

        if generated is None:
            generated = self.engine.procedural_generate(prompt=prompt, size=(width, height), seed=seed)
            steps = ["procedural_generate"]

        # Apply any stylistic elements derived from prompt
        parsed = self.parser.parse(prompt)
        edited = generated.convert("RGB")
        try:
            if parsed.color_grade is not None:
                edited = CVEditingEngine.apply_color_grade(edited, parsed.color_grade)
                steps.append(parsed.color_grade)
            if parsed.bloom_intensity is not None:
                edited = CVEditingEngine.apply_bloom_glow(edited, parsed.bloom_intensity)
                steps.append("bloom")
            if parsed.vignette_scale is not None:
                edited = CVEditingEngine.apply_vignette(edited, parsed.vignette_scale)
                steps.append("vignette")
        except Exception as e:
            # Styling is optional: keep whatever was applied so far, but
            # report the failure instead of hiding it.
            print(f"CV Engine error: {e}, returning image with the styles applied so far.")

        return PipelineOutcome(
            image=edited,
            message="Generation complete.",
            used_fallback=False,
            steps=steps,
            metadata={"mode": "generate"},
        )