| | """Ingest preprocessed NYC code JSON files into ChromaDB with bge-large-en-v1.5."""
|
| | from __future__ import annotations
|
| |
|
| | import json
|
| | import os
|
| | import sys
|
| |
|
| | import chromadb
|
| | from chromadb.utils import embedding_functions
|
| |
|
| |
|
# Sentence-transformers model used to embed every code section.
EMBEDDING_MODEL = "BAAI/bge-large-en-v1.5"

# Name of the single ChromaDB collection holding all code types.
COLLECTION_NAME = "nyc_building_codes"

# Persistent ChromaDB store lives in a directory next to this script.
DB_PATH = os.path.join(os.path.dirname(__file__), "nyc_code_db")


# Maps each preprocessed JSON filename to the code-type label that is
# written into every ingested section's metadata and prefixed to its id.
CODE_FILES = {
    "BUILDING_CODE.json": "Building",
    "FUEL_GAS_CODE.json": "FuelGas",
    "GENERAL_ADMINISTRATIVE_PROVISIONS.json": "Administrative",
    "MECHANICAL_CODE.json": "Mechanical",
    "PLUMBING_CODE.json": "Plumbing",
}
|
| |
|
| |
|
def create_collection(db_path: str = DB_PATH, reset: bool = True):
    """Open a persistent ChromaDB client and (re)create the code collection.

    Args:
        db_path: Filesystem location of the persistent ChromaDB store.
        reset: When True, drop any pre-existing collection of the same
            name before creating a fresh one.

    Returns:
        A ``(client, collection)`` tuple.
    """
    client = chromadb.PersistentClient(path=db_path)
    sentence_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name=EMBEDDING_MODEL,
    )

    if reset:
        try:
            client.delete_collection(name=COLLECTION_NAME)
            print(f"Deleted existing collection '{COLLECTION_NAME}'.")
        except Exception:
            # Best effort: the collection may simply not exist yet.
            pass

    fresh_collection = client.create_collection(
        name=COLLECTION_NAME,
        embedding_function=sentence_ef,
    )
    return client, fresh_collection
|
| |
|
| |
|
| | def ingest_json_file(collection, json_path: str, code_type: str) -> int:
|
| | """Ingest a single JSON file into the collection. Returns count of sections added."""
|
| | with open(json_path, "r", encoding="utf-8") as f:
|
| | data = json.load(f)
|
| |
|
| | documents = []
|
| | metadatas = []
|
| | ids = []
|
| | seen_ids: set[str] = set()
|
| |
|
| | for entry in data:
|
| | meta = entry["metadata"]
|
| |
|
| | meta["code_type"] = code_type
|
| |
|
| | unique_id = f"{code_type}_{entry['id']}"
|
| | if unique_id in seen_ids:
|
| | continue
|
| |
|
| |
|
| | flat_meta = {}
|
| | for k, v in meta.items():
|
| | if isinstance(v, list):
|
| | flat_meta[k] = ", ".join(str(x) for x in v) if v else ""
|
| | elif isinstance(v, bool):
|
| | flat_meta[k] = v
|
| | elif isinstance(v, (int, float)):
|
| | flat_meta[k] = v
|
| | else:
|
| | flat_meta[k] = str(v)
|
| |
|
| | documents.append(entry["text"])
|
| | metadatas.append(flat_meta)
|
| | ids.append(unique_id)
|
| | seen_ids.add(unique_id)
|
| |
|
| |
|
| | batch_size = 200
|
| | for i in range(0, len(documents), batch_size):
|
| | batch_end = min(i + batch_size, len(documents))
|
| | collection.upsert(
|
| | documents=documents[i:batch_end],
|
| | metadatas=metadatas[i:batch_end],
|
| | ids=ids[i:batch_end],
|
| | )
|
| | print(f" Batch {i // batch_size + 1}: upserted {batch_end - i} sections")
|
| |
|
| | return len(ids)
|
| |
|
| |
|
def ingest_all(data_dir: str, db_path: str = DB_PATH) -> dict[str, int]:
    """Ingest all code JSON files into a fresh ChromaDB collection.

    Args:
        data_dir: Directory containing the preprocessed JSON files
            listed in ``CODE_FILES``.
        db_path: Location of the persistent ChromaDB store (reset first).

    Returns:
        Mapping of code type to the number of sections ingested from it.
        Missing files are skipped with a warning and omitted from the map.
    """
    print(f"Creating ChromaDB at {db_path} with embedding model: {EMBEDDING_MODEL}")
    _client, collection = create_collection(db_path, reset=True)

    counts: dict[str, int] = {}
    for filename, code_type in CODE_FILES.items():
        json_path = os.path.join(data_dir, filename)
        if not os.path.exists(json_path):
            print(f"WARNING: {json_path} not found, skipping.")
            continue
        # Fix: the progress message previously printed a literal
        # placeholder instead of the file actually being ingested.
        print(f"\nIngesting {filename} as '{code_type}'...")
        count = ingest_json_file(collection, json_path, code_type)
        counts[code_type] = count
        print(f" -> {count} sections ingested")

    total = sum(counts.values())
    print(f"\nIngestion complete. Total: {total} sections across {len(counts)} code types.")
    return counts
|
| |
|
| |
|
if __name__ == "__main__":
    # Data directory defaults to this script's own directory unless one
    # is supplied as the first command-line argument.
    target_dir = os.path.dirname(__file__)
    if len(sys.argv) > 1:
        target_dir = sys.argv[1]
    ingest_all(target_dir)
|
| |
|