Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import math | |
| import time | |
| import hashlib | |
| import tempfile | |
| from dataclasses import dataclass | |
| from datetime import datetime, date | |
| from functools import lru_cache | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import fitz # PyMuPDF | |
| import faiss | |
| from sentence_transformers import SentenceTransformer | |
| from rapidfuzz import fuzz, process | |
| import gradio as gr | |
| from openai import OpenAI | |
| # ============================================================ | |
| # Only-Routers (Chat, production-lean) | |
| # - Fast model by default (no reasoning payload) | |
| # - One LLM call max per lookup (enrichment only, cached) | |
| # - No HTTP crawling during normal lookup (links are deterministic) | |
| # - Timing logs to HF console when DEBUG_TIMING=1 | |
| # ============================================================ | |
| # ---------------------------- | |
| # Settings | |
| # ---------------------------- | |
def _env_text(name: str, default: str) -> str:
    """Return environment variable *name* (or *default*) with outer whitespace removed."""
    return os.getenv(name, default).strip()


# Reference date that lifecycle status is computed against.
# NOTE(review): this is a fixed calendar date, not the wall clock.
TODAY = date(2026, 1, 18)

# Fast default model (override via env)
OPENAI_MODEL = _env_text("OPENAI_MODEL", "gpt-5.2")

# Disable LLM at runtime: OPENAI_DISABLE=1
OPENAI_DISABLE = _env_text("OPENAI_DISABLE", "0") == "1"

# Timing logs
DEBUG_TIMING = _env_text("DEBUG_TIMING", "0") == "1"

# Matching thresholds (fuzzy scores on a 0-100 scale)
MATCH_OK = 82
MATCH_AUTOPICK = 95
MATCH_GAP = 8

# Embeddings
EMBED_MODEL_NAME = _env_text("EMBED_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2")

# Parsec PDF slicing: characters kept before/after each "Standard SKU:" marker
PARSEC_CONTEXT_BEFORE = 900
PARSEC_CONTEXT_AFTER = 1600

# ----------------------------
# OpenAI client
# ----------------------------
API_KEY = _env_text("OPENAI_API_KEY", "")
# No key, or LLM explicitly disabled -> run without a client.
if API_KEY and not OPENAI_DISABLE:
    client = OpenAI(api_key=API_KEY)
else:
    client = None
| # ---------------------------- | |
| # Timing helper | |
| # ---------------------------- | |
def _tlog(label: str, t0: float) -> None:
    """Print the seconds elapsed since *t0* under *label*; silent unless DEBUG_TIMING is on."""
    if not DEBUG_TIMING:
        return
    elapsed = time.perf_counter() - t0
    print(f"[TIMER] {label}: {elapsed:.2f}s")
| # ---------------------------- | |
| # JSON-safe helpers | |
| # ---------------------------- | |
| def _json_load_safe(s: str) -> Dict[str, Any]: | |
| try: | |
| return json.loads(s) | |
| except Exception: | |
| return {} | |
| def _json_dump_safe(obj: Any) -> str: | |
| try: | |
| return json.dumps(obj, ensure_ascii=False) | |
| except Exception: | |
| return "{}" | |
| # ---------------------------- | |
| # Gradio state helpers (string JSON only) | |
| # ---------------------------- | |
def state_load(st_json: str) -> Dict[str, Any]:
    """Decode the JSON string held in Gradio state into a dict.

    Anything that is not a non-empty string containing a JSON object
    (empty input, invalid JSON, a JSON list/number/...) yields ``{}``, so
    the caller can safely use dict methods such as ``setdefault``.
    """
    if not isinstance(st_json, str) or not st_json:
        return {}
    try:
        loaded = json.loads(st_json)
    except (ValueError, RecursionError):
        return {}
    return loaded if isinstance(loaded, dict) else {}
def state_dump(st: Dict[str, Any]) -> str:
    """Encode the state dict as a JSON string; a falsy state is stored as an empty object."""
    payload = st if st else {}
    return _json_dump_safe(payload)
| # ---------------------------- | |
| # Normalization | |
| # ---------------------------- | |
def norm_text(x: Any) -> str:
    """Normalize *x* for matching: lower-case, keep only a-z, 0-9, whitespace, '-' and '/'.

    None / NaN / pandas-missing values become "". Runs of whitespace are
    collapsed to a single space and the result is trimmed.
    """
    try:
        # bool() keeps the truthiness evaluation inside the try, so values for
        # which pd.isna returns an ambiguous array are treated as not-missing.
        is_missing = bool(
            x is None or (isinstance(x, float) and math.isnan(x)) or pd.isna(x)
        )
    except Exception:
        is_missing = False
    if is_missing:
        return ""

    lowered = str(x).strip().lower()
    filtered = re.sub(r"[^a-z0-9\s\-\/]", " ", lowered)
    return re.sub(r"\s+", " ", filtered).strip()
def safe_str(x: Any) -> str:
    """Return *x* as a stripped string, mapping None / NaN / NA / NaT to "".

    Non-scalar values (lists, arrays) for which ``pd.isna`` has no single
    truth value are stringified instead of raising.
    """
    if x is None:
        return ""
    try:
        if bool(pd.isna(x)):
            return ""
    except (TypeError, ValueError):
        # pd.isna returned an array whose truth value is ambiguous:
        # treat the value as present and fall through to str().
        pass
    return str(x).strip()
def is_5g_text(s: str) -> bool:
    """Return True when the normalized text mentions 5G or the standalone token "nr".

    "nr" is matched as a whole word so that unrelated words which merely
    contain those letters (e.g. "unrestricted") are not classified as 5G.
    """
    t = norm_text(s)
    return "5g" in t or re.search(r"\bnr\b", t) is not None
def is_4g_lte_family(row: pd.Series) -> bool:
    """Return True when the row's description/notes indicate a 4G/LTE device.

    Any 5G / NR mention wins and yields False. Otherwise LTE, 4G, or an LTE
    category marker ("cat", "cat 4", "cat-m1", "cat6", ...) yields True.
    Tokens are matched on word boundaries so words that merely contain the
    letters (e.g. "location", "filter", "unrestricted") do not count.
    """
    t = norm_text(row.get("description", "")) + " " + norm_text(row.get("notes", ""))
    if "5g" in t or re.search(r"\bnr\b", t):
        return False
    if "4g" in t or re.search(r"\blte", t):
        return True
    # Treat LTE categories as 4G: bare "cat" or "cat" + optional "-" + m1/m2/number.
    return re.search(r"\bcat(\s*-?\s*(m1|m2|\d{1,2}))?\b", t) is not None
| # ---------------------------- | |
| # Lifecycle CSV normalization | |
| # ---------------------------- | |
| def _normalize_lifecycle_df(df: pd.DataFrame) -> pd.DataFrame: | |
| df = df.copy() | |
| lower_cols = {c.lower(): c for c in df.columns} | |
| def _pick(*names): | |
| for n in names: | |
| if n.lower() in lower_cols: | |
| return lower_cols[n.lower()] | |
| return None | |
| col_map = {} | |
| sku_col = _pick("sku", "SKU") | |
| if sku_col: | |
| col_map[sku_col] = "sku" | |
| mfr_col = _pick("manufacturer", "Manufacturer") | |
| if mfr_col: | |
| col_map[mfr_col] = "manufacturer" | |
| dt_col = _pick("device type", "Device Type", "device_type") | |
| if dt_col: | |
| col_map[dt_col] = "device_type" | |
| eos_col = _pick("end_of_sale", "end of sale", "End of Sale", "eos") | |
| if eos_col: | |
| col_map[eos_col] = "end_of_sale" | |
| eol_col = _pick("end_of_life", "end of life", "End of Life", "eol") | |
| if eol_col: | |
| col_map[eol_col] = "end_of_life" | |
| sr_col = _pick("suggested_replacement", "Suggested Replacement") | |
| if sr_col: | |
| col_map[sr_col] = "suggested_replacement" | |
| a5_col = _pick("advanced_5g_option", "Advanced 5G Option", "advanced 5g option") | |
| if a5_col: | |
| col_map[a5_col] = "advanced_5g_option" | |
| df = df.rename(columns=col_map) | |
| for req in ["sku", "manufacturer", "device_type", "end_of_sale", "end_of_life", "suggested_replacement", "advanced_5g_option"]: | |
| if req not in df.columns: | |
| df[req] = "" | |
| # Compatibility fields used by matching/output | |
| if "description" not in df.columns: | |
| df["description"] = df["sku"].astype(str) | |
| if "notes" not in df.columns: | |
| df["notes"] = "" | |
| if "region" not in df.columns: | |
| df["region"] = "" | |
| return df | |
| # ---------------------------- | |
| # Maker mapping | |
| # ---------------------------- | |
# Canonical maker key -> lower-case substrings that identify that maker.
# Order matters: earlier keys win, so CISCO_MERAKI is tested before CISCO.
CANON_MAKER = {
    "CRADLEPOINT": {"cradlepoint", "ericsson", "ericsson enterprise wireless"},
    "SIERRA": {"sierra", "sierra wireless", "semtech", "airlink"},
    "FEENEY": {"feeney", "feeney wireless", "inseego"},
    "DIGI": {"digi", "accelerated", "accelerated concepts"},
    "CISCO_MERAKI": {"meraki", "cisco meraki"},
    "CISCO": {"cisco"},
    "TELTONIKA": {"teltonika"},
}


def canon_maker_from_text(s: Any) -> str:
    """Map free-text manufacturer *s* to a CANON_MAKER key, or "UNKNOWN" if no term occurs in it."""
    normalized = norm_text(s)
    matches = (
        canon
        for canon, terms in CANON_MAKER.items()
        if any(term in normalized for term in terms)
    )
    return next(matches, "UNKNOWN")
| # ---------------------------- | |
| # Date parsing | |
| # ---------------------------- | |
@dataclass
class ParsedDate:
    """Result of parsing one lifecycle date cell.

    Without ``@dataclass`` the keyword construction used throughout this
    module (``ParsedDate(raw=..., kind=..., value=...)``) raises TypeError.
    """

    # Text to display: ISO form for slash dates, otherwise the original text.
    raw: str
    # One of "missing", "bad", "full", "year", "year_month".
    kind: str
    # Concrete date used for status comparison; None when missing or unparseable.
    value: Optional[date]
def parse_date_field(x: Any) -> ParsedDate:
    """Parse a lifecycle date cell into a ParsedDate.

    Accepted layouts:
      * ``M/D/YY`` or ``M/D/YYYY`` -> kind "full" (two-digit years are 20YY),
        ``raw`` is rewritten to ISO ``YYYY-MM-DD``.
      * ``YYYY`` -> kind "year"; a past or current year resolves to Jan 1,
        a future year to Dec 31 (relative to TODAY).
      * ``YYYY-MM`` -> kind "year_month", resolved to the 1st of the month.
      * ``YYYY-MM-DD`` -> kind "full".
    Empty input gives kind "missing"; anything else, including values that
    match a layout but are not real dates (e.g. "0000", "2024-13"), gives
    kind "bad" with the original text kept in ``raw``.
    """
    raw = safe_str(x)
    if not raw:
        return ParsedDate(raw="", kind="missing", value=None)

    try:
        if re.fullmatch(r"\d{1,2}/\d{1,2}/\d{2,4}", raw):
            m, d, y = (int(part) for part in raw.split("/"))
            if y < 100:
                y += 2000
            dt = date(y, m, d)
            return ParsedDate(raw=f"{y:04d}-{m:02d}-{d:02d}", kind="full", value=dt)

        if re.fullmatch(r"\d{4}", raw):
            y = int(raw)
            month, day = (1, 1) if y <= TODAY.year else (12, 31)
            # date() rejects year 0, so this must sit inside the try as well.
            return ParsedDate(raw=raw, kind="year", value=date(y, month, day))

        if re.fullmatch(r"\d{4}-\d{2}", raw):
            y_txt, m_txt = raw.split("-")
            return ParsedDate(raw=raw, kind="year_month", value=date(int(y_txt), int(m_txt), 1))

        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", raw):
            dt = datetime.strptime(raw, "%Y-%m-%d").date()
            return ParsedDate(raw=raw, kind="full", value=dt)
    except ValueError:
        # Matched a layout but is not a valid calendar date.
        pass

    return ParsedDate(raw=raw, kind="bad", value=None)
def display_date(pd_: ParsedDate) -> str:
    """Return the text shown for a parsed date; "Not listed" when there is nothing to show."""
    # For these kinds an empty display text is replaced by the placeholder.
    placeholder_text = {"missing": "", "bad": pd_.raw}
    if pd_.kind in placeholder_text:
        return placeholder_text[pd_.kind] or "Not listed"
    return pd_.raw
def status_from_eos_eol(eos: ParsedDate, eol: ParsedDate) -> str:
    """Derive the lifecycle status from end-of-sale / end-of-life dates relative to TODAY.

    Returns "Unknown" when neither date resolved, "End of Life" when the EOL
    date is on or before TODAY, "End of Sale" when only the EOS date is, and
    "Active" otherwise.
    """
    sale_day, life_day = eos.value, eol.value

    def reached(day: Optional[date]) -> bool:
        return day is not None and day <= TODAY

    if sale_day is None and life_day is None:
        return "Unknown"
    # End of life takes precedence over end of sale.
    if reached(life_day):
        return "End of Life"
    if reached(sale_day):
        return "End of Sale"
    return "Active"
def row_to_dates_and_status(row: pd.Series) -> Tuple[str, str, str]:
    """Return (EOS display text, EOL display text, status) for one lifecycle row."""
    parsed = {
        field: parse_date_field(row.get(field))
        for field in ("end_of_sale", "end_of_life")
    }
    sale, life = parsed["end_of_sale"], parsed["end_of_life"]
    return display_date(sale), display_date(life), status_from_eos_eol(sale, life)
| # ---------------------------- | |
| # Files | |
| # ---------------------------- | |
EOS_PATH = "routers_eos_eol_by_sku.csv"
DEC_PATH = "dec2025routers.csv"
PARSEC_PDF = "ParsecCatalog.pdf"

# Fail fast at import time if any bundled data file is absent.
for _required_path in (EOS_PATH, DEC_PATH, PARSEC_PDF):
    if not os.path.exists(_required_path):
        raise FileNotFoundError(f"Missing {_required_path} in repo.")

t0 = time.perf_counter()
df_eos = pd.read_csv(EOS_PATH).copy()
df_dec = pd.read_csv(DEC_PATH).copy()
df_eos = _normalize_lifecycle_df(df_eos)

# Canon columns: derived, normalized copies used for matching.
for _derived, _source, _transform in (
    ("_canon_make", "manufacturer", canon_maker_from_text),
    ("_norm_sku", "sku", norm_text),
    ("_norm_desc", "description", norm_text),
    ("_norm_notes", "notes", norm_text),
):
    df_eos[_derived] = df_eos[_source].apply(_transform)

# The dec sheet may lack any of these columns; fall back to a constant then.
if "Make" in df_dec.columns:
    df_dec["_canon_make"] = df_dec["Make"].apply(canon_maker_from_text)
else:
    df_dec["_canon_make"] = "UNKNOWN"

if "Model" in df_dec.columns:
    df_dec["_norm_model"] = df_dec["Model"].apply(norm_text)
else:
    df_dec["_norm_model"] = ""

if "Modem Type" in df_dec.columns:
    df_dec["_is5g"] = df_dec["Modem Type"].apply(lambda cell: is_5g_text(str(cell)))
else:
    df_dec["_is5g"] = False

_tlog("load csv", t0)
| # ---------------------------- | |
| # Build fuzzy corpus for device matching | |
| # ---------------------------- | |
def _label_for_row(i: int) -> str:
    """Build the human-readable "sku — manufacturer — description" label (max 220 chars) for row *i*."""
    rec = df_eos.iloc[i]
    fields = (rec.get('sku', ''), rec.get('manufacturer', ''), rec.get('description', ''))
    return " — ".join(str(field) for field in fields)[:220]


# Labels and search strings are index-aligned with df_eos row positions.
EOS_LABELS = list(map(_label_for_row, range(len(df_eos))))
EOS_CORPUS = [
    " ".join([
        rec.get("_norm_sku", ""),
        rec.get("_canon_make", ""),
        rec.get("_norm_desc", ""),
        rec.get("_norm_notes", ""),
    ])
    for _, rec in df_eos.iterrows()
]
def resolve_device(term: str) -> Dict[str, Any]:
    """Resolve a user-typed device term to a lifecycle row.

    Returns one of:
      * ``{"mode": "ok", "row_idx": int}`` - a unique exact SKU match, or a
        fuzzy match scoring >= MATCH_AUTOPICK that beats the runner-up by
        at least MATCH_GAP. ``row_idx`` is a row *position* (for ``iloc``).
      * ``{"mode": "pick", "options": [...]}`` - up to two candidates for
        the user to choose between.
      * ``{"mode": "not_found"}`` - empty term, or no candidate reaches
        MATCH_OK.
    """
    q = norm_text(term)
    if not q:
        return {"mode": "not_found"}

    # Positional indices, so the result is valid for df_eos.iloc regardless
    # of what the frame's index labels are.
    exact = np.flatnonzero((df_eos["_norm_sku"] == q).to_numpy())
    if len(exact) == 1:
        return {"mode": "ok", "row_idx": int(exact[0])}

    # score_cutoff drops weak matches; without it every non-empty query
    # yields candidates and "not_found" can never be reported.
    hits = process.extract(
        q, EOS_CORPUS, scorer=fuzz.WRatio, limit=6, score_cutoff=MATCH_OK
    )
    cands = [(int(idx), int(score), EOS_LABELS[int(idx)]) for _, score, idx in hits]
    if not cands:
        return {"mode": "not_found"}

    top = cands[0]
    clear_winner = len(cands) == 1 or (top[1] - cands[1][1]) >= MATCH_GAP
    if top[1] >= MATCH_AUTOPICK and clear_winner:
        return {"mode": "ok", "row_idx": top[0]}

    # Offer at most the two best candidates (answered as A / B by the user).
    opts = [{"row_idx": c[0], "label": c[2]} for c in cands[:2]]
    return {"mode": "pick", "options": opts}
| # ---------------------------- | |
| # Parsec RAG (FAISS) | |
| # ---------------------------- | |
t0 = time.perf_counter()
embedder = SentenceTransformer(EMBED_MODEL_NAME)


def extract_pdf_text_pages(path: str) -> List[str]:
    """Return the plain text of every page of the PDF at *path*, in page order.

    The document is opened in a ``with`` block so the file handle is
    released once the text has been extracted.
    """
    with fitz.open(path) as doc:
        return [page.get_text("text") for page in doc]
def build_parsec_cards(pages: List[str]) -> List[str]:
    """Cut one text "card" around every "Standard SKU:" marker found in *pages*.

    Each card spans PARSEC_CONTEXT_BEFORE characters before the marker to
    PARSEC_CONTEXT_AFTER characters after its start (clipped to the page),
    is stripped, and is kept only if at least 200 characters long.
    Duplicate cards are dropped, keeping the first occurrence.
    """
    marker = re.compile(r"Standard\s+SKU:")
    unique_cards: Dict[str, str] = {}  # sha1 digest -> card, in first-seen order
    for page_text in pages:
        page_len = len(page_text)
        for hit in marker.finditer(page_text):
            lo = max(0, hit.start() - PARSEC_CONTEXT_BEFORE)
            hi = min(page_len, hit.start() + PARSEC_CONTEXT_AFTER)
            card = page_text[lo:hi].strip()
            if len(card) < 200:
                continue
            digest = hashlib.sha1(card.encode("utf-8")).hexdigest()
            unique_cards.setdefault(digest, card)
    return list(unique_cards.values())
parsec_cards = build_parsec_cards(extract_pdf_text_pages(PARSEC_PDF))
if parsec_cards:
    parsec_emb = np.asarray(
        embedder.encode(
            parsec_cards,
            batch_size=64,
            show_progress_bar=False,
            normalize_embeddings=True,
        ),
        dtype=np.float32,
    )
else:
    # No cards extracted: encode([]) has no second axis, so build an empty
    # (0, dim) matrix to keep index construction from raising at import.
    print(f"[WARN] no Parsec cards extracted from {PARSEC_PDF}; antenna lookup will return no matches")
    parsec_emb = np.zeros(
        (0, embedder.get_sentence_embedding_dimension()), dtype=np.float32
    )

# Inner-product index over normalized embeddings.
parsec_index = faiss.IndexFlatIP(parsec_emb.shape[1])
if len(parsec_emb):
    parsec_index.add(parsec_emb)
_tlog("parsec index", t0)
| PARSEC_FAMILY_WORDS = {"chinook","labrador","boxer","bloodhound","husky","beagle","mastiff","collie","shepherd","belgian","australian","terrier","pyrenees"} | |
| def _parsec_name_from_card(card_text: str) -> str: | |
| low = card_text.lower() | |
| for fam in PARSEC_FAMILY_WORDS: | |
| if fam in low: | |
| return fam.capitalize() | |
| return "Parsec antenna" | |
| def _parsec_part_from_card(t: str) -> str: | |
| m = re.search(r"Standard\s+SKU:\s*([A-Z0-9]+)", t) | |
| return m.group(1).strip() if m else "" | |
| def _parsec_desc_from_card(t: str) -> str: | |
| m = re.search(r"Description:\s*(.+?)(?:\n|$)", t, flags=re.IGNORECASE) | |
| return re.sub(r"\s+"," ",m.group(1).strip())[:220] if m else "" | |
| def _parsec_connectors_from_card(t: str) -> str: | |
| m = re.search(r"Standard\s+Connectors:\s*(.+)", t, flags=re.IGNORECASE) | |
| return re.sub(r"\s+"," ",m.group(1).strip())[:80] if m else "" | |
def parsec_retrieve(query: str, top_k: int = 8) -> List[Dict[str, Any]]:
    """Search the Parsec card index for *query* and return up to *top_k* summarized hits.

    Each hit is a dict with ``score``, ``name``, ``part_number``,
    ``description`` and ``connectors``, in the order the index returned them.
    """
    query_vec = np.asarray(
        embedder.encode([query], normalize_embeddings=True), dtype=np.float32
    )
    scores, ids = parsec_index.search(query_vec, top_k)

    results: List[Dict[str, Any]] = []
    for score, raw_id in zip(scores[0].tolist(), ids[0].tolist()):
        pos = int(raw_id)
        # Skip ids that do not point at a known card.
        if pos < 0 or pos >= len(parsec_cards):
            continue
        card = parsec_cards[pos]
        results.append({
            "score": float(score),
            "name": _parsec_name_from_card(card),
            "part_number": _parsec_part_from_card(card),
            "description": _parsec_desc_from_card(card),
            "connectors": _parsec_connectors_from_card(card),
        })
    return results
def antenna_pick(repl5: str, mode: str, detail: Optional[str]) -> Dict[str, Any]:
    """Pick the best-matching Parsec antenna for the 5G replacement *repl5*.

    ``mode == "vehicle"`` always selects the vehicle query; otherwise
    *detail* chooses directional / indoor, with outdoor omni as the default.
    Returns the top retrieval hit (or a blank placeholder) extended with
    ``mimo`` and ``why``.
    """
    mimo = "4x4"  # rule: all 5G -> 4x4
    tech = "5G"

    if mode == "vehicle":
        keywords, why = "omni vehicle mobile magnetic through-bolt", "Vehicle omni best match."
    elif detail == "directional":
        keywords, why = "directional fixed site", "Stationary directional best match."
    elif detail == "indoor":
        keywords, why = "omni indoor", "Stationary indoor omni best match."
    else:
        keywords, why = "omni outdoor pole wall fixed site", "Stationary outdoor omni best match."

    hits = parsec_retrieve(f"{repl5} {tech} {mimo} {keywords}", top_k=8)
    if hits:
        best = hits[0]
    else:
        best = {"name": "Parsec antenna", "part_number": "", "description": "", "connectors": ""}
    best.update({"mimo": mimo, "why": why})
    return best
| # ---------------------------- | |
| # Replacement selection (lifecycle-first) | |
| # ---------------------------- | |
# Model-token patterns, most specific first; compiled once at import.
# (A separate RUTM pattern is unnecessary: RUT[A-Z]?\d{2,4} already covers it.)
_MODEL_TOKEN_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r"\bRUT[A-Z]?\d{2,4}\b",
        r"\bIX\d{2}\b",
        r"\b(R\d{3,4}|E\d{3,4}|S\d{3,4})\b",
        r"\b[A-Z]{1,6}\d{2,4}[A-Z]?\b",
    )
)


def extract_model_token(text: str) -> str:
    """Extract a short model token (e.g. "RUTX50", "IX20", "E300") from free text.

    The text is split on "|" and segments are examined from last to first.
    Within a segment the patterns are tried from most to least specific and
    the first match (upper-cased) is returned. If nothing matches, the first
    examined segment is returned, truncated to 60 characters. Empty or
    missing input yields "".
    """
    s = safe_str(text)
    if not s:
        return ""
    parts = [p.strip() for p in s.split("|") if p.strip()]
    candidates = parts[::-1] if parts else [s]
    for cand in candidates:
        upper = cand.upper()
        for pattern in _MODEL_TOKEN_PATTERNS:
            found = pattern.search(upper)
            if found:
                return found.group(0)
    return candidates[0][:60]
def pick_replacements(row: pd.Series, status: str) -> Dict[str, str]:
    """Choose the 4G alternative and 5G replacement model for a lifecycle row.

    Values come from the row's ``suggested_replacement`` /
    ``advanced_5g_option`` cells. When no 5G option is listed, the first 5G
    model from the dec sheet for the same canonical maker is used instead.
    *status* is accepted for interface compatibility and is not consulted.
    """
    suggested = safe_str(row.get("suggested_replacement", ""))
    advanced = safe_str(row.get("advanced_5g_option", ""))

    repl_4g = extract_model_token(suggested) if suggested else "Not applicable"
    repl_5g = extract_model_token(advanced) if advanced else "Not listed"

    # Always provide some 5G answer: if lifecycle missing, pick top 5G from dec (same maker)
    if repl_5g in {"", "Not listed"}:
        maker = str(row.get("_canon_make", "UNKNOWN"))
        same_maker_5g = df_dec[(df_dec["_canon_make"] == maker) & (df_dec["_is5g"] == True)]
        if same_maker_5g.empty:
            repl_5g = "Not listed"
        else:
            repl_5g = str(same_maker_5g.iloc[0]["Model"]).strip()

    return {
        "repl_4g": repl_4g or "Not applicable",
        "repl_5g": repl_5g or "Not listed",
    }
| # ---------------------------- | |
| # Features + Fit (dec first, single LLM enrichment call if needed) | |
| # ---------------------------- | |
FEATURE_COLS = ["Device", "Modem technology", "WiFi", "Ports", "Antennas", "Ruggedness", "Use case"]
FIT_COLS = ["Device", "Fit badges", "Ethernet ports", "Battery"]


def _features_from_dec(model: str, canon_make: str) -> Dict[str, str]:
    """Look up feature fields for *model* in the dec sheet (same canonical maker).

    The model is fuzzy-matched against the maker's rows; a score below
    MATCH_OK, an unknown maker, or a placeholder model yields "Not listed"
    for every field. Missing/NaN cells are reported as "Not listed" rather
    than the literal text "nan".
    """
    blank = {k: "Not listed" for k in FEATURE_COLS[1:]}
    if not model or model in {"Not listed", "Not applicable"}:
        return blank
    pool = df_dec[df_dec["_canon_make"] == canon_make]
    if pool.empty:
        return blank
    hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio)
    if not hit or hit[1] < MATCH_OK:
        return blank
    r = pool.iloc[int(hit[2])]

    def cell(col: str) -> str:
        """Return the row's text for *col*, or "" when absent / None / NaN."""
        v = r.get(col, "")
        try:
            if v is None or bool(pd.isna(v)):
                return ""
        except (TypeError, ValueError):
            pass
        return str(v).strip()

    wan = cell("WAN ports and speed")
    lan = cell("LAN ports and speed")
    if wan or lan:
        ports = f"WAN: {wan or 'Not listed'} | LAN: {lan or 'Not listed'}"
    else:
        # Both unknown: report the whole field as missing.
        ports = "Not listed"

    return {
        "Modem technology": cell("Modem Type") or "Not listed",
        "WiFi": cell("WiFi type") or "Not listed",
        "Ports": ports,
        "Antennas": cell("Antennas (internal/external/both)") or "Not listed",
        "Ruggedness": cell("Ruggedization") or "Not listed",
        "Use case": cell("Primary use case") or "Not listed",
    }
def _fit_from_dec(model: str, canon_make: str, is5: bool) -> Dict[str, str]:
    """Derive fit badges and battery info for *model* from the dec sheet.

    Returns a dict with "Fit badges" (comma-separated, or "Not listed"),
    "Ethernet ports" (always "Not listed" here) and "Battery"
    ("Yes" / "No" / "Not listed"). 5G devices always get the "4x4 MIMO"
    badge. Missing/NaN cells are treated as empty, so they no longer
    produce a Wi‑Fi badge or a "Yes" battery.
    """
    badges = []
    eth = "Not listed"
    bat = "Not listed"
    if is5:
        badges.append("4x4 MIMO")

    def result() -> Dict[str, str]:
        # dict.fromkeys removes duplicate badges while keeping order.
        badges_csv = ", ".join(dict.fromkeys(badges)) if badges else "Not listed"
        return {"Fit badges": badges_csv, "Ethernet ports": eth, "Battery": bat}

    pool = df_dec[df_dec["_canon_make"] == canon_make]
    if pool.empty or not model or model in {"Not listed", "Not applicable"}:
        return result()
    hit = process.extractOne(norm_text(model), pool["_norm_model"].tolist(), scorer=fuzz.WRatio)
    if not hit or hit[1] < MATCH_OK:
        return result()
    r = pool.iloc[int(hit[2])]

    def cell(col: str) -> str:
        """Return the row's lower-cased text for *col*, or "" when absent / None / NaN."""
        v = r.get(col, "")
        try:
            if v is None or bool(pd.isna(v)):
                return ""
        except (TypeError, ValueError):
            pass
        return str(v).strip().lower()

    use_case = cell("Primary use case")
    rugged = cell("Ruggedization")
    wifi = cell("WiFi type")
    serial = cell("Serial port (yes/no)")
    battery = cell("Battery (internal/removable/none/optional)")
    notes_blob = " ".join([cell("Special notes"), cell("summary and use case")])

    if any(k in use_case for k in ["vehicle", "mobile", "fleet", "in-vehicle"]) or "vehicle" in rugged:
        badges.append("Vehicle")
    else:
        badges.append("Fixed site")
    if wifi and wifi not in {"none", "no", "n/a"}:
        badges.append("Wi‑Fi")
    if any(k in rugged for k in ["rugged", "industrial", "ip", "harsh"]):
        badges.append("Rugged")
    if "dual" in notes_blob and "sim" in notes_blob:
        badges.append("Dual‑SIM")
    if serial in {"yes", "y", "true"}:
        badges.append("Serial")
    if battery:
        bat = "No" if "none" in battery else "Yes"
    return result()
| # Enrichment cache (one call per (make, repl4, repl5)) | |
| _ENRICH_CACHE: Dict[str, Dict[str, Any]] = {} | |
| def _enrich_key(canon_make: str, repl4: str, repl5: str) -> str: | |
| return hashlib.sha1(f"{canon_make}|{repl4}|{repl5}".encode("utf-8")).hexdigest() | |
def gpt_enrich(repl4: str, repl5: str, canon_make: str, feat4: Dict[str,str], feat5: Dict[str,str], fit4: Dict[str,str], fit5: Dict[str,str]) -> Dict[str, Any]:
    """Fill missing feature/fit fields with at most one LLM call (cached per maker + models).

    Returns ``{"feat4", "feat5", "fit4", "fit5"}``. The inputs are returned
    unchanged when no client is configured, nothing is missing, or the LLM
    call fails (failures are not cached, so a later lookup can retry).
    Only fields that were actually missing are taken from the model reply;
    known values are never overwritten and unknown keys are ignored.
    The 5G fit always ends up carrying the "4x4 MIMO" badge.
    """
    base = {"feat4": feat4, "feat5": feat5, "fit4": fit4, "fit5": fit5}
    if client is None:
        return base

    key = _enrich_key(canon_make, repl4, repl5)
    if key in _ENRICH_CACHE:
        return _ENRICH_CACHE[key]

    def miss(d: Dict[str,str]) -> List[str]:
        """Return the keys of *d* whose value is empty, "nan" or "Not listed"."""
        return [
            k for k, v in d.items()
            if (not v) or str(v).strip().lower() in {"not listed", "nan", ""}
        ]

    missing = {name: miss(values) for name, values in base.items()}
    if not any(missing.values()):
        _ENRICH_CACHE[key] = base
        return base

    sys_prompt = (
        "You are helping a Verizon rep. Fill missing router feature fields and fit traits. Return strict JSON only. "
        "Keep values short. "
        "Fit badges must be chosen from: ['Vehicle','Fixed site','Wi‑Fi','Rugged','Dual‑SIM','4x4 MIMO','High throughput','Serial'] only. "
        "Rule: if a router is 5G, include '4x4 MIMO'. "
        "Ethernet ports must be a single integer as a string when possible; else 'Not listed'. "
        "Battery must be 'Yes', 'No', or 'Not listed'."
    )
    payload = {
        "maker_family": canon_make,
        "models": {"repl4": repl4, "repl5": repl5},
        "known": base,
        "missing": missing,
        "output_schema": {
            name: {k: "string" for k in keys} for name, keys in missing.items()
        },
    }

    t0 = time.perf_counter()
    try:
        resp = client.responses.create(
            model=OPENAI_MODEL,
            input=[{"role":"system","content":sys_prompt},{"role":"user","content":_json_dump_safe(payload)}],
            max_output_tokens=420,
        )
    except Exception as exc:
        # Enrichment is optional: keep the lookup working on API/network errors.
        print(f"[WARN] llm enrich failed: {exc}")
        return base
    _tlog("llm enrich", t0)

    out = _json_load_safe(getattr(resp, "output_text", "") or "")
    if not isinstance(out, dict):
        # Valid JSON that is not an object (e.g. a list) carries no usable patch.
        out = {}

    def merge(known: Dict[str,str], patch: Any, allowed: List[str]) -> Dict[str,str]:
        """Copy *known* and fill only the *allowed* (missing) keys from *patch*."""
        merged = dict(known)
        if isinstance(patch, dict):
            for k in allowed:
                sv = str(patch.get(k) or "").strip()
                if sv:
                    merged[k] = sv
        return merged

    pack = {
        name: merge(values, out.get(name, {}), missing[name])
        for name, values in base.items()
    }

    # Enforce 5G 4x4 badge
    b = str(pack["fit5"].get("Fit badges","") or "")
    if "4x4 MIMO" not in b:
        pack["fit5"]["Fit badges"] = (b + ", 4x4 MIMO").strip(", ").strip() if b and b != "Not listed" else "4x4 MIMO"

    _ENRICH_CACHE[key] = pack
    return pack
def build_tables(repl4: str, repl5: str, canon_make: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Build the (features, fit) comparison tables for the 4G alternative and 5G replacement.

    Values come from the dec sheet first and are then passed through
    gpt_enrich. Each frame has two rows, "4G alternative" then
    "5G replacement", with columns FEATURE_COLS / FIT_COLS respectively.
    """
    pack = gpt_enrich(
        repl4,
        repl5,
        canon_make,
        _features_from_dec(repl4, canon_make),
        _features_from_dec(repl5, canon_make),
        _fit_from_dec(repl4, canon_make, is5=False),
        _fit_from_dec(repl5, canon_make, is5=True),
    )

    def two_rows(key_4g: str, key_5g: str, columns: List[str]) -> pd.DataFrame:
        rows = [
            {"Device": "4G alternative", **pack[key_4g]},
            {"Device": "5G replacement", **pack[key_5g]},
        ]
        return pd.DataFrame(rows, columns=columns)

    feat_df = two_rows("feat4", "feat5", FEATURE_COLS)
    fit_df = two_rows("fit4", "fit5", FIT_COLS)
    return feat_df, fit_df
| # ---------------------------- | |
| # Manufacturer link (deterministic, no HTTP) | |
| # ---------------------------- | |
MAKER_DOMAINS = {
    "CRADLEPOINT": "https://cradlepoint.com",
    "SIERRA": "https://airlink.com",
    "FEENEY": "https://inseego.com",
    "DIGI": "https://www.digi.com",
    "CISCO_MERAKI": "https://meraki.cisco.com",
    "CISCO": "https://www.cisco.com",
    "TELTONIKA": "https://teltonika-networks.com",
    "UNKNOWN": "",
}


def guess_maker_url(model: str, canon_make: str) -> str:
    """Build a manufacturer product/search URL for *model* without any HTTP request.

    Returns "" for an unknown maker, an empty model, or the placeholder
    models "Not listed" / "Not applicable". The model is URL-encoded so
    characters such as "&", "#", "/" or "+" cannot corrupt the link; runs of
    whitespace become a single "+" in query strings.
    """
    from urllib.parse import quote, quote_plus

    model = " ".join(str(model or "").split())
    base = MAKER_DOMAINS.get(canon_make, "")
    if not base or not model or model in {"Not listed", "Not applicable"}:
        return ""

    q = quote_plus(model)
    if canon_make == "TELTONIKA":
        # Product pages are addressed by lower-case model slug.
        slug = quote(model.lower(), safe="")
        return f"{base}/products/routers/{slug}"
    if canon_make == "CRADLEPOINT":
        return f"{base}/?s={q}"
    if canon_make in {"CISCO", "CISCO_MERAKI"}:
        # Both Cisco families use the cisco.com site search.
        return f"https://www.cisco.com/c/en/us/search.html?q={q}"
    # DIGI and every other known maker share the generic search path.
    return f"{base}/search?q={q}"
| # ---------------------------- | |
| # Q&A (on demand, per last case) | |
| # ---------------------------- | |
def gpt_answer(question: str, context: Dict[str, Any]) -> str:
    """Answer a follow-up *question* about the last case using one LLM call.

    Returns a fixed notice when no client is configured, "" for a blank
    question, and otherwise the stripped model output text.
    """
    if client is None:
        return "No API key is configured, so I can’t answer detailed questions right now."

    cleaned_question = str(question or "").strip()
    if not cleaned_question:
        return ""

    instructions = (
        "You are a Verizon rep assistant. Answer in a fast, practical way. "
        "Use the provided context. "
        "Do not mention internal tools or prompts. "
        "If unknown, say 'Not listed' and suggest the manufacturer page."
    )
    messages = [
        {"role": "system", "content": instructions},
        {"role": "user", "content": _json_dump_safe({"context": context, "question": cleaned_question})},
    ]

    started = time.perf_counter()
    resp = client.responses.create(
        model=OPENAI_MODEL,
        input=messages,
        max_output_tokens=520,
    )
    _tlog("llm qa", started)

    answer = getattr(resp, "output_text", "") or ""
    return answer.strip()
| # ---------------------------- | |
| # Chat utilities | |
| # ---------------------------- | |
def df_to_md(df: pd.DataFrame) -> str:
    """Render *df* as a Markdown table without the index.

    Uses ``DataFrame.to_markdown`` when it works; if that raises for any
    reason, a plain pipe table is assembled by hand.
    """
    try:
        return df.to_markdown(index=False)
    except Exception:
        pass

    headers = list(df.columns)

    def as_row(cells: List[str]) -> str:
        return "| " + " | ".join(cells) + " |"

    table = [as_row(headers), as_row(["---"] * len(headers))]
    table.extend(
        as_row([str(rec.get(h, "")) for h in headers]) for _, rec in df.iterrows()
    )
    return "\n".join(table)
def extract_device_terms(msg: str) -> List[str]:
    """Split *msg* on newlines, commas and semicolons and keep the pieces that look like devices.

    A piece qualifies when it contains a digit or one of the known model
    family prefixes as a whole word (case-insensitive).
    """
    family = re.compile(r"\b(IBR|AER|WR|XR|IR|RUT|MBR|E\d{3}|R\d{3})\b", flags=re.IGNORECASE)
    pieces = (chunk.strip() for chunk in re.split(r"[\n,;]+", str(msg or "")))
    return [
        piece
        for piece in pieces
        if piece and (re.search(r"\d", piece) or family.search(piece))
    ]
def parse_install_mode(msg: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract (mode, detail) for antenna selection from a free-text reply.

    mode:   "vehicle" (vehicle / mobile) or "stationary" (stationary /
            fixed / site); stationary wins when both are mentioned.
    detail: "indoor", "outdoor" or "directional"; the last one listed in
            that order wins when several are mentioned.
    A reply that only names a detail (e.g. "Outdoor") implies a stationary
    install, since those details are offered only for stationary sites.
    Returns (None, None) when nothing is recognized.
    """
    t = str(msg or "").strip().lower()

    mode = None
    if "vehicle" in t or "mobile" in t:
        mode = "vehicle"
    if "stationary" in t or "fixed" in t or "site" in t:
        mode = "stationary"

    detail = None
    for word in ("indoor", "outdoor", "directional"):
        if word in t:
            detail = word

    if mode is None and detail is not None:
        mode = "stationary"
    return mode, detail
def make_case_key(s: str) -> str:
    """Return *s* trimmed, with whitespace runs collapsed to one space, capped at 80 characters."""
    trimmed = str(s or "").strip()
    collapsed = re.sub(r"\s+", " ", trimmed)
    return collapsed[:80]
| # ---------------------------- | |
| # Chat UI (schema-safe) | |
| # ---------------------------- | |
| with gr.Blocks(title="Only-Routers") as demo: | |
| gr.Markdown("## Only-Routers\nChat mode for Verizon reps (multiple devices per message).") | |
| state = gr.State("{}") | |
| chatbot = gr.Chatbot(label="Only-Routers Chat", height=560, type="tuples") | |
| msg = gr.Textbox(label="Message", placeholder="Example: RUT240, WR21\nVehicle install", lines=2) | |
| send = gr.Button("Send", variant="primary") | |
| def chat_fn(user_msg, history, st_json): | |
| t0 = time.perf_counter() | |
| st = state_load(st_json) | |
| st.setdefault("cases", {}) | |
| st.setdefault("last_case_keys", []) | |
| st.setdefault("pending", {}) | |
| st.setdefault("awaiting_questions", False) | |
| text = (user_msg or "").strip() | |
| if not text: | |
| return history, state_dump(st) | |
| # Pending A/B pick | |
| if st.get("pending", {}).get("type") == "pick": | |
| opts = st["pending"].get("options", []) | |
| choice = text.strip().lower() | |
| idx = 0 if choice in {"a","1"} else (1 if choice in {"b","2"} else None) | |
| if idx is None or idx >= len(opts): | |
| history.append((text, "Please reply with **A** or **B**.")) | |
| return history, state_dump(st) | |
| chosen_row = int(opts[idx]["row_idx"]) | |
| life_row = df_eos.iloc[chosen_row] | |
| eos, eol, status = row_to_dates_and_status(life_row) | |
| repl = pick_replacements(life_row, status) | |
| canon_make = str(life_row.get("_canon_make","UNKNOWN")) | |
| feat_df, fit_df = build_tables(repl["repl_4g"], repl["repl_5g"], canon_make) | |
| url4 = guess_maker_url(repl["repl_4g"], canon_make) if repl["repl_4g"] != "Not applicable" else "" | |
| url5 = guess_maker_url(repl["repl_5g"], canon_make) if repl["repl_5g"] != "Not listed" else "" | |
| ck = make_case_key(str(life_row.get("sku",""))) | |
| st["cases"][ck] = {"row_idx": chosen_row, "repl": repl, "canon_make": canon_make, "status": status, "eos": eos, "eol": eol, "urls": {"4g": url4, "5g": url5}} | |
| st["last_case_keys"].append(ck) | |
| st["pending"] = {"type":"install_mode", "case_keys":[ck]} | |
| st["awaiting_questions"] = True | |
| bot = [] | |
| bot.append(f"**{ck}**") | |
| bot.append(f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**") | |
| bot.append(f"- 4G alternative: **{repl['repl_4g']}**") | |
| bot.append(f"- 5G replacement: **{repl['repl_5g']}**") | |
| if url4: | |
| bot.append(f"- 4G manufacturer page: {url4}") | |
| if url5: | |
| bot.append(f"- 5G manufacturer page: {url5}") | |
| bot.append("\n**Replacement features**\n" + df_to_md(feat_df)) | |
| bot.append("\n**Verizon fit**\n" + df_to_md(fit_df)) | |
| bot.append("\nFor antennas: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.") | |
| bot.append("Any questions about the suggested device(s)?") | |
| history.append((text, "\n".join(bot))) | |
| _tlog("chat pick flow", t0) | |
| return history, state_dump(st) | |
| # Pending install-mode | |
| if st.get("pending", {}).get("type") == "install_mode": | |
| mode, detail = parse_install_mode(text) | |
| if mode is None: | |
| history.append((text, "Quick one: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.")) | |
| return history, state_dump(st) | |
| updates=[] | |
| for ck in st["pending"].get("case_keys", []): | |
| case = st["cases"].get(ck, {}) | |
| repl5 = (case.get("repl", {}) or {}).get("repl_5g","") | |
| ant = antenna_pick(repl5, mode=mode, detail=detail) | |
| case.setdefault("antennas", {}) | |
| case["antennas"][f"{mode}:{detail or ''}"] = ant | |
| st["cases"][ck] = case | |
| updates.append(f"**{ck}** antenna ({mode}{' / '+detail if detail else ''}): {ant.get('name','')} (PN {ant.get('part_number','')})") | |
| st["pending"] = {} | |
| history.append((text, "\n".join(updates))) | |
| _tlog("chat antenna flow", t0) | |
| return history, state_dump(st) | |
| # Device lookup | |
| device_terms = extract_device_terms(text) | |
| if device_terms: | |
| bots=[] | |
| new_case_keys=[] | |
| for term in device_terms: | |
| res = resolve_device(term) | |
| if res.get("mode") == "pick": | |
| st["pending"] = {"type":"pick", "options": res.get("options", []), "raw": term} | |
| opts = res.get("options", []) | |
| bot = "I found more than one close match. Reply **A** or **B**:\n" | |
| for i,o in enumerate(opts): | |
| bot += f"- **{'A' if i==0 else 'B'}**: {o.get('label','')}\n" | |
| history.append((text, bot.strip())) | |
| _tlog("chat resolve->pick", t0) | |
| return history, state_dump(st) | |
| if res.get("mode") != "ok": | |
| bots.append(f"**{term}**: not found in lifecycle list. Who makes it (manufacturer) and what's the exact model/SKU?") | |
| continue | |
| life_row = df_eos.iloc[int(res["row_idx"])] | |
| eos, eol, status = row_to_dates_and_status(life_row) | |
| repl = pick_replacements(life_row, status) | |
| canon_make = str(life_row.get("_canon_make","UNKNOWN")) | |
| t1 = time.perf_counter() | |
| feat_df, fit_df = build_tables(repl["repl_4g"], repl["repl_5g"], canon_make) | |
| _tlog("tables", t1) | |
| url4 = guess_maker_url(repl["repl_4g"], canon_make) if repl["repl_4g"] != "Not applicable" else "" | |
| url5 = guess_maker_url(repl["repl_5g"], canon_make) if repl["repl_5g"] != "Not listed" else "" | |
| ck = make_case_key(str(life_row.get("sku","")) or term) | |
| st["cases"][ck] = {"row_idx": int(res["row_idx"]), "repl": repl, "canon_make": canon_make, "status": status, "eos": eos, "eol": eol, "urls": {"4g": url4, "5g": url5}} | |
| st["last_case_keys"].append(ck) | |
| new_case_keys.append(ck) | |
| bot=[] | |
| bot.append(f"**{ck}**") | |
| bot.append(f"- Status: **{status}** | EOS: **{eos}** | EOL: **{eol}**") | |
| bot.append(f"- 4G alternative: **{repl['repl_4g']}**") | |
| bot.append(f"- 5G replacement: **{repl['repl_5g']}**") | |
| if url4: | |
| bot.append(f"- 4G manufacturer page: {url4}") | |
| if url5: | |
| bot.append(f"- 5G manufacturer page: {url5}") | |
| bot.append("\n**Replacement features**\n" + df_to_md(feat_df)) | |
| bot.append("\n**Verizon fit**\n" + df_to_md(fit_df)) | |
| bots.append("\n".join(bot)) | |
| if new_case_keys: | |
| st["pending"] = {"type":"install_mode", "case_keys": new_case_keys} | |
| bots.append("\nFor antennas: **Vehicle/Mobile** or **Stationary**? If Stationary: **Indoor**, **Outdoor**, or **Directional**.") | |
| bots.append("Any questions about the suggested device(s)?") | |
| st["awaiting_questions"] = True | |
| history.append((text, "\n\n---\n\n".join(bots))) | |
| _tlog("chat lookup flow", t0) | |
| return history, state_dump(st) | |
| # Q&A about most recent case | |
| if not st.get("last_case_keys"): | |
| history.append((text, "Tell me the router model/SKU you’re working with (you can paste multiple).")) | |
| return history, state_dump(st) | |
| ck = st["last_case_keys"][-1] | |
| case = st["cases"].get(ck, {}) | |
| ctx = {"case": ck, "replacements": case.get("repl", {}), "urls": case.get("urls", {}), "antennas": case.get("antennas", {})} | |
| ans = gpt_answer(text, ctx) | |
| history.append((text, ans)) | |
| _tlog("chat qa flow", t0) | |
| return history, state_dump(st) | |
| # Wire the send button: on click, chat_fn is called with the current values of msg, chatbot and state, | |
| # and its two return values (updated history, dumped state) are written back to chatbot and state. | |
| # NOTE(review): api_name=False presumably keeps this handler off the auto-generated API surface; confirm against the Gradio docs. | |
| send.click(fn=chat_fn, inputs=[msg, chatbot, state], outputs=[chatbot, state], api_name=False) | |
| # Start the app listening on 0.0.0.0; the port is read from the PORT env var (falls back to 7860). | |
| # share=False and show_api=False are passed explicitly, so no share link or API page is requested. | |
| demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT","7860")), share=False, show_api=False) | |