Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import math | |
| import hashlib | |
| import tempfile | |
| from dataclasses import dataclass | |
| from datetime import datetime, date | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import fitz # PyMuPDF | |
| import faiss | |
| from sentence_transformers import SentenceTransformer | |
| from rapidfuzz import fuzz, process | |
| import gradio as gr | |
| from openai import OpenAI | |
| # ============================ | |
| # Settings | |
| # ============================ | |
# Reference "today" used by every End-of-Sale / End-of-Life comparison.
# NOTE(review): this is a fixed calendar date rather than date.today(), so
# computed statuses do not advance with real time — confirm this is intended.
TODAY = date(2026, 1, 18)
# Model name and reasoning settings passed to client.responses.create().
OPENAI_MODEL = "gpt-5.2"
OPENAI_REASONING = {"effort": "high"}
# Minimum rapidfuzz score for a dec2025routers.csv model match to be trusted.
MATCH_OK = 80
# Sentence-transformers model used to embed Parsec catalog cards and queries.
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
# Characters of page text kept before / after each "Standard SKU:" marker
# when the Parsec PDF is cut into cards.
PARSEC_CONTEXT_BEFORE = 900
PARSEC_CONTEXT_AFTER = 1600
| # ============================ | |
| # OpenAI client (HF Space secret: OPENAI_API_KEY) | |
| # ============================ | |
# Read the key from the environment; an unset or blank value yields "".
API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
# `client` is None when no key is configured; the GPT helpers in this file
# check for None and skip the API call in that case.
client = OpenAI(api_key=API_KEY) if API_KEY else None
| # ---------------------------- | |
| # Gradio state helpers | |
| # Keep state as a JSON STRING to avoid schema issues on Hugging Face. | |
| # ---------------------------- | |
def state_load(st_json: str) -> Dict[str, Any]:
    """Decode the JSON-string Gradio state into a dict.

    Args:
        st_json: State previously produced by ``state_dump``.

    Returns:
        The decoded dict. An empty dict is returned when the input is empty,
        is not a string, is not valid JSON, or decodes to anything other than
        a JSON object (callers rely on ``.get``, so lists/scalars are rejected).
    """
    if not isinstance(st_json, str) or not st_json:
        return {}
    try:
        loaded = json.loads(st_json)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError subclass.
        return {}
    return loaded if isinstance(loaded, dict) else {}
def state_dump(st: Dict[str, Any]) -> str:
    """Encode the state dict as a JSON string for storage in ``gr.State``.

    A falsy state is encoded as ``"{}"``. Any failure while encoding (for
    example a value that JSON cannot represent) also yields ``"{}"``.
    """
    try:
        payload = st if st else {}
        return json.dumps(payload, ensure_ascii=False)
    except Exception:
        # Best-effort: never let a serialization problem break a callback.
        return "{}"
| # ============================ | |
| # Helpers | |
| # ============================ | |
def norm_text(s: Any) -> str:
    """Normalize arbitrary input to a lowercase matching key.

    None / NaN-like values become ``""``. Otherwise the value is stringified,
    lowercased, every character outside ``a-z 0-9 whitespace - /`` is replaced
    by a space, and whitespace runs are collapsed to single spaces.
    """
    try:
        is_missing = s is None or (isinstance(s, float) and math.isnan(s)) or pd.isna(s)
        if is_missing:
            return ""
    except Exception:
        # Values whose missing-ness cannot be decided (e.g. array-likes) are
        # simply stringified below.
        pass
    lowered = str(s).strip().lower()
    keep_safe_chars = re.sub(r"[^a-z0-9\s\-\/]", " ", lowered)
    return re.sub(r"\s+", " ", keep_safe_chars).strip()
def safe_str(v: Any) -> str:
    """Return ``v`` as a stripped string, mapping missing values to ``""``.

    Only scalar values are tested for missing-ness: ``pd.isna`` on a list or
    array returns an array whose truth value is ambiguous, which previously
    raised ``ValueError`` here. Non-scalars are simply stringified.
    """
    if pd.api.types.is_scalar(v) and pd.isna(v):
        return ""
    return str(v).strip()
def is_5g(modem_type: Any) -> bool:
    """Return True when a modem-type description mentions 5G or NR.

    "nr" must appear as a standalone token: a plain substring test would
    also fire on unrelated words that merely contain the letters "nr".
    """
    s = norm_text(modem_type)
    return ("5g" in s) or bool(re.search(r"\bnr\b", s))
def json_load_safe(s: str) -> Dict[str, Any]:
    """Parse ``s`` as JSON and return it only if it is a JSON object.

    Returns an empty dict when ``s`` is not valid JSON, is not a string, or
    decodes to a non-dict value (callers use ``.get`` on the result).
    """
    try:
        parsed = json.loads(s)
    except (TypeError, ValueError, RecursionError):
        # TypeError: non-string input; ValueError covers json.JSONDecodeError.
        return {}
    return parsed if isinstance(parsed, dict) else {}
def gpt_json(system: str, payload: Dict[str, Any], max_tokens: int = 600) -> Dict[str, Any]:
    """Send a system prompt plus a JSON payload to the model and parse the reply.

    Returns ``{}`` when no client is configured; otherwise returns whatever
    ``json_load_safe`` makes of the response's ``output_text``.
    """
    if client is None:
        return {}
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": json.dumps(payload)},
    ]
    # NOTE(review): the token cap may also have to cover reasoning tokens when
    # a reasoning effort is set — confirm small caps still leave visible output.
    response = client.responses.create(
        model=OPENAI_MODEL,
        reasoning=OPENAI_REASONING,
        input=messages,
        max_output_tokens=max_tokens,
    )
    reply_text = getattr(response, "output_text", "") or ""
    return json_load_safe(reply_text)
| # ============================ | |
| # Load data | |
| # ============================ | |
# Data files expected next to this script.
EOS_PATH = "routers_eos_eol_by_sku.csv"
DEC_PATH = "dec2025routers.csv"
PARSEC_PDF = "ParsecCatalog.pdf"

# Fail fast, in a fixed order, if any required file is absent.
for _required_path in (EOS_PATH, DEC_PATH, PARSEC_PDF):
    if not os.path.exists(_required_path):
        raise FileNotFoundError(f"Missing {_required_path} in repo.")

# Lifecycle table (EOS/EOL + replacements) and the router spec table.
df_eos = pd.read_csv(EOS_PATH).copy()
df_dec = pd.read_csv(DEC_PATH).copy()
def region_ok(x: Any) -> bool:
    """Return True when a region value is unspecified or refers to the US / North America.

    Missing values count as unspecified. That includes the float NaN pandas
    produces for blank CSV cells, which previously stringified to "nan" and
    caused the row to be filtered out.
    """
    if x is None or (isinstance(x, float) and math.isnan(x)):
        return True
    s = str(x or "").strip().lower()
    if not s:
        return True
    if "not specified" in s or "north america" in s:
        return True
    us_patterns = (r"\busa\b", r"\bunited\s+states\b", r"\bu\.?s\.?\b")
    return any(re.search(pattern, s) for pattern in us_patterns)
# Keep only lifecycle rows whose region passes region_ok (when the column exists).
if "region" in df_eos.columns:
    _region_mask = df_eos["region"].apply(region_ok)
    df_eos = df_eos[_region_mask].reset_index(drop=True)
| # Maker mapping (includes Teltonika) | |
# Canonical maker key -> lowercase terms that identify it inside free text.
# canon_maker_from_text returns the first key (in this insertion order) with a
# term contained in the text, so "CISCO_MERAKI" must stay ahead of "CISCO":
# "cisco" is itself a substring of "cisco meraki".
CANON_MAKER = {
    "CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"},
    "SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"},
    "FEENEY": {"feeney", "feeney wireless", "inseego"},
    "DIGI": {"digi", "accelerated", "accelerated concepts"},
    "CISCO_MERAKI": {"meraki", "cisco meraki"},
    "CISCO": {"cisco"},
    "TELTONIKA": {"teltonika"},
}
def canon_maker_from_text(s: Any) -> str:
    """Map free-text maker names to a canonical key from ``CANON_MAKER``.

    The text is normalized first; the first canonical key (in dict order)
    having any term contained in the text wins. Returns ``"UNKNOWN"`` when
    nothing matches.
    """
    normalized = norm_text(s)
    return next(
        (
            canon
            for canon, terms in CANON_MAKER.items()
            if any(term in normalized for term in terms)
        ),
        "UNKNOWN",
    )
# Precompute normalized helper columns used for matching. Each spec is
# (frame, derived column, source column, transform, value used when the
# source column is absent).
for _frame, _derived, _source, _transform, _fallback in (
    (df_eos, "_canon_make", "manufacturer", canon_maker_from_text, "UNKNOWN"),
    (df_eos, "_norm_sku", "sku", norm_text, ""),
    (df_eos, "_norm_desc", "description", norm_text, ""),
    (df_eos, "_norm_notes", "notes", norm_text, ""),
    (df_dec, "_canon_make", "Make", canon_maker_from_text, "UNKNOWN"),
    (df_dec, "_norm_model", "Model", norm_text, ""),
    (df_dec, "_is5g", "Modem Type", is_5g, False),
):
    if _source in _frame.columns:
        _frame[_derived] = _frame[_source].apply(_transform)
    else:
        _frame[_derived] = _fallback
| # ============================ | |
| # Date helpers | |
| # ============================ | |
@dataclass
class ParsedDate:
    """Result of parsing one lifecycle date cell.

    The ``@dataclass`` decorator is required: callers construct instances with
    keyword arguments (``ParsedDate(raw=..., kind=..., value=...)``), which a
    plain class with bare annotations does not accept.
    """

    # Original cell text, stripped ("" when the cell was empty).
    raw: str
    # One of "missing", "year", "year_month", "full" or "bad".
    kind: str
    # Concrete date used for comparisons; None for "missing" / "bad".
    value: Optional[date]
def parse_date_field(x: Any) -> ParsedDate:
    """Parse a lifecycle date cell into a ``ParsedDate``.

    Accepted forms are ``YYYY``, ``YYYY-MM`` and ``YYYY-MM-DD``:

    * ``YYYY``: a year up to and including ``TODAY.year`` resolves to
      January 1 of that year; a future year resolves to December 31.
    * ``YYYY-MM``: resolves to the first day of that month.
    * ``YYYY-MM-DD``: resolves to that exact date.

    Empty / None / NaN cells give kind ``"missing"``. Anything else, including
    well-shaped text that is not a valid calendar date (e.g. ``"0000"`` or
    ``"2025-13"``), gives kind ``"bad"`` with ``value=None``.
    """
    # Blank CSV cells arrive as float NaN; treat them as missing, not "nan".
    if x is None or (isinstance(x, float) and math.isnan(x)):
        return ParsedDate(raw="", kind="missing", value=None)
    # A year-only column containing blanks is read as floats (2025.0).
    if isinstance(x, float) and x.is_integer():
        x = int(x)

    raw = str(x or "").strip()
    if not raw:
        return ParsedDate(raw="", kind="missing", value=None)

    try:
        if re.fullmatch(r"\d{4}", raw):
            year = int(raw)
            month, day = (1, 1) if year <= TODAY.year else (12, 31)
            return ParsedDate(raw=raw, kind="year", value=date(year, month, day))
        if re.fullmatch(r"\d{4}-\d{2}", raw):
            year_txt, month_txt = raw.split("-")
            return ParsedDate(raw=raw, kind="year_month", value=date(int(year_txt), int(month_txt), 1))
        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw):
            parsed = datetime.strptime(raw, "%Y-%m-%d").date()
            return ParsedDate(raw=raw, kind="full", value=parsed)
    except ValueError:
        # Right shape, impossible date (year 0, month 13, Feb 30, ...).
        pass
    return ParsedDate(raw=raw, kind="bad", value=None)
def display_date(pd_: ParsedDate) -> str:
    """Return the text shown to the user for a parsed date.

    Missing dates, and unparseable dates with no original text, are shown as
    ``"Not listed"``; every other case echoes the original cell text.
    """
    nothing_to_show = pd_.kind == "missing" or (pd_.kind == "bad" and not pd_.raw)
    if nothing_to_show:
        return "Not listed"
    return pd_.raw
def status_from_eos_eol(eos: ParsedDate, eol: ParsedDate) -> str:
    """Derive a lifecycle status by comparing both dates against ``TODAY``.

    End of Life takes precedence over End of Sale; with neither date known
    the status is ``"Unknown"``; otherwise the device is ``"Active"``.
    """
    sale_date, life_date = eos.value, eol.value
    if sale_date is None and life_date is None:
        return "Unknown"
    if life_date is not None and life_date <= TODAY:
        return "End of Life"
    if sale_date is not None and sale_date <= TODAY:
        return "End of Sale"
    return "Active"
def row_to_dates_and_status(row: pd.Series) -> Tuple[str, str, str]:
    """Return ``(EOS display text, EOL display text, status)`` for a lifecycle row."""
    end_of_sale = parse_date_field(row.get("end_of_sale"))
    end_of_life = parse_date_field(row.get("end_of_life"))
    status = status_from_eos_eol(end_of_sale, end_of_life)
    return display_date(end_of_sale), display_date(end_of_life), status
| # ============================ | |
| # Embeddings + Parsec index | |
| # ============================ | |
# Single shared embedding model, created once at import time; used both to
# embed the Parsec catalog cards and to embed retrieval queries.
embedder = SentenceTransformer(EMBED_MODEL_NAME)
def extract_pdf_text_pages(path: str) -> List[str]:
    """Return the plain text of every page of the PDF at ``path``, in page order.

    The document is opened as a context manager so the underlying file is
    closed once the text has been extracted (it was previously left open).
    """
    with fitz.open(path) as doc:
        return [page.get_text("text") for page in doc]
def build_parsec_cards(pages: List[str]) -> List[str]:
    """Cut catalog pages into de-duplicated text "cards" around each SKU marker.

    For every ``Standard SKU:`` occurrence on a page, a window of
    ``PARSEC_CONTEXT_BEFORE`` characters before and ``PARSEC_CONTEXT_AFTER``
    characters after the marker start is taken (clamped to the page). Windows
    shorter than 200 characters after stripping are dropped, and exact
    duplicates (by SHA-1 of the text) are kept only once, in first-seen order.
    """
    unique_cards: List[str] = []
    seen_digests = set()
    for page_text in pages:
        page_len = len(page_text)
        for marker in re.finditer(r"Standard\s+SKU:", page_text):
            anchor = marker.start()
            lo = max(0, anchor - PARSEC_CONTEXT_BEFORE)
            hi = min(page_len, anchor + PARSEC_CONTEXT_AFTER)
            snippet = page_text[lo:hi].strip()
            if len(snippet) < 200:
                continue
            digest = hashlib.sha1(snippet.encode("utf-8")).hexdigest()
            if digest in seen_digests:
                continue
            seen_digests.add(digest)
            unique_cards.append(snippet)
    return unique_cards
# Build the in-memory Parsec retrieval index once at import time.
parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF))
# Embeddings are normalized, so inner product in the index acts as cosine similarity.
parsec_emb = np.asarray(
    embedder.encode(
        parsec_cards,
        batch_size=64,
        show_progress_bar=False,
        normalize_embeddings=True,
    ),
    dtype=np.float32,
)
# NOTE(review): shape[1] assumes at least one card was extracted — confirm the
# catalog always yields cards, otherwise this line fails on an empty array.
parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1])
parsec_index.add(parsec_emb)
| # ============================ | |
| # Device resolution | |
| # ============================ | |
def label_for_row(i: int) -> str:
    """Build the human-readable label (max 220 chars) for lifecycle row ``i``."""
    row = df_eos.iloc[i]
    label = "{} — {} — {}".format(
        row.get("sku", ""),
        row.get("manufacturer", ""),
        row.get("description", ""),
    )
    return label[:220]
# Display label per lifecycle row, positionally aligned with df_eos.
EOS_LABELS = list(map(label_for_row, range(len(df_eos))))
# Fuzzy-search text per lifecycle row (same order): normalized SKU, canonical
# maker, normalized description and notes joined by single spaces.
EOS_CORPUS = [
    " ".join(
        [
            _row.get("_norm_sku", ""),
            _row.get("_canon_make", ""),
            _row.get("_norm_desc", ""),
            _row.get("_norm_notes", ""),
        ]
    )
    for _, _row in df_eos.iterrows()
]
def local_candidates(query: str, top_k: int = 6) -> List[Tuple[int, int, str]]:
    """Fuzzy-match ``query`` against ``EOS_CORPUS``.

    Returns up to ``top_k`` tuples of ``(row index, integer score, label)`` in
    the order produced by rapidfuzz.
    """
    normalized_query = norm_text(query)
    matches = process.extract(normalized_query, EOS_CORPUS, scorer=fuzz.WRatio, limit=top_k)
    results: List[Tuple[int, int, str]] = []
    for _choice, score, corpus_idx in matches:
        row_idx = int(corpus_idx)
        results.append((row_idx, int(score), EOS_LABELS[row_idx]))
    return results
def gpt_choose_device(user_text: str, candidates: List[Tuple[int,int,str]]) -> Dict[str, Any]:
    """Ask the model to choose among fuzzy-match candidates.

    Returns the parsed JSON reply from ``gpt_json`` (expected to follow the
    ``output_schema`` sent in the payload), or ``{}`` when no client is
    configured.
    """
    if client is None:
        return {}
    candidate_payload = []
    for row_idx, score, label in candidates:
        candidate_payload.append({"row_idx": row_idx, "score": score, "label": label})
    rules = [
        "If one is clearly correct, return mode='ok' with row_idx.",
        "If two are plausible, return mode='pick' with top 2 options.",
    ]
    schema = {
        "mode": "ok|pick",
        "row_idx": "int",
        "options": [{"row_idx": "int", "label": "string"}],
    }
    request = {
        "user_input": user_text,
        "candidates": candidate_payload,
        "rules": rules,
        "output_schema": schema,
    }
    system_prompt = "Pick which router the user meant. Never invent. Return strict JSON only."
    return gpt_json(system_prompt, request, max_tokens=280)
def resolve_device(user_text: str) -> Dict[str, Any]:
    """Resolve free-text user input to a lifecycle row.

    Resolution order:

    1. Exact match on the normalized SKU (one hit -> ``ok``; several -> ``pick``
       between the first two).
    2. Fuzzy candidates; a top score >= 95 that leads the runner-up by >= 8
       is accepted outright.
    3. The model's choice, accepted only when it refers to one of the fuzzy
       candidates actually offered (the model's row indices are never trusted
       blindly, and option labels are taken from the candidates).
    4. Otherwise a ``pick`` between the top one or two fuzzy candidates.

    Returns:
        ``{"mode": "ok", "row_idx": int}``,
        ``{"mode": "pick", "options": [{"row_idx": int, "label": str}, ...]}`` or
        ``{"mode": "not_found"}``.
    """
    q = norm_text(user_text)
    if not q:
        # Nothing searchable (e.g. punctuation only); avoid matching blank SKUs.
        return {"mode": "not_found"}

    exact = df_eos.index[df_eos["_norm_sku"] == q].tolist()
    if len(exact) == 1:
        return {"mode": "ok", "row_idx": int(exact[0])}
    if len(exact) > 1:
        opts = [{"row_idx": int(i), "label": EOS_LABELS[int(i)]} for i in exact[:2]]
        return {"mode": "pick", "options": opts}

    cands = local_candidates(user_text, top_k=6)
    if not cands:
        return {"mode": "not_found"}
    top_idx, top_score, _top_label = cands[0]
    if top_score >= 95 and (len(cands) == 1 or (top_score - cands[1][1]) >= 8):
        return {"mode": "ok", "row_idx": top_idx}

    label_by_idx = {idx: label for idx, _score, label in cands}
    g = gpt_choose_device(user_text, cands)
    if not isinstance(g, dict):
        g = {}
    mode = g.get("mode")

    if mode == "ok":
        chosen = g.get("row_idx")
        # bool is an int subclass; reject it explicitly.
        if isinstance(chosen, int) and not isinstance(chosen, bool) and chosen in label_by_idx:
            return {"mode": "ok", "row_idx": int(chosen)}
    elif mode == "pick":
        picked: List[Dict[str, Any]] = []
        for option in g.get("options") or []:
            if not isinstance(option, dict):
                continue
            try:
                idx = int(option.get("row_idx"))
            except (TypeError, ValueError):
                continue
            if idx not in label_by_idx or any(p["row_idx"] == idx for p in picked):
                continue
            picked.append({"row_idx": idx, "label": label_by_idx[idx]})
            if len(picked) == 2:
                break
        if picked:
            return {"mode": "pick", "options": picked}

    # Fall back to the best one or two local candidates.
    fallback = [{"row_idx": idx, "label": label} for idx, _score, label in cands[:2]]
    return {"mode": "pick", "options": fallback}
| # ============================ | |
| # Replacements — lifecycle CSV source of truth | |
| # ============================ | |
def extract_model_token(text: str) -> str:
    """Pull a router model token (e.g. ``RUT240``, ``IX20``, ``R1900``) out of free text.

    The text is split on ``|`` and the segments are scanned last-to-first. In
    each segment four patterns are tried in order (Teltonika ``RUT…``, ``IX``
    + two digits, ``R``/``E``/``S`` + 3-4 digits, then a generic
    letters+digits token); the first hit is returned uppercased. When nothing
    matches, the first scanned segment is returned truncated to 60 characters.
    Missing input yields ``""``.
    """
    cleaned = safe_str(text)
    if not cleaned:
        return ""
    segments = [seg.strip() for seg in cleaned.split("|") if seg.strip()]
    scan_order = list(reversed(segments)) if segments else [cleaned]
    for segment in scan_order:
        upper_segment = segment.upper()
        # (pattern, text to search, regex flags) — tried strictly in this order.
        probes = (
            (r"\bRUT[A-Z]?\d{2,4}\b", upper_segment, 0),
            (r"\bIX\d{2}\b", segment, re.IGNORECASE),
            (r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b", segment, re.IGNORECASE),
            (r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b", upper_segment, 0),
        )
        for pattern, subject, flags in probes:
            found = re.search(pattern, subject, flags=flags)
            if found:
                return found.group(0).upper()
    return scan_order[0][:60]
def device_is_4g(row: pd.Series) -> bool:
    """Heuristically decide whether a lifecycle row describes a 4G/LTE-only device.

    The normalized description and notes are searched for technology tokens.
    Tokens are matched at word starts rather than as raw substrings, so that
    unrelated text no longer triggers a match: "5ghz" (Wi-Fi band), "4gb" /
    "64gb" (memory), "nr" or "lte" inside longer words, and "cat" inside words
    such as "application" or "location".

    Returns:
        False when the text mentions 5G/NR; True when it mentions LTE, 4G or an
        LTE category ("cat 4", "cat6", "cat-m1", ...); False otherwise.
    """
    t = norm_text(row.get("description", "")) + " " + norm_text(row.get("notes", ""))
    # Explicit 5G / NR wins: the device is not 4G-only.
    if re.search(r"\b5g(?!hz|b)|\bnr\b", t):
        return False
    # Classic LTE / 4G signals.
    if re.search(r"\blte|\b4g(?!b)", t):
        return True
    # LTE category signals: "cat" followed by M1/M2, or by anything that is
    # not a letter (digit, space, hyphen, slash, end of text).
    return bool(re.search(r"\bcat(?:\s*-?\s*m[12]\b|(?![a-z]))", t))
def _lifecycle_model_candidates(manufacturer: str, column: str) -> List[str]:
    """Collect unique model tokens from ``column`` for rows of the given manufacturer.

    Both the requested manufacturer and the table's manufacturer column are
    passed through ``norm_text`` before comparison. Previously only the
    argument was normalized, so any name containing punctuation or extra
    whitespace never matched its own rows. When the table has no
    ``manufacturer`` column, all rows are used.
    """
    if "manufacturer" in df_eos.columns:
        wanted = norm_text(manufacturer)
        same_maker = df_eos["manufacturer"].astype(str).apply(norm_text).eq(wanted)
        pool = df_eos[same_maker]
    else:
        pool = df_eos
    if column not in pool.columns:
        return []
    tokens: List[str] = []
    seen = set()
    for raw_value in pool[column].tolist():
        token = extract_model_token(raw_value)
        if token and token.lower() != "nan" and token not in seen:
            seen.add(token)
            tokens.append(token)
    return tokens


def candidate_5g_models_from_lifecycle(manufacturer: str) -> List[str]:
    """Return unique 5G model tokens from ``advanced_5g_option`` for this manufacturer."""
    return _lifecycle_model_candidates(manufacturer, "advanced_5g_option")


def candidate_4g_models_from_lifecycle(manufacturer: str) -> List[str]:
    """Return unique 4G model tokens from ``suggested_replacement`` for this manufacturer."""
    return _lifecycle_model_candidates(manufacturer, "suggested_replacement")
def gpt_pick_from_candidates(old_row: pd.Series, candidates: List[str], need: str) -> str:
    """Let the model pick one replacement model from ``candidates``.

    At most the first 40 candidates are sent. The reply is accepted only when
    it is exactly one of ``candidates``; otherwise (or without a client or
    candidates) ``""`` is returned.
    """
    if client is None or not candidates:
        return ""
    old_device = {
        "sku": str(old_row.get("sku", "")),
        "manufacturer": str(old_row.get("manufacturer", "")),
        "description": str(old_row.get("description", "")),
        "need": need,
    }
    request = {
        "old_device": old_device,
        "candidates": candidates[:40],
        "output_schema": {"choice": "string"},
    }
    system_prompt = "Pick the best replacement model. Choose only from candidates. Return strict JSON only."
    reply = gpt_json(system_prompt, request, max_tokens=240) or {}
    picked = str(reply.get("choice", "") or "").strip()
    if picked in candidates:
        return picked
    return ""
def fallback_5g_from_dec(canon_make: str) -> str:
    """Return the first 5G model listed in ``df_dec`` for ``canon_make``, or ``""``."""
    maker_matches = df_dec["_canon_make"] == canon_make
    flagged_5g = df_dec["_is5g"] == True  # noqa: E712 — element-wise comparison
    five_g_rows = df_dec[maker_matches & flagged_5g]
    if five_g_rows.empty:
        return ""
    return str(five_g_rows.iloc[0]["Model"]).strip()
def pick_replacements_lifecycle(row: pd.Series, status: str, use_gpt: bool = True) -> Dict[str, Any]:
    """Choose the 4G alternative and 5G replacement for a lifecycle row.

    The row's own ``suggested_replacement`` / ``advanced_5g_option`` cells are
    preferred. When a cell yields no model token, a model is chosen from the
    same manufacturer's other rows (by GPT when enabled and configured, else
    the first candidate). For 5G only, ``df_dec`` is the final fallback.

    Returns:
        ``{"repl_4g": str, "repl_5g": str, "sources": [...]}`` where
        ``repl_4g`` defaults to ``"Not applicable"`` and ``repl_5g`` to
        ``"Not listed"``.
    """
    canon = str(row.get("_canon_make", "UNKNOWN"))
    manufacturer = str(row.get("manufacturer", "") or "")
    sug_raw = safe_str(row.get("suggested_replacement", ""))
    adv_raw = safe_str(row.get("advanced_5g_option", ""))
    has_4g_alt = bool(sug_raw.strip())
    has_5g_alt = bool(adv_raw.strip())
    gpt_enabled = bool(use_gpt and client)

    # A device counts as 4G when its text says so or a 4G replacement is listed.
    is_4g = device_is_4g(row) or has_4g_alt
    # Offer 5G for 4G units, EOS/EOL units, or when a 5G option is listed.
    want_5g = is_4g or (status in {"End of Sale", "End of Life"}) or has_5g_alt

    def _from_lifecycle(raw_cell: str, candidate_fn, need: str) -> str:
        """Token from the row's own cell, else GPT's pick, else first candidate."""
        token = extract_model_token(raw_cell)
        if token:
            return token
        pool = candidate_fn(manufacturer)
        picked = gpt_pick_from_candidates(row, pool, need) if gpt_enabled else ""
        return picked or (pool[0] if pool else "")

    repl_4g = ""
    if is_4g:
        repl_4g = _from_lifecycle(sug_raw, candidate_4g_models_from_lifecycle, "4G alternative")
    repl_4g = repl_4g or "Not applicable"

    repl_5g = "Not listed"
    if want_5g:
        repl_5g = (
            _from_lifecycle(adv_raw, candidate_5g_models_from_lifecycle, "5G replacement/upgrade")
            or fallback_5g_from_dec(canon)
            or "Not listed"
        )
    if repl_5g.lower() == "nan":
        repl_5g = "Not listed"

    sources = ["lifecycle_csv"]
    if gpt_enabled:
        sources.append("gpt")
    return {"repl_4g": repl_4g, "repl_5g": repl_5g, "sources": sources}
| # ============================ | |
| # Antennas (Parsec-only) | |
| # ============================ | |
# Lowercase words that family_from_line looks for inside a catalog line; a hit
# is returned (capitalized) as the antenna name.
PARSEC_FAMILY_WORDS = {"chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie","shepherd","belgian","australian","terrier","pyrenees"}
# Lowercase substrings marking a catalog line as a section heading or field
# label; is_bad_name_line rejects any line containing one of them.
BAD_NAME_MARKERS = {"customization","standard connectors","connectors","features","benefits","specifications","mechanical","electrical","mounting","accessories","description:","standard sku"}
def clean_line(s: str) -> str:
    """Collapse whitespace in a catalog line; blank out bare ``-suffix`` lines.

    A line consisting solely of a hyphen followed by letters/digits (for
    example ``-ab1``) is returned as ``""``.
    """
    collapsed = re.sub(r"\s+", " ", str(s or "").strip())
    is_bare_suffix = re.fullmatch(r"-[a-z0-9]+", collapsed.lower()) is not None
    return "" if is_bare_suffix else collapsed
def is_bad_name_line(line: str) -> bool:
    """Return True when a catalog line cannot be an antenna name.

    A line is rejected when it contains any ``BAD_NAME_MARKERS`` substring, or
    when it is short (<= 25 chars) and contains a ``-xxxx`` style suffix of
    1-4 letters/digits attached to a preceding word.
    """
    lowered = line.lower()
    for marker in BAD_NAME_MARKERS:
        if marker in lowered:
            return True
    has_short_suffix = re.search(r"\b-[a-z0-9]{1,4}\b", lowered) is not None
    return has_short_suffix and len(lowered) <= 25
def family_from_line(line: str) -> str:
    """Return the capitalized Parsec family word found in ``line``, or ``""``.

    When several family words occur in the same line (e.g. "Belgian
    Shepherd"), the one appearing earliest in the line is returned. The
    family words live in a set, so returning "the first one iterated" gave a
    result that could differ from run to run.
    """
    low = line.lower()
    hits = [(low.find(fam), fam) for fam in PARSEC_FAMILY_WORDS if fam in low]
    if not hits:
        return ""
    # Earliest position wins; ties are broken alphabetically.
    return min(hits)[1].capitalize()
def parsec_connectors_from_card(t: str) -> str:
    """Return the text following ``Standard Connectors:`` (max 80 chars), or ``""``.

    The label is matched case-insensitively; whitespace in the captured text
    is collapsed to single spaces.
    """
    found = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE)
    if found is None:
        return ""
    value = found.group(1).strip()
    return re.sub(r"\s+", " ", value)[:80]
def parsec_mounts_from_card(t: str) -> List[str]:
    """Return the unique, lowercased mount types listed after every ``Mount:`` label.

    Each matching line is split on commas; order of first appearance is kept.
    """
    ordered_unique: Dict[str, None] = {}
    for hit in re.finditer(r"Mount:\s*(.+)", t, flags=re.IGNORECASE):
        collapsed = re.sub(r"\s+", " ", hit.group(1).strip())
        for piece in collapsed.split(","):
            mount = piece.strip().lower()
            if mount:
                # dict preserves insertion order and ignores repeats.
                ordered_unique.setdefault(mount, None)
    return list(ordered_unique)
def parsec_name_from_card(card_text: str) -> str:
    """Guess the antenna's product name from a catalog card.

    1. The first acceptable line containing a known family word gives the name.
    2. Otherwise, looking backwards over up to 12 lines before the first
       "standard sku" line, the first acceptable line of 3-40 characters that
       contains a letter gives the name (its first word, capitalized).
    3. Otherwise ``"Parsec antenna"``.
    """
    cleaned = (clean_line(raw) for raw in str(card_text or "").splitlines())
    lines = [ln for ln in cleaned if ln]

    for ln in lines:
        if not is_bad_name_line(ln):
            family = family_from_line(ln)
            if family:
                return family

    sku_pos = next(
        (pos for pos, ln in enumerate(lines) if "standard sku" in ln.lower()),
        None,
    )
    if sku_pos is not None:
        preceding = lines[max(0, sku_pos - 12):sku_pos]
        for ln in reversed(preceding):
            if is_bad_name_line(ln):
                continue
            if 3 <= len(ln) <= 40 and re.search(r"[A-Za-z]", ln):
                return ln.split()[0].capitalize()
    return "Parsec antenna"
def parsec_part_from_card(t: str) -> str:
    """Return the run of uppercase letters/digits after ``Standard SKU:``, or ``""``."""
    found = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t)
    if found is None:
        return ""
    return found.group(1).strip()
def parsec_desc_from_card(t: str) -> str:
    """Return the rest of the line after ``Description:`` (max 220 chars), or ``""``.

    The label is matched case-insensitively and whitespace is collapsed.
    """
    found = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE)
    if not found:
        return ""
    text = found.group(1).strip()
    return re.sub(r"\s+", " ", text)[:220]
def parsec_retrieve(query: str, top_k: int = 12) -> List[Dict[str, Any]]:
    """Return up to ``top_k`` Parsec cards nearest to ``query`` in embedding space.

    Each result carries the similarity score, the fields parsed from the card
    (name, part number, description, connectors, mounts) and the lowercased
    card text under ``"_card"`` for later scoring. Ids outside the card list
    are skipped.
    """
    query_vec = np.asarray(
        embedder.encode([query], normalize_embeddings=True),
        dtype=np.float32,
    )
    scores, ids = parsec_index.search(query_vec, top_k)
    results: List[Dict[str, Any]] = []
    for score, raw_id in zip(scores[0].tolist(), ids[0].tolist()):
        card_idx = int(raw_id)
        if not 0 <= card_idx < len(parsec_cards):
            continue
        card = parsec_cards[card_idx]
        results.append({
            "score": float(score),
            "name": parsec_name_from_card(card),
            "part_number": parsec_part_from_card(card),
            "description": parsec_desc_from_card(card),
            "connectors": parsec_connectors_from_card(card),
            "mounts": parsec_mounts_from_card(card),
            "_card": card.lower(),
        })
    return results
def choose_best_parsec(cands: List[Dict[str, Any]], mode: str) -> Dict[str, Any]:
    """Pick the best antenna candidate for ``mode`` ("vehicle" or "stationary").

    Each candidate's retrieval score is adjusted: omni text is rewarded and
    directional text penalized; in vehicle mode magnetic / through mounts are
    rewarded while wall / pole mounts and fixed-only cards are penalized; in
    stationary mode wall and pole mounts are rewarded. The first candidate
    with the highest adjusted score wins and is returned as a copy without
    its ``"_card"`` text. With no usable candidate a placeholder dict is
    returned.
    """

    def _adjusted_score(cand: Dict[str, Any]) -> float:
        card = cand.get("_card", "")
        mounts = cand.get("mounts", []) or []

        def _has_mount(word: str) -> bool:
            return any(word in m for m in mounts)

        total = float(cand.get("score", 0.0))
        if "omni" in card:
            total += 0.6
        if "directional" in card:
            total -= 1.5
        if mode == "vehicle":
            if _has_mount("magnetic"):
                total += 3.0
            if _has_mount("through"):
                total += 2.0
            if _has_mount("wall") or _has_mount("pole"):
                total -= 1.2
            if "app: fixed" in card and "mobile" not in card:
                total -= 2.0
        if mode == "stationary":
            if _has_mount("wall"):
                total += 2.0
            if _has_mount("pole"):
                total += 1.8
        return total

    winner = None
    winner_score = -1e9
    for cand in cands:
        adjusted = _adjusted_score(cand)
        if adjusted > winner_score:
            winner, winner_score = cand, adjusted

    if not winner:
        return {"name":"Parsec antenna","part_number":"","description":"","connectors":"","mounts":[]}
    return {key: value for key, value in winner.items() if key != "_card"}
def infer_mimo_for_5g(model: str, canon_make: str) -> str:
    """Best-effort MIMO guess ("2x2" or "4x4") used for antenna selection.

    An unknown model gives "2x2". Otherwise a default is derived from the
    model name ("4x4" if it contains "5g" or starts with R/E/S/IX/RUTM, else
    "2x2") and refined from the best fuzzy match among the same maker's rows
    in ``df_dec``: explicit "4x4"/"2x2" text wins, then a 5G/NR mention means
    "4x4". Any lookup failure falls back to the name-based default.
    """
    if not model or model in {"Not applicable", "Not listed"}:
        return "2x2"

    name_hints_5g = "5g" in model.lower() or model.upper().startswith(("R", "E", "S", "IX", "RUTM"))
    default = "4x4" if name_hints_5g else "2x2"

    try:
        same_maker = df_dec[df_dec["_canon_make"] == canon_make].copy()
        if same_maker.empty:
            return default
        match = process.extractOne(norm_text(model), same_maker["_norm_model"].tolist(), scorer=fuzz.WRatio)
        if not match or match[1] < MATCH_OK:
            return default
        matched_row = same_maker.iloc[int(match[2])]
        spec_fields = ("Antennas (internal/external/both)", "Modem Type", "Special notes")
        spec_text = " ".join(str(matched_row.get(field, "")) for field in spec_fields).lower()
        # Checked in order: explicit 4x4, explicit 2x2, then a 5G/NR mention.
        verdict_rules = (
            ("4x4", ("4x4", "4 x 4", "4x 4")),
            ("2x2", ("2x2", "2 x 2")),
            ("4x4", ("5g", "nr")),
        )
        for verdict, needles in verdict_rules:
            if any(needle in spec_text for needle in needles):
                return verdict
        return default
    except Exception:
        # Best-effort lookup: never let a spec-table problem break the answer.
        return default
def antenna_options_for(router_model: str, tech: str, mimo: str) -> Dict[str, Any]:
    """Retrieve and choose one stationary and one vehicle omni antenna.

    Two retrieval queries are built from the router model, technology and
    MIMO hint; the best candidate for each mode is annotated with ``mimo``
    and a short ``why`` text.
    """
    query_prefix = f"{router_model} {tech} {mimo} omni"
    stationary_query = f"{query_prefix} stationary pole wall fixed site Parsec"
    vehicle_query = f"{query_prefix} vehicle mobile magnetic through-bolt Parsec"

    stationary_pick = choose_best_parsec(parsec_retrieve(stationary_query, top_k=12), mode="stationary")
    vehicle_pick = choose_best_parsec(parsec_retrieve(vehicle_query, top_k=12), mode="vehicle")

    stationary_pick["mimo"] = mimo
    stationary_pick["why"] = "Stationary omni best match."
    vehicle_pick["mimo"] = mimo
    vehicle_pick["why"] = "Vehicle omni best match."
    return {
        "stationary_omni": stationary_pick,
        "vehicle_omni": vehicle_pick,
        "sources": ["parsec_rag"],
    }
| # ============================ | |
| # Install-ready checklist | |
| # ============================ | |
def install_ready_checklist(current_sku: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str:
    """Produce a markdown install checklist for the chosen replacement and antennas.

    With a configured client the checklist is written by the model; without
    one a fixed template is filled in locally.
    """
    stationary = ant.get("stationary_omni", {})
    vehicle = ant.get("vehicle_omni", {})

    if client is None:
        template_lines = [
            "### Install-ready checklist",
            f"- Current device: {current_sku}",
            f"- 5G replacement: {repl.get('repl_5g','')}",
            f"- 4G alternative: {repl.get('repl_4g','Not applicable')}",
            f"- Stationary omni antenna: {stationary.get('name','')} (PN {stationary.get('part_number','')})",
            f"- Vehicle omni antenna: {vehicle.get('name','')} (PN {vehicle.get('part_number','')})",
            "- Next steps: confirm mounting + cable lengths + power; place order; schedule install.",
        ]
        return "\n".join(template_lines)

    system_prompt = "Create a short, install-ready checklist for a Verizon rep. Return markdown only."
    request = {
        "current_device": current_sku,
        "replacements": repl,
        "antennas": {"stationary": stationary, "vehicle": vehicle},
    }
    response = client.responses.create(
        model=OPENAI_MODEL,
        reasoning=OPENAI_REASONING,
        input=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": json.dumps(request)},
        ],
        max_output_tokens=520,
    )
    return (getattr(response, "output_text", "") or "").strip()
| # ============================ | |
| # Batch mode (NO GPT) | |
| # ============================ | |
def parse_batch_inputs(text_blob: str, file_obj: Any) -> List[str]:
    """Collect the device strings for a batch run from an upload and/or pasted text.

    Items come from the first column of the uploaded CSV (if any), followed
    by the non-empty lines of ``text_blob``. Duplicates, compared by
    ``norm_text``, are dropped while first-seen order is kept.

    The CSV is read with ``dtype=str`` so SKUs are not reinterpreted as
    numbers, and blank cells are dropped instead of becoming the literal
    item "nan".
    """
    items: List[str] = []
    if file_obj is not None:
        try:
            path = file_obj.name if hasattr(file_obj, "name") else str(file_obj)
            frame = pd.read_csv(path, dtype=str)
            first_column = frame.iloc[:, 0].dropna()
            for cell in first_column.tolist():
                text = str(cell).strip()
                if text:
                    items.append(text)
        except (OSError, ValueError, IndexError):
            # Best-effort: an unreadable, empty or malformed upload is ignored
            # so pasted text can still be processed. pandas parser errors and
            # decoding errors are ValueError subclasses.
            pass
    if text_blob:
        for line in str(text_blob).splitlines():
            line = line.strip()
            if line:
                items.append(line)

    seen = set()
    unique_items: List[str] = []
    for item in items:
        key = norm_text(item)
        if key and key not in seen:
            seen.add(key)
            unique_items.append(item)
    return unique_items
def run_batch(text_blob: str, file_obj: Any, include_antennas: bool):
    """Resolve many devices at once and return a summary, table, CSV path and roll-up.

    Returns:
        ``(summary markdown, results DataFrame, path to a results CSV,
        roll-up markdown)``; ``("", None, None, "")`` when there is no input.

    NOTE(review): ``include_antennas`` is accepted but not used here, and
    ``resolve_device`` can still consult GPT for ambiguous inputs when a
    client is configured — confirm whether batch mode should do either.
    """
    inputs = parse_batch_inputs(text_blob, file_obj)
    if not inputs:
        return "", None, None, ""

    rows = []
    for item in inputs:
        res = resolve_device(item)
        if res.get("mode") != "ok":
            rows.append({"Input": item, "Matched":"", "Status":"Needs review", "EOS":"", "EOL":"", "4G alternative":"", "5G replacement":"", "Notes":"Not found/ambiguous"})
            continue
        life_row = df_eos.iloc[int(res["row_idx"])]
        eos, eol, status = row_to_dates_and_status(life_row)
        repl = pick_replacements_lifecycle(life_row, status, use_gpt=False)
        rows.append({
            "Input": item,
            "Matched": str(life_row.get("sku","")),
            "Status": status,
            "EOS": eos,
            "EOL": eol,
            "4G alternative": repl.get("repl_4g",""),
            "5G replacement": repl.get("repl_5g",""),
            "Notes": "",
        })

    out_df = pd.DataFrame(rows)
    counts = out_df["Status"].value_counts(dropna=False).to_dict()
    top_5g = out_df["5G replacement"].value_counts(dropna=False).head(5).to_dict()
    summary = f"Rows: {len(out_df)} | " + " | ".join([f"{k}: {v}" for k,v in counts.items()])
    rollup = "Top 5G recommendations:\n" + "\n".join([f"- {k}: {v}" for k,v in top_5g.items() if str(k).strip()])

    # Reserve a persistent temp path, then close the handle before pandas
    # writes to it: the handle was previously never closed, and writing by
    # path to a still-open temp file is not portable.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
        csv_path = tmp.name
    out_df.to_csv(csv_path, index=False)
    return summary, out_df, csv_path, rollup
| # ============================ | |
| # Output | |
| # ============================ | |
def assemble_output(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str:
    """Render the single-lookup result as a numbered markdown report.

    Sections: current device, status, EOS/EOL dates, 4G and 5G replacements,
    the two antenna picks, and a debug list of data sources.
    """

    def _antenna_line(label: str, pick: Dict[str, Any]) -> str:
        connectors = f" | Conn: {pick.get('connectors','')}" if pick.get("connectors") else ""
        return (
            f" - {label} (Omni): **{pick.get('name','')}** "
            f"(Part #: {pick.get('part_number','')}) — {pick.get('description','')} "
            f"— MIMO: {pick.get('mimo','')}{connectors}"
        )

    current_name = f"{life_row.get('sku','')} — {life_row.get('description','')}".strip(" —")
    listed_sources = repl.get("sources")
    if not isinstance(listed_sources, list):
        listed_sources = []

    report = [
        f"1. Current device: **{current_name}**",
        f"2. Status: **{status}**",
        f"3. End of Sale date: **{eos}**",
        f"4. End of Life date: **{eol}**",
        f"5. 4G alternative (lifecycle): **{repl.get('repl_4g','Not applicable')}**",
        f"6. 5G replacement (lifecycle): **{repl.get('repl_5g','Not listed')}**",
        "7. Antenna options (Parsec-only):",
        _antenna_line("Stationary", ant.get("stationary_omni", {})),
        _antenna_line("Vehicle", ant.get("vehicle_omni", {})),
        "\nSources (debug):",
    ]
    report.extend(f"- {source}" for source in listed_sources)
    report.append("- ParsecCatalog.pdf (local RAG)")
    report.append("- routers_eos_eol_by_sku.csv (replacements)")
    return "\n".join(report)
| # ============================ | |
| # Gradio callbacks | |
| # IMPORTANT: no dict state and ALL events have api_name=False (prevents api_info schema generation) | |
| # ============================ | |
def _lookup_result(row_idx: int, raw: str):
    """Build the five callback outputs for a resolved lifecycle row.

    Shared by ``run_lookup`` and ``use_selection``. Returns ``(report
    markdown, hidden dropdown update, hidden button update, state JSON,
    cleared checklist text)``.
    """
    life_row = df_eos.iloc[int(row_idx)]
    eos, eol, status = row_to_dates_and_status(life_row)
    repl = pick_replacements_lifecycle(life_row, status, use_gpt=True)
    canon_make = str(life_row.get("_canon_make", "UNKNOWN"))

    repl_5g = repl.get("repl_5g") or ""
    has_5g = bool(repl_5g) and repl_5g != "Not listed"
    mimo = infer_mimo_for_5g(repl_5g, canon_make)
    if has_5g:
        tech = "5G"
    else:
        tech = "4G" if device_is_4g(life_row) else "Unknown"
    # Search antennas for the 5G replacement when there is one; otherwise for
    # the current SKU. The placeholder "Not listed" is truthy, so it was
    # previously used as the router model in the antenna query.
    antenna_model = repl_5g if has_5g else str(life_row.get("sku", ""))
    ant = antenna_options_for(antenna_model, tech, mimo)

    output = assemble_output(life_row, status, eos, eol, repl, ant)
    st_out = {"row_idx": int(row_idx), "repl": repl, "ant": ant, "raw": raw}
    return output, gr.update(visible=False), gr.update(visible=False), state_dump(st_out), ""


def run_lookup(user_text: str, st_json: str):
    """Gradio callback for the "Check" button.

    Resolves the typed SKU/model. An unambiguous hit renders the full report;
    an ambiguous one reveals the picker and stores the options in state;
    otherwise a short message is shown.
    """
    user_text = str(user_text or "").strip()
    if not user_text:
        return "Enter a router SKU/model.", gr.update(visible=False), gr.update(visible=False), "{}", ""

    res = resolve_device(user_text)
    if res.get("mode") == "pick":
        opts = res.get("options", [])
        choices = [o["label"] for o in opts]
        st2 = {"mode":"pick","options": opts, "raw": user_text}
        return "Did you mean A or B? Pick one, then click Use selection.", gr.update(choices=choices, value=None, visible=True), gr.update(visible=True), state_dump(st2), ""
    if res.get("mode") != "ok":
        return "Not found.", gr.update(visible=False), gr.update(visible=False), "{}", ""

    return _lookup_result(int(res["row_idx"]), user_text)


def use_selection(selected_label: str, st_json: str):
    """Gradio callback for the "Use selection" button.

    Looks up the option whose label was chosen in the picker (options come
    from the state saved by ``run_lookup``) and renders its full report.
    """
    st = state_load(st_json)
    if not st or st.get("mode") != "pick":
        return "Run a search first.", gr.update(visible=False), gr.update(visible=False), "{}", ""
    if not selected_label:
        return "Pick A or B first.", gr.update(visible=True), gr.update(visible=True), st_json, ""

    chosen_row = None
    for o in st.get("options", []):
        if o.get("label") == selected_label:
            chosen_row = int(o["row_idx"])
            break
    if chosen_row is None:
        return "Pick a valid option.", gr.update(visible=True), gr.update(visible=True), st_json, ""

    return _lookup_result(chosen_row, st.get("raw", ""))
def make_install_ready(st_json: str):
    """Gradio callback: build the install checklist from the last lookup's state."""
    state = state_load(st_json)
    if not state or "row_idx" not in state:
        return "Run a lookup first."
    device_row = df_eos.iloc[int(state["row_idx"])]
    current_sku = str(device_row.get("sku", "") or "")
    replacements = state.get("repl", {}) or {}
    antennas = state.get("ant", {}) or {}
    return install_ready_checklist(current_sku, replacements, antennas)
| # ============================ | |
| # UI | |
| # ============================ | |
# Two-tab UI: "Single" for one lookup (with optional A/B disambiguation and an
# install checklist) and "Batch" for many devices at once.
with gr.Blocks(title="Only-Routers") as demo:
    gr.Markdown("## Only-Routers\nSingle lookup + Batch upload for Verizon reps.")
    with gr.Tabs():
        with gr.Tab("Single"):
            user_text = gr.Textbox(label="Router SKU or model", placeholder="Examples: IBR650B, AER1600, ES450, WR21, RUT240", lines=1)
            # State is kept as a JSON string (see state_load / state_dump).
            st = gr.State("{}") # JSON string
            check_btn = gr.Button("Check", variant="primary")
            # Picker and its button stay hidden until a lookup is ambiguous.
            pick_dd = gr.Dropdown(label="Pick A or B", choices=[], visible=False)
            use_btn = gr.Button("Use selection", visible=False)
            output_md = gr.Markdown()
            install_btn = gr.Button("Make install-ready checklist")
            install_md = gr.Markdown()
            # Both lookup callbacks return the same five outputs, in this order.
            check_btn.click(fn=run_lookup, inputs=[user_text, st], outputs=[output_md, pick_dd, use_btn, st, install_md], api_name=False)
            use_btn.click(fn=use_selection, inputs=[pick_dd, st], outputs=[output_md, pick_dd, use_btn, st, install_md], api_name=False)
            install_btn.click(fn=make_install_ready, inputs=[st], outputs=[install_md], api_name=False)
        with gr.Tab("Batch"):
            gr.Markdown("Paste one per line or upload a CSV (first column). Batch runs fast (no GPT).")
            batch_text = gr.Textbox(label="Paste devices (one per line)", lines=8, placeholder="WR21\nRUT240\nIBR650B")
            batch_file = gr.File(label="Upload CSV", file_types=[".csv"])
            # NOTE(review): this checkbox is passed to run_batch, which does
            # not currently read it.
            include_ant = gr.Checkbox(label="Include antenna picks (slower)", value=False)
            run_btn = gr.Button("Run batch", variant="primary")
            summary_md = gr.Markdown()
            rollup_md = gr.Markdown()
            table = gr.Dataframe(interactive=False, wrap=True)
            dl = gr.File(label="Download results CSV")
            # run_batch returns (summary, table, csv path, rollup) in this order.
            run_btn.click(fn=run_batch, inputs=[batch_text, batch_file, include_ant], outputs=[summary_md, table, dl, rollup_md], api_name=False)
# IMPORTANT: On Spaces, demo.launch() is correct; do NOT use share=True.
demo.launch(show_api=False)