Only-Routers / app.py
crazycrazypete's picture
Upload folder using huggingface_hub
7265825 verified
import os
import re
import json
import math
import time
import hashlib
import base64
import tempfile
from dataclasses import dataclass
from datetime import datetime, date
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import fitz # PyMuPDF
import faiss
from sentence_transformers import SentenceTransformer
from rapidfuzz import fuzz, process
import gradio as gr
from openai import OpenAI
# ============================================================
# Only-Routers (Chat, production-lean)
# - Fast model by default (no reasoning payload)
# - One LLM call max per lookup (enrichment only, cached)
# - No HTTP crawling during normal lookup (links are deterministic)
# - Timing logs to HF console when DEBUG_TIMING=1
# ============================================================
# ----------------------------
# Settings
# ----------------------------
# Reference "today" used when classifying lifecycle status and when expanding
# bare-year dates.
# NOTE(review): this is a fixed calendar date rather than date.today(), so
# statuses will not advance over time unless the constant is edited — confirm
# that pinning is intentional.
TODAY = date(2026, 1, 18)
# Fast default model (override via env)
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5.2").strip()
# Disable LLM at runtime: OPENAI_DISABLE=1
OPENAI_DISABLE = os.getenv("OPENAI_DISABLE", "0").strip() == "1"
# Timing logs
DEBUG_TIMING = os.getenv("DEBUG_TIMING", "0").strip() == "1"
# Matching thresholds (fuzzy scores on a 0-100 scale):
#   MATCH_OK       - minimum score to accept a model match
#   MATCH_AUTOPICK - minimum score to accept the top device candidate outright
#   MATCH_GAP      - required lead of the top candidate over the runner-up
MATCH_OK = 82
MATCH_AUTOPICK = 95
MATCH_GAP = 8
# Embeddings
EMBED_MODEL_NAME = os.getenv("EMBED_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2").strip()
# Parsec PDF slicing: characters of page text kept before / after each
# "Standard SKU:" marker when cutting catalog cards.
PARSEC_CONTEXT_BEFORE = 900
PARSEC_CONTEXT_AFTER = 1600
# ----------------------------
# OpenAI client
# ----------------------------
API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
# client is None when no key is set or the LLM is disabled; LLM helpers check for this.
client = None if (not API_KEY or OPENAI_DISABLE) else OpenAI(api_key=API_KEY)
# ----------------------------
# Timing helper
# ----------------------------
def _tlog(label: str, t0: float) -> None:
    """Print the time elapsed since ``t0`` when timing logs are enabled.

    Args:
        label: Short name of the step being timed.
        t0: Start timestamp taken with ``time.perf_counter()``.
    """
    if not DEBUG_TIMING:
        return
    elapsed = time.perf_counter() - t0
    print(f"[TIMER] {label}: {elapsed:.2f}s")
# ----------------------------
# JSON-safe helpers
# ----------------------------
def _json_load_safe(s: str) -> Dict[str, Any]:
try:
return json.loads(s)
except Exception:
return {}
def _json_dump_safe(obj: Any) -> str:
try:
return json.dumps(obj, ensure_ascii=False)
except Exception:
return "{}"
# ----------------------------
# Gradio state helpers (string JSON only)
# ----------------------------
def state_load(st_json: str) -> Dict[str, Any]:
    """Decode the JSON-string chat state into a dict.

    Anything that is not a non-empty string holding a JSON object produces an
    empty dict, so callers can safely use ``setdefault`` on the result.

    Args:
        st_json: Serialized state as stored in the Gradio state component.

    Returns:
        The decoded state dict, or ``{}`` when the input is missing, malformed,
        or decodes to a non-dict value.
    """
    if not isinstance(st_json, str) or not st_json:
        return {}
    try:
        loaded = json.loads(st_json)
    except (ValueError, RecursionError):
        return {}
    return loaded if isinstance(loaded, dict) else {}
def state_dump(st: Dict[str, Any]) -> str:
    """Serialize the chat state to a JSON string; a falsy state is written as ``{}``."""
    payload = st if st else {}
    return _json_dump_safe(payload)
# ----------------------------
# Normalization
# ----------------------------
def norm_text(x: Any) -> str:
    """Normalize a value to lowercase matching text.

    Missing values (``None``, NaN, pandas NA) become ``""``. Otherwise the value
    is stringified, lowercased, every character other than ``a-z``, ``0-9``,
    whitespace, ``-`` and ``/`` is replaced by a space, and whitespace runs are
    collapsed to single spaces.
    """
    try:
        is_missing = x is None or (isinstance(x, float) and math.isnan(x)) or pd.isna(x)
        if is_missing:
            return ""
    except Exception:
        # The missing-value test can fail for list-like input; treat it as present.
        pass
    lowered = str(x).strip().lower()
    cleaned = re.sub(r"[^a-z0-9\s\-\/]", " ", lowered)
    return " ".join(cleaned.split())
def safe_str(x: Any) -> str:
    """Return ``x`` as a stripped string, mapping missing values to ``""``.

    ``None``, NaN and pandas NA/NaT are treated as missing. List-like values are
    stringified rather than raising.
    """
    if x is None:
        return ""
    try:
        if pd.isna(x):
            return ""
    except (TypeError, ValueError):
        # pd.isna() on list-like input returns an array whose truth value is
        # ambiguous; such a value is present, not missing.
        pass
    return str(x).strip()
def is_5g_text(s: str) -> bool:
    """Return True when the text mentions 5G.

    The text is normalized with ``norm_text`` first. It counts as 5G when it
    contains ``5g`` anywhere or ``nr`` as a standalone token. ``nr`` is matched
    on word boundaries so ordinary words containing those letters
    (e.g. "unrestricted") do not count.
    """
    t = norm_text(s)
    return "5g" in t or re.search(r"\bnr\b", t) is not None
def is_4g_lte_family(row: pd.Series) -> bool:
    """Return True when a row's description/notes indicate a 4G/LTE device.

    Any 5G marker (``5g`` anywhere, or ``nr`` as a standalone token) wins and
    yields False. Otherwise the row is 4G when the text mentions ``lte``,
    ``4g`` or ``cat``; LTE categories are treated as 4G.

    Args:
        row: Row exposing ``description`` and ``notes`` via ``.get``.
    """
    t = norm_text(row.get("description", "")) + " " + norm_text(row.get("notes", ""))
    # "nr" must be its own token; a bare substring test also fires on words
    # such as "unrestricted".
    if "5g" in t or re.search(r"\bnr\b", t):
        return False
    # The plain "cat" test already covers "Cat-M1", "Cat 4", "cat12", etc.,
    # so no separate category patterns are needed.
    return "lte" in t or "4g" in t or "cat" in t
# ----------------------------
# Lifecycle CSV normalization
# ----------------------------
def _normalize_lifecycle_df(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
lower_cols = {c.lower(): c for c in df.columns}
def _pick(*names):
for n in names:
if n.lower() in lower_cols:
return lower_cols[n.lower()]
return None
col_map = {}
sku_col = _pick("sku", "SKU")
if sku_col:
col_map[sku_col] = "sku"
mfr_col = _pick("manufacturer", "Manufacturer")
if mfr_col:
col_map[mfr_col] = "manufacturer"
dt_col = _pick("device type", "Device Type", "device_type")
if dt_col:
col_map[dt_col] = "device_type"
eos_col = _pick("end_of_sale", "end of sale", "End of Sale", "eos")
if eos_col:
col_map[eos_col] = "end_of_sale"
eol_col = _pick("end_of_life", "end of life", "End of Life", "eol")
if eol_col:
col_map[eol_col] = "end_of_life"
sr_col = _pick("suggested_replacement", "Suggested Replacement")
if sr_col:
col_map[sr_col] = "suggested_replacement"
a5_col = _pick("advanced_5g_option", "Advanced 5G Option", "advanced 5g option")
if a5_col:
col_map[a5_col] = "advanced_5g_option"
df = df.rename(columns=col_map)
for req in ["sku", "manufacturer", "device_type", "end_of_sale", "end_of_life", "suggested_replacement", "advanced_5g_option"]:
if req not in df.columns:
df[req] = ""
# Compatibility fields used by matching/output
if "description" not in df.columns:
df["description"] = df["sku"].astype(str)
if "notes" not in df.columns:
df["notes"] = ""
if "region" not in df.columns:
df["region"] = ""
return df
# ----------------------------
# Maker mapping
# ----------------------------
# Canonical maker key -> lowercase terms that identify that maker in free text.
# canon_maker_from_text() walks this dict in insertion order and returns the
# first key with a matching term, so "CISCO_MERAKI" must stay ahead of "CISCO"
# ("cisco meraki" also contains "cisco").
CANON_MAKER = {
"CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"},
"SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"},
"FEENEY": {"feeney", "feeney wireless", "inseego"},
"DIGI": {"digi", "accelerated", "accelerated concepts"},
"CISCO_MERAKI": {"meraki", "cisco meraki"},
"CISCO": {"cisco"},
"TELTONIKA": {"teltonika"},
}
def canon_maker_from_text(s: Any) -> str:
    """Map free-text manufacturer info to a canonical maker key.

    The text is normalized with ``norm_text``. The first ``CANON_MAKER`` entry
    (in dict order) with any term occurring as a substring wins.

    Returns:
        The canonical maker key, or ``"UNKNOWN"`` when nothing matches.
    """
    haystack = norm_text(s)
    matches = (
        canon
        for canon, terms in CANON_MAKER.items()
        if any(term in haystack for term in terms)
    )
    return next(matches, "UNKNOWN")
# ----------------------------
# Date parsing
# ----------------------------
@dataclass
class ParsedDate:
    """Result of parsing one lifecycle date cell."""

    # Text shown to the user (normalized for slash dates, otherwise the input text).
    raw: str
    # Parse outcome: "missing", "bad", "full", "year" or "year_month".
    kind: str
    # Concrete date used for comparisons; None when missing or unparseable.
    value: Optional[date]
def parse_date_field(x: Any) -> ParsedDate:
    """Parse a lifecycle date cell into a ``ParsedDate``.

    Supported forms:
      * ``M/D/YY`` or ``M/D/YYYY`` (month first) -> kind ``"full"``; two-digit
        years are read as 20YY and ``raw`` is rewritten as ``YYYY-MM-DD``.
      * ``YYYY`` -> kind ``"year"``; a past or current year (relative to
        ``TODAY``) resolves to January 1, a future year to December 31.
      * ``YYYY-MM`` -> kind ``"year_month"``, resolved to the 1st of the month.
      * ``YYYY-MM-DD`` -> kind ``"full"``.

    An empty or missing cell gives kind ``"missing"``. Anything else, including
    impossible dates such as month 13 or year 0000, gives kind ``"bad"`` with
    the original text kept in ``raw``.
    """
    raw = safe_str(x)
    if not raw:
        return ParsedDate(raw="", kind="missing", value=None)

    bad = ParsedDate(raw=raw, kind="bad", value=None)
    try:
        # M/D/YY or M/D/YYYY
        if re.fullmatch(r"\d{1,2}/\d{1,2}/\d{2,4}", raw):
            m, d, y = (int(part) for part in raw.split("/"))
            if y < 100:
                y += 2000
            dt = date(y, m, d)
            return ParsedDate(raw=f"{y:04d}-{m:02d}-{d:02d}", kind="full", value=dt)
        # YYYY
        if re.fullmatch(r"\d{4}", raw):
            y = int(raw)
            # A year that has already begun counts from its first day; a future
            # year is not reached until its last day.
            month, day = (1, 1) if y <= TODAY.year else (12, 31)
            return ParsedDate(raw=raw, kind="year", value=date(y, month, day))
        # YYYY-MM
        if re.fullmatch(r"\d{4}-\d{2}", raw):
            y_text, m_text = raw.split("-")
            return ParsedDate(raw=raw, kind="year_month", value=date(int(y_text), int(m_text), 1))
        # YYYY-MM-DD
        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw):
            dt = datetime.strptime(raw, "%Y-%m-%d").date()
            return ParsedDate(raw=raw, kind="full", value=dt)
    except ValueError:
        # Right shape but not a real calendar date (e.g. "0000", "2024-13").
        return bad
    return bad
def display_date(pd_: ParsedDate) -> str:
    """Return the user-facing text for a parsed date.

    Missing dates, and bad dates with no text, display as ``"Not listed"``;
    everything else displays its ``raw`` text.
    """
    kind = pd_.kind
    if kind == "missing" or (kind == "bad" and not pd_.raw):
        return "Not listed"
    return pd_.raw
def status_from_eos_eol(eos: ParsedDate, eol: ParsedDate) -> str:
    """Classify a device's lifecycle status relative to ``TODAY``.

    Returns:
        ``"Unknown"`` when neither date has a value; ``"End of Life"`` when the
        end-of-life date has been reached; else ``"End of Sale"`` when the
        end-of-sale date has been reached; otherwise ``"Active"``.
    """
    sale_end, life_end = eos.value, eol.value
    if life_end is None and sale_end is None:
        return "Unknown"

    def reached(when) -> bool:
        return when is not None and when <= TODAY

    if reached(life_end):
        return "End of Life"
    if reached(sale_end):
        return "End of Sale"
    return "Active"
def row_to_dates_and_status(row: pd.Series) -> Tuple[str, str, str]:
    """Return ``(end-of-sale text, end-of-life text, status)`` for a lifecycle row."""
    parsed_eos, parsed_eol = (
        parse_date_field(row.get(column)) for column in ("end_of_sale", "end_of_life")
    )
    return (
        display_date(parsed_eos),
        display_date(parsed_eol),
        status_from_eos_eol(parsed_eos, parsed_eol),
    )
# ----------------------------
# Files
# ----------------------------
EOS_PATH = "routers_eos_eol_by_sku.csv"
DEC_PATH = "dec2025routers.csv"
PARSEC_PDF = "ParsecCatalog.pdf"

# Fail fast at import time if any bundled data file is absent.
for _required_path in (EOS_PATH, DEC_PATH, PARSEC_PDF):
    if not os.path.exists(_required_path):
        raise FileNotFoundError(f"Missing {_required_path} in repo.")

t0 = time.perf_counter()
df_eos = pd.read_csv(EOS_PATH)
df_dec = pd.read_csv(DEC_PATH).copy()
df_eos = _normalize_lifecycle_df(df_eos)

# Canon columns: precomputed maker key and normalized text used for matching.
df_eos["_canon_make"] = df_eos["manufacturer"].apply(canon_maker_from_text)
for _source_col, _norm_col in (
    ("sku", "_norm_sku"),
    ("description", "_norm_desc"),
    ("notes", "_norm_notes"),
):
    df_eos[_norm_col] = df_eos[_source_col].apply(norm_text)

# The second CSV may lack some columns; fall back to constant values then.
if "Make" in df_dec.columns:
    df_dec["_canon_make"] = df_dec["Make"].apply(canon_maker_from_text)
else:
    df_dec["_canon_make"] = "UNKNOWN"

if "Model" in df_dec.columns:
    df_dec["_norm_model"] = df_dec["Model"].apply(norm_text)
else:
    df_dec["_norm_model"] = ""

if "Modem Type" in df_dec.columns:
    df_dec["_is5g"] = df_dec["Modem Type"].apply(lambda cell: is_5g_text(str(cell)))
else:
    df_dec["_is5g"] = False

_tlog("load csv", t0)
# ----------------------------
# Build fuzzy corpus for device matching
# ----------------------------
def _label_for_row(i: int) -> str:
    """Build the display label for lifecycle row ``i`` (positional index).

    The label joins the row's SKU, manufacturer and description with ``" | "``,
    skipping blank or missing fields, and is capped at 220 characters.
    """
    r = df_eos.iloc[i]
    # safe_str() turns NaN cells into "" so they are skipped instead of shown as "nan".
    parts = [safe_str(r.get(col, "")) for col in ("sku", "manufacturer", "description")]
    return " | ".join(part for part in parts if part)[:220]
# Display label per lifecycle row, aligned by position with EOS_CORPUS.
EOS_LABELS = list(map(_label_for_row, range(len(df_eos))))
# Searchable text per lifecycle row: normalized SKU, maker key, description and notes.
EOS_CORPUS = [
    " ".join(fields)
    for fields in zip(
        df_eos["_norm_sku"],
        df_eos["_canon_make"],
        df_eos["_norm_desc"],
        df_eos["_norm_notes"],
    )
]
def resolve_device(term: str) -> Dict[str, Any]:
    """Resolve a user-entered device term to a lifecycle row.

    Returns:
        ``{"mode": "not_found"}`` for an empty query or no fuzzy candidates;
        ``{"mode": "ok", "row_idx": ...}`` for a unique exact normalized-SKU
        match, or a fuzzy top hit scoring at least ``MATCH_AUTOPICK`` that
        leads the runner-up by ``MATCH_GAP``;
        ``{"mode": "pick", "options": [...]}`` with up to two candidates otherwise.
    """
    query = norm_text(term)
    if not query:
        return {"mode": "not_found"}

    sku_hits = df_eos.index[df_eos["_norm_sku"] == query].tolist()
    if len(sku_hits) == 1:
        return {"mode":"ok","row_idx": int(sku_hits[0])}

    ranked = [
        (int(pos), int(score), EOS_LABELS[int(pos)])
        for _choice, score, pos in process.extract(query, EOS_CORPUS, scorer=fuzz.WRatio, limit=6)
    ]
    if not ranked:
        return {"mode":"not_found"}

    top = ranked[0]
    leads_clearly = len(ranked) == 1 or (top[1] - ranked[1][1]) >= MATCH_GAP
    if top[1] >= MATCH_AUTOPICK and leads_clearly:
        return {"mode":"ok","row_idx": top[0]}

    options = [{"row_idx": idx, "label": label} for idx, _score, label in ranked[:2]]
    return {"mode":"pick","options": options}
# ----------------------------
# Parsec RAG (FAISS)
# ----------------------------
# Restart the module-level timer; it is reported by the _tlog("parsec index", t0)
# call once the index has been built.
t0 = time.perf_counter()
# Sentence-embedding model, loaded once at import time and reused for both the
# catalog cards and every retrieval query.
embedder = SentenceTransformer(EMBED_MODEL_NAME)
def extract_pdf_text_pages(path: str) -> List[str]:
    """Return the plain text of every page of the PDF at ``path``, in page order.

    The document is opened in a ``with`` block so its file handle is released
    once the text has been read.
    """
    with fitz.open(path) as doc:
        return [page.get_text("text") for page in doc]
def build_parsec_cards(pages: List[str]) -> List[str]:
    """Cut catalog page text into de-duplicated product "cards".

    For every ``Standard SKU:`` marker on a page, take the text from
    ``PARSEC_CONTEXT_BEFORE`` characters before the marker to
    ``PARSEC_CONTEXT_AFTER`` characters after it (clamped to the page).
    Stripped cards shorter than 200 characters are dropped. Identical cards
    are kept once, in first-seen order.
    """
    unique_cards: List[str] = []
    seen_digests = set()
    for page_text in pages:
        page_len = len(page_text)
        for marker in re.finditer(r"Standard\s+SKU:", page_text):
            anchor = marker.start()
            lo = max(0, anchor - PARSEC_CONTEXT_BEFORE)
            hi = min(page_len, anchor + PARSEC_CONTEXT_AFTER)
            card = page_text[lo:hi].strip()
            if len(card) < 200:
                continue
            digest = hashlib.sha1(card.encode("utf-8")).hexdigest()
            if digest in seen_digests:
                continue
            seen_digests.add(digest)
            unique_cards.append(card)
    return unique_cards
# Build the in-memory retrieval index over the catalog cards at import time.
parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF))
# Embeddings are L2-normalized so that inner product (IndexFlatIP) ranks by cosine similarity.
parsec_emb = embedder.encode(parsec_cards, batch_size=64, show_progress_bar=False, normalize_embeddings=True)
parsec_emb = np.asarray(parsec_emb, dtype=np.float32)
# NOTE(review): shape[1] assumes a 2-D embedding matrix; if the PDF yields no
# cards the array may not have a second dimension — confirm this cannot happen.
parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1])
parsec_index.add(parsec_emb)
_tlog("parsec index", t0)
# ----------------------------
# Antenna photos from ParsecCatalog.pdf (best effort)
# - Build a map from Standard SKU -> page indices once at startup
# - Extract the largest image on the matching page and embed as data URI in markdown
# (only used when user asks for antenna options)
# ----------------------------
# Upper-cased Standard SKU -> indices of the catalog pages that mention it.
PARSEC_PN_TO_PAGES: Dict[str, List[int]] = {}
try:
    # Open in a with-block so the PDF handle is released after the scan.
    with fitz.open(PARSEC_PDF) as _doc:
        for i in range(len(_doc)):
            t = _doc[i].get_text("text") or ""
            for m in re.finditer(r"Standard\s+SKU:\s*([A-Z0-9]+)", t):
                pn = m.group(1).strip().upper()
                PARSEC_PN_TO_PAGES.setdefault(pn, []).append(i)
except Exception as _exc:
    # Photos are best effort: continue without them, but leave a trace in the
    # console rather than failing silently.
    print(f"[WARN] antenna photo map unavailable: {_exc}")
    PARSEC_PN_TO_PAGES = {}
def _extract_largest_image_data_uri(page_index: int, max_bytes: int = 350_000) -> str:
    """
    Extract the largest raster image on a PDF page and return as a data URI (PNG).

    Only images at least 200x200 pixels are considered. CMYK images are
    converted to RGB before PNG encoding. Returns an empty string when the
    page has no qualifying image, when the PNG exceeds ``max_bytes``, or when
    anything goes wrong (best effort).

    Args:
        page_index: Zero-based page index in ``PARSEC_PDF``.
        max_bytes: Largest PNG size, in bytes, that will be embedded.
    """
    try:
        # with-block: the document handle is released on every call.
        with fitz.open(PARSEC_PDF) as doc:
            page = doc[page_index]
            imgs = page.get_images(full=True) or []
            best_xref = None
            best_area = 0
            for img in imgs:
                xref = img[0]
                pix = fitz.Pixmap(doc, xref)
                area = pix.width * pix.height
                if area > best_area and pix.width >= 200 and pix.height >= 200:
                    best_area = area
                    best_xref = xref
                pix = None  # drop the decoded pixmap before loading the next one
            if best_xref is None:
                return ""
            pix = fitz.Pixmap(doc, best_xref)
            # More than 3 colour components (alpha excluded) means CMYK, which
            # cannot be written as PNG directly. Checking n alone misses CMYK
            # pixmaps that have no alpha channel (n == 4).
            if pix.n - pix.alpha > 3:
                pix = fitz.Pixmap(fitz.csRGB, pix)
            png_bytes = pix.tobytes("png")
    except Exception:
        return ""
    if len(png_bytes) > max_bytes:
        return ""
    b64 = base64.b64encode(png_bytes).decode("ascii")
    return f"data:image/png;base64,{b64}"
@lru_cache(maxsize=512)
def antenna_photo_data_uri(part_number: str) -> str:
    """Return a PNG data URI for an antenna part number, or ``""`` if none is found.

    The part number is upper-cased and looked up in ``PARSEC_PN_TO_PAGES``; the
    first three matching pages are tried in order and the first page that
    yields an image wins. Results are memoized per part number.
    """
    key = str(part_number or "").strip().upper()
    candidate_pages = PARSEC_PN_TO_PAGES.get(key, []) if key else []
    uris = (_extract_largest_image_data_uri(page_no) for page_no in candidate_pages[:3])
    return next((uri for uri in uris if uri), "")
# ----------------------------
# Stronger matching (regex normalization + fuzzy)
# ----------------------------
def _normalize_query_compact(s: str) -> str:
s = str(s or "").strip().upper()
return re.sub(r"[^A-Z0-9]", "", s)
def resolve_device_stronger(term: str) -> Dict[str, Any]:
    """Resolve a device term, tolerating punctuation and spacing differences.

    Steps:
      1. Compare the compact form (upper-case letters and digits only) of the
         query with the compact form of every SKU; the first equal SKU wins.
      2. Otherwise fuzzy-match the ``norm_text``-normalized query against
         ``EOS_CORPUS``, which is built from normalized text.

    Returns:
        ``{"mode": "not_found"}`` when there is nothing to match on;
        ``{"mode": "ok", "row_idx", "confidence": "High"}`` for an exact compact
        match or a clear fuzzy winner;
        ``{"mode": "guess", ...}`` with ``confidence`` ``"Medium"`` and the
        candidate label otherwise.
    """
    raw = str(term or "").strip()
    if not raw:
        return {"mode":"not_found"}
    q_compact = _normalize_query_compact(raw)
    # exact compact SKU match
    if q_compact:
        for i, sku in enumerate(df_eos["_norm_sku"].tolist()):
            if _normalize_query_compact(sku) == q_compact:
                return {"mode":"ok", "row_idx": i, "confidence":"High"}
    # The corpus is lowercase/normalized, so the query must be normalized the
    # same way; matching the raw text would penalize case and punctuation.
    q_norm = norm_text(raw)
    if not q_norm:
        return {"mode":"not_found"}
    hits = process.extract(q_norm, EOS_CORPUS, scorer=fuzz.WRatio, limit=6)
    cands = [(int(idx), int(score), EOS_LABELS[int(idx)]) for _, score, idx in hits]
    if not cands:
        return {"mode":"not_found"}
    if cands[0][1] >= MATCH_AUTOPICK and (len(cands) == 1 or (cands[0][1] - cands[1][1]) >= MATCH_GAP):
        return {"mode":"ok", "row_idx": cands[0][0], "confidence":"High"}
    return {"mode":"guess", "row_idx": cands[0][0], "confidence":"Medium", "guess_label": cands[0][2], "raw": raw}
# ----------------------------
# LLM fallback: identify router + replacements (Verizon equipment only, no pricing)
# ----------------------------
def llm_identify_router_and_replacements(raw_text: str) -> Dict[str, Any]:
    """Ask the LLM to identify a router from free text and suggest replacements.

    Returns:
        The model's JSON object with ``"found": True`` added when it contains a
        ``best_guess_model``; otherwise ``{"found": False, "note": ...}``
        (also returned when no client is configured).
    """
    if client is None:
        return {"found": False, "note": "No API key configured."}

    system_prompt = (
        "You help Verizon reps identify cellular routers and suggest replacements. "
        "Keep it to Verizon-sellable equipment families when possible "
        "(Cradlepoint, Sierra/AirLink, Digi, Cisco/Meraki, Teltonika, Inseego). "
        "No pricing. Return strict JSON only."
    )
    schema = {
        "best_guess_model": "string",
        "maker_family": "CRADLEPOINT|SIERRA|DIGI|CISCO|CISCO_MERAKI|TELTONIKA|FEENEY|UNKNOWN",
        "repl_5g": "string",
        "repl_4g": "string",
        "confidence": "High|Medium",
        "note": "string"
    }
    request_body = {"user_input": raw_text, "output_schema": schema}
    messages = [
        {"role":"system","content":system_prompt},
        {"role":"user","content":_json_dump_safe(request_body)},
    ]
    resp = client.responses.create(
        model=OPENAI_MODEL,
        input=messages,
        max_output_tokens=360,
    )
    parsed = _json_load_safe(getattr(resp, "output_text", "") or "")
    if isinstance(parsed, dict) and parsed.get("best_guess_model"):
        parsed["found"] = True
        return parsed
    return {"found": False, "note": "Could not identify router."}
# ----------------------------
# Antenna options: Vehicle + Indoor + Outdoor + Directional
# (all omni except directional)
# ----------------------------
def antenna_options_4pack(repl5: str) -> Dict[str, Dict[str, Any]]:
    """Pick one antenna per install scenario for a 5G replacement router.

    Returns:
        Dict keyed ``vehicle``, ``indoor``, ``outdoor``, ``directional``; each
        value is the ``antenna_pick`` result with a ``photo_uri`` entry added.
    """
    # (result key, mode, detail) — all 5G routers => 4x4
    scenarios = (
        ("vehicle", "vehicle", None),
        ("indoor", "stationary", "indoor"),
        ("outdoor", "stationary", "outdoor"),
        ("directional", "stationary", "directional"),
    )
    picks = {
        slot: antenna_pick(repl5, mode=mode, detail=detail)
        for slot, mode, detail in scenarios
    }
    for choice in picks.values():
        choice["photo_uri"] = antenna_photo_data_uri(choice.get("part_number",""))
    return picks
def _fmt_ant(a: Dict[str, Any]) -> str:
name = a.get("name","")
pn = a.get("part_number","")
desc = a.get("description","")
conn = a.get("connectors","")
s = f"**{name}** (PN {pn}) — {desc}"
if conn:
s += f" | Conn: {conn}"
return s
PARSEC_FAMILY_WORDS = {"chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie","shepherd","belgian","australian","terrier","pyrenees"}
def _parsec_name_from_card(card_text: str) -> str:
low = card_text.lower()
for fam in PARSEC_FAMILY_WORDS:
if fam in low:
return fam.capitalize()
return "Parsec antenna"
def _parsec_part_from_card(t: str) -> str:
m = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t)
return m.group(1).strip() if m else ""
def _parsec_desc_from_card(t: str) -> str:
m = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE)
return re.sub(r"\s+"," ",m.group(1).strip())[:220] if m else ""
def _parsec_connectors_from_card(t: str) -> str:
m = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE)
return re.sub(r"\s+"," ",m.group(1).strip())[:80] if m else ""
def parsec_retrieve(query: str, top_k: int = 8) -> List[Dict[str, Any]]:
    """Return up to ``top_k`` catalog cards most similar to ``query``.

    The query is embedded with the shared embedder and searched in
    ``parsec_index``. Ids outside the card list are skipped.

    Returns:
        One dict per hit with ``score``, ``name``, ``part_number``,
        ``description`` and ``connectors``, in the order the index returned them.
    """
    query_vec = np.asarray(
        embedder.encode([query], normalize_embeddings=True),
        dtype=np.float32,
    )
    scores, ids = parsec_index.search(query_vec, top_k)
    results = []
    for score, raw_id in zip(scores[0].tolist(), ids[0].tolist()):
        idx = int(raw_id)
        if not 0 <= idx < len(parsec_cards):
            continue
        card = parsec_cards[idx]
        results.append({
            "score": float(score),
            "name": _parsec_name_from_card(card),
            "part_number": _parsec_part_from_card(card),
            "description": _parsec_desc_from_card(card),
            "connectors": _parsec_connectors_from_card(card),
        })
    return results
def antenna_pick(repl5: str, mode: str, detail: Optional[str]) -> Dict[str, Any]:
    """Pick the best-matching catalog antenna for one install scenario.

    ``mode == "vehicle"`` takes precedence. Otherwise ``detail`` selects
    ``"directional"``, ``"indoor"``, or (any other value) outdoor omni.

    Returns:
        The top retrieval hit, or a blank placeholder when nothing is
        retrieved, with ``mimo`` and ``why`` entries added.
    """
    mimo = "4x4"  # rule: all 5G -> 4x4
    tech = "5G"

    if mode == "vehicle":
        keywords, why = "omni vehicle mobile magnetic through-bolt", "Vehicle omni best match."
    elif detail == "directional":
        keywords, why = "directional fixed site", "Stationary directional best match."
    elif detail == "indoor":
        keywords, why = "omni indoor", "Stationary indoor omni best match."
    else:
        keywords, why = "omni outdoor pole wall fixed site", "Stationary outdoor omni best match."

    matches = parsec_retrieve(f"{repl5} {tech} {mimo} {keywords}", top_k=8)
    if matches:
        best = matches[0]
    else:
        best = {"name":"Parsec antenna","part_number":"","description":"","connectors":""}
    best.update({"mimo": mimo, "why": why})
    return best
# ----------------------------
# Replacement selection (lifecycle-first)
# ----------------------------
def extract_model_token(text: str) -> str:
    """Pull a short model token out of a replacement cell.

    The cell is split on ``|`` and the segments are examined last-to-first.
    Within each segment the patterns below are tried in order, and the first
    match (upper-cased) is returned. If no segment matches, the first examined
    segment truncated to 60 characters is returned. Empty input gives ``""``.
    """
    s = safe_str(text)
    if not s:
        return ""
    parts = [p.strip() for p in s.split("|") if p.strip()]
    candidates = parts[::-1] if parts else [s]
    patterns = (
        # RUT-family names with an optional letter, e.g. RUT240, RUTX50, RUTM50.
        # A separate RUTM pattern would be unreachable behind this one.
        r"\bRUT[A-Z]?\d{2,4}\b",
        r"\bIX\d{2}\b",
        r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b",
        # Generic fallback: 1-6 letters, 2-4 digits, optional trailing letter.
        r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b",
    )
    for cand in candidates:
        upper = cand.upper()
        for pattern in patterns:
            m = re.search(pattern, upper)
            if m:
                return m.group(0)
    return candidates[0][:60]
def pick_replacements(row: pd.Series, status: str) -> Dict[str, str]:
    """Choose the 4G alternative and 5G replacement for a lifecycle row.

    The 4G value comes from ``suggested_replacement`` (``"Not applicable"``
    when blank) and the 5G value from ``advanced_5g_option``. When the
    lifecycle data gives no 5G option, the first 5G model of the same maker in
    ``df_dec`` that has a non-blank name is used, else ``"Not listed"``.

    Args:
        row: Lifecycle row (needs ``suggested_replacement``,
            ``advanced_5g_option`` and ``_canon_make``).
        status: Lifecycle status; accepted but not used by this function.

    Returns:
        ``{"repl_4g": ..., "repl_5g": ...}``.
    """
    sug = safe_str(row.get("suggested_replacement", ""))
    adv = safe_str(row.get("advanced_5g_option", ""))
    repl_4g = extract_model_token(sug) if sug else "Not applicable"
    repl_5g = extract_model_token(adv) if adv else "Not listed"
    # Always provide some 5G answer: if lifecycle missing, pick top 5G from dec (same maker)
    if repl_5g in {"", "Not listed"}:
        repl_5g = "Not listed"
        # The loader tolerates a missing "Model" column, so this lookup must too.
        if "Model" in df_dec.columns:
            canon_make = str(row.get("_canon_make","UNKNOWN"))
            is_5g = df_dec["_is5g"].astype(bool)
            pool = df_dec[(df_dec["_canon_make"] == canon_make) & is_5g]
            # Skip blank/NaN model cells instead of returning the text "nan".
            names = (safe_str(value) for value in pool["Model"])
            repl_5g = next((name for name in names if name), "Not listed")
    return {"repl_4g": repl_4g or "Not applicable", "repl_5g": repl_5g or "Not listed"}
# ----------------------------
# Features + Fit (dec first, single LLM enrichment call if needed)
# ----------------------------
# Column order of the features table; every entry after "Device" is a feature key.
FEATURE_COLS = ["Device", "Modem technology", "WiFi", "Ports", "Antennas", "Ruggedness", "Use case"]
# Column order of the fit table.
FIT_COLS = ["Device", "Fit badges", "Ethernet ports", "Battery"]
def _features_from_dec(model: str, canon_make: str) -> Dict[str, str]:
    """Look up feature fields for ``model`` among ``df_dec`` rows of the same maker.

    The model is fuzzy-matched against the maker's normalized model names and
    must score at least ``MATCH_OK``. Blank or NaN cells are reported as
    ``"Not listed"`` rather than the text "nan".

    Returns:
        Dict keyed by ``FEATURE_COLS[1:]``. Every value is ``"Not listed"``
        when the model is a placeholder, the maker has no rows, or no row
        matches well enough.
    """
    not_listed = {k: "Not listed" for k in FEATURE_COLS[1:]}
    if not model or model in {"Not listed", "Not applicable"}:
        return not_listed
    pool = df_dec[df_dec["_canon_make"] == canon_make]
    if pool.empty:
        return not_listed
    hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio)
    if not hit or hit[1] < MATCH_OK:
        return not_listed
    r = pool.iloc[int(hit[2])]

    def cell(column: str) -> str:
        # safe_str() maps NaN/None to "", which then falls back to "Not listed".
        return safe_str(r.get(column, "")) or "Not listed"

    wan = safe_str(r.get("WAN ports and speed", ""))
    lan = safe_str(r.get("LAN ports and speed", ""))
    if wan or lan:
        ports = f"WAN: {wan or 'Not listed'} | LAN: {lan or 'Not listed'}"
    else:
        ports = "Not listed"
    return {
        "Modem technology": cell("Modem Type"),
        "WiFi": cell("WiFi type"),
        "Ports": ports,
        "Antennas": cell("Antennas (internal/external/both)"),
        "Ruggedness": cell("Ruggedization"),
        "Use case": cell("Primary use case"),
    }
def _fit_from_dec(model: str, canon_make: str, is5: bool) -> Dict[str, str]:
    """Derive fit badges and battery info for ``model`` from ``df_dec``.

    ``"4x4 MIMO"`` is always included when ``is5`` is true. When the model
    matches a row of the same maker (fuzzy score >= ``MATCH_OK``), further
    badges are derived from that row's use case, ruggedization, Wi-Fi, serial
    port and notes. Blank or NaN cells are treated as empty, so they never add
    a badge or a battery answer.

    Returns:
        Dict with ``"Fit badges"`` (comma-separated, or ``"Not listed"``),
        ``"Ethernet ports"`` (always ``"Not listed"`` here; no count is derived
        from the table) and ``"Battery"`` (``"Yes"``, ``"No"`` or ``"Not listed"``).
    """
    badges = ["4x4 MIMO"] if is5 else []
    eth = "Not listed"
    bat = "Not listed"

    def pack(battery_value: str) -> Dict[str, str]:
        badges_csv = ", ".join(dict.fromkeys(badges)) if badges else "Not listed"
        return {"Fit badges": badges_csv, "Ethernet ports": eth, "Battery": battery_value}

    pool = df_dec[df_dec["_canon_make"] == canon_make]
    if pool.empty or not model or model in {"Not listed", "Not applicable"}:
        return pack(bat)
    hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio)
    if not hit or hit[1] < MATCH_OK:
        return pack(bat)
    r = pool.iloc[int(hit[2])]

    def cell(column: str) -> str:
        # safe_str() maps NaN/None to "" so a missing cell cannot become "nan".
        return safe_str(r.get(column, "")).lower()

    use_case = cell("Primary use case")
    rugged = cell("Ruggedization")
    wifi = cell("WiFi type")
    serial = cell("Serial port (yes/no)")
    battery = cell("Battery (internal/removable/none/optional)")
    notes_blob = " ".join([cell("Special notes"), cell("summary and use case")])

    if any(k in use_case for k in ["vehicle","mobile","fleet","in-vehicle"]) or "vehicle" in rugged:
        badges.append("Vehicle")
    else:
        badges.append("Fixed site")
    if wifi and wifi not in {"none","no","n/a"}:
        badges.append("Wi‑Fi")
    if any(k in rugged for k in ["rugged","industrial","ip","harsh"]):
        badges.append("Rugged")
    if "dual" in notes_blob and "sim" in notes_blob:
        badges.append("Dual‑SIM")
    if serial in {"yes","y","true"}:
        badges.append("Serial")
    if battery:
        bat = "No" if "none" in battery else "Yes"
    return pack(bat)
# Enrichment cache (one call per (make, repl4, repl5))
# Keys are the SHA-1 hex digests produced by _enrich_key(); values are the
# result packs returned by gpt_enrich(). The cache lives for the process lifetime.
_ENRICH_CACHE: Dict[str, Dict[str, Any]] = {}
def _enrich_key(canon_make: str, repl4: str, repl5: str) -> str:
return hashlib.sha1(f"{canon_make}|{repl4}|{repl5}".encode("utf-8")).hexdigest()
def gpt_enrich(repl4: str, repl5: str, canon_make: str, feat4: Dict[str,str], feat5: Dict[str,str], fit4: Dict[str,str], fit5: Dict[str,str]) -> Dict[str, Any]:
    """Fill missing feature/fit fields with at most one LLM call per model pair.

    Enrichment is best effort. With no client configured, or when the LLM call
    fails, the inputs are returned unchanged. Results are cached per
    (maker, 4G model, 5G model); a failed call is not cached, so a later
    lookup can retry.

    Args:
        repl4, repl5: 4G and 5G replacement model names.
        canon_make: Canonical maker key.
        feat4, feat5: Known feature fields for each model.
        fit4, fit5: Known fit fields for each model.

    Returns:
        ``{"feat4", "feat5", "fit4", "fit5"}``; after a successful LLM call
        the 5G fit badges always include ``"4x4 MIMO"``.
    """
    baseline = {"feat4": feat4, "feat5": feat5, "fit4": fit4, "fit5": fit5}
    if client is None:
        return baseline
    key = _enrich_key(canon_make, repl4, repl5)
    if key in _ENRICH_CACHE:
        return _ENRICH_CACHE[key]

    def miss(d: Dict[str,str]) -> List[str]:
        # A field is missing when blank or a placeholder ("Not listed", "nan").
        return [
            k for k, v in d.items()
            if (not v) or str(v).strip().lower() in {"not listed","nan",""}
        ]

    m_feat4 = miss(feat4)
    m_feat5 = miss(feat5)
    m_fit4 = miss(fit4)
    m_fit5 = miss(fit5)
    if not (m_feat4 or m_feat5 or m_fit4 or m_fit5):
        # Nothing to fill: remember that so the check is skipped next time.
        _ENRICH_CACHE[key] = baseline
        return baseline

    system_prompt = (
        "You are helping a Verizon rep. Fill missing router feature fields and fit traits. Return strict JSON only. "
        "Keep values short. "
        "Fit badges must be chosen from: ['Vehicle','Fixed site','Wi‑Fi','Rugged','Dual‑SIM','4x4 MIMO','High throughput','Serial'] only. "
        "Rule: if a router is 5G, include '4x4 MIMO'. "
        "Ethernet ports must be a single integer as a string when possible; else 'Not listed'. "
        "Battery must be 'Yes', 'No', or 'Not listed'."
    )
    payload = {
        "maker_family": canon_make,
        "models": {"repl4": repl4, "repl5": repl5},
        "known": {"feat4": feat4, "feat5": feat5, "fit4": fit4, "fit5": fit5},
        "missing": {"feat4": m_feat4, "feat5": m_feat5, "fit4": m_fit4, "fit5": m_fit5},
        "output_schema": {
            "feat4": {k: "string" for k in m_feat4},
            "feat5": {k: "string" for k in m_feat5},
            "fit4": {k: "string" for k in m_fit4},
            "fit5": {k: "string" for k in m_fit5},
        },
    }
    t0 = time.perf_counter()
    try:
        resp = client.responses.create(
            model=OPENAI_MODEL,
            input=[{"role":"system","content":system_prompt},{"role":"user","content":_json_dump_safe(payload)}],
            max_output_tokens=420,
        )
    except Exception as exc:
        # Enrichment is optional: a network/API failure must not abort the
        # lookup. Fall back to what is already known and do not cache.
        print(f"[WARN] llm enrich failed: {exc}")
        return baseline
    _tlog("llm enrich", t0)
    out = _json_load_safe(getattr(resp, "output_text", "") or "")
    if not isinstance(out, dict):
        # The model may return valid JSON that is not an object.
        out = {}

    def merge(base: Dict[str,str], patch: Any) -> Dict[str,str]:
        # Overlay non-empty patch values onto a copy of the known fields.
        if isinstance(patch, dict):
            for k, v in patch.items():
                sv = str(v or "").strip()
                if sv:
                    base[k] = sv
        return base

    feat4x = merge(dict(feat4), out.get("feat4", {}))
    feat5x = merge(dict(feat5), out.get("feat5", {}))
    fit4x = merge(dict(fit4), out.get("fit4", {}))
    fit5x = merge(dict(fit5), out.get("fit5", {}))
    # Enforce 5G 4x4 badge
    b = str(fit5x.get("Fit badges","") or "")
    if "4x4 MIMO" not in b:
        fit5x["Fit badges"] = (b + ", 4x4 MIMO").strip(", ").strip() if b and b != "Not listed" else "4x4 MIMO"
    pack = {"feat4": feat4x, "feat5": feat5x, "fit4": fit4x, "fit5": fit5x}
    _ENRICH_CACHE[key] = pack
    return pack
def build_tables(repl4: str, repl5: str, canon_make: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Build the features table and the fit table for the two replacement models.

    Fields are looked up in ``df_dec`` first, then passed through ``gpt_enrich``.

    Returns:
        ``(feat_df, fit_df)``, each with a "4G alternative" row followed by a
        "5G replacement" row, using ``FEATURE_COLS`` / ``FIT_COLS`` as columns.
    """
    pack = gpt_enrich(
        repl4,
        repl5,
        canon_make,
        _features_from_dec(repl4, canon_make),
        _features_from_dec(repl5, canon_make),
        _fit_from_dec(repl4, canon_make, is5=False),
        _fit_from_dec(repl5, canon_make, is5=True),
    )
    row_labels = (("4G alternative", "4"), ("5G replacement", "5"))
    feat_rows = [{"Device": label, **pack[f"feat{gen}"]} for label, gen in row_labels]
    fit_rows = [{"Device": label, **pack[f"fit{gen}"]} for label, gen in row_labels]
    feat_df = pd.DataFrame(feat_rows, columns=FEATURE_COLS)
    fit_df = pd.DataFrame(fit_rows, columns=FIT_COLS)
    return feat_df, fit_df
# ----------------------------
# Manufacturer link (deterministic, no HTTP)
# ----------------------------
# Canonical maker key -> base URL of the manufacturer's site ("" = no link).
MAKER_DOMAINS = {
    "CRADLEPOINT": "https://cradlepoint.com",
    "SIERRA": "https://airlink.com",
    "FEENEY": "https://inseego.com",
    "DIGI": "https://www.digi.com",
    "CISCO_MERAKI": "https://meraki.cisco.com",
    "CISCO": "https://www.cisco.com",
    "TELTONIKA": "https://teltonika-networks.com",
    "UNKNOWN": "",
}


def guess_maker_url(model: str, canon_make: str) -> str:
    """Build a manufacturer link for ``model`` without making any HTTP request.

    Teltonika gets a product-page path; Cradlepoint and Cisco/Meraki get their
    site-search URLs; every other known maker gets ``<base>/search?q=...``.
    The model text is percent-encoded so characters such as ``&``, ``#``,
    ``+`` or ``/`` cannot alter the URL.

    Returns:
        The URL, or ``""`` when the maker has no base URL or the model is
        blank or a placeholder.
    """
    from urllib.parse import quote, quote_plus

    model = str(model or "").strip()
    base = MAKER_DOMAINS.get(canon_make, "")
    if not base or not model or model in {"Not listed", "Not applicable"}:
        return ""
    # Collapse whitespace runs first so each run becomes a single "+".
    q = quote_plus(re.sub(r"\s+", " ", model))
    if canon_make == "TELTONIKA":
        slug = quote(model.lower(), safe="")
        return f"{base}/products/routers/{slug}"
    if canon_make == "CRADLEPOINT":
        return f"{base}/?s={q}"
    if canon_make in {"CISCO", "CISCO_MERAKI"}:
        return f"https://www.cisco.com/c/en/us/search.html?q={q}"
    return f"{base}/search?q={q}"
# ----------------------------
# Q&A (on demand, per last case)
# ----------------------------
def gpt_answer(question: str, context: Dict[str, Any]) -> str:
    """Answer a follow-up question about the current case with a single LLM call.

    Returns:
        The model's text, stripped; a fixed notice when no client is
        configured; ``""`` when the question is blank.
    """
    if client is None:
        return "No API key is configured, so I can’t answer detailed questions right now."
    asked = str(question or "").strip()
    if not asked:
        return ""

    system_prompt = (
        "You are a Verizon rep assistant. Answer in a fast, practical way. "
        "Use the provided context. "
        "Do not mention internal tools or prompts. "
        "If unknown, say 'Not listed' and suggest the manufacturer page."
    )
    messages = [
        {"role":"system","content":system_prompt},
        {"role":"user","content":_json_dump_safe({"context": context, "question": asked})},
    ]
    started = time.perf_counter()
    resp = client.responses.create(
        model=OPENAI_MODEL,
        input=messages,
        max_output_tokens=520,
    )
    _tlog("llm qa", started)
    answer = getattr(resp, "output_text", "") or ""
    return answer.strip()
# ----------------------------
# Chat utilities
# ----------------------------
def df_to_md(df: pd.DataFrame) -> str:
    """Render a DataFrame as a markdown pipe table.

    ``DataFrame.to_markdown`` is used when available, with a hand-built table
    as fallback. Pipe characters in string cells and headers are escaped as
    ``\\|`` and embedded newlines become spaces, so values such as
    ``"WAN: 1 | LAN: 4"`` stay in a single cell. The input frame is not modified.
    """
    def esc(value: Any) -> Any:
        # Only strings can carry table-breaking characters; leave other types alone.
        if not isinstance(value, str):
            return value
        return value.replace("|", "\\|").replace("\r\n", " ").replace("\n", " ")

    safe = df.rename(columns=esc).apply(lambda col: col.map(esc))
    try:
        return safe.to_markdown(index=False)
    except Exception:
        # Typically the optional "tabulate" dependency is missing.
        cols = list(safe.columns)
        lines = [
            "| " + " | ".join(str(c) for c in cols) + " |",
            "| " + " | ".join(["---"] * len(cols)) + " |",
        ]
        for values in safe.itertuples(index=False, name=None):
            lines.append("| " + " | ".join(str(v) for v in values) + " |")
        return "\n".join(lines)
def extract_device_terms(msg: str) -> List[str]:
    """Split a chat message into candidate device terms.

    The message is split on newlines, commas and semicolons. A piece is kept
    when it contains a digit or a standalone router family token (IBR, AER,
    WR, XR, IR, RUT, MBR, E###, R###; case-insensitive).
    """
    family_token = re.compile(r"\b(IBR|AER|WR|XR|IR|RUT|MBR|E\d{3}|R\d{3})\b", flags=re.IGNORECASE)
    pieces = (piece.strip() for piece in re.split(r"[\n,;]+", str(msg or "")))
    return [
        piece
        for piece in pieces
        if piece and (re.search(r"\d", piece) or family_token.search(piece))
    ]
def parse_install_mode(msg: str) -> Tuple[Optional[str], Optional[str]]:
    """Detect install mode and detail keywords in a message (substring match).

    Returns:
        ``(mode, detail)``. ``mode`` is ``"stationary"`` when the text contains
        "stationary", "fixed" or "site", else ``"vehicle"`` when it contains
        "vehicle" or "mobile", else ``None``. ``detail`` is the highest-priority
        match among "directional", "outdoor", "indoor", else ``None``.
    """
    text = str(msg or "").strip().lower()

    mode = None
    if any(word in text for word in ("stationary", "fixed", "site")):
        mode = "stationary"
    elif any(word in text for word in ("vehicle", "mobile")):
        mode = "vehicle"

    detail = next(
        (word for word in ("directional", "outdoor", "indoor") if word in text),
        None,
    )
    return mode, detail
def make_case_key(s: str) -> str:
    """Return a case key: trimmed text with whitespace collapsed, capped at 80 characters."""
    trimmed = str(s or "").strip()
    return " ".join(trimmed.split())[:80]
# ----------------------------
# Chat UI (schema-safe)
# ----------------------------
def _replacement_urls(repl: Dict[str, str], canon_make: str) -> Tuple[str, str]:
    """Return ``(url_4g, url_5g)`` for a replacement pair.

    ``guess_maker_url`` (defined elsewhere in this file) is only called for a
    real model name; the placeholders "Not applicable" (4G) and "Not listed"
    (5G) map to an empty string.
    """
    url4 = guess_maker_url(repl["repl_4g"], canon_make) if repl["repl_4g"] != "Not applicable" else ""
    url5 = guess_maker_url(repl["repl_5g"], canon_make) if repl["repl_5g"] != "Not listed" else ""
    return url4, url5


def _replacement_lines(repl: Dict[str, str], url4: str, url5: str) -> List[str]:
    """Build the Markdown bullet lines naming the replacements and their links."""
    lines = [
        f"- 4G alternative: **{repl['repl_4g']}**",
        f"- 5G replacement: **{repl['repl_5g']}**",
    ]
    if url4:
        lines.append(f"- 4G manufacturer page: {url4}")
    if url5:
        lines.append(f"- 5G manufacturer page: {url5}")
    return lines


def _register_local_case(st: Dict[str, Any], row_idx: int, fallback_key: str) -> Tuple[str, List[str]]:
    """Resolve a ``df_eos`` row, store it as a case in *st*, and render it.

    Args:
        st: Mutable conversation state; ``st["cases"]`` and
            ``st["last_case_keys"]`` are updated in place.
        row_idx: Positional index into ``df_eos``.
        fallback_key: Text used for the case key when the row has no "sku".

    Returns:
        ``(case_key, lines)`` where *lines* is the Markdown summary for the case.
    """
    life_row = df_eos.iloc[row_idx]
    eos, eol, status = row_to_dates_and_status(life_row)
    repl = pick_replacements(life_row, status)
    canon_make = str(life_row.get("_canon_make", "UNKNOWN"))
    feat_df, fit_df = build_tables(repl["repl_4g"], repl["repl_5g"], canon_make)
    url4, url5 = _replacement_urls(repl, canon_make)

    ck = make_case_key(str(life_row.get("sku", "")) or fallback_key)
    st["cases"][ck] = {
        "row_idx": row_idx,
        "repl": repl,
        "canon_make": canon_make,
        "status": status,
        "eos": eos,
        "eol": eol,
        "urls": {"4g": url4, "5g": url5},
    }
    st["last_case_keys"].append(ck)

    lines = [
        f"**{ck}**",
        f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**",
    ]
    lines.extend(_replacement_lines(repl, url4, url5))
    lines.append("\n**Replacement features**\n" + df_to_md(feat_df))
    lines.append("\n**Verizon fit**\n" + df_to_md(fit_df))
    return ck, lines


def _case_context(case_key: str, case: Dict[str, Any]) -> Dict[str, Any]:
    """Build the context dict handed to ``gpt_answer`` for one stored case."""
    return {
        "case": case_key,
        "status": case.get("status", ""),
        "eos": case.get("eos", ""),
        "eol": case.get("eol", ""),
        "replacements": case.get("repl", {}),
        "urls": case.get("urls", {}),
        "antenna_options": case.get("antenna_options", {}),
    }


with gr.Blocks(title="Only-Routers") as demo:
    gr.Markdown("## Only-Routers\n\n**Please enter the router models you would like to verify for replacement.**\n\nPaste multiple models/SKUs separated by commas or new lines.")
    # Conversation state travels between calls as a JSON string.
    state = gr.State("{}")
    chatbot = gr.Chatbot(label="Only-Routers Chat", height=600, type="tuples")
    msg = gr.Textbox(label="Message", placeholder="Example: RUT240, WR21\nVehicle install", lines=2)
    send = gr.Button("Send", variant="primary")

    def chat_fn(user_msg, history, st_json):
        """Handle one chat turn and return ``(history, state_json)``.

        The turn is routed by ``st["pending"]["type"]``:

        * ``confirm_guess``: the user answers yes/no to a fuzzy match; any
          other reply is treated as a fresh lookup.
        * ``await_corrected_model``: the message is treated as a fresh lookup.
        * ``ask_antennas``: yes shows antenna options for the pending cases,
          anything else skips them; both move on to ``await_questions``.
        * ``await_questions``: the message is answered by ``gpt_answer``
          against the most recent case (or a case whose key appears in the
          message).
        * otherwise: device intake, resolving every term in the message.

        Args:
            user_msg: Text from the message box.
            history: Current chatbot history (list of user/bot pairs) or None.
            st_json: Serialized conversation state.

        Returns:
            The updated history and the re-serialized state.
        """
        t0 = time.perf_counter()
        # Work on a copy; also tolerates a chatbot value of None.
        history = list(history or [])
        st = state_load(st_json)
        st.setdefault("cases", {})
        st.setdefault("last_case_keys", [])
        st.setdefault("pending", {})

        def pending_type():
            # Re-read on each check: earlier branches may reset st["pending"].
            return (st.get("pending") or {}).get("type")

        text = (user_msg or "").strip()
        if not text:
            return history, state_dump(st)

        # ----------------------------
        # Pending: confirm best guess
        # ----------------------------
        if pending_type() == "confirm_guess":
            pend = st["pending"]
            raw = pend.get("raw", "")
            try:
                row_idx = int(pend.get("row_idx", -1))
            except (TypeError, ValueError):
                row_idx = -1
            low = text.lower().strip()

            if low in {"yes", "y", "yeah", "yep", "correct", "right", "ok", "okay"}:
                # A missing/invalid index must not fall through to iloc[-1],
                # which would silently pick the last row of the table.
                if not 0 <= row_idx < len(df_eos):
                    st["pending"] = {"type": "await_corrected_model"}
                    history.append((text, "Sorry — I could not recover that match. Please reply with the exact router model/SKU."))
                    return history, state_dump(st)

                ck, bot = _register_local_case(st, row_idx, raw)
                # Cases resolved earlier in the same message share the antenna prompt.
                prior_keys = [k for k in (pend.get("prior_case_keys") or []) if k != ck]
                case_keys = prior_keys + [ck]
                scope = "this router" if len(case_keys) == 1 else "each router"
                bot.append(f"\nWould you like to see the **antenna options** (Vehicle, Indoor, Outdoor, Directional) for {scope}? Reply **Yes** or **No**.")
                st["pending"] = {"type": "ask_antennas", "case_keys": case_keys}
                history.append((text, "\n".join(bot)))
                _tlog("confirm guess", t0)
                return history, state_dump(st)

            if low in {"no", "n", "nope", "wrong", "incorrect"}:
                st["pending"] = {"type": "await_corrected_model"}
                history.append((text, "No problem — please reply with the corrected router model/SKU."))
                return history, state_dump(st)

            # If they pasted corrected model instead of yes/no, fall through as new input
            st["pending"] = {}

        # ----------------------------
        # Pending: waiting for corrected model
        # ----------------------------
        if pending_type() == "await_corrected_model":
            st["pending"] = {}  # treat message as a new lookup

        # ----------------------------
        # Pending: ask antennas yes/no
        # ----------------------------
        if pending_type() == "ask_antennas":
            low = text.lower().strip()
            want = low in {"yes", "y", "yeah", "yep", "sure", "ok", "okay"}
            case_keys = st["pending"].get("case_keys", []) or st.get("last_case_keys", [])

            if want:
                blocks = []
                for ck in case_keys:
                    case = st["cases"].get(ck, {})
                    repl5 = (case.get("repl", {}) or {}).get("repl_5g", "")
                    if not repl5 or repl5 == "Not listed":
                        blocks.append(f"**{ck}**: No 5G replacement available to anchor antenna picks.")
                        continue
                    opts = antenna_options_4pack(repl5)
                    case["antenna_options"] = opts
                    st["cases"][ck] = case
                    b = []
                    b.append(f"**{ck} — Antenna options (Parsec)**")
                    b.append(f"- Vehicle (Omni): {_fmt_ant(opts['vehicle'])}")
                    b.append(f"- Indoor (Omni): {_fmt_ant(opts['indoor'])}")
                    b.append(f"- Outdoor (Omni): {_fmt_ant(opts['outdoor'])}")
                    b.append(f"- Directional: {_fmt_ant(opts['directional'])}")
                    # Photos (best effort, may be empty if too large or not found)
                    for label in ["vehicle", "indoor", "outdoor", "directional"]:
                        uri = opts[label].get("photo_uri", "")
                        if uri:
                            b.append(f"\n**{label.capitalize()} photo**\n![]({uri})\n")
                    blocks.append("\n".join(b))
                blocks.append("\nAny questions about the router(s) — including alternatives and comparisons? Ask anything router-related (no pricing).")
                st["pending"] = {"type": "await_questions"}
                history.append((text, "\n\n---\n\n".join(blocks)))
                _tlog("antennas yes", t0)
                return history, state_dump(st)

            # No antennas
            st["pending"] = {"type": "await_questions"}
            history.append((text, "Got it. Any questions about the router(s) — including alternatives and comparisons? Ask anything router-related (no pricing)."))
            return history, state_dump(st)

        # ----------------------------
        # Pending: questions phase
        # ----------------------------
        if pending_type() == "await_questions":
            if not st.get("last_case_keys"):
                history.append((text, "Please enter the router models you would like to verify for replacement."))
                return history, state_dump(st)

            # Route to most recent unless message mentions a case key
            target = st["last_case_keys"][-1]
            t_low = text.lower()
            for ck in reversed(st["last_case_keys"]):
                if ck.lower() in t_low:
                    target = ck
                    break

            ans = gpt_answer(text, _case_context(target, st["cases"].get(target, {})))
            history.append((text, ans))
            _tlog("qa", t0)
            return history, state_dump(st)

        # ----------------------------
        # Normal device intake
        # ----------------------------
        terms = extract_device_terms(text)
        if not terms:
            # If not a device list, treat as question about last router if possible
            if st.get("last_case_keys"):
                last_key = st["last_case_keys"][-1]
                # Same context shape as the questions phase, so the answer
                # also sees the case name, status and dates.
                ans = gpt_answer(text, _case_context(last_key, st["cases"].get(last_key, {})))
                history.append((text, ans))
                return history, state_dump(st)
            history.append((text, "Please enter the router models you would like to verify for replacement."))
            return history, state_dump(st)

        blocks = []
        case_keys = []
        for term in terms:
            res = resolve_device_stronger(term)

            if res.get("mode") == "ok":
                ck, bot = _register_local_case(st, int(res["row_idx"]), term)
                case_keys.append(ck)
                blocks.append("\n".join(bot))
                continue

            if res.get("mode") == "guess":
                # Show what was already resolved in this message before asking,
                # and remember those cases so the antenna prompt covers them.
                # NOTE(review): terms after this one are still not processed;
                # the user has to resend them once the guess is settled.
                st["pending"] = {
                    "type": "confirm_guess",
                    "row_idx": int(res["row_idx"]),
                    "raw": res.get("raw", ""),
                    "prior_case_keys": case_keys,
                }
                question = f"I think you mean: **{res.get('guess_label','')}**. Is that correct? Reply **Yes** or **No** (or paste the corrected model)."
                history.append((text, "\n\n---\n\n".join(blocks + [question])))
                return history, state_dump(st)

            # Not found locally: ask to clarify AND attempt LLM best effort
            llm = llm_identify_router_and_replacements(term)
            if llm.get("found"):
                ck = make_case_key(llm.get("best_guess_model", "") or term)
                repl = {
                    "repl_4g": llm.get("repl_4g", "Not applicable") or "Not applicable",
                    "repl_5g": llm.get("repl_5g", "Not listed") or "Not listed",
                }
                canon_make = llm.get("maker_family", "UNKNOWN")
                url4, url5 = _replacement_urls(repl, canon_make)
                st["cases"][ck] = {
                    "row_idx": None,
                    "repl": repl,
                    "canon_make": canon_make,
                    "status": "Unknown",
                    "eos": "Not listed",
                    "eol": "Not listed",
                    "urls": {"4g": url4, "5g": url5},
                    "llm_note": llm.get("note", ""),
                }
                st["last_case_keys"].append(ck)
                case_keys.append(ck)

                bot = [
                    f"**{ck}** (best effort)",
                    f"- Note: {llm.get('note','')}",
                ]
                bot.extend(_replacement_lines(repl, url4, url5))
                bot.append("\nIf this is not the correct router, reply with the exact model and manufacturer.")
                blocks.append("\n".join(bot))
            else:
                blocks.append(f"**{term}**: not found. Who makes it (manufacturer) and what's the exact model/SKU?")

        if case_keys:
            blocks.append("\nWould you like to see the **antenna options** (Vehicle, Indoor, Outdoor, Directional) for each router? Reply **Yes** or **No**.")
            st["pending"] = {"type": "ask_antennas", "case_keys": case_keys}
        else:
            st["pending"] = {"type": "await_questions"}

        history.append((text, "\n\n---\n\n".join(blocks)))
        _tlog("lookup", t0)
        return history, state_dump(st)

    send.click(fn=chat_fn, inputs=[msg, chatbot, state], outputs=[chatbot, state], api_name=False)

demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT","7860")), share=False, show_api=False)