Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import open_clip | |
| import torch | |
| import requests | |
| from PIL import Image | |
| from io import BytesIO | |
| import time | |
| import json | |
| import numpy as np | |
| from ultralytics import YOLO | |
| import cv2 | |
| import chromadb | |
| # Load CLIP model and tokenizer | |
@st.cache_resource
def load_clip_model():
    """Load the Marqo fashion SigLIP model, its transform and tokenizer.

    Streamlit re-executes the script on every interaction, so the loader is
    wrapped in ``st.cache_resource`` to build the model only once per server
    process instead of on every rerun.

    Returns:
        A ``(model, preprocess_val, tokenizer, device)`` tuple. The model has
        been moved to ``device`` (CUDA when available, otherwise CPU) and put
        into eval mode.
    """
    model_id = 'hf-hub:Marqo/marqo-fashionSigLIP'
    # The training-time transform is not used anywhere in this app.
    model, _, preprocess_val = open_clip.create_model_and_transforms(model_id)
    tokenizer = open_clip.get_tokenizer(model_id)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    # The model is only used for inference; disable train-time behaviour.
    model.eval()
    return model, preprocess_val, tokenizer, device


clip_model, preprocess_val, tokenizer, device = load_clip_model()
| # Load YOLOv8 model | |
@st.cache_resource
def load_yolo_model():
    """Load the YOLO detector from ``./best.pt``.

    Cached with ``st.cache_resource`` so the weights are read once per server
    process rather than on every Streamlit rerun.
    """
    return YOLO("./best.pt")


yolo_model = load_yolo_model()
| # Load ChromaDB | |
@st.cache_resource
def load_chromadb():
    """Open the persistent Chroma store and return the clothes collection.

    Cached with ``st.cache_resource`` so the on-disk database under
    ``./chromadb_new`` is opened once per server process rather than on every
    Streamlit rerun.

    Returns:
        The existing collection named ``clothes_items_musinsa_sumin``.
    """
    client = chromadb.PersistentClient(path="./chromadb_new")
    return client.get_collection(name="clothes_items_musinsa_sumin")


collection = load_chromadb()
| # Helper functions | |
def load_image_from_url(url, max_retries=3):
    """Download *url* and decode it as an RGB PIL image.

    Makes up to ``max_retries`` attempts, sleeping one second between them.

    Args:
        url: Address of the image to fetch.
        max_retries: Maximum number of download/decode attempts.

    Returns:
        The decoded image converted to RGB, or ``None`` when every attempt
        failed (or ``max_retries`` is less than 1).
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return Image.open(BytesIO(response.content)).convert('RGB')
        except (requests.RequestException, OSError):
            # OSError covers PIL's UnidentifiedImageError and also truncated
            # or corrupt payloads, which would otherwise escape this handler
            # and abort the caller.
            if attempt < max_retries - 1:
                time.sleep(1)
    return None
def get_image_embedding(image):
    """Encode *image* with the CLIP model and return the embedding as a list.

    The image is run through ``preprocess_val``, encoded without gradient
    tracking, and divided by its L2 norm along the last dimension.

    Returns:
        The squeezed embedding converted to a plain Python list.
    """
    batch = preprocess_val(image).unsqueeze(0).to(device)
    with torch.no_grad():
        features = clip_model.encode_image(batch)
        features /= features.norm(dim=-1, keepdim=True)
    # Convert the numpy array into a plain Python list.
    squeezed = features.cpu().numpy().squeeze()
    return squeezed.tolist()
def get_text_embedding(text):
    """Encode *text* with the CLIP text tower and return a numpy array.

    The single string is tokenized as a one-element batch, encoded without
    gradient tracking, and divided by its L2 norm along the last dimension.
    Unlike ``get_image_embedding`` the result is neither squeezed nor
    converted to a list.
    """
    tokens = tokenizer([text]).to(device)
    with torch.no_grad():
        encoded = clip_model.encode_text(tokens)
        encoded /= encoded.norm(dim=-1, keepdim=True)
    on_cpu = encoded.cpu()
    return on_cpu.numpy()
def _parse_url_list(raw):
    """Turn a string-encoded URL collection into a list of URL strings."""
    raw = raw.strip()
    if not raw:
        return []
    try:
        parsed = json.loads(raw)
    except ValueError:
        # Not JSON: treat the whole string as one URL.
        return [raw]
    if isinstance(parsed, list):
        return [item for item in parsed if isinstance(item, str)]
    if isinstance(parsed, str):
        return [parsed]
    return [raw]


def get_average_embedding(main_image_url, additional_image_urls):
    """Average the image embeddings of an item's main and additional images.

    Images that cannot be downloaded are skipped.

    Args:
        main_image_url: URL of the item's primary image.
        additional_image_urls: Iterable of further image URLs. ``None`` is
            treated as empty. A ``str`` is parsed as a JSON list when
            possible and otherwise treated as a single URL, so that it is
            never iterated character by character.
            NOTE(review): whether the stored metadata holds a real list or
            an encoded string is not visible here - confirm against the
            code that populated the collection.

    Returns:
        A numpy array holding the element-wise mean of the embeddings, or
        ``None`` when no image could be loaded.
    """
    if additional_image_urls is None:
        additional_image_urls = []
    elif isinstance(additional_image_urls, str):
        additional_image_urls = _parse_url_list(additional_image_urls)

    embeddings = []
    # Main image first, then the additional images, as before.
    for url in [main_image_url, *additional_image_urls]:
        image = load_image_from_url(url)
        if image is not None:
            embeddings.append(get_image_embedding(image))

    if not embeddings:
        return None
    return np.mean(embeddings, axis=0)
def update_collection_embeddings():
    """Recompute and store averaged image embeddings for every item, in batches.

    For each item the main image and any additional images are embedded and
    averaged via ``get_average_embedding``. Items are written back in
    batches, and per-item and per-batch outcomes are reported in the
    Streamlit UI.

    NOTE(review): a later definition with the same name in this file replaces
    this one when the module is executed, so this batched variant is
    currently never called.
    """
    # Fetch ids and metadata in a single round trip.
    records = collection.get(include=['metadatas'])
    all_ids = records['ids']
    all_metadata = records['metadatas']
    total = len(all_ids)
    if total == 0:
        return

    batch_size = 100  # number of items processed per update call
    progress_bar = st.progress(0.0)
    for start in range(0, total, batch_size):
        # Clamp the end so the progress fraction never exceeds 1.0 on a
        # final partial batch (st.progress rejects values above 1.0).
        stop = min(start + batch_size, total)
        batch_embeddings = []
        valid_ids = []
        for item_id, metadata in zip(all_ids[start:stop], all_metadata[start:stop]):
            main_image_url = metadata['image_url']
            additional_image_urls = metadata.get('additional_images', [])
            try:
                avg_embedding = get_average_embedding(main_image_url, additional_image_urls)
                if avg_embedding is not None:
                    # Store plain lists, like the other definition of this
                    # function does with ``.tolist()``.
                    batch_embeddings.append(np.asarray(avg_embedding).tolist())
                    valid_ids.append(item_id)
                else:
                    st.warning(f"Failed to generate embedding for item {item_id}")
            except Exception as e:
                # Report and continue: one bad item must not stop the run.
                st.error(f"Error processing item {item_id}: {str(e)}")
        if valid_ids:
            try:
                collection.update(
                    ids=valid_ids,
                    embeddings=batch_embeddings,
                )
                st.success(f"Updated embeddings for {len(valid_ids)} items")
            except Exception as e:
                st.error(f"Error updating embeddings: {str(e)}")
                st.error(f"First embedding type: {type(batch_embeddings[0])}")
                st.error(f"First embedding length: {len(batch_embeddings[0])}")
                # Show only the first 10 elements.
                st.error(f"First embedding: {batch_embeddings[0][:10]}...")
        # Show progress on a single, reused progress bar.
        progress_bar.progress(stop / total)
def find_similar_images(query_embedding, collection, top_k=5):
    """Query *collection* for the items nearest to *query_embedding*.

    Args:
        query_embedding: The query vector as a list, numpy array or anything
            else ``np.asarray`` accepts. Singleton dimensions are squeezed
            away, so both a flat vector and a ``(1, d)`` array work.
        collection: Object exposing a Chroma-style ``query`` method.
        top_k: Number of results to request.

    Returns:
        A list of ``{'info': metadata, 'similarity': 1 - distance}`` dicts in
        the order returned by the collection.
    """
    # get_image_embedding returns a plain list while get_text_embedding
    # returns an array; normalising through np.asarray accepts both instead
    # of failing on list.squeeze().
    query_vector = np.asarray(query_embedding).squeeze().tolist()
    results = collection.query(
        query_embeddings=[query_vector],
        n_results=top_k,
        include=["metadatas", "distances"],
    )
    # Convert distance into a similarity score.
    # NOTE(review): 1 - distance is only a cosine similarity if the
    # collection uses the cosine metric - confirm the collection settings.
    return [
        {'info': metadata, 'similarity': 1 - distance}
        for metadata, distance in zip(results['metadatas'][0], results['distances'][0])
    ]
def update_collection_embeddings():
    """Recompute the stored embedding of every item, one item at a time.

    Each item's main image and additional images are embedded and averaged
    via ``get_average_embedding``; the result is written back with a
    per-item ``collection.update``. Items for which no image could be loaded
    are skipped, leaving their stored embedding as it was.

    NOTE(review): this is the second definition of this name in the file and
    therefore the one that actually runs; the earlier batched definition is
    shadowed.
    """
    # One fetch returns both ids and metadata; no second round trip needed.
    records = collection.get(include=['metadatas'])
    for item_id, metadata in zip(records['ids'], records['metadatas']):
        avg_embedding = get_average_embedding(
            metadata['image_url'],
            metadata.get('additional_images', []),
        )
        if avg_embedding is None:
            continue
        collection.update(
            ids=[item_id],
            embeddings=[avg_embedding.tolist()],
        )
def detect_clothing(image):
    """Run the YOLO model on *image* and list the detected clothing items.

    Only detections whose class name is one of the supported clothing
    categories are kept.

    Returns:
        A list of dicts with keys ``category`` (class name), ``bbox``
        (``[x1, y1, x2, y2]`` as ints) and ``confidence``.
    """
    wanted = ['sunglass', 'hat', 'jacket', 'shirt', 'pants', 'shorts', 'skirt', 'dress', 'bag', 'shoe']
    rows = yolo_model(image)[0].boxes.data.cpu().numpy()
    found = []
    for row in rows:
        x1, y1, x2, y2, conf, cls = row
        label = yolo_model.names[int(cls)]
        if label not in wanted:
            continue
        box = [int(x1), int(y1), int(x2), int(y2)]
        found.append({'category': label, 'bbox': box, 'confidence': conf})
    return found
def crop_image(image, bbox):
    """Return ``image.crop`` applied to the first four entries of *bbox*."""
    box = tuple(bbox[i] for i in range(4))
    return image.crop(box)
# Initialise session state: set each key only if it is not present yet.
_session_defaults = {
    'step': 'input',
    'query_image_url': '',
    'detections': [],
    'selected_category': None,
}
for _key, _default in _session_defaults.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
# Streamlit app
st.title("Advanced Fashion Search App")

# Update the collection embeddings (only once, the first time this runs).
# NOTE(review): the flag lives in st.session_state, so "once" appears to mean
# once per browser session rather than once per deployment - confirm that
# this is intended.
first_run_in_session = 'embeddings_updated' not in st.session_state
if first_run_in_session:
    spinner_message = "Updating collection embeddings... This may take a while."
    with st.spinner(spinner_message):
        update_collection_embeddings()
    st.session_state['embeddings_updated'] = True
# Step-by-step flow: 'input' -> 'select_category' -> 'show_results'.
def _detection_label(detection):
    """Format a detection the way it is shown in the category selector."""
    return f"{detection['category']} (Confidence: {detection['confidence']:.2f})"


if st.session_state.step == 'input':
    st.session_state.query_image_url = st.text_input("Enter image URL:", st.session_state.query_image_url)
    if st.button("Detect Clothing"):
        if not st.session_state.query_image_url:
            st.warning("Please enter an image URL.")
        else:
            query_image = load_image_from_url(st.session_state.query_image_url)
            if query_image is None:
                st.error("Failed to load the image. Please try another URL.")
            else:
                st.session_state.query_image = query_image
                st.session_state.detections = detect_clothing(query_image)
                if st.session_state.detections:
                    st.session_state.step = 'select_category'
                    # The branch for the new step was already skipped in this
                    # run; rerun so it renders now instead of after the next
                    # interaction.
                    st.rerun()
                else:
                    st.warning("No clothing items detected in the image.")
elif st.session_state.step == 'select_category':
    detections = st.session_state.detections
    st.image(st.session_state.query_image, caption="Query Image", use_column_width=True)
    st.subheader("Detected Clothing Items:")
    for detection in detections:
        col1, col2 = st.columns([1, 3])
        with col1:
            st.write(_detection_label(detection))
        with col2:
            cropped_image = crop_image(st.session_state.query_image, detection['bbox'])
            st.image(cropped_image, caption=detection['category'], use_column_width=True)
    # Select by position so two detections that format to the same label
    # remain distinguishable.
    selected_index = st.selectbox(
        "Select a category to search:",
        range(len(detections)),
        format_func=lambda i: _detection_label(detections[i]),
    )
    if st.button("Search Similar Items"):
        st.session_state.selected_category = _detection_label(detections[selected_index])
        st.session_state.selected_detection_index = selected_index
        st.session_state.step = 'show_results'
        st.rerun()
elif st.session_state.step == 'show_results':
    detections = st.session_state.detections
    st.image(st.session_state.query_image, caption="Query Image", use_column_width=True)
    selected_index = st.session_state.get('selected_detection_index')
    if selected_index is None or not 0 <= selected_index < len(detections):
        # Fall back to matching on the stored label; the default of None
        # avoids a StopIteration when nothing matches.
        selected_index = next(
            (i for i, d in enumerate(detections)
             if _detection_label(d) == st.session_state.selected_category),
            None,
        )
    if selected_index is None:
        st.warning("The selected item could not be found. Please start a new search.")
    else:
        selected_detection = detections[selected_index]
        cropped_image = crop_image(st.session_state.query_image, selected_detection['bbox'])
        st.image(cropped_image, caption="Cropped Image", use_column_width=True)
        # get_image_embedding returns a plain list; wrap it in an array so it
        # is safe to hand to code that calls array methods such as .squeeze().
        query_embedding = np.asarray(get_image_embedding(cropped_image))
        similar_images = find_similar_images(query_embedding, collection)
        st.subheader("Similar Items:")
        for img in similar_images:
            col1, col2 = st.columns(2)
            with col1:
                st.image(img['info']['image_url'], use_column_width=True)
            with col2:
                st.write(f"Name: {img['info']['name']}")
                st.write(f"Brand: {img['info']['brand']}")
                category = img['info'].get('category')
                if category:
                    st.write(f"Category: {category}")
                st.write(f"Price: {img['info']['price']}")
                st.write(f"Discount: {img['info']['discount']}%")
                st.write(f"Similarity: {img['similarity']:.2f}")
            # Show additional images.
            additional_images = img['info'].get('additional_images', [])
            if additional_images:
                st.write("Additional Images:")
                for add_img_url in additional_images[:3]:  # show at most 3
                    st.image(add_img_url, width=100)
    if st.button("Start New Search"):
        st.session_state.step = 'input'
        st.session_state.query_image_url = ''
        st.session_state.detections = []
        st.session_state.selected_category = None
        st.session_state.selected_detection_index = None
        st.rerun()
# Text search (rendered in the sidebar)
sidebar = st.sidebar
sidebar.title("Text Search")
query_text = sidebar.text_input("Enter search text:")
if sidebar.button("Search by Text"):
    if not query_text:
        sidebar.warning("Please enter a search text.")
    else:
        text_embedding = get_text_embedding(query_text)
        matches = find_similar_images(text_embedding, collection)
        sidebar.subheader("Similar Items:")
        for match in matches:
            info = match['info']
            sidebar.image(info['image_url'], use_column_width=True)
            sidebar.write(f"Name: {info['name']}")
            sidebar.write(f"Brand: {info['brand']}")
            # The category line is optional; it is skipped when missing or empty.
            category = info.get('category')
            if category:
                sidebar.write(f"Category: {category}")
            sidebar.write(f"Price: {info['price']}")
            sidebar.write(f"Discount: {info['discount']}%")
            sidebar.write(f"Similarity: {match['similarity']:.2f}")