# Updated_code_complaince / data / preprocess_codes.py
# Hugging Face upload metadata: Ryan2219 — "Upload 70 files" (commit e1ced8e, verified)
"""Improved NYC code preprocessing — fixes duplicates, improves metadata, preserves structure."""
from __future__ import annotations
import hashlib
import json
import os
import re
from collections import Counter, OrderedDict
# ---------------------------------------------------------------------------
# Text cleaning
# ---------------------------------------------------------------------------
def clean_and_flatten(text: str) -> str:
    """Fix mid-word line breaks and collapse whitespace while preserving list structure.

    Returns a single-line version of *text* in which hyphenated line breaks
    are rejoined, all whitespace is collapsed to single spaces, and numbered
    list items / "Exception(s)" headers are kept on their own lines.
    """
    # Rejoin words hyphenated across a line break (e.g., "accord-\nance" -> "accordance").
    text = re.sub(r"(\w+)-\s*\n\s*(\w+)", r"\1\2", text)
    # Protect numbered list items and Exception headers with a marker so they
    # survive the newline flattening below.
    text = re.sub(r"\n\s*(\d+\.)\s+", r" __LISTBREAK__ \1 ", text)
    text = re.sub(r"\n\s*(Exception(?:s)?[\s:.])", r" __LISTBREAK__ \1", text)
    text = text.replace("\n", " ")
    # Clean spacing around dashes in section numbers (e.g., "28 - 101" -> "28-101").
    text = re.sub(r"(\d+)\s*-\s*(\d+)", r"\1-\2", text)
    text = re.sub(r"\s+", " ", text).strip()
    # Restore list breaks as newlines. BUGFIX: consume the whitespace around the
    # marker too — a plain str.replace left padded breaks like " \n 1. item".
    text = re.sub(r"\s*__LISTBREAK__\s*", "\n", text)
    return text
# ---------------------------------------------------------------------------
# Anchor / section detection
# ---------------------------------------------------------------------------
def get_dominant_anchor(content: str) -> str | None:
    """Detect the dominant chapter digit (1-9) or Appendix letter (A-Z).

    Scans line starts for section headers such as "§ 101." or "A101." and
    returns the most frequent leading chapter digit / appendix letter,
    or ``None`` when no header is found.
    """
    header_re = re.compile(
        r"(?m)^(?:\*?\s?§?\s?)(?:([1-9])\d{2,3}\.|([A-Z])(?:\d{2,3})?\.)"
    )
    tally: Counter[str] = Counter()
    # Exactly one of (digit, letter) is non-empty per match.
    for digit, letter in header_re.findall(content):
        tally[digit or letter] += 1
    if not tally:
        return None
    return tally.most_common(1)[0][0]
# ---------------------------------------------------------------------------
# Metadata extraction from section text
# ---------------------------------------------------------------------------
# Occupancy groups such as "Group A-2" or "Occupancy B, M" (comma-separated list).
_OCCUPANCY_RE = re.compile(
    r"\b(?:Group|Occupancy|Classification)\s+"
    r"([A-Z]-?\d?(?:\s*,\s*[A-Z]-?\d?)*)",
    re.IGNORECASE,
)
# Construction types such as "Type IIB" or "Type V".
_CONSTRUCTION_TYPE_RE = re.compile(
    r"\bType\s+(I[A-B]?|II[A-B]?|III[A-B]?|IV[A-B]?|V[A-B]?)\b",
    re.IGNORECASE,
)
# "Exception:" / "Exceptions." headers inside a section body.
_EXCEPTION_RE = re.compile(r"\bException(?:s)?\s*[:.]", re.IGNORECASE)
# "Section 101.2 and 102.3"-style cross-references to other sections.
_CROSS_REF_RE = re.compile(
    r"(?:Section|Sections|§)\s+(\d{2,4}(?:\.\d+)*(?:\s*(?:,|and|through)\s*\d{2,4}(?:\.\d+)*)*)",
    re.IGNORECASE,
)


def extract_rich_metadata(section_id: str, text: str, code_type: str) -> dict:
    """Extract enhanced metadata from section text for better filtering.

    Returns a dict with the section's id hierarchy plus occupancy classes,
    construction types, exception info, and cross-referenced section ids
    mined from *text* via the module-level regexes above.
    """
    parts = section_id.split(".")
    major = parts[0]
    # "101.2.3" -> parent_minor "101.2"; bare "101" falls back to the major id.
    minor = ".".join(parts[:2]) if len(parts) > 1 else major

    # Occupancy classes, first-seen order, de-duplicated.
    occupancy: list[str] = []
    for hit in _OCCUPANCY_RE.findall(text):
        for raw in re.split(r"\s*,\s*", hit):
            cls = raw.strip().upper()
            if cls and cls not in occupancy:
                occupancy.append(cls)

    # Construction types, normalized to uppercase and sorted.
    construction = sorted({m.upper() for m in _CONSTRUCTION_TYPE_RE.findall(text)})

    # Exception headers: flag plus a count of occurrences.
    exception_hits = _EXCEPTION_RE.findall(text)

    # Cross-referenced section ids, first-seen order, excluding self-references.
    xrefs: list[str] = []
    for hit in _CROSS_REF_RE.findall(text):
        for raw in re.split(r"\s*(?:,|and|through)\s*", hit):
            ref = raw.strip()
            if ref and ref != section_id and ref not in xrefs:
                xrefs.append(ref)

    return {
        "section_full": section_id,
        "parent_major": major,
        "parent_minor": minor,
        "code_type": code_type,
        "occupancy_classes": occupancy,
        "construction_types": construction,
        "has_exceptions": bool(exception_hits),
        "exception_count": len(exception_hits),
        "cross_references": xrefs,
    }
# ---------------------------------------------------------------------------
# Core extraction with deduplication
# ---------------------------------------------------------------------------
def extract_trade_sections(
    file_path: str,
    global_dict: OrderedDict,
    code_type: str,
    seen_hashes: dict[str, set[str]],
) -> OrderedDict:
    """Extract code sections from a single source file with deduplication.

    Mutates and returns *global_dict* (section id -> record with ``id``,
    ``text``, ``metadata``). *seen_hashes* maps section id -> set of MD5
    digests of section bodies already stored, so exact duplicate blocks are
    skipped while genuinely new text under a known id is appended as a
    continuation.
    """
    # Missing source file: contribute nothing.
    if not os.path.exists(file_path):
        return global_dict
    with open(file_path, "r", encoding="utf-8") as f:
        # Normalize non-breaking spaces so the whitespace regexes below match.
        content = f.read().replace("\xa0", " ")
    # The dominant chapter digit / appendix letter scopes the id regex to this
    # file's chapter, avoiding matches on ids from other chapters.
    anchor = get_dominant_anchor(content)
    if not anchor:
        return global_dict
    # Build section-matching regex
    if anchor.isalpha():
        # Appendix ids, e.g. "B101.2" (optional extra leading letter).
        id_pattern = rf"[A-Z]?{re.escape(anchor)}\d*(?:\.\d+)+"
    else:
        # Chapter ids, e.g. "101.2" where the anchor digit leads a 3-4 digit number.
        id_pattern = rf"{re.escape(anchor)}\d{{2,3}}(?:\.\d+)+"
    # A heading is an id at line start (optionally marked with * or §) followed
    # by its first title word, which is captured for the skip-word filter below.
    pattern = rf"(?m)^\s*[\*§]?\s*({id_pattern})\s+([A-Z\w]+)"
    matches = list(re.finditer(pattern, content))
    # Words after an id that indicate a cross-reference list ("Sections 101.2
    # and 101.3"), not a genuine section heading.
    skip_words = {
        "and", "through", "to", "or", "sections", "the", "of", "in", "under", "as",
    }
    for i in range(len(matches)):
        clean_id = matches[i].group(1).strip()
        first_word = matches[i].group(2)
        if first_word.lower() in skip_words:
            continue
        # A section's body runs from its heading to the next heading (or EOF).
        start_pos = matches[i].start()
        end_pos = matches[i + 1].start() if i + 1 < len(matches) else len(content)
        raw_body = content[start_pos:end_pos]
        clean_body = clean_and_flatten(raw_body)
        # Drop fragments too short to be a real section body.
        if len(clean_body) < 60:
            continue
        # ------ DEDUPLICATION via content hashing ------
        block_hash = hashlib.md5(clean_body.encode()).hexdigest()
        if clean_id in global_dict:
            # Check if this block is a genuine duplicate
            if clean_id not in seen_hashes:
                seen_hashes[clean_id] = set()
            if block_hash in seen_hashes[clean_id]:
                continue  # Skip exact duplicate
            seen_hashes[clean_id].add(block_hash)
            # New text under a known id: append it as a continuation.
            global_dict[clean_id]["text"] += f" [CONT.]: {clean_body}"
            source_name = os.path.basename(file_path)
            # NOTE(review): substring membership check — safe while all sources
            # use the uniform "NNN.txt" naming; verify if names ever vary.
            if source_name not in global_dict[clean_id]["metadata"]["source"]:
                global_dict[clean_id]["metadata"]["source"] += f", {source_name}"
        else:
            # First sighting of this id: record its hash and build the record.
            seen_hashes[clean_id] = {block_hash}
            metadata = extract_rich_metadata(clean_id, clean_body, code_type)
            metadata["source"] = os.path.basename(file_path)
            global_dict[clean_id] = {
                "id": clean_id,
                "text": f"CONTEXT: {metadata['parent_major']} > {metadata['parent_minor']} | CONTENT: {clean_id} {clean_body}",
                "metadata": metadata,
            }
    return global_dict
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
# File ranges per code type (same as original, but parameterized)
# Maps each NYC code type to the numbered source files ("NNN.txt") that
# contain it and to the JSON file the extracted sections are written to.
# The excluded numbers are presumably non-content pages (covers, tables of
# contents, blank scans) — TODO confirm against the source documents.
CODE_CONFIGS = {
    "Building": {
        "file_range": [i for i in range(58, 112) if i not in {90, 91, 92, 93, 94, 100, 101, 103, 106, 107}],
        "output_file": "BUILDING_CODE.json",
    },
    "FuelGas": {
        "file_range": [i for i in range(43, 58) if i not in {50, 51, 52, 53, 54, 56}],
        "output_file": "FUEL_GAS_CODE.json",
    },
    "Mechanical": {
        "file_range": [i for i in range(24, 43) if i not in {30, 31}],
        "output_file": "MECHANICAL_CODE.json",
    },
    "Plumbing": {
        "file_range": list(range(1, 24)),
        "output_file": "PLUMBING_CODE.json",
    },
    "Administrative": {
        "file_range": list(range(112, 160)),
        "output_file": "GENERAL_ADMINISTRATIVE_PROVISIONS.json",
    },
}
def preprocess_all(text_dir: str, output_dir: str) -> dict[str, int]:
    """Run preprocessing for all code types. Returns counts per type.

    Reads numbered ``NNN.txt`` files from *text_dir* per CODE_CONFIGS and
    writes one JSON file of extracted sections per code type into
    *output_dir* (created if absent).
    """
    os.makedirs(output_dir, exist_ok=True)
    section_counts: dict[str, int] = {}
    for code_type, config in CODE_CONFIGS.items():
        # Fresh accumulators per code type: ordered sections and per-id hashes.
        sections: OrderedDict = OrderedDict()
        hashes_by_id: dict[str, set[str]] = {}
        for file_num in config["file_range"]:
            source_path = os.path.join(text_dir, f"{file_num:03d}.txt")
            if not os.path.exists(source_path):
                continue
            print(f"[{code_type}] Processing {source_path}...")
            extract_trade_sections(source_path, sections, code_type, hashes_by_id)
        records = list(sections.values())
        destination = os.path.join(output_dir, config["output_file"])
        with open(destination, "w", encoding="utf-8") as fh:
            json.dump(records, fh, indent=2, ensure_ascii=False)
        section_counts[code_type] = len(records)
        print(f"[{code_type}] Wrote {len(records)} sections to {destination}")
    return section_counts
if __name__ == "__main__":
    import sys

    # CLI: preprocess_codes.py [text_dir] [output_dir]
    # Defaults: read from "Text", write to "data".
    cli_args = sys.argv[1:]
    text_dir = cli_args[0] if len(cli_args) > 0 else "Text"
    output_dir = cli_args[1] if len(cli_args) > 1 else "data"
    counts = preprocess_all(text_dir, output_dir)
    print(f"\nPreprocessing complete: {counts}")