Spaces:
Running
Running
| """ | |
| Ingest curriculum PDFs from Firebase Storage into ChromaDB. | |
| Run: python -m backend.scripts.ingest_from_storage | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| logger = logging.getLogger("mathpulse.ingest") | |
| sys.path.insert(0, str(Path(__file__).resolve().parents[2])) | |
| from rag.firebase_storage_loader import ( | |
| PDF_METADATA, | |
| download_pdf_from_storage, | |
| list_curriculum_blobs, | |
| ) | |
| _CONTENT_DOMAIN_CLASSIFIERS = [ | |
| ("introduction", ["introduction", "welcome", "overview", "objectives", "learning objectives"]), | |
| ("key_concepts", ["key concepts", "key ideas", "main concepts", "definitions", "key terms"]), | |
| ("worked_examples", ["example", "worked example", "illustrative example", "sample problem", "solution"]), | |
| ("important_notes", ["important", "note", "remember", "tip", "caution", "warning", "key point"]), | |
| ("practice", ["practice", "exercise", "try it", "your turn", "activity", "problem set"]), | |
| ("summary", ["summary", "recap", "key takeaways", "wrap-up", "conclusion"]), | |
| ("assessment", ["assessment", "quiz", "test", "evaluation", "exam"]), | |
| ] | |
| _CONTENT_TYPE_CLASSIFIERS = [ | |
| ("definition", ["definition", "define", "means", "is defined as"]), | |
| ("formula", ["formula", "equation", "expression", "rule"]), | |
| ("procedure", ["step", "method", "how to", "procedure", "process"]), | |
| ("concept", ["concept", "idea", "principle", "theory"]), | |
| ("application", ["application", "use", "example", "solve", "problem"]), | |
| ] | |
| def _classify_chunk(content: str) -> tuple[str, str]: | |
| content_lower = content.lower() | |
| content_domain = "general" | |
| chunk_type = "concept" | |
| for domain, keywords in _CONTENT_DOMAIN_CLASSIFIERS: | |
| if any(kw in content_lower for kw in keywords): | |
| content_domain = domain | |
| break | |
| for ctype, keywords in _CONTENT_TYPE_CLASSIFIERS: | |
| if any(kw in content_lower for kw in keywords): | |
| chunk_type = ctype | |
| break | |
| return content_domain, chunk_type | |
| def _classify_lesson_section(content: str) -> str: | |
| content_lower = content.lower().strip() | |
| first_sentence = content_lower[:200] | |
| for domain, keywords in _CONTENT_DOMAIN_CLASSIFIERS: | |
| if any(kw in first_sentence for kw in keywords): | |
| return domain | |
| return "general" | |
| def chunk_text_preserve_pages(text: str, page_starts: List[int], chunk_size: int = 500, overlap: int = 80) -> List[Dict[str, Any]]: | |
| """Split text into overlapping chunks, preserving page traceability.""" | |
| # Filter out None/empty entries that can result from malformed PDF text extraction | |
| words = [w for w in text.split() if w is not None and str(w).strip()] | |
| chunks = [] | |
| i = 0 | |
| chunk_idx = 0 | |
| while i < len(words): | |
| chunk_words = words[i : i + chunk_size] | |
| chunk_text = " ".join(str(w) for w in chunk_words) | |
| estimated_page = max(1, (i // chunk_size) + 1) | |
| content_domain, chunk_type = _classify_chunk(chunk_text) | |
| chunks.append({ | |
| "text": chunk_text, | |
| "chunk_index": chunk_idx, | |
| "estimated_page": estimated_page, | |
| "content_domain": content_domain, | |
| "chunk_type": chunk_type, | |
| }) | |
| i += chunk_size - overlap | |
| chunk_idx += 1 | |
| return chunks | |
| def extract_pdf_text_and_pages(pdf_bytes: bytes) -> tuple[str, List[int]]: | |
| """Extract text from PDF bytes, returning full text and page start positions.""" | |
| try: | |
| from pypdf import PdfReader | |
| except ImportError: | |
| try: | |
| import PyPDF2 as PdfReaderModule | |
| from PyPDF2 import PdfReader | |
| except ImportError: | |
| logger.error("No PDF library available. Install: pip install pypdf") | |
| return "", [] | |
| import io | |
| reader = PdfReader(io.BytesIO(pdf_bytes)) | |
| pages: List[str] = [] | |
| for page in reader.pages: | |
| text = page.extract_text() or "" | |
| pages.append(text) | |
| page_starts = [] | |
| position = 0 | |
| for page_text in pages: | |
| page_starts.append(position) | |
| position += len(page_text) + 1 | |
| full_text = "\n".join(pages) | |
| return full_text, page_starts | |
| def get_firestore_client(): | |
| try: | |
| import firebase_admin | |
| from firebase_admin import firestore | |
| if not firebase_admin._apps: | |
| sa_json = os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON") | |
| sa_file = os.getenv("FIREBASE_SERVICE_ACCOUNT_FILE") | |
| bucket_name = os.getenv("FIREBASE_STORAGE_BUCKET", "mathpulse-ai-2026.firebasestorage.app") | |
| if sa_json: | |
| import json as _json | |
| from firebase_admin import credentials | |
| creds = credentials.Certificate(_json.loads(sa_json)) | |
| firebase_admin.initialize_app(creds, {"storageBucket": bucket_name}) | |
| elif sa_file and Path(sa_file).exists(): | |
| from firebase_admin import credentials | |
| creds = credentials.Certificate(sa_file) | |
| firebase_admin.initialize_app(creds, {"storageBucket": bucket_name}) | |
| else: | |
| firebase_admin.initialize_app(options={"storageBucket": bucket_name}) | |
| return firestore.client() | |
| except Exception as e: | |
| logger.warning("Firestore unavailable: %s", e) | |
| return None | |
| def ingest_from_firebase_storage(force_reindex: bool = False): | |
| """Download PDFs from Firebase Storage and ingest into ChromaDB.""" | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| import chromadb | |
| except ImportError: | |
| logger.error("Missing dependencies. Install: pip install chromadb sentence-transformers pypdf") | |
| return | |
| chroma_path = os.getenv("CURRICULUM_VECTORSTORE_DIR", "datasets/vectorstore") | |
| chroma_client = chromadb.PersistentClient(path=chroma_path) | |
| collection = chroma_client.get_or_create_collection( | |
| name="curriculum_chunks", | |
| metadata={"hnsw:space": "cosine"}, | |
| ) | |
| embedder = SentenceTransformer("BAAI/bge-base-en-v1.5") | |
| db = get_firestore_client() | |
| logger.info("Starting ingestion from Firebase Storage...") | |
| ingested_count = 0 | |
| skipped_count = 0 | |
| error_count = 0 | |
| for storage_path, metadata in PDF_METADATA.items(): | |
| doc_id = storage_path.replace("/", "_").replace(".pdf", "") | |
| if db: | |
| try: | |
| doc_ref = db.collection("curriculumDocuments").document(doc_id) | |
| existing = doc_ref.get() | |
| if existing.exists: | |
| if not force_reindex and existing.to_dict().get("status") == "ingested": | |
| logger.info("[SKIP] %s already ingested", storage_path) | |
| skipped_count += 1 | |
| continue | |
| except Exception as e: | |
| logger.warning("Firestore check failed for %s: %s", storage_path, e) | |
| logger.info("Downloading: %s", storage_path) | |
| pdf_bytes = download_pdf_from_storage(storage_path) | |
| if pdf_bytes is None: | |
| logger.error("[ERROR] Failed to download: %s", storage_path) | |
| if db: | |
| try: | |
| doc_ref.set({ | |
| "storagePath": storage_path, | |
| "status": "failed", | |
| "error": "download_failed", | |
| **metadata, | |
| }, merge=True) | |
| except: | |
| pass | |
| error_count += 1 | |
| continue | |
| logger.info("Extracting text from: %s (%d bytes)", storage_path, len(pdf_bytes)) | |
| full_text, page_starts = extract_pdf_text_and_pages(pdf_bytes) | |
| if not full_text.strip(): | |
| logger.warning("[WARN] No text extracted from: %s", storage_path) | |
| error_count += 1 | |
| continue | |
| chunks = chunk_text_preserve_pages(full_text, page_starts) | |
| logger.info(" -> %d chunks created", len(chunks)) | |
| existing_ids = [cid for cid in collection.get()["ids"] if cid.startswith(f"{doc_id}_chunk_")] | |
| if existing_ids: | |
| collection.delete(ids=existing_ids) | |
| logger.info(" Removed %d existing chunks", len(existing_ids)) | |
| for chunk in chunks: | |
| chunk_text = chunk.get("text", "") | |
| if not isinstance(chunk_text, str) or not chunk_text.strip(): | |
| logger.warning(" Skipping empty/invalid chunk %s (type=%s, len=%d)", chunk.get("chunk_index"), type(chunk_text), len(chunk_text)) | |
| continue | |
| chunk_id = f"{doc_id}_chunk_{chunk['chunk_index']}" | |
| try: | |
| embedding = embedder.encode(chunk_text, normalize_embeddings=True).tolist() | |
| except Exception as enc_err: | |
| logger.warning(" Skipping unencodable chunk %s: %s", chunk.get("chunk_index"), enc_err) | |
| continue | |
| collection.add( | |
| embeddings=[embedding], | |
| documents=[chunk_text], | |
| metadatas=[{ | |
| "document_id": doc_id, | |
| "module_id": metadata.get("subjectId", ""), | |
| "lesson_id": f"lesson-{doc_id}", | |
| "title": metadata.get("subject", ""), | |
| "subject": metadata.get("subject", ""), | |
| "subjectId": metadata.get("subjectId", ""), | |
| "quarter": metadata.get("quarter", 1), | |
| "competency_code": metadata.get("competency_code", ""), | |
| "content_domain": chunk["content_domain"], | |
| "chunk_type": chunk["chunk_type"], | |
| "source_file": storage_path.split("/")[-1], | |
| "storage_path": storage_path, | |
| "page": chunk["estimated_page"], | |
| "chunk_index": chunk["chunk_index"], | |
| "type": metadata.get("type", ""), | |
| }], | |
| ids=[chunk_id], | |
| ) | |
| if db: | |
| try: | |
| doc_ref.set({ | |
| "id": doc_id, | |
| "storagePath": storage_path, | |
| "status": "ingested", | |
| "ingestedAt": __import__("firebase_admin").firestore.SERVER_TIMESTAMP, | |
| "chunkCount": len(chunks), | |
| **metadata, | |
| }, merge=True) | |
| except Exception as e: | |
| logger.warning("Firestore update failed: %s", e) | |
| logger.info("[OK] Ingested %s (%d chunks)", storage_path, len(chunks)) | |
| ingested_count += 1 | |
| logger.info("=" * 50) | |
| logger.info("Ingestion complete: %d ingested, %d skipped, %d errors", ingested_count, skipped_count, error_count) | |
| logger.info("Total chunks in ChromaDB: %d", collection.count()) | |
| if __name__ == "__main__": | |
| import argparse | |
| logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") | |
| parser = argparse.ArgumentParser(description="Ingest curriculum PDFs from Firebase Storage into ChromaDB") | |
| parser.add_argument("--force", action="store_true", help="Re-ingest even if already ingested") | |
| args = parser.parse_args() | |
| ingest_from_firebase_storage(force_reindex=args.force) |