"""Improved NYC code preprocessing — fixes duplicates, improves metadata, preserves structure.""" from __future__ import annotations import hashlib import json import os import re from collections import Counter, OrderedDict # --------------------------------------------------------------------------- # Text cleaning # --------------------------------------------------------------------------- def clean_and_flatten(text: str) -> str: """Fix mid-word line breaks and collapse whitespace while preserving list structure.""" # Fix words split by hyphens across lines (e.g., "accord-\nance") text = re.sub(r"(\w+)-\s*\n\s*(\w+)", r"\1\2", text) # Preserve numbered list items by inserting a marker before cleanup text = re.sub(r"\n\s*(\d+\.)\s+", r" __LISTBREAK__ \1 ", text) text = re.sub(r"\n\s*(Exception(?:s)?[\s:.])", r" __LISTBREAK__ \1", text) text = text.replace("\n", " ") # Clean spacing around dashes in section numbers (e.g., 28 - 101) text = re.sub(r"(\d+)\s*-\s*(\d+)", r"\1-\2", text) text = re.sub(r"\s+", " ", text).strip() # Restore list breaks as newlines text = text.replace("__LISTBREAK__", "\n") return text # --------------------------------------------------------------------------- # Anchor / section detection # --------------------------------------------------------------------------- def get_dominant_anchor(content: str) -> str | None: """Detect the dominant chapter digit (1-9) or Appendix letter (A-Z).""" anchors = re.findall( r"(?m)^(?:\*?\s?§?\s?)(?:([1-9])\d{2,3}\.|([A-Z])(?:\d{2,3})?\.)", content, ) found = [item for sublist in anchors for item in sublist if item] if not found: return None return Counter(found).most_common(1)[0][0] # --------------------------------------------------------------------------- # Metadata extraction from section text # --------------------------------------------------------------------------- _OCCUPANCY_RE = re.compile( r"\b(?:Group|Occupancy|Classification)\s+" r"([A-Z]-?\d?(?:\s*,\s*[A-Z]-?\d?)*)", re.IGNORECASE, ) 
# Matches construction types such as "Type IIA" or "Type VB".
_CONSTRUCTION_TYPE_RE = re.compile(
    r"\bType\s+(I[A-B]?|II[A-B]?|III[A-B]?|IV[A-B]?|V[A-B]?)\b",
    re.IGNORECASE,
)
# Matches "Exception:" / "Exceptions." headings inside a section body.
_EXCEPTION_RE = re.compile(r"\bException(?:s)?\s*[:.]", re.IGNORECASE)
# Matches cross-references like "Sections 2802, 2803 and 2804.1".
_CROSS_REF_RE = re.compile(
    r"(?:Section|Sections|§)\s+(\d{2,4}(?:\.\d+)*(?:\s*(?:,|and|through)\s*\d{2,4}(?:\.\d+)*)*)",
    re.IGNORECASE,
)


def extract_rich_metadata(section_id: str, text: str, code_type: str) -> dict:
    """Extract enhanced metadata from section text for better filtering.

    Args:
        section_id: Dotted section identifier, e.g. "2801.1.2".
        text: Cleaned (flattened) section body text.
        code_type: Code family label, e.g. "Building".

    Returns:
        Dict with parent section ids, occupancy classes, construction types,
        exception counts, and cross-referenced section numbers.
    """
    id_parts = section_id.split(".")
    parent_major = id_parts[0]
    parent_minor = ".".join(id_parts[:2]) if len(id_parts) > 1 else parent_major

    # Occupancy classes mentioned, deduplicated in first-seen order.
    occupancy_classes: list[str] = []
    for m in _OCCUPANCY_RE.findall(text):
        for cls in re.split(r"\s*,\s*", m):
            cls = cls.strip().upper()
            if cls and cls not in occupancy_classes:
                occupancy_classes.append(cls)

    # Construction types mentioned, sorted and uppercased for stable output.
    construction_types = sorted({m.upper() for m in _CONSTRUCTION_TYPE_RE.findall(text)})

    # Exception detection.
    has_exceptions = bool(_EXCEPTION_RE.search(text))
    exception_count = len(_EXCEPTION_RE.findall(text))

    # Cross-referenced section numbers, excluding self-references,
    # deduplicated in first-seen order.
    cross_references: list[str] = []
    for m in _CROSS_REF_RE.findall(text):
        for ref in re.split(r"\s*(?:,|and|through)\s*", m):
            ref = ref.strip()
            if ref and ref != section_id and ref not in cross_references:
                cross_references.append(ref)

    return {
        "section_full": section_id,
        "parent_major": parent_major,
        "parent_minor": parent_minor,
        "code_type": code_type,
        "occupancy_classes": occupancy_classes,
        "construction_types": construction_types,
        "has_exceptions": has_exceptions,
        "exception_count": exception_count,
        "cross_references": cross_references,
    }


# ---------------------------------------------------------------------------
# Core extraction with deduplication
# ---------------------------------------------------------------------------

# First words that indicate a heading-shaped match is really a cross-reference
# line (e.g. "Sections 2801.1 and 2802.3 ..."), not a real section heading.
# Hoisted to module level so the set is built once, not per file.
_SKIP_WORDS = frozenset({
    "and", "through", "to", "or", "sections", "the", "of", "in", "under", "as",
})


def _append_continuation(entry: dict, clean_body: str, source_name: str) -> None:
    """Merge a continuation block into an existing section entry in place.

    Appends the body text and records the source file name. Source names are
    compared as whole ", "-separated items — not substrings — so a name like
    "28.txt" is never mistaken for part of an already-recorded "028.txt".
    """
    entry["text"] += f" [CONT.]: {clean_body}"
    if source_name not in entry["metadata"]["source"].split(", "):
        entry["metadata"]["source"] += f", {source_name}"


def extract_trade_sections(
    file_path: str,
    global_dict: OrderedDict,
    code_type: str,
    seen_hashes: dict[str, set[str]],
) -> OrderedDict:
    """Extract code sections from a single source file with deduplication.

    Args:
        file_path: Path to one extracted text page; missing files are skipped.
        global_dict: Accumulator mapping section id -> entry dict; mutated
            in place and also returned for convenience.
        code_type: Code family label stored in each entry's metadata.
        seen_hashes: Per-section set of MD5 digests of bodies already merged,
            used to drop exact duplicate blocks across pages.

    Returns:
        The (mutated) ``global_dict``.
    """
    if not os.path.exists(file_path):
        return global_dict

    with open(file_path, "r", encoding="utf-8") as f:
        # Non-breaking spaces from the PDF extraction break the regexes.
        content = f.read().replace("\xa0", " ")

    anchor = get_dominant_anchor(content)
    if not anchor:
        return global_dict

    # Build the section-heading regex keyed on the dominant anchor:
    # appendix pages use letter ids ("A101.1"), chapters use digit ids.
    if anchor.isalpha():
        id_pattern = rf"[A-Z]?{re.escape(anchor)}\d*(?:\.\d+)+"
    else:
        id_pattern = rf"{re.escape(anchor)}\d{{2,3}}(?:\.\d+)+"
    pattern = rf"(?m)^\s*[\*§]?\s*({id_pattern})\s+([A-Z\w]+)"
    matches = list(re.finditer(pattern, content))

    source_name = os.path.basename(file_path)
    for i, match in enumerate(matches):
        clean_id = match.group(1).strip()
        # Headings whose first word is a connective are cross-reference lines.
        if match.group(2).lower() in _SKIP_WORDS:
            continue

        # A section's body runs until the next heading-shaped match (or EOF).
        start_pos = match.start()
        end_pos = matches[i + 1].start() if i + 1 < len(matches) else len(content)
        clean_body = clean_and_flatten(content[start_pos:end_pos])
        if len(clean_body) < 60:
            continue  # too short to be a substantive section

        # ------ DEDUPLICATION via content hashing ------
        block_hash = hashlib.md5(clean_body.encode()).hexdigest()
        if clean_id in global_dict:
            hashes = seen_hashes.setdefault(clean_id, set())
            if block_hash in hashes:
                continue  # skip exact duplicate of an already-merged block
            hashes.add(block_hash)
            _append_continuation(global_dict[clean_id], clean_body, source_name)
        else:
            seen_hashes[clean_id] = {block_hash}
            metadata = extract_rich_metadata(clean_id, clean_body, code_type)
            metadata["source"] = source_name
            global_dict[clean_id] = {
                "id": clean_id,
                "text": f"CONTEXT: {metadata['parent_major']} > {metadata['parent_minor']} | CONTENT: {clean_id} {clean_body}",
                "metadata": metadata,
            }

    return global_dict


# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------

# File ranges per code type (same as original, but parameterized).
# Excluded numbers are pages that belong to a different code family.
CODE_CONFIGS = {
    "Building": {
        "file_range": [i for i in range(58, 112) if i not in {90, 91, 92, 93, 94, 100, 101, 103, 106, 107}],
        "output_file": "BUILDING_CODE.json",
    },
    "FuelGas": {
        "file_range": [i for i in range(43, 58) if i not in {50, 51, 52, 53, 54, 56}],
        "output_file": "FUEL_GAS_CODE.json",
    },
    "Mechanical": {
        "file_range": [i for i in range(24, 43) if i not in {30, 31}],
        "output_file": "MECHANICAL_CODE.json",
    },
    "Plumbing": {
        "file_range": list(range(1, 24)),
        "output_file": "PLUMBING_CODE.json",
    },
    "Administrative": {
        "file_range": list(range(112, 160)),
        "output_file": "GENERAL_ADMINISTRATIVE_PROVISIONS.json",
    },
}


def preprocess_all(text_dir: str, output_dir: str) -> dict[str, int]:
    """Run preprocessing for all code types. Returns section counts per type.

    Args:
        text_dir: Directory containing zero-padded page files ("001.txt", ...).
        output_dir: Directory for the per-code-type JSON outputs; created if
            missing. Missing input pages are silently skipped.
    """
    os.makedirs(output_dir, exist_ok=True)
    counts: dict[str, int] = {}

    for code_type, cfg in CODE_CONFIGS.items():
        master_dict: OrderedDict = OrderedDict()
        seen_hashes: dict[str, set[str]] = {}

        for file_num in cfg["file_range"]:
            path = os.path.join(text_dir, f"{file_num:03d}.txt")
            if os.path.exists(path):
                print(f"[{code_type}] Processing {path}...")
                extract_trade_sections(path, master_dict, code_type, seen_hashes)

        result = list(master_dict.values())
        output_path = os.path.join(output_dir, cfg["output_file"])
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2, ensure_ascii=False)
        counts[code_type] = len(result)
        print(f"[{code_type}] Wrote {len(result)} sections to {output_path}")

    return counts


if __name__ == "__main__":
    import sys

    text_dir = sys.argv[1] if len(sys.argv) > 1 else "Text"
    output_dir = sys.argv[2] if len(sys.argv) > 2 else "data"
    counts = preprocess_all(text_dir, output_dir)
    print(f"\nPreprocessing complete: {counts}")