# Only-Routers / Updates / app2.py
# Uploaded by crazycrazypete using huggingface_hub (commit d410642, verified).
import hashlib
import json
import logging
import math
import os
import re
import tempfile
from dataclasses import dataclass
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple

import faiss
import fitz  # PyMuPDF
import gradio as gr
import numpy as np
import pandas as pd
from openai import OpenAI
from rapidfuzz import fuzz, process
from sentence_transformers import SentenceTransformer
# ============================
# Settings
# ============================
# Reference date that every lifecycle comparison is made against.
# NOTE(review): this is hard-coded rather than date.today(), so statuses will
# not advance until the constant is edited — confirm this is intentional.
TODAY = date(2026, 1, 18)
# Model name and reasoning options passed to client.responses.create().
OPENAI_MODEL = "gpt-5.2"
OPENAI_REASONING = {"effort": "high"}
# Minimum rapidfuzz score for a fuzzy model match to be accepted.
MATCH_OK = 80
# Sentence-transformers model used to embed Parsec catalog cards and queries.
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
# Characters of page text kept before / after each "Standard SKU:" marker
# when cutting the catalog into cards.
PARSEC_CONTEXT_BEFORE = 900
PARSEC_CONTEXT_AFTER = 1600
# ============================
# OpenAI client (HF Space secret: OPENAI_API_KEY)
# ============================
# The client stays None when no key is configured; GPT-backed helpers check
# for that and return empty results instead of calling the API.
API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
client = OpenAI(api_key=API_KEY) if API_KEY else None
# ----------------------------
# Gradio state helpers
# Keep state as a JSON STRING to avoid schema issues on Hugging Face.
# ----------------------------
def state_load(st_json: str) -> Dict[str, Any]:
    """Decode the JSON string kept in Gradio state into a dict.

    Args:
        st_json: JSON text previously produced by ``state_dump``.

    Returns:
        The decoded dict, or ``{}`` when the input is empty, is not a string,
        is not valid JSON, or decodes to something other than a dict.
    """
    if not isinstance(st_json, str) or not st_json:
        return {}
    try:
        loaded = json.loads(st_json)
    except (ValueError, RecursionError):
        return {}
    # Callers use ``.get`` on the result, so a JSON list/number must not leak out.
    return loaded if isinstance(loaded, dict) else {}
def state_dump(st: Dict[str, Any]) -> str:
    """Encode a state dict as the JSON string stored in Gradio state.

    A falsy state is written as ``{}``; anything that cannot be serialised
    also degrades to the string ``"{}"`` rather than raising.
    """
    try:
        payload = st if st else {}
        encoded = json.dumps(payload, ensure_ascii=False)
    except Exception:
        encoded = "{}"
    return encoded
# ============================
# Helpers
# ============================
def norm_text(s: Any) -> str:
    """Normalise a value to lower-case text suitable for matching.

    ``None`` and NaN-like values become ``""``. Everything else is
    stringified, lower-cased, stripped of characters other than ``a-z``,
    digits, whitespace, ``-`` and ``/`` (each replaced by a space), and has
    whitespace runs collapsed to single spaces.
    """
    try:
        # bool() is evaluated inside the try so that an ambiguous truth value
        # (e.g. from an array-like) is swallowed exactly like other failures.
        is_missing = bool(s is None or (isinstance(s, float) and math.isnan(s)) or pd.isna(s))
    except Exception:
        is_missing = False
    if is_missing:
        return ""
    lowered = str(s).strip().lower()
    only_allowed = re.sub(r"[^a-z0-9\s\-\/]", " ", lowered)
    return re.sub(r"\s+", " ", only_allowed).strip()
def safe_str(v: Any) -> str:
    """Return ``v`` as a stripped string, mapping missing values to ``""``.

    Args:
        v: Any value, typically a DataFrame cell.

    Returns:
        ``""`` for ``None`` / NaN / other pandas missing scalars, otherwise
        ``str(v).strip()``.
    """
    # Only scalars are passed to pd.isna: for list-likes it returns an array,
    # whose truth value is ambiguous and would raise ValueError.
    if pd.api.types.is_scalar(v) and pd.isna(v):
        return ""
    return str(v).strip()
def is_5g(modem_type: Any) -> bool:
    """Report whether a modem-type description mentions 5G / NR.

    Args:
        modem_type: Free-text modem description (any type; it is normalised
            with ``norm_text`` first).

    Returns:
        ``True`` when the text contains ``5g`` (but not as part of ``5ghz``,
        which is a Wi-Fi band) or ``nr`` as a whole word.
    """
    s = norm_text(modem_type)
    # A plain substring test would match "5ghz" and the letters "nr" inside
    # unrelated words, so "nr" must stand alone and "5ghz" is excluded.
    return re.search(r"5g(?!hz)|\bnr\b", s) is not None
def json_load_safe(s: str) -> Dict[str, Any]:
    """Parse ``s`` as JSON and return it only if it is an object.

    Args:
        s: JSON text (typically model output).

    Returns:
        The decoded dict, or ``{}`` when ``s`` is not valid JSON, is not a
        string/bytes value, or decodes to a non-dict value.
    """
    try:
        loaded = json.loads(s)
    except (TypeError, ValueError, RecursionError):
        return {}
    # Callers index the result with ``.get``; lists and scalars are rejected.
    return loaded if isinstance(loaded, dict) else {}
def gpt_json(system: str, payload: Dict[str, Any], max_tokens: int = 600) -> Dict[str, Any]:
    """Send a system prompt plus JSON payload to the model and parse a JSON reply.

    Args:
        system: System instruction text.
        payload: Data serialised with ``json.dumps`` and sent as the user message.
        max_tokens: Value passed as ``max_output_tokens``.

    Returns:
        The parsed JSON object, or ``{}`` when no client is configured, the
        request fails, or the reply is not a JSON object.
    """
    if client is None:
        return {}
    try:
        resp = client.responses.create(
            model=OPENAI_MODEL,
            reasoning=OPENAI_REASONING,
            input=[
                {"role": "system", "content": system},
                {"role": "user", "content": json.dumps(payload)},
            ],
            max_output_tokens=max_tokens,
        )
    except Exception:
        # Callers already treat {} as "no answer" and fall back to local
        # logic, so a failed request is logged instead of propagated.
        logging.getLogger(__name__).warning("OpenAI request failed; continuing without GPT.", exc_info=True)
        return {}
    parsed = json_load_safe(getattr(resp, "output_text", "") or "")
    return parsed if isinstance(parsed, dict) else {}
# ============================
# Load data
# ============================
EOS_PATH = "routers_eos_eol_by_sku.csv"
DEC_PATH = "dec2025routers.csv"
PARSEC_PDF = "ParsecCatalog.pdf"
# Fail at import time, in this order, if any bundled data file is absent.
for _required_path in (EOS_PATH, DEC_PATH, PARSEC_PDF):
    if not os.path.exists(_required_path):
        raise FileNotFoundError(f"Missing {_required_path} in repo.")
# Lifecycle table (per SKU) and the December router table.
df_eos = pd.read_csv(EOS_PATH).copy()
df_dec = pd.read_csv(DEC_PATH).copy()
def region_ok(x: Any) -> bool:
    """Decide whether a lifecycle row's region applies to the US market.

    Args:
        x: Region cell value; may be ``None`` or NaN when the CSV cell is blank.

    Returns:
        ``True`` for a blank/missing region, for text containing
        "not specified" or "north america", and for text mentioning the USA
        ("usa", "united states", "us", "u.s."). ``False`` otherwise.
    """
    # pandas reads blank CSV cells as float NaN; str(NaN) would be "nan" and
    # miss the blank-region rule below, so treat it as blank explicitly.
    if x is None or (isinstance(x, float) and math.isnan(x)):
        return True
    s = str(x or "").strip().lower()
    if not s or "not specified" in s or "north america" in s:
        return True
    return re.search(r"\busa\b|\bunited\s+states\b|\bu\.?s\.?\b", s) is not None
# Keep only rows whose region passes region_ok; the index is renumbered so
# that positions and index labels stay aligned afterwards.
if "region" in df_eos.columns:
    _region_mask = df_eos["region"].apply(region_ok)
    df_eos = df_eos[_region_mask].reset_index(drop=True)
# Maker mapping (includes Teltonika)
# Canonical maker key -> lower-case terms that identify it when found as a
# substring of normalised manufacturer text (see canon_maker_from_text).
# Order matters: entries are tried top to bottom and the first hit wins, so
# CISCO_MERAKI is listed before CISCO ("cisco meraki" also contains "cisco").
CANON_MAKER = {
"CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"},
"SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"},
"FEENEY": {"feeney", "feeney wireless", "inseego"},
"DIGI": {"digi", "accelerated", "accelerated concepts"},
"CISCO_MERAKI": {"meraki", "cisco meraki"},
"CISCO": {"cisco"},
"TELTONIKA": {"teltonika"},
}
def canon_maker_from_text(s: Any) -> str:
    """Map free-text manufacturer names to a canonical maker key.

    The text is normalised with ``norm_text``; the first ``CANON_MAKER`` entry
    (in dict order) with any term occurring as a substring is returned, or
    ``"UNKNOWN"`` when none matches.
    """
    haystack = norm_text(s)
    return next(
        (
            canon
            for canon, terms in CANON_MAKER.items()
            if any(term in haystack for term in terms)
        ),
        "UNKNOWN",
    )
# Derived helper columns used for matching. Each one falls back to a constant
# (broadcast to every row) when its source column is absent from the CSV.
# Lifecycle table: canonical maker plus normalised sku / description / notes.
df_eos["_canon_make"] = df_eos["manufacturer"].apply(canon_maker_from_text) if "manufacturer" in df_eos.columns else "UNKNOWN"
df_eos["_norm_sku"] = df_eos["sku"].apply(norm_text) if "sku" in df_eos.columns else ""
df_eos["_norm_desc"] = df_eos["description"].apply(norm_text) if "description" in df_eos.columns else ""
df_eos["_norm_notes"] = df_eos["notes"].apply(norm_text) if "notes" in df_eos.columns else ""
# December table: canonical maker, normalised model, and a 5G flag from "Modem Type".
df_dec["_canon_make"] = df_dec["Make"].apply(canon_maker_from_text) if "Make" in df_dec.columns else "UNKNOWN"
df_dec["_norm_model"] = df_dec["Model"].apply(norm_text) if "Model" in df_dec.columns else ""
df_dec["_is5g"] = df_dec["Modem Type"].apply(is_5g) if "Modem Type" in df_dec.columns else False
# ============================
# Date helpers
# ============================
@dataclass
class ParsedDate:
    """Outcome of parsing one lifecycle date cell (built by ``parse_date_field``)."""

    # Trimmed original text of the cell ("" when the cell was empty).
    raw: str
    # One of "missing", "year", "year_month", "full" or "bad".
    kind: str
    # Concrete date used for comparisons; None for "missing" and "bad".
    value: Optional[date]
def parse_date_field(x: Any) -> ParsedDate:
    """Parse a lifecycle date cell in ``YYYY``, ``YYYY-MM`` or ``YYYY-MM-DD`` form.

    Args:
        x: Raw cell value; ``None``, NaN and blank text count as missing.

    Returns:
        A ``ParsedDate`` whose ``kind`` is:
          * ``"missing"`` – nothing in the cell;
          * ``"year"`` – value is 1 January for the current (``TODAY``) year
            or earlier, and 31 December for a future year;
          * ``"year_month"`` – value is the first day of that month;
          * ``"full"`` – value is the exact date;
          * ``"bad"`` – text that matches none of the formats or is not a
            valid calendar date (value is ``None``).
    """
    # Blank CSV cells arrive as float NaN; without this check they would be
    # stringified to "nan" and shown to the user as a bad date.
    if x is None or (isinstance(x, float) and math.isnan(x)):
        return ParsedDate(raw="", kind="missing", value=None)
    raw = str(x or "").strip()
    if not raw:
        return ParsedDate(raw="", kind="missing", value=None)
    try:
        if re.fullmatch(r"\d{4}", raw):
            year = int(raw)
            # Past/current years count as already reached; future years are
            # pushed to year end so they do not trigger early.
            anchor = date(year, 1, 1) if year <= TODAY.year else date(year, 12, 31)
            return ParsedDate(raw=raw, kind="year", value=anchor)
        if re.fullmatch(r"\d{4}-\d{2}", raw):
            year_txt, month_txt = raw.split("-")
            return ParsedDate(raw=raw, kind="year_month", value=date(int(year_txt), int(month_txt), 1))
        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw):
            return ParsedDate(raw=raw, kind="full", value=datetime.strptime(raw, "%Y-%m-%d").date())
    except ValueError:
        # Correct shape but impossible date (e.g. year 0000, month 13).
        pass
    return ParsedDate(raw=raw, kind="bad", value=None)
def display_date(pd_: ParsedDate) -> str:
    """Return the text shown to the user for a parsed date.

    Missing dates (and bad dates with no raw text) read ``"Not listed"``;
    everything else shows the raw cell text unchanged.
    """
    if pd_.kind == "missing":
        return "Not listed"
    if pd_.kind == "bad" and not pd_.raw:
        return "Not listed"
    return pd_.raw
def status_from_eos_eol(eos: ParsedDate, eol: ParsedDate) -> str:
    """Derive the lifecycle status from end-of-sale / end-of-life dates.

    End of life takes precedence over end of sale; a date counts as reached
    when it is on or before ``TODAY``. With neither date known the status is
    ``"Unknown"``; otherwise it is ``"Active"``.
    """
    sale_day, life_day = eos.value, eol.value
    if life_day is not None and life_day <= TODAY:
        return "End of Life"
    if sale_day is not None and sale_day <= TODAY:
        return "End of Sale"
    if sale_day is None and life_day is None:
        return "Unknown"
    return "Active"
def row_to_dates_and_status(row: pd.Series) -> Tuple[str, str, str]:
    """Return ``(end-of-sale text, end-of-life text, status)`` for a lifecycle row."""
    parsed_sale = parse_date_field(row.get("end_of_sale"))
    parsed_life = parse_date_field(row.get("end_of_life"))
    status = status_from_eos_eol(parsed_sale, parsed_life)
    return display_date(parsed_sale), display_date(parsed_life), status
# ============================
# Embeddings + Parsec index
# ============================
# Shared sentence-embedding model, loaded once at import; used both to index
# the Parsec catalog cards and to embed antenna queries.
embedder = SentenceTransformer(EMBED_MODEL_NAME)
def extract_pdf_text_pages(path: str) -> List[str]:
    """Return the plain text of every page of the PDF at ``path``.

    Args:
        path: Filesystem path of the PDF.

    Returns:
        One string per page, in page order.
    """
    # The document is opened as a context manager so the file handle is
    # released once the text has been extracted.
    with fitz.open(path) as doc:
        return [page.get_text("text") for page in doc]
def build_parsec_cards(pages: List[str]) -> List[str]:
    """Cut catalog page text into per-product "cards".

    For every ``Standard SKU:`` marker on a page, a window of
    ``PARSEC_CONTEXT_BEFORE`` characters before and ``PARSEC_CONTEXT_AFTER``
    characters after the marker start is taken (clamped to the page). Windows
    shorter than 200 characters after stripping are dropped, and exact
    duplicates are removed while keeping first-seen order.
    """
    marker = re.compile(r"Standard\s+SKU:")
    windows: List[str] = []
    for page_text in pages:
        page_len = len(page_text)
        for hit in marker.finditer(page_text):
            lo = max(0, hit.start() - PARSEC_CONTEXT_BEFORE)
            hi = min(page_len, hit.start() + PARSEC_CONTEXT_AFTER)
            snippet = page_text[lo:hi].strip()
            if len(snippet) >= 200:
                windows.append(snippet)
    # Keyed by content hash; setdefault keeps the first occurrence and dicts
    # preserve insertion order.
    unique_by_hash: Dict[str, str] = {}
    for snippet in windows:
        digest = hashlib.sha1(snippet.encode("utf-8")).hexdigest()
        unique_by_hash.setdefault(digest, snippet)
    return list(unique_by_hash.values())
# Build the in-memory retrieval index over the Parsec catalog at import time.
parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF))
# Embeddings are L2-normalised, so the inner-product index below ranks by cosine similarity.
parsec_emb = embedder.encode(parsec_cards, batch_size=64, show_progress_bar=False, normalize_embeddings=True)
parsec_emb = np.asarray(parsec_emb, dtype=np.float32)
# NOTE(review): shape[1] presumably fails when no cards were extracted (empty
# embedding array) — confirm the catalog always yields at least one card.
parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1])
parsec_index.add(parsec_emb)
# ============================
# Device resolution
# ============================
def label_for_row(i: int) -> str:
    """Build the human-readable label for lifecycle row ``i``.

    Args:
        i: Positional row index into ``df_eos``.

    Returns:
        SKU, manufacturer and description joined with `` — `` (blank or
        missing parts are skipped), truncated to 220 characters.
    """
    r = df_eos.iloc[i]
    # Parts are separated so the label stays readable in the "Pick A or B"
    # dropdown, and NaN cells are dropped instead of appearing as "nan".
    parts = (safe_str(r.get(col, "")) for col in ("sku", "manufacturer", "description"))
    return " — ".join(p for p in parts if p)[:220]
# One display label per lifecycle row, aligned with df_eos row positions.
EOS_LABELS = [label_for_row(pos) for pos in range(len(df_eos))]
# One searchable text per lifecycle row (same alignment): normalised sku,
# canonical maker, description and notes joined by spaces.
EOS_CORPUS = [
    " ".join(
        [
            rec.get("_norm_sku", ""),
            rec.get("_canon_make", ""),
            rec.get("_norm_desc", ""),
            rec.get("_norm_notes", ""),
        ]
    )
    for _, rec in df_eos.iterrows()
]
def local_candidates(query: str, top_k: int = 6) -> List[Tuple[int, int, str]]:
    """Fuzzy-match ``query`` against the lifecycle corpus.

    Returns up to ``top_k`` tuples of ``(row position, integer score, label)``
    in the order rapidfuzz reports them.
    """
    matches = process.extract(norm_text(query), EOS_CORPUS, scorer=fuzz.WRatio, limit=top_k)
    ranked: List[Tuple[int, int, str]] = []
    for _choice, score, pos in matches:
        row_pos = int(pos)
        ranked.append((row_pos, int(score), EOS_LABELS[row_pos]))
    return ranked
def gpt_choose_device(user_text: str, candidates: List[Tuple[int,int,str]]) -> Dict[str, Any]:
    """Ask the model which candidate row the user meant.

    Returns whatever JSON object ``gpt_json`` yields (expected keys: ``mode``,
    ``row_idx``, ``options``), or ``{}`` when no client is configured.
    """
    if client is None:
        return {}
    instructions = "Pick which router the user meant. Never invent. Return strict JSON only."
    candidate_dicts = []
    for row_pos, score, label in candidates:
        candidate_dicts.append({"row_idx": row_pos, "score": score, "label": label})
    request = {
        "user_input": user_text,
        "candidates": candidate_dicts,
        "rules": [
            "If one is clearly correct, return mode='ok' with row_idx.",
            "If two are plausible, return mode='pick' with top 2 options."
        ],
        "output_schema": {"mode":"ok|pick","row_idx":"int","options":[{"row_idx":"int","label":"string"}]},
    }
    return gpt_json(instructions, request, max_tokens=280)
def _coerce_row_idx(value: Any) -> Optional[int]:
    """Return ``value`` as an int row index, or ``None`` if it is not integer-like."""
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, float) and value.is_integer():
        return int(value)
    if isinstance(value, str) and value.strip().isdigit():
        return int(value.strip())
    return None


def resolve_device(user_text: str) -> Dict[str, Any]:
    """Resolve free-text user input to a lifecycle row.

    Resolution order: exact normalised-SKU match, then a confident fuzzy
    match (score >= 95 and at least 8 points clear of the runner-up), then
    the model's choice among the fuzzy candidates, then the top fuzzy
    candidates as a manual pick.

    Args:
        user_text: SKU or model text typed by the user.

    Returns:
        ``{"mode": "ok", "row_idx": int}``,
        ``{"mode": "pick", "options": [{"row_idx": int, "label": str}, ...]}``
        (one or two options), or ``{"mode": "not_found"}``.
    """
    q = norm_text(user_text)
    exact = df_eos.index[df_eos["_norm_sku"] == q].tolist()
    if len(exact) == 1:
        return {"mode": "ok", "row_idx": int(exact[0])}
    if len(exact) > 1:
        opts = [{"row_idx": int(i), "label": EOS_LABELS[int(i)]} for i in exact[:2]]
        return {"mode": "pick", "options": opts}

    cands = local_candidates(user_text, top_k=6)
    if not cands:
        return {"mode": "not_found"}
    best = cands[0]
    if best[1] >= 95 and (len(cands) == 1 or (best[1] - cands[1][1]) >= 8):
        return {"mode": "ok", "row_idx": best[0]}

    # The model may only choose among the rows it was shown: anything else
    # (out-of-range index, malformed option) is ignored rather than trusted.
    allowed = {idx: label for idx, _score, label in cands}
    g = gpt_choose_device(user_text, cands)
    if not isinstance(g, dict):
        g = {}
    if g.get("mode") == "ok":
        chosen = _coerce_row_idx(g.get("row_idx"))
        if chosen in allowed:
            return {"mode": "ok", "row_idx": chosen}
    if g.get("mode") == "pick":
        raw_opts = g.get("options")
        opts2: List[Dict[str, Any]] = []
        for o in (raw_opts if isinstance(raw_opts, list) else [])[:2]:
            idx = _coerce_row_idx(o.get("row_idx")) if isinstance(o, dict) else None
            if idx in allowed and all(idx != kept["row_idx"] for kept in opts2):
                # Use the local label so the dropdown always shows catalog text.
                opts2.append({"row_idx": idx, "label": allowed[idx]})
        if opts2:
            return {"mode": "pick", "options": opts2}

    # No usable model answer: offer the top one or two fuzzy candidates.
    return {
        "mode": "pick",
        "options": [{"row_idx": idx, "label": label} for idx, _score, label in cands[:2]],
    }
# ============================
# Replacements — lifecycle CSV source of truth
# ============================
def extract_model_token(text: str) -> str:
    """Pull a router model token out of a replacement-field string.

    The text is split on ``|`` and the segments are examined last-to-first.
    Within each segment the patterns are tried in a fixed order (Teltonika
    ``RUT…``, ``IX##``, ``R/E/S`` + 3–4 digits, then a generic
    letters+digits token); the first match is returned upper-cased. With no
    match anywhere, the first examined segment is returned, cut to 60
    characters. Empty/missing input yields ``""``.
    """
    cleaned = safe_str(text)
    if not cleaned:
        return ""
    segments = [seg.strip() for seg in cleaned.split("|") if seg.strip()]
    ordered = list(reversed(segments)) if segments else [cleaned]
    # (pattern, regex flags, search the upper-cased segment?)
    rules = (
        (r"\bRUT[A-Z]?\d{2,4}\b", 0, True),
        (r"\bIX\d{2}\b", re.IGNORECASE, False),
        (r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b", re.IGNORECASE, False),
        (r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b", 0, True),
    )
    for segment in ordered:
        for pattern, flags, use_upper in rules:
            subject = segment.upper() if use_upper else segment
            found = re.search(pattern, subject, flags=flags)
            if found:
                return found.group(0).upper()
    return ordered[0][:60]
def device_is_4g(row: pd.Series) -> bool:
    """Report whether a lifecycle row describes a 4G/LTE-only device.

    Args:
        row: Lifecycle row; its ``description`` and ``notes`` are inspected
            after normalisation with ``norm_text``.

    Returns:
        ``True`` when the text mentions LTE or 4G and does not mention 5G or NR.
    """
    t = norm_text(row.get("description", "")) + " " + norm_text(row.get("notes", ""))
    # Plain substring tests misfire on Wi-Fi bands ("2.4GHz" normalises to
    # "2 4ghz", "5GHz" to "5ghz") and on words that merely contain "lte" or
    # "nr" (e.g. "filter", "alternate"), so the tokens are matched explicitly.
    mentions_4g = re.search(r"\blte|4g(?!hz)", t) is not None
    mentions_5g = re.search(r"5g(?!hz)|\bnr\b", t) is not None
    return mentions_4g and not mentions_5g
def _lifecycle_model_candidates(manufacturer: str, column: str) -> List[str]:
    """Collect distinct model tokens from ``column`` for one manufacturer's rows."""
    if "manufacturer" in df_eos.columns:
        wanted = norm_text(manufacturer)
        # Both sides go through norm_text: comparing a normalised name with a
        # merely lower-cased column never matches names that contain
        # punctuation or irregular spacing.
        pool = df_eos[df_eos["manufacturer"].apply(norm_text) == wanted]
    else:
        pool = df_eos
    if column not in pool.columns:
        return []
    out: List[str] = []
    seen = set()
    for cell in pool[column].tolist():
        tok = extract_model_token(cell)
        if tok and tok.lower() != "nan" and tok not in seen:
            seen.add(tok)
            out.append(tok)
    return out


def candidate_5g_models_from_lifecycle(manufacturer: str) -> List[str]:
    """Return distinct 5G option models listed for ``manufacturer``.

    Tokens come from the ``advanced_5g_option`` column of the lifecycle table,
    in first-seen order; an absent column yields ``[]``.
    """
    return _lifecycle_model_candidates(manufacturer, "advanced_5g_option")


def candidate_4g_models_from_lifecycle(manufacturer: str) -> List[str]:
    """Return distinct suggested replacement models listed for ``manufacturer``.

    Tokens come from the ``suggested_replacement`` column of the lifecycle
    table, in first-seen order; an absent column yields ``[]``.
    """
    return _lifecycle_model_candidates(manufacturer, "suggested_replacement")
def gpt_pick_from_candidates(old_row: pd.Series, candidates: List[str], need: str) -> str:
    """Ask the model to choose one replacement model from ``candidates``.

    At most the first 40 candidates are sent. The reply is accepted only if
    it is exactly one of ``candidates``; otherwise (or with no client / no
    candidates) ``""`` is returned.
    """
    if client is None or not candidates:
        return ""
    instructions = "Pick the best replacement model. Choose only from candidates. Return strict JSON only."
    device_info = {
        "sku": str(old_row.get("sku","")),
        "manufacturer": str(old_row.get("manufacturer","")),
        "description": str(old_row.get("description","")),
        "need": need,
    }
    request = {
        "old_device": device_info,
        "candidates": candidates[:40],
        "output_schema": {"choice":"string"},
    }
    reply = gpt_json(instructions, request, max_tokens=240) or {}
    picked = str(reply.get("choice","") or "").strip()
    if picked in candidates:
        return picked
    return ""
def fallback_5g_from_dec(canon_make: str) -> str:
    """Return the first 5G model for ``canon_make`` in the December table, or ``""``."""
    same_make = df_dec["_canon_make"] == canon_make
    flagged_5g = df_dec["_is5g"] == True  # noqa: E712 - element-wise comparison on a column
    matches = df_dec[same_make & flagged_5g]
    if matches.empty:
        return ""
    return str(matches.iloc[0]["Model"]).strip()
def _lifecycle_pick(row: pd.Series, column: str, candidates_fn, manufacturer: str, need: str, use_gpt: bool) -> Tuple[str, bool]:
    """Return ``(model, chosen_by_gpt)`` for one replacement column of ``row``."""
    direct = extract_model_token(safe_str(row.get(column, "")))
    if direct:
        return direct, False
    # Candidates are only gathered when the row itself names no model.
    candidates = candidates_fn(manufacturer)
    if use_gpt and client is not None:
        choice = gpt_pick_from_candidates(row, candidates, need)
        if choice:
            return choice, True
    return (candidates[0] if candidates else ""), False


def pick_replacements_lifecycle(row: pd.Series, status: str, use_gpt: bool = True) -> Dict[str, Any]:
    """Choose 4G and 5G replacement models for a lifecycle row.

    Args:
        row: Lifecycle row of the current device.
        status: Lifecycle status text; "End of Sale" / "End of Life" force a
            5G recommendation even for non-4G devices.
        use_gpt: Allow the model to choose among same-manufacturer
            candidates when the row itself names no replacement.

    Returns:
        Dict with ``repl_4g`` (model or "Not applicable"), ``repl_5g`` (model
        or "Not listed") and ``sources``: always "lifecycle_csv", plus
        "dec2025routers_csv" when the 5G model came from the December-table
        fallback and "gpt" when a model was actually chosen by GPT.
    """
    canon = str(row.get("_canon_make", "UNKNOWN"))
    manufacturer = str(row.get("manufacturer", "") or "")
    is_4g = device_is_4g(row)
    want_5g = is_4g or (status in {"End of Sale", "End of Life"})
    sources = ["lifecycle_csv"]
    gpt_used = False

    repl_4g = "Not applicable"
    if is_4g:
        model, via_gpt = _lifecycle_pick(
            row, "suggested_replacement", candidate_4g_models_from_lifecycle, manufacturer, "4G alternative", use_gpt
        )
        gpt_used = gpt_used or via_gpt
        repl_4g = model or "Not applicable"

    repl_5g = "Not listed"
    if want_5g:
        model, via_gpt = _lifecycle_pick(
            row, "advanced_5g_option", candidate_5g_models_from_lifecycle, manufacturer, "5G replacement/upgrade", use_gpt
        )
        gpt_used = gpt_used or via_gpt
        if not model:
            model = fallback_5g_from_dec(canon)
            if model and model.lower() != "nan":
                sources.append("dec2025routers_csv")
        # A stringified NaN from the tables must never be shown as a model.
        repl_5g = model if (model and model.lower() != "nan") else "Not listed"

    # Report GPT only when it really supplied one of the answers.
    if gpt_used:
        sources.append("gpt")
    return {"repl_4g": repl_4g, "repl_5g": repl_5g, "sources": sources}
# ============================
# Antennas (Parsec-only)
# ============================
# Lower-case words that family_from_line looks for in a card line to name the antenna.
PARSEC_FAMILY_WORDS = {"chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie","shepherd","belgian","australian","terrier","pyrenees"}
# Lower-case fragments marking a card line as a section heading or field label;
# is_bad_name_line rejects any line containing one of them.
BAD_NAME_MARKERS = {"customization","standard connectors","connectors","features","benefits","specifications","mechanical","electrical","mounting","accessories","description:","standard sku"}
def clean_line(s: str) -> str:
    """Collapse whitespace in a catalog line and drop bare option suffixes.

    Returns ``""`` for lines that consist solely of a hyphen followed by
    letters/digits (e.g. ``-ab1``); otherwise the trimmed, single-spaced line.
    """
    text = re.sub(r"\s+", " ", str(s or "").strip())
    is_option_suffix = re.fullmatch(r"-[a-z0-9]+", text.lower()) is not None
    return "" if is_option_suffix else text
def is_bad_name_line(line: str) -> bool:
    """Report whether a card line should be skipped when looking for a product name.

    A line is rejected when it contains any ``BAD_NAME_MARKERS`` fragment, or
    when it is short (<= 25 characters) and contains a ``-xxxx`` style suffix
    of one to four letters/digits attached to a preceding word.
    """
    lowered = line.lower()
    has_marker = any(marker in lowered for marker in BAD_NAME_MARKERS)
    if has_marker:
        return True
    short_enough = len(lowered) <= 25
    has_suffix = re.search(r"\b-[a-z0-9]{1,4}\b", lowered) is not None
    return has_suffix and short_enough
def family_from_line(line: str) -> str:
    """Return the Parsec family word found in ``line``, capitalised.

    Args:
        line: One cleaned line of catalog card text.

    Returns:
        The family word from ``PARSEC_FAMILY_WORDS`` that occurs earliest in
        the line (e.g. "Belgian" for "Belgian Shepherd"), or ``""`` if none
        occurs.
    """
    low = line.lower()
    # PARSEC_FAMILY_WORDS is a set, so "first match while iterating" would
    # vary between runs for lines with two family words; choosing by
    # position in the line makes the result deterministic.
    hits = [(low.find(fam), fam) for fam in PARSEC_FAMILY_WORDS if fam in low]
    if not hits:
        return ""
    return min(hits)[1].capitalize()
def parsec_connectors_from_card(t: str) -> str:
    """Extract the text after ``Standard Connectors:`` from a card.

    The remainder of that line is whitespace-collapsed and cut to 80
    characters; ``""`` is returned when the label is absent.
    """
    found = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE)
    if found is None:
        return ""
    single_spaced = re.sub(r"\s+", " ", found.group(1).strip())
    return single_spaced[:80]
def parsec_name_from_card(card_text: str) -> str:
    """Derive a display name for a catalog card.

    Lines are cleaned and scanned top to bottom; the first acceptable line
    that contains a family word gives the name. Falls back to
    ``"Parsec antenna"``.
    """
    for raw_line in str(card_text or "").splitlines():
        line = clean_line(raw_line)
        if not line or is_bad_name_line(line):
            continue
        family = family_from_line(line)
        if family:
            return family
    return "Parsec antenna"
def parsec_part_from_card(t: str) -> str:
    """Return the part number following ``Standard SKU:`` in a card, or ``""``.

    Only a run of upper-case letters and digits directly after the label
    (optional whitespace allowed) is captured.
    """
    found = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t)
    if found is None:
        return ""
    return found.group(1).strip()
def parsec_desc_from_card(t: str) -> str:
    """Return the text after ``Description:`` in a card, up to the end of its line.

    The result is whitespace-collapsed and cut to 220 characters; ``""`` when
    the label is absent.
    """
    found = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE)
    if found is None:
        return ""
    single_spaced = re.sub(r"\s+"," ", found.group(1).strip())
    return single_spaced[:220]
def parsec_retrieve(query: str, top_k: int = 10) -> List[Dict[str, Any]]:
    """Retrieve the catalog cards most similar to ``query``.

    Returns one dict per hit (score, name, part_number, description,
    connectors) in index-ranking order. Ids outside the card list are skipped.
    """
    query_vec = np.asarray(embedder.encode([query], normalize_embeddings=True), dtype=np.float32)
    scores, ids = parsec_index.search(query_vec, top_k)
    results: List[Dict[str, Any]] = []
    for score, raw_id in zip(scores[0].tolist(), ids[0].tolist()):
        card_pos = int(raw_id)
        if not (0 <= card_pos < len(parsec_cards)):
            continue
        card_text = parsec_cards[card_pos]
        results.append(
            {
                "score": float(score),
                "name": parsec_name_from_card(card_text),
                "part_number": parsec_part_from_card(card_text),
                "description": parsec_desc_from_card(card_text),
                "connectors": parsec_connectors_from_card(card_text),
            }
        )
    return results
def infer_mimo_for_5g(model: str, canon_make: str) -> str:
    """Guess the MIMO configuration ("4x4" or "2x2") for a replacement model.

    With no real model the answer is "2x2". Otherwise the December table is
    searched for a same-maker fuzzy match (score >= ``MATCH_OK``) whose
    antenna / modem text says 4x4. Failing that, a name heuristic applies:
    "5g" in the model name or a leading R/E/S/IX/RUTM gives "4x4".
    """
    if not model or model in {"Not applicable","Not listed"}:
        return "2x2"
    same_make = df_dec[df_dec["_canon_make"] == canon_make].copy()
    if not same_make.empty:
        best = process.extractOne(norm_text(model), same_make["_norm_model"].tolist(), scorer=fuzz.WRatio)
        if best and best[1] >= MATCH_OK:
            matched = same_make.iloc[int(best[2])]
            antenna_text = str(matched.get("Antennas (internal/external/both)",""))
            modem_text = str(matched.get("Modem Type",""))
            spec_text = (antenna_text + " " + modem_text).lower()
            if "4x4" in spec_text or "4 x 4" in spec_text:
                return "4x4"
    name_suggests_4x4 = "5g" in model.lower() or model.upper().startswith(("R","E","S","IX","RUTM"))
    if name_suggests_4x4:
        return "4x4"
    return "2x2"
def antenna_options_for(router_model: str, tech: str, mimo: str) -> Dict[str, Any]:
    """Pick one stationary and one vehicle omni antenna from the Parsec catalog.

    Each pick is the top retrieval hit for its query (or a placeholder entry
    when nothing is retrieved), annotated with ``mimo`` and a ``why`` note.
    """

    def _top_hit(query: str, why: str) -> Dict[str, Any]:
        hits = parsec_retrieve(query, top_k=10)
        if hits:
            pick = hits[0]
        else:
            pick = {"name":"Parsec antenna","part_number":"","description":"","connectors":""}
        pick.update({"mimo": mimo, "why": why})
        return pick

    stationary = _top_hit(
        f"{router_model} {tech} {mimo} omni stationary outdoor Parsec",
        "Stationary omni best match.",
    )
    vehicle = _top_hit(
        f"{router_model} {tech} {mimo} omni vehicle mobile Parsec",
        "Vehicle omni best match.",
    )
    return {"stationary_omni": stationary, "vehicle_omni": vehicle, "sources":["parsec_rag"]}
# ============================
# Install-ready checklist
# ============================
def install_ready_checklist(current_sku: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str:
    """Produce a markdown install checklist for the looked-up device.

    Args:
        current_sku: SKU of the device being replaced.
        repl: Replacement dict (``repl_4g`` / ``repl_5g`` keys).
        ant: Antenna dict (``stationary_omni`` / ``vehicle_omni`` keys).

    Returns:
        Model-written markdown when a client is configured and the request
        yields text; otherwise a fixed locally-built checklist.
    """
    st = ant.get("stationary_omni", {})
    vh = ant.get("vehicle_omni", {})
    if client is not None:
        system_prompt = "Create a short, install-ready checklist for a Verizon rep. Return markdown only."
        payload = {"current_device": current_sku, "replacements": repl, "antennas": {"stationary": st, "vehicle": vh}}
        try:
            resp = client.responses.create(
                model=OPENAI_MODEL,
                reasoning=OPENAI_REASONING,
                input=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": json.dumps(payload)},
                ],
                max_output_tokens=520,
            )
            generated = (getattr(resp, "output_text", "") or "").strip()
        except Exception:
            logging.getLogger(__name__).warning("Checklist request failed; using the static checklist.", exc_info=True)
            generated = ""
        # An empty reply or failed request falls through to the static
        # checklist so the user never gets a blank panel or an error.
        if generated:
            return generated
    return "\n".join([
        "### Install-ready checklist",
        f"- Current device: {current_sku}",
        f"- 5G replacement: {repl.get('repl_5g','')}",
        f"- 4G alternative: {repl.get('repl_4g','Not applicable')}",
        f"- Stationary omni antenna: {st.get('name','')} (PN {st.get('part_number','')})",
        f"- Vehicle omni antenna: {vh.get('name','')} (PN {vh.get('part_number','')})",
        "- Next steps: confirm mounting + cable lengths + power; place order; schedule install.",
    ])
# ============================
# Batch mode (NO GPT)
# ============================
def parse_batch_inputs(text_blob: str, file_obj: Any) -> List[str]:
    """Collect the device strings for a batch run.

    Args:
        text_blob: Pasted text, one device per line.
        file_obj: Uploaded CSV (object with a ``name`` path attribute, or a
            path); only its first column is read. May be ``None``.

    Returns:
        Non-empty inputs, file entries first and pasted lines after,
        de-duplicated by normalised text while keeping first-seen order.
    """
    items: List[str] = []
    if file_obj is not None:
        path = file_obj.name if hasattr(file_obj, "name") else str(file_obj)
        try:
            # NOTE(review): read_csv treats the first line as a header, so a
            # header-less upload loses its first device — confirm the format.
            df = pd.read_csv(path)
        except (OSError, ValueError):
            # Best effort: an unreadable upload must not block pasted input,
            # but the failure is logged instead of silently ignored.
            logging.getLogger(__name__).warning("Could not read batch CSV %r; ignoring it.", path, exc_info=True)
        else:
            if len(df.columns):
                # Blank cells are dropped so they do not become the input "nan".
                for cell in df.iloc[:, 0].dropna().tolist():
                    text = str(cell).strip()
                    if text:
                        items.append(text)
    if text_blob:
        items.extend(ln.strip() for ln in str(text_blob).splitlines() if ln.strip())
    seen = set()
    out: List[str] = []
    for x in items:
        k = norm_text(x)
        if k and k not in seen:
            seen.add(k)
            out.append(x)
    return out
_BATCH_ANTENNA_COLUMNS = ("Stationary antenna", "Vehicle antenna")


def _batch_antenna_cells(life_row: pd.Series, repl: Dict[str, Any]) -> Dict[str, str]:
    """Return the antenna columns for one resolved batch row."""
    repl_5g = repl.get("repl_5g", "")
    has_5g = bool(repl_5g) and repl_5g != "Not listed"
    canon_make = str(life_row.get("_canon_make", "UNKNOWN"))
    mimo = infer_mimo_for_5g(repl_5g, canon_make)
    tech = "5G" if has_5g else ("4G" if device_is_4g(life_row) else "Unknown")
    # Query with the current SKU when there is no real 5G replacement.
    ant = antenna_options_for(repl_5g if has_5g else str(life_row.get("sku", "")), tech, mimo)

    def _cell(pick: Dict[str, Any]) -> str:
        name = str(pick.get("name", "") or "")
        part = str(pick.get("part_number", "") or "")
        return f"{name} (PN {part})" if part else name

    return {
        "Stationary antenna": _cell(ant.get("stationary_omni", {})),
        "Vehicle antenna": _cell(ant.get("vehicle_omni", {})),
    }


def run_batch(text_blob: str, file_obj: Any, include_antennas: bool):
    """Resolve many devices at once and build the batch result table.

    Args:
        text_blob: Pasted devices, one per line.
        file_obj: Optional uploaded CSV (first column is used).
        include_antennas: When true, adds "Stationary antenna" and
            "Vehicle antenna" columns for every resolved row.

    Returns:
        ``(summary text, result DataFrame, path of a CSV copy, roll-up text)``;
        ``("", None, None, "")`` when there is nothing to process.

    Replacement picking runs with ``use_gpt=False``.
    NOTE(review): ``resolve_device`` may still consult GPT for ambiguous
    inputs when a client is configured — confirm whether that is acceptable
    for batch mode.
    """
    inputs = parse_batch_inputs(text_blob, file_obj)
    if not inputs:
        return "", None, None, ""
    rows = []
    for item in inputs:
        res = resolve_device(item)
        if res.get("mode") != "ok":
            record = {"Input": item, "Matched":"", "Status":"Needs review", "EOS":"", "EOL":"", "4G alternative":"", "5G replacement":"", "Notes":"Not found/ambiguous"}
        else:
            life_row = df_eos.iloc[int(res["row_idx"])]
            eos, eol, status = row_to_dates_and_status(life_row)
            repl = pick_replacements_lifecycle(life_row, status, use_gpt=False)
            record = {
                "Input": item,
                "Matched": str(life_row.get("sku","")),
                "Status": status,
                "EOS": eos,
                "EOL": eol,
                "4G alternative": repl.get("repl_4g",""),
                "5G replacement": repl.get("repl_5g",""),
                "Notes": "",
            }
            if include_antennas:
                record.update(_batch_antenna_cells(life_row, repl))
        if include_antennas:
            # Unresolved rows get blank antenna cells so every row has the same columns.
            for col in _BATCH_ANTENNA_COLUMNS:
                record.setdefault(col, "")
        rows.append(record)
    out_df = pd.DataFrame(rows)
    counts = out_df["Status"].value_counts(dropna=False).to_dict()
    top_5g = out_df["5G replacement"].value_counts(dropna=False).head(5).to_dict()
    summary = f"Rows: {len(out_df)} | " + " | ".join([f"{k}: {v}" for k,v in counts.items()])
    rollup = "Top 5G recommendations:\n" + "\n".join([f"- {k}: {v}" for k,v in top_5g.items() if str(k).strip()])
    # Reserve a path, then close the handle before pandas writes to it; the
    # file is kept (delete=False) so Gradio can serve it for download.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
        csv_path = tmp.name
    out_df.to_csv(csv_path, index=False)
    return summary, out_df, csv_path, rollup
# ============================
# Output
# ============================
def assemble_output(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str:
    """Render the single-lookup result as markdown text.

    Args:
        life_row: Lifecycle row of the current device (``sku`` and
            ``description`` are shown).
        status: Lifecycle status text.
        eos: End-of-sale display text.
        eol: End-of-life display text.
        repl: Replacement dict (``repl_4g``, ``repl_5g``, ``sources``).
        ant: Antenna dict (``stationary_omni`` / ``vehicle_omni``).

    Returns:
        A numbered summary followed by a "Sources (debug)" list.
    """

    def _text(value: Any) -> str:
        # Missing cells (None / NaN) must not be rendered as "None" or "nan".
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return ""
        return str(value).strip()

    # SKU and description are shown as "SKU — description"; a blank part is
    # simply left out so no dangling separator remains.
    name_parts = [_text(life_row.get("sku", "")), _text(life_row.get("description", ""))]
    current_name = " — ".join(part for part in name_parts if part)
    st = ant.get("stationary_omni", {})
    vh = ant.get("vehicle_omni", {})
    conn_s = f" | Conn: {st.get('connectors','')}" if st.get("connectors") else ""
    conn_v = f" | Conn: {vh.get('connectors','')}" if vh.get("connectors") else ""
    lines = [
        f"1. Current device: **{current_name}**",
        f"2. Status: **{status}**",
        f"3. End of Sale date: **{eos}**",
        f"4. End of Life date: **{eol}**",
        f"5. 4G alternative (lifecycle): **{repl.get('repl_4g','Not applicable')}**",
        f"6. 5G replacement (lifecycle): **{repl.get('repl_5g','Not listed')}**",
        "7. Antenna options (Parsec-only):",
        f" - Stationary (Omni): **{st.get('name','')}** (Part #: {st.get('part_number','')}) — {st.get('description','')} — MIMO: {st.get('mimo','')}{conn_s}",
        f" - Vehicle (Omni): **{vh.get('name','')}** (Part #: {vh.get('part_number','')}) — {vh.get('description','')} — MIMO: {vh.get('mimo','')}{conn_v}",
        "\nSources (debug):",
    ]
    sources = repl.get("sources")
    if isinstance(sources, list):
        lines.extend(f"- {s}" for s in sources)
    lines.append("- ParsecCatalog.pdf (local RAG)")
    lines.append("- routers_eos_eol_by_sku.csv (replacements)")
    return "\n".join(lines)
# ============================
# Gradio callbacks
# IMPORTANT: no dict state and ALL events have api_name=False (prevents api_info schema generation)
# ============================
def _lookup_result(row_idx: int, raw: str) -> Tuple[str, str]:
    """Build the rendered output and the serialised state for one resolved row."""
    life_row = df_eos.iloc[int(row_idx)]
    eos, eol, status = row_to_dates_and_status(life_row)
    repl = pick_replacements_lifecycle(life_row, status, use_gpt=True)
    canon_make = str(life_row.get("_canon_make", "UNKNOWN"))
    repl_5g = repl.get("repl_5g", "")
    has_5g = bool(repl_5g) and repl_5g != "Not listed"
    mimo = infer_mimo_for_5g(repl_5g, canon_make)
    tech = "5G" if has_5g else ("4G" if device_is_4g(life_row) else "Unknown")
    # Without a real 5G replacement the antenna search is keyed on the
    # current SKU; the placeholder "Not listed" must not be used as a query.
    ant = antenna_options_for(repl_5g if has_5g else str(life_row.get("sku", "")), tech, mimo)
    output = assemble_output(life_row, status, eos, eol, repl, ant)
    st_out = {"row_idx": int(row_idx), "repl": repl, "ant": ant, "raw": raw}
    return output, state_dump(st_out)


def run_lookup(user_text: str, st_json: str):
    """Gradio callback for the "Check" button.

    Args:
        user_text: Router SKU/model typed by the user.
        st_json: Current state JSON (not read; a fresh state is returned).

    Returns:
        ``(output markdown, dropdown update, "Use selection" button update,
        new state JSON, install-checklist markdown reset to "")``.
    """
    user_text = str(user_text or "").strip()
    if not user_text:
        return "Enter a router SKU/model.", gr.update(visible=False), gr.update(visible=False), "{}", ""
    res = resolve_device(user_text)
    if res.get("mode") == "pick":
        opts = res.get("options", [])
        choices = [o["label"] for o in opts]
        st2 = {"mode":"pick","options": opts, "raw": user_text}
        return "Did you mean A or B? Pick one, then click Use selection.", gr.update(choices=choices, value=None, visible=True), gr.update(visible=True), state_dump(st2), ""
    if res.get("mode") != "ok":
        return "Not found.", gr.update(visible=False), gr.update(visible=False), "{}", ""
    output, new_state = _lookup_result(int(res["row_idx"]), user_text)
    return output, gr.update(visible=False), gr.update(visible=False), new_state, ""


def use_selection(selected_label: str, st_json: str):
    """Gradio callback for the "Use selection" button.

    Args:
        selected_label: Label chosen in the "Pick A or B" dropdown.
        st_json: State JSON written by ``run_lookup`` in pick mode.

    Returns:
        Same 5-tuple shape as ``run_lookup``.
    """
    st = state_load(st_json)
    if not isinstance(st, dict) or not st or st.get("mode") != "pick":
        return "Run a search first.", gr.update(visible=False), gr.update(visible=False), "{}", ""
    if not selected_label:
        return "Pick A or B first.", gr.update(visible=True), gr.update(visible=True), st_json, ""
    chosen_row = None
    for o in st.get("options", []):
        if o.get("label") == selected_label:
            chosen_row = int(o["row_idx"])
            break
    if chosen_row is None:
        return "Pick a valid option.", gr.update(visible=True), gr.update(visible=True), st_json, ""
    output, new_state = _lookup_result(chosen_row, st.get("raw", ""))
    return output, gr.update(visible=False), gr.update(visible=False), new_state, ""
def make_install_ready(st_json: str):
    """Gradio callback for the "Make install-ready checklist" button.

    Needs state from a completed lookup (a ``row_idx`` entry); otherwise it
    returns a prompt to run one first.
    """
    saved = state_load(st_json)
    if not saved or "row_idx" not in saved:
        return "Run a lookup first."
    sku_value = df_eos.iloc[int(saved["row_idx"])].get("sku","")
    replacements = saved.get("repl", {}) or {}
    antennas = saved.get("ant", {}) or {}
    return install_ready_checklist(str(sku_value or ""), replacements, antennas)
# ============================
# UI
# ============================
with gr.Blocks(title="Only-Routers") as demo:
    gr.Markdown("## Only-Routers\nSingle lookup + Batch upload for Verizon reps.")
    with gr.Tabs():
        # --- Single lookup tab ---
        with gr.Tab("Single"):
            user_text = gr.Textbox(
                label="Router SKU or model",
                placeholder="Examples: IBR650B, AER1600, ES450, WR21, RUT240",
                lines=1,
            )
            st = gr.State("{}")  # JSON string
            check_btn = gr.Button("Check", variant="primary")
            pick_dd = gr.Dropdown(label="Pick A or B", choices=[], visible=False)
            use_btn = gr.Button("Use selection", visible=False)
            output_md = gr.Markdown()
            install_btn = gr.Button("Make install-ready checklist")
            install_md = gr.Markdown()
            # Both lookup callbacks return the same five outputs, in this order.
            lookup_outputs = [output_md, pick_dd, use_btn, st, install_md]
            check_btn.click(fn=run_lookup, inputs=[user_text, st], outputs=lookup_outputs, api_name=False)
            use_btn.click(fn=use_selection, inputs=[pick_dd, st], outputs=lookup_outputs, api_name=False)
            install_btn.click(fn=make_install_ready, inputs=[st], outputs=[install_md], api_name=False)
        # --- Batch tab ---
        with gr.Tab("Batch"):
            gr.Markdown("Paste one per line or upload a CSV (first column). Batch runs fast (no GPT).")
            batch_text = gr.Textbox(
                label="Paste devices (one per line)",
                lines=8,
                placeholder="WR21\nRUT240\nIBR650B",
            )
            batch_file = gr.File(label="Upload CSV", file_types=[".csv"])
            include_ant = gr.Checkbox(label="Include antenna picks (slower)", value=False)
            run_btn = gr.Button("Run batch", variant="primary")
            summary_md = gr.Markdown()
            rollup_md = gr.Markdown()
            table = gr.Dataframe(interactive=False, wrap=True)
            dl = gr.File(label="Download results CSV")
            batch_inputs = [batch_text, batch_file, include_ant]
            batch_outputs = [summary_md, table, dl, rollup_md]
            run_btn.click(fn=run_batch, inputs=batch_inputs, outputs=batch_outputs, api_name=False)
# IMPORTANT: On Spaces, demo.launch() is correct; do NOT use share=True.
demo.launch(show_api=False)