# Uploaded by crazycrazypete ("Upload folder using huggingface_hub").
# Commit 7265825 (verified).
import os
import re
import json
import math
import time
import hashlib
import tempfile
from dataclasses import dataclass
from datetime import datetime, date
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import fitz # PyMuPDF
import faiss
from sentence_transformers import SentenceTransformer
from rapidfuzz import fuzz, process
import gradio as gr
from openai import OpenAI
# ============================================================
# Only-Routers (Chat, production-lean)
# - Fast model by default (no reasoning payload)
# - One LLM call max per lookup (enrichment only, cached)
# - No HTTP crawling during normal lookup (links are deterministic)
# - Timing logs to HF console when DEBUG_TIMING=1
# ============================================================
# ----------------------------
# Settings
# ----------------------------
# Reference date that lifecycle dates are compared against.
# NOTE(review): this is a fixed literal, not the current date, so computed
# statuses will not advance over time until it is edited — confirm intended.
TODAY = date(2026, 1, 18)
# Fast default model (override via the OPENAI_MODEL env var)
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5.2").strip()
# Disable LLM at runtime: OPENAI_DISABLE=1
OPENAI_DISABLE = os.getenv("OPENAI_DISABLE", "0").strip() == "1"
# Timing logs are printed only when DEBUG_TIMING=1
DEBUG_TIMING = os.getenv("DEBUG_TIMING", "0").strip() == "1"
# Matching thresholds (fuzzy-match scores and the minimum gap between the
# best and second-best candidate)
MATCH_OK = 82
MATCH_AUTOPICK = 95
MATCH_GAP = 8
# Embedding model name (override via the EMBED_MODEL_NAME env var)
EMBED_MODEL_NAME = os.getenv("EMBED_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2").strip()
# Parsec PDF slicing: characters kept before / after each "Standard SKU:" marker
PARSEC_CONTEXT_BEFORE = 900
PARSEC_CONTEXT_AFTER = 1600
# ----------------------------
# OpenAI client
# ----------------------------
API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
# `client` is None when no key is set or the LLM is disabled; callers check for None.
client = None if (not API_KEY or OPENAI_DISABLE) else OpenAI(api_key=API_KEY)
# ----------------------------
# Timing helper
# ----------------------------
def _tlog(label: str, t0: float) -> None:
    """Print the seconds elapsed since ``t0`` under ``label``.

    Does nothing unless the module-level ``DEBUG_TIMING`` flag is set.
    ``t0`` is a ``time.perf_counter()`` reading taken by the caller.
    """
    if not DEBUG_TIMING:
        return
    elapsed = time.perf_counter() - t0
    print(f"[TIMER] {label}: {elapsed:.2f}s")
# ----------------------------
# JSON-safe helpers
# ----------------------------
def _json_load_safe(s: str) -> Dict[str, Any]:
    """Parse ``s`` as JSON and return it only if it is a JSON object.

    Returns an empty dict when ``s`` is not a string, is not valid JSON, or
    decodes to anything other than a dict (list, string, number, null), so
    callers can always use ``.get`` on the result.
    """
    try:
        parsed = json.loads(s)
    except (TypeError, ValueError, RecursionError):
        # TypeError: non-string input; ValueError: malformed JSON
        # (JSONDecodeError is a subclass); RecursionError: absurd nesting.
        return {}
    return parsed if isinstance(parsed, dict) else {}
def _json_dump_safe(obj: Any) -> str:
    """Serialize ``obj`` to a JSON string, keeping non-ASCII characters as-is.

    Falls back to the literal string ``"{}"`` if serialization fails for any
    reason (best-effort by design).
    """
    try:
        encoded = json.dumps(obj, ensure_ascii=False)
    except Exception:
        encoded = "{}"
    return encoded
# ----------------------------
# Gradio state helpers (string JSON only)
# ----------------------------
def state_load(st_json: str) -> Dict[str, Any]:
    """Decode the JSON-string chat state into a dict.

    Returns an empty dict when ``st_json`` is not a non-empty string, is not
    valid JSON, or decodes to something other than a JSON object. The last
    case matters because callers immediately use ``setdefault`` on the result.
    """
    if not isinstance(st_json, str) or not st_json:
        return {}
    try:
        loaded = json.loads(st_json)
    except (ValueError, RecursionError):
        return {}
    return loaded if isinstance(loaded, dict) else {}
def state_dump(st: Dict[str, Any]) -> str:
    """Encode the chat state as a JSON string; a falsy state encodes as ``{}``."""
    payload = st if st else {}
    return _json_dump_safe(payload)
# ----------------------------
# Normalization
# ----------------------------
def norm_text(x: Any) -> str:
    """Return a lowercase, whitespace-collapsed matching key for ``x``.

    Missing values (None, NaN, pandas NA) become ``""``. Every character
    other than ``a-z``, ``0-9``, whitespace, ``-`` and ``/`` is replaced by a
    space before whitespace is collapsed.
    """
    try:
        is_missing = x is None or (isinstance(x, float) and math.isnan(x)) or pd.isna(x)
        if is_missing:
            return ""
    except Exception:
        # pd.isna on array-likes yields an array whose truth value is
        # ambiguous; in that case fall through and stringify the value.
        pass
    lowered = str(x).strip().lower()
    cleaned = re.sub(r"[^a-z0-9\s\-\/]", " ", lowered)
    return re.sub(r"\s+", " ", cleaned).strip()
def safe_str(x: Any) -> str:
    """Return ``str(x)`` stripped, or ``""`` for missing values.

    None, NaN, NaT and pandas NA count as missing. List/array inputs, for
    which ``pd.isna`` returns an array with an ambiguous truth value, are
    stringified instead of raising (the same tolerance ``norm_text`` has).
    """
    if x is None:
        return ""
    try:
        if pd.isna(x):
            return ""
    except (TypeError, ValueError):
        # Non-scalar input: pd.isna returned an array; treat as present.
        pass
    return str(x).strip()
def is_5g_text(s: str) -> bool:
    """Return True when the normalized text mentions 5G or NR.

    "nr" must be a standalone word so that ordinary words containing those
    letters (e.g. "InRouter", "unrated") are not flagged, and "5g" followed
    by "hz" (the 5 GHz Wi-Fi band) is ignored.
    """
    t = norm_text(s)
    return re.search(r"5g(?!hz)|\bnr\b", t) is not None
def is_4g_lte_family(row: pd.Series) -> bool:
    """Return True when a row's description/notes indicate a 4G/LTE device.

    LTE categories ("Cat 4", "Cat-M1", "cat12", or a bare "cat" word) are
    treated as 4G. Any 5G/NR mention wins and returns False. Token matching
    uses word boundaries so that words merely containing "nr" or "cat"
    (e.g. "InRouter", "Catalyst", "application") and "5GHz" are not misread.
    """
    t = norm_text(row.get("description", "")) + " " + norm_text(row.get("notes", ""))
    if re.search(r"5g(?!hz)|\bnr\b", t):
        return False
    if "lte" in t or "4g" in t:
        return True
    # "cat" as a word, optionally followed by an LTE category (M1/M2 or 1-2 digits).
    return re.search(r"\bcat(?:\s*-?\s*(?:m[12]|\d{1,2}))?\b", t) is not None
# ----------------------------
# Lifecycle CSV normalization
# ----------------------------
def _normalize_lifecycle_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of the lifecycle frame with canonical column names.

    Source columns are matched case-insensitively against known aliases and
    renamed to: sku, manufacturer, device_type, end_of_sale, end_of_life,
    suggested_replacement, advanced_5g_option. Canonical columns that are
    missing are added as empty strings. ``description`` (defaulting to the
    SKU as text), ``notes`` and ``region`` are also guaranteed to exist.
    The input frame is not modified.
    """
    frame = df.copy()
    by_lower = {col.lower(): col for col in frame.columns}

    # (canonical name, accepted source names) — first alias present wins.
    aliases = (
        ("sku", ("sku", "SKU")),
        ("manufacturer", ("manufacturer", "Manufacturer")),
        ("device_type", ("device type", "Device Type", "device_type")),
        ("end_of_sale", ("end_of_sale", "end of sale", "End of Sale", "eos")),
        ("end_of_life", ("end_of_life", "end of life", "End of Life", "eol")),
        ("suggested_replacement", ("suggested_replacement", "Suggested Replacement")),
        ("advanced_5g_option", ("advanced_5g_option", "Advanced 5G Option", "advanced 5g option")),
    )

    renames = {}
    for canonical, names in aliases:
        source = next((by_lower[n.lower()] for n in names if n.lower() in by_lower), None)
        if source:
            renames[source] = canonical
    frame = frame.rename(columns=renames)

    for canonical, _names in aliases:
        if canonical not in frame.columns:
            frame[canonical] = ""

    # Compatibility fields used by matching/output
    if "description" not in frame.columns:
        frame["description"] = frame["sku"].astype(str)
    for extra in ("notes", "region"):
        if extra not in frame.columns:
            frame[extra] = ""
    return frame
# ----------------------------
# Maker mapping
# ----------------------------
# Canonical maker key -> set of lowercase terms that identify it.
# Lookup is by substring and walks this dict in order, so the more specific
# "CISCO_MERAKI" entry must stay ahead of "CISCO".
CANON_MAKER = {
"CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"},
"SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"},
"FEENEY": {"feeney", "feeney wireless", "inseego"},
"DIGI": {"digi", "accelerated", "accelerated concepts"},
"CISCO_MERAKI": {"meraki", "cisco meraki"},
"CISCO": {"cisco"},
"TELTONIKA": {"teltonika"},
}
def canon_maker_from_text(s: Any) -> str:
    """Map free-form manufacturer text to a canonical maker key.

    Returns the first ``CANON_MAKER`` key (in dict order) that has any term
    occurring as a substring of the normalized text, or ``"UNKNOWN"``.
    """
    haystack = norm_text(s)
    return next(
        (
            canon
            for canon, terms in CANON_MAKER.items()
            if any(term in haystack for term in terms)
        ),
        "UNKNOWN",
    )
# ----------------------------
# Date parsing
# ----------------------------
@dataclass
class ParsedDate:
    """Result of parsing one lifecycle date cell."""

    # Text to display: the normalized date, or the original text when unparseable.
    raw: str
    # One of "missing", "bad", "full", "year", "year_month".
    kind: str
    # Concrete date used for comparisons; None when missing or unparseable.
    value: Optional[date]
def parse_date_field(x: Any) -> ParsedDate:
    """Parse a lifecycle date cell into a ``ParsedDate``.

    Accepted forms:
      * ``M/D/YY`` or ``M/D/YYYY`` -> kind "full" (two-digit years are 20YY);
        ``raw`` is rewritten as ISO ``YYYY-MM-DD``.
      * ``YYYY`` -> kind "year"; a past or current year resolves to Jan 1,
        a future year to Dec 31.
      * ``YYYY-MM`` -> kind "year_month", resolved to the first of the month.
      * ``YYYY-MM-DD`` -> kind "full".
    Empty or missing input gives kind "missing"; anything else (including
    impossible dates such as year 0000 or month 13) gives kind "bad" with
    ``value=None``. This function does not raise on bad input.
    """
    raw = safe_str(x)
    if not raw:
        return ParsedDate(raw="", kind="missing", value=None)

    # pandas reads a year-only column that has blanks as float64, so a year
    # can arrive as "2025.0"; treat it as the plain year.
    float_year = re.fullmatch(r"(\d{4})\.0+", raw)
    if float_year:
        raw = float_year.group(1)

    bad = ParsedDate(raw=raw, kind="bad", value=None)

    # M/D/YY or M/D/YYYY (a 3-digit year is rejected as ambiguous)
    mdy = re.fullmatch(r"(\d{1,2})/(\d{1,2})/(\d{2}|\d{4})", raw)
    if mdy:
        month, day, year = (int(g) for g in mdy.groups())
        if year < 100:
            year += 2000
        try:
            value = date(year, month, day)
        except ValueError:
            return bad
        return ParsedDate(raw=value.isoformat(), kind="full", value=value)

    # YYYY
    if re.fullmatch(r"\d{4}", raw):
        year = int(raw)
        month, day = (1, 1) if year <= TODAY.year else (12, 31)
        try:
            return ParsedDate(raw=raw, kind="year", value=date(year, month, day))
        except ValueError:  # e.g. "0000"
            return bad

    # YYYY-MM
    if re.fullmatch(r"\d{4}-\d{2}", raw):
        year_text, month_text = raw.split("-")
        try:
            return ParsedDate(raw=raw, kind="year_month", value=date(int(year_text), int(month_text), 1))
        except ValueError:
            return bad

    # YYYY-MM-DD
    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw):
        try:
            return ParsedDate(raw=raw, kind="full", value=datetime.strptime(raw, "%Y-%m-%d").date())
        except ValueError:
            return bad

    return bad
def display_date(pd_: ParsedDate) -> str:
    """Return the user-facing text for a parsed date.

    Missing dates show "Not listed"; unparseable dates show their original
    text (or "Not listed" if that is empty); everything else shows ``raw``.
    """
    if pd_.kind in ("missing", "bad"):
        return pd_.raw if (pd_.kind == "bad" and pd_.raw) else "Not listed"
    return pd_.raw
def status_from_eos_eol(eos: ParsedDate, eol: ParsedDate) -> str:
    """Derive the lifecycle status from the end-of-sale / end-of-life dates.

    Returns "Unknown" when neither date has a value, "End of Life" when the
    EOL date is on or before ``TODAY``, "End of Sale" when the EOS date is on
    or before ``TODAY``, and "Active" otherwise. EOL takes precedence.
    """
    sale_end, life_end = eos.value, eol.value
    if sale_end is None and life_end is None:
        return "Unknown"

    def reached(day) -> bool:
        return day is not None and day <= TODAY

    if reached(life_end):
        return "End of Life"
    return "End of Sale" if reached(sale_end) else "Active"
def row_to_dates_and_status(row: pd.Series) -> Tuple[str, str, str]:
    """Return ``(EOS display text, EOL display text, status)`` for a lifecycle row."""
    sale_end, life_end = (
        parse_date_field(row.get(column)) for column in ("end_of_sale", "end_of_life")
    )
    status = status_from_eos_eol(sale_end, life_end)
    return display_date(sale_end), display_date(life_end), status
# ----------------------------
# Files
# ----------------------------
# Data files expected next to this script.
EOS_PATH = "routers_eos_eol_by_sku.csv"
DEC_PATH = "dec2025routers.csv"
PARSEC_PDF = "ParsecCatalog.pdf"

# Fail fast at import time if any required data file is absent.
for _required in (EOS_PATH, DEC_PATH, PARSEC_PDF):
    if not os.path.exists(_required):
        raise FileNotFoundError(f"Missing {_required} in repo.")

t0 = time.perf_counter()
df_eos = pd.read_csv(EOS_PATH).copy()
df_dec = pd.read_csv(DEC_PATH).copy()
df_eos = _normalize_lifecycle_df(df_eos)

# Canon columns: precomputed matching keys for the lifecycle table.
for _target, _source, _fn in (
    ("_canon_make", "manufacturer", canon_maker_from_text),
    ("_norm_sku", "sku", norm_text),
    ("_norm_desc", "description", norm_text),
    ("_norm_notes", "notes", norm_text),
):
    df_eos[_target] = df_eos[_source].apply(_fn)

# The Dec catalog columns are optional; fall back to a constant when absent.
if "Make" in df_dec.columns:
    df_dec["_canon_make"] = df_dec["Make"].apply(canon_maker_from_text)
else:
    df_dec["_canon_make"] = "UNKNOWN"

if "Model" in df_dec.columns:
    df_dec["_norm_model"] = df_dec["Model"].apply(norm_text)
else:
    df_dec["_norm_model"] = ""

if "Modem Type" in df_dec.columns:
    df_dec["_is5g"] = df_dec["Modem Type"].apply(lambda x: is_5g_text(str(x)))
else:
    df_dec["_is5g"] = False

_tlog("load csv", t0)
# ----------------------------
# Build fuzzy corpus for device matching
# ----------------------------
def _label_for_row(i: int) -> str:
    """Return the human-readable label shown for lifecycle row ``i``.

    The label is "SKU | manufacturer | description" (empty or missing parts
    are skipped), capped at 220 characters. It is shown to the user in the
    A/B disambiguation prompt, so the parts need a visible separator.
    """
    r = df_eos.iloc[i]
    parts = [safe_str(r.get(col, "")) for col in ("sku", "manufacturer", "description")]
    return " | ".join(p for p in parts if p)[:220]
# Display label per lifecycle row, index-aligned with df_eos.
EOS_LABELS = list(map(_label_for_row, range(len(df_eos))))
# Fuzzy-match text per lifecycle row (normalized sku, maker, description,
# notes), index-aligned with EOS_LABELS.
EOS_CORPUS = [
    " ".join(
        [
            row.get("_norm_sku", ""),
            row.get("_canon_make", ""),
            row.get("_norm_desc", ""),
            row.get("_norm_notes", ""),
        ]
    )
    for _, row in df_eos.iterrows()
]
def resolve_device(term: str) -> Dict[str, Any]:
    """Resolve a user-typed device term to a lifecycle row.

    Returns one of:
      * ``{"mode": "ok", "row_idx": int}`` — exactly one exact SKU match, or
        a fuzzy match scoring at least ``MATCH_AUTOPICK`` that leads the
        runner-up by at least ``MATCH_GAP``;
      * ``{"mode": "pick", "options": [...]}`` — up to two candidates for the
        user to choose between;
      * ``{"mode": "not_found"}`` — empty query or no candidates.
    """
    not_found = {"mode": "not_found"}
    query = norm_text(term)
    if not query:
        return not_found

    sku_hits = df_eos.index[df_eos["_norm_sku"] == query].tolist()
    if len(sku_hits) == 1:
        return {"mode": "ok", "row_idx": int(sku_hits[0])}

    # NOTE(review): no minimum score is applied here, so any non-empty query
    # against a non-empty corpus yields "ok" or "pick" — confirm intended.
    ranked = [
        (int(pos), int(score), EOS_LABELS[int(pos)])
        for _choice, score, pos in process.extract(query, EOS_CORPUS, scorer=fuzz.WRatio, limit=6)
    ]
    if not ranked:
        return not_found

    best = ranked[0]
    runner_up = ranked[1] if len(ranked) > 1 else None
    clear_winner = runner_up is None or (best[1] - runner_up[1]) >= MATCH_GAP
    if best[1] >= MATCH_AUTOPICK and clear_winner:
        return {"mode": "ok", "row_idx": best[0]}

    options = [{"row_idx": idx, "label": label} for idx, _score, label in ranked[:2]]
    return {"mode": "pick", "options": options}
# ----------------------------
# Parsec RAG (FAISS)
# ----------------------------
# Start the timer reported by the "parsec index" timing log further down.
t0 = time.perf_counter()
# Sentence-embedding model, loaded once at import time and used for both the
# antenna cards and every retrieval query.
embedder = SentenceTransformer(EMBED_MODEL_NAME)
def extract_pdf_text_pages(path: str) -> List[str]:
    """Return the plain text of each page of the PDF at ``path``, in page order.

    The document is opened as a context manager so the file handle is
    released once the text has been extracted.
    """
    with fitz.open(path) as doc:
        return [page.get_text("text") for page in doc]
def build_parsec_cards(pages: List[str]) -> List[str]:
    """Slice catalog pages into de-duplicated text "cards", one per SKU marker.

    For every "Standard SKU:" occurrence, take ``PARSEC_CONTEXT_BEFORE``
    characters before and ``PARSEC_CONTEXT_AFTER`` characters after the start
    of the marker (clamped to the page), strip it, and keep it if it is at
    least 200 characters long. Exact duplicates are dropped, keeping first
    occurrences in page order.
    """
    unique_cards: List[str] = []
    seen_digests = set()
    for page_text in pages:
        for marker in re.finditer(r"Standard\s+SKU:", page_text):
            lo = max(0, marker.start() - PARSEC_CONTEXT_BEFORE)
            hi = min(len(page_text), marker.start() + PARSEC_CONTEXT_AFTER)
            card = page_text[lo:hi].strip()
            if len(card) < 200:
                continue
            digest = hashlib.sha1(card.encode("utf-8")).hexdigest()
            if digest in seen_digests:
                continue
            seen_digests.add(digest)
            unique_cards.append(card)
    return unique_cards
# Build the in-memory antenna index once at import time.
parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF))
parsec_emb = embedder.encode(parsec_cards, batch_size=64, show_progress_bar=False, normalize_embeddings=True)
parsec_emb = np.asarray(parsec_emb, dtype=np.float32)
if parsec_emb.ndim != 2:
    # No cards were extracted: encode() returned an empty 1-D array, which
    # has no shape[1]. Reshape to (0, dim) so the index can still be built;
    # retrieval then simply returns no hits.
    parsec_emb = parsec_emb.reshape(-1, embedder.get_sentence_embedding_dimension())
# Embeddings are L2-normalized, so inner product equals cosine similarity.
parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1])
parsec_index.add(parsec_emb)
_tlog("parsec index", t0)
# Lowercase product-family words looked for in catalog cards.
PARSEC_FAMILY_WORDS = {"chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie","shepherd","belgian","australian","terrier","pyrenees"}


def _parsec_name_from_card(card_text: str) -> str:
    """Return the capitalized family word that appears first in the card.

    Falls back to "Parsec antenna" when no family word occurs. Picking the
    earliest occurrence (rather than whichever word a set happens to yield
    first) keeps the result stable across runs when a card mentions several
    families.
    """
    low = card_text.lower()
    found = [(pos, fam) for fam in PARSEC_FAMILY_WORDS for pos in (low.find(fam),) if pos >= 0]
    if not found:
        return "Parsec antenna"
    return min(found)[1].capitalize()
def _parsec_part_from_card(t: str) -> str:
    """Return the run of uppercase letters/digits after "Standard SKU:", or ``""``."""
    found = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t)
    if found is None:
        return ""
    return found.group(1).strip()
def _parsec_desc_from_card(t: str) -> str:
    """Return the text after "Description:" up to the line end, or ``""``.

    Matching is case-insensitive; whitespace runs are collapsed and the
    result is capped at 220 characters.
    """
    found = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE)
    if not found:
        return ""
    collapsed = re.sub(r"\s+"," ", found.group(1).strip())
    return collapsed[:220]
def _parsec_connectors_from_card(t: str) -> str:
    """Return the rest of the line after "Standard Connectors:", or ``""``.

    Matching is case-insensitive; whitespace runs are collapsed and the
    result is capped at 80 characters.
    """
    found = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE)
    if not found:
        return ""
    collapsed = re.sub(r"\s+"," ", found.group(1).strip())
    return collapsed[:80]
def parsec_retrieve(query: str, top_k: int = 8) -> List[Dict[str, Any]]:
    """Return up to ``top_k`` antenna cards most similar to ``query``.

    Each hit is a dict with ``score``, ``name``, ``part_number``,
    ``description`` and ``connectors`` parsed from the card text, in the
    order returned by the index. Ids outside the card list are skipped.
    """
    query_vec = np.asarray(embedder.encode([query], normalize_embeddings=True), dtype=np.float32)
    scores, ids = parsec_index.search(query_vec, top_k)

    hits: List[Dict[str, Any]] = []
    for score, card_id in zip(scores[0].tolist(), ids[0].tolist()):
        position = int(card_id)
        if position < 0 or position >= len(parsec_cards):
            continue
        card = parsec_cards[position]
        hits.append(
            {
                "score": float(score),
                "name": _parsec_name_from_card(card),
                "part_number": _parsec_part_from_card(card),
                "description": _parsec_desc_from_card(card),
                "connectors": _parsec_connectors_from_card(card),
            }
        )
    return hits
def antenna_pick(repl5: str, mode: str, detail: Optional[str]) -> Dict[str, Any]:
    """Pick the best-matching Parsec antenna for a 5G replacement router.

    ``mode == "vehicle"`` selects a vehicle omni query regardless of
    ``detail``. Otherwise ``detail`` chooses between "directional", "indoor"
    and the outdoor-omni default. Returns the top retrieval hit (or a
    placeholder with empty fields when there are no hits) extended with
    ``mimo`` and ``why``.
    """
    mimo = "4x4"  # rule: all 5G -> 4x4
    tech = "5G"

    if mode == "vehicle":
        keywords, why = "omni vehicle mobile magnetic through-bolt", "Vehicle omni best match."
    elif detail == "directional":
        keywords, why = "directional fixed site", "Stationary directional best match."
    elif detail == "indoor":
        keywords, why = "omni indoor", "Stationary indoor omni best match."
    else:
        keywords, why = "omni outdoor pole wall fixed site", "Stationary outdoor omni best match."

    hits = parsec_retrieve(f"{repl5} {tech} {mimo} {keywords}", top_k=8)
    if hits:
        best = hits[0]
    else:
        best = {"name":"Parsec antenna","part_number":"","description":"","connectors":""}
    best.update({"mimo": mimo, "why": why})
    return best
# ----------------------------
# Replacement selection (lifecycle-first)
# ----------------------------
def extract_model_token(text: str) -> str:
    """Extract a short model token from a replacement cell.

    The cell is split on "|" and the segments are scanned last-to-first.
    Within each segment (uppercased) a fixed list of patterns is tried in
    order; the first match is returned. If nothing matches, the first
    scanned segment is returned, cut to 60 characters. Empty input gives
    ``""``.
    """
    cell = safe_str(text)
    if not cell:
        return ""

    segments = [piece.strip() for piece in cell.split("|") if piece.strip()]
    candidates = segments[::-1] if segments else [cell]

    # Tried in order for each candidate segment.
    patterns = (
        r"\bRUT[A-Z]?\d{2,4}\b",
        r"\bRUTM\d{2,3}\b",
        r"\bIX\d{2}\b",
        r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b",
        r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b",
    )
    for candidate in candidates:
        upper = candidate.upper()
        for pattern in patterns:
            found = re.search(pattern, upper)
            if found:
                return found.group(0)
    return candidates[0][:60]
def pick_replacements(row: pd.Series, status: str) -> Dict[str, str]:
    """Choose the 4G and 5G replacement models for a lifecycle row.

    The 4G pick comes from ``suggested_replacement`` ("Not applicable" when
    empty) and the 5G pick from ``advanced_5g_option``. If the lifecycle row
    offers no 5G option, the first 5G model of the same canonical maker in
    the Dec catalog is used, else "Not listed". ``status`` is accepted but
    not used.
    """
    suggested = safe_str(row.get("suggested_replacement", ""))
    advanced = safe_str(row.get("advanced_5g_option", ""))

    repl_4g = extract_model_token(suggested) if suggested else "Not applicable"
    repl_5g = extract_model_token(advanced) if advanced else "Not listed"

    # Always provide some 5G answer: if lifecycle missing, pick top 5G from dec (same maker)
    if repl_5g in {"", "Not listed"}:
        maker = str(row.get("_canon_make","UNKNOWN"))
        same_maker_5g = df_dec[(df_dec["_canon_make"] == maker) & df_dec["_is5g"].eq(True)].copy()
        if same_maker_5g.empty:
            repl_5g = "Not listed"
        else:
            repl_5g = str(same_maker_5g.iloc[0]["Model"]).strip()

    return {"repl_4g": repl_4g or "Not applicable", "repl_5g": repl_5g or "Not listed"}
# ----------------------------
# Features + Fit (dec first, single LLM enrichment call if needed)
# ----------------------------
# Column order of the "Replacement features" table; "Device" is the row label.
FEATURE_COLS = ["Device", "Modem technology", "WiFi", "Ports", "Antennas", "Ruggedness", "Use case"]
# Column order of the "Verizon fit" table; "Device" is the row label.
FIT_COLS = ["Device", "Fit badges", "Ethernet ports", "Battery"]
def _features_from_dec(model: str, canon_make: str) -> Dict[str, str]:
    """Look up feature fields for ``model`` in the Dec catalog.

    The model is fuzzy-matched against catalog models of the same canonical
    maker; a match scoring below ``MATCH_OK`` is ignored. Returns one entry
    per feature column (all of ``FEATURE_COLS`` except "Device"), using
    "Not listed" for anything unknown. Empty/NaN catalog cells count as
    unknown rather than being rendered as "nan".
    """
    blank = {k: "Not listed" for k in FEATURE_COLS[1:]}
    if not model or model in {"Not listed", "Not applicable"}:
        return blank
    pool = df_dec[df_dec["_canon_make"] == canon_make]
    if pool.empty:
        return blank
    hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio)
    if not hit or hit[1] < MATCH_OK:
        return blank
    r = pool.iloc[int(hit[2])]

    def cell(column: str) -> str:
        # safe_str maps NaN/None to "", so missing cells fall back cleanly.
        return safe_str(r.get(column, "")) or "Not listed"

    wan = safe_str(r.get("WAN ports and speed", ""))
    lan = safe_str(r.get("LAN ports and speed", ""))
    if wan or lan:
        ports = f"WAN: {wan or 'Not listed'} | LAN: {lan or 'Not listed'}"
    else:
        ports = "Not listed"

    return {
        "Modem technology": cell("Modem Type"),
        "WiFi": cell("WiFi type"),
        "Ports": ports,
        "Antennas": cell("Antennas (internal/external/both)"),
        "Ruggedness": cell("Ruggedization"),
        "Use case": cell("Primary use case"),
    }
def _fit_from_dec(model: str, canon_make: str, is5: bool) -> Dict[str, str]:
    """Derive "fit" traits (badges, Ethernet ports, battery) for ``model``.

    The model is fuzzy-matched against Dec catalog models of the same
    canonical maker; a match below ``MATCH_OK`` is ignored. 5G devices always
    get the "4x4 MIMO" badge. "Ethernet ports" is not derived from the
    catalog here and stays "Not listed". Empty/NaN catalog cells are treated
    as absent, so they cannot produce a Wi‑Fi badge or a battery "Yes".
    """
    badges = ["4x4 MIMO"] if is5 else []
    eth = "Not listed"
    bat = "Not listed"
    no_match = {"Fit badges": ", ".join(badges) if badges else "Not listed", "Ethernet ports": eth, "Battery": bat}

    pool = df_dec[df_dec["_canon_make"] == canon_make]
    if pool.empty or not model or model in {"Not listed", "Not applicable"}:
        return no_match
    hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio)
    if not hit or hit[1] < MATCH_OK:
        return no_match
    r = pool.iloc[int(hit[2])]

    def cell(column: str) -> str:
        # Lowercased cell text; NaN/None become "" instead of "nan".
        return safe_str(r.get(column, "")).lower()

    use_case = cell("Primary use case")
    rugged = cell("Ruggedization")
    wifi = cell("WiFi type")
    serial = cell("Serial port (yes/no)")
    battery = cell("Battery (internal/removable/none/optional)")
    notes_blob = " ".join([cell("Special notes"), cell("summary and use case")])

    if any(k in use_case for k in ["vehicle","mobile","fleet","in-vehicle"]) or "vehicle" in rugged:
        badges.append("Vehicle")
    else:
        badges.append("Fixed site")
    if wifi and wifi not in {"none","no","n/a"}:
        badges.append("Wi‑Fi")
    # An IP rating is "ip" followed by two digits (IP67, IP-54); a bare "ip"
    # substring would also match words such as "equipment".
    if any(k in rugged for k in ["rugged","industrial","harsh"]) or re.search(r"\bip[\s-]?\d{2}", rugged):
        badges.append("Rugged")
    if "dual" in notes_blob and "sim" in notes_blob:
        badges.append("Dual‑SIM")
    if serial in {"yes","y","true"}:
        badges.append("Serial")
    if battery:
        bat = "No" if "none" in battery else "Yes"

    # dict.fromkeys drops duplicate badges while keeping their order.
    badges_csv = ", ".join(dict.fromkeys(badges)) if badges else "Not listed"
    return {"Fit badges": badges_csv, "Ethernet ports": eth, "Battery": bat}
# Enrichment cache (one call per (make, repl4, repl5))
# In-process cache of enrichment results, keyed by _enrich_key(...).
_ENRICH_CACHE: Dict[str, Dict[str, Any]] = {}


def _enrich_key(canon_make: str, repl4: str, repl5: str) -> str:
    """Return the SHA-1 hex digest of "make|repl4|repl5" used as the cache key."""
    raw_key = "{}|{}|{}".format(canon_make, repl4, repl5)
    digest = hashlib.sha1(raw_key.encode("utf-8"))
    return digest.hexdigest()
def gpt_enrich(repl4: str, repl5: str, canon_make: str, feat4: Dict[str,str], feat5: Dict[str,str], fit4: Dict[str,str], fit5: Dict[str,str]) -> Dict[str, Any]:
    """Fill missing feature/fit fields with at most one LLM call.

    Returns ``{"feat4", "feat5", "fit4", "fit5"}``. The inputs are returned
    unchanged when no LLM client is configured, when nothing is missing, or
    when the LLM call fails. Successful results (and the nothing-missing
    case) are cached per (maker, repl4, repl5); failures are not cached so a
    later lookup can retry.

    Only fields that were missing are taken from the model's reply, so
    values that came from the catalog are never overwritten and unknown keys
    are ignored. The 5G fit badges always include "4x4 MIMO" after a
    successful enrichment.
    """
    base = {"feat4": feat4, "feat5": feat5, "fit4": fit4, "fit5": fit5}
    if client is None:
        return base
    key = _enrich_key(canon_make, repl4, repl5)
    if key in _ENRICH_CACHE:
        return _ENRICH_CACHE[key]

    def missing_fields(d: Dict[str,str]) -> List[str]:
        return [k for k, v in d.items() if (not v) or str(v).strip().lower() in {"not listed","nan",""}]

    missing = {name: missing_fields(values) for name, values in base.items()}
    if not any(missing.values()):
        _ENRICH_CACHE[key] = base
        return base

    system_prompt = (
        "You are helping a Verizon rep. Fill missing router feature fields and fit traits. Return strict JSON only. "
        "Keep values short. "
        "Fit badges must be chosen from: ['Vehicle','Fixed site','Wi‑Fi','Rugged','Dual‑SIM','4x4 MIMO','High throughput','Serial'] only. "
        "Rule: if a router is 5G, include '4x4 MIMO'. "
        "Ethernet ports must be a single integer as a string when possible; else 'Not listed'. "
        "Battery must be 'Yes', 'No', or 'Not listed'."
    )
    payload = {
        "maker_family": canon_make,
        "models": {"repl4": repl4, "repl5": repl5},
        "known": base,
        "missing": missing,
        "output_schema": {name: {k: "string" for k in keys} for name, keys in missing.items()},
    }

    t0 = time.perf_counter()
    try:
        resp = client.responses.create(
            model=OPENAI_MODEL,
            input=[{"role":"system","content":system_prompt},{"role":"user","content":_json_dump_safe(payload)}],
            max_output_tokens=420,
        )
    except Exception as exc:
        # Enrichment is optional: on any API/network failure fall back to
        # the catalog-only values instead of failing the whole lookup.
        print(f"[WARN] llm enrich failed: {exc}")
        return base
    finally:
        _tlog("llm enrich", t0)

    out = _json_load_safe(getattr(resp, "output_text", "") or "")
    if not isinstance(out, dict):
        out = {}

    def merged(known: Dict[str,str], patch: Any, allowed: List[str]) -> Dict[str,str]:
        result = dict(known)
        if isinstance(patch, dict):
            for k in allowed:
                sv = str(patch.get(k) or "").strip()
                if sv:
                    result[k] = sv
        return result

    pack = {name: merged(values, out.get(name, {}), missing[name]) for name, values in base.items()}

    # Enforce 5G 4x4 badge
    b = str(pack["fit5"].get("Fit badges","") or "")
    if "4x4 MIMO" not in b:
        pack["fit5"]["Fit badges"] = (b + ", 4x4 MIMO").strip(", ").strip() if b and b != "Not listed" else "4x4 MIMO"

    _ENRICH_CACHE[key] = pack
    return pack
def build_tables(repl4: str, repl5: str, canon_make: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Build the two-row feature and fit tables for the 4G/5G replacements.

    Values come from the Dec catalog first and are then passed through
    ``gpt_enrich``. Returns ``(feature_df, fit_df)`` with columns
    ``FEATURE_COLS`` and ``FIT_COLS``; row 0 is the 4G alternative and row 1
    the 5G replacement.
    """
    pack = gpt_enrich(
        repl4,
        repl5,
        canon_make,
        _features_from_dec(repl4, canon_make),
        _features_from_dec(repl5, canon_make),
        _fit_from_dec(repl4, canon_make, is5=False),
        _fit_from_dec(repl5, canon_make, is5=True),
    )
    row_labels = (("4G alternative", "4"), ("5G replacement", "5"))

    def table(prefix: str, columns: List[str]) -> pd.DataFrame:
        rows = [{"Device": label, **pack[f"{prefix}{suffix}"]} for label, suffix in row_labels]
        return pd.DataFrame(rows, columns=columns)

    return table("feat", FEATURE_COLS), table("fit", FIT_COLS)
# ----------------------------
# Manufacturer link (deterministic, no HTTP)
# ----------------------------
# Base site per canonical maker; an empty value means "no link".
MAKER_DOMAINS = {
    "CRADLEPOINT": "https://cradlepoint.com",
    "SIERRA": "https://airlink.com",
    "FEENEY": "https://inseego.com",
    "DIGI": "https://www.digi.com",
    "CISCO_MERAKI": "https://meraki.cisco.com",
    "CISCO": "https://www.cisco.com",
    "TELTONIKA": "https://teltonika-networks.com",
    "UNKNOWN": "",
}


def guess_maker_url(model: str, canon_make: str) -> str:
    """Build a manufacturer product/search URL for ``model`` without any HTTP.

    Returns ``""`` when the maker has no known site or the model is empty or
    a placeholder ("Not listed" / "Not applicable"). The model text is
    URL-encoded so characters such as "&", "#" or "/" cannot corrupt the
    link; whitespace runs become a single "+" in search queries and a single
    "-" in the Teltonika product slug.
    """
    from urllib.parse import quote, quote_plus

    model = str(model or "").strip()
    base = MAKER_DOMAINS.get(canon_make, "")
    if not base or not model or model in {"Not listed", "Not applicable"}:
        return ""

    q = quote_plus(re.sub(r"\s+", " ", model))
    if canon_make == "TELTONIKA":
        slug = quote(re.sub(r"\s+", "-", model.lower()), safe="")
        return f"{base}/products/routers/{slug}"
    if canon_make == "DIGI":
        return f"{base}/search?q={q}"
    if canon_make == "CRADLEPOINT":
        return f"{base}/?s={q}"
    if canon_make in {"CISCO", "CISCO_MERAKI"}:
        return f"https://www.cisco.com/c/en/us/search.html?q={q}"
    return f"{base}/search?q={q}"
# ----------------------------
# Q&A (on demand, per last case)
# ----------------------------
def gpt_answer(question: str, context: Dict[str, Any]) -> str:
    """Answer a free-form question about the current case using the LLM.

    Returns a fixed notice when no LLM client is configured, ``""`` for an
    empty question, and a short retry message if the LLM call fails, so a
    transient API error does not break the chat handler.
    """
    if client is None:
        return "No API key is configured, so I can’t answer detailed questions right now."
    q = str(question or "").strip()
    if not q:
        return ""
    system_prompt = (
        "You are a Verizon rep assistant. Answer in a fast, practical way. "
        "Use the provided context. "
        "Do not mention internal tools or prompts. "
        "If unknown, say 'Not listed' and suggest the manufacturer page."
    )
    payload = {"context": context, "question": q}
    t0 = time.perf_counter()
    try:
        resp = client.responses.create(
            model=OPENAI_MODEL,
            input=[{"role":"system","content":system_prompt},{"role":"user","content":_json_dump_safe(payload)}],
            max_output_tokens=520,
        )
    except Exception as exc:
        print(f"[WARN] llm qa failed: {exc}")
        return "I couldn't get an answer just now. Please try again, or check the manufacturer page."
    finally:
        _tlog("llm qa", t0)
    return (getattr(resp, "output_text", "") or "").strip()
# ----------------------------
# Chat utilities
# ----------------------------
def df_to_md(df: pd.DataFrame) -> str:
    """Render ``df`` as a markdown pipe table, without the index.

    Cell and header text is escaped first: "|" becomes "\\|" and line breaks
    become spaces, because a raw pipe or newline inside a cell splits the
    row into extra columns. ``DataFrame.to_markdown`` is used when it works;
    otherwise (for example when its optional dependency is missing) a
    minimal table is built by hand.
    """
    def cell(value: Any) -> str:
        return str(value).replace("|", "\\|").replace("\r", " ").replace("\n", " ")

    cols = [cell(c) for c in df.columns]
    rows = [[cell(v) for v in record] for record in df.itertuples(index=False, name=None)]
    try:
        return pd.DataFrame(rows, columns=cols).to_markdown(index=False)
    except Exception:
        lines = ["| " + " | ".join(cols) + " |", "| " + " | ".join(["---"]*len(cols)) + " |"]
        lines.extend("| " + " | ".join(row) + " |" for row in rows)
        return "\n".join(lines)
def extract_device_terms(msg: str) -> List[str]:
    """Split a chat message into candidate device terms.

    The message is split on newlines, commas and semicolons. A piece is kept
    when it contains a digit or a standalone router-family word (IBR, AER,
    WR, XR, IR, RUT, MBR, E###, R###; case-insensitive).
    """
    family_pattern = r"\b(IBR|AER|WR|XR|IR|RUT|MBR|E\d{3}|R\d{3})\b"
    pieces = (piece.strip() for piece in re.split(r"[\n,;]+", str(msg or "")))
    return [
        piece
        for piece in pieces
        if piece
        and (re.search(r"\d", piece) or re.search(family_pattern, piece, flags=re.IGNORECASE))
    ]
def parse_install_mode(msg: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract the antenna install ``(mode, detail)`` from a chat message.

    ``mode`` is "vehicle" or "stationary" (stationary wins if both are
    mentioned) and ``detail`` is "indoor", "outdoor" or "directional" (the
    last one mentioned in that order wins). Because those details are only
    offered for stationary installs, a reply that names a detail but no mode
    (e.g. just "Outdoor") is treated as stationary instead of leaving
    ``mode`` as None. Returns ``(None, None)`` when nothing is recognized.
    """
    t = str(msg or "").strip().lower()
    mode = None
    detail = None
    if "vehicle" in t or "mobile" in t:
        mode = "vehicle"
    if "stationary" in t or "fixed" in t or "site" in t:
        mode = "stationary"
    for keyword in ("indoor", "outdoor", "directional"):
        if keyword in t:
            detail = keyword
    if mode is None and detail is not None:
        mode = "stationary"
    return mode, detail
def make_case_key(s: str) -> str:
    """Return a state key for a case: trimmed, whitespace-collapsed, max 80 chars."""
    trimmed = str(s or "").strip()
    collapsed = re.sub(r"\s+", " ", trimmed)
    return collapsed[:80]
# ----------------------------
# Chat UI (schema-safe)
# ----------------------------
with gr.Blocks(title="Only-Routers") as demo:
    gr.Markdown("## Only-Routers\nChat mode for Verizon reps (multiple devices per message).")
    # Conversation state travels as a JSON string (see state_load/state_dump).
    state = gr.State("{}")
    chatbot = gr.Chatbot(label="Only-Routers Chat", height=560, type="tuples")
    msg = gr.Textbox(label="Message", placeholder="Example: RUT240, WR21\nVehicle install", lines=2)
    send = gr.Button("Send", variant="primary")

    _ANTENNA_PROMPT = "\nFor antennas: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**."
    _QUESTIONS_PROMPT = "Any questions about the suggested device(s)?"

    def _open_case(st, row_idx, fallback_key):
        """Build one device's lifecycle summary and record it in ``st``.

        Stores the case under its key (the row's SKU, or ``fallback_key``
        when the SKU is empty), appends the key to ``last_case_keys`` and
        returns ``(case_key, markdown_summary)``.
        """
        life_row = df_eos.iloc[row_idx]
        eos, eol, status = row_to_dates_and_status(life_row)
        repl = pick_replacements(life_row, status)
        canon_make = str(life_row.get("_canon_make","UNKNOWN"))
        t1 = time.perf_counter()
        feat_df, fit_df = build_tables(repl["repl_4g"], repl["repl_5g"], canon_make)
        _tlog("tables", t1)
        url4 = guess_maker_url(repl["repl_4g"], canon_make) if repl["repl_4g"] != "Not applicable" else ""
        url5 = guess_maker_url(repl["repl_5g"], canon_make) if repl["repl_5g"] != "Not listed" else ""
        ck = make_case_key(str(life_row.get("sku","")) or fallback_key)
        st["cases"][ck] = {"row_idx": row_idx, "repl": repl, "canon_make": canon_make, "status": status, "eos": eos, "eol": eol, "urls": {"4g": url4, "5g": url5}}
        st["last_case_keys"].append(ck)

        lines = [
            f"**{ck}**",
            f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**",
            f"- 4G alternative: **{repl['repl_4g']}**",
            f"- 5G replacement: **{repl['repl_5g']}**",
        ]
        if url4:
            lines.append(f"- 4G manufacturer page: {url4}")
        if url5:
            lines.append(f"- 5G manufacturer page: {url5}")
        lines.append("\n**Replacement features**\n" + df_to_md(feat_df))
        lines.append("\n**Verizon fit**\n" + df_to_md(fit_df))
        return ck, "\n".join(lines)

    def chat_fn(user_msg, history, st_json):
        """Handle one chat turn and return ``(history, state_json)``.

        Turns are routed in this order: a pending A/B pick, a pending
        antenna install-mode question, a device lookup (when the message
        contains device-like terms), otherwise Q&A about the latest case.
        """
        t0 = time.perf_counter()
        history = history if history is not None else []
        st = state_load(st_json)
        st.setdefault("cases", {})
        st.setdefault("last_case_keys", [])
        st.setdefault("pending", {})
        st.setdefault("awaiting_questions", False)
        text = (user_msg or "").strip()
        if not text:
            return history, state_dump(st)
        pending = st.get("pending") or {}

        # Pending A/B pick
        if pending.get("type") == "pick":
            opts = pending.get("options", [])
            choice = text.lower()
            idx = 0 if choice in {"a","1"} else (1 if choice in {"b","2"} else None)
            if idx is None or idx >= len(opts):
                history.append((text, "Please reply with **A** or **B**."))
                return history, state_dump(st)
            ck, card = _open_case(st, int(opts[idx]["row_idx"]), str(pending.get("raw", "")))
            # Devices already resolved in the message that triggered the pick
            # still need an antenna answer, so they are carried along.
            case_keys = list(pending.get("carry_case_keys", [])) + [ck]
            st["pending"] = {"type":"install_mode", "case_keys": case_keys}
            st["awaiting_questions"] = True
            history.append((text, "\n".join([card, _ANTENNA_PROMPT, _QUESTIONS_PROMPT])))
            _tlog("chat pick flow", t0)
            return history, state_dump(st)

        # Pending install-mode
        if pending.get("type") == "install_mode":
            mode, detail = parse_install_mode(text)
            if mode is None:
                history.append((text, "Quick one: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**."))
                return history, state_dump(st)
            updates=[]
            for ck in pending.get("case_keys", []):
                case = st["cases"].get(ck, {})
                repl5 = (case.get("repl", {}) or {}).get("repl_5g","")
                ant = antenna_pick(repl5, mode=mode, detail=detail)
                case.setdefault("antennas", {})
                case["antennas"][f"{mode}:{detail or ''}"] = ant
                st["cases"][ck] = case
                updates.append(f"**{ck}** antenna ({mode}{' / '+detail if detail else ''}): {ant.get('name','')} (PN {ant.get('part_number','')})")
            st["pending"] = {}
            history.append((text, "\n".join(updates)))
            _tlog("chat antenna flow", t0)
            return history, state_dump(st)

        # Device lookup
        device_terms = extract_device_terms(text)
        if device_terms:
            bots=[]
            new_case_keys=[]
            for term in device_terms:
                res = resolve_device(term)
                if res.get("mode") == "pick":
                    opts = res.get("options", [])
                    st["pending"] = {"type":"pick", "options": opts, "raw": term, "carry_case_keys": new_case_keys}
                    prompt = "I found more than one close match. Reply **A** or **B**:\n"
                    prompt += "".join(f"- **{'A' if i==0 else 'B'}**: {o.get('label','')}\n" for i, o in enumerate(opts))
                    # Show what was already resolved in this message instead
                    # of dropping it; lookup stops here until the user picks.
                    bots.append(prompt.strip())
                    history.append((text, "\n\n---\n\n".join(bots)))
                    _tlog("chat resolve->pick", t0)
                    return history, state_dump(st)
                if res.get("mode") != "ok":
                    bots.append(f"**{term}**: not found in lifecycle list. Who makes it (manufacturer) and what's the exact model/SKU?")
                    continue
                ck, card = _open_case(st, int(res["row_idx"]), term)
                new_case_keys.append(ck)
                bots.append(card)
            if new_case_keys:
                st["pending"] = {"type":"install_mode", "case_keys": new_case_keys}
                bots.append(_ANTENNA_PROMPT)
                bots.append(_QUESTIONS_PROMPT)
                st["awaiting_questions"] = True
            history.append((text, "\n\n---\n\n".join(bots)))
            _tlog("chat lookup flow", t0)
            return history, state_dump(st)

        # Q&A about most recent case
        if not st.get("last_case_keys"):
            history.append((text, "Tell me the router model/SKU you’re working with (you can paste multiple)."))
            return history, state_dump(st)
        ck = st["last_case_keys"][-1]
        case = st["cases"].get(ck, {})
        ctx = {"case": ck, "replacements": case.get("repl", {}), "urls": case.get("urls", {}), "antennas": case.get("antennas", {})}
        ans = gpt_answer(text, ctx)
        history.append((text, ans))
        _tlog("chat qa flow", t0)
        return history, state_dump(st)

    send.click(fn=chat_fn, inputs=[msg, chatbot, state], outputs=[chatbot, state], api_name=False)

demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT","7860")), share=False, show_api=False)