|
|
import json |
|
|
from copy import deepcopy |
|
|
|
|
|
from dotenv import find_dotenv, load_dotenv |
|
|
from llama_index.core import StorageContext, VectorStoreIndex |
|
|
from llama_index.core.node_parser import SentenceSplitter |
|
|
from llama_index.core.schema import Document |
|
|
from llama_index.embeddings.huggingface import HuggingFaceEmbedding |
|
|
from llama_index.vector_stores.milvus import MilvusVectorStore |
|
|
from llama_index.vector_stores.milvus.utils import BGEM3SparseEmbeddingFunction |
|
|
|
|
|
from src.agent_hackathon.consts import PROJECT_ROOT_DIR |
|
|
from src.agent_hackathon.logger import get_logger |
|
|
|
|
|
# Module-level logger shared by VectorDBCreator; writes under <project root>/logs.
logger = get_logger(log_name="create_vector_db", log_dir=PROJECT_ROOT_DIR / "logs")
|
|
|
|
|
|
|
|
class VectorDBCreator:
    """Handles creation of a Milvus vector database from arXiv data.

    Pipeline: load a JSON file of arXiv records, wrap each record in a
    llama-index ``Document`` (abstract as text, remaining fields as metadata),
    create a hybrid (dense + sparse) Milvus vector store, and index the
    documents into it.
    """

    def __init__(
        self,
        data_path: str,
        db_uri: str,
        embedding_model: str = "Qwen/Qwen3-Embedding-0.6B",
        chunk_size: int = 20_000,
        chunk_overlap: int = 0,
        vector_dim: int = 1024,
        insert_batch_size: int = 8192,
    ) -> None:
        """
        Initialize the VectorDBCreator.

        Args:
            data_path: Path to the JSON data file.
            db_uri: URI for the Milvus database.
            embedding_model: Name of the HuggingFace embedding model.
            chunk_size: Size of text chunks for splitting.
            chunk_overlap: Overlap between text chunks.
            vector_dim: Dimension of the embedding vectors (must match the
                embedding model's output size — TODO confirm for non-default
                models).
            insert_batch_size: Batch size for insertion into the index.
        """
        self.data_path = data_path
        self.db_uri = db_uri
        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.vector_dim = vector_dim
        self.insert_batch_size = insert_batch_size
        # Embeddings are computed on CPU; switch device here if a GPU is available.
        self.embed_model = HuggingFaceEmbedding(
            model_name=self.embedding_model, device="cpu"
        )
        self.sent_splitter = SentenceSplitter(
            chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap
        )
        logger.info("VectorDBCreator initialized.")

    def load_data(self) -> list[dict]:
        """
        Load and return data from the JSON file.

        Returns:
            List of dictionaries containing arXiv data.

        Raises:
            FileNotFoundError: If ``data_path`` does not exist.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        logger.info(f"Loading data from {self.data_path}")
        # Explicit encoding keeps behavior identical across platforms
        # (the default encoding is locale-dependent otherwise).
        with open(self.data_path, encoding="utf-8") as f:
            data = json.load(f)
        logger.info("Data loaded successfully.")
        # json.load always builds a fresh object graph, so no defensive copy
        # is needed before handing the data to callers that mutate it.
        return data

    def prepare_documents(self, data: list[dict]) -> list[Document]:
        """
        Convert raw data into a list of Document objects.

        Each record's ``"abstract"`` field becomes the document text; all
        remaining fields become the document's metadata. Note that ``pop``
        mutates the dicts in ``data`` in place, and every record must contain
        an ``"abstract"`` key or a KeyError is raised.

        Args:
            data: List of dictionaries with arXiv data.

        Returns:
            List of Document objects.
        """
        logger.info("Preparing documents from data.")
        docs = [Document(text=d.pop("abstract"), metadata=d) for d in data]
        logger.info(f"Prepared {len(docs)} documents.")
        return docs

    def create_vector_store(self) -> MilvusVectorStore:
        """
        Create and return a MilvusVectorStore instance.

        The store is configured for hybrid retrieval: dense vectors of
        ``vector_dim`` dimensions plus BGE-M3 sparse embeddings.

        Returns:
            Configured MilvusVectorStore.
        """
        logger.info(f"Creating MilvusVectorStore at {self.db_uri}")
        store = MilvusVectorStore(
            uri=self.db_uri,
            dim=self.vector_dim,
            enable_sparse=True,
            sparse_embedding_function=BGEM3SparseEmbeddingFunction(),
        )
        logger.info("MilvusVectorStore created.")
        return store

    def build_index(
        self, docs_list: list[Document], vector_store: MilvusVectorStore
    ) -> VectorStoreIndex:
        """
        Build and return a VectorStoreIndex from documents.

        Documents are split with the configured SentenceSplitter, embedded
        with the configured HuggingFace model, and inserted into the given
        vector store in batches of ``insert_batch_size``.

        Args:
            docs_list: List of Document objects.
            vector_store: MilvusVectorStore instance.

        Returns:
            VectorStoreIndex object.
        """
        logger.info("Building VectorStoreIndex.")
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        index = VectorStoreIndex.from_documents(
            documents=docs_list,
            storage_context=storage_context,
            embed_model=self.embed_model,
            transformations=[self.sent_splitter],
            show_progress=True,
            insert_batch_size=self.insert_batch_size,
        )
        logger.info("VectorStoreIndex built.")
        return index

    def run(self) -> None:
        """
        Execute the full pipeline: load data, prepare documents, create vector store, and build index.
        """
        logger.info("Running full vector DB creation pipeline.")
        data = self.load_data()
        docs_list = self.prepare_documents(data=data)
        vector_store = self.create_vector_store()
        self.build_index(docs_list=docs_list, vector_store=vector_store)
        logger.info("Pipeline finished.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|