Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import glob | |
| from dataclasses import asdict | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.http import models | |
| from core.chunking import HybridChunker | |
| from utils.config import settings | |
class IngestionService:
    """Ingest extracted JSON documents into a Qdrant vector collection.

    Walks every ``*.json`` file under ``extracted_data/`` (sibling of this
    module's package directory), chunks each document with
    :class:`HybridChunker`, embeds each chunk with the chunker's model, and
    upserts the resulting points into Qdrant in batches.
    """

    # Embedding dimension of BAAI/bge-small-en-v1.5 (the model the chunker
    # is expected to use -- confirm against core.chunking if it changes).
    VECTOR_SIZE = 384

    def __init__(self, collection_name: str = "constitution_amendments",
                 batch_size: int = 100):
        """Connect to Qdrant and prepare the chunker.

        Args:
            collection_name: Target Qdrant collection (default preserves the
                original hard-coded name).
            batch_size: Number of points buffered before an upsert flush
                (default preserves the original hard-coded value).
        """
        print("Initializing Ingestion Service...")
        self.client = QdrantClient(host=settings.QDRANT_HOST, port=settings.QDRANT_PORT)
        self.chunker = HybridChunker()
        self.collection_name = collection_name
        self.batch_size = batch_size

    def ensure_collection(self):
        """Drop any existing collection and recreate it empty.

        Destructive by design: every ingestion run starts from a clean
        collection so point IDs (sequential ints) stay consistent.
        """
        if self.client.collection_exists(self.collection_name):
            print(f"Collection '{self.collection_name}' exists. Deleting for fresh start...")
            self.client.delete_collection(self.collection_name)
        print(f"Creating collection '{self.collection_name}'...")
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=models.VectorParams(
                size=self.VECTOR_SIZE,  # BAAI/bge-small-en-v1.5 dimension is 384
                distance=models.Distance.COSINE
            )
        )

    def _upsert_batch(self, points):
        """Upsert one buffered batch of points into the collection."""
        self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )

    def ingest_all(self):
        """Chunk, embed, and index every extracted JSON file.

        Files that fail to read, parse, or chunk are logged and skipped
        (best-effort ingestion); all other files are still processed.
        """
        self.ensure_collection()
        # Scan for files: extracted_data/ lives one level above this module.
        root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        extracted_dir = os.path.join(root_dir, "extracted_data")
        # Sort for a deterministic ingestion order (stable chunk IDs).
        all_files = sorted(glob.glob(os.path.join(extracted_dir, "**", "*.json"), recursive=True))
        print(f"Found {len(all_files)} files to ingest.")

        points = []
        total_chunks = 0  # doubles as the sequential point ID
        for file_path in all_files:
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                file_chunks = self.chunker.chunk(data)
                for chunk in file_chunks:
                    vector = self.chunker.model.encode(chunk.text).tolist()
                    points.append(models.PointStruct(
                        id=total_chunks,
                        vector=vector,
                        payload={
                            "text": chunk.text,
                            "chunk_id": chunk.id,
                            **chunk.metadata
                        }
                    ))
                    total_chunks += 1
            except Exception as e:
                # Best-effort: a bad file must not abort the whole run.
                print(f"Error processing {file_path}: {e}")
                continue
            if len(points) >= self.batch_size:
                self._upsert_batch(points)
                print(f"Uploaded batch of {len(points)} points. (Total: {total_chunks})")
                points = []
        if points:
            self._upsert_batch(points)
            print(f"Uploaded final batch of {len(points)} points.")
        print(f"\nIngestion Complete! Total {total_chunks} chunks indexed.")
if __name__ == "__main__":
    # Script entry point: run one full ingestion pass.
    IngestionService().ingest_all()