Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -24,18 +24,18 @@ import gradio as gr
|
|
| 24 |
# -------------------------
|
| 25 |
# Configuration / Globals
|
| 26 |
# -------------------------
|
| 27 |
-
MODEL_NAME = "ViT-B-32"
|
| 28 |
# MODEL_PRETRAIN = "laion2b_s32b_b79k"
|
| 29 |
-
MODEL_PRETRAIN = "openai"
|
| 30 |
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
|
| 31 |
-
BATCH_SIZE = 64
|
| 32 |
-
TOP_K_DEFAULT = 20
|
| 33 |
-
THUMB_SIZE = (256, 256)
|
| 34 |
-
FONT_PATH = None
|
| 35 |
-
NORMALIZE_SCORE_TO = 100
|
| 36 |
# -------------------------
|
| 37 |
|
| 38 |
-
|
| 39 |
_model_data = {"loaded": False}
|
| 40 |
|
| 41 |
|
|
@@ -52,7 +52,7 @@ def load_model(device: str = DEVICE):
|
|
| 52 |
tokenizer = open_clip.get_tokenizer(MODEL_NAME)
|
| 53 |
model.to(device)
|
| 54 |
model.eval()
|
| 55 |
-
|
| 56 |
dim = model.text_projection.shape[1] if hasattr(model, "text_projection") else model.projection.shape[1]
|
| 57 |
_model_data.update({
|
| 58 |
"loaded": True,
|
|
@@ -107,14 +107,14 @@ def encode_images(images: List[Image.Image], model, preprocess, device: str = DE
|
|
| 107 |
all_feats = []
|
| 108 |
model_device = next(model.parameters()).device
|
| 109 |
for batch in batchify(images, batch_size):
|
| 110 |
-
|
| 111 |
batch_tensors = torch.stack([preprocess(img) for img in batch]).to(device)
|
| 112 |
with torch.no_grad():
|
| 113 |
feats = model.encode_image(batch_tensors)
|
| 114 |
feats = feats / feats.norm(dim=-1, keepdim=True)
|
| 115 |
all_feats.append(feats.cpu())
|
| 116 |
all_feats = torch.cat(all_feats, dim=0)
|
| 117 |
-
return all_feats
|
| 118 |
|
| 119 |
|
| 120 |
def cosine_similarity_matrix(text_feat: torch.Tensor, image_feats: torch.Tensor) -> np.ndarray:
|
|
@@ -122,11 +122,11 @@ def cosine_similarity_matrix(text_feat: torch.Tensor, image_feats: torch.Tensor)
|
|
| 122 |
Given text_feat (1 x dim) and image_feats (N x dim), compute cosine similarities in numpy.
|
| 123 |
Returns ndarray shape (N,)
|
| 124 |
"""
|
| 125 |
-
|
| 126 |
if isinstance(text_feat, torch.Tensor):
|
| 127 |
text_feat = text_feat.cpu()
|
| 128 |
sims = (image_feats @ text_feat.squeeze(0).cpu().T).numpy().squeeze()
|
| 129 |
-
|
| 130 |
sims = np.clip(sims, -1.0, 1.0)
|
| 131 |
return sims
|
| 132 |
|
|
@@ -136,15 +136,15 @@ def normalize_scores_to_range(scores: np.ndarray, low=0.0, high=NORMALIZE_SCORE_
|
|
| 136 |
Maps scores from [-1,1] (cosine) to [low,high] (e.g., 0..100).
|
| 137 |
If all scores equal, map to mid-range to avoid divide-by-zero.
|
| 138 |
"""
|
| 139 |
-
|
| 140 |
min_s, max_s = float(scores.min()), float(scores.max())
|
| 141 |
if math.isclose(min_s, max_s):
|
| 142 |
-
|
| 143 |
mid = (low + high) / 2.0
|
| 144 |
return np.full_like(scores, fill_value=mid, dtype=float)
|
| 145 |
-
|
| 146 |
scores_clipped = np.clip(scores, -1.0, 1.0)
|
| 147 |
-
|
| 148 |
norm01 = (scores_clipped - (-1.0)) / (2.0)
|
| 149 |
mapped = low + norm01 * (high - low)
|
| 150 |
return mapped
|
|
@@ -180,10 +180,9 @@ def make_visual_grid(images: List[Image.Image], scores: List[float], top_k: int
|
|
| 180 |
x = col * w
|
| 181 |
y = row * (h + caption_height)
|
| 182 |
grid_img.paste(img, (x, y))
|
| 183 |
-
|
| 184 |
caption = f"{scores[idx]:.1f}"
|
| 185 |
-
|
| 186 |
-
# For Pillow >=10
|
| 187 |
bbox = draw.textbbox((0, 0), caption, font=font)
|
| 188 |
text_w, text_h = bbox[2] - bbox[0], bbox[3] - bbox[1]
|
| 189 |
|
|
@@ -199,9 +198,7 @@ def make_visual_grid(images: List[Image.Image], scores: List[float], top_k: int
|
|
| 199 |
return grid_img
|
| 200 |
|
| 201 |
|
| 202 |
-
|
| 203 |
-
# Core pipeline
|
| 204 |
-
# -------------------------
|
| 205 |
def rank_images_by_text(query: str, files: List[gr.File], top_k: int = TOP_K_DEFAULT,
|
| 206 |
use_gpu: bool = (DEVICE == "cuda")) -> Tuple[pd.DataFrame, Image.Image]:
|
| 207 |
"""
|
|
@@ -220,20 +217,20 @@ def rank_images_by_text(query: str, files: List[gr.File], top_k: int = TOP_K_DEF
|
|
| 220 |
model, preprocess, tokenizer, dim = load_model(DEVICE if use_gpu else "cpu")
|
| 221 |
device = DEVICE if use_gpu else "cpu"
|
| 222 |
|
| 223 |
-
|
| 224 |
images = []
|
| 225 |
filenames = []
|
| 226 |
for f in files:
|
| 227 |
-
|
| 228 |
try:
|
| 229 |
pil = load_pil_image(f)
|
| 230 |
images.append(pil)
|
| 231 |
-
|
| 232 |
name = getattr(f, "name", None)
|
| 233 |
if name:
|
| 234 |
fname = os.path.basename(name)
|
| 235 |
else:
|
| 236 |
-
|
| 237 |
fname = getattr(f, "filename", "uploaded_image")
|
| 238 |
filenames.append(fname)
|
| 239 |
except Exception as e:
|
|
@@ -242,31 +239,30 @@ def rank_images_by_text(query: str, files: List[gr.File], top_k: int = TOP_K_DEF
|
|
| 242 |
if len(images) == 0:
|
| 243 |
raise ValueError("No valid images could be loaded from uploads.")
|
| 244 |
|
| 245 |
-
|
| 246 |
text_feat = encode_text(query, model, tokenizer, device=device)
|
| 247 |
|
| 248 |
-
|
| 249 |
image_feats = encode_images(images, model, preprocess, device=device, batch_size=BATCH_SIZE)
|
| 250 |
|
| 251 |
-
|
| 252 |
sims = cosine_similarity_matrix(text_feat, image_feats) # range [-1,1]
|
| 253 |
scores_norm = normalize_scores_to_range(sims, low=0.0, high=float(NORMALIZE_SCORE_TO))
|
| 254 |
|
| 255 |
# Rank results
|
| 256 |
-
order = np.argsort(-sims)
|
| 257 |
sims_sorted = sims[order]
|
| 258 |
scores_sorted = scores_norm[order]
|
| 259 |
filenames_sorted = [filenames[i] for i in order]
|
| 260 |
images_sorted = [images[i] for i in order]
|
| 261 |
|
| 262 |
-
# Build DataFrame
|
| 263 |
df = pd.DataFrame({
|
| 264 |
"filename": filenames_sorted,
|
| 265 |
"score_cosine": sims_sorted,
|
| 266 |
f"score_{int(NORMALIZE_SCORE_TO)}": scores_sorted
|
| 267 |
})
|
| 268 |
|
| 269 |
-
|
| 270 |
top_k = min(top_k, len(images_sorted))
|
| 271 |
top_images = images_sorted[:top_k]
|
| 272 |
top_scores = scores_sorted[:top_k].tolist()
|
|
@@ -291,23 +287,20 @@ def gradio_rank_fn(query: str, image_files: List[gr.File], top_k: int = TOP_K_DE
|
|
| 291 |
except Exception as e:
|
| 292 |
return f"Error: {e}", None, None
|
| 293 |
|
| 294 |
-
# Save CSV to buffer so user can download
|
| 295 |
csv_buffer = io.StringIO()
|
| 296 |
df.to_csv(csv_buffer, index=False)
|
| 297 |
csv_bytes = csv_buffer.getvalue().encode("utf-8")
|
| 298 |
csv_buffer.close()
|
| 299 |
|
| 300 |
-
# Return textual summary, grid image, and CSV bytes for download component
|
| 301 |
summary = f"Ranked {len(df)} images for query: '{query}'. Top score: {df['score_cosine'].max():.4f}"
|
| 302 |
return summary, grid_img, ("rankings.csv", csv_bytes, "text/csv")
|
| 303 |
|
| 304 |
|
| 305 |
def build_interface():
|
| 306 |
-
title = "Text → Image Ranking
|
| 307 |
description = """
|
| 308 |
Enter any text query (e.g., "red chinos") and upload multiple product images (100+ supported).
|
| 309 |
The app uses an OpenCLIP model (open-source) to compute embeddings for text and images, then ranks images by cosine similarity.
|
| 310 |
-
You will get a visual grid of the top results annotated with normalized similarity scores (0–100) and a downloadable CSV of all rankings.
|
| 311 |
"""
|
| 312 |
with gr.Blocks(title=title) as demo:
|
| 313 |
gr.Markdown(f"# {title}")
|
|
@@ -326,18 +319,16 @@ def build_interface():
|
|
| 326 |
download = gr.File(label="Download CSV rankings")
|
| 327 |
summary = gr.Textbox(label="Summary", interactive=False)
|
| 328 |
|
| 329 |
-
|
| 330 |
def wrapped_run(q, files, topk, use_gpu_flag):
|
| 331 |
status = "Processing..."
|
| 332 |
-
|
| 333 |
try:
|
| 334 |
summary_text, grid_img, csv_tuple = gradio_rank_fn(q, files, topk, use_gpu_flag)
|
| 335 |
-
|
| 336 |
-
# Save csv bytes to temp file for gr.File returning
|
| 337 |
if csv_tuple:
|
| 338 |
fname, content_bytes, mime = csv_tuple
|
| 339 |
-
|
| 340 |
-
# We'll save to disk in a temp file to make it simple:
|
| 341 |
tmp_path = os.path.join(os.getcwd(), fname)
|
| 342 |
with open(tmp_path, "wb") as f:
|
| 343 |
f.write(content_bytes)
|
|
@@ -351,8 +342,9 @@ def build_interface():
|
|
| 351 |
run_btn.click(fn=wrapped_run, inputs=[query, image_files, top_k, use_gpu], outputs=[summary, gallery, download])
|
| 352 |
gr.Markdown("## Notes")
|
| 353 |
gr.Markdown("- This uses an **open-source** OpenCLIP model. No paid API calls.")
|
| 354 |
-
gr.Markdown("-
|
| 355 |
-
gr.Markdown("
|
|
|
|
| 356 |
return demo
|
| 357 |
|
| 358 |
|
|
|
|
| 24 |
# -------------------------
|
| 25 |
# Configuration / Globals
|
| 26 |
# -------------------------
|
| 27 |
+
MODEL_NAME = "ViT-B-32"
|
| 28 |
# MODEL_PRETRAIN = "laion2b_s32b_b79k"
|
| 29 |
+
MODEL_PRETRAIN = "openai"
|
| 30 |
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
|
| 31 |
+
BATCH_SIZE = 64
|
| 32 |
+
TOP_K_DEFAULT = 20
|
| 33 |
+
THUMB_SIZE = (256, 256)
|
| 34 |
+
FONT_PATH = None
|
| 35 |
+
NORMALIZE_SCORE_TO = 100
|
| 36 |
# -------------------------
|
| 37 |
|
| 38 |
+
|
| 39 |
_model_data = {"loaded": False}
|
| 40 |
|
| 41 |
|
|
|
|
| 52 |
tokenizer = open_clip.get_tokenizer(MODEL_NAME)
|
| 53 |
model.to(device)
|
| 54 |
model.eval()
|
| 55 |
+
|
| 56 |
dim = model.text_projection.shape[1] if hasattr(model, "text_projection") else model.projection.shape[1]
|
| 57 |
_model_data.update({
|
| 58 |
"loaded": True,
|
|
|
|
| 107 |
all_feats = []
|
| 108 |
model_device = next(model.parameters()).device
|
| 109 |
for batch in batchify(images, batch_size):
|
| 110 |
+
|
| 111 |
batch_tensors = torch.stack([preprocess(img) for img in batch]).to(device)
|
| 112 |
with torch.no_grad():
|
| 113 |
feats = model.encode_image(batch_tensors)
|
| 114 |
feats = feats / feats.norm(dim=-1, keepdim=True)
|
| 115 |
all_feats.append(feats.cpu())
|
| 116 |
all_feats = torch.cat(all_feats, dim=0)
|
| 117 |
+
return all_feats
|
| 118 |
|
| 119 |
|
| 120 |
def cosine_similarity_matrix(text_feat: torch.Tensor, image_feats: torch.Tensor) -> np.ndarray:
|
|
|
|
| 122 |
Given text_feat (1 x dim) and image_feats (N x dim), compute cosine similarities in numpy.
|
| 123 |
Returns ndarray shape (N,)
|
| 124 |
"""
|
| 125 |
+
|
| 126 |
if isinstance(text_feat, torch.Tensor):
|
| 127 |
text_feat = text_feat.cpu()
|
| 128 |
sims = (image_feats @ text_feat.squeeze(0).cpu().T).numpy().squeeze()
|
| 129 |
+
|
| 130 |
sims = np.clip(sims, -1.0, 1.0)
|
| 131 |
return sims
|
| 132 |
|
|
|
|
| 136 |
Maps scores from [-1,1] (cosine) to [low,high] (e.g., 0..100).
|
| 137 |
If all scores equal, map to mid-range to avoid divide-by-zero.
|
| 138 |
"""
|
| 139 |
+
|
| 140 |
min_s, max_s = float(scores.min()), float(scores.max())
|
| 141 |
if math.isclose(min_s, max_s):
|
| 142 |
+
|
| 143 |
mid = (low + high) / 2.0
|
| 144 |
return np.full_like(scores, fill_value=mid, dtype=float)
|
| 145 |
+
|
| 146 |
scores_clipped = np.clip(scores, -1.0, 1.0)
|
| 147 |
+
|
| 148 |
norm01 = (scores_clipped - (-1.0)) / (2.0)
|
| 149 |
mapped = low + norm01 * (high - low)
|
| 150 |
return mapped
|
|
|
|
| 180 |
x = col * w
|
| 181 |
y = row * (h + caption_height)
|
| 182 |
grid_img.paste(img, (x, y))
|
| 183 |
+
|
| 184 |
caption = f"{scores[idx]:.1f}"
|
| 185 |
+
|
|
|
|
| 186 |
bbox = draw.textbbox((0, 0), caption, font=font)
|
| 187 |
text_w, text_h = bbox[2] - bbox[0], bbox[3] - bbox[1]
|
| 188 |
|
|
|
|
| 198 |
return grid_img
|
| 199 |
|
| 200 |
|
| 201 |
+
|
|
|
|
|
|
|
| 202 |
def rank_images_by_text(query: str, files: List[gr.File], top_k: int = TOP_K_DEFAULT,
|
| 203 |
use_gpu: bool = (DEVICE == "cuda")) -> Tuple[pd.DataFrame, Image.Image]:
|
| 204 |
"""
|
|
|
|
| 217 |
model, preprocess, tokenizer, dim = load_model(DEVICE if use_gpu else "cpu")
|
| 218 |
device = DEVICE if use_gpu else "cpu"
|
| 219 |
|
| 220 |
+
|
| 221 |
images = []
|
| 222 |
filenames = []
|
| 223 |
for f in files:
|
| 224 |
+
|
| 225 |
try:
|
| 226 |
pil = load_pil_image(f)
|
| 227 |
images.append(pil)
|
| 228 |
+
|
| 229 |
name = getattr(f, "name", None)
|
| 230 |
if name:
|
| 231 |
fname = os.path.basename(name)
|
| 232 |
else:
|
| 233 |
+
|
| 234 |
fname = getattr(f, "filename", "uploaded_image")
|
| 235 |
filenames.append(fname)
|
| 236 |
except Exception as e:
|
|
|
|
| 239 |
if len(images) == 0:
|
| 240 |
raise ValueError("No valid images could be loaded from uploads.")
|
| 241 |
|
| 242 |
+
|
| 243 |
text_feat = encode_text(query, model, tokenizer, device=device)
|
| 244 |
|
| 245 |
+
|
| 246 |
image_feats = encode_images(images, model, preprocess, device=device, batch_size=BATCH_SIZE)
|
| 247 |
|
| 248 |
+
|
| 249 |
sims = cosine_similarity_matrix(text_feat, image_feats) # range [-1,1]
|
| 250 |
scores_norm = normalize_scores_to_range(sims, low=0.0, high=float(NORMALIZE_SCORE_TO))
|
| 251 |
|
| 252 |
# Rank results
|
| 253 |
+
order = np.argsort(-sims)
|
| 254 |
sims_sorted = sims[order]
|
| 255 |
scores_sorted = scores_norm[order]
|
| 256 |
filenames_sorted = [filenames[i] for i in order]
|
| 257 |
images_sorted = [images[i] for i in order]
|
| 258 |
|
|
|
|
| 259 |
df = pd.DataFrame({
|
| 260 |
"filename": filenames_sorted,
|
| 261 |
"score_cosine": sims_sorted,
|
| 262 |
f"score_{int(NORMALIZE_SCORE_TO)}": scores_sorted
|
| 263 |
})
|
| 264 |
|
| 265 |
+
|
| 266 |
top_k = min(top_k, len(images_sorted))
|
| 267 |
top_images = images_sorted[:top_k]
|
| 268 |
top_scores = scores_sorted[:top_k].tolist()
|
|
|
|
| 287 |
except Exception as e:
|
| 288 |
return f"Error: {e}", None, None
|
| 289 |
|
|
|
|
| 290 |
csv_buffer = io.StringIO()
|
| 291 |
df.to_csv(csv_buffer, index=False)
|
| 292 |
csv_bytes = csv_buffer.getvalue().encode("utf-8")
|
| 293 |
csv_buffer.close()
|
| 294 |
|
|
|
|
| 295 |
summary = f"Ranked {len(df)} images for query: '{query}'. Top score: {df['score_cosine'].max():.4f}"
|
| 296 |
return summary, grid_img, ("rankings.csv", csv_bytes, "text/csv")
|
| 297 |
|
| 298 |
|
| 299 |
def build_interface():
|
| 300 |
+
title = "Text → Image Ranking"
|
| 301 |
description = """
|
| 302 |
Enter any text query (e.g., "red chinos") and upload multiple product images (100+ supported).
|
| 303 |
The app uses an OpenCLIP model (open-source) to compute embeddings for text and images, then ranks images by cosine similarity.
|
|
|
|
| 304 |
"""
|
| 305 |
with gr.Blocks(title=title) as demo:
|
| 306 |
gr.Markdown(f"# {title}")
|
|
|
|
| 319 |
download = gr.File(label="Download CSV rankings")
|
| 320 |
summary = gr.Textbox(label="Summary", interactive=False)
|
| 321 |
|
| 322 |
+
|
| 323 |
def wrapped_run(q, files, topk, use_gpu_flag):
|
| 324 |
status = "Processing..."
|
| 325 |
+
|
| 326 |
try:
|
| 327 |
summary_text, grid_img, csv_tuple = gradio_rank_fn(q, files, topk, use_gpu_flag)
|
| 328 |
+
|
|
|
|
| 329 |
if csv_tuple:
|
| 330 |
fname, content_bytes, mime = csv_tuple
|
| 331 |
+
|
|
|
|
| 332 |
tmp_path = os.path.join(os.getcwd(), fname)
|
| 333 |
with open(tmp_path, "wb") as f:
|
| 334 |
f.write(content_bytes)
|
|
|
|
| 342 |
run_btn.click(fn=wrapped_run, inputs=[query, image_files, top_k, use_gpu], outputs=[summary, gallery, download])
|
| 343 |
gr.Markdown("## Notes")
|
| 344 |
gr.Markdown("- This uses an **open-source** OpenCLIP model. No paid API calls.")
|
| 345 |
+
gr.Markdown("- The app is slow because every time it runs it creates embeddings of the text and the images . The speed of the app can be increased if we use already stored images so we don't have to create embeddings everytime.")
|
| 346 |
+
gr.Markdown("The accuracy of this app can be increased if we used different models of open clip , but for computational efficiency i have utilized one of the efficient models . Also if we finetune this model , the accuracy of the model can be hugely increased, But since this is just a asssignment , i have created a demo prototype only.")
|
| 347 |
+
|
| 348 |
return demo
|
| 349 |
|
| 350 |
|