"""Atomic ingestion script: filter, embed, and index Bible verses."""

import json
import logging
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Any

import faiss
from sentence_transformers import SentenceTransformer

import config
from rag.embeddings import encode_texts, load_embedding_model

logger = logging.getLogger(__name__)


def fetch_verses(db_path: Path) -> list[dict[str, Any]]:
    """Fetch all verses from the SQLite database.

    Parameters
    ----------
    db_path : Path
        Path to the SQLite database file.

    Returns
    -------
    list[dict[str, Any]]
        List of verse dicts with rowid and all column values.
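
    Examples
    --------
    Illustrative call; the database path is hypothetical:

    >>> verses = fetch_verses(Path("data/bible.db"))  # doctest: +SKIP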
"""
    # sqlite3's context manager only wraps transactions; it does not close
    # the connection, so use contextlib.closing to release it when done.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.execute(
            "SELECT rowid, book, book_id, book_title, chapter, chapter_id, "
            "chapter_title, verse, text FROM verses"
        )
        return [dict(row) for row in cursor.fetchall()]


def filter_verses(
    verses: list[dict[str, Any]],
    min_length: int = config.MIN_TEXT_LENGTH,
    min_words: int = config.MIN_WORD_COUNT,
) -> list[dict[str, Any]]:
    """Filter out non-content verses based on text length and word count.

    Parameters
    ----------
    verses : list[dict[str, Any]]
        Raw verse dicts from the database.
    min_length : int
        Minimum character length for verse text.
    min_words : int
        Minimum word count for verse text.

    Returns
    -------
    list[dict[str, Any]]
        Filtered verse dicts.
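
    Examples
    --------
    Thresholds below are illustrative, not the config defaults:

    >>> filter_verses([{"text": "In the beginning"}], min_length=5, min_words=2)
    [{'text': 'In the beginning'}]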
"""
return [
v for v in verses if len(v["text"]) >= min_length and len(v["text"].split()) >= min_words
]


def build_index(
    texts: list[str],
    model: SentenceTransformer,
    dimension: int = config.EMBEDDING_DIMENSION,
) -> faiss.Index:
    """Embed texts and build a FAISS inner-product index.

    Parameters
    ----------
    texts : list[str]
        Texts to embed.
    model : SentenceTransformer
        Loaded embedding model.
    dimension : int
        Embedding dimension.

    Returns
    -------
    faiss.Index
        FAISS IndexFlatIP with all embeddings added.
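
    Notes
    -----
    Inner product only equals cosine similarity when the embeddings are
    L2-normalized; ``encode_texts`` is assumed to handle normalization.

    Examples
    --------
    >>> model = load_embedding_model()  # doctest: +SKIP
    >>> index = build_index(["In the beginning God created"], model)  # doctest: +SKIP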
"""
    embeddings = encode_texts(model, texts)
    # Fail fast with a clear error instead of a cryptic faiss assertion.
    if embeddings.shape[1] != dimension:
        raise ValueError(f"embedding dim {embeddings.shape[1]} != index dim {dimension}")
    index = faiss.IndexFlatIP(dimension)
    index.add(embeddings)
    return index


def save_artifacts(
    index: faiss.Index,
    mapping: list[dict[str, Any]],
    index_path: Path,
    mapping_path: Path,
) -> None:
    """Save FAISS index and JSON mapping to disk.

    Parameters
    ----------
    index : faiss.Index
        FAISS index to save.
    mapping : list[dict[str, Any]]
        Verse metadata mapping.
    index_path : Path
        Output path for the FAISS index file.
    mapping_path : Path
        Output path for the JSON mapping file.
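
    Examples
    --------
    Output paths are illustrative:

    >>> save_artifacts(index, mapping, Path("artifacts/index.faiss"), Path("artifacts/mapping.json"))  # doctest: +SKIP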
"""
    index_path.parent.mkdir(parents=True, exist_ok=True)
    faiss.write_index(index, str(index_path))
    # The mapping may live in a different directory, so create it too.
    mapping_path.parent.mkdir(parents=True, exist_ok=True)
    with open(mapping_path, "w", encoding="utf-8") as f:
        json.dump(mapping, f, ensure_ascii=False, indent=2)


def main(
    db_path: Path = config.DB_PATH,
    index_path: Path = config.INDEX_PATH,
    mapping_path: Path = config.MAPPING_PATH,
) -> None:
    """Run the full ingestion pipeline.

    Parameters
    ----------
    db_path : Path
        Path to the SQLite database.
    index_path : Path
        Output path for the FAISS index.
    mapping_path : Path
        Output path for the JSON mapping.
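
    Examples
    --------
    Run with config defaults, or override any path (value illustrative):

    >>> main(db_path=Path("data/bible.db"))  # doctest: +SKIP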
"""
logger.info("Fetching verses from %s", db_path)
verses = fetch_verses(db_path)
logger.info("Total verses: %d", len(verses))
filtered = filter_verses(verses)
logger.info("After filtering: %d", len(filtered))
    # The filtered dicts double as the id -> metadata mapping: FAISS assigns
    # sequential ids, so mapping[i] describes the vector at index position i
    # (schema: rowid, book, book_id, book_title, chapter, chapter_id,
    # chapter_title, verse, text).
    mapping = filtered
texts = [v["text"] for v in filtered]
logger.info("Loading embedding model...")
model = load_embedding_model()
logger.info("Building FAISS index...")
index = build_index(texts, model)
logger.info("Index size: %d", index.ntotal)
logger.info("Saving artifacts to %s", index_path.parent)
save_artifacts(index, mapping, index_path, mapping_path)
logger.info("Done.")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    main()