| import torch |
| import open_clip |
| from PIL import Image |
| import requests |
| import json |
| import gradio as gr |
| import pandas as pd |
| from io import BytesIO |
| import os |
|
|
| |
# Load the nested Amazon product taxonomy that the beam search walks.
# JSON is UTF-8 by specification; stating the encoding keeps non-ASCII
# category names intact on platforms whose locale default is not UTF-8.
with open("amazon.json", "r", encoding="utf-8") as f:
    AMAZON_TAXONOMY = json.load(f)
|
|
|
|
# Baseline CLIP model that the Marqo models are compared against.
base_model_name = "ViT-B-16"
# Without ``pretrained`` open_clip only builds the architecture and leaves the
# weights randomly initialised. The UI presents this model as
# "openai-ViT-B-16", so load the OpenAI checkpoint explicitly.
# NOTE(review): confirm that ViT-B-16.json was generated with these weights.
model_base, _, preprocess_base = open_clip.create_model_and_transforms(
    base_model_name, pretrained="openai"
)
# open_clip returns models in train mode; inference should run in eval mode.
model_base.eval()
tokenizer_base = open_clip.get_tokenizer(base_model_name)

# Marqo e-commerce embedding model, "B" variant (weights come from the HF hub).
model_name_B = "hf-hub:Marqo/marqo-ecommerce-embeddings-B"
model_B, _, preprocess_B = open_clip.create_model_and_transforms(model_name_B)
model_B.eval()
tokenizer_B = open_clip.get_tokenizer(model_name_B)

# Marqo e-commerce embedding model, "L" variant (weights come from the HF hub).
model_name_L = "hf-hub:Marqo/marqo-ecommerce-embeddings-L"
model_L, _, preprocess_L = open_clip.create_model_and_transforms(model_name_L)
model_L.eval()
tokenizer_L = open_clip.get_tokenizer(model_name_L)
|
|
# Identifiers of every model the app can serve; they double as cache keys.
models = [base_model_name, model_name_B, model_name_L]


# Per-model lookup of pre-computed embeddings, keyed by taxonomy class name.
# Each model's file is named after the last "/"-separated part of its
# identifier, e.g. "hf-hub:Marqo/marqo-ecommerce-embeddings-B" is read from
# "marqo-ecommerce-embeddings-B.json".
taxonomy_cache = {}
for model_name in models:
    cache_file = f'{model_name.split("/")[-1]}.json'
    # Explicit UTF-8 so class names decode the same way on every platform.
    with open(cache_file, "r", encoding="utf-8") as f:
        taxonomy_cache[model_name] = json.load(f)
|
|
|
|
def cosine_similarity(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    """Return the cosine similarity of ``a`` and ``b`` rescaled to [0, 1].

    The similarity is computed along the last dimension; the remaining
    dimensions follow the broadcasting rules of ``a * b``. The raw cosine
    in [-1, 1] is mapped to [0, 1] via ``0.5 * (cos + 1)``.

    A zero-norm input has no direction. Its raw cosine is treated as 0, so
    the result is 0.5 rather than NaN.
    """
    numerator = (a * b).sum(dim=-1)
    denominator = torch.linalg.norm(a, ord=2, dim=-1) * torch.linalg.norm(
        b, ord=2, dim=-1
    )
    # Avoid 0/0 for zero vectors. Clamping at the smallest normal value of
    # the dtype leaves every non-degenerate denominator untouched.
    denominator = denominator.clamp_min(torch.finfo(denominator.dtype).tiny)
    return 0.5 * (numerator / denominator + 1.0)
|
|
|
|
| class BeamPath: |
| def __init__(self, path: list, cumulative_score: float, current_layer: dict | list): |
| self.path = path |
| self.cumulative_score = cumulative_score |
| self.current_layer = current_layer |
|
|
| def __repr__(self): |
| return f"BeamPath(path={self.path}, cumulative_score={self.cumulative_score})" |
|
|
|
|
def _compute_similarities(classes: list, base_embedding: torch.Tensor, cache_key: str):
    """Score ``base_embedding`` against the cached embedding of each class.

    For every name in ``classes`` the embedding stored under
    ``taxonomy_cache[cache_key]`` is looked up. The stacked embeddings are
    compared with ``base_embedding`` via ``cosine_similarity`` and the
    result is returned as a NumPy array.

    Raises:
        KeyError: if ``cache_key`` or one of the class names is not cached.
    """
    embedding_rows = []
    for label in classes:
        embedding_rows.append(taxonomy_cache[cache_key][label])

    label_matrix = torch.tensor(embedding_rows)
    scores = cosine_similarity(base_embedding, label_matrix)
    return scores.cpu().numpy()
|
|
|
|
def map_taxonomy(
    base_image: Image.Image,
    taxonomy: dict,
    model,
    tokenizer,
    preprocess_val,
    cache_key,
    beam_width: int = 3,
) -> tuple[list[tuple[str, float]], float]:
    """Map an image onto a taxonomy path using beam search.

    The image is embedded once with ``model``. The taxonomy is then walked
    level by level: every class below the paths currently in the beam is
    scored against the image embedding via ``_compute_similarities``, and
    the best continuations are kept for the next level.

    Args:
        base_image: Image to classify.
        taxonomy: Nested taxonomy. A dict maps class names to the next
            layer; a list holds class names with no further layer. An
            empty list or any other value ends a path.
        model: Object exposing ``encode_image(tensor, normalize=True)``.
        tokenizer: Unused; accepted to keep the call signature stable.
        preprocess_val: Callable turning ``base_image`` into a tensor.
        cache_key: Key into ``taxonomy_cache`` selecting the embeddings.
        beam_width: Number of continuations kept per top-level category
            after the first level. Coerced to ``int`` and clamped to at
            least 1.

    Returns:
        ``(path, cumulative_score)`` for the path with the highest mean
        per-level similarity, where ``path`` is a list of
        ``(class_name, similarity)`` tuples. ``([], 0.0)`` is returned when
        the taxonomy offers no classes at all.
    """
    from collections import defaultdict

    # UI widgets can deliver the width as a float (e.g. 5.0); slicing needs
    # an int, and a width below 1 would prune every continuation.
    beam_width = max(1, int(beam_width))

    image_tensor = preprocess_val(base_image).unsqueeze(0)
    with torch.no_grad(), torch.cuda.amp.autocast():
        base_embedding = model.encode_image(image_tensor, normalize=True)

    beam = [BeamPath(path=[], cumulative_score=0.0, current_layer=taxonomy)]
    final_paths = []
    is_first = True

    while beam:
        # Collect (class_name, parent_path, next_layer) for every class that
        # can extend a path in the current beam.
        candidate_entries = []
        for beam_path in beam:
            layer = beam_path.current_layer
            if isinstance(layer, dict):
                # NOTE(review): a path sitting on an empty dict yields no
                # candidates and is not recorded as final; it only survives
                # if the search stops on this very iteration.
                for class_name, next_layer in layer.items():
                    candidate_entries.append((class_name, beam_path, next_layer))
            elif isinstance(layer, list) and layer:
                for class_name in layer:
                    candidate_entries.append((class_name, beam_path, None))
            else:
                # Empty list or non-container value: the path is complete.
                final_paths.append(beam_path)

        if not candidate_entries:
            break

        # Score all candidates of this level in a single batch.
        similarities = _compute_similarities(
            [class_name for class_name, _, _ in candidate_entries],
            base_embedding,
            cache_key,
        )

        # Group the extended paths by their top-level category so that each
        # top-level branch keeps its own share of the beam.
        by_parents = defaultdict(list)
        for (class_name, beam_path, next_layer), similarity in zip(
            candidate_entries, similarities
        ):
            candidate = BeamPath(
                path=beam_path.path + [(class_name, float(similarity))],
                cumulative_score=beam_path.cumulative_score + similarity,
                current_layer=next_layer,
            )
            by_parents[candidate.path[0][0]].append(candidate)

        beam = []
        for children in by_parents.values():
            # Rank by mean similarity so far plus the newest level's score.
            children.sort(
                key=lambda x: x.cumulative_score / len(x.path) + x.path[-1][1],
                reverse=True,
            )
            # The first level is never pruned: every top-level category
            # gets a chance to be explored.
            beam.extend(children if is_first else children[:beam_width])

        is_first = False

    # The root path has no entries and therefore no mean score; leaving it
    # in would divide by zero when the taxonomy is empty.
    all_paths = [p for p in beam + final_paths if p.path]
    if not all_paths:
        return [], 0.0

    best_path = max(all_paths, key=lambda p: p.cumulative_score / len(p.path))
    return best_path.path, float(best_path.cumulative_score)
|
|
|
|
| |
def classify_image(
    image_input: Image.Image | None,
    image_url: str | None,
    model_size: str,
    beam_width: int,
):
    """Classify an image into the Amazon taxonomy and return a results table.

    Args:
        image_input: Uploaded image; takes precedence over ``image_url``.
        image_url: URL to download the image from when no upload is given.
        model_size: One of "marqo-ecommerce-embeddings-L",
            "marqo-ecommerce-embeddings-B" or "openai-ViT-B-16".
        beam_width: Beam width forwarded to ``map_taxonomy``; must be
            convertible to an integer of at least 1.

    Returns:
        A DataFrame with one row per taxonomy level and the columns
        "Level", "Category" and "Score". Every failure is reported as a
        DataFrame with a single "Error" column instead of an exception.
    """
    if image_input is not None:
        image = image_input
    elif image_url:
        try:
            # Bound the wait so an unresponsive host cannot stall the request.
            response = requests.get(image_url, timeout=30)
            # Report HTTP failures as such, not as an image decoding error.
            response.raise_for_status()
            image = Image.open(BytesIO(response.content)).convert("RGB")
        except Exception as e:
            # UI boundary: show the failure to the user instead of crashing.
            return pd.DataFrame({"Error": [str(e)]})
    else:
        return pd.DataFrame(
            {
                "Error": [
                    "Please provide an image, an image URL, or select an example image"
                ]
            }
        )

    # UI label -> (cache key, model, preprocessing transform, tokenizer).
    model_registry = {
        "marqo-ecommerce-embeddings-L": (
            "hf-hub:Marqo/marqo-ecommerce-embeddings-L",
            model_L,
            preprocess_L,
            tokenizer_L,
        ),
        "marqo-ecommerce-embeddings-B": (
            "hf-hub:Marqo/marqo-ecommerce-embeddings-B",
            model_B,
            preprocess_B,
            tokenizer_B,
        ),
        "openai-ViT-B-16": (
            "ViT-B-16",
            model_base,
            preprocess_base,
            tokenizer_base,
        ),
    }
    selection = model_registry.get(model_size)
    if selection is None:
        return pd.DataFrame({"Error": ["Invalid model size"]})
    key, model, preprocess_val, tokenizer = selection

    # The number widget may deliver a float or nothing at all; the beam
    # search slices with this value, so it has to be a positive integer.
    try:
        width = int(beam_width)
    except (TypeError, ValueError, OverflowError):
        width = 0
    if width < 1:
        return pd.DataFrame(
            {"Error": ["Beam width must be a positive whole number"]}
        )

    path, cumulative_score = map_taxonomy(
        base_image=image,
        taxonomy=AMAZON_TAXONOMY,
        model=model,
        tokenizer=tokenizer,
        preprocess_val=preprocess_val,
        cache_key=key,
        beam_width=width,
    )

    rows = [
        {"Level": level, "Category": category, "Score": score}
        for level, (category, score) in enumerate(path, start=1)
    ]
    # Naming the columns keeps the table header even when no path was found.
    return pd.DataFrame(rows, columns=["Level", "Category", "Score"])
|
|
|
|
# Build the Gradio UI: inputs on the left, model options and results on the
# right, with the "Classify" button wired to ``classify_image``.
with gr.Blocks() as demo:
    gr.Markdown("# Image Classification with Taxonomy Mapping")
    gr.Markdown(
        "## How to use this app\n\nThis app compares [Marqo's Ecommerce embeddings](https://huggingface.co/collections/Marqo/marqo-ecommerce-embeddings-66f611b9bb9d035a8d164fbb) to OpenAI's ViT-B-16 CLIP model for Ecommerce taxonomy mapping. A beam search is used to find the correct classification in the taxonomy. The original OpenAI CLIP models perform very poorly on Ecommerce data."
    )
    gr.Markdown(
        "Upload an image, provide an image URL, or select an example image, select the model size, and get the taxonomy mapping. The taxonomy is based on the Amazon product taxonomy."
    )

    with gr.Row():
        with gr.Column():
            image_input = gr.Image(type="pil", label="Upload Image", height=300)
            image_url_input = gr.Textbox(
                lines=1, placeholder="Image URL", label="Image URL"
            )

            # Collect example files in a stable order. Sub-directories are
            # skipped, and a missing folder just means no examples are shown
            # rather than a crash at startup.
            example_images_folder = "images"
            example_image_paths = []
            if os.path.isdir(example_images_folder):
                example_image_paths = [
                    os.path.join(example_images_folder, img)
                    for img in sorted(os.listdir(example_images_folder))
                    if os.path.isfile(os.path.join(example_images_folder, img))
                ]

            if example_image_paths:
                gr.Markdown("### Or select an example image:")
                gr.Examples(
                    examples=[[img_path] for img_path in example_image_paths],
                    inputs=image_input,
                    label="Example Images",
                    examples_per_page=100,
                )
        with gr.Column():
            model_size_input = gr.Radio(
                choices=[
                    "marqo-ecommerce-embeddings-L",
                    "marqo-ecommerce-embeddings-B",
                    "openai-ViT-B-16",
                ],
                label="Model",
                value="marqo-ecommerce-embeddings-L",
            )
            # precision=0 makes the widget hand over an int; the beam width
            # is used as a slice bound downstream.
            beam_width_input = gr.Number(
                label="Beam Width", value=5, minimum=1, step=1, precision=0
            )
            classify_button = gr.Button("Classify")
            output_table = gr.Dataframe(headers=["Level", "Category", "Score"])

    classify_button.click(
        fn=classify_image,
        inputs=[image_input, image_url_input, model_size_input, beam_width_input],
        outputs=output_table,
    )


demo.launch()
|
|