# Updated_code_complaince / data / ingest_chromadb.py
# Ryan2219's picture
# Upload 70 files
# e1ced8e verified
"""Ingest preprocessed NYC code JSON files into ChromaDB with bge-large-en-v1.5."""
from __future__ import annotations
import json
import os
import sys
import chromadb
from chromadb.utils import embedding_functions
# Hugging Face model id used for dense sentence embeddings.
EMBEDDING_MODEL = "BAAI/bge-large-en-v1.5"
# Name of the ChromaDB collection that holds all NYC code sections.
COLLECTION_NAME = "nyc_building_codes"
# Default on-disk location for the persistent ChromaDB store (sibling of this file).
DB_PATH = os.path.join(os.path.dirname(__file__), "nyc_code_db")
# Map of JSON files to their code types
CODE_FILES = {
    "BUILDING_CODE.json": "Building",
    "FUEL_GAS_CODE.json": "FuelGas",
    "GENERAL_ADMINISTRATIVE_PROVISIONS.json": "Administrative",
    "MECHANICAL_CODE.json": "Mechanical",
    "PLUMBING_CODE.json": "Plumbing",
}
def create_collection(db_path: str = DB_PATH, reset: bool = True):
    """Open a persistent ChromaDB client and (re)build the target collection.

    Args:
        db_path: Directory backing the persistent ChromaDB store.
        reset: When True, drop any pre-existing collection of the same name
            before creating a fresh one.

    Returns:
        A ``(client, collection)`` pair.
    """
    client = chromadb.PersistentClient(path=db_path)
    embedder = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name=EMBEDDING_MODEL,
    )
    if reset:
        try:
            client.delete_collection(name=COLLECTION_NAME)
        except Exception:
            # Best-effort cleanup: nothing to delete on a fresh database.
            pass
        else:
            print(f"Deleted existing collection '{COLLECTION_NAME}'.")
    fresh = client.create_collection(
        name=COLLECTION_NAME,
        embedding_function=embedder,
    )
    return client, fresh
def ingest_json_file(collection, json_path: str, code_type: str) -> int:
    """Ingest a single JSON file into the collection. Returns count of sections added."""
    with open(json_path, "r", encoding="utf-8") as handle:
        entries = json.load(handle)

    docs = []
    metas = []
    uids = []
    registered: set[str] = set()

    for item in entries:
        meta = item["metadata"]
        # Ensure code_type is set (should already be from preprocessing)
        meta["code_type"] = code_type
        uid = f"{code_type}_{item['id']}"
        if uid in registered:
            continue
        # Flatten list-type metadata for ChromaDB (only supports str/int/float/bool)
        cleaned = {}
        for key, value in meta.items():
            if isinstance(value, list):
                cleaned[key] = ", ".join(str(part) for part in value) if value else ""
            elif isinstance(value, (bool, int, float)):
                cleaned[key] = value
            else:
                cleaned[key] = str(value)
        docs.append(item["text"])
        metas.append(cleaned)
        uids.append(uid)
        registered.add(uid)

    # Batch upsert
    batch_size = 200  # Smaller batches for larger embeddings
    for start in range(0, len(docs), batch_size):
        stop = min(start + batch_size, len(docs))
        collection.upsert(
            documents=docs[start:stop],
            metadatas=metas[start:stop],
            ids=uids[start:stop],
        )
        print(f" Batch {start // batch_size + 1}: upserted {stop - start} sections")
    return len(uids)
def ingest_all(data_dir: str, db_path: str = DB_PATH) -> dict[str, int]:
    """Ingest all code JSON files into a fresh ChromaDB collection.

    Args:
        data_dir: Directory containing the preprocessed code JSON files.
        db_path: Target directory for the persistent ChromaDB store.

    Returns:
        Mapping of code type -> number of sections ingested. Missing files
        are reported with a warning and skipped (no entry in the mapping).
    """
    print(f"Creating ChromaDB at {db_path} with embedding model: {EMBEDDING_MODEL}")
    _client, collection = create_collection(db_path, reset=True)
    counts: dict[str, int] = {}
    for filename, code_type in CODE_FILES.items():
        json_path = os.path.join(data_dir, filename)
        if os.path.exists(json_path):
            # FIX: previously printed the literal "(unknown)" instead of the
            # actual file being ingested.
            print(f"\nIngesting {filename} as '{code_type}'...")
            count = ingest_json_file(collection, json_path, code_type)
            counts[code_type] = count
            print(f" -> {count} sections ingested")
        else:
            print(f"WARNING: {json_path} not found, skipping.")
    total = sum(counts.values())
    print(f"\nIngestion complete. Total: {total} sections across {len(counts)} code types.")
    return counts
if __name__ == "__main__":
    # Optional CLI argument: directory holding the JSON files;
    # defaults to the directory this script lives in.
    source_dir = sys.argv[1] if len(sys.argv) > 1 else os.path.dirname(__file__)
    ingest_all(source_dir)