"""
Document Ingestion Pipeline
---------------------------
Loads the IJNet knowledge base, applies smart chunking strategies, embeds
documents, and stores them in a FAISS vector store.

Chunking Strategy:
    - Structured records (opportunities): kept as single documents with rich
      metadata.
    - Unstructured text (articles): split with RecursiveCharacterTextSplitter,
      preferring paragraph boundaries over sentence/word boundaries.
    - IJNet info: kept as single documents.

Each chunk carries full metadata for source attribution and filtering.
"""

import json
import hashlib  # NOTE(review): unused in this module — confirm before removing
from collections import Counter
from datetime import datetime  # NOTE(review): unused in this module
from pathlib import Path  # NOTE(review): unused in this module

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings as EmbeddingsBase
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS


# ---------------------------------------------------------------------------
# 1. DOCUMENT LOADING
# ---------------------------------------------------------------------------

def load_knowledge_base(path: str = "data/knowledge_base.json") -> dict:
    """Load the raw knowledge base JSON.

    Args:
        path: Location of the knowledge base JSON file.

    Returns:
        The parsed JSON object. This module reads its ``opportunities``,
        ``articles`` and ``ijnet_info`` keys.
    """
    # Decode explicitly as UTF-8 (the JSON standard encoding). Relying on the
    # platform default (e.g. cp1252 on Windows) breaks on non-ASCII content,
    # which is expected since IJNet publishes in several languages.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


# ---------------------------------------------------------------------------
# 2. SMART CHUNKING
# ---------------------------------------------------------------------------

def _opportunity_to_document(opp: dict) -> Document:
    """
    Convert a structured opportunity record into a single LangChain Document.

    Strategy: Compose a natural-language representation that includes all
    searchable fields. This lets semantic search match on any aspect (region,
    topic, type, deadline, eligibility) without needing separate structured
    queries.

    Args:
        opp: Opportunity record. Required keys: ``id``, ``title``, ``type``,
            ``organization``, ``description``, ``eligibility``, ``regions``
            (list), ``topics`` (list), ``deadline``, ``duration``,
            ``benefits``. Optional: ``language`` (defaults to "English") and
            ``source_url`` (defaults to "").

    Returns:
        One Document whose list-valued fields are flattened to
        comma-separated strings in the metadata.
    """
    # Resolve shared/optional values once so the embedded text and the
    # metadata always agree, and a missing "language" does not raise KeyError.
    language = opp.get("language", "English")
    regions = ", ".join(opp["regions"])
    topics = ", ".join(opp["topics"])

    # Build rich text representation
    parts = [
        f"Title: {opp['title']}",
        f"Type: {opp['type'].capitalize()}",
        f"Organization: {opp['organization']}",
        f"Description: {opp['description']}",
        f"Eligibility: {opp['eligibility']}",
        f"Regions: {regions}",
        f"Topics: {topics}",
        f"Deadline: {opp['deadline']}",
        f"Duration: {opp['duration']}",
        f"Benefits: {opp['benefits']}",
        f"Language: {language}",
    ]
    text = "\n".join(parts)

    metadata = {
        "source": opp.get("source_url", ""),
        "source_type": "opportunity",
        "doc_id": opp["id"],
        "title": opp["title"],
        "opp_type": opp["type"],
        "organization": opp["organization"],
        "regions": regions,
        "topics": topics,
        "deadline": opp["deadline"],
        "language": language,
    }

    return Document(page_content=text, metadata=metadata)


def _article_to_chunks(article: dict, splitter: RecursiveCharacterTextSplitter) -> list[Document]:
    """
    Split an article into chunks, preserving metadata on each.

    Strategy: Use paragraph-aware splitting. Articles are longer, so they are
    split, but with overlap so context survives chunk boundaries. Chunk size
    is whatever ``splitter`` is configured with (``build_documents`` uses 800
    characters, roughly 200 tokens).

    Args:
        article: Article record. Required keys: ``id``, ``title``,
            ``full_text``. Optional: ``author`` (defaults to "IJNet Staff"),
            ``source_url``, ``date``, ``section``, ``topics`` (list).
        splitter: Text splitter used to cut the article text.

    Returns:
        One Document per chunk; each carries ``chunk_index`` (0-based) and
        ``total_chunks`` in addition to the shared article metadata.
    """
    author = article.get("author", "IJNet Staff")
    # Title and author are prepended so they land in the first chunk's text.
    full_text = f"Article: {article['title']}\nAuthor: {author}\n\n{article['full_text']}"

    base_metadata = {
        "source": article.get("source_url", ""),
        "source_type": "article",
        "doc_id": article["id"],
        "title": article["title"],
        "author": author,
        "date": article.get("date", ""),
        "section": article.get("section", ""),
        "topics": ", ".join(article.get("topics", [])),
    }

    chunks = splitter.split_text(full_text)
    total = len(chunks)
    return [
        Document(
            page_content=chunk,
            metadata={**base_metadata, "chunk_index": i, "total_chunks": total},
        )
        for i, chunk in enumerate(chunks)
    ]


def _ijnet_info_to_document(info: dict) -> list[Document]:
    """Convert IJNet organizational info into documents.

    Args:
        info: Mapping with ``about``, ``languages`` (list), ``website`` and
            ``services`` (list).

    Returns:
        Two Documents: an "About IJNet" document and an "IJNet Services"
        document, both attributed to ``info["website"]``.
    """
    docs = []

    # About document
    about_text = (
        f"About IJNet: {info['about']}\n\n"
        f"IJNet is available in these languages: {', '.join(info['languages'])}.\n"
        f"Website: {info['website']}"
    )
    docs.append(Document(
        page_content=about_text,
        metadata={
            "source": info["website"],
            "source_type": "ijnet_info",
            "doc_id": "ijnet-about",
            "title": "About IJNet",
        },
    ))

    # Services document
    services_text = (
        "IJNet Services and Offerings:\n"
        + "\n".join(f"- {s}" for s in info["services"])
    )
    docs.append(Document(
        page_content=services_text,
        metadata={
            "source": info["website"],
            "source_type": "ijnet_info",
            "doc_id": "ijnet-services",
            "title": "IJNet Services",
        },
    ))

    return docs


def build_documents(kb: dict) -> list[Document]:
    """
    Build the full document list from the knowledge base.

    Order: all opportunities, then article chunks (articles without
    ``full_text`` are skipped), then IJNet info documents if present.

    Args:
        kb: Parsed knowledge base (see ``load_knowledge_base``).

    Returns:
        Documents ready for embedding and indexing.
    """
    documents = []

    # --- Opportunities: one doc per opportunity (no splitting) ---
    for opp in kb.get("opportunities", []):
        documents.append(_opportunity_to_document(opp))

    # --- Articles: split into chunks ---
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,       # characters, ~200 tokens — small enough for precise retrieval
        chunk_overlap=150,    # overlap to preserve context at boundaries
        separators=["\n\n", "\n", ". ", ", ", " "],  # prefer paragraph boundaries
        length_function=len,
    )
    for article in kb.get("articles", []):
        if article.get("full_text"):
            documents.extend(_article_to_chunks(article, splitter))

    # --- IJNet Info ---
    if kb.get("ijnet_info"):
        documents.extend(_ijnet_info_to_document(kb["ijnet_info"]))

    return documents


# ---------------------------------------------------------------------------
# 3. EMBEDDING + VECTOR STORE
# ---------------------------------------------------------------------------

class SimpleEmbeddings(EmbeddingsBase):
    """
    Lightweight TF-IDF-based fallback embeddings for environments where
    HuggingFace model downloads are blocked.

    Uses sklearn's TfidfVectorizer, limited to ``dimension`` features. Vectors
    are zero-padded to ``dimension`` columns and L2-normalized.

    The vocabulary is learned from the first non-empty batch given to
    ``embed_documents`` (or from an explicit ``fit`` call) and is NOT stored
    with a FAISS index, so a fresh instance must be refit on the indexed
    corpus before querying (``load_vector_store`` does this).

    NOT recommended for production — use HuggingFace sentence-transformers
    when internet access is available.
    """

    dimension: int = 384

    def __init__(self, dimension: int = 384, **kwargs):
        super().__init__(**kwargs)
        # Local imports keep sklearn/numpy optional unless the fallback is used.
        from sklearn.feature_extraction.text import TfidfVectorizer
        import numpy as np

        self.dimension = dimension
        self._vectorizer = TfidfVectorizer(max_features=dimension, stop_words="english")
        self._fitted = False
        self._np = np

    @property
    def is_fitted(self) -> bool:
        """Whether the TF-IDF vocabulary has been learned."""
        return self._fitted

    def fit(self, texts: list[str]) -> "SimpleEmbeddings":
        """(Re)learn the TF-IDF vocabulary from ``texts`` and return ``self``."""
        self._vectorizer.fit(texts)
        self._fitted = True
        return self

    def _ensure_fitted(self, texts: list[str]):
        """Fit on ``texts`` only if no vocabulary has been learned yet."""
        if not self._fitted:
            self.fit(texts)

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Embed ``texts`` as L2-normalized TF-IDF vectors of length ``dimension``.

        An empty input returns ``[]`` (fitting on nothing would raise).
        """
        if not texts:
            return []
        self._ensure_fitted(texts)
        matrix = self._vectorizer.transform(texts).toarray()
        # Zero-pad to a fixed width: the learned vocabulary may be smaller
        # than max_features.
        result = self._np.zeros((len(texts), self.dimension))
        cols = min(matrix.shape[1], self.dimension)
        result[:, :cols] = matrix[:, :cols]
        norms = self._np.linalg.norm(result, axis=1, keepdims=True)
        norms[norms == 0] = 1  # avoid division by zero for all-zero rows
        result = result / norms
        return result.tolist()

    def embed_query(self, text: str) -> list[float]:
        """Embed a single query string."""
        return self.embed_documents([text])[0]


def get_embeddings(model_name: str = "sentence-transformers/all-MiniLM-L6-v2", use_fallback: bool = False):
    """
    Initialize the embedding model.

    Primary: HuggingFace sentence-transformers (requires internet on first run).
    Fallback: TF-IDF based embeddings (works fully offline).

    Args:
        model_name: HuggingFace model name for sentence-transformers
        use_fallback: Force using the lightweight TF-IDF fallback

    Returns:
        A LangChain embeddings object (HuggingFaceEmbeddings or SimpleEmbeddings).
    """
    if use_fallback:
        print("Using fallback TF-IDF embeddings (offline mode)")
        return SimpleEmbeddings()

    try:
        from langchain_huggingface import HuggingFaceEmbeddings
        return HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs={"device": "cpu"},
            encode_kwargs={"normalize_embeddings": True},
        )
    except Exception as e:
        # Deliberately broad: any failure (missing package, blocked download,
        # model load error) degrades to the offline fallback.
        print(f"Warning: Could not load HuggingFace embeddings ({e})")
        print("Falling back to TF-IDF embeddings.")
        return SimpleEmbeddings()


def build_vector_store(
    documents: list[Document], embeddings=None, save_path: str = "data/faiss_index"
) -> FAISS:
    """
    Build and persist a FAISS vector store from documents.

    Args:
        documents: Documents to embed and index; must be non-empty.
        embeddings: Embedding model; ``get_embeddings()`` is used if None.
        save_path: Directory passed to ``FAISS.save_local``.

    Returns:
        The in-memory FAISS store (also saved to ``save_path``).

    Raises:
        ValueError: if ``documents`` is empty.
    """
    if not documents:
        # FAISS cannot infer a vector dimension from zero embeddings and would
        # fail with an obscure error; report the real cause instead.
        raise ValueError("No documents to index; the knowledge base produced nothing.")

    if embeddings is None:
        embeddings = get_embeddings()

    print(f"Building vector store with {len(documents)} documents...")
    vector_store = FAISS.from_documents(documents, embeddings)

    # Persist to disk
    vector_store.save_local(save_path)
    print(f"Vector store saved to {save_path}")

    return vector_store


def load_vector_store(
    save_path: str = "data/faiss_index", embeddings=None
) -> FAISS:
    """Load a persisted FAISS vector store.

    Args:
        save_path: Directory previously written by ``build_vector_store``.
        embeddings: Embedding model; ``get_embeddings()`` is used if None. It
            must be the same kind of model that built the index.

    Returns:
        The loaded FAISS store.
    """
    if embeddings is None:
        embeddings = get_embeddings()

    # SECURITY: allow_dangerous_deserialization=True unpickles the saved
    # docstore. Only load indexes this pipeline wrote itself; never point
    # save_path at files from an untrusted source.
    vector_store = FAISS.load_local(
        save_path, embeddings, allow_dangerous_deserialization=True
    )

    if isinstance(embeddings, SimpleEmbeddings) and not embeddings.is_fitted:
        # The TF-IDF vocabulary is not saved with the index. Refit on the
        # stored documents in index order — the same corpus the index was
        # built from — so query vectors share the indexed vectors' space.
        # Without this, the first query alone would define the vocabulary.
        # NOTE(review): this cannot help if the index was built with
        # HuggingFace embeddings and loading fell back to TF-IDF.
        texts = []
        for doc_id in vector_store.index_to_docstore_id.values():
            doc = vector_store.docstore.search(doc_id)
            if isinstance(doc, Document):
                texts.append(doc.page_content)
        if texts:
            embeddings.fit(texts)

    return vector_store


# ---------------------------------------------------------------------------
# 4. FULL INGESTION PIPELINE
# ---------------------------------------------------------------------------

def run_ingestion(
    kb_path: str = "data/knowledge_base.json",
    index_path: str = "data/faiss_index",
) -> FAISS:
    """
    Complete ingestion pipeline: load → chunk → embed → store.

    Args:
        kb_path: Knowledge base JSON file.
        index_path: Directory the FAISS index is saved to.

    Returns:
        The built FAISS vector store.
    """
    print("=" * 50)
    print("IJNet Knowledge Base Ingestion Pipeline")
    print("=" * 50)

    # Load
    print("\n[1/4] Loading knowledge base...")
    kb = load_knowledge_base(kb_path)
    print(f" - {len(kb.get('opportunities', []))} opportunities")
    print(f" - {len(kb.get('articles', []))} articles")

    # Chunk
    print("\n[2/4] Building documents with smart chunking...")
    documents = build_documents(kb)
    print(f" - {len(documents)} total document chunks")

    # Show breakdown (Counter preserves first-seen order, like a dict)
    by_type = Counter(doc.metadata.get("source_type", "unknown") for doc in documents)
    for t, count in by_type.items():
        print(f" • {t}: {count} chunks")

    # Embed + Store
    print("\n[3/4] Initializing embedding model...")
    embeddings = get_embeddings()

    print("\n[4/4] Building FAISS vector store...")
    vector_store = build_vector_store(documents, embeddings, index_path)

    print("\n✓ Ingestion complete!")
    return vector_store


if __name__ == "__main__":
    run_ingestion()