"""Ingest local documents into a Pinecone index.

Walks a data directory, reads text/markdown/log/PDF files, chunks them,
embeds each chunk via the configured LLM provider, and upserts the
resulting vectors (with source metadata) into Pinecone in batches.
"""

import os
import uuid
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

from chunker import chunk_text
from llm import LLMProvider
from pinecone_client import PineconeClient

try:
    from pypdf import PdfReader
except Exception:  # pragma: no cover - pypdf is an optional dependency
    PdfReader = None

load_dotenv()

# File extensions read as plain text vs. parsed as PDF.
TEXT_EXTENSIONS = {".txt", ".md", ".log"}
PDF_EXTENSIONS = {".pdf"}

# Number of vectors sent to Pinecone per upsert call.
UPSERT_BATCH_SIZE = 100


def read_txt(path: str) -> str:
    """Return the contents of a text file, ignoring undecodable bytes."""
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        return f.read()


def read_pdf(path: str) -> str:
    """Extract and concatenate the text of every page in a PDF.

    Raises:
        RuntimeError: if pypdf is not installed.
    """
    if PdfReader is None:
        raise RuntimeError("pypdf is not installed. Please install pypdf to read PDFs.")
    reader = PdfReader(path)
    # extract_text() may return None (e.g. image-only pages); substitute "".
    pages = [page.extract_text() or "" for page in reader.pages]
    return "\n".join(pages)


def load_documents(data_dir: str) -> List[Dict[str, Any]]:
    """Recursively load every supported document under *data_dir*.

    Returns:
        A list of {"path": ..., "text": ...} dicts for each non-empty
        readable file. Files that fail to read are skipped with a warning
        rather than aborting the whole ingest.
    """
    docs: List[Dict[str, Any]] = []
    for root, _, files in os.walk(data_dir):
        for name in files:
            path = os.path.join(root, name)
            ext = os.path.splitext(name)[1].lower()
            try:
                if ext in TEXT_EXTENSIONS:
                    text = read_txt(path)
                elif ext in PDF_EXTENSIONS:
                    text = read_pdf(path)
                else:
                    continue  # unsupported extension
                if text and text.strip():
                    docs.append({"path": path, "text": text})
            except Exception as e:
                # Best-effort: skip problematic files but keep ingesting.
                print(f"[warn] Failed to read {path}: {e}")
    return docs


def ingest(
    data_dir: Optional[str] = None,
    chunk_size: Optional[int] = None,
    chunk_overlap: Optional[int] = None,
) -> int:
    """Chunk, embed, and upsert every document under *data_dir*.

    Args:
        data_dir: Directory to scan; defaults to $DATA_DIR or "./data".
        chunk_size: Characters per chunk; defaults to $CHUNK_SIZE or 800.
        chunk_overlap: Overlap between consecutive chunks; defaults to
            $CHUNK_OVERLAP or 120.

    Returns:
        The total number of chunks upserted.
    """
    # Use explicit None checks (not `or`) so a caller-supplied 0 — a valid
    # chunk_overlap — is not silently replaced by the env/default value.
    # int() is kept so string values (e.g. from env-like callers) coerce.
    data_dir = data_dir if data_dir is not None else os.getenv("DATA_DIR", "./data")
    chunk_size = int(chunk_size) if chunk_size is not None else int(os.getenv("CHUNK_SIZE", 800))
    chunk_overlap = (
        int(chunk_overlap) if chunk_overlap is not None else int(os.getenv("CHUNK_OVERLAP", 120))
    )

    os.makedirs(data_dir, exist_ok=True)
    docs = load_documents(data_dir)
    if not docs:
        print(f"No documents found in {data_dir}")
        return 0

    llm = LLMProvider()
    pc = PineconeClient()

    # Ensure the index exists, sized to the embedding model's dimension.
    test_vec = llm.embed_texts(["dimension probe"])[0]
    pc.ensure_index(dimension=len(test_vec))

    total_chunks = 0
    batch: List[Dict[str, Any]] = []
    for doc in docs:
        path = doc["path"]
        chunks = chunk_text(doc["text"], chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        embeddings = llm.embed_texts(chunks)
        for i, (text, vec) in enumerate(zip(chunks, embeddings)):
            total_chunks += 1
            batch.append(
                {
                    "id": str(uuid.uuid4()),
                    "values": vec,
                    "metadata": {
                        "text": text,
                        "source": path,
                        "chunk": i,
                    },
                }
            )
            if len(batch) >= UPSERT_BATCH_SIZE:
                pc.upsert_embeddings(batch)
                batch = []

    # Flush any remaining partial batch.
    if batch:
        pc.upsert_embeddings(batch)

    print(f"Ingested {total_chunks} chunks from {len(docs)} documents.")
    return total_chunks


if __name__ == "__main__":
    ingest()