# NOTE(review): "Spaces: Sleeping" lines were page-capture residue from a
# hosting dashboard, not source code; preserved here as a comment.
import os
import uuid
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

from chunker import chunk_text
from llm import LLMProvider
from pinecone_client import PineconeClient

try:
    from pypdf import PdfReader
except Exception:  # pragma: no cover
    PdfReader = None

load_dotenv()
def read_txt(path: str) -> str:
    """Return the full contents of a UTF-8 text file.

    Undecodable bytes are silently dropped (``errors="ignore"``) so a
    single bad byte never aborts ingestion of an otherwise-usable file.
    """
    with open(path, mode="r", encoding="utf-8", errors="ignore") as handle:
        return handle.read()
def read_pdf(path: str) -> str:
    """Extract the text of every page of a PDF, pages joined by newlines.

    Raises:
        RuntimeError: if the optional ``pypdf`` dependency is missing.
    """
    if PdfReader is None:
        raise RuntimeError("pypdf is not installed. Please install pypdf to read PDFs.")
    # extract_text() may return None for image-only pages; coerce to "".
    return "\n".join(page.extract_text() or "" for page in PdfReader(path).pages)
def load_documents(data_dir: str) -> List[Dict[str, Any]]:
    """Recursively load supported files under *data_dir*.

    Returns a list of ``{"path": <file path>, "text": <file text>}``
    records; files that fail to read are warned about and skipped, and
    whitespace-only documents are dropped.
    """
    collected: List[Dict[str, Any]] = []
    text_suffixes = (".txt", ".md", ".log")
    for root, _, files in os.walk(data_dir):
        for name in files:
            full_path = os.path.join(root, name)
            suffix = os.path.splitext(name)[1].lower()
            try:
                if suffix in text_suffixes:
                    content = read_txt(full_path)
                elif suffix == ".pdf":
                    content = read_pdf(full_path)
                else:
                    continue
            except Exception as exc:  # best-effort: skip problematic files
                print(f"[warn] Failed to read {full_path}: {exc}")
                continue
            if content and content.strip():
                collected.append({"path": full_path, "text": content})
    return collected
def ingest(
    data_dir: Optional[str] = None,
    chunk_size: Optional[int] = None,
    chunk_overlap: Optional[int] = None,
) -> int:
    """Chunk, embed, and upsert every document under *data_dir* into Pinecone.

    Args:
        data_dir: Directory to scan; defaults to ``DATA_DIR`` env var or ``./data``.
        chunk_size: Characters per chunk; defaults to ``CHUNK_SIZE`` env var or 800.
        chunk_overlap: Overlap between chunks; defaults to ``CHUNK_OVERLAP``
            env var or 120. Note: explicit 0 is honored (``is None`` checks,
            not truthiness — the old ``x or default`` form silently replaced 0).

    Returns:
        Total number of chunks upserted (0 when no documents were found).
    """
    if data_dir is None:
        data_dir = os.getenv("DATA_DIR", "./data")
    chunk_size = int(os.getenv("CHUNK_SIZE", 800)) if chunk_size is None else int(chunk_size)
    chunk_overlap = int(os.getenv("CHUNK_OVERLAP", 120)) if chunk_overlap is None else int(chunk_overlap)

    os.makedirs(data_dir, exist_ok=True)
    docs = load_documents(data_dir)
    if not docs:
        print(f"No documents found in {data_dir}")
        return 0

    llm = LLMProvider()
    pc = PineconeClient()
    # Probe one embedding so the index dimension matches the provider's output.
    test_vec = llm.embed_texts(["dimension probe"])[0]
    pc.ensure_index(dimension=len(test_vec))

    total_chunks = 0
    batch: List[Dict[str, Any]] = []
    for doc in docs:
        path = doc["path"]
        chunks = chunk_text(doc["text"], chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        if not chunks:
            # Nothing to embed for this document; avoid an empty embed call.
            continue
        embeddings = llm.embed_texts(chunks)
        for i, (text, vec) in enumerate(zip(chunks, embeddings)):
            total_chunks += 1
            batch.append(
                {
                    "id": str(uuid.uuid4()),
                    "values": vec,
                    "metadata": {
                        "text": text,
                        "source": path,
                        "chunk": i,
                    },
                }
            )
            # Flush in batches of 100 to bound request size.
            if len(batch) >= 100:
                pc.upsert_embeddings(batch)
                batch = []
    if batch:
        pc.upsert_embeddings(batch)
    print(f"Ingested {total_chunks} chunks from {len(docs)} documents.")
    return total_chunks
| if __name__ == "__main__": | |
| ingest() | |