Spaces:
Sleeping
Sleeping
Raghava Pulugu
Optimize HuggingFace CPU backend: Default to InstructPix2Pix with DPM scheduler, bypass custom model, and set CPU step bounds
7fdfb1a | """ | |
| editing_stack.py - Intelligent natural language editing orchestrator. | |
| Parses natural prompts into advanced visual editing pipelines, executing them | |
| instantly using the high-performance CV Engine. Safeguards outputs to prevent | |
| degenerate colorful noise patterns. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| import numpy as np | |
| from dataclasses import dataclass, field | |
| from typing import Any, Optional | |
| from PIL import Image, ImageDraw, ImageEnhance, ImageFilter | |
| from server.cv_engine import CVEditingEngine | |
| try: | |
| import cv2 | |
| except ImportError: | |
| cv2 = None | |
| _RESAMPLING = Image.Resampling if hasattr(Image, "Resampling") else Image | |
@dataclass
class ParsedInstruction:
    """Structured result of parsing a natural-language edit prompt.

    Declared as a dataclass: the fields below rely on ``field(...)`` defaults
    and instances are constructed with keyword arguments. Optional fields stay
    ``None`` (flags stay at their defaults) when the prompt did not request
    the corresponding effect.
    """

    raw_prompt: str  # prompt text the instruction was parsed from
    normalized_prompt: str  # normalized form of raw_prompt used for matching
    operations: list[str] = field(default_factory=list)  # operation labels, in detection order
    style_preset: Optional[str] = None  # artistic style identifier, if any
    color_grade: Optional[str] = None  # colour-grade / LUT identifier, if any
    vignette_scale: Optional[float] = None  # vignette strength parameter
    bloom_intensity: Optional[float] = None  # bloom / glow strength parameter
    tilt_shift_focus: Optional[float] = None  # tilt-shift focus position parameter
    retouch_faces: bool = False  # whether portrait retouching was requested
    white_balance: Optional[float] = None  # kelvin scaling
    curves_preset: Optional[str] = None  # tone-curve preset identifier, if any
    local_contrast_amount: Optional[float] = None  # clarity / local-contrast strength
    broad_edit: bool = False  # prompt asks for a structural / generative change
    preserve_identity: bool = True  # keep detected faces close to the original
    requires_foundation_model: bool = False  # prompt needs the diffusion backend
@dataclass
class ImageUnderstanding:
    """Measurements extracted from an input image.

    Declared as a dataclass: ``face_boxes`` relies on a ``field(...)`` default
    and instances are constructed with keyword arguments.
    """

    width: int  # image width in pixels
    height: int  # image height in pixels
    brightness: float  # brightness measure of the image
    detail_score: float  # sharpness / detail measure of the image
    has_face: bool  # True when at least one face box was detected
    face_boxes: list[tuple[int, int, int, int]] = field(default_factory=list)  # one 4-int box per detected face
    is_portrait: bool = False  # portrait flag set by the analysis stage
@dataclass
class EditPlan:
    """Execution plan for one edit request.

    Declared as a dataclass: ``notes`` relies on a ``field(...)`` default and
    instances are constructed with keyword arguments.
    """

    backend_prompt: str  # prompt text intended for the editing backend
    cv_pipeline: list[str]  # ordered labels of the operations to run
    preserve_faces: bool  # whether original face regions are blended back in
    upscale_output: bool  # upscaling flag
    refine_output: bool  # refinement flag
    notes: list[str] = field(default_factory=list)  # free-form annotations about the plan
@dataclass
class PipelineOutcome:
    """Result returned by the orchestrator for an edit or generate request.

    Declared as a dataclass: ``steps`` and ``metadata`` rely on ``field(...)``
    defaults and instances are constructed with keyword arguments.
    """

    # String annotation: equivalent under ``from __future__ import annotations``
    # and keeps the class definable without evaluating the PIL name.
    image: "Image.Image"  # final output image
    message: str  # human-readable status line
    used_fallback: bool = False  # True when a fallback path produced the image
    steps: list[str] = field(default_factory=list)  # labels of the steps that were run
    metadata: dict[str, Any] = field(default_factory=dict)  # extra information about the run
class InstructionParserStage:
    """
    Keyword-driven prompt parsing stage. Maps natural instructions to
    computer vision & image processing operations.

    Keywords are matched at the *start* of a word, so "sharpen" still triggers
    the "sharp" keyword while "nice" no longer triggers "ice" and "waterfall"
    no longer triggers "fall".
    """

    def __init__(self, engine: Optional[Any] = None):
        # Kept on the instance for callers; parse() itself does not use it.
        self.engine = engine

    @staticmethod
    def _mentions(text: str, keywords) -> bool:
        """Return True when any of *keywords* starts at a word boundary in *text*.

        A keyword may be a phrase ("golden hour") or contain punctuation
        ("s-curve"); it is matched literally. The match only has to begin at
        the start of a word, so inflected forms ("detailed", "warmer") count.
        """
        return any(
            re.search(r"(?<![a-z0-9])" + re.escape(keyword), text) is not None
            for keyword in keywords
        )

    def parse(self, prompt: str) -> "ParsedInstruction":
        """Translate *prompt* into a ``ParsedInstruction``.

        Args:
            prompt: Free-form editing request. An empty/``None`` prompt is
                treated as ``"enhance"``.

        Returns:
            A ``ParsedInstruction`` whose ``operations`` lists the detected
            effects in detection order; ``["general-enhancement"]`` when no
            keyword matched.
        """
        raw = prompt or "enhance"
        normalized = " ".join(raw.lower().strip().split())
        tokens = set(re.findall(r"[a-z0-9']+", normalized))

        def has(*keywords: str) -> bool:
            # Word-start keyword test against the normalized prompt.
            return self._mentions(normalized, keywords)

        ops = []
        style_preset = None
        color_grade = None
        vignette_scale = None
        bloom_intensity = None
        tilt_shift_focus = None
        retouch_faces = False
        white_balance = None
        curves_preset = None
        local_contrast_amount = None

        # --- Color Grades & LUTs (first matching family wins) ---
        if has("teal", "orange", "cinema", "cinematic", "hollywood"):
            color_grade = "teal_orange"
            ops.append("teal-orange-lut")
        elif has("vintage", "retro", "classic", "analog", "antique", "film", "kodak"):
            color_grade = "vintage"
            ops.append("vintage-lut")
        elif has("cyberpunk", "neon", "tokyo", "futuristic", "synthwave"):
            color_grade = "cyberpunk"
            ops.append("cyberpunk-lut")
        elif has("noir", "dramatic black", "high contrast black", "monochrome"):
            color_grade = "noir"
            ops.append("noir-lut")

        # --- Artistic Styles ---
        if has("watercolor", "watercolour", "water color"):
            style_preset = "watercolor"
            ops.append("watercolor-style")
        elif has("oil painting", "oilpaint", "van gogh", "canvas paint", "artistic paint"):
            style_preset = "oil_painting"
            ops.append("oil-painting-style")

        # --- Vignette ---
        if has("vignette", "dark corners"):
            vignette_scale = 0.7
            if has("heavy", "strong"):
                vignette_scale = 0.5
            elif has("light", "subtle"):
                vignette_scale = 0.85
            ops.append("vignette")

        # --- Bloom & Glow ---
        if has("bloom", "glow", "dreamy", "ethereal", "soft light", "highlight glow"):
            bloom_intensity = 0.45
            if has("strong", "intense"):
                bloom_intensity = 0.7
            elif has("subtle", "dreamy soft"):
                bloom_intensity = 0.25
            ops.append("bloom")

        # --- Tilt Shift ---
        if has("tilt shift", "miniature", "macro focus", "toy model"):
            tilt_shift_focus = 0.5
            if has("top"):
                tilt_shift_focus = 0.25
            elif has("bottom"):
                tilt_shift_focus = 0.75
            ops.append("tilt-shift")

        # --- Face & Portrait retouch ---
        if has("retouch", "face", "portrait", "skin", "smooth skin", "make up", "beautify"):
            retouch_faces = True
            ops.append("portrait-retouch")

        # --- White Balance / Kelvin ---
        if has("warm", "sunset", "amber", "golden hour", "autumn", "fall"):
            white_balance = 1.15  # Warm shift
            ops.append("warm-balance")
        elif has("cool", "blue", "winter", "ice", "frost", "snow"):
            white_balance = 0.85  # Cool shift
            ops.append("cool-balance")

        # --- Tone Curves & Contrast ---
        if has("matte", "faded", "flat"):
            curves_preset = "matte"
            ops.append("matte-curves")
        elif has("dramatic contrast", "high contrast", "s-curve"):
            curves_preset = "dramatic"
            ops.append("dramatic-contrast-curves")
        elif has("lift shadows", "brighten dark", "shadow recover"):
            curves_preset = "lift"
            ops.append("lift-shadows-curves")

        # --- Clarity / Local Contrast ---
        if has("clarity", "detail", "sharp", "structure", "local contrast", "definition"):
            local_contrast_amount = 1.25
            if has("heavy", "extreme"):
                local_contrast_amount = 1.5
            elif has("soft", "mild"):
                local_contrast_amount = 1.1
            ops.append("clarity-enhancement")

        # --- Fallback to Basic Enhancements if empty ---
        if not ops:
            ops.append("general-enhancement")

        # A structural/generative request (background swap, object add/remove, ...)
        # is detected on whole tokens and needs the foundation diffusion model.
        broad_words = {"background", "replace", "remove", "add", "swap", "clothing", "bikini", "shirt", "pants", "dress", "car", "dog", "cat"}
        broad_edit = any(word in tokens for word in broad_words)

        # The foundation model is also required when nothing specific matched but
        # the prompt contains any token (longer than two characters) outside the
        # plain-enhancement vocabulary.
        enhancement_words = {"enhance", "improve", "auto", "better", "photo", "image", "clean", "fix", "retouch", "beautify", "portrait", "face", "skin", "smooth"}
        has_descriptive_terms = any(t not in enhancement_words for t in tokens if len(t) > 2)
        requires_foundation = broad_edit or ("general-enhancement" in ops and has_descriptive_terms)

        return ParsedInstruction(
            raw_prompt=raw,
            normalized_prompt=normalized,
            operations=ops,
            style_preset=style_preset,
            color_grade=color_grade,
            vignette_scale=vignette_scale,
            bloom_intensity=bloom_intensity,
            tilt_shift_focus=tilt_shift_focus,
            retouch_faces=retouch_faces,
            white_balance=white_balance,
            curves_preset=curves_preset,
            local_contrast_amount=local_contrast_amount,
            broad_edit=broad_edit,
            preserve_identity=True,
            requires_foundation_model=requires_foundation,
        )
class ImageUnderstandingStage:
    """Measure brightness, detail and face locations of an input image."""

    def analyze(self, image: Image.Image) -> ImageUnderstanding:
        """Return an ``ImageUnderstanding`` describing *image*.

        The image is converted to RGB for its dimensions and to 8-bit
        grayscale for the brightness, detail and face measurements.
        """
        rgb_image = image.convert("RGB")
        luma = np.asarray(rgb_image.convert("L"), dtype=np.uint8)

        mean_brightness = float(luma.mean() / 255.0)
        sharpness = self._detail_score(luma)
        boxes = self._detect_faces(luma)
        face_found = len(boxes) > 0

        return ImageUnderstanding(
            width=rgb_image.width,
            height=rgb_image.height,
            brightness=mean_brightness,
            detail_score=sharpness,
            has_face=face_found,
            face_boxes=boxes,
            # Tall-or-square frames and any frame with a face count as portraits.
            is_portrait=rgb_image.height >= rgb_image.width or face_found,
        )

    def _detail_score(self, gray: np.ndarray) -> float:
        """Score image detail: Laplacian variance with OpenCV, else mean gradient magnitude."""
        if cv2 is None:
            # NumPy-only path: sum of the mean absolute gradients along both axes.
            d_rows, d_cols = np.gradient(gray.astype(np.float32))
            return float(np.mean(np.abs(d_cols)) + np.mean(np.abs(d_rows)))
        laplacian = cv2.Laplacian(gray, cv2.CV_32F)
        return float(laplacian.var())

    def _detect_faces(self, gray: np.ndarray) -> list[tuple[int, int, int, int]]:
        """Return ``(x, y, w, h)`` boxes from the frontal-face Haar cascade.

        Yields an empty list when OpenCV is unavailable or detection raises.
        """
        if cv2 is None:
            return []
        try:
            cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
            detector = cv2.CascadeClassifier(cascade_path)
            detections = detector.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(24, 24),
            )
            boxes = []
            for bx, by, bw, bh in detections:
                boxes.append((int(bx), int(by), int(bw), int(bh)))
            return boxes
        except Exception:
            return []
class EditingPlannerStage:
    """Formulate optimal visual processing strategy combining diffusion and CV engine."""

    def plan(
        self,
        parsed: ParsedInstruction,
        understanding: ImageUnderstanding,
        backend,
    ) -> EditPlan:
        """Build the ``EditPlan`` for one request.

        Args:
            parsed: Parsed prompt; its ``operations`` seed the CV pipeline.
            understanding: Image analysis; ``has_face`` gates face preservation.
            backend: Accepted for interface compatibility; not consulted here.

        Returns:
            An ``EditPlan`` whose ``cv_pipeline`` and ``notes`` are independent
            copies of ``parsed.operations``.
        """
        cv_pipeline = list(parsed.operations)
        # Faces are restored from the original only when one was found, identity
        # must be kept, and the user did not explicitly ask for face retouching.
        preserve_faces = understanding.has_face and parsed.preserve_identity and not parsed.retouch_faces
        return EditPlan(
            backend_prompt=parsed.raw_prompt,
            cv_pipeline=cv_pipeline,
            preserve_faces=preserve_faces,
            upscale_output=True,
            refine_output=True,
            # Separate list: later appends to cv_pipeline must not rewrite the notes.
            notes=list(cv_pipeline),
        )
class FaceIdentityPreservationStage:
    """Blend the original face regions back into an edited image."""

    def apply(
        self,
        original: Image.Image,
        edited: Image.Image,
        understanding: ImageUnderstanding,
        plan: EditPlan,
    ) -> Image.Image:
        """Return *edited* (as RGB) with detected faces partially restored from *original*.

        When the plan does not ask for face preservation, or no face boxes
        exist, the edited image is returned converted to RGB and otherwise
        untouched.
        """
        if not (plan.preserve_faces and understanding.face_boxes):
            return edited.convert("RGB")

        canvas = edited.convert("RGB").copy()
        reference = original.convert("RGB").resize(canvas.size, _RESAMPLING.LANCZOS)

        # Face boxes were measured on the analysed image; rescale to the canvas.
        ratio_x = canvas.width / max(understanding.width, 1)
        ratio_y = canvas.height / max(understanding.height, 1)

        for face_x, face_y, face_w, face_h in understanding.face_boxes:
            fx = int(face_x * ratio_x)
            fy = int(face_y * ratio_y)
            fw = int(face_w * ratio_x)
            fh = int(face_h * ratio_y)

            # Grow the box by a margin and clamp it to the canvas.
            pad_x = int(fw * 0.35)
            pad_y = int(fh * 0.45)
            left = max(0, fx - pad_x)
            top = max(0, fy - pad_y)
            right = min(canvas.width, fx + fw + pad_x)
            bottom = min(canvas.height, fy + fh + pad_y)
            if right <= left or bottom <= top:
                continue

            region_box = (left, top, right, bottom)
            edited_patch = canvas.crop(region_box)
            source_patch = reference.crop(region_box)
            softened = Image.blend(edited_patch, source_patch, alpha=0.7)
            feather = self._feathered_ellipse(edited_patch.size)
            canvas.paste(Image.composite(softened, edited_patch, feather), region_box)

        return canvas

    @staticmethod
    def _feathered_ellipse(size: tuple[int, int]) -> Image.Image:
        """Build an ``L`` mask of *size*: a filled ellipse softened by a Gaussian blur."""
        patch_w, patch_h = size
        mask = Image.new("L", size, 0)
        ImageDraw.Draw(mask).ellipse(
            (int(patch_w * 0.08), int(patch_h * 0.03), int(patch_w * 0.92), int(patch_h * 0.97)),
            fill=255,
        )
        radius = max(6, int(min(patch_w, patch_h) * 0.12))
        return mask.filter(ImageFilter.GaussianBlur(radius=radius))
class EditingOrchestrator:
    """The master coordinator managing the entire image transformation lifecycle."""

    def __init__(self, engine: Optional[Any] = None):
        """Create the pipeline stages and bind the CV engine.

        Args:
            engine: Engine used for prepended steps and procedural generation;
                a ``CVEngine()`` is created when omitted.
        """
        self.parser = InstructionParserStage()
        self.understanding = ImageUnderstandingStage()
        self.planner = EditingPlannerStage()
        self.identity = FaceIdentityPreservationStage()
        from server.cv_engine import CVEngine

        self.engine = engine or CVEngine()

    @staticmethod
    def _apply_cv_operations(edited: Image.Image, parsed: ParsedInstruction, plan: EditPlan) -> Image.Image:
        """Run every effect requested in *parsed* over *edited*, in a fixed order."""
        if parsed.white_balance is not None:
            edited = CVEditingEngine.apply_white_balance(edited, parsed.white_balance)
        if parsed.curves_preset is not None:
            edited = CVEditingEngine.adjust_curves(edited, parsed.curves_preset)
        if parsed.color_grade is not None:
            edited = CVEditingEngine.apply_color_grade(edited, parsed.color_grade)
        if parsed.style_preset == "watercolor":
            edited = CVEditingEngine.apply_watercolor(edited)
        elif parsed.style_preset == "oil_painting":
            edited = CVEditingEngine.apply_oil_painting(edited)
        if parsed.retouch_faces:
            edited = CVEditingEngine.enhance_portrait_features(edited)
        if parsed.local_contrast_amount is not None:
            edited = CVEditingEngine.apply_local_contrast(edited, parsed.local_contrast_amount)
        if parsed.bloom_intensity is not None:
            edited = CVEditingEngine.apply_bloom_glow(edited, parsed.bloom_intensity)
        if parsed.tilt_shift_focus is not None:
            edited = CVEditingEngine.apply_tilt_shift(edited, parsed.tilt_shift_focus)
        if parsed.vignette_scale is not None:
            edited = CVEditingEngine.apply_vignette(edited, parsed.vignette_scale)
        # General enhancement fallbacks
        if "general-enhancement" in plan.cv_pipeline:
            # Subtle colour, contrast and sharpening boost.
            edited = ImageEnhance.Color(edited).enhance(1.08)
            edited = ImageEnhance.Contrast(edited).enhance(1.05)
            edited = edited.filter(ImageFilter.UnsharpMask(radius=1.0, percent=80, threshold=2))
        return edited

    def run(
        self,
        backend,
        fallback,
        image: Image.Image,
        prompt: str,
        num_steps: int,
        text_guidance_scale: float,
        image_guidance_scale: float,
        seed: Optional[int],
        background_image: Optional[Image.Image] = None,
        reference_image: Optional[Image.Image] = None,
        mask: Optional[Image.Image] = None,
        prepend_steps: Optional[list[Any]] = None,
        disable_diffusion: bool = False,
        **kwargs,
    ) -> PipelineOutcome:
        """Edit *image* according to *prompt*.

        Stages, in order: optional prepended engine steps, optional diffusion
        backend, the CV effect chain, then face identity restoration.

        Args:
            backend: Diffusion backend exposing ``edit(...)``; may be ``None``.
            fallback: Accepted for interface compatibility; not used here.
            image: Input image.
            prompt: Natural-language edit request.
            num_steps, text_guidance_scale, image_guidance_scale, seed:
                Forwarded to ``backend.edit``; ``seed`` also goes to the
                prepended-step context.
            background_image, reference_image, mask: Forwarded to the
                prepended-step context and to ``backend.edit``.
            prepend_steps: Steps executed by the engine before anything else.
            disable_diffusion: Skip the diffusion backend entirely.
            **kwargs: Ignored.

        Returns:
            A ``PipelineOutcome``; ``used_fallback`` is True when the diffusion
            backend or the CV chain raised and a fallback path was taken.
        """
        parsed = self.parser.parse(prompt)
        understanding = self.understanding.analyze(image)
        plan = self.planner.plan(parsed, understanding, backend)
        used_fallback = False

        # Base Image setup
        edited = image.convert("RGB")

        # 1. Execute any prepended steps (e.g. background replacement or style reference)
        if prepend_steps:
            from server.cv_engine import OperationContext

            ctx = OperationContext(
                mask=mask,
                reference_image=reference_image,
                background_image=background_image,
                prompt=prompt,
                seed=seed,
            )
            res = self.engine.execute_pipeline(edited, prepend_steps, ctx)
            edited = res.image

        # 2. Run the diffusion model backend if enabled and available, and if the instruction requires it
        if not disable_diffusion and backend is not None and parsed.requires_foundation_model:
            try:
                res = backend.edit(
                    image=edited,
                    prompt=prompt,
                    num_steps=num_steps,
                    text_guidance_scale=text_guidance_scale,
                    image_guidance_scale=image_guidance_scale,
                    seed=seed,
                    mask=mask,
                    reference_image=reference_image,
                    background_image=background_image,
                )
                edited = res.image
                if "diffusion" not in plan.cv_pipeline:
                    plan.cv_pipeline.append("diffusion")
            except Exception as e:
                print(f"Diffusion backend error: {e}, falling back to CV operations only.")
                used_fallback = True

        # 3. Process the pipeline through the CV engine
        try:
            edited = self._apply_cv_operations(edited, parsed, plan)
        except Exception as e:
            # Fallback to pure PIL operations; this restarts from the original
            # input, so earlier prepend/diffusion results are discarded.
            print(f"CV Engine error: {e}, using PIL fallbacks")
            edited = ImageEnhance.Color(image.convert("RGB")).enhance(1.1)
            edited = ImageEnhance.Contrast(edited).enhance(1.08)
            used_fallback = True

        # Apply identity restoration (keep faces natural)
        edited = self.identity.apply(image, edited, understanding, plan)

        # Construct status log message
        pipeline_log = " → ".join(plan.cv_pipeline)
        msg = f"Successfully orchestrated instant high-fidelity CV edit via [{pipeline_log}]"
        return PipelineOutcome(
            image=edited,
            message=msg,
            used_fallback=used_fallback,
            steps=plan.cv_pipeline,
            metadata={"mode": "orchestrated_edit"},
        )

    def run_generate(
        self,
        backend,
        prompt: str,
        width: int,
        height: int,
        num_steps: int,
        text_guidance_scale: float,
        image_guidance_scale: float,
        seed: Optional[int],
        **kwargs,
    ) -> PipelineOutcome:
        """Generate an image for *prompt* via diffusion or the engine's procedural generator.

        The backend is used when it is not ``None`` and reports
        ``supports_generation``; otherwise, or when it raises, the engine's
        ``procedural_generate`` is used. Colour grade, bloom and vignette
        parsed from the prompt are then applied on a best-effort basis.

        Returns:
            A ``PipelineOutcome``; ``used_fallback`` is True when diffusion
            generation raised and the procedural generator took over.
        """
        used_fallback = False
        if backend is not None and getattr(backend, "supports_generation", False):
            try:
                res = backend.generate(
                    prompt=prompt,
                    width=width,
                    height=height,
                    num_steps=num_steps,
                    text_guidance_scale=text_guidance_scale,
                    image_guidance_scale=image_guidance_scale,
                    seed=seed,
                )
                generated = res.image
                steps = ["diffusion_generate"]
            except Exception as e:
                print(f"Diffusion generation error: {e}, falling back to procedural generation.")
                generated = self.engine.procedural_generate(prompt=prompt, size=(width, height), seed=seed)
                steps = ["procedural_generate"]
                used_fallback = True
        else:
            generated = self.engine.procedural_generate(prompt=prompt, size=(width, height), seed=seed)
            steps = ["procedural_generate"]

        # Apply any stylistic elements derived from prompt
        parsed = self.parser.parse(prompt)
        edited = generated.convert("RGB")
        try:
            if parsed.color_grade is not None:
                edited = CVEditingEngine.apply_color_grade(edited, parsed.color_grade)
                steps.append(parsed.color_grade)
            if parsed.bloom_intensity is not None:
                edited = CVEditingEngine.apply_bloom_glow(edited, parsed.bloom_intensity)
                steps.append("bloom")
            if parsed.vignette_scale is not None:
                edited = CVEditingEngine.apply_vignette(edited, parsed.vignette_scale)
                steps.append("vignette")
        except Exception as e:
            # Styling stays best-effort (effects applied so far are kept), but
            # the failure is reported instead of being silently swallowed.
            print(f"CV Engine error during generation styling: {e}, keeping partially styled image.")

        return PipelineOutcome(
            image=edited,
            message="Generation complete.",
            used_fallback=used_fallback,
            steps=steps,
            metadata={"mode": "generate"},
        )