Spaces:
Sleeping
Sleeping
| import os, json, time, tempfile | |
| import gradio as gr | |
| from dataclasses import asdict | |
| from huggingface_hub import HfApi, HfFolder | |
| from equipment_catalog import load_catalog # add_asset_from_catalog not used here | |
| # ------------ Hub dataset config ------------ | |
# Dataset repository that receives the purchase logs; the DATA_REPO
# environment variable overrides the built-in default.
DATA_REPO = os.getenv("DATA_REPO", "jeffrey1963/Farm_Sim_Data")
# Passed as repo_type on upload: the target is a Dataset, not a Space.
DATA_REPO_TYPE = "dataset"
| def _hf_token(): | |
| tok = HfFolder.get_token() | |
| if not tok: | |
| raise RuntimeError("No HF token found. Set HF_TOKEN secret in Space settings.") | |
| return tok | |
| # ------------ Catalog helpers ------------ | |
def _catalog_list():
    """Return every catalog entry as a plain dict tagged with its catalog key.

    Each value from ``load_catalog()`` is converted with
    ``dataclasses.asdict`` and the entry's key is stored under ``"id"``.
    """
    return [{**asdict(spec), "id": key} for key, spec in load_catalog().items()]
def _catalog_lookup():
    """Return the catalog as a dict mapping each entry's ``"id"`` to its dict."""
    lookup = {}
    for entry in _catalog_list():
        lookup[entry["id"]] = entry
    return lookup
| def _brands(items): return ["All"] + sorted(list({i["brand"] for i in items})) | |
| def _cats(items): return ["All"] + sorted(list({i["category"] for i in items})) | |
| def _filter_sort(items, q, brand, category, sort_key): | |
| q = (q or "").lower().strip() | |
| brand = brand or "All" | |
| category = category or "All" | |
| out = [] | |
| for it in items: | |
| if brand != "All" and it["brand"] != brand: | |
| continue | |
| if category != "All" and it["category"] != category: | |
| continue | |
| blob = f'{it.get("name","")} {it["brand"]} {it["model"]} {it["category"]}'.lower() | |
| if q and q not in blob: | |
| continue | |
| out.append(it) | |
| keymap = { | |
| "Price β": lambda x: -x["list_price"], | |
| "Price β": lambda x: x["list_price"], | |
| "HP β": lambda x: -(x.get("rated_hp") or 0), | |
| "HP β": lambda x: (x.get("rated_hp") or 0), | |
| "Brand AβZ": lambda x: (x["brand"], x["model"]), | |
| } | |
| return sorted(out, key=keymap.get(sort_key, keymap["Price β"])) | |
| def _grid_md(items): | |
| cards = [] | |
| for it in items: | |
| price = f'${it["list_price"]:,.0f}' | |
| hp = f'{it["rated_hp"]} hp' if it.get("rated_hp") else "" | |
| life = f'{it["expected_life_years"]} yrs' | |
| salvage = f'{int(100*it["salvage_value_pct"])}%' | |
| cards.append( | |
| f"""<div class="card"> | |
| <div class="title">{it['brand']} {it['model']}</div> | |
| <div class="subtitle">{it.get('name','')}</div> | |
| <div class="meta">{it['category'].title()} Β· {hp}</div> | |
| <div class="price">{price}</div> | |
| <div class="attrs">Life: {life} Β· Salvage: {salvage}</div> | |
| <div class="row"> | |
| <span class="small">ASAE dep: {it['asae_dep1']}/{it['asae_dep2']}</span> | |
| <span class="small">RFs: {it['repair_rf1']}/{it['repair_rf2']}</span> | |
| </div> | |
| </div>""" | |
| ) | |
| return ( | |
| """<style> | |
| .grid{display:grid;grid-template-columns:repeat(auto-fill,minmax(260px,1fr));gap:14px} | |
| .card{border:1px solid #e5e7eb;border-radius:12px;padding:12px;background:white} | |
| .title{font-weight:700}.subtitle{color:#6b7280;margin-top:2px} | |
| .meta{color:#374151;margin:6px 0}.price{font-size:1.1rem;font-weight:700;margin:6px 0} | |
| .attrs{color:#374151;margin-bottom:6px}.small{font-size:12px;color:#6b7280} | |
| .row{display:flex;justify-content:space-between} | |
| </style> | |
| <div class="grid"> | |
| """ + "\n".join(cards) + "\n</div>" | |
| ) | |
def list_for_select(items):
    """Return ``(label, id)`` pairs for the item dropdown.

    The label shows brand, model and the list price formatted with thousands
    separators and no decimals; the value is the entry's ``"id"``.
    """
    choices = []
    for entry in items:
        label = f'{entry["brand"]} {entry["model"]} β ${entry["list_price"]:,.0f}'
        choices.append((label, entry["id"]))
    return choices
| # ------------ Persistence ------------ | |
def log_purchases_to_dataset(student_id: str, cart_items: list, catalog_lookup: dict) -> str:
    """Upload one checkout to the Hub dataset as a JSON-lines file.

    One JSON line is written per cart item. Each checkout goes to its own
    timestamped file (``purchases/<student>-<unix seconds>.jsonl``) so earlier
    history is not overwritten.

    Args:
        student_id: ID typed by the student; surrounding whitespace is ignored.
        cart_items: Cart entries, each a dict with ``id``, ``label`` and ``price``.
        catalog_lookup: Mapping of catalog id to the entry dict, used to store a
            snapshot of the item's specification next to the purchase.

    Returns:
        A status message. When the student ID is empty nothing is uploaded and
        a warning message is returned instead.

    Raises:
        RuntimeError: Propagated from ``_hf_token()`` when no token is available.
    """
    sid = (student_id or "").strip()
    if not sid:
        return "β οΈ Missing Student ID β NOT saved to dataset."

    ts = int(time.time())
    snapshot_fields = (
        "brand",
        "model",
        "category",
        "expected_life_years",
        "salvage_value_pct",
        "dep_method_default",
        "asae_dep1",
        "asae_dep2",
        "repair_rf1",
        "repair_rf2",
        "eul_hours",
        "rated_hp",
        "transport_speed_mph",
        "fuel_type",
    )
    lines = []
    for it in cart_items:
        # Items missing from the catalog still get logged, with a null snapshot.
        spec = catalog_lookup.get(it["id"], {})
        lines.append(json.dumps({
            "student_id": sid,
            "ts": ts,
            "catalog_id": it["id"],
            "label": it["label"],
            "price": float(it["price"]),
            "snapshot": {name: spec.get(name) for name in snapshot_fields},
        }))

    # Upload straight from memory: the previous temp file was never closed or
    # deleted, so every checkout leaked a handle and a file on disk.
    payload = ("\n".join(lines) + "\n").encode("utf-8")

    # The ID is typed by the student; keep it from choosing a directory.
    safe_sid = sid.replace("/", "_").replace("\\", "_")

    api = HfApi(token=_hf_token())
    api.upload_file(
        path_or_fileobj=payload,
        path_in_repo=f"purchases/{safe_sid}-{ts}.jsonl",
        repo_id=DATA_REPO,
        repo_type=DATA_REPO_TYPE,
    )
    return f"πΎ Saved {len(cart_items)} purchase(s) to dataset for {sid}."
| # ------------ Cart & UI logic ------------ | |
def refresh(q, brand, category, sort_key, state):
    """Recompute the product grid, the item dropdown choices and the cart text.

    Args:
        q, brand, category, sort_key: Current filter and sort selections,
            passed through to ``_filter_sort``.
        state: Current cart (list of dicts with ``label`` and ``price``);
            ``None`` or empty is shown as an empty cart.

    Returns:
        ``(grid_html, dropdown_update, cart_markdown)``.
    """
    visible = _filter_sort(_catalog_list(), q, brand, category, sort_key)
    grid = _grid_md(visible)
    options = list_for_select(visible)

    cart = state or []
    if cart:
        bullet_lines = [f'β’ {c["label"]} β ${c["price"]:,.0f}' for c in cart]
        total = sum(entry["price"] for entry in cart)
        cart_txt = "\n".join(bullet_lines) + f"\n\n**Subtotal:** ${total:,.0f}"
    else:
        cart_txt = "Cart is empty."
    return grid, gr.update(choices=options), cart_txt
def add_to_cart(sel_id, custom_price, state):
    """Add the selected catalog item to the cart.

    Args:
        sel_id: Catalog id chosen in the dropdown.
        custom_price: Optional price override as typed by the user. Empty or
            unparsable text falls back to the item's list price.
        state: Current cart list, or ``None``.

    Returns:
        ``(cart, status_message)``. When ``sel_id`` is not a catalog id, the
        incoming ``state`` is returned unchanged with a prompt to select a
        product. Otherwise a new list is returned; ``state`` itself is never
        modified.
    """
    catalog = _catalog_lookup()
    if sel_id not in catalog:
        return state, "Select a product first."

    it = catalog[sel_id]
    list_price = float(it["list_price"])
    price = list_price
    if custom_price:
        try:
            price = float(custom_price)
        except (TypeError, ValueError):
            # Not a number: use the list price instead of failing the click.
            price = list_price

    # Build a fresh list rather than appending to the caller's state object.
    cart = [*(state or []), {"id": sel_id, "label": f'{it["brand"]} {it["model"]}', "price": price}]
    return cart, f"β Added {it['brand']} {it['model']} (${price:,.0f}) to cart."
def clear_cart(_state):
    """Return an empty cart and a status message; the incoming state is ignored."""
    emptied_cart = []
    message = "π§Ή Cart cleared."
    return emptied_cart, message
def checkout(state, student_id):
    """Log the cart to the dataset and return a receipt plus an emptied cart.

    Args:
        state: Current cart (list of dicts with ``label`` and ``price``).
        student_id: ID forwarded to ``log_purchases_to_dataset``.

    Returns:
        ``(message, new_cart)``. ``new_cart`` is always an empty list; for an
        empty cart the message is ``"Cart is empty."`` and nothing is logged.
    """
    if not state:
        return "Cart is empty.", []

    note = log_purchases_to_dataset(student_id, state, _catalog_lookup())
    receipt_lines = []
    for entry in state:
        receipt_lines.append(f'Purchased {entry["label"]} at ${entry["price"]:,.0f}.')
    message = "β Checkout complete!\n" + "\n".join(receipt_lines) + f"\n\n{note}"
    return message, []
| # ------------ Gradio app ------------ | |
# Build the storefront UI and wire its events.
# NOTE(review): this copy of the file carries no indentation, so the nesting
# below (which widgets sit inside each gr.Row) is reconstructed; confirm it
# against the deployed layout.
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("## π Farm Equipment Store β Haymazon")
    cart_state = gr.State([])
    items = _catalog_list()

    # Filter and sort controls.
    with gr.Row():
        q = gr.Textbox(label="Search", placeholder="tractor, brand, modelβ¦")
        brand = gr.Dropdown(choices=_brands(items), value="All", label="Brand")
        category = gr.Dropdown(choices=_cats(items), value="All", label="Category")
        sort_key = gr.Dropdown(
            choices=["Price β", "Price β", "HP β", "HP β", "Brand AβZ"],
            value="Price β",
            label="Sort by",
        )

    student_id = gr.Textbox(
        label="Student ID (required to save to class ledger)",
        placeholder="e.g., netid or student number",
    )
    grid_md = gr.HTML()
    sel = gr.Dropdown(choices=list_for_select(items), label="Select item to add")
    custom_price = gr.Textbox(label="Custom price (optional)")

    with gr.Row():
        add_btn = gr.Button("Add to Cart")
        clear_btn = gr.Button("Clear Cart")
        buy_btn = gr.Button("Checkout")

    cart_box = gr.Markdown("Cart is empty.")
    status = gr.Markdown()

    # Every refresh call reads the same controls and repaints the same widgets.
    refresh_inputs = [q, brand, category, sort_key, cart_state]
    refresh_outputs = [grid_md, sel, cart_box]

    # Events: any filter or sort change repaints the grid.
    for w in (q, brand, category, sort_key):
        w.change(refresh, inputs=refresh_inputs, outputs=refresh_outputs)

    # Cart actions update the state first, then repaint.
    add_btn.click(
        add_to_cart,
        inputs=[sel, custom_price, cart_state],
        outputs=[cart_state, status],
    ).then(refresh, inputs=refresh_inputs, outputs=refresh_outputs)

    clear_btn.click(
        clear_cart,
        inputs=[cart_state],
        outputs=[cart_state, status],
    ).then(refresh, inputs=refresh_inputs, outputs=refresh_outputs)

    # Checkout is wired exactly once and receives student_id.
    buy_btn.click(
        checkout,
        inputs=[cart_state, student_id],
        outputs=[status, cart_state],
    ).then(refresh, inputs=refresh_inputs, outputs=refresh_outputs)

    # Initial render
    app.load(refresh, inputs=refresh_inputs, outputs=refresh_outputs)

if __name__ == "__main__":
    app.launch()