import os import re import json import math import time import hashlib import base64 import tempfile from dataclasses import dataclass from datetime import datetime, date from functools import lru_cache from typing import Any, Dict, List, Optional, Tuple import numpy as np import pandas as pd import fitz # PyMuPDF import faiss from sentence_transformers import SentenceTransformer from rapidfuzz import fuzz, process import gradio as gr from openai import OpenAI # ============================================================ # Only-Routers (Chat, production-lean) # - Fast model by default (no reasoning payload) # - One LLM call max per lookup (enrichment only, cached) # - No HTTP crawling during normal lookup (links are deterministic) # - Timing logs to HF console when DEBUG_TIMING=1 # ============================================================ # ---------------------------- # Settings # ---------------------------- TODAY = date(2026, 1, 18) # Fast default model (override via env) OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5.2").strip() # Disable LLM at runtime: OPENAI_DISABLE=1 OPENAI_DISABLE = os.getenv("OPENAI_DISABLE", "0").strip() == "1" # Timing logs DEBUG_TIMING = os.getenv("DEBUG_TIMING", "0").strip() == "1" # Matching thresholds MATCH_OK = 82 MATCH_AUTOPICK = 95 MATCH_GAP = 8 # Embeddings EMBED_MODEL_NAME = os.getenv("EMBED_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2").strip() # Parsec PDF slicing PARSEC_CONTEXT_BEFORE = 900 PARSEC_CONTEXT_AFTER = 1600 # ---------------------------- # OpenAI client # ---------------------------- API_KEY = os.getenv("OPENAI_API_KEY", "").strip() client = None if (not API_KEY or OPENAI_DISABLE) else OpenAI(api_key=API_KEY) # ---------------------------- # Timing helper # ---------------------------- def _tlog(label: str, t0: float) -> None: if DEBUG_TIMING: dt = time.perf_counter() - t0 print(f"[TIMER] {label}: {dt:.2f}s") # ---------------------------- # JSON-safe helpers # ---------------------------- def _json_load_safe(s: str) -> Dict[str, Any]: try: return json.loads(s) except Exception: return {} def _json_dump_safe(obj: Any) -> str: try: return json.dumps(obj, ensure_ascii=False) except Exception: return "{}" # ---------------------------- # Gradio state helpers (string JSON only) # ---------------------------- def state_load(st_json: str) -> Dict[str, Any]: try: return json.loads(st_json) if isinstance(st_json, str) and st_json else {} except Exception: return {} def state_dump(st: Dict[str, Any]) -> str: return _json_dump_safe(st or {}) # ---------------------------- # Normalization # ---------------------------- def norm_text(x: Any) -> str: try: if x is None or (isinstance(x, float) and math.isnan(x)) or pd.isna(x): return "" except Exception: pass s = str(x).strip().lower() s = re.sub(r"[^a-z0-9\s\-\/]", " ", s) s = re.sub(r"\s+", " ", s).strip() return s def safe_str(x: Any) -> str: if x is None or (isinstance(x, float) and pd.isna(x)) or pd.isna(x): return "" return str(x).strip() def is_5g_text(s: str) -> bool: t = norm_text(s) return ("5g" in t) or ("nr" in t) def is_4g_lte_family(row: pd.Series) -> bool: # Treat LTE categories as 4G t = norm_text(row.get("description", "")) + " " + norm_text(row.get("notes", "")) if "5g" in t or "nr" in t: return False if "lte" in t or "4g" in t: return True if re.search(r"\bcat\s*[-]?\s*(m1|m2)\b", t): return True if re.search(r"\bcat\s*[-]?\s*\d{1,2}\b", t): return True if "cat" in t: return True return False # ---------------------------- # Lifecycle CSV normalization # ---------------------------- def _normalize_lifecycle_df(df: pd.DataFrame) -> pd.DataFrame: df = df.copy() lower_cols = {c.lower(): c for c in df.columns} def _pick(*names): for n in names: if n.lower() in lower_cols: return lower_cols[n.lower()] return None col_map = {} sku_col = _pick("sku", "SKU") if sku_col: col_map[sku_col] = "sku" mfr_col = _pick("manufacturer", "Manufacturer") if mfr_col: col_map[mfr_col] = "manufacturer" dt_col = _pick("device type", "Device Type", "device_type") if dt_col: col_map[dt_col] = "device_type" eos_col = _pick("end_of_sale", "end of sale", "End of Sale", "eos") if eos_col: col_map[eos_col] = "end_of_sale" eol_col = _pick("end_of_life", "end of life", "End of Life", "eol") if eol_col: col_map[eol_col] = "end_of_life" sr_col = _pick("suggested_replacement", "Suggested Replacement") if sr_col: col_map[sr_col] = "suggested_replacement" a5_col = _pick("advanced_5g_option", "Advanced 5G Option", "advanced 5g option") if a5_col: col_map[a5_col] = "advanced_5g_option" df = df.rename(columns=col_map) for req in ["sku", "manufacturer", "device_type", "end_of_sale", "end_of_life", "suggested_replacement", "advanced_5g_option"]: if req not in df.columns: df[req] = "" # Compatibility fields used by matching/output if "description" not in df.columns: df["description"] = df["sku"].astype(str) if "notes" not in df.columns: df["notes"] = "" if "region" not in df.columns: df["region"] = "" return df # ---------------------------- # Maker mapping # ---------------------------- CANON_MAKER = { "CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"}, "SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"}, "FEENEY": {"feeney", "feeney wireless", "inseego"}, "DIGI": {"digi", "accelerated", "accelerated concepts"}, "CISCO_MERAKI": {"meraki", "cisco meraki"}, "CISCO": {"cisco"}, "TELTONIKA": {"teltonika"}, } def canon_maker_from_text(s: Any) -> str: t = norm_text(s) for canon, terms in CANON_MAKER.items(): for term in terms: if term in t: return canon return "UNKNOWN" # ---------------------------- # Date parsing # ---------------------------- @dataclass class ParsedDate: raw: str kind: str value: Optional[date] def parse_date_field(x: Any) -> ParsedDate: raw = safe_str(x) if not raw: return ParsedDate(raw="", kind="missing", value=None) # MM/DD/YY or M/D/YY if re.fullmatch(r"\d{1,2}/\d{1,2}/\d{2,4}", raw): try: parts = raw.split("/") m = int(parts[0]); d = int(parts[1]); y = int(parts[2]) if y < 100: y += 2000 dt = date(y, m, d) return ParsedDate(raw=f"{y:04d}-{m:02d}-{d:02d}", kind="full", value=dt) except Exception: return ParsedDate(raw=raw, kind="bad", value=None) # YYYY if re.fullmatch(r"\d{4}", raw): y = int(raw) if y == TODAY.year: return ParsedDate(raw=raw, kind="year", value=date(y, 1, 1)) if y < TODAY.year: return ParsedDate(raw=raw, kind="year", value=date(y, 1, 1)) return ParsedDate(raw=raw, kind="year", value=date(y, 12, 31)) # YYYY-MM if re.fullmatch(r"\d{4}-\d{2}", raw): try: y, m = raw.split("-") dt = date(int(y), int(m), 1) return ParsedDate(raw=raw, kind="year_month", value=dt) except Exception: return ParsedDate(raw=raw, kind="bad", value=None) # YYYY-MM-DD if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw): try: dt = datetime.strptime(raw, "%Y-%m-%d").date() return ParsedDate(raw=raw, kind="full", value=dt) except Exception: return ParsedDate(raw=raw, kind="bad", value=None) return ParsedDate(raw=raw, kind="bad", value=None) def display_date(pd_: ParsedDate) -> str: if pd_.kind == "missing": return "Not listed" if pd_.kind == "bad": return pd_.raw or "Not listed" return pd_.raw def status_from_eos_eol(eos: ParsedDate, eol: ParsedDate) -> str: if eos.value is None and eol.value is None: return "Unknown" if eol.value is not None and eol.value <= TODAY: return "End of Life" if eos.value is not None and eos.value <= TODAY: return "End of Sale" return "Active" def row_to_dates_and_status(row: pd.Series) -> Tuple[str, str, str]: eos = parse_date_field(row.get("end_of_sale")) eol = parse_date_field(row.get("end_of_life")) return display_date(eos), display_date(eol), status_from_eos_eol(eos, eol) # ---------------------------- # Files # ---------------------------- EOS_PATH = "routers_eos_eol_by_sku.csv" DEC_PATH = "dec2025routers.csv" PARSEC_PDF = "ParsecCatalog.pdf" if not os.path.exists(EOS_PATH): raise FileNotFoundError(f"Missing {EOS_PATH} in repo.") if not os.path.exists(DEC_PATH): raise FileNotFoundError(f"Missing {DEC_PATH} in repo.") if not os.path.exists(PARSEC_PDF): raise FileNotFoundError(f"Missing {PARSEC_PDF} in repo.") t0 = time.perf_counter() df_eos = pd.read_csv(EOS_PATH).copy() df_dec = pd.read_csv(DEC_PATH).copy() df_eos = _normalize_lifecycle_df(df_eos) # Canon columns df_eos["_canon_make"] = df_eos["manufacturer"].apply(canon_maker_from_text) df_eos["_norm_sku"] = df_eos["sku"].apply(norm_text) df_eos["_norm_desc"] = df_eos["description"].apply(norm_text) df_eos["_norm_notes"] = df_eos["notes"].apply(norm_text) df_dec["_canon_make"] = df_dec["Make"].apply(canon_maker_from_text) if "Make" in df_dec.columns else "UNKNOWN" df_dec["_norm_model"] = df_dec["Model"].apply(norm_text) if "Model" in df_dec.columns else "" df_dec["_is5g"] = df_dec["Modem Type"].apply(lambda x: is_5g_text(str(x))) if "Modem Type" in df_dec.columns else False _tlog("load csv", t0) # ---------------------------- # Build fuzzy corpus for device matching # ---------------------------- def _label_for_row(i: int) -> str: r = df_eos.iloc[i] return f"{r.get('sku','')} — {r.get('manufacturer','')} — {r.get('description','')}"[:220] EOS_LABELS = [_label_for_row(i) for i in range(len(df_eos))] EOS_CORPUS = [] for _, r in df_eos.iterrows(): EOS_CORPUS.append(" ".join([r.get("_norm_sku",""), r.get("_canon_make",""), r.get("_norm_desc",""), r.get("_norm_notes","")])) def resolve_device(term: str) -> Dict[str, Any]: q = norm_text(term) if not q: return {"mode": "not_found"} exact = df_eos.index[df_eos["_norm_sku"] == q].tolist() if len(exact) == 1: return {"mode":"ok","row_idx": int(exact[0])} hits = process.extract(q, EOS_CORPUS, scorer=fuzz.WRatio, limit=6) cands = [(int(idx), int(score), EOS_LABELS[int(idx)]) for _, score, idx in hits] if not cands: return {"mode":"not_found"} if cands[0][1] >= MATCH_AUTOPICK and (len(cands) == 1 or (cands[0][1] - cands[1][1]) >= MATCH_GAP): return {"mode":"ok","row_idx": cands[0][0]} opts = [{"row_idx": cands[0][0], "label": cands[0][2]}] if len(cands) > 1: opts.append({"row_idx": cands[1][0], "label": cands[1][2]}) return {"mode":"pick","options": opts} # ---------------------------- # Parsec RAG (FAISS) # ---------------------------- t0 = time.perf_counter() embedder = SentenceTransformer(EMBED_MODEL_NAME) def extract_pdf_text_pages(path: str) -> List[str]: doc = fitz.open(path) return [doc[i].get_text("text") for i in range(len(doc))] def build_parsec_cards(pages: List[str]) -> List[str]: cards = [] for p in pages: for m in re.finditer(r"Standard\s+SKU:", p): start = max(0, m.start() - PARSEC_CONTEXT_BEFORE) end = min(len(p), m.start() + PARSEC_CONTEXT_AFTER) c = p[start:end].strip() if len(c) >= 200: cards.append(c) out, seen = [], set() for c in cards: h = hashlib.sha1(c.encode("utf-8")).hexdigest() if h not in seen: seen.add(h); out.append(c) return out parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF)) parsec_emb = embedder.encode(parsec_cards, batch_size=64, show_progress_bar=False, normalize_embeddings=True) parsec_emb = np.asarray(parsec_emb, dtype=np.float32) parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1]) parsec_index.add(parsec_emb) _tlog("parsec index", t0) # ---------------------------- # Antenna photos from ParsecCatalog.pdf (best effort) # - Build a map from Standard SKU -> page indices once at startup # - Extract the largest image on the matching page and embed as data URI in markdown # (only used when user asks for antenna options) # ---------------------------- PARSEC_PN_TO_PAGES: Dict[str, List[int]] = {} try: _doc = fitz.open(PARSEC_PDF) for i in range(len(_doc)): t = _doc[i].get_text("text") or "" for m in re.finditer(r"Standard\s+SKU:\s*([A-Z0-9]+)", t): pn = m.group(1).strip().upper() PARSEC_PN_TO_PAGES.setdefault(pn, []).append(i) except Exception: PARSEC_PN_TO_PAGES = {} def _extract_largest_image_data_uri(page_index: int, max_bytes: int = 350_000) -> str: """ Extract the largest raster image on a PDF page and return as a data URI (PNG). If the image is too large to embed, return empty string. """ try: doc = fitz.open(PARSEC_PDF) page = doc[page_index] imgs = page.get_images(full=True) or [] if not imgs: return "" best_xref = None best_area = 0 for img in imgs: xref = img[0] pix = fitz.Pixmap(doc, xref) area = pix.width * pix.height if area > best_area and pix.width >= 200 and pix.height >= 200: best_area = area best_xref = xref pix = None if best_xref is None: return "" pix = fitz.Pixmap(doc, best_xref) if pix.n >= 5: # CMYK pix = fitz.Pixmap(fitz.csRGB, pix) png_bytes = pix.tobytes("png") if len(png_bytes) > max_bytes: return "" b64 = base64.b64encode(png_bytes).decode("ascii") return f"data:image/png;base64,{b64}" except Exception: return "" @lru_cache(maxsize=512) def antenna_photo_data_uri(part_number: str) -> str: pn = str(part_number or "").strip().upper() if not pn: return "" pages = PARSEC_PN_TO_PAGES.get(pn, []) if not pages: return "" for p in pages[:3]: uri = _extract_largest_image_data_uri(p) if uri: return uri return "" # ---------------------------- # Stronger matching (regex normalization + fuzzy) # ---------------------------- def _normalize_query_compact(s: str) -> str: s = str(s or "").strip().upper() return re.sub(r"[^A-Z0-9]", "", s) def resolve_device_stronger(term: str) -> Dict[str, Any]: raw = str(term or "").strip() if not raw: return {"mode":"not_found"} q_compact = _normalize_query_compact(raw) # exact compact SKU match if q_compact: for i, sku in enumerate(df_eos["_norm_sku"].tolist()): if _normalize_query_compact(sku) == q_compact: return {"mode":"ok", "row_idx": i, "confidence":"High"} hits = process.extract(raw, EOS_CORPUS, scorer=fuzz.WRatio, limit=6) cands = [(int(idx), int(score), EOS_LABELS[int(idx)]) for _, score, idx in hits] if not cands: return {"mode":"not_found"} if cands[0][1] >= MATCH_AUTOPICK and (len(cands)==1 or (cands[0][1]-cands[1][1]) >= MATCH_GAP): return {"mode":"ok", "row_idx": cands[0][0], "confidence":"High"} return {"mode":"guess", "row_idx": cands[0][0], "confidence":"Medium", "guess_label": cands[0][2], "raw": raw} # ---------------------------- # LLM fallback: identify router + replacements (Verizon equipment only, no pricing) # ---------------------------- def llm_identify_router_and_replacements(raw_text: str) -> Dict[str, Any]: if client is None: return {"found": False, "note": "No API key configured."} sys = ( "You help Verizon reps identify cellular routers and suggest replacements. " "Keep it to Verizon-sellable equipment families when possible " "(Cradlepoint, Sierra/AirLink, Digi, Cisco/Meraki, Teltonika, Inseego). " "No pricing. Return strict JSON only." ) payload = { "user_input": raw_text, "output_schema": { "best_guess_model": "string", "maker_family": "CRADLEPOINT|SIERRA|DIGI|CISCO|CISCO_MERAKI|TELTONIKA|FEENEY|UNKNOWN", "repl_5g": "string", "repl_4g": "string", "confidence": "High|Medium", "note": "string" } } resp = client.responses.create( model=OPENAI_MODEL, input=[{"role":"system","content":sys},{"role":"user","content":_json_dump_safe(payload)}], max_output_tokens=360, ) out = _json_load_safe(getattr(resp, "output_text", "") or "") if not isinstance(out, dict) or not out.get("best_guess_model"): return {"found": False, "note": "Could not identify router."} out["found"] = True return out # ---------------------------- # Antenna options: Vehicle + Indoor + Outdoor + Directional # (all omni except directional) # ---------------------------- def antenna_options_4pack(repl5: str) -> Dict[str, Dict[str, Any]]: # All 5G routers => 4x4 veh = antenna_pick(repl5, mode="vehicle", detail=None) ind = antenna_pick(repl5, mode="stationary", detail="indoor") outd = antenna_pick(repl5, mode="stationary", detail="outdoor") direc = antenna_pick(repl5, mode="stationary", detail="directional") for a in (veh, ind, outd, direc): a["photo_uri"] = antenna_photo_data_uri(a.get("part_number","")) return {"vehicle": veh, "indoor": ind, "outdoor": outd, "directional": direc} def _fmt_ant(a: Dict[str, Any]) -> str: name = a.get("name","") pn = a.get("part_number","") desc = a.get("description","") conn = a.get("connectors","") s = f"**{name}** (PN {pn}) — {desc}" if conn: s += f" | Conn: {conn}" return s PARSEC_FAMILY_WORDS = {"chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie","shepherd","belgian","australian","terrier","pyrenees"} def _parsec_name_from_card(card_text: str) -> str: low = card_text.lower() for fam in PARSEC_FAMILY_WORDS: if fam in low: return fam.capitalize() return "Parsec antenna" def _parsec_part_from_card(t: str) -> str: m = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t) return m.group(1).strip() if m else "" def _parsec_desc_from_card(t: str) -> str: m = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE) return re.sub(r"\s+"," ",m.group(1).strip())[:220] if m else "" def _parsec_connectors_from_card(t: str) -> str: m = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE) return re.sub(r"\s+"," ",m.group(1).strip())[:80] if m else "" def parsec_retrieve(query: str, top_k: int = 8) -> List[Dict[str, Any]]: qv = embedder.encode([query], normalize_embeddings=True) qv = np.asarray(qv, dtype=np.float32) scores, ids = parsec_index.search(qv, top_k) out = [] for sc, i in zip(scores[0].tolist(), ids[0].tolist()): if 0 <= int(i) < len(parsec_cards): card = parsec_cards[int(i)] out.append({ "score": float(sc), "name": _parsec_name_from_card(card), "part_number": _parsec_part_from_card(card), "description": _parsec_desc_from_card(card), "connectors": _parsec_connectors_from_card(card), }) return out def antenna_pick(repl5: str, mode: str, detail: Optional[str]) -> Dict[str, Any]: mimo = "4x4" # rule: all 5G -> 4x4 tech = "5G" if mode == "vehicle": q = f"{repl5} {tech} {mimo} omni vehicle mobile magnetic through-bolt" c = parsec_retrieve(q, top_k=8) best = c[0] if c else {"name":"Parsec antenna","part_number":"","description":"","connectors":""} best.update({"mimo": mimo, "why": "Vehicle omni best match."}) return best if detail == "directional": q = f"{repl5} {tech} {mimo} directional fixed site" c = parsec_retrieve(q, top_k=8) best = c[0] if c else {"name":"Parsec antenna","part_number":"","description":"","connectors":""} best.update({"mimo": mimo, "why": "Stationary directional best match."}) return best if detail == "indoor": q = f"{repl5} {tech} {mimo} omni indoor" c = parsec_retrieve(q, top_k=8) best = c[0] if c else {"name":"Parsec antenna","part_number":"","description":"","connectors":""} best.update({"mimo": mimo, "why": "Stationary indoor omni best match."}) return best q = f"{repl5} {tech} {mimo} omni outdoor pole wall fixed site" c = parsec_retrieve(q, top_k=8) best = c[0] if c else {"name":"Parsec antenna","part_number":"","description":"","connectors":""} best.update({"mimo": mimo, "why": "Stationary outdoor omni best match."}) return best # ---------------------------- # Replacement selection (lifecycle-first) # ---------------------------- def extract_model_token(text: str) -> str: s = safe_str(text) if not s: return "" parts = [p.strip() for p in s.split("|") if p.strip()] candidates = parts[::-1] if parts else [s] for cand in candidates: u = cand.upper() m = re.search(r"\bRUT[A-Z]?\d{2,4}\b", u) if m: return m.group(0) m = re.search(r"\bRUTM\d{2,3}\b", u) if m: return m.group(0) m = re.search(r"\bIX\d{2}\b", u) if m: return m.group(0) m = re.search(r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b", u) if m: return m.group(0) m = re.search(r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b", u) if m: return m.group(0) return candidates[0][:60] def pick_replacements(row: pd.Series, status: str) -> Dict[str, str]: sug = safe_str(row.get("suggested_replacement", "")) adv = safe_str(row.get("advanced_5g_option", "")) repl_4g = extract_model_token(sug) if sug else "Not applicable" repl_5g = extract_model_token(adv) if adv else "Not listed" # Always provide some 5G answer: if lifecycle missing, pick top 5G from dec (same maker) if repl_5g in {"", "Not listed"}: canon_make = str(row.get("_canon_make","UNKNOWN")) pool = df_dec[(df_dec["_canon_make"] == canon_make) & (df_dec["_is5g"] == True)].copy() repl_5g = str(pool.iloc[0]["Model"]).strip() if not pool.empty else "Not listed" return {"repl_4g": repl_4g or "Not applicable", "repl_5g": repl_5g or "Not listed"} # ---------------------------- # Features + Fit (dec first, single LLM enrichment call if needed) # ---------------------------- FEATURE_COLS = ["Device", "Modem technology", "WiFi", "Ports", "Antennas", "Ruggedness", "Use case"] FIT_COLS = ["Device", "Fit badges", "Ethernet ports", "Battery"] def _features_from_dec(model: str, canon_make: str) -> Dict[str, str]: if not model or model in {"Not listed", "Not applicable"}: return {k: "Not listed" for k in FEATURE_COLS[1:]} pool = df_dec[df_dec["_canon_make"] == canon_make].copy() if pool.empty: return {k: "Not listed" for k in FEATURE_COLS[1:]} hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) if not hit or hit[1] < MATCH_OK: return {k: "Not listed" for k in FEATURE_COLS[1:]} r = pool.iloc[int(hit[2])] ports = f"WAN: {r.get('WAN ports and speed','')} | LAN: {r.get('LAN ports and speed','')}".strip() return { "Modem technology": str(r.get("Modem Type","") or "Not listed"), "WiFi": str(r.get("WiFi type","") or "Not listed"), "Ports": ports if ports else "Not listed", "Antennas": str(r.get("Antennas (internal/external/both)","") or "Not listed"), "Ruggedness": str(r.get("Ruggedization","") or "Not listed"), "Use case": str(r.get("Primary use case","") or "Not listed"), } def _fit_from_dec(model: str, canon_make: str, is5: bool) -> Dict[str, str]: badges = [] eth = "Not listed" bat = "Not listed" if is5: badges.append("4x4 MIMO") pool = df_dec[df_dec["_canon_make"] == canon_make].copy() if pool.empty or not model or model in {"Not listed", "Not applicable"}: return {"Fit badges": ", ".join(badges) if badges else "Not listed", "Ethernet ports": eth, "Battery": bat} hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) if not hit or hit[1] < MATCH_OK: return {"Fit badges": ", ".join(badges) if badges else "Not listed", "Ethernet ports": eth, "Battery": bat} r = pool.iloc[int(hit[2])] use_case = str(r.get("Primary use case","") or "").lower() rugged = str(r.get("Ruggedization","") or "").lower() wifi = str(r.get("WiFi type","") or "").strip().lower() serial = str(r.get("Serial port (yes/no)","") or "").strip().lower() battery = str(r.get("Battery (internal/removable/none/optional)","") or "").strip().lower() notes_blob = " ".join([str(r.get("Special notes","") or ""), str(r.get("summary and use case","") or "")]).lower() if any(k in use_case for k in ["vehicle","mobile","fleet","in-vehicle"]) or "vehicle" in rugged: badges.append("Vehicle") else: badges.append("Fixed site") if wifi and wifi not in {"none","no","n/a"}: badges.append("Wi‑Fi") if any(k in rugged for k in ["rugged","industrial","ip","harsh"]): badges.append("Rugged") if "dual" in notes_blob and "sim" in notes_blob: badges.append("Dual‑SIM") if serial in {"yes","y","true"}: badges.append("Serial") if battery: if "none" in battery: bat = "No" else: bat = "Yes" badges_csv = ", ".join(dict.fromkeys(badges)) if badges else "Not listed" return {"Fit badges": badges_csv, "Ethernet ports": eth, "Battery": bat} # Enrichment cache (one call per (make, repl4, repl5)) _ENRICH_CACHE: Dict[str, Dict[str, Any]] = {} def _enrich_key(canon_make: str, repl4: str, repl5: str) -> str: return hashlib.sha1(f"{canon_make}|{repl4}|{repl5}".encode("utf-8")).hexdigest() def gpt_enrich(repl4: str, repl5: str, canon_make: str, feat4: Dict[str,str], feat5: Dict[str,str], fit4: Dict[str,str], fit5: Dict[str,str]) -> Dict[str, Any]: if client is None: return {"feat4": feat4, "feat5": feat5, "fit4": fit4, "fit5": fit5} key = _enrich_key(canon_make, repl4, repl5) if key in _ENRICH_CACHE: return _ENRICH_CACHE[key] def miss(d: Dict[str,str]) -> List[str]: out=[] for k,v in d.items(): if (not v) or str(v).strip().lower() in {"not listed","nan",""}: out.append(k) return out m_feat4 = miss(feat4); m_feat5 = miss(feat5) m_fit4 = miss(fit4); m_fit5 = miss(fit5) if not (m_feat4 or m_feat5 or m_fit4 or m_fit5): pack = {"feat4": feat4, "feat5": feat5, "fit4": fit4, "fit5": fit5} _ENRICH_CACHE[key] = pack return pack sys = ( "You are helping a Verizon rep. Fill missing router feature fields and fit traits. Return strict JSON only. " "Keep values short. " "Fit badges must be chosen from: ['Vehicle','Fixed site','Wi‑Fi','Rugged','Dual‑SIM','4x4 MIMO','High throughput','Serial'] only. " "Rule: if a router is 5G, include '4x4 MIMO'. " "Ethernet ports must be a single integer as a string when possible; else 'Not listed'. " "Battery must be 'Yes', 'No', or 'Not listed'." ) payload = { "maker_family": canon_make, "models": {"repl4": repl4, "repl5": repl5}, "known": {"feat4": feat4, "feat5": feat5, "fit4": fit4, "fit5": fit5}, "missing": {"feat4": m_feat4, "feat5": m_feat5, "fit4": m_fit4, "fit5": m_fit5}, "output_schema": { "feat4": {k: "string" for k in m_feat4}, "feat5": {k: "string" for k in m_feat5}, "fit4": {k: "string" for k in m_fit4}, "fit5": {k: "string" for k in m_fit5}, }, } t0 = time.perf_counter() resp = client.responses.create( model=OPENAI_MODEL, input=[{"role":"system","content":sys},{"role":"user","content":_json_dump_safe(payload)}], max_output_tokens=420, ) _tlog("llm enrich", t0) out = _json_load_safe(getattr(resp, "output_text", "") or "") def merge(base: Dict[str,str], patch: Any) -> Dict[str,str]: if isinstance(patch, dict): for k,v in patch.items(): sv = str(v or "").strip() if sv: base[k] = sv return base feat4x = merge(dict(feat4), out.get("feat4", {})) feat5x = merge(dict(feat5), out.get("feat5", {})) fit4x = merge(dict(fit4), out.get("fit4", {})) fit5x = merge(dict(fit5), out.get("fit5", {})) # Enforce 5G 4x4 badge b = str(fit5x.get("Fit badges","") or "") if "4x4 MIMO" not in b: fit5x["Fit badges"] = (b + ", 4x4 MIMO").strip(", ").strip() if b and b != "Not listed" else "4x4 MIMO" pack = {"feat4": feat4x, "feat5": feat5x, "fit4": fit4x, "fit5": fit5x} _ENRICH_CACHE[key] = pack return pack def build_tables(repl4: str, repl5: str, canon_make: str) -> Tuple[pd.DataFrame, pd.DataFrame]: feat4 = _features_from_dec(repl4, canon_make) feat5 = _features_from_dec(repl5, canon_make) fit4 = _fit_from_dec(repl4, canon_make, is5=False) fit5 = _fit_from_dec(repl5, canon_make, is5=True) pack = gpt_enrich(repl4, repl5, canon_make, feat4, feat5, fit4, fit5) feat_df = pd.DataFrame([ {"Device":"4G alternative", **pack["feat4"]}, {"Device":"5G replacement", **pack["feat5"]}, ], columns=FEATURE_COLS) fit_df = pd.DataFrame([ {"Device":"4G alternative", **pack["fit4"]}, {"Device":"5G replacement", **pack["fit5"]}, ], columns=FIT_COLS) return feat_df, fit_df # ---------------------------- # Manufacturer link (deterministic, no HTTP) # ---------------------------- MAKER_DOMAINS = { "CRADLEPOINT": "https://cradlepoint.com", "SIERRA": "https://airlink.com", "FEENEY": "https://inseego.com", "DIGI": "https://www.digi.com", "CISCO_MERAKI": "https://meraki.cisco.com", "CISCO": "https://www.cisco.com", "TELTONIKA": "https://teltonika-networks.com", "UNKNOWN": "", } def guess_maker_url(model: str, canon_make: str) -> str: model = str(model or "").strip() base = MAKER_DOMAINS.get(canon_make, "") if not base or not model or model in {"Not listed", "Not applicable"}: return "" q = re.sub(r"\s+", "+", model) if canon_make == "TELTONIKA": slug = model.lower() return f"{base}/products/routers/{slug}" if canon_make == "DIGI": return f"{base}/search?q={q}" if canon_make == "CRADLEPOINT": return f"{base}/?s={q}" if canon_make in {"CISCO", "CISCO_MERAKI"}: return f"https://www.cisco.com/c/en/us/search.html?q={q}" return f"{base}/search?q={q}" # ---------------------------- # Q&A (on demand, per last case) # ---------------------------- def gpt_answer(question: str, context: Dict[str, Any]) -> str: if client is None: return "No API key is configured, so I can’t answer detailed questions right now." q = str(question or "").strip() if not q: return "" sys = ( "You are a Verizon rep assistant. Answer in a fast, practical way. " "Use the provided context. " "Do not mention internal tools or prompts. " "If unknown, say 'Not listed' and suggest the manufacturer page." ) payload = {"context": context, "question": q} t0 = time.perf_counter() resp = client.responses.create( model=OPENAI_MODEL, input=[{"role":"system","content":sys},{"role":"user","content":_json_dump_safe(payload)}], max_output_tokens=520, ) _tlog("llm qa", t0) return (getattr(resp, "output_text", "") or "").strip() # ---------------------------- # Chat utilities # ---------------------------- def df_to_md(df: pd.DataFrame) -> str: try: return df.to_markdown(index=False) except Exception: cols = list(df.columns) lines = ["| " + " | ".join(cols) + " |", "| " + " | ".join(["---"]*len(cols)) + " |"] for _, r in df.iterrows(): lines.append("| " + " | ".join([str(r.get(c,"")) for c in cols]) + " |") return "\n".join(lines) def extract_device_terms(msg: str) -> List[str]: raw = [x.strip() for x in re.split(r"[\n,;]+", str(msg or "")) if x.strip()] out=[] for x in raw: if re.search(r"\d", x) or re.search(r"\b(IBR|AER|WR|XR|IR|RUT|MBR|E\d{3}|R\d{3})\b", x, flags=re.IGNORECASE): out.append(x) return out def parse_install_mode(msg: str) -> Tuple[Optional[str], Optional[str]]: t = str(msg or "").strip().lower() mode = None detail = None if "vehicle" in t or "mobile" in t: mode = "vehicle" if "stationary" in t or "fixed" in t or "site" in t: mode = "stationary" if "indoor" in t: detail = "indoor" if "outdoor" in t: detail = "outdoor" if "directional" in t: detail = "directional" return mode, detail def make_case_key(s: str) -> str: s = str(s or "").strip() return re.sub(r"\s+", " ", s)[:80] # ---------------------------- # Chat UI (schema-safe) # ---------------------------- with gr.Blocks(title="Only-Routers") as demo: gr.Markdown("## Only-Routers\n\n**Please enter the router models you would like to verify for replacement.**\n\nPaste multiple models/SKUs separated by commas or new lines.") state = gr.State("{}") chatbot = gr.Chatbot(label="Only-Routers Chat", height=600, type="tuples") msg = gr.Textbox(label="Message", placeholder="Example: RUT240, WR21\nVehicle install", lines=2) send = gr.Button("Send", variant="primary") def chat_fn(user_msg, history, st_json): t0 = time.perf_counter() st = state_load(st_json) st.setdefault("cases", {}) st.setdefault("last_case_keys", []) st.setdefault("pending", {}) text = (user_msg or "").strip() if not text: return history, state_dump(st) # ---------------------------- # Pending: confirm best guess # ---------------------------- if st.get("pending", {}).get("type") == "confirm_guess": pend = st["pending"] raw = pend.get("raw","") row_idx = int(pend.get("row_idx",-1)) low = text.lower().strip() if low in {"yes","y","yeah","yep","correct","right","ok","okay"}: life_row = df_eos.iloc[row_idx] eos, eol, status = row_to_dates_and_status(life_row) repl = pick_replacements(life_row, status) canon_make = str(life_row.get("_canon_make","UNKNOWN")) feat_df, fit_df = build_tables(repl["repl_4g"], repl["repl_5g"], canon_make) url4 = guess_maker_url(repl["repl_4g"], canon_make) if repl["repl_4g"] != "Not applicable" else "" url5 = guess_maker_url(repl["repl_5g"], canon_make) if repl["repl_5g"] != "Not listed" else "" ck = make_case_key(str(life_row.get("sku","")) or raw) st["cases"][ck] = {"row_idx": row_idx, "repl": repl, "canon_make": canon_make, "status": status, "eos": eos, "eol": eol, "urls": {"4g": url4, "5g": url5}} st["last_case_keys"].append(ck) bot=[] bot.append(f"**{ck}**") bot.append(f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**") bot.append(f"- 4G alternative: **{repl['repl_4g']}**") bot.append(f"- 5G replacement: **{repl['repl_5g']}**") if url4: bot.append(f"- 4G manufacturer page: {url4}") if url5: bot.append(f"- 5G manufacturer page: {url5}") bot.append("\n**Replacement features**\n" + df_to_md(feat_df)) bot.append("\n**Verizon fit**\n" + df_to_md(fit_df)) bot.append("\nWould you like to see the **antenna options** (Vehicle, Indoor, Outdoor, Directional) for this router? Reply **Yes** or **No**.") st["pending"] = {"type":"ask_antennas", "case_keys":[ck]} history.append((text, "\n".join(bot))) _tlog("confirm guess", t0) return history, state_dump(st) if low in {"no","n","nope","wrong","incorrect"}: st["pending"] = {"type":"await_corrected_model"} history.append((text, "No problem — please reply with the corrected router model/SKU.")) return history, state_dump(st) # If they pasted corrected model instead of yes/no, fall through as new input st["pending"] = {} # ---------------------------- # Pending: waiting for corrected model # ---------------------------- if st.get("pending", {}).get("type") == "await_corrected_model": st["pending"] = {} # treat message as a new lookup # ---------------------------- # Pending: ask antennas yes/no # ---------------------------- if st.get("pending", {}).get("type") == "ask_antennas": low = text.lower().strip() want = low in {"yes","y","yeah","yep","sure","ok","okay"} case_keys = st["pending"].get("case_keys", []) or st.get("last_case_keys", []) if want: blocks=[] for ck in case_keys: case = st["cases"].get(ck, {}) repl5 = (case.get("repl", {}) or {}).get("repl_5g","") if not repl5 or repl5 == "Not listed": blocks.append(f"**{ck}**: No 5G replacement available to anchor antenna picks.") continue opts = antenna_options_4pack(repl5) case["antenna_options"] = opts st["cases"][ck] = case b=[] b.append(f"**{ck} — Antenna options (Parsec)**") b.append(f"- Vehicle (Omni): {_fmt_ant(opts['vehicle'])}") b.append(f"- Indoor (Omni): {_fmt_ant(opts['indoor'])}") b.append(f"- Outdoor (Omni): {_fmt_ant(opts['outdoor'])}") b.append(f"- Directional: {_fmt_ant(opts['directional'])}") # Photos (best effort, may be empty if too large or not found) for label in ["vehicle","indoor","outdoor","directional"]: uri = opts[label].get("photo_uri","") if uri: b.append(f"\n**{label.capitalize()} photo**\n![]({uri})\n") blocks.append("\n".join(b)) blocks.append("\nAny questions about the router(s) — including alternatives and comparisons? Ask anything router-related (no pricing).") st["pending"] = {"type":"await_questions"} history.append((text, "\n\n---\n\n".join(blocks))) _tlog("antennas yes", t0) return history, state_dump(st) # No antennas st["pending"] = {"type":"await_questions"} history.append((text, "Got it. Any questions about the router(s) — including alternatives and comparisons? Ask anything router-related (no pricing).")) return history, state_dump(st) # ---------------------------- # Pending: questions phase # ---------------------------- if st.get("pending", {}).get("type") == "await_questions": if not st.get("last_case_keys"): history.append((text, "Please enter the router models you would like to verify for replacement.")) return history, state_dump(st) # Route to most recent unless message mentions a case key target = st["last_case_keys"][-1] t_low = text.lower() for ck in reversed(st["last_case_keys"]): if ck.lower() in t_low: target = ck break case = st["cases"].get(target, {}) ctx = { "case": target, "status": case.get("status",""), "eos": case.get("eos",""), "eol": case.get("eol",""), "replacements": case.get("repl", {}), "urls": case.get("urls", {}), "antenna_options": case.get("antenna_options", {}), } ans = gpt_answer(text, ctx) history.append((text, ans)) _tlog("qa", t0) return history, state_dump(st) # ---------------------------- # Normal device intake # ---------------------------- terms = extract_device_terms(text) if not terms: # If not a device list, treat as question about last router if possible if st.get("last_case_keys"): case = st["cases"].get(st["last_case_keys"][-1], {}) ctx = {"replacements": case.get("repl", {}), "urls": case.get("urls", {}), "antenna_options": case.get("antenna_options", {})} ans = gpt_answer(text, ctx) history.append((text, ans)) return history, state_dump(st) history.append((text, "Please enter the router models you would like to verify for replacement.")) return history, state_dump(st) blocks=[] case_keys=[] for term in terms: res = resolve_device_stronger(term) if res.get("mode") == "ok": row_idx = int(res["row_idx"]) life_row = df_eos.iloc[row_idx] eos, eol, status = row_to_dates_and_status(life_row) repl = pick_replacements(life_row, status) canon_make = str(life_row.get("_canon_make","UNKNOWN")) feat_df, fit_df = build_tables(repl["repl_4g"], repl["repl_5g"], canon_make) url4 = guess_maker_url(repl["repl_4g"], canon_make) if repl["repl_4g"] != "Not applicable" else "" url5 = guess_maker_url(repl["repl_5g"], canon_make) if repl["repl_5g"] != "Not listed" else "" ck = make_case_key(str(life_row.get("sku","")) or term) st["cases"][ck] = {"row_idx": row_idx, "repl": repl, "canon_make": canon_make, "status": status, "eos": eos, "eol": eol, "urls": {"4g": url4, "5g": url5}} st["last_case_keys"].append(ck) case_keys.append(ck) bot=[] bot.append(f"**{ck}**") bot.append(f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**") bot.append(f"- 4G alternative: **{repl['repl_4g']}**") bot.append(f"- 5G replacement: **{repl['repl_5g']}**") if url4: bot.append(f"- 4G manufacturer page: {url4}") if url5: bot.append(f"- 5G manufacturer page: {url5}") bot.append("\n**Replacement features**\n" + df_to_md(feat_df)) bot.append("\n**Verizon fit**\n" + df_to_md(fit_df)) blocks.append("\n".join(bot)) continue if res.get("mode") == "guess": st["pending"] = {"type":"confirm_guess", "row_idx": int(res["row_idx"]), "raw": res.get("raw","")} history.append((text, f"I think you mean: **{res.get('guess_label','')}**. Is that correct? Reply **Yes** or **No** (or paste the corrected model).")) return history, state_dump(st) # Not found locally: ask to clarify AND attempt LLM best effort llm = llm_identify_router_and_replacements(term) if llm.get("found"): ck = make_case_key(llm.get("best_guess_model","") or term) repl = {"repl_4g": llm.get("repl_4g","Not applicable") or "Not applicable", "repl_5g": llm.get("repl_5g","Not listed") or "Not listed"} canon_make = llm.get("maker_family","UNKNOWN") url4 = guess_maker_url(repl["repl_4g"], canon_make) if repl["repl_4g"] != "Not applicable" else "" url5 = guess_maker_url(repl["repl_5g"], canon_make) if repl["repl_5g"] != "Not listed" else "" st["cases"][ck] = {"row_idx": None, "repl": repl, "canon_make": canon_make, "status": "Unknown", "eos": "Not listed", "eol": "Not listed", "urls": {"4g": url4, "5g": url5}, "llm_note": llm.get("note","")} st["last_case_keys"].append(ck) case_keys.append(ck) bot=[] bot.append(f"**{ck}** (best effort)") bot.append(f"- Note: {llm.get('note','')}") bot.append(f"- 4G alternative: **{repl['repl_4g']}**") bot.append(f"- 5G replacement: **{repl['repl_5g']}**") if url4: bot.append(f"- 4G manufacturer page: {url4}") if url5: bot.append(f"- 5G manufacturer page: {url5}") bot.append("\nIf this is not the correct router, reply with the exact model and manufacturer.") blocks.append("\n".join(bot)) else: blocks.append(f"**{term}**: not found. Who makes it (manufacturer) and what's the exact model/SKU?") if case_keys: blocks.append("\nWould you like to see the **antenna options** (Vehicle, Indoor, Outdoor, Directional) for each router? Reply **Yes** or **No**.") st["pending"] = {"type":"ask_antennas", "case_keys": case_keys} else: st["pending"] = {"type":"await_questions"} history.append((text, "\n\n---\n\n".join(blocks))) _tlog("lookup", t0) return history, state_dump(st) send.click(fn=chat_fn, inputs=[msg, chatbot, state], outputs=[chatbot, state], api_name=False) demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT","7860")), share=False, show_api=False)