diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -2,10 +2,12 @@ import os import re import json import math +import time import hashlib import tempfile from dataclasses import dataclass from datetime import datetime, date +from functools import lru_cache from typing import Any, Dict, List, Optional, Tuple import numpy as np @@ -19,132 +21,124 @@ from rapidfuzz import fuzz, process import gradio as gr from openai import OpenAI +# ============================================================ +# Only-Routers (Chat, production-lean) +# - Fast model by default (no reasoning payload) +# - One LLM call max per lookup (enrichment only, cached) +# - No HTTP crawling during normal lookup (links are deterministic) +# - Timing logs to HF console when DEBUG_TIMING=1 +# ============================================================ -# ============================ +# ---------------------------- # Settings -# ============================ +# ---------------------------- TODAY = date(2026, 1, 18) -OPENAI_MODEL = "gpt-5.2" -OPENAI_REASONING = {"effort": "high"} -MATCH_OK = 80 -EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2" -PARSEC_CONTEXT_BEFORE = 900 -PARSEC_CONTEXT_AFTER = 1600 +# Fast default model (override via env) +OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5.2").strip() -import time +# Disable LLM at runtime: OPENAI_DISABLE=1 +OPENAI_DISABLE = os.getenv("OPENAI_DISABLE", "0").strip() == "1" + +# Timing logs +DEBUG_TIMING = os.getenv("DEBUG_TIMING", "0").strip() == "1" -def tlog(label, t0): - print(f"[TIMER] {label}: {time.perf_counter() - t0:.2f}s") +# Matching thresholds +MATCH_OK = 82 +MATCH_AUTOPICK = 95 +MATCH_GAP = 8 -# ============================ -# OpenAI client (HF Space secret: OPENAI_API_KEY) -# ============================ +# Embeddings +EMBED_MODEL_NAME = os.getenv("EMBED_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2").strip() + +# Parsec PDF slicing +PARSEC_CONTEXT_BEFORE = 900 +PARSEC_CONTEXT_AFTER = 1600 + +# ---------------------------- +# OpenAI client +# ---------------------------- API_KEY = os.getenv("OPENAI_API_KEY", "").strip() -client = OpenAI(api_key=API_KEY) if API_KEY else None +client = None if (not API_KEY or OPENAI_DISABLE) else OpenAI(api_key=API_KEY) # ---------------------------- -# Gradio state helpers -# Keep state as a JSON STRING to avoid schema issues on Hugging Face. +# Timing helper # ---------------------------- -def state_load(st_json: str) -> Dict[str, Any]: +def _tlog(label: str, t0: float) -> None: + if DEBUG_TIMING: + dt = time.perf_counter() - t0 + print(f"[TIMER] {label}: {dt:.2f}s") + +# ---------------------------- +# JSON-safe helpers +# ---------------------------- +def _json_load_safe(s: str) -> Dict[str, Any]: try: - if not st_json: - return {} - return json.loads(st_json) if isinstance(st_json, str) else {} + return json.loads(s) except Exception: return {} -def state_dump(st: Dict[str, Any]) -> str: +def _json_dump_safe(obj: Any) -> str: try: - return json.dumps(st or {}, ensure_ascii=False) + return json.dumps(obj, ensure_ascii=False) except Exception: return "{}" +# ---------------------------- +# Gradio state helpers (string JSON only) +# ---------------------------- +def state_load(st_json: str) -> Dict[str, Any]: + try: + return json.loads(st_json) if isinstance(st_json, str) and st_json else {} + except Exception: + return {} +def state_dump(st: Dict[str, Any]) -> str: + return _json_dump_safe(st or {}) -# ============================ -# Helpers -# ============================ -def norm_text(s: Any) -> str: +# ---------------------------- +# Normalization +# ---------------------------- +def norm_text(x: Any) -> str: try: - if s is None or (isinstance(s, float) and math.isnan(s)) or pd.isna(s): + if x is None or (isinstance(x, float) and math.isnan(x)) or pd.isna(x): return "" except Exception: pass - s = str(s).strip().lower() + s = str(x).strip().lower() s = re.sub(r"[^a-z0-9\s\-\/]", " ", s) s = re.sub(r"\s+", " ", s).strip() return s -def safe_str(v: Any) -> str: - if v is None or (isinstance(v, float) and pd.isna(v)) or pd.isna(v): +def safe_str(x: Any) -> str: + if x is None or (isinstance(x, float) and pd.isna(x)) or pd.isna(x): return "" - return str(v).strip() + return str(x).strip() -def is_5g(modem_type: Any) -> bool: - s = norm_text(modem_type) - return ("5g" in s) or ("nr" in s) - -def json_load_safe(s: str) -> Dict[str, Any]: - try: - return json.loads(s) - except Exception: - return {} - -def gpt_json(system: str, payload: Dict[str, Any], max_tokens: int = 600) -> Dict[str, Any]: - if client is None: - return {} - resp = client.responses.create( - model=OPENAI_MODEL, - reasoning=OPENAI_REASONING, - input=[{"role":"system","content":system},{"role":"user","content":json.dumps(payload)}], - max_output_tokens=max_tokens, - ) - return json_load_safe(getattr(resp, "output_text", "") or "") - - -def gpt_answer_md(system: str, user: str, max_tokens: int = 650) -> str: - """Return a rep-friendly markdown answer.""" - if client is None: - return "No API key is configured, so I can't answer detailed questions right now." - resp = client.responses.create( - model=OPENAI_MODEL, - reasoning=OPENAI_REASONING, - input=[ - {"role": "system", "content": system}, - {"role": "user", "content": user}, - ], - max_output_tokens=max_tokens, - ) - return (getattr(resp, "output_text", "") or "").strip() - - -# ============================ -# Load data -# ============================ -EOS_PATH = "routers_eos_eol_by_sku.csv" -DEC_PATH = "dec2025routers.csv" -PARSEC_PDF = "ParsecCatalog.pdf" +def is_5g_text(s: str) -> bool: + t = norm_text(s) + return ("5g" in t) or ("nr" in t) -if not os.path.exists(EOS_PATH): - raise FileNotFoundError(f"Missing {EOS_PATH} in repo.") -if not os.path.exists(DEC_PATH): - raise FileNotFoundError(f"Missing {DEC_PATH} in repo.") -if not os.path.exists(PARSEC_PDF): - raise FileNotFoundError(f"Missing {PARSEC_PDF} in repo.") +def is_4g_lte_family(row: pd.Series) -> bool: + # Treat LTE categories as 4G + t = norm_text(row.get("description", "")) + " " + norm_text(row.get("notes", "")) + if "5g" in t or "nr" in t: + return False + if "lte" in t or "4g" in t: + return True + if re.search(r"\bcat\s*[-]?\s*(m1|m2)\b", t): + return True + if re.search(r"\bcat\s*[-]?\s*\d{1,2}\b", t): + return True + if "cat" in t: + return True + return False -df_eos = pd.read_csv(EOS_PATH).copy() -df_dec = pd.read_csv(DEC_PATH).copy()# ---------------------------- -# Lifecycle CSV normalization (supports simplified format) # ---------------------------- -# New format example columns: -# SKU, manufacturer, Device Type, end_of_sale, end_of_life, suggested_replacement, advanced_5g_option -# We normalize to internal lowercase names and synthesize missing fields used by matching. +# Lifecycle CSV normalization +# ---------------------------- def _normalize_lifecycle_df(df: pd.DataFrame) -> pd.DataFrame: df = df.copy() - # map columns case-insensitively - col_map = {} lower_cols = {c.lower(): c for c in df.columns} def _pick(*names): @@ -153,140 +147,55 @@ def _normalize_lifecycle_df(df: pd.DataFrame) -> pd.DataFrame: return lower_cols[n.lower()] return None + col_map = {} + sku_col = _pick("sku", "SKU") if sku_col: col_map[sku_col] = "sku" + mfr_col = _pick("manufacturer", "Manufacturer") if mfr_col: col_map[mfr_col] = "manufacturer" + dt_col = _pick("device type", "Device Type", "device_type") if dt_col: col_map[dt_col] = "device_type" + eos_col = _pick("end_of_sale", "end of sale", "End of Sale", "eos") if eos_col: col_map[eos_col] = "end_of_sale" + eol_col = _pick("end_of_life", "end of life", "End of Life", "eol") if eol_col: col_map[eol_col] = "end_of_life" + sr_col = _pick("suggested_replacement", "Suggested Replacement") if sr_col: col_map[sr_col] = "suggested_replacement" + a5_col = _pick("advanced_5g_option", "Advanced 5G Option", "advanced 5g option") if a5_col: col_map[a5_col] = "advanced_5g_option" df = df.rename(columns=col_map) - # Ensure required columns exist for req in ["sku", "manufacturer", "device_type", "end_of_sale", "end_of_life", "suggested_replacement", "advanced_5g_option"]: if req not in df.columns: df[req] = "" - # Synthesize description/notes/region for backward compatibility (matching + display) - if "description" not in df.columns: - df["description"] = df["sku"].astype(str) - if "notes" not in df.columns: - df["notes"] = "" - if "region" not in df.columns: - df["region"] = "" - - return df - -df_eos = _normalize_lifecycle_df(df_eos) - - - - -def _canonize_eos_columns(df: pd.DataFrame) -> pd.DataFrame: - """Normalize lifecycle CSV column names (case-insensitive) and create expected columns.""" - # Map various header spellings to canonical names used by the app - mapping = {} - for c in df.columns: - k = str(c).strip().lower().replace(" ", "_") - if k in {"sku", "model", "device", "device_sku"}: - mapping[c] = "sku" - elif k in {"manufacturer", "make", "vendor"}: - mapping[c] = "manufacturer" - elif k in {"device_type", "type"}: - mapping[c] = "device_type" - elif k in {"end_of_sale", "eos", "end_sale", "end_of_sales"}: - mapping[c] = "end_of_sale" - elif k in {"end_of_life", "eol", "end_life"}: - mapping[c] = "end_of_life" - elif k in {"suggested_replacement", "replacement_4g", "lte_replacement", "replacement_lte", "replacement"}: - mapping[c] = "suggested_replacement" - elif k in {"advanced_5g_option", "replacement_5g", "fiveg_replacement", "5g_replacement", "upgrade_5g"}: - mapping[c] = "advanced_5g_option" - elif k in {"region", "market"}: - mapping[c] = "region" - elif k in {"notes", "note"}: - mapping[c] = "notes" - elif k in {"description", "device_description", "name"}: - mapping[c] = "description" - - df = df.rename(columns=mapping).copy() - - # Create expected columns if missing - if "sku" not in df.columns: - # Try the common capitalized header as a fallback - if "SKU" in df.columns: - df["sku"] = df["SKU"].astype(str) - else: - df["sku"] = "" - - if "manufacturer" not in df.columns: - df["manufacturer"] = "" - - if "device_type" not in df.columns: - df["device_type"] = "" - + # Compatibility fields used by matching/output if "description" not in df.columns: - # If the simplified file removed description, use SKU as description (still searchable) df["description"] = df["sku"].astype(str) - if "notes" not in df.columns: df["notes"] = "" - if "region" not in df.columns: df["region"] = "" - if "suggested_replacement" not in df.columns: - df["suggested_replacement"] = "" - - if "advanced_5g_option" not in df.columns: - df["advanced_5g_option"] = "" - - if "end_of_sale" not in df.columns: - df["end_of_sale"] = "" - - if "end_of_life" not in df.columns: - df["end_of_life"] = "" - return df -df_eos = _canonize_eos_columns(df_eos) - - -def region_ok(x: Any) -> bool: - s = str(x or "").strip().lower() - if not s: - return True - if "not specified" in s: - return True - if "north america" in s: - return True - if re.search(r"\busa\b", s): - return True - if re.search(r"\bunited\s+states\b", s): - return True - if re.search(r"\bu\.?s\.?\b", s): - return True - return False - -if "region" in df_eos.columns: - df_eos = df_eos[df_eos["region"].apply(region_ok)].reset_index(drop=True) - -# Maker mapping (includes Teltonika) +# ---------------------------- +# Maker mapping +# ---------------------------- CANON_MAKER = { "CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"}, "SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"}, @@ -305,19 +214,9 @@ def canon_maker_from_text(s: Any) -> str: return canon return "UNKNOWN" -df_eos["_canon_make"] = df_eos["manufacturer"].apply(canon_maker_from_text) if "manufacturer" in df_eos.columns else "UNKNOWN" -df_eos["_norm_sku"] = df_eos["sku"].apply(norm_text) if "sku" in df_eos.columns else "" -df_eos["_norm_desc"] = df_eos["description"].apply(norm_text) if "description" in df_eos.columns else "" -df_eos["_norm_notes"] = df_eos["notes"].apply(norm_text) if "notes" in df_eos.columns else "" - -df_dec["_canon_make"] = df_dec["Make"].apply(canon_maker_from_text) if "Make" in df_dec.columns else "UNKNOWN" -df_dec["_norm_model"] = df_dec["Model"].apply(norm_text) if "Model" in df_dec.columns else "" -df_dec["_is5g"] = df_dec["Modem Type"].apply(is_5g) if "Modem Type" in df_dec.columns else False - - -# ============================ -# Date helpers -# ============================ +# ---------------------------- +# Date parsing +# ---------------------------- @dataclass class ParsedDate: raw: str @@ -325,19 +224,23 @@ class ParsedDate: value: Optional[date] def parse_date_field(x: Any) -> ParsedDate: - raw = str(x or "").strip() + raw = safe_str(x) if not raw: return ParsedDate(raw="", kind="missing", value=None) - # Common US formats: M/D/YY or M/D/YYYY (e.g., 6/24/24, 9/30/21) - for fmt in ("%m/%d/%y", "%m/%d/%Y", "%-m/%-d/%y", "%-m/%-d/%Y"): + # MM/DD/YY or M/D/YY + if re.fullmatch(r"\d{1,2}/\d{1,2}/\d{2,4}", raw): try: - dt = datetime.strptime(raw, fmt).date() - return ParsedDate(raw=raw, kind="full", value=dt) + parts = raw.split("/") + m = int(parts[0]); d = int(parts[1]); y = int(parts[2]) + if y < 100: + y += 2000 + dt = date(y, m, d) + return ParsedDate(raw=f"{y:04d}-{m:02d}-{d:02d}", kind="full", value=dt) except Exception: - pass + return ParsedDate(raw=raw, kind="bad", value=None) - # ISO-ish: YYYY + # YYYY if re.fullmatch(r"\d{4}", raw): y = int(raw) if y == TODAY.year: @@ -350,21 +253,12 @@ def parse_date_field(x: Any) -> ParsedDate: if re.fullmatch(r"\d{4}-\d{2}", raw): try: y, m = raw.split("-") - return ParsedDate(raw=raw, kind="year_month", value=date(int(y), int(m), 1)) + dt = date(int(y), int(m), 1) + return ParsedDate(raw=raw, kind="year_month", value=dt) except Exception: return ParsedDate(raw=raw, kind="bad", value=None) # YYYY-MM-DD - if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw): - try: - dt = datetime.strptime(raw, "%Y-%m-%d").date() - return ParsedDate(raw=raw, kind="full", value=dt) - except Exception: - return ParsedDate(raw=raw, kind="bad", value=None) - - # Last resort: leave as raw (unparsed) - return ParsedDate(raw=raw, kind="bad", value=None) - if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw): try: dt = datetime.strptime(raw, "%Y-%m-%d").date() @@ -395,1250 +289,450 @@ def row_to_dates_and_status(row: pd.Series) -> Tuple[str, str, str]: eol = parse_date_field(row.get("end_of_life")) return display_date(eos), display_date(eol), status_from_eos_eol(eos, eol) +# ---------------------------- +# Files +# ---------------------------- +EOS_PATH = "routers_eos_eol_by_sku.csv" +DEC_PATH = "dec2025routers.csv" +PARSEC_PDF = "ParsecCatalog.pdf" -# ============================ -# Embeddings + Parsec index -# ============================ -embedder = SentenceTransformer(EMBED_MODEL_NAME) +if not os.path.exists(EOS_PATH): + raise FileNotFoundError(f"Missing {EOS_PATH} in repo.") +if not os.path.exists(DEC_PATH): + raise FileNotFoundError(f"Missing {DEC_PATH} in repo.") +if not os.path.exists(PARSEC_PDF): + raise FileNotFoundError(f"Missing {PARSEC_PDF} in repo.") -def extract_pdf_text_pages(path: str) -> List[str]: - doc = fitz.open(path) - return [doc[i].get_text("text") for i in range(len(doc))] +t0 = time.perf_counter() +df_eos = pd.read_csv(EOS_PATH).copy() +df_dec = pd.read_csv(DEC_PATH).copy() +df_eos = _normalize_lifecycle_df(df_eos) -def build_parsec_cards(pages: List[str]) -> List[str]: - cards = [] - for p in pages: - for m in re.finditer(r"Standard\s+SKU:", p): - start = max(0, m.start() - PARSEC_CONTEXT_BEFORE) - end = min(len(p), m.start() + PARSEC_CONTEXT_AFTER) - c = p[start:end].strip() - if len(c) >= 200: - cards.append(c) - out, seen = [], set() - for c in cards: - h = hashlib.sha1(c.encode("utf-8")).hexdigest() - if h not in seen: - seen.add(h); out.append(c) - return out - -parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF)) -parsec_emb = embedder.encode(parsec_cards, batch_size=64, show_progress_bar=False, normalize_embeddings=True) -parsec_emb = np.asarray(parsec_emb, dtype=np.float32) -parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1]) -parsec_index.add(parsec_emb) +# Canon columns +df_eos["_canon_make"] = df_eos["manufacturer"].apply(canon_maker_from_text) +df_eos["_norm_sku"] = df_eos["sku"].apply(norm_text) +df_eos["_norm_desc"] = df_eos["description"].apply(norm_text) +df_eos["_norm_notes"] = df_eos["notes"].apply(norm_text) +df_dec["_canon_make"] = df_dec["Make"].apply(canon_maker_from_text) if "Make" in df_dec.columns else "UNKNOWN" +df_dec["_norm_model"] = df_dec["Model"].apply(norm_text) if "Model" in df_dec.columns else "" +df_dec["_is5g"] = df_dec["Modem Type"].apply(lambda x: is_5g_text(str(x))) if "Modem Type" in df_dec.columns else False +_tlog("load csv", t0) -# ============================ -# Device resolution -# ============================ -def label_for_row(i: int) -> str: +# ---------------------------- +# Build fuzzy corpus for device matching +# ---------------------------- +def _label_for_row(i: int) -> str: r = df_eos.iloc[i] return f"{r.get('sku','')} — {r.get('manufacturer','')} — {r.get('description','')}"[:220] -EOS_LABELS = [label_for_row(i) for i in range(len(df_eos))] +EOS_LABELS = [_label_for_row(i) for i in range(len(df_eos))] EOS_CORPUS = [] for _, r in df_eos.iterrows(): EOS_CORPUS.append(" ".join([r.get("_norm_sku",""), r.get("_canon_make",""), r.get("_norm_desc",""), r.get("_norm_notes","")])) -def local_candidates(query: str, top_k: int = 6) -> List[Tuple[int, int, str]]: - q = norm_text(query) - hits = process.extract(q, EOS_CORPUS, scorer=fuzz.WRatio, limit=top_k) - return [(int(idx), int(score), EOS_LABELS[int(idx)]) for _, score, idx in hits] - -def gpt_choose_device(user_text: str, candidates: List[Tuple[int,int,str]]) -> Dict[str, Any]: - if client is None: - return {} - sys = "Pick which router the user meant. Never invent. Return strict JSON only." - payload = { - "user_input": user_text, - "candidates": [{"row_idx": i, "score": s, "label": lbl} for (i,s,lbl) in candidates], - "rules": [ - "If one is clearly correct, return mode='ok' with row_idx.", - "If two are plausible, return mode='pick' with top 2 options." - ], - "output_schema": {"mode":"ok|pick","row_idx":"int","options":[{"row_idx":"int","label":"string"}]} - } - return gpt_json(sys, payload, max_tokens=280) +def resolve_device(term: str) -> Dict[str, Any]: + q = norm_text(term) + if not q: + return {"mode": "not_found"} -def resolve_device(user_text: str) -> Dict[str, Any]: - q = norm_text(user_text) exact = df_eos.index[df_eos["_norm_sku"] == q].tolist() if len(exact) == 1: return {"mode":"ok","row_idx": int(exact[0])} - if len(exact) > 1: - opts = [{"row_idx": int(i), "label": EOS_LABELS[int(i)]} for i in exact[:2]] - return {"mode":"pick","options": opts} - cands = local_candidates(user_text, top_k=6) + hits = process.extract(q, EOS_CORPUS, scorer=fuzz.WRatio, limit=6) + cands = [(int(idx), int(score), EOS_LABELS[int(idx)]) for _, score, idx in hits] + if not cands: return {"mode":"not_found"} - if cands[0][1] >= 95 and (len(cands) == 1 or (cands[0][1] - cands[1][1]) >= 8): + if cands[0][1] >= MATCH_AUTOPICK and (len(cands) == 1 or (cands[0][1] - cands[1][1]) >= MATCH_GAP): return {"mode":"ok","row_idx": cands[0][0]} - g = gpt_choose_device(user_text, cands) - if g.get("mode") == "ok" and isinstance(g.get("row_idx"), int): - return {"mode":"ok","row_idx": int(g["row_idx"])} - - if g.get("mode") == "pick": - opts = g.get("options", []) or [] - opts2 = [{"row_idx": int(o["row_idx"]), "label": str(o["label"])} for o in opts[:2] if "row_idx" in o] - if opts2: - return {"mode":"pick","options": opts2} - + opts = [{"row_idx": cands[0][0], "label": cands[0][2]}] if len(cands) > 1: - return {"mode":"pick","options":[{"row_idx":cands[0][0],"label":cands[0][2]},{"row_idx":cands[1][0],"label":cands[1][2]}]} - return {"mode":"pick","options":[{"row_idx":cands[0][0],"label":cands[0][2]}]} + opts.append({"row_idx": cands[1][0], "label": cands[1][2]}) + return {"mode":"pick","options": opts} +# ---------------------------- +# Parsec RAG (FAISS) +# ---------------------------- +t0 = time.perf_counter() +embedder = SentenceTransformer(EMBED_MODEL_NAME) -# ============================ -# Replacements — lifecycle CSV source of truth -# ============================ -def extract_model_token(text: str) -> str: - s = safe_str(text) - if not s: - return "" - parts = [p.strip() for p in s.split("|") if p.strip()] - candidates = parts[::-1] if parts else [s] - for cand in candidates: - m = re.search(r"\bRUT[A-Z]?\d{2,4}\b", cand.upper()) - if m: - return m.group(0).upper() - m = re.search(r"\bIX\d{2}\b", cand, flags=re.IGNORECASE) - if m: - return m.group(0).upper() - m = re.search(r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b", cand, flags=re.IGNORECASE) - if m: - return m.group(0).upper() - m = re.search(r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b", cand.upper()) - if m: - return m.group(0).upper() - return candidates[0][:60] - -def device_is_4g(row: pd.Series) -> bool: - # Detect LTE/4G even when the description uses "Cat 4 / Cat6 / Cat 12" without saying "LTE" - t = norm_text(row.get("description","")) + " " + norm_text(row.get("notes","")) + " " + norm_text(row.get("sku","")) - - # If it explicitly says 5G/NR, treat as not 4G-only - if ("5g" in t) or ("nr" in t): - return False - - # Classic signals - if ("lte" in t) or ("4g" in t): - return True - - # LTE category signals (Cat 1..20 are LTE categories; Cat M1/M2 are LTE-M) - if re.search(r"\bcat\s*[-]?\s*(m1|m2)\b", t): - return True - - m = re.search(r"\bcat\s*[-]?\s*(\d{1,2})\b", t) - if m: - try: - cat = int(m.group(1)) - if 0 < cat <= 20: - return True - except Exception: - pass - - # If "cat" appears at all, it's almost always LTE-family - if "cat" in t: - return True - - return False - - # If it explicitly says 5G/NR, treat as not 4G-only - if ("5g" in t) or ("nr" in t): - return False - - # Classic signals - if ("lte" in t) or ("4g" in t): - return True - - # LTE category signals (Cat 1..20 are LTE categories; Cat M1/M2 are LTE-M) - if re.search(r"\bcat\s*[-]?\s*(m1|m2)\b", t): - return True - - m = re.search(r"\bcat\s*[-]?\s*(\d{1,2})\b", t) - if m: - try: - cat = int(m.group(1)) - if 0 < cat <= 20: - return True - except Exception: - pass - - # If "cat" appears at all, it's almost always LTE-family - if "cat" in t: - return True - - return False - +def extract_pdf_text_pages(path: str) -> List[str]: + doc = fitz.open(path) + return [doc[i].get_text("text") for i in range(len(doc))] -def candidate_5g_models_from_lifecycle(manufacturer: str) -> List[str]: - mfr = norm_text(manufacturer) - pool = df_eos[df_eos["manufacturer"].astype(str).str.lower().eq(mfr)].copy() if "manufacturer" in df_eos.columns else df_eos.copy() - vals = pool["advanced_5g_option"].tolist() if "advanced_5g_option" in pool.columns else [] +def build_parsec_cards(pages: List[str]) -> List[str]: + cards = [] + for p in pages: + for m in re.finditer(r"Standard\s+SKU:", p): + start = max(0, m.start() - PARSEC_CONTEXT_BEFORE) + end = min(len(p), m.start() + PARSEC_CONTEXT_AFTER) + c = p[start:end].strip() + if len(c) >= 200: + cards.append(c) out, seen = [], set() - for v in vals: - tok = extract_model_token(v) - if tok and tok.lower() != "nan" and tok not in seen: - seen.add(tok); out.append(tok) + for c in cards: + h = hashlib.sha1(c.encode("utf-8")).hexdigest() + if h not in seen: + seen.add(h); out.append(c) return out -def candidate_4g_models_from_lifecycle(manufacturer: str) -> List[str]: - mfr = norm_text(manufacturer) - pool = df_eos[df_eos["manufacturer"].astype(str).str.lower().eq(mfr)].copy() if "manufacturer" in df_eos.columns else df_eos.copy() - vals = pool["suggested_replacement"].tolist() if "suggested_replacement" in pool.columns else [] - out, seen = [], set() - for v in vals: - tok = extract_model_token(v) - if tok and tok.lower() != "nan" and tok not in seen: - seen.add(tok); out.append(tok) - return out +parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF)) +parsec_emb = embedder.encode(parsec_cards, batch_size=64, show_progress_bar=False, normalize_embeddings=True) +parsec_emb = np.asarray(parsec_emb, dtype=np.float32) +parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1]) +parsec_index.add(parsec_emb) +_tlog("parsec index", t0) -def gpt_pick_from_candidates(old_row: pd.Series, candidates: List[str], need: str) -> str: - if client is None or not candidates: - return "" - sys = "Pick the best replacement model. Choose only from candidates. Return strict JSON only." - payload = { - "old_device": { - "sku": str(old_row.get("sku","")), - "manufacturer": str(old_row.get("manufacturer","")), - "description": str(old_row.get("description","")), - "need": need, - }, - "candidates": candidates[:40], - "output_schema": {"choice":"string"} - } - out = gpt_json(sys, payload, max_tokens=240) or {} - choice = str(out.get("choice","") or "").strip() - return choice if choice in candidates else "" - -def fallback_5g_from_dec(canon_make: str) -> str: - pool5 = df_dec[(df_dec["_canon_make"] == canon_make) & (df_dec["_is5g"] == True)] - return str(pool5.iloc[0]["Model"]).strip() if not pool5.empty else "" - -def pick_replacements_lifecycle(row: pd.Series, status: str, use_gpt: bool = True) -> Dict[str, Any]: - canon = str(row.get("_canon_make","UNKNOWN")) - manufacturer = str(row.get("manufacturer","") or "") - - sug_raw = safe_str(row.get("suggested_replacement","")) - adv_raw = safe_str(row.get("advanced_5g_option","")) - - has_4g_alt = bool(sug_raw.strip()) - has_5g_alt = bool(adv_raw.strip()) - - # Treat as 4G if the description indicates LTE OR lifecycle provides a 4G suggested replacement - is_4g = device_is_4g(row) or has_4g_alt - - # Provide 5G option if the unit is 4G, EOS/EOL, or lifecycle explicitly provides advanced_5g_option - want_5g = is_4g or (status in {"End of Sale","End of Life"}) or has_5g_alt - - # 4G alternative: show whenever lifecycle provides it (or device appears 4G) - repl_4g = "Not applicable" - if is_4g or has_4g_alt: - repl_4g = extract_model_token(sug_raw) - if not repl_4g: - cand4 = candidate_4g_models_from_lifecycle(manufacturer) - repl_4g = (gpt_pick_from_candidates(row, cand4, "4G alternative") if (use_gpt and client) else "") or (cand4[0] if cand4 else "") - if not repl_4g: - repl_4g = "Not applicable" - - # 5G replacement: prefer lifecycle advanced_5g_option whenever present - repl_5g = "Not listed" - if want_5g: - repl_5g = extract_model_token(adv_raw) - if not repl_5g: - cand5 = candidate_5g_models_from_lifecycle(manufacturer) - repl_5g = (gpt_pick_from_candidates(row, cand5, "5G replacement/upgrade") if (use_gpt and client) else "") or (cand5[0] if cand5 else "") - if not repl_5g: - repl_5g = fallback_5g_from_dec(canon) or "Not listed" - - if repl_5g.lower() == "nan": - repl_5g = "Not listed" - - return {"repl_4g": repl_4g, "repl_5g": repl_5g, "sources": ["lifecycle_csv"] + (["gpt"] if (use_gpt and client) else [])} - - -# ============================ -# Antennas (Parsec-only) -# ============================ PARSEC_FAMILY_WORDS = {"chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie","shepherd","belgian","australian","terrier","pyrenees"} -BAD_NAME_MARKERS = {"customization","standard connectors","connectors","features","benefits","specifications","mechanical","electrical","mounting","accessories","description:","standard sku"} - -def clean_line(s: str) -> str: - s = re.sub(r"\s+", " ", str(s or "").strip()) - if re.fullmatch(r"-[a-z0-9]+", s.lower()): - return "" - return s - -def is_bad_name_line(line: str) -> bool: - low = line.lower() - if any(m in low for m in BAD_NAME_MARKERS): - return True - if re.search(r"\b-[a-z0-9]{1,4}\b", low) and len(low) <= 25: - return True - return False -def family_from_line(line: str) -> str: - low = line.lower() +def _parsec_name_from_card(card_text: str) -> str: + low = card_text.lower() for fam in PARSEC_FAMILY_WORDS: if fam in low: return fam.capitalize() - return "" - -def parsec_connectors_from_card(t: str) -> str: - m = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE) - if m: - return re.sub(r"\s+", " ", m.group(1).strip())[:80] - return "" - -def parsec_mounts_from_card(t: str) -> List[str]: - mounts = [] - for m in re.finditer(r"Mount:\s*(.+)", t, flags=re.IGNORECASE): - val = re.sub(r"\s+", " ", m.group(1).strip()) - parts = [p.strip().lower() for p in val.split(",") if p.strip()] - mounts.extend(parts) - out = [] - seen = set() - for x in mounts: - if x not in seen: - seen.add(x); out.append(x) - return out - -def parsec_name_from_card(card_text: str) -> str: - lines = [clean_line(ln) for ln in str(card_text or "").splitlines()] - lines = [ln for ln in lines if ln] - - for ln in lines: - if is_bad_name_line(ln): - continue - fam = family_from_line(ln) - if fam: - return fam - - sku_i = None - for i, ln in enumerate(lines): - if "standard sku" in ln.lower(): - sku_i = i - break - if sku_i is not None: - window = lines[max(0, sku_i - 12):sku_i] - for ln in reversed(window): - if is_bad_name_line(ln): - continue - if 3 <= len(ln) <= 40 and re.search(r"[A-Za-z]", ln): - return ln.split()[0].capitalize() - return "Parsec antenna" -def parsec_part_from_card(t: str) -> str: +def _parsec_part_from_card(t: str) -> str: m = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t) return m.group(1).strip() if m else "" -def parsec_desc_from_card(t: str) -> str: +def _parsec_desc_from_card(t: str) -> str: m = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE) return re.sub(r"\s+"," ",m.group(1).strip())[:220] if m else "" -def parsec_retrieve(query: str, top_k: int = 12) -> List[Dict[str, Any]]: +def _parsec_connectors_from_card(t: str) -> str: + m = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE) + return re.sub(r"\s+"," ",m.group(1).strip())[:80] if m else "" + +def parsec_retrieve(query: str, top_k: int = 8) -> List[Dict[str, Any]]: qv = embedder.encode([query], normalize_embeddings=True) qv = np.asarray(qv, dtype=np.float32) scores, ids = parsec_index.search(qv, top_k) - out: List[Dict[str, Any]] = [] + out = [] for sc, i in zip(scores[0].tolist(), ids[0].tolist()): if 0 <= int(i) < len(parsec_cards): card = parsec_cards[int(i)] out.append({ "score": float(sc), - "name": parsec_name_from_card(card), - "part_number": parsec_part_from_card(card), - "description": parsec_desc_from_card(card), - "connectors": parsec_connectors_from_card(card), - "mounts": parsec_mounts_from_card(card), - "_card": card.lower(), + "name": _parsec_name_from_card(card), + "part_number": _parsec_part_from_card(card), + "description": _parsec_desc_from_card(card), + "connectors": _parsec_connectors_from_card(card), }) return out -def choose_best_parsec(cands: List[Dict[str, Any]], mode: str) -> Dict[str, Any]: - best = None - best_score = -1e9 - - for c in cands: - card = c.get("_card","") - mounts = c.get("mounts", []) or [] - score = float(c.get("score", 0.0)) - - if "omni" in card: - score += 0.6 - if "directional" in card: - score -= 1.5 - - if mode == "vehicle": - if any("magnetic" in m for m in mounts): - score += 3.0 - if any("through" in m for m in mounts): - score += 2.0 - if any("wall" in m for m in mounts) or any("pole" in m for m in mounts): - score -= 1.2 - if "app: fixed" in card and "mobile" not in card: - score -= 2.0 - - if mode == "stationary": - if any("wall" in m for m in mounts): - score += 2.0 - if any("pole" in m for m in mounts): - score += 1.8 - - if score > best_score: - best_score = score - best = c - - if not best: - return {"name":"Parsec antenna","part_number":"","description":"","connectors":"","mounts":[]} - - best = dict(best) - best.pop("_card", None) - return best - - -def infer_mimo_for_5g(repl_5g_model: str) -> str: - """Rule: every 5G router uses a 4x4 antenna.""" - return "4x4" - - # If the model name hints 5G, lean 4x4 - if "5g" in model.lower() or model.upper().startswith(("R", "E", "S", "IX", "RUTM")): - default = "4x4" - else: - default = "2x2" - - # Use dec2025routers.csv if we can match the model under the same maker family - try: - pool = df_dec[df_dec["_canon_make"] == canon_make].copy() - if pool.empty: - return default - hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) - if not hit or hit[1] < MATCH_OK: - return default - row = pool.iloc[int(hit[2])] - txt2 = (str(row.get("Antennas (internal/external/both)", "")) + " " + str(row.get("Modem Type", "")) + " " + str(row.get("Special notes",""))).lower() - if "4x4" in txt2 or "4 x 4" in txt2 or "4x 4" in txt2: - return "4x4" - if "2x2" in txt2 or "2 x 2" in txt2: - return "2x2" - # If modem type includes 5G, lean 4x4 - if "5g" in txt2 or "nr" in txt2: - return "4x4" - return default - except Exception: - return default - -def antenna_options_for(router_model: str, tech: str, mimo: str) -> Dict[str, Any]: - q_stationary = f"{router_model} {tech} {mimo} omni stationary pole wall fixed site Parsec" - q_vehicle = f"{router_model} {tech} {mimo} omni vehicle mobile magnetic through-bolt Parsec" - - cand_stationary = parsec_retrieve(q_stationary, top_k=12) - cand_vehicle = parsec_retrieve(q_vehicle, top_k=12) - - s = choose_best_parsec(cand_stationary, mode="stationary") - v = choose_best_parsec(cand_vehicle, mode="vehicle") - - s.update({"mimo": mimo, "why": "Stationary omni best match."}) - v.update({"mimo": mimo, "why": "Vehicle omni best match."}) - - return {"stationary_omni": s, "vehicle_omni": v, "sources":["parsec_rag"]} - - -# ============================ -# Install-ready checklist -# ============================ -def install_ready_checklist(current_sku: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str: - st = ant.get("stationary_omni", {}) - vh = ant.get("vehicle_omni", {}) - if client is not None: - sys = "Create a short, install-ready checklist for a Verizon rep. Return markdown only." - payload = {"current_device": current_sku, "replacements": repl, "antennas": {"stationary": st, "vehicle": vh}} - resp = client.responses.create( - model=OPENAI_MODEL, - reasoning=OPENAI_REASONING, - input=[{"role":"system","content":sys},{"role":"user","content":json.dumps(payload)}], - max_output_tokens=520, - ) - return (getattr(resp, "output_text", "") or "").strip() - return "\n".join([ - "### Install-ready checklist", - f"- Current device: {current_sku}", - f"- 5G replacement: {repl.get('repl_5g','')}", - f"- 4G alternative: {repl.get('repl_4g','Not applicable')}", - f"- Stationary omni antenna: {st.get('name','')} (PN {st.get('part_number','')})", - f"- Vehicle omni antenna: {vh.get('name','')} (PN {vh.get('part_number','')})", - "- Next steps: confirm mounting + cable lengths + power; place order; schedule install.", - ]) - - -# ============================ -# Batch mode (NO GPT) -# ============================ -def parse_batch_inputs(text_blob: str, file_obj: Any) -> List[str]: - items: List[str] = [] - if file_obj is not None: - try: - path = file_obj.name if hasattr(file_obj, "name") else str(file_obj) - df = pd.read_csv(path) - col = df.columns[0] - items.extend([str(x).strip() for x in df[col].tolist() if str(x).strip()]) - except Exception: - pass - if text_blob: - for ln in str(text_blob).splitlines(): - ln = ln.strip() - if ln: - items.append(ln) - seen=set() - out=[] - for x in items: - k=norm_text(x) - if k and k not in seen: - seen.add(k); out.append(x) - return out - -def run_batch(text_blob: str, file_obj: Any, include_antennas: bool): - inputs = parse_batch_inputs(text_blob, file_obj) - if not inputs: - return "", None, None, "" - - rows=[] - for item in inputs: - res = resolve_device(item) - if res.get("mode") != "ok": - rows.append({"Input": item, "Matched":"", "Status":"Needs review", "EOS":"", "EOL":"", "4G alternative":"", "5G replacement":"", "Notes":"Not found/ambiguous"}) - continue - - life_row = df_eos.iloc[int(res["row_idx"])] - eos, eol, status = row_to_dates_and_status(life_row) - repl = pick_replacements_lifecycle(life_row, status, use_gpt=False) - - rows.append({ - "Input": item, - "Matched": str(life_row.get("sku","")), - "Status": status, - "EOS": eos, - "EOL": eol, - "4G alternative": repl.get("repl_4g",""), - "5G replacement": repl.get("repl_5g",""), - "Notes": "", - }) - - out_df = pd.DataFrame(rows) - counts = out_df["Status"].value_counts(dropna=False).to_dict() - top_5g = out_df["5G replacement"].value_counts(dropna=False).head(5).to_dict() - summary = f"Rows: {len(out_df)} | " + " | ".join([f"{k}: {v}" for k,v in counts.items()]) - rollup = "Top 5G recommendations:\n" + "\n".join([f"- {k}: {v}" for k,v in top_5g.items() if str(k).strip()]) - - tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv") - out_df.to_csv(tmp.name, index=False) - - return summary, out_df, tmp.name, rollup - - -# ============================ -# Replacement feature table + manufacturer link (5G device) -# ============================ - -FEATURE_COLS = ["Device", "Modem technology", "WiFi", "Ports", "Antennas", "Ruggedness", "Use case"] - -# Manufacturer domains used for best-effort link resolution (no non-maker domains). -MAKER_DOMAINS = { - "CRADLEPOINT": ["cradlepoint.com", "ericsson.com"], - "SIERRA": ["semtech.com", "airlink.com"], - "FEENEY": ["inseego.com"], - "DIGI": ["digi.com"], - "CISCO_MERAKI": ["meraki.cisco.com", "cisco.com"], - "CISCO": ["cisco.com"], - "TELTONIKA": ["teltonika-networks.com"], - "UNKNOWN": [], -} +def antenna_pick(repl5: str, mode: str, detail: Optional[str]) -> Dict[str, Any]: + mimo = "4x4" # rule: all 5G -> 4x4 + tech = "5G" + if mode == "vehicle": + q = f"{repl5} {tech} {mimo} omni vehicle mobile magnetic through-bolt" + c = parsec_retrieve(q, top_k=8) + best = c[0] if c else {"name":"Parsec antenna","part_number":"","description":"","connectors":""} + best.update({"mimo": mimo, "why": "Vehicle omni best match."}) + return best -HTTP_HEADERS = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " - "(KHTML, like Gecko) Chrome/120.0 Safari/537.36" -} -HTTP_TIMEOUT = 12 + if detail == "directional": + q = f"{repl5} {tech} {mimo} directional fixed site" + c = parsec_retrieve(q, top_k=8) + best = c[0] if c else {"name":"Parsec antenna","part_number":"","description":"","connectors":""} + best.update({"mimo": mimo, "why": "Stationary directional best match."}) + return best -def _best_effort_manufacturer_url(model: str, canon_make: str) -> str: - """Try to find a manufacturer page or datasheet link using simple on-domain searches. - If we can't confirm a page, return the manufacturer homepage for the maker family. - """ - model = str(model or "").strip() - if not model or model in {"Not listed", "Not applicable"}: - return "" + if detail == "indoor": + q = f"{repl5} {tech} {mimo} omni indoor" + c = parsec_retrieve(q, top_k=8) + best = c[0] if c else {"name":"Parsec antenna","part_number":"","description":"","connectors":""} + best.update({"mimo": mimo, "why": "Stationary indoor omni best match."}) + return best + + q = f"{repl5} {tech} {mimo} omni outdoor pole wall fixed site" + c = parsec_retrieve(q, top_k=8) + best = c[0] if c else {"name":"Parsec antenna","part_number":"","description":"","connectors":""} + best.update({"mimo": mimo, "why": "Stationary outdoor omni best match."}) + return best - domains = MAKER_DOMAINS.get(canon_make, []) or [] - if not domains: +# ---------------------------- +# Replacement selection (lifecycle-first) +# ---------------------------- +def extract_model_token(text: str) -> str: + s = safe_str(text) + if not s: return "" + parts = [p.strip() for p in s.split("|") if p.strip()] + candidates = parts[::-1] if parts else [s] + for cand in candidates: + u = cand.upper() + m = re.search(r"\bRUT[A-Z]?\d{2,4}\b", u) + if m: + return m.group(0) + m = re.search(r"\bRUTM\d{2,3}\b", u) + if m: + return m.group(0) + m = re.search(r"\bIX\d{2}\b", u) + if m: + return m.group(0) + m = re.search(r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b", u) + if m: + return m.group(0) + m = re.search(r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b", u) + if m: + return m.group(0) + return candidates[0][:60] - # Candidate on-domain search URLs (common patterns across sites). - # We keep these on the manufacturer domain (no Google/Bing). - q = re.sub(r"\s+", "+", model) - url_candidates = [] - for d in domains: - url_candidates += [ - f"https://{d}/search?q={q}", - f"https://{d}/search?query={q}", - f"https://{d}/?s={q}", - f"https://www.{d}/search?q={q}", - f"https://www.{d}/search?query={q}", - f"https://www.{d}/?s={q}", - ] - - # Also try a few direct product patterns for known makers (best effort). - if canon_make == "TELTONIKA": - slug = model.lower() - url_candidates += [ - f"https://teltonika-networks.com/products/routers/{slug}", - f"https://teltonika-networks.com/product/{slug}", - "https://teltonika-networks.com/products/routers/", - ] - if canon_make == "DIGI": - url_candidates += [ - "https://www.digi.com/products/networking/cellular-routers", - f"https://www.digi.com/search?q={q}", - ] - if canon_make == "CRADLEPOINT": - url_candidates += [ - "https://cradlepoint.com/products/", - f"https://cradlepoint.com/?s={q}", - ] - if canon_make in {"CISCO", "CISCO_MERAKI"}: - url_candidates += [ - f"https://www.cisco.com/c/en/us/search.html?q={q}", - ] +def pick_replacements(row: pd.Series, status: str) -> Dict[str, str]: + sug = safe_str(row.get("suggested_replacement", "")) + adv = safe_str(row.get("advanced_5g_option", "")) - # Try to confirm a working page (HTTP 200 and model string somewhere in HTML). - for u in url_candidates[:18]: - try: - import requests - r = requests.get(u, headers=HTTP_HEADERS, timeout=HTTP_TIMEOUT, allow_redirects=True) - if r.status_code != 200: - continue - html = (r.text or "").lower() - if model.lower() in html or "datasheet" in html or "data sheet" in html: - return r.url - except Exception: - continue + repl_4g = extract_model_token(sug) if sug else "Not applicable" + repl_5g = extract_model_token(adv) if adv else "Not listed" - # Fallback: maker homepage - d0 = domains[0] - return f"https://{d0}" + # Always provide some 5G answer: if lifecycle missing, pick top 5G from dec (same maker) + if repl_5g in {"", "Not listed"}: + canon_make = str(row.get("_canon_make","UNKNOWN")) + pool = df_dec[(df_dec["_canon_make"] == canon_make) & (df_dec["_is5g"] == True)].copy() + repl_5g = str(pool.iloc[0]["Model"]).strip() if not pool.empty else "Not listed" -def _fetch_page_text(url: str, max_chars: int = 12000) -> str: - """Fetch page HTML and return a simplified text blob for GPT (best effort).""" - if not url: - return "" - try: - import requests - r = requests.get(url, headers=HTTP_HEADERS, timeout=HTTP_TIMEOUT, allow_redirects=True) - if r.status_code != 200: - return "" - html = r.text or "" - html = re.sub(r"(?is).*?", " ", html) - html = re.sub(r"(?is).*?", " ", html) - text = re.sub(r"(?is)<[^>]+>", " ", html) - text = re.sub(r"\s+", " ", text).strip() - return text[:max_chars] - except Exception: - return "" + return {"repl_4g": repl_4g or "Not applicable", "repl_5g": repl_5g or "Not listed"} +# ---------------------------- +# Features + Fit (dec first, single LLM enrichment call if needed) +# ---------------------------- +FEATURE_COLS = ["Device", "Modem technology", "WiFi", "Ports", "Antennas", "Ruggedness", "Use case"] +FIT_COLS = ["Device", "Fit badges", "Ethernet ports", "Battery"] def _features_from_dec(model: str, canon_make: str) -> Dict[str, str]: - """Lookup a router model in dec2025routers.csv and return the key feature fields.""" if not model or model in {"Not listed", "Not applicable"}: return {k: "Not listed" for k in FEATURE_COLS[1:]} - pool = df_dec[df_dec["_canon_make"] == canon_make].copy() if pool.empty: return {k: "Not listed" for k in FEATURE_COLS[1:]} - hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) if not hit or hit[1] < MATCH_OK: return {k: "Not listed" for k in FEATURE_COLS[1:]} - r = pool.iloc[int(hit[2])] - ports = f"WAN: {r.get('WAN ports and speed','')} | LAN: {r.get('LAN ports and speed','')}" + ports = f"WAN: {r.get('WAN ports and speed','')} | LAN: {r.get('LAN ports and speed','')}".strip() return { - "Modem technology": str(r.get("Modem Type","")) or "Not listed", - "WiFi": str(r.get("WiFi type","")) or "Not listed", - "Ports": ports.strip() if ports.strip() else "Not listed", - "Antennas": str(r.get("Antennas (internal/external/both)","")) or "Not listed", - "Ruggedness": str(r.get("Ruggedization","")) or "Not listed", - "Use case": str(r.get("Primary use case","")) or "Not listed", + "Modem technology": str(r.get("Modem Type","") or "Not listed"), + "WiFi": str(r.get("WiFi type","") or "Not listed"), + "Ports": ports if ports else "Not listed", + "Antennas": str(r.get("Antennas (internal/external/both)","") or "Not listed"), + "Ruggedness": str(r.get("Ruggedization","") or "Not listed"), + "Use case": str(r.get("Primary use case","") or "Not listed"), } -def _gpt_fill_feature_row(device_label: str, model: str, canon_make: str, row: Dict[str, str], manufacturer_url: str = "", page_text: str = "") -> Dict[str, str]: - """If dec can't supply values, ask GPT to fill missing ones (best guess).""" - if client is None: - return row +def _fit_from_dec(model: str, canon_make: str, is5: bool) -> Dict[str, str]: + badges = [] + eth = "Not listed" + bat = "Not listed" + if is5: + badges.append("4x4 MIMO") - missing = [k for k,v in row.items() if (not v) or str(v).strip().lower() in {"not listed","nan",""}] - if not missing: - return row + pool = df_dec[df_dec["_canon_make"] == canon_make].copy() + if pool.empty or not model or model in {"Not listed", "Not applicable"}: + return {"Fit badges": ", ".join(badges) if badges else "Not listed", "Ethernet ports": eth, "Battery": bat} - sys = ( - "Fill missing router feature fields for a Verizon rep. Return strict JSON only. " - "Use manufacturer page text when available. If still unknown, make a best-guess." - ) - payload = { - "device_label": device_label, - "model": model, - "maker_family": canon_make, - "manufacturer_url": manufacturer_url, - "manufacturer_page_text": page_text[:8000], - "known": row, - "fill_only": missing, - "rules": ["Fill only requested fields.", "Short phrases only.", "Return JSON only."], - "output_schema": {k: "string" for k in missing}, - } - out = gpt_json(sys, payload, max_tokens=320) or {} - for k in missing: - val = str(out.get(k, "") or "").strip() - if val: - row[k] = val - return row - missing = [k for k,v in row.items() if (not v) or str(v).strip().lower() in {"not listed","nan",""}] - if not missing: - return row - - sys = "Fill missing router feature fields for a Verizon rep. Return strict JSON only." - payload = { - "device_label": device_label, - "model": model, - "maker_family": canon_make, - "known": row, - "fill_only": missing, - "rules": [ - "Fill only the requested fields.", - "Best guess if needed. Short phrases only.", - "Return JSON only." - ], - "output_schema": {k: "string" for k in missing} - } - out = gpt_json(sys, payload, max_tokens=260) or {} - for k in missing: - val = str(out.get(k, "") or "").strip() - if val: - row[k] = val - return row - -def build_replacement_features_table(repl_4g: str, repl_5g: str, canon_make: str) -> pd.DataFrame: - rows = [] - - # 4G alternative row - row4 = _features_from_dec(repl_4g, canon_make) - url4 = _best_effort_manufacturer_url(repl_4g, canon_make) if repl_4g else "" - txt4 = _fetch_page_text(url4) if url4 else "" - row4 = _gpt_fill_feature_row("4G alternative", repl_4g, canon_make, row4, manufacturer_url=url4, page_text=txt4) - rows.append({"Device": "4G alternative", **row4}) - - # 5G replacement row - row5 = _features_from_dec(repl_5g, canon_make) - url5 = _best_effort_manufacturer_url(repl_5g, canon_make) if repl_5g else "" - txt5 = _fetch_page_text(url5) if url5 else "" - row5 = _gpt_fill_feature_row("5G replacement", repl_5g, canon_make, row5, manufacturer_url=url5, page_text=txt5) - rows.append({"Device": "5G replacement", **row5}) - - df = pd.DataFrame(rows, columns=FEATURE_COLS) - return df -# ============================ -# Verizon fit badges (small table) for recommended devices -# ============================ + hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) + if not hit or hit[1] < MATCH_OK: + return {"Fit badges": ", ".join(badges) if badges else "Not listed", "Ethernet ports": eth, "Battery": bat} -FIT_COLS = ["Device", "Fit badges", "Ethernet ports", "Battery"] + r = pool.iloc[int(hit[2])] + use_case = str(r.get("Primary use case","") or "").lower() + rugged = str(r.get("Ruggedization","") or "").lower() + wifi = str(r.get("WiFi type","") or "").strip().lower() + serial = str(r.get("Serial port (yes/no)","") or "").strip().lower() + battery = str(r.get("Battery (internal/removable/none/optional)","") or "").strip().lower() + notes_blob = " ".join([str(r.get("Special notes","") or ""), str(r.get("summary and use case","") or "")]).lower() + + if any(k in use_case for k in ["vehicle","mobile","fleet","in-vehicle"]) or "vehicle" in rugged: + badges.append("Vehicle") + else: + badges.append("Fixed site") + + if wifi and wifi not in {"none","no","n/a"}: + badges.append("Wi‑Fi") + if any(k in rugged for k in ["rugged","industrial","ip","harsh"]): + badges.append("Rugged") + if "dual" in notes_blob and "sim" in notes_blob: + badges.append("Dual‑SIM") + if serial in {"yes","y","true"}: + badges.append("Serial") + + if battery: + if "none" in battery: + bat = "No" + else: + bat = "Yes" -def _parse_ethernet_ports(wan_field: str, lan_field: str) -> str: - """Best-effort total ethernet ports based on WAN/LAN text.""" - def _count(field: str) -> int: - s = str(field or "") - # Common forms: "1x GbE", "2 x 10/100", "WAN: 1", etc. - nums = [int(x) for x in re.findall(r"(\\d+)\\s*x", s.lower())] - if nums: - return sum(nums) - # Fallback: if it contains 'port' with a number - m = re.search(r"(\\d+)\\s*port", s.lower()) - if m: - return int(m.group(1)) - # If it contains '1' and 'wan' in short text, guess 1 - if "wan" in s.lower() and re.search(r"\\b1\\b", s): - return 1 - return 0 - - total = _count(wan_field) + _count(lan_field) - return str(total) if total > 0 else "Not listed" - -def _battery_badge(battery_field: str) -> str: - s = str(battery_field or "").strip().lower() - if not s or s in {"none", "no", "n/a", "not listed"}: - return "No" - return "Yes" - -def _bool_badge(flag: bool) -> str: - return "Yes" if flag else "No" - -def _dual_sim_from_row_text(*fields: str) -> bool: - txt = " ".join([str(x or "") for x in fields]).lower() - return ("dual sim" in txt) or ("2 sim" in txt) or ("two sim" in txt) or ("dual-sim" in txt) - -def _throughput_high(throughput_field: str) -> bool: - t = str(throughput_field or "").lower() - # Heuristic: anything mentioning gbps or >=1000 mbps - if "gbps" in t: - return True - m = re.search(r"(\\d+(?:\\.\\d+)?)\\s*mbps", t) - if m: - try: - return float(m.group(1)) >= 1000.0 - except Exception: - pass - return False + badges_csv = ", ".join(dict.fromkeys(badges)) if badges else "Not listed" + return {"Fit badges": badges_csv, "Ethernet ports": eth, "Battery": bat} -def _gpt_fit_badges(model: str, canon_make: str, is_5g: bool, dec_row: Optional[pd.Series]) -> Tuple[str, str, str]: - """ - GPT-based fill for Fit badges / Ethernet ports / Battery, used when dec is missing or incomplete. - Returns (badges_csv, ethernet_ports, battery_yesno). - """ +# Enrichment cache (one call per (make, repl4, repl5)) +_ENRICH_CACHE: Dict[str, Dict[str, Any]] = {} + +def _enrich_key(canon_make: str, repl4: str, repl5: str) -> str: + return hashlib.sha1(f"{canon_make}|{repl4}|{repl5}".encode("utf-8")).hexdigest() + +def gpt_enrich(repl4: str, repl5: str, canon_make: str, feat4: Dict[str,str], feat5: Dict[str,str], fit4: Dict[str,str], fit5: Dict[str,str]) -> Dict[str, Any]: if client is None: - return ("Not listed", "Not listed", "Not listed") + return {"feat4": feat4, "feat5": feat5, "fit4": fit4, "fit5": fit5} - dec_ctx = {} - if dec_row is not None: - try: - dec_ctx = { - "Model": str(dec_row.get("Model","")), - "Modem Type": str(dec_row.get("Modem Type","")), - "Ruggedization": str(dec_row.get("Ruggedization","")), - "WAN ports and speed": str(dec_row.get("WAN ports and speed","")), - "LAN ports and speed": str(dec_row.get("LAN ports and speed","")), - "Antennas": str(dec_row.get("Antennas (internal/external/both)","")), - "WiFi type": str(dec_row.get("WiFi type","")), - "Primary use case": str(dec_row.get("Primary use case","")), - "Serial port": str(dec_row.get("Serial port (yes/no)","")), - "VPN": str(dec_row.get("VPN capabilities","")), - "Throughput": str(dec_row.get("Router throughput","")), - "Battery": str(dec_row.get("Battery (internal/removable/none/optional)","")), - "Special notes": str(dec_row.get("Special notes","")), - "Summary": str(dec_row.get("summary and use case","")), - } - except Exception: - dec_ctx = {} + key = _enrich_key(canon_make, repl4, repl5) + if key in _ENRICH_CACHE: + return _ENRICH_CACHE[key] + + def miss(d: Dict[str,str]) -> List[str]: + out=[] + for k,v in d.items(): + if (not v) or str(v).strip().lower() in {"not listed","nan",""}: + out.append(k) + return out + + m_feat4 = miss(feat4); m_feat5 = miss(feat5) + m_fit4 = miss(fit4); m_fit5 = miss(fit5) + + if not (m_feat4 or m_feat5 or m_fit4 or m_fit5): + pack = {"feat4": feat4, "feat5": feat5, "fit4": fit4, "fit5": fit5} + _ENRICH_CACHE[key] = pack + return pack sys = ( - "You are helping a Verizon rep. Based on the provided router context, output fit badges and a couple quick traits.\n" - "Return STRICT JSON only.\n" - "Badges must be chosen from this set only:\n" - "['Vehicle','Fixed site','Wi‑Fi','Rugged','Dual‑SIM','4x4 MIMO','High throughput','Serial'].\n" - "Rules:\n" - "- If is_5g is true, ALWAYS include '4x4 MIMO'.\n" - "- Ethernet ports: return a single integer as a string if you can infer total ethernet ports, otherwise 'Not listed'.\n" - "- Battery: return 'Yes' or 'No' if you can infer, otherwise 'Not listed'.\n" - "- If uncertain between Vehicle vs Fixed site, pick the most likely based on use case/ruggedization.\n" + "You are helping a Verizon rep. Fill missing router feature fields and fit traits. Return strict JSON only. " + "Keep values short. " + "Fit badges must be chosen from: ['Vehicle','Fixed site','Wi‑Fi','Rugged','Dual‑SIM','4x4 MIMO','High throughput','Serial'] only. " + "Rule: if a router is 5G, include '4x4 MIMO'. " + "Ethernet ports must be a single integer as a string when possible; else 'Not listed'. " + "Battery must be 'Yes', 'No', or 'Not listed'." ) payload = { - "model": model, "maker_family": canon_make, - "is_5g": bool(is_5g), - "dec_context": dec_ctx, + "models": {"repl4": repl4, "repl5": repl5}, + "known": {"feat4": feat4, "feat5": feat5, "fit4": fit4, "fit5": fit5}, + "missing": {"feat4": m_feat4, "feat5": m_feat5, "fit4": m_fit4, "fit5": m_fit5}, "output_schema": { - "badges": ["string"], - "ethernet_ports": "string", - "battery": "Yes|No|Not listed" - } + "feat4": {k: "string" for k in m_feat4}, + "feat5": {k: "string" for k in m_feat5}, + "fit4": {k: "string" for k in m_fit4}, + "fit5": {k: "string" for k in m_fit5}, + }, } - out = gpt_json(sys, payload, max_tokens=260) or {} - - badges = out.get("badges", []) or [] - allowed = {"Vehicle","Fixed site","Wi‑Fi","Rugged","Dual‑SIM","4x4 MIMO","High throughput","Serial"} - clean = [] - for b in badges: - bs = str(b).strip() - if bs in allowed: - clean.append(bs) - - if is_5g and "4x4 MIMO" not in clean: - clean.append("4x4 MIMO") - - eth = str(out.get("ethernet_ports","") or "").strip() - if not eth or eth.lower() in {"nan","none"}: - eth = "Not listed" - m = re.search(r"\d+", eth) - eth = m.group(0) if m else ("Not listed" if eth == "Not listed" else eth) - - bat = str(out.get("battery","") or "").strip() - if not bat: - bat = "Not listed" - if bat.lower().startswith("y"): - bat = "Yes" - elif bat.lower().startswith("n"): - bat = "No" - elif bat not in {"Yes","No","Not listed"}: - bat = "Not listed" - - dedup=[] - seen=set() - for b in clean: - if b not in seen: - seen.add(b); dedup.append(b) - badges_csv = ", ".join(dedup) if dedup else "Not listed" - return (badges_csv, eth, bat) - - -def _fit_badges_for_model(model: str, canon_make: str, is_5g: bool) -> Tuple[str, str, str]: - """Return (badges_csv, ethernet_ports, battery_yesno). Uses dec2025routers.csv first, then GPT fill.""" - model = str(model or "").strip() - if not model or model in {"Not listed", "Not applicable"}: - return ("Not listed", "Not listed", "Not listed") + t0 = time.perf_counter() + resp = client.responses.create( + model=OPENAI_MODEL, + input=[{"role":"system","content":sys},{"role":"user","content":_json_dump_safe(payload)}], + max_output_tokens=420, + ) + _tlog("llm enrich", t0) - pool = df_dec[df_dec["_canon_make"] == canon_make].copy() - row = None - if not pool.empty: - hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) - if hit and hit[1] >= MATCH_OK: - row = pool.iloc[int(hit[2])] + out = _json_load_safe(getattr(resp, "output_text", "") or "") - badges = [] - eth = "Not listed" - bat_yes = "Not listed" + def merge(base: Dict[str,str], patch: Any) -> Dict[str,str]: + if isinstance(patch, dict): + for k,v in patch.items(): + sv = str(v or "").strip() + if sv: + base[k] = sv + return base - if row is not None: - use_case = str(row.get("Primary use case","") or "").lower() - rugged = str(row.get("Ruggedization","") or "").lower() + feat4x = merge(dict(feat4), out.get("feat4", {})) + feat5x = merge(dict(feat5), out.get("feat5", {})) + fit4x = merge(dict(fit4), out.get("fit4", {})) + fit5x = merge(dict(fit5), out.get("fit5", {})) - if any(k in use_case for k in ["vehicle","mobile","fleet","in-vehicle"]) or "vehicle" in rugged: - badges.append("Vehicle") - else: - badges.append("Fixed site") + # Enforce 5G 4x4 badge + b = str(fit5x.get("Fit badges","") or "") + if "4x4 MIMO" not in b: + fit5x["Fit badges"] = (b + ", 4x4 MIMO").strip(", ").strip() if b and b != "Not listed" else "4x4 MIMO" - wifi = str(row.get("WiFi type","") or "").strip() - if wifi and wifi.lower() not in {"none","no","n/a"}: - badges.append("Wi‑Fi") + pack = {"feat4": feat4x, "feat5": feat5x, "fit4": fit4x, "fit5": fit5x} + _ENRICH_CACHE[key] = pack + return pack - if any(k in rugged for k in ["rugged","industrial","ip","harsh"]): - badges.append("Rugged") +def build_tables(repl4: str, repl5: str, canon_make: str) -> Tuple[pd.DataFrame, pd.DataFrame]: + feat4 = _features_from_dec(repl4, canon_make) + feat5 = _features_from_dec(repl5, canon_make) + fit4 = _fit_from_dec(repl4, canon_make, is5=False) + fit5 = _fit_from_dec(repl5, canon_make, is5=True) - notes_blob = " ".join([ - str(row.get("Special notes","") or ""), - str(row.get("summary and use case","") or ""), - ]).lower() - if "dual" in notes_blob and "sim" in notes_blob: - badges.append("Dual‑SIM") + pack = gpt_enrich(repl4, repl5, canon_make, feat4, feat5, fit4, fit5) - if is_5g: - badges.append("4x4 MIMO") + feat_df = pd.DataFrame([ + {"Device":"4G alternative", **pack["feat4"]}, + {"Device":"5G replacement", **pack["feat5"]}, + ], columns=FEATURE_COLS) - thr = str(row.get("Router throughput","") or "").lower() - m = re.search(r"(\d+(\.\d+)?)\s*gb", thr) - if m: - try: - if float(m.group(1)) >= 1.0: - badges.append("High throughput") - except Exception: - pass - - serial = str(row.get("Serial port (yes/no)","") or "").strip().lower() - if serial in {"yes","y","true"}: - badges.append("Serial") - - wan = str(row.get("WAN ports and speed","") or "") - lan = str(row.get("LAN ports and speed","") or "") - m1 = re.search(r"(\d+)\s*x", wan.lower()) - m2 = re.search(r"(\d+)\s*x", lan.lower()) - if m1 or m2: - total = (int(m1.group(1)) if m1 else 0) + (int(m2.group(1)) if m2 else 0) - eth = str(total) if total > 0 else "Not listed" - - bat = str(row.get("Battery (internal/removable/none/optional)","") or "") - bat_l = bat.lower().strip() - if bat_l: - if "none" in bat_l: - bat_yes = "No" - else: - bat_yes = "Yes" - - # Use GPT when anything is missing (instead of best-effort inference) - if (row is None) or (eth == "Not listed") or (bat_yes == "Not listed") or (not badges): - g_badges, g_eth, g_bat = _gpt_fit_badges(model, canon_make, is_5g, row) - - if badges: - if is_5g and "4x4 MIMO" not in badges: - badges.append("4x4 MIMO") - dedup=[] - seen=set() - for b in badges: - if b not in seen: - seen.add(b); dedup.append(b) - badges_csv = ", ".join(dedup) - else: - badges_csv = g_badges - - eth = eth if eth != "Not listed" else g_eth - bat_yes = bat_yes if bat_yes != "Not listed" else g_bat - return (badges_csv or "Not listed", eth or "Not listed", bat_yes or "Not listed") - - dedup=[] - seen=set() - for b in badges: - if b not in seen: - seen.add(b); dedup.append(b) - badges_csv = ", ".join(dedup) if dedup else "Not listed" - return (badges_csv, eth, bat_yes) - -def build_fit_table(repl_4g: str, repl_5g: str, canon_make: str) -> pd.DataFrame: - rows = [] - # 4G alt row (is_5g False) - b4, eth4, bat4 = _fit_badges_for_model(repl_4g, canon_make, is_5g=False) - rows.append({"Device": "4G alternative", "Fit badges": b4, "Ethernet ports": eth4, "Battery": bat4}) - # 5G row (is_5g True) - b5, eth5, bat5 = _fit_badges_for_model(repl_5g, canon_make, is_5g=True) - rows.append({"Device": "5G replacement", "Fit badges": b5, "Ethernet ports": eth5, "Battery": bat5}) - return pd.DataFrame(rows, columns=FIT_COLS) - -# ============================ -# Output -# ============================ -def assemble_output(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str: - current_name = f"{life_row.get('sku','')} — {life_row.get('description','')}".strip(" —") - st = ant.get("stationary_omni", {}) - vh = ant.get("vehicle_omni", {}) - - lines = [] - lines.append(f"1. Current device: **{current_name}**") - lines.append(f"2. Status: **{status}**") - lines.append(f"3. End of Sale date: **{eos}**") - lines.append(f"4. End of Life date: **{eol}**") - lines.append(f"5. 4G alternative (lifecycle): **{repl.get('repl_4g','Not applicable')}**") - lines.append(f"6. 5G replacement (lifecycle): **{repl.get('repl_5g','Not listed')}**") - lines.append("7. Antenna options (Parsec-only):") - conn_s = f" | Conn: {st.get('connectors','')}" if st.get("connectors") else "" - conn_v = f" | Conn: {vh.get('connectors','')}" if vh.get("connectors") else "" - lines.append(f" - Stationary (Omni): **{st.get('name','')}** (Part #: {st.get('part_number','')}) — {st.get('description','')} — MIMO: {st.get('mimo','')}{conn_s}") - lines.append(f" - Vehicle (Omni): **{vh.get('name','')}** (Part #: {vh.get('part_number','')}) — {vh.get('description','')} — MIMO: {vh.get('mimo','')}{conn_v}") - - lines.append("\nSources (debug):") - for s in repl.get("sources", []) if isinstance(repl.get("sources"), list) else []: - lines.append(f"- {s}") - lines.append("- ParsecCatalog.pdf (local RAG)") - lines.append("- routers_eos_eol_by_sku.csv (replacements)") - return "\n".join(lines) - - -# ============================ -# Customer-ready email summary (single lookup only) -# ============================ - -def build_customer_email(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any], link5: str) -> str: - """Email-style summary the rep can paste to a customer (lightly sales-y).""" - current = f"{life_row.get('sku','')} — {life_row.get('description','')}".strip(" —") - repl5 = str(repl.get("repl_5g","") or "").strip() - repl4 = str(repl.get("repl_4g","") or "").strip() - - st = ant.get("stationary_omni", {}) or {} - vh = ant.get("vehicle_omni", {}) or {} - - lines = [] - lines.append("Subject: Router replacement recommendation") - lines.append("") - lines.append("Hi there,") - lines.append("") - lines.append(f"We reviewed your current router (**{current}**) and recommend the following path forward:") - lines.append("") - lines.append(f"- **Status:** {status}") - lines.append(f"- **End of Sale:** {eos}") - lines.append(f"- **End of Life:** {eol}") - lines.append("") - lines.append("**Recommended replacement (5G):**") - lines.append(f"- {repl5 if repl5 else 'Not listed'}") - if link5: - lines.append(f"- Manufacturer page (best effort): {link5}") - lines.append("") - lines.append("**Optional 4G alternative (if needed):**") - lines.append(f"- {repl4 if repl4 and repl4.lower() != 'not applicable' else 'Not applicable'}") - lines.append("") - lines.append("**Antenna suggestions (Parsec):**") - lines.append(f"- Stationary (Omni): {st.get('name','')} (PN {st.get('part_number','')})") - lines.append(f"- Vehicle (Omni): {vh.get('name','')} (PN {vh.get('part_number','')})") - lines.append("") - lines.append("If you’d like, we can confirm the best-fit option for your install environment and provide pricing.") - lines.append("") - lines.append("Contact Peter Dunn @ 786.999.9127 or peter.dunn@masterstelecom.com for pricing.") - lines.append("") - lines.append("Thanks,") - lines.append("Peter Dunn") - return "\n".join(lines) - -def generate_customer_email(st_json: str) -> str: - st = state_load(st_json) - if not st or "row_idx" not in st: - return "Run a lookup first." - try: - life_row = df_eos.iloc[int(st["row_idx"])] - except Exception: - return "Run a lookup first." - - eos, eol, status = row_to_dates_and_status(life_row) - repl = st.get("repl", {}) or {} - ant = st.get("ant", {}) or {} - - canon_make = str(life_row.get("_canon_make","UNKNOWN")) - url5 = _best_effort_manufacturer_url(str(repl.get("repl_5g","") or ""), canon_make) - return build_customer_email(life_row, status, eos, eol, repl, ant, url5) - -# ============================ -# Gradio callbacks -# IMPORTANT: no dict state and ALL events have api_name=False (prevents api_info schema generation) -# ============================ -def run_lookup(user_text: str, st_json: str): - user_text = str(user_text or "").strip() - if not user_text: - return "Enter a router SKU/model.", "", None, None, "", gr.update(visible=False), gr.update(visible=False), "{}", "", "" - - res = resolve_device(user_text) - - if res.get("mode") == "pick": - opts = res.get("options", []) - choices = [o["label"] for o in opts] - st2 = {"mode":"pick","options": opts, "raw": user_text} - return "Did you mean A or B? Pick one, then click Use selection.", "", None, None, "", gr.update(choices=choices, value=None, visible=True), gr.update(visible=True), state_dump(st2), "", "" - - if res.get("mode") != "ok": - return "Not found.", "", None, None, "", gr.update(visible=False), gr.update(visible=False), "{}", "", "" - - life_row = df_eos.iloc[int(res["row_idx"])] - eos, eol, status = row_to_dates_and_status(life_row) - - repl = pick_replacements_lifecycle(life_row, status, use_gpt=True) - canon_make = str(life_row.get("_canon_make","UNKNOWN")) - mimo = infer_mimo_for_5g(repl.get("repl_5g","")) - tech = "5G" if repl.get("repl_5g") and repl.get("repl_5g") != "Not listed" else ("4G" if device_is_4g(life_row) else "Unknown") - ant = antenna_options_for(repl.get("repl_5g") or str(life_row.get("sku","")), tech, mimo) - - output = assemble_output(life_row, status, eos, eol, repl, ant) - st_out = {"row_idx": int(res["row_idx"]), "repl": repl, "ant": ant, "raw": user_text} - url5 = _best_effort_manufacturer_url(repl.get('repl_5g',''), canon_make) - link = f"**5G manufacturer page (best effort):** {url5}" if url5 else "" - feat_df = build_replacement_features_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) - fit = build_fit_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) - return output, link, feat_df, fit, "", gr.update(visible=False), gr.update(visible=False), state_dump(st_out), "", "" - -def use_selection(selected_label: str, st_json: str): - st = state_load(st_json) - if not st or st.get("mode") != "pick": - return "Run a search first.", "", None, None, "", gr.update(visible=False), gr.update(visible=False), "{}", "", "" - - if not selected_label: - return "Pick A or B first.", "", None, None, "", gr.update(visible=True), gr.update(visible=True), st_json, "", "" - - chosen_row = None - for o in st.get("options", []): - if o.get("label") == selected_label: - chosen_row = int(o["row_idx"]) - break - if chosen_row is None: - return "Pick a valid option.", "", None, None, "", gr.update(visible=True), gr.update(visible=True), st_json, "", "" - - life_row = df_eos.iloc[int(chosen_row)] - eos, eol, status = row_to_dates_and_status(life_row) - - repl = pick_replacements_lifecycle(life_row, status, use_gpt=True) - canon_make = str(life_row.get("_canon_make","UNKNOWN")) - mimo = infer_mimo_for_5g(repl.get("repl_5g","")) - tech = "5G" if repl.get("repl_5g") and repl.get("repl_5g") != "Not listed" else ("4G" if device_is_4g(life_row) else "Unknown") - ant = antenna_options_for(repl.get("repl_5g") or str(life_row.get("sku","")), tech, mimo) - - output = assemble_output(life_row, status, eos, eol, repl, ant) - st_out = {"row_idx": int(chosen_row), "repl": repl, "ant": ant, "raw": st.get("raw","")} - url5 = _best_effort_manufacturer_url(repl.get('repl_5g',''), canon_make) - link = f"**5G manufacturer page (best effort):** {url5}" if url5 else "" - feat_df = build_replacement_features_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) - fit = build_fit_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) - return output, link, feat_df, fit, "", gr.update(visible=False), gr.update(visible=False), state_dump(st_out), "", "" - -def make_install_ready(st_json: str): - st = state_load(st_json) - if not st or "row_idx" not in st: - return "Run a lookup first." - life_row = df_eos.iloc[int(st["row_idx"])] - current_sku = str(life_row.get("sku","") or "") - return install_ready_checklist(current_sku, st.get("repl", {}) or {}, st.get("ant", {}) or {}) - - - -# ============================ -# Q&A about the suggested device (post-recommendation) -# ============================ -def answer_question(question: str, st_json: str) -> str: - q = str(question or "").strip() - if not q: - return "" - st = state_load(st_json) - if not st or "repl" not in st: - return "Run a lookup first, then ask your question." - - repl = st.get("repl", {}) or {} - ant = st.get("ant", {}) or {} - repl5 = str(repl.get("repl_5g","") or "").strip() - repl4 = str(repl.get("repl_4g","") or "").strip() - # Pull a bit of dec context for the 5G model (if possible) - canon_make = "" - try: - # Try to infer maker family from stored row_idx - if "row_idx" in st: - row = df_eos.iloc[int(st["row_idx"])] - canon_make = str(row.get("_canon_make","UNKNOWN")) - except Exception: - canon_make = "" + fit_df = pd.DataFrame([ + {"Device":"4G alternative", **pack["fit4"]}, + {"Device":"5G replacement", **pack["fit5"]}, + ], columns=FIT_COLS) - # Manufacturer link (best effort) - url5 = _best_effort_manufacturer_url(repl5, canon_make) if repl5 else "" + return feat_df, fit_df - # Feature table row for 5G (helps the LLM answer spec questions without web scraping) - feat5 = {} - try: - feat5 = _features_from_dec(repl5, canon_make) if repl5 else {} - except Exception: - feat5 = {} +# ---------------------------- +# Manufacturer link (deterministic, no HTTP) +# ---------------------------- +MAKER_DOMAINS = { + "CRADLEPOINT": "https://cradlepoint.com", + "SIERRA": "https://airlink.com", + "FEENEY": "https://inseego.com", + "DIGI": "https://www.digi.com", + "CISCO_MERAKI": "https://meraki.cisco.com", + "CISCO": "https://www.cisco.com", + "TELTONIKA": "https://teltonika-networks.com", + "UNKNOWN": "", +} + +def guess_maker_url(model: str, canon_make: str) -> str: + model = str(model or "").strip() + base = MAKER_DOMAINS.get(canon_make, "") + if not base or not model or model in {"Not listed", "Not applicable"}: + return "" + q = re.sub(r"\s+", "+", model) + if canon_make == "TELTONIKA": + slug = model.lower() + return f"{base}/products/routers/{slug}" + if canon_make == "DIGI": + return f"{base}/search?q={q}" + if canon_make == "CRADLEPOINT": + return f"{base}/?s={q}" + if canon_make in {"CISCO", "CISCO_MERAKI"}: + return f"https://www.cisco.com/c/en/us/search.html?q={q}" + return f"{base}/search?q={q}" +# ---------------------------- +# Q&A (on demand, per last case) +# ---------------------------- +def gpt_answer(question: str, context: Dict[str, Any]) -> str: + if client is None: + return "No API key is configured, so I can’t answer detailed questions right now." + q = str(question or "").strip() + if not q: + return "" sys = ( - "You are a Verizon field rep assistant. Answer questions about the suggested router in a fast, practical way. " - "Use the provided context; do not mention internal tools, prompts, embeddings, or databases. " - "If the question is about specs and the value is unknown, say 'Not listed' and suggest checking the manufacturer page. " - "Keep it concise and scannable." + "You are a Verizon rep assistant. Answer in a fast, practical way. " + "Use the provided context. " + "Do not mention internal tools or prompts. " + "If unknown, say 'Not listed' and suggest the manufacturer page." ) + payload = {"context": context, "question": q} + t0 = time.perf_counter() + resp = client.responses.create( + model=OPENAI_MODEL, + input=[{"role":"system","content":sys},{"role":"user","content":_json_dump_safe(payload)}], + max_output_tokens=520, + ) + _tlog("llm qa", t0) + return (getattr(resp, "output_text", "") or "").strip() - context = { - "recommended_5g": repl5, - "recommended_4g": repl4 if repl4 and repl4.lower() != "not applicable" else "", - "manufacturer_link_5g": url5, - "known_5g_features": feat5, - "antenna_stationary": ant.get("stationary_omni", {}), - "antenna_vehicle": ant.get("vehicle_omni", {}), - } - - user = "Context:\n" + json.dumps(context, ensure_ascii=False) + "\n\nQuestion:\n" + q - - ans = gpt_answer_md(sys, user, max_tokens=650) - # Small safety fallback - return ans if ans else "I couldn't generate an answer right now. Try again." - -# ============================ -# UI -# ============================ - - -# ============================ -# Chat helpers -# ============================ -def _df_to_md(df: pd.DataFrame) -> str: - if df is None or (hasattr(df, "empty") and df.empty): - return "" +# ---------------------------- +# Chat utilities +# ---------------------------- +def df_to_md(df: pd.DataFrame) -> str: try: return df.to_markdown(index=False) except Exception: @@ -1648,7 +742,7 @@ def _df_to_md(df: pd.DataFrame) -> str: lines.append("| " + " | ".join([str(r.get(c,"")) for c in cols]) + " |") return "\n".join(lines) -def _extract_device_terms(msg: str) -> List[str]: +def extract_device_terms(msg: str) -> List[str]: raw = [x.strip() for x in re.split(r"[\n,;]+", str(msg or "")) if x.strip()] out=[] for x in raw: @@ -1656,10 +750,7 @@ def _extract_device_terms(msg: str) -> List[str]: out.append(x) return out -def _looks_like_yes(msg: str) -> bool: - return str(msg or "").strip().lower() in {"yes","y","yeah","yep","sure","ok","okay"} - -def _parse_install_mode(msg: str) -> Tuple[Optional[str], Optional[str]]: +def parse_install_mode(msg: str) -> Tuple[Optional[str], Optional[str]]: t = str(msg or "").strip().lower() mode = None detail = None @@ -1675,208 +766,172 @@ def _parse_install_mode(msg: str) -> Tuple[Optional[str], Optional[str]]: detail = "directional" return mode, detail -def _antenna_for_mode(repl5: str, canon_make: str, mode: str, detail: Optional[str]) -> Dict[str, Any]: - mimo = "4x4" # rule: all 5G = 4x4 - tech = "5G" - if mode == "vehicle": - return antenna_options_for(repl5, tech, mimo).get("vehicle_omni", {}) - if detail == "directional": - return antenna_options_for(repl5 + " directional", tech, mimo).get("stationary_omni", {}) - if detail == "indoor": - return antenna_options_for(repl5 + " indoor", tech, mimo).get("stationary_omni", {}) - return antenna_options_for(repl5, tech, mimo).get("stationary_omni", {}) - -def _make_case_key(s: str) -> str: +def make_case_key(s: str) -> str: s = str(s or "").strip() return re.sub(r"\s+", " ", s)[:80] +# ---------------------------- +# Chat UI (schema-safe) +# ---------------------------- with gr.Blocks(title="Only-Routers") as demo: - gr.Markdown("## Only-Routers\nChat mode for Verizon reps (multiple devices per message) + Batch tab.") - + gr.Markdown("## Only-Routers\nChat mode for Verizon reps (multiple devices per message).") state = gr.State("{}") - with gr.Tabs(): - with gr.Tab("Chat"): - chatbot = gr.Chatbot(label="Only-Routers Chat", height=520, type="tuples") - msg = gr.Textbox(label="Message", placeholder="Example: IBR650B, WR21\nVehicle install", lines=2) - send = gr.Button("Send", variant="primary") - - def chat_fn(user_msg, history, st_json): - st = state_load(st_json) - st.setdefault("cases", {}) - st.setdefault("last_case_keys", []) - st.setdefault("pending", {}) - st.setdefault("awaiting_questions", False) - - text = (user_msg or "").strip() - if not text: - return history, state_dump(st) - - # Pending pick (A/B) - if st.get("pending", {}).get("type") == "pick": - pend = st["pending"] - opts = pend.get("options", []) - choice = text.strip().lower() - idx = None - if choice in {"a","1","option a"} and len(opts) >= 1: - idx = 0 - elif choice in {"b","2","option b"} and len(opts) >= 2: - idx = 1 - if idx is None: - for i,o in enumerate(opts): - if str(o.get("label","")).lower() in choice: - idx = i - break - if idx is None: - history.append((text, "Please reply with **A** or **B**.")) - return history, state_dump(st) - - chosen_row = int(opts[idx]["row_idx"]) - life_row = df_eos.iloc[chosen_row] - eos, eol, status = row_to_dates_and_status(life_row) - repl = pick_replacements_lifecycle(life_row, status, use_gpt=True) - canon_make = str(life_row.get("_canon_make","UNKNOWN")) - - feat_df = build_replacement_features_table(repl.get("repl_4g",""), repl.get("repl_5g",""), canon_make) - fit_df = build_fit_table(repl.get("repl_4g",""), repl.get("repl_5g",""), canon_make) - - url4 = _best_effort_manufacturer_url(repl.get("repl_4g",""), canon_make) if repl.get("repl_4g","") not in {"Not applicable",""} else "" - url5 = _best_effort_manufacturer_url(repl.get("repl_5g",""), canon_make) if repl.get("repl_5g","") not in {"Not listed",""} else "" - - case_key = _make_case_key(str(life_row.get("sku","")) or pend.get("raw","")) - st["cases"][case_key] = {"row_idx": chosen_row, "repl": repl, "canon_make": canon_make, "eos": eos, "eol": eol, "status": status, "urls": {"4g": url4, "5g": url5}} - st["last_case_keys"].append(case_key) - st["pending"] = {"type": "install_mode", "case_keys": [case_key]} - - bot = [] - bot.append(f"**{case_key}**") - bot.append(f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**") - bot.append(f"- 4G alternative: **{repl.get('repl_4g','Not applicable')}**") - bot.append(f"- 5G replacement: **{repl.get('repl_5g','Not listed')}**") - if url4: - bot.append(f"- 4G manufacturer page: {url4}") - if url5: - bot.append(f"- 5G manufacturer page: {url5}") - bot.append("\n**Replacement features**\n" + _df_to_md(feat_df)) - bot.append("\n**Verizon fit**\n" + _df_to_md(fit_df)) - bot.append("\nFor antennas: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.") - bot.append("\nAny questions about the suggested device(s)?") - history.append((text, "\n".join(bot))) - st["awaiting_questions"] = True - return history, state_dump(st) - - # Pending install mode - if st.get("pending", {}).get("type") == "install_mode": - mode, detail = _parse_install_mode(text) - if mode is None: - history.append((text, "Quick one: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.")) - return history, state_dump(st) - - case_keys = st["pending"].get("case_keys", []) or st.get("last_case_keys", []) - updates=[] - for ck in case_keys: - case = st["cases"].get(ck, {}) - repl5 = (case.get("repl", {}) or {}).get("repl_5g","") - canon_make = case.get("canon_make","UNKNOWN") - ant = _antenna_for_mode(repl5, canon_make, mode, detail) - case.setdefault("antennas", {}) - case["antennas"][f"{mode}:{detail or ''}"] = ant - st["cases"][ck] = case - updates.append(f"**{ck}** antenna ({mode}{' / '+detail if detail else ''}): {ant.get('name','')} (PN {ant.get('part_number','')})") - - st["pending"] = {} - history.append((text, "\n".join(updates))) - return history, state_dump(st) - - # If user says yes to questions - if st.get("awaiting_questions") and _looks_like_yes(text): - history.append((text, "Ask away — what do you want to know about the suggested device(s)?")) - return history, state_dump(st) - - # Device lookup - device_terms = _extract_device_terms(text) - if device_terms: - bots=[] - new_case_keys=[] - for term in device_terms: - res = resolve_device(term) - if res.get("mode") == "pick": - st["pending"] = {"type":"pick", "options": res.get("options", []), "raw": term} - opts = res.get("options", []) - bot = "I found more than one close match. Reply **A** or **B**:\n" - for i,o in enumerate(opts): - bot += f"- **{'A' if i==0 else 'B'}**: {o.get('label','')}\n" - history.append((text, bot.strip())) - return history, state_dump(st) - if res.get("mode") != "ok": - bots.append(f"**{term}**: not found in lifecycle list. Who makes it (manufacturer) and what's the exact model/SKU?") - continue - - life_row = df_eos.iloc[int(res["row_idx"])] - eos, eol, status = row_to_dates_and_status(life_row) - repl = pick_replacements_lifecycle(life_row, status, use_gpt=True) - canon_make = str(life_row.get("_canon_make","UNKNOWN")) - - feat_df = build_replacement_features_table(repl.get("repl_4g",""), repl.get("repl_5g",""), canon_make) - fit_df = build_fit_table(repl.get("repl_4g",""), repl.get("repl_5g",""), canon_make) - - url4 = _best_effort_manufacturer_url(repl.get("repl_4g",""), canon_make) if repl.get("repl_4g","") not in {"Not applicable",""} else "" - url5 = _best_effort_manufacturer_url(repl.get("repl_5g",""), canon_make) if repl.get("repl_5g","") not in {"Not listed",""} else "" - - ck = _make_case_key(str(life_row.get("sku","")) or term) - st["cases"][ck] = {"row_idx": int(res["row_idx"]), "repl": repl, "canon_make": canon_make, "eos": eos, "eol": eol, "status": status, "urls": {"4g": url4, "5g": url5}} - st["last_case_keys"].append(ck) - new_case_keys.append(ck) - - bot=[] - bot.append(f"**{ck}**") - bot.append(f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**") - bot.append(f"- 4G alternative: **{repl.get('repl_4g','Not applicable')}**") - bot.append(f"- 5G replacement: **{repl.get('repl_5g','Not listed')}**") - if url4: - bot.append(f"- 4G manufacturer page: {url4}") - if url5: - bot.append(f"- 5G manufacturer page: {url5}") - bot.append("\n**Replacement features**\n" + _df_to_md(feat_df)) - bot.append("\n**Verizon fit**\n" + _df_to_md(fit_df)) - bots.append("\n".join(bot)) - - if new_case_keys: - st["pending"] = {"type":"install_mode", "case_keys": new_case_keys} - bots.append("\nFor antennas: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.") - bots.append("Any questions about the suggested device(s)?") - st["awaiting_questions"] = True - - history.append((text, "\n\n---\n\n".join(bots))) - return history, state_dump(st) - - # Treat as question about most recent case - last_keys = st.get("last_case_keys", []) - if not last_keys: - history.append((text, "Tell me the router model/SKU you’re working with (you can paste multiple).")) - return history, state_dump(st) - - ck = last_keys[-1] - case = st["cases"].get(ck, {}) - mini = {"row_idx": case.get("row_idx"), "repl": case.get("repl", {}), "ant": case.get("antennas", {})} - ans = answer_question(text, state_dump(mini)) - history.append((text, ans)) + chatbot = gr.Chatbot(label="Only-Routers Chat", height=560, type="tuples") + msg = gr.Textbox(label="Message", placeholder="Example: RUT240, WR21\nVehicle install", lines=2) + send = gr.Button("Send", variant="primary") + + def chat_fn(user_msg, history, st_json): + t0 = time.perf_counter() + st = state_load(st_json) + st.setdefault("cases", {}) + st.setdefault("last_case_keys", []) + st.setdefault("pending", {}) + st.setdefault("awaiting_questions", False) + + text = (user_msg or "").strip() + if not text: + return history, state_dump(st) + + # Pending A/B pick + if st.get("pending", {}).get("type") == "pick": + opts = st["pending"].get("options", []) + choice = text.strip().lower() + idx = 0 if choice in {"a","1"} else (1 if choice in {"b","2"} else None) + if idx is None or idx >= len(opts): + history.append((text, "Please reply with **A** or **B**.")) return history, state_dump(st) - send.click(fn=chat_fn, inputs=[msg, chatbot, state], outputs=[chatbot, state], api_name=False) - - with gr.Tab("Batch"): - gr.Markdown("Paste one per line or upload a CSV (first column). Batch runs fast (no GPT).") - batch_text = gr.Textbox(label="Paste devices (one per line)", lines=8, placeholder="WR21\nRUT240\nIBR650B") - batch_file = gr.File(label="Upload CSV", file_types=[".csv"]) - include_ant = gr.Checkbox(label="Include antenna picks (slower)", value=False) - run_btn = gr.Button("Run batch", variant="primary") + chosen_row = int(opts[idx]["row_idx"]) + life_row = df_eos.iloc[chosen_row] + eos, eol, status = row_to_dates_and_status(life_row) + repl = pick_replacements(life_row, status) + canon_make = str(life_row.get("_canon_make","UNKNOWN")) + + feat_df, fit_df = build_tables(repl["repl_4g"], repl["repl_5g"], canon_make) + url4 = guess_maker_url(repl["repl_4g"], canon_make) if repl["repl_4g"] != "Not applicable" else "" + url5 = guess_maker_url(repl["repl_5g"], canon_make) if repl["repl_5g"] != "Not listed" else "" + + ck = make_case_key(str(life_row.get("sku",""))) + st["cases"][ck] = {"row_idx": chosen_row, "repl": repl, "canon_make": canon_make, "status": status, "eos": eos, "eol": eol, "urls": {"4g": url4, "5g": url5}} + st["last_case_keys"].append(ck) + st["pending"] = {"type":"install_mode", "case_keys":[ck]} + st["awaiting_questions"] = True + + bot = [] + bot.append(f"**{ck}**") + bot.append(f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**") + bot.append(f"- 4G alternative: **{repl['repl_4g']}**") + bot.append(f"- 5G replacement: **{repl['repl_5g']}**") + if url4: + bot.append(f"- 4G manufacturer page: {url4}") + if url5: + bot.append(f"- 5G manufacturer page: {url5}") + bot.append("\n**Replacement features**\n" + df_to_md(feat_df)) + bot.append("\n**Verizon fit**\n" + df_to_md(fit_df)) + bot.append("\nFor antennas: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.") + bot.append("Any questions about the suggested device(s)?") + + history.append((text, "\n".join(bot))) + _tlog("chat pick flow", t0) + return history, state_dump(st) + + # Pending install-mode + if st.get("pending", {}).get("type") == "install_mode": + mode, detail = parse_install_mode(text) + if mode is None: + history.append((text, "Quick one: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.")) + return history, state_dump(st) - summary_md = gr.Markdown() - rollup_md = gr.Markdown() - table = gr.Dataframe(interactive=False, wrap=True) - dl = gr.File(label="Download results CSV") + updates=[] + for ck in st["pending"].get("case_keys", []): + case = st["cases"].get(ck, {}) + repl5 = (case.get("repl", {}) or {}).get("repl_5g","") + ant = antenna_pick(repl5, mode=mode, detail=detail) + case.setdefault("antennas", {}) + case["antennas"][f"{mode}:{detail or ''}"] = ant + st["cases"][ck] = case + updates.append(f"**{ck}** antenna ({mode}{' / '+detail if detail else ''}): {ant.get('name','')} (PN {ant.get('part_number','')})") + + st["pending"] = {} + history.append((text, "\n".join(updates))) + _tlog("chat antenna flow", t0) + return history, state_dump(st) + + # Device lookup + device_terms = extract_device_terms(text) + if device_terms: + bots=[] + new_case_keys=[] + for term in device_terms: + res = resolve_device(term) + if res.get("mode") == "pick": + st["pending"] = {"type":"pick", "options": res.get("options", []), "raw": term} + opts = res.get("options", []) + bot = "I found more than one close match. Reply **A** or **B**:\n" + for i,o in enumerate(opts): + bot += f"- **{'A' if i==0 else 'B'}**: {o.get('label','')}\n" + history.append((text, bot.strip())) + _tlog("chat resolve->pick", t0) + return history, state_dump(st) - run_btn.click(fn=run_batch, inputs=[batch_text, batch_file, include_ant], outputs=[summary_md, table, dl, rollup_md], api_name=False) + if res.get("mode") != "ok": + bots.append(f"**{term}**: not found in lifecycle list. Who makes it (manufacturer) and what's the exact model/SKU?") + continue + + life_row = df_eos.iloc[int(res["row_idx"])] + eos, eol, status = row_to_dates_and_status(life_row) + repl = pick_replacements(life_row, status) + canon_make = str(life_row.get("_canon_make","UNKNOWN")) + + t1 = time.perf_counter() + feat_df, fit_df = build_tables(repl["repl_4g"], repl["repl_5g"], canon_make) + _tlog("tables", t1) + + url4 = guess_maker_url(repl["repl_4g"], canon_make) if repl["repl_4g"] != "Not applicable" else "" + url5 = guess_maker_url(repl["repl_5g"], canon_make) if repl["repl_5g"] != "Not listed" else "" + + ck = make_case_key(str(life_row.get("sku","")) or term) + st["cases"][ck] = {"row_idx": int(res["row_idx"]), "repl": repl, "canon_make": canon_make, "status": status, "eos": eos, "eol": eol, "urls": {"4g": url4, "5g": url5}} + st["last_case_keys"].append(ck) + new_case_keys.append(ck) + + bot=[] + bot.append(f"**{ck}**") + bot.append(f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**") + bot.append(f"- 4G alternative: **{repl['repl_4g']}**") + bot.append(f"- 5G replacement: **{repl['repl_5g']}**") + if url4: + bot.append(f"- 4G manufacturer page: {url4}") + if url5: + bot.append(f"- 5G manufacturer page: {url5}") + bot.append("\n**Replacement features**\n" + df_to_md(feat_df)) + bot.append("\n**Verizon fit**\n" + df_to_md(fit_df)) + bots.append("\n".join(bot)) + + if new_case_keys: + st["pending"] = {"type":"install_mode", "case_keys": new_case_keys} + bots.append("\nFor antennas: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.") + bots.append("Any questions about the suggested device(s)?") + st["awaiting_questions"] = True + + history.append((text, "\n\n---\n\n".join(bots))) + _tlog("chat lookup flow", t0) + return history, state_dump(st) + + # Q&A about most recent case + if not st.get("last_case_keys"): + history.append((text, "Tell me the router model/SKU you’re working with (you can paste multiple).")) + return history, state_dump(st) + + ck = st["last_case_keys"][-1] + case = st["cases"].get(ck, {}) + ctx = {"case": ck, "replacements": case.get("repl", {}), "urls": case.get("urls", {}), "antennas": case.get("antennas", {})} + ans = gpt_answer(text, ctx) + history.append((text, ans)) + _tlog("chat qa flow", t0) + return history, state_dump(st) + + send.click(fn=chat_fn, inputs=[msg, chatbot, state], outputs=[chatbot, state], api_name=False) demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT","7860")), share=False, show_api=False)