# Author: Huzheng Yang
# %%
import copy
from functools import partial
from io import BytesIO
import os

from einops import rearrange
from matplotlib import pyplot as plt
import matplotlib

USE_HUGGINGFACE_ZEROGPU = os.getenv("USE_HUGGINGFACE_ZEROGPU", "False").lower() in ["true", "1", "yes"]
DOWNLOAD_ALL_MODELS_DATASETS = os.getenv("DOWNLOAD_ALL_MODELS_DATASETS", "False").lower() in ["true", "1", "yes"]
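# Illustrative launch commands (the filename `app.py` is an assumption):
#   USE_HUGGINGFACE_ZEROGPU=true python app.py   # Spaces-style dynamic GPU allocation
#   python app.py                                # local machine; both flags default to False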
if USE_HUGGINGFACE_ZEROGPU:  # huggingface ZeroGPU, dynamic GPU allocation
    try:
        import spaces
    except ImportError:
        USE_HUGGINGFACE_ZEROGPU = False

if USE_HUGGINGFACE_ZEROGPU:
    BATCH_SIZE = 1
else:  # run on local machine
    BATCH_SIZE = 1
import gradio as gr
import torch
import torch.nn.functional as F
from PIL import Image
import numpy as np
import time
import threading

from ncut_pytorch.backbone import extract_features, load_model
from ncut_pytorch.backbone import MODEL_DICT, LAYER_DICT, RES_DICT
from ncut_pytorch import NCUT
from ncut_pytorch import eigenvector_to_rgb, rotate_rgb_cube
DATASET_TUPS = [
    # (name, num_classes)
    ('UCSC-VLAA/Recap-COCO-30K', None),
    ('nateraw/pascal-voc-2012', None),
    ('johnowhitaker/imagenette2-320', 10),
    ('jainr3/diffusiondb-pixelart', None),
    ('nielsr/CelebA-faces', None),
    ('JapanDegitalMaterial/Places_in_Japan', None),
    ('Borismile/Anime-dataset', None),
    ('Multimodal-Fatima/CUB_train', 200),
    ('mrm8488/ImageNet1K-val', 1000),
    ("trashsock/hands-images", 8),
]
DATASET_NAMES = [tup[0] for tup in DATASET_TUPS]
DATASET_CLASSES = [tup[1] for tup in DATASET_TUPS]
from datasets import load_dataset

def download_all_datasets():
    for name in DATASET_NAMES:
        print(f"Downloading {name}")
        try:
            load_dataset(name, trust_remote_code=True)
        except Exception as e:
            print(f"Error downloading {name}: {e}")
def compute_ncut(
    features,
    num_eig=100,
    num_sample_ncut=10000,
    affinity_focal_gamma=0.3,
    knn_ncut=10,
    knn_tsne=10,
    embedding_method="UMAP",
    embedding_metric='euclidean',
    num_sample_tsne=300,
    perplexity=150,
    n_neighbors=150,
    min_dist=0.1,
    sampling_method="fps",
    metric="cosine",
    progess_start=0.4,
):
    progress = gr.Progress()
    logging_str = ""

    num_nodes = np.prod(features.shape[:-1])
    if num_nodes / 2 < num_eig:
        # warn instead of raising: clamp num_eig to a valid value
        gr.Warning("Number of eigenvectors should be less than half the number of nodes.\n" f"Setting num_eig to {num_nodes // 2 - 1}.")
        num_eig = num_nodes // 2 - 1
        logging_str += "Number of eigenvectors should be less than half the number of nodes.\n" f"Setting num_eig to {num_eig}.\n"

    start = time.time()
    progress(progess_start + 0.0, desc="NCut")
    eigvecs, eigvals = NCUT(
        num_eig=num_eig,
        num_sample=num_sample_ncut,
        device="cuda" if torch.cuda.is_available() else "cpu",
        affinity_focal_gamma=affinity_focal_gamma,
        knn=knn_ncut,
        sample_method=sampling_method,
        distance=metric,
        normalize_features=False,
    ).fit_transform(features.reshape(-1, features.shape[-1]))
    logging_str += f"NCUT time: {time.time() - start:.2f}s\n"

    start = time.time()
    progress(progess_start + 0.01, desc="spectral-tSNE")
    _, rgb = eigenvector_to_rgb(
        eigvecs,
        method=embedding_method,
        metric=embedding_metric,
        num_sample=num_sample_tsne,
        perplexity=perplexity,
        n_neighbors=n_neighbors,
        min_distance=min_dist,
        knn=knn_tsne,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )
    logging_str += f"{embedding_method} time: {time.time() - start:.2f}s\n"

    rgb = rgb.reshape(features.shape[:-1] + (3,))
    return rgb, logging_str, eigvecs
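# Minimal usage sketch for compute_ncut (an illustration, not part of the app
# flow; assumes `features` is a (n_images, h, w, c) float tensor such as the
# output of extract_features below):
#   rgb, log_str, eigvecs = compute_ncut(features, num_eig=50)
#   pil_imgs = to_pil_images(dont_use_too_much_green(rgb))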
def dont_use_too_much_green(image_rgb):
    # ensure the focal center 40% of the image is red-leading:
    # sort the color channels by their mean over the center crop, descending
    x1, x2 = int(image_rgb.shape[1] * 0.3), int(image_rgb.shape[1] * 0.7)
    y1, y2 = int(image_rgb.shape[2] * 0.3), int(image_rgb.shape[2] * 0.7)
    sum_values = image_rgb[:, x1:x2, y1:y2].mean((0, 1, 2))
    sorted_indices = sum_values.argsort(descending=True)
    image_rgb = image_rgb[:, :, :, sorted_indices]
    return image_rgb
def to_pil_images(images, target_size=512, resize=True):
    size = images[0].shape[1]
    multiplier = target_size // size
    res = int(size * multiplier)
    pil_images = [
        Image.fromarray((image * 255).cpu().numpy().astype(np.uint8))
        for image in images
    ]
    if resize:
        pil_images = [
            image.resize((res, res), Image.Resampling.NEAREST)
            for image in pil_images
        ]
    return pil_images
def pil_images_to_video(images, output_path, fps=5):
    # convert PIL images to numpy arrays
    images = [np.array(image) for image in images]
    import cv2
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    height, width, _ = images[0].shape
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    for image in images:
        out.write(cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
    out.release()
    return output_path
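# Usage sketch (the path is hypothetical):
#   pil_images_to_video(to_pil_images(rgb), "/tmp/ncut_demo.mp4", fps=5)
# cv2.VideoWriter expects BGR frames, hence the RGB->BGR conversion above.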
# keep at most 100 videos on disk
class VideoCache:
    def __init__(self, max_videos=100):
        self.max_videos = max_videos
        self.videos = {}

    def add_video(self, video_path):
        if len(self.videos) >= self.max_videos:
            # evict the oldest entry (dicts preserve insertion order)
            pop_path = next(iter(self.videos))
            self.videos.pop(pop_path)
            try:
                os.remove(pop_path)
            except OSError:
                pass
        self.videos[video_path] = video_path

    def get_video(self, video_path):
        return self.videos.get(video_path, None)

video_cache = VideoCache()
def get_random_path(length=10):
    import random
    import string
    name = ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
    path = f'/tmp/{name}.mp4'
    return path

default_images = ['./images/image_0.jpg', './images/image_1.jpg', './images/image_2.jpg', './images/image_3.jpg', './images/guitar_ego.jpg', './images/image_5.jpg']
default_outputs = ['./images/image-1.webp', './images/image-2.webp', './images/image-3.webp', './images/image-4.webp', './images/image-5.webp']
# default_outputs_independent = ['./images/image-6.webp', './images/image-7.webp', './images/image-8.webp', './images/image-9.webp', './images/image-10.webp']
default_outputs_independent = []
downscaled_images = ['./images/image_0_small.jpg', './images/image_1_small.jpg', './images/image_2_small.jpg', './images/image_3_small.jpg', './images/image_5_small.jpg']
downscaled_outputs = default_outputs

example_items = downscaled_images[:3] + downscaled_outputs[:3]
def run_alignedthreemodelattnnodes(images, model, batch_size=16):
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    if use_cuda:
        model = model.to(device)

    chunked_idxs = torch.split(torch.arange(images.shape[0]), batch_size)

    outputs = []
    for idxs in chunked_idxs:
        inp = images[idxs]
        if use_cuda:
            inp = inp.to(device)
        out = model(inp)
        # normalize before saving
        out = F.normalize(out, dim=-1)
        outputs.append(out.cpu().float())
    outputs = torch.cat(outputs, dim=0)
    return outputs
def _reds_colormap(image):
    # input is assumed to be already normalized to [0, 1]
    normed_data = image
    colormap = matplotlib.colormaps['inferno']  # get the inferno colormap
    colored_image = colormap(normed_data)  # apply colormap
    return (colored_image[..., :3] * 255).astype(np.uint8)  # convert to RGB
# heatmap images
def apply_reds_colormap(images, size):
    normed_data = _reds_colormap(images)
    normed_data = torch.tensor(normed_data).float()
    normed_data = rearrange(normed_data, "b h w c -> b c h w")
    normed_data = torch.nn.functional.interpolate(normed_data, size=size, mode="nearest")
    normed_data = rearrange(normed_data, "b c h w -> b h w c")
    normed_data = normed_data.cpu().numpy().astype(np.uint8)
    return normed_data

# Blend heatmap with the original image
def blend_image_with_heatmap(image, heatmap, opacity1=0.5, opacity2=0.5):
    blended = (1 - opacity1) * image + opacity2 * heatmap
    return blended.astype(np.uint8)
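# Worked example: with the defaults opacity1 = opacity2 = 0.5 the blend is a
# plain average, e.g. image pixel 200 and heatmap pixel 100 -> 0.5*200 + 0.5*100 = 150.
# Note the two opacities are independent: if opacity1 + opacity2 > 1 the sum can
# exceed 255 and the uint8 cast will wrap, so callers should keep the sum <= 1.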
def make_cluster_plot(eigvecs, images, h=64, w=64, progess_start=0.6):
    progress = gr.Progress()
    progress(progess_start, desc="Finding Clusters by FPS")
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    eigvecs = eigvecs.to(device)
    from ncut_pytorch.ncut_pytorch import farthest_point_sampling

    # FPS round 1: sample 300 cluster centers from the top-80% largest-magnitude eigenvectors
    magnitude = torch.norm(eigvecs, dim=-1)
    p = 0.8
    top_p_idx = magnitude.argsort(descending=True)[:int(p * magnitude.shape[0])]
    num_samples = 300
    if num_samples > top_p_idx.shape[0]:
        num_samples = top_p_idx.shape[0]
    fps_idx = farthest_point_sampling(eigvecs[top_p_idx], num_samples)
    fps_idx = top_p_idx[fps_idx]

    # FPS round 2: re-sample 80 centers on the similarity heatmap
    left = eigvecs[fps_idx, :].clone()
    right = eigvecs.clone()
    left = F.normalize(left, dim=-1)
    right = F.normalize(right, dim=-1)
    heatmap = left @ right.T
    heatmap = F.normalize(heatmap, dim=-1)
    num_samples = 80
    if num_samples > fps_idx.shape[0]:
        num_samples = fps_idx.shape[0]
    r2_fps_idx = farthest_point_sampling(heatmap, num_samples)
    fps_idx = fps_idx[r2_fps_idx]

    # downsample to 256x256
    images = F.interpolate(images, (256, 256), mode="bilinear")
    images = images.cpu().numpy()
    images = images.transpose(0, 2, 3, 1)
    images = images * 255
    images = images.astype(np.uint8)

    # sort the fps_idx by the mean of the heatmap
    fps_heatmaps = {}
    sort_values = []
    top3_image_idx = {}
    for _, idx in enumerate(fps_idx):
        heatmap = F.cosine_similarity(eigvecs, eigvecs[idx][None], dim=-1)
        top_p = 0.5
        heatmap = heatmap.reshape(-1, h, w)
        mask = (heatmap > top_p).float()
        # keep the top 3 masks only
        mask_sort_values = mask.mean((1, 2))
        mask_sort_idx = torch.argsort(mask_sort_values, descending=True)
        mask = mask[mask_sort_idx[:3]]
        sort_values.append(mask.mean().item())
        fps_heatmaps[idx.item()] = heatmap[mask_sort_idx[:3]].cpu()
        top3_image_idx[idx.item()] = mask_sort_idx[:3]
    # do the sorting
    _sort_idx = torch.tensor(sort_values).argsort(descending=True)
    fps_idx = fps_idx[_sort_idx]
    # discard the 10 biggest clusters
    fps_idx = fps_idx[10:]
    # shuffle the fps_idx
    fps_idx = fps_idx[torch.randperm(fps_idx.shape[0])]

    fig_images = []
    i_cluster = 0
    num_plots = 10
    plot_step_float = (1.0 - progess_start) / num_plots
    for i_fig in range(num_plots):
        progress(progess_start + i_fig * plot_step_float, desc="Plotting Clusters")
        fig, axs = plt.subplots(3, 5, figsize=(15, 9))
        for ax in axs.flatten():
            ax.axis("off")
        for j, idx in enumerate(fps_idx[i_fig*5:i_fig*5+5]):
            heatmap = fps_heatmaps[idx.item()]
            size = (images.shape[1], images.shape[2])
            heatmap = apply_reds_colormap(heatmap, size)
            for i, image_idx in enumerate(top3_image_idx[idx.item()]):
                _heatmap = blend_image_with_heatmap(images[image_idx], heatmap[i])
                axs[i, j].imshow(_heatmap)
                if i == 0:
                    axs[i, j].set_title(f"cluster {i_cluster+1}", fontsize=24)
            i_cluster += 1
        plt.tight_layout(h_pad=0.5, w_pad=0.3)
        buf = BytesIO()
        plt.savefig(buf, bbox_inches='tight', dpi=72)
        buf.seek(0)  # move to the start of the BytesIO buffer
        img = Image.open(buf)
        img = img.convert("RGB")
        img = copy.deepcopy(img)
        buf.close()
        fig_images.append(img)
        plt.close()

    return fig_images
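# Usage sketch (matches the call in ncut_run below; h, w are the feature-map
# height/width so each flat heatmap reshapes back to image layout):
#   cluster_images = make_cluster_plot(eigvecs, _images, h=h, w=w)
# Returns 10 PIL figure images, each a 3x5 grid: 5 clusters x their top-3 images.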
def ncut_run(
    model,
    images,
    model_name="DiNO(dino_vitb8_448)",
    layer=10,
    num_eig=100,
    node_type="block",
    affinity_focal_gamma=0.5,
    num_sample_ncut=10000,
    knn_ncut=10,
    embedding_method="tsne_3d",
    embedding_metric='euclidean',
    num_sample_tsne=1000,
    knn_tsne=10,
    perplexity=500,
    n_neighbors=500,
    min_dist=0.1,
    sampling_method="fps",
    old_school_ncut=False,
    recursion=False,
    recursion_l2_n_eigs=50,
    recursion_l3_n_eigs=20,
    recursion_metric="euclidean",
    recursion_l1_gamma=0.5,
    recursion_l2_gamma=0.5,
    recursion_l3_gamma=0.5,
    video_output=False,
    is_lisa=False,
    lisa_prompt1="",
    lisa_prompt2="",
    lisa_prompt3="",
):
    progress = gr.Progress()
    progress(0.2, desc="Feature Extraction")

    logging_str = ""
    if "AlignedThreeModelAttnNodes" == model_name:
        # dirty patch for the alignedcut paper
        resolution = (224, 224)
    else:
        resolution = RES_DICT[model_name]
    logging_str += f"Resolution: {resolution}\n"

    if perplexity >= num_sample_tsne or n_neighbors >= num_sample_tsne:
        # warn instead of raising: clamp perplexity/n_neighbors to a valid value
        gr.Warning("Perplexity/n_neighbors must be less than the number of samples.\n" f"Setting perplexity to {num_sample_tsne-1}.")
        logging_str += "Perplexity/n_neighbors must be less than the number of samples.\n" f"Setting perplexity to {num_sample_tsne-1}.\n"
        perplexity = num_sample_tsne - 1
        n_neighbors = num_sample_tsne - 1

    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    node_type = node_type.split(":")[0].strip()

    start = time.time()
    if "AlignedThreeModelAttnNodes" == model_name:
        # dirty patch for the alignedcut paper
        features = run_alignedthreemodelattnnodes(images, model, batch_size=BATCH_SIZE)
    elif is_lisa:
        # dirty patch for the LISA model: one feature map per prompt
        features = []
        with torch.no_grad():
            model = model.cuda()
            images = images.cuda()
            lisa_prompts = [lisa_prompt1, lisa_prompt2, lisa_prompt3]
            for prompt in lisa_prompts:
                import bleach
                prompt = bleach.clean(prompt)
                prompt = prompt.strip()
                feature = model(images, input_str=prompt)[node_type][0]
                feature = F.normalize(feature, dim=-1)
                features.append(feature.cpu().float())
            features = torch.stack(features)
    else:
        features = extract_features(
            images, model, node_type=node_type, layer=layer-1, batch_size=BATCH_SIZE
        )
    logging_str += f"Backbone time: {time.time() - start:.2f}s\n"
| progress(0.4, desc="NCut") | |
| if recursion: | |
| rgbs = [] | |
| recursion_gammas = [recursion_l1_gamma, recursion_l2_gamma, recursion_l3_gamma] | |
| inp = features | |
| progress_start = 0.4 | |
| for i, n_eigs in enumerate([num_eig, recursion_l2_n_eigs, recursion_l3_n_eigs]): | |
| logging_str += f"Recursion #{i+1}\n" | |
| progress_start += + 0.1 * i | |
| rgb, _logging_str, eigvecs = compute_ncut( | |
| inp, | |
| num_eig=n_eigs, | |
| num_sample_ncut=num_sample_ncut, | |
| affinity_focal_gamma=recursion_gammas[i], | |
| knn_ncut=knn_ncut, | |
| knn_tsne=knn_tsne, | |
| num_sample_tsne=num_sample_tsne, | |
| embedding_method=embedding_method, | |
| embedding_metric=embedding_metric, | |
| perplexity=perplexity, | |
| n_neighbors=n_neighbors, | |
| min_dist=min_dist, | |
| sampling_method=sampling_method, | |
| metric="cosine" if i == 0 else recursion_metric, | |
| progess_start=progress_start, | |
| ) | |
| logging_str += _logging_str | |
| if "AlignedThreeModelAttnNodes" == model_name: | |
| # dirty patch for the alignedcut paper | |
| start = time.time() | |
| progress(progress_start + 0.09, desc=f"Plotting Recursion {i+1}") | |
| pil_images = [] | |
| for i_image in range(rgb.shape[0]): | |
| _im = plot_one_image_36_grid(images[i_image], rgb[i_image]) | |
| pil_images.append(_im) | |
| rgbs.append(pil_images) | |
| logging_str += f"plot time: {time.time() - start:.2f}s\n" | |
| else: | |
| rgb = dont_use_too_much_green(rgb) | |
| rgbs.append(to_pil_images(rgb)) | |
| inp = eigvecs.reshape(*features.shape[:-1], -1) | |
| if recursion_metric == "cosine": | |
| inp = F.normalize(inp, dim=-1) | |
| return rgbs[0], rgbs[1], rgbs[2], logging_str | |
    if old_school_ncut:  # NCut solved per-image, independently
        logging_str += "Running NCut for each image independently\n"
        rgb = []
        progress_start = 0.4
        step_float = 0.6 / features.shape[0]
        for i_image in range(features.shape[0]):
            logging_str += f"Image #{i_image+1}\n"
            feature = features[i_image]
            _rgb, _logging_str, _ = compute_ncut(
                feature[None],
                num_eig=num_eig,
                num_sample_ncut=30000,
                affinity_focal_gamma=affinity_focal_gamma,
                knn_ncut=1,
                knn_tsne=10,
                num_sample_tsne=300,
                embedding_method=embedding_method,
                embedding_metric=embedding_metric,
                perplexity=perplexity,
                n_neighbors=n_neighbors,
                min_dist=min_dist,
                sampling_method=sampling_method,
                progess_start=progress_start+step_float*i_image,
            )
            logging_str += _logging_str
            rgb.append(_rgb[0])

    cluster_images = None
    if not old_school_ncut:  # alignedcut, NCut solved jointly across all images
        rgb, _logging_str, eigvecs = compute_ncut(
            features,
            num_eig=num_eig,
            num_sample_ncut=num_sample_ncut,
            affinity_focal_gamma=affinity_focal_gamma,
            knn_ncut=knn_ncut,
            knn_tsne=knn_tsne,
            num_sample_tsne=num_sample_tsne,
            embedding_method=embedding_method,
            embedding_metric=embedding_metric,
            perplexity=perplexity,
            n_neighbors=n_neighbors,
            min_dist=min_dist,
            sampling_method=sampling_method,
        )
        logging_str += _logging_str

    if "AlignedThreeModelAttnNodes" == model_name:
        # dirty patch for the alignedcut paper
        start = time.time()
        progress(0.6, desc="Plotting")
        pil_images = []
        for i_image in range(rgb.shape[0]):
            _im = plot_one_image_36_grid(images[i_image], rgb[i_image])
            pil_images.append(_im)
        logging_str += f"plot time: {time.time() - start:.2f}s\n"
        return pil_images, logging_str

    if is_lisa:
        # dirty patch for the LISA model: one gallery per prompt
        galleries = []
        for i_prompt in range(len(lisa_prompts)):
            _rgb = rgb[i_prompt]
            galleries.append(to_pil_images(_rgb))
        return *galleries, logging_str

    rgb = dont_use_too_much_green(rgb)

    if not video_output:
        start = time.time()
        progress_start = 0.6
        progress(progress_start, desc="Plotting Clusters")
        h, w = features.shape[1], features.shape[2]
        if torch.cuda.is_available():
            images = images.cuda()
        _images = reverse_transform_image(images, stablediffusion="stable" in model_name.lower())
        cluster_images = make_cluster_plot(eigvecs, _images, h=h, w=w, progess_start=progress_start)
        logging_str += f"plot time: {time.time() - start:.2f}s\n"

    if video_output:
        progress(0.8, desc="Saving Video")
        video_path = get_random_path()
        video_cache.add_video(video_path)
        pil_images_to_video(to_pil_images(rgb), video_path)
        return video_path, logging_str

    return to_pil_images(rgb), cluster_images, logging_str
def _ncut_run(*args, **kwargs):
    n_ret = kwargs.pop("n_ret", 1)
    try:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        ret = ncut_run(*args, **kwargs)
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        # keep the first n_ret outputs, plus the trailing logging string
        ret = list(ret)[:n_ret] + [ret[-1]]
        return ret
    except Exception as e:
        gr.Error(str(e))
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return *(None for _ in range(n_ret)), "Error: " + str(e)
# The two branches below are identical in this section; on ZeroGPU each wrapper
# would typically carry a `spaces.GPU` decorator requesting a GPU slice sized to
# the expected runtime (quick < long < longer < super_duper_long).
if USE_HUGGINGFACE_ZEROGPU:
    def quick_run(*args, **kwargs):
        return _ncut_run(*args, **kwargs)

    def long_run(*args, **kwargs):
        return _ncut_run(*args, **kwargs)

    def longer_run(*args, **kwargs):
        return _ncut_run(*args, **kwargs)

    def super_duper_long_run(*args, **kwargs):
        return _ncut_run(*args, **kwargs)

    def cpu_run(*args, **kwargs):
        return _ncut_run(*args, **kwargs)

if not USE_HUGGINGFACE_ZEROGPU:
    def quick_run(*args, **kwargs):
        return _ncut_run(*args, **kwargs)

    def long_run(*args, **kwargs):
        return _ncut_run(*args, **kwargs)

    def longer_run(*args, **kwargs):
        return _ncut_run(*args, **kwargs)

    def super_duper_long_run(*args, **kwargs):
        return _ncut_run(*args, **kwargs)

    def cpu_run(*args, **kwargs):
        return _ncut_run(*args, **kwargs)
def extract_video_frames(video_path, max_frames=100):
    from decord import VideoReader
    vr = VideoReader(video_path)
    num_frames = len(vr)
    if num_frames > max_frames:
        gr.Warning(f"Video has {num_frames} frames. Only using {max_frames} frames, evenly spaced.")
        frame_idx = np.linspace(0, num_frames - 1, max_frames, dtype=int).tolist()
    else:
        frame_idx = list(range(num_frames))
    frames = vr.get_batch(frame_idx).asnumpy()
    # return as a list of (PIL image, caption) tuples
    return [(Image.fromarray(frames[i]), "") for i in range(frames.shape[0])]
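# Usage sketch (hypothetical file): extract_video_frames("/tmp/clip.mp4")
# yields [(PIL.Image, ""), ...] — the (image, caption) tuple format that
# gr.Gallery expects as its value.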
def transform_image(image, resolution=(1024, 1024), stablediffusion=False):
    image = image.convert('RGB').resize(resolution, Image.LANCZOS)
    # convert to torch tensor, channels first, in [0, 1]
    image = torch.tensor(np.array(image).transpose(2, 0, 1)).float()
    image = image / 255
    # normalize
    if not stablediffusion:
        mean = [0.485, 0.456, 0.406]
        std = [0.229, 0.224, 0.225]
        image = (image - torch.tensor(mean).view(3, 1, 1)) / torch.tensor(std).view(3, 1, 1)
    if stablediffusion:
        image = image * 2 - 1
    return image

def reverse_transform_image(image, stablediffusion=False):
    if stablediffusion:
        image = (image + 1) / 2
    else:
        mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1).to(image.device)
        std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1).to(image.device)
        image = image * std + mean
    image = torch.clamp(image, 0, 1)
    return image
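# Round-trip sketch: reverse_transform_image undoes transform_image up to
# resizing, round-off, and clamping (illustrative; `pil_img` is any PIL image):
#   t = transform_image(pil_img, resolution=(224, 224))
#   back = reverse_transform_image(t)  # float tensor in [0, 1]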
def plot_one_image_36_grid(original_image, tsne_rgb_images):
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    original_image = original_image * torch.tensor(std).view(3, 1, 1) + torch.tensor(mean).view(3, 1, 1)
    original_image = torch.clamp(original_image, 0, 1)

    fig = plt.figure(figsize=(20, 4))
    grid = plt.GridSpec(3, 14, hspace=0.1, wspace=0.1)
    ax1 = fig.add_subplot(grid[0:2, 0:2])
    img = original_image.cpu().float().numpy().transpose(1, 2, 0)

    def convert_and_pad_image(np_array, pad_size=20):
        """
        Converts a NumPy array of shape (height, width, 3) to a PNG image
        and pads the right and bottom sides with a transparent background.

        Args:
            np_array (numpy.ndarray): Input NumPy array of shape (height, width, 3)
            pad_size (int, optional): Number of pixels to pad on the right and bottom sides. Default is 20.

        Returns:
            PIL.Image: Padded PNG image with transparent background
        """
        # Convert NumPy array to PIL Image
        img = Image.fromarray(np_array)
        # Get the original size
        width, height = img.size
        # Create a new image with padding and a transparent background
        new_width = width + pad_size
        new_height = height + pad_size
        padded_img = Image.new('RGBA', (new_width, new_height), color=(255, 255, 255, 0))
        # Paste the original image onto the padded image
        padded_img.paste(img, (0, 0))
        return padded_img

    img = convert_and_pad_image((img*255).astype(np.uint8))
    ax1.imshow(img)
    ax1.axis('off')

    model_names = ['CLIP', 'DINO', 'MAE']
    for i_model, model_name in enumerate(model_names):
        for i_layer in range(12):
            ax = fig.add_subplot(grid[i_model, i_layer+2])
            ax.imshow(tsne_rgb_images[i_layer+12*i_model].cpu().float().numpy())
            ax.axis('off')
            if i_model == 0:
                ax.set_title(f'Layer{i_layer}', fontsize=16)
            if i_layer == 0:
                ax.text(-0.1, 0.5, model_name, va="center", ha="center", fontsize=16, transform=ax.transAxes, rotation=90)
    plt.tight_layout()

    buf = BytesIO()
    plt.savefig(buf, bbox_inches='tight', pad_inches=0, dpi=100)
    buf.seek(0)  # move to the start of the BytesIO buffer
    img = Image.open(buf)
    img = img.convert("RGB")
    img = copy.deepcopy(img)
    buf.close()
    plt.close()
    return img
def load_alignedthreemodel():
    import sys
    if "alignedthreeattn" not in sys.path:
        for _ in range(3):  # retry the clone a few times in case of network hiccups
            os.system("git clone https://huggingface.co/huzey/alignedthreeattn >> /dev/null 2>&1")
            os.system("git -C alignedthreeattn pull >> /dev/null 2>&1")
        # add to path
        sys.path.append("alignedthreeattn")
    from alignedthreeattn.alignedthreeattn_model import ThreeAttnNodes
    align_weights = torch.load("alignedthreeattn/align_weights.pth")
    model = ThreeAttnNodes(align_weights)
    return model

promptable_diffusion_models = ["Diffusion(stabilityai/stable-diffusion-2)", "Diffusion(CompVis/stable-diffusion-v1-4)"]
promptable_segmentation_models = ["LISA(xinlai/LISA-7B-v1)"]
def run_fn(
    images,
    model_name="DiNO(dino_vitb8_448)",
    layer=10,
    num_eig=100,
    node_type="block",
    positive_prompt="",
    negative_prompt="",
    is_lisa=False,
    lisa_prompt1="",
    lisa_prompt2="",
    lisa_prompt3="",
    affinity_focal_gamma=0.5,
    num_sample_ncut=10000,
    knn_ncut=10,
    embedding_method="tsne_3d",
    embedding_metric='euclidean',
    num_sample_tsne=300,
    knn_tsne=10,
    perplexity=150,
    n_neighbors=150,
    min_dist=0.1,
    sampling_method="fps",
    old_school_ncut=False,
    max_frames=100,
    recursion=False,
    recursion_l2_n_eigs=50,
    recursion_l3_n_eigs=20,
    recursion_metric="euclidean",
    recursion_l1_gamma=0.5,
    recursion_l2_gamma=0.5,
    recursion_l3_gamma=0.5,
    n_ret=1,
):
    progress = gr.Progress()
    progress(0, desc="Starting")

    if images is None:
        gr.Warning("No images selected.")
        return *(None for _ in range(n_ret)), "No images selected."

    progress(0.05, desc="Processing Images")
    video_output = False
    if isinstance(images, str):
        # a string input is a video path; use its frames as the image set
        images = extract_video_frames(images, max_frames=max_frames)
        video_output = True

    if sampling_method == "fps":
        sampling_method = "farthest"

    # resize the images before acquiring the GPU
    if "AlignedThreeModelAttnNodes" == model_name:
        # dirty patch for the alignedcut paper
        resolution = (224, 224)
    else:
        resolution = RES_DICT[model_name]
    images = [tup[0] for tup in images]
    stablediffusion = "Diffusion" in model_name
    images = [transform_image(image, resolution=resolution, stablediffusion=stablediffusion) for image in images]
    images = torch.stack(images)
| progress(0.1, desc="Downloading Model") | |
| if is_lisa: | |
| import subprocess | |
| import sys | |
| import importlib | |
| gr.Warning("LISA model is not compatible with the current version of transformers. Please contact the LISA and Llava author for update.") | |
| gr.Warning("This is a dirty patch for the LISA model. switch to the old version of transformers.") | |
| gr.Warning("Not garanteed to work.") | |
| # LISA and Llava is not compatible with the current version of transformers | |
| # please contact the author for update | |
| # this is a dirty patch for the LISA model | |
| # pre-import the SD3 pipeline | |
| from diffusers import StableDiffusion3Pipeline | |
| # unloading the current transformers | |
| for module in list(sys.modules.keys()): | |
| if "transformers" in module: | |
| del sys.modules[module] | |
| def install_transformers_version(version, target_dir): | |
| """Install a specific version of transformers to a target directory.""" | |
| if not os.path.exists(target_dir): | |
| os.makedirs(target_dir) | |
| # Use subprocess to run the pip command | |
| # subprocess.check_call([sys.executable, '-m', 'pip', 'install', f'transformers=={version}', '-t', target_dir]) | |
| os.system(f"{sys.executable} -m pip install transformers=={version} -t {target_dir} >> /dev/null 2>&1") | |
| target_dir = '/tmp/lisa_transformers_v433' | |
| if not os.path.exists(target_dir): | |
| install_transformers_version('4.33.0', target_dir) | |
| # Add the new version path to sys.path | |
| sys.path.insert(0, target_dir) | |
| transformers = importlib.import_module("transformers") | |
| if not is_lisa: | |
| import subprocess | |
| import sys | |
| import importlib | |
| # remove the LISA model from the sys.path | |
| if "/tmp/lisa_transformers_v433" in sys.path: | |
| sys.path.remove("/tmp/lisa_transformers_v433") | |
| transformers = importlib.import_module("transformers") | |
| if "AlignedThreeModelAttnNodes" == model_name: | |
| # dirty patch for the alignedcut paper | |
| model = load_alignedthreemodel() | |
| else: | |
| model = load_model(model_name) | |
| if "stable" in model_name.lower() and "diffusion" in model_name.lower(): | |
| model.timestep = layer | |
| layer = 1 | |
| if model_name in promptable_diffusion_models: | |
| model.positive_prompt = positive_prompt | |
| model.negative_prompt = negative_prompt | |
| kwargs = { | |
| "model_name": model_name, | |
| "layer": layer, | |
| "num_eig": num_eig, | |
| "node_type": node_type, | |
| "affinity_focal_gamma": affinity_focal_gamma, | |
| "num_sample_ncut": num_sample_ncut, | |
| "knn_ncut": knn_ncut, | |
| "embedding_method": embedding_method, | |
| "embedding_metric": embedding_metric, | |
| "num_sample_tsne": num_sample_tsne, | |
| "knn_tsne": knn_tsne, | |
| "perplexity": perplexity, | |
| "n_neighbors": n_neighbors, | |
| "min_dist": min_dist, | |
| "sampling_method": sampling_method, | |
| "old_school_ncut": old_school_ncut, | |
| "recursion": recursion, | |
| "recursion_l2_n_eigs": recursion_l2_n_eigs, | |
| "recursion_l3_n_eigs": recursion_l3_n_eigs, | |
| "recursion_metric": recursion_metric, | |
| "recursion_l1_gamma": recursion_l1_gamma, | |
| "recursion_l2_gamma": recursion_l2_gamma, | |
| "recursion_l3_gamma": recursion_l3_gamma, | |
| "video_output": video_output, | |
| "lisa_prompt1": lisa_prompt1, | |
| "lisa_prompt2": lisa_prompt2, | |
| "lisa_prompt3": lisa_prompt3, | |
| "is_lisa": is_lisa, | |
| "n_ret": n_ret, | |
| } | |
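    # The dispatch below picks a runner tier by expected runtime: old-school
    # NCut, LISA, diffusion backbones, and >=100 images take the longest tier;
    # recursion and >=50 images the next; then image count and embedding
    # settings decide between long and quick. The tiers only matter on
    # ZeroGPU, where they would map to different GPU-slice durations
    # (assumption; in this section all tiers are identical wrappers).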
    if old_school_ncut:
        return super_duper_long_run(model, images, **kwargs)
    if is_lisa:
        return super_duper_long_run(model, images, **kwargs)
    num_images = len(images)
    if num_images >= 100:
        return super_duper_long_run(model, images, **kwargs)
    if 'diffusion' in model_name.lower():
        return super_duper_long_run(model, images, **kwargs)
    if recursion:
        return longer_run(model, images, **kwargs)
    if num_images >= 50:
        return longer_run(model, images, **kwargs)
    if old_school_ncut:  # unreachable: old_school_ncut already returned above
        return longer_run(model, images, **kwargs)
    if num_images >= 10:
        return long_run(model, images, **kwargs)
    if embedding_method == "UMAP":
        if perplexity >= 250 or num_sample_tsne >= 500:
            return longer_run(model, images, **kwargs)
        return long_run(model, images, **kwargs)
    if embedding_method == "t-SNE":
        if perplexity >= 250 or num_sample_tsne >= 500:
            return long_run(model, images, **kwargs)
        return quick_run(model, images, **kwargs)
    return quick_run(model, images, **kwargs)
def make_input_images_section():
    gr.Markdown('### Input Images')
    input_gallery = gr.Gallery(value=None, label="Select images", show_label=False, elem_id="images", columns=[3], rows=[1], object_fit="contain", height="auto", type="pil", show_share_button=False)
    submit_button = gr.Button("🔴 RUN", elem_id="submit_button", variant='primary')
    clear_images_button = gr.Button("🗑️Clear", elem_id='clear_button', variant='stop')
    return input_gallery, submit_button, clear_images_button

def make_input_video_section():
    input_gallery = gr.Video(value=None, label="Select video", elem_id="video-input", height="auto", show_share_button=False, interactive=True)
    gr.Markdown('_The image backbone extracts features from each frame; NCUT is computed jointly over all frames._')
    max_frames_number = gr.Slider(1, 200, step=1, label="Max frames", value=100, elem_id="max_frames")
    submit_button = gr.Button("🔴 RUN", elem_id="submit_button", variant='primary')
    clear_images_button = gr.Button("🗑️Clear", elem_id='clear_button', variant='stop')
    return input_gallery, submit_button, clear_images_button, max_frames_number
def make_dataset_images_section(advanced=False, is_random=False):
    gr.Markdown('### Load Datasets')
    load_images_button = gr.Button("🔴 Load Images", elem_id="load-images-button", variant='primary')
    advanced_radio = gr.Radio(["Basic", "Advanced"], label="Datasets", value="Advanced" if advanced else "Basic", elem_id="advanced-radio")
    with gr.Column() as basic_block:
        example_gallery = gr.Gallery(value=example_items, label="Example Set A", show_label=False, columns=[3], rows=[2], object_fit="scale-down", height="200px", show_share_button=False, elem_id="example-gallery")
    with gr.Column() as advanced_block:
        dataset_names = DATASET_NAMES
        dataset_classes = DATASET_CLASSES
        with gr.Row():
            dataset_dropdown = gr.Dropdown(dataset_names, label="Dataset name", value="mrm8488/ImageNet1K-val", elem_id="dataset", min_width=300)
            num_images_slider = gr.Number(10, label="Number of images", elem_id="num_images")
        if not is_random:
            filter_by_class_checkbox = gr.Checkbox(label="Filter by class", value=True, elem_id="filter_by_class_checkbox")
            filter_by_class_text = gr.Textbox(label="Class to select", value="0,33,99", elem_id="filter_by_class_text", info="e.g. `0,1,2`. (1000 classes)", visible=True)
            is_random_checkbox = gr.Checkbox(label="Random shuffle", value=False, elem_id="random_seed_checkbox")
            random_seed_slider = gr.Slider(0, 1000, step=1, label="Random seed", value=1, elem_id="random_seed", visible=False)
        if is_random:
            filter_by_class_checkbox = gr.Checkbox(label="Filter by class", value=False, elem_id="filter_by_class_checkbox")
            filter_by_class_text = gr.Textbox(label="Class to select", value="0,33,99", elem_id="filter_by_class_text", info="e.g. `0,1,2`. (1000 classes)", visible=False)
            is_random_checkbox = gr.Checkbox(label="Random shuffle", value=True, elem_id="random_seed_checkbox")
            random_seed_slider = gr.Slider(0, 1000, step=1, label="Random seed", value=42, elem_id="random_seed", visible=True)

    if advanced:
        advanced_block.visible = True
        basic_block.visible = False
    else:
        advanced_block.visible = False
        basic_block.visible = True

    # toggle visibility between the basic and advanced blocks
    advanced_radio.change(fn=lambda x: gr.update(visible=x=="Advanced"), inputs=advanced_radio, outputs=[advanced_block])
    advanced_radio.change(fn=lambda x: gr.update(visible=x=="Basic"), inputs=advanced_radio, outputs=[basic_block])

    def change_filter_options(dataset_name):
        idx = dataset_names.index(dataset_name)
        num_classes = dataset_classes[idx]
        if num_classes is None:
            return (gr.Checkbox(label="Filter by class", value=False, elem_id="filter_by_class_checkbox", visible=False),
                    gr.Textbox(label="Class to select", value="0,1,2", elem_id="filter_by_class_text", info="e.g. `0,1,2`. This dataset has no class label", visible=False))
        return (gr.Checkbox(label="Filter by class", value=True, elem_id="filter_by_class_checkbox", visible=True),
                gr.Textbox(label="Class to select", value="0,1,2", elem_id="filter_by_class_text", info=f"e.g. `0,1,2`. ({num_classes} classes)", visible=True))
    dataset_dropdown.change(fn=change_filter_options, inputs=dataset_dropdown, outputs=[filter_by_class_checkbox, filter_by_class_text])

    def change_filter_by_class(is_filter, dataset_name):
        idx = dataset_names.index(dataset_name)
        num_classes = dataset_classes[idx]
        return gr.Textbox(label="Class to select", value="0,1,2", elem_id="filter_by_class_text", info=f"e.g. `0,1,2`. ({num_classes} classes)", visible=is_filter)
    filter_by_class_checkbox.change(fn=change_filter_by_class, inputs=[filter_by_class_checkbox, dataset_dropdown], outputs=filter_by_class_text)

    def change_random_seed(is_random):
        return gr.Slider(0, 1000, step=1, label="Random seed", value=1, elem_id="random_seed", visible=is_random)
    is_random_checkbox.change(fn=change_random_seed, inputs=is_random_checkbox, outputs=random_seed_slider)
    def load_dataset_images(is_advanced, dataset_name, num_images=10,
                            is_filter=True, filter_by_class_text="0,1,2",
                            is_random=False, seed=1):
        progress = gr.Progress()
        progress(0, desc="Loading Images")
        if is_advanced == "Basic":
            gr.Info("Loaded images from Ego-Exo4D")
            return default_images
        try:
            progress(0.5, desc="Downloading Dataset")
            dataset = load_dataset(dataset_name, trust_remote_code=True)
            key = list(dataset.keys())[0]
            dataset = dataset[key]
        except Exception as e:
            gr.Error(f"Error loading dataset {dataset_name}: {e}")
            return None
        if num_images > len(dataset):
            num_images = len(dataset)

        if is_filter:
            progress(0.8, desc="Filtering Images")
            classes = [int(i) for i in filter_by_class_text.split(",")]
            labels = np.array(dataset['label'])
            unique_labels = np.unique(labels)
            valid_classes = [i for i in classes if i in unique_labels]
            invalid_classes = [i for i in classes if i not in unique_labels]
            if len(invalid_classes) > 0:
                gr.Warning(f"Classes {invalid_classes} not found in the dataset.")
            if len(valid_classes) == 0:
                gr.Error(f"Classes {classes} not found in the dataset.")
                return None
            # take an equal-sized chunk of images from each valid class
            chunk_size = num_images // len(valid_classes)
            image_idx = []
            for i in valid_classes:
                idx = np.where(labels == i)[0]
                if is_random:
                    idx = np.random.RandomState(seed).choice(idx, chunk_size, replace=False)
                else:
                    idx = idx[:chunk_size]
                image_idx.extend(idx.tolist())
        if not is_filter:
            if is_random:
                image_idx = np.random.RandomState(seed).choice(len(dataset), num_images, replace=False).tolist()
            else:
                image_idx = list(range(num_images))
        images = [dataset[i]['image'] for i in image_idx]
        gr.Info(f"Loaded {len(images)} images from {dataset_name}")
        return images

    load_images_button.click(load_dataset_images,
                             inputs=[advanced_radio, dataset_dropdown, num_images_slider,
                                     filter_by_class_checkbox, filter_by_class_text,
                                     is_random_checkbox, random_seed_slider],
                             outputs=[input_gallery])

    return dataset_dropdown, num_images_slider, random_seed_slider, load_images_button
# def random_rotate_rgb_gallery(images):
#     if images is None or len(images) == 0:
#         gr.Warning("No images selected.")
#         return []
#     # read webp images
#     images = [Image.open(image[0]).convert("RGB") for image in images]
#     images = [np.array(image).astype(np.float32) for image in images]
#     images = np.stack(images)
#     images = torch.tensor(images) / 255
#     position = np.random.choice([1, 2, 4, 5, 6])
#     images = rotate_rgb_cube(images, position)
#     images = to_pil_images(images, resize=False)
#     return images
def protect_original_image_in_plot(original_image, rotated_images):
    # heuristic: an image of exactly 332x1542 is assumed to be the 36-grid
    # plot, whose left 190 columns hold the original photo; that region is
    # restored after recoloring so only the embedding panels change color
    plot_h, plot_w = 332, 1542
    image_h, image_w = original_image.shape[1], original_image.shape[2]
    if not (plot_h == image_h and plot_w == image_w):
        return rotated_images
    protection_w = 190
    rotated_images[:, :, :protection_w] = original_image[:, :, :protection_w]
    return rotated_images

def sequence_rotate_rgb_gallery(images):
    if images is None or len(images) == 0:
        gr.Warning("No images selected.")
        return []
    # read webp images
    images = [Image.open(image[0]).convert("RGB") for image in images]
    images = [np.array(image).astype(np.float32) for image in images]
    images = np.stack(images)
    images = torch.tensor(images) / 255
    original_images = images.clone()
    # cycle the RGB channels: R -> G -> B -> R
    rotation_matrix = torch.tensor([[0, 1, 0], [0, 0, 1], [1, 0, 0]]).float()
    images = images @ rotation_matrix
    images = protect_original_image_in_plot(original_images, images)
    images = to_pil_images(images, resize=False)
    return images

def flip_rgb_gallery(images, axis=0):
    if images is None or len(images) == 0:
        gr.Warning("No images selected.")
        return []
    # read webp images
    images = [Image.open(image[0]).convert("RGB") for image in images]
    images = [np.array(image).astype(np.float32) for image in images]
    images = np.stack(images)
    images = torch.tensor(images) / 255
    original_images = images.clone()
    # "flip" here inverts the colors (1 - x), not the spatial axes
    images = 1 - images
    images = protect_original_image_in_plot(original_images, images)
    images = to_pil_images(images, resize=False)
    return images

def add_output_images_buttons(output_gallery):
    with gr.Row():
        rotate_button = gr.Button("🔄 Rotate", elem_id="rotate_button", variant='secondary')
        rotate_button.click(sequence_rotate_rgb_gallery, inputs=[output_gallery], outputs=[output_gallery])
        flip_button = gr.Button("🔃 Flip", elem_id="flip_button", variant='secondary')
        flip_button.click(flip_rgb_gallery, inputs=[output_gallery], outputs=[output_gallery])
    return rotate_button, flip_button
def make_output_images_section():
    gr.Markdown('### Output Images')
    output_gallery = gr.Gallery(format='png', value=[], label="NCUT Embedding", show_label=False, elem_id="ncut", columns=[3], rows=[1], object_fit="contain", height="auto", show_share_button=True, interactive=False)
    add_output_images_buttons(output_gallery)
    return output_gallery
def make_parameters_section(is_lisa=False, model_ratio=True):
    gr.Markdown("### Parameters <a style='color: #0044CC;' href='https://ncut-pytorch.readthedocs.io/en/latest/how_to_get_better_segmentation/' target='_blank'>Help</a>")
    from ncut_pytorch.backbone import list_models, get_demo_model_names
    model_names = list_models()
    model_names = sorted(model_names)

    def get_filtered_model_names(name):
        return [m for m in model_names if name.lower() in m.lower()]

    def get_default_model_name(name):
        lst = get_filtered_model_names(name)
        if len(lst) > 1:
            return lst[1]
        return lst[0]

    if is_lisa:
        model_dropdown = gr.Dropdown(["LISA(xinlai/LISA-7B-v1)"], label="Backbone", value="LISA(xinlai/LISA-7B-v1)", elem_id="model_name")
        layer_slider = gr.Slider(1, 6, step=1, label="LISA decoder: Layer index", value=6, elem_id="layer", visible=False)
        layer_names = ["dec_0_input", "dec_0_attn", "dec_0_block", "dec_1_input", "dec_1_attn", "dec_1_block"]
        positive_prompt = gr.Textbox(label="Prompt (Positive)", elem_id="prompt", placeholder="e.g. 'a photo of a Gibson Les Paul guitar'", visible=False)
        negative_prompt = gr.Textbox(label="Prompt (Negative)", elem_id="prompt", placeholder="e.g. 'a photo from an egocentric view'", visible=False)
        node_type_dropdown = gr.Dropdown(layer_names, label="LISA (SAM) decoder: Layer and Node", value="dec_1_block", elem_id="node_type")
    else:
        model_radio = gr.Radio(["CLIP", "DiNO", "Diffusion", "ImageNet", "MAE", "SAM"], label="Backbone", value="DiNO", elem_id="model_radio", show_label=True, visible=model_ratio)
        model_dropdown = gr.Dropdown(get_filtered_model_names("DiNO"), label="", value="DiNO(dino_vitb8_448)", elem_id="model_name", show_label=False)
        model_radio.change(fn=lambda x: gr.update(choices=get_filtered_model_names(x), value=get_default_model_name(x)), inputs=model_radio, outputs=[model_dropdown])
        layer_slider = gr.Slider(1, 12, step=1, label="Backbone: Layer index", value=10, elem_id="layer")
        positive_prompt = gr.Textbox(label="Prompt (Positive)", elem_id="prompt", placeholder="e.g. 'a photo of a Gibson Les Paul guitar'")
        positive_prompt.visible = False
        negative_prompt = gr.Textbox(label="Prompt (Negative)", elem_id="prompt", placeholder="e.g. 'a photo from an egocentric view'")
        negative_prompt.visible = False
        node_type_dropdown = gr.Dropdown(["attn: attention output", "mlp: mlp output", "block: sum of residual"], label="Backbone: Layer type", value="block: sum of residual", elem_id="node_type", info="which feature to take from each layer?")
    num_eig_slider = gr.Slider(1, 1000, step=1, label="NCUT: Number of eigenvectors", value=100, elem_id="num_eig", info='increase for smaller clusters')
    def change_layer_slider(model_name):
        # SD2, UNet
        if "stable" in model_name.lower() and "diffusion" in model_name.lower():
            from ncut_pytorch.backbone import SD_KEY_DICT
            default_layer = 'up_2_resnets_1_block' if 'diffusion-3' not in model_name else 'block_23'
            return (gr.Slider(1, 49, step=1, label="Diffusion: Timestep (Noise)", value=5, elem_id="layer", visible=True, info="Noise level, 50 is max noise"),
                    gr.Dropdown(SD_KEY_DICT[model_name], label="Diffusion: Layer and Node", value=default_layer, elem_id="node_type", info="U-Net (v1, v2) or DiT (v3)"))

        if model_name == "LISSL(xinlai/LISSL-7B-v1)":
            layer_names = ["dec_0_input", "dec_0_attn", "dec_0_block", "dec_1_input", "dec_1_attn", "dec_1_block"]
            default_layer = "dec_1_block"
            return (gr.Slider(1, 6, step=1, label="LISA decoder: Layer index", value=6, elem_id="layer", visible=False, info=""),
                    gr.Dropdown(layer_names, label="LISA decoder: Layer and Node", value=default_layer, elem_id="node_type"))

        # default: number of layers from LAYER_DICT, falling back to 12
        if model_name in LAYER_DICT:
            value = LAYER_DICT[model_name]
        else:
            value = 12
        return (gr.Slider(1, value, step=1, label="Backbone: Layer index", value=value, elem_id="layer", visible=True, info=""),
                gr.Dropdown(["attn: attention output", "mlp: mlp output", "block: sum of residual"], label="Backbone: Layer type", value="block: sum of residual", elem_id="node_type", info="which feature to take from each layer?"))
    model_dropdown.change(fn=change_layer_slider, inputs=model_dropdown, outputs=[layer_slider, node_type_dropdown])

    def change_prompt_text(model_name):
        is_promptable = model_name in promptable_diffusion_models
        return (gr.Textbox(label="Prompt (Positive)", elem_id="prompt", placeholder="e.g. 'a photo of a Gibson Les Paul guitar'", visible=is_promptable),
                gr.Textbox(label="Prompt (Negative)", elem_id="prompt", placeholder="e.g. 'a photo from an egocentric view'", visible=is_promptable))
    model_dropdown.change(fn=change_prompt_text, inputs=model_dropdown, outputs=[positive_prompt, negative_prompt])
| with gr.Accordion("➡️ Click to expand: more parameters", open=False): | |
| gr.Markdown("<a href='https://ncut-pytorch.readthedocs.io/en/latest/how_to_get_better_segmentation/' target='_blank'>Docs: How to Get Better Segmentation</a>") | |
| affinity_focal_gamma_slider = gr.Slider(0.01, 1, step=0.01, label="NCUT: Affinity focal gamma", value=0.5, elem_id="affinity_focal_gamma", info="decrease for shaper segmentation") | |
| num_sample_ncut_slider = gr.Slider(100, 50000, step=100, label="NCUT: num_sample", value=10000, elem_id="num_sample_ncut", info="Nyström approximation") | |
| sampling_method_dropdown = gr.Dropdown(["fps", "random"], label="NCUT: Sampling method", value="fps", elem_id="sampling_method", info="Nyström approximation") | |
| knn_ncut_slider = gr.Slider(1, 100, step=1, label="NCUT: KNN", value=10, elem_id="knn_ncut", info="Nyström approximation") | |
| embedding_method_dropdown = gr.Dropdown(["tsne_3d", "umap_3d", "umap_shpere", "tsne_2d", "umap_2d"], label="Coloring method", value="tsne_3d", elem_id="embedding_method") | |
| embedding_metric_dropdown = gr.Dropdown(["euclidean", "cosine"], label="t-SNE/UMAP metric", value="euclidean", elem_id="embedding_metric") | |
| num_sample_tsne_slider = gr.Slider(100, 10000, step=100, label="t-SNE/UMAP: num_sample", value=300, elem_id="num_sample_tsne", info="Nyström approximation") | |
| knn_tsne_slider = gr.Slider(1, 100, step=1, label="t-SNE/UMAP: KNN", value=10, elem_id="knn_tsne", info="Nyström approximation") | |
| perplexity_slider = gr.Slider(10, 1000, step=10, label="t-SNE: perplexity", value=150, elem_id="perplexity") | |
| n_neighbors_slider = gr.Slider(10, 1000, step=10, label="UMAP: n_neighbors", value=150, elem_id="n_neighbors") | |
| min_dist_slider = gr.Slider(0.1, 1, step=0.1, label="UMAP: min_dist", value=0.1, elem_id="min_dist") | |
| return [model_dropdown, layer_slider, node_type_dropdown, num_eig_slider, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, | |
| sampling_method_dropdown, positive_prompt, negative_prompt] | |
demo = gr.Blocks(
    theme=gr.themes.Base(spacing_size='md', text_size='lg', primary_hue='blue', neutral_hue='slate', secondary_hue='pink'),
    # fill_width=False,
    # title="ncut-pytorch",
)
with demo:
    with gr.Tab('AlignedCut'):
        with gr.Row():
            with gr.Column(scale=5, min_width=200):
                input_gallery, submit_button, clear_images_button = make_input_images_section()
                dataset_dropdown, num_images_slider, random_seed_slider, load_images_button = make_dataset_images_section()
                num_images_slider.value = 30
                logging_text = gr.Textbox("Logging information", label="Logging", elem_id="logging", type="text", placeholder="Logging information", autofocus=False, autoscroll=False)
            with gr.Column(scale=5, min_width=200):
                output_gallery = make_output_images_section()
                cluster_gallery = gr.Gallery(value=[], label="Clusters", show_label=False, elem_id="clusters", columns=[5], rows=[2], object_fit="contain", height="auto", show_share_button=True, preview=True, interactive=False)
                [
                    model_dropdown, layer_slider, node_type_dropdown, num_eig_slider,
                    affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider,
                    embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider,
                    perplexity_slider, n_neighbors_slider, min_dist_slider,
                    sampling_method_dropdown, positive_prompt, negative_prompt
                ] = make_parameters_section()
                num_eig_slider.value = 30

        clear_images_button.click(lambda: ([], [], []), outputs=[input_gallery, output_gallery, cluster_gallery])
        false_placeholder = gr.Checkbox(label="False", value=False, elem_id="false_placeholder", visible=False)
        no_prompt = gr.Textbox("", label="", elem_id="empty_placeholder", type="text", placeholder="", visible=False)
        submit_button.click(
            partial(run_fn, n_ret=2),
            inputs=[
                input_gallery, model_dropdown, layer_slider, num_eig_slider, node_type_dropdown,
                positive_prompt, negative_prompt,
                false_placeholder, no_prompt, no_prompt, no_prompt,
                affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider,
                embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider,
                perplexity_slider, n_neighbors_slider, min_dist_slider, sampling_method_dropdown
            ],
            outputs=[output_gallery, cluster_gallery, logging_text],
            api_name="API_AlignedCut",
        )
    with gr.Tab('NCut'):
        gr.Markdown('#### NCut (Legacy): not aligned, no Nyström approximation')
        gr.Markdown('Each image is solved independently, <em>color is <b>not</b> aligned across images</em>')
        gr.Markdown('---')
        gr.Markdown('<p style="text-align: center;"><b>NCut vs. AlignedCut</b></p>')
        with gr.Row():
            with gr.Column(scale=5, min_width=200):
                gr.Markdown('#### Pros')
                gr.Markdown('- Easy solution. Uses fewer eigenvectors.')
                gr.Markdown('- Exact solution. No Nyström approximation.')
            with gr.Column(scale=5, min_width=200):
                gr.Markdown('#### Cons')
                gr.Markdown('- Not aligned. Distances are not preserved across images. No pseudo-labeling or correspondence.')
                gr.Markdown('- Poor complexity scaling. Cannot handle a large number of pixels.')
        gr.Markdown('---')
        with gr.Row():
            with gr.Column(scale=5, min_width=200):
                gr.Markdown(' ')
            with gr.Column(scale=5, min_width=200):
                gr.Markdown('<em>color is <b>not</b> aligned across images</em> 👇')

        with gr.Row():
            with gr.Column(scale=5, min_width=200):
                input_gallery, submit_button, clear_images_button = make_input_images_section()
                dataset_dropdown, num_images_slider, random_seed_slider, load_images_button = make_dataset_images_section()
            with gr.Column(scale=5, min_width=200):
                output_gallery = make_output_images_section()
                [
                    model_dropdown, layer_slider, node_type_dropdown, num_eig_slider,
                    affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider,
                    embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider,
                    perplexity_slider, n_neighbors_slider, min_dist_slider,
                    sampling_method_dropdown, positive_prompt, negative_prompt
                ] = make_parameters_section()
                old_school_ncut_checkbox = gr.Checkbox(label="Old school NCut", value=True, elem_id="old_school_ncut")
                invisible_list = [old_school_ncut_checkbox, num_sample_ncut_slider, knn_ncut_slider,
                                  num_sample_tsne_slider, knn_tsne_slider, sampling_method_dropdown]
                for item in invisible_list:
                    item.visible = False
                # logging text box
                logging_text = gr.Textbox("Logging information", label="Logging", elem_id="logging", type="text", placeholder="Logging information")

        clear_images_button.click(lambda: ([], []), outputs=[input_gallery, output_gallery])
        false_placeholder = gr.Checkbox(label="False", value=False, elem_id="false_placeholder", visible=False)
        no_prompt = gr.Textbox("", label="", elem_id="empty_placeholder", type="text", placeholder="", visible=False)
        submit_button.click(
            run_fn,
            inputs=[
                input_gallery, model_dropdown, layer_slider, num_eig_slider, node_type_dropdown,
                positive_prompt, negative_prompt,
                false_placeholder, no_prompt, no_prompt, no_prompt,
                affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider,
                embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider,
                perplexity_slider, n_neighbors_slider, min_dist_slider, sampling_method_dropdown,
                old_school_ncut_checkbox
            ],
            outputs=[output_gallery, logging_text],
            api_name="API_NCut",
        )
| with gr.Tab('Recursive Cut'): | |
| gr.Markdown('NCUT can be applied recursively: the eigenvectors from the previous iteration are the input for the next NCUT iteration. ') | |
| gr.Markdown('__Recursive NCUT__ amplifies small object parts; please see the [Documentation](https://ncut-pytorch.readthedocs.io/en/latest/how_to_get_better_segmentation/#recursive-ncut)') | |
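| # A minimal sketch of the recursion described above (hypothetical helper, not | |
| # the demo's run_fn), assuming `features` is a (n_nodes, d) tensor: the | |
| # eigenvectors of one pass become the input features of the next pass. | |
| def recursive_ncut(features, num_eigs=(100, 50, 50), distance="cosine"): | |
|     levels = [] | |
|     for num_eig in num_eigs: | |
|         # each level re-clusters the previous level's eigenvectors, | |
|         # which amplifies small object parts | |
|         features, _ = NCUT(num_eig=num_eig, distance=distance).fit_transform(features) | |
|         levels.append(features) | |
|     return levels  # one eigenvector set per recursion level | |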
| gr.Markdown('---') | |
| with gr.Row(): | |
| with gr.Column(scale=5, min_width=200): | |
| gr.Markdown('### Output (Recursion #1)') | |
| l1_gallery = gr.Gallery(format='png', value=[], label="Recursion #1", show_label=False, elem_id="ncut_l1", columns=[3], rows=[5], object_fit="contain", height="auto", show_fullscreen_button=True, interactive=False) | |
| add_output_images_buttons(l1_gallery) | |
| with gr.Column(scale=5, min_width=200): | |
| gr.Markdown('### Output (Recursion #2)') | |
| l2_gallery = gr.Gallery(format='png', value=[], label="Recursion #2", show_label=False, elem_id="ncut_l2", columns=[3], rows=[5], object_fit="contain", height="auto", show_fullscreen_button=True, interactive=False) | |
| add_output_images_buttons(l2_gallery) | |
| with gr.Column(scale=5, min_width=200): | |
| gr.Markdown('### Output (Recursion #3)') | |
| l3_gallery = gr.Gallery(format='png', value=[], label="Recursion #3", show_label=False, elem_id="ncut_l3", columns=[3], rows=[5], object_fit="contain", height="auto", show_fullscreen_button=True, interactive=False) | |
| add_output_images_buttons(l3_gallery) | |
| with gr.Row(): | |
| with gr.Column(scale=5, min_width=200): | |
| input_gallery, submit_button, clear_images_button = make_input_images_section() | |
| dataset_dropdown, num_images_slider, random_seed_slider, load_images_button = make_dataset_images_section(advanced=True) | |
| num_images_slider.value = 100 | |
| clear_images_button.visible = False | |
| logging_text = gr.Textbox("Logging information", label="Logging", elem_id="logging", type="text", placeholder="Logging information") | |
| with gr.Column(scale=5, min_width=200): | |
| with gr.Accordion("➡️ Recursion config", open=True): | |
| l1_num_eig_slider = gr.Slider(1, 1000, step=1, label="Recursion #1: N eigenvectors", value=100, elem_id="l1_num_eig") | |
| l2_num_eig_slider = gr.Slider(1, 1000, step=1, label="Recursion #2: N eigenvectors", value=50, elem_id="l2_num_eig") | |
| l3_num_eig_slider = gr.Slider(1, 1000, step=1, label="Recursion #3: N eigenvectors", value=50, elem_id="l3_num_eig") | |
| metric_dropdown = gr.Dropdown(["euclidean", "cosine"], label="Recursion distance metric", value="cosine", elem_id="recursion_metric") | |
| l1_affinity_focal_gamma_slider = gr.Slider(0.01, 1, step=0.01, label="Recursion #1: Affinity focal gamma", value=0.5, elem_id="recursion_l1_gamma") | |
| l2_affinity_focal_gamma_slider = gr.Slider(0.01, 1, step=0.01, label="Recursion #2: Affinity focal gamma", value=0.5, elem_id="recursion_l2_gamma") | |
| l3_affinity_focal_gamma_slider = gr.Slider(0.01, 1, step=0.01, label="Recursion #3: Affinity focal gamma", value=0.5, elem_id="recursion_l3_gamma") | |
| [ | |
| model_dropdown, layer_slider, node_type_dropdown, num_eig_slider, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, | |
| sampling_method_dropdown, positive_prompt, negative_prompt | |
| ] = make_parameters_section() | |
| num_eig_slider.visible = False | |
| affinity_focal_gamma_slider.visible = False | |
| true_placeholder = gr.Checkbox(label="True placeholder", value=True, elem_id="true_placeholder", visible=False) | |
| false_placeholder = gr.Checkbox(label="False placeholder", value=False, elem_id="false_placeholder", visible=False) | |
| number_placeholder = gr.Number(0, label="Number placeholder", elem_id="number_placeholder", visible=False) | |
| clear_images_button.click(lambda: ([],), outputs=[input_gallery]) | |
| no_prompt = gr.Textbox("", label="", elem_id="empty_placeholder", type="text", placeholder="", visible=False) | |
| submit_button.click( | |
| partial(run_fn, n_ret=3), | |
| inputs=[ | |
| input_gallery, model_dropdown, layer_slider, l1_num_eig_slider, node_type_dropdown, | |
| positive_prompt, negative_prompt, | |
| false_placeholder, no_prompt, no_prompt, no_prompt, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, sampling_method_dropdown, | |
| false_placeholder, number_placeholder, true_placeholder, | |
| l2_num_eig_slider, l3_num_eig_slider, metric_dropdown, | |
| l1_affinity_focal_gamma_slider, l2_affinity_focal_gamma_slider, l3_affinity_focal_gamma_slider | |
| ], | |
| outputs=[l1_gallery, l2_gallery, l3_gallery, logging_text], | |
| api_name="API_RecursiveCut" | |
| ) | |
| with gr.Tab('Video'): | |
| with gr.Row(): | |
| with gr.Column(scale=5, min_width=200): | |
| video_input_gallery, submit_button, clear_video_button, max_frame_number = make_input_video_section() | |
| with gr.Column(scale=5, min_width=200): | |
| video_output_gallery = gr.Video(value=None, label="NCUT Embedding", elem_id="ncut", height="auto", show_share_button=False) | |
| [ | |
| model_dropdown, layer_slider, node_type_dropdown, num_eig_slider, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, | |
| sampling_method_dropdown, positive_prompt, negative_prompt | |
| ] = make_parameters_section() | |
| num_sample_tsne_slider.value = 1000 | |
| perplexity_slider.value = 500 | |
| n_neighbors_slider.value = 500 | |
| knn_tsne_slider.value = 20 | |
| # logging text box | |
| logging_text = gr.Textbox("Logging information", label="Logging", elem_id="logging", type="text", placeholder="Logging information") | |
| clear_video_button.click(lambda: (None, None), outputs=[video_input_gallery, video_output_gallery]) | |
| place_holder_false = gr.Checkbox(label="Place holder", value=False, elem_id="place_holder_false", visible=False) | |
| false_placeholder = gr.Checkbox(label="False", value=False, elem_id="false_placeholder", visible=False) | |
| no_prompt = gr.Textbox("", label="", elem_id="empty_placeholder", type="text", placeholder="", visible=False) | |
| submit_button.click( | |
| run_fn, | |
| inputs=[ | |
| video_input_gallery, model_dropdown, layer_slider, num_eig_slider, node_type_dropdown, | |
| positive_prompt, negative_prompt, | |
| false_placeholder, no_prompt, no_prompt, no_prompt, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, sampling_method_dropdown, | |
| place_holder_false, max_frame_number | |
| ], | |
| outputs=[video_output_gallery, logging_text], | |
| api_name="API_VideoCut", | |
| ) | |
| with gr.Tab('Text'): | |
| try: | |
| from app_text import make_demo | |
| except ImportError: | |
| print("Debugging") | |
| from draft_gradio_app_text import make_demo | |
| make_demo() | |
| with gr.Tab('Vision-Language'): | |
| gr.Markdown('[LISA](https://arxiv.org/pdf/2308.00692) is a vision-language model: given an image and a text prompt, LISA generates segmentation masks.') | |
| gr.Markdown('In the mask decoder layers, LISA updates the image features w.r.t. the text prompt.') | |
| gr.Markdown('This page shows how the text prompt affects the image features.') | |
| gr.Markdown('---') | |
| gr.Markdown('<p style="text-align: center;">Color is <b>aligned</b> across 3 prompts. NCUT is computed on the concatenated features from 3 prompts.</p>') | |
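| # A minimal sketch of the alignment trick stated above (hypothetical helper, | |
| # not the demo's run_fn), assuming one (n_i, d) feature tensor per prompt: | |
| def ncut_across_prompts(feats_per_prompt, num_eig=100): | |
|     cat = torch.cat(feats_per_prompt, dim=0)  # one joint graph over all prompts | |
|     eigvecs, _ = NCUT(num_eig=num_eig).fit_transform(cat) | |
|     sizes = [f.shape[0] for f in feats_per_prompt] | |
|     # split back per prompt; all rows share one eigenvector basis, | |
|     # which is what keeps colors comparable across the 3 prompts | |
|     return torch.split(eigvecs, sizes, dim=0) | |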
| with gr.Row(): | |
| with gr.Column(scale=5, min_width=200): | |
| gr.Markdown('### Output (Prompt #1)') | |
| l1_gallery = gr.Gallery(format='png', value=[], label="Prompt #1", show_label=False, elem_id="ncut_p1", columns=[3], rows=[5], object_fit="contain", height="auto", show_fullscreen_button=True, interactive=False) | |
| prompt1 = gr.Textbox(label="Input Prompt #1", elem_id="prompt1", value="where is the person, include the clothes, don't include the guitar and chair", lines=3) | |
| with gr.Column(scale=5, min_width=200): | |
| gr.Markdown('### Output (Prompt #2)') | |
| l2_gallery = gr.Gallery(format='png', value=[], label="Prompt #2", show_label=False, elem_id="ncut_p2", columns=[3], rows=[5], object_fit="contain", height="auto", show_fullscreen_button=True, interactive=False) | |
| prompt2 = gr.Textbox(label="Input Prompt #2", elem_id="prompt2", value="where is the Gibson Les Paul guitar", lines=3) | |
| with gr.Column(scale=5, min_width=200): | |
| gr.Markdown('### Output (Prompt #3)') | |
| l3_gallery = gr.Gallery(format='png', value=[], label="Prompt #3", show_label=False, elem_id="ncut_p3", columns=[3], rows=[5], object_fit="contain", height="auto", show_fullscreen_button=True, interactive=False) | |
| prompt3 = gr.Textbox(label="Input Prompt #3", elem_id="prompt3", value="where is the floor", lines=3) | |
| with gr.Row(): | |
| with gr.Column(scale=5, min_width=200): | |
| input_gallery, submit_button, clear_images_button = make_input_images_section() | |
| dataset_dropdown, num_images_slider, random_seed_slider, load_images_button = make_dataset_images_section(advanced=False) | |
| clear_images_button.click(lambda: ([], [], [], []), outputs=[input_gallery, l1_gallery, l2_gallery, l3_gallery]) | |
| with gr.Column(scale=5, min_width=200): | |
| [ | |
| model_dropdown, layer_slider, node_type_dropdown, num_eig_slider, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, | |
| sampling_method_dropdown, positive_prompt, negative_prompt | |
| ] = make_parameters_section(is_lisa=True) | |
| logging_text = gr.Textbox("Logging information", label="Logging", elem_id="logging", type="text", placeholder="Logging information") | |
| galleries = [l1_gallery, l2_gallery, l3_gallery] | |
| true_placeholder = gr.Checkbox(label="True placeholder", value=True, elem_id="true_placeholder", visible=False) | |
| submit_button.click( | |
| partial(run_fn, n_ret=len(galleries)), | |
| inputs=[ | |
| input_gallery, model_dropdown, layer_slider, num_eig_slider, node_type_dropdown, | |
| positive_prompt, negative_prompt, | |
| true_placeholder, prompt1, prompt2, prompt3, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, sampling_method_dropdown | |
| ], | |
| outputs=galleries + [logging_text], | |
| ) | |
| with gr.Tab('Model Aligned'): | |
| gr.Markdown('This page reproduces the results from the paper [AlignedCut](https://arxiv.org/abs/2406.18344)') | |
| gr.Markdown('---') | |
| gr.Markdown('**Features are aligned across models and layers.** A linear alignment transform is trained for each model/layer; the learning signal comes from 1) fMRI brain activation and 2) segmentation-preserving eigen-constraints.') | |
| gr.Markdown('NCUT is computed on the concatenated graph of all models, layers, and images. Color is **aligned** across all models and layers.') | |
| gr.Markdown('') | |
| gr.Markdown("To see a good pattern, you will need to load 100~1000 images. 100 images need 10sec for RTX4090. Running out of HuggingFace GPU Quota? Try [Demo](https://ncut-pytorch.readthedocs.io/en/latest/demo/) hosted at UPenn") | |
| gr.Markdown('---') | |
| with gr.Row(): | |
| with gr.Column(scale=5, min_width=200): | |
| input_gallery, submit_button, clear_images_button = make_input_images_section() | |
| dataset_dropdown, num_images_slider, random_seed_slider, load_images_button = make_dataset_images_section(advanced=True, is_random=True) | |
| num_images_slider.value = 100 | |
| with gr.Column(scale=5, min_width=200): | |
| output_gallery = make_output_images_section() | |
| gr.Markdown('### TIP 1: use the `full-screen` button and `arrow keys` to navigate') | |
| gr.Markdown('---') | |
| gr.Markdown('Model: CLIP(ViT-B-16/openai), DiNOv2reg(dinov2_vitb14_reg), MAE(vit_base)') | |
| gr.Markdown('Layer type: attention output (attn), without the residual connection added') | |
| gr.Markdown('### TIP 2: for a large image set, increase `num_sample` for t-SNE and NCUT') | |
| gr.Markdown('---') | |
| [ | |
| model_dropdown, layer_slider, node_type_dropdown, num_eig_slider, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, | |
| sampling_method_dropdown, positive_prompt, negative_prompt | |
| ] = make_parameters_section(model_ratio=False) | |
| model_dropdown.value = "AlignedThreeModelAttnNodes" | |
| model_dropdown.visible = False | |
| layer_slider.visible = False | |
| node_type_dropdown.visible = False | |
| num_sample_ncut_slider.value = 10000 | |
| num_sample_tsne_slider.value = 1000 | |
| # logging text box | |
| logging_text = gr.Textbox("Logging information", label="Logging", elem_id="logging", type="text", placeholder="Logging information") | |
| clear_images_button.click(lambda: ([], []), outputs=[input_gallery, output_gallery]) | |
| false_placeholder = gr.Checkbox(label="False", value=False, elem_id="false_placeholder", visible=False) | |
| no_prompt = gr.Textbox("", label="", elem_id="empty_placeholder", type="text", placeholder="", visible=False) | |
| submit_button.click( | |
| run_fn, | |
| inputs=[ | |
| input_gallery, model_dropdown, layer_slider, num_eig_slider, node_type_dropdown, | |
| positive_prompt, negative_prompt, | |
| false_placeholder, no_prompt, no_prompt, no_prompt, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, sampling_method_dropdown | |
| ], | |
| # outputs=galleries + [logging_text], | |
| outputs=[output_gallery, logging_text], | |
| ) | |
| with gr.Tab('Model Aligned (+Recursion)'): | |
| gr.Markdown('This page reproduces the results from the paper [AlignedCut](https://arxiv.org/abs/2406.18344)') | |
| gr.Markdown('---') | |
| gr.Markdown('**Features are aligned across models and layers.** A linear alignment transform is trained for each model/layer; the learning signal comes from 1) fMRI brain activation and 2) segmentation-preserving eigen-constraints.') | |
| gr.Markdown('NCUT is computed on the concatenated graph of all models, layers, and images. Color is **aligned** across all models and layers.') | |
| gr.Markdown('') | |
| gr.Markdown("To see a good pattern, you will need to load 100~1000 images. 100 images need 10sec for RTX4090. Running out of HuggingFace GPU Quota? Try [Demo](https://ncut-pytorch.readthedocs.io/en/latest/demo/) hosted at UPenn") | |
| gr.Markdown('---') | |
| gr.Markdown('### Output (Recursion #1)') | |
| l1_gallery = gr.Gallery(format='png', value=[], label="Recursion #1", show_label=False, elem_id="ncut_l1", columns=[100], rows=[1], object_fit="contain", height="auto", show_fullscreen_button=True, interactive=False, preview=True) | |
| add_output_images_buttons(l1_gallery) | |
| gr.Markdown('### Output (Recursion #2)') | |
| l2_gallery = gr.Gallery(format='png', value=[], label="Recursion #2", show_label=False, elem_id="ncut_l2", columns=[100], rows=[1], object_fit="contain", height="auto", show_fullscreen_button=True, interactive=False, preview=True) | |
| add_output_images_buttons(l2_gallery) | |
| gr.Markdown('### Output (Recursion #3)') | |
| l3_gallery = gr.Gallery(format='png', value=[], label="Recursion #3", show_label=False, elem_id="ncut_l3", columns=[100], rows=[1], object_fit="contain", height="auto", show_fullscreen_button=True, interactive=False, preview=True) | |
| add_output_images_buttons(l3_gallery) | |
| with gr.Row(): | |
| with gr.Column(scale=5, min_width=200): | |
| input_gallery, submit_button, clear_images_button = make_input_images_section() | |
| dataset_dropdown, num_images_slider, random_seed_slider, load_images_button = make_dataset_images_section(advanced=True, is_random=True) | |
| num_images_slider.value = 100 | |
| with gr.Column(scale=5, min_width=200): | |
| with gr.Accordion("➡️ Recursion config", open=True): | |
| l1_num_eig_slider = gr.Slider(1, 1000, step=1, label="Recursion #1: N eigenvectors", value=100, elem_id="l1_num_eig") | |
| l2_num_eig_slider = gr.Slider(1, 1000, step=1, label="Recursion #2: N eigenvectors", value=50, elem_id="l2_num_eig") | |
| l3_num_eig_slider = gr.Slider(1, 1000, step=1, label="Recursion #3: N eigenvectors", value=50, elem_id="l3_num_eig") | |
| metric_dropdown = gr.Dropdown(["euclidean", "cosine"], label="Recursion distance metric", value="cosine", elem_id="recursion_metric") | |
| l1_affinity_focal_gamma_slider = gr.Slider(0.01, 1, step=0.01, label="Recursion #1: Affinity focal gamma", value=0.5, elem_id="recursion_l1_gamma") | |
| l2_affinity_focal_gamma_slider = gr.Slider(0.01, 1, step=0.01, label="Recursion #2: Affinity focal gamma", value=0.5, elem_id="recursion_l2_gamma") | |
| l3_affinity_focal_gamma_slider = gr.Slider(0.01, 1, step=0.01, label="Recursion #3: Affinity focal gamma", value=0.5, elem_id="recursion_l3_gamma") | |
| gr.Markdown('---') | |
| gr.Markdown('Model: CLIP(ViT-B-16/openai), DiNOv2reg(dinov2_vitb14_reg), MAE(vit_base)') | |
| gr.Markdown('Layer type: attention output (attn), without the residual connection added') | |
| [ | |
| model_dropdown, layer_slider, node_type_dropdown, num_eig_slider, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, | |
| sampling_method_dropdown, positive_prompt, negative_prompt | |
| ] = make_parameters_section(model_ratio=False) | |
| num_eig_slider.visible = False | |
| affinity_focal_gamma_slider.visible = False | |
| model_dropdown.value = "AlignedThreeModelAttnNodes" | |
| model_dropdown.visible = False | |
| layer_slider.visible = False | |
| node_type_dropdown.visible = False | |
| num_sample_ncut_slider.value = 10000 | |
| num_sample_tsne_slider.value = 1000 | |
| # logging text box | |
| logging_text = gr.Textbox("Logging information", label="Logging", elem_id="logging", type="text", placeholder="Logging information") | |
| clear_images_button.click(lambda: ([],), outputs=[input_gallery]) | |
| true_placeholder = gr.Checkbox(label="True placeholder", value=True, elem_id="true_placeholder", visible=False) | |
| false_placeholder = gr.Checkbox(label="False placeholder", value=False, elem_id="false_placeholder", visible=False) | |
| number_placeholder = gr.Number(0, label="Number placeholder", elem_id="number_placeholder", visible=False) | |
| no_prompt = gr.Textbox("", label="", elem_id="empty_placeholder", type="text", placeholder="", visible=False) | |
| submit_button.click( | |
| partial(run_fn, n_ret=3), | |
| inputs=[ | |
| input_gallery, model_dropdown, layer_slider, l1_num_eig_slider, node_type_dropdown, | |
| positive_prompt, negative_prompt, | |
| false_placeholder, no_prompt, no_prompt, no_prompt, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, sampling_method_dropdown, | |
| false_placeholder, number_placeholder, true_placeholder, | |
| l2_num_eig_slider, l3_num_eig_slider, metric_dropdown, | |
| l1_affinity_focal_gamma_slider, l2_affinity_focal_gamma_slider, l3_affinity_focal_gamma_slider | |
| ], | |
| outputs=[l1_gallery, l2_gallery, l3_gallery, logging_text], | |
| ) | |
| with gr.Tab('Compare Models'): | |
| def add_one_model(i_model=1): | |
| with gr.Column(scale=5, min_width=200) as col: | |
| gr.Markdown('### Output Images') | |
| output_gallery = gr.Gallery(format='png', value=[], label="NCUT Embedding", show_label=False, elem_id=f"ncut{i_model}", columns=[3], rows=[1], object_fit="contain", height="auto", show_fullscreen_button=True, interactive=False) | |
| submit_button = gr.Button("🔴 RUN", elem_id=f"submit_button{i_model}", variant='primary') | |
| add_output_images_buttons(output_gallery) | |
| [ | |
| model_dropdown, layer_slider, node_type_dropdown, num_eig_slider, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, | |
| sampling_method_dropdown, positive_prompt, negative_prompt | |
| ] = make_parameters_section() | |
| # logging text box | |
| logging_text = gr.Textbox("Logging information", label="Logging", elem_id="logging", type="text", placeholder="Logging information") | |
| false_placeholder = gr.Checkbox(label="False", value=False, elem_id="false_placeholder", visible=False) | |
| no_prompt = gr.Textbox("", label="", elem_id="empty_placeholder", type="text", placeholder="", visible=False) | |
| submit_button.click( | |
| run_fn, | |
| inputs=[ | |
| input_gallery, model_dropdown, layer_slider, num_eig_slider, node_type_dropdown, | |
| positive_prompt, negative_prompt, | |
| false_placeholder, no_prompt, no_prompt, no_prompt, | |
| affinity_focal_gamma_slider, num_sample_ncut_slider, knn_ncut_slider, | |
| embedding_method_dropdown, embedding_metric_dropdown, num_sample_tsne_slider, knn_tsne_slider, | |
| perplexity_slider, n_neighbors_slider, min_dist_slider, sampling_method_dropdown | |
| ], | |
| outputs=[output_gallery, logging_text] | |
| ) | |
| return col | |
| with gr.Row(): | |
| with gr.Column(scale=5, min_width=200): | |
| input_gallery, submit_button, clear_images_button = make_input_images_section() | |
| clear_images_button.click(lambda: [], outputs=[input_gallery]) | |
| submit_button.visible = False | |
| dataset_dropdown, num_images_slider, random_seed_slider, load_images_button = make_dataset_images_section(advanced=True) | |
| for i in range(2): | |
|     add_one_model(i_model=i + 1)  # pass a unique i_model so elem_ids stay distinct | |
| # Create rows and buttons in a loop | |
| rows = [] | |
| buttons = [] | |
| for i in range(4): | |
| row = gr.Row(visible=False) | |
| rows.append(row) | |
| with row: | |
| for j in range(3): | |
| with gr.Column(scale=5, min_width=200): | |
| add_one_model(i_model=3 * i + j + 3)  # continue the unique elem_id numbering | |
| button = gr.Button("➕ Add Compare", elem_id=f"add_button_{i}", visible=(i == 0), scale=3) | |
| buttons.append(button) | |
| if i > 0: | |
| # Reveal the current row and next button | |
| buttons[i - 1].click(fn=lambda: gr.update(visible=True), outputs=row) | |
| buttons[i - 1].click(fn=lambda: gr.update(visible=True), outputs=button) | |
| # Hide the button that was clicked | |
| buttons[i - 1].click(fn=lambda: gr.update(visible=False), outputs=buttons[i - 1]) | |
| # Last button only reveals the last row and hides itself | |
| buttons[-1].click(fn=lambda: gr.update(visible=True), outputs=rows[-1]) | |
| buttons[-1].click(fn=lambda: gr.update(visible=False), outputs=buttons[-1]) | |
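| # The reveal pattern above in isolation: a minimal, self-contained sketch | |
| # (hypothetical standalone demo, not part of this app's layout). Note the | |
| # zero-argument lambdas: no inputs are declared, so the callbacks take no args. | |
| def make_progressive_reveal_sketch(): | |
|     with gr.Blocks() as sketch: | |
|         row = gr.Row(visible=False) | |
|         with row: | |
|             gr.Markdown("extra compare columns go here") | |
|         btn = gr.Button("➕ Add Compare") | |
|         btn.click(fn=lambda: gr.update(visible=True), outputs=row)   # reveal the row | |
|         btn.click(fn=lambda: gr.update(visible=False), outputs=btn)  # hide the button | |
|     return sketch | |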
| with gr.Tab('📄About'): | |
| gr.Markdown("**This demo is for the Python package `ncut-pytorch`, please visit the [Documentation](https://ncut-pytorch.readthedocs.io/)**") | |
| gr.Markdown("**All the models and functions used for this demo are in the Python package `ncut-pytorch`**") | |
| gr.Markdown("---") | |
| gr.Markdown("---") | |
| gr.Markdown("**Normalized Cuts**, aka. spectral clustering, is a graphical method to analyze data grouping in the affinity eigenvector space. It has been widely used for unsupervised segmentation in the 2000s.") | |
| gr.Markdown("*Normalized Cuts and Image Segmentation, Jianbo Shi and Jitendra Malik, 2000*") | |
| gr.Markdown("---") | |
| gr.Markdown("**We have improved NCut, with some advanced features:**") | |
| gr.Markdown("- **Nyström** Normalized Cut, is a new approximation algorithm developed for large-scale graph cuts, a large-graph of million nodes can be processed in under 10s (cpu) or 2s (gpu).") | |
| gr.Markdown("- **spectral-tSNE** visualization, a new method to visualize the high-dimensional eigenvector space with 3D RGB cube. Color is aligned across images, color infers distance in representation.") | |
| gr.Markdown("*paper in prep, Yang 2024*") | |
| gr.Markdown("*AlignedCut: Visual Concepts Discovery on Brain-Guided Universal Feature Space, Huzheng Yang, James Gee\*, and Jianbo Shi\*, 2024*") | |
| gr.Markdown("---") | |
| gr.Markdown("---") | |
| gr.Markdown('<p style="text-align: center;">We thank the HuggingFace team for hosting this demo.</p>') | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("##### This demo is for `ncut-pytorch`, [Documentation](https://ncut-pytorch.readthedocs.io/) ") | |
| with gr.Column(): | |
| gr.Markdown("###### Running out of GPU? Try [Demo](https://ncut-pytorch.readthedocs.io/en/latest/demo/) hosted at UPenn") | |
| # for local development | |
| if os.path.exists("/hf_token.txt"): | |
| os.environ["HF_ACCESS_TOKEN"] = open("/hf_token.txt").read().strip() | |
| if DOWNLOAD_ALL_MODELS_DATASETS: | |
|     from ncut_pytorch.backbone import download_all_models | |
|     # alias the text-backbone downloader so it does not shadow the vision one | |
|     from ncut_pytorch.backbone_text import download_all_models as download_all_text_models | |
|     download_all_models() | |
|     download_all_datasets() | |
|     download_all_text_models() | |
| demo.launch(share=True) | |