# Only-Routers / Updates / app_latest_working.py
# crazycrazypete's picture
# Upload folder using huggingface_hub
# 0fbe5f9 verified
import os
import re
import json
import math
import hashlib
import tempfile
from dataclasses import dataclass
from datetime import datetime, date
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import fitz # PyMuPDF
import faiss
from sentence_transformers import SentenceTransformer
from rapidfuzz import fuzz, process
import gradio as gr
from openai import OpenAI
# ============================
# Settings
# ============================
# Reference "today" used by status_from_eos_eol and by the year-only branch
# of parse_date_field.
# NOTE(review): this is a pinned constant rather than date.today(), so
# lifecycle statuses will not advance over time — confirm that is intended.
TODAY = date(2026, 1, 18)
# Model name and reasoning settings passed to client.responses.create(...).
OPENAI_MODEL = "gpt-5.2"
OPENAI_REASONING = {"effort": "high"}
# Minimum rapidfuzz score accepted when matching a model against dec2025routers.csv.
MATCH_OK = 80
# Sentence-transformers model used to embed the Parsec catalog cards and queries.
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
# Characters of page text kept before / after each "Standard SKU:" hit
# (see build_parsec_cards).
PARSEC_CONTEXT_BEFORE = 900
PARSEC_CONTEXT_AFTER = 1600
# ============================
# OpenAI client (HF Space secret: OPENAI_API_KEY)
# ============================
# A missing or blank key leaves `client` as None; the GPT helpers test for
# that and return an empty/default result instead of calling the API.
API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
client = OpenAI(api_key=API_KEY) if API_KEY else None
# ----------------------------
# Gradio state helpers
# Keep state as a JSON STRING to avoid schema issues on Hugging Face.
# ----------------------------
def state_load(st_json: str) -> Dict[str, Any]:
    """Decode the JSON-string Gradio state into a dict.

    Args:
        st_json: State previously produced by ``state_dump``.

    Returns:
        The decoded dict. An empty dict is returned for empty input,
        non-string input, invalid JSON, or JSON whose top-level value is not
        an object (callers index the result like a dict).
    """
    if not isinstance(st_json, str) or not st_json:
        return {}
    try:
        decoded = json.loads(st_json)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError subclass.
        return {}
    return decoded if isinstance(decoded, dict) else {}
def state_dump(st: Dict[str, Any]) -> str:
    """Serialize the state dict to a JSON string (non-ASCII kept verbatim).

    A falsy state is written as ``{}``; anything that cannot be serialized
    also falls back to the literal string ``"{}"``.
    """
    try:
        payload = st if st else {}
        encoded = json.dumps(payload, ensure_ascii=False)
    except Exception:
        encoded = "{}"
    return encoded
# ============================
# Helpers
# ============================
def norm_text(s: Any) -> str:
    """Return a lowercase, punctuation-light form of ``s`` for matching.

    None/NaN become ``""``. Otherwise the value is stringified, lowercased,
    every character other than ``a-z``, ``0-9``, whitespace, ``-`` and ``/``
    is replaced by a space, and runs of whitespace collapse to one space.
    """
    try:
        missing = s is None or (isinstance(s, float) and math.isnan(s)) or pd.isna(s)
        if missing:
            return ""
    except Exception:
        # pd.isna on list-like input can yield an array whose truth value is
        # ambiguous; such values are simply stringified below.
        pass
    lowered = str(s).strip().lower()
    filtered = re.sub(r"[^a-z0-9\s\-\/]", " ", lowered)
    return re.sub(r"\s+", " ", filtered).strip()
def safe_str(v: Any) -> str:
    """Return ``v`` as a stripped string, mapping None/NaN/NA to ``""``.

    Unlike a bare ``pd.isna(v)`` test, list-like input does not raise: when
    the missing-value check is ambiguous the value is stringified as-is,
    which matches how ``norm_text`` treats the same input.
    """
    if v is None:
        return ""
    try:
        if pd.isna(v):
            return ""
    except (TypeError, ValueError):
        # pd.isna returned an array (list-like input); not a scalar NA.
        pass
    return str(v).strip()
def is_5g(modem_type: Any) -> bool:
    """Return True when the modem-type text mentions 5G or NR.

    Tokens are matched rather than raw substrings: ``5g`` must not be part
    of a larger number or of ``5ghz`` (a Wi-Fi band), and ``nr`` must be a
    whole word, so text that merely contains those letters is not counted.
    """
    s = norm_text(modem_type)
    return re.search(r"(?<!\d)5g(?!hz)|\bnr\b", s) is not None
def json_load_safe(s: str) -> Dict[str, Any]:
    """Parse a JSON object from ``s`` and always return a dict.

    If ``s`` as a whole is not valid JSON (for example the object is wrapped
    in a Markdown code fence or surrounding prose), the text between the
    first ``{`` and the last ``}`` is tried as well.

    Returns:
        The parsed object, or ``{}`` when nothing parses or the parsed value
        is not a JSON object (callers use ``.get`` on the result).
    """
    if not isinstance(s, str):
        return {}
    text = s.strip()
    attempts = [text]
    start, end = text.find("{"), text.rfind("}")
    if 0 <= start < end:
        attempts.append(text[start:end + 1])
    for candidate in attempts:
        try:
            parsed = json.loads(candidate)
        except (ValueError, RecursionError):
            continue
        if isinstance(parsed, dict):
            return parsed
    return {}
def gpt_json(system: str, payload: Dict[str, Any], max_tokens: int = 600) -> Dict[str, Any]:
    """Send ``payload`` (JSON-encoded) with a system prompt and parse the reply.

    Returns ``{}`` when no client is configured; otherwise whatever
    ``json_load_safe`` makes of the response's ``output_text``.
    """
    if client is None:
        return {}
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": json.dumps(payload)},
    ]
    response = client.responses.create(
        model=OPENAI_MODEL,
        reasoning=OPENAI_REASONING,
        input=messages,
        max_output_tokens=max_tokens,
    )
    raw_text = getattr(response, "output_text", "") or ""
    return json_load_safe(raw_text)
def gpt_answer_md(system: str, user: str, max_tokens: int = 650) -> str:
    """Return a rep-friendly markdown answer.

    Without a configured client a fixed explanatory sentence is returned;
    otherwise the stripped ``output_text`` of the model response.
    """
    if client is None:
        return "No API key is configured, so I can't answer detailed questions right now."
    conversation = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    response = client.responses.create(
        model=OPENAI_MODEL,
        reasoning=OPENAI_REASONING,
        input=conversation,
        max_output_tokens=max_tokens,
    )
    answer = getattr(response, "output_text", "") or ""
    return answer.strip()
# ============================
# Load data
# ============================
# Input files, resolved relative to the working directory.
EOS_PATH = "routers_eos_eol_by_sku.csv"
DEC_PATH = "dec2025routers.csv"
PARSEC_PDF = "ParsecCatalog.pdf"
# Fail at import time, with the file name in the message, if any input is absent.
if not os.path.exists(EOS_PATH):
    raise FileNotFoundError(f"Missing {EOS_PATH} in repo.")
if not os.path.exists(DEC_PATH):
    raise FileNotFoundError(f"Missing {DEC_PATH} in repo.")
if not os.path.exists(PARSEC_PDF):
    raise FileNotFoundError(f"Missing {PARSEC_PDF} in repo.")
# df_eos: lifecycle table (canonicalized below); df_dec: router table read
# through its "Make" / "Model" / "Modem Type" columns further down.
df_eos = pd.read_csv(EOS_PATH).copy()
df_dec = pd.read_csv(DEC_PATH).copy()
def _canonize_eos_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Normalize lifecycle CSV column names (case-insensitive) and create expected columns."""
# Map various header spellings to canonical names used by the app
mapping = {}
for c in df.columns:
k = str(c).strip().lower().replace(" ", "_")
if k in {"sku", "model", "device", "device_sku"}:
mapping[c] = "sku"
elif k in {"manufacturer", "make", "vendor"}:
mapping[c] = "manufacturer"
elif k in {"device_type", "type"}:
mapping[c] = "device_type"
elif k in {"end_of_sale", "eos", "end_sale", "end_of_sales"}:
mapping[c] = "end_of_sale"
elif k in {"end_of_life", "eol", "end_life"}:
mapping[c] = "end_of_life"
elif k in {"suggested_replacement", "replacement_4g", "lte_replacement", "replacement_lte", "replacement"}:
mapping[c] = "suggested_replacement"
elif k in {"advanced_5g_option", "replacement_5g", "fiveg_replacement", "5g_replacement", "upgrade_5g"}:
mapping[c] = "advanced_5g_option"
elif k in {"region", "market"}:
mapping[c] = "region"
elif k in {"notes", "note"}:
mapping[c] = "notes"
elif k in {"description", "device_description", "name"}:
mapping[c] = "description"
df = df.rename(columns=mapping).copy()
# Create expected columns if missing
if "sku" not in df.columns:
# Try the common capitalized header as a fallback
if "SKU" in df.columns:
df["sku"] = df["SKU"].astype(str)
else:
df["sku"] = ""
if "manufacturer" not in df.columns:
df["manufacturer"] = ""
if "device_type" not in df.columns:
df["device_type"] = ""
if "description" not in df.columns:
# If the simplified file removed description, use SKU as description (still searchable)
df["description"] = df["sku"].astype(str)
if "notes" not in df.columns:
df["notes"] = ""
if "region" not in df.columns:
df["region"] = ""
if "suggested_replacement" not in df.columns:
df["suggested_replacement"] = ""
if "advanced_5g_option" not in df.columns:
df["advanced_5g_option"] = ""
if "end_of_sale" not in df.columns:
df["end_of_sale"] = ""
if "end_of_life" not in df.columns:
df["end_of_life"] = ""
return df
# Canonicalize headers once so the rest of the module can rely on
# "sku", "manufacturer", "description", "end_of_sale", etc. being present.
df_eos = _canonize_eos_columns(df_eos)
def region_ok(x: Any) -> bool:
    """Return True when a region cell is blank or refers to the US / North America.

    Blank covers None, the empty string and NaN. pandas reads empty CSV
    cells as NaN, which used to be stringified to ``"nan"`` and rejected, so
    rows with no region were dropped by the region filter.

    Args:
        x: Raw value of the ``region`` column.

    Returns:
        True for blank values, "not specified", "north america", or a
        standalone "usa" / "united states" / "us" / "u.s." token.
    """
    if x is None or (isinstance(x, float) and math.isnan(x)):
        return True
    s = str(x or "").strip().lower()
    if not s or s == "nan":
        return True
    if "not specified" in s or "north america" in s:
        return True
    us_patterns = (r"\busa\b", r"\bunited\s+states\b", r"\bu\.?s\.?\b")
    return any(re.search(pattern, s) for pattern in us_patterns)
# Keep only lifecycle rows whose region passes region_ok, then renumber the
# index so positional (iloc) and label lookups agree.
if "region" in df_eos.columns:
    df_eos = df_eos[df_eos["region"].apply(region_ok)].reset_index(drop=True)
# Maker mapping (includes Teltonika)
# Canonical maker key -> substrings that canon_maker_from_text looks for in
# normalized text. Entries are tried in insertion order, so the
# "CISCO_MERAKI" terms are checked before the broader "CISCO" entry.
CANON_MAKER = {
    "CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"},
    "SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"},
    "FEENEY": {"feeney", "feeney wireless", "inseego"},
    "DIGI": {"digi", "accelerated", "accelerated concepts"},
    "CISCO_MERAKI": {"meraki", "cisco meraki"},
    "CISCO": {"cisco"},
    "TELTONIKA": {"teltonika"},
}
def canon_maker_from_text(s: Any) -> str:
    """Map free-form maker text to a CANON_MAKER key.

    The text is normalized with ``norm_text``. The first CANON_MAKER entry
    (in dict order) with any term occurring as a substring wins;
    ``"UNKNOWN"`` is returned when none match.
    """
    haystack = norm_text(s)
    matches = (
        canon
        for canon, terms in CANON_MAKER.items()
        if any(term in haystack for term in terms)
    )
    return next(matches, "UNKNOWN")
# Precomputed helper columns (underscore-prefixed) used for matching.
# Each falls back to a constant when its source column is absent.
df_eos["_canon_make"] = df_eos["manufacturer"].apply(canon_maker_from_text) if "manufacturer" in df_eos.columns else "UNKNOWN"
df_eos["_norm_sku"] = df_eos["sku"].apply(norm_text) if "sku" in df_eos.columns else ""
df_eos["_norm_desc"] = df_eos["description"].apply(norm_text) if "description" in df_eos.columns else ""
df_eos["_norm_notes"] = df_eos["notes"].apply(norm_text) if "notes" in df_eos.columns else ""
# Same idea for the router table: maker family, normalized model, 5G flag.
df_dec["_canon_make"] = df_dec["Make"].apply(canon_maker_from_text) if "Make" in df_dec.columns else "UNKNOWN"
df_dec["_norm_model"] = df_dec["Model"].apply(norm_text) if "Model" in df_dec.columns else ""
df_dec["_is5g"] = df_dec["Modem Type"].apply(is_5g) if "Modem Type" in df_dec.columns else False
# ============================
# Date helpers
# ============================
@dataclass
class ParsedDate:
    """Result of parsing one lifecycle date cell (see parse_date_field)."""

    # Original cell text, stripped; "" when the cell was empty.
    raw: str
    # One of "missing", "full", "year", "year_month" or "bad", as set by
    # parse_date_field.
    kind: str
    # Comparable date, or None when kind is "missing" or "bad".
    value: Optional[date]
def parse_date_field(x: Any) -> ParsedDate:
    """Parse a lifecycle date cell into a ParsedDate.

    Accepted forms:
      * ``M/D/YY`` or ``M/D/YYYY`` (strptime accepts unpadded numbers for
        ``%m``/``%d``, so no separate unpadded formats are needed; the
        ``%-m`` directives are not valid for parsing at all);
      * ``YYYY``: Jan 1 for the current or a past year, Dec 31 for a future
        year (relative to ``TODAY``);
      * ``YYYY-MM``: first day of that month;
      * ``YYYY-MM-DD``.

    Args:
        x: Raw cell value. None, NaN, empty text and the literal text
            "nan"/"nat" are treated as missing.

    Returns:
        ParsedDate with kind "missing", "full", "year", "year_month" or
        "bad". ``value`` is None for "missing" and "bad"; "bad" keeps the raw
        text so it can still be displayed.
    """
    if x is None or (isinstance(x, float) and math.isnan(x)):
        return ParsedDate(raw="", kind="missing", value=None)
    raw = str(x or "").strip()
    if not raw or raw.lower() in {"nan", "nat"}:
        return ParsedDate(raw="", kind="missing", value=None)

    # Common US formats: M/D/YY or M/D/YYYY (e.g., 6/24/24, 9/30/21)
    for fmt in ("%m/%d/%y", "%m/%d/%Y"):
        try:
            return ParsedDate(raw=raw, kind="full", value=datetime.strptime(raw, fmt).date())
        except ValueError:
            continue

    try:
        # Year only
        if re.fullmatch(r"\d{4}", raw):
            y = int(raw)
            anchor = date(y, 1, 1) if y <= TODAY.year else date(y, 12, 31)
            return ParsedDate(raw=raw, kind="year", value=anchor)
        # YYYY-MM
        if re.fullmatch(r"\d{4}-\d{2}", raw):
            y, m = raw.split("-")
            return ParsedDate(raw=raw, kind="year_month", value=date(int(y), int(m), 1))
        # YYYY-MM-DD
        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw):
            return ParsedDate(raw=raw, kind="full", value=datetime.strptime(raw, "%Y-%m-%d").date())
    except ValueError:
        # Right shape but not a real date (e.g. "0000", "2024-13").
        return ParsedDate(raw=raw, kind="bad", value=None)

    # Last resort: leave as raw (unparsed)
    return ParsedDate(raw=raw, kind="bad", value=None)
def display_date(pd_: ParsedDate) -> str:
    """Return the text shown for a parsed date.

    Missing dates, and unparseable dates with no raw text, read
    "Not listed"; every other case shows the raw cell text unchanged.
    """
    kind, text = pd_.kind, pd_.raw
    if kind == "missing":
        return "Not listed"
    if kind == "bad" and not text:
        return "Not listed"
    return text
def status_from_eos_eol(eos: ParsedDate, eol: ParsedDate) -> str:
    """Derive the lifecycle status relative to ``TODAY``.

    "Unknown" when neither date has a value; otherwise end-of-life is checked
    before end-of-sale, and a date counts once it is on or before TODAY.
    """
    if eos.value is None and eol.value is None:
        return "Unknown"
    for parsed, label in ((eol, "End of Life"), (eos, "End of Sale")):
        if parsed.value is not None and parsed.value <= TODAY:
            return label
    return "Active"
def row_to_dates_and_status(row: pd.Series) -> Tuple[str, str, str]:
    """Return ``(EOS text, EOL text, status)`` for one lifecycle row."""
    sale = parse_date_field(row.get("end_of_sale"))
    life = parse_date_field(row.get("end_of_life"))
    sale_text = display_date(sale)
    life_text = display_date(life)
    return sale_text, life_text, status_from_eos_eol(sale, life)
# ============================
# Embeddings + Parsec index
# ============================
# Shared sentence-embedding model, created at import time; used for both the
# Parsec card index and the retrieval queries.
embedder = SentenceTransformer(EMBED_MODEL_NAME)
def extract_pdf_text_pages(path: str) -> List[str]:
    """Return the plain text of every page of the PDF at ``path``.

    The document is opened as a context manager so its file handle is
    released as soon as the text has been read.
    """
    with fitz.open(path) as doc:
        return [page.get_text("text") for page in doc]
def build_parsec_cards(pages: List[str]) -> List[str]:
    """Cut a text "card" around every ``Standard SKU:`` marker.

    Each card spans PARSEC_CONTEXT_BEFORE characters before the marker to
    PARSEC_CONTEXT_AFTER characters after its start, clipped to the page.
    Cards shorter than 200 characters after stripping are dropped, and exact
    duplicates (by SHA-1 of the text) keep only their first occurrence.
    """
    unique_cards: Dict[str, str] = {}
    for page_text in pages:
        page_len = len(page_text)
        for hit in re.finditer(r"Standard\s+SKU:", page_text):
            anchor = hit.start()
            lo = max(0, anchor - PARSEC_CONTEXT_BEFORE)
            hi = min(page_len, anchor + PARSEC_CONTEXT_AFTER)
            snippet = page_text[lo:hi].strip()
            if len(snippet) < 200:
                continue
            digest = hashlib.sha1(snippet.encode("utf-8")).hexdigest()
            # setdefault keeps the first card seen for a given digest.
            unique_cards.setdefault(digest, snippet)
    return list(unique_cards.values())
# Build the in-memory Parsec retrieval index at import time.
parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF))
# Embeddings are L2-normalized, so the inner-product index below ranks by
# cosine similarity.
parsec_emb = embedder.encode(parsec_cards, batch_size=64, show_progress_bar=False, normalize_embeddings=True)
parsec_emb = np.asarray(parsec_emb, dtype=np.float32)
# NOTE(review): shape[1] presumes at least one card was extracted; an empty
# card list would presumably fail here — confirm the PDF always yields cards.
parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1])
parsec_index.add(parsec_emb)
# ============================
# Device resolution
# ============================
def label_for_row(i: int) -> str:
    """Return the display label for lifecycle row ``i`` (positional index).

    The label is SKU, manufacturer and description joined with no separator
    and cut to 220 characters.
    NOTE(review): the missing separator may be a paste artifact — confirm
    the intended label format.
    """
    record = df_eos.iloc[i]
    sku = record.get('sku', '')
    maker = record.get('manufacturer', '')
    desc = record.get('description', '')
    label = f"{sku}{maker}{desc}"
    return label[:220]
# Display labels and fuzzy-search corpus, both aligned with df_eos row positions.
EOS_LABELS = [label_for_row(i) for i in range(len(df_eos))]
# One searchable string per row: normalized SKU, canonical maker,
# normalized description and normalized notes.
EOS_CORPUS = []
for _, r in df_eos.iterrows():
    EOS_CORPUS.append(" ".join([r.get("_norm_sku",""), r.get("_canon_make",""), r.get("_norm_desc",""), r.get("_norm_notes","")]))
def local_candidates(query: str, top_k: int = 6) -> List[Tuple[int, int, str]]:
    """Fuzzy-match ``query`` against EOS_CORPUS.

    Returns up to ``top_k`` tuples of ``(row position, score, label)`` in the
    order rapidfuzz reports them; the score is truncated to an int.
    """
    needle = norm_text(query)
    matches = process.extract(needle, EOS_CORPUS, scorer=fuzz.WRatio, limit=top_k)
    results = []
    for _text, score, idx in matches:
        pos = int(idx)
        results.append((pos, int(score), EOS_LABELS[pos]))
    return results
def gpt_choose_device(user_text: str, candidates: List[Tuple[int,int,str]]) -> Dict[str, Any]:
    """Ask the model which candidate row the user meant.

    Returns ``{}`` when no client is configured; otherwise the parsed JSON
    reply, which is requested in the ``mode`` / ``row_idx`` / ``options``
    shape described in the payload.
    """
    if client is None:
        return {}
    sys = "Pick which router the user meant. Never invent. Return strict JSON only."
    candidate_rows = [
        {"row_idx": idx, "score": score, "label": label}
        for (idx, score, label) in candidates
    ]
    rules = [
        "If one is clearly correct, return mode='ok' with row_idx.",
        "If two are plausible, return mode='pick' with top 2 options."
    ]
    schema = {"mode":"ok|pick","row_idx":"int","options":[{"row_idx":"int","label":"string"}]}
    payload = {
        "user_input": user_text,
        "candidates": candidate_rows,
        "rules": rules,
        "output_schema": schema,
    }
    return gpt_json(sys, payload, max_tokens=280)
def resolve_device(user_text: str) -> Dict[str, Any]:
    """Resolve free-form user text to a lifecycle row.

    Resolution order: exact normalized-SKU match, then a confident fuzzy
    match (score >= 95 and at least 8 points ahead of the runner-up), then
    the model's choice among the fuzzy candidates, then the top fuzzy
    candidates as a pick list.

    Model output is treated as untrusted: a ``row_idx`` is accepted only if
    it is one of the candidates that were offered, and pick-list labels come
    from the local table rather than from the model. This prevents an
    invented index from reaching ``df_eos.iloc`` and a malformed option from
    raising.

    Args:
        user_text: What the user typed.

    Returns:
        ``{"mode": "ok", "row_idx": int}``,
        ``{"mode": "pick", "options": [{"row_idx", "label"}, ...]}`` (at most
        two options), or ``{"mode": "not_found"}``. Text that normalizes to
        nothing is reported as not found instead of matching blank SKUs.
    """
    q = norm_text(user_text)
    if not q:
        return {"mode": "not_found"}

    exact = df_eos.index[df_eos["_norm_sku"] == q].tolist()
    if len(exact) == 1:
        return {"mode": "ok", "row_idx": int(exact[0])}
    if len(exact) > 1:
        opts = [{"row_idx": int(i), "label": EOS_LABELS[int(i)]} for i in exact[:2]]
        return {"mode": "pick", "options": opts}

    cands = local_candidates(user_text, top_k=6)
    if not cands:
        return {"mode": "not_found"}
    if cands[0][1] >= 95 and (len(cands) == 1 or (cands[0][1] - cands[1][1]) >= 8):
        return {"mode": "ok", "row_idx": cands[0][0]}

    label_for = {idx: lbl for idx, _score, lbl in cands}

    def _candidate_idx(value: Any) -> Optional[int]:
        """Return ``value`` as a candidate row index, or None if not offered."""
        if isinstance(value, bool):
            return None
        try:
            idx = int(value)
        except (TypeError, ValueError):
            return None
        return idx if idx in label_for else None

    g = gpt_choose_device(user_text, cands)
    if not isinstance(g, dict):
        g = {}
    mode = g.get("mode")

    if mode == "ok":
        idx = _candidate_idx(g.get("row_idx"))
        if idx is not None:
            return {"mode": "ok", "row_idx": idx}

    if mode == "pick":
        options = g.get("options")
        if not isinstance(options, list):
            options = []
        picked: List[int] = []
        for o in options:
            if not isinstance(o, dict):
                continue
            idx = _candidate_idx(o.get("row_idx"))
            if idx is None or idx in picked:
                continue
            picked.append(idx)
            if len(picked) == 2:
                break
        if picked:
            return {"mode": "pick", "options": [{"row_idx": i, "label": label_for[i]} for i in picked]}

    # No usable model answer: offer the best one or two fuzzy candidates.
    return {"mode": "pick", "options": [{"row_idx": i, "label": lbl} for i, _score, lbl in cands[:2]]}
# ============================
# Replacements — lifecycle CSV source of truth
# ============================
def extract_model_token(text: str) -> str:
    """Pull a model-like token (e.g. ``RUTX50``, ``IX20``, ``R1900``) out of a cell.

    The cell is split on ``|`` and the segments are scanned from last to
    first. Within a segment the patterns are tried from most to least
    specific and the first hit is returned upper-cased. If nothing matches,
    the first scanned segment is returned, cut to 60 characters. Blank input
    yields ``""``.
    """
    cell = safe_str(text)
    if not cell:
        return ""
    segments = [piece.strip() for piece in cell.split("|") if piece.strip()]
    ordered = segments[::-1] if segments else [cell]

    # (pattern, flags, search the upper-cased segment instead of the raw one)
    probes = (
        (r"\bRUT[A-Z]?\d{2,4}\b", 0, True),
        (r"\bIX\d{2}\b", re.IGNORECASE, False),
        (r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b", re.IGNORECASE, False),
        (r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b", 0, True),
    )
    for segment in ordered:
        for pattern, flags, use_upper in probes:
            subject = segment.upper() if use_upper else segment
            found = re.search(pattern, subject, flags=flags)
            if found:
                return found.group(0).upper()
    return ordered[0][:60]
def device_is_4g(row: pd.Series) -> bool:
    """Return True when a lifecycle row looks like a 4G/LTE-only device.

    The normalized description, notes and SKU are searched for technology
    tokens. Tokens are matched rather than raw substrings, because substring
    tests misread ordinary text: "5GHz" Wi-Fi counted as 5G, "2.4GHz" as 4G,
    "filter" as LTE and "application" as an LTE category.

    Rules, in order:
      1. An explicit 5G / NR token means the device is not 4G-only.
      2. An LTE (including XLTE / VoLTE) or 4G token means 4G.
      3. An LTE category marker such as "Cat 4", "Cat6", "Cat-M1" means 4G,
         even when the description never says "LTE".

    An unreachable duplicate of the whole rule list, left after the final
    ``return``, has been removed.
    """
    t = " ".join(norm_text(row.get(field, "")) for field in ("description", "notes", "sku"))
    # If it explicitly says 5G/NR, treat as not 4G-only
    if re.search(r"(?<!\d)5g(?!hz)|\bnr\b", t):
        return False
    # Classic signals ("4gb" and "4ghz" are memory / Wi-Fi, not cellular)
    if re.search(r"\b(?:x|vo)?lte|(?<!\d)4g(?!hz|b\b)", t):
        return True
    # LTE-M categories (Cat M1 / M2)
    if re.search(r"\bcat\s*-?\s*m[12]\b", t):
        return True
    # Any other "cat" token that starts a word and is not followed by a
    # letter (cat4, cat 6, cat-12) is treated as an LTE category.
    return re.search(r"\bcat(?![a-z])", t) is not None
def candidate_5g_models_from_lifecycle(manufacturer: str) -> List[str]:
    """List distinct 5G option tokens that the lifecycle table holds for a maker.

    Both the argument and the ``manufacturer`` column are passed through
    ``norm_text`` before comparison. Previously only the argument was
    normalized while the column was merely lower-cased, so any maker name
    with punctuation or extra spacing matched no rows.

    Returns:
        Model tokens from ``advanced_5g_option`` in first-seen order, without
        blanks or "nan".
    """
    if "advanced_5g_option" not in df_eos.columns:
        return []
    if "manufacturer" in df_eos.columns:
        wanted = norm_text(manufacturer)
        pool = df_eos[df_eos["manufacturer"].apply(norm_text) == wanted]
    else:
        pool = df_eos
    out: List[str] = []
    seen = set()
    for value in pool["advanced_5g_option"].tolist():
        tok = extract_model_token(value)
        if tok and tok.lower() != "nan" and tok not in seen:
            seen.add(tok)
            out.append(tok)
    return out
def candidate_4g_models_from_lifecycle(manufacturer: str) -> List[str]:
    """List distinct 4G replacement tokens that the lifecycle table holds for a maker.

    Both the argument and the ``manufacturer`` column are passed through
    ``norm_text`` before comparison. Previously only the argument was
    normalized while the column was merely lower-cased, so any maker name
    with punctuation or extra spacing matched no rows.

    Returns:
        Model tokens from ``suggested_replacement`` in first-seen order,
        without blanks or "nan".
    """
    if "suggested_replacement" not in df_eos.columns:
        return []
    if "manufacturer" in df_eos.columns:
        wanted = norm_text(manufacturer)
        pool = df_eos[df_eos["manufacturer"].apply(norm_text) == wanted]
    else:
        pool = df_eos
    out: List[str] = []
    seen = set()
    for value in pool["suggested_replacement"].tolist():
        tok = extract_model_token(value)
        if tok and tok.lower() != "nan" and tok not in seen:
            seen.add(tok)
            out.append(tok)
    return out
def gpt_pick_from_candidates(old_row: pd.Series, candidates: List[str], need: str) -> str:
    """Let the model pick one replacement from ``candidates``.

    At most the first 40 candidates are sent. The answer is accepted only if
    it is literally one of ``candidates``; otherwise (or with no client or
    no candidates) ``""`` is returned.
    """
    if client is None or not candidates:
        return ""
    sys = "Pick the best replacement model. Choose only from candidates. Return strict JSON only."
    old_device = {
        "sku": str(old_row.get("sku","")),
        "manufacturer": str(old_row.get("manufacturer","")),
        "description": str(old_row.get("description","")),
        "need": need,
    }
    payload = {
        "old_device": old_device,
        "candidates": candidates[:40],
        "output_schema": {"choice":"string"},
    }
    reply = gpt_json(sys, payload, max_tokens=240) or {}
    choice = str(reply.get("choice","") or "").strip()
    if choice in candidates:
        return choice
    return ""
def fallback_5g_from_dec(canon_make: str) -> str:
    """Return the first 5G model listed for the maker family in df_dec, or ``""``."""
    same_maker = df_dec["_canon_make"] == canon_make
    five_g = df_dec["_is5g"] == True
    matches = df_dec[same_maker & five_g]
    if matches.empty:
        return ""
    return str(matches.iloc[0]["Model"]).strip()
def pick_replacements_lifecycle(row: pd.Series, status: str, use_gpt: bool = True) -> Dict[str, Any]:
    """Choose the 4G alternative and 5G replacement for a lifecycle row.

    The row's own ``suggested_replacement`` / ``advanced_5g_option`` cells
    win when they hold a model token. Otherwise the maker's other lifecycle
    rows supply candidates (picked by the model when enabled, else the first
    one), and for 5G the router table is the last resort.

    Returns:
        ``{"repl_4g", "repl_5g", "sources"}``. ``repl_4g`` is
        "Not applicable" and ``repl_5g`` is "Not listed" when nothing
        applies. ``sources`` includes "gpt" whenever GPT was enabled.
    """
    canon = str(row.get("_canon_make","UNKNOWN"))
    manufacturer = str(row.get("manufacturer","") or "")
    sug_raw = safe_str(row.get("suggested_replacement",""))
    adv_raw = safe_str(row.get("advanced_5g_option",""))
    has_4g_alt = bool(sug_raw.strip())
    has_5g_alt = bool(adv_raw.strip())
    # Treat as 4G if the description indicates LTE OR lifecycle provides a 4G suggested replacement
    is_4g = device_is_4g(row) or has_4g_alt
    # Provide 5G option if the unit is 4G, EOS/EOL, or lifecycle explicitly provides advanced_5g_option
    want_5g = is_4g or (status in {"End of Sale","End of Life"}) or has_5g_alt
    gpt_enabled = bool(use_gpt and client)

    def _choose(raw_cell: str, pool_for_maker, need: str) -> str:
        """Token from the row's own cell, else a pick from the maker's pool."""
        token = extract_model_token(raw_cell)
        if token:
            return token
        pool = pool_for_maker(manufacturer)
        picked = gpt_pick_from_candidates(row, pool, need) if gpt_enabled else ""
        return picked or (pool[0] if pool else "")

    # 4G alternative: show whenever lifecycle provides it (or device appears 4G)
    if is_4g:
        repl_4g = _choose(sug_raw, candidate_4g_models_from_lifecycle, "4G alternative") or "Not applicable"
    else:
        repl_4g = "Not applicable"

    # 5G replacement: prefer lifecycle advanced_5g_option whenever present
    if want_5g:
        repl_5g = (
            _choose(adv_raw, candidate_5g_models_from_lifecycle, "5G replacement/upgrade")
            or fallback_5g_from_dec(canon)
            or "Not listed"
        )
    else:
        repl_5g = "Not listed"
    if repl_5g.lower() == "nan":
        repl_5g = "Not listed"

    sources = ["lifecycle_csv"]
    if gpt_enabled:
        sources = sources + ["gpt"]
    return {"repl_4g": repl_4g, "repl_5g": repl_5g, "sources": sources}
# ============================
# Antennas (Parsec-only)
# ============================
# Lower-case words that family_from_line treats as an antenna family name.
PARSEC_FAMILY_WORDS = {"chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie","shepherd","belgian","australian","terrier","pyrenees"}
# Lower-case fragments that mark a card line as a section heading or field
# label rather than a product name (see is_bad_name_line).
BAD_NAME_MARKERS = {"customization","standard connectors","connectors","features","benefits","specifications","mechanical","electrical","mounting","accessories","description:","standard sku"}
def clean_line(s: str) -> str:
    """Collapse whitespace in a card line; drop lines that are only a ``-suffix``.

    A line consisting solely of a hyphen followed by letters/digits
    (case-insensitive) becomes ``""``; None is treated as an empty line.
    """
    collapsed = re.sub(r"\s+", " ", str(s or "").strip())
    is_suffix_only = re.fullmatch(r"-[a-z0-9]+", collapsed.lower()) is not None
    return "" if is_suffix_only else collapsed
def is_bad_name_line(line: str) -> bool:
    """Return True when a card line should not be used as a product name.

    A line is rejected if it contains any BAD_NAME_MARKERS fragment, or if
    it is short (25 characters or fewer) and contains a ``-xxxx`` style
    suffix of 1 to 4 letters/digits.
    """
    lowered = line.lower()
    for marker in BAD_NAME_MARKERS:
        if marker in lowered:
            return True
    has_short_suffix = re.search(r"\b-[a-z0-9]{1,4}\b", lowered) is not None
    return has_short_suffix and len(lowered) <= 25
def family_from_line(line: str) -> str:
    """Return the capitalized Parsec family word found in ``line``, or ``""``.

    PARSEC_FAMILY_WORDS is a set, so iterating it and returning the first
    hit made the answer depend on hash ordering whenever a line held two
    family words (for example one containing both "belgian" and "shepherd").
    The word that appears earliest in the line is now chosen, with the
    alphabetically first word breaking ties, so the result is deterministic.
    """
    low = line.lower()
    hits = [(low.find(fam), fam) for fam in PARSEC_FAMILY_WORDS if fam in low]
    if not hits:
        return ""
    _position, family = min(hits)
    return family.capitalize()
def parsec_connectors_from_card(t: str) -> str:
    """Return the text after ``Standard Connectors:`` on a card, or ``""``.

    Whitespace is collapsed and the value is cut to 80 characters.
    """
    found = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE)
    if found is None:
        return ""
    value = found.group(1).strip()
    return re.sub(r"\s+", " ", value)[:80]
def parsec_mounts_from_card(t: str) -> List[str]:
    """Collect the mount types listed after every ``Mount:`` label on a card.

    Each value is split on commas; entries are lower-cased, stripped and
    returned without duplicates in first-seen order.
    """
    ordered: Dict[str, None] = {}
    for hit in re.finditer(r"Mount:\s*(.+)", t, flags=re.IGNORECASE):
        value = re.sub(r"\s+", " ", hit.group(1).strip())
        for piece in value.split(","):
            if piece.strip():
                # dict keys keep insertion order and ignore repeats
                ordered.setdefault(piece.strip().lower(), None)
    return list(ordered)
def parsec_name_from_card(card_text: str) -> str:
    """Guess a product name for a Parsec card.

    The first acceptable line that contains a family word wins. Failing
    that, the up-to-12 lines before the "standard sku" line are scanned
    bottom-up, and the first word of the first acceptable line (3 to 40
    characters, containing a letter) is returned capitalized. The fallback
    is "Parsec antenna".
    """
    cleaned = (clean_line(raw) for raw in str(card_text or "").splitlines())
    lines = [ln for ln in cleaned if ln]

    for ln in lines:
        if is_bad_name_line(ln):
            continue
        family = family_from_line(ln)
        if family:
            return family

    sku_i = next((i for i, ln in enumerate(lines) if "standard sku" in ln.lower()), None)
    if sku_i is not None:
        preceding = lines[max(0, sku_i - 12):sku_i]
        for ln in reversed(preceding):
            if is_bad_name_line(ln):
                continue
            if 3 <= len(ln) <= 40 and re.search(r"[A-Za-z]", ln):
                return ln.split()[0].capitalize()
    return "Parsec antenna"
def parsec_part_from_card(t: str) -> str:
    """Return the upper-case alphanumeric part number after ``Standard SKU:``, or ``""``."""
    found = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t)
    if found is None:
        return ""
    return found.group(1).strip()
def parsec_desc_from_card(t: str) -> str:
    """Return the single-line text after ``Description:``, or ``""``.

    Whitespace is collapsed and the value is cut to 220 characters.
    """
    found = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE)
    if not found:
        return ""
    one_line = re.sub(r"\s+"," ", found.group(1).strip())
    return one_line[:220]
def parsec_retrieve(query: str, top_k: int = 12) -> List[Dict[str, Any]]:
    """Return the Parsec cards nearest to ``query`` with their parsed fields.

    Each result carries the similarity ``score``, the parsed name, part
    number, description, connectors and mounts, plus the lower-cased card
    text under ``_card`` for later scoring. Ids outside the card list are
    skipped.
    """
    query_vec = np.asarray(embedder.encode([query], normalize_embeddings=True), dtype=np.float32)
    scores, ids = parsec_index.search(query_vec, top_k)
    results: List[Dict[str, Any]] = []
    for similarity, raw_id in zip(scores[0].tolist(), ids[0].tolist()):
        card_id = int(raw_id)
        if not (0 <= card_id < len(parsec_cards)):
            continue
        card = parsec_cards[card_id]
        results.append({
            "score": float(similarity),
            "name": parsec_name_from_card(card),
            "part_number": parsec_part_from_card(card),
            "description": parsec_desc_from_card(card),
            "connectors": parsec_connectors_from_card(card),
            "mounts": parsec_mounts_from_card(card),
            "_card": card.lower(),
        })
    return results
def choose_best_parsec(cands: List[Dict[str, Any]], mode: str) -> Dict[str, Any]:
    """Pick the best antenna candidate for ``mode`` ("vehicle" or "stationary").

    The retrieval score is adjusted: omni cards gain, directional cards
    lose; vehicle mode rewards magnetic / through mounts and penalizes
    wall / pole mounts and fixed-only cards; stationary mode rewards wall
    and pole mounts. The first candidate with the highest adjusted score
    wins.

    Returns:
        A copy of the winning candidate without its ``_card`` text, or a
        placeholder "Parsec antenna" record when there is no winner.
    """
    def _mounted(mount_list: List[str], word: str) -> bool:
        return any(word in entry for entry in mount_list)

    winner = None
    top_score = -1e9
    for cand in cands:
        text = cand.get("_card","")
        mount_list = cand.get("mounts", []) or []
        total = float(cand.get("score", 0.0))
        if "omni" in text:
            total += 0.6
        if "directional" in text:
            total -= 1.5
        if mode == "vehicle":
            if _mounted(mount_list, "magnetic"):
                total += 3.0
            if _mounted(mount_list, "through"):
                total += 2.0
            if _mounted(mount_list, "wall") or _mounted(mount_list, "pole"):
                total -= 1.2
            if "app: fixed" in text and "mobile" not in text:
                total -= 2.0
        if mode == "stationary":
            if _mounted(mount_list, "wall"):
                total += 2.0
            if _mounted(mount_list, "pole"):
                total += 1.8
        if total > top_score:
            top_score, winner = total, cand

    if not winner:
        return {"name":"Parsec antenna","part_number":"","description":"","connectors":"","mounts":[]}
    return {key: value for key, value in winner.items() if key != "_card"}
def infer_mimo_for_5g(repl_5g_model: str) -> str:
    """Rule: every 5G router uses a 4x4 antenna.

    Args:
        repl_5g_model: Model name of the 5G replacement. Accepted for
            interface compatibility; the rule does not depend on it.

    Returns:
        Always ``"4x4"``.
    """
    # A model/CSV based heuristic used to follow the return statement. It
    # could never run and referenced names that are not defined in this
    # function (``model``, ``canon_make``), so it has been removed.
    return "4x4"
def antenna_options_for(router_model: str, tech: str, mimo: str) -> Dict[str, Any]:
    """Return one stationary and one vehicle omni antenna suggestion.

    Two catalog queries are run (12 candidates each); the best candidate of
    each is annotated with the requested ``mimo`` and a short ``why`` text.
    """
    stationary_query = f"{router_model} {tech} {mimo} omni stationary pole wall fixed site Parsec"
    vehicle_query = f"{router_model} {tech} {mimo} omni vehicle mobile magnetic through-bolt Parsec"
    stationary_pool = parsec_retrieve(stationary_query, top_k=12)
    vehicle_pool = parsec_retrieve(vehicle_query, top_k=12)

    stationary_pick = choose_best_parsec(stationary_pool, mode="stationary")
    vehicle_pick = choose_best_parsec(vehicle_pool, mode="vehicle")
    stationary_pick["mimo"] = mimo
    stationary_pick["why"] = "Stationary omni best match."
    vehicle_pick["mimo"] = mimo
    vehicle_pick["why"] = "Vehicle omni best match."
    return {
        "stationary_omni": stationary_pick,
        "vehicle_omni": vehicle_pick,
        "sources": ["parsec_rag"],
    }
# ============================
# Install-ready checklist
# ============================
def install_ready_checklist(current_sku: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str:
    """Return a markdown install checklist for the chosen replacement.

    Without a client a fixed template is filled from ``repl`` and ``ant``;
    with a client the model writes the checklist from the same data.
    """
    st = ant.get("stationary_omni", {})
    vh = ant.get("vehicle_omni", {})

    if client is None:
        template_lines = [
            "### Install-ready checklist",
            f"- Current device: {current_sku}",
            f"- 5G replacement: {repl.get('repl_5g','')}",
            f"- 4G alternative: {repl.get('repl_4g','Not applicable')}",
            f"- Stationary omni antenna: {st.get('name','')} (PN {st.get('part_number','')})",
            f"- Vehicle omni antenna: {vh.get('name','')} (PN {vh.get('part_number','')})",
            "- Next steps: confirm mounting + cable lengths + power; place order; schedule install.",
        ]
        return "\n".join(template_lines)

    sys = "Create a short, install-ready checklist for a Verizon rep. Return markdown only."
    payload = {"current_device": current_sku, "replacements": repl, "antennas": {"stationary": st, "vehicle": vh}}
    messages = [
        {"role":"system","content":sys},
        {"role":"user","content":json.dumps(payload)},
    ]
    resp = client.responses.create(
        model=OPENAI_MODEL,
        reasoning=OPENAI_REASONING,
        input=messages,
        max_output_tokens=520,
    )
    text = getattr(resp, "output_text", "") or ""
    return text.strip()
# ============================
# Batch mode (NO GPT)
# ============================
def parse_batch_inputs(text_blob: str, file_obj: Any) -> List[str]:
    """Collect the device names to process from an uploaded CSV and/or pasted text.

    Args:
        text_blob: Pasted text, one device per line.
        file_obj: Uploaded file (an object with ``.name``, or a path-like
            value), or None. Only the first CSV column is used.

    Returns:
        Items in first-seen order (file first, then text), de-duplicated by
        their normalized form.

    Blank CSV cells are skipped; they used to be stringified into the item
    "nan". An unreadable or empty CSV is ignored so the pasted text still
    works, but only I/O and parse errors are tolerated rather than every
    exception.
    NOTE(review): ``pd.read_csv`` treats the first line as a header, so a
    header-less list loses its first entry — confirm the expected upload
    format.
    """
    items: List[str] = []
    if file_obj is not None:
        path = file_obj.name if hasattr(file_obj, "name") else str(file_obj)
        try:
            table = pd.read_csv(path)
        except (OSError, ValueError):
            # pandas' EmptyDataError / ParserError and UnicodeDecodeError are
            # ValueError subclasses.
            table = None
        if table is not None and len(table.columns) > 0:
            first_col = table[table.columns[0]].dropna()
            items.extend(s for s in (str(x).strip() for x in first_col.tolist()) if s)
    if text_blob:
        items.extend(ln.strip() for ln in str(text_blob).splitlines() if ln.strip())

    seen = set()
    out: List[str] = []
    for item in items:
        key = norm_text(item)
        if key and key not in seen:
            seen.add(key)
            out.append(item)
    return out
def run_batch(text_blob: str, file_obj: Any, include_antennas: bool):
    """Resolve a batch of devices without GPT and export the result as CSV.

    Args:
        text_blob: Pasted device list, one per line.
        file_obj: Optional uploaded CSV (first column is used).
        include_antennas: Accepted for interface compatibility; this
            function does not read it.

    Returns:
        ``(summary, DataFrame, csv_path, rollup)``, or ``("", None, None, "")``
        when there is nothing to process. Inputs that do not resolve to
        exactly one row are reported with status "Needs review".

    The temporary file is now closed before pandas writes to its path; the
    open handle used to be leaked, and writing to a still-open temp file
    fails on some platforms.
    """
    inputs = parse_batch_inputs(text_blob, file_obj)
    if not inputs:
        return "", None, None, ""

    rows = []
    for item in inputs:
        res = resolve_device(item)
        if res.get("mode") != "ok":
            rows.append({"Input": item, "Matched":"", "Status":"Needs review", "EOS":"", "EOL":"", "4G alternative":"", "5G replacement":"", "Notes":"Not found/ambiguous"})
            continue
        life_row = df_eos.iloc[int(res["row_idx"])]
        eos, eol, status = row_to_dates_and_status(life_row)
        repl = pick_replacements_lifecycle(life_row, status, use_gpt=False)
        rows.append({
            "Input": item,
            "Matched": str(life_row.get("sku","")),
            "Status": status,
            "EOS": eos,
            "EOL": eol,
            "4G alternative": repl.get("repl_4g",""),
            "5G replacement": repl.get("repl_5g",""),
            "Notes": "",
        })

    out_df = pd.DataFrame(rows)
    counts = out_df["Status"].value_counts(dropna=False).to_dict()
    top_5g = out_df["5G replacement"].value_counts(dropna=False).head(5).to_dict()
    summary = f"Rows: {len(out_df)} | " + " | ".join([f"{k}: {v}" for k,v in counts.items()])
    rollup = "Top 5G recommendations:\n" + "\n".join([f"- {k}: {v}" for k,v in top_5g.items() if str(k).strip()])

    # Reserve a path, release the handle, then let pandas write to it.
    # delete=False keeps the file so the caller can serve it for download.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
        csv_path = tmp.name
    out_df.to_csv(csv_path, index=False)
    return summary, out_df, csv_path, rollup
# ============================
# Replacement feature table + manufacturer link (5G device)
# ============================
# Column order of the replacement feature table; every entry after "Device"
# is filled by _features_from_dec.
FEATURE_COLS = ["Device", "Modem technology", "WiFi", "Ports", "Antennas", "Ruggedness", "Use case"]
# Manufacturer domains used for best-effort link resolution (no non-maker domains).
# Keys are the canonical maker families; the first domain listed doubles as
# the homepage fallback in _best_effort_manufacturer_url.
MAKER_DOMAINS = {
    "CRADLEPOINT": ["cradlepoint.com", "ericsson.com"],
    "SIERRA": ["semtech.com", "airlink.com"],
    "FEENEY": ["inseego.com"],
    "DIGI": ["digi.com"],
    "CISCO_MERAKI": ["meraki.cisco.com", "cisco.com"],
    "CISCO": ["cisco.com"],
    "TELTONIKA": ["teltonika-networks.com"],
    "UNKNOWN": [],
}
# Request headers (a browser-like User-Agent) sent with each link probe.
HTTP_HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0 Safari/537.36"
}
# Value passed as `timeout` to requests.get for each probe.
HTTP_TIMEOUT = 12
def _best_effort_manufacturer_url(model: str, canon_make: str) -> str:
"""Try to find a manufacturer page or datasheet link using simple on-domain searches.
If we can't confirm a page, return the manufacturer homepage for the maker family.
"""
model = str(model or "").strip()
if not model or model in {"Not listed", "Not applicable"}:
return ""
domains = MAKER_DOMAINS.get(canon_make, []) or []
if not domains:
return ""
# Candidate on-domain search URLs (common patterns across sites).
# We keep these on the manufacturer domain (no Google/Bing).
q = re.sub(r"\s+", "+", model)
url_candidates = []
for d in domains:
url_candidates += [
f"https://{d}/search?q={q}",
f"https://{d}/search?query={q}",
f"https://{d}/?s={q}",
f"https://www.{d}/search?q={q}",
f"https://www.{d}/search?query={q}",
f"https://www.{d}/?s={q}",
]
# Also try a few direct product patterns for known makers (best effort).
if canon_make == "TELTONIKA":
slug = model.lower()
url_candidates += [
f"https://teltonika-networks.com/products/routers/{slug}",
f"https://teltonika-networks.com/product/{slug}",
"https://teltonika-networks.com/products/routers/",
]
if canon_make == "DIGI":
url_candidates += [
"https://www.digi.com/products/networking/cellular-routers",
f"https://www.digi.com/search?q={q}",
]
if canon_make == "CRADLEPOINT":
url_candidates += [
"https://cradlepoint.com/products/",
f"https://cradlepoint.com/?s={q}",
]
if canon_make in {"CISCO", "CISCO_MERAKI"}:
url_candidates += [
f"https://www.cisco.com/c/en/us/search.html?q={q}",
]
# Try to confirm a working page (HTTP 200 and model string somewhere in HTML).
for u in url_candidates[:18]:
try:
import requests
r = requests.get(u, headers=HTTP_HEADERS, timeout=HTTP_TIMEOUT, allow_redirects=True)
if r.status_code != 200:
continue
html = (r.text or "").lower()
if model.lower() in html or "datasheet" in html or "data sheet" in html:
return r.url
except Exception:
continue
# Fallback: maker homepage
d0 = domains[0]
return f"https://{d0}"
def _features_from_dec(model: str, canon_make: str) -> Dict[str, str]:
    """Lookup a router model in dec2025routers.csv and return the key feature fields.

    Args:
        model: Replacement model name; blank or a "Not listed" /
            "Not applicable" placeholder yields all "Not listed".
        canon_make: Canonical maker family used to restrict the lookup.

    Returns:
        A dict with one entry per FEATURE_COLS column after "Device". A
        field reads "Not listed" when there is no match scoring at least
        MATCH_OK or when the matched cell is empty.

    Cells are read through ``safe_str`` so empty (NaN) cells become
    "Not listed" instead of the text "nan", and "Ports" reads "Not listed"
    when neither the WAN nor the LAN cell has content (it used to read
    "WAN:  | LAN: ").
    """
    not_listed = {k: "Not listed" for k in FEATURE_COLS[1:]}
    if not model or model in {"Not listed", "Not applicable"}:
        return not_listed
    pool = df_dec[df_dec["_canon_make"] == canon_make]
    if pool.empty:
        return not_listed
    hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio)
    if not hit or hit[1] < MATCH_OK:
        return not_listed
    # hit[2] is the position within the list passed in, i.e. within pool.
    r = pool.iloc[int(hit[2])]

    def _cell(column: str) -> str:
        """Return the stripped cell text, or "" for missing values."""
        return safe_str(r.get(column, ""))

    wan = _cell("WAN ports and speed")
    lan = _cell("LAN ports and speed")
    if wan or lan:
        ports = f"WAN: {wan or 'Not listed'} | LAN: {lan or 'Not listed'}"
    else:
        ports = "Not listed"
    return {
        "Modem technology": _cell("Modem Type") or "Not listed",
        "WiFi": _cell("WiFi type") or "Not listed",
        "Ports": ports,
        "Antennas": _cell("Antennas (internal/external/both)") or "Not listed",
        "Ruggedness": _cell("Ruggedization") or "Not listed",
        "Use case": _cell("Primary use case") or "Not listed",
    }
def _gpt_fill_feature_row(device_label: str, model: str, canon_make: str, row: Dict[str, str]) -> Dict[str, str]:
    """If dec can't supply values, ask GPT to fill missing ones (best guess).

    A field counts as missing when it is empty or reads "not listed" / "nan"
    (case-insensitive). ``row`` is updated in place and also returned; it is
    returned untouched when no client is configured or nothing is missing.
    """
    if client is None:
        return row
    placeholders = {"not listed","nan",""}
    missing = [
        field
        for field, value in row.items()
        if (not value) or str(value).strip().lower() in placeholders
    ]
    if not missing:
        return row
    sys = "Fill missing router feature fields for a Verizon rep. Return strict JSON only."
    rules = [
        "Fill only the requested fields.",
        "Best guess if needed. Short phrases only.",
        "Return JSON only."
    ]
    payload = {
        "device_label": device_label,
        "model": model,
        "maker_family": canon_make,
        "known": row,
        "fill_only": missing,
        "rules": rules,
        "output_schema": {field: "string" for field in missing},
    }
    reply = gpt_json(sys, payload, max_tokens=260) or {}
    for field in missing:
        filled = str(reply.get(field, "") or "").strip()
        if filled:
            row[field] = filled
    return row
def build_replacement_features_table(repl_4g: str, repl_5g: str, canon_make: str) -> pd.DataFrame:
    """Build the two-row feature table (4G alternative, then 5G replacement).

    Each row is looked up via ``_features_from_dec`` and then passed through
    ``_gpt_fill_feature_row``; the frame's columns follow ``FEATURE_COLS``.
    """
    devices = (
        ("4G alternative", repl_4g),
        ("5G replacement", repl_5g),
    )
    records = []
    for label, model_name in devices:
        features = _features_from_dec(model_name, canon_make)
        features = _gpt_fill_feature_row(label, model_name, canon_make, features)
        records.append({"Device": label, **features})
    return pd.DataFrame(records, columns=FEATURE_COLS)
# ============================
# Verizon fit badges (small table) for recommended devices
# ============================
# Column order of the fit-badges table: build_fit_table passes it as the
# DataFrame columns and the UI passes it as the Dataframe headers.
FIT_COLS = ["Device", "Fit badges", "Ethernet ports", "Battery"]
def _parse_ethernet_ports(wan_field: str, lan_field: str) -> str:
"""Best-effort total ethernet ports based on WAN/LAN text."""
def _count(field: str) -> int:
s = str(field or "")
# Common forms: "1x GbE", "2 x 10/100", "WAN: 1", etc.
nums = [int(x) for x in re.findall(r"(\\d+)\\s*x", s.lower())]
if nums:
return sum(nums)
# Fallback: if it contains 'port' with a number
m = re.search(r"(\\d+)\\s*port", s.lower())
if m:
return int(m.group(1))
# If it contains '1' and 'wan' in short text, guess 1
if "wan" in s.lower() and re.search(r"\\b1\\b", s):
return 1
return 0
total = _count(wan_field) + _count(lan_field)
return str(total) if total > 0 else "Not listed"
def _battery_badge(battery_field: str) -> str:
s = str(battery_field or "").strip().lower()
if not s or s in {"none", "no", "n/a", "not listed"}:
return "No"
return "Yes"
def _bool_badge(flag: bool) -> str:
return "Yes" if flag else "No"
def _dual_sim_from_row_text(*fields: str) -> bool:
txt = " ".join([str(x or "") for x in fields]).lower()
return ("dual sim" in txt) or ("2 sim" in txt) or ("two sim" in txt) or ("dual-sim" in txt)
def _throughput_high(throughput_field: str) -> bool:
t = str(throughput_field or "").lower()
# Heuristic: anything mentioning gbps or >=1000 mbps
if "gbps" in t:
return True
m = re.search(r"(\\d+(?:\\.\\d+)?)\\s*mbps", t)
if m:
try:
return float(m.group(1)) >= 1000.0
except Exception:
pass
return False
def _gpt_fit_badges(model: str, canon_make: str, is_5g: bool, dec_row: Optional[pd.Series]) -> Tuple[str, str, str]:
    """
    GPT-based fill for Fit badges / Ethernet ports / Battery, used when dec is missing or incomplete.

    Args:
        model: Router model name sent to the model.
        canon_make: Maker-family key sent as "maker_family".
        is_5g: When true, "4x4 MIMO" is always part of the returned badges.
        dec_row: Optional catalog row; selected columns are forwarded as context.

    Returns (badges_csv, ethernet_ports, battery_yesno):
        badges_csv is a comma-separated, de-duplicated list restricted to the
        allowed badge set, or "Not listed"; ethernet_ports is the first digit
        run in the reply, or "Not listed"; battery_yesno is "Yes", "No" or
        "Not listed". All three are "Not listed" when no client is configured.
    """
    if client is None:
        return ("Not listed", "Not listed", "Not listed")

    dec_ctx = {}
    if dec_row is not None:
        try:
            dec_ctx = {
                "Model": str(dec_row.get("Model","")),
                "Modem Type": str(dec_row.get("Modem Type","")),
                "Ruggedization": str(dec_row.get("Ruggedization","")),
                "WAN ports and speed": str(dec_row.get("WAN ports and speed","")),
                "LAN ports and speed": str(dec_row.get("LAN ports and speed","")),
                "Antennas": str(dec_row.get("Antennas (internal/external/both)","")),
                "WiFi type": str(dec_row.get("WiFi type","")),
                "Primary use case": str(dec_row.get("Primary use case","")),
                "Serial port": str(dec_row.get("Serial port (yes/no)","")),
                "VPN": str(dec_row.get("VPN capabilities","")),
                "Throughput": str(dec_row.get("Router throughput","")),
                "Battery": str(dec_row.get("Battery (internal/removable/none/optional)","")),
                "Special notes": str(dec_row.get("Special notes","")),
                "Summary": str(dec_row.get("summary and use case","")),
            }
        except Exception:
            # Context is optional: fall back to sending none rather than failing the lookup.
            dec_ctx = {}

    system_prompt = (
        "You are helping a Verizon rep. Based on the provided router context, output fit badges and a couple quick traits.\n"
        "Return STRICT JSON only.\n"
        "Badges must be chosen from this set only:\n"
        "['Vehicle','Fixed site','Wi‑Fi','Rugged','Dual‑SIM','4x4 MIMO','High throughput','Serial'].\n"
        "Rules:\n"
        "- If is_5g is true, ALWAYS include '4x4 MIMO'.\n"
        "- Ethernet ports: return a single integer as a string if you can infer total ethernet ports, otherwise 'Not listed'.\n"
        "- Battery: return 'Yes' or 'No' if you can infer, otherwise 'Not listed'.\n"
        "- If uncertain between Vehicle vs Fixed site, pick the most likely based on use case/ruggedization.\n"
    )
    payload = {
        "model": model,
        "maker_family": canon_make,
        "is_5g": bool(is_5g),
        "dec_context": dec_ctx,
        "output_schema": {
            "badges": ["string"],
            "ethernet_ports": "string",
            "battery": "Yes|No|Not listed"
        }
    }
    out = gpt_json(system_prompt, payload, max_tokens=260) or {}
    if not isinstance(out, dict):
        # A JSON list/scalar reply has no .get(); treat it as an empty answer.
        out = {}

    allowed = {"Vehicle","Fixed site","Wi‑Fi","Rugged","Dual‑SIM","4x4 MIMO","High throughput","Serial"}
    raw_badges = out.get("badges", []) or []
    if isinstance(raw_badges, str):
        # Tolerate a CSV string instead of a list (iterating it would yield single characters).
        raw_badges = raw_badges.split(",")
    elif not isinstance(raw_badges, (list, tuple)):
        raw_badges = []
    clean = []
    for b in raw_badges:
        # The allowed names use U+2011 (non-breaking hyphen); map an ASCII
        # hyphen onto it so "Wi-Fi" / "Dual-SIM" are not silently discarded.
        bs = str(b).strip().replace("-", "\u2011")
        if bs in allowed and bs not in clean:
            clean.append(bs)
    if is_5g and "4x4 MIMO" not in clean:
        clean.append("4x4 MIMO")
    badges_csv = ", ".join(clean) if clean else "Not listed"

    # Keep only the first integer; anything without digits ("nan", "unknown", ...) is "Not listed".
    m = re.search(r"\d+", str(out.get("ethernet_ports","") or ""))
    eth = m.group(0) if m else "Not listed"

    bat_raw = str(out.get("battery","") or "").strip().lower()
    if bat_raw.startswith("y"):
        bat = "Yes"
    elif (
        bat_raw.startswith("n")
        and not bat_raw.startswith("not")  # "not listed" must not collapse into "No"
        and bat_raw not in {"n/a", "na", "nan"}
    ):
        bat = "No"
    else:
        bat = "Not listed"
    return (badges_csv, eth, bat)
def _fit_badges_for_model(model: str, canon_make: str, is_5g: bool) -> Tuple[str, str, str]:
    """Return (badges_csv, ethernet_ports, battery_yesno). Uses dec2025routers.csv first, then GPT fill.

    The catalog row is found by fuzzy-matching (``fuzz.WRatio``, threshold
    ``MATCH_OK``) the normalized model against ``_norm_model`` within the rows
    of ``df_dec`` whose ``_canon_make`` equals ``canon_make``. Whatever the row
    cannot supply (no row, no port count, no battery info) is requested from
    ``_gpt_fit_badges``; badges derived from a catalog row are never replaced
    by the GPT badges.

    Args:
        model: Model name; empty, "Not listed" and "Not applicable" return
            three "Not listed" values immediately.
        canon_make: Maker-family key used to narrow the catalog.
        is_5g: When true, "4x4 MIMO" is added to catalog-derived badges.
    """
    model = str(model or "").strip()
    if not model or model in {"Not listed", "Not applicable"}:
        return ("Not listed", "Not listed", "Not listed")

    pool = df_dec[df_dec["_canon_make"] == canon_make]
    row = None
    if not pool.empty:
        hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio)
        if hit and hit[1] >= MATCH_OK:
            # hit[2] is the position inside the choices list, matching pool's row order.
            row = pool.iloc[int(hit[2])]

    def _cell(column: str) -> str:
        # NaN is truthy, so `value or ""` would stringify it to "nan" and e.g.
        # produce a false Wi‑Fi badge or Battery "Yes"; treat it as blank.
        value = row.get(column, "")
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return ""
        text = str(value).strip()
        return "" if text.lower() == "nan" else text

    def _port_count(text: str) -> int:
        # Sum every "<n>x" group so "4x GbE, 1x 2.5GbE" counts 5, not just the first 4.
        return sum(int(n) for n in re.findall(r"(\d+)\s*x", text.lower()))

    badges = []
    eth = "Not listed"
    bat_yes = "Not listed"
    if row is not None:
        use_case = _cell("Primary use case").lower()
        rugged = _cell("Ruggedization").lower()
        if any(k in use_case for k in ["vehicle","mobile","fleet","in-vehicle"]) or "vehicle" in rugged:
            badges.append("Vehicle")
        else:
            badges.append("Fixed site")

        wifi = _cell("WiFi type")
        if wifi and wifi.lower() not in {"none","no","n/a"}:
            badges.append("Wi‑Fi")

        if any(k in rugged for k in ["rugged","industrial","ip","harsh"]):
            badges.append("Rugged")

        notes_blob = " ".join([_cell("Special notes"), _cell("summary and use case")]).lower()
        if "dual" in notes_blob and "sim" in notes_blob:
            badges.append("Dual‑SIM")

        if is_5g:
            badges.append("4x4 MIMO")

        thr = _cell("Router throughput").lower()
        m = re.search(r"(\d+(\.\d+)?)\s*gb", thr)
        if m and float(m.group(1)) >= 1.0:
            badges.append("High throughput")

        if _cell("Serial port (yes/no)").lower() in {"yes","y","true"}:
            badges.append("Serial")

        total = _port_count(_cell("WAN ports and speed")) + _port_count(_cell("LAN ports and speed"))
        if total > 0:
            eth = str(total)

        bat_l = _cell("Battery (internal/removable/none/optional)").lower()
        if bat_l:
            bat_yes = "No" if "none" in bat_l else "Yes"

    # dict.fromkeys removes duplicates while keeping first-seen order.
    badges_csv = ", ".join(dict.fromkeys(badges)) if badges else "Not listed"

    # Use GPT when anything is missing (instead of best-effort inference)
    if (row is None) or (eth == "Not listed") or (bat_yes == "Not listed") or (not badges):
        g_badges, g_eth, g_bat = _gpt_fit_badges(model, canon_make, is_5g, row)
        if not badges:
            badges_csv = g_badges
        if eth == "Not listed":
            eth = g_eth
        if bat_yes == "Not listed":
            bat_yes = g_bat

    return (badges_csv or "Not listed", eth or "Not listed", bat_yes or "Not listed")
def build_fit_table(repl_4g: str, repl_5g: str, canon_make: str) -> pd.DataFrame:
    """Build the two-row fit table: 4G alternative (is_5g False), then 5G replacement (is_5g True).

    Each row comes from ``_fit_badges_for_model``; columns follow ``FIT_COLS``.
    """
    plan = (
        ("4G alternative", repl_4g, False),
        ("5G replacement", repl_5g, True),
    )
    records = []
    for label, model_name, five_g in plan:
        badge_text, port_text, battery_text = _fit_badges_for_model(model_name, canon_make, is_5g=five_g)
        records.append({
            "Device": label,
            "Fit badges": badge_text,
            "Ethernet ports": port_text,
            "Battery": battery_text,
        })
    return pd.DataFrame(records, columns=FIT_COLS)
# ============================
# Output
# ============================
def assemble_output(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str:
    """Render the numbered Markdown summary for a single lookup.

    Args:
        life_row: Lifecycle row; only ``.get('sku')`` and ``.get('description')`` are read.
        status: Lifecycle status text.
        eos: End-of-sale date text.
        eol: End-of-life date text.
        repl: Replacement info; reads "repl_4g", "repl_5g" and, when it is a list, "sources".
        ant: Antenna picks; reads "stationary_omni" and "vehicle_omni" (each a
            dict or missing/None).

    Returns:
        The summary lines joined with newlines, ending with a debug "Sources" list.
    """
    # Join SKU and description with " — " and skip blanks; previously the two
    # were concatenated with no separator at all ("WR21Digi ...").
    name_parts = [str(life_row.get(key, "") or "").strip() for key in ("sku", "description")]
    current_name = " — ".join(part for part in name_parts if part)

    # `or {}` keeps a stored None from breaking the .get() calls below.
    st = ant.get("stationary_omni", {}) or {}
    vh = ant.get("vehicle_omni", {}) or {}

    lines = []
    lines.append(f"1. Current device: **{current_name}**")
    lines.append(f"2. Status: **{status}**")
    lines.append(f"3. End of Sale date: **{eos}**")
    lines.append(f"4. End of Life date: **{eol}**")
    lines.append(f"5. 4G alternative (lifecycle): **{repl.get('repl_4g','Not applicable')}**")
    lines.append(f"6. 5G replacement (lifecycle): **{repl.get('repl_5g','Not listed')}**")
    lines.append("7. Antenna options (Parsec-only):")

    # The connector suffix is shown only when a connector value is present.
    conn_s = f" | Conn: {st.get('connectors','')}" if st.get("connectors") else ""
    conn_v = f" | Conn: {vh.get('connectors','')}" if vh.get("connectors") else ""
    lines.append(f" - Stationary (Omni): **{st.get('name','')}** (Part #: {st.get('part_number','')}) — {st.get('description','')} — MIMO: {st.get('mimo','')}{conn_s}")
    lines.append(f" - Vehicle (Omni): **{vh.get('name','')}** (Part #: {vh.get('part_number','')}) — {vh.get('description','')} — MIMO: {vh.get('mimo','')}{conn_v}")

    lines.append("\nSources (debug):")
    sources = repl.get("sources")
    if isinstance(sources, list):
        lines.extend(f"- {s}" for s in sources)
    lines.append("- ParsecCatalog.pdf (local RAG)")
    lines.append("- routers_eos_eol_by_sku.csv (replacements)")
    return "\n".join(lines)
# ============================
# Customer-ready email summary (single lookup only)
# ============================
def build_customer_email(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any], link5: str) -> str:
    """Email-style summary the rep can paste to a customer (lightly sales-y).

    Args:
        life_row: Lifecycle row; only ``.get('sku')`` and ``.get('description')`` are read.
        status: Lifecycle status text.
        eos: End-of-sale date text.
        eol: End-of-life date text.
        repl: Replacement info; reads "repl_5g" and "repl_4g".
        ant: Antenna picks; reads "stationary_omni" and "vehicle_omni".
        link5: Manufacturer URL for the 5G model; the link line is omitted when empty.

    Returns:
        The email draft as newline-joined text.
    """
    # Join SKU and description with " — " and skip blanks; previously the two
    # were concatenated with no separator at all.
    name_parts = [str(life_row.get(key, "") or "").strip() for key in ("sku", "description")]
    current = " — ".join(part for part in name_parts if part)

    repl5 = str(repl.get("repl_5g","") or "").strip()
    repl4 = str(repl.get("repl_4g","") or "").strip()
    st = ant.get("stationary_omni", {}) or {}
    vh = ant.get("vehicle_omni", {}) or {}

    lines = [
        "Subject: Router replacement recommendation",
        "",
        "Hi there,",
        "",
        f"We reviewed your current router (**{current}**) and recommend the following path forward:",
        "",
        f"- **Status:** {status}",
        f"- **End of Sale:** {eos}",
        f"- **End of Life:** {eol}",
        "",
        "**Recommended replacement (5G):**",
        f"- {repl5 if repl5 else 'Not listed'}",
    ]
    if link5:
        lines.append(f"- Manufacturer page (best effort): {link5}")
    lines.extend([
        "",
        "**Optional 4G alternative (if needed):**",
        f"- {repl4 if repl4 and repl4.lower() != 'not applicable' else 'Not applicable'}",
        "",
        "**Antenna suggestions (Parsec):**",
        f"- Stationary (Omni): {st.get('name','')} (PN {st.get('part_number','')})",
        f"- Vehicle (Omni): {vh.get('name','')} (PN {vh.get('part_number','')})",
        "",
        "If you’d like, we can confirm the best-fit option for your install environment and provide pricing.",
        "",
        "Contact Peter Dunn @ 786.999.9127 or peter.dunn@masterstelecom.com for pricing.",
        "",
        "Thanks,",
        "Peter Dunn",
    ])
    return "\n".join(lines)
def generate_customer_email(st_json: str) -> str:
    """Gradio callback: build the customer email from the stored lookup state.

    Returns "Run a lookup first." when the state is empty, lacks "row_idx",
    or the stored index cannot be resolved against ``df_eos``.
    """
    state = state_load(st_json)
    if not state or "row_idx" not in state:
        return "Run a lookup first."

    try:
        life_row = df_eos.iloc[int(state["row_idx"])]
    except Exception:
        return "Run a lookup first."

    eos, eol, status = row_to_dates_and_status(life_row)
    replacements = state.get("repl", {}) or {}
    antennas = state.get("ant", {}) or {}
    maker = str(life_row.get("_canon_make","UNKNOWN"))
    model_5g = str(replacements.get("repl_5g","") or "")
    link_5g = _best_effort_manufacturer_url(model_5g, maker)
    return build_customer_email(life_row, status, eos, eol, replacements, antennas, link_5g)
# ============================
# Gradio callbacks
# IMPORTANT: no dict state and ALL events have api_name=False (prevents api_info schema generation)
# ============================
def _lookup_notice(message: str, picker_visible: bool = False, st_json: str = "{}"):
    """Build the 10-value callback result that shows only a message.

    The picker dropdown and the "Use selection" button share one visibility
    flag; every other output is cleared. ``st_json`` is the state to keep.
    """
    return (
        message, "", None, None, "",
        gr.update(visible=picker_visible), gr.update(visible=picker_visible),
        st_json, "", "",
    )


def _lookup_result(row_idx: int, raw_text: str):
    """Run the full recommendation pipeline for one ``df_eos`` row.

    Shared by ``run_lookup`` and ``use_selection``, which previously carried
    identical copies of this code. Returns the 10-value tuple expected by the
    Single-tab outputs, with the picker hidden and the new state serialized.
    """
    life_row = df_eos.iloc[int(row_idx)]
    eos, eol, status = row_to_dates_and_status(life_row)
    repl = pick_replacements_lifecycle(life_row, status, use_gpt=True)
    canon_make = str(life_row.get("_canon_make","UNKNOWN"))

    mimo = infer_mimo_for_5g(repl.get("repl_5g",""))
    has_5g = repl.get("repl_5g") and repl.get("repl_5g") != "Not listed"
    tech = "5G" if has_5g else ("4G" if device_is_4g(life_row) else "Unknown")
    # Antennas are picked for the 5G replacement when one exists, else for the current SKU.
    ant = antenna_options_for(repl.get("repl_5g") or str(life_row.get("sku","")), tech, mimo)

    output = assemble_output(life_row, status, eos, eol, repl, ant)
    st_out = {"row_idx": int(row_idx), "repl": repl, "ant": ant, "raw": raw_text}

    url5 = _best_effort_manufacturer_url(repl.get('repl_5g',''), canon_make)
    link = f"**5G manufacturer page (best effort):** {url5}" if url5 else ""
    feat_df = build_replacement_features_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make)
    fit = build_fit_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make)
    return output, link, feat_df, fit, "", gr.update(visible=False), gr.update(visible=False), state_dump(st_out), "", ""


def run_lookup(user_text: str, st_json: str):
    """Gradio callback for "Check": resolve the typed SKU/model and render results.

    Depending on ``resolve_device``'s "mode" this returns a prompt to pick
    between options ("pick"), the full result ("ok"), or "Not found.".
    Blank input returns a prompt to enter a SKU/model.
    """
    user_text = str(user_text or "").strip()
    if not user_text:
        return _lookup_notice("Enter a router SKU/model.")

    res = resolve_device(user_text)
    if res.get("mode") == "pick":
        opts = res.get("options", [])
        choices = [o["label"] for o in opts]
        st2 = {"mode":"pick","options": opts, "raw": user_text}
        return (
            "Did you mean A or B? Pick one, then click Use selection.", "", None, None, "",
            gr.update(choices=choices, value=None, visible=True), gr.update(visible=True),
            state_dump(st2), "", "",
        )

    if res.get("mode") != "ok":
        return _lookup_notice("Not found.")

    return _lookup_result(int(res["row_idx"]), user_text)


def use_selection(selected_label: str, st_json: str):
    """Gradio callback for "Use selection": run the lookup for the picked option.

    Requires state in "pick" mode; the option whose "label" equals
    ``selected_label`` supplies the row index. Invalid or missing selections
    keep the picker visible and the state unchanged.
    """
    st = state_load(st_json)
    if not st or st.get("mode") != "pick":
        return _lookup_notice("Run a search first.")
    if not selected_label:
        return _lookup_notice("Pick A or B first.", picker_visible=True, st_json=st_json)

    chosen_row = None
    for o in st.get("options", []):
        if o.get("label") == selected_label:
            chosen_row = int(o["row_idx"])
            break
    if chosen_row is None:
        return _lookup_notice("Pick a valid option.", picker_visible=True, st_json=st_json)

    return _lookup_result(chosen_row, st.get("raw",""))
def make_install_ready(st_json: str):
    """Gradio callback: produce the install-ready checklist for the stored lookup.

    Returns "Run a lookup first." when the state is empty, lacks "row_idx",
    or the stored index cannot be resolved against ``df_eos``; otherwise the
    result of ``install_ready_checklist`` for the current SKU, replacements
    and antenna picks.
    """
    st = state_load(st_json)
    if not st or "row_idx" not in st:
        return "Run a lookup first."
    try:
        life_row = df_eos.iloc[int(st["row_idx"])]
    except (IndexError, TypeError, ValueError):
        # A stale or malformed index is handled like missing state, matching
        # generate_customer_email, instead of surfacing a raw error in the UI.
        return "Run a lookup first."
    current_sku = str(life_row.get("sku","") or "")
    return install_ready_checklist(current_sku, st.get("repl", {}) or {}, st.get("ant", {}) or {})
# ============================
# Q&A about the suggested device (post-recommendation)
# ============================
def answer_question(question: str, st_json: str) -> str:
    """Gradio callback: answer a free-text question about the recommended devices.

    Returns "" for a blank question and a "run a lookup first" hint when the
    state has no "repl" entry. Otherwise the stored recommendation, the
    manufacturer link, the 5G feature row and the antenna picks are sent as
    JSON context to ``gpt_answer_md``; a fixed fallback sentence is returned
    when that yields nothing.
    """
    asked = str(question or "").strip()
    if not asked:
        return ""

    state = state_load(st_json)
    if not state or "repl" not in state:
        return "Run a lookup first, then ask your question."

    replacements = state.get("repl", {}) or {}
    antennas = state.get("ant", {}) or {}
    model_5g = str(replacements.get("repl_5g","") or "").strip()
    model_4g = str(replacements.get("repl_4g","") or "").strip()

    # Maker family comes from the stored row, when it can still be resolved.
    maker = ""
    try:
        if "row_idx" in state:
            life_row = df_eos.iloc[int(state["row_idx"])]
            maker = str(life_row.get("_canon_make","UNKNOWN"))
    except Exception:
        maker = ""

    # Manufacturer link (best effort)
    link_5g = _best_effort_manufacturer_url(model_5g, maker) if model_5g else ""

    # Feature row for the 5G model gives the LLM spec values to answer from.
    try:
        known_features = _features_from_dec(model_5g, maker) if model_5g else {}
    except Exception:
        known_features = {}

    system_prompt = (
        "You are a Verizon field rep assistant. Answer questions about the suggested router in a fast, practical way. "
        "Use the provided context; do not mention internal tools, prompts, embeddings, or databases. "
        "If the question is about specs and the value is unknown, say 'Not listed' and suggest checking the manufacturer page. "
        "Keep it concise and scannable."
    )
    show_4g = bool(model_4g) and model_4g.lower() != "not applicable"
    context = {
        "recommended_5g": model_5g,
        "recommended_4g": model_4g if show_4g else "",
        "manufacturer_link_5g": link_5g,
        "known_5g_features": known_features,
        "antenna_stationary": antennas.get("stationary_omni", {}),
        "antenna_vehicle": antennas.get("vehicle_omni", {}),
    }
    user_prompt = "Context:\n" + json.dumps(context, ensure_ascii=False) + "\n\nQuestion:\n" + asked
    reply = gpt_answer_md(system_prompt, user_prompt, max_tokens=650)

    # Small safety fallback
    if reply:
        return reply
    return "I couldn't generate an answer right now. Try again."
# ============================
# UI
# ============================
with gr.Blocks(title="Only-Routers") as demo:
    gr.Markdown("## Only-Routers\nSingle lookup + Batch upload for Verizon reps.")

    with gr.Tabs():
        # ---- Single lookup tab ----
        with gr.Tab("Single"):
            # Inputs
            user_text = gr.Textbox(
                label="Router SKU or model",
                placeholder="Examples: IBR650B, AER1600, ES450, WR21, RUT240",
                lines=1,
            )
            st = gr.State("{}")  # JSON string state

            # Actions
            check_btn = gr.Button("Check", variant="primary")
            pick_dd = gr.Dropdown(label="Pick A or B", choices=[], visible=False)
            use_btn = gr.Button("Use selection", visible=False)

            # Main outputs
            output_md = gr.Markdown()
            link_md = gr.Markdown()
            features_df = gr.Dataframe(headers=FEATURE_COLS, interactive=False, wrap=True)
            fit_df = gr.Dataframe(headers=FIT_COLS, interactive=False, wrap=True)
            qa_md = gr.Markdown()

            # Post-recommendation Q&A
            gr.Markdown("### Questions about the suggested device?")
            question_box = gr.Textbox(
                label="Ask a question (optional)",
                placeholder="Example: Does the 5G device support dual-SIM? How many ethernet ports? Does it support Wi‑Fi?",
                lines=2,
            )
            ask_btn = gr.Button("Ask", variant="secondary")

            # Install-ready checklist
            install_btn = gr.Button("Make install-ready checklist")
            install_md = gr.Markdown()

            # Customer-ready email summary
            gr.Markdown("### Customer-ready email")
            email_btn = gr.Button("Generate customer email")
            customer_email_box = gr.Textbox(label="Email draft", lines=10)

            # Both lookup callbacks return the same 10 values, in this order.
            lookup_outputs = [
                output_md,
                link_md,
                features_df,
                fit_df,
                qa_md,
                pick_dd,
                use_btn,
                st,
                install_md,
                customer_email_box,
            ]

            # Wiring (api_name=False avoids HF/Gradio API schema issues)
            check_btn.click(fn=run_lookup, inputs=[user_text, st], outputs=lookup_outputs, api_name=False)
            use_btn.click(fn=use_selection, inputs=[pick_dd, st], outputs=lookup_outputs, api_name=False)
            ask_btn.click(fn=answer_question, inputs=[question_box, st], outputs=[qa_md], api_name=False)
            install_btn.click(fn=make_install_ready, inputs=[st], outputs=[install_md], api_name=False)
            email_btn.click(fn=generate_customer_email, inputs=[st], outputs=[customer_email_box], api_name=False)

        # ---- Batch tab ----
        with gr.Tab("Batch"):
            gr.Markdown("Paste one per line or upload a CSV (first column). Batch runs fast (no GPT).")
            batch_text = gr.Textbox(label="Paste devices (one per line)", lines=8, placeholder="WR21\nRUT240\nIBR650B")
            batch_file = gr.File(label="Upload CSV", file_types=[".csv"])
            include_ant = gr.Checkbox(label="Include antenna picks (slower)", value=False)
            run_btn = gr.Button("Run batch", variant="primary")

            summary_md = gr.Markdown()
            rollup_md = gr.Markdown()
            table = gr.Dataframe(interactive=False, wrap=True)
            dl = gr.File(label="Download results CSV")

            run_btn.click(
                fn=run_batch,
                inputs=[batch_text, batch_file, include_ant],
                outputs=[summary_md, table, dl, rollup_md],
                api_name=False,
            )

# Start the app once the layout is built; the API docs page stays hidden.
demo.launch(show_api=False)