#!/usr/bin/env python3 from __future__ import annotations import csv import re from pathlib import Path from typing import Any, Dict, List REPO_ROOT = Path(__file__).resolve().parents[2] LIFECYCLE_CSV = REPO_ROOT / "routers_eos_eol_by_sku.csv" FACT_CSVS = [ REPO_ROOT / "feb2026routers.csv", REPO_ROOT / "replacement_devices_missing_from_dec2025routers.csv", REPO_ROOT / "backend" / "app" / "knowledgebase" / "data" / "normalized" / "router_pricing_catalog_normalized.csv", ] OUT_CSV = REPO_ROOT / "docs" / "reports" / "router_missing_fields_audit.csv" REQ_FIELDS = [ "ruggedization", "modem_type", "device_type", "poe", "wifi", "wan_ports", "lan_ports", "ethernet_ports", "serial_ports", ] def _norm(v: Any) -> str: return re.sub(r"\s+", " ", str(v or "").strip()) def _compact(v: Any) -> str: return re.sub(r"[^A-Z0-9]", "", _norm(v).upper()) def _header_get(row: Dict[str, Any], *names: str) -> str: lut = {str(k).strip().lower(): k for k in row.keys()} for n in names: k = lut.get(n.strip().lower()) if k: return _norm(row.get(k, "")) return "" def _extract_models(text: str) -> List[str]: out: List[str] = [] seen = set() for m in re.finditer(r"\b[A-Za-z]{1,8}[- ]?\d{2,4}[A-Za-z0-9\-]*\b", text): tok = _compact(m.group(0)) if not tok or tok.isdigit() or len(tok) < 4: continue if (not any(c.isalpha() for c in tok)) or (not any(c.isdigit() for c in tok)): continue if tok not in seen: seen.add(tok) out.append(tok) return out def _infer_poe(blob: str) -> str: low = blob.lower() if "poe" in low: if "no poe" in low or "without poe" in low: return "no" return "yes" return "" def _load_fact_index() -> Dict[str, Dict[str, str]]: idx: Dict[str, Dict[str, str]] = {} for path in FACT_CSVS: if not path.exists(): continue with path.open("r", encoding="utf-8-sig", newline="") as f: reader = csv.DictReader(f) for ridx, row in enumerate(reader, start=2): model = _header_get(row, "Model", "SKU", "Title", "Device", "router", "model_key") model_cands = _extract_models(model) if not model_cands: model_cands = _extract_models(_header_get(row, "Title", "Product description", "description", "notes", "sku")) if not model_cands: continue mk = model_cands[0] record = idx.setdefault( mk, { "model_key": mk, "display_model": _header_get(row, "Title", "Model", "SKU", "Device") or mk, "sources": "", "ruggedization": "", "modem_type": "", "device_type": "", "poe": "", "wifi": "", "wan_ports": "", "lan_ports": "", "ethernet_ports": "", "serial_ports": "", }, ) source_name = path.name if source_name not in record["sources"]: record["sources"] = (record["sources"] + "; " + source_name).strip("; ") raw_blob = " ".join( [ _header_get(row, "Product description", "Description", "notes", "Special notes", "short description", "Long Description"), _header_get(row, "WAN ports and speed", "WAN ports", "wan_ports", "ethernet_ports"), _header_get(row, "LAN ports and speed", "LAN ports", "lan_ports", "ethernet_ports"), ] ) field_map = { "ruggedization": _header_get(row, "Ruggedization", "ruggedization"), "modem_type": _header_get(row, "Modem Type", "modem_type", "modem", "Cellular Networks"), "device_type": _header_get(row, "Type", "Device Type", "device_type"), "poe": _header_get(row, "poe", "PoE") or _infer_poe(raw_blob), "wifi": _header_get(row, "WiFi type", "WiFi", "wifi", "wireless"), "wan_ports": _header_get(row, "WAN ports and speed", "WAN ports", "wan_ports"), "lan_ports": _header_get(row, "LAN ports and speed", "LAN ports", "lan_ports"), "ethernet_ports": _header_get(row, "Ethernet Ports", "ethernet_ports", "WAN/LAN", "WAN ports and speed"), "serial_ports": _header_get(row, "Serial port (yes/no)", "Serial Ports", "serial", "serial_ports"), } for k, val in field_map.items(): if (not record[k]) and val: record[k] = val # fill from free-text cues if (not record["device_type"]) and raw_blob: low = raw_blob.lower() if "adapter" in low: record["device_type"] = "Adapter" elif any(t in low for t in ("router", "gateway", "fwa", "cpe")): record["device_type"] = "Router" if (not record["serial_ports"]) and ("serial" in raw_blob.lower()): record["serial_ports"] = "yes" return idx def _load_lifecycle_index() -> Dict[str, Dict[str, str]]: out: Dict[str, Dict[str, str]] = {} if not LIFECYCLE_CSV.exists(): return out with LIFECYCLE_CSV.open("r", encoding="utf-8-sig", newline="") as f: r = csv.DictReader(f) for row in r: sku = _norm(row.get("SKU", "")) if not sku: continue key = _compact(sku) if not key: continue out[key] = { "sku": sku, "manufacturer": _norm(row.get("manufacturer", "")), "tech": _norm(row.get("Tech", "")), "eos": _norm(row.get("end_of_sale", "")), "eol": _norm(row.get("end_of_life", "")), } return out def main() -> None: facts = _load_fact_index() life = _load_lifecycle_index() keys = set(facts.keys()) | set(life.keys()) OUT_CSV.parent.mkdir(parents=True, exist_ok=True) fields = [ "model_key", "display_model", "manufacturer", "tech", "eos", "eol", "ruggedization", "modem_type", "device_type", "poe", "wifi", "wan_ports", "lan_ports", "ethernet_ports", "serial_ports", "missing_fields", "source_docs", ] rows_out: List[Dict[str, str]] = [] for k in sorted(keys): f = facts.get(k, {}) l = life.get(k, {}) record = { "model_key": k, "display_model": f.get("display_model", "") or l.get("sku", "") or k, "manufacturer": l.get("manufacturer", ""), "tech": l.get("tech", ""), "eos": l.get("eos", ""), "eol": l.get("eol", ""), "ruggedization": f.get("ruggedization", ""), "modem_type": f.get("modem_type", ""), "device_type": f.get("device_type", ""), "poe": f.get("poe", ""), "wifi": f.get("wifi", ""), "wan_ports": f.get("wan_ports", ""), "lan_ports": f.get("lan_ports", ""), "ethernet_ports": f.get("ethernet_ports", ""), "serial_ports": f.get("serial_ports", ""), "source_docs": f.get("sources", "") or ("routers_eos_eol_by_sku.csv" if l else ""), } missing = [fld for fld in REQ_FIELDS if not _norm(record.get(fld, ""))] if missing: record["missing_fields"] = ", ".join(missing) rows_out.append(record) with OUT_CSV.open("w", encoding="utf-8", newline="") as f: w = csv.DictWriter(f, fieldnames=fields) w.writeheader() for row in rows_out: w.writerow(row) print(f"Wrote {OUT_CSV} with {len(rows_out)} rows needing enrichment") if __name__ == "__main__": main()