#!/usr/bin/env python3 from __future__ import annotations import csv import os import re from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple import openpyxl import pdfplumber REPO_ROOT = Path(__file__).resolve().parents[2] SOURCES_DIR = REPO_ROOT / "backend" / "app" / "knowledgebase" / "data" / "pricing_sources" OUT_DIR = REPO_ROOT / "backend" / "app" / "knowledgebase" / "data" / "normalized" EXTRA_SOURCES_ENV = "ROUTER_PRICING_EXTRA_SOURCE_DIRS" DEFAULT_EXTERNAL_SOURCE_DIRS: Tuple[Path, ...] = ( REPO_ROOT.parent / "RAG SKU and PRice List", ) def _display_path(path: Optional[Path]) -> str: if path is None: return "" try: return str(path.resolve().relative_to(REPO_ROOT)) except Exception: try: return str(path.relative_to(REPO_ROOT)) except Exception: return str(path) FAMILY_HINTS = [ "AKITA", "CHINOOK", "HUSKY", "ALBATROSS", "WHIPPET", "AIREDALE", "BEAGLE", "BOXER", "DALMATIAN", "GREYHOUND", "IRISH SETTER", "LABRADOR", "ROTTWEILER", "DOBERMAN", "BORDER COLLIE", "GREAT DANE", "ST. BERNARD", ] VERIZON_GATEWAY_WEB_ROWS: Tuple[Dict[str, str], ...] = ( { "manufacturer": "Verizon (OEM Arcadyan)", "model": "XC46BE", "title": "Verizon Business Internet Gateway XC46BE (Dragon)", }, { "manufacturer": "Verizon (OEM Askey)", "model": "FSNO21VA", "title": "Verizon Internet Gateway Business FSNO21VA", }, { "manufacturer": "Verizon (OEM Askey)", "model": "ASK-NCM1100E", "title": "Verizon Business Internet Gateway ASK-NCM1100E (Crown)", }, { "manufacturer": "Verizon (OEM Askey)", "model": "ASK-NCQ1338E", "title": "Verizon Internet Gateway for Business ASK-NCQ1338E", }, { "manufacturer": "Verizon (OEM Arris)", "model": "NVG558", "title": "Arris NVG558 LTE Router", }, { "manufacturer": "Verizon (OEM Askey)", "model": "ASK-NCM1100", "title": "Verizon Home Internet Router ASK-NCM1100", }, { "manufacturer": "Verizon (OEM Askey)", "model": "ASK-NCQ1338", "title": "Verizon Internet Gateway ASK-NCQ1338", }, { "manufacturer": "Verizon (OEM Askey)", "model": "ASK-NCQ1338FA", "title": "Verizon Internet Gateway ASK-NCQ1338FA", }, { "manufacturer": "Verizon (OEM Arcadyan)", "model": "ARC-XCI55AX", "title": "Verizon Internet Gateway ARC-XCI55AX", }, { "manufacturer": "Verizon (OEM Wistron NeWeb)", "model": "WNC-CR200A", "title": "Verizon Internet Gateway WNC-CR200A", }, ) MODEL_ALIAS = { "RX50": "RX55", "RX60": "XR60", "CR202LITE": "CR202", "CR202-LITE": "CR202", "MAXBR1PRO": "MAXBR1PRO", "MAXBR1PRO5G": "MAXBR1PRO5G", "BR1PRO": "MAXBR1PRO", "BR1PRO5G": "MAXBR1PRO5G", "BR1MINI": "BR1MINI5G", "MAXBR1MINI": "BR1MINI5G", "BR1MINI5G": "BR1MINI5G", "FW2000": "FW2000E", "MG51E": "MG51", } HEADER_ALIASES = { "sku": ("sku", "partnumber", "part number", "item name", "part #", "parsec part #", "parsec accessories part #"), "title": ("product", "model", "item name", "short description", "product model", "variant"), "description": ("description", "long description", "desc (bom & accessory content)", "desc", "special offers & notes", "notes", "remark"), "msrp": ("msrp", "msrp / retail price", "list price", "list ($ usd)", "retail price", "map"), "term": ("term", "registration", "subscription duration included"), "wifi": ("wifi", "wi-fi", "wifi standard", "wi-fi standard", "wifi capability", "wireless"), "ethernet": ("ethernet ports", "ethernet port", "lan", "lan ports and speed", "wan", "wan ports and speed"), "serial": ("serial", "serial port", "serial ports"), "battery": ("battery",), "modem": ("cellular networks", "wan", "modem", "modem type", "cat/cellular", "cat"), "poe": ("poe",), } @dataclass class CatalogRow: source_file: str source_sheet: str source_row: int manufacturer: str item_type: str title: str model: str model_key: str sku: str msrp: str msrp_numeric: Optional[float] term: str wifi: str ethernet_ports: str serial_ports: str battery: str ruggedization: str modem_type: str primary_use_case: str device_type: str poe: str notes: str sources_label: str def _norm(text: Any) -> str: return re.sub(r"\s+", " ", str(text or "").strip()) def _compact(text: Any) -> str: return re.sub(r"[^A-Z0-9]", "", _norm(text).upper()) def _file_key(path_or_name: Any) -> str: name = Path(str(path_or_name or "")).name return re.sub(r"[^a-z0-9]+", "", name.lower()) def _safe_mtime(path: Path) -> float: try: return float(path.stat().st_mtime) except Exception: return 0.0 def _iter_source_dirs() -> List[Path]: dirs: List[Path] = [SOURCES_DIR] for d in DEFAULT_EXTERNAL_SOURCE_DIRS: if d.exists() and d.is_dir(): dirs.append(d) raw_extra = str(os.getenv(EXTRA_SOURCES_ENV, "") or "").strip() if raw_extra: parts = [p.strip() for p in re.split(r"[,;]", raw_extra) if p.strip()] for part in parts: p = Path(part).expanduser() if p.exists() and p.is_dir(): dirs.append(p) out: List[Path] = [] seen: set[str] = set() for d in dirs: key = str(d.resolve()) if d.exists() else str(d) if key in seen: continue seen.add(key) out.append(d) return out def _iter_source_files(extensions: Sequence[str]) -> List[Path]: normalized_ext = {str(x).strip().lower() for x in extensions if str(x).strip()} by_key: Dict[str, Path] = {} for src_dir in _iter_source_dirs(): for path in sorted(src_dir.iterdir()): if (not path.is_file()) or path.name.startswith("~$"): continue if normalized_ext and path.suffix.lower() not in normalized_ext: continue key = _file_key(path.name) prev = by_key.get(key) if prev is None or _safe_mtime(path) > _safe_mtime(prev): by_key[key] = path return sorted(by_key.values(), key=lambda p: (p.name.lower(), str(p))) def _is_parsec_source_name(path: Path) -> bool: key = _file_key(path.name) return ("parsec" in key) and key.endswith("xlsx") def _is_peplink_overlay_source_name(path: Path) -> bool: key = _file_key(path.name) if not key.endswith("xlsx"): return False if "peplink" not in key or "device" not in key: return False return ("replacement" in key) or ("repalcement" in key) def _find_parsec_source_xlsx() -> Optional[Path]: candidates = [p for p in _iter_source_files((".xlsx",)) if _is_parsec_source_name(p)] if not candidates: return None candidates.sort( key=lambda p: ( 0 if "pricing" in _file_key(p.name) else 1, -_safe_mtime(p), p.name.lower(), ) ) return candidates[0] def _find_peplink_overlay_source_xlsx() -> Optional[Path]: candidates = [p for p in _iter_source_files((".xlsx",)) if _is_peplink_overlay_source_name(p)] if not candidates: return None candidates.sort( key=lambda p: ( 0 if "replacementdevices" in _file_key(p.name) else 1, -_safe_mtime(p), p.name.lower(), ) ) return candidates[0] def _iter_router_pricing_xlsx_sources() -> List[Path]: out: List[Path] = [] for path in _iter_source_files((".xlsx",)): if _is_parsec_source_name(path): continue if _is_peplink_overlay_source_name(path): continue out.append(path) return out def _find_atel_pdf() -> Optional[Path]: candidates: List[Path] = [] for path in _iter_source_files((".pdf",)): key = _file_key(path.name) if "atel" in key and ("pricing" in key or "wholesale" in key): candidates.append(path) if not candidates: return None candidates.sort(key=lambda p: (-_safe_mtime(p), p.name.lower())) return candidates[0] def _parse_money(value: Any) -> Tuple[str, Optional[float]]: raw = _norm(value) if not raw: return "", None m = re.search(r"(-?\d{1,3}(?:,\d{3})*(?:\.\d+)?|-?\d+(?:\.\d+)?)", raw.replace("$", "")) if not m: return raw, None try: num = float(m.group(1).replace(",", "")) except Exception: return raw, None return f"${num:,.2f}", num def _extract_term(text: str) -> str: low = text.lower() m = re.search(r"\b([135])\s*[- ]?year\b", low) if m: return f"{m.group(1)}YR" m = re.search(r"\b([135])\s*yr\b", low) if m: return f"{m.group(1)}YR" return "" def _model_candidates(text: str) -> List[str]: raw = _norm(text) if not raw: return [] out: List[str] = [] seen: set[str] = set() phrase_patterns = [ r"\bmax\s*br\s*1\s*pro\s*5g\b", r"\bmax\s*br\s*1\s*mini\s*5g\b", r"\bmax\s*br\s*1\s*pro\b", r"\bmax\s*br\s*1\s*mini\b", ] for patt in phrase_patterns: for m in re.finditer(patt, raw, flags=re.IGNORECASE): tok = _compact(m.group(0)) tok = MODEL_ALIAS.get(tok, tok) if tok and tok not in seen: seen.add(tok) out.append(tok) for m in re.finditer(r"\b[A-Za-z]{1,8}[- ]?\d{2,4}[A-Za-z0-9\-]*\b", raw): tok = _compact(m.group(0)) if not tok: continue if tok.isdigit() or len(tok) < 4: continue if (not any(c.isalpha() for c in tok)) or (not any(c.isdigit() for c in tok)): continue tok = MODEL_ALIAS.get(tok, tok) if tok in {"SKU", "MSRP", "MODEL"}: continue if tok not in seen: seen.add(tok) out.append(tok) return out def _manufacturer_from_filename(name: str) -> str: low = name.lower() if "semtech" in low or "sierra" in low: return "Semtech" if "ericsson" in low or "cradlepoint" in low: return "Ericsson Cradlepoint" if "inseego" in low: return "Inseego" if "inhand" in low: return "InHand Networks" if "peplink" in low: return "Peplink" if "parsec" in low: return "Parsec" if "atel" in low: return "ATEL" return "Unknown" def _clean_header(text: Any) -> str: return _compact(text).lower() def _header_index_map(header_row: Sequence[Any]) -> Dict[str, int]: idx: Dict[str, int] = {} clean = [_clean_header(h) for h in header_row] for key, aliases in HEADER_ALIASES.items(): for i, h in enumerate(clean): if not h: continue for cand in aliases: if _clean_header(cand) == h: idx[key] = i break if key in idx: break if key in idx: continue for i, h in enumerate(clean): if not h: continue if key == "msrp" and ("msrp" in h or "listprice" in h or "retailprice" in h or "listusd" in h): idx[key] = i break if key == "sku" and ("sku" in h or "partnumber" in h or h.startswith("part")): idx[key] = i break if key == "title" and any(t in h for t in ("product", "model", "itemname", "shortdescription")): idx[key] = i break if key == "description" and any(t in h for t in ("description", "longdescription", "desc", "notes", "remark")): idx[key] = i break if key == "ethernet" and "ethernet" in h: idx[key] = i break if key == "serial" and "serial" in h: idx[key] = i break if key == "wifi" and "wifi" in h: idx[key] = i break if key == "battery" and "battery" in h: idx[key] = i break if key == "modem" and ("cellular" in h or h == "wan" or "modem" in h or "cat" in h): idx[key] = i break if key == "poe" and "poe" in h: idx[key] = i break return idx def _detect_header_row(rows: Sequence[Sequence[Any]]) -> Optional[int]: for i, row in enumerate(rows[:35]): joined = " ".join(_norm(v).lower() for v in row if _norm(v)) if not joined: continue score = 0 if "msrp" in joined or "list price" in joined or "retail price" in joined: score += 2 if "sku" in joined or "part number" in joined or "partnumber" in joined: score += 2 if "description" in joined or "product" in joined or "model" in joined: score += 2 if score >= 4: return i return None def _rowval(row: Sequence[Any], idx_map: Dict[str, int], key: str) -> str: ix = idx_map.get(key) if ix is None or ix >= len(row): return "" return _norm(row[ix]) def _infer_device_type(blob_low: str) -> str: if any(t in blob_low for t in ("license", "subscription", "support", "renewal", "service", "primecare", "incontrol", "airlink complete")): return "Service/License" if any(t in blob_low for t in ("antenna", "mount", "cable", "adapter", "accessory", "bracket")): return "Accessory" if any(t in blob_low for t in ("gateway", "router", "fwa", "cpe", "modem")): return "Router" return "Unknown" def _extract_ruggedization(blob_low: str) -> str: hits = [] for term in ("ip67", "ip66", "ip65", "ip64", "ip54", "mil-std", "vibration", "ignition sensing", "metal housing", "aluminum"): if term in blob_low: hits.append(term) return ", ".join(dict.fromkeys(hits)) def _extract_poe(blob_low: str, explicit: str) -> str: if explicit: return explicit if "poe" in blob_low: return "yes" return "" def _build_catalog_rows() -> List[CatalogRow]: rows_out: List[CatalogRow] = [] for path in _iter_router_pricing_xlsx_sources(): manu = _manufacturer_from_filename(path.name) wb = openpyxl.load_workbook(path, data_only=True, read_only=True) for ws in wb.worksheets: matrix: List[List[Any]] = [] max_cols = min(80, ws.max_column or 0) for r in ws.iter_rows(min_row=1, max_row=min(4000, ws.max_row or 0), min_col=1, max_col=max_cols, values_only=True): matrix.append(list(r)) if not matrix: continue hdr_idx = _detect_header_row(matrix) if hdr_idx is None: continue hdr = matrix[hdr_idx] idx_map = _header_index_map(hdr) if ("msrp" not in idx_map) or ("sku" not in idx_map and "title" not in idx_map and "description" not in idx_map): continue for i in range(hdr_idx + 1, len(matrix)): row = matrix[i] if not row: continue sku = _rowval(row, idx_map, "sku") title = _rowval(row, idx_map, "title") desc = _rowval(row, idx_map, "description") msrp_raw = _rowval(row, idx_map, "msrp") msrp, msrp_n = _parse_money(msrp_raw) if not msrp and msrp_n is None: continue blob = " ".join(x for x in (title, desc, sku) if x) blob_low = blob.lower() if not blob: continue device_type = _infer_device_type(blob_low) if device_type == "Service/License": # keep for SKU term lookups but skip as router model catalog rows continue model_candidates = _model_candidates(blob) if not model_candidates: continue model_key = model_candidates[0] model = model_key term = _rowval(row, idx_map, "term") or _extract_term(blob) wifi = _rowval(row, idx_map, "wifi") ethernet = _rowval(row, idx_map, "ethernet") serial = _rowval(row, idx_map, "serial") battery = _rowval(row, idx_map, "battery") modem = _rowval(row, idx_map, "modem") poe = _extract_poe(blob_low, _rowval(row, idx_map, "poe")) rugged = _extract_ruggedization(blob_low) item_type = "Router" if device_type == "Router" else "Accessory" rows_out.append( CatalogRow( source_file=path.name, source_sheet=ws.title, source_row=i + 1, manufacturer=manu, item_type=item_type, title=title or model, model=model, model_key=model_key, sku=sku, msrp=msrp or _norm(msrp_raw), msrp_numeric=msrp_n, term=term, wifi=wifi, ethernet_ports=ethernet, serial_ports=serial, battery=battery, ruggedization=rugged, modem_type=modem, primary_use_case="", device_type=device_type, poe=poe, notes=desc, sources_label="manufacturer pricing sheet", ) ) # ATEL PDF (MSRP only; skip wholesale fields) atel = _find_atel_pdf() if atel and atel.exists(): with pdfplumber.open(str(atel)) as pdf: for pidx, page in enumerate(pdf.pages, start=1): tables = page.extract_tables() or [] for table in tables: if not table or len(table) < 2: continue head = [(_norm(x).lower()) for x in table[1]] def col(name: str) -> int: for i, h in enumerate(head): if name in h: return i return -1 model_i = col("model") sku_i = col("sku") msrp_i = col("msrp") desc_i = col("desc") if model_i < 0 or msrp_i < 0: continue for ridx, r in enumerate(table[2:], start=3): model_raw = _norm(r[model_i] if model_i < len(r) else "") sku = _norm(r[sku_i] if sku_i < len(r) and sku_i >= 0 else "") desc = _norm(r[desc_i] if desc_i < len(r) and desc_i >= 0 else "") msrp_raw = _norm(r[msrp_i] if msrp_i < len(r) else "") if not model_raw or not msrp_raw: continue msrp, msrp_n = _parse_money(msrp_raw) model_cands = _model_candidates(f"{model_raw} {desc}") model_key = model_cands[0] if model_cands else _compact(model_raw) if not model_key or model_key.isdigit() or len(model_key) < 4: continue rows_out.append( CatalogRow( source_file=atel.name, source_sheet=f"page_{pidx}", source_row=ridx, manufacturer="ATEL", item_type="Router", title=model_raw, model=model_key, model_key=model_key, sku=sku, msrp=msrp or msrp_raw, msrp_numeric=msrp_n, term=_extract_term(f"{model_raw} {desc}"), wifi="", ethernet_ports="", serial_ports="", battery="", ruggedization=_extract_ruggedization(desc.lower()), modem_type="", primary_use_case="", device_type="Router", poe=_extract_poe(desc.lower(), ""), notes=desc, sources_label="manufacturer pricing sheet (MSRP only)", ) ) deduped: List[CatalogRow] = [] seen: set[Tuple[str, str, str, str, str]] = set() for row in rows_out: fp = ( _compact(row.manufacturer), _compact(row.model_key), _compact(row.sku), _compact(row.term), _compact(row.msrp), ) if fp in seen: continue seen.add(fp) deduped.append(row) return deduped def _verizon_gateway_web_rows() -> List[CatalogRow]: """ Add Verizon gateway model tokens so deterministic router facts can resolve them. These rows are explicitly marked as web-sourced and are not used to fabricate pricing. """ rows: List[CatalogRow] = [] for idx, spec in enumerate(VERIZON_GATEWAY_WEB_ROWS, start=1): model = _norm(spec.get("model", "")) model_key = _compact(model) if not model_key: continue title = _norm(spec.get("title", "")) or model manufacturer = _norm(spec.get("manufacturer", "")) or "Verizon" rows.append( CatalogRow( source_file="verizon_support_gateway_models_web.csv", source_sheet="verizon_support_web", source_row=idx, manufacturer=manufacturer, item_type="Router", title=title, model=model_key, model_key=model_key, sku=model, msrp="Unknown, ask Masters", msrp_numeric=None, term="", wifi="", ethernet_ports="", serial_ports="", battery="", ruggedization="", modem_type="5G gateway (web-sourced model index)", primary_use_case="Verizon gateway model token coverage", device_type="Router", poe="", notes=( "Web-sourced (not from internal docs): model token from Verizon support " "gateway equipment pages; verify exact hardware revision before quoting." ), sources_label="Web-sourced (not from internal docs): Verizon support equipment listing", ) ) return rows def _write_router_catalog(rows: Sequence[CatalogRow]) -> Path: OUT_DIR.mkdir(parents=True, exist_ok=True) out = OUT_DIR / "router_pricing_catalog_normalized.csv" fields = [ "source_file", "source_sheet", "source_row", "manufacturer", "item_type", "title", "model", "model_key", "sku", "msrp", "msrp_numeric", "term", "wifi", "ethernet_ports", "serial_ports", "battery", "ruggedization", "modem_type", "primary_use_case", "device_type", "poe", "notes", "sources_label", ] with out.open("w", encoding="utf-8", newline="") as f: w = csv.DictWriter(f, fieldnames=fields) w.writeheader() for r in rows: w.writerow({ "source_file": r.source_file, "source_sheet": r.source_sheet, "source_row": r.source_row, "manufacturer": r.manufacturer, "item_type": r.item_type, "title": r.title, "model": r.model, "model_key": r.model_key, "sku": r.sku, "msrp": r.msrp, "msrp_numeric": f"{r.msrp_numeric:.2f}" if isinstance(r.msrp_numeric, float) else "", "term": r.term, "wifi": r.wifi, "ethernet_ports": r.ethernet_ports, "serial_ports": r.serial_ports, "battery": r.battery, "ruggedization": r.ruggedization, "modem_type": r.modem_type, "primary_use_case": r.primary_use_case, "device_type": r.device_type, "poe": r.poe, "notes": r.notes, "sources_label": r.sources_label, }) return out def _write_variant_options(rows: Sequence[CatalogRow]) -> Path: out = OUT_DIR / "router_variant_options_normalized.csv" fields = [ "manufacturer", "model_key", "model", "default_option", "sku", "title", "term", "msrp", "msrp_numeric", "wifi", "ethernet_ports", "serial_ports", "poe", "battery", "modem_type", "source_file", "source_sheet", "source_row", ] groups: Dict[Tuple[str, str], List[CatalogRow]] = {} for r in rows: groups.setdefault((r.manufacturer, r.model_key), []).append(r) def rank_row(r: CatalogRow) -> Tuple[int, float, str]: term = _norm(r.term).upper() if term == "1YR": term_rank = 0 elif not term: term_rank = 1 else: term_rank = 2 msrp = float(r.msrp_numeric) if isinstance(r.msrp_numeric, float) else 999999.0 return (term_rank, msrp, _norm(r.sku).upper()) with out.open("w", encoding="utf-8", newline="") as f: w = csv.DictWriter(f, fieldnames=fields) w.writeheader() for (manu, key), items in sorted(groups.items()): items_sorted = sorted(items, key=rank_row) default_sku = _norm(items_sorted[0].sku) for r in items_sorted: w.writerow( { "manufacturer": manu, "model_key": key, "model": r.model, "default_option": "yes" if _norm(r.sku) == default_sku else "no", "sku": r.sku, "title": r.title, "term": r.term, "msrp": r.msrp, "msrp_numeric": f"{r.msrp_numeric:.2f}" if isinstance(r.msrp_numeric, float) else "", "wifi": r.wifi, "ethernet_ports": r.ethernet_ports, "serial_ports": r.serial_ports, "poe": r.poe, "battery": r.battery, "modem_type": r.modem_type, "source_file": r.source_file, "source_sheet": r.source_sheet, "source_row": r.source_row, } ) return out def _parse_parsec_family(description: str) -> str: up = _norm(description).upper() for fam in FAMILY_HINTS: if fam in up: return fam.title() m = re.search(r"PRO\s+([A-Z][A-Z ]{2,30})", up) if m: return _norm(m.group(1)).title() return "" def _write_parsec_pricing() -> Path: src = _find_parsec_source_xlsx() out = OUT_DIR / "parsec_pricing_normalized.csv" fields = [ "source_file", "source_sheet", "source_row", "manufacturer", "category", "family", "part_number", "title", "description", "msrp", "msrp_numeric", "connector_summary", "fit_profile", ] rows: List[Dict[str, Any]] = [] if src and src.exists(): wb = openpyxl.load_workbook(src, data_only=True, read_only=True) for ws in wb.worksheets: matrix = [list(r) for r in ws.iter_rows(min_row=1, max_row=min(2500, ws.max_row or 0), min_col=1, max_col=min(20, ws.max_column or 0), values_only=True)] if not matrix: continue hdr_idx = _detect_header_row(matrix) if hdr_idx is None: continue idx_map = _header_index_map(matrix[hdr_idx]) if "sku" not in idx_map or "msrp" not in idx_map: continue for i in range(hdr_idx + 1, len(matrix)): row = matrix[i] part = _rowval(row, idx_map, "sku") desc = _rowval(row, idx_map, "description") or _rowval(row, idx_map, "title") msrp_raw = _rowval(row, idx_map, "msrp") if not part or not msrp_raw: continue msrp, msrp_n = _parse_money(msrp_raw) if msrp_n is None: continue fit = "" low_desc = desc.lower() if "vehicle" in low_desc or "mobile" in low_desc: fit = "vehicle" elif "indoor" in low_desc: fit = "indoor" elif "directional" in low_desc: fit = "directional" elif "outdoor" in low_desc or "pole" in low_desc or "wall" in low_desc: fit = "outdoor fixed" family = _parse_parsec_family(desc or part) connector_summary = "" for patt in (r"\b\d\s*LTE\b", r"\b\d\s*WiFi\b", r"\b\d\s*GNSS\b", r"SMA", r"RP\s*SMA"): m = re.search(patt, desc, flags=re.IGNORECASE) if m: connector_summary += ("; " if connector_summary else "") + _norm(m.group(0)) rows.append( { "source_file": src.name, "source_sheet": ws.title, "source_row": i + 1, "manufacturer": "Parsec", "category": ws.title, "family": family, "part_number": part, "title": part, "description": desc, "msrp": msrp, "msrp_numeric": f"{msrp_n:.2f}", "connector_summary": connector_summary, "fit_profile": fit, } ) with out.open("w", encoding="utf-8", newline="") as f: w = csv.DictWriter(f, fieldnames=fields) w.writeheader() for r in rows: w.writerow(r) return out def _write_peplink_overlay() -> Path: src = _find_peplink_overlay_source_xlsx() out = OUT_DIR / "peplink_replacement_overlay.csv" fields = [ "source_file", "source_sheet", "source_row", "old_item_type", "old_item_name", "old_description", "old_model_key", "old_sku", "new_item_type", "new_item_name", "new_description", "new_model_key", "new_sku", "eos_year", "eol_year", "notes", ] rows: List[Dict[str, Any]] = [] if src and src.exists(): wb = openpyxl.load_workbook(src, data_only=True, read_only=True) ws = wb.worksheets[0] matrix = [list(r) for r in ws.iter_rows(min_row=1, max_row=min(1000, ws.max_row or 0), min_col=1, max_col=min(20, ws.max_column or 0), values_only=True)] if matrix: hdr = [_norm(x).lower() for x in matrix[0]] def ix(name: str) -> int: for i, h in enumerate(hdr): if name in h: return i return -1 old_item_t = ix("item type") old_item = ix("item name") old_desc = ix("description") new_item_t = ix("alternative item type") new_item = ix("alternative item name") new_desc = ix("alternative description") for ridx, r in enumerate(matrix[1:], start=2): old_name = _norm(r[old_item]) if old_item >= 0 and old_item < len(r) else "" new_name = _norm(r[new_item]) if new_item >= 0 and new_item < len(r) else "" if not old_name or not new_name: continue old_d = _norm(r[old_desc]) if old_desc >= 0 and old_desc < len(r) else "" new_d = _norm(r[new_desc]) if new_desc >= 0 and new_desc < len(r) else "" old_model = (_model_candidates(f"{old_name} {old_d}") or [_compact(old_name)])[0] new_model = (_model_candidates(f"{new_name} {new_d}") or [_compact(new_name)])[0] old_sku = next((m for m in _model_candidates(old_d) if m != old_model), "") new_sku = next((m for m in _model_candidates(new_d) if m != new_model), "") rows.append( { "source_file": src.name, "source_sheet": ws.title, "source_row": ridx, "old_item_type": _norm(r[old_item_t]) if old_item_t >= 0 and old_item_t < len(r) else "", "old_item_name": old_name, "old_description": old_d, "old_model_key": old_model, "old_sku": old_sku, "new_item_type": _norm(r[new_item_t]) if new_item_t >= 0 and new_item_t < len(r) else "", "new_item_name": new_name, "new_description": new_d, "new_model_key": new_model, "new_sku": new_sku, "eos_year": "2024", "eol_year": "2025", "notes": "Policy default overlay from Peplink replacement list (EOS=2024, EOL=2025); retain stricter existing lifecycle dates where present.", } ) with out.open("w", encoding="utf-8", newline="") as f: w = csv.DictWriter(f, fieldnames=fields) w.writeheader() for r in rows: w.writerow(r) return out def main() -> None: OUT_DIR.mkdir(parents=True, exist_ok=True) catalog_rows = _build_catalog_rows() catalog_rows.extend(_verizon_gateway_web_rows()) catalog_rows = [r for r in catalog_rows if r.model_key and r.msrp] catalog_path = _write_router_catalog(catalog_rows) variants_path = _write_variant_options(catalog_rows) parsec_path = _write_parsec_pricing() peplink_overlay_path = _write_peplink_overlay() source_dirs = _iter_source_dirs() parsec_source = _find_parsec_source_xlsx() peplink_overlay_source = _find_peplink_overlay_source_xlsx() atel_source = _find_atel_pdf() summary = OUT_DIR / "pricing_normalization_summary.txt" model_count = len({r.model_key for r in catalog_rows}) verizon_model_count = len({r.model_key for r in catalog_rows if "verizon" in _norm(r.sources_label).lower()}) with summary.open("w", encoding="utf-8") as f: f.write(f"source_dirs={';'.join(_display_path(d) for d in source_dirs)}\n") f.write(f"parsec_source={_display_path(parsec_source)}\n") f.write(f"peplink_overlay_source={_display_path(peplink_overlay_source)}\n") f.write(f"atel_source={_display_path(atel_source)}\n") f.write(f"catalog_rows={len(catalog_rows)}\n") f.write(f"catalog_models={model_count}\n") f.write(f"verizon_gateway_models={verizon_model_count}\n") f.write(f"catalog_path={_display_path(catalog_path)}\n") f.write(f"variants_path={_display_path(variants_path)}\n") f.write(f"parsec_path={_display_path(parsec_path)}\n") f.write(f"peplink_overlay_path={_display_path(peplink_overlay_path)}\n") print(f"Wrote {catalog_path}") print(f"Wrote {variants_path}") print(f"Wrote {parsec_path}") print(f"Wrote {peplink_overlay_path}") print(f"Summary: {summary}") if __name__ == "__main__": main()