mathpulse-api-v3test / scripts /ingest_from_storage.py
github-actions[bot]
🚀 Auto-deploy backend from GitHub (93e7c2a)
92bfe31
"""
Ingest curriculum PDFs from Firebase Storage into ChromaDB.
Run: python -m backend.scripts.ingest_from_storage
"""
from __future__ import annotations
import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger("mathpulse.ingest")
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from rag.firebase_storage_loader import (
PDF_METADATA,
download_pdf_from_storage,
list_curriculum_blobs,
)
_CONTENT_DOMAIN_CLASSIFIERS = [
("introduction", ["introduction", "welcome", "overview", "objectives", "learning objectives"]),
("key_concepts", ["key concepts", "key ideas", "main concepts", "definitions", "key terms"]),
("worked_examples", ["example", "worked example", "illustrative example", "sample problem", "solution"]),
("important_notes", ["important", "note", "remember", "tip", "caution", "warning", "key point"]),
("practice", ["practice", "exercise", "try it", "your turn", "activity", "problem set"]),
("summary", ["summary", "recap", "key takeaways", "wrap-up", "conclusion"]),
("assessment", ["assessment", "quiz", "test", "evaluation", "exam"]),
]
_CONTENT_TYPE_CLASSIFIERS = [
("definition", ["definition", "define", "means", "is defined as"]),
("formula", ["formula", "equation", "expression", "rule"]),
("procedure", ["step", "method", "how to", "procedure", "process"]),
("concept", ["concept", "idea", "principle", "theory"]),
("application", ["application", "use", "example", "solve", "problem"]),
]
def _classify_chunk(content: str) -> tuple[str, str]:
    """Label a chunk of text with a content domain and a chunk type.

    Each classifier table is scanned in order; the first entry that has any
    keyword occurring as a substring of the lowercased text wins.

    Args:
        content: Raw chunk text.

    Returns:
        ``(content_domain, chunk_type)``.  The domain falls back to
        ``"general"`` and the type to ``"concept"`` when nothing matches.
    """
    haystack = content.lower()

    def first_match(table, fallback):
        # Lazily walk the table so scanning stops at the first matching entry.
        return next(
            (
                label
                for label, needles in table
                if any(needle in haystack for needle in needles)
            ),
            fallback,
        )

    return (
        first_match(_CONTENT_DOMAIN_CLASSIFIERS, "general"),
        first_match(_CONTENT_TYPE_CLASSIFIERS, "concept"),
    )
def _classify_lesson_section(content: str) -> str:
    """Guess the lesson section from the opening of a piece of text.

    Only the first 200 characters of the lowercased, whitespace-stripped text
    are inspected.  The domain table is scanned in order and the first domain
    with a keyword present in that opening is returned.

    Args:
        content: Raw section text.

    Returns:
        The matching domain name, or ``"general"`` when no keyword is found.
    """
    opening = content.lower().strip()[:200]
    candidates = (
        label
        for label, needles in _CONTENT_DOMAIN_CLASSIFIERS
        if any(needle in opening for needle in needles)
    )
    return next(candidates, "general")
def chunk_text_preserve_pages(text: str, page_starts: List[int], chunk_size: int = 500, overlap: int = 80) -> List[Dict[str, Any]]:
    """Split text into overlapping word windows, tagging each with its page.

    Args:
        text: Full document text.
        page_starts: Character offset in ``text`` at which each page begins.
            When empty, the page is estimated from the chunk position instead.
        chunk_size: Maximum number of whitespace-separated words per chunk.
        overlap: Number of words shared by two consecutive chunks.

    Returns:
        One dict per chunk with the keys ``"text"``, ``"chunk_index"``,
        ``"estimated_page"`` (1-based), ``"content_domain"`` and
        ``"chunk_type"``.  Empty or whitespace-only text yields ``[]``.

    Raises:
        ValueError: If ``chunk_size`` is not positive, or ``overlap`` is
            negative or not smaller than ``chunk_size``.
    """
    import re
    from bisect import bisect_right

    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        # overlap >= chunk_size would make the window never advance (endless
        # loop); a negative overlap would silently drop words between chunks.
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, got overlap={overlap}, chunk_size={chunk_size}"
        )

    # Keep each word's character offset so a chunk can be mapped back to the
    # page on which its first word appears.
    tokens = [(match.start(), match.group()) for match in re.finditer(r"\S+", text)]
    boundaries = sorted(page_starts) if page_starts else []
    stride = chunk_size - overlap

    chunks: List[Dict[str, Any]] = []
    for chunk_idx, start in enumerate(range(0, len(tokens), stride)):
        window = tokens[start : start + chunk_size]
        chunk_text = " ".join(word for _, word in window)
        if boundaries:
            # Number of page starts at or before the first word == 1-based page.
            estimated_page = max(1, bisect_right(boundaries, window[0][0]))
        else:
            # No page offsets available: fall back to a position-based guess.
            estimated_page = start // chunk_size + 1
        content_domain, chunk_type = _classify_chunk(chunk_text)
        chunks.append({
            "text": chunk_text,
            "chunk_index": chunk_idx,
            "estimated_page": estimated_page,
            "content_domain": content_domain,
            "chunk_type": chunk_type,
        })
        if start + chunk_size >= len(tokens):
            # This window already reached the last word; another step would
            # only repeat words that are fully contained in this chunk.
            break
    return chunks
def extract_pdf_text_and_pages(pdf_bytes: bytes) -> tuple[str, List[int]]:
    """Extract text from PDF bytes together with per-page start offsets.

    Args:
        pdf_bytes: Raw content of a PDF file.

    Returns:
        ``(full_text, page_starts)``.  ``full_text`` is every page's text
        joined with ``"\\n"``; ``page_starts[k]`` is the character offset in
        ``full_text`` at which page ``k`` (0-based) begins.  Returns
        ``("", [])`` when no PDF library is installed or the bytes cannot be
        parsed as a PDF.
    """
    import io

    try:
        from pypdf import PdfReader
    except ImportError:
        try:
            from PyPDF2 import PdfReader
        except ImportError:
            logger.error("No PDF library available. Install: pip install pypdf")
            return "", []

    try:
        reader = PdfReader(io.BytesIO(pdf_bytes))
        pdf_pages = list(reader.pages)
    except Exception as exc:
        # Broad on purpose: an unreadable file is reported the same way as a
        # missing library instead of aborting the caller's loop over documents.
        logger.error("Could not parse PDF: %s", exc)
        return "", []

    pages: List[str] = []
    for page_number, page in enumerate(pdf_pages, start=1):
        try:
            pages.append(page.extract_text() or "")
        except Exception as exc:
            logger.warning("Text extraction failed on page %d: %s", page_number, exc)
            # Keep an empty placeholder so later pages keep their page numbers.
            pages.append("")

    page_starts: List[int] = []
    position = 0
    for page_text in pages:
        page_starts.append(position)
        position += len(page_text) + 1  # +1 for the "\n" separator added by join
    return "\n".join(pages), page_starts
def get_firestore_client():
    """Return a Firestore client, initialising the Firebase app when needed.

    If no Firebase app exists yet, one is initialised with the storage bucket
    from ``FIREBASE_STORAGE_BUCKET`` (with a built-in default) and credentials
    chosen in this order:

    1. ``FIREBASE_SERVICE_ACCOUNT_JSON`` - inline service-account JSON.
    2. ``FIREBASE_SERVICE_ACCOUNT_FILE`` - path to a key file that exists.
    3. Neither set: initialise without explicit credentials.

    Returns:
        The Firestore client, or ``None`` (after logging a warning) when
        anything in the process raises.
    """
    try:
        import firebase_admin
        from firebase_admin import firestore

        # An app is already registered: nothing to initialise.
        if firebase_admin._apps:
            return firestore.client()

        inline_json = os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON")
        key_path = os.getenv("FIREBASE_SERVICE_ACCOUNT_FILE")
        app_options = {
            "storageBucket": os.getenv("FIREBASE_STORAGE_BUCKET", "mathpulse-ai-2026.firebasestorage.app"),
        }

        if inline_json:
            import json as _json
            from firebase_admin import credentials

            certificate = credentials.Certificate(_json.loads(inline_json))
            firebase_admin.initialize_app(certificate, app_options)
        elif key_path and Path(key_path).exists():
            from firebase_admin import credentials

            certificate = credentials.Certificate(key_path)
            firebase_admin.initialize_app(certificate, app_options)
        else:
            firebase_admin.initialize_app(options=app_options)

        return firestore.client()
    except Exception as e:
        logger.warning("Firestore unavailable: %s", e)
        return None
def _write_document_status(doc_ref, payload: Dict[str, Any]) -> None:
    """Best-effort merge of ``payload`` into a Firestore document; failures are logged."""
    if doc_ref is None:
        return
    try:
        doc_ref.set(payload, merge=True)
    except Exception as e:
        logger.warning("Firestore update failed: %s", e)


def _build_chunk_metadata(doc_id: str, storage_path: str, metadata: Dict[str, Any], chunk: Dict[str, Any]) -> Dict[str, Any]:
    """Build the metadata record stored in ChromaDB alongside one chunk."""
    return {
        "document_id": doc_id,
        "module_id": metadata.get("subjectId", ""),
        "lesson_id": f"lesson-{doc_id}",
        "title": metadata.get("subject", ""),
        "subject": metadata.get("subject", ""),
        "subjectId": metadata.get("subjectId", ""),
        "quarter": metadata.get("quarter", 1),
        "competency_code": metadata.get("competency_code", ""),
        "content_domain": chunk["content_domain"],
        "chunk_type": chunk["chunk_type"],
        "source_file": storage_path.split("/")[-1],
        "storage_path": storage_path,
        "page": chunk["estimated_page"],
        "chunk_index": chunk["chunk_index"],
        "type": metadata.get("type", ""),
    }


def ingest_from_firebase_storage(force_reindex: bool = False):
    """Download PDFs from Firebase Storage and ingest into ChromaDB.

    For every entry of ``PDF_METADATA`` the PDF is downloaded, its text is
    extracted and chunked, each chunk is embedded, and the chunks are stored in
    the ``curriculum_chunks`` collection under ids ``<doc_id>_chunk_<index>``.
    Chunks previously stored under the same id prefix are removed first.  When
    Firestore is available, progress is mirrored to the
    ``curriculumDocuments`` collection on a best-effort basis.

    Args:
        force_reindex: Re-ingest documents even when Firestore already marks
            them as ``"ingested"``.

    Returns:
        None.  Outcomes are reported through the module logger.
    """
    try:
        from sentence_transformers import SentenceTransformer
        import chromadb
    except ImportError:
        logger.error("Missing dependencies. Install: pip install chromadb sentence-transformers pypdf")
        return

    chroma_path = os.getenv("CURRICULUM_VECTORSTORE_DIR", "datasets/vectorstore")
    chroma_client = chromadb.PersistentClient(path=chroma_path)
    collection = chroma_client.get_or_create_collection(
        name="curriculum_chunks",
        metadata={"hnsw:space": "cosine"},
    )
    embedder = SentenceTransformer("BAAI/bge-base-en-v1.5")
    db = get_firestore_client()

    logger.info("Starting ingestion from Firebase Storage...")
    ingested_count = 0
    skipped_count = 0
    error_count = 0

    for storage_path, metadata in PDF_METADATA.items():
        doc_id = storage_path.replace("/", "_").replace(".pdf", "")

        # Reset for every document so a failed lookup can never leave the
        # previous document's reference in place for the status writes below.
        doc_ref = None
        if db:
            try:
                doc_ref = db.collection("curriculumDocuments").document(doc_id)
                existing = doc_ref.get()
                if existing.exists:
                    if not force_reindex and existing.to_dict().get("status") == "ingested":
                        logger.info("[SKIP] %s already ingested", storage_path)
                        skipped_count += 1
                        continue
            except Exception as e:
                logger.warning("Firestore check failed for %s: %s", storage_path, e)

        logger.info("Downloading: %s", storage_path)
        pdf_bytes = download_pdf_from_storage(storage_path)
        if pdf_bytes is None:
            logger.error("[ERROR] Failed to download: %s", storage_path)
            _write_document_status(doc_ref, {
                "storagePath": storage_path,
                "status": "failed",
                "error": "download_failed",
                **metadata,
            })
            error_count += 1
            continue

        logger.info("Extracting text from: %s (%d bytes)", storage_path, len(pdf_bytes))
        full_text, page_starts = extract_pdf_text_and_pages(pdf_bytes)
        if not full_text.strip():
            logger.warning("[WARN] No text extracted from: %s", storage_path)
            error_count += 1
            continue

        chunks = chunk_text_preserve_pages(full_text, page_starts)
        logger.info(" -> %d chunks created", len(chunks))

        # Only ids are needed here; include=[] avoids loading every stored
        # document and metadata record just to filter by id prefix.
        id_prefix = f"{doc_id}_chunk_"
        existing_ids = [cid for cid in collection.get(include=[])["ids"] if cid.startswith(id_prefix)]
        if existing_ids:
            collection.delete(ids=existing_ids)
            logger.info(" Removed %d existing chunks", len(existing_ids))

        stored_count = 0
        for chunk in chunks:
            chunk_text = chunk.get("text", "")
            if not isinstance(chunk_text, str) or not chunk_text.strip():
                # Guard len(): a non-string value may not support it at all.
                text_len = len(chunk_text) if hasattr(chunk_text, "__len__") else 0
                logger.warning(" Skipping empty/invalid chunk %s (type=%s, len=%d)", chunk.get("chunk_index"), type(chunk_text), text_len)
                continue
            chunk_id = f"{doc_id}_chunk_{chunk['chunk_index']}"
            try:
                embedding = embedder.encode(chunk_text, normalize_embeddings=True).tolist()
            except Exception as enc_err:
                logger.warning(" Skipping unencodable chunk %s: %s", chunk.get("chunk_index"), enc_err)
                continue
            collection.add(
                embeddings=[embedding],
                documents=[chunk_text],
                metadatas=[_build_chunk_metadata(doc_id, storage_path, metadata, chunk)],
                ids=[chunk_id],
            )
            stored_count += 1

        if doc_ref is not None:
            try:
                from firebase_admin import firestore

                ingested_payload = {
                    "id": doc_id,
                    "storagePath": storage_path,
                    "status": "ingested",
                    "ingestedAt": firestore.SERVER_TIMESTAMP,
                    # Count what was actually stored, not what was attempted.
                    "chunkCount": stored_count,
                    **metadata,
                }
            except Exception as e:
                logger.warning("Firestore update failed: %s", e)
            else:
                _write_document_status(doc_ref, ingested_payload)

        logger.info("[OK] Ingested %s (%d chunks)", storage_path, stored_count)
        ingested_count += 1

    logger.info("=" * 50)
    logger.info("Ingestion complete: %d ingested, %d skipped, %d errors", ingested_count, skipped_count, error_count)
    logger.info("Total chunks in ChromaDB: %d", collection.count())
if __name__ == "__main__":
    import argparse

    # Script entry point: show INFO-level progress, read the --force flag and
    # run a single ingestion pass.
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    cli = argparse.ArgumentParser(description="Ingest curriculum PDFs from Firebase Storage into ChromaDB")
    cli.add_argument("--force", action="store_true", help="Re-ingest even if already ingested")
    options = cli.parse_args()

    ingest_from_firebase_storage(force_reindex=options.force)