""" PDF Ingestion Module for Quiz Battle RAG Question Bank. Ingests PDFs from Firebase Storage, extracts text, chunks content, generates embeddings, calls DeepSeek to produce base questions, and stores results in Firestore. """ import asyncio import hashlib import io import json import logging import os import random from dataclasses import dataclass from datetime import datetime, timezone from typing import Optional from google.cloud.firestore import Client from langchain_text_splitters import RecursiveCharacterTextSplitter from sentence_transformers import SentenceTransformer import pypdf from rag.firebase_storage_loader import _init_firebase_storage from services.ai_client import get_deepseek_client, CHAT_MODEL logger = logging.getLogger(__name__) EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2") DEFAULT_FIREBASE_PROJECT = os.getenv("FIREBASE_AUTH_PROJECT_ID", "mathpulse-ai-2026") @dataclass class IngestionResult: """Result of a PDF ingestion operation.""" filename: str processed: bool question_count: int grade_level: int topic: str storage_path: str timestamp: datetime def _extract_filename(storage_path: str) -> str: """Extract filename from a Firebase Storage path.""" return storage_path.split("/")[-1] def _generate_chunk_id(source_chunk_id: str, question_text: str) -> str: """Generate a unique document ID for a question.""" return hashlib.md5(f"{source_chunk_id}:{question_text}".encode()).hexdigest() def _strip_json_fences(text: str) -> str: """Strip markdown JSON fences from text.""" text = text.strip() if text.startswith("```json"): text = text[7:] if text.startswith("```"): text = text[3:] if text.endswith("```"): text = text[:-3] return text.strip() async def _generate_questions_for_chunk( chunk_text: str, chunk_id: str, topic: str, grade_level: int, deepseek_client, ) -> list[dict]: """Call DeepSeek to generate MCQs for a text chunk.""" system_prompt = ( "You are a DepEd-aligned math question generator for Filipino students. " "Given a curriculum excerpt, generate 5 multiple-choice questions. " "Return ONLY a JSON array. No markdown, no explanation." ) user_prompt = f"""Given this curriculum excerpt: {chunk_text} Generate 5 multiple-choice questions. For each question output JSON: {{ "question": "...", "choices": ["A) ...", "B) ...", "C) ...", "D) ..."], "correct_answer": "A", "explanation": "...", "topic": "{topic}", "difficulty": "easy|medium|hard", "grade_level": {grade_level}, "source_chunk_id": "{chunk_id}" }} Return a JSON array only, no extra text.""" try: response = deepseek_client.chat.completions.create( model=CHAT_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], temperature=0.7, ) raw_response = response.choices[0].message.content clean_response = _strip_json_fences(raw_response) questions = json.loads(clean_response) return questions if isinstance(questions, list) else [] except json.JSONDecodeError as e: logger.error(f"Failed to parse DeepSeek response as JSON for chunk {chunk_id}: {e}") return [] except Exception as e: logger.error(f"Error calling DeepSeek for chunk {chunk_id}: {e}") return [] def _chunk_text(text: str) -> list[str]: """Split text into chunks using RecursiveCharacterTextSplitter.""" splitter = RecursiveCharacterTextSplitter( chunk_size=500, chunk_overlap=50, length_function=len, separators=["\n\n", "\n", " ", ""], ) return splitter.split_text(text) def _extract_pdf_text(pdf_bytes: bytes) -> str: """Extract text from PDF bytes using pypdf.""" reader = pypdf.PdfReader(io.BytesIO(pdf_bytes)) text_parts = [] for page in reader.pages: text_parts.append(page.extract_text()) return "\n".join(text_parts) async def _save_questions_batch( firestore_client: Client, questions: list[dict], grade_level: int, topic: str, ) -> int: """Save questions to Firestore using batch writes. Returns count saved.""" batch = firestore_client.batch() question_count = 0 for question in questions: doc_id = question.get("id") or _generate_chunk_id( question.get("source_chunk_id", ""), question.get("question", ""), ) doc_ref = firestore_client.collection("question_bank").document( str(grade_level) ).collection(topic).document("questions").collection("questions").document(doc_id) doc_data = { "question": question.get("question", ""), "choices": question.get("choices", []), "correct_answer": question.get("correct_answer", ""), "explanation": question.get("explanation", ""), "topic": question.get("topic", topic), "difficulty": question.get("difficulty", "medium"), "grade_level": question.get("grade_level", grade_level), "source_chunk_id": question.get("source_chunk_id", ""), "random_seed": random.random(), "created_at": datetime.now(timezone.utc), } batch.set(doc_ref, doc_data) question_count += 1 if question_count % 500 == 0: await batch.commit() batch = firestore_client.batch() await batch.commit() return question_count async def _save_embeddings_batch( firestore_client: Client, chunks: list[dict], filename: str, ) -> int: """Save chunk embeddings to Firestore. Returns count saved.""" batch = firestore_client.batch() count = 0 for chunk in chunks: chunk_id = chunk["id"] doc_ref = firestore_client.collection("question_bank_embeddings").document(chunk_id) doc_data = { "chunk_id": chunk_id, "text": chunk["text"], "embedding": chunk["embedding"], "filename": filename, "created_at": datetime.now(timezone.utc), } batch.set(doc_ref, doc_data) count += 1 if count % 500 == 0: await batch.commit() batch = firestore_client.batch() await batch.commit() return count async def _save_processing_manifest( firestore_client: Client, filename: str, question_count: int, chunk_count: int, grade_level: int, topic: str, storage_path: str, ) -> None: """Save processing manifest to Firestore.""" doc_ref = firestore_client.collection("pdf_processing_status").document(filename) doc_data = { "filename": filename, "question_count": question_count, "chunk_count": chunk_count, "grade_level": grade_level, "topic": topic, "storage_path": storage_path, "processed_at": datetime.now(timezone.utc), "status": "completed", } await doc_ref.set(doc_data) async def ingest_pdf( storage_path: str, grade_level: int, topic: str, force_reingest: bool = False, ) -> IngestionResult: """ Ingest a PDF from Firebase Storage, generate questions, and store in Firestore. Args: storage_path: Path to PDF in Firebase Storage (e.g., "rag-pdfs/filename.pdf") grade_level: Grade level (11 or 12) topic: Topic identifier for the questions force_reingest: If True, reprocess even if already processed Returns: IngestionResult with processing summary """ filename = _extract_filename(storage_path) project_id = os.getenv("FIREBASE_AUTH_PROJECT_ID", DEFAULT_FIREBASE_PROJECT) firestore_client = Client(project=project_id) # Step 1: Check if already processed if not force_reingest: status_ref = firestore_client.collection("pdf_processing_status").document(filename) status_doc = await status_ref.get() if status_doc.exists: logger.info(f"PDF {filename} already processed, skipping (use force_reingest=True to override)") data = status_doc.to_dict() or {} return IngestionResult( filename=filename, processed=True, question_count=data.get("question_count", 0), grade_level=data.get("grade_level", grade_level), topic=data.get("topic", topic), storage_path=data.get("storage_path", storage_path), timestamp=data.get("timestamp", datetime.now(timezone.utc)), ) # Step 2: Download PDF from Firebase Storage try: _, bucket = _init_firebase_storage() blob = bucket.blob(storage_path) pdf_bytes = blob.download_as_bytes() except Exception as e: logger.error(f"Failed to download PDF from Firebase Storage: {e}") return IngestionResult( filename=filename, processed=False, question_count=0, grade_level=grade_level, topic=topic, storage_path=storage_path, timestamp=datetime.now(timezone.utc), ) # Step 3: Extract text from PDF try: text = _extract_pdf_text(pdf_bytes) except Exception as e: logger.error(f"Failed to extract text from PDF: {e}") return IngestionResult( filename=filename, processed=False, question_count=0, grade_level=grade_level, topic=topic, storage_path=storage_path, timestamp=datetime.now(timezone.utc), ) # Step 4: Chunk text chunks = _chunk_text(text) # Step 5: Generate embeddings embedding_model = SentenceTransformer(EMBEDDING_MODEL) chunk_ids = [] chunk_data = [] for i, chunk_text in enumerate(chunks): chunk_id = hashlib.md5(f"{filename}:{i}:{chunk_text[:100]}".encode()).hexdigest() embedding = embedding_model.encode(chunk_text).tolist() chunk_ids.append(chunk_id) chunk_data.append({ "id": chunk_id, "text": chunk_text, "embedding": embedding, }) # Step 6: Initialize DeepSeek client deepseek_client = get_deepseek_client() # Step 7: Generate questions for each chunk all_questions = [] for i, chunk_text in enumerate(chunks): chunk_id = chunk_ids[i] questions = await _generate_questions_for_chunk( chunk_text, chunk_id, topic, grade_level, deepseek_client ) for q in questions: q["id"] = _generate_chunk_id(chunk_id, q.get("question", "")) all_questions.extend(questions) # Step 8: Save questions to Firestore question_count = await _save_questions_batch( firestore_client, all_questions, grade_level, topic ) # Step 9: Save embeddings to Firestore await _save_embeddings_batch(firestore_client, chunk_data, filename) # Step 10: Save manifest to Firestore await _save_processing_manifest( firestore_client, filename, question_count, len(chunks), grade_level, topic, storage_path ) logger.info( f"Completed ingestion for {filename}: {question_count} questions, " f"{len(chunks)} chunks" ) return IngestionResult( filename=filename, processed=True, question_count=question_count, grade_level=grade_level, topic=topic, storage_path=storage_path, timestamp=datetime.now(timezone.utc), )