ParshvPatel's picture
feat: HuggingFace Spaces deployment
d992912
import logging
from collections import Counter
from typing import Dict, List, Optional
import numpy as np
import pandas as pd
from backend.app.config import SearchConfig
from backend.app.engine.query_parser import ParsedQuery
from backend.app.engine.bm25 import SimpleBM25
logger = logging.getLogger("asos_search")
__all__ = [
"apply_filters",
"relax_and_retry",
"hybrid_rerank",
"generate_suggestions",
]
def apply_filters(candidates: pd.DataFrame, parsed: ParsedQuery) -> pd.DataFrame:
df = candidates
if parsed.category_filter and 'category' in df.columns:
df = df[df['category'] == parsed.category_filter]
if parsed.color_filter and 'color_family' in df.columns:
df = df[df['color_family'].str.lower() == parsed.color_filter.lower()]
if parsed.gender_filter and 'gender' in df.columns:
df = df[(df['gender'] == parsed.gender_filter) | (df['gender'] == 'Unisex')]
if parsed.price_min is not None and 'price' in df.columns:
df = df[df['price'] >= parsed.price_min]
if parsed.price_max is not None and 'price' in df.columns:
df = df[df['price'] <= parsed.price_max]
if parsed.brand_filter and 'brand' in df.columns:
df = df[df['brand'].str.lower() == parsed.brand_filter.lower()]
# ── Size filtering (v3.3) ──
if parsed.size_filter and 'sizes_available' in df.columns:
size_val = parsed.size_filter.lower().strip()
df = df[df['sizes_available'].apply(
lambda sizes: any(
size_val == str(s).lower().strip()
for s in (sizes if isinstance(sizes, list) else [])
) if isinstance(sizes, list) else False
)]
# ── Material filtering (v3.3) ──
if parsed.material_filter and 'materials' in df.columns:
mat = parsed.material_filter.lower()
df = df[df['materials'].apply(
lambda mats: (
any(mat in str(m).lower() for m in mats)
if isinstance(mats, list) and len(mats) > 0
else mat in str(mats).lower() if mats else False
)
)]
# ── Exclusion filtering (v3.3) ──
if parsed.exclusions:
for excl in parsed.exclusions:
excl_lower = excl.lower()
# Check against name, color, category, style_tags, materials
mask = pd.Series(True, index=df.index)
if 'name' in df.columns:
mask &= ~df['name'].str.lower().str.contains(excl_lower, na=False)
if 'color_clean' in df.columns:
mask &= ~df['color_clean'].str.lower().str.contains(excl_lower, na=False)
if 'color_family' in df.columns:
mask &= ~(df['color_family'].str.lower() == excl_lower)
if 'style_tags' in df.columns:
mask &= ~df['style_tags'].apply(
lambda tags: any(excl_lower in str(t).lower() for t in tags)
if isinstance(tags, list) else False
)
if 'materials' in df.columns:
mask &= ~df['materials'].apply(
lambda mats: any(excl_lower in str(m).lower()
for m in (mats if isinstance(mats, list) else []))
)
df = df[mask]
if parsed.in_stock_only and 'any_in_stock' in df.columns:
df = df[df['any_in_stock'] == True]
return df
def relax_and_retry(candidates: pd.DataFrame, parsed: ParsedQuery,
min_results: int = 10) -> pd.DataFrame:
"""
Smart progressive filter relaxation.
Key improvement: instead of dropping price_max entirely (which shows
£200 items for "under £10"), we progressively expand the budget in
steps (×1.5, ×2, ×3, ×5) so the user sees the cheapest viable options.
"""
relaxed = ParsedQuery(
raw_query=parsed.raw_query, vibe_text=parsed.vibe_text,
category_filter=parsed.category_filter, color_filter=parsed.color_filter,
gender_filter=parsed.gender_filter, price_min=parsed.price_min,
price_max=parsed.price_max, brand_filter=parsed.brand_filter,
in_stock_only=parsed.in_stock_only, style_tags=parsed.style_tags,
material_filter=parsed.material_filter, size_filter=parsed.size_filter,
exclusions=parsed.exclusions,
)
best_so_far = pd.DataFrame()
# Phase 0: Try relaxing size and material first (least important constraints)
# Try each independently before committing
for attr in ('size_filter', 'material_filter'):
if getattr(relaxed, attr) is not None:
saved = getattr(relaxed, attr)
setattr(relaxed, attr, None)
result = apply_filters(candidates, relaxed)
if len(result) >= min_results:
logger.info(f"Relaxed filter '{attr}' -> {len(result)} results")
return result
if len(result) > len(best_so_far):
best_so_far = result
else:
setattr(relaxed, attr, saved) # restore if it didn't help
# Phase 0b: Relax exclusions if they're too restrictive
if relaxed.exclusions:
relaxed.exclusions = []
result = apply_filters(candidates, relaxed)
if len(result) > len(best_so_far):
best_so_far = result
if len(result) >= min_results:
logger.info(f"Relaxed exclusions -> {len(result)} results")
return result
# Phase 1: Try relaxing non-price filters one by one
non_price_relaxations = [
('color_filter', None), ('gender_filter', None), ('in_stock_only', False),
]
for attr, val in non_price_relaxations:
if getattr(relaxed, attr) is not None and getattr(relaxed, attr) != val:
setattr(relaxed, attr, val)
result = apply_filters(candidates, relaxed)
if len(result) > len(best_so_far):
best_so_far = result
if len(result) >= min_results:
logger.info(f"Relaxed filter '{attr}' -> {len(result)} results")
return result
# Phase 2: Progressive price expansion (keep category if possible)
if parsed.price_max is not None:
original_max = parsed.price_max
expansion_factors = [1.5, 2.0, 3.0, 5.0, 10.0]
for factor in expansion_factors:
relaxed.price_max = original_max * factor
result = apply_filters(candidates, relaxed)
if len(result) > len(best_so_far):
best_so_far = result
if len(result) >= min_results:
logger.info(
f"Expanded price_max: £{original_max:.0f} -> "
f"£{relaxed.price_max:.0f} ({factor}×) -> {len(result)} results"
)
return result
# If even 10× doesn't work, drop the price filter
relaxed.price_max = None
result = apply_filters(candidates, relaxed)
if len(result) > len(best_so_far):
best_so_far = result
if len(result) >= min_results:
logger.info(f"Dropped price_max entirely -> {len(result)} results")
return result
if parsed.price_min is not None:
relaxed.price_min = None
result = apply_filters(candidates, relaxed)
if len(result) > len(best_so_far):
best_so_far = result
if len(result) >= min_results:
logger.info(f"Dropped price_min -> {len(result)} results")
return result
# Phase 3: Drop category as last resort
if relaxed.category_filter is not None:
relaxed.category_filter = None
result = apply_filters(candidates, relaxed)
if len(result) > len(best_so_far):
best_so_far = result
if len(result) >= min_results:
logger.info(f"Relaxed category_filter -> {len(result)} results")
return result
# Return best partial result even if < min_results
if len(best_so_far) > 0:
logger.info(f"Returning best available: {len(best_so_far)} results (wanted {min_results})")
return best_so_far
logger.warning("All filters relaxed. Returning unfiltered results.")
return candidates
def hybrid_rerank(candidates: pd.DataFrame, parsed: ParsedQuery,
config: SearchConfig, bm25: Optional[SimpleBM25] = None) -> pd.DataFrame:
scored = candidates.copy()
if len(scored) == 0:
return scored
# Normalize RRF
rrf_vals = scored['rrf_score'].values
rrf_min, rrf_max = rrf_vals.min(), rrf_vals.max()
scored['rrf_norm'] = (
(rrf_vals - rrf_min) / (rrf_max - rrf_min) if rrf_max > rrf_min else 1.0
)
# Tag overlap
query_tags = set(parsed.style_tags)
if query_tags and 'style_tags' in scored.columns:
scored['tag_score'] = scored['style_tags'].apply(
lambda tags: (
len(set(tags) & query_tags) / len(query_tags)
if isinstance(tags, list) and query_tags else 0.0
)
)
else:
scored['tag_score'] = 0.0
# BM25
if bm25 is not None and '_orig_idx' in scored.columns:
bm25_raw = bm25.score_candidates(parsed.raw_query, scored['_orig_idx'].tolist())
bm25_max = bm25_raw.max()
scored['bm25_norm'] = bm25_raw / bm25_max if bm25_max > 0 else 0.0
else:
scored['bm25_norm'] = 0.0
# Stock bonus
if 'any_in_stock' in scored.columns:
scored['stock_bonus'] = scored['any_in_stock'].astype(float)
else:
scored['stock_bonus'] = 0.5
# ── Material match bonus (v3.3) ──
mat_bonus = np.zeros(len(scored), dtype=np.float32)
if parsed.material_filter and 'materials' in scored.columns:
mat_q = parsed.material_filter.lower()
mat_bonus = scored['materials'].apply(
lambda mats: 1.0 if isinstance(mats, list) and any(
mat_q in str(m).lower() for m in mats
) else 0.0
).values.astype(np.float32)
scored['material_bonus'] = mat_bonus
# ── Price proximity bonus ──
# When user specifies a budget, items closer to that price rank higher.
# This prevents £200 items outranking £20 items when user said "under £10".
price_proximity = np.zeros(len(scored), dtype=np.float32)
target_price = parsed.price_max or parsed.price_min
if target_price is not None and 'price' in scored.columns:
prices = scored['price'].values.astype(np.float32)
# Exponential decay: items at target_price get 1.0, items far away get ~0
# sigma controls how fast the penalty drops off
sigma = max(target_price * 0.5, 10.0) # half the budget or £10 minimum
price_proximity = np.exp(-((prices - target_price) ** 2) / (2 * sigma ** 2))
scored['price_proximity'] = price_proximity
# Weighted combination — price proximity gets 0.10 weight when active
has_price_intent = target_price is not None
has_material_intent = parsed.material_filter is not None
if has_price_intent:
scored['hybrid_score'] = (
0.40 * scored['rrf_norm'] +
0.18 * scored['tag_score'] +
0.10 * scored['bm25_norm'] +
0.05 * scored['stock_bonus'] +
0.20 * scored['price_proximity'] +
0.07 * scored['material_bonus']
)
elif has_material_intent:
scored['hybrid_score'] = (
0.45 * scored['rrf_norm'] +
0.20 * scored['tag_score'] +
0.12 * scored['bm25_norm'] +
0.05 * scored['stock_bonus'] +
0.18 * scored['material_bonus']
)
else:
scored['hybrid_score'] = (
config.alpha_clip * scored['rrf_norm'] +
config.beta_tags * scored['tag_score'] +
config.gamma_text * scored['bm25_norm'] +
config.delta_freshness * scored['stock_bonus']
)
return scored.sort_values('hybrid_score', ascending=False)
def generate_suggestions(results: pd.DataFrame, parsed: ParsedQuery,
max_suggestions: int = 5) -> List[str]:
"""
Generate natural, diverse related search suggestions.
v3.3: produces clean, human-readable queries instead of awkward
concatenations. Covers color refinement, price ranges, category
alternatives, style variations, and brand-specific searches.
"""
if len(results) == 0:
return []
suggestions = []
# Extract core item type from the query for clean suggestion construction
cat = parsed.category_filter
cat_names = {
'Dresses': 'dresses', 'Tops': 'tops', 'Coats & Jackets': 'jackets',
'Knitwear': 'knitwear', 'Jeans': 'jeans', 'Trousers': 'trousers',
'Shoes': 'shoes', 'Bags': 'bags', 'Accessories': 'accessories',
'Skirts': 'skirts', 'Shorts': 'shorts', 'Swimwear': 'swimwear',
'Hoodies & Sweatshirts': 'hoodies', 'Suits & Tailoring': 'suits',
'Jumpsuits & Playsuits': 'jumpsuits',
}
base_term = cat_names.get(cat, parsed.vibe_text.strip()[:30])
# 1. Color refinements — suggest specific colors the user hasn't tried
if 'color_family' in results.columns and not parsed.color_filter:
top_colors = (results['color_family']
.value_counts()
.head(4).index.tolist())
for color in top_colors[:2]:
if color and color not in ('other', 'multi'):
suggestions.append(f"{color} {base_term}")
# 2. Alternate color if user specified one
if parsed.color_filter and 'color_family' in results.columns:
alt_colors = ['black', 'white', 'navy', 'beige']
for ac in alt_colors:
if ac != parsed.color_filter:
suggestions.append(f"{ac} {base_term}")
break
# 3. Price-constrained suggestion
if parsed.price_max is None and parsed.price_min is None and 'price' in results.columns:
p25 = results['price'].quantile(0.25)
if p25 > 5:
suggestions.append(f"{base_term} under \u00a3{int(p25)}")
# 4. Style variation — suggest a popular style tag from results
if 'style_tags' in results.columns:
tag_counts = Counter()
for tags in results['style_tags']:
if isinstance(tags, list):
for t in tags:
if t not in parsed.style_tags and t not in parsed.vibe_text:
tag_counts[t] += 1
if tag_counts:
best_tag = tag_counts.most_common(1)[0][0]
suggestions.append(f"{best_tag} {base_term}")
# 5. Brand-specific suggestion (clean format)
if 'brand' in results.columns:
top_brand = (results['brand']
.value_counts()
.head(1).index.tolist())
if top_brand and top_brand[0] and top_brand[0] != 'Unknown':
brand = top_brand[0]
if brand.lower() not in parsed.vibe_text.lower():
suggestions.append(f"{brand} {base_term}")
# 6. Category alternatives — suggest related categories
if cat:
related = {
'Dresses': 'jumpsuits', 'Tops': 'blouses',
'Jeans': 'trousers', 'Trousers': 'jeans',
'Coats & Jackets': 'blazers', 'Knitwear': 'cardigans',
'Skirts': 'dresses', 'Shorts': 'skirts',
}
alt = related.get(cat)
if alt:
prefix = f"{parsed.color_filter} " if parsed.color_filter else ""
suggestions.append(f"{prefix}{alt}".strip())
# Deduplicate and limit
seen = set()
unique = []
for s in suggestions:
s_clean = s.strip().lower()
if s_clean not in seen and s_clean != parsed.raw_query.lower():
seen.add(s_clean)
unique.append(s.strip())
return unique[:max_suggestions]