| import logging |
| from collections import Counter |
| from typing import Dict, List, Optional |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| from backend.app.config import SearchConfig |
| from backend.app.engine.query_parser import ParsedQuery |
| from backend.app.engine.bm25 import SimpleBM25 |
|
|
| logger = logging.getLogger("asos_search") |
|
|
| __all__ = [ |
| "apply_filters", |
| "relax_and_retry", |
| "hybrid_rerank", |
| "generate_suggestions", |
| ] |
|
|
|
|
def apply_filters(candidates: pd.DataFrame, parsed: ParsedQuery) -> pd.DataFrame:
    """Return the rows of ``candidates`` that satisfy every active filter in ``parsed``.

    A filter is applied only when it is set on ``parsed`` and the column it
    reads exists in ``candidates``; otherwise it is skipped. The input frame is
    not modified; when no filter is active the same object is returned.

    Filters, in application order:
      * category      -- exact match on ``category``
      * colour        -- case-insensitive match on ``color_family``
      * gender        -- exact match on ``gender``, ``'Unisex'`` always passes
      * price         -- inclusive ``price_min`` / ``price_max`` bounds
      * brand         -- case-insensitive match on ``brand``
      * size          -- case-insensitive, whitespace-trimmed match against an
                         element of the ``sizes_available`` list
      * material      -- case-insensitive substring match on ``materials``
      * exclusions    -- rows mentioning an excluded term are removed
      * in-stock      -- keeps rows whose ``any_in_stock`` equals ``True``

    Args:
        candidates: Candidate products, one row per product.
        parsed: Parsed query carrying the filter values.

    Returns:
        The filtered frame (possibly empty).
    """
    df = candidates

    if parsed.category_filter and 'category' in df.columns:
        df = df[df['category'] == parsed.category_filter]
    if parsed.color_filter and 'color_family' in df.columns:
        df = df[df['color_family'].str.lower() == parsed.color_filter.lower()]
    if parsed.gender_filter and 'gender' in df.columns:
        df = df[(df['gender'] == parsed.gender_filter) | (df['gender'] == 'Unisex')]
    if parsed.price_min is not None and 'price' in df.columns:
        df = df[df['price'] >= parsed.price_min]
    if parsed.price_max is not None and 'price' in df.columns:
        df = df[df['price'] <= parsed.price_max]
    if parsed.brand_filter and 'brand' in df.columns:
        df = df[df['brand'].str.lower() == parsed.brand_filter.lower()]

    # Size: only list-valued cells can match; anything else is rejected.
    if parsed.size_filter and 'sizes_available' in df.columns:
        size_val = parsed.size_filter.lower().strip()
        has_size = df['sizes_available'].apply(
            lambda sizes: isinstance(sizes, list)
            and any(size_val == str(s).lower().strip() for s in sizes)
        )
        # Cast so the mask stays boolean even for an empty frame, where
        # ``apply`` may hand back an object-dtype Series.
        df = df[has_size.astype(bool)]

    # Material: substring match inside a non-empty list, otherwise against the
    # string form of the cell. Empty / None cells never match.
    if parsed.material_filter and 'materials' in df.columns:
        mat = parsed.material_filter.lower()

        def _has_material(mats):
            if isinstance(mats, list) and len(mats) > 0:
                return any(mat in str(m).lower() for m in mats)
            return mat in str(mats).lower() if mats else False

        df = df[df['materials'].apply(_has_material).astype(bool)]

    # Exclusions: drop a row if the term appears in any of the text-ish columns.
    for excl in parsed.exclusions or ():
        excl_lower = excl.lower()
        if not excl_lower.strip():
            # A blank term is a substring of everything and would drop all rows.
            continue

        mask = pd.Series(True, index=df.index, dtype=bool)
        if 'name' in df.columns:
            # regex=False: the term is user text, not a pattern ("a+b", "(").
            mask &= ~df['name'].str.lower().str.contains(
                excl_lower, na=False, regex=False)
        if 'color_clean' in df.columns:
            mask &= ~df['color_clean'].str.lower().str.contains(
                excl_lower, na=False, regex=False)
        if 'color_family' in df.columns:
            mask &= ~(df['color_family'].str.lower() == excl_lower)
        if 'style_tags' in df.columns:
            tagged = df['style_tags'].apply(
                lambda tags: isinstance(tags, list)
                and any(excl_lower in str(t).lower() for t in tags)
            )
            mask &= ~tagged.astype(bool)
        if 'materials' in df.columns:
            # Only list-valued cells are inspected here; other cells are kept.
            made_of = df['materials'].apply(
                lambda mats: isinstance(mats, list)
                and any(excl_lower in str(m).lower() for m in mats)
            )
            mask &= ~made_of.astype(bool)
        df = df[mask.astype(bool)]

    if parsed.in_stock_only and 'any_in_stock' in df.columns:
        # Explicit comparison with True: missing values are treated as not in stock.
        df = df[df['any_in_stock'].eq(True)]
    return df
|
|
|
|
def relax_and_retry(candidates: pd.DataFrame, parsed: ParsedQuery,
                    min_results: int = 10) -> pd.DataFrame:
    """Loosen the query's filters step by step until enough rows survive.

    Works on a copy of ``parsed`` (the caller's object is not modified) and
    re-runs ``apply_filters`` after each loosening step. The first step that
    yields at least ``min_results`` rows is returned immediately.

    Relaxation order:
      1. ``size_filter`` then ``material_filter`` -- each is dropped; the drop
         is kept only if it produced more rows than anything seen so far,
         otherwise the filter is put back.
      2. ``exclusions`` are cleared.
      3. ``color_filter``, ``gender_filter``, ``in_stock_only`` are switched
         off one after another (cumulatively).
      4. ``price_max`` is widened to x1.5, x2, x3, x5 and x10 of the original
         budget, then removed altogether; afterwards ``price_min`` is removed.
      5. ``category_filter`` is dropped.

    Args:
        candidates: Unfiltered candidate rows.
        parsed: The original parsed query.
        min_results: Number of rows considered "enough".

    Returns:
        The first result set reaching ``min_results``; failing that, the
        largest non-empty result set seen; failing that, ``candidates``
        unchanged.
    """
    working = ParsedQuery(
        raw_query=parsed.raw_query,
        vibe_text=parsed.vibe_text,
        category_filter=parsed.category_filter,
        color_filter=parsed.color_filter,
        gender_filter=parsed.gender_filter,
        price_min=parsed.price_min,
        price_max=parsed.price_max,
        brand_filter=parsed.brand_filter,
        in_stock_only=parsed.in_stock_only,
        style_tags=parsed.style_tags,
        material_filter=parsed.material_filter,
        size_filter=parsed.size_filter,
        exclusions=parsed.exclusions,
    )

    best = pd.DataFrame()

    def attempt() -> pd.DataFrame:
        """Filter with the current relaxed query and remember the largest result."""
        nonlocal best
        hits = apply_filters(candidates, working)
        if len(hits) > len(best):
            best = hits
        return hits

    # Step 1: size / material. Handled inline because a drop that does not
    # improve on the best result so far is reverted.
    for field in ('size_filter', 'material_filter'):
        previous = getattr(working, field)
        if previous is None:
            continue
        setattr(working, field, None)
        hits = apply_filters(candidates, working)
        if len(hits) >= min_results:
            logger.info(f"Relaxed filter '{field}' -> {len(hits)} results")
            return hits
        if len(hits) > len(best):
            best = hits
        else:
            setattr(working, field, previous)

    # Step 2: exclusions.
    if working.exclusions:
        working.exclusions = []
        hits = attempt()
        if len(hits) >= min_results:
            logger.info(f"Relaxed exclusions -> {len(hits)} results")
            return hits

    # Step 3: colour, gender, stock -- each switched to its neutral value.
    for field, neutral in (('color_filter', None),
                           ('gender_filter', None),
                           ('in_stock_only', False)):
        current = getattr(working, field)
        if current is None or not (current != neutral):
            continue
        setattr(working, field, neutral)
        hits = attempt()
        if len(hits) >= min_results:
            logger.info(f"Relaxed filter '{field}' -> {len(hits)} results")
            return hits

    # Step 4a: widen the budget gradually before giving up on it.
    if parsed.price_max is not None:
        budget = parsed.price_max
        for multiplier in (1.5, 2.0, 3.0, 5.0, 10.0):
            working.price_max = budget * multiplier
            hits = attempt()
            if len(hits) >= min_results:
                logger.info(
                    f"Expanded price_max: £{budget:.0f} -> "
                    f"£{working.price_max:.0f} ({multiplier}×) -> {len(hits)} results"
                )
                return hits

        working.price_max = None
        hits = attempt()
        if len(hits) >= min_results:
            logger.info(f"Dropped price_max entirely -> {len(hits)} results")
            return hits

    # Step 4b: lower price bound.
    if parsed.price_min is not None:
        working.price_min = None
        hits = attempt()
        if len(hits) >= min_results:
            logger.info(f"Dropped price_min -> {len(hits)} results")
            return hits

    # Step 5: category.
    if working.category_filter is not None:
        working.category_filter = None
        hits = attempt()
        if len(hits) >= min_results:
            logger.info(f"Relaxed category_filter -> {len(hits)} results")
            return hits

    # Nothing reached the target: fall back to the largest set seen, if any.
    if len(best) > 0:
        logger.info(f"Returning best available: {len(best)} results (wanted {min_results})")
        return best

    logger.warning("All filters relaxed. Returning unfiltered results.")
    return candidates
|
|
|
|
def hybrid_rerank(candidates: pd.DataFrame, parsed: ParsedQuery,
                  config: SearchConfig, bm25: Optional[SimpleBM25] = None) -> pd.DataFrame:
    """Score candidates with a weighted blend of signals and sort best-first.

    Works on a copy of ``candidates`` and adds these columns: ``rrf_norm``,
    ``tag_score``, ``bm25_norm``, ``stock_bonus``, ``material_bonus``,
    ``price_proximity`` and ``hybrid_score``.

    Weighting depends on the query:
      * price intent (``price_max`` or ``price_min`` set) -- fixed weights
        that include price proximity and the material bonus;
      * material intent only -- fixed weights that include the material bonus;
      * otherwise -- the weights from ``config`` (``alpha_clip``,
        ``beta_tags``, ``gamma_text``, ``delta_freshness``).

    Args:
        candidates: Rows to score; must contain an ``rrf_score`` column.
        parsed: Parsed query (style tags, raw text, price and material filters).
        config: Supplies the default blend weights.
        bm25: Optional lexical scorer; used only when the frame also has an
            ``_orig_idx`` column.

    Returns:
        The scored copy, sorted by ``hybrid_score`` descending. An empty input
        is returned as an (unscored) empty copy.

    Raises:
        KeyError: If ``candidates`` is non-empty and lacks ``rrf_score``.
    """
    scored = candidates.copy()
    if len(scored) == 0:
        return scored

    # Retrieval score, min-max normalised; a constant column maps to 1.0.
    rrf_vals = scored['rrf_score'].values
    rrf_min, rrf_max = rrf_vals.min(), rrf_vals.max()
    scored['rrf_norm'] = (
        (rrf_vals - rrf_min) / (rrf_max - rrf_min) if rrf_max > rrf_min else 1.0
    )

    # Fraction of the query's style tags present on the product.
    query_tags = set(parsed.style_tags)
    if query_tags and 'style_tags' in scored.columns:
        scored['tag_score'] = scored['style_tags'].apply(
            lambda tags: (
                len(set(tags) & query_tags) / len(query_tags)
                if isinstance(tags, list) and query_tags else 0.0
            )
        )
    else:
        scored['tag_score'] = 0.0

    # Lexical score scaled by its maximum.
    # NOTE(review): assumes score_candidates returns a NumPy array aligned with
    # the id list (it is used with .max() and element-wise division) -- confirm.
    if bm25 is not None and '_orig_idx' in scored.columns:
        bm25_raw = bm25.score_candidates(parsed.raw_query, scored['_orig_idx'].tolist())
        bm25_max = bm25_raw.max()
        scored['bm25_norm'] = bm25_raw / bm25_max if bm25_max > 0 else 0.0
    else:
        scored['bm25_norm'] = 0.0

    # Availability: 1.0 / 0.0 when known, neutral 0.5 when the column is absent.
    if 'any_in_stock' in scored.columns:
        scored['stock_bonus'] = scored['any_in_stock'].astype(float)
    else:
        scored['stock_bonus'] = 0.5

    # 1.0 when any listed material contains the requested material.
    mat_bonus = np.zeros(len(scored), dtype=np.float32)
    if parsed.material_filter and 'materials' in scored.columns:
        mat_q = parsed.material_filter.lower()
        mat_bonus = scored['materials'].apply(
            lambda mats: 1.0 if isinstance(mats, list) and any(
                mat_q in str(m).lower() for m in mats
            ) else 0.0
        ).values.astype(np.float32)
    scored['material_bonus'] = mat_bonus

    # Gaussian closeness to the requested price. Explicit None checks so that
    # a bound of 0 still counts as a price target (``or`` would discard it).
    price_proximity = np.zeros(len(scored), dtype=np.float32)
    target_price = (
        parsed.price_max if parsed.price_max is not None else parsed.price_min
    )
    if target_price is not None and 'price' in scored.columns:
        prices = scored['price'].values.astype(np.float32)
        # Width is half the target, but never narrower than 10.
        sigma = max(target_price * 0.5, 10.0)
        price_proximity = np.exp(-((prices - target_price) ** 2) / (2 * sigma ** 2))
    scored['price_proximity'] = price_proximity

    has_price_intent = target_price is not None
    # Truthiness, matching the bonus computation above: an empty-string filter
    # yields no bonus and therefore must not select the material weights.
    has_material_intent = bool(parsed.material_filter)

    if has_price_intent:
        # Fixed weights (sum to 1.0).
        scored['hybrid_score'] = (
            0.40 * scored['rrf_norm'] +
            0.18 * scored['tag_score'] +
            0.10 * scored['bm25_norm'] +
            0.05 * scored['stock_bonus'] +
            0.20 * scored['price_proximity'] +
            0.07 * scored['material_bonus']
        )
    elif has_material_intent:
        # Fixed weights (sum to 1.0).
        scored['hybrid_score'] = (
            0.45 * scored['rrf_norm'] +
            0.20 * scored['tag_score'] +
            0.12 * scored['bm25_norm'] +
            0.05 * scored['stock_bonus'] +
            0.18 * scored['material_bonus']
        )
    else:
        scored['hybrid_score'] = (
            config.alpha_clip * scored['rrf_norm'] +
            config.beta_tags * scored['tag_score'] +
            config.gamma_text * scored['bm25_norm'] +
            config.delta_freshness * scored['stock_bonus']
        )
    return scored.sort_values('hybrid_score', ascending=False)
|
|
|
|
def generate_suggestions(results: pd.DataFrame, parsed: ParsedQuery,
                         max_suggestions: int = 5) -> List[str]:
    """Build short related-search suggestions from the current result set.

    Suggestions are produced, in this order, from: the most common colours in
    the results (when the query had no colour), one alternative colour (when
    it had one), a budget hint based on the 25th price percentile (when the
    query had no price bound), the most frequent style tag not already in the
    query, the most frequent brand, and a related category.

    Args:
        results: The rows currently shown to the user.
        parsed: The parsed query those rows came from.
        max_suggestions: Upper bound on the number of suggestions returned.

    Returns:
        De-duplicated suggestions (case-insensitive), excluding anything equal
        to the raw query, truncated to ``max_suggestions``. Empty when
        ``results`` is empty.
    """
    if len(results) == 0:
        return []

    suggestions = []
    # Tolerate a missing vibe text; it is only used as plain text below.
    vibe_text = parsed.vibe_text or ""

    # Noun the suggestions are built around: a friendly category name when the
    # category is known, otherwise the first 30 characters of the vibe text.
    cat = parsed.category_filter
    cat_names = {
        'Dresses': 'dresses', 'Tops': 'tops', 'Coats & Jackets': 'jackets',
        'Knitwear': 'knitwear', 'Jeans': 'jeans', 'Trousers': 'trousers',
        'Shoes': 'shoes', 'Bags': 'bags', 'Accessories': 'accessories',
        'Skirts': 'skirts', 'Shorts': 'shorts', 'Swimwear': 'swimwear',
        'Hoodies & Sweatshirts': 'hoodies', 'Suits & Tailoring': 'suits',
        'Jumpsuits & Playsuits': 'jumpsuits',
    }
    base_term = cat_names.get(cat) or vibe_text.strip()[:30]

    # Colour refinement: up to two real colours out of the four most common.
    # Placeholders are filtered BEFORE taking two, so "other"/"multi" at the
    # top of the ranking no longer suppresses the colour suggestions.
    if 'color_family' in results.columns and not parsed.color_filter:
        top_colors = results['color_family'].value_counts().head(4).index.tolist()
        usable_colors = [
            c for c in top_colors
            if c and str(c).lower() not in ('other', 'multi')
        ]
        for color in usable_colors[:2]:
            suggestions.append(f"{color} {base_term}")

    # Colour alternative: first staple colour that differs from the query's
    # colour (compared case-insensitively).
    if parsed.color_filter and 'color_family' in results.columns:
        current_color = parsed.color_filter.lower()
        for ac in ('black', 'white', 'navy', 'beige'):
            if ac != current_color:
                suggestions.append(f"{ac} {base_term}")
                break

    # Budget hint, only when the query set no price bound at all.
    if parsed.price_max is None and parsed.price_min is None and 'price' in results.columns:
        p25 = results['price'].quantile(0.25)
        if p25 > 5:
            suggestions.append(f"{base_term} under \u00a3{int(p25)}")

    # Most frequent style tag that the query does not already mention.
    if 'style_tags' in results.columns:
        tag_counts = Counter()
        for tags in results['style_tags']:
            if isinstance(tags, list):
                for t in tags:
                    if t not in parsed.style_tags and t not in vibe_text:
                        tag_counts[t] += 1
        if tag_counts:
            best_tag = tag_counts.most_common(1)[0][0]
            suggestions.append(f"{best_tag} {base_term}")

    # Most frequent brand, unless it is a placeholder or already in the query.
    if 'brand' in results.columns:
        top_brand = results['brand'].value_counts().head(1).index.tolist()
        if top_brand and top_brand[0] and top_brand[0] != 'Unknown':
            brand = top_brand[0]
            if brand.lower() not in vibe_text.lower():
                suggestions.append(f"{brand} {base_term}")

    # Neighbouring category, keeping the query's colour if it had one.
    if cat:
        related = {
            'Dresses': 'jumpsuits', 'Tops': 'blouses',
            'Jeans': 'trousers', 'Trousers': 'jeans',
            'Coats & Jackets': 'blazers', 'Knitwear': 'cardigans',
            'Skirts': 'dresses', 'Shorts': 'skirts',
        }
        alt = related.get(cat)
        if alt:
            prefix = f"{parsed.color_filter} " if parsed.color_filter else ""
            suggestions.append(f"{prefix}{alt}".strip())

    # Case-insensitive de-duplication; never echo the query itself.
    raw_lower = parsed.raw_query.lower()
    seen = set()
    unique = []
    for s in suggestions:
        s_clean = s.strip().lower()
        if s_clean not in seen and s_clean != raw_lower:
            seen.add(s_clean)
            unique.append(s.strip())
    return unique[:max_suggestions]
|
|