# Hosting-page residue ("Spaces: Sleeping"); not part of the program.
import hashlib
import json
import logging
import math
import os
import re
from dataclasses import dataclass
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple

import faiss
import fitz  # PyMuPDF
import gradio as gr
import numpy as np
import pandas as pd
from openai import OpenAI
from rapidfuzz import fuzz, process
from sentence_transformers import SentenceTransformer
| # ============================ | |
| # Settings | |
| # ============================ | |
# Fixed reference date used when comparing end-of-sale / end-of-life dates.
TODAY = date(2026, 1, 18)
# Model name and reasoning settings passed to the OpenAI Responses call.
OPENAI_MODEL = "gpt-5.2"
OPENAI_REASONING = {"effort": "high"}
# Minimum fuzzy-match score accepted when matching a model in the feature catalog.
MATCH_OK = 80
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
# Characters of page text kept before / after each "Standard SKU:" marker.
PARSEC_CONTEXT_BEFORE = 900
PARSEC_CONTEXT_AFTER = 1600
# Cache directory under the current working directory, created at import time.
CACHE_DIR = os.path.join(os.getcwd(), ".onlyrouters_cache")
os.makedirs(CACHE_DIR, exist_ok=True)
| # ============================ | |
| # OpenAI client (HF Space secret: OPENAI_API_KEY) | |
| # ============================ | |
# The key is read from the environment; without it `client` stays None and
# every GPT-backed helper in this file returns its empty fallback.
API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
client = OpenAI(api_key=API_KEY) if API_KEY else None
| # ============================ | |
| # Utilities | |
| # ============================ | |
def norm_text(s: Any) -> str:
    """Return a lower-cased, whitespace-collapsed form of *s* for matching.

    Missing values (None / NaN / pandas NA) become "". Every character other
    than a-z, 0-9, whitespace, "-" and "/" is replaced by a space.
    """
    try:
        # bool() stays inside the try: pd.isna() on array-likes yields an array
        # whose truth value may raise; such inputs are treated as present.
        is_missing = bool(
            s is None or (isinstance(s, float) and math.isnan(s)) or pd.isna(s)
        )
    except Exception:
        is_missing = False
    if is_missing:
        return ""
    lowered = str(s).strip().lower()
    cleaned = re.sub(r"[^a-z0-9\s\-\/]", " ", lowered)
    return re.sub(r"\s+", " ", cleaned).strip()
| def _safe_str(v: Any) -> str: | |
| if v is None or (isinstance(v, float) and pd.isna(v)) or pd.isna(v): | |
| return "" | |
| return str(v).strip() | |
def _is_5g(modem_type: Any) -> bool:
    """Return True when the normalised modem-type text contains "5g" or "nr"."""
    text = norm_text(modem_type)
    return any(marker in text for marker in ("5g", "nr"))
| def _json_load_safe(s: str) -> Dict[str, Any]: | |
| try: | |
| return json.loads(s) | |
| except Exception: | |
| return {} | |
def gpt_json(system: str, payload: Dict[str, Any], max_tokens: int = 700) -> Dict[str, Any]:
    """Send *payload* (JSON-encoded) to the model and parse the reply as JSON.

    Args:
        system: System prompt.
        payload: Request data, sent as the user message.
        max_tokens: Passed through as ``max_output_tokens``.

    Returns:
        The parsed reply, or ``{}`` when no client is configured, the request
        fails, or the reply cannot be parsed. A failed request is logged and
        degraded to ``{}`` so UI handlers fall back the same way they do when
        no API key is set.
    """
    if client is None:
        return {}
    try:
        resp = client.responses.create(
            model=OPENAI_MODEL,
            reasoning=OPENAI_REASONING,
            input=[
                {"role": "system", "content": system},
                {"role": "user", "content": json.dumps(payload)},
            ],
            max_output_tokens=max_tokens,
        )
    except Exception:
        # Boundary with an external service: log and use the empty fallback.
        logging.getLogger(__name__).exception("OpenAI request failed")
        return {}
    return _json_load_safe(getattr(resp, "output_text", "") or "")
| # ============================ | |
| # Load data files (must exist in repo) | |
| # ============================ | |
EOS_PATH = "routers_eos_eol_by_sku.csv"
DEC_PATH = "dec2025routers.csv"
PARSEC_PDF = "ParsecCatalog.pdf"

# Fail fast, in the same order as before, if any bundled data file is absent.
for _required_path in (EOS_PATH, DEC_PATH, PARSEC_PDF):
    if not os.path.exists(_required_path):
        raise FileNotFoundError(f"Missing {_required_path} in repo.")

df_eos = pd.read_csv(EOS_PATH).copy()
df_dec = pd.read_csv(DEC_PATH).copy()
| # Region filter: keep USA / North America / blank / not specified | |
| def _region_ok(x: Any) -> bool: | |
| s = str(x or "").strip().lower() | |
| if not s: | |
| return True | |
| if "not specified" in s: | |
| return True | |
| if "north america" in s: | |
| return True | |
| if re.search(r"\busa\b", s): | |
| return True | |
| if re.search(r"\bunited\s+states\b", s): | |
| return True | |
| if re.search(r"\bu\.?s\.?\b", s): | |
| return True | |
| return False | |
if "region" in df_eos.columns:
    _region_mask = df_eos["region"].apply(_region_ok)
    df_eos = df_eos[_region_mask].reset_index(drop=True)

# Optional "Device Type": first column whose normalised header is "device type".
device_type_col = next(
    (col for col in df_eos.columns if norm_text(col) == "device type"),
    None,
)
| # Maker mapping (expanded — adds Teltonika) | |
# Canonical maker key -> substrings that identify it in normalised text.
# Dict order matters to canon_maker_from_text: "CISCO_MERAKI" is listed before
# "CISCO" so that "cisco meraki" resolves to the Meraki key.
CANON_MAKER = {
    "CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"},
    "SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"},
    "FEENEY": {"feeney", "feeney wireless", "inseego"},
    "DIGI": {"digi", "accelerated", "accelerated concepts"},
    "CISCO_MERAKI": {"meraki", "cisco meraki"},
    "CISCO": {"cisco"},
    "TELTONIKA": {"teltonika"},
}
# Canonical maker key -> display name.
DISPLAY_MAKER = {
    "CRADLEPOINT": "Cradlepoint",
    "SIERRA": "Sierra Wireless",
    "FEENEY": "Feeney Wireless",
    "DIGI": "Digi",
    "CISCO_MERAKI": "Cisco Meraki",
    "CISCO": "Cisco",
    "TELTONIKA": "Teltonika",
    "UNKNOWN": "Unknown",
}
def canon_maker_from_text(s: Any) -> str:
    """Map free text to a CANON_MAKER key, or "UNKNOWN" when no term matches.

    The first key (in CANON_MAKER order) with any term occurring as a
    substring of the normalised text wins.
    """
    haystack = norm_text(s)
    matches = (
        canon
        for canon, terms in CANON_MAKER.items()
        if any(term in haystack for term in terms)
    )
    return next(matches, "UNKNOWN")
# Derived helper columns: (frame, new column, source column, transform, value
# used for the whole column when the source column is absent).
for _frame, _target, _source, _transform, _default in (
    (df_eos, "_canon_make", "manufacturer", canon_maker_from_text, "UNKNOWN"),
    (df_eos, "_norm_sku", "sku", norm_text, ""),
    (df_eos, "_norm_desc", "description", norm_text, ""),
    (df_eos, "_norm_notes", "notes", norm_text, ""),
    (df_dec, "_canon_make", "Make", canon_maker_from_text, "UNKNOWN"),
    (df_dec, "_norm_model", "Model", norm_text, ""),
    (df_dec, "_is5g", "Modem Type", _is_5g, False),
):
    if _source in _frame.columns:
        _frame[_target] = _frame[_source].apply(_transform)
    else:
        _frame[_target] = _default
| # ============================ | |
| # Date helpers | |
| # ============================ | |
@dataclass
class ParsedDate:
    """Result of parsing a lifecycle date cell.

    The decorator is required: the rest of the file constructs instances with
    keyword arguments (``ParsedDate(raw=..., kind=..., value=...)``), which a
    plain class with only annotations does not accept.
    """

    # Original cell text (stripped).
    raw: str
    # One of "missing", "year", "year_month", "full", "bad".
    kind: str
    # Parsed calendar date, or None when missing / unparseable.
    value: Optional[date]
def parse_date_field(x: Any) -> ParsedDate:
    """Parse a lifecycle date cell into a ParsedDate.

    Accepted forms: ``YYYY``, ``YYYY-MM``, ``YYYY-MM-DD``.

    * None / NaN / empty -> kind "missing". A blank CSV cell arrives as float
      NaN, which is handled explicitly so it does not become the text "nan".
    * Integral floats such as ``2025.0`` (a year column containing blanks is
      read as float) are treated as the year ``2025``.
    * A bare year up to and including TODAY's year maps to 1 January; a future
      year maps to 31 December.
    * Anything else, including impossible dates like month 13, -> kind "bad".
    """
    if x is None or (isinstance(x, float) and math.isnan(x)):
        raw = ""
    elif isinstance(x, float) and x.is_integer():
        raw = str(int(x))
    else:
        raw = str(x or "").strip()

    if not raw:
        return ParsedDate(raw="", kind="missing", value=None)

    if re.fullmatch(r"\d{4}", raw):
        try:
            y = int(raw)
            value = date(y, 1, 1) if y <= TODAY.year else date(y, 12, 31)
        except ValueError:
            # e.g. "0000" is not a valid year for datetime.date
            return ParsedDate(raw=raw, kind="bad", value=None)
        return ParsedDate(raw=raw, kind="year", value=value)

    if re.fullmatch(r"\d{4}-\d{2}", raw):
        try:
            y, m = raw.split("-")
            return ParsedDate(raw=raw, kind="year_month", value=date(int(y), int(m), 1))
        except ValueError:
            return ParsedDate(raw=raw, kind="bad", value=None)

    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw):
        try:
            dt = datetime.strptime(raw, "%Y-%m-%d").date()
            return ParsedDate(raw=raw, kind="full", value=dt)
        except ValueError:
            return ParsedDate(raw=raw, kind="bad", value=None)

    return ParsedDate(raw=raw, kind="bad", value=None)
def display_date(parsed: ParsedDate) -> str:
    """Return the text shown for a parsed date.

    "Not listed" for a missing value (and for a "bad" value with empty raw
    text); otherwise the raw text as it was supplied.
    """
    placeholder = "Not listed"
    if parsed.kind == "missing":
        return placeholder
    shown = parsed.raw
    if parsed.kind == "bad" and not shown:
        shown = placeholder
    return shown
def status_from_eos_eol(eos: ParsedDate, eol: ParsedDate) -> str:
    """Derive the lifecycle status from the two parsed dates relative to TODAY.

    Returns "Unknown" when neither date has a value, "End of Life" when the
    end-of-life date has been reached, "End of Sale" when only the end-of-sale
    date has been reached, and "Active" otherwise.
    """
    sale_day, life_day = eos.value, eol.value
    if sale_day is None and life_day is None:
        return "Unknown"

    def _reached(day: Optional[date]) -> bool:
        return day is not None and day <= TODAY

    if _reached(life_day):
        return "End of Life"
    return "End of Sale" if _reached(sale_day) else "Active"
def row_to_dates_and_status(life_row: pd.Series) -> Tuple[str, str, str]:
    """Return (end-of-sale text, end-of-life text, status) for a lifecycle row."""
    sale = parse_date_field(life_row.get("end_of_sale"))
    life = parse_date_field(life_row.get("end_of_life"))
    status = status_from_eos_eol(sale, life)
    return display_date(sale), display_date(life), status
| # ============================ | |
| # Embeddings + Parsec index | |
| # ============================ | |
# Sentence-embedding model, loaded once at import; used to embed both the
# Parsec catalog cards and the retrieval queries.
embedder = SentenceTransformer(EMBED_MODEL_NAME)
def extract_pdf_text_pages(path: str) -> List[str]:
    """Return the plain text of every page of the PDF at *path*, in page order.

    The document is opened as a context manager so the file handle is released
    once the text has been extracted.
    """
    with fitz.open(path) as doc:
        return [doc[i].get_text("text") for i in range(len(doc))]
def build_parsec_cards(pages: List[str]) -> List[str]:
    """Cut a text window around every "Standard SKU:" marker on each page.

    Each window spans PARSEC_CONTEXT_BEFORE characters before the marker to
    PARSEC_CONTEXT_AFTER characters after its start, clipped to the page.
    Windows shorter than 200 characters after stripping are dropped, and exact
    duplicates are removed while keeping first-seen order.
    """
    windows: List[str] = []
    for page_text in pages:
        for marker in re.finditer(r"Standard\s+SKU:", page_text):
            anchor = marker.start()
            lo = max(0, anchor - PARSEC_CONTEXT_BEFORE)
            hi = min(len(page_text), anchor + PARSEC_CONTEXT_AFTER)
            snippet = page_text[lo:hi].strip()
            if len(snippet) >= 200:
                windows.append(snippet)

    # Keyed by content digest; setdefault keeps the first occurrence.
    by_digest: Dict[str, str] = {}
    for snippet in windows:
        digest = hashlib.sha1(snippet.encode("utf-8")).hexdigest()
        by_digest.setdefault(digest, snippet)
    return list(by_digest.values())
# Build the in-memory inner-product index over the catalog cards. Embeddings
# are normalised on encode.
parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF))
if not parsec_cards:
    # With no cards the embedding array has no second dimension and the index
    # cannot be sized; fail with a clear message like the missing-file checks.
    raise RuntimeError(f"No 'Standard SKU:' cards could be extracted from {PARSEC_PDF}.")
parsec_emb = embedder.encode(parsec_cards, batch_size=64, show_progress_bar=False, normalize_embeddings=True)
parsec_emb = np.asarray(parsec_emb, dtype=np.float32)
parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1])
parsec_index.add(parsec_emb)
| # ============================ | |
| # Device resolution (exact SKU -> GPT A/B) | |
| # ============================ | |
def _label_for_row(i: int) -> str:
    """Return the display label (SKU, manufacturer, description) for row *i*, capped at 220 characters."""
    row = df_eos.iloc[i]
    sku = row.get('sku', '')
    maker = row.get('manufacturer', '')
    desc = row.get('description', '')
    label = f"{sku} — {maker} — {desc}"
    return label[:220]
# Display label per lifecycle row (positional order).
EOS_LABELS = [_label_for_row(pos) for pos in range(len(df_eos))]

# Fuzzy-search text per lifecycle row: normalised SKU, canonical maker,
# normalised description and notes, joined by spaces.
EOS_CORPUS = [
    " ".join([
        row.get("_norm_sku", ""),
        row.get("_canon_make", ""),
        row.get("_norm_desc", ""),
        row.get("_norm_notes", ""),
    ])
    for _, row in df_eos.iterrows()
]
def local_candidates(query: str, top_k: int = 6) -> List[Tuple[int,int,str]]:
    """Fuzzy-match *query* against EOS_CORPUS.

    Returns up to *top_k* tuples of (row position, integer score, label).
    """
    needle = norm_text(query)
    ranked = process.extract(needle, EOS_CORPUS, scorer=fuzz.WRatio, limit=top_k)
    results = []
    for _matched_text, score, position in ranked:
        row_pos = int(position)
        results.append((row_pos, int(score), EOS_LABELS[row_pos]))
    return results
def gpt_choose_device(user_text: str, candidates: List[Tuple[int,int,str]]) -> Dict[str, Any]:
    """Ask the model to choose among fuzzy-match candidates.

    Returns the model's parsed JSON reply, or ``{}`` when no client is
    configured.
    """
    if client is None:
        return {}
    system_prompt = "Pick which router the user meant. Never invent. Return strict JSON only."
    candidate_items = []
    for row_idx, score, label in candidates:
        candidate_items.append({"row_idx": row_idx, "score": score, "label": label})
    request = {
        "user_input": user_text,
        "candidates": candidate_items,
        "rules": [
            "If one candidate is clearly correct, return mode='ok' with row_idx.",
            "If two are plausible, return mode='pick' with top 2 options."
        ],
        "output_schema": {"mode":"ok|pick","row_idx":"int","options":[{"row_idx":"int","label":"string"}]}
    }
    return gpt_json(system_prompt, request, max_tokens=300)
def resolve_device(user_text: str) -> Dict[str, Any]:
    """Resolve user input to a lifecycle row.

    Resolution order: exact normalised-SKU match, then a confident fuzzy match
    (score >= 95 and at least 8 points ahead of the runner-up), then the
    model's choice, then the top two fuzzy candidates.

    Returns one of:
        {"mode": "ok", "row_idx": int}
        {"mode": "pick", "options": [{"row_idx": int, "label": str}, ...]}
        {"mode": "not_found"}

    Row indices coming back from the model are accepted only when they belong
    to the candidate list that was sent, and labels are always taken from that
    list, so a malformed or invented reply cannot select an arbitrary row.
    """
    q = norm_text(user_text)
    exact_idxs = df_eos.index[df_eos["_norm_sku"] == q].tolist()
    if len(exact_idxs) == 1:
        return {"mode": "ok", "row_idx": int(exact_idxs[0])}
    if len(exact_idxs) > 1:
        opts = [{"row_idx": int(i), "label": EOS_LABELS[int(i)]} for i in exact_idxs[:2]]
        return {"mode": "pick", "options": opts}

    cands = local_candidates(user_text, top_k=6)
    if not cands:
        return {"mode": "not_found"}
    if cands[0][1] >= 95 and (len(cands) == 1 or (cands[0][1] - cands[1][1]) >= 8):
        return {"mode": "ok", "row_idx": cands[0][0]}

    label_by_idx = {idx: label for idx, _score, label in cands}

    def _known_idx(value: Any) -> Optional[int]:
        """Return *value* as a candidate row index, or None if it is not one."""
        if isinstance(value, bool):
            return None
        if isinstance(value, str) and value.strip().isdigit():
            value = int(value.strip())
        if not isinstance(value, int):
            return None
        return value if value in label_by_idx else None

    g = gpt_choose_device(user_text, cands)
    if not isinstance(g, dict):
        g = {}

    if g.get("mode") == "ok":
        chosen = _known_idx(g.get("row_idx"))
        if chosen is not None:
            return {"mode": "ok", "row_idx": chosen}

    if g.get("mode") == "pick":
        raw_options = g.get("options")
        picked: List[int] = []
        for option in raw_options if isinstance(raw_options, list) else []:
            idx = _known_idx(option.get("row_idx")) if isinstance(option, dict) else None
            if idx is not None and idx not in picked:
                picked.append(idx)
            if len(picked) == 2:
                break
        if picked:
            return {
                "mode": "pick",
                "options": [{"row_idx": i, "label": label_by_idx[i]} for i in picked],
            }

    # Fallback: offer the best one or two fuzzy candidates.
    return {
        "mode": "pick",
        "options": [{"row_idx": idx, "label": label} for idx, _score, label in cands[:2]],
    }
| # ============================ | |
| # Replacements — lifecycle CSV is source of truth | |
| # Fix: always show 4G alternative if lifecycle suggests it (even if Active) | |
| # ============================ | |
def _extract_model_token(text: str) -> str:
    """Pull a short model token out of a free-text replacement cell.

    The cell is split on "|" and the segments are scanned from last to first.
    Within a segment the patterns are tried in a fixed order (Teltonika RUT
    family, Digi IX series, Cradlepoint R/E/S, generic letters+digits) and the
    first match is returned upper-cased. With no match, the first 60
    characters of the first scanned segment are returned.
    """
    raw = _safe_str(text)
    if not raw:
        return ""
    segments = [piece.strip() for piece in raw.split("|") if piece.strip()]
    ordered = list(reversed(segments)) if segments else [raw]

    # (pattern, search the upper-cased segment?, regex flags)
    rules = (
        (r"\bRUT[A-Z]?\d{2,4}\b", True, 0),                              # Teltonika family
        (r"\bIX\d{2}\b", False, re.IGNORECASE),                          # Digi IX-series
        (r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b", False, re.IGNORECASE),     # Cradlepoint R/E/S
        (r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b", True, 0),                       # Generic model token
    )
    for segment in ordered:
        for pattern, search_upper, flags in rules:
            subject = segment.upper() if search_upper else segment
            found = re.search(pattern, subject, flags=flags)
            if found:
                return found.group(0).upper()
    return ordered[0][:60]
def _device_is_4g(life_row: pd.Series) -> bool:
    """Return True when description/notes mention 4G/LTE and do not mention 5G/NR.

    Markers are matched on token boundaries rather than as bare substrings, so
    ordinary words and sizes ("unrestricted", "filter", "alternate", "64gb",
    "5gbe") no longer count as NR / LTE / 4G / 5G mentions.
    """
    t = norm_text(life_row.get("description", "")) + " " + norm_text(life_row.get("notes", ""))
    mentions_4g = re.search(r"\b4g(?:lte)?\b|\blte", t) is not None
    mentions_5g = re.search(r"\b(?:nr)?5g(?:nr)?\b|\bnr\b", t) is not None
    return mentions_4g and not mentions_5g
def _candidate_5g_models_from_lifecycle(manufacturer: str) -> List[str]:
    """List distinct model tokens from ``advanced_5g_option`` for one manufacturer.

    Rows are pooled by manufacturer text (not the canonical maker) to support
    Teltonika etc. Both sides of the comparison go through ``norm_text``;
    comparing a normalised query with a merely lower-cased column never
    matched names containing punctuation or padding. Without a
    ``manufacturer`` column, all rows are pooled.
    """
    if "advanced_5g_option" not in df_eos.columns:
        return []
    pool = df_eos
    if "manufacturer" in df_eos.columns:
        mfr = norm_text(manufacturer)
        pool = df_eos[df_eos["manufacturer"].apply(norm_text) == mfr]

    out: List[str] = []
    seen = set()
    for v in pool["advanced_5g_option"].tolist():
        tok = _extract_model_token(v)
        if tok and tok.lower() != "nan" and tok not in seen:
            seen.add(tok)
            out.append(tok)
    return out
def _candidate_4g_models_from_lifecycle(manufacturer: str) -> List[str]:
    """List distinct model tokens from ``suggested_replacement`` for one manufacturer.

    Both sides of the manufacturer comparison go through ``norm_text``;
    comparing a normalised query with a merely lower-cased column never
    matched names containing punctuation or padding. Without a
    ``manufacturer`` column, all rows are pooled.
    """
    if "suggested_replacement" not in df_eos.columns:
        return []
    pool = df_eos
    if "manufacturer" in df_eos.columns:
        mfr = norm_text(manufacturer)
        pool = df_eos[df_eos["manufacturer"].apply(norm_text) == mfr]

    out: List[str] = []
    seen = set()
    for v in pool["suggested_replacement"].tolist():
        tok = _extract_model_token(v)
        if tok and tok.lower() != "nan" and tok not in seen:
            seen.add(tok)
            out.append(tok)
    return out
def _gpt_pick_from_candidates(old_row: pd.Series, candidates: List[str], need: str) -> str:
    """Ask the model to pick one replacement from *candidates*.

    At most the first 40 candidates are sent. Returns the chosen string only
    if it is one of *candidates*; otherwise "" (also when no client is
    configured or the list is empty).
    """
    if client is None or not candidates:
        return ""
    system_prompt = "Pick the best replacement model. Choose only from candidates. Return strict JSON only."
    device_info = {
        "sku": str(old_row.get("sku","")),
        "manufacturer": str(old_row.get("manufacturer","")),
        "description": str(old_row.get("description","")),
        "need": need,
    }
    request = {
        "old_device": device_info,
        "candidates": candidates[:40],
        "output_schema": {"choice":"string"}
    }
    reply = gpt_json(system_prompt, request, max_tokens=240) or {}
    picked = str(reply.get("choice","") or "").strip()
    if picked in candidates:
        return picked
    return ""
def _fallback_5g_from_dec(canon_make: str) -> str:
    """Return the first 5G model of *canon_make* in the feature catalog, or ""."""
    same_make = df_dec["_canon_make"] == canon_make
    flagged_5g = df_dec["_is5g"] == True
    matches = df_dec[same_make & flagged_5g]
    if matches.empty:
        return ""
    return str(matches.iloc[0]["Model"]).strip()
def pick_replacements_lifecycle(life_row: pd.Series, status: str) -> Dict[str, Any]:
    """Choose the 4G alternative and 5G replacement for a lifecycle row.

    The lifecycle CSV is the source of truth. For each slot, the row's own
    cell is used first, then a pick from the same manufacturer's lifecycle
    candidates (model pick, else the first candidate), and for 5G only the
    feature catalog as a last resort.

    Returns:
        Dict with ``repl_4g`` (model or "Not applicable"), ``repl_5g`` (model,
        "Not applicable" when no 5G is wanted, or "" when wanted but none was
        found), ``why`` and ``sources``. "dec_fallback" is listed in
        ``sources`` only when the catalog fallback actually supplied the 5G
        model.
    """
    canon = str(life_row.get("_canon_make", "UNKNOWN"))
    manufacturer = str(life_row.get("manufacturer", "") or "")
    is_4g_device = _device_is_4g(life_row)
    want_5g = is_4g_device or (status in {"End of Sale", "End of Life"})
    used_dec_fallback = False

    # 4G alternative: ALWAYS if suggested_replacement exists for 4G devices
    repl_4g = "Not applicable"
    if is_4g_device:
        repl_4g = _extract_model_token(_safe_str(life_row.get("suggested_replacement", "")))
        if not repl_4g:
            cand4 = _candidate_4g_models_from_lifecycle(manufacturer)
            repl_4g = _gpt_pick_from_candidates(life_row, cand4, "4G alternative") or (cand4[0] if cand4 else "")
        if not repl_4g:
            repl_4g = "Not applicable"

    # 5G replacement: ALWAYS when want_5g is true
    repl_5g = "Not applicable"
    if want_5g:
        repl_5g = _extract_model_token(_safe_str(life_row.get("advanced_5g_option", "")))
        if not repl_5g:
            cand5 = _candidate_5g_models_from_lifecycle(manufacturer)
            repl_5g = _gpt_pick_from_candidates(life_row, cand5, "5G replacement/upgrade") or (cand5[0] if cand5 else "")
        if not repl_5g:
            # last resort: dec catalog fallback
            repl_5g = _fallback_5g_from_dec(canon)
            if repl_5g.lower() == "nan":
                repl_5g = ""
            used_dec_fallback = bool(repl_5g)
        if repl_5g.lower() == "nan":
            repl_5g = ""

    sources = ["lifecycle_csv"]
    if client:
        sources.append("gpt")
    if used_dec_fallback:
        sources.append("dec_fallback")

    return {
        "repl_4g": repl_4g,
        "repl_5g": repl_5g,
        "why": "Lifecycle replacements (GPT fallback when missing).",
        "sources": sources,
    }
| # ============================ | |
| # Antennas (Parsec-only; family name extraction) | |
| # ============================ | |
# Lower-case words matched as substrings of a card line to name the antenna
# family.
PARSEC_FAMILY_WORDS = {
    "chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie",
    "shepherd","belgian","australian","terrier","pyrenees"
}
# Lower-case substrings marking a card line as a section heading / field label
# rather than a product name.
BAD_NAME_MARKERS = {
    "customization", "standard connectors", "connectors", "features", "benefits",
    "specifications", "mechanical", "electrical", "mounting", "accessories",
    "description:", "standard sku"
}
| def _clean_line(s: str) -> str: | |
| s = re.sub(r"\s+", " ", str(s or "").strip()) | |
| if re.fullmatch(r"-[a-z0-9]+", s.lower()): | |
| return "" | |
| return s | |
def _is_bad_name_line(line: str) -> bool:
    """Return True when *line* should not be used as a product name.

    A line is rejected if it contains any BAD_NAME_MARKERS entry, or if it is
    at most 25 characters long and contains a short "-xxxx" suffix directly
    after a word character.
    """
    lowered = line.lower()
    has_marker = any(marker in lowered for marker in BAD_NAME_MARKERS)
    if has_marker:
        return True
    has_short_suffix = re.search(r"\b-[a-z0-9]{1,4}\b", lowered) is not None
    return bool(has_short_suffix and len(lowered) <= 25)
def _family_from_line(line: str) -> str:
    """Return the capitalised family word found in *line*, or "".

    When several family words occur (e.g. "Belgian Shepherd"), the one that
    appears earliest in the line wins. The previous set iteration made the
    result depend on hash ordering, which can differ between runs.
    """
    low = line.lower()
    hits = [(low.find(fam), fam) for fam in PARSEC_FAMILY_WORDS if fam in low]
    if not hits:
        return ""
    # min() orders by position first, then alphabetically for a stable tie-break.
    return min(hits)[1].capitalize()
def _parsec_name_from_card(card_text: str) -> str:
    """Derive a short antenna name from a catalog card.

    Preference order: the family word on the first acceptable line that has
    one; otherwise the first word of the nearest acceptable line (3-40
    characters, containing a letter) within the 12 lines above the first
    "Standard SKU" line; otherwise "Parsec antenna".
    """
    cleaned = (_clean_line(raw) for raw in str(card_text or "").splitlines())
    lines = [ln for ln in cleaned if ln]

    for ln in lines:
        if not _is_bad_name_line(ln):
            family = _family_from_line(ln)
            if family:
                return family

    # fallback near SKU line
    sku_pos = next(
        (pos for pos, ln in enumerate(lines) if "standard sku" in ln.lower()),
        None,
    )
    if sku_pos is not None:
        above = lines[max(0, sku_pos - 12):sku_pos]
        for ln in above[::-1]:
            if _is_bad_name_line(ln):
                continue
            if 3 <= len(ln) <= 40 and re.search(r"[A-Za-z]", ln):
                return ln.split()[0].capitalize()
    return "Parsec antenna"
| def _parsec_part_from_card(t: str) -> str: | |
| m = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t) | |
| return m.group(1).strip() if m else "" | |
| def _parsec_desc_from_card(t: str) -> str: | |
| m = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE) | |
| return re.sub(r"\s+"," ",m.group(1).strip())[:220] if m else "" | |
def parsec_retrieve(query: str, top_k: int = 10) -> List[Dict[str, Any]]:
    """Return up to *top_k* catalog cards most similar to *query*.

    Each hit is a dict with ``score``, ``name``, ``part_number`` and
    ``description``. Ids outside the card list are skipped.
    """
    query_vec = np.asarray(
        embedder.encode([query], normalize_embeddings=True),
        dtype=np.float32,
    )
    scores, ids = parsec_index.search(query_vec, top_k)
    hits: List[Dict[str, Any]] = []
    for score, raw_id in zip(scores[0].tolist(), ids[0].tolist()):
        card_id = int(raw_id)
        if not (0 <= card_id < len(parsec_cards)):
            continue
        card = parsec_cards[card_id]
        hits.append({
            "score": float(score),
            "name": _parsec_name_from_card(card),
            "part_number": _parsec_part_from_card(card),
            "description": _parsec_desc_from_card(card),
        })
    return hits
def antenna_options_for(router_model: str, tech: str, mimo: str) -> Dict[str, Any]:
    """Pick the top catalog hit for a stationary and a vehicle omni antenna.

    Returns a dict with ``stationary_omni``, ``vehicle_omni`` (each a hit dict
    extended with ``mimo`` and ``why``) and ``sources``.
    """

    def _top_hit(usage_terms: str, reason: str) -> Dict[str, Any]:
        hits = parsec_retrieve(f"{router_model} {tech} {mimo} omni {usage_terms} Parsec", top_k=10)
        # deterministic fallback if nothing was retrieved
        best = hits[0] if hits else {"name":"Parsec antenna","part_number":"","description":""}
        best.update({"mimo": mimo, "why": reason})
        return best

    stationary = _top_hit("stationary outdoor", "Stationary omni best match.")
    vehicle = _top_hit("vehicle mobile", "Vehicle omni best match.")
    return {"stationary_omni": stationary, "vehicle_omni": vehicle, "sources":["parsec_rag"]}
| # ============================ | |
| # Feature table + GPT fill for missing fields | |
| # ============================ | |
FEATURE_COLS = ["Name","Modem technology","WiFi","Ports","Antennas","Ruggedness","Use case"]


def dec_features_by_model(model: str, canon_make: str) -> Dict[str, str]:
    """Look up feature cells for *model* among the catalog rows of *canon_make*.

    Returns a dict keyed by FEATURE_COLS. Every value is "Not listed" when the
    model is empty or a placeholder, the maker has no catalog rows, or the
    best fuzzy match scores below MATCH_OK. Blank / NaN catalog cells are
    reported as "Not listed" rather than the literal text "nan".
    """
    not_listed = {k: "Not listed" for k in FEATURE_COLS}
    if not model or model in {"Not applicable", "Not listed"}:
        return not_listed
    pool = df_dec[df_dec["_canon_make"] == canon_make].copy()
    if pool.empty:
        return not_listed
    hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio)
    if not hit or hit[1] < MATCH_OK:
        return not_listed
    r = pool.iloc[int(hit[2])]

    def _cell(column: str) -> str:
        """Return the catalog cell as text, or "Not listed" when blank."""
        value = r.get(column, "")
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "Not listed"
        text = str(value)
        return text if text.strip() else "Not listed"

    wan = _cell("WAN ports and speed")
    lan = _cell("LAN ports and speed")
    if wan == "Not listed" and lan == "Not listed":
        ports = "Not listed"
    else:
        ports = f"WAN: {wan} | LAN: {lan}"

    return {
        "Name": _cell("Model"),
        "Modem technology": _cell("Modem Type"),
        "WiFi": _cell("WiFi type"),
        "Ports": ports,
        "Antennas": _cell("Antennas (internal/external/both)"),
        "Ruggedness": _cell("Ruggedization"),
        "Use case": _cell("Primary use case"),
    }
def gpt_fill_features(device_label: str, feats: Dict[str,str], context: str) -> Dict[str,str]:
    """Ask the model to fill feature cells that are empty, "not listed" or "nan".

    *feats* is updated in place and returned. It is returned unchanged when no
    client is configured or nothing is missing. Only the first 2000 characters
    of *context* are sent.
    """
    placeholders = {"not listed","nan"}
    missing = [
        key for key, value in feats.items()
        if (not value) or value.strip().lower() in placeholders
    ]
    if client is None or not missing:
        return feats
    system_prompt = "Fill missing router feature fields. Return strict JSON only."
    request = {
        "device": device_label,
        "known": feats,
        "context": context[:2000],
        "fill_only": missing,
        "rules": ["Fill only requested fields. Best guess if needed. Return JSON only."],
        "output_schema": {k:"string" for k in missing}
    }
    reply = gpt_json(system_prompt, request, max_tokens=350) or {}
    for key in missing:
        filled = str(reply.get(key,"") or "").strip()
        if not filled:
            continue
        feats[key] = filled
    return feats
def current_features_guess(life_row: pd.Series) -> Dict[str,str]:
    """Build the feature row for the current device from its lifecycle row.

    Modem technology is "4G" when ``_device_is_4g`` says so, "5G" when the
    description/notes mention 5G or NR as a separate token, else "Not listed".
    The remaining cells start as "Not listed" and are passed to
    ``gpt_fill_features``.

    Blank (NaN) cells are read as empty text rather than "nan". Description
    and notes are joined with a space so a token cannot form across the
    boundary, and "nr" must be a token of its own so words like "unrestricted"
    do not count as 5G.
    """
    sku = _safe_str(life_row.get("sku", ""))
    desc = _safe_str(life_row.get("description", ""))
    notes = _safe_str(life_row.get("notes", ""))
    text = f"{desc} {notes}".lower()

    if _device_is_4g(life_row):
        modem = "4G"
    elif re.search(r"\b(?:nr)?5g(?:nr)?\b|\bnr\b", text):
        modem = "5G"
    else:
        modem = "Not listed"

    base = {
        "Name": sku,
        "Modem technology": modem,
        "WiFi": "Not listed",
        "Ports": "Not listed",
        "Antennas": "Not listed",
        "Ruggedness": "Not listed",
        "Use case": "Not listed",
    }
    return gpt_fill_features("Current device", base, f"{desc}\n{notes}")
def build_features_table(cur: Dict[str,str], r4: Dict[str,str], r5: Dict[str,str]) -> str:
    """Render the three feature dicts as a Markdown table.

    Rows are "Current", "4G alternative" and "5G replacement"; missing keys
    show "Not listed". Cell text is made table-safe: line breaks become a
    space and "|" is escaped, since the Ports value itself contains a pipe and
    would otherwise split into an extra column.
    """
    feature_keys = ["Modem technology", "WiFi", "Ports", "Antennas", "Ruggedness", "Use case"]
    cols = ["Device"] + feature_keys
    header = "| " + " | ".join(cols) + " |"
    sep = "| " + " | ".join(["---"] * len(cols)) + " |"

    def cell(value: Any) -> str:
        text = "Not listed" if value is None else str(value)
        text = re.sub(r"\s*[\r\n]+\s*", " ", text)
        return text.replace("|", "\\|")

    def row(name: str, feats: Dict[str, str]) -> str:
        cells = [cell(name)] + [cell(feats.get(key, "Not listed")) for key in feature_keys]
        return "| " + " | ".join(cells) + " |"

    return "\n".join([header, sep, row("Current", cur), row("4G alternative", r4), row("5G replacement", r5)])
| # ============================ | |
| # Output + Gradio | |
| # ============================ | |
def assemble_output(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str:
    """Compose the Markdown report for one device.

    Args:
        life_row: The lifecycle row of the device.
        status: Lifecycle status text.
        eos: End-of-sale display text.
        eol: End-of-life display text.
        repl: Result of ``pick_replacements_lifecycle``.
        ant: Result of ``antenna_options_for``.

    An empty replacement value is shown as "Not applicable" (4G) or
    "Not listed" (5G) instead of rendering as an empty bold marker.
    """
    canon_make = str(life_row.get("_canon_make","UNKNOWN"))
    current_name = f"{life_row.get('sku','')} — {life_row.get('description','')}".strip(" —")
    st = ant.get("stationary_omni", {})
    vh = ant.get("vehicle_omni", {})
    repl_4g = repl.get("repl_4g", "")
    repl_5g = repl.get("repl_5g", "")

    cur_feats = current_features_guess(life_row)
    r4_feats = dec_features_by_model(repl_4g, canon_make)
    r5_feats = dec_features_by_model(repl_5g, canon_make)
    # If dec doesn't know the model, ask GPT to fill missing cells (best guess)
    if client is not None:
        r4_feats = gpt_fill_features("4G alternative", r4_feats, f"Model: {repl_4g}\nMake: {canon_make}")
        r5_feats = gpt_fill_features("5G replacement", r5_feats, f"Model: {repl_5g}\nMake: {canon_make}")
    table_md = build_features_table(cur_feats, r4_feats, r5_feats)

    sources = repl.get("sources")
    if not isinstance(sources, list):
        sources = []

    lines = [
        f"1. Current device: **{current_name}**",
        f"2. Status: **{status}**",
        f"3. End of Sale date: **{eos}**",
        f"4. End of Life date: **{eol}**",
        f"5. 4G alternative (lifecycle): **{repl_4g or 'Not applicable'}**",
        f"6. 5G replacement (lifecycle): **{repl_5g or 'Not listed'}**",
        "7. Antenna options (Parsec-only):",
        f" - Stationary (Omni): **{st.get('name','')}** (Part #: {st.get('part_number','')}) — {st.get('description','')} — MIMO: {st.get('mimo','')} — {st.get('why','')}",
        f" - Vehicle (Omni): **{vh.get('name','')}** (Part #: {vh.get('part_number','')}) — {vh.get('description','')} — MIMO: {vh.get('mimo','')} — {vh.get('why','')}",
        "8. Recommended features table:",
        table_md,
        "\nSources (debug):",
    ]
    lines.extend(f"- {s}" for s in sources)
    lines.append("- ParsecCatalog.pdf (local RAG)")
    lines.append("- routers_eos_eol_by_sku.csv (replacements)")
    lines.append("- dec2025routers.csv (features)")
    return "\n".join(lines)
def _report_for_row(life_row: pd.Series) -> str:
    """Run the full pipeline (dates, replacements, antennas, report) for one row."""
    eos, eol, status = row_to_dates_and_status(life_row)
    repl = pick_replacements_lifecycle(life_row, status)

    repl_5g = str(repl.get("repl_5g") or "").strip()
    has_5g = bool(repl_5g) and repl_5g not in {"Not applicable", "Not listed"}
    if has_5g:
        tech = "5G"
    elif _device_is_4g(life_row):
        tech = "4G"
    else:
        tech = "Unknown"
    mimo_guess = "4x4" if tech == "5G" else "2x2"

    # Query antennas for the 5G replacement only when it is a real model;
    # placeholders such as "Not applicable" must not be used as a router name.
    router_model = repl_5g if has_5g else str(life_row.get("sku", ""))
    ant = antenna_options_for(router_model=router_model, tech=tech, mimo=mimo_guess)
    return assemble_output(life_row, status, eos, eol, repl, ant)


def run_lookup(user_text: str, st: Dict[str,Any]):
    """Gradio handler for the "Check" button.

    Returns a 4-tuple: (markdown text, dropdown update, button update, state).
    When the input is ambiguous, the dropdown and button are shown and the
    options are stored in the returned state.
    """
    user_text = str(user_text or "").strip()
    if not user_text:
        return "Enter a router SKU/model.", gr.update(visible=False), gr.update(visible=False), {}
    res = resolve_device(user_text)
    if res.get("mode") == "pick":
        opts = res.get("options", [])
        choices = [o["label"] for o in opts]
        st2 = {"mode":"pick","options": opts}
        return "Did you mean A or B? Pick one, then click Use selection.", gr.update(choices=choices, value=None, visible=True), gr.update(visible=True), st2
    if res.get("mode") != "ok":
        return "Not found.", gr.update(visible=False), gr.update(visible=False), {}
    life_row = df_eos.iloc[int(res["row_idx"])]
    return _report_for_row(life_row), gr.update(visible=False), gr.update(visible=False), {}


def use_selection(selected_label: str, st: Dict[str,Any]):
    """Gradio handler for the "Use selection" button.

    Looks up the chosen label among the options stored in *st* and renders the
    report for that row. Returns the same 4-tuple shape as ``run_lookup``.
    """
    if not st or st.get("mode") != "pick":
        return "Run a search first.", gr.update(visible=False), gr.update(visible=False), {}
    if not selected_label:
        return "Pick A or B first.", gr.update(visible=True), gr.update(visible=True), st
    chosen_row = None
    for o in st.get("options", []):
        if o.get("label") == selected_label:
            chosen_row = int(o["row_idx"])
            break
    if chosen_row is None:
        return "Pick a valid option.", gr.update(visible=True), gr.update(visible=True), st
    life_row = df_eos.iloc[int(chosen_row)]
    return _report_for_row(life_row), gr.update(visible=False), gr.update(visible=False), {}
# UI: one text box plus a "Check" button; the dropdown and "Use selection"
# button start hidden and are shown by run_lookup when the input is ambiguous.
with gr.Blocks(title="Only-Routers") as demo:
    gr.Markdown("## Only-Routers\nEnter a router SKU/model. If ambiguous, you’ll get A/B choices.")
    user_text = gr.Textbox(label="Router SKU or model", placeholder="Examples: IBR650B, AER1600, ES450, WR21, RUT240", lines=1)
    # Holds the pending A/B options between the two button clicks.
    st = gr.State({})
    check_btn = gr.Button("Check", variant="primary")
    pick_dd = gr.Dropdown(label="Pick A or B", choices=[], visible=False)
    use_btn = gr.Button("Use selection", visible=False)
    output_md = gr.Markdown()
    # Both handlers return (markdown, dropdown update, button update, state).
    check_btn.click(fn=run_lookup, inputs=[user_text, st], outputs=[output_md, pick_dd, use_btn, st])
    use_btn.click(fn=use_selection, inputs=[pick_dd, st], outputs=[output_md, pick_dd, use_btn, st])
demo.launch()