Masters-four-Tab-OpenAI / backend /scripts /generate_router_missing_fields_audit.py
Pete Dunn
Complete 10-step router/pots normalization and deterministic resolver rollout
65ecef2
#!/usr/bin/env python3
from __future__ import annotations
import csv
import re
from pathlib import Path
from typing import Any, Dict, List
REPO_ROOT = Path(__file__).resolve().parents[2]
LIFECYCLE_CSV = REPO_ROOT / "routers_eos_eol_by_sku.csv"
FACT_CSVS = [
REPO_ROOT / "feb2026routers.csv",
REPO_ROOT / "replacement_devices_missing_from_dec2025routers.csv",
REPO_ROOT / "backend" / "app" / "knowledgebase" / "data" / "normalized" / "router_pricing_catalog_normalized.csv",
]
OUT_CSV = REPO_ROOT / "docs" / "reports" / "router_missing_fields_audit.csv"
REQ_FIELDS = [
"ruggedization",
"modem_type",
"device_type",
"poe",
"wifi",
"wan_ports",
"lan_ports",
"ethernet_ports",
"serial_ports",
]
def _norm(v: Any) -> str:
return re.sub(r"\s+", " ", str(v or "").strip())
def _compact(v: Any) -> str:
return re.sub(r"[^A-Z0-9]", "", _norm(v).upper())
def _header_get(row: Dict[str, Any], *names: str) -> str:
lut = {str(k).strip().lower(): k for k in row.keys()}
for n in names:
k = lut.get(n.strip().lower())
if k:
return _norm(row.get(k, ""))
return ""
def _extract_models(text: str) -> List[str]:
out: List[str] = []
seen = set()
for m in re.finditer(r"\b[A-Za-z]{1,8}[- ]?\d{2,4}[A-Za-z0-9\-]*\b", text):
tok = _compact(m.group(0))
if not tok or tok.isdigit() or len(tok) < 4:
continue
if (not any(c.isalpha() for c in tok)) or (not any(c.isdigit() for c in tok)):
continue
if tok not in seen:
seen.add(tok)
out.append(tok)
return out
def _infer_poe(blob: str) -> str:
low = blob.lower()
if "poe" in low:
if "no poe" in low or "without poe" in low:
return "no"
return "yes"
return ""
def _load_fact_index() -> Dict[str, Dict[str, str]]:
idx: Dict[str, Dict[str, str]] = {}
for path in FACT_CSVS:
if not path.exists():
continue
with path.open("r", encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader(f)
for ridx, row in enumerate(reader, start=2):
model = _header_get(row, "Model", "SKU", "Title", "Device", "router", "model_key")
model_cands = _extract_models(model)
if not model_cands:
model_cands = _extract_models(_header_get(row, "Title", "Product description", "description", "notes", "sku"))
if not model_cands:
continue
mk = model_cands[0]
record = idx.setdefault(
mk,
{
"model_key": mk,
"display_model": _header_get(row, "Title", "Model", "SKU", "Device") or mk,
"sources": "",
"ruggedization": "",
"modem_type": "",
"device_type": "",
"poe": "",
"wifi": "",
"wan_ports": "",
"lan_ports": "",
"ethernet_ports": "",
"serial_ports": "",
},
)
source_name = path.name
if source_name not in record["sources"]:
record["sources"] = (record["sources"] + "; " + source_name).strip("; ")
raw_blob = " ".join(
[
_header_get(row, "Product description", "Description", "notes", "Special notes", "short description", "Long Description"),
_header_get(row, "WAN ports and speed", "WAN ports", "wan_ports", "ethernet_ports"),
_header_get(row, "LAN ports and speed", "LAN ports", "lan_ports", "ethernet_ports"),
]
)
field_map = {
"ruggedization": _header_get(row, "Ruggedization", "ruggedization"),
"modem_type": _header_get(row, "Modem Type", "modem_type", "modem", "Cellular Networks"),
"device_type": _header_get(row, "Type", "Device Type", "device_type"),
"poe": _header_get(row, "poe", "PoE") or _infer_poe(raw_blob),
"wifi": _header_get(row, "WiFi type", "WiFi", "wifi", "wireless"),
"wan_ports": _header_get(row, "WAN ports and speed", "WAN ports", "wan_ports"),
"lan_ports": _header_get(row, "LAN ports and speed", "LAN ports", "lan_ports"),
"ethernet_ports": _header_get(row, "Ethernet Ports", "ethernet_ports", "WAN/LAN", "WAN ports and speed"),
"serial_ports": _header_get(row, "Serial port (yes/no)", "Serial Ports", "serial", "serial_ports"),
}
for k, val in field_map.items():
if (not record[k]) and val:
record[k] = val
# fill from free-text cues
if (not record["device_type"]) and raw_blob:
low = raw_blob.lower()
if "adapter" in low:
record["device_type"] = "Adapter"
elif any(t in low for t in ("router", "gateway", "fwa", "cpe")):
record["device_type"] = "Router"
if (not record["serial_ports"]) and ("serial" in raw_blob.lower()):
record["serial_ports"] = "yes"
return idx
def _load_lifecycle_index() -> Dict[str, Dict[str, str]]:
out: Dict[str, Dict[str, str]] = {}
if not LIFECYCLE_CSV.exists():
return out
with LIFECYCLE_CSV.open("r", encoding="utf-8-sig", newline="") as f:
r = csv.DictReader(f)
for row in r:
sku = _norm(row.get("SKU", ""))
if not sku:
continue
key = _compact(sku)
if not key:
continue
out[key] = {
"sku": sku,
"manufacturer": _norm(row.get("manufacturer", "")),
"tech": _norm(row.get("Tech", "")),
"eos": _norm(row.get("end_of_sale", "")),
"eol": _norm(row.get("end_of_life", "")),
}
return out
def main() -> None:
facts = _load_fact_index()
life = _load_lifecycle_index()
keys = set(facts.keys()) | set(life.keys())
OUT_CSV.parent.mkdir(parents=True, exist_ok=True)
fields = [
"model_key",
"display_model",
"manufacturer",
"tech",
"eos",
"eol",
"ruggedization",
"modem_type",
"device_type",
"poe",
"wifi",
"wan_ports",
"lan_ports",
"ethernet_ports",
"serial_ports",
"missing_fields",
"source_docs",
]
rows_out: List[Dict[str, str]] = []
for k in sorted(keys):
f = facts.get(k, {})
l = life.get(k, {})
record = {
"model_key": k,
"display_model": f.get("display_model", "") or l.get("sku", "") or k,
"manufacturer": l.get("manufacturer", ""),
"tech": l.get("tech", ""),
"eos": l.get("eos", ""),
"eol": l.get("eol", ""),
"ruggedization": f.get("ruggedization", ""),
"modem_type": f.get("modem_type", ""),
"device_type": f.get("device_type", ""),
"poe": f.get("poe", ""),
"wifi": f.get("wifi", ""),
"wan_ports": f.get("wan_ports", ""),
"lan_ports": f.get("lan_ports", ""),
"ethernet_ports": f.get("ethernet_ports", ""),
"serial_ports": f.get("serial_ports", ""),
"source_docs": f.get("sources", "") or ("routers_eos_eol_by_sku.csv" if l else ""),
}
missing = [fld for fld in REQ_FIELDS if not _norm(record.get(fld, ""))]
if missing:
record["missing_fields"] = ", ".join(missing)
rows_out.append(record)
with OUT_CSV.open("w", encoding="utf-8", newline="") as f:
w = csv.DictWriter(f, fieldnames=fields)
w.writeheader()
for row in rows_out:
w.writerow(row)
print(f"Wrote {OUT_CSV} with {len(rows_out)} rows needing enrichment")
if __name__ == "__main__":
main()