Spaces:
Sleeping
Sleeping
| """ | |
| Document Ingestion Pipeline | |
| ---------------------------- | |
| Loads the IJNet knowledge base, applies smart chunking strategies, | |
| embeds documents, and stores them in a FAISS vector store. | |
| Chunking Strategy: | |
| - Structured records (opportunities): kept as single documents with rich metadata | |
| - Unstructured text (articles): split with RecursiveCharacterTextSplitter | |
| using semantic paragraph boundaries | |
| - IJNet info: kept as single documents | |
| Each chunk carries full metadata for source attribution and filtering. | |
| """ | |
| import json | |
| import hashlib | |
| from datetime import datetime | |
| from pathlib import Path | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import FAISS | |
| # --------------------------------------------------------------------------- | |
| # 1. DOCUMENT LOADING | |
| # --------------------------------------------------------------------------- | |
def load_knowledge_base(path: str = "data/knowledge_base.json") -> dict:
    """Load the raw knowledge base JSON.

    Args:
        path: Location of the knowledge base JSON file.

    Returns:
        The parsed JSON content.

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    # Decode explicitly as UTF-8: without ``encoding`` Python uses the
    # platform's locale encoding, which mangles or rejects non-ASCII text
    # on systems where that default is not UTF-8.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
| # --------------------------------------------------------------------------- | |
| # 2. SMART CHUNKING | |
| # --------------------------------------------------------------------------- | |
def _opportunity_to_document(opp: dict) -> Document:
    """
    Convert a structured opportunity record into a single LangChain Document.

    Strategy: Compose a natural-language representation that includes all
    searchable fields. This lets semantic search match on any aspect
    (region, topic, type, deadline, eligibility) without needing separate
    structured queries.

    Args:
        opp: One opportunity record. The keys ``id``, ``title``, ``type``,
            ``organization``, ``description``, ``eligibility``, ``regions``,
            ``topics``, ``deadline``, ``duration`` and ``benefits`` are
            required; ``source_url`` and ``language`` are optional.

    Returns:
        A Document whose ``page_content`` is one "Label: value" line per
        field and whose metadata carries the fields used for attribution
        and filtering.

    Raises:
        KeyError: If a required key is missing from ``opp``.
    """
    # Resolve the optional language once so the page text and the metadata
    # always agree (the text previously required the key while the metadata
    # defaulted to "English").
    language = opp.get("language", "English")

    # Build rich text representation
    parts = [
        f"Title: {opp['title']}",
        f"Type: {opp['type'].capitalize()}",
        f"Organization: {opp['organization']}",
        f"Description: {opp['description']}",
        f"Eligibility: {opp['eligibility']}",
        f"Regions: {', '.join(opp['regions'])}",
        f"Topics: {', '.join(opp['topics'])}",
        f"Deadline: {opp['deadline']}",
        f"Duration: {opp['duration']}",
        f"Benefits: {opp['benefits']}",
        f"Language: {language}",
    ]
    text = "\n".join(parts)

    # List-valued fields are flattened to comma-separated strings.
    metadata = {
        "source": opp.get("source_url", ""),
        "source_type": "opportunity",
        "doc_id": opp["id"],
        "title": opp["title"],
        "opp_type": opp["type"],
        "organization": opp["organization"],
        "regions": ", ".join(opp["regions"]),
        "topics": ", ".join(opp["topics"]),
        "deadline": opp["deadline"],
        "language": language,
    }
    return Document(page_content=text, metadata=metadata)
def _article_to_chunks(article: dict, splitter: RecursiveCharacterTextSplitter) -> list[Document]:
    """
    Break one article into chunks, each carrying the article's metadata.

    A short header (title and author) is prepended to the body before the
    text is handed to ``splitter``. Every resulting chunk becomes a Document
    tagged with its position (``chunk_index``) and the number of chunks the
    article produced (``total_chunks``).
    """
    title = article["title"]
    author = article.get("author", "IJNet Staff")
    text_to_split = f"Article: {title}\nAuthor: {author}\n\n{article['full_text']}"

    # Metadata shared by every chunk of this article.
    shared_meta = {
        "source": article.get("source_url", ""),
        "source_type": "article",
        "doc_id": article["id"],
        "title": title,
        "author": author,
        "date": article.get("date", ""),
        "section": article.get("section", ""),
        "topics": ", ".join(article.get("topics", [])),
    }

    pieces = splitter.split_text(text_to_split)
    total = len(pieces)
    return [
        Document(
            page_content=piece,
            metadata={**shared_meta, "chunk_index": position, "total_chunks": total},
        )
        for position, piece in enumerate(pieces)
    ]
def _ijnet_info_to_document(info: dict) -> list[Document]:
    """Build the two IJNet organizational documents: "about" and "services"."""
    about = info["about"]
    languages = ", ".join(info["languages"])
    website = info["website"]

    def make_doc(doc_id: str, title: str, text: str) -> Document:
        # Both documents share the same source and source_type.
        return Document(
            page_content=text,
            metadata={
                "source": website,
                "source_type": "ijnet_info",
                "doc_id": doc_id,
                "title": title,
            },
        )

    # About document
    about_doc = make_doc(
        "ijnet-about",
        "About IJNet",
        f"About IJNet: {about}\n\n"
        f"IJNet is available in these languages: {languages}.\n"
        f"Website: {website}",
    )

    # Services document: one bullet line per service
    bullet_lines = [f"- {service}" for service in info["services"]]
    services_doc = make_doc(
        "ijnet-services",
        "IJNet Services",
        "IJNet Services and Offerings:\n" + "\n".join(bullet_lines),
    )

    return [about_doc, services_doc]
def build_documents(kb: dict) -> list[Document]:
    """
    Assemble every document of the knowledge base, ready for embedding.

    Order of the returned list: opportunity documents, then article chunks,
    then the IJNet info documents.
    """
    # Opportunities: exactly one document per record, never split.
    documents = [
        _opportunity_to_document(record) for record in kb.get("opportunities", [])
    ]

    # Articles: split into overlapping chunks. Sizes are in characters
    # because length_function is ``len``.
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=150,  # overlap to preserve context at boundaries
        separators=["\n\n", "\n", ". ", ", ", " "],  # prefer paragraph boundaries
        length_function=len,
    )
    for article in kb.get("articles", []):
        if not article.get("full_text"):
            # Nothing to split for articles without body text.
            continue
        documents.extend(_article_to_chunks(article, splitter))

    # IJNet info: only when present and non-empty.
    ijnet_info = kb.get("ijnet_info")
    if ijnet_info:
        documents.extend(_ijnet_info_to_document(ijnet_info))

    return documents
| # --------------------------------------------------------------------------- | |
| # 3. EMBEDDING + VECTOR STORE | |
| # --------------------------------------------------------------------------- | |
| from langchain_core.embeddings import Embeddings as EmbeddingsBase | |
class SimpleEmbeddings(EmbeddingsBase):
    """
    Lightweight TF-IDF-based fallback embeddings for environments where
    HuggingFace model downloads are blocked. Uses sklearn's TfidfVectorizer
    fit on the corpus vocabulary for decent keyword-aware embeddings.

    The vocabulary is learned once, from the texts of the first
    ``embed_documents`` call, and is kept only in memory on this instance.

    NOT recommended for production — use HuggingFace sentence-transformers
    when internet access is available.
    """

    # Length of every vector returned by this class.
    dimension: int = 384

    def __init__(self, dimension: int = 384, **kwargs):
        """Create an unfitted embedder producing ``dimension``-sized vectors."""
        super().__init__(**kwargs)
        # Imported here rather than at module level, so importing this module
        # does not itself require scikit-learn.
        from sklearn.feature_extraction.text import TfidfVectorizer
        import numpy as np

        self.dimension = dimension
        self._vectorizer = TfidfVectorizer(max_features=dimension, stop_words="english")
        self._fitted = False
        self._np = np

    def _ensure_fitted(self, texts: list[str]):
        """Fit the vectorizer on ``texts`` unless it has already been fitted."""
        if not self._fitted:
            self._vectorizer.fit(texts)
            self._fitted = True

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Return one L2-normalised TF-IDF vector per text.

        The first call also fits the vocabulary on ``texts``.
        """
        self._ensure_fitted(texts)
        matrix = self._vectorizer.transform(texts).toarray()

        # The fitted vocabulary can be smaller than ``dimension``; copy into a
        # zero matrix so every vector has exactly ``dimension`` entries.
        result = self._np.zeros((len(texts), self.dimension))
        cols = min(matrix.shape[1], self.dimension)
        result[:, :cols] = matrix[:, :cols]

        # Normalise each row to unit length; all-zero rows are left as zeros
        # by dividing them by 1 instead of 0.
        norms = self._np.linalg.norm(result, axis=1, keepdims=True)
        norms[norms == 0] = 1
        result = result / norms
        return result.tolist()

    def embed_query(self, text: str) -> list[float]:
        """Return the embedding of a single query string.

        If no corpus has been embedded on this instance yet, the vocabulary
        gets fitted on the query text alone. That still returns a vector,
        but a ``RuntimeWarning`` is emitted because the vector shares no
        vocabulary with any index built by a different instance.
        """
        if not self._fitted:
            import warnings

            warnings.warn(
                "SimpleEmbeddings.embed_query called before the TF-IDF "
                "vocabulary was fitted; fitting on the query text alone, so "
                "the result is not comparable with vectors from an existing "
                "index. Call embed_documents on the corpus first.",
                RuntimeWarning,
                stacklevel=2,
            )
        return self.embed_documents([text])[0]
def get_embeddings(model_name: str = "sentence-transformers/all-MiniLM-L6-v2", use_fallback: bool = False):
    """
    Return the embedding model used by the pipeline.

    Primary: HuggingFace sentence-transformers, run on CPU with normalised
    output vectors.
    Fallback: TF-IDF based SimpleEmbeddings, used when requested explicitly
    or when the HuggingFace embeddings cannot be imported or constructed.

    Args:
        model_name: HuggingFace model name for sentence-transformers
        use_fallback: Force using the lightweight TF-IDF fallback
    """
    if not use_fallback:
        try:
            from langchain_huggingface import HuggingFaceEmbeddings

            return HuggingFaceEmbeddings(
                model_name=model_name,
                model_kwargs={"device": "cpu"},
                encode_kwargs={"normalize_embeddings": True},
            )
        except Exception as exc:
            # Deliberately broad: any failure here degrades to the fallback
            # below instead of aborting the caller.
            print(f"Warning: Could not load HuggingFace embeddings ({exc})")
            print("Falling back to TF-IDF embeddings.")
    else:
        print("Using fallback TF-IDF embeddings (offline mode)")

    return SimpleEmbeddings()
def build_vector_store(
    documents: list[Document],
    embeddings=None,
    save_path: str = "data/faiss_index"
) -> FAISS:
    """
    Embed ``documents`` into a FAISS vector store and save it to ``save_path``.

    When ``embeddings`` is None the model from ``get_embeddings()`` is used.
    Returns the in-memory vector store that was saved.
    """
    embedder = get_embeddings() if embeddings is None else embeddings

    print(f"Building vector store with {len(documents)} documents...")
    store = FAISS.from_documents(documents, embedder)

    # Persist to disk
    store.save_local(save_path)
    print(f"Vector store saved to {save_path}")
    return store
def load_vector_store(
    save_path: str = "data/faiss_index",
    embeddings=None
) -> FAISS:
    """
    Load the FAISS vector store persisted at ``save_path``.

    When ``embeddings`` is None the model from ``get_embeddings()`` is used.
    """
    embedder = get_embeddings() if embeddings is None else embeddings

    # NOTE(review): allow_dangerous_deserialization=True opts in to
    # deserializing the saved index files; per the LangChain FAISS docs this
    # involves pickle, so only load directories this pipeline wrote itself.
    return FAISS.load_local(
        save_path,
        embedder,
        allow_dangerous_deserialization=True,
    )
| # --------------------------------------------------------------------------- | |
| # 4. FULL INGESTION PIPELINE | |
| # --------------------------------------------------------------------------- | |
def run_ingestion(
    kb_path: str = "data/knowledge_base.json",
    index_path: str = "data/faiss_index",
) -> FAISS:
    """
    Run the whole pipeline (load → chunk → embed → store), printing progress.

    Args:
        kb_path: Knowledge base JSON file to ingest.
        index_path: Directory the FAISS index is saved to.

    Returns:
        The vector store that was built and saved.
    """
    banner = "=" * 50
    print(banner)
    print("IJNet Knowledge Base Ingestion Pipeline")
    print(banner)

    # Step 1: load
    print("\n[1/4] Loading knowledge base...")
    kb = load_knowledge_base(kb_path)
    for collection in ("opportunities", "articles"):
        print(f" - {len(kb.get(collection, []))} {collection}")

    # Step 2: chunk
    print("\n[2/4] Building documents with smart chunking...")
    documents = build_documents(kb)
    print(f" - {len(documents)} total document chunks")

    # Breakdown per source type, in order of first appearance.
    per_type = {}
    for doc in documents:
        kind = doc.metadata.get("source_type", "unknown")
        if kind in per_type:
            per_type[kind] += 1
        else:
            per_type[kind] = 1
    for kind, total in per_type.items():
        print(f" • {kind}: {total} chunks")

    # Steps 3 and 4: embed and store
    print("\n[3/4] Initializing embedding model...")
    embeddings = get_embeddings()
    print("\n[4/4] Building FAISS vector store...")
    vector_store = build_vector_store(documents, embeddings, index_path)

    print("\n✓ Ingestion complete!")
    return vector_store
# Script entry point: run the full ingestion with the default paths.
if __name__ == "__main__":
    run_ingestion()