# mini-rag / ingest.py
# (uploaded by navyamehta, commit 33f5651)
import os
import uuid
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

from chunker import chunk_text
from llm import LLMProvider
from pinecone_client import PineconeClient
# pypdf is an optional dependency: when it is missing, PdfReader is set to
# None and read_pdf() raises a clear RuntimeError instead of an ImportError.
try:
    from pypdf import PdfReader
except Exception:  # pragma: no cover
    PdfReader = None

# Load settings (DATA_DIR, CHUNK_SIZE, CHUNK_OVERLAP, API keys, ...) from a
# local .env file before any of the functions below read os.getenv().
load_dotenv()
def read_txt(path: str) -> str:
    """Return the entire contents of a UTF-8 text file, ignoring undecodable bytes."""
    with open(path, "r", encoding="utf-8", errors="ignore") as handle:
        contents = handle.read()
    return contents
def read_pdf(path: str) -> str:
    """Extract the text of every page of a PDF and join pages with newlines.

    Raises:
        RuntimeError: if the optional pypdf dependency is not installed.
    """
    if PdfReader is None:
        raise RuntimeError("pypdf is not installed. Please install pypdf to read PDFs.")
    reader = PdfReader(path)
    # extract_text() can return None for image-only pages; substitute "".
    return "\n".join(page.extract_text() or "" for page in reader.pages)
def load_documents(data_dir: str) -> List[Dict[str, Any]]:
docs: List[Dict[str, Any]] = []
for root, _, files in os.walk(data_dir):
for name in files:
path = os.path.join(root, name)
ext = os.path.splitext(name)[1].lower()
try:
if ext in [".txt", ".md", ".log"]:
text = read_txt(path)
elif ext in [".pdf"]:
text = read_pdf(path)
else:
continue
if text and text.strip():
docs.append({"path": path, "text": text})
except Exception as e: # skip problematic files
print(f"[warn] Failed to read {path}: {e}")
return docs
def ingest(
    data_dir: Optional[str] = None,
    chunk_size: Optional[int] = None,
    chunk_overlap: Optional[int] = None,
) -> int:
    """Chunk, embed, and upsert every document under *data_dir* into Pinecone.

    Any argument left as None falls back to the DATA_DIR / CHUNK_SIZE /
    CHUNK_OVERLAP environment variables (defaults: "./data", 800, 120).

    Returns:
        The total number of chunks embedded and upserted (0 if no documents
        were found).
    """
    data_dir = data_dir or os.getenv("DATA_DIR", "./data")
    # `or` (not `is None`) intentionally treats 0 as "use the env/default".
    chunk_size = int(chunk_size or os.getenv("CHUNK_SIZE", 800))
    chunk_overlap = int(chunk_overlap or os.getenv("CHUNK_OVERLAP", 120))
    os.makedirs(data_dir, exist_ok=True)

    docs = load_documents(data_dir)
    if not docs:
        print(f"No documents found in {data_dir}")
        return 0

    llm = LLMProvider()
    pc = PineconeClient()
    # Probe one embedding so the index is created with the correct dimension.
    test_vec = llm.embed_texts(["dimension probe"])[0]
    pc.ensure_index(dimension=len(test_vec))

    total_chunks = 0
    batch: List[Dict[str, Any]] = []
    for doc in docs:
        path = doc["path"]
        chunks = chunk_text(doc["text"], chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        if not chunks:
            # Guard: skip documents that produced no chunks — embedding
            # providers commonly reject an empty input list.
            continue
        embeddings = llm.embed_texts(chunks)
        for i, (text, vec) in enumerate(zip(chunks, embeddings)):
            total_chunks += 1
            batch.append(
                {
                    "id": str(uuid.uuid4()),
                    "values": vec,
                    "metadata": {
                        "text": text,
                        "source": path,
                        "chunk": i,
                    },
                }
            )
            # Flush in bounded batches to keep each upsert request small.
            if len(batch) >= 100:
                pc.upsert_embeddings(batch)
                batch = []
    if batch:
        pc.upsert_embeddings(batch)
    print(f"Ingested {total_chunks} chunks from {len(docs)} documents.")
    return total_chunks
# Allow running the ingestion pipeline directly: `python ingest.py`.
if __name__ == "__main__":
    ingest()