IJNet-assistant / src /ingest.py
Mohammad Haris
Deploy IJNet assistant
b87aca1
Raw
History Blame Contribute Delete
11.2 kB
"""
Document Ingestion Pipeline
----------------------------
Loads the IJNet knowledge base, applies smart chunking strategies,
embeds documents, and stores them in a FAISS vector store.
Chunking Strategy:
- Structured records (opportunities): kept as single documents with rich metadata
- Unstructured text (articles): split with RecursiveCharacterTextSplitter
using semantic paragraph boundaries
- IJNet info: kept as single documents
Each chunk carries full metadata for source attribution and filtering.
"""
import json
import hashlib
from datetime import datetime
from pathlib import Path
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
# ---------------------------------------------------------------------------
# 1. DOCUMENT LOADING
# ---------------------------------------------------------------------------
def load_knowledge_base(path: str = "data/knowledge_base.json") -> dict:
    """Load the raw knowledge base JSON.

    Args:
        path: Location of the knowledge base JSON file.

    Returns:
        The parsed JSON content.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Decode explicitly as UTF-8 rather than relying on the platform's
    # default locale encoding, so non-ASCII text loads identically on every
    # operating system.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
# ---------------------------------------------------------------------------
# 2. SMART CHUNKING
# ---------------------------------------------------------------------------
def _opportunity_to_document(opp: dict) -> Document:
    """
    Convert a structured opportunity record into a single LangChain Document.

    Strategy: Compose a natural-language representation that includes all
    searchable fields. This lets semantic search match on any aspect
    (region, topic, type, deadline, eligibility) without needing separate
    structured queries.

    Args:
        opp: Opportunity record. The keys ``id``, ``title``, ``type``,
            ``organization``, ``description``, ``eligibility``, ``regions``,
            ``topics``, ``deadline``, ``duration`` and ``benefits`` are
            required; ``source_url`` and ``language`` are optional.

    Returns:
        One Document whose page content is the composed text and whose
        metadata carries the fields used for attribution and filtering.

    Raises:
        KeyError: If a required key is missing from ``opp``.
    """
    # Resolve optional/joined values once so the text and the metadata can
    # never disagree. In particular "language" defaults to "English" in both
    # places instead of raising KeyError while building the text.
    language = opp.get("language", "English")
    regions = ", ".join(opp["regions"])
    topics = ", ".join(opp["topics"])

    # Build rich text representation
    parts = [
        f"Title: {opp['title']}",
        f"Type: {opp['type'].capitalize()}",
        f"Organization: {opp['organization']}",
        f"Description: {opp['description']}",
        f"Eligibility: {opp['eligibility']}",
        f"Regions: {regions}",
        f"Topics: {topics}",
        f"Deadline: {opp['deadline']}",
        f"Duration: {opp['duration']}",
        f"Benefits: {opp['benefits']}",
        f"Language: {language}",
    ]
    text = "\n".join(parts)

    metadata = {
        "source": opp.get("source_url", ""),
        "source_type": "opportunity",
        "doc_id": opp["id"],
        "title": opp["title"],
        "opp_type": opp["type"],
        "organization": opp["organization"],
        "regions": regions,
        "topics": topics,
        "deadline": opp["deadline"],
        "language": language,
    }
    return Document(page_content=text, metadata=metadata)
def _article_to_chunks(article: dict, splitter: RecursiveCharacterTextSplitter) -> list[Document]:
    """
    Break one article into chunk Documents that all share the article metadata.

    The title and author are prepended to the article body before splitting.
    Chunk size and overlap are whatever the supplied ``splitter`` is
    configured with.

    Args:
        article: Article record; ``id``, ``title`` and ``full_text`` are
            required, the remaining fields fall back to defaults.
        splitter: Text splitter used to cut the composed text.

    Returns:
        One Document per chunk, each tagged with ``chunk_index`` and
        ``total_chunks`` in addition to the shared article metadata.
    """
    title = article["title"]
    byline = article.get("author", "IJNet Staff")
    body = article["full_text"]
    composed = f"Article: {title}\nAuthor: {byline}\n\n{body}"

    # Metadata common to every chunk of this article.
    shared = {
        "source": article.get("source_url", ""),
        "source_type": "article",
        "doc_id": article["id"],
        "title": article["title"],
        "author": article.get("author", "IJNet Staff"),
        "date": article.get("date", ""),
        "section": article.get("section", ""),
        "topics": ", ".join(article.get("topics", [])),
    }

    pieces = splitter.split_text(composed)
    piece_count = len(pieces)
    return [
        Document(
            page_content=piece,
            metadata={**shared, "chunk_index": position, "total_chunks": piece_count},
        )
        for position, piece in enumerate(pieces)
    ]
def _ijnet_info_to_document(info: dict) -> list[Document]:
    """
    Turn the IJNet organizational info into two Documents.

    Args:
        info: Mapping with the keys ``about``, ``languages``, ``website``
            and ``services``.

    Returns:
        A two-element list: the "About IJNet" document followed by the
        "IJNet Services" document. Both use the website as their source.
    """

    def _info_doc(doc_id: str, title: str, content: str) -> Document:
        # Both documents share the same source and source_type.
        return Document(
            page_content=content,
            metadata={
                "source": info["website"],
                "source_type": "ijnet_info",
                "doc_id": doc_id,
                "title": title,
            },
        )

    # About document
    summary = info["about"]
    language_list = ", ".join(info["languages"])
    site = info["website"]
    about_body = (
        f"About IJNet: {summary}\n\n"
        f"IJNet is available in these languages: {language_list}.\n"
        f"Website: {site}"
    )
    about_doc = _info_doc("ijnet-about", "About IJNet", about_body)

    # Services document: a heading line followed by one bullet per service
    bullets = [f"- {service}" for service in info["services"]]
    services_body = "IJNet Services and Offerings:\n" + "\n".join(bullets)
    services_doc = _info_doc("ijnet-services", "IJNet Services", services_body)

    return [about_doc, services_doc]
def build_documents(kb: dict) -> list[Document]:
    """
    Assemble every Document for the knowledge base, ready for embedding.

    Output order is: opportunities, then article chunks, then IJNet info.

    Args:
        kb: Parsed knowledge base; the ``opportunities``, ``articles`` and
            ``ijnet_info`` sections are each optional.

    Returns:
        The combined list of Documents.
    """
    # --- Opportunities: one doc per opportunity (no splitting) ---
    collected = [
        _opportunity_to_document(record)
        for record in kb.get("opportunities", [])
    ]

    # --- Articles: split into chunks ---
    article_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,  # measured in characters (length_function=len)
        chunk_overlap=150,  # overlap to preserve context at boundaries
        separators=["\n\n", "\n", ". ", ", ", " "],  # prefer paragraph boundaries
        length_function=len,
    )
    for entry in kb.get("articles", []):
        # Articles without body text contribute nothing.
        if not entry.get("full_text"):
            continue
        collected.extend(_article_to_chunks(entry, article_splitter))

    # --- IJNet Info ---
    org_info = kb.get("ijnet_info")
    if org_info:
        collected.extend(_ijnet_info_to_document(org_info))

    return collected
# ---------------------------------------------------------------------------
# 3. EMBEDDING + VECTOR STORE
# ---------------------------------------------------------------------------
from langchain_core.embeddings import Embeddings as EmbeddingsBase
class SimpleEmbeddings(EmbeddingsBase):
    """
    TF-IDF fallback embeddings for environments where HuggingFace model
    downloads are blocked.

    Vectors come from sklearn's TfidfVectorizer, are zero-padded to
    ``dimension`` columns and L2-normalised.

    The vectorizer is fitted exactly once, on the texts of the first
    ``embed_documents`` / ``embed_query`` call. If a lone query arrives
    before any corpus, the vocabulary is fitted on that query.

    NOT recommended for production — use HuggingFace sentence-transformers
    when internet access is available.
    """

    dimension: int = 384

    def __init__(self, dimension: int = 384, **kwargs):
        super().__init__(**kwargs)
        # Heavy dependencies are imported lazily, only when the fallback is
        # actually instantiated.
        import numpy as np
        from sklearn.feature_extraction.text import TfidfVectorizer

        self._np = np
        self._fitted = False
        self.dimension = dimension
        self._vectorizer = TfidfVectorizer(max_features=dimension, stop_words="english")

    def _ensure_fitted(self, texts: list[str]):
        """Fit the vectorizer on ``texts`` unless it has been fitted already."""
        if self._fitted:
            return
        self._vectorizer.fit(texts)
        self._fitted = True

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Return one unit-length (or all-zero) vector of ``dimension`` floats per text."""
        self._ensure_fitted(texts)
        np = self._np

        tfidf = self._vectorizer.transform(texts).toarray()
        # Copy the TF-IDF columns into a fixed-width, zero-padded matrix.
        width = min(tfidf.shape[1], self.dimension)
        padded = np.zeros((len(texts), self.dimension))
        padded[:, :width] = tfidf[:, :width]

        # L2-normalise each row; rows that are all zero keep a divisor of 1.
        lengths = np.linalg.norm(padded, axis=1, keepdims=True)
        lengths[lengths == 0] = 1
        return (padded / lengths).tolist()

    def embed_query(self, text: str) -> list[float]:
        """Embed a single query by routing it through ``embed_documents``."""
        (vector,) = self.embed_documents([text])
        return vector
def get_embeddings(model_name: str = "sentence-transformers/all-MiniLM-L6-v2", use_fallback: bool = False):
    """
    Create the embedding model used for indexing and querying.

    Primary: HuggingFace sentence-transformers, run on CPU with normalised
    embeddings.
    Fallback: TF-IDF based ``SimpleEmbeddings``, used when requested or when
    the HuggingFace embeddings cannot be imported or constructed.

    Args:
        model_name: HuggingFace model name for sentence-transformers
        use_fallback: Force using the lightweight TF-IDF fallback

    Returns:
        A ``HuggingFaceEmbeddings`` instance, or ``SimpleEmbeddings`` on
        fallback.
    """
    if use_fallback:
        print("Using fallback TF-IDF embeddings (offline mode)")
    else:
        try:
            from langchain_huggingface import HuggingFaceEmbeddings

            return HuggingFaceEmbeddings(
                model_name=model_name,
                model_kwargs={"device": "cpu"},
                encode_kwargs={"normalize_embeddings": True},
            )
        except Exception as err:
            # Deliberate best-effort: any failure to import or load the
            # model degrades to the TF-IDF fallback instead of aborting.
            print(f"Warning: Could not load HuggingFace embeddings ({err})")
            print("Falling back to TF-IDF embeddings.")

    # Reached when the fallback was requested or the primary path failed.
    return SimpleEmbeddings()
def build_vector_store(
    documents: list[Document],
    embeddings=None,
    save_path: str = "data/faiss_index"
) -> FAISS:
    """
    Embed ``documents`` into a FAISS vector store and save it to disk.

    Args:
        documents: Documents to index.
        embeddings: Embedding model; when None, ``get_embeddings()`` supplies one.
        save_path: Path the index is saved to.

    Returns:
        The in-memory FAISS vector store that was just saved.
    """
    embedder = get_embeddings() if embeddings is None else embeddings

    print(f"Building vector store with {len(documents)} documents...")
    store = FAISS.from_documents(documents, embedder)

    # Persist to disk
    store.save_local(save_path)
    print(f"Vector store saved to {save_path}")
    return store
def load_vector_store(
    save_path: str = "data/faiss_index",
    embeddings=None
) -> FAISS:
    """
    Load a FAISS vector store previously saved at ``save_path``.

    Args:
        save_path: Path the index was saved to.
        embeddings: Embedding model; when None, ``get_embeddings()`` supplies one.

    Returns:
        The loaded FAISS vector store.
    """
    embedder = get_embeddings() if embeddings is None else embeddings
    # SECURITY NOTE: allow_dangerous_deserialization=True opts in to
    # pickle-based loading. Only point this at index files produced by this
    # pipeline, never at files from an untrusted source.
    store = FAISS.load_local(
        save_path, embedder, allow_dangerous_deserialization=True
    )
    return store
# ---------------------------------------------------------------------------
# 4. FULL INGESTION PIPELINE
# ---------------------------------------------------------------------------
def run_ingestion(
    kb_path: str = "data/knowledge_base.json",
    index_path: str = "data/faiss_index",
) -> FAISS:
    """
    Run the whole pipeline (load → chunk → embed → store), printing progress.

    Args:
        kb_path: Path of the knowledge base JSON file.
        index_path: Path the FAISS index is saved to.

    Returns:
        The FAISS vector store that was built and saved.
    """
    banner = "=" * 50
    print(banner)
    print("IJNet Knowledge Base Ingestion Pipeline")
    print(banner)

    # Step 1: load
    print("\n[1/4] Loading knowledge base...")
    kb = load_knowledge_base(kb_path)
    opportunity_count = len(kb.get("opportunities", []))
    article_count = len(kb.get("articles", []))
    print(f" - {opportunity_count} opportunities")
    print(f" - {article_count} articles")

    # Step 2: chunk
    print("\n[2/4] Building documents with smart chunking...")
    documents = build_documents(kb)
    print(f" - {len(documents)} total document chunks")

    # Per-source-type breakdown, reported in first-seen order.
    tally = {}
    for item in documents:
        kind = item.metadata.get("source_type", "unknown")
        tally.setdefault(kind, 0)
        tally[kind] += 1
    for kind in tally:
        print(f" • {kind}: {tally[kind]} chunks")

    # Steps 3 and 4: embed + store
    print("\n[3/4] Initializing embedding model...")
    embedder = get_embeddings()
    print("\n[4/4] Building FAISS vector store...")
    store = build_vector_store(documents, embedder, index_path)

    print("\n✓ Ingestion complete!")
    return store
# Script entry point: run the full ingestion pipeline with its default paths
# when this file is executed directly (not when it is imported).
if __name__ == "__main__":
    run_ingestion()