# ConstitutionAgent / core/ingestion.py
# Uploaded by Meshyboi — "Upload 53 files" (commit 0cd3dc5, verified)
import os
import json
import glob
from dataclasses import asdict
from qdrant_client import QdrantClient
from qdrant_client.http import models
from core.chunking import HybridChunker
from utils.config import settings
class IngestionService:
    """Chunk, embed and upload extracted JSON documents into a Qdrant collection.

    Each ``*.json`` file found recursively under the extracted-data directory
    is loaded with ``json.load``, split with ``HybridChunker.chunk``, embedded
    with ``HybridChunker.model.encode`` and upserted into the
    ``constitution_amendments`` collection.
    """

    def __init__(self):
        print("Initializing Ingestion Service...")
        self.client = QdrantClient(host=settings.QDRANT_HOST, port=settings.QDRANT_PORT)
        self.chunker = HybridChunker()
        self.collection_name = "constitution_amendments"

    def ensure_collection(self, vector_size=384):
        """(Re)create the target collection, discarding any existing points.

        Args:
            vector_size: Dimension of the stored vectors. The default 384 is the
                output size of BAAI/bge-small-en-v1.5; it must match whatever
                model ``self.chunker.model`` actually is.
        """
        if self.client.collection_exists(self.collection_name):
            print(f"Collection '{self.collection_name}' exists. Deleting for fresh start...")
            self.client.delete_collection(self.collection_name)
        print(f"Creating collection '{self.collection_name}'...")
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=models.VectorParams(
                size=vector_size,
                distance=models.Distance.COSINE,
            ),
        )

    def ingest_all(self, batch_size=100, extracted_dir=None):
        """Index every JSON file under ``extracted_dir`` into the collection.

        The collection is recreated first (see ``ensure_collection``), so
        previously indexed points are lost. A file that fails to load, chunk or
        embed is reported and skipped as a whole; none of its chunks are
        uploaded. Point ids are consecutive integers starting at 0, counted
        over successfully processed files only.

        Args:
            batch_size: Maximum number of points sent in a single ``upsert``.
            extracted_dir: Directory scanned recursively for ``*.json`` files.
                Defaults to ``extracted_data`` inside the parent of this
                module's directory.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        self.ensure_collection()

        if extracted_dir is None:
            root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            extracted_dir = os.path.join(root_dir, "extracted_data")
        all_files = self._find_json_files(extracted_dir)
        print(f"Found {len(all_files)} files to ingest.")

        pending = []
        total_chunks = 0  # chunks from successfully processed files
        uploaded = 0      # points already sent to Qdrant
        for file_path in all_files:
            try:
                file_points = self._build_points(file_path, first_id=total_chunks)
            except Exception as e:
                # Best-effort per file: report and skip the whole file so no
                # half-ingested document ends up in the index.
                print(f"Error processing {file_path}: {e}")
                continue
            pending.extend(file_points)
            total_chunks += len(file_points)

            # Send only full batches, so no single upsert exceeds batch_size
            # even when one file yields many chunks; the remainder waits.
            full = len(pending) - len(pending) % batch_size
            for start in range(0, full, batch_size):
                batch = pending[start:start + batch_size]
                self._upsert(batch)
                uploaded += len(batch)
                print(f"Uploaded batch of {len(batch)} points. (Total: {uploaded})")
            del pending[:full]

        if pending:
            self._upsert(pending)
            print(f"Uploaded final batch of {len(pending)} points.")
        print(f"\nIngestion Complete! Total {total_chunks} chunks indexed.")

    @staticmethod
    def _find_json_files(directory):
        """Return all ``*.json`` paths under ``directory`` (recursive), sorted."""
        return sorted(glob.glob(os.path.join(directory, "**", "*.json"), recursive=True))

    def _build_points(self, file_path, first_id):
        """Load one JSON file and return its chunks as Qdrant points.

        Ids are assigned consecutively from ``first_id``. Payload holds the
        chunk text, the chunk id and the chunk's metadata; metadata keys are
        spread last, so they override ``text``/``chunk_id`` on a clash. Any
        error (I/O, invalid JSON, chunking, encoding) propagates.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return [
            models.PointStruct(
                id=first_id + offset,
                vector=self.chunker.model.encode(chunk.text).tolist(),
                payload={
                    "text": chunk.text,
                    "chunk_id": chunk.id,
                    **chunk.metadata,
                },
            )
            for offset, chunk in enumerate(self.chunker.chunk(data))
        ]

    def _upsert(self, points):
        """Send ``points`` to the target collection in one upsert call."""
        self.client.upsert(
            collection_name=self.collection_name,
            points=points,
        )
if __name__ == "__main__":
    # Script entry point: connect to Qdrant and rebuild the whole index.
    IngestionService().ingest_all()