Spaces:
Running on Zero
Running on Zero
| import base64 | |
| import hashlib | |
| import io | |
| import json | |
| import traceback | |
| import gradio as gr | |
| import numpy as np | |
| import torch | |
| import cv2 | |
| from PIL import Image | |
| from transformers import pipeline as hf_pipeline | |
| import sys | |
| from pathlib import Path | |
# -- ZeroGPU shim --------------------------------------------------------------
# The `spaces` package provides the `GPU` decorator. When it cannot be imported
# a no-op stand-in is bound to the same name so that `spaces.GPU` can still be
# referenced without the package installed.
# NOTE(review): nothing in the visible file applies `spaces.GPU` to a function;
# confirm whether the GPU entry points are meant to be decorated.
try:
    import spaces
except ImportError:
    class _DummySpaces:
        """No-op replacement exposing the same `GPU` decorator entry point."""

        def GPU(self, fn=None, **_options):
            """Return the decorated function unchanged.

            Supports both decorator spellings:

            * ``@spaces.GPU``            -> ``fn`` is the function itself.
            * ``@spaces.GPU(option=...)`` -> ``fn`` is ``None``; a pass-through
              decorator is returned and the keyword options are ignored.
            """
            if fn is None:
                return lambda wrapped: wrapped
            return fn

    spaces = _DummySpaces()
# Device index handed to the Hugging Face pipeline: 0 when CUDA is available,
# otherwise -1.
DEVICE = -1 if not torch.cuda.is_available() else 0

# Mask-generation pipeline, created lazily by _load_pipeline().
sam_vit_pipeline = None

# -- Parameters shared between the UI and the backend --------------------------
# These values are overwritten every time the user runs "Segmentar" in the UI.
# segment_for_backend reads them so that it uses exactly the same settings.
PARAMS = dict(
    pred_iou_thresh=0.95,
    stability_score_thresh=0.5,
    points_per_batch=32,
    min_mask_region_area=4500,
    box_nms_thresh=0.8,
)
# -- Rendering -----------------------------------------------------------------
def _render_masks(imagen_rgb: Image.Image, masks: list) -> Image.Image:
    """Paint each mask in a flat colour and blend the result 50/50 with the image.

    The colour of the mask at position ``i`` is the first three bytes of the
    MD5 digest of ``str(i)``, so colours are deterministic per position.
    Masks are painted in list order; where masks overlap, the later one wins.

    Args:
        imagen_rgb: source image.
        masks: iterable of mask objects convertible with ``np.array``; every
            element greater than zero is treated as part of the mask.

    Returns:
        A new PIL image with the coloured overlay blended in.
    """
    base = np.array(imagen_rgb).copy()
    painted = base.copy()
    for position, mask in enumerate(masks):
        digest = hashlib.md5(str(position).encode()).digest()
        # First three digest bytes == the first six hex characters, as (R, G, B).
        painted[np.array(mask) > 0] = tuple(digest[:3])
    return Image.fromarray(cv2.addWeighted(base, 0.5, painted, 0.5, 0))
def _load_pipeline():
    """Create the global SAM mask-generation pipeline on first use.

    Subsequent calls are no-ops once ``sam_vit_pipeline`` has been set.
    """
    global sam_vit_pipeline
    if sam_vit_pipeline is not None:
        return
    print("Cargando SAM ViT-Huge...")
    sam_vit_pipeline = hf_pipeline(
        "mask-generation", model="facebook/sam-vit-huge", device=DEVICE
    )
# -- UI segmentation -----------------------------------------------------------
def segmentar(
    imagen: Image.Image,
    pred_iou_thresh: float,
    stability_score_thresh: float,
    points_per_batch: int,
    min_mask_region_area: int,
    box_nms_thresh: float,
):
    """Run SAM on the uploaded image using the current slider values.

    The slider values are first stored in the module-level ``PARAMS`` so that
    ``segment_for_backend`` later runs with the same settings.
    NOTE(review): ``PARAMS`` is a single module-level dict, so the values set
    here are seen by every later caller in this process.

    Args:
        imagen: uploaded image, or ``None`` when nothing has been uploaded.
        pred_iou_thresh, stability_score_thresh, points_per_batch,
        min_mask_region_area, box_nms_thresh: values forwarded to the
            mask-generation pipeline after being coerced to float/int.

    Returns:
        ``(image, status_text)``: the overlay image (or the plain RGB image
        when no masks were found, or ``None`` without input) and a status line.
    """
    if imagen is None:
        return None, "Sube una imagen para comenzar."

    # Build the whole update first so PARAMS is only touched when every
    # conversion succeeded.
    valores = {
        "pred_iou_thresh": float(pred_iou_thresh),
        "stability_score_thresh": float(stability_score_thresh),
        "points_per_batch": int(points_per_batch),
        "min_mask_region_area": int(min_mask_region_area),
        "box_nms_thresh": float(box_nms_thresh),
    }
    PARAMS.update(valores)

    _load_pipeline()
    imagen_rgb = imagen.convert("RGB")
    claves = (
        "points_per_batch",
        "pred_iou_thresh",
        "stability_score_thresh",
        "min_mask_region_area",
        "box_nms_thresh",
    )
    salida = sam_vit_pipeline(imagen_rgb, **{k: PARAMS[k] for k in claves})
    # The pipeline may hand back a list of results; only the first is used.
    if isinstance(salida, list):
        salida = salida[0]

    masks = salida.get("masks", [])
    if not masks:
        return imagen_rgb, "No se detectaron zonas."

    p = PARAMS
    info = (
        f"UI: {len(masks)} zonas | "
        f"iou={p['pred_iou_thresh']} stab={p['stability_score_thresh']} "
        f"min_area={p['min_mask_region_area']} "
        f"nms={p['box_nms_thresh']} batch={p['points_per_batch']}"
    )
    return _render_masks(imagen_rgb, masks), info
# -- Endpoint for the Docker backend -------------------------------------------
def segment_for_backend(image_np: np.ndarray):
    """Segment an image with SAM and package the result for the backend.

    Called by the backend through gradio_client (api_name='/segment'). Uses the
    module-level PARAMS, i.e. the values stored by the last "Segmentar" run in
    the UI.

    Args:
        image_np: input image, documented by the author as uint8 H x W x 3.
            ``None`` produces an empty result.

    Returns:
        ``(overlay_np, combined_json_str)``. The JSON holds one entry per mask
        (1-based index, name, area ratio, bbox), a base64 PNG label map whose
        pixel values are mask indices, and the parameters used. On any
        exception a black 100x100 image is returned and the JSON carries the
        traceback under ``"error"``.
    """
    try:
        if image_np is None:
            return (
                np.zeros((100, 100, 3), dtype=np.uint8),
                json.dumps({"masks": [], "label_map_b64": ""}),
            )

        _load_pipeline()
        pil_image = Image.fromarray(image_np.astype(np.uint8)).convert("RGB")
        alto, ancho = image_np.shape[:2]

        claves = (
            "points_per_batch",
            "pred_iou_thresh",
            "stability_score_thresh",
            "min_mask_region_area",
            "box_nms_thresh",
        )
        salida = sam_vit_pipeline(pil_image, **{k: PARAMS[k] for k in claves})
        if isinstance(salida, list):
            salida = salida[0]

        # Sort from largest to smallest area: big masks are written first and
        # small ones (windows, details) overwrite them, so a wall does not
        # cover its window in the label map.
        zonas = sorted(
            (np.array(m).astype(bool) for m in salida.get("masks", [])),
            key=lambda z: z.sum(),
            reverse=True,
        )

        # Label map: each pixel holds the index of its mask (1-based, max 254).
        label_map = np.zeros((alto, ancho), dtype=np.uint8)
        total_px = max(1, alto * ancho)
        masks_out = []
        for etiqueta, zona in enumerate(zonas[:254], start=1):
            label_map[zona] = etiqueta
            filas, columnas = np.nonzero(zona)
            if len(filas):
                x0, y0 = int(columnas.min()), int(filas.min())
                # Width/height are max - min of the occupied coordinates.
                bbox = [x0, y0, int(columnas.max()) - x0, int(filas.max()) - y0]
            else:
                bbox = [0, 0, 0, 0]
            masks_out.append({
                "index": etiqueta,
                "surface": f"Zona {etiqueta}",
                "area_ratio": round(float(zona.sum()) / total_px, 4),
                "bbox_xywh": bbox,
            })

        # Ship the label map as a base64-encoded PNG.
        png = io.BytesIO()
        Image.fromarray(label_map, mode="L").save(png, format="PNG")
        label_map_b64 = base64.b64encode(png.getvalue()).decode("utf-8")

        overlay_np = np.array(_render_masks(pil_image, zonas).convert("RGB"))

        combined = {
            "masks": masks_out,
            "label_map_b64": label_map_b64,
            "entorno": "gpu",
            "motor": "SAM Auto (GPU - ZeroGPU)",
            "params_used": dict(PARAMS),
        }
        return overlay_np, json.dumps(combined, ensure_ascii=False)
    except Exception:
        fallo = traceback.format_exc()
        return (
            np.zeros((100, 100, 3), dtype=np.uint8),
            json.dumps({"error": fallo, "masks": [], "label_map_b64": ""}),
        )
def limpiar_mascara(mask: np.ndarray, area_minima: int = 1500) -> np.ndarray:
    """Remove small speckles using morphology and connected components.

    The mask is opened and then closed with a 7x7 kernel. Of the remaining
    8-connected components, only those whose area is at least ``area_minima``
    and at least 5% of the largest component are kept.

    Args:
        mask: mask to clean; cast to uint8 and scaled by 255 before processing.
        area_minima: minimum component area, in pixels, to keep.

    Returns:
        A boolean mask of the kept components. If any step raises, the input
        ``mask`` is returned unchanged.
    """
    try:
        binaria = mask.astype(np.uint8) * 255
        nucleo = np.ones((7, 7), np.uint8)
        abierta = cv2.morphologyEx(binaria, cv2.MORPH_OPEN, nucleo)
        cerrada = cv2.morphologyEx(abierta, cv2.MORPH_CLOSE, nucleo)
        n_etiquetas, etiquetas, stats, _ = cv2.connectedComponentsWithStats(
            cerrada, connectivity=8
        )

        # conservar[k] says whether component k survives; label 0 (background)
        # is never kept.
        conservar = np.zeros(n_etiquetas, dtype=bool)
        if n_etiquetas > 1:
            areas = stats[1:, cv2.CC_STAT_AREA]
            mayor = int(areas.max()) if areas.size else 0
            suficiente = areas >= area_minima
            relevante = np.logical_or(mayor == 0, areas >= (mayor * 0.05))
            conservar[1:] = np.logical_and(suficiente, relevante)
        return conservar[etiquetas]
    except Exception:
        return mask
| def render_for_backend( | |
| image_np: np.ndarray, | |
| label_map_b64: str, | |
| mask_index: int = 1, | |
| texture_name: str | None = None, | |
| texture_b64: str | None = None, | |
| params_json: str = "{}", | |
| ): | |
| """ | |
| Aplica la textura sobre la mascara especificada usando el pipeline del Generador de Texturas. | |
| Retorna (imagen_renderizada_np, json_str) | |
| """ | |
| try: | |
| if image_np is None: | |
| empty = np.zeros((100, 100, 3), dtype=np.uint8) | |
| return empty, json.dumps({"error": "no_image"}) | |
| # Preparar imagen | |
| pil_image = Image.fromarray(image_np.astype(np.uint8)).convert("RGB") | |
| image_rgb = np.array(pil_image) | |
| # Decodificar label_map | |
| if not label_map_b64: | |
| return image_rgb, json.dumps({"error": "no_label_map"}) | |
| try: | |
| lb = base64.b64decode(label_map_b64) | |
| lab = Image.open(io.BytesIO(lb)).convert("L") | |
| label_map = np.array(lab, dtype=np.uint8) | |
| except Exception as e: | |
| return image_rgb, json.dumps({"error": "bad_label_map", "detail": str(e)}) | |
| idx = int(float(mask_index)) if mask_index is not None else 1 | |
| if idx <= 0: | |
| return image_rgb, json.dumps({"error": "invalid_mask_index"}) | |
| mask = (label_map == idx) | |
| if not mask.any(): | |
| return image_rgb, json.dumps({"error": "mask_not_found", "index": idx}) | |
| # Limpieza basica | |
| mask = limpiar_mascara(mask) | |
| # Intentar importar el pipeline del Generador de Texturas | |
| gen_root_candidates = [ | |
| Path("c:/Users/alane/OneDrive/Escritorio/Generandor de texturas"), | |
| Path("../Generandor de texturas"), | |
| Path("../../Generandor de texturas"), | |
| Path("Generandor de texturas"), | |
| ] | |
| gen_root = None | |
| for p in gen_root_candidates: | |
| if p.exists(): | |
| gen_root = p.resolve() | |
| break | |
| if gen_root is not None: | |
| sys.path.insert(0, str(gen_root)) | |
| sys.path.insert(0, str((gen_root / "pipeline"))) | |
| try: | |
| from pipeline.perspective import PerspectiveAnalyzer | |
| from pipeline.depth_estimator import DepthEstimator | |
| from pipeline.renderer import TextureRenderer | |
| except Exception as e: | |
| return image_rgb, json.dumps({"error": "import_failed", "detail": str(e)}) | |
| # Esquinas y perspectiva | |
| analyzer = PerspectiveAnalyzer() | |
| corners = analyzer.get_wall_corners(mask) | |
| # ββ ValidaciΓ³n de quad: si la mΓ‘scara es dispersa (multi-mΓ‘scara) las esquinas | |
| # pueden ser degeneradas (Γ‘rea casi 0). Fallback: bounding box de la mΓ‘scara. | |
| def _quad_area(pts: np.ndarray) -> float: | |
| """Γrea del polΓgono usando fΓ³rmula de Shoelace.""" | |
| n = len(pts) | |
| s = 0.0 | |
| for _i in range(n): | |
| _j = (_i + 1) % n | |
| s += pts[_i, 0] * pts[_j, 1] | |
| s -= pts[_j, 0] * pts[_i, 1] | |
| return abs(s) / 2.0 | |
| _MIN_QUAD_AREA_PX = 400 # < 20x20 px β degenerado | |
| if _quad_area(corners) < _MIN_QUAD_AREA_PX: | |
| _ys_m, _xs_m = np.where(mask) | |
| if len(_ys_m) > 0: | |
| # Usar bounding box de todos los pΓxeles de la mΓ‘scara | |
| corners = np.array([ | |
| [float(_xs_m.min()), float(_ys_m.min())], # TL | |
| [float(_xs_m.max()), float(_ys_m.min())], # TR | |
| [float(_xs_m.max()), float(_ys_m.max())], # BR | |
| [float(_xs_m.min()), float(_ys_m.max())], # BL | |
| ], dtype=float) | |
| # Profundidad y tamaΓ±o de pared | |
| depth_est = DepthEstimator(model_type="DPT_Large") | |
| depth_map = depth_est.estimate(image_rgb) | |
| wall_w_m, wall_h_m = depth_est.estimate_wall_size(mask, depth_map, corners) | |
| # Cargar textura (b64 o por nombre en textures) | |
| texture_np = None | |
| if texture_b64: | |
| try: | |
| tb = base64.b64decode(texture_b64) | |
| tpil = Image.open(io.BytesIO(tb)).convert("RGB") | |
| texture_np = np.array(tpil) | |
| except Exception: | |
| texture_np = None | |
| if texture_np is None and texture_name: | |
| tex_dir = gen_root / "textures" if gen_root is not None else Path("textures") | |
| cand = tex_dir / texture_name | |
| if cand.exists(): | |
| texture_np = np.array(Image.open(cand).convert("RGB")) | |
| else: | |
| for ext in [".jpg", ".jpeg", ".png"]: | |
| cc = tex_dir / (texture_name + ext) | |
| if cc.exists(): | |
| texture_np = np.array(Image.open(cc).convert("RGB")) | |
| break | |
| # Fallback texture | |
| if texture_np is None: | |
| texture_np = np.ones((256, 256, 3), dtype=np.uint8) * 200 | |
| # Parsear params | |
| try: | |
| params = json.loads(params_json) if params_json else {} | |
| except Exception: | |
| params = {} | |
| tile_w_m = params.get("ancho_panel_m") or params.get("tile_w_m") or None | |
| tile_h_m = params.get("alto_panel_m") or params.get("tile_h_m") or None | |
| blend = float(params.get("intensidad_textura", params.get("blend_strength", 0.85))) | |
| separacion = int(params.get("separacion_vertical_px", params.get("separacion_px", 0))) | |
| separacion_h = int(params.get("separacion_horizontal_px", params.get("separacion_h_px", 0))) | |
| orientacion = params.get("orientacion", params.get("orientation", "vertical")) | |
| perspectiva_h = float(params.get("perspectiva_horizontal", params.get("perspectiva_h", 0.35))) | |
| perspectiva_v = float(params.get("perspectiva_vertical", params.get("perspectiva_v", 0.7))) | |
| modo_fusion = params.get("modo_fusion", params.get("blend_mode", "Luz suave")) | |
| # Ajuste de perspectiva segΓΊn tipo de superficie (piso vs pared). | |
| # La orientaciΓ³n la controla exclusivamente el preset β no se sobreescribe aquΓ. | |
| surface_type = params.get("surface_type", "wall") | |
| if surface_type in ("floor", "deck", "ceiling"): | |
| # Para piso: perspectiva diferente a la de pared. | |
| # Solo se aplica si el preset NO tiene un valor explΓcito para estos campos. | |
| if "perspectiva_horizontal" not in params and "perspectiva_h" not in params: | |
| perspectiva_h = 0.15 | |
| if "perspectiva_vertical" not in params and "perspectiva_v" not in params: | |
| perspectiva_v = 0.85 | |
| # Ajuste orientacion | |
| if str(orientacion).lower().startswith("h"): | |
| texture_np = np.rot90(texture_np, k=1).copy() | |
| effective_tile_w = tile_h_m | |
| effective_tile_h = tile_w_m | |
| else: | |
| effective_tile_w = tile_w_m | |
| effective_tile_h = tile_h_m | |
| # Auto-remap de separaciΓ³n segΓΊn orientaciΓ³n. | |
| # El usuario define UN valor de separaciΓ³n; el eje (v o h) depende de la orientaciΓ³n: | |
| # - Modo horizontal (tablones izq-der): gaps entre tablones = h_bands (separacion_h_px) | |
| # - Modo vertical (tablones arriba-abajo): gaps entre tablones = v_bands (separacion_px) | |
| # Si el usuario sΓ³lo tiene un valor en el eje "opuesto", lo remapeamos automΓ‘ticamente. | |
| _is_horiz = str(orientacion).lower().startswith("h") | |
| if _is_horiz and separacion_h == 0 and separacion > 0: | |
| # Vertical β Horizontal: mover sep_v a sep_h para que queden bandas h entre tablones | |
| separacion_h, separacion = separacion, 0 | |
| elif not _is_horiz and separacion == 0 and separacion_h > 0: | |
| # Horizontal β Vertical: mover sep_h a sep_v para que queden bandas v entre tablones | |
| separacion, separacion_h = separacion_h, 0 | |
| # Render β envuelto en try para capturar errores OpenCV residuales | |
| renderer = TextureRenderer() | |
| try: | |
| result = renderer.render( | |
| image=image_rgb, | |
| mask=mask, | |
| texture=texture_np, | |
| corners=corners, | |
| wall_w_m=wall_w_m, | |
| wall_h_m=wall_h_m, | |
| tile_w_m=effective_tile_w, | |
| tile_h_m=effective_tile_h, | |
| blend_strength=blend, | |
| separacion_px=separacion, | |
| separacion_h_px=separacion_h, | |
| horizontal_sep=_is_horiz, | |
| perspectiva_h=perspectiva_h, | |
| perspectiva_v=perspectiva_v, | |
| modo_fusion=modo_fusion, | |
| ) | |
| except Exception as _render_err: | |
| print(f" [Space render] Error en renderer.render: {_render_err}. Usando tiling simple.") | |
| # Fallback: tiling simple sobre la mΓ‘scara sin perspectiva | |
| th_np, tw_np = texture_np.shape[:2] | |
| h_im, w_im = image_rgb.shape[:2] | |
| reps_y = max(1, -(-h_im // th_np)) # ceil division | |
| reps_x = max(1, -(-w_im // tw_np)) | |
| big = np.tile(texture_np, (reps_y, reps_x, 1))[:h_im, :w_im] | |
| mask_3 = np.stack([mask, mask, mask], axis=2).astype(np.float32) | |
| orig_f = image_rgb.astype(np.float32) / 255.0 | |
| big_f = big.astype(np.float32) / 255.0 | |
| result = np.clip( | |
| np.where(mask_3, big_f * blend + orig_f * (1.0 - blend), orig_f) * 255, 0, 255 | |
| ).astype(np.uint8) | |
| combined = { | |
| "mask_index": idx, | |
| "wall_w_m": wall_w_m, | |
| "wall_h_m": wall_h_m, | |
| "params": params, | |
| } | |
| return result, json.dumps(combined, ensure_ascii=False) | |
| except Exception: | |
| err = traceback.format_exc() | |
| empty = np.zeros((100, 100, 3), dtype=np.uint8) | |
| return empty, json.dumps({"error": err}) | |
# -- UI ------------------------------------------------------------------------
# NOTE(review): the nesting below was reconstructed from a copy of the file
# whose indentation was lost — confirm the layout against the deployed source.
# NOTE(review): the "β" characters inside the slider labels look like
# mis-decoded glyphs; they are reproduced unchanged here.
with gr.Blocks(title="SAM Auto - Segmentacion") as demo:
    gr.Markdown("# Segmentacion Automatica - SAM ViT-Huge")
    gr.Markdown(
        "SAM detecta todos los elementos de la imagen de forma automatica, "
        "sin necesidad de seleccionar zonas ni escribir prompts."
    )

    with gr.Row():
        imagen_entrada = gr.Image(type="pil", label="Foto del Espacio")
        imagen_salida = gr.Image(label="Resultado")
    estado = gr.Markdown()
    boton = gr.Button("Segmentar", variant="primary")

    with gr.Accordion("Parametros de segmentacion (sincronizados con el backend)", open=True):
        gr.Markdown(
            "> Los parametros que configures aqui se aplican tanto a la UI como al backend Docker. "
            "Haz clic en **Segmentar** para que el backend adopte los nuevos valores."
        )
        # Every slider starts at the matching PARAMS default.
        with gr.Row():
            sl_pred_iou = gr.Slider(
                label="pred_iou_thresh (β menos mascaras, mas limpias | HF default: 0.88)",
                value=PARAMS["pred_iou_thresh"],
                minimum=0.0,
                maximum=1.0,
                step=0.01,
            )
            sl_stability = gr.Slider(
                label="stability_score_thresh (β descarta zonas inestables | HF default: 0.95)",
                value=PARAMS["stability_score_thresh"],
                minimum=0.0,
                maximum=1.0,
                step=0.01,
            )
        with gr.Row():
            sl_batch = gr.Slider(
                label="points_per_batch (no afecta calidad, solo velocidad)",
                value=PARAMS["points_per_batch"],
                minimum=16,
                maximum=128,
                step=16,
            )
            sl_min_area = gr.Slider(
                label="min_mask_region_area px (β filtra zonas pequenas)",
                value=PARAMS["min_mask_region_area"],
                minimum=0,
                maximum=5000,
                step=100,
            )
        with gr.Row():
            sl_nms = gr.Slider(
                label="box_nms_thresh (β permite mas solapamiento entre mascaras)",
                value=PARAMS["box_nms_thresh"],
                minimum=0.0,
                maximum=1.0,
                step=0.05,
            )

    # The button and a fresh upload both trigger the same segmentation call.
    all_inputs = [imagen_entrada, sl_pred_iou, sl_stability, sl_batch, sl_min_area, sl_nms]
    evento_segmentar = dict(fn=segmentar, inputs=all_inputs, outputs=[imagen_salida, estado])
    boton.click(**evento_segmentar)
    imagen_entrada.upload(**evento_segmentar)

    # Hidden endpoint for the Docker backend.
    _api_in = gr.Image(type="numpy", label="backend_input", visible=False)
    _api_over = gr.Image(type="numpy", label="backend_overlay", visible=False)
    _api_json = gr.Textbox(label="backend_json", visible=False)
    _api_btn = gr.Button(visible=False)
    _api_btn.click(
        fn=segment_for_backend,
        inputs=[_api_in],
        outputs=[_api_over, _api_json],
        api_name="segment",
    )

    # Hidden endpoint for remote rendering (used by the backend).
    _r_in = gr.Image(type="numpy", label="render_input", visible=False)
    _r_label = gr.Textbox(label="render_label_map_b64", visible=False)
    _r_mask = gr.Number(value=1, visible=False)
    _r_texture_name = gr.Textbox(label="texture_name", visible=False)
    _r_texture_b64 = gr.Textbox(label="texture_b64", visible=False)
    _r_params = gr.Textbox(label="render_params_json", visible=False)
    _r_out = gr.Image(type="numpy", label="render_output", visible=False)
    _r_json = gr.Textbox(label="render_json", visible=False)
    _r_btn = gr.Button(visible=False)
    _r_btn.click(
        fn=render_for_backend,
        inputs=[_r_in, _r_label, _r_mask, _r_texture_name, _r_texture_b64, _r_params],
        outputs=[_r_out, _r_json],
        api_name="render",
    )

if __name__ == "__main__":
    demo.launch()