Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Query | |
| from fastapi.staticfiles import StaticFiles | |
| from typing import List, Dict, Any, Optional | |
| import httpx | |
| import pandas as pd | |
| from rapidfuzz import fuzz | |
| import re | |
| from pydantic import BaseModel | |
# Source dataset: per-supermarket product lists maintained by the Checkjebon
# project; fetched at request time by fetch_dataset().
DATA_URL = "https://raw.githubusercontent.com/supermarkt/checkjebon/refs/heads/main/data/supermarkets.json"
app = FastAPI(title="Supermarket Price Analyzer")
class SearchResult(BaseModel):
    """One product hit returned by the search endpoint, with a clickable URL."""
    shop: str  # supermarket key the product came from
    query: str  # the original search term, echoed back
    match: str  # matched product name
    price: Optional[float]  # parsed price; None when missing/unparsable
    size: Optional[str]  # package-size string as provided by the shop
    url: Optional[str] = None  # product page URL, when one could be built
    score: float  # fuzzy-match score (rapidfuzz ratio)
class ProductQuery(BaseModel):
    """Query model for product lookups: {supermarket: product_name}"""
    # Intentionally declares no fields: intended as a dynamic, dict-like
    # mapping. NOTE(review): with no fields pydantic will drop all input keys
    # by default -- confirm this model is actually used by any route.
    pass  # Dynamic mapping, will be dict-like
| def _build_product_url(shop: str, base_url: str, link: str, product_name: str = None) -> str: | |
| """Build product URL based on shop-specific URL patterns""" | |
| if not base_url or not link: | |
| return None | |
| # dekamarkt: construct URL from product name and ID | |
| if shop.lower() == 'dekamarkt' and product_name: | |
| # Extract ID from link (last segment or numeric part) | |
| product_id = link.split('/')[-1] if '/' in link else link | |
| # Create URL slug from product name | |
| slug = product_name.lower().strip() | |
| slug = re.sub(r'[^a-z0-9\s%-]', '', slug) # remove special chars | |
| slug = re.sub(r'\s+', '%20', slug) # replace spaces with %20 | |
| return f"https://www.dekamarkt.nl/producten/{slug}/{product_id}" | |
| # All other shops: concatenate base_url + link | |
| return base_url + link | |
| def _to_float(v: Any): | |
| if v is None: | |
| return None | |
| if isinstance(v, (int, float)): | |
| return float(v) | |
| s = str(v) | |
| s = s.strip() | |
| s = re.sub(r"[^0-9,.\-]", "", s) | |
| if s == "": | |
| return None | |
| # handle comma decimal | |
| if s.count(',') == 1 and s.count('.') == 0: | |
| s = s.replace(',', '.') | |
| s = s.replace(',', '') | |
| try: | |
| return float(s) | |
| except Exception: | |
| return None | |
def guess_name_key(d: Dict[str, Any]):
    """Return the key of `d` most likely to hold the product name.

    Tries a list of well-known name keys first (including the dataset's
    short key 'n'); otherwise falls back to the first string-valued field.
    Returns None when nothing plausible is found.
    """
    for candidate in ('name', 'title', 'product_name', 'displayName',
                      'label', 'naam', 'product', 'n'):
        if candidate in d:
            return candidate
    # Fall back to the first field whose value is a string.
    return next((key for key, value in d.items() if isinstance(value, str)), None)
def guess_price_key(d: Dict[str, Any]):
    """Return the key of `d` most likely to hold the product price.

    Well-known price keys win, then the dataset's short key 'p', then any
    field that is numeric or whose string value contains a digit. Returns
    None when nothing looks price-like.
    """
    for candidate in ('price', 'amount', 'price_value', 'priceValue',
                      'prijs', 'unit_price'):
        if candidate in d:
            return candidate
    # common short key used in this dataset
    if 'p' in d:
        return 'p'
    # Last resort: any numeric field, or a string containing a digit.
    for key, value in d.items():
        if isinstance(value, (int, float)):
            return key
        if isinstance(value, str) and re.search(r"\d", value):
            return key
    return None
async def fetch_dataset(url: str = DATA_URL):
    """Download the supermarkets dataset and return the decoded JSON body.

    Raises httpx.HTTPStatusError on non-2xx responses.
    """
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()
def normalize_dataset(raw: Any):
    """Normalize the raw dataset into a {supermarket: [product rows]} mapping.

    Each row is a dict with keys name/price/raw and, where available,
    id/size/link/base_url.  Three input shapes are handled:
    1. a dict mapping shop -> list of product dicts,
    2. a list of shop objects with 'n' (name), 'u' (base URL), 'd' (products)
       -- the Checkjebon format used by DATA_URL,
    3. a flat list of product dicts each carrying its own shop field.
    Anything else yields an empty mapping.
    """
    # return mapping supermarket -> list of items {name, price, raw, id, size, link, base_url}
    result = {}
    if isinstance(raw, dict):
        # Shape 1 only applies when every value is a list of items.
        possible = True
        for k, v in raw.items():
            if not isinstance(v, list):
                possible = False
                break
        if possible:
            for shop, items in raw.items():
                rows = []
                for it in items:
                    if not isinstance(it, dict):
                        continue
                    name_k = guess_name_key(it)
                    price_k = guess_price_key(it)
                    name = it.get(name_k) if name_k else None
                    price = _to_float(it.get(price_k)) if price_k else None
                    # 'l' looks like the product link and doubles as the id;
                    # 's' looks like the size -- TODO confirm against dataset.
                    pid = it.get('l') or it.get('id')
                    size = it.get('s')
                    link = it.get('l')
                    if name:
                        rows.append({'name': name, 'price': price, 'raw': it, 'id': pid, 'size': size, 'link': link, 'base_url': None})
                if rows:
                    result[shop] = rows
        return result
    # if it's a list of supermarket objects each with 'n' (id) and 'd' (data/products)
    if isinstance(raw, list):
        # detect supermarket-list structure (note: an empty list passes this
        # check vacuously and produces an empty mapping)
        is_shop_list = all(isinstance(it, dict) and ('n' in it and 'd' in it and isinstance(it.get('d'), list)) for it in raw)
        if is_shop_list:
            for shop_obj in raw:
                # 'n' = shop name/id, 'c' = presumably a shop code, 'u' = base
                # URL -- verify against the dataset
                shop_id = shop_obj.get('n') or shop_obj.get('c') or 'unknown'
                base_url = shop_obj.get('u') or ''
                rows = []
                for prod in shop_obj.get('d', []):
                    if not isinstance(prod, dict):
                        continue
                    name_k = guess_name_key(prod)
                    price_k = guess_price_key(prod)
                    name = prod.get(name_k) if name_k else None
                    price = _to_float(prod.get(price_k)) if price_k else None
                    pid = prod.get('l') or prod.get('id')
                    size = prod.get('s')
                    link = prod.get('l')
                    if name:
                        rows.append({'name': name, 'price': price, 'raw': prod, 'id': pid, 'size': size, 'link': link, 'base_url': base_url})
                if rows:
                    result[str(shop_id)] = rows
            return result
        # fallback: treat list as flat products with shop field inside each
        for it in raw:
            if not isinstance(it, dict):
                continue
            shop = None
            for k in ('shop', 'supermarket', 'store', 'market'):
                if k in it:
                    shop = it.get(k)
                    break
            if shop is None:
                shop = it.get('source') or it.get('chain')
            name_k = guess_name_key(it)
            price_k = guess_price_key(it)
            name = it.get(name_k) if name_k else None
            price = _to_float(it.get(price_k)) if price_k else None
            if name:
                # Flat rows carry no id/size/link/base_url.
                result.setdefault(str(shop or 'unknown'), []).append({'name': name, 'price': price, 'raw': it})
        return result
    # Unrecognized top-level type: empty mapping.
    return result
def cluster_products(items, threshold: int = 88):
    """Group product entries that appear to be the same product.

    Args:
        items: list of dicts like {'shop', 'name', 'price', 'id'}.
        threshold: fuzzy-merge cutoff 0-100; 100 disables fuzzy merging.

    Returns:
        List of clusters: {'rep': representative name, 'members': [entries]}.

    Fixes over the original: `min(..., key=len)` over a cluster whose members
    all lack a non-empty name raised ValueError, and a missing/None 'name' in
    the no-id path raised KeyError/AttributeError; both are now tolerated.
    """
    clusters = []
    # 1) cluster by explicit id (fast and exact)
    id_map = {}
    no_id = []
    for entry in items:
        pid = entry.get('id')
        if pid:
            id_map.setdefault(pid, []).append(entry)
        else:
            no_id.append(entry)
    for pid, members in id_map.items():
        # representative name: shortest non-empty name ('' if none present)
        rep = min((m['name'] for m in members if m.get('name')), key=len, default='')
        clusters.append({'rep': rep, 'members': list(members)})
    # 2) cluster remaining entries by normalized exact name
    def _norm_name(s: str) -> str:
        # lowercase and strip everything but letters/digits
        s2 = s.lower()
        s2 = re.sub(r"[^a-z0-9]", "", s2)
        return s2
    norm_map = {}
    for entry in no_id:
        # tolerate absent/None names instead of crashing
        nn = _norm_name(entry.get('name') or '')
        norm_map.setdefault(nn, []).append(entry)
    for nn, members in norm_map.items():
        rep = min((m['name'] for m in members if m.get('name')), key=len, default='')
        clusters.append({'rep': rep, 'members': list(members)})
    # 3) optional fuzzy merge of clusters with similar reps (if threshold < 100)
    if threshold < 100:
        merged = []
        for cl in clusters:
            rep = cl['rep']
            placed = False
            for m in merged:
                score = fuzz.token_set_ratio(rep, m['rep'])
                if score >= threshold:
                    m['members'].extend(cl['members'])
                    # keep the shorter representative name
                    if len(rep) < len(m['rep']):
                        m['rep'] = rep
                    placed = True
                    break
            if not placed:
                merged.append({'rep': rep, 'members': list(cl['members'])})
        clusters = merged
    return clusters
def build_table(clusters, shops: List[str]):
    """Build a product-by-shop price table from clusters.

    One row per cluster, indexed by the cluster's representative name;
    one column per requested shop. Cells hold the member's price, or ''
    when no member of that shop has a price.
    """
    records = []
    for cluster in clusters:
        record = dict.fromkeys(shops, '')
        record['product'] = cluster['rep']
        for member in cluster['members']:
            shop_name = str(member.get('shop') or 'unknown')
            if shop_name not in shops:
                continue
            member_price = member.get('price')
            record[shop_name] = '' if member_price is None else member_price
        records.append(record)
    table = pd.DataFrame(records)
    if table.empty:
        return table
    return table.set_index('product')
async def search(query: str = Query(None), shops: str = "ah,lidl,dekamarkt", threshold: int = Query(80), min_price: float = Query(0)):
    """Search for similar products across supermarkets.

    Query params:
    - query: product name to search for (e.g., "volkoren brood")
    - shops: comma-separated list of supermarket keys to search in
      (e.g., "ah,lidl,dekamarkt"). If empty, searches all.
    - threshold: minimum match score 0-100 (default: 80)
    - min_price: minimum price filter (default: 0). Priced products below
      this are filtered out; unpriced products are kept.

    Returns: list of matching products with size and URL, cheapest first
    (unpriced last), ties broken by higher match score.
    """
    # NOTE(review): no @app.get(...) decorator is visible here, so this
    # handler is not registered as a route as written -- confirm whether the
    # decorator was lost from the original file.
    if not query:
        # Fix: query defaults to None; the original crashed on query.lower().
        return []
    raw = await fetch_dataset()
    norm = normalize_dataset(raw)
    # Restrict to the requested supermarkets, if any were given.
    if shops:
        shop_list = [s.strip() for s in shops.split(',') if s.strip()]
        norm = {k: v for k, v in norm.items() if k in shop_list}
    results = []
    query_lower = query.lower()  # hoisted out of the scoring loop
    for shop, items in norm.items():
        for item in items:
            score = fuzz.token_set_ratio(query_lower, item['name'].lower())
            if score >= threshold:
                price = item.get('price')
                # Skip priced products below min_price (unpriced ones pass).
                if price is not None and price < min_price:
                    continue
                url = _build_product_url(shop, item.get('base_url'), item.get('link'), item.get('name'))
                results.append({
                    'shop': shop,
                    'query': query,
                    'match': item['name'],
                    'price': price,
                    'size': item.get('size'),
                    'url': url,
                    'score': score
                })
    # Sort by price ascending (None last), then by score descending.
    results.sort(key=lambda x: (x['price'] if x['price'] is not None else float('inf'), -x['score']))
    return results
# Mount static files (must be after API routes).
# NOTE(review): this mounts the module's own directory, so the source files
# themselves are reachable under /static -- confirm that is intended.
from pathlib import Path
app.mount("/static", StaticFiles(directory=Path(__file__).parent), name="static")
# Serve index.html for root path.
async def root():
    """Serve the single-page UI (index.html next to this module)."""
    # NOTE(review): no @app.get("/") decorator is visible, so this handler is
    # never registered as a route as written -- confirm whether the decorator
    # was lost from the original file.
    from fastapi.responses import FileResponse
    return FileResponse(Path(__file__).parent / "index.html")