Spaces:
Sleeping
Sleeping
# app_new_images.py — RideSearch with real trims + auto photos + Admin tools
# -------------------------------------------------------------------------
# Features:
# - Trim mapping via trims_map.json (for correct brand/model-specific display)
# - Cross-brand recommendations (unique model families by default)
# - Automatic photos from Wikipedia/Wikimedia (no key), optional Bing fallback via env BING_KEY
# - Admin • Trim Fixer: preview dataset trims, save curated display trims per model
# - Admin • Dataset Tools: recompute zero_to_100_kmh_s with a realistic heuristic and download the fixed CSV
| import os, glob, json, urllib.parse, requests, io | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from sklearn.preprocessing import StandardScaler | |
| import gradio as gr | |
# Dataset CSV; assembled from the part files on first run if missing.
DATA_PATH = 'RideSearch_dataset.csv'
# Curated trim-mapping JSON written by the Admin tab.
TRIMS_PATH = 'trims_map.json'
| # ----------------------------- Data loading ----------------------------- | |
def load_df():
    """Load the RideSearch dataset, stitching it together from part files if needed."""
    if os.path.exists(DATA_PATH):
        return pd.read_csv(DATA_PATH)
    part_files = sorted(glob.glob('RideSearch_part*_small.csv'))
    if not part_files:
        raise FileNotFoundError("Upload RideSearch_dataset.csv OR the 10 parts RideSearch_part*_small.csv.")
    frames = [pd.read_csv(path) for path in part_files]
    combined = pd.concat(frames, ignore_index=True)
    # Cache the stitched dataset so later runs skip the concat entirely.
    combined.to_csv(DATA_PATH, index=False)
    return combined
DF = load_df()  # global dataset used by every handler below
# Numeric feature columns used to build the numeric similarity embedding.
NUM_COLS = [
    'horsepower','zero_to_100_kmh_s','seats','cargo_liters','price_usd',
    'popularity_score','comfort_score','reliability_score','tech_score',
    'ownership_cost_score','safety_rating'
]
| # ----------------------------- Embeddings ----------------------------- | |
def ensure_emb():
    """Return (text_embeddings, numeric_embeddings), computing and caching on first use."""
    if os.path.exists('emb_text.npy') and os.path.exists('emb_num.npy'):
        return np.load('emb_text.npy'), np.load('emb_num.npy')
    # Lazy import: sentence-transformers is heavy and only needed on the first run.
    from sentence_transformers import SentenceTransformer
    encoder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    records = DF['text_record'].astype(str).tolist()
    text_emb = np.asarray(
        encoder.encode(records, batch_size=256, show_progress_bar=True, normalize_embeddings=True),
        dtype='float32',
    )
    np.save('emb_text.npy', text_emb)
    numeric = DF[NUM_COLS].copy()
    if 'zero_to_100_kmh_s' in numeric.columns:
        # Lower 0-100 time is better, so flip the sign before standardizing.
        numeric['zero_to_100_kmh_s'] = -numeric['zero_to_100_kmh_s'].astype('float32')
    num_emb = StandardScaler().fit_transform(numeric.values.astype('float32')).astype('float32')
    np.save('emb_num.npy', num_emb)
    return text_emb, num_emb
| # ----------------------------- Trims mapping ----------------------------- | |
TRIM_CHOICES = {}            # (make, model) -> list of curated display trims
TRIM_ALIAS_TO_GENERIC = {}   # (make, model, display alias) -> generic dataset trim token

def load_trims():
    """(Re)load the curated trim mapping from trims_map.json into the module globals."""
    global TRIM_CHOICES, TRIM_ALIAS_TO_GENERIC
    TRIM_CHOICES, TRIM_ALIAS_TO_GENERIC = {}, {}
    if not os.path.exists(TRIMS_PATH):
        return
    with open(TRIMS_PATH, 'r', encoding='utf-8') as fh:
        mapping = json.load(fh)
    for combined_key, entry in mapping.items():
        make, model = combined_key.split('||', 1)
        TRIM_CHOICES[(make, model)] = entry.get('display', [])
        for alias, generic in entry.get('alias_to_generic', {}).items():
            TRIM_ALIAS_TO_GENERIC[(make, model, alias)] = generic

load_trims()
def generic_to_display(make, model, generic_trim):
    """Map a dataset ('generic') trim token to its curated display alias, if any."""
    if not generic_trim:
        return ""
    display_options = TRIM_CHOICES.get((make, model))
    if not display_options:
        return str(generic_trim)
    # Reverse lookup: first display alias whose generic form matches wins.
    matches = [
        alias for alias in display_options
        if TRIM_ALIAS_TO_GENERIC.get((make, model, alias), alias) == generic_trim
    ]
    return matches[0] if matches else str(generic_trim)
def alias_to_generic(make, model, alias):
    """Map a display alias back to the dataset's generic trim token (identity if unmapped)."""
    return TRIM_ALIAS_TO_GENERIC.get((make, model, alias), alias) if alias else None
| # ----------------------------- UI helpers ----------------------------- | |
def models_for(make):
    """Return a Gradio dropdown update listing the models available for *make*."""
    if not make:
        return gr.update(choices=[], value=None)
    model_names = DF.loc[DF['make'].eq(make), 'model'].dropna().unique().tolist()
    return gr.update(choices=sorted(model_names), value=None)
def trim_year(make, model):
    """Return (trims, years) dropdown choices for the current make/model selection.

    Curated trims from trims_map.json take precedence; otherwise fall back to
    the most frequent trims observed in the dataset.
    """
    if make and model and (make, model) in TRIM_CHOICES:
        trims = TRIM_CHOICES[(make, model)]
    else:
        subset = DF
        if make:
            subset = subset[subset['make'] == make]
        if model:
            subset = subset[subset['model'] == model]
        if 'trim' in subset.columns and not subset.empty:
            # Offer the 15 most common dataset trims, mapped to display form.
            common = subset['trim'].astype(str).value_counts().head(15).index.tolist()
            trims = [generic_to_display(make or "", model or "", t) for t in common]
        else:
            trims = []
    years = []
    if make and model:
        year_series = DF.loc[(DF['make'].eq(make)) & (DF['model'].eq(model)), 'year']
        years = sorted(year_series.dropna().astype(int).unique().tolist())
    return trims, years
def on_model_change(make, model):
    """Refresh the trim and year dropdowns after the model selection changes."""
    trims, years = trim_year(make, model)
    trim_update = gr.update(choices=trims, value=None)
    year_update = gr.update(choices=years, value=None)
    return trim_update, year_update
def apply_filters(df, body, fuel, y_min, y_max, p_min, p_max, safety, reliab):
    """Return the subset of *df* matching the body/fuel/year/price/quality filters.

    'Any' disables the body or fuel filter; all numeric bounds are inclusive.
    """
    filtered = df.copy()
    if body != 'Any':
        filtered = filtered[filtered['body_type'] == body]
    if fuel != 'Any':
        filtered = filtered[filtered['fuel'] == fuel]
    filtered = filtered[filtered['year'].between(y_min, y_max)]
    filtered = filtered[filtered['price_usd'].between(p_min, p_max)]
    meets_quality = (filtered['safety_rating'] >= safety) & (filtered['reliability_score'] >= reliab)
    return filtered[meets_quality]
| # ----------------------------- Photos ----------------------------- | |
def fetch_wikimedia_image(query):
    """Best-effort image URL lookup on Wikipedia for *query*; returns None on failure."""
    # Attempt 1: PageImages 'original' image for a direct title match.
    try:
        encoded = urllib.parse.quote(query)
        api_url = f"https://en.wikipedia.org/w/api.php?action=query&prop=pageimages&format=json&piprop=original&titles={encoded}"
        payload = requests.get(api_url, timeout=8).json()
        for page in payload.get('query', {}).get('pages', {}).values():
            original = page.get('original')
            if original and 'source' in original:
                return original['source']
    except Exception:
        pass
    # Attempt 2: REST title search, then the summary endpoint's thumbnail.
    try:
        search = requests.get(
            "https://en.wikipedia.org/w/rest.php/v1/search/title",
            params={"q": query, "limit": 1},
            timeout=8,
        ).json()
        hits = search.get('pages')
        if hits:
            title = hits[0]['title']
            summary = requests.get(
                f"https://en.wikipedia.org/api/rest_v1/page/summary/{urllib.parse.quote(title)}",
                timeout=8,
            ).json()
            thumbnail = summary.get('thumbnail', {}).get('source')
            if thumbnail:
                return thumbnail
    except Exception:
        pass
    return None
def fetch_bing_image(query):
    """Look up an image via Bing Image Search; requires the BING_KEY env var."""
    api_key = os.getenv("BING_KEY")
    if not api_key:
        return None
    try:
        response = requests.get(
            "https://api.bing.microsoft.com/v7.0/images/search",
            headers={"Ocp-Apim-Subscription-Key": api_key},
            params={"q": query, "count": 1, "safeSearch": "Strict"},
            timeout=8,
        )
        results = response.json().get("value")
        if results:
            return results[0]["contentUrl"]
    except Exception:
        return None
    return None
def get_image_for(make, model, trim_disp, year):
    """Find a photo URL for a car: Wikipedia first (specific then generic), Bing fallback."""
    query = " ".join(str(part) for part in (year, make, model, trim_disp) if part)
    url = fetch_wikimedia_image(query) or fetch_wikimedia_image(f"{make} {model}")
    return url or fetch_bing_image(query)
def placeholder_svg_data_uri(title):
    """Return a data: URI for a simple SVG placeholder card showing *title*.

    Fix: the title is now XML-escaped before interpolation, so characters
    like '&' or '<' in car names cannot break the SVG markup.
    """
    from html import escape  # local import: only needed on this fallback path
    svg = f"""<svg xmlns='http://www.w3.org/2000/svg' width='480' height='320'>
  <rect width='100%' height='100%' fill='#eef3fb'/>
  <text x='50%' y='50%' dominant-baseline='middle' text-anchor='middle'
        font-family='Arial' font-size='22' fill='#223'>
    {escape(title)}
  </text>
</svg>"""
    return "data:image/svg+xml;utf8," + urllib.parse.quote(svg)
def build_gallery_html(df_rows):
    """Render recommendation rows as an HTML card gallery with photos.

    Fix: the label and image URL are HTML-escaped before interpolation, so
    data values cannot inject markup into the page.

    NOTE(review): each card triggers one or two HTTP lookups via
    get_image_for; slow for large result sets.
    """
    from html import escape  # local import: attribute/text escaping
    cards = []
    for _, row in df_rows.iterrows():
        disp_trim = generic_to_display(row['make'], row['model'], row['trim'])
        label = f"{row['make']} {row['model']} {disp_trim}"
        img_src = get_image_for(row['make'], row['model'], disp_trim, int(row['year']))
        if not img_src:
            img_src = placeholder_svg_data_uri(f"{row['make']} {row['model']}")
        cards.append(f"""
<div style="width:240px;margin:6px;border:1px solid #ddd;border-radius:12px;overflow:hidden;background:#fff;">
  <img src="{escape(img_src, quote=True)}" style="width:240px;height:160px;object-fit:cover;display:block" />
  <div style="padding:8px 10px;font:14px/1.3 Arial,sans-serif;color:#111">{escape(label)}</div>
</div>
""")
    return f"<div style='display:flex;flex-wrap:wrap'>{''.join(cards)}</div>"
| # ----------------------------- Anchor & Recommend ----------------------------- | |
def find_anchor(make, model, trim_display, year):
    """Pick the dataset row best matching the user's selection.

    Falls back gracefully: exact trim+year, then year only, then trim only,
    then any row for the make/model; ties break on popularity_score.
    Returns a pandas Series (row) or None if nothing matches.
    """
    generic = alias_to_generic(make, model, trim_display) if trim_display else None
    candidates = DF.copy()
    if make:
        candidates = candidates[candidates['make'] == make]
    if model:
        candidates = candidates[candidates['model'] == model]

    def most_popular(frame):
        # Highest-popularity row of a (possibly empty) candidate frame.
        if frame.empty:
            return None
        return frame.sort_values('popularity_score', ascending=False).iloc[0]

    exact = candidates.copy()
    if generic:
        exact = exact[exact['trim'] == generic]
    if year:
        exact = exact[exact['year'] == year]
    if not exact.empty:
        return most_popular(exact)
    if year:
        same_year = candidates[candidates['year'] == year]
        if not same_year.empty:
            return most_popular(same_year)
    if generic:
        same_trim = candidates[candidates['trim'] == generic]
        if not same_trim.empty:
            return most_popular(same_trim)
    return most_popular(candidates)
def apply_and_recommend(a, topk, alpha, body, fuel, y_min, y_max, p_min, p_max, safety, reliab,
                        cross_brand_only, exclude_same_model):
    """Rank candidate cars against anchor row *a* and return the top picks.

    a: anchor row (pd.Series) from DF. ``a.name`` must equal its positional
       index in DF — true here because DF is built with ignore_index=True,
       so row labels line up with embedding rows.
    alpha: blend weight — alpha * text similarity + (1 - alpha) * numeric.
    Returns (selection_df, error_message); exactly one of the two is None.
    """
    pool = DF.copy()
    if cross_brand_only:
        pool = pool[pool['make'] != a['make']]
    if exclude_same_model:
        pool = pool[~((pool['make'] == a['make']) & (pool['model'] == a['model']))]
    pool = apply_filters(pool, body, fuel, int(y_min), int(y_max), int(p_min), int(p_max), int(safety), int(reliab))
    if pool.empty:
        return None, "No cars after filters. Try widening year/price/safety."
    Etext, Enum = ensure_emb()
    idx_anchor = int(a.name)  # row label == embedding row index by construction
    cand_idx = pool.index.values
    # Cosine similarity of the anchor against every surviving candidate.
    st = cosine_similarity(Etext[idx_anchor:idx_anchor+1], Etext[cand_idx])[0]
    sn = cosine_similarity(Enum[idx_anchor:idx_anchor+1], Enum[cand_idx])[0]
    s = float(alpha)*st + (1-float(alpha))*sn
    order = np.argsort(-s)  # best-first
    # Walk best-first, keeping at most one row per (make, model) family.
    seen = set(); chosen = []
    for j in order:
        r = DF.loc[cand_idx[j]]
        key = (r['make'], r['model'])
        if key in seen: continue
        seen.add(key); chosen.append(cand_idx[j])
        if len(chosen) >= int(topk): break
    if not chosen:
        return None, "No recommendations found after constraints."
    sel = DF.loc[chosen].copy()
    sel['trim_display'] = sel.apply(lambda r: generic_to_display(r['make'], r['model'], r['trim']), axis=1)
    # Attach the blended similarity as a display percentage.
    sim_lookup = {cand_idx[j]: round(float(s[j])*100, 1) for j in order}
    sel['similarity_%'] = sel.index.map(lambda k: sim_lookup.get(k, 0.0))
    return sel, None
def recommend(make, model, trim_display, year, topk, alpha,
              body, fuel, y_min, y_max, p_min, p_max, safety, reliab,
              cross_brand_only=True, exclude_same_model=True):
    """End-to-end handler for the Recommend button.

    Finds the anchor car, computes recommendations, and returns
    (anchor markdown, results dataframe, settings note, gallery HTML).
    On failure the first element carries the error text and the rest are empty.

    Fix: the user-facing strings contained mojibake ('β’' for '•', 'Ξ±' for
    'α', 'β ' for '★', '0β100' for '0–100'); restored to the intended glyphs.
    """
    a = find_anchor(make, model, trim_display, year)
    if a is None:
        return "No match for that combo.", None, "", None
    sel, err = apply_and_recommend(a, topk, alpha, body, fuel, y_min, y_max, p_min, p_max, safety, reliab,
                                   cross_brand_only, exclude_same_model)
    if err:
        return err, None, "", None
    # Column order for the results table.
    cols = ['name','make','model','trim_display','year','body_type','fuel','engine_type',
            'price_usd','horsepower','zero_to_100_kmh_s','popularity_score','comfort_score',
            'reliability_score','tech_score','ownership_cost_score','safety_rating','similarity_%']
    # Markdown summary of the anchor car.
    anchor_text = (f"**{a['make']} {a['model']} {generic_to_display(a['make'], a['model'], a['trim'])} "
                   f"{int(a['year'])}** \n"
                   f"Body: {a['body_type']} • Fuel: {a['fuel']} • Engine: {a['engine_type']} \n"
                   f"HP: {int(a['horsepower'])} • 0–100: {a['zero_to_100_kmh_s']}s • Price: ${int(a['price_usd']):,} \n"
                   f"Popularity {int(a['popularity_score'])}/10 • Comfort {int(a['comfort_score'])}/10 • "
                   f"Reliability {int(a['reliability_score'])}/100 • Safety {int(a['safety_rating'])}★")
    note = (f"α = {float(alpha):.2f} (text → numeric) • Cross-brand only = {cross_brand_only} "
            f"• Exclude same model = {exclude_same_model}")
    gallery = build_gallery_html(sel)
    return anchor_text, sel[cols], note, gallery
| # ----------------------------- Admin: Trim Fixer ----------------------------- | |
def wiki_suggest_trims(make, model):
    """Collect trim-naming hints: Wikipedia title hits plus top dataset trims."""
    titles = []
    try:
        response = requests.get(
            "https://en.wikipedia.org/w/rest.php/v1/search/title",
            params={"q": f"{make} {model} trim levels", "limit": 5},
            timeout=8,
        ).json()
        titles = [page['title'] for page in response.get('pages', [])]
    except Exception:
        pass
    rows = DF[(DF['make'] == make) & (DF['model'] == model)]
    top_trims = rows['trim'].astype(str).value_counts().head(10).index.tolist()
    return {"wiki_titles": titles, "dataset_top_trims": top_trims}
def admin_preview(make, model):
    """Return (hints dict, per-trim row counts) for the Admin • Trim Fixer tab."""
    hints = wiki_suggest_trims(make, model)
    subset = DF[(DF['make'] == make) & (DF['model'] == model)][['trim', 'year']].copy()
    subset['count'] = 1
    counts = (
        subset.groupby('trim')['count']
        .sum()
        .reset_index()
        .sort_values('count', ascending=False)
    )
    return hints, counts
def admin_save_mapping(make, model, list_of_trims):
    """Persist a curated display-trim list for make/model into trims_map.json.

    *list_of_trims* is the raw textbox content: one display trim per line.
    """
    display_trims = [line.strip() for line in list_of_trims.splitlines() if line.strip()]
    if not display_trims:
        return "No trims provided."
    data = {}
    if os.path.exists(TRIMS_PATH):
        with open(TRIMS_PATH, 'r', encoding='utf-8') as fh:
            data = json.load(fh)
    # Identity alias mapping by default; admins can refine the JSON later.
    data[f"{make}||{model}"] = {
        "display": display_trims,
        "alias_to_generic": {t: t for t in display_trims},
    }
    with open(TRIMS_PATH, 'w', encoding='utf-8') as fh:
        json.dump(data, fh, indent=2, ensure_ascii=False)
    load_trims()  # refresh the in-memory mapping immediately
    return f"Saved {len(display_trims)} trims for {make} {model}. Refresh the main tab dropdown."
# ----------------------------- Admin: Dataset Tools (0–100 fix) -----------------------------
def _estimate_0_100(row):
    """Estimate a 0-100 km/h time (seconds) for one dataset row.

    Heuristic: t = 26 - 3.2*ln(hp) + body_adj + fuel_adj + trim_adj + noise,
    clamped to [2.9, 14.5]. Result is nondeterministic (uniform jitter).

    Fix vs. original: performance-trim detection now matches whole tokens,
    so e.g. 'first edition' no longer matches the 'rs' substring and
    'premium plus' no longer matches the 'm ' substring test.
    """
    try:
        hp = float(row.get('horsepower', 150) or 150)
    except Exception:
        hp = 150.0
    body = str(row.get('body_type', '')).lower()
    fuel = str(row.get('fuel', '')).lower()
    trim = str(row.get('trim', '')).lower()
    base = 26.0 - 3.2 * np.log(max(hp, 60.0))  # floor hp at 60 to avoid extreme times
    body_adj_map = {
        'sports': -2.5, 'coupe': -1.2, 'sedan': 0.0, 'hatch': 0.2, 'hatchback': 0.2,
        'wagon': 0.4, 'suv': 0.8, 'crossover': 0.6, 'pickup': 1.2, 'truck': 1.2, 'van': 1.0
    }
    body_adj = 0.0
    for key, adj in body_adj_map.items():
        if key in body:
            body_adj = adj
            break
    fuel_adj = 0.0
    if 'ev' in fuel or 'electric' in fuel:
        fuel_adj -= 0.8
    if 'hybrid' in fuel:
        fuel_adj -= 0.3
    if 'diesel' in fuel:
        fuel_adj += 0.4
    # Token-based matching avoids substring false positives on trim badges.
    trim_tokens = set(trim.split())
    trim_adj = 0.0
    if 'performance' in trim_tokens or {'rs', 'amg', 'm'} & trim_tokens or 'type r' in trim:
        trim_adj -= 0.5
    elif 'sport' in trim:
        trim_adj -= 0.3
    elif 'premium' in trim:
        trim_adj -= 0.2
    noise = np.random.uniform(-0.2, 0.2)  # small jitter so values aren't identical
    return round(float(np.clip(base + body_adj + fuel_adj + trim_adj + noise, 2.9, 14.5)), 2)
def admin_fix_zero_to_100(save_as_new=True):
    """Regenerate the zero_to_100_kmh_s column with the heuristic estimator.

    Fix: the original computed a "broken column" heuristic and then
    unconditionally overrode it to True ("Still offer regeneration by
    choice"), so regeneration always ran; that dead detection branch is
    removed while keeping the always-regenerate behavior.

    save_as_new: write RideSearch_dataset_fixed.csv when True, otherwise
    overwrite DATA_PATH in place.
    Returns (stats dict, path of the CSV written) for the admin UI.
    """
    col = 'zero_to_100_kmh_s'
    df = DF.copy()
    df[col] = df.apply(_estimate_0_100, axis=1)
    out_name = 'RideSearch_dataset_fixed.csv' if save_as_new else DATA_PATH
    df.to_csv(out_name, index=False)
    # Before/after spread so admins can sanity-check the regeneration.
    try:
        old_std = float(DF[col].astype(float).std())
    except Exception:
        old_std = float('nan')
    info = {
        "saved_to": out_name,
        "old_std": old_std,
        "new_std": float(df[col].astype(float).std()),
        "rows": int(len(df)),
    }
    # out_name doubles as the downloadable file path for the gr.File output.
    return info, out_name
| # ----------------------------- UI ----------------------------- | |
def build_ui():
    """Build and return the Gradio Blocks app (main tab plus two admin tabs).

    Fix: UI label/markdown strings contained mojibake ('β', 'β’', 'Ξ±',
    'β '); restored to the intended '—', '•', 'α', '★', '→', '–' glyphs.
    """
    y_lo, y_hi = int(DF['year'].min()), int(DF['year'].max())
    p_lo, p_hi = int(DF['price_usd'].min()), int(DF['price_usd'].max())
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# RideSearch — cross-brand recommendations with **real trims** + automatic photos")
        with gr.Tab("Pick & Recommend"):
            with gr.Row():
                mk = gr.Dropdown(sorted(DF['make'].dropna().unique().tolist()), label="Make")
                md = gr.Dropdown([], label="Model")
                tr = gr.Dropdown([], label="Trim (optional)")
                yr = gr.Dropdown([], label="Year (optional)")
            mk.change(models_for, mk, md)
            md.change(lambda a, b: on_model_change(a, b), [mk, md], [tr, yr])
            with gr.Row():
                body = gr.Dropdown(['Any'] + sorted(DF['body_type'].dropna().unique().tolist()), value='Any', label='Body')
                fuel = gr.Dropdown(['Any'] + sorted(DF['fuel'].dropna().unique().tolist()), value='Any', label='Fuel')
            with gr.Row():
                y_min = gr.Slider(y_lo, y_hi, value=y_lo, step=1, label='Year min')
                y_max = gr.Slider(y_lo, y_hi, value=y_hi, step=1, label='Year max')
            with gr.Row():
                p_min = gr.Slider(p_lo, p_hi, value=p_lo, step=500, label='Price min (USD)')
                p_max = gr.Slider(p_lo, p_hi, value=min(p_hi, 80000), step=500, label='Price max (USD)')
            with gr.Row():
                safety = gr.Slider(3, 5, value=4, step=1, label='Min Safety ★')
                reliab = gr.Slider(55, 99, value=70, step=1, label='Min Reliability')
            with gr.Row():
                topk = gr.Slider(1, 10, value=5, step=1, label='Recommendations')
                alpha = gr.Slider(0, 1, value=0.7, step=0.05, label='α — Text vs Numeric')
            with gr.Row():
                cross = gr.Checkbox(label="Cross-brand only", value=True)
                xmodel = gr.Checkbox(label="Exclude same model family", value=True)
            go = gr.Button("Recommend")
            anchor_md = gr.Markdown()
            table = gr.Dataframe(interactive=False, wrap=True, label="Recommendations")
            note = gr.Markdown()
            gallery = gr.HTML()
            go.click(
                recommend,
                [mk, md, tr, yr, topk, alpha, body, fuel, y_min, y_max, p_min, p_max, safety, reliab, cross, xmodel],
                [anchor_md, table, note, gallery]
            )
        with gr.Tab("Admin • Trim Fixer"):
            gr.Markdown("**Add or repair realistic trim lists** per model. These control dropdowns and result display.")
            with gr.Row():
                a_mk = gr.Dropdown(sorted(DF['make'].dropna().unique().tolist()), label="Make")
                a_md = gr.Dropdown([], label="Model")
            a_mk.change(models_for, a_mk, a_md)
            with gr.Row():
                prev_btn = gr.Button("Preview dataset trims + Wiki hints")
            out_json = gr.JSON(value={})
            out_counts = gr.Dataframe(wrap=True)
            prev_btn.click(admin_preview, [a_mk, a_md], [out_json, out_counts])
            gr.Markdown("Paste **display trims** (one per line), then **Save mapping**.")
            trims_txt = gr.Textbox(lines=8, placeholder="e.g.\n320i\n330i\n340i\nM3", label="Display trims (one per line)")
            save_btn = gr.Button("Save mapping to trims_map.json")
            save_msg = gr.Markdown()
            save_btn.click(admin_save_mapping, [a_mk, a_md, trims_txt], save_msg)
        with gr.Tab("Admin • Dataset Tools"):
            gr.Markdown("**Fix zero_to_100_kmh_s** with a realistic heuristic and download the updated CSV.")
            with gr.Row():
                save_new = gr.Checkbox(value=True, label="Save as new file (RideSearch_dataset_fixed.csv)")
                run_btn = gr.Button("Recompute 0–100 and Save")
            info_json = gr.JSON()
            out_file = gr.File(label="Download fixed CSV")
            run_btn.click(admin_fix_zero_to_100, [save_new], [info_json, out_file])
        gr.Markdown("Tip: Add a `BING_KEY` secret in Space → Settings → Variables for Bing image fallback.")
    return demo
# Build the app at import time so hosting platforms (e.g. HF Spaces) can serve `demo`.
demo = build_ui()

if __name__ == "__main__":
    # queue() enables request queuing; bind on all interfaces, port 7860.
    demo.queue().launch(server_name="0.0.0.0", server_port=7860)