| | """Improved NYC code preprocessing — fixes duplicates, improves metadata, preserves structure."""
|
| | from __future__ import annotations
|
| |
|
| | import hashlib
|
| | import json
|
| | import os
|
| | import re
|
| | from collections import Counter, OrderedDict
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def clean_and_flatten(text: str) -> str:
    """Fix mid-word line breaks and collapse whitespace while preserving list structure."""
    sentinel = "__LISTBREAK__"
    # (pattern, replacement) pairs, applied strictly in order:
    #  1. rejoin words that a hard wrap split with a hyphen ("con-\ntinue" -> "continue");
    #  2. shield numbered-list items behind a sentinel so flattening keeps them;
    #  3. shield "Exception(s)" headers the same way;
    #  4. flatten every remaining newline into a space;
    #  5. tighten numeric ranges ("100 - 200" -> "100-200");
    #  6. squeeze whitespace runs down to single spaces.
    steps = [
        (r"(\w+)-\s*\n\s*(\w+)", r"\1\2"),
        (r"\n\s*(\d+\.)\s+", rf" {sentinel} \1 "),
        (r"\n\s*(Exception(?:s)?[\s:.])", rf" {sentinel} \1"),
        (r"\n", " "),
        (r"(\d+)\s*-\s*(\d+)", r"\1-\2"),
        (r"\s+", " "),
    ]
    for pattern, repl in steps:
        text = re.sub(pattern, repl, text)
    # Restore the shielded structural breaks as real newlines.
    return text.strip().replace(sentinel, "\n")
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def get_dominant_anchor(content: str) -> str | None:
    """Detect the dominant chapter digit (1-9) or Appendix letter (A-Z)."""
    # Each findall hit is a 2-tuple (chapter digit, appendix letter); exactly one
    # slot is non-empty because the alternatives are mutually exclusive.
    hits = re.findall(
        r"(?m)^(?:\*?\s?§?\s?)(?:([1-9])\d{2,3}\.|([A-Z])(?:\d{2,3})?\.)",
        content,
    )
    tally = Counter(digit or letter for digit, letter in hits if digit or letter)
    if not tally:
        return None
    # Ties resolve by first-seen order, same as flattening into a list would.
    return tally.most_common(1)[0][0]
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Compiled once at module level; all are case-insensitive because the source
# scans mix "SECTION", "Section", and "section" styles.
_OCCUPANCY_RE = re.compile(
    r"\b(?:Group|Occupancy|Classification)\s+"
    r"([A-Z]-?\d?(?:\s*,\s*[A-Z]-?\d?)*)",
    re.IGNORECASE,
)
_CONSTRUCTION_TYPE_RE = re.compile(
    r"\bType\s+(I[A-B]?|II[A-B]?|III[A-B]?|IV[A-B]?|V[A-B]?)\b",
    re.IGNORECASE,
)
_EXCEPTION_RE = re.compile(r"\bException(?:s)?\s*[:.]", re.IGNORECASE)
_CROSS_REF_RE = re.compile(
    r"(?:Section|Sections|§)\s+(\d{2,4}(?:\.\d+)*(?:\s*(?:,|and|through)\s*\d{2,4}(?:\.\d+)*)*)",
    re.IGNORECASE,
)


def extract_rich_metadata(section_id: str, text: str, code_type: str) -> dict:
    """Extract enhanced metadata from section text for better filtering."""
    parts = section_id.split(".")
    major = parts[0]
    # e.g. "1001.2.3" -> major "1001", minor "1001.2"; a bare id is its own minor.
    minor = ".".join(parts[:2]) if len(parts) > 1 else major

    # Occupancy groups: explode comma-separated runs, uppercase, dedupe in order.
    occupancies = list(dict.fromkeys(
        token.strip().upper()
        for hit in _OCCUPANCY_RE.findall(text)
        for token in re.split(r"\s*,\s*", hit)
        if token.strip()
    ))

    # Construction types, normalized to uppercase and sorted for stable output.
    constructions = sorted({hit.upper() for hit in _CONSTRUCTION_TYPE_RE.findall(text)})

    exception_hits = _EXCEPTION_RE.findall(text)

    # Cross-referenced section numbers: split "101.2, 101.4 and 101.6 through ..."
    # runs apart, drop self-references, dedupe while keeping first-seen order.
    references = list(dict.fromkeys(
        ref.strip()
        for hit in _CROSS_REF_RE.findall(text)
        for ref in re.split(r"\s*(?:,|and|through)\s*", hit)
        if ref.strip() and ref.strip() != section_id
    ))

    return {
        "section_full": section_id,
        "parent_major": major,
        "parent_minor": minor,
        "code_type": code_type,
        "occupancy_classes": occupancies,
        "construction_types": constructions,
        "has_exceptions": bool(exception_hits),
        "exception_count": len(exception_hits),
        "cross_references": references,
    }
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def extract_trade_sections(
    file_path: str,
    global_dict: OrderedDict,
    code_type: str,
    seen_hashes: dict[str, set[str]],
) -> OrderedDict:
    """Extract code sections from a single source file with deduplication."""
    if not os.path.exists(file_path):
        return global_dict

    with open(file_path, "r", encoding="utf-8") as fh:
        # \xa0 (non-breaking space) appears in the scans; normalize it up front.
        content = fh.read().replace("\xa0", " ")

    anchor = get_dominant_anchor(content)
    if not anchor:
        return global_dict

    # Build the section-id pattern around the dominant anchor: appendix files use
    # letter-prefixed ids (optionally "AA"-style), chapters use 3-4 digit ids.
    if anchor.isalpha():
        id_pattern = rf"[A-Z]?{re.escape(anchor)}\d*(?:\.\d+)+"
    else:
        id_pattern = rf"{re.escape(anchor)}\d{{2,3}}(?:\.\d+)+"

    heading_re = re.compile(rf"(?m)^\s*[\*§]?\s*({id_pattern})\s+([A-Z\w]+)")
    matches = list(heading_re.finditer(content))

    # A heading whose first word is one of these is a cross-reference phrase
    # ("Sections 101.2 through 101.5 ..."), not a real section start.
    skip_words = {
        "and", "through", "to", "or", "sections", "the", "of", "in", "under", "as",
    }

    source_name = os.path.basename(file_path)
    # Each section's body runs from its heading to the next heading (or EOF).
    boundaries = [m.start() for m in matches] + [len(content)]

    for match, start, end in zip(matches, boundaries, boundaries[1:]):
        section_id = match.group(1).strip()
        if match.group(2).lower() in skip_words:
            continue

        body = clean_and_flatten(content[start:end])
        if len(body) < 60:
            # Too short to be a real section body — likely a stray heading line.
            continue

        digest = hashlib.md5(body.encode()).hexdigest()

        if section_id in global_dict:
            # Duplicate id across files: append only genuinely new text.
            hashes = seen_hashes.setdefault(section_id, set())
            if digest in hashes:
                continue
            hashes.add(digest)
            entry = global_dict[section_id]
            entry["text"] += f" [CONT.]: {body}"
            if source_name not in entry["metadata"]["source"]:
                entry["metadata"]["source"] += f", {source_name}"
        else:
            seen_hashes[section_id] = {digest}
            metadata = extract_rich_metadata(section_id, body, code_type)
            metadata["source"] = source_name
            global_dict[section_id] = {
                "id": section_id,
                "text": f"CONTEXT: {metadata['parent_major']} > {metadata['parent_minor']} | CONTENT: {section_id} {body}",
                "metadata": metadata,
            }

    return global_dict
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Source-file number ranges (formatted as NNN.txt by preprocess_all) that make
# up each NYC code volume, and the JSON file each volume is written to.
# NOTE(review): the excluded numbers presumably correspond to non-code pages
# (covers, blanks, tables of contents) — confirm against the source scans.
CODE_CONFIGS = {
    "Building": {
        "file_range": [i for i in range(58, 112) if i not in {90, 91, 92, 93, 94, 100, 101, 103, 106, 107}],
        "output_file": "BUILDING_CODE.json",
    },
    "FuelGas": {
        "file_range": [i for i in range(43, 58) if i not in {50, 51, 52, 53, 54, 56}],
        "output_file": "FUEL_GAS_CODE.json",
    },
    "Mechanical": {
        "file_range": [i for i in range(24, 43) if i not in {30, 31}],
        "output_file": "MECHANICAL_CODE.json",
    },
    "Plumbing": {
        "file_range": list(range(1, 24)),
        "output_file": "PLUMBING_CODE.json",
    },
    "Administrative": {
        "file_range": list(range(112, 160)),
        "output_file": "GENERAL_ADMINISTRATIVE_PROVISIONS.json",
    },
}
|
| |
|
| |
|
def preprocess_all(text_dir: str, output_dir: str) -> dict[str, int]:
    """Run preprocessing for all code types. Returns counts per type."""
    os.makedirs(output_dir, exist_ok=True)
    counts: dict[str, int] = {}

    for code_type, cfg in CODE_CONFIGS.items():
        # Fresh accumulators per code volume; shared across that volume's files
        # so duplicate sections spanning files are merged/deduplicated.
        sections: OrderedDict = OrderedDict()
        hashes_by_id: dict[str, set[str]] = {}

        for file_num in cfg["file_range"]:
            path = os.path.join(text_dir, f"{file_num:03d}.txt")
            if not os.path.exists(path):
                continue  # missing scans are simply skipped
            print(f"[{code_type}] Processing {path}...")
            extract_trade_sections(path, sections, code_type, hashes_by_id)

        records = list(sections.values())
        output_path = os.path.join(output_dir, cfg["output_file"])
        with open(output_path, "w", encoding="utf-8") as fh:
            json.dump(records, fh, indent=2, ensure_ascii=False)

        counts[code_type] = len(records)
        print(f"[{code_type}] Wrote {len(records)} sections to {output_path}")

    return counts
|
| |
|
| |
|
if __name__ == "__main__":
    import sys

    # CLI args: [text_dir] [output_dir]; both optional with repo-layout defaults.
    args = sys.argv[1:]
    text_dir = args[0] if len(args) > 0 else "Text"
    output_dir = args[1] if len(args) > 1 else "data"

    counts = preprocess_all(text_dir, output_dir)
    print(f"\nPreprocessing complete: {counts}")
|
| |
|