File size: 9,447 Bytes
e1ced8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
"""Improved NYC code preprocessing — fixes duplicates, improves metadata, preserves structure."""
from __future__ import annotations

import hashlib
import json
import os
import re
from collections import Counter, OrderedDict


# ---------------------------------------------------------------------------
# Text cleaning
# ---------------------------------------------------------------------------

def clean_and_flatten(text: str) -> str:
    """Flatten a raw section body into tidy text while keeping list structure.

    Steps, in order:

    1. Rejoin tokens that were hyphenated across a line break. Purely
       alphabetic splits drop the hyphen ("accord-\\nance" -> "accordance");
       when a digit touches the hyphen it is kept ("28-\\n101" -> "28-101",
       "R-\\n2" -> "R-2") so numeric designations are not fused together.
    2. Mark numbered list items and "Exception(s)" paragraphs that start a
       line, so they survive the whitespace collapse.
    3. Replace remaining newlines with spaces, tighten "28 - 101" style
       spacing to "28-101", and collapse runs of whitespace.
    4. Turn each marker back into a single newline with no padding spaces.

    Args:
        text: Raw text, possibly containing hard line wraps.

    Returns:
        The cleaned text. Newlines appear only before preserved list items
        and exception paragraphs; there is no leading/trailing whitespace.
    """

    def _rejoin(match: re.Match) -> str:
        head, tail = match.group(1), match.group(2)
        # A digit next to the hyphen means it is part of an identifier
        # (section number, occupancy group), not a soft word-wrap hyphen.
        if head[-1].isdigit() or tail[0].isdigit():
            return f"{head}-{tail}"
        return head + tail

    text = re.sub(r"(\w+)-\s*\n\s*(\w+)", _rejoin, text)
    # Preserve numbered list items by inserting a marker before cleanup
    text = re.sub(r"\n\s*(\d+\.)\s+", r" __LISTBREAK__ \1 ", text)
    text = re.sub(r"\n\s*(Exception(?:s)?[\s:.])", r" __LISTBREAK__ \1", text)
    text = text.replace("\n", " ")
    # Clean spacing around dashes in section numbers (e.g., 28 - 101)
    text = re.sub(r"(\d+)\s*-\s*(\d+)", r"\1-\2", text)
    text = re.sub(r"\s+", " ", text).strip()
    # Restore list breaks as bare newlines, swallowing the padding spaces
    # that surrounded each marker.
    text = re.sub(r"\s*__LISTBREAK__\s*", "\n", text)
    # A marker at the very start would otherwise leave a leading newline.
    return text.strip()


# ---------------------------------------------------------------------------
# Anchor / section detection
# ---------------------------------------------------------------------------

def get_dominant_anchor(content: str) -> str | None:
    """Return the most common leading section marker in *content*.

    Every line that begins (after an optional ``*`` / ``§`` prefix) with
    either a 3-4 digit number followed by a dot, or a capital letter with
    optional 2-3 digits followed by a dot, contributes one vote: the first
    digit (1-9) in the numeric case, the letter (A-Z) in the other.

    Returns:
        The marker with the most votes (the earliest-seen one on a tie), or
        ``None`` when no line matches.
    """
    votes: Counter = Counter()
    heading_start = r"(?m)^(?:\*?\s?§?\s?)(?:([1-9])\d{2,3}\.|([A-Z])(?:\d{2,3})?\.)"

    for hit in re.finditer(heading_start, content):
        # Exactly one of the two alternation groups participates per match.
        votes[hit.group(1) or hit.group(2)] += 1

    if votes:
        winner, _count = votes.most_common(1)[0]
        return winner
    return None


# ---------------------------------------------------------------------------
# Metadata extraction from section text
# ---------------------------------------------------------------------------

# Occupancy designations such as "Group R-2" or "Occupancy A-1, B".
# Only the keyword is matched case-insensitively. The designation itself must
# be an uppercase letter (optionally "-digit") followed by a word boundary;
# otherwise ordinary prose like "occupancy classification of" would yield the
# first letter of the next word as a bogus class.
_OCCUPANCY_RE = re.compile(
    r"\b(?i:Group|Occupancy|Classification)\s+"
    r"([A-Z]-?\d?(?:\s*,\s*[A-Z]-?\d?)*)\b",
)
# Construction types: "Type I" .. "Type V", optionally suffixed with A or B.
_CONSTRUCTION_TYPE_RE = re.compile(
    r"\bType\s+(I[A-B]?|II[A-B]?|III[A-B]?|IV[A-B]?|V[A-B]?)\b",
    re.IGNORECASE,
)
# "Exception" / "Exceptions" immediately followed by ":" or ".".
_EXCEPTION_RE = re.compile(r"\bException(?:s)?\s*[:.]", re.IGNORECASE)
# "Section 903.3", "Sections 903.3, 905.1 and 907.2", "§ 101.2 through 101.5".
# The single capture group holds the whole list of referenced numbers.
_CROSS_REF_RE = re.compile(
    r"(?:Section|Sections|§)\s+(\d{2,4}(?:\.\d+)*(?:\s*(?:,|and|through)\s*\d{2,4}(?:\.\d+)*)*)",
    re.IGNORECASE,
)


def extract_rich_metadata(section_id: str, text: str, code_type: str) -> dict:
    """Build the filterable metadata record for one code section.

    Args:
        section_id: Dotted section identifier, e.g. ``"903.2.1"``.
        text: Cleaned body text of the section.
        code_type: Label of the code the section belongs to; copied verbatim
            into the result.

    Returns:
        A dict with these keys:

        * ``section_full`` - ``section_id`` unchanged.
        * ``parent_major`` - first dotted component of the id.
        * ``parent_minor`` - first two dotted components (equals
          ``parent_major`` when the id has only one).
        * ``code_type`` - ``code_type`` unchanged.
        * ``occupancy_classes`` - distinct occupancy designations in order of
          first appearance.
        * ``construction_types`` - distinct construction types, upper-cased
          and sorted.
        * ``has_exceptions`` / ``exception_count`` - whether and how many
          "Exception(s):" markers occur.
        * ``cross_references`` - distinct referenced section numbers in order
          of first appearance, excluding ``section_id`` itself.
    """
    id_parts = section_id.split(".")
    parent_major = id_parts[0]
    # Slicing copes with single-component ids: join of one part is that part.
    parent_minor = ".".join(id_parts[:2])

    # Occupancy classes mentioned (order-preserving de-duplication).
    occupancy_classes: list[str] = []
    for designation_list in _OCCUPANCY_RE.findall(text):
        for cls in designation_list.split(","):
            cls = cls.strip()
            if cls and cls not in occupancy_classes:
                occupancy_classes.append(cls)

    # Construction types mentioned.
    construction_types = sorted(
        {found.upper() for found in _CONSTRUCTION_TYPE_RE.findall(text)}
    )

    # Exception markers: scan once and derive the flag from the count.
    exception_count = len(_EXCEPTION_RE.findall(text))
    has_exceptions = exception_count > 0

    # Cross-references to other sections.
    cross_references: list[str] = []
    for ref_list in _CROSS_REF_RE.findall(text):
        for ref in re.split(r"\s*(?:,|and|through)\s*", ref_list):
            ref = ref.strip()
            if ref and ref != section_id and ref not in cross_references:
                cross_references.append(ref)

    return {
        "section_full": section_id,
        "parent_major": parent_major,
        "parent_minor": parent_minor,
        "code_type": code_type,
        "occupancy_classes": occupancy_classes,
        "construction_types": construction_types,
        "has_exceptions": has_exceptions,
        "exception_count": exception_count,
        "cross_references": cross_references,
    }


# ---------------------------------------------------------------------------
# Core extraction with deduplication
# ---------------------------------------------------------------------------

def extract_trade_sections(
    file_path: str,
    global_dict: OrderedDict,
    code_type: str,
    seen_hashes: dict[str, set[str]],
) -> OrderedDict:
    """Extract code sections from a single source file with deduplication.

    The file's dominant anchor (chapter digit or appendix letter) decides
    which section-number shape counts as a heading. Each heading starts a
    section that runs until the next accepted heading (or end of file).

    Args:
        file_path: Path of the UTF-8 text file to read. A missing file is
            silently ignored.
        global_dict: Accumulator keyed by section id; mutated in place. A new
            id creates an entry with ``id``, ``text`` and ``metadata``; a
            repeated id has its new, non-duplicate body appended to ``text``
            after a ``" [CONT.]: "`` marker.
        code_type: Label stored in each new entry's metadata.
        seen_hashes: Per-section-id set of MD5 digests of bodies already
            stored; mutated in place and used to drop exact repeats.

    Returns:
        ``global_dict`` (the same object that was passed in).
    """
    if not os.path.exists(file_path):
        return global_dict

    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read().replace("\xa0", " ")

    anchor = get_dominant_anchor(content)
    if not anchor:
        return global_dict

    # Build section-matching regex
    if anchor.isalpha():
        id_pattern = rf"[A-Z]?{re.escape(anchor)}\d*(?:\.\d+)+"
    else:
        id_pattern = rf"{re.escape(anchor)}\d{{2,3}}(?:\.\d+)+"

    pattern = rf"(?m)^\s*[\*§]?\s*({id_pattern})\s+([A-Z\w]+)"

    skip_words = {
        "and", "through", "to", "or", "sections", "the", "of", "in", "under", "as",
    }

    # A section number at the start of a line followed by a connective word
    # is a wrapped cross-reference, not a heading. Discard those BEFORE
    # computing boundaries so the text after such a line stays attached to
    # the section it belongs to instead of being dropped.
    headings = [
        m for m in re.finditer(pattern, content)
        if m.group(2).lower() not in skip_words
    ]
    source_name = os.path.basename(file_path)

    for idx, heading in enumerate(headings):
        clean_id = heading.group(1).strip()
        end_pos = headings[idx + 1].start() if idx + 1 < len(headings) else len(content)

        clean_body = clean_and_flatten(content[heading.start():end_pos])

        # Very short bodies are skipped.
        if len(clean_body) < 60:
            continue

        # ------ DEDUPLICATION via content hashing ------
        block_hash = hashlib.md5(clean_body.encode()).hexdigest()

        entry = global_dict.get(clean_id)
        if entry is None:
            seen_hashes[clean_id] = {block_hash}
            metadata = extract_rich_metadata(clean_id, clean_body, code_type)
            metadata["source"] = source_name

            global_dict[clean_id] = {
                "id": clean_id,
                "text": f"CONTEXT: {metadata['parent_major']} > {metadata['parent_minor']} | CONTENT: {clean_id} {clean_body}",
                "metadata": metadata,
            }
            continue

        known_hashes = seen_hashes.setdefault(clean_id, set())
        if block_hash in known_hashes:
            continue  # Skip exact duplicate
        known_hashes.add(block_hash)

        entry["text"] += f" [CONT.]: {clean_body}"
        # Compare against whole file names, not substrings of the joined
        # "a.txt, b.txt" string.
        if source_name not in entry["metadata"]["source"].split(", "):
            entry["metadata"]["source"] += f", {source_name}"

    return global_dict


# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------

# Per-code-type pipeline settings. "file_range" lists the numbered source
# text files (ascending) that feed the code type, with the excluded file
# numbers removed; "output_file" is the JSON file name the result goes to.
CODE_CONFIGS = {
    "Building": {
        "file_range": sorted(
            set(range(58, 112)) - {90, 91, 92, 93, 94, 100, 101, 103, 106, 107}
        ),
        "output_file": "BUILDING_CODE.json",
    },
    "FuelGas": {
        "file_range": sorted(set(range(43, 58)) - {50, 51, 52, 53, 54, 56}),
        "output_file": "FUEL_GAS_CODE.json",
    },
    "Mechanical": {
        "file_range": sorted(set(range(24, 43)) - {30, 31}),
        "output_file": "MECHANICAL_CODE.json",
    },
    "Plumbing": {
        "file_range": [*range(1, 24)],
        "output_file": "PLUMBING_CODE.json",
    },
    "Administrative": {
        "file_range": [*range(112, 160)],
        "output_file": "GENERAL_ADMINISTRATIVE_PROVISIONS.json",
    },
}


def preprocess_all(text_dir: str, output_dir: str) -> dict[str, int]:
    """Process every configured code type and write one JSON file for each.

    For each entry in ``CODE_CONFIGS`` the numbered text files
    (``NNN.txt`` inside *text_dir*) that exist are fed, in order, through
    ``extract_trade_sections`` into a fresh accumulator; the collected
    section records are then dumped as a JSON list into *output_dir*.

    Args:
        text_dir: Directory holding the zero-padded ``NNN.txt`` inputs.
        output_dir: Directory for the JSON outputs; created if absent.

    Returns:
        Mapping of code type to the number of section records written.
    """
    os.makedirs(output_dir, exist_ok=True)
    section_totals: dict[str, int] = {}

    for code_type, settings in CODE_CONFIGS.items():
        # Fresh accumulators per code type: ids and hashes never leak
        # from one code into another.
        sections_by_id: OrderedDict = OrderedDict()
        hashes_by_id: dict[str, set[str]] = {}

        for number in settings["file_range"]:
            source_path = os.path.join(text_dir, f"{number:03d}.txt")
            if not os.path.exists(source_path):
                continue
            print(f"[{code_type}] Processing {source_path}...")
            extract_trade_sections(source_path, sections_by_id, code_type, hashes_by_id)

        records = [*sections_by_id.values()]
        destination = os.path.join(output_dir, settings["output_file"])
        with open(destination, "w", encoding="utf-8") as handle:
            json.dump(records, handle, indent=2, ensure_ascii=False)

        total = len(records)
        section_totals[code_type] = total
        print(f"[{code_type}] Wrote {total} sections to {destination}")

    return section_totals


if __name__ == "__main__":
    import sys

    # Usage: script [text_dir [output_dir]]; absent arguments use defaults.
    text_dir, output_dir = "Text", "data"
    if len(sys.argv) > 1:
        text_dir = sys.argv[1]
    if len(sys.argv) > 2:
        output_dir = sys.argv[2]

    counts = preprocess_all(text_dir, output_dir)
    print(f"\nPreprocessing complete: {counts}")