Klim Mikhailov
Add application file
d6fb204
from fastapi import FastAPI, Query
from fastapi.staticfiles import StaticFiles
from typing import List, Dict, Any, Optional
import httpx
import pandas as pd
from rapidfuzz import fuzz
import re
from pydantic import BaseModel
# Default location of the JSON price dataset; used as the default argument of fetch_dataset().
DATA_URL = "https://raw.githubusercontent.com/supermarkt/checkjebon/refs/heads/main/data/supermarkets.json"
# Application object on which the routes below are registered via decorators.
app = FastAPI(title="Supermarket Price Analyzer")
class SearchResult(BaseModel):
    """Search result with product information and clickable URL.

    One instance describes a single product returned by the /search endpoint.
    """
    # Supermarket key under which the product was found in the normalized dataset.
    shop: str
    # The search text exactly as supplied by the caller.
    query: str
    # Name of the product that matched the query.
    match: str
    # Parsed price, or None when no price could be extracted for the product.
    price: Optional[float]
    # Value of the product's 's' field; presumably the package size — TODO confirm against the dataset.
    size: Optional[str]
    # Product page URL, or None when it could not be built.
    url: Optional[str] = None
    # Fuzzy-match score between the query and the product name.
    score: float
class ProductQuery(BaseModel):
    """Query model for product lookups: {supermarket: product_name}.

    NOTE(review): declares no fields and is not referenced anywhere in the
    visible part of this file — presumably a placeholder; confirm before use.
    """
    pass  # Dynamic mapping, will be dict-like
def _build_product_url(shop: str, base_url: str, link: str, product_name: str = None) -> str:
"""Build product URL based on shop-specific URL patterns"""
if not base_url or not link:
return None
# dekamarkt: construct URL from product name and ID
if shop.lower() == 'dekamarkt' and product_name:
# Extract ID from link (last segment or numeric part)
product_id = link.split('/')[-1] if '/' in link else link
# Create URL slug from product name
slug = product_name.lower().strip()
slug = re.sub(r'[^a-z0-9\s%-]', '', slug) # remove special chars
slug = re.sub(r'\s+', '%20', slug) # replace spaces with %20
return f"https://www.dekamarkt.nl/producten/{slug}/{product_id}"
# All other shops: concatenate base_url + link
return base_url + link
def _to_float(v: Any):
if v is None:
return None
if isinstance(v, (int, float)):
return float(v)
s = str(v)
s = s.strip()
s = re.sub(r"[^0-9,.\-]", "", s)
if s == "":
return None
# handle comma decimal
if s.count(',') == 1 and s.count('.') == 0:
s = s.replace(',', '.')
s = s.replace(',', '')
try:
return float(s)
except Exception:
return None
def guess_name_key(d: Dict[str, Any]):
    """Pick the key of ``d`` that most likely holds the product name.

    Well-known key names are tried first, in priority order; failing that,
    the first key whose value is a string is used. Returns None when neither
    search finds anything.
    """
    preferred = ('name', 'title', 'product_name', 'displayName', 'label', 'naam', 'product', 'n')
    known = next((key for key in preferred if key in d), None)
    if known is not None:
        return known
    # fallback to first string field
    return next((key for key, value in d.items() if isinstance(value, str)), None)
def guess_price_key(d: Dict[str, Any]):
    """Pick the key of ``d`` that most likely holds the product price.

    Well-known key names win, in priority order (the dataset's short key 'p'
    comes last among them). Otherwise the first key whose value is a number,
    or a string containing at least one digit, is returned. Returns None when
    nothing qualifies.
    """
    def _looks_numeric(value: Any) -> bool:
        # A real number, or text with a digit somewhere in it.
        if isinstance(value, (int, float)):
            return True
        return isinstance(value, str) and re.search(r"\d", value) is not None

    known_keys = ('price', 'amount', 'price_value', 'priceValue', 'prijs', 'unit_price', 'p')
    for key in known_keys:
        if key in d:
            return key
    # try to find numeric-like fields
    return next((key for key, value in d.items() if _looks_numeric(value)), None)
async def fetch_dataset(url: str = DATA_URL):
    """Download ``url`` and return its body decoded as JSON.

    A 30-second timeout is applied to the request, and
    ``raise_for_status()`` is called on the response before decoding.
    """
    async with httpx.AsyncClient(timeout=30) as http:
        response = await http.get(url)
        response.raise_for_status()
        payload = response.json()
    return payload
def _product_row(item: Any, base_url: Optional[str]) -> Optional[Dict[str, Any]]:
    """Turn one raw product dict into a normalized row, or None if it has no usable name."""
    if not isinstance(item, dict):
        return None
    name_key = guess_name_key(item)
    name = item.get(name_key) if name_key else None
    if not name:
        return None
    price_key = guess_price_key(item)
    link = item.get('l')
    return {
        # Callers lower-case and measure the name, so make sure it is a string.
        'name': name if isinstance(name, str) else str(name),
        'price': _to_float(item.get(price_key)) if price_key else None,
        'raw': item,
        # The link doubles as the product id; fall back to an explicit 'id' field.
        'id': link or item.get('id'),
        'size': item.get('s'),
        'link': link,
        'base_url': base_url,
    }


def normalize_dataset(raw: Any) -> Dict[str, List[Dict[str, Any]]]:
    """Normalize the raw dataset into ``{supermarket: [row, ...]}``.

    Every row is a dict with the keys ``name``, ``price``, ``raw``, ``id``,
    ``size``, ``link`` and ``base_url``. Three input shapes are recognised:

    1. A dict mapping each supermarket to a list of product dicts.
    2. A list of supermarket objects, each with 'n' (shop id) and 'd' (list of
       products), plus optional 'c' (alternative id) and 'u' (base URL).
    3. A flat list of product dicts that each name their own shop through a
       'shop', 'supermarket', 'store', 'market', 'source' or 'chain' field.

    Products without a name are dropped, shops without any usable product are
    omitted, and any other input yields an empty dict.
    """
    result: Dict[str, List[Dict[str, Any]]] = {}

    if isinstance(raw, dict):
        # Shape 1 applies only when every value is a list of items.
        if all(isinstance(v, list) for v in raw.values()):
            for shop, items in raw.items():
                rows = [row for row in (_product_row(it, None) for it in items) if row is not None]
                if rows:
                    result[shop] = rows
        return result

    if not isinstance(raw, list):
        return result

    # Shape 2: list of supermarket objects each with 'n' (id) and 'd' (data/products)
    is_shop_list = all(
        isinstance(it, dict) and 'n' in it and 'd' in it and isinstance(it.get('d'), list)
        for it in raw
    )
    if is_shop_list:
        for shop_obj in raw:
            shop_id = shop_obj.get('n') or shop_obj.get('c') or 'unknown'
            base_url = shop_obj.get('u') or ''
            rows = [
                row
                for row in (_product_row(prod, base_url) for prod in shop_obj.get('d', []))
                if row is not None
            ]
            if rows:
                result[str(shop_id)] = rows
        return result

    # Shape 3: flat products with the shop named inside each item
    for it in raw:
        row = _product_row(it, None)
        if row is None:
            continue
        # First shop-like key that is present wins, even if its value is None ...
        shop = next((it[k] for k in ('shop', 'supermarket', 'store', 'market') if k in it), None)
        if shop is None:
            # ... in which case the secondary fields are consulted.
            shop = it.get('source') or it.get('chain')
        result.setdefault(str(shop or 'unknown'), []).append(row)
    return result
def cluster_products(items, threshold: int = 88):
    """Group product entries that appear to describe the same product.

    Clustering happens in three stages:

    1. Entries with a truthy ``id`` are grouped by that id (exact match).
    2. Remaining entries are grouped by their name reduced to lowercase
       ASCII letters and digits.
    3. If ``threshold`` is below 100, clusters whose representative names
       reach that ``fuzz.token_set_ratio`` score are merged, greedily and in
       order.

    Args:
        items: Iterable of dicts with (at least some of) the keys 'shop',
            'name', 'price' and 'id'. Entries with a missing or empty name
            are tolerated.
        threshold: Minimum similarity score for the fuzzy merge; pass 100 or
            more to skip that stage.

    Returns:
        A list of ``{'rep': str, 'members': list}`` dicts, where ``rep`` is
        the shortest non-empty member name ('' if no member has a name).
    """
    def _norm_name(s: str) -> str:
        return re.sub(r"[^a-z0-9]", "", s.lower())

    def _shortest_name(members) -> str:
        # default='' keeps a cluster of unnamed entries from raising ValueError.
        return min((m['name'] for m in members if m.get('name')), key=len, default='')

    # 1) + 2) bucket by explicit id when present, otherwise by normalized name
    id_map = {}
    norm_map = {}
    for entry in items:
        pid = entry.get('id')
        if pid:
            id_map.setdefault(pid, []).append(entry)
        else:
            norm_map.setdefault(_norm_name(entry.get('name') or ''), []).append(entry)

    # id-based clusters come first, then name-based ones
    clusters = [{'rep': _shortest_name(members), 'members': list(members)} for members in id_map.values()]
    clusters += [{'rep': _shortest_name(members), 'members': list(members)} for members in norm_map.values()]

    # 3) optional fuzzy merge of clusters with similar reps (if threshold < 100)
    if threshold < 100:
        merged = []
        for cl in clusters:
            rep = cl['rep']
            for m in merged:
                if fuzz.token_set_ratio(rep, m['rep']) >= threshold:
                    m['members'].extend(cl['members'])
                    # keep the shorter of the two representative names
                    if len(rep) < len(m['rep']):
                        m['rep'] = rep
                    break
            else:
                # no existing cluster was similar enough: start a new one
                merged.append({'rep': rep, 'members': list(cl['members'])})
        clusters = merged
    return clusters
def build_table(clusters, shops: List[str]):
    """Pivot product clusters into a price table with one column per shop.

    Each cluster becomes one row labelled with its representative name. A
    cell holds the price of the cluster member from that shop, or '' when the
    shop has no member or the member has no price. Members from shops not
    listed in ``shops`` are ignored. The result is indexed by 'product'
    unless it is empty.
    """
    records = []
    for cluster in clusters:
        record = dict.fromkeys(shops, '')
        record['product'] = cluster['rep']
        for member in cluster['members']:
            shop_key = str(member.get('shop') or 'unknown')
            if shop_key not in shops:
                continue
            member_price = member.get('price')
            record[shop_key] = member_price if member_price is not None else ''
        records.append(record)

    table = pd.DataFrame(records)
    return table if table.empty else table.set_index('product')
@app.get('/search', response_model=List[SearchResult])
async def search(query: str = Query(None), shops: str = "ah,lidl,dekamarkt", threshold: int = Query(80), min_price: float = Query(0)):
    """Search for similar products across supermarkets.

    Query params:
    - query: product name to search for (e.g., "volkoren brood"). When it is
      not supplied, an empty list is returned.
    - shops: comma-separated list of supermarket keys to search in
      (default: "ah,lidl,dekamarkt"). Pass an empty string to search all.
    - threshold: minimum match score 0-100 (default: 80)
    - min_price: minimum price filter (default: 0). Products priced below
      this are filtered out; products without a known price are kept.

    Returns: List of all matching products from selected supermarkets with
    size and URL, sorted by price ascending (unknown prices last), then by
    score descending.
    """
    # Without a query there is nothing to match; bail out before the download
    # instead of failing on None.lower() further down.
    if query is None:
        return []

    raw = await fetch_dataset()
    norm = normalize_dataset(raw)

    # Filter supermarkets if provided
    if shops:
        wanted = {s.strip() for s in shops.split(',') if s.strip()}
        norm = {k: v for k, v in norm.items() if k in wanted}

    # Loop-invariant: lower-case the query once rather than per product.
    needle = query.lower()

    results = []
    for shop, items in norm.items():
        for item in items:
            name = item.get('name')
            if not isinstance(name, str):
                # A row without a textual name cannot be matched.
                continue
            score = fuzz.token_set_ratio(needle, name.lower())
            if score < threshold:
                continue
            price = item.get('price')
            # Skip products priced below min_price; unknown prices are kept.
            if price is not None and price < min_price:
                continue
            results.append({
                'shop': shop,
                'query': query,
                'match': name,
                'price': price,
                'size': item.get('size'),
                'url': _build_product_url(shop, item.get('base_url'), item.get('link'), name),
                'score': score,
            })

    # Sort by price ascending (None last), then by score descending
    results.sort(key=lambda x: (x['price'] if x['price'] is not None else float('inf'), -x['score']))
    return results
# Mount static files (must be after API routes)
from pathlib import Path
# Serves the directory that contains this module under the /static prefix.
# NOTE(review): that directory also holds this source file, so it is presumably
# downloadable via /static — confirm this is intended, or mount a dedicated assets folder.
app.mount("/static", StaticFiles(directory=Path(__file__).parent), name="static")
# Serve index.html for root path
@app.get("/")
async def root():
    """Return the index.html file located next to this module."""
    from fastapi.responses import FileResponse

    index_page = Path(__file__).parent / "index.html"
    return FileResponse(index_page)