| |
| from __future__ import annotations |
|
|
| import csv |
| import os |
| import re |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple |
|
|
| import openpyxl |
| import pdfplumber |
|
|
| REPO_ROOT = Path(__file__).resolve().parents[2] |
| SOURCES_DIR = REPO_ROOT / "backend" / "app" / "knowledgebase" / "data" / "pricing_sources" |
| OUT_DIR = REPO_ROOT / "backend" / "app" / "knowledgebase" / "data" / "normalized" |
| EXTRA_SOURCES_ENV = "ROUTER_PRICING_EXTRA_SOURCE_DIRS" |
| DEFAULT_EXTERNAL_SOURCE_DIRS: Tuple[Path, ...] = ( |
| REPO_ROOT.parent / "RAG SKU and PRice List", |
| ) |
|
|
|
|
| def _display_path(path: Optional[Path]) -> str: |
| if path is None: |
| return "" |
| try: |
| return str(path.resolve().relative_to(REPO_ROOT)) |
| except Exception: |
| try: |
| return str(path.relative_to(REPO_ROOT)) |
| except Exception: |
| return str(path) |
|
|
| FAMILY_HINTS = [ |
| "AKITA", |
| "CHINOOK", |
| "HUSKY", |
| "ALBATROSS", |
| "WHIPPET", |
| "AIREDALE", |
| "BEAGLE", |
| "BOXER", |
| "DALMATIAN", |
| "GREYHOUND", |
| "IRISH SETTER", |
| "LABRADOR", |
| "ROTTWEILER", |
| "DOBERMAN", |
| "BORDER COLLIE", |
| "GREAT DANE", |
| "ST. BERNARD", |
| ] |
|
|
| VERIZON_GATEWAY_WEB_ROWS: Tuple[Dict[str, str], ...] = ( |
| { |
| "manufacturer": "Verizon (OEM Arcadyan)", |
| "model": "XC46BE", |
| "title": "Verizon Business Internet Gateway XC46BE (Dragon)", |
| }, |
| { |
| "manufacturer": "Verizon (OEM Askey)", |
| "model": "FSNO21VA", |
| "title": "Verizon Internet Gateway Business FSNO21VA", |
| }, |
| { |
| "manufacturer": "Verizon (OEM Askey)", |
| "model": "ASK-NCM1100E", |
| "title": "Verizon Business Internet Gateway ASK-NCM1100E (Crown)", |
| }, |
| { |
| "manufacturer": "Verizon (OEM Askey)", |
| "model": "ASK-NCQ1338E", |
| "title": "Verizon Internet Gateway for Business ASK-NCQ1338E", |
| }, |
| { |
| "manufacturer": "Verizon (OEM Arris)", |
| "model": "NVG558", |
| "title": "Arris NVG558 LTE Router", |
| }, |
| { |
| "manufacturer": "Verizon (OEM Askey)", |
| "model": "ASK-NCM1100", |
| "title": "Verizon Home Internet Router ASK-NCM1100", |
| }, |
| { |
| "manufacturer": "Verizon (OEM Askey)", |
| "model": "ASK-NCQ1338", |
| "title": "Verizon Internet Gateway ASK-NCQ1338", |
| }, |
| { |
| "manufacturer": "Verizon (OEM Askey)", |
| "model": "ASK-NCQ1338FA", |
| "title": "Verizon Internet Gateway ASK-NCQ1338FA", |
| }, |
| { |
| "manufacturer": "Verizon (OEM Arcadyan)", |
| "model": "ARC-XCI55AX", |
| "title": "Verizon Internet Gateway ARC-XCI55AX", |
| }, |
| { |
| "manufacturer": "Verizon (OEM Wistron NeWeb)", |
| "model": "WNC-CR200A", |
| "title": "Verizon Internet Gateway WNC-CR200A", |
| }, |
| ) |
|
|
| MODEL_ALIAS = { |
| "RX50": "RX55", |
| "RX60": "XR60", |
| "CR202LITE": "CR202", |
| "CR202-LITE": "CR202", |
| "MAXBR1PRO": "MAXBR1PRO", |
| "MAXBR1PRO5G": "MAXBR1PRO5G", |
| "BR1PRO": "MAXBR1PRO", |
| "BR1PRO5G": "MAXBR1PRO5G", |
| "BR1MINI": "BR1MINI5G", |
| "MAXBR1MINI": "BR1MINI5G", |
| "BR1MINI5G": "BR1MINI5G", |
| "FW2000": "FW2000E", |
| "MG51E": "MG51", |
| } |
|
|
| HEADER_ALIASES = { |
| "sku": ("sku", "partnumber", "part number", "item name", "part #", "parsec part #", "parsec accessories part #"), |
| "title": ("product", "model", "item name", "short description", "product model", "variant"), |
| "description": ("description", "long description", "desc (bom & accessory content)", "desc", "special offers & notes", "notes", "remark"), |
| "msrp": ("msrp", "msrp / retail price", "list price", "list ($ usd)", "retail price", "map"), |
| "term": ("term", "registration", "subscription duration included"), |
| "wifi": ("wifi", "wi-fi", "wifi standard", "wi-fi standard", "wifi capability", "wireless"), |
| "ethernet": ("ethernet ports", "ethernet port", "lan", "lan ports and speed", "wan", "wan ports and speed"), |
| "serial": ("serial", "serial port", "serial ports"), |
| "battery": ("battery",), |
| "modem": ("cellular networks", "wan", "modem", "modem type", "cat/cellular", "cat"), |
| "poe": ("poe",), |
| } |
|
|
| @dataclass |
| class CatalogRow: |
| source_file: str |
| source_sheet: str |
| source_row: int |
| manufacturer: str |
| item_type: str |
| title: str |
| model: str |
| model_key: str |
| sku: str |
| msrp: str |
| msrp_numeric: Optional[float] |
| term: str |
| wifi: str |
| ethernet_ports: str |
| serial_ports: str |
| battery: str |
| ruggedization: str |
| modem_type: str |
| primary_use_case: str |
| device_type: str |
| poe: str |
| notes: str |
| sources_label: str |
|
|
|
|
| def _norm(text: Any) -> str: |
| return re.sub(r"\s+", " ", str(text or "").strip()) |
|
|
|
|
| def _compact(text: Any) -> str: |
| return re.sub(r"[^A-Z0-9]", "", _norm(text).upper()) |
|
|
|
|
| def _file_key(path_or_name: Any) -> str: |
| name = Path(str(path_or_name or "")).name |
| return re.sub(r"[^a-z0-9]+", "", name.lower()) |
|
|
|
|
| def _safe_mtime(path: Path) -> float: |
| try: |
| return float(path.stat().st_mtime) |
| except Exception: |
| return 0.0 |
|
|
|
|
| def _iter_source_dirs() -> List[Path]: |
| dirs: List[Path] = [SOURCES_DIR] |
| for d in DEFAULT_EXTERNAL_SOURCE_DIRS: |
| if d.exists() and d.is_dir(): |
| dirs.append(d) |
| raw_extra = str(os.getenv(EXTRA_SOURCES_ENV, "") or "").strip() |
| if raw_extra: |
| parts = [p.strip() for p in re.split(r"[,;]", raw_extra) if p.strip()] |
| for part in parts: |
| p = Path(part).expanduser() |
| if p.exists() and p.is_dir(): |
| dirs.append(p) |
| out: List[Path] = [] |
| seen: set[str] = set() |
| for d in dirs: |
| key = str(d.resolve()) if d.exists() else str(d) |
| if key in seen: |
| continue |
| seen.add(key) |
| out.append(d) |
| return out |
|
|
|
|
| def _iter_source_files(extensions: Sequence[str]) -> List[Path]: |
| normalized_ext = {str(x).strip().lower() for x in extensions if str(x).strip()} |
| by_key: Dict[str, Path] = {} |
| for src_dir in _iter_source_dirs(): |
| for path in sorted(src_dir.iterdir()): |
| if (not path.is_file()) or path.name.startswith("~$"): |
| continue |
| if normalized_ext and path.suffix.lower() not in normalized_ext: |
| continue |
| key = _file_key(path.name) |
| prev = by_key.get(key) |
| if prev is None or _safe_mtime(path) > _safe_mtime(prev): |
| by_key[key] = path |
| return sorted(by_key.values(), key=lambda p: (p.name.lower(), str(p))) |
|
|
|
|
| def _is_parsec_source_name(path: Path) -> bool: |
| key = _file_key(path.name) |
| return ("parsec" in key) and key.endswith("xlsx") |
|
|
|
|
| def _is_peplink_overlay_source_name(path: Path) -> bool: |
| key = _file_key(path.name) |
| if not key.endswith("xlsx"): |
| return False |
| if "peplink" not in key or "device" not in key: |
| return False |
| return ("replacement" in key) or ("repalcement" in key) |
|
|
|
|
| def _find_parsec_source_xlsx() -> Optional[Path]: |
| candidates = [p for p in _iter_source_files((".xlsx",)) if _is_parsec_source_name(p)] |
| if not candidates: |
| return None |
| candidates.sort( |
| key=lambda p: ( |
| 0 if "pricing" in _file_key(p.name) else 1, |
| -_safe_mtime(p), |
| p.name.lower(), |
| ) |
| ) |
| return candidates[0] |
|
|
|
|
| def _find_peplink_overlay_source_xlsx() -> Optional[Path]: |
| candidates = [p for p in _iter_source_files((".xlsx",)) if _is_peplink_overlay_source_name(p)] |
| if not candidates: |
| return None |
| candidates.sort( |
| key=lambda p: ( |
| 0 if "replacementdevices" in _file_key(p.name) else 1, |
| -_safe_mtime(p), |
| p.name.lower(), |
| ) |
| ) |
| return candidates[0] |
|
|
|
|
| def _iter_router_pricing_xlsx_sources() -> List[Path]: |
| out: List[Path] = [] |
| for path in _iter_source_files((".xlsx",)): |
| if _is_parsec_source_name(path): |
| continue |
| if _is_peplink_overlay_source_name(path): |
| continue |
| out.append(path) |
| return out |
|
|
|
|
| def _find_atel_pdf() -> Optional[Path]: |
| candidates: List[Path] = [] |
| for path in _iter_source_files((".pdf",)): |
| key = _file_key(path.name) |
| if "atel" in key and ("pricing" in key or "wholesale" in key): |
| candidates.append(path) |
| if not candidates: |
| return None |
| candidates.sort(key=lambda p: (-_safe_mtime(p), p.name.lower())) |
| return candidates[0] |
|
|
|
|
| def _parse_money(value: Any) -> Tuple[str, Optional[float]]: |
| raw = _norm(value) |
| if not raw: |
| return "", None |
| m = re.search(r"(-?\d{1,3}(?:,\d{3})*(?:\.\d+)?|-?\d+(?:\.\d+)?)", raw.replace("$", "")) |
| if not m: |
| return raw, None |
| try: |
| num = float(m.group(1).replace(",", "")) |
| except Exception: |
| return raw, None |
| return f"${num:,.2f}", num |
|
|
|
|
| def _extract_term(text: str) -> str: |
| low = text.lower() |
| m = re.search(r"\b([135])\s*[- ]?year\b", low) |
| if m: |
| return f"{m.group(1)}YR" |
| m = re.search(r"\b([135])\s*yr\b", low) |
| if m: |
| return f"{m.group(1)}YR" |
| return "" |
|
|
|
|
| def _model_candidates(text: str) -> List[str]: |
| raw = _norm(text) |
| if not raw: |
| return [] |
| out: List[str] = [] |
| seen: set[str] = set() |
|
|
| phrase_patterns = [ |
| r"\bmax\s*br\s*1\s*pro\s*5g\b", |
| r"\bmax\s*br\s*1\s*mini\s*5g\b", |
| r"\bmax\s*br\s*1\s*pro\b", |
| r"\bmax\s*br\s*1\s*mini\b", |
| ] |
| for patt in phrase_patterns: |
| for m in re.finditer(patt, raw, flags=re.IGNORECASE): |
| tok = _compact(m.group(0)) |
| tok = MODEL_ALIAS.get(tok, tok) |
| if tok and tok not in seen: |
| seen.add(tok) |
| out.append(tok) |
|
|
| for m in re.finditer(r"\b[A-Za-z]{1,8}[- ]?\d{2,4}[A-Za-z0-9\-]*\b", raw): |
| tok = _compact(m.group(0)) |
| if not tok: |
| continue |
| if tok.isdigit() or len(tok) < 4: |
| continue |
| if (not any(c.isalpha() for c in tok)) or (not any(c.isdigit() for c in tok)): |
| continue |
| tok = MODEL_ALIAS.get(tok, tok) |
| if tok in {"SKU", "MSRP", "MODEL"}: |
| continue |
| if tok not in seen: |
| seen.add(tok) |
| out.append(tok) |
| return out |
|
|
|
|
| def _manufacturer_from_filename(name: str) -> str: |
| low = name.lower() |
| if "semtech" in low or "sierra" in low: |
| return "Semtech" |
| if "ericsson" in low or "cradlepoint" in low: |
| return "Ericsson Cradlepoint" |
| if "inseego" in low: |
| return "Inseego" |
| if "inhand" in low: |
| return "InHand Networks" |
| if "peplink" in low: |
| return "Peplink" |
| if "parsec" in low: |
| return "Parsec" |
| if "atel" in low: |
| return "ATEL" |
| return "Unknown" |
|
|
|
|
| def _clean_header(text: Any) -> str: |
| return _compact(text).lower() |
|
|
|
|
| def _header_index_map(header_row: Sequence[Any]) -> Dict[str, int]: |
| idx: Dict[str, int] = {} |
| clean = [_clean_header(h) for h in header_row] |
| for key, aliases in HEADER_ALIASES.items(): |
| for i, h in enumerate(clean): |
| if not h: |
| continue |
| for cand in aliases: |
| if _clean_header(cand) == h: |
| idx[key] = i |
| break |
| if key in idx: |
| break |
| if key in idx: |
| continue |
| for i, h in enumerate(clean): |
| if not h: |
| continue |
| if key == "msrp" and ("msrp" in h or "listprice" in h or "retailprice" in h or "listusd" in h): |
| idx[key] = i |
| break |
| if key == "sku" and ("sku" in h or "partnumber" in h or h.startswith("part")): |
| idx[key] = i |
| break |
| if key == "title" and any(t in h for t in ("product", "model", "itemname", "shortdescription")): |
| idx[key] = i |
| break |
| if key == "description" and any(t in h for t in ("description", "longdescription", "desc", "notes", "remark")): |
| idx[key] = i |
| break |
| if key == "ethernet" and "ethernet" in h: |
| idx[key] = i |
| break |
| if key == "serial" and "serial" in h: |
| idx[key] = i |
| break |
| if key == "wifi" and "wifi" in h: |
| idx[key] = i |
| break |
| if key == "battery" and "battery" in h: |
| idx[key] = i |
| break |
| if key == "modem" and ("cellular" in h or h == "wan" or "modem" in h or "cat" in h): |
| idx[key] = i |
| break |
| if key == "poe" and "poe" in h: |
| idx[key] = i |
| break |
| return idx |
|
|
|
|
| def _detect_header_row(rows: Sequence[Sequence[Any]]) -> Optional[int]: |
| for i, row in enumerate(rows[:35]): |
| joined = " ".join(_norm(v).lower() for v in row if _norm(v)) |
| if not joined: |
| continue |
| score = 0 |
| if "msrp" in joined or "list price" in joined or "retail price" in joined: |
| score += 2 |
| if "sku" in joined or "part number" in joined or "partnumber" in joined: |
| score += 2 |
| if "description" in joined or "product" in joined or "model" in joined: |
| score += 2 |
| if score >= 4: |
| return i |
| return None |
|
|
|
|
| def _rowval(row: Sequence[Any], idx_map: Dict[str, int], key: str) -> str: |
| ix = idx_map.get(key) |
| if ix is None or ix >= len(row): |
| return "" |
| return _norm(row[ix]) |
|
|
|
|
| def _infer_device_type(blob_low: str) -> str: |
| if any(t in blob_low for t in ("license", "subscription", "support", "renewal", "service", "primecare", "incontrol", "airlink complete")): |
| return "Service/License" |
| if any(t in blob_low for t in ("antenna", "mount", "cable", "adapter", "accessory", "bracket")): |
| return "Accessory" |
| if any(t in blob_low for t in ("gateway", "router", "fwa", "cpe", "modem")): |
| return "Router" |
| return "Unknown" |
|
|
|
|
| def _extract_ruggedization(blob_low: str) -> str: |
| hits = [] |
| for term in ("ip67", "ip66", "ip65", "ip64", "ip54", "mil-std", "vibration", "ignition sensing", "metal housing", "aluminum"): |
| if term in blob_low: |
| hits.append(term) |
| return ", ".join(dict.fromkeys(hits)) |
|
|
|
|
| def _extract_poe(blob_low: str, explicit: str) -> str: |
| if explicit: |
| return explicit |
| if "poe" in blob_low: |
| return "yes" |
| return "" |
|
|
|
|
| def _build_catalog_rows() -> List[CatalogRow]: |
| rows_out: List[CatalogRow] = [] |
| for path in _iter_router_pricing_xlsx_sources(): |
| manu = _manufacturer_from_filename(path.name) |
| wb = openpyxl.load_workbook(path, data_only=True, read_only=True) |
| for ws in wb.worksheets: |
| matrix: List[List[Any]] = [] |
| max_cols = min(80, ws.max_column or 0) |
| for r in ws.iter_rows(min_row=1, max_row=min(4000, ws.max_row or 0), min_col=1, max_col=max_cols, values_only=True): |
| matrix.append(list(r)) |
| if not matrix: |
| continue |
| hdr_idx = _detect_header_row(matrix) |
| if hdr_idx is None: |
| continue |
| hdr = matrix[hdr_idx] |
| idx_map = _header_index_map(hdr) |
| if ("msrp" not in idx_map) or ("sku" not in idx_map and "title" not in idx_map and "description" not in idx_map): |
| continue |
| for i in range(hdr_idx + 1, len(matrix)): |
| row = matrix[i] |
| if not row: |
| continue |
| sku = _rowval(row, idx_map, "sku") |
| title = _rowval(row, idx_map, "title") |
| desc = _rowval(row, idx_map, "description") |
| msrp_raw = _rowval(row, idx_map, "msrp") |
| msrp, msrp_n = _parse_money(msrp_raw) |
| if not msrp and msrp_n is None: |
| continue |
| blob = " ".join(x for x in (title, desc, sku) if x) |
| blob_low = blob.lower() |
| if not blob: |
| continue |
| device_type = _infer_device_type(blob_low) |
| if device_type == "Service/License": |
| |
| continue |
| model_candidates = _model_candidates(blob) |
| if not model_candidates: |
| continue |
| model_key = model_candidates[0] |
| model = model_key |
| term = _rowval(row, idx_map, "term") or _extract_term(blob) |
| wifi = _rowval(row, idx_map, "wifi") |
| ethernet = _rowval(row, idx_map, "ethernet") |
| serial = _rowval(row, idx_map, "serial") |
| battery = _rowval(row, idx_map, "battery") |
| modem = _rowval(row, idx_map, "modem") |
| poe = _extract_poe(blob_low, _rowval(row, idx_map, "poe")) |
| rugged = _extract_ruggedization(blob_low) |
| item_type = "Router" if device_type == "Router" else "Accessory" |
| rows_out.append( |
| CatalogRow( |
| source_file=path.name, |
| source_sheet=ws.title, |
| source_row=i + 1, |
| manufacturer=manu, |
| item_type=item_type, |
| title=title or model, |
| model=model, |
| model_key=model_key, |
| sku=sku, |
| msrp=msrp or _norm(msrp_raw), |
| msrp_numeric=msrp_n, |
| term=term, |
| wifi=wifi, |
| ethernet_ports=ethernet, |
| serial_ports=serial, |
| battery=battery, |
| ruggedization=rugged, |
| modem_type=modem, |
| primary_use_case="", |
| device_type=device_type, |
| poe=poe, |
| notes=desc, |
| sources_label="manufacturer pricing sheet", |
| ) |
| ) |
|
|
| |
| atel = _find_atel_pdf() |
| if atel and atel.exists(): |
| with pdfplumber.open(str(atel)) as pdf: |
| for pidx, page in enumerate(pdf.pages, start=1): |
| tables = page.extract_tables() or [] |
| for table in tables: |
| if not table or len(table) < 2: |
| continue |
| head = [(_norm(x).lower()) for x in table[1]] |
| def col(name: str) -> int: |
| for i, h in enumerate(head): |
| if name in h: |
| return i |
| return -1 |
| model_i = col("model") |
| sku_i = col("sku") |
| msrp_i = col("msrp") |
| desc_i = col("desc") |
| if model_i < 0 or msrp_i < 0: |
| continue |
| for ridx, r in enumerate(table[2:], start=3): |
| model_raw = _norm(r[model_i] if model_i < len(r) else "") |
| sku = _norm(r[sku_i] if sku_i < len(r) and sku_i >= 0 else "") |
| desc = _norm(r[desc_i] if desc_i < len(r) and desc_i >= 0 else "") |
| msrp_raw = _norm(r[msrp_i] if msrp_i < len(r) else "") |
| if not model_raw or not msrp_raw: |
| continue |
| msrp, msrp_n = _parse_money(msrp_raw) |
| model_cands = _model_candidates(f"{model_raw} {desc}") |
| model_key = model_cands[0] if model_cands else _compact(model_raw) |
| if not model_key or model_key.isdigit() or len(model_key) < 4: |
| continue |
| rows_out.append( |
| CatalogRow( |
| source_file=atel.name, |
| source_sheet=f"page_{pidx}", |
| source_row=ridx, |
| manufacturer="ATEL", |
| item_type="Router", |
| title=model_raw, |
| model=model_key, |
| model_key=model_key, |
| sku=sku, |
| msrp=msrp or msrp_raw, |
| msrp_numeric=msrp_n, |
| term=_extract_term(f"{model_raw} {desc}"), |
| wifi="", |
| ethernet_ports="", |
| serial_ports="", |
| battery="", |
| ruggedization=_extract_ruggedization(desc.lower()), |
| modem_type="", |
| primary_use_case="", |
| device_type="Router", |
| poe=_extract_poe(desc.lower(), ""), |
| notes=desc, |
| sources_label="manufacturer pricing sheet (MSRP only)", |
| ) |
| ) |
| deduped: List[CatalogRow] = [] |
| seen: set[Tuple[str, str, str, str, str]] = set() |
| for row in rows_out: |
| fp = ( |
| _compact(row.manufacturer), |
| _compact(row.model_key), |
| _compact(row.sku), |
| _compact(row.term), |
| _compact(row.msrp), |
| ) |
| if fp in seen: |
| continue |
| seen.add(fp) |
| deduped.append(row) |
| return deduped |
|
|
|
|
| def _verizon_gateway_web_rows() -> List[CatalogRow]: |
| """ |
| Add Verizon gateway model tokens so deterministic router facts can resolve them. |
| These rows are explicitly marked as web-sourced and are not used to fabricate pricing. |
| """ |
| rows: List[CatalogRow] = [] |
| for idx, spec in enumerate(VERIZON_GATEWAY_WEB_ROWS, start=1): |
| model = _norm(spec.get("model", "")) |
| model_key = _compact(model) |
| if not model_key: |
| continue |
| title = _norm(spec.get("title", "")) or model |
| manufacturer = _norm(spec.get("manufacturer", "")) or "Verizon" |
| rows.append( |
| CatalogRow( |
| source_file="verizon_support_gateway_models_web.csv", |
| source_sheet="verizon_support_web", |
| source_row=idx, |
| manufacturer=manufacturer, |
| item_type="Router", |
| title=title, |
| model=model_key, |
| model_key=model_key, |
| sku=model, |
| msrp="Unknown, ask Masters", |
| msrp_numeric=None, |
| term="", |
| wifi="", |
| ethernet_ports="", |
| serial_ports="", |
| battery="", |
| ruggedization="", |
| modem_type="5G gateway (web-sourced model index)", |
| primary_use_case="Verizon gateway model token coverage", |
| device_type="Router", |
| poe="", |
| notes=( |
| "Web-sourced (not from internal docs): model token from Verizon support " |
| "gateway equipment pages; verify exact hardware revision before quoting." |
| ), |
| sources_label="Web-sourced (not from internal docs): Verizon support equipment listing", |
| ) |
| ) |
| return rows |
|
|
|
|
| def _write_router_catalog(rows: Sequence[CatalogRow]) -> Path: |
| OUT_DIR.mkdir(parents=True, exist_ok=True) |
| out = OUT_DIR / "router_pricing_catalog_normalized.csv" |
| fields = [ |
| "source_file", |
| "source_sheet", |
| "source_row", |
| "manufacturer", |
| "item_type", |
| "title", |
| "model", |
| "model_key", |
| "sku", |
| "msrp", |
| "msrp_numeric", |
| "term", |
| "wifi", |
| "ethernet_ports", |
| "serial_ports", |
| "battery", |
| "ruggedization", |
| "modem_type", |
| "primary_use_case", |
| "device_type", |
| "poe", |
| "notes", |
| "sources_label", |
| ] |
| with out.open("w", encoding="utf-8", newline="") as f: |
| w = csv.DictWriter(f, fieldnames=fields) |
| w.writeheader() |
| for r in rows: |
| w.writerow({ |
| "source_file": r.source_file, |
| "source_sheet": r.source_sheet, |
| "source_row": r.source_row, |
| "manufacturer": r.manufacturer, |
| "item_type": r.item_type, |
| "title": r.title, |
| "model": r.model, |
| "model_key": r.model_key, |
| "sku": r.sku, |
| "msrp": r.msrp, |
| "msrp_numeric": f"{r.msrp_numeric:.2f}" if isinstance(r.msrp_numeric, float) else "", |
| "term": r.term, |
| "wifi": r.wifi, |
| "ethernet_ports": r.ethernet_ports, |
| "serial_ports": r.serial_ports, |
| "battery": r.battery, |
| "ruggedization": r.ruggedization, |
| "modem_type": r.modem_type, |
| "primary_use_case": r.primary_use_case, |
| "device_type": r.device_type, |
| "poe": r.poe, |
| "notes": r.notes, |
| "sources_label": r.sources_label, |
| }) |
| return out |
|
|
|
|
| def _write_variant_options(rows: Sequence[CatalogRow]) -> Path: |
| out = OUT_DIR / "router_variant_options_normalized.csv" |
| fields = [ |
| "manufacturer", |
| "model_key", |
| "model", |
| "default_option", |
| "sku", |
| "title", |
| "term", |
| "msrp", |
| "msrp_numeric", |
| "wifi", |
| "ethernet_ports", |
| "serial_ports", |
| "poe", |
| "battery", |
| "modem_type", |
| "source_file", |
| "source_sheet", |
| "source_row", |
| ] |
| groups: Dict[Tuple[str, str], List[CatalogRow]] = {} |
| for r in rows: |
| groups.setdefault((r.manufacturer, r.model_key), []).append(r) |
|
|
| def rank_row(r: CatalogRow) -> Tuple[int, float, str]: |
| term = _norm(r.term).upper() |
| if term == "1YR": |
| term_rank = 0 |
| elif not term: |
| term_rank = 1 |
| else: |
| term_rank = 2 |
| msrp = float(r.msrp_numeric) if isinstance(r.msrp_numeric, float) else 999999.0 |
| return (term_rank, msrp, _norm(r.sku).upper()) |
|
|
| with out.open("w", encoding="utf-8", newline="") as f: |
| w = csv.DictWriter(f, fieldnames=fields) |
| w.writeheader() |
| for (manu, key), items in sorted(groups.items()): |
| items_sorted = sorted(items, key=rank_row) |
| default_sku = _norm(items_sorted[0].sku) |
| for r in items_sorted: |
| w.writerow( |
| { |
| "manufacturer": manu, |
| "model_key": key, |
| "model": r.model, |
| "default_option": "yes" if _norm(r.sku) == default_sku else "no", |
| "sku": r.sku, |
| "title": r.title, |
| "term": r.term, |
| "msrp": r.msrp, |
| "msrp_numeric": f"{r.msrp_numeric:.2f}" if isinstance(r.msrp_numeric, float) else "", |
| "wifi": r.wifi, |
| "ethernet_ports": r.ethernet_ports, |
| "serial_ports": r.serial_ports, |
| "poe": r.poe, |
| "battery": r.battery, |
| "modem_type": r.modem_type, |
| "source_file": r.source_file, |
| "source_sheet": r.source_sheet, |
| "source_row": r.source_row, |
| } |
| ) |
| return out |
|
|
|
|
| def _parse_parsec_family(description: str) -> str: |
| up = _norm(description).upper() |
| for fam in FAMILY_HINTS: |
| if fam in up: |
| return fam.title() |
| m = re.search(r"PRO\s+([A-Z][A-Z ]{2,30})", up) |
| if m: |
| return _norm(m.group(1)).title() |
| return "" |
|
|
|
|
| def _write_parsec_pricing() -> Path: |
| src = _find_parsec_source_xlsx() |
| out = OUT_DIR / "parsec_pricing_normalized.csv" |
| fields = [ |
| "source_file", |
| "source_sheet", |
| "source_row", |
| "manufacturer", |
| "category", |
| "family", |
| "part_number", |
| "title", |
| "description", |
| "msrp", |
| "msrp_numeric", |
| "connector_summary", |
| "fit_profile", |
| ] |
| rows: List[Dict[str, Any]] = [] |
| if src and src.exists(): |
| wb = openpyxl.load_workbook(src, data_only=True, read_only=True) |
| for ws in wb.worksheets: |
| matrix = [list(r) for r in ws.iter_rows(min_row=1, max_row=min(2500, ws.max_row or 0), min_col=1, max_col=min(20, ws.max_column or 0), values_only=True)] |
| if not matrix: |
| continue |
| hdr_idx = _detect_header_row(matrix) |
| if hdr_idx is None: |
| continue |
| idx_map = _header_index_map(matrix[hdr_idx]) |
| if "sku" not in idx_map or "msrp" not in idx_map: |
| continue |
| for i in range(hdr_idx + 1, len(matrix)): |
| row = matrix[i] |
| part = _rowval(row, idx_map, "sku") |
| desc = _rowval(row, idx_map, "description") or _rowval(row, idx_map, "title") |
| msrp_raw = _rowval(row, idx_map, "msrp") |
| if not part or not msrp_raw: |
| continue |
| msrp, msrp_n = _parse_money(msrp_raw) |
| if msrp_n is None: |
| continue |
| fit = "" |
| low_desc = desc.lower() |
| if "vehicle" in low_desc or "mobile" in low_desc: |
| fit = "vehicle" |
| elif "indoor" in low_desc: |
| fit = "indoor" |
| elif "directional" in low_desc: |
| fit = "directional" |
| elif "outdoor" in low_desc or "pole" in low_desc or "wall" in low_desc: |
| fit = "outdoor fixed" |
| family = _parse_parsec_family(desc or part) |
| connector_summary = "" |
| for patt in (r"\b\d\s*LTE\b", r"\b\d\s*WiFi\b", r"\b\d\s*GNSS\b", r"SMA", r"RP\s*SMA"): |
| m = re.search(patt, desc, flags=re.IGNORECASE) |
| if m: |
| connector_summary += ("; " if connector_summary else "") + _norm(m.group(0)) |
| rows.append( |
| { |
| "source_file": src.name, |
| "source_sheet": ws.title, |
| "source_row": i + 1, |
| "manufacturer": "Parsec", |
| "category": ws.title, |
| "family": family, |
| "part_number": part, |
| "title": part, |
| "description": desc, |
| "msrp": msrp, |
| "msrp_numeric": f"{msrp_n:.2f}", |
| "connector_summary": connector_summary, |
| "fit_profile": fit, |
| } |
| ) |
| with out.open("w", encoding="utf-8", newline="") as f: |
| w = csv.DictWriter(f, fieldnames=fields) |
| w.writeheader() |
| for r in rows: |
| w.writerow(r) |
| return out |
|
|
|
|
| def _write_peplink_overlay() -> Path: |
| src = _find_peplink_overlay_source_xlsx() |
| out = OUT_DIR / "peplink_replacement_overlay.csv" |
| fields = [ |
| "source_file", |
| "source_sheet", |
| "source_row", |
| "old_item_type", |
| "old_item_name", |
| "old_description", |
| "old_model_key", |
| "old_sku", |
| "new_item_type", |
| "new_item_name", |
| "new_description", |
| "new_model_key", |
| "new_sku", |
| "eos_year", |
| "eol_year", |
| "notes", |
| ] |
| rows: List[Dict[str, Any]] = [] |
| if src and src.exists(): |
| wb = openpyxl.load_workbook(src, data_only=True, read_only=True) |
| ws = wb.worksheets[0] |
| matrix = [list(r) for r in ws.iter_rows(min_row=1, max_row=min(1000, ws.max_row or 0), min_col=1, max_col=min(20, ws.max_column or 0), values_only=True)] |
| if matrix: |
| hdr = [_norm(x).lower() for x in matrix[0]] |
| def ix(name: str) -> int: |
| for i, h in enumerate(hdr): |
| if name in h: |
| return i |
| return -1 |
| old_item_t = ix("item type") |
| old_item = ix("item name") |
| old_desc = ix("description") |
| new_item_t = ix("alternative item type") |
| new_item = ix("alternative item name") |
| new_desc = ix("alternative description") |
| for ridx, r in enumerate(matrix[1:], start=2): |
| old_name = _norm(r[old_item]) if old_item >= 0 and old_item < len(r) else "" |
| new_name = _norm(r[new_item]) if new_item >= 0 and new_item < len(r) else "" |
| if not old_name or not new_name: |
| continue |
| old_d = _norm(r[old_desc]) if old_desc >= 0 and old_desc < len(r) else "" |
| new_d = _norm(r[new_desc]) if new_desc >= 0 and new_desc < len(r) else "" |
| old_model = (_model_candidates(f"{old_name} {old_d}") or [_compact(old_name)])[0] |
| new_model = (_model_candidates(f"{new_name} {new_d}") or [_compact(new_name)])[0] |
| old_sku = next((m for m in _model_candidates(old_d) if m != old_model), "") |
| new_sku = next((m for m in _model_candidates(new_d) if m != new_model), "") |
| rows.append( |
| { |
| "source_file": src.name, |
| "source_sheet": ws.title, |
| "source_row": ridx, |
| "old_item_type": _norm(r[old_item_t]) if old_item_t >= 0 and old_item_t < len(r) else "", |
| "old_item_name": old_name, |
| "old_description": old_d, |
| "old_model_key": old_model, |
| "old_sku": old_sku, |
| "new_item_type": _norm(r[new_item_t]) if new_item_t >= 0 and new_item_t < len(r) else "", |
| "new_item_name": new_name, |
| "new_description": new_d, |
| "new_model_key": new_model, |
| "new_sku": new_sku, |
| "eos_year": "2024", |
| "eol_year": "2025", |
| "notes": "Policy default overlay from Peplink replacement list (EOS=2024, EOL=2025); retain stricter existing lifecycle dates where present.", |
| } |
| ) |
| with out.open("w", encoding="utf-8", newline="") as f: |
| w = csv.DictWriter(f, fieldnames=fields) |
| w.writeheader() |
| for r in rows: |
| w.writerow(r) |
| return out |
|
|
|
|
| def main() -> None: |
| OUT_DIR.mkdir(parents=True, exist_ok=True) |
| catalog_rows = _build_catalog_rows() |
| catalog_rows.extend(_verizon_gateway_web_rows()) |
| catalog_rows = [r for r in catalog_rows if r.model_key and r.msrp] |
| catalog_path = _write_router_catalog(catalog_rows) |
| variants_path = _write_variant_options(catalog_rows) |
| parsec_path = _write_parsec_pricing() |
| peplink_overlay_path = _write_peplink_overlay() |
| source_dirs = _iter_source_dirs() |
| parsec_source = _find_parsec_source_xlsx() |
| peplink_overlay_source = _find_peplink_overlay_source_xlsx() |
| atel_source = _find_atel_pdf() |
|
|
| summary = OUT_DIR / "pricing_normalization_summary.txt" |
| model_count = len({r.model_key for r in catalog_rows}) |
| verizon_model_count = len({r.model_key for r in catalog_rows if "verizon" in _norm(r.sources_label).lower()}) |
| with summary.open("w", encoding="utf-8") as f: |
| f.write(f"source_dirs={';'.join(_display_path(d) for d in source_dirs)}\n") |
| f.write(f"parsec_source={_display_path(parsec_source)}\n") |
| f.write(f"peplink_overlay_source={_display_path(peplink_overlay_source)}\n") |
| f.write(f"atel_source={_display_path(atel_source)}\n") |
| f.write(f"catalog_rows={len(catalog_rows)}\n") |
| f.write(f"catalog_models={model_count}\n") |
| f.write(f"verizon_gateway_models={verizon_model_count}\n") |
| f.write(f"catalog_path={_display_path(catalog_path)}\n") |
| f.write(f"variants_path={_display_path(variants_path)}\n") |
| f.write(f"parsec_path={_display_path(parsec_path)}\n") |
| f.write(f"peplink_overlay_path={_display_path(peplink_overlay_path)}\n") |
|
|
| print(f"Wrote {catalog_path}") |
| print(f"Wrote {variants_path}") |
| print(f"Wrote {parsec_path}") |
| print(f"Wrote {peplink_overlay_path}") |
| print(f"Summary: {summary}") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|