Masters-four-Tab-OpenAI / backend /scripts /normalize_router_pricing_sources.py
Pete Dunn
Improve KB synthesis and rapid router canary flow
ed025e5
#!/usr/bin/env python3
from __future__ import annotations
import csv
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
import openpyxl
import pdfplumber
# Third ancestor directory of this file's resolved path.
REPO_ROOT = Path(__file__).resolve().parents[2]

# Primary input directory scanned for pricing workbooks/PDFs, and the
# directory that receives every normalized CSV plus the summary file.
SOURCES_DIR = REPO_ROOT.joinpath("backend", "app", "knowledgebase", "data", "pricing_sources")
OUT_DIR = REPO_ROOT.joinpath("backend", "app", "knowledgebase", "data", "normalized")

# Name of the environment variable holding extra source directories
# (split on "," or ";" by _iter_source_dirs).
EXTRA_SOURCES_ENV = "ROUTER_PRICING_EXTRA_SOURCE_DIRS"

# Optional sibling directories of the repository that are also scanned when
# they exist on disk.
DEFAULT_EXTERNAL_SOURCE_DIRS: Tuple[Path, ...] = (
    REPO_ROOT.parent / "RAG SKU and PRice List",
)
def _display_path(path: Optional[Path]) -> str:
    """Render *path* relative to ``REPO_ROOT`` when possible.

    Returns ``""`` for ``None``. The resolved form of the path is tried
    first, then the path exactly as given; when neither can be expressed
    relative to ``REPO_ROOT`` the path is returned unchanged as a string.
    """
    if path is None:
        return ""
    for variant in (lambda p: p.resolve(), lambda p: p):
        try:
            return str(variant(path).relative_to(REPO_ROOT))
        except Exception:
            # Not under REPO_ROOT (or resolution failed): try the next form.
            continue
    return str(path)
# Upper-case family names looked up as substrings of an upper-cased
# description by _parse_parsec_family. Order matters: the first entry found
# in the text is the one returned.
FAMILY_HINTS = [
    "AKITA",
    "CHINOOK",
    "HUSKY",
    "ALBATROSS",
    "WHIPPET",
    "AIREDALE",
    "BEAGLE",
    "BOXER",
    "DALMATIAN",
    "GREYHOUND",
    "IRISH SETTER",
    "LABRADOR",
    "ROTTWEILER",
    "DOBERMAN",
    "BORDER COLLIE",
    "GREAT DANE",
    "ST. BERNARD",
]
# Static list of Verizon gateway models. _verizon_gateway_web_rows turns each
# entry into a CatalogRow; every entry carries exactly the keys
# "manufacturer", "model" and "title".
VERIZON_GATEWAY_WEB_ROWS: Tuple[Dict[str, str], ...] = (
    dict(
        manufacturer="Verizon (OEM Arcadyan)",
        model="XC46BE",
        title="Verizon Business Internet Gateway XC46BE (Dragon)",
    ),
    dict(
        manufacturer="Verizon (OEM Askey)",
        model="FSNO21VA",
        title="Verizon Internet Gateway Business FSNO21VA",
    ),
    dict(
        manufacturer="Verizon (OEM Askey)",
        model="ASK-NCM1100E",
        title="Verizon Business Internet Gateway ASK-NCM1100E (Crown)",
    ),
    dict(
        manufacturer="Verizon (OEM Askey)",
        model="ASK-NCQ1338E",
        title="Verizon Internet Gateway for Business ASK-NCQ1338E",
    ),
    dict(
        manufacturer="Verizon (OEM Arris)",
        model="NVG558",
        title="Arris NVG558 LTE Router",
    ),
    dict(
        manufacturer="Verizon (OEM Askey)",
        model="ASK-NCM1100",
        title="Verizon Home Internet Router ASK-NCM1100",
    ),
    dict(
        manufacturer="Verizon (OEM Askey)",
        model="ASK-NCQ1338",
        title="Verizon Internet Gateway ASK-NCQ1338",
    ),
    dict(
        manufacturer="Verizon (OEM Askey)",
        model="ASK-NCQ1338FA",
        title="Verizon Internet Gateway ASK-NCQ1338FA",
    ),
    dict(
        manufacturer="Verizon (OEM Arcadyan)",
        model="ARC-XCI55AX",
        title="Verizon Internet Gateway ARC-XCI55AX",
    ),
    dict(
        manufacturer="Verizon (OEM Wistron NeWeb)",
        model="WNC-CR200A",
        title="Verizon Internet Gateway WNC-CR200A",
    ),
)
# Maps a compacted model token (upper-case, alphanumerics only, as produced
# by _compact) to the canonical model key used in the catalog. Tokens without
# an entry are used unchanged by _model_candidates.
MODEL_ALIAS = {
    "RX50": "RX55",
    "RX60": "XR60",
    "CR202LITE": "CR202",
    "CR202-LITE": "CR202",
    "MAXBR1PRO": "MAXBR1PRO",
    "MAXBR1PRO5G": "MAXBR1PRO5G",
    "BR1PRO": "MAXBR1PRO",
    "BR1PRO5G": "MAXBR1PRO5G",
    "BR1MINI": "BR1MINI5G",
    "MAXBR1MINI": "BR1MINI5G",
    "BR1MINI5G": "BR1MINI5G",
    "FW2000": "FW2000E",
    "MG51E": "MG51",
}
# Logical column name -> header spellings accepted for it. _header_index_map
# compares these after _clean_header normalization, so case, spacing and
# punctuation in the sheet header do not matter. Keys are processed in this
# order, and some spellings ("item name", "wan") are listed under two keys.
HEADER_ALIASES = {
    "sku": (
        "sku", "partnumber", "part number", "item name", "part #",
        "parsec part #", "parsec accessories part #",
    ),
    "title": (
        "product", "model", "item name", "short description",
        "product model", "variant",
    ),
    "description": (
        "description", "long description", "desc (bom & accessory content)",
        "desc", "special offers & notes", "notes", "remark",
    ),
    "msrp": (
        "msrp", "msrp / retail price", "list price", "list ($ usd)",
        "retail price", "map",
    ),
    "term": ("term", "registration", "subscription duration included"),
    "wifi": (
        "wifi", "wi-fi", "wifi standard", "wi-fi standard",
        "wifi capability", "wireless",
    ),
    "ethernet": (
        "ethernet ports", "ethernet port", "lan", "lan ports and speed",
        "wan", "wan ports and speed",
    ),
    "serial": ("serial", "serial port", "serial ports"),
    "battery": ("battery",),
    "modem": (
        "cellular networks", "wan", "modem", "modem type",
        "cat/cellular", "cat",
    ),
    "poe": ("poe",),
}
@dataclass
class CatalogRow:
    # One normalized catalog entry. Field order is part of the interface:
    # it is the positional-argument order of the generated __init__.

    # Provenance of the row.
    source_file: str
    source_sheet: str
    source_row: int

    # Identity.
    manufacturer: str
    item_type: str
    title: str
    model: str
    model_key: str
    sku: str

    # Pricing: display string plus the parsed number (None when not numeric).
    msrp: str
    msrp_numeric: Optional[float]
    term: str

    # Free-text attribute columns copied or inferred from the source.
    wifi: str
    ethernet_ports: str
    serial_ports: str
    battery: str
    ruggedization: str
    modem_type: str
    primary_use_case: str
    device_type: str
    poe: str
    notes: str
    sources_label: str
def _norm(text: Any) -> str:
return re.sub(r"\s+", " ", str(text or "").strip())
def _compact(text: Any) -> str:
    """Upper-case the normalized text and keep only the characters A-Z and 0-9."""
    shouted = _norm(text).upper()
    return re.sub(r"[^A-Z0-9]", "", shouted)
def _file_key(path_or_name: Any) -> str:
name = Path(str(path_or_name or "")).name
return re.sub(r"[^a-z0-9]+", "", name.lower())
def _safe_mtime(path: Path) -> float:
try:
return float(path.stat().st_mtime)
except Exception:
return 0.0
def _iter_source_dirs() -> List[Path]:
    """List the directories to scan for pricing sources, without duplicates.

    ``SOURCES_DIR`` always comes first (whether or not it exists), followed
    by each existing entry of ``DEFAULT_EXTERNAL_SOURCE_DIRS`` and each
    existing directory named in the ``EXTRA_SOURCES_ENV`` environment
    variable (entries separated by "," or ";"). Two entries count as the same
    directory when their resolved paths match.
    """
    candidates: List[Path] = [SOURCES_DIR]
    candidates.extend(
        extra for extra in DEFAULT_EXTERNAL_SOURCE_DIRS if extra.exists() and extra.is_dir()
    )

    env_value = str(os.getenv(EXTRA_SOURCES_ENV, "") or "").strip()
    if env_value:
        for chunk in re.split(r"[,;]", env_value):
            chunk = chunk.strip()
            if not chunk:
                continue
            env_dir = Path(chunk).expanduser()
            if env_dir.exists() and env_dir.is_dir():
                candidates.append(env_dir)

    unique: List[Path] = []
    seen_keys: set[str] = set()
    for directory in candidates:
        # Resolve only when the directory exists; otherwise fall back to its text.
        ident = str(directory.resolve()) if directory.exists() else str(directory)
        if ident not in seen_keys:
            seen_keys.add(ident)
            unique.append(directory)
    return unique
def _iter_source_files(extensions: Sequence[str]) -> List[Path]:
    """Collect source files with one of *extensions* from every source directory.

    Extensions are compared lower-cased against ``Path.suffix`` (so they must
    include the leading dot); an empty collection disables the filter. Files
    whose name starts with "~$" are ignored. When several files reduce to the
    same ``_file_key``, only the one with the newest modification time is
    kept. The result is sorted by lower-cased name, then by full path.
    """
    wanted = set()
    for ext in extensions:
        cleaned = str(ext).strip()
        if cleaned:
            wanted.add(cleaned.lower())

    newest: Dict[str, Path] = {}
    for directory in _iter_source_dirs():
        for entry in sorted(directory.iterdir()):
            if entry.name.startswith("~$") or not entry.is_file():
                continue
            if wanted and entry.suffix.lower() not in wanted:
                continue
            ident = _file_key(entry.name)
            incumbent = newest.get(ident)
            if incumbent is None or _safe_mtime(entry) > _safe_mtime(incumbent):
                newest[ident] = entry
    return sorted(newest.values(), key=lambda p: (p.name.lower(), str(p)))
def _is_parsec_source_name(path: Path) -> bool:
    """Tell whether the file name looks like a Parsec ``.xlsx`` workbook."""
    ident = _file_key(path.name)
    return ident.endswith("xlsx") and "parsec" in ident
def _is_peplink_overlay_source_name(path: Path) -> bool:
    """Tell whether the file name looks like the Peplink replacement-devices workbook.

    The compacted name must end in "xlsx" and contain "peplink", "device"
    and "replacement"; the misspelling "repalcement" is accepted as well.
    """
    ident = _file_key(path.name)
    return (
        ident.endswith("xlsx")
        and "peplink" in ident
        and "device" in ident
        and ("replacement" in ident or "repalcement" in ident)
    )
def _find_parsec_source_xlsx() -> Optional[Path]:
    """Pick the Parsec workbook to normalize, or ``None`` when there is none.

    Preference order: names containing "pricing" first, then the most
    recently modified file, then the lower-cased name.
    """

    def preference(candidate: Path):
        return (
            0 if "pricing" in _file_key(candidate.name) else 1,
            -_safe_mtime(candidate),
            candidate.name.lower(),
        )

    matches = [p for p in _iter_source_files((".xlsx",)) if _is_parsec_source_name(p)]
    # min() returns the first of equally ranked items, like a stable sort's head.
    return min(matches, key=preference, default=None)
def _find_peplink_overlay_source_xlsx() -> Optional[Path]:
    """Pick the Peplink replacement workbook, or ``None`` when there is none.

    Preference order: names containing "replacementdevices" (after
    compaction) first, then the most recently modified file, then the
    lower-cased name.
    """

    def preference(candidate: Path):
        return (
            0 if "replacementdevices" in _file_key(candidate.name) else 1,
            -_safe_mtime(candidate),
            candidate.name.lower(),
        )

    matches = [
        p for p in _iter_source_files((".xlsx",)) if _is_peplink_overlay_source_name(p)
    ]
    # min() returns the first of equally ranked items, like a stable sort's head.
    return min(matches, key=preference, default=None)
def _iter_router_pricing_xlsx_sources() -> List[Path]:
    """Return every ``.xlsx`` source that is neither a Parsec nor a Peplink overlay workbook."""
    return [
        workbook
        for workbook in _iter_source_files((".xlsx",))
        if not _is_parsec_source_name(workbook)
        and not _is_peplink_overlay_source_name(workbook)
    ]
def _find_atel_pdf() -> Optional[Path]:
    """Pick the ATEL price-list PDF, or ``None`` when there is none.

    A PDF qualifies when its compacted name contains "atel" together with
    "pricing" or "wholesale". Among several, the most recently modified wins,
    with the lower-cased name as tie-breaker.
    """

    def qualifies(candidate: Path) -> bool:
        ident = _file_key(candidate.name)
        return "atel" in ident and ("pricing" in ident or "wholesale" in ident)

    matches = [p for p in _iter_source_files((".pdf",)) if qualifies(p)]
    return min(matches, key=lambda p: (-_safe_mtime(p), p.name.lower()), default=None)
def _parse_money(value: Any) -> Tuple[str, Optional[float]]:
    """Parse a price cell into a display string and a number.

    Args:
        value: Raw cell content (string, number or ``None``).

    Returns:
        ``("", None)`` when the normalized cell is empty,
        ``(raw_text, None)`` when it contains no number, otherwise
        ``("$1,234.50", 1234.5)``: the first number found, formatted with
        thousands separators and two decimals, plus its float value.
    """
    raw = _norm(value)
    if not raw:
        return "", None
    # The comma-grouped branch must contain at least one ",ddd" group.
    # Regex alternation takes the first branch that matches, not the longest,
    # so a grouped branch that also accepts bare digits would stop after the
    # first three digits of an ungrouped number ("1299.0" -> "129").
    m = re.search(r"-?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?", raw.replace("$", ""))
    if not m:
        return raw, None
    try:
        num = float(m.group(0).replace(",", ""))
    except ValueError:
        return raw, None
    return f"${num:,.2f}", num
def _extract_term(text: str) -> str:
low = text.lower()
m = re.search(r"\b([135])\s*[- ]?year\b", low)
if m:
return f"{m.group(1)}YR"
m = re.search(r"\b([135])\s*yr\b", low)
if m:
return f"{m.group(1)}YR"
return ""
def _model_candidates(text: str) -> List[str]:
    """Extract candidate model keys from free text, in order of discovery.

    Two passes run over the normalized text. The first looks for the spelled
    out "Max BR1 Pro/Mini [5G]" phrases (case-insensitive). The second looks
    for generic tokens made of 1-8 letters, an optional hyphen or space, 2-4
    digits and an optional alphanumeric/hyphen tail. Every hit is compacted,
    mapped through ``MODEL_ALIAS`` and reported once.
    """
    cleaned = _norm(text)
    if not cleaned:
        return []

    # A dict keeps insertion order and gives de-duplication for free.
    found: Dict[str, None] = {}

    named_phrases = (
        r"\bmax\s*br\s*1\s*pro\s*5g\b",
        r"\bmax\s*br\s*1\s*mini\s*5g\b",
        r"\bmax\s*br\s*1\s*pro\b",
        r"\bmax\s*br\s*1\s*mini\b",
    )
    for phrase in named_phrases:
        for hit in re.finditer(phrase, cleaned, flags=re.IGNORECASE):
            token = _compact(hit.group(0))
            token = MODEL_ALIAS.get(token, token)
            if token:
                found.setdefault(token, None)

    for hit in re.finditer(r"\b[A-Za-z]{1,8}[- ]?\d{2,4}[A-Za-z0-9\-]*\b", cleaned):
        token = _compact(hit.group(0))
        # Too short or purely numeric tokens are not model names.
        if not token or token.isdigit() or len(token) < 4:
            continue
        has_letter = any(ch.isalpha() for ch in token)
        has_digit = any(ch.isdigit() for ch in token)
        if not (has_letter and has_digit):
            continue
        token = MODEL_ALIAS.get(token, token)
        if token in {"SKU", "MSRP", "MODEL"}:
            continue
        found.setdefault(token, None)

    return list(found)
def _manufacturer_from_filename(name: str) -> str:
low = name.lower()
if "semtech" in low or "sierra" in low:
return "Semtech"
if "ericsson" in low or "cradlepoint" in low:
return "Ericsson Cradlepoint"
if "inseego" in low:
return "Inseego"
if "inhand" in low:
return "InHand Networks"
if "peplink" in low:
return "Peplink"
if "parsec" in low:
return "Parsec"
if "atel" in low:
return "ATEL"
return "Unknown"
def _clean_header(text: Any) -> str:
    """Reduce a header cell to lower-case alphanumerics for comparison."""
    compacted = _compact(text)
    return compacted.lower()
def _header_index_map(header_row: Sequence[Any]) -> Dict[str, int]:
    """Map logical column names to column indexes of *header_row*.

    For each key of ``HEADER_ALIASES`` the first non-empty header that equals
    one of its aliases (after ``_clean_header``) is chosen. When there is no
    exact hit, a looser substring rule is tried for the keys that have one;
    "term" has none. Keys that match nothing are absent from the result.
    """
    # Substring fallbacks, applied to the cleaned header text.
    loose_rules = {
        "msrp": lambda h: "msrp" in h or "listprice" in h or "retailprice" in h or "listusd" in h,
        "sku": lambda h: "sku" in h or "partnumber" in h or h.startswith("part"),
        "title": lambda h: any(t in h for t in ("product", "model", "itemname", "shortdescription")),
        "description": lambda h: any(
            t in h for t in ("description", "longdescription", "desc", "notes", "remark")
        ),
        "ethernet": lambda h: "ethernet" in h,
        "serial": lambda h: "serial" in h,
        "wifi": lambda h: "wifi" in h,
        "battery": lambda h: "battery" in h,
        "modem": lambda h: "cellular" in h or h == "wan" or "modem" in h or "cat" in h,
        "poe": lambda h: "poe" in h,
    }

    cleaned = [_clean_header(cell) for cell in header_row]
    mapping: Dict[str, int] = {}
    for key, aliases in HEADER_ALIASES.items():
        exact = {_clean_header(alias) for alias in aliases}
        position = next(
            (i for i, header in enumerate(cleaned) if header and header in exact),
            None,
        )
        if position is None:
            loose = loose_rules.get(key)
            if loose is not None:
                position = next(
                    (i for i, header in enumerate(cleaned) if header and loose(header)),
                    None,
                )
        if position is not None:
            mapping[key] = position
    return mapping
def _detect_header_row(rows: Sequence[Sequence[Any]]) -> Optional[int]:
    """Find the index of the header row among the first 35 rows.

    A row scores two points for each of three keyword groups (price,
    part number, description/product) found in its lower-cased text. The
    first row scoring at least four points is returned; ``None`` if no row
    qualifies.
    """
    keyword_groups = (
        ("msrp", "list price", "retail price"),
        ("sku", "part number", "partnumber"),
        ("description", "product", "model"),
    )
    for position, row in enumerate(rows[:35]):
        cells = (_norm(value) for value in row)
        text = " ".join(cell.lower() for cell in cells if cell)
        if not text:
            continue
        score = sum(2 for group in keyword_groups if any(word in text for word in group))
        if score >= 4:
            return position
    return None
def _rowval(row: Sequence[Any], idx_map: Dict[str, int], key: str) -> str:
    """Return the normalized cell for logical column *key*, or ``""`` if unavailable.

    "Unavailable" means the key is not in *idx_map* or its index lies beyond
    the end of *row*.
    """
    position = idx_map.get(key)
    if position is not None and position < len(row):
        return _norm(row[position])
    return ""
def _infer_device_type(blob_low: str) -> str:
if any(t in blob_low for t in ("license", "subscription", "support", "renewal", "service", "primecare", "incontrol", "airlink complete")):
return "Service/License"
if any(t in blob_low for t in ("antenna", "mount", "cable", "adapter", "accessory", "bracket")):
return "Accessory"
if any(t in blob_low for t in ("gateway", "router", "fwa", "cpe", "modem")):
return "Router"
return "Unknown"
def _extract_ruggedization(blob_low: str) -> str:
hits = []
for term in ("ip67", "ip66", "ip65", "ip64", "ip54", "mil-std", "vibration", "ignition sensing", "metal housing", "aluminum"):
if term in blob_low:
hits.append(term)
return ", ".join(dict.fromkeys(hits))
def _extract_poe(blob_low: str, explicit: str) -> str:
if explicit:
return explicit
if "poe" in blob_low:
return "yes"
return ""
def _first_header_containing(headers: Sequence[str], needle: str) -> int:
    """Return the index of the first header containing *needle*, or -1."""
    for position, header in enumerate(headers):
        if needle in header:
            return position
    return -1


def _pdf_cell(row: Sequence[Any], index: int) -> str:
    """Return the normalized cell at *index*; "" when the index is -1 or out of range."""
    if 0 <= index < len(row):
        return _norm(row[index])
    return ""


def _catalog_rows_from_xlsx(path: Path) -> List[CatalogRow]:
    """Extract catalog rows from every worksheet of one pricing workbook."""
    manu = _manufacturer_from_filename(path.name)
    rows_out: List[CatalogRow] = []
    wb = openpyxl.load_workbook(path, data_only=True, read_only=True)
    try:
        for ws in wb.worksheets:
            # Read at most 4000 rows x 80 columns per sheet.
            max_cols = min(80, ws.max_column or 0)
            matrix: List[List[Any]] = [
                list(r)
                for r in ws.iter_rows(
                    min_row=1,
                    max_row=min(4000, ws.max_row or 0),
                    min_col=1,
                    max_col=max_cols,
                    values_only=True,
                )
            ]
            if not matrix:
                continue
            hdr_idx = _detect_header_row(matrix)
            if hdr_idx is None:
                continue
            idx_map = _header_index_map(matrix[hdr_idx])
            # A usable sheet needs a price column plus at least one identifying column.
            if "msrp" not in idx_map or not any(
                k in idx_map for k in ("sku", "title", "description")
            ):
                continue
            for i, row in enumerate(matrix[hdr_idx + 1:], start=hdr_idx + 1):
                if not row:
                    continue
                sku = _rowval(row, idx_map, "sku")
                title = _rowval(row, idx_map, "title")
                desc = _rowval(row, idx_map, "description")
                msrp_raw = _rowval(row, idx_map, "msrp")
                msrp, msrp_n = _parse_money(msrp_raw)
                if not msrp and msrp_n is None:
                    # Empty price cell.
                    continue
                blob = " ".join(x for x in (title, desc, sku) if x)
                if not blob:
                    continue
                blob_low = blob.lower()
                device_type = _infer_device_type(blob_low)
                if device_type == "Service/License":
                    # Service/license items are left out of the router model catalog.
                    continue
                model_candidates = _model_candidates(blob)
                if not model_candidates:
                    continue
                model_key = model_candidates[0]
                rows_out.append(
                    CatalogRow(
                        source_file=path.name,
                        source_sheet=ws.title,
                        source_row=i + 1,  # 1-based row number within the sheet
                        manufacturer=manu,
                        # Anything not classified as a router is filed as an accessory.
                        item_type="Router" if device_type == "Router" else "Accessory",
                        title=title or model_key,
                        model=model_key,
                        model_key=model_key,
                        sku=sku,
                        msrp=msrp or _norm(msrp_raw),
                        msrp_numeric=msrp_n,
                        term=_rowval(row, idx_map, "term") or _extract_term(blob),
                        wifi=_rowval(row, idx_map, "wifi"),
                        ethernet_ports=_rowval(row, idx_map, "ethernet"),
                        serial_ports=_rowval(row, idx_map, "serial"),
                        battery=_rowval(row, idx_map, "battery"),
                        ruggedization=_extract_ruggedization(blob_low),
                        modem_type=_rowval(row, idx_map, "modem"),
                        primary_use_case="",
                        device_type=device_type,
                        poe=_extract_poe(blob_low, _rowval(row, idx_map, "poe")),
                        notes=desc,
                        sources_label="manufacturer pricing sheet",
                    )
                )
    finally:
        # Read-only workbooks keep the source file open until closed explicitly.
        wb.close()
    return rows_out


def _catalog_rows_from_atel_pdf(atel: Path) -> List[CatalogRow]:
    """Extract catalog rows (MSRP only; wholesale fields are not read) from the ATEL PDF."""
    rows_out: List[CatalogRow] = []
    with pdfplumber.open(str(atel)) as pdf:
        for pidx, page in enumerate(pdf.pages, start=1):
            for table in page.extract_tables() or []:
                if not table or len(table) < 2:
                    continue
                # The header is read from the table's second row; data starts at
                # the third. NOTE(review): presumably the first row is a banner
                # or title row in the ATEL layout -- confirm against the PDF.
                head = [_norm(x).lower() for x in table[1]]
                model_i = _first_header_containing(head, "model")
                sku_i = _first_header_containing(head, "sku")
                msrp_i = _first_header_containing(head, "msrp")
                desc_i = _first_header_containing(head, "desc")
                if model_i < 0 or msrp_i < 0:
                    continue
                for ridx, r in enumerate(table[2:], start=3):
                    model_raw = _pdf_cell(r, model_i)
                    sku = _pdf_cell(r, sku_i)
                    desc = _pdf_cell(r, desc_i)
                    msrp_raw = _pdf_cell(r, msrp_i)
                    if not model_raw or not msrp_raw:
                        continue
                    msrp, msrp_n = _parse_money(msrp_raw)
                    model_cands = _model_candidates(f"{model_raw} {desc}")
                    model_key = model_cands[0] if model_cands else _compact(model_raw)
                    if not model_key or model_key.isdigit() or len(model_key) < 4:
                        continue
                    rows_out.append(
                        CatalogRow(
                            source_file=atel.name,
                            source_sheet=f"page_{pidx}",
                            source_row=ridx,
                            manufacturer="ATEL",
                            item_type="Router",
                            title=model_raw,
                            model=model_key,
                            model_key=model_key,
                            sku=sku,
                            msrp=msrp or msrp_raw,
                            msrp_numeric=msrp_n,
                            term=_extract_term(f"{model_raw} {desc}"),
                            wifi="",
                            ethernet_ports="",
                            serial_ports="",
                            battery="",
                            ruggedization=_extract_ruggedization(desc.lower()),
                            modem_type="",
                            primary_use_case="",
                            device_type="Router",
                            poe=_extract_poe(desc.lower(), ""),
                            notes=desc,
                            sources_label="manufacturer pricing sheet (MSRP only)",
                        )
                    )
    return rows_out


def _dedupe_catalog_rows(rows: Sequence[CatalogRow]) -> List[CatalogRow]:
    """Drop rows repeating an earlier (manufacturer, model, SKU, term, MSRP) fingerprint."""
    deduped: List[CatalogRow] = []
    seen: set[Tuple[str, str, str, str, str]] = set()
    for row in rows:
        fp = (
            _compact(row.manufacturer),
            _compact(row.model_key),
            _compact(row.sku),
            _compact(row.term),
            _compact(row.msrp),
        )
        if fp in seen:
            continue
        seen.add(fp)
        deduped.append(row)
    return deduped


def _build_catalog_rows() -> List[CatalogRow]:
    """Build the de-duplicated router catalog from all pricing sources.

    Rows come from every router pricing workbook (Parsec and Peplink overlay
    workbooks are excluded by ``_iter_router_pricing_xlsx_sources``), followed
    by the ATEL PDF when one is found. Duplicates are removed keeping the
    first occurrence.

    Returns:
        Catalog rows in source order.
    """
    rows_out: List[CatalogRow] = []
    for path in _iter_router_pricing_xlsx_sources():
        rows_out.extend(_catalog_rows_from_xlsx(path))
    atel = _find_atel_pdf()
    if atel and atel.exists():
        rows_out.extend(_catalog_rows_from_atel_pdf(atel))
    return _dedupe_catalog_rows(rows_out)
def _verizon_gateway_web_rows() -> List[CatalogRow]:
    """Turn ``VERIZON_GATEWAY_WEB_ROWS`` into catalog rows.

    These rows exist so that Verizon gateway model tokens can be resolved.
    They are labelled as web-sourced and carry no price: ``msrp`` is the
    placeholder text "Unknown, ask Masters" and ``msrp_numeric`` is ``None``.
    Entries whose model compacts to an empty string are skipped.
    """
    web_note = (
        "Web-sourced (not from internal docs): model token from Verizon support "
        "gateway equipment pages; verify exact hardware revision before quoting."
    )
    catalog: List[CatalogRow] = []
    for position, entry in enumerate(VERIZON_GATEWAY_WEB_ROWS, start=1):
        raw_model = _norm(entry.get("model", ""))
        token = _compact(raw_model)
        if not token:
            continue
        catalog.append(
            CatalogRow(
                source_file="verizon_support_gateway_models_web.csv",
                source_sheet="verizon_support_web",
                source_row=position,
                manufacturer=_norm(entry.get("manufacturer", "")) or "Verizon",
                item_type="Router",
                title=_norm(entry.get("title", "")) or raw_model,
                model=token,
                model_key=token,
                sku=raw_model,
                msrp="Unknown, ask Masters",
                msrp_numeric=None,
                term="",
                wifi="",
                ethernet_ports="",
                serial_ports="",
                battery="",
                ruggedization="",
                modem_type="5G gateway (web-sourced model index)",
                primary_use_case="Verizon gateway model token coverage",
                device_type="Router",
                poe="",
                notes=web_note,
                sources_label="Web-sourced (not from internal docs): Verizon support equipment listing",
            )
        )
    return catalog
def _write_router_catalog(rows: Sequence[CatalogRow]) -> Path:
    """Write *rows* to ``router_pricing_catalog_normalized.csv`` in ``OUT_DIR``.

    Creates ``OUT_DIR`` if needed. One CSV column is written per
    ``CatalogRow`` attribute named below; ``msrp_numeric`` is rendered with
    two decimals, or left blank when it is not a float.

    Returns:
        Path of the written file.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    target = OUT_DIR / "router_pricing_catalog_normalized.csv"
    columns = [
        "source_file",
        "source_sheet",
        "source_row",
        "manufacturer",
        "item_type",
        "title",
        "model",
        "model_key",
        "sku",
        "msrp",
        "msrp_numeric",
        "term",
        "wifi",
        "ethernet_ports",
        "serial_ports",
        "battery",
        "ruggedization",
        "modem_type",
        "primary_use_case",
        "device_type",
        "poe",
        "notes",
        "sources_label",
    ]
    with target.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        for entry in rows:
            # Every column name is also the CatalogRow attribute name.
            record = {column: getattr(entry, column) for column in columns}
            if isinstance(entry.msrp_numeric, float):
                record["msrp_numeric"] = f"{entry.msrp_numeric:.2f}"
            else:
                record["msrp_numeric"] = ""
            writer.writerow(record)
    return target
def _write_variant_options(rows: Sequence[CatalogRow]) -> Path:
    """Write per-model variant options to ``router_variant_options_normalized.csv``.

    Rows are grouped by (manufacturer, model_key). Inside a group they are
    ordered by term ("1YR" first, then no term, then any other term), then by
    numeric MSRP (rows without one last), then by upper-cased SKU. Exactly
    one row per group -- the first after sorting -- is written with
    ``default_option`` "yes"; all others get "no". Comparing SKU text
    instead would flag several rows whenever SKUs repeat or are empty.

    Args:
        rows: Catalog rows to group.

    Returns:
        Path of the written file.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    out = OUT_DIR / "router_variant_options_normalized.csv"
    fields = [
        "manufacturer",
        "model_key",
        "model",
        "default_option",
        "sku",
        "title",
        "term",
        "msrp",
        "msrp_numeric",
        "wifi",
        "ethernet_ports",
        "serial_ports",
        "poe",
        "battery",
        "modem_type",
        "source_file",
        "source_sheet",
        "source_row",
    ]
    groups: Dict[Tuple[str, str], List[CatalogRow]] = {}
    for r in rows:
        groups.setdefault((r.manufacturer, r.model_key), []).append(r)

    def rank_row(r: CatalogRow) -> Tuple[int, float, str]:
        term = _norm(r.term).upper()
        if term == "1YR":
            term_rank = 0
        elif not term:
            term_rank = 1
        else:
            term_rank = 2
        # Rows without a numeric price sort after every priced row.
        msrp = float(r.msrp_numeric) if isinstance(r.msrp_numeric, float) else 999999.0
        return (term_rank, msrp, _norm(r.sku).upper())

    with out.open("w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=fields)
        w.writeheader()
        for (manu, key), items in sorted(groups.items()):
            for position, r in enumerate(sorted(items, key=rank_row)):
                w.writerow(
                    {
                        "manufacturer": manu,
                        "model_key": key,
                        "model": r.model,
                        "default_option": "yes" if position == 0 else "no",
                        "sku": r.sku,
                        "title": r.title,
                        "term": r.term,
                        "msrp": r.msrp,
                        "msrp_numeric": f"{r.msrp_numeric:.2f}" if isinstance(r.msrp_numeric, float) else "",
                        "wifi": r.wifi,
                        "ethernet_ports": r.ethernet_ports,
                        "serial_ports": r.serial_ports,
                        "poe": r.poe,
                        "battery": r.battery,
                        "modem_type": r.modem_type,
                        "source_file": r.source_file,
                        "source_sheet": r.source_sheet,
                        "source_row": r.source_row,
                    }
                )
    return out
def _parse_parsec_family(description: str) -> str:
    """Derive a title-cased product family name from a description.

    The first ``FAMILY_HINTS`` entry contained in the upper-cased text wins.
    Failing that, the run of capital letters and spaces following the word
    "PRO" is used. Returns ``""`` when neither applies.
    """
    shouted = _norm(description).upper()
    known = next((family for family in FAMILY_HINTS if family in shouted), None)
    if known is not None:
        return known.title()
    after_pro = re.search(r"PRO\s+([A-Z][A-Z ]{2,30})", shouted)
    if after_pro is None:
        return ""
    return _norm(after_pro.group(1)).title()
def _write_parsec_pricing() -> Path:
    """Normalize the Parsec workbook into ``parsec_pricing_normalized.csv``.

    Every worksheet with a detectable header containing both a part-number
    and a price column contributes its rows that have a part number and a
    numeric price. At most 2500 rows and 20 columns are read per sheet. When
    no Parsec workbook is found, a file with only the header line is written.

    Returns:
        Path of the written file.
    """
    src = _find_parsec_source_xlsx()
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    out = OUT_DIR / "parsec_pricing_normalized.csv"
    fields = [
        "source_file",
        "source_sheet",
        "source_row",
        "manufacturer",
        "category",
        "family",
        "part_number",
        "title",
        "description",
        "msrp",
        "msrp_numeric",
        "connector_summary",
        "fit_profile",
    ]
    # Patterns whose first match in the description is listed in connector_summary.
    connector_patterns = (r"\b\d\s*LTE\b", r"\b\d\s*WiFi\b", r"\b\d\s*GNSS\b", r"SMA", r"RP\s*SMA")
    rows: List[Dict[str, Any]] = []
    if src and src.exists():
        wb = openpyxl.load_workbook(src, data_only=True, read_only=True)
        try:
            for ws in wb.worksheets:
                matrix = [
                    list(r)
                    for r in ws.iter_rows(
                        min_row=1,
                        max_row=min(2500, ws.max_row or 0),
                        min_col=1,
                        max_col=min(20, ws.max_column or 0),
                        values_only=True,
                    )
                ]
                if not matrix:
                    continue
                hdr_idx = _detect_header_row(matrix)
                if hdr_idx is None:
                    continue
                idx_map = _header_index_map(matrix[hdr_idx])
                if "sku" not in idx_map or "msrp" not in idx_map:
                    continue
                for i, row in enumerate(matrix[hdr_idx + 1:], start=hdr_idx + 1):
                    part = _rowval(row, idx_map, "sku")
                    desc = _rowval(row, idx_map, "description") or _rowval(row, idx_map, "title")
                    msrp_raw = _rowval(row, idx_map, "msrp")
                    if not part or not msrp_raw:
                        continue
                    msrp, msrp_n = _parse_money(msrp_raw)
                    if msrp_n is None:
                        continue
                    # First matching keyword group decides the fit profile.
                    fit = ""
                    low_desc = desc.lower()
                    if "vehicle" in low_desc or "mobile" in low_desc:
                        fit = "vehicle"
                    elif "indoor" in low_desc:
                        fit = "indoor"
                    elif "directional" in low_desc:
                        fit = "directional"
                    elif "outdoor" in low_desc or "pole" in low_desc or "wall" in low_desc:
                        fit = "outdoor fixed"
                    connector_hits = []
                    for patt in connector_patterns:
                        m = re.search(patt, desc, flags=re.IGNORECASE)
                        if m:
                            connector_hits.append(_norm(m.group(0)))
                    rows.append(
                        {
                            "source_file": src.name,
                            "source_sheet": ws.title,
                            "source_row": i + 1,  # 1-based row number within the sheet
                            "manufacturer": "Parsec",
                            "category": ws.title,
                            "family": _parse_parsec_family(desc or part),
                            "part_number": part,
                            "title": part,
                            "description": desc,
                            "msrp": msrp,
                            "msrp_numeric": f"{msrp_n:.2f}",
                            "connector_summary": "; ".join(connector_hits),
                            "fit_profile": fit,
                        }
                    )
        finally:
            # Read-only workbooks keep the source file open until closed explicitly.
            wb.close()
    with out.open("w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=fields)
        w.writeheader()
        w.writerows(rows)
    return out
def _write_peplink_overlay() -> Path:
    """Normalize the Peplink replacement list into ``peplink_replacement_overlay.csv``.

    Only the first worksheet is read (at most 1000 rows and 20 columns), and
    its first row is taken as the header. Columns are located by substring:
    the first header containing the wanted text wins. Rows lacking either the
    old or the replacement item name are skipped. Every written row carries
    the fixed lifecycle years EOS=2024 and EOL=2025. When no source workbook
    is found, a file with only the header line is written.

    Returns:
        Path of the written file.
    """
    src = _find_peplink_overlay_source_xlsx()
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    out = OUT_DIR / "peplink_replacement_overlay.csv"
    fields = [
        "source_file",
        "source_sheet",
        "source_row",
        "old_item_type",
        "old_item_name",
        "old_description",
        "old_model_key",
        "old_sku",
        "new_item_type",
        "new_item_name",
        "new_description",
        "new_model_key",
        "new_sku",
        "eos_year",
        "eol_year",
        "notes",
    ]
    rows: List[Dict[str, Any]] = []
    if src and src.exists():
        wb = openpyxl.load_workbook(src, data_only=True, read_only=True)
        try:
            ws = wb.worksheets[0]
            matrix = [
                list(r)
                for r in ws.iter_rows(
                    min_row=1,
                    max_row=min(1000, ws.max_row or 0),
                    min_col=1,
                    max_col=min(20, ws.max_column or 0),
                    values_only=True,
                )
            ]
            if matrix:
                hdr = [_norm(x).lower() for x in matrix[0]]

                def ix(name: str) -> int:
                    # Index of the first header containing *name*, or -1.
                    for i, h in enumerate(hdr):
                        if name in h:
                            return i
                    return -1

                def cell(row: Sequence[Any], index: int) -> str:
                    # Normalized cell text; "" for a missing column or short row.
                    return _norm(row[index]) if 0 <= index < len(row) else ""

                old_item_t = ix("item type")
                old_item = ix("item name")
                old_desc = ix("description")
                new_item_t = ix("alternative item type")
                new_item = ix("alternative item name")
                new_desc = ix("alternative description")
                for ridx, r in enumerate(matrix[1:], start=2):
                    old_name = cell(r, old_item)
                    new_name = cell(r, new_item)
                    if not old_name or not new_name:
                        continue
                    old_d = cell(r, old_desc)
                    new_d = cell(r, new_desc)
                    # Model key: first candidate from name + description, else the compacted name.
                    old_model = (_model_candidates(f"{old_name} {old_d}") or [_compact(old_name)])[0]
                    new_model = (_model_candidates(f"{new_name} {new_d}") or [_compact(new_name)])[0]
                    # "SKU": first candidate in the description that differs from the model key.
                    old_sku = next((m for m in _model_candidates(old_d) if m != old_model), "")
                    new_sku = next((m for m in _model_candidates(new_d) if m != new_model), "")
                    rows.append(
                        {
                            "source_file": src.name,
                            "source_sheet": ws.title,
                            "source_row": ridx,
                            "old_item_type": cell(r, old_item_t),
                            "old_item_name": old_name,
                            "old_description": old_d,
                            "old_model_key": old_model,
                            "old_sku": old_sku,
                            "new_item_type": cell(r, new_item_t),
                            "new_item_name": new_name,
                            "new_description": new_d,
                            "new_model_key": new_model,
                            "new_sku": new_sku,
                            "eos_year": "2024",
                            "eol_year": "2025",
                            "notes": "Policy default overlay from Peplink replacement list (EOS=2024, EOL=2025); retain stricter existing lifecycle dates where present.",
                        }
                    )
        finally:
            # Read-only workbooks keep the source file open until closed explicitly.
            wb.close()
    with out.open("w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=fields)
        w.writeheader()
        w.writerows(rows)
    return out
def main() -> None:
    """Run the full normalization and write all outputs into ``OUT_DIR``.

    Produces the router catalog, the variant options, the Parsec pricing and
    the Peplink overlay CSVs, then a ``pricing_normalization_summary.txt``
    with the sources used and row/model counts, and prints the written paths.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    # Sheet/PDF rows first, then the static Verizon rows; rows lacking a model
    # key or a price string are dropped.
    catalog_rows = [
        row
        for row in (*_build_catalog_rows(), *_verizon_gateway_web_rows())
        if row.model_key and row.msrp
    ]

    catalog_path = _write_router_catalog(catalog_rows)
    variants_path = _write_variant_options(catalog_rows)
    parsec_path = _write_parsec_pricing()
    peplink_overlay_path = _write_peplink_overlay()

    source_dirs = _iter_source_dirs()
    parsec_source = _find_parsec_source_xlsx()
    peplink_overlay_source = _find_peplink_overlay_source_xlsx()
    atel_source = _find_atel_pdf()

    summary = OUT_DIR / "pricing_normalization_summary.txt"
    model_count = len({row.model_key for row in catalog_rows})
    verizon_model_count = len(
        {row.model_key for row in catalog_rows if "verizon" in _norm(row.sources_label).lower()}
    )

    report = [
        ("source_dirs", ";".join(_display_path(d) for d in source_dirs)),
        ("parsec_source", _display_path(parsec_source)),
        ("peplink_overlay_source", _display_path(peplink_overlay_source)),
        ("atel_source", _display_path(atel_source)),
        ("catalog_rows", len(catalog_rows)),
        ("catalog_models", model_count),
        ("verizon_gateway_models", verizon_model_count),
        ("catalog_path", _display_path(catalog_path)),
        ("variants_path", _display_path(variants_path)),
        ("parsec_path", _display_path(parsec_path)),
        ("peplink_overlay_path", _display_path(peplink_overlay_path)),
    ]
    with summary.open("w", encoding="utf-8") as f:
        f.writelines(f"{label}={value}\n" for label, value in report)

    for written in (catalog_path, variants_path, parsec_path, peplink_overlay_path):
        print(f"Wrote {written}")
    print(f"Summary: {summary}")


# Run the normalization only when executed as a script, not on import.
if __name__ == "__main__":
    main()