diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,1665 +1,1175 @@ +"""Only Routers (chat) + +A Gradio chat app for Verizon reps to: +- Look up a router SKU (or close match) +- Show suggested 4G + 5G replacements +- Show a small feature table for each suggested device +- Produce a customer-ready email-style summary +- Answer follow-up questions about the suggested device(s) + +Data files (keep them next to this app.py) +- routers_eos_eol_by_sku.csv (EoS/EoL + suggested replacements) +- dec2025routers.csv (device detail table) + +OpenAI +- Set OPENAI_API_KEY as a Space secret. +- Default model is a non-thinking GPT-5.2 variant for lower latency. + +Notes +- For 5G replacements, cellular MIMO is always treated as 4x4. +- The app uses the Dec 2025 device table first. If a field is missing, it can ask GPT. +""" + +from __future__ import annotations + +import csv +import json +import logging import os import re -import json -import math -import hashlib -import tempfile -from dataclasses import dataclass -from datetime import datetime, date +from dataclasses import dataclass, field +from functools import lru_cache +from pathlib import Path from typing import Any, Dict, List, Optional, Tuple -import numpy as np -import pandas as pd +import gradio as gr -import fitz # PyMuPDF -import faiss -from sentence_transformers import SentenceTransformer -from rapidfuzz import fuzz, process +# Optional deps. The app runs without them, with fewer niceties. +try: + from openai import OpenAI # type: ignore +except Exception: # pragma: no cover + OpenAI = None # type: ignore -import gradio as gr -from openai import OpenAI +try: + import requests # type: ignore +except Exception: # pragma: no cover + requests = None # type: ignore +try: + from rapidfuzz import fuzz as rf_fuzz # type: ignore + from rapidfuzz import process as rf_process # type: ignore +except Exception: # pragma: no cover + rf_fuzz = None # type: ignore + rf_process = None # type: ignore -# ============================ -# Settings -# ============================ -TODAY = date(2026, 1, 18) -OPENAI_MODEL = "gpt-5.2" -OPENAI_REASONING = {"effort": "high"} -MATCH_OK = 80 -EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2" -PARSEC_CONTEXT_BEFORE = 900 -PARSEC_CONTEXT_AFTER = 1600 +# ----------------------------- +# App config +# ----------------------------- +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() +logging.basicConfig(level=LOG_LEVEL, format="%(levelname)s | %(message)s") +log = logging.getLogger("only-routers") -# ============================ -# OpenAI client (HF Space secret: OPENAI_API_KEY) -# ============================ -API_KEY = os.getenv("OPENAI_API_KEY", "").strip() -client = OpenAI(api_key=API_KEY) if API_KEY else None +HERE = Path(__file__).resolve().parent -# ---------------------------- -# Gradio state helpers -# Keep state as a JSON STRING to avoid schema issues on Hugging Face. -# ---------------------------- -def state_load(st_json: str) -> Dict[str, Any]: - try: - if not st_json: - return {} - return json.loads(st_json) if isinstance(st_json, str) else {} - except Exception: - return {} +EOS_CSV_PATH = Path(os.getenv("EOS_CSV_PATH", str(HERE / "routers_eos_eol_by_sku.csv"))) +CATALOG_CSV_PATH = Path(os.getenv("CATALOG_CSV_PATH", str(HERE / "dec2025routers.csv"))) -def state_dump(st: Dict[str, Any]) -> str: - try: - return json.dumps(st or {}, ensure_ascii=False) - except Exception: - return "{}" +# Default to the faster, non-thinking GPT-5.2 route. +OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5.2-chat-latest") +OPENAI_MODEL_QA = os.getenv("OPENAI_MODEL_QA", OPENAI_MODEL) +OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", "0.2")) +OPENAI_MAX_TOKENS = int(os.getenv("OPENAI_MAX_TOKENS", "700")) +# Fast timeouts help avoid hanging requests in a Space. +OPENAI_TIMEOUT_SEC = float(os.getenv("OPENAI_TIMEOUT_SEC", "25")) +HTTP_TIMEOUT_SEC = float(os.getenv("HTTP_TIMEOUT_SEC", "7")) +# If fuzzy matching is present, these control auto-pick. +FUZZY_AUTOPICK_MIN = int(os.getenv("FUZZY_AUTOPICK_MIN", "92")) +FUZZY_AUTOPICK_GAP = int(os.getenv("FUZZY_AUTOPICK_GAP", "5")) -# ============================ -# Helpers -# ============================ -def norm_text(s: Any) -> str: - try: - if s is None or (isinstance(s, float) and math.isnan(s)) or pd.isna(s): - return "" - except Exception: - pass - s = str(s).strip().lower() - s = re.sub(r"[^a-z0-9\s\-\/]", " ", s) - s = re.sub(r"\s+", " ", s).strip() - return s +CONTACT_LINE = "Contact Peter Dunn @ 786.999.9127 or peter.dunn@masterstelecom.com for pricing." -def safe_str(v: Any) -> str: - if v is None or (isinstance(v, float) and pd.isna(v)) or pd.isna(v): - return "" - return str(v).strip() -def is_5g(modem_type: Any) -> bool: - s = norm_text(modem_type) - return ("5g" in s) or ("nr" in s) +# ----------------------------- +# Data models +# ----------------------------- -def json_load_safe(s: str) -> Dict[str, Any]: - try: - return json.loads(s) - except Exception: - return {} -def gpt_json(system: str, payload: Dict[str, Any], max_tokens: int = 600) -> Dict[str, Any]: - if client is None: - return {} - resp = client.responses.create( - model=OPENAI_MODEL, - reasoning=OPENAI_REASONING, - input=[{"role":"system","content":system},{"role":"user","content":json.dumps(payload)}], - max_output_tokens=max_tokens, - ) - return json_load_safe(getattr(resp, "output_text", "") or "") +@dataclass(frozen=True) +class EosRecord: + sku: str + manufacturer: str + end_of_sale: str + end_of_life: str + description: str + suggested_replacement: str + advanced_5g_option: str + region: str + source: str + source_detail: str + notes: str -def gpt_answer_md(system: str, user: str, max_tokens: int = 650) -> str: - """Return a rep-friendly markdown answer.""" - if client is None: - return "No API key is configured, so I can't answer detailed questions right now." - resp = client.responses.create( - model=OPENAI_MODEL, - reasoning=OPENAI_REASONING, - input=[ - {"role": "system", "content": system}, - {"role": "user", "content": user}, - ], - max_output_tokens=max_tokens, - ) - return (getattr(resp, "output_text", "") or "").strip() - - -# ============================ -# Load data -# ============================ -EOS_PATH = "routers_eos_eol_by_sku.csv" -DEC_PATH = "dec2025routers.csv" -PARSEC_PDF = "ParsecCatalog.pdf" - -if not os.path.exists(EOS_PATH): - raise FileNotFoundError(f"Missing {EOS_PATH} in repo.") -if not os.path.exists(DEC_PATH): - raise FileNotFoundError(f"Missing {DEC_PATH} in repo.") -if not os.path.exists(PARSEC_PDF): - raise FileNotFoundError(f"Missing {PARSEC_PDF} in repo.") - -df_eos = pd.read_csv(EOS_PATH).copy() -df_dec = pd.read_csv(DEC_PATH).copy() - - -def _canonize_eos_columns(df: pd.DataFrame) -> pd.DataFrame: - """Normalize lifecycle CSV column names (case-insensitive) and create expected columns.""" - # Map various header spellings to canonical names used by the app - mapping = {} - for c in df.columns: - k = str(c).strip().lower().replace(" ", "_") - if k in {"sku", "model", "device", "device_sku"}: - mapping[c] = "sku" - elif k in {"manufacturer", "make", "vendor"}: - mapping[c] = "manufacturer" - elif k in {"device_type", "type"}: - mapping[c] = "device_type" - elif k in {"end_of_sale", "eos", "end_sale", "end_of_sales"}: - mapping[c] = "end_of_sale" - elif k in {"end_of_life", "eol", "end_life"}: - mapping[c] = "end_of_life" - elif k in {"suggested_replacement", "replacement_4g", "lte_replacement", "replacement_lte", "replacement"}: - mapping[c] = "suggested_replacement" - elif k in {"advanced_5g_option", "replacement_5g", "fiveg_replacement", "5g_replacement", "upgrade_5g"}: - mapping[c] = "advanced_5g_option" - elif k in {"region", "market"}: - mapping[c] = "region" - elif k in {"notes", "note"}: - mapping[c] = "notes" - elif k in {"description", "device_description", "name"}: - mapping[c] = "description" - - df = df.rename(columns=mapping).copy() - - # Create expected columns if missing - if "sku" not in df.columns: - # Try the common capitalized header as a fallback - if "SKU" in df.columns: - df["sku"] = df["SKU"].astype(str) - else: - df["sku"] = "" - - if "manufacturer" not in df.columns: - df["manufacturer"] = "" - - if "device_type" not in df.columns: - df["device_type"] = "" - - if "description" not in df.columns: - # If the simplified file removed description, use SKU as description (still searchable) - df["description"] = df["sku"].astype(str) - - if "notes" not in df.columns: - df["notes"] = "" - - if "region" not in df.columns: - df["region"] = "" - - if "suggested_replacement" not in df.columns: - df["suggested_replacement"] = "" - - if "advanced_5g_option" not in df.columns: - df["advanced_5g_option"] = "" - - if "end_of_sale" not in df.columns: - df["end_of_sale"] = "" - - if "end_of_life" not in df.columns: - df["end_of_life"] = "" - - return df - -df_eos = _canonize_eos_columns(df_eos) - - -def region_ok(x: Any) -> bool: - s = str(x or "").strip().lower() - if not s: - return True - if "not specified" in s: - return True - if "north america" in s: - return True - if re.search(r"\busa\b", s): - return True - if re.search(r"\bunited\s+states\b", s): - return True - if re.search(r"\bu\.?s\.?\b", s): - return True - return False - -if "region" in df_eos.columns: - df_eos = df_eos[df_eos["region"].apply(region_ok)].reset_index(drop=True) - -# Maker mapping (includes Teltonika) -CANON_MAKER = { - "CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"}, - "SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"}, - "FEENEY": {"feeney", "feeney wireless", "inseego"}, - "DIGI": {"digi", "accelerated", "accelerated concepts"}, - "CISCO_MERAKI": {"meraki", "cisco meraki"}, - "CISCO": {"cisco"}, - "TELTONIKA": {"teltonika"}, -} - -def canon_maker_from_text(s: Any) -> str: - t = norm_text(s) - for canon, terms in CANON_MAKER.items(): - for term in terms: - if term in t: - return canon - return "UNKNOWN" - -df_eos["_canon_make"] = df_eos["manufacturer"].apply(canon_maker_from_text) if "manufacturer" in df_eos.columns else "UNKNOWN" -df_eos["_norm_sku"] = df_eos["sku"].apply(norm_text) if "sku" in df_eos.columns else "" -df_eos["_norm_desc"] = df_eos["description"].apply(norm_text) if "description" in df_eos.columns else "" -df_eos["_norm_notes"] = df_eos["notes"].apply(norm_text) if "notes" in df_eos.columns else "" - -df_dec["_canon_make"] = df_dec["Make"].apply(canon_maker_from_text) if "Make" in df_dec.columns else "UNKNOWN" -df_dec["_norm_model"] = df_dec["Model"].apply(norm_text) if "Model" in df_dec.columns else "" -df_dec["_is5g"] = df_dec["Modem Type"].apply(is_5g) if "Modem Type" in df_dec.columns else False - - -# ============================ -# Date helpers -# ============================ @dataclass -class ParsedDate: - raw: str - kind: str - value: Optional[date] +class DeviceDetails: + make: str = "" + model: str = "" + modem_type: str = "" + rugged: str = "" # Yes/No/Unknown + wifi: str = "" # WiFi type, or "None" + primary_use_case: str = "" + serial: str = "" # Yes/No/Unknown + throughput: str = "" # keep as text + antennas: str = "" # keep as text + ethernet_ports_total: Optional[int] = None + battery: str = "" # Yes/No/Unknown + dual_sim: str = "" # Yes/No/Unknown + cell_mimo: str = "" # 2x2 / 4x4 + link: str = "" # manufacturer page or datasheet + raw: Dict[str, str] = field(default_factory=dict) -def parse_date_field(x: Any) -> ParsedDate: - raw = str(x or "").strip() - if not raw: - return ParsedDate(raw="", kind="missing", value=None) - - # Common US formats: M/D/YY or M/D/YYYY (e.g., 6/24/24, 9/30/21) - for fmt in ("%m/%d/%y", "%m/%d/%Y", "%-m/%-d/%y", "%-m/%-d/%Y"): - try: - dt = datetime.strptime(raw, fmt).date() - return ParsedDate(raw=raw, kind="full", value=dt) - except Exception: - pass - - # ISO-ish: YYYY - if re.fullmatch(r"\d{4}", raw): - y = int(raw) - if y == TODAY.year: - return ParsedDate(raw=raw, kind="year", value=date(y, 1, 1)) - if y < TODAY.year: - return ParsedDate(raw=raw, kind="year", value=date(y, 1, 1)) - return ParsedDate(raw=raw, kind="year", value=date(y, 12, 31)) - - # YYYY-MM - if re.fullmatch(r"\d{4}-\d{2}", raw): - try: - y, m = raw.split("-") - return ParsedDate(raw=raw, kind="year_month", value=date(int(y), int(m), 1)) - except Exception: - return ParsedDate(raw=raw, kind="bad", value=None) - - # YYYY-MM-DD - if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw): - try: - dt = datetime.strptime(raw, "%Y-%m-%d").date() - return ParsedDate(raw=raw, kind="full", value=dt) - except Exception: - return ParsedDate(raw=raw, kind="bad", value=None) - - # Last resort: leave as raw (unparsed) - return ParsedDate(raw=raw, kind="bad", value=None) - - if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw): - try: - dt = datetime.strptime(raw, "%Y-%m-%d").date() - return ParsedDate(raw=raw, kind="full", value=dt) - except Exception: - return ParsedDate(raw=raw, kind="bad", value=None) - - return ParsedDate(raw=raw, kind="bad", value=None) - -def display_date(pd_: ParsedDate) -> str: - if pd_.kind == "missing": - return "Not listed" - if pd_.kind == "bad": - return pd_.raw or "Not listed" - return pd_.raw - -def status_from_eos_eol(eos: ParsedDate, eol: ParsedDate) -> str: - if eos.value is None and eol.value is None: - return "Unknown" - if eol.value is not None and eol.value <= TODAY: - return "End of Life" - if eos.value is not None and eos.value <= TODAY: - return "End of Sale" - return "Active" - -def row_to_dates_and_status(row: pd.Series) -> Tuple[str, str, str]: - eos = parse_date_field(row.get("end_of_sale")) - eol = parse_date_field(row.get("end_of_life")) - return display_date(eos), display_date(eol), status_from_eos_eol(eos, eol) - - -# ============================ -# Embeddings + Parsec index -# ============================ -embedder = SentenceTransformer(EMBED_MODEL_NAME) - -def extract_pdf_text_pages(path: str) -> List[str]: - doc = fitz.open(path) - return [doc[i].get_text("text") for i in range(len(doc))] - -def build_parsec_cards(pages: List[str]) -> List[str]: - cards = [] - for p in pages: - for m in re.finditer(r"Standard\s+SKU:", p): - start = max(0, m.start() - PARSEC_CONTEXT_BEFORE) - end = min(len(p), m.start() + PARSEC_CONTEXT_AFTER) - c = p[start:end].strip() - if len(c) >= 200: - cards.append(c) - out, seen = [], set() - for c in cards: - h = hashlib.sha1(c.encode("utf-8")).hexdigest() - if h not in seen: - seen.add(h); out.append(c) - return out + def is_5g(self) -> bool: + txt = f"{self.modem_type} {self.model} {self.raw.get('Notes','')}".lower() + return "5g" in txt -parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF)) -parsec_emb = embedder.encode(parsec_cards, batch_size=64, show_progress_bar=False, normalize_embeddings=True) -parsec_emb = np.asarray(parsec_emb, dtype=np.float32) -parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1]) -parsec_index.add(parsec_emb) +# ----------------------------- +# Small helpers +# ----------------------------- -# ============================ -# Device resolution -# ============================ -def label_for_row(i: int) -> str: - r = df_eos.iloc[i] - return f"{r.get('sku','')} — {r.get('manufacturer','')} — {r.get('description','')}"[:220] -EOS_LABELS = [label_for_row(i) for i in range(len(df_eos))] -EOS_CORPUS = [] -for _, r in df_eos.iterrows(): - EOS_CORPUS.append(" ".join([r.get("_norm_sku",""), r.get("_canon_make",""), r.get("_norm_desc",""), r.get("_norm_notes","")])) +_SKU_CLEAN_RE = re.compile(r"[^A-Z0-9\-]+") -def local_candidates(query: str, top_k: int = 6) -> List[Tuple[int, int, str]]: - q = norm_text(query) - hits = process.extract(q, EOS_CORPUS, scorer=fuzz.WRatio, limit=top_k) - return [(int(idx), int(score), EOS_LABELS[int(idx)]) for _, score, idx in hits] -def gpt_choose_device(user_text: str, candidates: List[Tuple[int,int,str]]) -> Dict[str, Any]: - if client is None: - return {} - sys = "Pick which router the user meant. Never invent. Return strict JSON only." - payload = { - "user_input": user_text, - "candidates": [{"row_idx": i, "score": s, "label": lbl} for (i,s,lbl) in candidates], - "rules": [ - "If one is clearly correct, return mode='ok' with row_idx.", - "If two are plausible, return mode='pick' with top 2 options." - ], - "output_schema": {"mode":"ok|pick","row_idx":"int","options":[{"row_idx":"int","label":"string"}]} - } - return gpt_json(sys, payload, max_tokens=280) - -def resolve_device(user_text: str) -> Dict[str, Any]: - q = norm_text(user_text) - exact = df_eos.index[df_eos["_norm_sku"] == q].tolist() - if len(exact) == 1: - return {"mode":"ok","row_idx": int(exact[0])} - if len(exact) > 1: - opts = [{"row_idx": int(i), "label": EOS_LABELS[int(i)]} for i in exact[:2]] - return {"mode":"pick","options": opts} - - cands = local_candidates(user_text, top_k=6) - if not cands: - return {"mode":"not_found"} - - if cands[0][1] >= 95 and (len(cands) == 1 or (cands[0][1] - cands[1][1]) >= 8): - return {"mode":"ok","row_idx": cands[0][0]} - - g = gpt_choose_device(user_text, cands) - if g.get("mode") == "ok" and isinstance(g.get("row_idx"), int): - return {"mode":"ok","row_idx": int(g["row_idx"])} - - if g.get("mode") == "pick": - opts = g.get("options", []) or [] - opts2 = [{"row_idx": int(o["row_idx"]), "label": str(o["label"])} for o in opts[:2] if "row_idx" in o] - if opts2: - return {"mode":"pick","options": opts2} - - if len(cands) > 1: - return {"mode":"pick","options":[{"row_idx":cands[0][0],"label":cands[0][2]},{"row_idx":cands[1][0],"label":cands[1][2]}]} - return {"mode":"pick","options":[{"row_idx":cands[0][0],"label":cands[0][2]}]} - - -# ============================ -# Replacements — lifecycle CSV source of truth -# ============================ -def extract_model_token(text: str) -> str: - s = safe_str(text) - if not s: +def norm_sku(s: str) -> str: + """Normalize a user-entered SKU for matching.""" + s = (s or "").strip().upper() + return _SKU_CLEAN_RE.sub("", s) + + +def clean_str(x: Any) -> str: + if x is None: return "" - parts = [p.strip() for p in s.split("|") if p.strip()] - candidates = parts[::-1] if parts else [s] - for cand in candidates: - m = re.search(r"\bRUT[A-Z]?\d{2,4}\b", cand.upper()) - if m: - return m.group(0).upper() - m = re.search(r"\bIX\d{2}\b", cand, flags=re.IGNORECASE) - if m: - return m.group(0).upper() - m = re.search(r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b", cand, flags=re.IGNORECASE) - if m: - return m.group(0).upper() - m = re.search(r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b", cand.upper()) - if m: - return m.group(0).upper() - return candidates[0][:60] + s = str(x) + if s.strip().lower() in {"nan", "none"}: + return "" + return s.strip() -def device_is_4g(row: pd.Series) -> bool: - # Detect LTE/4G even when the description uses "Cat 4 / Cat6 / Cat 12" without saying "LTE" - t = norm_text(row.get("description","")) + " " + norm_text(row.get("notes","")) + " " + norm_text(row.get("sku","")) - # If it explicitly says 5G/NR, treat as not 4G-only - if ("5g" in t) or ("nr" in t): - return False +def first_nonempty(*vals: str) -> str: + for v in vals: + v = clean_str(v) + if v: + return v + return "" - # Classic signals - if ("lte" in t) or ("4g" in t): - return True - # LTE category signals (Cat 1..20 are LTE categories; Cat M1/M2 are LTE-M) - if re.search(r"\bcat\s*[-]?\s*(m1|m2)\b", t): - return True +def safe_int(x: Any) -> Optional[int]: + try: + return int(str(x).strip()) + except Exception: + return None - m = re.search(r"\bcat\s*[-]?\s*(\d{1,2})\b", t) - if m: - try: - cat = int(m.group(1)) - if 0 < cat <= 20: - return True - except Exception: - pass - # If "cat" appears at all, it's almost always LTE-family - if "cat" in t: - return True +def looks_like_sku(s: str) -> bool: + """Heuristic: user typed something like RUT240, IBR900, CBA850, etc.""" + s2 = norm_sku(s) + return len(s2) >= 4 and any(c.isdigit() for c in s2) - return False - # If it explicitly says 5G/NR, treat as not 4G-only - if ("5g" in t) or ("nr" in t): +def parse_yes_no(val: str) -> str: + v = clean_str(val).lower() + if not v: + return "Unknown" + if v in {"y", "yes", "true", "1"}: + return "Yes" + if v in {"n", "no", "false", "0"}: + return "No" + # handle mixed text + if "yes" in v: + return "Yes" + if "no" in v: + return "No" + return val.strip() + + +def is_wifi_present(wifi_type: str) -> bool: + v = clean_str(wifi_type).lower() + if not v: + return False + if v in {"none", "n/a", "na", "no"}: return False + return True + + +def extract_models_from_text(text: str, known_models: List[str]) -> List[str]: + """Find known model tokens inside a longer string.""" + t = (text or "") + if not t: + return [] + t_up = t.upper() + hits: List[str] = [] + for m in known_models: + if m and m.upper() in t_up: + hits.append(m) + # De-dupe while keeping order + seen: set[str] = set() + out: List[str] = [] + for h in hits: + if h.upper() not in seen: + out.append(h) + seen.add(h.upper()) + return out + - # Classic signals - if ("lte" in t) or ("4g" in t): - return True +_PORT_TOKEN_RE = re.compile(r"(\d+)\s*[x×]\s*") - # LTE category signals (Cat 1..20 are LTE categories; Cat M1/M2 are LTE-M) - if re.search(r"\bcat\s*[-]?\s*(m1|m2)\b", t): - return True - m = re.search(r"\bcat\s*[-]?\s*(\d{1,2})\b", t) - if m: - try: - cat = int(m.group(1)) - if 0 < cat <= 20: - return True - except Exception: - pass +def parse_port_count(text: str) -> int: + """Best-effort parse of strings like '1x GbE', '2 x FE', 'WAN: 1x, LAN: 4x'.""" + t = clean_str(text) + if not t: + return 0 - # If "cat" appears at all, it's almost always LTE-family - if "cat" in t: - return True + total = 0 + for m in _PORT_TOKEN_RE.finditer(t.lower()): + n = safe_int(m.group(1)) + if n: + total += n - return False + # Fallback: if the string is literally '1' or '2' + if total == 0: + n = safe_int(t) + if n is not None and 0 <= n <= 16: + total = n + return total -def candidate_5g_models_from_lifecycle(manufacturer: str) -> List[str]: - mfr = norm_text(manufacturer) - pool = df_eos[df_eos["manufacturer"].astype(str).str.lower().eq(mfr)].copy() if "manufacturer" in df_eos.columns else df_eos.copy() - vals = pool["advanced_5g_option"].tolist() if "advanced_5g_option" in pool.columns else [] - out, seen = [], set() - for v in vals: - tok = extract_model_token(v) - if tok and tok.lower() != "nan" and tok not in seen: - seen.add(tok); out.append(tok) - return out -def candidate_4g_models_from_lifecycle(manufacturer: str) -> List[str]: - mfr = norm_text(manufacturer) - pool = df_eos[df_eos["manufacturer"].astype(str).str.lower().eq(mfr)].copy() if "manufacturer" in df_eos.columns else df_eos.copy() - vals = pool["suggested_replacement"].tolist() if "suggested_replacement" in pool.columns else [] - out, seen = [], set() - for v in vals: - tok = extract_model_token(v) - if tok and tok.lower() != "nan" and tok not in seen: - seen.add(tok); out.append(tok) +def best_effort_total_ethernet(wan: str, lan: str) -> Optional[int]: + wan_n = parse_port_count(wan) + lan_n = parse_port_count(lan) + total = wan_n + lan_n + return total if total > 0 else None + + +def fit_badges(details: DeviceDetails) -> List[str]: + """Derive simple badges for quick scanning.""" + badges: List[str] = [] + + use_case = clean_str(details.primary_use_case).lower() + if "vehicle" in use_case or "mobile" in use_case or "fleet" in use_case: + badges.append("Vehicle") + if "fixed" in use_case or "branch" in use_case or "site" in use_case or "industrial" in use_case: + badges.append("Fixed site") + + if is_wifi_present(details.wifi): + badges.append("Wi-Fi") + + if parse_yes_no(details.rugged) == "Yes": + badges.append("Rugged") + + if parse_yes_no(details.dual_sim) == "Yes": + badges.append("Dual-SIM") + + if clean_str(details.cell_mimo): + if "4" in details.cell_mimo and "x" in details.cell_mimo: + badges.append("4x4 MIMO") + elif "2" in details.cell_mimo and "x" in details.cell_mimo: + badges.append("2x2 MIMO") + + # Very rough check for higher throughput. + thr = clean_str(details.throughput).lower() + if any(tok in thr for tok in ["1 gb", "1000", "2 gb", "2000", "multi-g", "gig"]): + badges.append("High throughput") + + if parse_yes_no(details.serial) == "Yes": + badges.append("Serial") + + return badges + + +# ----------------------------- +# CSV loading +# ----------------------------- + + +def _read_csv(path: Path) -> List[Dict[str, str]]: + if not path.exists(): + raise FileNotFoundError(f"Missing file: {path}") + + with path.open("r", encoding="utf-8-sig", newline="") as f: + reader = csv.DictReader(f) + rows: List[Dict[str, str]] = [] + for r in reader: + rows.append({k: clean_str(v) for k, v in (r or {}).items()}) + return rows + + +@lru_cache(maxsize=1) +def load_eos_records() -> List[EosRecord]: + rows = _read_csv(EOS_CSV_PATH) + out: List[EosRecord] = [] + for r in rows: + out.append( + EosRecord( + sku=clean_str(r.get("sku")), + manufacturer=clean_str(r.get("manufacturer")), + end_of_sale=clean_str(r.get("end_of_sale")), + end_of_life=clean_str(r.get("end_of_life")), + description=clean_str(r.get("description")), + suggested_replacement=clean_str(r.get("suggested_replacement")), + advanced_5g_option=clean_str(r.get("advanced_5g_option")), + region=clean_str(r.get("region")), + source=clean_str(r.get("source")), + source_detail=clean_str(r.get("source_detail")), + notes=clean_str(r.get("notes")), + ) + ) return out -def gpt_pick_from_candidates(old_row: pd.Series, candidates: List[str], need: str) -> str: - if client is None or not candidates: - return "" - sys = "Pick the best replacement model. Choose only from candidates. Return strict JSON only." - payload = { - "old_device": { - "sku": str(old_row.get("sku","")), - "manufacturer": str(old_row.get("manufacturer","")), - "description": str(old_row.get("description","")), - "need": need, - }, - "candidates": candidates[:40], - "output_schema": {"choice":"string"} - } - out = gpt_json(sys, payload, max_tokens=240) or {} - choice = str(out.get("choice","") or "").strip() - return choice if choice in candidates else "" - -def fallback_5g_from_dec(canon_make: str) -> str: - pool5 = df_dec[(df_dec["_canon_make"] == canon_make) & (df_dec["_is5g"] == True)] - return str(pool5.iloc[0]["Model"]).strip() if not pool5.empty else "" - -def pick_replacements_lifecycle(row: pd.Series, status: str, use_gpt: bool = True) -> Dict[str, Any]: - canon = str(row.get("_canon_make","UNKNOWN")) - manufacturer = str(row.get("manufacturer","") or "") - - sug_raw = safe_str(row.get("suggested_replacement","")) - adv_raw = safe_str(row.get("advanced_5g_option","")) - - has_4g_alt = bool(sug_raw.strip()) - has_5g_alt = bool(adv_raw.strip()) - - # Treat as 4G if the description indicates LTE OR lifecycle provides a 4G suggested replacement - is_4g = device_is_4g(row) or has_4g_alt - - # Provide 5G option if the unit is 4G, EOS/EOL, or lifecycle explicitly provides advanced_5g_option - want_5g = is_4g or (status in {"End of Sale","End of Life"}) or has_5g_alt - - # 4G alternative: show whenever lifecycle provides it (or device appears 4G) - repl_4g = "Not applicable" - if is_4g or has_4g_alt: - repl_4g = extract_model_token(sug_raw) - if not repl_4g: - cand4 = candidate_4g_models_from_lifecycle(manufacturer) - repl_4g = (gpt_pick_from_candidates(row, cand4, "4G alternative") if (use_gpt and client) else "") or (cand4[0] if cand4 else "") - if not repl_4g: - repl_4g = "Not applicable" - - # 5G replacement: prefer lifecycle advanced_5g_option whenever present - repl_5g = "Not listed" - if want_5g: - repl_5g = extract_model_token(adv_raw) - if not repl_5g: - cand5 = candidate_5g_models_from_lifecycle(manufacturer) - repl_5g = (gpt_pick_from_candidates(row, cand5, "5G replacement/upgrade") if (use_gpt and client) else "") or (cand5[0] if cand5 else "") - if not repl_5g: - repl_5g = fallback_5g_from_dec(canon) or "Not listed" - - if repl_5g.lower() == "nan": - repl_5g = "Not listed" - - return {"repl_4g": repl_4g, "repl_5g": repl_5g, "sources": ["lifecycle_csv"] + (["gpt"] if (use_gpt and client) else [])} - - -# ============================ -# Antennas (Parsec-only) -# ============================ -PARSEC_FAMILY_WORDS = {"chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie","shepherd","belgian","australian","terrier","pyrenees"} -BAD_NAME_MARKERS = {"customization","standard connectors","connectors","features","benefits","specifications","mechanical","electrical","mounting","accessories","description:","standard sku"} - -def clean_line(s: str) -> str: - s = re.sub(r"\s+", " ", str(s or "").strip()) - if re.fullmatch(r"-[a-z0-9]+", s.lower()): - return "" - return s - -def is_bad_name_line(line: str) -> bool: - low = line.lower() - if any(m in low for m in BAD_NAME_MARKERS): - return True - if re.search(r"\b-[a-z0-9]{1,4}\b", low) and len(low) <= 25: - return True - return False - -def family_from_line(line: str) -> str: - low = line.lower() - for fam in PARSEC_FAMILY_WORDS: - if fam in low: - return fam.capitalize() - return "" -def parsec_connectors_from_card(t: str) -> str: - m = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE) - if m: - return re.sub(r"\s+", " ", m.group(1).strip())[:80] - return "" +@lru_cache(maxsize=1) +def eos_index() -> Dict[str, EosRecord]: + """Index by normalized SKU.""" + idx: Dict[str, EosRecord] = {} + for rec in load_eos_records(): + key = norm_sku(rec.sku) + if key: + idx[key] = rec + return idx + + +@lru_cache(maxsize=1) +def eos_choices() -> List[str]: + """List of raw SKU strings, for fuzzy match UI.""" + return [r.sku for r in load_eos_records() if clean_str(r.sku)] + + +@lru_cache(maxsize=1) +def load_catalog_rows() -> List[Dict[str, str]]: + return _read_csv(CATALOG_CSV_PATH) + -def parsec_mounts_from_card(t: str) -> List[str]: - mounts = [] - for m in re.finditer(r"Mount:\s*(.+)", t, flags=re.IGNORECASE): - val = re.sub(r"\s+", " ", m.group(1).strip()) - parts = [p.strip().lower() for p in val.split(",") if p.strip()] - mounts.extend(parts) - out = [] - seen = set() - for x in mounts: - if x not in seen: - seen.add(x); out.append(x) +@lru_cache(maxsize=1) +def catalog_models() -> List[str]: + models: List[str] = [] + for r in load_catalog_rows(): + m = clean_str(r.get("Model")) + if m: + models.append(m) + # De-dupe + seen: set[str] = set() + out: List[str] = [] + for m in models: + mu = m.upper() + if mu not in seen: + out.append(m) + seen.add(mu) return out -def parsec_name_from_card(card_text: str) -> str: - lines = [clean_line(ln) for ln in str(card_text or "").splitlines()] - lines = [ln for ln in lines if ln] - for ln in lines: - if is_bad_name_line(ln): +@lru_cache(maxsize=1) +def catalog_index_by_model() -> Dict[str, Dict[str, str]]: + idx: Dict[str, Dict[str, str]] = {} + for r in load_catalog_rows(): + m = clean_str(r.get("Model")) + if not m: continue - fam = family_from_line(ln) - if fam: - return fam - - sku_i = None - for i, ln in enumerate(lines): - if "standard sku" in ln.lower(): - sku_i = i - break - if sku_i is not None: - window = lines[max(0, sku_i - 12):sku_i] - for ln in reversed(window): - if is_bad_name_line(ln): - continue - if 3 <= len(ln) <= 40 and re.search(r"[A-Za-z]", ln): - return ln.split()[0].capitalize() - - return "Parsec antenna" - -def parsec_part_from_card(t: str) -> str: - m = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t) - return m.group(1).strip() if m else "" - -def parsec_desc_from_card(t: str) -> str: - m = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE) - return re.sub(r"\s+"," ",m.group(1).strip())[:220] if m else "" - -def parsec_retrieve(query: str, top_k: int = 12) -> List[Dict[str, Any]]: - qv = embedder.encode([query], normalize_embeddings=True) - qv = np.asarray(qv, dtype=np.float32) - scores, ids = parsec_index.search(qv, top_k) - out: List[Dict[str, Any]] = [] - for sc, i in zip(scores[0].tolist(), ids[0].tolist()): - if 0 <= int(i) < len(parsec_cards): - card = parsec_cards[int(i)] - out.append({ - "score": float(sc), - "name": parsec_name_from_card(card), - "part_number": parsec_part_from_card(card), - "description": parsec_desc_from_card(card), - "connectors": parsec_connectors_from_card(card), - "mounts": parsec_mounts_from_card(card), - "_card": card.lower(), - }) - return out + idx[m.upper()] = r + return idx -def choose_best_parsec(cands: List[Dict[str, Any]], mode: str) -> Dict[str, Any]: - best = None - best_score = -1e9 - - for c in cands: - card = c.get("_card","") - mounts = c.get("mounts", []) or [] - score = float(c.get("score", 0.0)) - - if "omni" in card: - score += 0.6 - if "directional" in card: - score -= 1.5 - - if mode == "vehicle": - if any("magnetic" in m for m in mounts): - score += 3.0 - if any("through" in m for m in mounts): - score += 2.0 - if any("wall" in m for m in mounts) or any("pole" in m for m in mounts): - score -= 1.2 - if "app: fixed" in card and "mobile" not in card: - score -= 2.0 - - if mode == "stationary": - if any("wall" in m for m in mounts): - score += 2.0 - if any("pole" in m for m in mounts): - score += 1.8 - - if score > best_score: - best_score = score - best = c - - if not best: - return {"name":"Parsec antenna","part_number":"","description":"","connectors":"","mounts":[]} - - best = dict(best) - best.pop("_card", None) - return best - - -def infer_mimo_for_5g(repl_5g_model: str) -> str: - """Rule: every 5G router uses a 4x4 antenna.""" - return "4x4" - - # If the model name hints 5G, lean 4x4 - if "5g" in model.lower() or model.upper().startswith(("R", "E", "S", "IX", "RUTM")): - default = "4x4" - else: - default = "2x2" - # Use dec2025routers.csv if we can match the model under the same maker family - try: - pool = df_dec[df_dec["_canon_make"] == canon_make].copy() - if pool.empty: - return default - hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) - if not hit or hit[1] < MATCH_OK: - return default - row = pool.iloc[int(hit[2])] - txt2 = (str(row.get("Antennas (internal/external/both)", "")) + " " + str(row.get("Modem Type", "")) + " " + str(row.get("Special notes",""))).lower() - if "4x4" in txt2 or "4 x 4" in txt2 or "4x 4" in txt2: - return "4x4" - if "2x2" in txt2 or "2 x 2" in txt2: - return "2x2" - # If modem type includes 5G, lean 4x4 - if "5g" in txt2 or "nr" in txt2: - return "4x4" - return default - except Exception: - return default +def catalog_lookup(model: str) -> Optional[Dict[str, str]]: + if not model: + return None + return catalog_index_by_model().get(model.strip().upper()) + -def antenna_options_for(router_model: str, tech: str, mimo: str) -> Dict[str, Any]: - q_stationary = f"{router_model} {tech} {mimo} omni stationary pole wall fixed site Parsec" - q_vehicle = f"{router_model} {tech} {mimo} omni vehicle mobile magnetic through-bolt Parsec" +# ----------------------------- +# OpenAI client + calls +# ----------------------------- - cand_stationary = parsec_retrieve(q_stationary, top_k=12) - cand_vehicle = parsec_retrieve(q_vehicle, top_k=12) - s = choose_best_parsec(cand_stationary, mode="stationary") - v = choose_best_parsec(cand_vehicle, mode="vehicle") +@lru_cache(maxsize=1) +def get_openai_client() -> Optional[Any]: + if OpenAI is None: + return None - s.update({"mimo": mimo, "why": "Stationary omni best match."}) - v.update({"mimo": mimo, "why": "Vehicle omni best match."}) + api_key = os.getenv("OPENAI_API_KEY", "").strip() + if not api_key: + return None - return {"stationary_omni": s, "vehicle_omni": v, "sources":["parsec_rag"]} + try: + # OpenAI python SDK v1 + return OpenAI(api_key=api_key) + except Exception: + return None + + +def openai_chat( + *, + model: str, + messages: List[Dict[str, str]], + max_tokens: int = OPENAI_MAX_TOKENS, + temperature: float = OPENAI_TEMPERATURE, +) -> str: + """Small wrapper that works across common OpenAI SDK shapes.""" + client = get_openai_client() + if client is None: + return "" + # Try chat.completions first. + try: + resp = client.chat.completions.create( + model=model, + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + timeout=OPENAI_TIMEOUT_SEC, + ) + return (resp.choices[0].message.content or "").strip() + except Exception as e: + log.warning("OpenAI chat call failed: %s", e) -# ============================ -# Install-ready checklist -# ============================ -def install_ready_checklist(current_sku: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str: - st = ant.get("stationary_omni", {}) - vh = ant.get("vehicle_omni", {}) - if client is not None: - sys = "Create a short, install-ready checklist for a Verizon rep. Return markdown only." - payload = {"current_device": current_sku, "replacements": repl, "antennas": {"stationary": st, "vehicle": vh}} + # Fallback to responses API when available. + try: resp = client.responses.create( - model=OPENAI_MODEL, - reasoning=OPENAI_REASONING, - input=[{"role":"system","content":sys},{"role":"user","content":json.dumps(payload)}], - max_output_tokens=520, + model=model, + input=messages, + temperature=temperature, + max_output_tokens=max_tokens, + timeout=OPENAI_TIMEOUT_SEC, ) - return (getattr(resp, "output_text", "") or "").strip() - return "\n".join([ - "### Install-ready checklist", - f"- Current device: {current_sku}", - f"- 5G replacement: {repl.get('repl_5g','')}", - f"- 4G alternative: {repl.get('repl_4g','Not applicable')}", - f"- Stationary omni antenna: {st.get('name','')} (PN {st.get('part_number','')})", - f"- Vehicle omni antenna: {vh.get('name','')} (PN {vh.get('part_number','')})", - "- Next steps: confirm mounting + cable lengths + power; place order; schedule install.", - ]) - - -# ============================ -# Batch mode (NO GPT) -# ============================ -def parse_batch_inputs(text_blob: str, file_obj: Any) -> List[str]: - items: List[str] = [] - if file_obj is not None: - try: - path = file_obj.name if hasattr(file_obj, "name") else str(file_obj) - df = pd.read_csv(path) - col = df.columns[0] - items.extend([str(x).strip() for x in df[col].tolist() if str(x).strip()]) - except Exception: - pass - if text_blob: - for ln in str(text_blob).splitlines(): - ln = ln.strip() - if ln: - items.append(ln) - seen=set() - out=[] - for x in items: - k=norm_text(x) - if k and k not in seen: - seen.add(k); out.append(x) - return out + # Best-effort text extraction. + if hasattr(resp, "output_text"): + return (resp.output_text or "").strip() + return "" + except Exception as e: + log.warning("OpenAI responses call failed: %s", e) + return "" -def run_batch(text_blob: str, file_obj: Any, include_antennas: bool): - inputs = parse_batch_inputs(text_blob, file_obj) - if not inputs: - return "", None, None, "" - rows=[] - for item in inputs: - res = resolve_device(item) - if res.get("mode") != "ok": - rows.append({"Input": item, "Matched":"", "Status":"Needs review", "EOS":"", "EOL":"", "4G alternative":"", "5G replacement":"", "Notes":"Not found/ambiguous"}) - continue +def try_parse_json(text: str) -> Optional[Dict[str, Any]]: + """Parse JSON from an LLM response. - life_row = df_eos.iloc[int(res["row_idx"])] - eos, eol, status = row_to_dates_and_status(life_row) - repl = pick_replacements_lifecycle(life_row, status, use_gpt=False) - - rows.append({ - "Input": item, - "Matched": str(life_row.get("sku","")), - "Status": status, - "EOS": eos, - "EOL": eol, - "4G alternative": repl.get("repl_4g",""), - "5G replacement": repl.get("repl_5g",""), - "Notes": "", - }) - - out_df = pd.DataFrame(rows) - counts = out_df["Status"].value_counts(dropna=False).to_dict() - top_5g = out_df["5G replacement"].value_counts(dropna=False).head(5).to_dict() - summary = f"Rows: {len(out_df)} | " + " | ".join([f"{k}: {v}" for k,v in counts.items()]) - rollup = "Top 5G recommendations:\n" + "\n".join([f"- {k}: {v}" for k,v in top_5g.items() if str(k).strip()]) - - tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv") - out_df.to_csv(tmp.name, index=False) - - return summary, out_df, tmp.name, rollup - - -# ============================ -# Replacement feature table + manufacturer link (5G device) -# ============================ - -FEATURE_COLS = ["Device", "Modem technology", "WiFi", "Ports", "Antennas", "Ruggedness", "Use case"] - -# Manufacturer domains used for best-effort link resolution (no non-maker domains). -MAKER_DOMAINS = { - "CRADLEPOINT": ["cradlepoint.com", "ericsson.com"], - "SIERRA": ["semtech.com", "airlink.com"], - "FEENEY": ["inseego.com"], - "DIGI": ["digi.com"], - "CISCO_MERAKI": ["meraki.cisco.com", "cisco.com"], - "CISCO": ["cisco.com"], - "TELTONIKA": ["teltonika-networks.com"], - "UNKNOWN": [], -} - -HTTP_HEADERS = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " - "(KHTML, like Gecko) Chrome/120.0 Safari/537.36" -} -HTTP_TIMEOUT = 12 - -def _best_effort_manufacturer_url(model: str, canon_make: str) -> str: - """Try to find a manufacturer page or datasheet link using simple on-domain searches. - If we can't confirm a page, return the manufacturer homepage for the maker family. + The model is asked for strict JSON. + This still handles the common case where JSON is wrapped in fences. """ - model = str(model or "").strip() - if not model or model in {"Not listed", "Not applicable"}: - return "" + raw = clean_str(text) + if not raw: + return None - domains = MAKER_DOMAINS.get(canon_make, []) or [] - if not domains: - return "" + # Strip ```json fences + raw = raw.strip() + raw = re.sub(r"^```json\s*", "", raw, flags=re.IGNORECASE).strip() + raw = re.sub(r"^```\s*", "", raw, flags=re.IGNORECASE).strip() + raw = re.sub(r"```\s*$", "", raw).strip() - # Candidate on-domain search URLs (common patterns across sites). - # We keep these on the manufacturer domain (no Google/Bing). - q = re.sub(r"\s+", "+", model) - url_candidates = [] - for d in domains: - url_candidates += [ - f"https://{d}/search?q={q}", - f"https://{d}/search?query={q}", - f"https://{d}/?s={q}", - f"https://www.{d}/search?q={q}", - f"https://www.{d}/search?query={q}", - f"https://www.{d}/?s={q}", - ] + try: + obj = json.loads(raw) + if isinstance(obj, dict): + return obj + return None + except Exception: + return None - # Also try a few direct product patterns for known makers (best effort). - if canon_make == "TELTONIKA": - slug = model.lower() - url_candidates += [ - f"https://teltonika-networks.com/products/routers/{slug}", - f"https://teltonika-networks.com/product/{slug}", - "https://teltonika-networks.com/products/routers/", - ] - if canon_make == "DIGI": - url_candidates += [ - "https://www.digi.com/products/networking/cellular-routers", - f"https://www.digi.com/search?q={q}", - ] - if canon_make == "CRADLEPOINT": - url_candidates += [ - "https://cradlepoint.com/products/", - f"https://cradlepoint.com/?s={q}", - ] - if canon_make in {"CISCO", "CISCO_MERAKI"}: - url_candidates += [ - f"https://www.cisco.com/c/en/us/search.html?q={q}", - ] - # Try to confirm a working page (HTTP 200 and model string somewhere in HTML). - for u in url_candidates[:18]: - try: - import requests - r = requests.get(u, headers=HTTP_HEADERS, timeout=HTTP_TIMEOUT, allow_redirects=True) - if r.status_code != 200: - continue - html = (r.text or "").lower() - if model.lower() in html or "datasheet" in html or "data sheet" in html: - return r.url - except Exception: - continue +def validate_url(url: str) -> str: + """Return url if it looks usable. No throw.""" + u = clean_str(url) + if not u: + return "" + if not (u.startswith("http://") or u.startswith("https://")): + return "" - # Fallback: maker homepage - d0 = domains[0] - return f"https://{d0}" + if requests is None: + return u -def _fetch_page_text(url: str, max_chars: int = 12000) -> str: - """Fetch page HTML and return a simplified text blob for GPT (best effort).""" - if not url: - return "" + # Quick HEAD/GET check; if blocked, keep the URL anyway. try: - import requests - r = requests.get(url, headers=HTTP_HEADERS, timeout=HTTP_TIMEOUT, allow_redirects=True) - if r.status_code != 200: - return "" - html = r.text or "" - html = re.sub(r"(?is).*?", " ", html) - html = re.sub(r"(?is).*?", " ", html) - text = re.sub(r"(?is)<[^>]+>", " ", html) - text = re.sub(r"\s+", " ", text).strip() - return text[:max_chars] + r = requests.head(u, timeout=HTTP_TIMEOUT_SEC, allow_redirects=True) + if 200 <= int(getattr(r, "status_code", 0)) < 400: + return u except Exception: - return "" + pass + try: + r = requests.get(u, timeout=HTTP_TIMEOUT_SEC, allow_redirects=True) + if 200 <= int(getattr(r, "status_code", 0)) < 400: + return u + except Exception: + pass + + return u + + +# ----------------------------- +# Device detail build + GPT fill +# ----------------------------- + + +def details_from_catalog_row(row: Dict[str, str]) -> DeviceDetails: + """Convert a Dec 2025 catalog row into the normalized DeviceDetails.""" + det = DeviceDetails( + make=clean_str(row.get("Make")), + model=clean_str(row.get("Model")), + modem_type=clean_str(row.get("Modem Type")), + rugged=parse_yes_no(row.get("Ruggedization", "")), + wifi=clean_str(row.get("WiFi type")), + primary_use_case=clean_str(row.get("Primary use case")), + serial=parse_yes_no(row.get("Serial port", "")), + throughput=clean_str(row.get("Router throughput")), + antennas=clean_str(row.get("Antennas")), + ethernet_ports_total=best_effort_total_ethernet( + row.get("WAN ports and speed", ""), + row.get("LAN ports and speed", ""), + ), + battery=parse_yes_no(row.get("Battery", "")), + # Dual-SIM is not a column in this sheet today. + dual_sim="Unknown", + raw=row, + ) -def _features_from_dec(model: str, canon_make: str) -> Dict[str, str]: - """Lookup a router model in dec2025routers.csv and return the key feature fields.""" - if not model or model in {"Not listed", "Not applicable"}: - return {k: "Not listed" for k in FEATURE_COLS[1:]} + # Best-effort cellular MIMO guess from the Antennas cell. + ant = det.antennas.lower() + if "4x4" in ant or "4 x 4" in ant: + det.cell_mimo = "4x4" + elif "2x2" in ant or "2 x 2" in ant: + det.cell_mimo = "2x2" - pool = df_dec[df_dec["_canon_make"] == canon_make].copy() - if pool.empty: - return {k: "Not listed" for k in FEATURE_COLS[1:]} + return det - hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) - if not hit or hit[1] < MATCH_OK: - return {k: "Not listed" for k in FEATURE_COLS[1:]} - r = pool.iloc[int(hit[2])] - ports = f"WAN: {r.get('WAN ports and speed','')} | LAN: {r.get('LAN ports and speed','')}" - return { - "Modem technology": str(r.get("Modem Type","")) or "Not listed", - "WiFi": str(r.get("WiFi type","")) or "Not listed", - "Ports": ports.strip() if ports.strip() else "Not listed", - "Antennas": str(r.get("Antennas (internal/external/both)","")) or "Not listed", - "Ruggedness": str(r.get("Ruggedization","")) or "Not listed", - "Use case": str(r.get("Primary use case","")) or "Not listed", - } +def enforce_5g_rules(det: DeviceDetails) -> DeviceDetails: + """Apply hard rules that should never vary.""" + if det.is_5g(): + det.cell_mimo = "4x4" # always + return det -def _gpt_fill_feature_row(device_label: str, model: str, canon_make: str, row: Dict[str, str], manufacturer_url: str = "", page_text: str = "") -> Dict[str, str]: - """If dec can't supply values, ask GPT to fill missing ones (best guess).""" - if client is None: - return row - missing = [k for k,v in row.items() if (not v) or str(v).strip().lower() in {"not listed","nan",""}] - if not missing: - return row +@lru_cache(maxsize=256) +def _gpt_enrich_missing_fields_cached( + make: str, + model: str, + is_5g: bool, + catalog_blob: str, + want_link: bool, +) -> dict: + """Cached GPT call used by :func:`gpt_fill_missing_fields`. - sys = ( - "Fill missing router feature fields for a Verizon rep. Return strict JSON only. " - "Use manufacturer page text when available. If still unknown, make a best-guess." + Cache key is (make, model, is_5g, catalog_blob, want_link). + """ + client = get_openai_client() + if client is None: + return {} + + system = ( + "You are helping a Verizon seller prep a device swap. " + "The catalog text is the main reference. " + "For dual_sim and ethernet_ports_total, only use the catalog text. " + "If not stated, return null. " + "For link, you may propose an official manufacturer product page or datasheet URL " + "if you are confident it is correct; else return null. " + "Return strict JSON only." ) - payload = { - "device_label": device_label, - "model": model, - "maker_family": canon_make, - "manufacturer_url": manufacturer_url, - "manufacturer_page_text": page_text[:8000], - "known": row, - "fill_only": missing, - "rules": ["Fill only requested fields.", "Short phrases only.", "Return JSON only."], - "output_schema": {k: "string" for k in missing}, + + user = { + "task": "Fill missing device fields.", + "device_make": make, + "device_model": model, + "device_is_5g": is_5g, + "catalog_text": catalog_blob, + "fields": { + "dual_sim": "Yes/No or null", + "ethernet_ports_total": "integer or null", + "link": "URL string or null", + }, + "notes": { + "link_requested": want_link, + "cell_mimo_rule": "If device_is_5g is true, cellular MIMO is 4x4 (already enforced elsewhere).", + }, } - out = gpt_json(sys, payload, max_tokens=320) or {} - for k in missing: - val = str(out.get(k, "") or "").strip() - if val: - row[k] = val - return row - missing = [k for k,v in row.items() if (not v) or str(v).strip().lower() in {"not listed","nan",""}] - if not missing: - return row - - sys = "Fill missing router feature fields for a Verizon rep. Return strict JSON only." - payload = { - "device_label": device_label, - "model": model, - "maker_family": canon_make, - "known": row, - "fill_only": missing, - "rules": [ - "Fill only the requested fields.", - "Best guess if needed. Short phrases only.", - "Return JSON only." + + resp = openai_chat( + model=OPENAI_MODEL, + messages=[ + {"role": "system", "content": system}, + {"role": "user", "content": json.dumps(user, ensure_ascii=False)}, ], - "output_schema": {k: "string" for k in missing} - } - out = gpt_json(sys, payload, max_tokens=260) or {} - for k in missing: - val = str(out.get(k, "") or "").strip() - if val: - row[k] = val - return row - -def build_replacement_features_table(repl_4g: str, repl_5g: str, canon_make: str) -> pd.DataFrame: - rows = [] - - # 4G alternative row - row4 = _features_from_dec(repl_4g, canon_make) - url4 = _best_effort_manufacturer_url(repl_4g, canon_make) if repl_4g else "" - txt4 = _fetch_page_text(url4) if url4 else "" - row4 = _gpt_fill_feature_row("4G alternative", repl_4g, canon_make, row4, manufacturer_url=url4, page_text=txt4) - rows.append({"Device": "4G alternative", **row4}) - - # 5G replacement row - row5 = _features_from_dec(repl_5g, canon_make) - url5 = _best_effort_manufacturer_url(repl_5g, canon_make) if repl_5g else "" - txt5 = _fetch_page_text(url5) if url5 else "" - row5 = _gpt_fill_feature_row("5G replacement", repl_5g, canon_make, row5, manufacturer_url=url5, page_text=txt5) - rows.append({"Device": "5G replacement", **row5}) - - df = pd.DataFrame(rows, columns=FEATURE_COLS) - return df -# ============================ -# Verizon fit badges (small table) for recommended devices -# ============================ - -FIT_COLS = ["Device", "Fit badges", "Ethernet ports", "Battery"] - -def _parse_ethernet_ports(wan_field: str, lan_field: str) -> str: - """Best-effort total ethernet ports based on WAN/LAN text.""" - def _count(field: str) -> int: - s = str(field or "") - # Common forms: "1x GbE", "2 x 10/100", "WAN: 1", etc. - nums = [int(x) for x in re.findall(r"(\\d+)\\s*x", s.lower())] - if nums: - return sum(nums) - # Fallback: if it contains 'port' with a number - m = re.search(r"(\\d+)\\s*port", s.lower()) - if m: - return int(m.group(1)) - # If it contains '1' and 'wan' in short text, guess 1 - if "wan" in s.lower() and re.search(r"\\b1\\b", s): - return 1 - return 0 + max_tokens=420, + temperature=0.0, + ) - total = _count(wan_field) + _count(lan_field) - return str(total) if total > 0 else "Not listed" + return try_parse_json(resp) or {} -def _battery_badge(battery_field: str) -> str: - s = str(battery_field or "").strip().lower() - if not s or s in {"none", "no", "n/a", "not listed"}: - return "No" - return "Yes" - -def _bool_badge(flag: bool) -> str: - return "Yes" if flag else "No" - -def _dual_sim_from_row_text(*fields: str) -> bool: - txt = " ".join([str(x or "") for x in fields]).lower() - return ("dual sim" in txt) or ("2 sim" in txt) or ("two sim" in txt) or ("dual-sim" in txt) - -def _throughput_high(throughput_field: str) -> bool: - t = str(throughput_field or "").lower() - # Heuristic: anything mentioning gbps or >=1000 mbps - if "gbps" in t: - return True - m = re.search(r"(\\d+(?:\\.\\d+)?)\\s*mbps", t) - if m: - try: - return float(m.group(1)) >= 1000.0 - except Exception: - pass - return False - -def _gpt_fit_badges(model: str, canon_make: str, is_5g: bool, dec_row: Optional[pd.Series]) -> Tuple[str, str, str]: - """ - GPT-based fill for Fit badges / Ethernet ports / Battery, used when dec is missing or incomplete. - Returns (badges_csv, ethernet_ports, battery_yesno). + +def gpt_fill_missing_fields(det: DeviceDetails) -> DeviceDetails: + """Fill missing fields via GPT. + + This only runs when we have gaps, then writes back into the passed DeviceDetails. """ - if client is None: - return ("Not listed", "Not listed", "Not listed") - - dec_ctx = {} - if dec_row is not None: - try: - dec_ctx = { - "Model": str(dec_row.get("Model","")), - "Modem Type": str(dec_row.get("Modem Type","")), - "Ruggedization": str(dec_row.get("Ruggedization","")), - "WAN ports and speed": str(dec_row.get("WAN ports and speed","")), - "LAN ports and speed": str(dec_row.get("LAN ports and speed","")), - "Antennas": str(dec_row.get("Antennas (internal/external/both)","")), - "WiFi type": str(dec_row.get("WiFi type","")), - "Primary use case": str(dec_row.get("Primary use case","")), - "Serial port": str(dec_row.get("Serial port (yes/no)","")), - "VPN": str(dec_row.get("VPN capabilities","")), - "Throughput": str(dec_row.get("Router throughput","")), - "Battery": str(dec_row.get("Battery (internal/removable/none/optional)","")), - "Special notes": str(dec_row.get("Special notes","")), - "Summary": str(dec_row.get("summary and use case","")), - } - except Exception: - dec_ctx = {} - - sys = ( - "You are helping a Verizon rep. Based on the provided router context, output fit badges and a couple quick traits.\n" - "Return STRICT JSON only.\n" - "Badges must be chosen from this set only:\n" - "['Vehicle','Fixed site','Wi‑Fi','Rugged','Dual‑SIM','4x4 MIMO','High throughput','Serial'].\n" - "Rules:\n" - "- If is_5g is true, ALWAYS include '4x4 MIMO'.\n" - "- Ethernet ports: return a single integer as a string if you can infer total ethernet ports, otherwise 'Not listed'.\n" - "- Battery: return 'Yes' or 'No' if you can infer, otherwise 'Not listed'.\n" - "- If uncertain between Vehicle vs Fixed site, pick the most likely based on use case/ruggedization.\n" + + if get_openai_client() is None: + return det + + # Only ask GPT when we have gaps. + needs_dual_sim = parse_yes_no(det.dual_sim) == "Unknown" + needs_ports = det.ethernet_ports_total is None + + # Links can be slow (and are not always needed). Default: fetch for 5G only. + links_mode = clean_str(os.getenv("FETCH_DEVICE_LINKS", "5g")).lower() # 5g | all | none + needs_link = False + if links_mode != "none" and not clean_str(det.link): + if links_mode == "all": + needs_link = True + elif links_mode == "5g" and det.is_5g(): + needs_link = True + + if not (needs_dual_sim or needs_ports or needs_link): + return det + + # Use the catalog row text as context. + catalog_blob = "\n".join([f"{k}: {v}" for k, v in det.raw.items() if clean_str(v)]) + + obj = _gpt_enrich_missing_fields_cached( + make=det.make, + model=det.model, + is_5g=det.is_5g(), + catalog_blob=catalog_blob, + want_link=needs_link, + ) + + if not obj: + return det + + if needs_dual_sim: + det.dual_sim = parse_yes_no(clean_str(obj.get("dual_sim"))) + + if needs_ports: + det.ethernet_ports_total = safe_int(obj.get("ethernet_ports_total")) + + if needs_link: + det.link = validate_url(clean_str(obj.get("link"))) + + return det + + +# ----------------------------- +# Lookup + recommendation +# ----------------------------- + + +def fuzzy_match_sku(query: str) -> Tuple[Optional[str], List[Tuple[str, int]]]: + """Return (autopick, ranked list).""" + q = clean_str(query) + if not q: + return None, [] + + # If rapidfuzz is missing, we cannot do ranked match. + if rf_process is None or rf_fuzz is None: + return None, [] + + choices = eos_choices() + ranked = rf_process.extract( + q, + choices, + scorer=rf_fuzz.WRatio, + limit=8, ) + ranked2: List[Tuple[str, int]] = [(m[0], int(m[1])) for m in ranked] + + if not ranked2: + return None, [] + + best_sku, best_score = ranked2[0] + second_score = ranked2[1][1] if len(ranked2) > 1 else 0 + + if best_score >= FUZZY_AUTOPICK_MIN and (best_score - second_score) >= FUZZY_AUTOPICK_GAP: + return best_sku, ranked2 + + return None, ranked2 + + +def find_eos_record(query: str) -> Tuple[Optional[EosRecord], List[Tuple[str, int]]]: + """Try exact SKU match, then fuzzy match.""" + + q_norm = norm_sku(query) + if q_norm and q_norm in eos_index(): + return eos_index()[q_norm], [] + + autopick, ranked = fuzzy_match_sku(query) + if autopick: + rec = eos_index().get(norm_sku(autopick)) + if rec: + return rec, ranked + + return None, ranked + + +def pick_model_for_replacement(text: str) -> Optional[str]: + """Pick the best catalog model that appears inside the replacement text.""" + hits = extract_models_from_text(text, catalog_models()) + if hits: + # prefer the longest token (tends to be the real model) + hits_sorted = sorted(hits, key=lambda x: len(x), reverse=True) + return hits_sorted[0] + return None + + +def build_device_details(model: str) -> Optional[DeviceDetails]: + row = catalog_lookup(model) + if not row: + return None + det = details_from_catalog_row(row) + det = enforce_5g_rules(det) + det = gpt_fill_missing_fields(det) + det = enforce_5g_rules(det) + return det + + +def build_recommendation(rec: EosRecord) -> Dict[str, Any]: + """Return a dict with 4G/5G device details, plus a small table and summary.""" + + # Replacement strings from the EoS table. + repl_4g_text = clean_str(rec.suggested_replacement) + repl_5g_text = clean_str(rec.advanced_5g_option) + + # Try to map them to catalog models. + repl_4g_model = pick_model_for_replacement(repl_4g_text) if repl_4g_text else None + repl_5g_model = pick_model_for_replacement(repl_5g_text) if repl_5g_text else None + + det_4g = build_device_details(repl_4g_model) if repl_4g_model else None + det_5g = build_device_details(repl_5g_model) if repl_5g_model else None + + # Feature table + table_cols = [ + "Role", + "Make", + "Model", + "Modem", + "Fit badges", + "Cell MIMO", + "Eth ports", + "Battery", + "Wi-Fi", + "Rugged", + "Dual-SIM", + "Serial", + "Throughput", + "Link", + ] + + def _row(role: str, d: Optional[DeviceDetails]) -> List[Any]: + if d is None: + return [role] + [""] * (len(table_cols) - 1) + badges = ", ".join(fit_badges(d)) + link = d.link + return [ + role, + d.make, + d.model, + d.modem_type, + badges, + d.cell_mimo, + d.ethernet_ports_total if d.ethernet_ports_total is not None else "", + parse_yes_no(d.battery), + d.wifi, + parse_yes_no(d.rugged), + parse_yes_no(d.dual_sim), + parse_yes_no(d.serial), + d.throughput, + link, + ] + + table_rows = [ + _row("4G option", det_4g), + _row("5G option", det_5g), + ] + + summary = customer_email_summary(rec, det_4g, det_5g) - payload = { - "model": model, - "maker_family": canon_make, - "is_5g": bool(is_5g), - "dec_context": dec_ctx, - "output_schema": { - "badges": ["string"], - "ethernet_ports": "string", - "battery": "Yes|No|Not listed" - } + return { + "eos": rec.__dict__, + "repl_4g_text": repl_4g_text, + "repl_5g_text": repl_5g_text, + "repl_4g_model": repl_4g_model or "", + "repl_5g_model": repl_5g_model or "", + "det_4g": det_4g.__dict__ if det_4g else {}, + "det_5g": det_5g.__dict__ if det_5g else {}, + "table_cols": table_cols, + "table_rows": table_rows, + "summary": summary, } - out = gpt_json(sys, payload, max_tokens=260) or {} - - badges = out.get("badges", []) or [] - allowed = {"Vehicle","Fixed site","Wi‑Fi","Rugged","Dual‑SIM","4x4 MIMO","High throughput","Serial"} - clean = [] - for b in badges: - bs = str(b).strip() - if bs in allowed: - clean.append(bs) - - if is_5g and "4x4 MIMO" not in clean: - clean.append("4x4 MIMO") - - eth = str(out.get("ethernet_ports","") or "").strip() - if not eth or eth.lower() in {"nan","none"}: - eth = "Not listed" - m = re.search(r"\d+", eth) - eth = m.group(0) if m else ("Not listed" if eth == "Not listed" else eth) - - bat = str(out.get("battery","") or "").strip() - if not bat: - bat = "Not listed" - if bat.lower().startswith("y"): - bat = "Yes" - elif bat.lower().startswith("n"): - bat = "No" - elif bat not in {"Yes","No","Not listed"}: - bat = "Not listed" - - dedup=[] - seen=set() - for b in clean: - if b not in seen: - seen.add(b); dedup.append(b) - badges_csv = ", ".join(dedup) if dedup else "Not listed" - return (badges_csv, eth, bat) - - -def _fit_badges_for_model(model: str, canon_make: str, is_5g: bool) -> Tuple[str, str, str]: - """Return (badges_csv, ethernet_ports, battery_yesno). Uses dec2025routers.csv first, then GPT fill.""" - model = str(model or "").strip() - if not model or model in {"Not listed", "Not applicable"}: - return ("Not listed", "Not listed", "Not listed") - - pool = df_dec[df_dec["_canon_make"] == canon_make].copy() - row = None - if not pool.empty: - hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) - if hit and hit[1] >= MATCH_OK: - row = pool.iloc[int(hit[2])] - - badges = [] - eth = "Not listed" - bat_yes = "Not listed" - - if row is not None: - use_case = str(row.get("Primary use case","") or "").lower() - rugged = str(row.get("Ruggedization","") or "").lower() - - if any(k in use_case for k in ["vehicle","mobile","fleet","in-vehicle"]) or "vehicle" in rugged: - badges.append("Vehicle") - else: - badges.append("Fixed site") - wifi = str(row.get("WiFi type","") or "").strip() - if wifi and wifi.lower() not in {"none","no","n/a"}: - badges.append("Wi‑Fi") +def customer_email_summary(rec: EosRecord, det_4g: Optional[DeviceDetails], det_5g: Optional[DeviceDetails]) -> str: + """Customer-ready email-style summary.""" - if any(k in rugged for k in ["rugged","industrial","ip","harsh"]): - badges.append("Rugged") + sku = rec.sku + make = rec.manufacturer - notes_blob = " ".join([ - str(row.get("Special notes","") or ""), - str(row.get("summary and use case","") or ""), - ]).lower() - if "dual" in notes_blob and "sim" in notes_blob: - badges.append("Dual‑SIM") + eos = clean_str(rec.end_of_sale) + eol = clean_str(rec.end_of_life) - if is_5g: - badges.append("4x4 MIMO") + subject = f"Subject: Replacement options for {make} {sku}".strip() - thr = str(row.get("Router throughput","") or "").lower() - m = re.search(r"(\d+(\.\d+)?)\s*gb", thr) - if m: - try: - if float(m.group(1)) >= 1.0: - badges.append("High throughput") - except Exception: - pass - - serial = str(row.get("Serial port (yes/no)","") or "").strip().lower() - if serial in {"yes","y","true"}: - badges.append("Serial") - - wan = str(row.get("WAN ports and speed","") or "") - lan = str(row.get("LAN ports and speed","") or "") - m1 = re.search(r"(\d+)\s*x", wan.lower()) - m2 = re.search(r"(\d+)\s*x", lan.lower()) - if m1 or m2: - total = (int(m1.group(1)) if m1 else 0) + (int(m2.group(1)) if m2 else 0) - eth = str(total) if total > 0 else "Not listed" - - bat = str(row.get("Battery (internal/removable/none/optional)","") or "") - bat_l = bat.lower().strip() - if bat_l: - if "none" in bat_l: - bat_yes = "No" + lines: List[str] = [subject, "", "Hi," , "", "Quick heads-up on lifecycle timing:"] + + if eos: + lines.append(f"- End of sale: {eos}") + if eol: + lines.append(f"- End of life: {eol}") + + desc = clean_str(rec.description) + if desc: + lines.append(f"- Current device: {desc}") + + lines.append("") + lines.append("Recommended replacement paths:") + + def _device_block(label: str, det: Optional[DeviceDetails], fallback_text: str) -> None: + if det is None: + if fallback_text: + lines.append(f"- {label}: {fallback_text}") else: - bat_yes = "Yes" - - # Use GPT when anything is missing (instead of best-effort inference) - if (row is None) or (eth == "Not listed") or (bat_yes == "Not listed") or (not badges): - g_badges, g_eth, g_bat = _gpt_fit_badges(model, canon_make, is_5g, row) - - if badges: - if is_5g and "4x4 MIMO" not in badges: - badges.append("4x4 MIMO") - dedup=[] - seen=set() - for b in badges: - if b not in seen: - seen.add(b); dedup.append(b) - badges_csv = ", ".join(dedup) - else: - badges_csv = g_badges - - eth = eth if eth != "Not listed" else g_eth - bat_yes = bat_yes if bat_yes != "Not listed" else g_bat - return (badges_csv or "Not listed", eth or "Not listed", bat_yes or "Not listed") - - dedup=[] - seen=set() - for b in badges: - if b not in seen: - seen.add(b); dedup.append(b) - badges_csv = ", ".join(dedup) if dedup else "Not listed" - return (badges_csv, eth, bat_yes) - -def build_fit_table(repl_4g: str, repl_5g: str, canon_make: str) -> pd.DataFrame: - rows = [] - # 4G alt row (is_5g False) - b4, eth4, bat4 = _fit_badges_for_model(repl_4g, canon_make, is_5g=False) - rows.append({"Device": "4G alternative", "Fit badges": b4, "Ethernet ports": eth4, "Battery": bat4}) - # 5G row (is_5g True) - b5, eth5, bat5 = _fit_badges_for_model(repl_5g, canon_make, is_5g=True) - rows.append({"Device": "5G replacement", "Fit badges": b5, "Ethernet ports": eth5, "Battery": bat5}) - return pd.DataFrame(rows, columns=FIT_COLS) - -# ============================ -# Output -# ============================ -def assemble_output(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str: - current_name = f"{life_row.get('sku','')} — {life_row.get('description','')}".strip(" —") - st = ant.get("stationary_omni", {}) - vh = ant.get("vehicle_omni", {}) - - lines = [] - lines.append(f"1. Current device: **{current_name}**") - lines.append(f"2. Status: **{status}**") - lines.append(f"3. End of Sale date: **{eos}**") - lines.append(f"4. End of Life date: **{eol}**") - lines.append(f"5. 4G alternative (lifecycle): **{repl.get('repl_4g','Not applicable')}**") - lines.append(f"6. 5G replacement (lifecycle): **{repl.get('repl_5g','Not listed')}**") - lines.append("7. Antenna options (Parsec-only):") - conn_s = f" | Conn: {st.get('connectors','')}" if st.get("connectors") else "" - conn_v = f" | Conn: {vh.get('connectors','')}" if vh.get("connectors") else "" - lines.append(f" - Stationary (Omni): **{st.get('name','')}** (Part #: {st.get('part_number','')}) — {st.get('description','')} — MIMO: {st.get('mimo','')}{conn_s}") - lines.append(f" - Vehicle (Omni): **{vh.get('name','')}** (Part #: {vh.get('part_number','')}) — {vh.get('description','')} — MIMO: {vh.get('mimo','')}{conn_v}") - - lines.append("\nSources (debug):") - for s in repl.get("sources", []) if isinstance(repl.get("sources"), list) else []: - lines.append(f"- {s}") - lines.append("- ParsecCatalog.pdf (local RAG)") - lines.append("- routers_eos_eol_by_sku.csv (replacements)") - return "\n".join(lines) + lines.append(f"- {label}: (no suggestion on file)") + return + ports = det.ethernet_ports_total + ports_txt = f"{ports} total" if ports is not None else "Unknown" + bat = parse_yes_no(det.battery) -# ============================ -# Customer-ready email summary (single lookup only) -# ============================ + badges = fit_badges(det) + badges_txt = ", ".join(badges) if badges else "" -def build_customer_email(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any], link5: str) -> str: - """Email-style summary the rep can paste to a customer (lightly sales-y).""" - current = f"{life_row.get('sku','')} — {life_row.get('description','')}".strip(" —") - repl5 = str(repl.get("repl_5g","") or "").strip() - repl4 = str(repl.get("repl_4g","") or "").strip() + line = f"- {label}: {det.make} {det.model} ({det.modem_type})" + if badges_txt: + line += f" | {badges_txt}" + lines.append(line) + lines.append(f" - Ethernet ports: {ports_txt}") + lines.append(f" - Battery: {bat}") + if clean_str(det.link): + lines.append(f" - Info link: {det.link}") - st = ant.get("stationary_omni", {}) or {} - vh = ant.get("vehicle_omni", {}) or {} + _device_block("4G option", det_4g, rec.suggested_replacement) + _device_block("5G option", det_5g, rec.advanced_5g_option) - lines = [] - lines.append("Subject: Router replacement recommendation") - lines.append("") - lines.append("Hi there,") - lines.append("") - lines.append(f"We reviewed your current router (**{current}**) and recommend the following path forward:") - lines.append("") - lines.append(f"- **Status:** {status}") - lines.append(f"- **End of Sale:** {eos}") - lines.append(f"- **End of Life:** {eol}") - lines.append("") - lines.append("**Recommended replacement (5G):**") - lines.append(f"- {repl5 if repl5 else 'Not listed'}") - if link5: - lines.append(f"- Manufacturer page (best effort): {link5}") - lines.append("") - lines.append("**Optional 4G alternative (if needed):**") - lines.append(f"- {repl4 if repl4 and repl4.lower() != 'not applicable' else 'Not applicable'}") - lines.append("") - lines.append("**Antenna suggestions (Parsec):**") - lines.append(f"- Stationary (Omni): {st.get('name','')} (PN {st.get('part_number','')})") - lines.append(f"- Vehicle (Omni): {vh.get('name','')} (PN {vh.get('part_number','')})") lines.append("") - lines.append("If you’d like, we can confirm the best-fit option for your install environment and provide pricing.") + lines.append("If you tell me vehicle vs fixed site (and indoor vs outdoor), I can sanity-check antennas too.") lines.append("") - lines.append("Contact Peter Dunn @ 786.999.9127 or peter.dunn@masterstelecom.com for pricing.") - lines.append("") - lines.append("Thanks,") - lines.append("Peter Dunn") - return "\n".join(lines) + lines.append(CONTACT_LINE) + + return "\n".join(lines).strip() + -def generate_customer_email(st_json: str) -> str: - st = state_load(st_json) - if not st or "row_idx" not in st: - return "Run a lookup first." +# ----------------------------- +# Chat state + handlers +# ----------------------------- + + +def state_default() -> Dict[str, Any]: + return { + "pending_choices": [], # list[str] + "last_reco": {}, + } + + +def state_to_json(st: Dict[str, Any]) -> str: + return json.dumps(st, ensure_ascii=False) + + +def state_from_json(st_json: str) -> Dict[str, Any]: try: - life_row = df_eos.iloc[int(st["row_idx"])] + obj = json.loads(st_json or "{}") + if isinstance(obj, dict): + return obj + return state_default() except Exception: - return "Run a lookup first." - - eos, eol, status = row_to_dates_and_status(life_row) - repl = st.get("repl", {}) or {} - ant = st.get("ant", {}) or {} - - canon_make = str(life_row.get("_canon_make","UNKNOWN")) - url5 = _best_effort_manufacturer_url(str(repl.get("repl_5g","") or ""), canon_make) - return build_customer_email(life_row, status, eos, eol, repl, ant, url5) - -# ============================ -# Gradio callbacks -# IMPORTANT: no dict state and ALL events have api_name=False (prevents api_info schema generation) -# ============================ -def run_lookup(user_text: str, st_json: str): - user_text = str(user_text or "").strip() - if not user_text: - return "Enter a router SKU/model.", "", None, None, "", gr.update(visible=False), gr.update(visible=False), "{}", "", "" - - res = resolve_device(user_text) - - if res.get("mode") == "pick": - opts = res.get("options", []) - choices = [o["label"] for o in opts] - st2 = {"mode":"pick","options": opts, "raw": user_text} - return "Did you mean A or B? Pick one, then click Use selection.", "", None, None, "", gr.update(choices=choices, value=None, visible=True), gr.update(visible=True), state_dump(st2), "", "" - - if res.get("mode") != "ok": - return "Not found.", "", None, None, "", gr.update(visible=False), gr.update(visible=False), "{}", "", "" - - life_row = df_eos.iloc[int(res["row_idx"])] - eos, eol, status = row_to_dates_and_status(life_row) - - repl = pick_replacements_lifecycle(life_row, status, use_gpt=True) - canon_make = str(life_row.get("_canon_make","UNKNOWN")) - mimo = infer_mimo_for_5g(repl.get("repl_5g","")) - tech = "5G" if repl.get("repl_5g") and repl.get("repl_5g") != "Not listed" else ("4G" if device_is_4g(life_row) else "Unknown") - ant = antenna_options_for(repl.get("repl_5g") or str(life_row.get("sku","")), tech, mimo) - - output = assemble_output(life_row, status, eos, eol, repl, ant) - st_out = {"row_idx": int(res["row_idx"]), "repl": repl, "ant": ant, "raw": user_text} - url5 = _best_effort_manufacturer_url(repl.get('repl_5g',''), canon_make) - link = f"**5G manufacturer page (best effort):** {url5}" if url5 else "" - feat_df = build_replacement_features_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) - fit = build_fit_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) - return output, link, feat_df, fit, "", gr.update(visible=False), gr.update(visible=False), state_dump(st_out), "", "" - -def use_selection(selected_label: str, st_json: str): - st = state_load(st_json) - if not st or st.get("mode") != "pick": - return "Run a search first.", "", None, None, "", gr.update(visible=False), gr.update(visible=False), "{}", "", "" - - if not selected_label: - return "Pick A or B first.", "", None, None, "", gr.update(visible=True), gr.update(visible=True), st_json, "", "" - - chosen_row = None - for o in st.get("options", []): - if o.get("label") == selected_label: - chosen_row = int(o["row_idx"]) - break - if chosen_row is None: - return "Pick a valid option.", "", None, None, "", gr.update(visible=True), gr.update(visible=True), st_json, "", "" - - life_row = df_eos.iloc[int(chosen_row)] - eos, eol, status = row_to_dates_and_status(life_row) - - repl = pick_replacements_lifecycle(life_row, status, use_gpt=True) - canon_make = str(life_row.get("_canon_make","UNKNOWN")) - mimo = infer_mimo_for_5g(repl.get("repl_5g","")) - tech = "5G" if repl.get("repl_5g") and repl.get("repl_5g") != "Not listed" else ("4G" if device_is_4g(life_row) else "Unknown") - ant = antenna_options_for(repl.get("repl_5g") or str(life_row.get("sku","")), tech, mimo) - - output = assemble_output(life_row, status, eos, eol, repl, ant) - st_out = {"row_idx": int(chosen_row), "repl": repl, "ant": ant, "raw": st.get("raw","")} - url5 = _best_effort_manufacturer_url(repl.get('repl_5g',''), canon_make) - link = f"**5G manufacturer page (best effort):** {url5}" if url5 else "" - feat_df = build_replacement_features_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) - fit = build_fit_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) - return output, link, feat_df, fit, "", gr.update(visible=False), gr.update(visible=False), state_dump(st_out), "", "" - -def make_install_ready(st_json: str): - st = state_load(st_json) - if not st or "row_idx" not in st: - return "Run a lookup first." - life_row = df_eos.iloc[int(st["row_idx"])] - current_sku = str(life_row.get("sku","") or "") - return install_ready_checklist(current_sku, st.get("repl", {}) or {}, st.get("ant", {}) or {}) - - - -# ============================ -# Q&A about the suggested device (post-recommendation) -# ============================ -def answer_question(question: str, st_json: str) -> str: - q = str(question or "").strip() - if not q: + return state_default() + + +def format_match_choices(ranked: List[Tuple[str, int]]) -> str: + if not ranked: return "" - st = state_load(st_json) - if not st or "repl" not in st: - return "Run a lookup first, then ask your question." - - repl = st.get("repl", {}) or {} - ant = st.get("ant", {}) or {} - repl5 = str(repl.get("repl_5g","") or "").strip() - repl4 = str(repl.get("repl_4g","") or "").strip() - # Pull a bit of dec context for the 5G model (if possible) - canon_make = "" - try: - # Try to infer maker family from stored row_idx - if "row_idx" in st: - row = df_eos.iloc[int(st["row_idx"])] - canon_make = str(row.get("_canon_make","UNKNOWN")) - except Exception: - canon_make = "" + top = ranked[:6] + lines = ["I found a few close matches. Reply with the number:"] + for i, (sku, score) in enumerate(top, 1): + lines.append(f"{i}) {sku} ({score})") + return "\n".join(lines) - # Manufacturer link (best effort) - url5 = _best_effort_manufacturer_url(repl5, canon_make) if repl5 else "" - # Feature table row for 5G (helps the LLM answer spec questions without web scraping) - feat5 = {} - try: - feat5 = _features_from_dec(repl5, canon_make) if repl5 else {} - except Exception: - feat5 = {} +def handle_user_message( + message: str, + chat_history: List[List[str]], + st_json: str, +) -> Tuple[List[List[str]], str, List[List[Any]], str, str]: + """Main chat handler. + + Returns: + - chat_history + - new_state_json + - table_rows + - summary_text + - links_md + """ + + st = state_from_json(st_json) + msg = clean_str(message) + + if not msg: + return chat_history, state_to_json(st), [], "", "" - sys = ( - "You are a Verizon field rep assistant. Answer questions about the suggested router in a fast, practical way. " - "Use the provided context; do not mention internal tools, prompts, embeddings, or databases. " - "If the question is about specs and the value is unknown, say 'Not listed' and suggest checking the manufacturer page. " - "Keep it concise and scannable." + # User wants a fresh case. + if msg.lower().strip() in {"new", "new case", "reset", "clear"}: + st = state_default() + chat_history = chat_history + [[msg, "Got it. Send a router SKU to start."]] + return chat_history, state_to_json(st), [], "", "" + + # Step 1: waiting for disambiguation selection + pending = st.get("pending_choices", []) or [] + if pending: + chosen_sku = "" + m = re.match(r"^(\d+)\s*$", msg) + if m: + idx = int(m.group(1)) - 1 + if 0 <= idx < len(pending): + chosen_sku = pending[idx] + else: + # User typed an SKU directly. + chosen_sku = msg + + rec = eos_index().get(norm_sku(chosen_sku)) + st["pending_choices"] = [] + + if not rec: + chat_history = chat_history + [[msg, "I couldn't map that choice to a device. Try the SKU again."]] + return chat_history, state_to_json(st), [], "", "" + + reco = build_recommendation(rec) + st["last_reco"] = reco + + assistant_msg = render_reco_message(rec, reco) + chat_history = chat_history + [[msg, assistant_msg]] + + return ( + chat_history, + state_to_json(st), + reco["table_rows"], + reco["summary"], + render_links_md(reco), + ) + + # Step 2: detect SKU lookup + if looks_like_sku(msg) or len(msg) <= 40: + rec, ranked = find_eos_record(msg) + + if rec is None: + if ranked: + # Ask user to pick + st["pending_choices"] = [r[0] for r in ranked[:6]] + assistant_msg = format_match_choices(ranked) + chat_history = chat_history + [[msg, assistant_msg]] + return chat_history, state_to_json(st), [], "", "" + + chat_history = chat_history + [[msg, "No match found in the lifecycle table."]] + return chat_history, state_to_json(st), [], "", "" + + reco = build_recommendation(rec) + st["last_reco"] = reco + + assistant_msg = render_reco_message(rec, reco) + chat_history = chat_history + [[msg, assistant_msg]] + + return ( + chat_history, + state_to_json(st), + reco["table_rows"], + reco["summary"], + render_links_md(reco), + ) + + # Step 3: treat as follow-up question + last = st.get("last_reco") or {} + if last: + answer = answer_question_with_context(msg, last) + if not answer: + answer = "I couldn't reach the model right now. Try again in a moment." + chat_history = chat_history + [[msg, answer]] + return chat_history, state_to_json(st), last.get("table_rows", []), last.get("summary", ""), render_links_md(last) + + chat_history = chat_history + [[msg, "Send a router SKU (example: IBR900) to get started."]] + return chat_history, state_to_json(st), [], "", "" + + +def render_reco_message(rec: EosRecord, reco: Dict[str, Any]) -> str: + sku = rec.sku + make = rec.manufacturer + + lines: List[str] = [] + lines.append(f"**{make} {sku}**") + + if clean_str(rec.end_of_sale) or clean_str(rec.end_of_life): + lines.append("Lifecycle:") + if clean_str(rec.end_of_sale): + lines.append(f"- End of sale: {rec.end_of_sale}") + if clean_str(rec.end_of_life): + lines.append(f"- End of life: {rec.end_of_life}") + + if clean_str(rec.description): + lines.append(f"Description: {rec.description}") + + lines.append("") + lines.append("Suggested replacements:") + + r4 = clean_str(reco.get("repl_4g_text", "")) + r5 = clean_str(reco.get("repl_5g_text", "")) + if r4: + lines.append(f"- 4G option: {r4}") + else: + lines.append("- 4G option: (none on file)") + + if r5: + lines.append(f"- 5G option: {r5}") + else: + lines.append("- 5G option: (none on file)") + + lines.append("") + lines.append("I dropped a feature table and a customer-ready email draft below.") + lines.append("Got questions about the suggested devices? Ask here.") + + return "\n".join(lines).strip() + + +def render_links_md(reco: Dict[str, Any]) -> str: + """Nice clickable links under the table.""" + det5 = reco.get("det_5g") or {} + det4 = reco.get("det_4g") or {} + + links: List[str] = [] + + l5 = clean_str(det5.get("link")) + if l5: + links.append(f"**5G info link:** {l5}") + + l4 = clean_str(det4.get("link")) + if l4: + links.append(f"**4G info link:** {l4}") + + return "\n\n".join(links) + + +def answer_question_with_context(question: str, reco: Dict[str, Any]) -> str: + """Answer user follow-up using device details as grounding.""" + + client = get_openai_client() + if client is None: + return "" + + # Keep context tight so we stay fast. + ctx = { + "router": reco.get("eos", {}), + "4g": reco.get("det_4g", {}), + "5g": reco.get("det_5g", {}), + } + + system = ( + "You are a router replacement assistant for a Verizon seller. " + "Answer using only the provided JSON context. " + "If the context lacks the answer, say what is missing and suggest what to check." ) - context = { - "recommended_5g": repl5, - "recommended_4g": repl4 if repl4 and repl4.lower() != "not applicable" else "", - "manufacturer_link_5g": url5, - "known_5g_features": feat5, - "antenna_stationary": ant.get("stationary_omni", {}), - "antenna_vehicle": ant.get("vehicle_omni", {}), + user = { + "context": ctx, + "question": question, } - user = "Context:\n" + json.dumps(context, ensure_ascii=False) + "\n\nQuestion:\n" + q - - ans = gpt_answer_md(sys, user, max_tokens=650) - # Small safety fallback - return ans if ans else "I couldn't generate an answer right now. Try again." - -# ============================ -# UI -# ============================ -with gr.Blocks(title="Only-Routers") as demo: - gr.Markdown("## Only-Routers\nSingle lookup + Batch upload for Verizon reps.") - - with gr.Tabs(): - with gr.Tab("Single"): - # Inputs - user_text = gr.Textbox( - label="Router SKU or model", - placeholder="Examples: IBR650B, AER1600, ES450, WR21, RUT240", - lines=1, - ) - st = gr.State("{}") # JSON string state - - # Actions - check_btn = gr.Button("Check", variant="primary") - pick_dd = gr.Dropdown(label="Pick A or B", choices=[], visible=False) - use_btn = gr.Button("Use selection", visible=False) - - # Main outputs - output_md = gr.Markdown() - link_md = gr.Markdown() - features_df = gr.Dataframe(headers=FEATURE_COLS, interactive=False, wrap=True) - fit_df = gr.Dataframe(headers=FIT_COLS, interactive=False, wrap=True) - qa_md = gr.Markdown() - - # Post-recommendation Q&A - gr.Markdown("### Questions about the suggested device?") - question_box = gr.Textbox( - label="Ask a question (optional)", - placeholder="Example: Does the 5G device support dual-SIM? How many ethernet ports? Does it support Wi‑Fi?", - lines=2, - ) - ask_btn = gr.Button("Ask", variant="secondary") - - # Install-ready checklist - install_btn = gr.Button("Make install-ready checklist") - install_md = gr.Markdown() - - # Customer-ready email summary - gr.Markdown("### Customer-ready email") - email_btn = gr.Button("Generate customer email") - customer_email_box = gr.Textbox(label="Email draft", lines=10) - - # Wiring (api_name=False avoids HF/Gradio API schema issues) - check_btn.click( - fn=run_lookup, - inputs=[user_text, st], - outputs=[output_md, link_md, features_df, fit_df, qa_md, pick_dd, use_btn, st, install_md, customer_email_box], - api_name=False, - ) - use_btn.click( - fn=use_selection, - inputs=[pick_dd, st], - outputs=[output_md, link_md, features_df, fit_df, qa_md, pick_dd, use_btn, st, install_md, customer_email_box], - api_name=False, - ) - ask_btn.click( - fn=answer_question, - inputs=[question_box, st], - outputs=[qa_md], - api_name=False, - ) - install_btn.click( - fn=make_install_ready, - inputs=[st], - outputs=[install_md], - api_name=False, - ) - email_btn.click( - fn=generate_customer_email, - inputs=[st], - outputs=[customer_email_box], - api_name=False, - ) + resp = openai_chat( + model=OPENAI_MODEL_QA, + messages=[ + {"role": "system", "content": system}, + {"role": "user", "content": json.dumps(user, ensure_ascii=False)}, + ], + max_tokens=500, + temperature=0.2, + ) - with gr.Tab("Batch"): - gr.Markdown("Paste one per line or upload a CSV (first column). Batch runs fast (no GPT).") - batch_text = gr.Textbox(label="Paste devices (one per line)", lines=8, placeholder="WR21\nRUT240\nIBR650B") - batch_file = gr.File(label="Upload CSV", file_types=[".csv"]) - include_ant = gr.Checkbox(label="Include antenna picks (slower)", value=False) - run_btn = gr.Button("Run batch", variant="primary") - - summary_md = gr.Markdown() - rollup_md = gr.Markdown() - table = gr.Dataframe(interactive=False, wrap=True) - dl = gr.File(label="Download results CSV") - - run_btn.click( - fn=run_batch, - inputs=[batch_text, batch_file, include_ant], - outputs=[summary_md, table, dl, rollup_md], - api_name=False, - ) + return clean_str(resp) + + +# ----------------------------- +# Gradio UI +# ----------------------------- + + +TABLE_HEADERS = [ + "Role", + "Make", + "Model", + "Modem", + "Fit badges", + "Cell MIMO", + "Eth ports", + "Battery", + "Wi-Fi", + "Rugged", + "Dual-SIM", + "Serial", + "Throughput", + "Link", +] + + +def build_demo() -> gr.Blocks: + with gr.Blocks(title="Only Routers") as demo: + gr.Markdown( + "# Only Routers\n" + "Type a router SKU to get a 4G + 5G replacement suggestion." + ) + + st = gr.State(value=state_to_json(state_default())) + + with gr.Row(): + with gr.Column(scale=3): + chatbot = gr.Chatbot(label="Chat") + user_in = gr.Textbox(label="Message", placeholder="Example: IBR900", lines=1) + send = gr.Button("Send") + clear = gr.Button("Clear") + + with gr.Column(scale=2): + gr.Markdown("### Replacement device details") + table = gr.Dataframe( + headers=TABLE_HEADERS, + value=[], + interactive=False, + ) + + links_md = gr.Markdown("") + + gr.Markdown("### Customer-ready email") + summary = gr.Textbox( + value="", + lines=14, + label="", + interactive=False, + show_copy_button=True, + ) -demo.launch(show_api=False) + def _on_send(msg: str, hist: List[List[str]], st_json: str): + hist2, st2, rows, summary_txt, links_txt = handle_user_message(msg, hist or [], st_json) + return "", hist2, st2, rows, summary_txt, links_txt + + send.click(_on_send, inputs=[user_in, chatbot, st], outputs=[user_in, chatbot, st, table, summary, links_md]) + user_in.submit(_on_send, inputs=[user_in, chatbot, st], outputs=[user_in, chatbot, st, table, summary, links_md]) + + def _on_clear(): + return [], state_to_json(state_default()), [], "", "" + + clear.click(_on_clear, inputs=[], outputs=[chatbot, st, table, summary, links_md]) + + return demo + + +demo = build_demo() + +if __name__ == "__main__": + demo.queue() + demo.launch( + server_name="0.0.0.0", + server_port=int(os.getenv("PORT", "7860")), + share=False, + show_api=False, + )