|
|
import os |
|
|
import logging |
|
|
import hashlib |
|
|
import sys |
|
|
import traceback |
|
|
import tempfile |
|
|
import cv2 |
|
|
import numpy as np |
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
import gradio as gr |
|
|
from PIL import Image, ImageFilter, ImageChops |
|
|
from huggingface_hub import hf_hub_download |
|
|
import spaces |
|
|
|
|
|
|
|
|
|
|
|
from sam2.build_sam import build_sam2 |
|
|
from sam2.sam2_image_predictor import SAM2ImagePredictor |
|
|
from plm_adapter_lora_with_image_input_only_text_positions import PLMLanguageAdapter |
|
|
|
|
|
|
|
|
# Emit INFO-level records (download and model-loading progress) on the root logger.
logging.basicConfig(level=logging.INFO)

# Hugging Face Hub repository every checkpoint below is downloaded from.
REPO_ID = "aadarsh99/ConvSeg-Stage2"

# Config name and base checkpoint handed to build_sam2().
SAM2_CONFIG = "sam2_hiera_l.yaml"
BASE_CKPT_NAME = "sam2_hiera_large.pt"

# Fine-tuned state dicts loaded on top of the base SAM2 model / into the PLM adapter.
FINE_TUNED_SAM = "fine_tuned_sam2_batched_18000.torch"
FINE_TUNED_PLM = "fine_tuned_sam2_batched_plm_18000.torch"

# Side length (pixels) of the padded square canvas fed to the image predictor.
SQUARE_DIM = 1024

# Lazily filled by ensure_models_loaded(); both entries stay None until the first load.
MODEL_CACHE = dict.fromkeys(("sam", "plm"))
|
|
|
|
|
|
|
|
def download_if_needed(repo_id, filename):
    """Return the local path of ``filename`` fetched from Hub repo ``repo_id``.

    The actual work is delegated to ``hf_hub_download``.

    Args:
        repo_id: Hugging Face Hub repository identifier.
        filename: Name of the file inside that repository.

    Returns:
        Whatever ``hf_hub_download`` returns (the local file path).

    Raises:
        FileNotFoundError: If ``hf_hub_download`` raises for any reason. The
            original exception is attached as ``__cause__`` so the real
            failure (network, auth, missing file, ...) stays visible in the
            traceback.
    """
    logging.info(f"Checking {filename} in {repo_id}...")
    try:
        return hf_hub_download(repo_id=repo_id, filename=filename)
    except Exception as e:
        # Keep the single exception type callers see, but chain the cause
        # explicitly instead of relying on implicit exception context.
        raise FileNotFoundError(
            f"Could not find {filename} in {repo_id}. Error: {e}"
        ) from e
|
|
|
|
|
def stable_color(key: str):
    """Deterministically map ``key`` to one RGB triple of a fixed palette.

    The SHA-256 digest of ``str(key)`` (UTF-8 encoded) is interpreted as a
    big integer and reduced modulo the palette length, so equal keys always
    select the same colour.

    Returns:
        A 3-tuple of ints ``(r, g, b)``, each in ``0..255``.
    """
    palette_hex = ("#3A86FF", "#FF006E", "#43AA8B", "#F3722C", "#8338EC", "#90BE6D")
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    slot = int.from_bytes(digest, "big") % len(palette_hex)
    # "#RRGGBB" -> (R, G, B): each pair of hex digits is one byte.
    return tuple(bytes.fromhex(palette_hex[slot].lstrip("#")))
|
|
|
|
|
def make_overlay(rgb: np.ndarray, mask: np.ndarray, key: str = "mask") -> Image.Image:
    """Draw ``mask`` on top of ``rgb`` as a tinted fill plus a solid outline.

    Args:
        rgb: Image array; cast to ``uint8`` before being turned into a PIL image.
        mask: Array where values ``> 0`` mark the segmented region.
            (Assumed to be 2-D with the same height/width as ``rgb`` —
            TODO confirm against callers.)
        key: Passed to ``stable_color`` to choose the tint.

    Returns:
        An RGB PIL image: ``rgb`` with the masked area filled at alpha 140
        and its boundary stroked at full opacity.
    """
    canvas = Image.fromarray(rgb.astype(np.uint8)).convert("RGBA")
    inside = mask > 0
    tint = stable_color(key)

    # Translucent fill: the alpha channel is 140 inside the mask, 0 elsewhere.
    fill_layer = Image.new("RGBA", canvas.size, tint + (0,))
    fill_layer.putalpha(Image.fromarray((inside.astype(np.uint8) * 140), "L"))

    # Outline: 3x3 max filter minus 3x3 min filter leaves only the pixels
    # along the mask boundary.
    solid = Image.fromarray((inside.astype(np.uint8) * 255), "L")
    grown = solid.filter(ImageFilter.MaxFilter(3))
    shrunk = solid.filter(ImageFilter.MinFilter(3))
    stroke_layer = Image.new("RGBA", canvas.size, tint + (255,))
    stroke_layer.putalpha(ImageChops.difference(grown, shrunk))

    # Composite order: image, then fill, then stroke on top.
    composed = Image.alpha_composite(canvas, fill_layer)
    composed = Image.alpha_composite(composed, stroke_layer)
    return composed.convert("RGB")
|
|
|
|
|
def ensure_models_loaded():
    """Build the SAM2 model and the PLM adapter once and store them in ``MODEL_CACHE``.

    Returns immediately when ``MODEL_CACHE["sam"]`` is already populated.
    Otherwise the checkpoints are fetched through ``download_if_needed``,
    both models are created on the CPU, their fine-tuned weights are loaded
    with ``strict=True``, and the results are cached under the ``"sam"`` and
    ``"plm"`` keys.
    """
    if MODEL_CACHE["sam"] is not None:
        return

    logging.info(f"Loading models from {REPO_ID}...")

    # --- SAM2: base checkpoint first, fine-tuned weights on top -------------
    sam = build_sam2(SAM2_CONFIG, download_if_needed(REPO_ID, BASE_CKPT_NAME), device="cpu")

    # NOTE(review): torch.load unpickles the downloaded file; only safe as
    # long as REPO_ID is trusted.
    sam_state = torch.load(download_if_needed(REPO_ID, FINE_TUNED_SAM), map_location="cpu")
    # The checkpoint may wrap the weights under "model" or be the state dict itself.
    sam.load_state_dict(sam_state.get("model", sam_state), strict=True)

    # --- PLM language adapter ------------------------------------------------
    adapter_ckpt = download_if_needed(REPO_ID, FINE_TUNED_PLM)
    adapter_kwargs = dict(
        model_name="Qwen/Qwen2.5-VL-3B-Instruct",
        transformer_dim=sam.sam_mask_decoder.transformer_dim,
        n_sparse_tokens=0,
        use_dense_bias=True,
        use_lora=True,
        lora_r=16,
        lora_alpha=32,
        lora_dropout=0.05,
        dtype=torch.bfloat16,
        device="cpu",
    )
    adapter = PLMLanguageAdapter(**adapter_kwargs)
    adapter_state = torch.load(adapter_ckpt, map_location="cpu")
    adapter.load_state_dict(adapter_state["plm"], strict=True)
    adapter.eval()

    # Publish both models only after everything loaded successfully.
    MODEL_CACHE.update(sam=sam, plm=adapter)
    logging.info("Models loaded successfully.")
|
|
|
|
|
|
|
|
|
|
|
@spaces.GPU(duration=120)
def run_prediction(image_pil, user_text, threshold=0.5):
    """Run text-prompted segmentation on a single image.

    The prompt handed to the language adapter is ``"Segment the <user_text>"``
    with surrounding whitespace and one trailing ``.``/``!``/``?`` removed.

    Args:
        image_pil: Input image exposing PIL's ``convert`` API.
        user_text: Free-text description of the region to segment.
        threshold: Probability cut-off used to binarise the mask.

    Returns:
        ``(overlay, heatmap, prob)``: the RGB overlay from ``make_overlay``,
        a JET-coloured PIL image of the probabilities, and the ``(H, W)``
        float probability array at the input resolution. Returns
        ``(None, None, None)`` when the image is missing or the prompt is
        empty or whitespace-only.

    Raises:
        gr.Error: If anything fails while moving the models to the GPU or
            during inference; the original exception is chained as the cause.
    """
    prompt_body = user_text.strip() if user_text else ""
    if image_pil is None or not prompt_body:
        return None, None, None

    full_prompt = f"Segment the {prompt_body}"
    if full_prompt[-1] in {".", "!", "?"}:
        full_prompt = full_prompt[:-1]
    logging.info(f"Processing prompt: {full_prompt}")

    ensure_models_loaded()
    sam_model = MODEL_CACHE["sam"]
    plm_model = MODEL_CACHE["plm"]

    try:
        # Moved inside the try so that a failure half-way through (e.g. OOM
        # on the second model) still triggers the CPU offload in `finally`.
        sam_model.to("cuda")
        plm_model.to("cuda")

        with torch.inference_mode():
            predictor = SAM2ImagePredictor(sam_model)
            rgb_pil = image_pil.convert("RGB")
            rgb_orig = np.array(rgb_pil)
            H, W = rgb_orig.shape[:2]

            # Letterbox: scale the longer side to SQUARE_DIM and centre the
            # result on a zero-padded square canvas. The shorter side is
            # clamped to >= 1 px so extreme aspect ratios do not produce an
            # empty resize target.
            scale = SQUARE_DIM / max(H, W)
            nw, nh = max(1, int(W * scale)), max(1, int(H * scale))
            top, left = (SQUARE_DIM - nh) // 2, (SQUARE_DIM - nw) // 2

            rgb_sq = cv2.resize(rgb_orig, (nw, nh), interpolation=cv2.INTER_LINEAR)
            rgb_sq = cv2.copyMakeBorder(
                rgb_sq,
                top, SQUARE_DIM - nh - top,
                left, SQUARE_DIM - nw - left,
                cv2.BORDER_CONSTANT, value=0,
            )

            # Image features from the SAM2 predictor.
            predictor.set_image(rgb_sq)
            image_emb = predictor._features["image_embed"][-1].unsqueeze(0)
            hi = [lvl[-1].unsqueeze(0) for lvl in predictor._features["high_res_feats"]]

            # The adapter receives the image as a file path. Save the RGB
            # conversion: JPEG cannot encode RGBA/P images (typical for PNG
            # uploads), so saving the raw input could raise.
            with tempfile.NamedTemporaryFile(suffix=".jpg") as tmp:
                rgb_pil.save(tmp.name)
                # shape[2]/shape[3] are presumably the embedding grid
                # height/width — TODO confirm against PLMLanguageAdapter.
                sp, dp = plm_model(
                    [full_prompt], image_emb.shape[2], image_emb.shape[3], [tmp.name]
                )

            # Run the SAM2 mask decoder with every input on its device/dtype.
            dec = sam_model.sam_mask_decoder
            ref_param = next(dec.parameters())
            dev, dtype = ref_param.device, ref_param.dtype

            low, scores, _, _ = dec(
                image_embeddings=image_emb.to(dev, dtype),
                image_pe=sam_model.sam_prompt_encoder.get_dense_pe().to(dev, dtype),
                sparse_prompt_embeddings=sp.to(dev, dtype),
                dense_prompt_embeddings=dp.to(dev, dtype),
                multimask_output=True,
                repeat_image=False,
                high_res_features=[h.to(dev, dtype) for h in hi],
            )

            # Upsample to the square canvas and keep the best-scoring mask.
            logits = predictor._transforms.postprocess_masks(low, (SQUARE_DIM, SQUARE_DIM))
            best_idx = scores.argmax().item()

            # Undo the letterbox: cut out the un-padded region, then resize
            # back to the original resolution.
            logit_crop = logits[0, best_idx, top:top + nh, left:left + nw].unsqueeze(0).unsqueeze(0)
            logit_full = F.interpolate(
                logit_crop, size=(H, W), mode="bilinear", align_corners=False
            )[0, 0]

            prob = torch.sigmoid(logit_full).float().cpu().numpy()

            # Confidence heat-map (OpenCV colour maps are BGR; convert to RGB).
            heatmap_cv = cv2.applyColorMap((prob * 255).astype(np.uint8), cv2.COLORMAP_JET)
            heatmap_rgb = cv2.cvtColor(heatmap_cv, cv2.COLOR_BGR2RGB)

            mask = (prob > threshold).astype(np.uint8) * 255
            overlay = make_overlay(rgb_orig, mask, key=full_prompt)

            return overlay, Image.fromarray(heatmap_rgb), prob

    except Exception as exc:
        # Log the full traceback server-side; show the user a short message.
        logging.exception("Inference failed")
        raise gr.Error("Inference failed. Please check logs.") from exc
    finally:
        # Always release the GPU: offload both models and drop cached blocks.
        sam_model.to("cpu")
        plm_model.to("cpu")
        torch.cuda.empty_cache()
|
|
|
|
|
def update_threshold_ui(image_pil, user_text, threshold, cached_prob):
    """Real-time update using CPU only (no GPU quota usage).

    Re-thresholds a previously computed probability map and redraws the
    overlay; no model is invoked.

    Args:
        image_pil: Currently displayed input image (PIL-like, needs ``convert``).
        user_text: Current prompt text; only used to pick the overlay colour.
        threshold: Probability cut-off for the mask.
        cached_prob: Probability array stored by the last prediction, or ``None``.

    Returns:
        The overlay image from ``make_overlay``, or ``None`` when there is no
        image, no cached probabilities, or the cached map does not match the
        current image size.
    """
    if image_pil is None or cached_prob is None:
        return None

    rgb_orig = np.array(image_pil.convert("RGB"))
    # A cached map left over from a previously loaded image of a different
    # size cannot be drawn on this one; bail out instead of crashing.
    if cached_prob.shape[:2] != rgb_orig.shape[:2]:
        return None

    mask = (cached_prob > threshold).astype(np.uint8) * 255

    # Build the colour key exactly as the prediction step builds its prompt
    # (including dropping one trailing '.', '!' or '?'); otherwise the overlay
    # colour changes as soon as the slider is moved.
    if user_text:
        full_prompt = f"Segment the {user_text.strip()}"
        if full_prompt[-1] in {".", "!", "?"}:
            full_prompt = full_prompt[:-1]
    else:
        full_prompt = "mask"
    return make_overlay(rgb_orig, mask, key=full_prompt)
|
|
|
|
|
|
|
|
|
|
|
# Stylesheet passed to gr.Blocks(css=...): centres the title/subtitle and
# styles the static "Segment the" prefix so it lines up with the textbox
# next to it.
custom_css = """
h1 {
    text-align: center;
    display: block;
}
.subtitle {
    text-align: center;
    font-size: 1.1em;
    margin-bottom: 20px;
}
.prefix-container {
    display: flex;
    align-items: center;
    justify-content: center;
    height: 100%;
    /* Match Gradio Textbox font style */
    font-family: var(--font-sans);
    font-size: var(--input-text-size);
    font-weight: 400;
    color: var(--body-text-color);
}
/* Force the HTML container to match height of neighbor */
.prefix-box {
    display: flex;
    flex-direction: column;
    justify-content: center;
    height: 100% !important;
    min-height: 42px; /* Standard Gradio input height fallback */
}
"""
|
|
|
|
|
# Soft base theme (blue primary hue, slate neutrals) with the primary button
# fill and hover fill overridden.
theme = gr.themes.Soft(primary_hue="blue", neutral_hue="slate").set(
    button_primary_background_fill="*primary_600",
    button_primary_background_fill_hover="*primary_700",
)
|
|
|
|
|
def example_handler(text):
    """Callback to strip the prefix when an example is clicked.

    Returns ``text`` without a leading ``"Segment the "``; any other value
    (including ``None`` and the empty string) is returned unchanged.
    """
    prefix = "Segment the "
    has_prefix = bool(text) and text.startswith(prefix)
    return text[len(prefix):] if has_prefix else text
|
|
|
|
|
# UI definition. Components are created in layout order; the event wiring at
# the bottom connects them to the callbacks defined above.
with gr.Blocks(theme=theme, css=custom_css, title="ConvSeg-Net Demo") as demo:
    # Holds the probability map returned by the last run so the threshold
    # slider can re-threshold it without another model call.
    prob_state = gr.State()

    # ---- Header -------------------------------------------------------------
    gr.Markdown("# 🧩 Conversational Image Segmentation")
    gr.Markdown(
        "<div class='subtitle'>Grounding abstract concepts and physics-based reasoning into pixel-accurate masks.<br>"
        "Powered by <b>SAM2 + Qwen2.5-VL</b></div>"
    )

    with gr.Row():
        # ---- Left column: inputs --------------------------------------------
        with gr.Column(scale=1):
            input_image = gr.Image(type="pil", label="Input Image", height=400)

            # Prompt row: fixed "Segment the" prefix followed by the free-text box.
            gr.Markdown("**Conversational Prompt**")
            with gr.Group():
                with gr.Row(equal_height=True):
                    gr.HTML(
                        "<div class='prefix-container'>Segment the</div>",
                        elem_classes="prefix-box",
                        min_width=100,
                        max_width=100,
                    )
                    text_prompt = gr.Textbox(
                        show_label=False,
                        container=False,
                        placeholder="object that is prone to rolling...",
                        lines=1,
                        scale=5,
                    )

            with gr.Accordion("⚙️ Advanced Options", open=False):
                threshold_slider = gr.Slider(
                    0.0,
                    1.0,
                    value=0.5,
                    step=0.01,
                    label="Mask Confidence Threshold",
                    info="Adjust after running to refine the mask edges.",
                )

            run_btn = gr.Button("🚀 Run Segmentation", variant="primary", size="lg")

        # ---- Right column: outputs ------------------------------------------
        with gr.Column(scale=1):
            out_overlay = gr.Image(label="Segmentation Result", type="pil")
            out_heatmap = gr.Image(label="Confidence Heatmap", type="pil")

    # ---- Examples -----------------------------------------------------------
    # Examples write the full sentence into a hidden textbox; its change
    # handler removes the "Segment the " prefix before filling the visible box.
    hidden_example_text = gr.Textbox(visible=False)

    gr.Markdown("### 📝 Try Examples")
    gr.Examples(
        examples=[
            ["./examples/elephants.png", "Segment the elephant acting as the vanguard of the herd."],
            ["./examples/luggage.png", "Segment the luggage resting precariously."],
            ["./examples/veggies.png", "Segment the produce harvested from underground."],
        ],
        inputs=[input_image, hidden_example_text],
    )

    # ---- Event wiring -------------------------------------------------------
    hidden_example_text.change(
        fn=example_handler,
        inputs=hidden_example_text,
        outputs=text_prompt,
    )

    # Full inference; also refreshes the cached probability map.
    run_btn.click(
        fn=run_prediction,
        inputs=[input_image, text_prompt, threshold_slider],
        outputs=[out_overlay, out_heatmap, prob_state],
    )

    # Re-threshold the cached map whenever the slider moves.
    threshold_slider.change(
        fn=update_threshold_ui,
        inputs=[input_image, text_prompt, threshold_slider, prob_state],
        outputs=[out_overlay],
    )
|
|
|
|
|
# Script entry point: enable request queuing, then start the Gradio server.
if __name__ == "__main__":
    queued_app = demo.queue()
    queued_app.launch()