# Source header (Hugging Face file-page residue, commented out so the file parses):
#   RideSearch / app_new_images.py
#   Uploaded by Levimichael4 ("Upload 3 files", commit 1ce89d0, verified)
# app_new_images.py β€” RideSearch with real trims + auto photos + Admin tools
# -------------------------------------------------------------------------
# Features:
# - Trim mapping via trims_map.json (for correct brand/model-specific display)
# - Cross-brand recommendations (unique model families by default)
# - Automatic photos from Wikipedia/Wikimedia (no key), optional Bing fallback via env BING_KEY
# - Admin β€’ Trim Fixer: preview dataset trims, save curated display trims per model
# - Admin β€’ Dataset Tools: recompute zero_to_100_kmh_s with a realistic heuristic and download the fixed CSV
import os, glob, json, urllib.parse, requests, io
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import StandardScaler
import gradio as gr
# Merged dataset CSV (created from the part files on first run if absent).
DATA_PATH = 'RideSearch_dataset.csv'
# Curated per-model trim mapping, written by the Admin / Trim Fixer tab.
TRIMS_PATH = 'trims_map.json'
# ----------------------------- Data loading -----------------------------
def load_df():
    """Load the vehicle dataset, merging the split part files into one CSV if needed.

    Returns the full DataFrame. Raises FileNotFoundError when neither the
    merged CSV nor any part files are present.
    """
    # Fast path: the merged dataset already exists on disk.
    if os.path.exists(DATA_PATH):
        return pd.read_csv(DATA_PATH)
    part_files = sorted(glob.glob('RideSearch_part*_small.csv'))
    if not part_files:
        raise FileNotFoundError("Upload RideSearch_dataset.csv OR the 10 parts RideSearch_part*_small.csv.")
    frames = [pd.read_csv(path) for path in part_files]
    merged = pd.concat(frames, ignore_index=True)
    merged.to_csv(DATA_PATH, index=False)  # cache the merge for future launches
    return merged
# Dataset loaded once at import time; every UI callback reads this frame.
DF = load_df()
# Numeric feature columns used to build the numeric embedding matrix.
NUM_COLS = [
    'horsepower','zero_to_100_kmh_s','seats','cargo_liters','price_usd',
    'popularity_score','comfort_score','reliability_score','tech_score',
    'ownership_cost_score','safety_rating'
]
# ----------------------------- Embeddings -----------------------------
def ensure_emb():
    """Return (text_embeddings, numeric_embeddings), computing and caching on first use.

    Both matrices are row-aligned with DF and persisted to .npy files so the
    sentence-transformer model only has to run once.
    """
    if os.path.exists('emb_text.npy') and os.path.exists('emb_num.npy'):
        return np.load('emb_text.npy'), np.load('emb_num.npy')
    # Text side: sentence-transformer embeddings of the free-text record column.
    from sentence_transformers import SentenceTransformer
    encoder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    records = DF['text_record'].astype(str).tolist()
    text_emb = np.asarray(
        encoder.encode(records, batch_size=256, show_progress_bar=True, normalize_embeddings=True),
        dtype='float32',
    )
    np.save('emb_text.npy', text_emb)
    # Numeric side: standardized feature matrix; the 0-100 time is negated
    # because a lower time is better (so "more" is always "better").
    feats = DF[NUM_COLS].copy()
    if 'zero_to_100_kmh_s' in feats.columns:
        feats['zero_to_100_kmh_s'] = -feats['zero_to_100_kmh_s'].astype('float32')
    num_emb = StandardScaler().fit_transform(feats.values.astype('float32')).astype('float32')
    np.save('emb_num.npy', num_emb)
    return text_emb, num_emb
# ----------------------------- Trims mapping -----------------------------
# (make, model) -> curated display trims shown in the dropdowns
TRIM_CHOICES = {}
# (make, model, alias) -> generic dataset token behind a display trim
TRIM_ALIAS_TO_GENERIC = {}
def load_trims():
    """(Re)load trims_map.json into the two module-level lookup tables."""
    global TRIM_CHOICES, TRIM_ALIAS_TO_GENERIC
    TRIM_CHOICES, TRIM_ALIAS_TO_GENERIC = {}, {}
    if not os.path.exists(TRIMS_PATH):
        return  # no curated mapping yet; dropdowns fall back to dataset trims
    with open(TRIMS_PATH, 'r', encoding='utf-8') as fh:
        mapping = json.load(fh)
    for combo, entry in mapping.items():
        # Keys are stored as "Make||Model".
        make, model = combo.split('||', 1)
        TRIM_CHOICES[(make, model)] = entry.get('display', [])
        for alias, generic in entry.get('alias_to_generic', {}).items():
            TRIM_ALIAS_TO_GENERIC[(make, model, alias)] = generic
load_trims()
def generic_to_display(make, model, generic_trim):
    """Translate a dataset ("generic") trim token into its curated display name.

    Falls back to the raw token when no curated mapping exists, and to ""
    when the trim itself is empty/None.
    """
    if not generic_trim:
        return ""
    display_list = TRIM_CHOICES.get((make, model))
    if not display_list:
        # No curated mapping for this model: show the raw token.
        return str(generic_trim)
    # Reverse lookup: find the display alias whose generic token matches.
    for display in display_list:
        if TRIM_ALIAS_TO_GENERIC.get((make, model, display), display) == generic_trim:
            return display
    return str(generic_trim)
def alias_to_generic(make, model, alias):
    """Map a display trim back to its dataset token; identity when unmapped, None for empty alias."""
    return TRIM_ALIAS_TO_GENERIC.get((make, model, alias), alias) if alias else None
# ----------------------------- UI helpers -----------------------------
def models_for(make):
    """Populate the Model dropdown with the models belonging to the selected make."""
    if not make:
        return gr.update(choices=[], value=None)
    models = DF.loc[DF['make'].eq(make), 'model'].dropna().unique().tolist()
    return gr.update(choices=sorted(models), value=None)
def trim_year(make, model):
    """Return (trim choices, year choices) for a make/model selection.

    Curated trims from trims_map.json win when present; otherwise the 15 most
    frequent dataset trims are offered (mapped through the display lookup).
    Years are only listed once both make and model are chosen.
    """
    if make and model and (make, model) in TRIM_CHOICES:
        trims = TRIM_CHOICES[(make, model)]
    else:
        sub = DF
        if make:
            sub = sub[sub['make'] == make]
        if model:
            sub = sub[sub['model'] == model]
        if 'trim' in sub.columns and not sub.empty:
            # Most common dataset trims first, shown under their display names.
            common = sub['trim'].astype(str).value_counts().head(15).index.tolist()
            trims = [generic_to_display(make or "", model or "", t) for t in common]
        else:
            trims = []
    years = []
    if make and model:
        matched = DF.loc[(DF['make'].eq(make)) & (DF['model'].eq(model)), 'year']
        years = sorted(matched.dropna().astype(int).unique().tolist())
    return trims, years
def on_model_change(make, model):
    """Refresh the Trim and Year dropdowns whenever the model selection changes."""
    trims, years = trim_year(make, model)
    return gr.update(choices=trims, value=None), gr.update(choices=years, value=None)
def apply_filters(df, body, fuel, y_min, y_max, p_min, p_max, safety, reliab):
    """Filter a candidate frame by body, fuel, year/price ranges and minimum scores.

    'Any' disables the body/fuel filters; all range bounds are inclusive.
    Returns a new DataFrame (the input is not mutated).
    """
    out = df.copy()
    if body != 'Any':
        out = out[out['body_type'] == body]
    if fuel != 'Any':
        out = out[out['fuel'] == fuel]
    out = out[out['year'].between(y_min, y_max)]
    out = out[out['price_usd'].between(p_min, p_max)]
    out = out[(out['safety_rating'] >= safety) & (out['reliability_score'] >= reliab)]
    return out
# ----------------------------- Photos -----------------------------
def fetch_wikimedia_image(query):
    """Best-effort image URL lookup on Wikipedia; returns None when nothing is found.

    Two attempts, each swallowing network/JSON errors:
      1. PageImages API for the page's original lead image.
      2. REST title search, then the summary endpoint's thumbnail.
    """
    try:
        encoded = urllib.parse.quote(query)
        api = f"https://en.wikipedia.org/w/api.php?action=query&prop=pageimages&format=json&piprop=original&titles={encoded}"
        payload = requests.get(api, timeout=8).json()
        for page in payload.get('query', {}).get('pages', {}).values():
            original = page.get('original')
            if original and 'source' in original:
                return original['source']
    except Exception:
        pass
    try:
        hits = requests.get(
            "https://en.wikipedia.org/w/rest.php/v1/search/title",
            params={"q": query, "limit": 1},
            timeout=8
        ).json()
        if hits.get('pages'):
            top_title = hits['pages'][0]['title']
            summary = requests.get(
                f"https://en.wikipedia.org/api/rest_v1/page/summary/{urllib.parse.quote(top_title)}",
                timeout=8
            ).json()
            thumb_url = summary.get('thumbnail', {}).get('source')
            if thumb_url:
                return thumb_url
    except Exception:
        pass
    return None
def fetch_bing_image(query):
    """Query Bing Image Search when a BING_KEY env var is present; else return None.

    Network/JSON failures are swallowed (best-effort fallback provider).
    """
    api_key = os.getenv("BING_KEY")
    if not api_key:
        return None
    try:
        response = requests.get(
            "https://api.bing.microsoft.com/v7.0/images/search",
            headers={"Ocp-Apim-Subscription-Key": api_key},
            params={"q": query, "count": 1, "safeSearch": "Strict"},
            timeout=8,
        )
        hits = response.json()
        if hits.get("value"):
            return hits["value"][0]["contentUrl"]
    except Exception:
        return None
    return None
def get_image_for(make, model, trim_disp, year):
    """Find a photo URL for a vehicle: Wikipedia first (full query, then make+model), Bing last."""
    query = " ".join(str(part) for part in [year, make, model, trim_disp] if part)
    url = fetch_wikimedia_image(query) or fetch_wikimedia_image(f"{make} {model}")
    return url or fetch_bing_image(query)
def placeholder_svg_data_uri(title):
    """Build a data: URI for an inline SVG placeholder card showing *title*.

    Used when no photo could be fetched for a vehicle. *title* is interpolated
    directly into SVG markup, so callers should pass plain text (make/model
    strings from the dataset), not user-supplied HTML.
    """
    # Bug fix: the scraped original used escaped triple quotes (backslashed
    # f-string delimiters), which is a SyntaxError in Python. A plain
    # triple-quoted f-string is the intended form.
    svg = f"""<svg xmlns='http://www.w3.org/2000/svg' width='480' height='320'>
  <rect width='100%' height='100%' fill='#eef3fb'/>
  <text x='50%' y='50%' dominant-baseline='middle' text-anchor='middle'
        font-family='Arial' font-size='22' fill='#223'>
    {title}
  </text>
</svg>"""
    return "data:image/svg+xml;utf8," + urllib.parse.quote(svg)
def build_gallery_html(df_rows):
    """Render recommendation rows as a flex-wrapped HTML gallery of photo cards.

    For each row: resolve the display trim, try to fetch a real photo, and
    fall back to an inline SVG placeholder when none is found.
    """
    # Bug fix: the scraped original used escaped triple quotes (backslashed
    # f-string delimiters) around the card template — a SyntaxError in Python.
    cards = []
    for _, row in df_rows.iterrows():
        disp_trim = generic_to_display(row['make'], row['model'], row['trim'])
        label = f"{row['make']} {row['model']} {disp_trim}"
        img_src = get_image_for(row['make'], row['model'], disp_trim, int(row['year']))
        if not img_src:
            img_src = placeholder_svg_data_uri(f"{row['make']} {row['model']}")
        cards.append(f"""
<div style="width:240px;margin:6px;border:1px solid #ddd;border-radius:12px;overflow:hidden;background:#fff;">
  <img src="{img_src}" style="width:240px;height:160px;object-fit:cover;display:block" />
  <div style="padding:8px 10px;font:14px/1.3 Arial,sans-serif;color:#111">{label}</div>
</div>
""")
    return f"<div style='display:flex;flex-wrap:wrap'>{''.join(cards)}</div>"
# ----------------------------- Anchor & Recommend -----------------------------
def find_anchor(make, model, trim_display, year):
    """Pick the best-matching dataset row for the user's selection.

    Fallback order: exact trim+year -> year only -> trim only -> any row for
    the make/model. Within each tier the most popular row wins. Returns a
    Series (one DF row) or None when nothing matches.
    """
    trim_generic = alias_to_generic(make, model, trim_display) if trim_display else None
    sub = DF.copy()
    if make:
        sub = sub[sub['make'] == make]
    if model:
        sub = sub[sub['model'] == model]

    def most_popular(frame):
        # Tie-break every tier on popularity_score.
        if frame.empty:
            return None
        return frame.sort_values('popularity_score', ascending=False).iloc[0]

    exact = sub.copy()
    if trim_generic:
        exact = exact[exact['trim'] == trim_generic]
    if year:
        exact = exact[exact['year'] == year]
    if not exact.empty:
        return most_popular(exact)
    if year:
        by_year = sub[sub['year'] == year]
        if not by_year.empty:
            return most_popular(by_year)
    if trim_generic:
        by_trim = sub[sub['trim'] == trim_generic]
        if not by_trim.empty:
            return most_popular(by_trim)
    return most_popular(sub)
def apply_and_recommend(a, topk, alpha, body, fuel, y_min, y_max, p_min, p_max, safety, reliab,
                        cross_brand_only, exclude_same_model):
    """Rank candidate rows against anchor row `a` by blended text/numeric similarity.

    Returns (selection DataFrame, None) on success, or (None, error message)
    when filters leave no candidates. `alpha` in [0, 1] weights text similarity
    against numeric similarity.
    """
    pool = DF.copy()
    if cross_brand_only:
        pool = pool[pool['make'] != a['make']]
    if exclude_same_model:
        pool = pool[~((pool['make'] == a['make']) & (pool['model'] == a['model']))]
    pool = apply_filters(pool, body, fuel, int(y_min), int(y_max), int(p_min), int(p_max), int(safety), int(reliab))
    if pool.empty:
        return None, "No cars after filters. Try widening year/price/safety."
    Etext, Enum = ensure_emb()
    # NOTE(review): embedding rows are addressed by DF label index below; this
    # assumes DF keeps the default RangeIndex that the embeddings were built
    # against (true after concat with ignore_index) — confirm if DF is ever re-indexed.
    idx_anchor = int(a.name)
    cand_idx = pool.index.values
    st = cosine_similarity(Etext[idx_anchor:idx_anchor+1], Etext[cand_idx])[0]
    sn = cosine_similarity(Enum[idx_anchor:idx_anchor+1], Enum[cand_idx])[0]
    # Blend: alpha on text similarity, (1 - alpha) on numeric similarity.
    s = float(alpha)*st + (1-float(alpha))*sn
    order = np.argsort(-s)  # best-first candidate ordering
    # Greedy de-dup: keep at most one row per (make, model) family.
    seen = set(); chosen = []
    for j in order:
        r = DF.loc[cand_idx[j]]
        key = (r['make'], r['model'])
        if key in seen: continue
        seen.add(key); chosen.append(cand_idx[j])
        if len(chosen) >= int(topk): break
    if not chosen:
        return None, "No recommendations found after constraints."
    sel = DF.loc[chosen].copy()
    sel['trim_display'] = sel.apply(lambda r: generic_to_display(r['make'], r['model'], r['trim']), axis=1)
    # Similarity as a percentage, keyed by dataset index so it aligns with sel.
    sim_lookup = {cand_idx[j]: round(float(s[j])*100, 1) for j in order}
    sel['similarity_%'] = sel.index.map(lambda k: sim_lookup.get(k, 0.0))
    return sel, None
def recommend(make, model, trim_display, year, topk, alpha,
              body, fuel, y_min, y_max, p_min, p_max, safety, reliab,
              cross_brand_only=True, exclude_same_model=True):
    """Handler for the Recommend button.

    Returns (anchor markdown, results DataFrame, settings note, gallery HTML);
    on failure the first element carries the error text and the rest are empty.
    """
    a = find_anchor(make, model, trim_display, year)
    if a is None:
        return "No match for that combo.", None, "", None
    sel, err = apply_and_recommend(a, topk, alpha, body, fuel, y_min, y_max, p_min, p_max, safety, reliab,
                                   cross_brand_only, exclude_same_model)
    if err:
        return err, None, "", None
    # Column order for the results table shown in the UI.
    cols = ['name','make','model','trim_display','year','body_type','fuel','engine_type',
            'price_usd','horsepower','zero_to_100_kmh_s','popularity_score','comfort_score',
            'reliability_score','tech_score','ownership_cost_score','safety_rating','similarity_%']
    # Anchor summary markdown. NOTE(review): the separator characters below look
    # like mojibake from the scrape (likely bullets/stars originally) and the
    # doubled backslashes may have been markdown line breaks — confirm against
    # the pristine source before "fixing"; kept byte-for-byte here.
    anchor_text = (f"**{a['make']} {a['model']} {generic_to_display(a['make'], a['model'], a['trim'])} "
                   f"{int(a['year'])}** \\n"
                   f"Body: {a['body_type']} β€’ Fuel: {a['fuel']} β€’ Engine: {a['engine_type']} \\n"
                   f"HP: {int(a['horsepower'])} β€’ 0–100: {a['zero_to_100_kmh_s']}s β€’ Price: ${int(a['price_usd']):,} \\n"
                   f"Popularity {int(a['popularity_score'])}/10 β€’ Comfort {int(a['comfort_score'])}/10 β€’ "
                   f"Reliability {int(a['reliability_score'])}/100 β€’ Safety {int(a['safety_rating'])}β˜…")
    note = (f"Ξ± = {float(alpha):.2f} (text ↔ numeric) β€’ Cross-brand only = {cross_brand_only} "
            f"β€’ Exclude same model = {exclude_same_model}")
    gallery = build_gallery_html(sel)
    return anchor_text, sel[cols], note, gallery
# ----------------------------- Admin: Trim Fixer -----------------------------
def wiki_suggest_trims(make, model):
    """Gather trim-naming hints: Wikipedia title search hits plus top dataset trims.

    Network failures are swallowed; the wiki list is simply empty in that case.
    """
    query = f"{make} {model} trim levels"
    titles = []
    try:
        found = requests.get(
            "https://en.wikipedia.org/w/rest.php/v1/search/title",
            params={"q": query, "limit": 5},
            timeout=8
        ).json()
        titles = [page['title'] for page in found.get('pages', [])]
    except Exception:
        pass
    rows = DF[(DF['make'] == make) & (DF['model'] == model)]
    top_trims = rows['trim'].astype(str).value_counts().head(10).index.tolist()
    return {"wiki_titles": titles, "dataset_top_trims": top_trims}
def admin_preview(make, model):
    """Return (wiki/dataset trim hints, per-trim row counts) for one make/model."""
    info = wiki_suggest_trims(make, model)
    rows = DF[(DF['make'] == make) & (DF['model'] == model)][['trim', 'year']].copy()
    rows['count'] = 1
    counts = (
        rows.groupby('trim')['count'].sum()
            .reset_index()
            .sort_values('count', ascending=False)
    )
    return info, counts
def admin_save_mapping(make, model, list_of_trims):
    """Persist a curated trim list for one make/model into trims_map.json.

    `list_of_trims` is the textbox content (one trim per line). The alias map
    defaults to identity: display name == dataset token.
    """
    display_trims = [line.strip() for line in list_of_trims.splitlines() if line.strip()]
    if not display_trims:
        return "No trims provided."
    existing = {}
    if os.path.exists(TRIMS_PATH):
        with open(TRIMS_PATH, 'r', encoding='utf-8') as fh:
            existing = json.load(fh)
    existing[f"{make}||{model}"] = {
        "display": display_trims,
        "alias_to_generic": {t: t for t in display_trims},
    }
    with open(TRIMS_PATH, 'w', encoding='utf-8') as fh:
        json.dump(existing, fh, indent=2, ensure_ascii=False)
    load_trims()  # refresh the in-memory lookup tables immediately
    return f"Saved {len(display_trims)} trims for {make} {model}. Refresh the main tab dropdown."
# ----------------------------- Admin: Dataset Tools (0–100 fix) -----------------------------
def _estimate_0_100(row):
"""Heuristic: t = 26 - 3.2*ln(hp) + body_adj + fuel_adj + trim_adj + noise, clamped [2.9, 14.5]."""
try:
hp = float(row.get('horsepower', 150) or 150)
except Exception:
hp = 150.0
body = str(row.get('body_type','')).lower()
fuel = str(row.get('fuel','')).lower()
trim = str(row.get('trim','')).lower()
base = 26.0 - 3.2*np.log(max(hp, 60.0)) # >=60 hp to avoid extremes
body_adj_map = {
'sports': -2.5, 'coupe': -1.2, 'sedan': 0.0, 'hatch': 0.2, 'hatchback': 0.2,
'wagon': 0.4, 'suv': 0.8, 'crossover': 0.6, 'pickup': 1.2, 'truck': 1.2, 'van': 1.0
}
body_adj = 0.0
for k,v in body_adj_map.items():
if k in body:
body_adj = v; break
fuel_adj = 0.0
if 'ev' in fuel or 'electric' in fuel: fuel_adj -= 0.8
if 'hybrid' in fuel: fuel_adj -= 0.3
if 'diesel' in fuel: fuel_adj += 0.4
trim_adj = 0.0
if 'performance' in trim or 'rs' in trim or 'amg' in trim or 'm ' in f" {trim} " or 'type r' in trim:
trim_adj -= 0.5
elif 'sport' in trim:
trim_adj -= 0.3
elif 'premium' in trim:
trim_adj -= 0.2
noise = np.random.uniform(-0.2, 0.2)
t = base + body_adj + fuel_adj + trim_adj + noise
t = float(np.clip(t, 2.9, 14.5))
return round(t, 2)
def admin_fix_zero_to_100(save_as_new=True):
    """Recompute the 0-100 km/h column with the heuristic and write the CSV.

    Returns (stats dict, output path) so Gradio can show a summary and offer
    the file for download. `save_as_new=True` writes RideSearch_dataset_fixed.csv
    instead of overwriting DATA_PATH.
    """
    df = DF.copy()
    col = 'zero_to_100_kmh_s'
    # Fix: the original tried to detect a "broken" column (low variance / too
    # few unique values) but then unconditionally forced regeneration anyway,
    # making the whole detection branch dead code. Behavior is preserved by
    # always regenerating; the before/after std still shows what changed.
    df[col] = df.apply(_estimate_0_100, axis=1)
    out_name = 'RideSearch_dataset_fixed.csv' if save_as_new else DATA_PATH
    df.to_csv(out_name, index=False)
    # Before/after spread as a quick sanity signal for the admin.
    try:
        old_std = float(DF[col].astype(float).std())
    except Exception:
        old_std = float('nan')
    info = {
        "saved_to": out_name,
        "old_std": old_std,
        "new_std": float(df[col].astype(float).std()),
        "rows": int(len(df)),
    }
    # Second return value feeds the gr.File download component.
    return info, out_name
# ----------------------------- UI -----------------------------
def build_ui():
    """Assemble the three-tab Gradio Blocks UI and return the demo object.

    Tabs: Pick & Recommend (main flow), Admin / Trim Fixer (curate trim lists),
    Admin / Dataset Tools (regenerate the 0-100 column).
    NOTE(review): several label strings below contain mojibake from the scrape
    (e.g. "β€”", "β€’") — kept byte-for-byte; confirm against the pristine source.
    """
    # Slider bounds come from the dataset's actual year/price ranges.
    y_lo, y_hi = int(DF['year'].min()), int(DF['year'].max())
    p_lo, p_hi = int(DF['price_usd'].min()), int(DF['price_usd'].max())
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# RideSearch β€” cross-brand recommendations with **real trims** + automatic photos")
        with gr.Tab("Pick & Recommend"):
            with gr.Row():
                mk = gr.Dropdown(sorted(DF['make'].dropna().unique().tolist()), label="Make")
                md = gr.Dropdown([], label="Model")
                tr = gr.Dropdown([], label="Trim (optional)")
                yr = gr.Dropdown([], label="Year (optional)")
            # Cascading dropdowns: make -> models, model -> trims/years.
            mk.change(models_for, mk, md)
            md.change(lambda a,b: on_model_change(a,b), [mk, md], [tr, yr])
            with gr.Row():
                body = gr.Dropdown(['Any'] + sorted(DF['body_type'].dropna().unique().tolist()), value='Any', label='Body')
                fuel = gr.Dropdown(['Any'] + sorted(DF['fuel'].dropna().unique().tolist()), value='Any', label='Fuel')
            with gr.Row():
                y_min = gr.Slider(y_lo, y_hi, value=y_lo, step=1, label='Year min')
                y_max = gr.Slider(y_lo, y_hi, value=y_hi, step=1, label='Year max')
            with gr.Row():
                p_min = gr.Slider(p_lo, p_hi, value=p_lo, step=500, label='Price min (USD)')
                # Default price cap at 80k (or dataset max if lower).
                p_max = gr.Slider(p_lo, p_hi, value=min(p_hi, 80000), step=500, label='Price max (USD)')
            with gr.Row():
                safety = gr.Slider(3, 5, value=4, step=1, label='Min Safety β˜…')
                reliab = gr.Slider(55, 99, value=70, step=1, label='Min Reliability')
            with gr.Row():
                topk = gr.Slider(1, 10, value=5, step=1, label='Recommendations')
                # Blend weight between text-embedding and numeric-feature similarity.
                alpha = gr.Slider(0, 1, value=0.7, step=0.05, label='Ξ± β€” Text vs Numeric')
            with gr.Row():
                cross = gr.Checkbox(label="Cross-brand only", value=True)
                xmodel = gr.Checkbox(label="Exclude same model family", value=True)
            go = gr.Button("Recommend")
            anchor_md = gr.Markdown()
            table = gr.Dataframe(interactive=False, wrap=True, label="Recommendations")
            note = gr.Markdown()
            gallery = gr.HTML()
            go.click(
                recommend,
                [mk, md, tr, yr, topk, alpha, body, fuel, y_min, y_max, p_min, p_max, safety, reliab, cross, xmodel],
                [anchor_md, table, note, gallery]
            )
        with gr.Tab("Admin β€’ Trim Fixer"):
            gr.Markdown("**Add or repair realistic trim lists** per model. These control dropdowns and result display.")
            with gr.Row():
                a_mk = gr.Dropdown(sorted(DF['make'].dropna().unique().tolist()), label="Make")
                a_md = gr.Dropdown([], label="Model")
            a_mk.change(models_for, a_mk, a_md)
            with gr.Row():
                prev_btn = gr.Button("Preview dataset trims + Wiki hints")
            out_json = gr.JSON(value={})
            out_counts = gr.Dataframe(wrap=True)
            prev_btn.click(admin_preview, [a_mk, a_md], [out_json, out_counts])
            gr.Markdown("Paste **display trims** (one per line), then **Save mapping**.")
            trims_txt = gr.Textbox(lines=8, placeholder="e.g.\n320i\n330i\n340i\nM3", label="Display trims (one per line)")
            save_btn = gr.Button("Save mapping to trims_map.json")
            save_msg = gr.Markdown()
            save_btn.click(admin_save_mapping, [a_mk, a_md, trims_txt], save_msg)
        with gr.Tab("Admin β€’ Dataset Tools"):
            gr.Markdown("**Fix zero_to_100_kmh_s** with a realistic heuristic and download the updated CSV.")
            with gr.Row():
                save_new = gr.Checkbox(value=True, label="Save as new file (RideSearch_dataset_fixed.csv)")
                run_btn = gr.Button("Recompute 0–100 and Save")
            info_json = gr.JSON()
            out_file = gr.File(label="Download fixed CSV")
            run_btn.click(admin_fix_zero_to_100, [save_new], [info_json, out_file])
        gr.Markdown("Tip: Add a `BING_KEY` secret in Space β†’ Settings β†’ Variables for Bing image fallback.")
    return demo
# Build the Blocks app at import time so `demo` is importable (e.g. by HF Spaces).
demo = build_ui()
if __name__ == "__main__":
    # Bind to all interfaces on the standard HF Spaces port.
    demo.queue().launch(server_name="0.0.0.0", server_port=7860)