diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,1175 +1,1878 @@ -"""Only Routers (chat) - -A Gradio chat app for Verizon reps to: -- Look up a router SKU (or close match) -- Show suggested 4G + 5G replacements -- Show a small feature table for each suggested device -- Produce a customer-ready email-style summary -- Answer follow-up questions about the suggested device(s) - -Data files (keep them next to this app.py) -- routers_eos_eol_by_sku.csv (EoS/EoL + suggested replacements) -- dec2025routers.csv (device detail table) - -OpenAI -- Set OPENAI_API_KEY as a Space secret. -- Default model is a non-thinking GPT-5.2 variant for lower latency. - -Notes -- For 5G replacements, cellular MIMO is always treated as 4x4. -- The app uses the Dec 2025 device table first. If a field is missing, it can ask GPT. -""" - -from __future__ import annotations - -import csv -import json -import logging import os import re -from dataclasses import dataclass, field -from functools import lru_cache -from pathlib import Path +import json +import math +import hashlib +import tempfile +from dataclasses import dataclass +from datetime import datetime, date from typing import Any, Dict, List, Optional, Tuple -import gradio as gr - -# Optional deps. The app runs without them, with fewer niceties. -try: - from openai import OpenAI # type: ignore -except Exception: # pragma: no cover - OpenAI = None # type: ignore - -try: - import requests # type: ignore -except Exception: # pragma: no cover - requests = None # type: ignore - -try: - from rapidfuzz import fuzz as rf_fuzz # type: ignore - from rapidfuzz import process as rf_process # type: ignore -except Exception: # pragma: no cover - rf_fuzz = None # type: ignore - rf_process = None # type: ignore - - -# ----------------------------- -# App config -# ----------------------------- - -LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() -logging.basicConfig(level=LOG_LEVEL, format="%(levelname)s | %(message)s") -log = logging.getLogger("only-routers") - -HERE = Path(__file__).resolve().parent - -EOS_CSV_PATH = Path(os.getenv("EOS_CSV_PATH", str(HERE / "routers_eos_eol_by_sku.csv"))) -CATALOG_CSV_PATH = Path(os.getenv("CATALOG_CSV_PATH", str(HERE / "dec2025routers.csv"))) - -# Default to the faster, non-thinking GPT-5.2 route. -OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5.2-chat-latest") -OPENAI_MODEL_QA = os.getenv("OPENAI_MODEL_QA", OPENAI_MODEL) -OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", "0.2")) -OPENAI_MAX_TOKENS = int(os.getenv("OPENAI_MAX_TOKENS", "700")) +import numpy as np +import pandas as pd -# Fast timeouts help avoid hanging requests in a Space. -OPENAI_TIMEOUT_SEC = float(os.getenv("OPENAI_TIMEOUT_SEC", "25")) -HTTP_TIMEOUT_SEC = float(os.getenv("HTTP_TIMEOUT_SEC", "7")) +import fitz # PyMuPDF +import faiss +from sentence_transformers import SentenceTransformer +from rapidfuzz import fuzz, process -# If fuzzy matching is present, these control auto-pick. -FUZZY_AUTOPICK_MIN = int(os.getenv("FUZZY_AUTOPICK_MIN", "92")) -FUZZY_AUTOPICK_GAP = int(os.getenv("FUZZY_AUTOPICK_GAP", "5")) - -CONTACT_LINE = "Contact Peter Dunn @ 786.999.9127 or peter.dunn@masterstelecom.com for pricing." - - -# ----------------------------- -# Data models -# ----------------------------- - - -@dataclass(frozen=True) -class EosRecord: - sku: str - manufacturer: str - end_of_sale: str - end_of_life: str - description: str - suggested_replacement: str - advanced_5g_option: str - region: str - source: str - source_detail: str - notes: str - - -@dataclass -class DeviceDetails: - make: str = "" - model: str = "" - modem_type: str = "" - rugged: str = "" # Yes/No/Unknown - wifi: str = "" # WiFi type, or "None" - primary_use_case: str = "" - serial: str = "" # Yes/No/Unknown - throughput: str = "" # keep as text - antennas: str = "" # keep as text - ethernet_ports_total: Optional[int] = None - battery: str = "" # Yes/No/Unknown - dual_sim: str = "" # Yes/No/Unknown - cell_mimo: str = "" # 2x2 / 4x4 - link: str = "" # manufacturer page or datasheet - raw: Dict[str, str] = field(default_factory=dict) - - def is_5g(self) -> bool: - txt = f"{self.modem_type} {self.model} {self.raw.get('Notes','')}".lower() - return "5g" in txt - - -# ----------------------------- -# Small helpers -# ----------------------------- +import gradio as gr +from openai import OpenAI -_SKU_CLEAN_RE = re.compile(r"[^A-Z0-9\-]+") +# ============================ +# Settings +# ============================ +TODAY = date(2026, 1, 18) +OPENAI_MODEL = "gpt-5.2" +OPENAI_REASONING = {"effort": "high"} +MATCH_OK = 80 +EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2" +PARSEC_CONTEXT_BEFORE = 900 +PARSEC_CONTEXT_AFTER = 1600 -def norm_sku(s: str) -> str: - """Normalize a user-entered SKU for matching.""" - s = (s or "").strip().upper() - return _SKU_CLEAN_RE.sub("", s) +# ============================ +# OpenAI client (HF Space secret: OPENAI_API_KEY) +# ============================ +API_KEY = os.getenv("OPENAI_API_KEY", "").strip() +client = OpenAI(api_key=API_KEY) if API_KEY else None -def clean_str(x: Any) -> str: - if x is None: - return "" - s = str(x) - if s.strip().lower() in {"nan", "none"}: - return "" - return s.strip() - - -def first_nonempty(*vals: str) -> str: - for v in vals: - v = clean_str(v) - if v: - return v - return "" - - -def safe_int(x: Any) -> Optional[int]: +# ---------------------------- +# Gradio state helpers +# Keep state as a JSON STRING to avoid schema issues on Hugging Face. +# ---------------------------- +def state_load(st_json: str) -> Dict[str, Any]: try: - return int(str(x).strip()) + if not st_json: + return {} + return json.loads(st_json) if isinstance(st_json, str) else {} except Exception: - return None - - -def looks_like_sku(s: str) -> bool: - """Heuristic: user typed something like RUT240, IBR900, CBA850, etc.""" - s2 = norm_sku(s) - return len(s2) >= 4 and any(c.isdigit() for c in s2) - - -def parse_yes_no(val: str) -> str: - v = clean_str(val).lower() - if not v: - return "Unknown" - if v in {"y", "yes", "true", "1"}: - return "Yes" - if v in {"n", "no", "false", "0"}: - return "No" - # handle mixed text - if "yes" in v: - return "Yes" - if "no" in v: - return "No" - return val.strip() - - -def is_wifi_present(wifi_type: str) -> bool: - v = clean_str(wifi_type).lower() - if not v: - return False - if v in {"none", "n/a", "na", "no"}: - return False - return True - - -def extract_models_from_text(text: str, known_models: List[str]) -> List[str]: - """Find known model tokens inside a longer string.""" - t = (text or "") - if not t: - return [] - t_up = t.upper() - hits: List[str] = [] - for m in known_models: - if m and m.upper() in t_up: - hits.append(m) - # De-dupe while keeping order - seen: set[str] = set() - out: List[str] = [] - for h in hits: - if h.upper() not in seen: - out.append(h) - seen.add(h.upper()) - return out - - -_PORT_TOKEN_RE = re.compile(r"(\d+)\s*[x×]\s*") - - -def parse_port_count(text: str) -> int: - """Best-effort parse of strings like '1x GbE', '2 x FE', 'WAN: 1x, LAN: 4x'.""" - t = clean_str(text) - if not t: - return 0 - - total = 0 - for m in _PORT_TOKEN_RE.finditer(t.lower()): - n = safe_int(m.group(1)) - if n: - total += n - - # Fallback: if the string is literally '1' or '2' - if total == 0: - n = safe_int(t) - if n is not None and 0 <= n <= 16: - total = n - - return total - - -def best_effort_total_ethernet(wan: str, lan: str) -> Optional[int]: - wan_n = parse_port_count(wan) - lan_n = parse_port_count(lan) - total = wan_n + lan_n - return total if total > 0 else None - - -def fit_badges(details: DeviceDetails) -> List[str]: - """Derive simple badges for quick scanning.""" - badges: List[str] = [] - - use_case = clean_str(details.primary_use_case).lower() - if "vehicle" in use_case or "mobile" in use_case or "fleet" in use_case: - badges.append("Vehicle") - if "fixed" in use_case or "branch" in use_case or "site" in use_case or "industrial" in use_case: - badges.append("Fixed site") - - if is_wifi_present(details.wifi): - badges.append("Wi-Fi") - - if parse_yes_no(details.rugged) == "Yes": - badges.append("Rugged") - - if parse_yes_no(details.dual_sim) == "Yes": - badges.append("Dual-SIM") - - if clean_str(details.cell_mimo): - if "4" in details.cell_mimo and "x" in details.cell_mimo: - badges.append("4x4 MIMO") - elif "2" in details.cell_mimo and "x" in details.cell_mimo: - badges.append("2x2 MIMO") - - # Very rough check for higher throughput. - thr = clean_str(details.throughput).lower() - if any(tok in thr for tok in ["1 gb", "1000", "2 gb", "2000", "multi-g", "gig"]): - badges.append("High throughput") - - if parse_yes_no(details.serial) == "Yes": - badges.append("Serial") - - return badges - - -# ----------------------------- -# CSV loading -# ----------------------------- - - -def _read_csv(path: Path) -> List[Dict[str, str]]: - if not path.exists(): - raise FileNotFoundError(f"Missing file: {path}") - - with path.open("r", encoding="utf-8-sig", newline="") as f: - reader = csv.DictReader(f) - rows: List[Dict[str, str]] = [] - for r in reader: - rows.append({k: clean_str(v) for k, v in (r or {}).items()}) - return rows - - -@lru_cache(maxsize=1) -def load_eos_records() -> List[EosRecord]: - rows = _read_csv(EOS_CSV_PATH) - out: List[EosRecord] = [] - for r in rows: - out.append( - EosRecord( - sku=clean_str(r.get("sku")), - manufacturer=clean_str(r.get("manufacturer")), - end_of_sale=clean_str(r.get("end_of_sale")), - end_of_life=clean_str(r.get("end_of_life")), - description=clean_str(r.get("description")), - suggested_replacement=clean_str(r.get("suggested_replacement")), - advanced_5g_option=clean_str(r.get("advanced_5g_option")), - region=clean_str(r.get("region")), - source=clean_str(r.get("source")), - source_detail=clean_str(r.get("source_detail")), - notes=clean_str(r.get("notes")), - ) - ) - return out - - -@lru_cache(maxsize=1) -def eos_index() -> Dict[str, EosRecord]: - """Index by normalized SKU.""" - idx: Dict[str, EosRecord] = {} - for rec in load_eos_records(): - key = norm_sku(rec.sku) - if key: - idx[key] = rec - return idx - - -@lru_cache(maxsize=1) -def eos_choices() -> List[str]: - """List of raw SKU strings, for fuzzy match UI.""" - return [r.sku for r in load_eos_records() if clean_str(r.sku)] - - -@lru_cache(maxsize=1) -def load_catalog_rows() -> List[Dict[str, str]]: - return _read_csv(CATALOG_CSV_PATH) - - -@lru_cache(maxsize=1) -def catalog_models() -> List[str]: - models: List[str] = [] - for r in load_catalog_rows(): - m = clean_str(r.get("Model")) - if m: - models.append(m) - # De-dupe - seen: set[str] = set() - out: List[str] = [] - for m in models: - mu = m.upper() - if mu not in seen: - out.append(m) - seen.add(mu) - return out - - -@lru_cache(maxsize=1) -def catalog_index_by_model() -> Dict[str, Dict[str, str]]: - idx: Dict[str, Dict[str, str]] = {} - for r in load_catalog_rows(): - m = clean_str(r.get("Model")) - if not m: - continue - idx[m.upper()] = r - return idx - - -def catalog_lookup(model: str) -> Optional[Dict[str, str]]: - if not model: - return None - return catalog_index_by_model().get(model.strip().upper()) - - -# ----------------------------- -# OpenAI client + calls -# ----------------------------- - - -@lru_cache(maxsize=1) -def get_openai_client() -> Optional[Any]: - if OpenAI is None: - return None - - api_key = os.getenv("OPENAI_API_KEY", "").strip() - if not api_key: - return None + return {} +def state_dump(st: Dict[str, Any]) -> str: try: - # OpenAI python SDK v1 - return OpenAI(api_key=api_key) + return json.dumps(st or {}, ensure_ascii=False) except Exception: - return None + return "{}" -def openai_chat( - *, - model: str, - messages: List[Dict[str, str]], - max_tokens: int = OPENAI_MAX_TOKENS, - temperature: float = OPENAI_TEMPERATURE, -) -> str: - """Small wrapper that works across common OpenAI SDK shapes.""" - client = get_openai_client() - if client is None: - return "" - - # Try chat.completions first. - try: - resp = client.chat.completions.create( - model=model, - messages=messages, - temperature=temperature, - max_tokens=max_tokens, - timeout=OPENAI_TIMEOUT_SEC, - ) - return (resp.choices[0].message.content or "").strip() - except Exception as e: - log.warning("OpenAI chat call failed: %s", e) - # Fallback to responses API when available. +# ============================ +# Helpers +# ============================ +def norm_text(s: Any) -> str: try: - resp = client.responses.create( - model=model, - input=messages, - temperature=temperature, - max_output_tokens=max_tokens, - timeout=OPENAI_TIMEOUT_SEC, - ) - # Best-effort text extraction. - if hasattr(resp, "output_text"): - return (resp.output_text or "").strip() - return "" - except Exception as e: - log.warning("OpenAI responses call failed: %s", e) - return "" - - -def try_parse_json(text: str) -> Optional[Dict[str, Any]]: - """Parse JSON from an LLM response. - - The model is asked for strict JSON. - This still handles the common case where JSON is wrapped in fences. - """ - raw = clean_str(text) - if not raw: - return None - - # Strip ```json fences - raw = raw.strip() - raw = re.sub(r"^```json\s*", "", raw, flags=re.IGNORECASE).strip() - raw = re.sub(r"^```\s*", "", raw, flags=re.IGNORECASE).strip() - raw = re.sub(r"```\s*$", "", raw).strip() - - try: - obj = json.loads(raw) - if isinstance(obj, dict): - return obj - return None + if s is None or (isinstance(s, float) and math.isnan(s)) or pd.isna(s): + return "" except Exception: - return None - + pass + s = str(s).strip().lower() + s = re.sub(r"[^a-z0-9\s\-\/]", " ", s) + s = re.sub(r"\s+", " ", s).strip() + return s -def validate_url(url: str) -> str: - """Return url if it looks usable. No throw.""" - u = clean_str(url) - if not u: - return "" - if not (u.startswith("http://") or u.startswith("https://")): +def safe_str(v: Any) -> str: + if v is None or (isinstance(v, float) and pd.isna(v)) or pd.isna(v): return "" + return str(v).strip() - if requests is None: - return u +def is_5g(modem_type: Any) -> bool: + s = norm_text(modem_type) + return ("5g" in s) or ("nr" in s) - # Quick HEAD/GET check; if blocked, keep the URL anyway. +def json_load_safe(s: str) -> Dict[str, Any]: try: - r = requests.head(u, timeout=HTTP_TIMEOUT_SEC, allow_redirects=True) - if 200 <= int(getattr(r, "status_code", 0)) < 400: - return u + return json.loads(s) except Exception: - pass - - try: - r = requests.get(u, timeout=HTTP_TIMEOUT_SEC, allow_redirects=True) - if 200 <= int(getattr(r, "status_code", 0)) < 400: - return u - except Exception: - pass - - return u - - -# ----------------------------- -# Device detail build + GPT fill -# ----------------------------- - - -def details_from_catalog_row(row: Dict[str, str]) -> DeviceDetails: - """Convert a Dec 2025 catalog row into the normalized DeviceDetails.""" - det = DeviceDetails( - make=clean_str(row.get("Make")), - model=clean_str(row.get("Model")), - modem_type=clean_str(row.get("Modem Type")), - rugged=parse_yes_no(row.get("Ruggedization", "")), - wifi=clean_str(row.get("WiFi type")), - primary_use_case=clean_str(row.get("Primary use case")), - serial=parse_yes_no(row.get("Serial port", "")), - throughput=clean_str(row.get("Router throughput")), - antennas=clean_str(row.get("Antennas")), - ethernet_ports_total=best_effort_total_ethernet( - row.get("WAN ports and speed", ""), - row.get("LAN ports and speed", ""), - ), - battery=parse_yes_no(row.get("Battery", "")), - # Dual-SIM is not a column in this sheet today. - dual_sim="Unknown", - raw=row, - ) - - # Best-effort cellular MIMO guess from the Antennas cell. - ant = det.antennas.lower() - if "4x4" in ant or "4 x 4" in ant: - det.cell_mimo = "4x4" - elif "2x2" in ant or "2 x 2" in ant: - det.cell_mimo = "2x2" - - return det - - -def enforce_5g_rules(det: DeviceDetails) -> DeviceDetails: - """Apply hard rules that should never vary.""" - if det.is_5g(): - det.cell_mimo = "4x4" # always - return det - - -@lru_cache(maxsize=256) -def _gpt_enrich_missing_fields_cached( - make: str, - model: str, - is_5g: bool, - catalog_blob: str, - want_link: bool, -) -> dict: - """Cached GPT call used by :func:`gpt_fill_missing_fields`. + return {} - Cache key is (make, model, is_5g, catalog_blob, want_link). - """ - client = get_openai_client() +def gpt_json(system: str, payload: Dict[str, Any], max_tokens: int = 600) -> Dict[str, Any]: if client is None: return {} - - system = ( - "You are helping a Verizon seller prep a device swap. " - "The catalog text is the main reference. " - "For dual_sim and ethernet_ports_total, only use the catalog text. " - "If not stated, return null. " - "For link, you may propose an official manufacturer product page or datasheet URL " - "if you are confident it is correct; else return null. " - "Return strict JSON only." + resp = client.responses.create( + model=OPENAI_MODEL, + reasoning=OPENAI_REASONING, + input=[{"role":"system","content":system},{"role":"user","content":json.dumps(payload)}], + max_output_tokens=max_tokens, ) + return json_load_safe(getattr(resp, "output_text", "") or "") - user = { - "task": "Fill missing device fields.", - "device_make": make, - "device_model": model, - "device_is_5g": is_5g, - "catalog_text": catalog_blob, - "fields": { - "dual_sim": "Yes/No or null", - "ethernet_ports_total": "integer or null", - "link": "URL string or null", - }, - "notes": { - "link_requested": want_link, - "cell_mimo_rule": "If device_is_5g is true, cellular MIMO is 4x4 (already enforced elsewhere).", - }, - } - resp = openai_chat( +def gpt_answer_md(system: str, user: str, max_tokens: int = 650) -> str: + """Return a rep-friendly markdown answer.""" + if client is None: + return "No API key is configured, so I can't answer detailed questions right now." + resp = client.responses.create( model=OPENAI_MODEL, - messages=[ + reasoning=OPENAI_REASONING, + input=[ {"role": "system", "content": system}, - {"role": "user", "content": json.dumps(user, ensure_ascii=False)}, + {"role": "user", "content": user}, ], - max_tokens=420, - temperature=0.0, - ) - - return try_parse_json(resp) or {} - - -def gpt_fill_missing_fields(det: DeviceDetails) -> DeviceDetails: - """Fill missing fields via GPT. - - This only runs when we have gaps, then writes back into the passed DeviceDetails. - """ - - if get_openai_client() is None: - return det - - # Only ask GPT when we have gaps. - needs_dual_sim = parse_yes_no(det.dual_sim) == "Unknown" - needs_ports = det.ethernet_ports_total is None - - # Links can be slow (and are not always needed). Default: fetch for 5G only. - links_mode = clean_str(os.getenv("FETCH_DEVICE_LINKS", "5g")).lower() # 5g | all | none - needs_link = False - if links_mode != "none" and not clean_str(det.link): - if links_mode == "all": - needs_link = True - elif links_mode == "5g" and det.is_5g(): - needs_link = True - - if not (needs_dual_sim or needs_ports or needs_link): - return det - - # Use the catalog row text as context. - catalog_blob = "\n".join([f"{k}: {v}" for k, v in det.raw.items() if clean_str(v)]) - - obj = _gpt_enrich_missing_fields_cached( - make=det.make, - model=det.model, - is_5g=det.is_5g(), - catalog_blob=catalog_blob, - want_link=needs_link, + max_output_tokens=max_tokens, ) + return (getattr(resp, "output_text", "") or "").strip() + + +# ============================ +# Load data +# ============================ +EOS_PATH = "routers_eos_eol_by_sku.csv" +DEC_PATH = "dec2025routers.csv" +PARSEC_PDF = "ParsecCatalog.pdf" + +if not os.path.exists(EOS_PATH): + raise FileNotFoundError(f"Missing {EOS_PATH} in repo.") +if not os.path.exists(DEC_PATH): + raise FileNotFoundError(f"Missing {DEC_PATH} in repo.") +if not os.path.exists(PARSEC_PDF): + raise FileNotFoundError(f"Missing {PARSEC_PDF} in repo.") + +df_eos = pd.read_csv(EOS_PATH).copy() +df_dec = pd.read_csv(DEC_PATH).copy()# ---------------------------- +# Lifecycle CSV normalization (supports simplified format) +# ---------------------------- +# New format example columns: +# SKU, manufacturer, Device Type, end_of_sale, end_of_life, suggested_replacement, advanced_5g_option +# We normalize to internal lowercase names and synthesize missing fields used by matching. +def _normalize_lifecycle_df(df: pd.DataFrame) -> pd.DataFrame: + df = df.copy() + # map columns case-insensitively + col_map = {} + lower_cols = {c.lower(): c for c in df.columns} + + def _pick(*names): + for n in names: + if n.lower() in lower_cols: + return lower_cols[n.lower()] + return None - if not obj: - return det + sku_col = _pick("sku", "SKU") + if sku_col: + col_map[sku_col] = "sku" + mfr_col = _pick("manufacturer", "Manufacturer") + if mfr_col: + col_map[mfr_col] = "manufacturer" + dt_col = _pick("device type", "Device Type", "device_type") + if dt_col: + col_map[dt_col] = "device_type" + eos_col = _pick("end_of_sale", "end of sale", "End of Sale", "eos") + if eos_col: + col_map[eos_col] = "end_of_sale" + eol_col = _pick("end_of_life", "end of life", "End of Life", "eol") + if eol_col: + col_map[eol_col] = "end_of_life" + sr_col = _pick("suggested_replacement", "Suggested Replacement") + if sr_col: + col_map[sr_col] = "suggested_replacement" + a5_col = _pick("advanced_5g_option", "Advanced 5G Option", "advanced 5g option") + if a5_col: + col_map[a5_col] = "advanced_5g_option" + + df = df.rename(columns=col_map) + + # Ensure required columns exist + for req in ["sku", "manufacturer", "device_type", "end_of_sale", "end_of_life", "suggested_replacement", "advanced_5g_option"]: + if req not in df.columns: + df[req] = "" + + # Synthesize description/notes/region for backward compatibility (matching + display) + if "description" not in df.columns: + df["description"] = df["sku"].astype(str) + if "notes" not in df.columns: + df["notes"] = "" + if "region" not in df.columns: + df["region"] = "" + + return df + +df_eos = _normalize_lifecycle_df(df_eos) + + + + +def _canonize_eos_columns(df: pd.DataFrame) -> pd.DataFrame: + """Normalize lifecycle CSV column names (case-insensitive) and create expected columns.""" + # Map various header spellings to canonical names used by the app + mapping = {} + for c in df.columns: + k = str(c).strip().lower().replace(" ", "_") + if k in {"sku", "model", "device", "device_sku"}: + mapping[c] = "sku" + elif k in {"manufacturer", "make", "vendor"}: + mapping[c] = "manufacturer" + elif k in {"device_type", "type"}: + mapping[c] = "device_type" + elif k in {"end_of_sale", "eos", "end_sale", "end_of_sales"}: + mapping[c] = "end_of_sale" + elif k in {"end_of_life", "eol", "end_life"}: + mapping[c] = "end_of_life" + elif k in {"suggested_replacement", "replacement_4g", "lte_replacement", "replacement_lte", "replacement"}: + mapping[c] = "suggested_replacement" + elif k in {"advanced_5g_option", "replacement_5g", "fiveg_replacement", "5g_replacement", "upgrade_5g"}: + mapping[c] = "advanced_5g_option" + elif k in {"region", "market"}: + mapping[c] = "region" + elif k in {"notes", "note"}: + mapping[c] = "notes" + elif k in {"description", "device_description", "name"}: + mapping[c] = "description" + + df = df.rename(columns=mapping).copy() + + # Create expected columns if missing + if "sku" not in df.columns: + # Try the common capitalized header as a fallback + if "SKU" in df.columns: + df["sku"] = df["SKU"].astype(str) + else: + df["sku"] = "" + + if "manufacturer" not in df.columns: + df["manufacturer"] = "" + + if "device_type" not in df.columns: + df["device_type"] = "" + + if "description" not in df.columns: + # If the simplified file removed description, use SKU as description (still searchable) + df["description"] = df["sku"].astype(str) + + if "notes" not in df.columns: + df["notes"] = "" + + if "region" not in df.columns: + df["region"] = "" + + if "suggested_replacement" not in df.columns: + df["suggested_replacement"] = "" + + if "advanced_5g_option" not in df.columns: + df["advanced_5g_option"] = "" + + if "end_of_sale" not in df.columns: + df["end_of_sale"] = "" + + if "end_of_life" not in df.columns: + df["end_of_life"] = "" + + return df + +df_eos = _canonize_eos_columns(df_eos) + + +def region_ok(x: Any) -> bool: + s = str(x or "").strip().lower() + if not s: + return True + if "not specified" in s: + return True + if "north america" in s: + return True + if re.search(r"\busa\b", s): + return True + if re.search(r"\bunited\s+states\b", s): + return True + if re.search(r"\bu\.?s\.?\b", s): + return True + return False + +if "region" in df_eos.columns: + df_eos = df_eos[df_eos["region"].apply(region_ok)].reset_index(drop=True) + +# Maker mapping (includes Teltonika) +CANON_MAKER = { + "CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"}, + "SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"}, + "FEENEY": {"feeney", "feeney wireless", "inseego"}, + "DIGI": {"digi", "accelerated", "accelerated concepts"}, + "CISCO_MERAKI": {"meraki", "cisco meraki"}, + "CISCO": {"cisco"}, + "TELTONIKA": {"teltonika"}, +} + +def canon_maker_from_text(s: Any) -> str: + t = norm_text(s) + for canon, terms in CANON_MAKER.items(): + for term in terms: + if term in t: + return canon + return "UNKNOWN" + +df_eos["_canon_make"] = df_eos["manufacturer"].apply(canon_maker_from_text) if "manufacturer" in df_eos.columns else "UNKNOWN" +df_eos["_norm_sku"] = df_eos["sku"].apply(norm_text) if "sku" in df_eos.columns else "" +df_eos["_norm_desc"] = df_eos["description"].apply(norm_text) if "description" in df_eos.columns else "" +df_eos["_norm_notes"] = df_eos["notes"].apply(norm_text) if "notes" in df_eos.columns else "" + +df_dec["_canon_make"] = df_dec["Make"].apply(canon_maker_from_text) if "Make" in df_dec.columns else "UNKNOWN" +df_dec["_norm_model"] = df_dec["Model"].apply(norm_text) if "Model" in df_dec.columns else "" +df_dec["_is5g"] = df_dec["Modem Type"].apply(is_5g) if "Modem Type" in df_dec.columns else False + + +# ============================ +# Date helpers +# ============================ +@dataclass +class ParsedDate: + raw: str + kind: str + value: Optional[date] - if needs_dual_sim: - det.dual_sim = parse_yes_no(clean_str(obj.get("dual_sim"))) +def parse_date_field(x: Any) -> ParsedDate: + raw = str(x or "").strip() + if not raw: + return ParsedDate(raw="", kind="missing", value=None) + + # Common US formats: M/D/YY or M/D/YYYY (e.g., 6/24/24, 9/30/21) + for fmt in ("%m/%d/%y", "%m/%d/%Y", "%-m/%-d/%y", "%-m/%-d/%Y"): + try: + dt = datetime.strptime(raw, fmt).date() + return ParsedDate(raw=raw, kind="full", value=dt) + except Exception: + pass + + # ISO-ish: YYYY + if re.fullmatch(r"\d{4}", raw): + y = int(raw) + if y == TODAY.year: + return ParsedDate(raw=raw, kind="year", value=date(y, 1, 1)) + if y < TODAY.year: + return ParsedDate(raw=raw, kind="year", value=date(y, 1, 1)) + return ParsedDate(raw=raw, kind="year", value=date(y, 12, 31)) + + # YYYY-MM + if re.fullmatch(r"\d{4}-\d{2}", raw): + try: + y, m = raw.split("-") + return ParsedDate(raw=raw, kind="year_month", value=date(int(y), int(m), 1)) + except Exception: + return ParsedDate(raw=raw, kind="bad", value=None) + + # YYYY-MM-DD + if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw): + try: + dt = datetime.strptime(raw, "%Y-%m-%d").date() + return ParsedDate(raw=raw, kind="full", value=dt) + except Exception: + return ParsedDate(raw=raw, kind="bad", value=None) + + # Last resort: leave as raw (unparsed) + return ParsedDate(raw=raw, kind="bad", value=None) + + if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw): + try: + dt = datetime.strptime(raw, "%Y-%m-%d").date() + return ParsedDate(raw=raw, kind="full", value=dt) + except Exception: + return ParsedDate(raw=raw, kind="bad", value=None) + + return ParsedDate(raw=raw, kind="bad", value=None) + +def display_date(pd_: ParsedDate) -> str: + if pd_.kind == "missing": + return "Not listed" + if pd_.kind == "bad": + return pd_.raw or "Not listed" + return pd_.raw + +def status_from_eos_eol(eos: ParsedDate, eol: ParsedDate) -> str: + if eos.value is None and eol.value is None: + return "Unknown" + if eol.value is not None and eol.value <= TODAY: + return "End of Life" + if eos.value is not None and eos.value <= TODAY: + return "End of Sale" + return "Active" + +def row_to_dates_and_status(row: pd.Series) -> Tuple[str, str, str]: + eos = parse_date_field(row.get("end_of_sale")) + eol = parse_date_field(row.get("end_of_life")) + return display_date(eos), display_date(eol), status_from_eos_eol(eos, eol) + + +# ============================ +# Embeddings + Parsec index +# ============================ +embedder = SentenceTransformer(EMBED_MODEL_NAME) + +def extract_pdf_text_pages(path: str) -> List[str]: + doc = fitz.open(path) + return [doc[i].get_text("text") for i in range(len(doc))] + +def build_parsec_cards(pages: List[str]) -> List[str]: + cards = [] + for p in pages: + for m in re.finditer(r"Standard\s+SKU:", p): + start = max(0, m.start() - PARSEC_CONTEXT_BEFORE) + end = min(len(p), m.start() + PARSEC_CONTEXT_AFTER) + c = p[start:end].strip() + if len(c) >= 200: + cards.append(c) + out, seen = [], set() + for c in cards: + h = hashlib.sha1(c.encode("utf-8")).hexdigest() + if h not in seen: + seen.add(h); out.append(c) + return out - if needs_ports: - det.ethernet_ports_total = safe_int(obj.get("ethernet_ports_total")) +parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF)) +parsec_emb = embedder.encode(parsec_cards, batch_size=64, show_progress_bar=False, normalize_embeddings=True) +parsec_emb = np.asarray(parsec_emb, dtype=np.float32) +parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1]) +parsec_index.add(parsec_emb) - if needs_link: - det.link = validate_url(clean_str(obj.get("link"))) - return det +# ============================ +# Device resolution +# ============================ +def label_for_row(i: int) -> str: + r = df_eos.iloc[i] + return f"{r.get('sku','')} — {r.get('manufacturer','')} — {r.get('description','')}"[:220] +EOS_LABELS = [label_for_row(i) for i in range(len(df_eos))] +EOS_CORPUS = [] +for _, r in df_eos.iterrows(): + EOS_CORPUS.append(" ".join([r.get("_norm_sku",""), r.get("_canon_make",""), r.get("_norm_desc",""), r.get("_norm_notes","")])) -# ----------------------------- -# Lookup + recommendation -# ----------------------------- +def local_candidates(query: str, top_k: int = 6) -> List[Tuple[int, int, str]]: + q = norm_text(query) + hits = process.extract(q, EOS_CORPUS, scorer=fuzz.WRatio, limit=top_k) + return [(int(idx), int(score), EOS_LABELS[int(idx)]) for _, score, idx in hits] +def gpt_choose_device(user_text: str, candidates: List[Tuple[int,int,str]]) -> Dict[str, Any]: + if client is None: + return {} + sys = "Pick which router the user meant. Never invent. Return strict JSON only." + payload = { + "user_input": user_text, + "candidates": [{"row_idx": i, "score": s, "label": lbl} for (i,s,lbl) in candidates], + "rules": [ + "If one is clearly correct, return mode='ok' with row_idx.", + "If two are plausible, return mode='pick' with top 2 options." + ], + "output_schema": {"mode":"ok|pick","row_idx":"int","options":[{"row_idx":"int","label":"string"}]} + } + return gpt_json(sys, payload, max_tokens=280) + +def resolve_device(user_text: str) -> Dict[str, Any]: + q = norm_text(user_text) + exact = df_eos.index[df_eos["_norm_sku"] == q].tolist() + if len(exact) == 1: + return {"mode":"ok","row_idx": int(exact[0])} + if len(exact) > 1: + opts = [{"row_idx": int(i), "label": EOS_LABELS[int(i)]} for i in exact[:2]] + return {"mode":"pick","options": opts} + + cands = local_candidates(user_text, top_k=6) + if not cands: + return {"mode":"not_found"} + + if cands[0][1] >= 95 and (len(cands) == 1 or (cands[0][1] - cands[1][1]) >= 8): + return {"mode":"ok","row_idx": cands[0][0]} + + g = gpt_choose_device(user_text, cands) + if g.get("mode") == "ok" and isinstance(g.get("row_idx"), int): + return {"mode":"ok","row_idx": int(g["row_idx"])} + + if g.get("mode") == "pick": + opts = g.get("options", []) or [] + opts2 = [{"row_idx": int(o["row_idx"]), "label": str(o["label"])} for o in opts[:2] if "row_idx" in o] + if opts2: + return {"mode":"pick","options": opts2} + + if len(cands) > 1: + return {"mode":"pick","options":[{"row_idx":cands[0][0],"label":cands[0][2]},{"row_idx":cands[1][0],"label":cands[1][2]}]} + return {"mode":"pick","options":[{"row_idx":cands[0][0],"label":cands[0][2]}]} + + +# ============================ +# Replacements — lifecycle CSV source of truth +# ============================ +def extract_model_token(text: str) -> str: + s = safe_str(text) + if not s: + return "" + parts = [p.strip() for p in s.split("|") if p.strip()] + candidates = parts[::-1] if parts else [s] + for cand in candidates: + m = re.search(r"\bRUT[A-Z]?\d{2,4}\b", cand.upper()) + if m: + return m.group(0).upper() + m = re.search(r"\bIX\d{2}\b", cand, flags=re.IGNORECASE) + if m: + return m.group(0).upper() + m = re.search(r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b", cand, flags=re.IGNORECASE) + if m: + return m.group(0).upper() + m = re.search(r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b", cand.upper()) + if m: + return m.group(0).upper() + return candidates[0][:60] -def fuzzy_match_sku(query: str) -> Tuple[Optional[str], List[Tuple[str, int]]]: - """Return (autopick, ranked list).""" - q = clean_str(query) - if not q: - return None, [] - - # If rapidfuzz is missing, we cannot do ranked match. - if rf_process is None or rf_fuzz is None: - return None, [] - - choices = eos_choices() - ranked = rf_process.extract( - q, - choices, - scorer=rf_fuzz.WRatio, - limit=8, - ) - ranked2: List[Tuple[str, int]] = [(m[0], int(m[1])) for m in ranked] +def device_is_4g(row: pd.Series) -> bool: + # Detect LTE/4G even when the description uses "Cat 4 / Cat6 / Cat 12" without saying "LTE" + t = norm_text(row.get("description","")) + " " + norm_text(row.get("notes","")) + " " + norm_text(row.get("sku","")) - if not ranked2: - return None, [] + # If it explicitly says 5G/NR, treat as not 4G-only + if ("5g" in t) or ("nr" in t): + return False - best_sku, best_score = ranked2[0] - second_score = ranked2[1][1] if len(ranked2) > 1 else 0 + # Classic signals + if ("lte" in t) or ("4g" in t): + return True - if best_score >= FUZZY_AUTOPICK_MIN and (best_score - second_score) >= FUZZY_AUTOPICK_GAP: - return best_sku, ranked2 + # LTE category signals (Cat 1..20 are LTE categories; Cat M1/M2 are LTE-M) + if re.search(r"\bcat\s*[-]?\s*(m1|m2)\b", t): + return True - return None, ranked2 + m = re.search(r"\bcat\s*[-]?\s*(\d{1,2})\b", t) + if m: + try: + cat = int(m.group(1)) + if 0 < cat <= 20: + return True + except Exception: + pass + # If "cat" appears at all, it's almost always LTE-family + if "cat" in t: + return True -def find_eos_record(query: str) -> Tuple[Optional[EosRecord], List[Tuple[str, int]]]: - """Try exact SKU match, then fuzzy match.""" + return False - q_norm = norm_sku(query) - if q_norm and q_norm in eos_index(): - return eos_index()[q_norm], [] + # If it explicitly says 5G/NR, treat as not 4G-only + if ("5g" in t) or ("nr" in t): + return False - autopick, ranked = fuzzy_match_sku(query) - if autopick: - rec = eos_index().get(norm_sku(autopick)) - if rec: - return rec, ranked + # Classic signals + if ("lte" in t) or ("4g" in t): + return True - return None, ranked + # LTE category signals (Cat 1..20 are LTE categories; Cat M1/M2 are LTE-M) + if re.search(r"\bcat\s*[-]?\s*(m1|m2)\b", t): + return True + m = re.search(r"\bcat\s*[-]?\s*(\d{1,2})\b", t) + if m: + try: + cat = int(m.group(1)) + if 0 < cat <= 20: + return True + except Exception: + pass -def pick_model_for_replacement(text: str) -> Optional[str]: - """Pick the best catalog model that appears inside the replacement text.""" - hits = extract_models_from_text(text, catalog_models()) - if hits: - # prefer the longest token (tends to be the real model) - hits_sorted = sorted(hits, key=lambda x: len(x), reverse=True) - return hits_sorted[0] - return None + # If "cat" appears at all, it's almost always LTE-family + if "cat" in t: + return True + return False -def build_device_details(model: str) -> Optional[DeviceDetails]: - row = catalog_lookup(model) - if not row: - return None - det = details_from_catalog_row(row) - det = enforce_5g_rules(det) - det = gpt_fill_missing_fields(det) - det = enforce_5g_rules(det) - return det - - -def build_recommendation(rec: EosRecord) -> Dict[str, Any]: - """Return a dict with 4G/5G device details, plus a small table and summary.""" - - # Replacement strings from the EoS table. - repl_4g_text = clean_str(rec.suggested_replacement) - repl_5g_text = clean_str(rec.advanced_5g_option) - - # Try to map them to catalog models. - repl_4g_model = pick_model_for_replacement(repl_4g_text) if repl_4g_text else None - repl_5g_model = pick_model_for_replacement(repl_5g_text) if repl_5g_text else None - - det_4g = build_device_details(repl_4g_model) if repl_4g_model else None - det_5g = build_device_details(repl_5g_model) if repl_5g_model else None - - # Feature table - table_cols = [ - "Role", - "Make", - "Model", - "Modem", - "Fit badges", - "Cell MIMO", - "Eth ports", - "Battery", - "Wi-Fi", - "Rugged", - "Dual-SIM", - "Serial", - "Throughput", - "Link", - ] - - def _row(role: str, d: Optional[DeviceDetails]) -> List[Any]: - if d is None: - return [role] + [""] * (len(table_cols) - 1) - badges = ", ".join(fit_badges(d)) - link = d.link - return [ - role, - d.make, - d.model, - d.modem_type, - badges, - d.cell_mimo, - d.ethernet_ports_total if d.ethernet_ports_total is not None else "", - parse_yes_no(d.battery), - d.wifi, - parse_yes_no(d.rugged), - parse_yes_no(d.dual_sim), - parse_yes_no(d.serial), - d.throughput, - link, - ] - table_rows = [ - _row("4G option", det_4g), - _row("5G option", det_5g), - ] +def candidate_5g_models_from_lifecycle(manufacturer: str) -> List[str]: + mfr = norm_text(manufacturer) + pool = df_eos[df_eos["manufacturer"].astype(str).str.lower().eq(mfr)].copy() if "manufacturer" in df_eos.columns else df_eos.copy() + vals = pool["advanced_5g_option"].tolist() if "advanced_5g_option" in pool.columns else [] + out, seen = [], set() + for v in vals: + tok = extract_model_token(v) + if tok and tok.lower() != "nan" and tok not in seen: + seen.add(tok); out.append(tok) + return out - summary = customer_email_summary(rec, det_4g, det_5g) +def candidate_4g_models_from_lifecycle(manufacturer: str) -> List[str]: + mfr = norm_text(manufacturer) + pool = df_eos[df_eos["manufacturer"].astype(str).str.lower().eq(mfr)].copy() if "manufacturer" in df_eos.columns else df_eos.copy() + vals = pool["suggested_replacement"].tolist() if "suggested_replacement" in pool.columns else [] + out, seen = [], set() + for v in vals: + tok = extract_model_token(v) + if tok and tok.lower() != "nan" and tok not in seen: + seen.add(tok); out.append(tok) + return out - return { - "eos": rec.__dict__, - "repl_4g_text": repl_4g_text, - "repl_5g_text": repl_5g_text, - "repl_4g_model": repl_4g_model or "", - "repl_5g_model": repl_5g_model or "", - "det_4g": det_4g.__dict__ if det_4g else {}, - "det_5g": det_5g.__dict__ if det_5g else {}, - "table_cols": table_cols, - "table_rows": table_rows, - "summary": summary, +def gpt_pick_from_candidates(old_row: pd.Series, candidates: List[str], need: str) -> str: + if client is None or not candidates: + return "" + sys = "Pick the best replacement model. Choose only from candidates. Return strict JSON only." + payload = { + "old_device": { + "sku": str(old_row.get("sku","")), + "manufacturer": str(old_row.get("manufacturer","")), + "description": str(old_row.get("description","")), + "need": need, + }, + "candidates": candidates[:40], + "output_schema": {"choice":"string"} } + out = gpt_json(sys, payload, max_tokens=240) or {} + choice = str(out.get("choice","") or "").strip() + return choice if choice in candidates else "" + +def fallback_5g_from_dec(canon_make: str) -> str: + pool5 = df_dec[(df_dec["_canon_make"] == canon_make) & (df_dec["_is5g"] == True)] + return str(pool5.iloc[0]["Model"]).strip() if not pool5.empty else "" + +def pick_replacements_lifecycle(row: pd.Series, status: str, use_gpt: bool = True) -> Dict[str, Any]: + canon = str(row.get("_canon_make","UNKNOWN")) + manufacturer = str(row.get("manufacturer","") or "") + + sug_raw = safe_str(row.get("suggested_replacement","")) + adv_raw = safe_str(row.get("advanced_5g_option","")) + + has_4g_alt = bool(sug_raw.strip()) + has_5g_alt = bool(adv_raw.strip()) + + # Treat as 4G if the description indicates LTE OR lifecycle provides a 4G suggested replacement + is_4g = device_is_4g(row) or has_4g_alt + + # Provide 5G option if the unit is 4G, EOS/EOL, or lifecycle explicitly provides advanced_5g_option + want_5g = is_4g or (status in {"End of Sale","End of Life"}) or has_5g_alt + + # 4G alternative: show whenever lifecycle provides it (or device appears 4G) + repl_4g = "Not applicable" + if is_4g or has_4g_alt: + repl_4g = extract_model_token(sug_raw) + if not repl_4g: + cand4 = candidate_4g_models_from_lifecycle(manufacturer) + repl_4g = (gpt_pick_from_candidates(row, cand4, "4G alternative") if (use_gpt and client) else "") or (cand4[0] if cand4 else "") + if not repl_4g: + repl_4g = "Not applicable" + + # 5G replacement: prefer lifecycle advanced_5g_option whenever present + repl_5g = "Not listed" + if want_5g: + repl_5g = extract_model_token(adv_raw) + if not repl_5g: + cand5 = candidate_5g_models_from_lifecycle(manufacturer) + repl_5g = (gpt_pick_from_candidates(row, cand5, "5G replacement/upgrade") if (use_gpt and client) else "") or (cand5[0] if cand5 else "") + if not repl_5g: + repl_5g = fallback_5g_from_dec(canon) or "Not listed" + + if repl_5g.lower() == "nan": + repl_5g = "Not listed" + + return {"repl_4g": repl_4g, "repl_5g": repl_5g, "sources": ["lifecycle_csv"] + (["gpt"] if (use_gpt and client) else [])} + + +# ============================ +# Antennas (Parsec-only) +# ============================ +PARSEC_FAMILY_WORDS = {"chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie","shepherd","belgian","australian","terrier","pyrenees"} +BAD_NAME_MARKERS = {"customization","standard connectors","connectors","features","benefits","specifications","mechanical","electrical","mounting","accessories","description:","standard sku"} + +def clean_line(s: str) -> str: + s = re.sub(r"\s+", " ", str(s or "").strip()) + if re.fullmatch(r"-[a-z0-9]+", s.lower()): + return "" + return s + +def is_bad_name_line(line: str) -> bool: + low = line.lower() + if any(m in low for m in BAD_NAME_MARKERS): + return True + if re.search(r"\b-[a-z0-9]{1,4}\b", low) and len(low) <= 25: + return True + return False + +def family_from_line(line: str) -> str: + low = line.lower() + for fam in PARSEC_FAMILY_WORDS: + if fam in low: + return fam.capitalize() + return "" +def parsec_connectors_from_card(t: str) -> str: + m = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE) + if m: + return re.sub(r"\s+", " ", m.group(1).strip())[:80] + return "" -def customer_email_summary(rec: EosRecord, det_4g: Optional[DeviceDetails], det_5g: Optional[DeviceDetails]) -> str: - """Customer-ready email-style summary.""" - - sku = rec.sku - make = rec.manufacturer - - eos = clean_str(rec.end_of_sale) - eol = clean_str(rec.end_of_life) +def parsec_mounts_from_card(t: str) -> List[str]: + mounts = [] + for m in re.finditer(r"Mount:\s*(.+)", t, flags=re.IGNORECASE): + val = re.sub(r"\s+", " ", m.group(1).strip()) + parts = [p.strip().lower() for p in val.split(",") if p.strip()] + mounts.extend(parts) + out = [] + seen = set() + for x in mounts: + if x not in seen: + seen.add(x); out.append(x) + return out - subject = f"Subject: Replacement options for {make} {sku}".strip() +def parsec_name_from_card(card_text: str) -> str: + lines = [clean_line(ln) for ln in str(card_text or "").splitlines()] + lines = [ln for ln in lines if ln] - lines: List[str] = [subject, "", "Hi," , "", "Quick heads-up on lifecycle timing:"] + for ln in lines: + if is_bad_name_line(ln): + continue + fam = family_from_line(ln) + if fam: + return fam + + sku_i = None + for i, ln in enumerate(lines): + if "standard sku" in ln.lower(): + sku_i = i + break + if sku_i is not None: + window = lines[max(0, sku_i - 12):sku_i] + for ln in reversed(window): + if is_bad_name_line(ln): + continue + if 3 <= len(ln) <= 40 and re.search(r"[A-Za-z]", ln): + return ln.split()[0].capitalize() + + return "Parsec antenna" + +def parsec_part_from_card(t: str) -> str: + m = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t) + return m.group(1).strip() if m else "" + +def parsec_desc_from_card(t: str) -> str: + m = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE) + return re.sub(r"\s+"," ",m.group(1).strip())[:220] if m else "" + +def parsec_retrieve(query: str, top_k: int = 12) -> List[Dict[str, Any]]: + qv = embedder.encode([query], normalize_embeddings=True) + qv = np.asarray(qv, dtype=np.float32) + scores, ids = parsec_index.search(qv, top_k) + out: List[Dict[str, Any]] = [] + for sc, i in zip(scores[0].tolist(), ids[0].tolist()): + if 0 <= int(i) < len(parsec_cards): + card = parsec_cards[int(i)] + out.append({ + "score": float(sc), + "name": parsec_name_from_card(card), + "part_number": parsec_part_from_card(card), + "description": parsec_desc_from_card(card), + "connectors": parsec_connectors_from_card(card), + "mounts": parsec_mounts_from_card(card), + "_card": card.lower(), + }) + return out - if eos: - lines.append(f"- End of sale: {eos}") - if eol: - lines.append(f"- End of life: {eol}") +def choose_best_parsec(cands: List[Dict[str, Any]], mode: str) -> Dict[str, Any]: + best = None + best_score = -1e9 + + for c in cands: + card = c.get("_card","") + mounts = c.get("mounts", []) or [] + score = float(c.get("score", 0.0)) + + if "omni" in card: + score += 0.6 + if "directional" in card: + score -= 1.5 + + if mode == "vehicle": + if any("magnetic" in m for m in mounts): + score += 3.0 + if any("through" in m for m in mounts): + score += 2.0 + if any("wall" in m for m in mounts) or any("pole" in m for m in mounts): + score -= 1.2 + if "app: fixed" in card and "mobile" not in card: + score -= 2.0 + + if mode == "stationary": + if any("wall" in m for m in mounts): + score += 2.0 + if any("pole" in m for m in mounts): + score += 1.8 + + if score > best_score: + best_score = score + best = c + + if not best: + return {"name":"Parsec antenna","part_number":"","description":"","connectors":"","mounts":[]} + + best = dict(best) + best.pop("_card", None) + return best + + +def infer_mimo_for_5g(repl_5g_model: str) -> str: + """Rule: every 5G router uses a 4x4 antenna.""" + return "4x4" + + # If the model name hints 5G, lean 4x4 + if "5g" in model.lower() or model.upper().startswith(("R", "E", "S", "IX", "RUTM")): + default = "4x4" + else: + default = "2x2" - desc = clean_str(rec.description) - if desc: - lines.append(f"- Current device: {desc}") + # Use dec2025routers.csv if we can match the model under the same maker family + try: + pool = df_dec[df_dec["_canon_make"] == canon_make].copy() + if pool.empty: + return default + hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) + if not hit or hit[1] < MATCH_OK: + return default + row = pool.iloc[int(hit[2])] + txt2 = (str(row.get("Antennas (internal/external/both)", "")) + " " + str(row.get("Modem Type", "")) + " " + str(row.get("Special notes",""))).lower() + if "4x4" in txt2 or "4 x 4" in txt2 or "4x 4" in txt2: + return "4x4" + if "2x2" in txt2 or "2 x 2" in txt2: + return "2x2" + # If modem type includes 5G, lean 4x4 + if "5g" in txt2 or "nr" in txt2: + return "4x4" + return default + except Exception: + return default - lines.append("") - lines.append("Recommended replacement paths:") +def antenna_options_for(router_model: str, tech: str, mimo: str) -> Dict[str, Any]: + q_stationary = f"{router_model} {tech} {mimo} omni stationary pole wall fixed site Parsec" + q_vehicle = f"{router_model} {tech} {mimo} omni vehicle mobile magnetic through-bolt Parsec" - def _device_block(label: str, det: Optional[DeviceDetails], fallback_text: str) -> None: - if det is None: - if fallback_text: - lines.append(f"- {label}: {fallback_text}") - else: - lines.append(f"- {label}: (no suggestion on file)") - return + cand_stationary = parsec_retrieve(q_stationary, top_k=12) + cand_vehicle = parsec_retrieve(q_vehicle, top_k=12) - ports = det.ethernet_ports_total - ports_txt = f"{ports} total" if ports is not None else "Unknown" - bat = parse_yes_no(det.battery) + s = choose_best_parsec(cand_stationary, mode="stationary") + v = choose_best_parsec(cand_vehicle, mode="vehicle") - badges = fit_badges(det) - badges_txt = ", ".join(badges) if badges else "" + s.update({"mimo": mimo, "why": "Stationary omni best match."}) + v.update({"mimo": mimo, "why": "Vehicle omni best match."}) - line = f"- {label}: {det.make} {det.model} ({det.modem_type})" - if badges_txt: - line += f" | {badges_txt}" - lines.append(line) - lines.append(f" - Ethernet ports: {ports_txt}") - lines.append(f" - Battery: {bat}") - if clean_str(det.link): - lines.append(f" - Info link: {det.link}") + return {"stationary_omni": s, "vehicle_omni": v, "sources":["parsec_rag"]} - _device_block("4G option", det_4g, rec.suggested_replacement) - _device_block("5G option", det_5g, rec.advanced_5g_option) - lines.append("") - lines.append("If you tell me vehicle vs fixed site (and indoor vs outdoor), I can sanity-check antennas too.") - lines.append("") - lines.append(CONTACT_LINE) +# ============================ +# Install-ready checklist +# ============================ +def install_ready_checklist(current_sku: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str: + st = ant.get("stationary_omni", {}) + vh = ant.get("vehicle_omni", {}) + if client is not None: + sys = "Create a short, install-ready checklist for a Verizon rep. Return markdown only." + payload = {"current_device": current_sku, "replacements": repl, "antennas": {"stationary": st, "vehicle": vh}} + resp = client.responses.create( + model=OPENAI_MODEL, + reasoning=OPENAI_REASONING, + input=[{"role":"system","content":sys},{"role":"user","content":json.dumps(payload)}], + max_output_tokens=520, + ) + return (getattr(resp, "output_text", "") or "").strip() + return "\n".join([ + "### Install-ready checklist", + f"- Current device: {current_sku}", + f"- 5G replacement: {repl.get('repl_5g','')}", + f"- 4G alternative: {repl.get('repl_4g','Not applicable')}", + f"- Stationary omni antenna: {st.get('name','')} (PN {st.get('part_number','')})", + f"- Vehicle omni antenna: {vh.get('name','')} (PN {vh.get('part_number','')})", + "- Next steps: confirm mounting + cable lengths + power; place order; schedule install.", + ]) + + +# ============================ +# Batch mode (NO GPT) +# ============================ +def parse_batch_inputs(text_blob: str, file_obj: Any) -> List[str]: + items: List[str] = [] + if file_obj is not None: + try: + path = file_obj.name if hasattr(file_obj, "name") else str(file_obj) + df = pd.read_csv(path) + col = df.columns[0] + items.extend([str(x).strip() for x in df[col].tolist() if str(x).strip()]) + except Exception: + pass + if text_blob: + for ln in str(text_blob).splitlines(): + ln = ln.strip() + if ln: + items.append(ln) + seen=set() + out=[] + for x in items: + k=norm_text(x) + if k and k not in seen: + seen.add(k); out.append(x) + return out - return "\n".join(lines).strip() +def run_batch(text_blob: str, file_obj: Any, include_antennas: bool): + inputs = parse_batch_inputs(text_blob, file_obj) + if not inputs: + return "", None, None, "" + rows=[] + for item in inputs: + res = resolve_device(item) + if res.get("mode") != "ok": + rows.append({"Input": item, "Matched":"", "Status":"Needs review", "EOS":"", "EOL":"", "4G alternative":"", "5G replacement":"", "Notes":"Not found/ambiguous"}) + continue -# ----------------------------- -# Chat state + handlers -# ----------------------------- + life_row = df_eos.iloc[int(res["row_idx"])] + eos, eol, status = row_to_dates_and_status(life_row) + repl = pick_replacements_lifecycle(life_row, status, use_gpt=False) + + rows.append({ + "Input": item, + "Matched": str(life_row.get("sku","")), + "Status": status, + "EOS": eos, + "EOL": eol, + "4G alternative": repl.get("repl_4g",""), + "5G replacement": repl.get("repl_5g",""), + "Notes": "", + }) + + out_df = pd.DataFrame(rows) + counts = out_df["Status"].value_counts(dropna=False).to_dict() + top_5g = out_df["5G replacement"].value_counts(dropna=False).head(5).to_dict() + summary = f"Rows: {len(out_df)} | " + " | ".join([f"{k}: {v}" for k,v in counts.items()]) + rollup = "Top 5G recommendations:\n" + "\n".join([f"- {k}: {v}" for k,v in top_5g.items() if str(k).strip()]) + + tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv") + out_df.to_csv(tmp.name, index=False) + + return summary, out_df, tmp.name, rollup + + +# ============================ +# Replacement feature table + manufacturer link (5G device) +# ============================ + +FEATURE_COLS = ["Device", "Modem technology", "WiFi", "Ports", "Antennas", "Ruggedness", "Use case"] + +# Manufacturer domains used for best-effort link resolution (no non-maker domains). +MAKER_DOMAINS = { + "CRADLEPOINT": ["cradlepoint.com", "ericsson.com"], + "SIERRA": ["semtech.com", "airlink.com"], + "FEENEY": ["inseego.com"], + "DIGI": ["digi.com"], + "CISCO_MERAKI": ["meraki.cisco.com", "cisco.com"], + "CISCO": ["cisco.com"], + "TELTONIKA": ["teltonika-networks.com"], + "UNKNOWN": [], +} + +HTTP_HEADERS = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/120.0 Safari/537.36" +} +HTTP_TIMEOUT = 12 + +def _best_effort_manufacturer_url(model: str, canon_make: str) -> str: + """Try to find a manufacturer page or datasheet link using simple on-domain searches. + If we can't confirm a page, return the manufacturer homepage for the maker family. + """ + model = str(model or "").strip() + if not model or model in {"Not listed", "Not applicable"}: + return "" + domains = MAKER_DOMAINS.get(canon_make, []) or [] + if not domains: + return "" -def state_default() -> Dict[str, Any]: - return { - "pending_choices": [], # list[str] - "last_reco": {}, - } + # Candidate on-domain search URLs (common patterns across sites). + # We keep these on the manufacturer domain (no Google/Bing). + q = re.sub(r"\s+", "+", model) + url_candidates = [] + for d in domains: + url_candidates += [ + f"https://{d}/search?q={q}", + f"https://{d}/search?query={q}", + f"https://{d}/?s={q}", + f"https://www.{d}/search?q={q}", + f"https://www.{d}/search?query={q}", + f"https://www.{d}/?s={q}", + ] + # Also try a few direct product patterns for known makers (best effort). + if canon_make == "TELTONIKA": + slug = model.lower() + url_candidates += [ + f"https://teltonika-networks.com/products/routers/{slug}", + f"https://teltonika-networks.com/product/{slug}", + "https://teltonika-networks.com/products/routers/", + ] + if canon_make == "DIGI": + url_candidates += [ + "https://www.digi.com/products/networking/cellular-routers", + f"https://www.digi.com/search?q={q}", + ] + if canon_make == "CRADLEPOINT": + url_candidates += [ + "https://cradlepoint.com/products/", + f"https://cradlepoint.com/?s={q}", + ] + if canon_make in {"CISCO", "CISCO_MERAKI"}: + url_candidates += [ + f"https://www.cisco.com/c/en/us/search.html?q={q}", + ] -def state_to_json(st: Dict[str, Any]) -> str: - return json.dumps(st, ensure_ascii=False) + # Try to confirm a working page (HTTP 200 and model string somewhere in HTML). + for u in url_candidates[:18]: + try: + import requests + r = requests.get(u, headers=HTTP_HEADERS, timeout=HTTP_TIMEOUT, allow_redirects=True) + if r.status_code != 200: + continue + html = (r.text or "").lower() + if model.lower() in html or "datasheet" in html or "data sheet" in html: + return r.url + except Exception: + continue + # Fallback: maker homepage + d0 = domains[0] + return f"https://{d0}" -def state_from_json(st_json: str) -> Dict[str, Any]: +def _fetch_page_text(url: str, max_chars: int = 12000) -> str: + """Fetch page HTML and return a simplified text blob for GPT (best effort).""" + if not url: + return "" try: - obj = json.loads(st_json or "{}") - if isinstance(obj, dict): - return obj - return state_default() + import requests + r = requests.get(url, headers=HTTP_HEADERS, timeout=HTTP_TIMEOUT, allow_redirects=True) + if r.status_code != 200: + return "" + html = r.text or "" + html = re.sub(r"(?is).*?", " ", html) + html = re.sub(r"(?is).*?", " ", html) + text = re.sub(r"(?is)<[^>]+>", " ", html) + text = re.sub(r"\s+", " ", text).strip() + return text[:max_chars] except Exception: - return state_default() - - -def format_match_choices(ranked: List[Tuple[str, int]]) -> str: - if not ranked: return "" - top = ranked[:6] - lines = ["I found a few close matches. Reply with the number:"] - for i, (sku, score) in enumerate(top, 1): - lines.append(f"{i}) {sku} ({score})") - return "\n".join(lines) -def handle_user_message( - message: str, - chat_history: List[List[str]], - st_json: str, -) -> Tuple[List[List[str]], str, List[List[Any]], str, str]: - """Main chat handler. +def _features_from_dec(model: str, canon_make: str) -> Dict[str, str]: + """Lookup a router model in dec2025routers.csv and return the key feature fields.""" + if not model or model in {"Not listed", "Not applicable"}: + return {k: "Not listed" for k in FEATURE_COLS[1:]} - Returns: - - chat_history - - new_state_json - - table_rows - - summary_text - - links_md - """ + pool = df_dec[df_dec["_canon_make"] == canon_make].copy() + if pool.empty: + return {k: "Not listed" for k in FEATURE_COLS[1:]} + + hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) + if not hit or hit[1] < MATCH_OK: + return {k: "Not listed" for k in FEATURE_COLS[1:]} - st = state_from_json(st_json) - msg = clean_str(message) + r = pool.iloc[int(hit[2])] + ports = f"WAN: {r.get('WAN ports and speed','')} | LAN: {r.get('LAN ports and speed','')}" + return { + "Modem technology": str(r.get("Modem Type","")) or "Not listed", + "WiFi": str(r.get("WiFi type","")) or "Not listed", + "Ports": ports.strip() if ports.strip() else "Not listed", + "Antennas": str(r.get("Antennas (internal/external/both)","")) or "Not listed", + "Ruggedness": str(r.get("Ruggedization","")) or "Not listed", + "Use case": str(r.get("Primary use case","")) or "Not listed", + } - if not msg: - return chat_history, state_to_json(st), [], "", "" +def _gpt_fill_feature_row(device_label: str, model: str, canon_make: str, row: Dict[str, str], manufacturer_url: str = "", page_text: str = "") -> Dict[str, str]: + """If dec can't supply values, ask GPT to fill missing ones (best guess).""" + if client is None: + return row - # User wants a fresh case. - if msg.lower().strip() in {"new", "new case", "reset", "clear"}: - st = state_default() - chat_history = chat_history + [[msg, "Got it. Send a router SKU to start."]] - return chat_history, state_to_json(st), [], "", "" + missing = [k for k,v in row.items() if (not v) or str(v).strip().lower() in {"not listed","nan",""}] + if not missing: + return row - # Step 1: waiting for disambiguation selection - pending = st.get("pending_choices", []) or [] - if pending: - chosen_sku = "" - m = re.match(r"^(\d+)\s*$", msg) + sys = ( + "Fill missing router feature fields for a Verizon rep. Return strict JSON only. " + "Use manufacturer page text when available. If still unknown, make a best-guess." + ) + payload = { + "device_label": device_label, + "model": model, + "maker_family": canon_make, + "manufacturer_url": manufacturer_url, + "manufacturer_page_text": page_text[:8000], + "known": row, + "fill_only": missing, + "rules": ["Fill only requested fields.", "Short phrases only.", "Return JSON only."], + "output_schema": {k: "string" for k in missing}, + } + out = gpt_json(sys, payload, max_tokens=320) or {} + for k in missing: + val = str(out.get(k, "") or "").strip() + if val: + row[k] = val + return row + missing = [k for k,v in row.items() if (not v) or str(v).strip().lower() in {"not listed","nan",""}] + if not missing: + return row + + sys = "Fill missing router feature fields for a Verizon rep. Return strict JSON only." + payload = { + "device_label": device_label, + "model": model, + "maker_family": canon_make, + "known": row, + "fill_only": missing, + "rules": [ + "Fill only the requested fields.", + "Best guess if needed. Short phrases only.", + "Return JSON only." + ], + "output_schema": {k: "string" for k in missing} + } + out = gpt_json(sys, payload, max_tokens=260) or {} + for k in missing: + val = str(out.get(k, "") or "").strip() + if val: + row[k] = val + return row + +def build_replacement_features_table(repl_4g: str, repl_5g: str, canon_make: str) -> pd.DataFrame: + rows = [] + + # 4G alternative row + row4 = _features_from_dec(repl_4g, canon_make) + url4 = _best_effort_manufacturer_url(repl_4g, canon_make) if repl_4g else "" + txt4 = _fetch_page_text(url4) if url4 else "" + row4 = _gpt_fill_feature_row("4G alternative", repl_4g, canon_make, row4, manufacturer_url=url4, page_text=txt4) + rows.append({"Device": "4G alternative", **row4}) + + # 5G replacement row + row5 = _features_from_dec(repl_5g, canon_make) + url5 = _best_effort_manufacturer_url(repl_5g, canon_make) if repl_5g else "" + txt5 = _fetch_page_text(url5) if url5 else "" + row5 = _gpt_fill_feature_row("5G replacement", repl_5g, canon_make, row5, manufacturer_url=url5, page_text=txt5) + rows.append({"Device": "5G replacement", **row5}) + + df = pd.DataFrame(rows, columns=FEATURE_COLS) + return df +# ============================ +# Verizon fit badges (small table) for recommended devices +# ============================ + +FIT_COLS = ["Device", "Fit badges", "Ethernet ports", "Battery"] + +def _parse_ethernet_ports(wan_field: str, lan_field: str) -> str: + """Best-effort total ethernet ports based on WAN/LAN text.""" + def _count(field: str) -> int: + s = str(field or "") + # Common forms: "1x GbE", "2 x 10/100", "WAN: 1", etc. + nums = [int(x) for x in re.findall(r"(\\d+)\\s*x", s.lower())] + if nums: + return sum(nums) + # Fallback: if it contains 'port' with a number + m = re.search(r"(\\d+)\\s*port", s.lower()) if m: - idx = int(m.group(1)) - 1 - if 0 <= idx < len(pending): - chosen_sku = pending[idx] - else: - # User typed an SKU directly. - chosen_sku = msg + return int(m.group(1)) + # If it contains '1' and 'wan' in short text, guess 1 + if "wan" in s.lower() and re.search(r"\\b1\\b", s): + return 1 + return 0 - rec = eos_index().get(norm_sku(chosen_sku)) - st["pending_choices"] = [] + total = _count(wan_field) + _count(lan_field) + return str(total) if total > 0 else "Not listed" - if not rec: - chat_history = chat_history + [[msg, "I couldn't map that choice to a device. Try the SKU again."]] - return chat_history, state_to_json(st), [], "", "" +def _battery_badge(battery_field: str) -> str: + s = str(battery_field or "").strip().lower() + if not s or s in {"none", "no", "n/a", "not listed"}: + return "No" + return "Yes" + +def _bool_badge(flag: bool) -> str: + return "Yes" if flag else "No" + +def _dual_sim_from_row_text(*fields: str) -> bool: + txt = " ".join([str(x or "") for x in fields]).lower() + return ("dual sim" in txt) or ("2 sim" in txt) or ("two sim" in txt) or ("dual-sim" in txt) + +def _throughput_high(throughput_field: str) -> bool: + t = str(throughput_field or "").lower() + # Heuristic: anything mentioning gbps or >=1000 mbps + if "gbps" in t: + return True + m = re.search(r"(\\d+(?:\\.\\d+)?)\\s*mbps", t) + if m: + try: + return float(m.group(1)) >= 1000.0 + except Exception: + pass + return False + +def _gpt_fit_badges(model: str, canon_make: str, is_5g: bool, dec_row: Optional[pd.Series]) -> Tuple[str, str, str]: + """ + GPT-based fill for Fit badges / Ethernet ports / Battery, used when dec is missing or incomplete. + Returns (badges_csv, ethernet_ports, battery_yesno). + """ + if client is None: + return ("Not listed", "Not listed", "Not listed") + + dec_ctx = {} + if dec_row is not None: + try: + dec_ctx = { + "Model": str(dec_row.get("Model","")), + "Modem Type": str(dec_row.get("Modem Type","")), + "Ruggedization": str(dec_row.get("Ruggedization","")), + "WAN ports and speed": str(dec_row.get("WAN ports and speed","")), + "LAN ports and speed": str(dec_row.get("LAN ports and speed","")), + "Antennas": str(dec_row.get("Antennas (internal/external/both)","")), + "WiFi type": str(dec_row.get("WiFi type","")), + "Primary use case": str(dec_row.get("Primary use case","")), + "Serial port": str(dec_row.get("Serial port (yes/no)","")), + "VPN": str(dec_row.get("VPN capabilities","")), + "Throughput": str(dec_row.get("Router throughput","")), + "Battery": str(dec_row.get("Battery (internal/removable/none/optional)","")), + "Special notes": str(dec_row.get("Special notes","")), + "Summary": str(dec_row.get("summary and use case","")), + } + except Exception: + dec_ctx = {} + + sys = ( + "You are helping a Verizon rep. Based on the provided router context, output fit badges and a couple quick traits.\n" + "Return STRICT JSON only.\n" + "Badges must be chosen from this set only:\n" + "['Vehicle','Fixed site','Wi‑Fi','Rugged','Dual‑SIM','4x4 MIMO','High throughput','Serial'].\n" + "Rules:\n" + "- If is_5g is true, ALWAYS include '4x4 MIMO'.\n" + "- Ethernet ports: return a single integer as a string if you can infer total ethernet ports, otherwise 'Not listed'.\n" + "- Battery: return 'Yes' or 'No' if you can infer, otherwise 'Not listed'.\n" + "- If uncertain between Vehicle vs Fixed site, pick the most likely based on use case/ruggedization.\n" + ) - reco = build_recommendation(rec) - st["last_reco"] = reco + payload = { + "model": model, + "maker_family": canon_make, + "is_5g": bool(is_5g), + "dec_context": dec_ctx, + "output_schema": { + "badges": ["string"], + "ethernet_ports": "string", + "battery": "Yes|No|Not listed" + } + } - assistant_msg = render_reco_message(rec, reco) - chat_history = chat_history + [[msg, assistant_msg]] + out = gpt_json(sys, payload, max_tokens=260) or {} + + badges = out.get("badges", []) or [] + allowed = {"Vehicle","Fixed site","Wi‑Fi","Rugged","Dual‑SIM","4x4 MIMO","High throughput","Serial"} + clean = [] + for b in badges: + bs = str(b).strip() + if bs in allowed: + clean.append(bs) + + if is_5g and "4x4 MIMO" not in clean: + clean.append("4x4 MIMO") + + eth = str(out.get("ethernet_ports","") or "").strip() + if not eth or eth.lower() in {"nan","none"}: + eth = "Not listed" + m = re.search(r"\d+", eth) + eth = m.group(0) if m else ("Not listed" if eth == "Not listed" else eth) + + bat = str(out.get("battery","") or "").strip() + if not bat: + bat = "Not listed" + if bat.lower().startswith("y"): + bat = "Yes" + elif bat.lower().startswith("n"): + bat = "No" + elif bat not in {"Yes","No","Not listed"}: + bat = "Not listed" + + dedup=[] + seen=set() + for b in clean: + if b not in seen: + seen.add(b); dedup.append(b) + badges_csv = ", ".join(dedup) if dedup else "Not listed" + return (badges_csv, eth, bat) + + +def _fit_badges_for_model(model: str, canon_make: str, is_5g: bool) -> Tuple[str, str, str]: + """Return (badges_csv, ethernet_ports, battery_yesno). Uses dec2025routers.csv first, then GPT fill.""" + model = str(model or "").strip() + if not model or model in {"Not listed", "Not applicable"}: + return ("Not listed", "Not listed", "Not listed") + + pool = df_dec[df_dec["_canon_make"] == canon_make].copy() + row = None + if not pool.empty: + hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio) + if hit and hit[1] >= MATCH_OK: + row = pool.iloc[int(hit[2])] + + badges = [] + eth = "Not listed" + bat_yes = "Not listed" + + if row is not None: + use_case = str(row.get("Primary use case","") or "").lower() + rugged = str(row.get("Ruggedization","") or "").lower() + + if any(k in use_case for k in ["vehicle","mobile","fleet","in-vehicle"]) or "vehicle" in rugged: + badges.append("Vehicle") + else: + badges.append("Fixed site") - return ( - chat_history, - state_to_json(st), - reco["table_rows"], - reco["summary"], - render_links_md(reco), - ) + wifi = str(row.get("WiFi type","") or "").strip() + if wifi and wifi.lower() not in {"none","no","n/a"}: + badges.append("Wi‑Fi") - # Step 2: detect SKU lookup - if looks_like_sku(msg) or len(msg) <= 40: - rec, ranked = find_eos_record(msg) - - if rec is None: - if ranked: - # Ask user to pick - st["pending_choices"] = [r[0] for r in ranked[:6]] - assistant_msg = format_match_choices(ranked) - chat_history = chat_history + [[msg, assistant_msg]] - return chat_history, state_to_json(st), [], "", "" - - chat_history = chat_history + [[msg, "No match found in the lifecycle table."]] - return chat_history, state_to_json(st), [], "", "" - - reco = build_recommendation(rec) - st["last_reco"] = reco - - assistant_msg = render_reco_message(rec, reco) - chat_history = chat_history + [[msg, assistant_msg]] - - return ( - chat_history, - state_to_json(st), - reco["table_rows"], - reco["summary"], - render_links_md(reco), - ) + if any(k in rugged for k in ["rugged","industrial","ip","harsh"]): + badges.append("Rugged") - # Step 3: treat as follow-up question - last = st.get("last_reco") or {} - if last: - answer = answer_question_with_context(msg, last) - if not answer: - answer = "I couldn't reach the model right now. Try again in a moment." - chat_history = chat_history + [[msg, answer]] - return chat_history, state_to_json(st), last.get("table_rows", []), last.get("summary", ""), render_links_md(last) + notes_blob = " ".join([ + str(row.get("Special notes","") or ""), + str(row.get("summary and use case","") or ""), + ]).lower() + if "dual" in notes_blob and "sim" in notes_blob: + badges.append("Dual‑SIM") - chat_history = chat_history + [[msg, "Send a router SKU (example: IBR900) to get started."]] - return chat_history, state_to_json(st), [], "", "" + if is_5g: + badges.append("4x4 MIMO") + thr = str(row.get("Router throughput","") or "").lower() + m = re.search(r"(\d+(\.\d+)?)\s*gb", thr) + if m: + try: + if float(m.group(1)) >= 1.0: + badges.append("High throughput") + except Exception: + pass + + serial = str(row.get("Serial port (yes/no)","") or "").strip().lower() + if serial in {"yes","y","true"}: + badges.append("Serial") + + wan = str(row.get("WAN ports and speed","") or "") + lan = str(row.get("LAN ports and speed","") or "") + m1 = re.search(r"(\d+)\s*x", wan.lower()) + m2 = re.search(r"(\d+)\s*x", lan.lower()) + if m1 or m2: + total = (int(m1.group(1)) if m1 else 0) + (int(m2.group(1)) if m2 else 0) + eth = str(total) if total > 0 else "Not listed" + + bat = str(row.get("Battery (internal/removable/none/optional)","") or "") + bat_l = bat.lower().strip() + if bat_l: + if "none" in bat_l: + bat_yes = "No" + else: + bat_yes = "Yes" + + # Use GPT when anything is missing (instead of best-effort inference) + if (row is None) or (eth == "Not listed") or (bat_yes == "Not listed") or (not badges): + g_badges, g_eth, g_bat = _gpt_fit_badges(model, canon_make, is_5g, row) + + if badges: + if is_5g and "4x4 MIMO" not in badges: + badges.append("4x4 MIMO") + dedup=[] + seen=set() + for b in badges: + if b not in seen: + seen.add(b); dedup.append(b) + badges_csv = ", ".join(dedup) + else: + badges_csv = g_badges + + eth = eth if eth != "Not listed" else g_eth + bat_yes = bat_yes if bat_yes != "Not listed" else g_bat + return (badges_csv or "Not listed", eth or "Not listed", bat_yes or "Not listed") + + dedup=[] + seen=set() + for b in badges: + if b not in seen: + seen.add(b); dedup.append(b) + badges_csv = ", ".join(dedup) if dedup else "Not listed" + return (badges_csv, eth, bat_yes) + +def build_fit_table(repl_4g: str, repl_5g: str, canon_make: str) -> pd.DataFrame: + rows = [] + # 4G alt row (is_5g False) + b4, eth4, bat4 = _fit_badges_for_model(repl_4g, canon_make, is_5g=False) + rows.append({"Device": "4G alternative", "Fit badges": b4, "Ethernet ports": eth4, "Battery": bat4}) + # 5G row (is_5g True) + b5, eth5, bat5 = _fit_badges_for_model(repl_5g, canon_make, is_5g=True) + rows.append({"Device": "5G replacement", "Fit badges": b5, "Ethernet ports": eth5, "Battery": bat5}) + return pd.DataFrame(rows, columns=FIT_COLS) + +# ============================ +# Output +# ============================ +def assemble_output(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any]) -> str: + current_name = f"{life_row.get('sku','')} — {life_row.get('description','')}".strip(" —") + st = ant.get("stationary_omni", {}) + vh = ant.get("vehicle_omni", {}) + + lines = [] + lines.append(f"1. Current device: **{current_name}**") + lines.append(f"2. Status: **{status}**") + lines.append(f"3. End of Sale date: **{eos}**") + lines.append(f"4. End of Life date: **{eol}**") + lines.append(f"5. 4G alternative (lifecycle): **{repl.get('repl_4g','Not applicable')}**") + lines.append(f"6. 5G replacement (lifecycle): **{repl.get('repl_5g','Not listed')}**") + lines.append("7. Antenna options (Parsec-only):") + conn_s = f" | Conn: {st.get('connectors','')}" if st.get("connectors") else "" + conn_v = f" | Conn: {vh.get('connectors','')}" if vh.get("connectors") else "" + lines.append(f" - Stationary (Omni): **{st.get('name','')}** (Part #: {st.get('part_number','')}) — {st.get('description','')} — MIMO: {st.get('mimo','')}{conn_s}") + lines.append(f" - Vehicle (Omni): **{vh.get('name','')}** (Part #: {vh.get('part_number','')}) — {vh.get('description','')} — MIMO: {vh.get('mimo','')}{conn_v}") + + lines.append("\nSources (debug):") + for s in repl.get("sources", []) if isinstance(repl.get("sources"), list) else []: + lines.append(f"- {s}") + lines.append("- ParsecCatalog.pdf (local RAG)") + lines.append("- routers_eos_eol_by_sku.csv (replacements)") + return "\n".join(lines) -def render_reco_message(rec: EosRecord, reco: Dict[str, Any]) -> str: - sku = rec.sku - make = rec.manufacturer - lines: List[str] = [] - lines.append(f"**{make} {sku}**") +# ============================ +# Customer-ready email summary (single lookup only) +# ============================ - if clean_str(rec.end_of_sale) or clean_str(rec.end_of_life): - lines.append("Lifecycle:") - if clean_str(rec.end_of_sale): - lines.append(f"- End of sale: {rec.end_of_sale}") - if clean_str(rec.end_of_life): - lines.append(f"- End of life: {rec.end_of_life}") +def build_customer_email(life_row: pd.Series, status: str, eos: str, eol: str, repl: Dict[str,Any], ant: Dict[str,Any], link5: str) -> str: + """Email-style summary the rep can paste to a customer (lightly sales-y).""" + current = f"{life_row.get('sku','')} — {life_row.get('description','')}".strip(" —") + repl5 = str(repl.get("repl_5g","") or "").strip() + repl4 = str(repl.get("repl_4g","") or "").strip() - if clean_str(rec.description): - lines.append(f"Description: {rec.description}") + st = ant.get("stationary_omni", {}) or {} + vh = ant.get("vehicle_omni", {}) or {} + lines = [] + lines.append("Subject: Router replacement recommendation") lines.append("") - lines.append("Suggested replacements:") - - r4 = clean_str(reco.get("repl_4g_text", "")) - r5 = clean_str(reco.get("repl_5g_text", "")) - if r4: - lines.append(f"- 4G option: {r4}") - else: - lines.append("- 4G option: (none on file)") - - if r5: - lines.append(f"- 5G option: {r5}") - else: - lines.append("- 5G option: (none on file)") - + lines.append("Hi there,") lines.append("") - lines.append("I dropped a feature table and a customer-ready email draft below.") - lines.append("Got questions about the suggested devices? Ask here.") - - return "\n".join(lines).strip() - - -def render_links_md(reco: Dict[str, Any]) -> str: - """Nice clickable links under the table.""" - det5 = reco.get("det_5g") or {} - det4 = reco.get("det_4g") or {} - - links: List[str] = [] - - l5 = clean_str(det5.get("link")) - if l5: - links.append(f"**5G info link:** {l5}") - - l4 = clean_str(det4.get("link")) - if l4: - links.append(f"**4G info link:** {l4}") - - return "\n\n".join(links) - - -def answer_question_with_context(question: str, reco: Dict[str, Any]) -> str: - """Answer user follow-up using device details as grounding.""" + lines.append(f"We reviewed your current router (**{current}**) and recommend the following path forward:") + lines.append("") + lines.append(f"- **Status:** {status}") + lines.append(f"- **End of Sale:** {eos}") + lines.append(f"- **End of Life:** {eol}") + lines.append("") + lines.append("**Recommended replacement (5G):**") + lines.append(f"- {repl5 if repl5 else 'Not listed'}") + if link5: + lines.append(f"- Manufacturer page (best effort): {link5}") + lines.append("") + lines.append("**Optional 4G alternative (if needed):**") + lines.append(f"- {repl4 if repl4 and repl4.lower() != 'not applicable' else 'Not applicable'}") + lines.append("") + lines.append("**Antenna suggestions (Parsec):**") + lines.append(f"- Stationary (Omni): {st.get('name','')} (PN {st.get('part_number','')})") + lines.append(f"- Vehicle (Omni): {vh.get('name','')} (PN {vh.get('part_number','')})") + lines.append("") + lines.append("If you’d like, we can confirm the best-fit option for your install environment and provide pricing.") + lines.append("") + lines.append("Contact Peter Dunn @ 786.999.9127 or peter.dunn@masterstelecom.com for pricing.") + lines.append("") + lines.append("Thanks,") + lines.append("Peter Dunn") + return "\n".join(lines) - client = get_openai_client() - if client is None: +def generate_customer_email(st_json: str) -> str: + st = state_load(st_json) + if not st or "row_idx" not in st: + return "Run a lookup first." + try: + life_row = df_eos.iloc[int(st["row_idx"])] + except Exception: + return "Run a lookup first." + + eos, eol, status = row_to_dates_and_status(life_row) + repl = st.get("repl", {}) or {} + ant = st.get("ant", {}) or {} + + canon_make = str(life_row.get("_canon_make","UNKNOWN")) + url5 = _best_effort_manufacturer_url(str(repl.get("repl_5g","") or ""), canon_make) + return build_customer_email(life_row, status, eos, eol, repl, ant, url5) + +# ============================ +# Gradio callbacks +# IMPORTANT: no dict state and ALL events have api_name=False (prevents api_info schema generation) +# ============================ +def run_lookup(user_text: str, st_json: str): + user_text = str(user_text or "").strip() + if not user_text: + return "Enter a router SKU/model.", "", None, None, "", gr.update(visible=False), gr.update(visible=False), "{}", "", "" + + res = resolve_device(user_text) + + if res.get("mode") == "pick": + opts = res.get("options", []) + choices = [o["label"] for o in opts] + st2 = {"mode":"pick","options": opts, "raw": user_text} + return "Did you mean A or B? Pick one, then click Use selection.", "", None, None, "", gr.update(choices=choices, value=None, visible=True), gr.update(visible=True), state_dump(st2), "", "" + + if res.get("mode") != "ok": + return "Not found.", "", None, None, "", gr.update(visible=False), gr.update(visible=False), "{}", "", "" + + life_row = df_eos.iloc[int(res["row_idx"])] + eos, eol, status = row_to_dates_and_status(life_row) + + repl = pick_replacements_lifecycle(life_row, status, use_gpt=True) + canon_make = str(life_row.get("_canon_make","UNKNOWN")) + mimo = infer_mimo_for_5g(repl.get("repl_5g","")) + tech = "5G" if repl.get("repl_5g") and repl.get("repl_5g") != "Not listed" else ("4G" if device_is_4g(life_row) else "Unknown") + ant = antenna_options_for(repl.get("repl_5g") or str(life_row.get("sku","")), tech, mimo) + + output = assemble_output(life_row, status, eos, eol, repl, ant) + st_out = {"row_idx": int(res["row_idx"]), "repl": repl, "ant": ant, "raw": user_text} + url5 = _best_effort_manufacturer_url(repl.get('repl_5g',''), canon_make) + link = f"**5G manufacturer page (best effort):** {url5}" if url5 else "" + feat_df = build_replacement_features_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) + fit = build_fit_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) + return output, link, feat_df, fit, "", gr.update(visible=False), gr.update(visible=False), state_dump(st_out), "", "" + +def use_selection(selected_label: str, st_json: str): + st = state_load(st_json) + if not st or st.get("mode") != "pick": + return "Run a search first.", "", None, None, "", gr.update(visible=False), gr.update(visible=False), "{}", "", "" + + if not selected_label: + return "Pick A or B first.", "", None, None, "", gr.update(visible=True), gr.update(visible=True), st_json, "", "" + + chosen_row = None + for o in st.get("options", []): + if o.get("label") == selected_label: + chosen_row = int(o["row_idx"]) + break + if chosen_row is None: + return "Pick a valid option.", "", None, None, "", gr.update(visible=True), gr.update(visible=True), st_json, "", "" + + life_row = df_eos.iloc[int(chosen_row)] + eos, eol, status = row_to_dates_and_status(life_row) + + repl = pick_replacements_lifecycle(life_row, status, use_gpt=True) + canon_make = str(life_row.get("_canon_make","UNKNOWN")) + mimo = infer_mimo_for_5g(repl.get("repl_5g","")) + tech = "5G" if repl.get("repl_5g") and repl.get("repl_5g") != "Not listed" else ("4G" if device_is_4g(life_row) else "Unknown") + ant = antenna_options_for(repl.get("repl_5g") or str(life_row.get("sku","")), tech, mimo) + + output = assemble_output(life_row, status, eos, eol, repl, ant) + st_out = {"row_idx": int(chosen_row), "repl": repl, "ant": ant, "raw": st.get("raw","")} + url5 = _best_effort_manufacturer_url(repl.get('repl_5g',''), canon_make) + link = f"**5G manufacturer page (best effort):** {url5}" if url5 else "" + feat_df = build_replacement_features_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) + fit = build_fit_table(repl.get('repl_4g',''), repl.get('repl_5g',''), canon_make) + return output, link, feat_df, fit, "", gr.update(visible=False), gr.update(visible=False), state_dump(st_out), "", "" + +def make_install_ready(st_json: str): + st = state_load(st_json) + if not st or "row_idx" not in st: + return "Run a lookup first." + life_row = df_eos.iloc[int(st["row_idx"])] + current_sku = str(life_row.get("sku","") or "") + return install_ready_checklist(current_sku, st.get("repl", {}) or {}, st.get("ant", {}) or {}) + + + +# ============================ +# Q&A about the suggested device (post-recommendation) +# ============================ +def answer_question(question: str, st_json: str) -> str: + q = str(question or "").strip() + if not q: return "" + st = state_load(st_json) + if not st or "repl" not in st: + return "Run a lookup first, then ask your question." + + repl = st.get("repl", {}) or {} + ant = st.get("ant", {}) or {} + repl5 = str(repl.get("repl_5g","") or "").strip() + repl4 = str(repl.get("repl_4g","") or "").strip() + # Pull a bit of dec context for the 5G model (if possible) + canon_make = "" + try: + # Try to infer maker family from stored row_idx + if "row_idx" in st: + row = df_eos.iloc[int(st["row_idx"])] + canon_make = str(row.get("_canon_make","UNKNOWN")) + except Exception: + canon_make = "" - # Keep context tight so we stay fast. - ctx = { - "router": reco.get("eos", {}), - "4g": reco.get("det_4g", {}), - "5g": reco.get("det_5g", {}), - } - - system = ( - "You are a router replacement assistant for a Verizon seller. " - "Answer using only the provided JSON context. " - "If the context lacks the answer, say what is missing and suggest what to check." - ) + # Manufacturer link (best effort) + url5 = _best_effort_manufacturer_url(repl5, canon_make) if repl5 else "" - user = { - "context": ctx, - "question": question, - } + # Feature table row for 5G (helps the LLM answer spec questions without web scraping) + feat5 = {} + try: + feat5 = _features_from_dec(repl5, canon_make) if repl5 else {} + except Exception: + feat5 = {} - resp = openai_chat( - model=OPENAI_MODEL_QA, - messages=[ - {"role": "system", "content": system}, - {"role": "user", "content": json.dumps(user, ensure_ascii=False)}, - ], - max_tokens=500, - temperature=0.2, + sys = ( + "You are a Verizon field rep assistant. Answer questions about the suggested router in a fast, practical way. " + "Use the provided context; do not mention internal tools, prompts, embeddings, or databases. " + "If the question is about specs and the value is unknown, say 'Not listed' and suggest checking the manufacturer page. " + "Keep it concise and scannable." ) - return clean_str(resp) - - -# ----------------------------- -# Gradio UI -# ----------------------------- - - -TABLE_HEADERS = [ - "Role", - "Make", - "Model", - "Modem", - "Fit badges", - "Cell MIMO", - "Eth ports", - "Battery", - "Wi-Fi", - "Rugged", - "Dual-SIM", - "Serial", - "Throughput", - "Link", -] - - -def build_demo() -> gr.Blocks: - with gr.Blocks(title="Only Routers") as demo: - gr.Markdown( - "# Only Routers\n" - "Type a router SKU to get a 4G + 5G replacement suggestion." - ) - - st = gr.State(value=state_to_json(state_default())) - - with gr.Row(): - with gr.Column(scale=3): - chatbot = gr.Chatbot(label="Chat") - user_in = gr.Textbox(label="Message", placeholder="Example: IBR900", lines=1) - send = gr.Button("Send") - clear = gr.Button("Clear") - - with gr.Column(scale=2): - gr.Markdown("### Replacement device details") - table = gr.Dataframe( - headers=TABLE_HEADERS, - value=[], - interactive=False, - ) - - links_md = gr.Markdown("") - - gr.Markdown("### Customer-ready email") - summary = gr.Textbox( - value="", - lines=14, - label="", - interactive=False, - show_copy_button=True, - ) - - def _on_send(msg: str, hist: List[List[str]], st_json: str): - hist2, st2, rows, summary_txt, links_txt = handle_user_message(msg, hist or [], st_json) - return "", hist2, st2, rows, summary_txt, links_txt - - send.click(_on_send, inputs=[user_in, chatbot, st], outputs=[user_in, chatbot, st, table, summary, links_md]) - user_in.submit(_on_send, inputs=[user_in, chatbot, st], outputs=[user_in, chatbot, st, table, summary, links_md]) + context = { + "recommended_5g": repl5, + "recommended_4g": repl4 if repl4 and repl4.lower() != "not applicable" else "", + "manufacturer_link_5g": url5, + "known_5g_features": feat5, + "antenna_stationary": ant.get("stationary_omni", {}), + "antenna_vehicle": ant.get("vehicle_omni", {}), + } - def _on_clear(): - return [], state_to_json(state_default()), [], "", "" + user = "Context:\n" + json.dumps(context, ensure_ascii=False) + "\n\nQuestion:\n" + q - clear.click(_on_clear, inputs=[], outputs=[chatbot, st, table, summary, links_md]) + ans = gpt_answer_md(sys, user, max_tokens=650) + # Small safety fallback + return ans if ans else "I couldn't generate an answer right now. Try again." - return demo +# ============================ +# UI +# ============================ -demo = build_demo() +# ============================ +# Chat helpers +# ============================ +def _df_to_md(df: pd.DataFrame) -> str: + if df is None or (hasattr(df, "empty") and df.empty): + return "" + try: + return df.to_markdown(index=False) + except Exception: + cols = list(df.columns) + lines = ["| " + " | ".join(cols) + " |", "| " + " | ".join(["---"]*len(cols)) + " |"] + for _, r in df.iterrows(): + lines.append("| " + " | ".join([str(r.get(c,"")) for c in cols]) + " |") + return "\n".join(lines) + +def _extract_device_terms(msg: str) -> List[str]: + raw = [x.strip() for x in re.split(r"[\n,;]+", str(msg or "")) if x.strip()] + out=[] + for x in raw: + if re.search(r"\d", x) or re.search(r"\b(IBR|AER|WR|XR|IR|RUT|MBR|E\d{3}|R\d{3})\b", x, flags=re.IGNORECASE): + out.append(x) + return out -if __name__ == "__main__": - demo.queue() - demo.launch( - server_name="0.0.0.0", - server_port=int(os.getenv("PORT", "7860")), - share=False, - show_api=False, - ) +def _looks_like_yes(msg: str) -> bool: + return str(msg or "").strip().lower() in {"yes","y","yeah","yep","sure","ok","okay"} + +def _parse_install_mode(msg: str) -> Tuple[Optional[str], Optional[str]]: + t = str(msg or "").strip().lower() + mode = None + detail = None + if "vehicle" in t or "mobile" in t: + mode = "vehicle" + if "stationary" in t or "fixed" in t or "site" in t: + mode = "stationary" + if "indoor" in t: + detail = "indoor" + if "outdoor" in t: + detail = "outdoor" + if "directional" in t: + detail = "directional" + return mode, detail + +def _antenna_for_mode(repl5: str, canon_make: str, mode: str, detail: Optional[str]) -> Dict[str, Any]: + mimo = "4x4" # rule: all 5G = 4x4 + tech = "5G" + if mode == "vehicle": + return antenna_options_for(repl5, tech, mimo).get("vehicle_omni", {}) + if detail == "directional": + return antenna_options_for(repl5 + " directional", tech, mimo).get("stationary_omni", {}) + if detail == "indoor": + return antenna_options_for(repl5 + " indoor", tech, mimo).get("stationary_omni", {}) + return antenna_options_for(repl5, tech, mimo).get("stationary_omni", {}) + +def _make_case_key(s: str) -> str: + s = str(s or "").strip() + return re.sub(r"\s+", " ", s)[:80] + +with gr.Blocks(title="Only-Routers") as demo: + gr.Markdown("## Only-Routers\nChat mode for Verizon reps (multiple devices per message) + Batch tab.") + + state = gr.State("{}") + + with gr.Tabs(): + with gr.Tab("Chat"): + chatbot = gr.Chatbot(label="Only-Routers Chat", height=520, type="tuple") + msg = gr.Textbox(label="Message", placeholder="Example: IBR650B, WR21\nVehicle install", lines=2) + send = gr.Button("Send", variant="primary") + + def chat_fn(user_msg, history, st_json): + st = state_load(st_json) + st.setdefault("cases", {}) + st.setdefault("last_case_keys", []) + st.setdefault("pending", {}) + st.setdefault("awaiting_questions", False) + + text = (user_msg or "").strip() + if not text: + return history, state_dump(st) + + # Pending pick (A/B) + if st.get("pending", {}).get("type") == "pick": + pend = st["pending"] + opts = pend.get("options", []) + choice = text.strip().lower() + idx = None + if choice in {"a","1","option a"} and len(opts) >= 1: + idx = 0 + elif choice in {"b","2","option b"} and len(opts) >= 2: + idx = 1 + if idx is None: + for i,o in enumerate(opts): + if str(o.get("label","")).lower() in choice: + idx = i + break + if idx is None: + history.append((text, "Please reply with **A** or **B**.")) + return history, state_dump(st) + + chosen_row = int(opts[idx]["row_idx"]) + life_row = df_eos.iloc[chosen_row] + eos, eol, status = row_to_dates_and_status(life_row) + repl = pick_replacements_lifecycle(life_row, status, use_gpt=True) + canon_make = str(life_row.get("_canon_make","UNKNOWN")) + + feat_df = build_replacement_features_table(repl.get("repl_4g",""), repl.get("repl_5g",""), canon_make) + fit_df = build_fit_table(repl.get("repl_4g",""), repl.get("repl_5g",""), canon_make) + + url4 = _best_effort_manufacturer_url(repl.get("repl_4g",""), canon_make) if repl.get("repl_4g","") not in {"Not applicable",""} else "" + url5 = _best_effort_manufacturer_url(repl.get("repl_5g",""), canon_make) if repl.get("repl_5g","") not in {"Not listed",""} else "" + + case_key = _make_case_key(str(life_row.get("sku","")) or pend.get("raw","")) + st["cases"][case_key] = {"row_idx": chosen_row, "repl": repl, "canon_make": canon_make, "eos": eos, "eol": eol, "status": status, "urls": {"4g": url4, "5g": url5}} + st["last_case_keys"].append(case_key) + st["pending"] = {"type": "install_mode", "case_keys": [case_key]} + + bot = [] + bot.append(f"**{case_key}**") + bot.append(f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**") + bot.append(f"- 4G alternative: **{repl.get('repl_4g','Not applicable')}**") + bot.append(f"- 5G replacement: **{repl.get('repl_5g','Not listed')}**") + if url4: + bot.append(f"- 4G manufacturer page: {url4}") + if url5: + bot.append(f"- 5G manufacturer page: {url5}") + bot.append("\n**Replacement features**\n" + _df_to_md(feat_df)) + bot.append("\n**Verizon fit**\n" + _df_to_md(fit_df)) + bot.append("\nFor antennas: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.") + bot.append("\nAny questions about the suggested device(s)?") + history.append((text, "\n".join(bot))) + st["awaiting_questions"] = True + return history, state_dump(st) + + # Pending install mode + if st.get("pending", {}).get("type") == "install_mode": + mode, detail = _parse_install_mode(text) + if mode is None: + history.append((text, "Quick one: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.")) + return history, state_dump(st) + + case_keys = st["pending"].get("case_keys", []) or st.get("last_case_keys", []) + updates=[] + for ck in case_keys: + case = st["cases"].get(ck, {}) + repl5 = (case.get("repl", {}) or {}).get("repl_5g","") + canon_make = case.get("canon_make","UNKNOWN") + ant = _antenna_for_mode(repl5, canon_make, mode, detail) + case.setdefault("antennas", {}) + case["antennas"][f"{mode}:{detail or ''}"] = ant + st["cases"][ck] = case + updates.append(f"**{ck}** antenna ({mode}{' / '+detail if detail else ''}): {ant.get('name','')} (PN {ant.get('part_number','')})") + + st["pending"] = {} + history.append((text, "\n".join(updates))) + return history, state_dump(st) + + # If user says yes to questions + if st.get("awaiting_questions") and _looks_like_yes(text): + history.append((text, "Ask away — what do you want to know about the suggested device(s)?")) + return history, state_dump(st) + + # Device lookup + device_terms = _extract_device_terms(text) + if device_terms: + bots=[] + new_case_keys=[] + for term in device_terms: + res = resolve_device(term) + if res.get("mode") == "pick": + st["pending"] = {"type":"pick", "options": res.get("options", []), "raw": term} + opts = res.get("options", []) + bot = "I found more than one close match. Reply **A** or **B**:\n" + for i,o in enumerate(opts): + bot += f"- **{'A' if i==0 else 'B'}**: {o.get('label','')}\n" + history.append((text, bot.strip())) + return history, state_dump(st) + if res.get("mode") != "ok": + bots.append(f"**{term}**: not found in lifecycle list. Who makes it (manufacturer) and what's the exact model/SKU?") + continue + + life_row = df_eos.iloc[int(res["row_idx"])] + eos, eol, status = row_to_dates_and_status(life_row) + repl = pick_replacements_lifecycle(life_row, status, use_gpt=True) + canon_make = str(life_row.get("_canon_make","UNKNOWN")) + + feat_df = build_replacement_features_table(repl.get("repl_4g",""), repl.get("repl_5g",""), canon_make) + fit_df = build_fit_table(repl.get("repl_4g",""), repl.get("repl_5g",""), canon_make) + + url4 = _best_effort_manufacturer_url(repl.get("repl_4g",""), canon_make) if repl.get("repl_4g","") not in {"Not applicable",""} else "" + url5 = _best_effort_manufacturer_url(repl.get("repl_5g",""), canon_make) if repl.get("repl_5g","") not in {"Not listed",""} else "" + + ck = _make_case_key(str(life_row.get("sku","")) or term) + st["cases"][ck] = {"row_idx": int(res["row_idx"]), "repl": repl, "canon_make": canon_make, "eos": eos, "eol": eol, "status": status, "urls": {"4g": url4, "5g": url5}} + st["last_case_keys"].append(ck) + new_case_keys.append(ck) + + bot=[] + bot.append(f"**{ck}**") + bot.append(f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**") + bot.append(f"- 4G alternative: **{repl.get('repl_4g','Not applicable')}**") + bot.append(f"- 5G replacement: **{repl.get('repl_5g','Not listed')}**") + if url4: + bot.append(f"- 4G manufacturer page: {url4}") + if url5: + bot.append(f"- 5G manufacturer page: {url5}") + bot.append("\n**Replacement features**\n" + _df_to_md(feat_df)) + bot.append("\n**Verizon fit**\n" + _df_to_md(fit_df)) + bots.append("\n".join(bot)) + + if new_case_keys: + st["pending"] = {"type":"install_mode", "case_keys": new_case_keys} + bots.append("\nFor antennas: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.") + bots.append("Any questions about the suggested device(s)?") + st["awaiting_questions"] = True + + history.append((text, "\n\n---\n\n".join(bots))) + return history, state_dump(st) + + # Treat as question about most recent case + last_keys = st.get("last_case_keys", []) + if not last_keys: + history.append((text, "Tell me the router model/SKU you’re working with (you can paste multiple).")) + return history, state_dump(st) + + ck = last_keys[-1] + case = st["cases"].get(ck, {}) + mini = {"row_idx": case.get("row_idx"), "repl": case.get("repl", {}), "ant": case.get("antennas", {})} + ans = answer_question(text, state_dump(mini)) + history.append((text, ans)) + return history, state_dump(st) + + send.click(fn=chat_fn, inputs=[msg, chatbot, state], outputs=[chatbot, state], api_name=False) + + with gr.Tab("Batch"): + gr.Markdown("Paste one per line or upload a CSV (first column). Batch runs fast (no GPT).") + batch_text = gr.Textbox(label="Paste devices (one per line)", lines=8, placeholder="WR21\nRUT240\nIBR650B") + batch_file = gr.File(label="Upload CSV", file_types=[".csv"]) + include_ant = gr.Checkbox(label="Include antenna picks (slower)", value=False) + run_btn = gr.Button("Run batch", variant="primary") + + summary_md = gr.Markdown() + rollup_md = gr.Markdown() + table = gr.Dataframe(interactive=False, wrap=True) + dl = gr.File(label="Download results CSV") + + run_btn.click(fn=run_batch, inputs=[batch_text, batch_file, include_ant], outputs=[summary_md, table, dl, rollup_md], api_name=False) + +demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT","7860")), share=False, show_api=False)