dressa-user-study / utils.py
Ryan7mg
Configure Space runtime and use text embeddings for HF push
2c54def
"""
utils.py - Utility functions for Dressa App
Handles:
- Loading precomputed embeddings from .pkl or text files
- Cosine similarity search
- Combining and randomizing results from multiple models
"""
import pickle
import numpy as np
from pathlib import Path
import os
from typing import List, Tuple, Dict, Optional
from sklearn.metrics.pairwise import cosine_similarity
import random
import logging
logger = logging.getLogger(__name__)
# Default paths - use resolve() for consistent absolute paths
APP_DIR = Path(__file__).parent.resolve()
EMBEDDINGS_DIR = Path(
os.getenv("DRESSA_EMBEDDINGS_DIR", str(APP_DIR / "embeddings"))
).resolve()
IMAGES_DIR = Path(
os.getenv("DRESSA_IMAGES_DIR", str(APP_DIR / "dress_images"))
).resolve()
# Model name to filename mapping
EMBEDDING_FILES = {
'openai_clip': 'openai_clip_embeddings.pkl',
'fashion_clip': 'fashion_clip_embeddings.pkl',
'marqo_fashion_clip': 'marqo_fashion_clip_embeddings.pkl',
'marqo_fashion_siglip': 'marqo_fashion_siglip_embeddings.pkl'
}
# Cache for loaded embeddings
_embeddings_cache: Dict[str, dict] = {}
def _text_embedding_paths(model_name: str, embeddings_dir: Path) -> Tuple[Path, Path]:
"""Return (embeddings_csv_path, image_paths_txt_path) for text embedding storage."""
return (
Path(embeddings_dir) / f"{model_name}_embeddings.csv",
Path(embeddings_dir) / f"{model_name}_image_paths.txt",
)
def load_embeddings(
model_name: str,
embeddings_dir: Optional[Path] = None
) -> dict:
"""
Load precomputed embeddings from .pkl or text files.
Args:
model_name: One of 'openai_clip', 'fashion_clip',
'marqo_fashion_clip', 'marqo_fashion_siglip'
embeddings_dir: Directory containing embedding files (default: ./embeddings/)
Returns:
Dict with keys:
- 'embeddings': numpy array of shape (N, 512)
- 'image_paths': list of N image paths
"""
# Return cached if available
if model_name in _embeddings_cache:
return _embeddings_cache[model_name]
if embeddings_dir is None:
embeddings_dir = EMBEDDINGS_DIR
filename = EMBEDDING_FILES.get(model_name)
if not filename:
raise ValueError(f"Unknown model: {model_name}. "
f"Choose from: {list(EMBEDDING_FILES.keys())}")
filepath = Path(embeddings_dir) / filename
if filepath.exists():
logger.info(f"Loading embeddings from {filepath}")
with open(filepath, 'rb') as f:
data = pickle.load(f)
else:
embeddings_csv, image_paths_txt = _text_embedding_paths(model_name, Path(embeddings_dir))
if not embeddings_csv.exists() or not image_paths_txt.exists():
raise FileNotFoundError(
f"Embedding files not found. Checked: {filepath}, "
f"{embeddings_csv}, {image_paths_txt}"
)
logger.info(f"Loading embeddings from text files: {embeddings_csv}, {image_paths_txt}")
embeddings = np.loadtxt(embeddings_csv, delimiter=",", dtype=np.float32)
if embeddings.ndim == 1:
embeddings = embeddings.reshape(1, -1)
image_paths = [
line.strip()
for line in image_paths_txt.read_text(encoding="utf-8").splitlines()
if line.strip()
]
data = {
"embeddings": embeddings,
"image_paths": image_paths,
}
# Validate structure
if 'embeddings' not in data or 'image_paths' not in data:
raise ValueError(f"Invalid embedding file format. "
f"Expected keys: 'embeddings', 'image_paths'. "
f"Got: {data.keys()}")
if len(data['image_paths']) != len(data['embeddings']):
raise ValueError(
f"Embedding row count mismatch for {model_name}: "
f"{len(data['embeddings'])} embeddings vs {len(data['image_paths'])} image paths"
)
# Ensure embeddings are normalized
embeddings = np.asarray(data['embeddings'], dtype=np.float32)
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
data['embeddings'] = embeddings / norms
# Cache for future use
_embeddings_cache[model_name] = data
logger.info(f"Loaded {len(data['image_paths'])} embeddings for {model_name}")
return data
def search_similar(
query_embedding: np.ndarray,
model_name: str,
top_k: int = 10,
embeddings_data: Optional[dict] = None
) -> List[Tuple[str, float]]:
"""
Find top-K most similar images using cosine similarity.
Args:
query_embedding: Normalized query embedding of shape (512,)
model_name: Which model's embeddings to search
top_k: Number of results to return
embeddings_data: Pre-loaded embeddings (loads if None)
Returns:
List of (image_path, similarity_score) tuples, sorted by similarity
"""
# Load embeddings if not provided
if embeddings_data is None:
embeddings_data = load_embeddings(model_name)
corpus_embeddings = embeddings_data['embeddings']
image_paths = embeddings_data['image_paths']
# Ensure query is 2D for sklearn
query_2d = query_embedding.reshape(1, -1)
# Compute cosine similarities
similarities = cosine_similarity(query_2d, corpus_embeddings)[0]
# Get top-K indices (sorted by similarity, descending)
top_indices = np.argsort(similarities)[::-1][:top_k]
# Build results
results = []
for idx in top_indices:
results.append((image_paths[idx], float(similarities[idx])))
return results
def search_all_models(
query_embeddings: Dict[str, np.ndarray],
top_k: int = 10
) -> Dict[str, List[Tuple[str, float]]]:
"""
Search across all models with their respective query embeddings.
Args:
query_embeddings: Dict mapping model_name -> query_embedding
top_k: Number of results per model
Returns:
Dict mapping model_name -> list of (image_path, score) tuples
"""
results = {}
for model_name, query_emb in query_embeddings.items():
results[model_name] = search_similar(query_emb, model_name, top_k)
return results
def union_and_randomize(
results_dict: Dict[str, List[Tuple[str, float]]],
seed: Optional[int] = None
) -> List[Dict]:
"""
Combine results from all models, remove duplicates, and shuffle.
Args:
results_dict: Dict mapping model_name -> list of (image_path, score)
seed: Random seed for reproducibility (optional)
Returns:
List of dicts with keys:
- 'image_path': path to the image
- 'models': list of model names that returned this image
- 'scores': dict mapping model_name -> similarity score
- 'best_score': highest similarity score across models
"""
# Track which models found each image
image_info = {} # image_path -> {'models': [...], 'scores': {...}}
for model_name, results in results_dict.items():
for image_path, score in results:
if image_path not in image_info:
image_info[image_path] = {
'image_path': image_path,
'models': [],
'scores': {}
}
image_info[image_path]['models'].append(model_name)
image_info[image_path]['scores'][model_name] = score
# Convert to list and compute best_score
combined = list(image_info.values())
for item in combined:
item['best_score'] = max(item['scores'].values())
# Shuffle randomly
if seed is not None:
random.seed(seed)
random.shuffle(combined)
return combined
def union_and_randomize_with_provenance(
results_dict: Dict[str, List[Tuple[str, float]]],
query_image_hash: str
) -> List[Dict]:
"""
Combine results from all models with provenance (rank per model) and deterministic shuffle.
Args:
results_dict: Dict mapping model_name -> list of (image_path, score)
query_image_hash: Hash of query image content (used for deterministic shuffle)
Returns:
List of dicts with keys:
- 'image_path': path to the image
- 'provenance': dict mapping model_name -> rank (1-indexed)
- 'display_position': position in the shuffled display order
"""
image_info = {}
# Log per-model results
logger.info("=" * 60)
logger.info("PER-MODEL RETRIEVAL RESULTS:")
logger.info("=" * 60)
for model_name, results in results_dict.items():
logger.info(f"\n[{model_name}] Top {len(results)} results:")
for rank, (image_path, score) in enumerate(results, start=1):
img_name = Path(image_path).name
logger.info(f" Rank {rank}: {img_name} (score: {score:.4f})")
if image_path not in image_info:
image_info[image_path] = {
'image_path': image_path,
'provenance': {}
}
image_info[image_path]['provenance'][model_name] = rank
combined = list(image_info.values())
# Log duplicates (images returned by multiple models)
logger.info("\n" + "=" * 60)
logger.info("DUPLICATE ANALYSIS (images returned by multiple models):")
logger.info("=" * 60)
duplicates = [item for item in combined if len(item['provenance']) > 1]
if duplicates:
for item in duplicates:
img_name = Path(item['image_path']).name
models_ranks = ", ".join([f"{m}@rank{r}" for m, r in item['provenance'].items()])
logger.info(f" {img_name}: {models_ranks}")
logger.info(f"\nTotal duplicates: {len(duplicates)} / {len(combined)} unique images")
else:
logger.info(" No duplicates found")
# Deterministic shuffle based on query image hash
seed = hash(query_image_hash) & 0xFFFFFFFF
logger.info(f"\nShuffle seed (from image hash): {seed}")
random.Random(seed).shuffle(combined)
for pos, item in enumerate(combined):
item['display_position'] = pos
# Log final order
logger.info("\n" + "=" * 60)
logger.info("FINAL DISPLAY ORDER (after shuffle):")
logger.info("=" * 60)
for item in combined[:10]: # Show first 10
img_name = Path(item['image_path']).name
prov = ", ".join([f"{m}@{r}" for m, r in item['provenance'].items()])
logger.info(f" Pos {item['display_position']}: {img_name} [{prov}]")
if len(combined) > 10:
logger.info(f" ... and {len(combined) - 10} more")
logger.info("=" * 60 + "\n")
return combined
def get_image_full_path(
relative_path: str,
images_dir: Optional[Path] = None
) -> Path:
"""
Convert relative image path to full path.
Args:
relative_path: Relative path from embeddings file
images_dir: Base directory for images
Returns:
Full path to the image
"""
if images_dir is None:
images_dir = IMAGES_DIR
# Handle various path formats
path = Path(relative_path)
if path.is_absolute() and path.exists():
return path.resolve()
# Try as relative to images_dir
full_path = images_dir / path.name
if full_path.exists():
return full_path.resolve()
# Try the original path
full_path = images_dir / relative_path
if full_path.exists():
return full_path.resolve()
# Return best guess (resolved for consistency)
return (images_dir / path.name).resolve()
def append_to_embeddings(
image_path: str,
embedding: np.ndarray,
model_name: str,
embeddings_dir: Optional[Path] = None
):
"""
Append a new image's embedding to the corpus.
Used for dynamic corpus growth after user ratings.
Args:
image_path: Path to the image being added
embedding: Normalized embedding vector (512,)
model_name: Which model's embedding file to update
embeddings_dir: Directory containing embedding files
"""
if embeddings_dir is None:
embeddings_dir = EMBEDDINGS_DIR
filename = EMBEDDING_FILES.get(model_name)
if not filename:
raise ValueError(f"Unknown model: {model_name}")
filepath = Path(embeddings_dir) / filename
embeddings_csv, image_paths_txt = _text_embedding_paths(model_name, Path(embeddings_dir))
if filepath.exists():
with open(filepath, 'rb') as f:
data = pickle.load(f)
storage_format = "pkl"
elif embeddings_csv.exists() and image_paths_txt.exists():
embeddings = np.loadtxt(embeddings_csv, delimiter=",", dtype=np.float32)
if embeddings.ndim == 1:
embeddings = embeddings.reshape(1, -1)
image_paths = [
line.strip()
for line in image_paths_txt.read_text(encoding="utf-8").splitlines()
if line.strip()
]
data = {
"embeddings": embeddings,
"image_paths": image_paths,
}
storage_format = "text"
else:
raise FileNotFoundError(
f"No embedding storage found for {model_name}. "
f"Checked {filepath}, {embeddings_csv}, {image_paths_txt}"
)
# Ensure embedding is normalized
embedding = embedding / np.linalg.norm(embedding)
# Append new embedding
data['embeddings'] = np.vstack([data['embeddings'], embedding.reshape(1, -1)])
data['image_paths'].append(image_path)
# Save back
if storage_format == "pkl":
with open(filepath, 'wb') as f:
pickle.dump(data, f)
else:
np.savetxt(embeddings_csv, data['embeddings'], delimiter=",", fmt="%.8f")
image_paths_txt.write_text("\n".join(data['image_paths']) + "\n", encoding="utf-8")
# Update cache if loaded
if model_name in _embeddings_cache:
_embeddings_cache[model_name] = data
logger.info(f"Added {image_path} to {model_name} corpus. "
f"New size: {len(data['image_paths'])}")
def clear_embeddings_cache():
"""Clear the embeddings cache (useful for testing)."""
global _embeddings_cache
_embeddings_cache = {}
# Convenience function for testing
def test_utils():
"""Test utility functions."""
print("Testing utils.py...")
# Test loading embeddings
for model_name in EMBEDDING_FILES.keys():
try:
data = load_embeddings(model_name)
print(f" {model_name}: {data['embeddings'].shape[0]} images, "
f"dim={data['embeddings'].shape[1]}")
except FileNotFoundError as e:
print(f" {model_name}: File not found (expected for first run)")
print("\nUtils tests complete!")
if __name__ == "__main__":
test_utils()