| |
| from __future__ import annotations |
|
|
| import csv |
| import re |
| from pathlib import Path |
| from typing import Any, Dict, List |
|
|
| REPO_ROOT = Path(__file__).resolve().parents[2] |
| LIFECYCLE_CSV = REPO_ROOT / "routers_eos_eol_by_sku.csv" |
| FACT_CSVS = [ |
| REPO_ROOT / "feb2026routers.csv", |
| REPO_ROOT / "replacement_devices_missing_from_dec2025routers.csv", |
| REPO_ROOT / "backend" / "app" / "knowledgebase" / "data" / "normalized" / "router_pricing_catalog_normalized.csv", |
| ] |
| OUT_CSV = REPO_ROOT / "docs" / "reports" / "router_missing_fields_audit.csv" |
|
|
|
|
| REQ_FIELDS = [ |
| "ruggedization", |
| "modem_type", |
| "device_type", |
| "poe", |
| "wifi", |
| "wan_ports", |
| "lan_ports", |
| "ethernet_ports", |
| "serial_ports", |
| ] |
|
|
|
|
| def _norm(v: Any) -> str: |
| return re.sub(r"\s+", " ", str(v or "").strip()) |
|
|
|
|
| def _compact(v: Any) -> str: |
| return re.sub(r"[^A-Z0-9]", "", _norm(v).upper()) |
|
|
|
|
| def _header_get(row: Dict[str, Any], *names: str) -> str: |
| lut = {str(k).strip().lower(): k for k in row.keys()} |
| for n in names: |
| k = lut.get(n.strip().lower()) |
| if k: |
| return _norm(row.get(k, "")) |
| return "" |
|
|
|
|
| def _extract_models(text: str) -> List[str]: |
| out: List[str] = [] |
| seen = set() |
| for m in re.finditer(r"\b[A-Za-z]{1,8}[- ]?\d{2,4}[A-Za-z0-9\-]*\b", text): |
| tok = _compact(m.group(0)) |
| if not tok or tok.isdigit() or len(tok) < 4: |
| continue |
| if (not any(c.isalpha() for c in tok)) or (not any(c.isdigit() for c in tok)): |
| continue |
| if tok not in seen: |
| seen.add(tok) |
| out.append(tok) |
| return out |
|
|
|
|
| def _infer_poe(blob: str) -> str: |
| low = blob.lower() |
| if "poe" in low: |
| if "no poe" in low or "without poe" in low: |
| return "no" |
| return "yes" |
| return "" |
|
|
|
|
| def _load_fact_index() -> Dict[str, Dict[str, str]]: |
| idx: Dict[str, Dict[str, str]] = {} |
| for path in FACT_CSVS: |
| if not path.exists(): |
| continue |
| with path.open("r", encoding="utf-8-sig", newline="") as f: |
| reader = csv.DictReader(f) |
| for ridx, row in enumerate(reader, start=2): |
| model = _header_get(row, "Model", "SKU", "Title", "Device", "router", "model_key") |
| model_cands = _extract_models(model) |
| if not model_cands: |
| model_cands = _extract_models(_header_get(row, "Title", "Product description", "description", "notes", "sku")) |
| if not model_cands: |
| continue |
| mk = model_cands[0] |
| record = idx.setdefault( |
| mk, |
| { |
| "model_key": mk, |
| "display_model": _header_get(row, "Title", "Model", "SKU", "Device") or mk, |
| "sources": "", |
| "ruggedization": "", |
| "modem_type": "", |
| "device_type": "", |
| "poe": "", |
| "wifi": "", |
| "wan_ports": "", |
| "lan_ports": "", |
| "ethernet_ports": "", |
| "serial_ports": "", |
| }, |
| ) |
| source_name = path.name |
| if source_name not in record["sources"]: |
| record["sources"] = (record["sources"] + "; " + source_name).strip("; ") |
|
|
| raw_blob = " ".join( |
| [ |
| _header_get(row, "Product description", "Description", "notes", "Special notes", "short description", "Long Description"), |
| _header_get(row, "WAN ports and speed", "WAN ports", "wan_ports", "ethernet_ports"), |
| _header_get(row, "LAN ports and speed", "LAN ports", "lan_ports", "ethernet_ports"), |
| ] |
| ) |
|
|
| field_map = { |
| "ruggedization": _header_get(row, "Ruggedization", "ruggedization"), |
| "modem_type": _header_get(row, "Modem Type", "modem_type", "modem", "Cellular Networks"), |
| "device_type": _header_get(row, "Type", "Device Type", "device_type"), |
| "poe": _header_get(row, "poe", "PoE") or _infer_poe(raw_blob), |
| "wifi": _header_get(row, "WiFi type", "WiFi", "wifi", "wireless"), |
| "wan_ports": _header_get(row, "WAN ports and speed", "WAN ports", "wan_ports"), |
| "lan_ports": _header_get(row, "LAN ports and speed", "LAN ports", "lan_ports"), |
| "ethernet_ports": _header_get(row, "Ethernet Ports", "ethernet_ports", "WAN/LAN", "WAN ports and speed"), |
| "serial_ports": _header_get(row, "Serial port (yes/no)", "Serial Ports", "serial", "serial_ports"), |
| } |
| for k, val in field_map.items(): |
| if (not record[k]) and val: |
| record[k] = val |
|
|
| |
| if (not record["device_type"]) and raw_blob: |
| low = raw_blob.lower() |
| if "adapter" in low: |
| record["device_type"] = "Adapter" |
| elif any(t in low for t in ("router", "gateway", "fwa", "cpe")): |
| record["device_type"] = "Router" |
| if (not record["serial_ports"]) and ("serial" in raw_blob.lower()): |
| record["serial_ports"] = "yes" |
|
|
| return idx |
|
|
|
|
| def _load_lifecycle_index() -> Dict[str, Dict[str, str]]: |
| out: Dict[str, Dict[str, str]] = {} |
| if not LIFECYCLE_CSV.exists(): |
| return out |
| with LIFECYCLE_CSV.open("r", encoding="utf-8-sig", newline="") as f: |
| r = csv.DictReader(f) |
| for row in r: |
| sku = _norm(row.get("SKU", "")) |
| if not sku: |
| continue |
| key = _compact(sku) |
| if not key: |
| continue |
| out[key] = { |
| "sku": sku, |
| "manufacturer": _norm(row.get("manufacturer", "")), |
| "tech": _norm(row.get("Tech", "")), |
| "eos": _norm(row.get("end_of_sale", "")), |
| "eol": _norm(row.get("end_of_life", "")), |
| } |
| return out |
|
|
|
|
| def main() -> None: |
| facts = _load_fact_index() |
| life = _load_lifecycle_index() |
|
|
| keys = set(facts.keys()) | set(life.keys()) |
| OUT_CSV.parent.mkdir(parents=True, exist_ok=True) |
| fields = [ |
| "model_key", |
| "display_model", |
| "manufacturer", |
| "tech", |
| "eos", |
| "eol", |
| "ruggedization", |
| "modem_type", |
| "device_type", |
| "poe", |
| "wifi", |
| "wan_ports", |
| "lan_ports", |
| "ethernet_ports", |
| "serial_ports", |
| "missing_fields", |
| "source_docs", |
| ] |
|
|
| rows_out: List[Dict[str, str]] = [] |
| for k in sorted(keys): |
| f = facts.get(k, {}) |
| l = life.get(k, {}) |
| record = { |
| "model_key": k, |
| "display_model": f.get("display_model", "") or l.get("sku", "") or k, |
| "manufacturer": l.get("manufacturer", ""), |
| "tech": l.get("tech", ""), |
| "eos": l.get("eos", ""), |
| "eol": l.get("eol", ""), |
| "ruggedization": f.get("ruggedization", ""), |
| "modem_type": f.get("modem_type", ""), |
| "device_type": f.get("device_type", ""), |
| "poe": f.get("poe", ""), |
| "wifi": f.get("wifi", ""), |
| "wan_ports": f.get("wan_ports", ""), |
| "lan_ports": f.get("lan_ports", ""), |
| "ethernet_ports": f.get("ethernet_ports", ""), |
| "serial_ports": f.get("serial_ports", ""), |
| "source_docs": f.get("sources", "") or ("routers_eos_eol_by_sku.csv" if l else ""), |
| } |
| missing = [fld for fld in REQ_FIELDS if not _norm(record.get(fld, ""))] |
| if missing: |
| record["missing_fields"] = ", ".join(missing) |
| rows_out.append(record) |
|
|
| with OUT_CSV.open("w", encoding="utf-8", newline="") as f: |
| w = csv.DictWriter(f, fieldnames=fields) |
| w.writeheader() |
| for row in rows_out: |
| w.writerow(row) |
|
|
| print(f"Wrote {OUT_CSV} with {len(rows_out)} rows needing enrichment") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|