| | from langchain_community.document_loaders import PDFMinerLoader,TextLoader |
| | from langchain_text_splitters import RecursiveCharacterTextSplitter |
| | from langchain_milvus import Milvus |
| | from langchain_core.documents import Document |
| | from langchain_openai import ChatOpenAI |
| | from dotenv import load_dotenv, find_dotenv |
| | from typing import List |
| | import uuid |
| |
|
| | from .index import MetaData |
| | from .utils import mask_pii |
| |
|
| | find_dotenv() |
| | load_dotenv() |
| |
|
| | model = ChatOpenAI(model="gpt-5-nano") |
| |
|
| |
|
| | def load_documents(file_paths: List[str]): |
| | """Ingest files into vectorstore after processing and chunking.""" |
| | documents: list[Document] = [] |
| | for file_path in file_paths: |
| | if file_path.endswith(".txt"): |
| | docs = TextLoader(file_path, encoding="utf-8").load() |
| | elif file_path.endswith(".pdf"): |
| | docs = PDFMinerLoader(file_path).load() |
| | else: |
| | print(f"Unsupported file format: {file_path}") |
| | continue |
| | documents.extend(docs) |
| |
|
| | print(f"loaded {len(documents)} documents from {len(file_paths)} files.") |
| | return documents |
| |
|
| | def get_chunks(documents: List[Document], metadata: MetaData): |
| | """Split documents into chunks and mask PII.""" |
| | text_splitter = RecursiveCharacterTextSplitter( |
| | chunk_size=1200, |
| | chunk_overlap=200, |
| | add_start_index=True, |
| | ) |
| | chunks = text_splitter.split_documents(documents) |
| | print(f"generated {len(chunks)} chunks.") |
| |
|
| | doc_id = str(uuid.uuid4()) |
| | chunks = [ |
| | Document( |
| | page_content=mask_pii(chunk.page_content), |
| | metadata={ |
| | "doc_id": doc_id, |
| | "chunk_id": str(uuid.uuid4()), |
| | "source_name": chunk.metadata.get("source",'Not Available').split("/")[-1], |
| | "start_index": chunk.metadata.get("start_index",0), |
| | **metadata.model_dump(), |
| | }, |
| | ) |
| | for chunk in chunks |
| | ] |
| | return chunks |
| |
|
| |
|
| | def ingest_documents(docs: List[Document], vectorstore:Milvus): |
| | """Ingest documents into the specified vectorstore collection.""" |
| | |
| | ids = [str(uuid.uuid4()) for _ in range(len(docs))] |
| | vectorstore.add_documents(docs, ids=ids) |
| | success_message = f"Ingested {len(docs)} documents into {vectorstore.collection_name} index." |
| | print(success_message) |
| | return success_message |
| |
|
| |
|