""" Insta-AutoApp Ingestion Pipeline Preprocesses the OEM manual PDF into a FAISS vector index. Usage: python ingest.py Example: python ingest.py manual/bronco_2023_manual.pdf """ import os import sys import re import pickle import logging from pathlib import Path import fitz # PyMuPDF import faiss import numpy as np from sentence_transformers import SentenceTransformer from langchain_text_splitters import RecursiveCharacterTextSplitter from config import ( FAISS_INDEX_PATH, FAISS_DOCSTORE_PATH, EMBEDDING_MODEL, CHUNK_SIZE, CHUNK_OVERLAP ) logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # ============================================================================= # Content Filtering # ============================================================================= # Pages/sections to exclude (low-value content) EXCLUDE_PATTERNS = [ r"table of contents", r"^\s*index\s*$", r"alphabetical index", r"copyright.*ford", r"all rights reserved", r"^\s*page\s+\d+\s*$", r"^\s*\d+\s*$", # Just page numbers r"www\.ford\.com", r"owner\.ford\.com", ] # Sections to prioritize (high-value content) PRIORITY_KEYWORDS = [ "warning", "indicator", "light", "lamp", "symptom", "troubleshoot", "problem", "issue", "check engine", "drivetrain", "4x4", "four-wheel", "trail", "goat mode", "terrain", "brake", "steering", "overheat", "temperature", "oil pressure", "battery", "transmission", "traction control", "stability control", "abs", "tire pressure", "tpms", ] def should_exclude_text(text: str) -> bool: """Check if text chunk should be excluded (low-value content).""" text_lower = text.lower().strip() # Exclude very short chunks if len(text_lower) < 50: return True # Check exclusion patterns for pattern in EXCLUDE_PATTERNS: if re.search(pattern, text_lower, re.IGNORECASE): return True return False def is_priority_content(text: str) -> bool: """Check if text contains priority keywords (symptom-relevant content).""" text_lower = text.lower() return any(keyword in text_lower for keyword in PRIORITY_KEYWORDS) # ============================================================================= # PDF Extraction # ============================================================================= def extract_text_from_pdf(pdf_path: str) -> list[dict]: """ Extract text from PDF using PyMuPDF. Args: pdf_path: Path to the PDF file Returns: List of dicts with 'text' and 'page' keys """ logger.info(f"Opening PDF: {pdf_path}") doc = fitz.open(pdf_path) pages_content = [] for page_num in range(len(doc)): page = doc[page_num] text = page.get_text("text") if text.strip(): pages_content.append({ "text": text, "page": page_num + 1 # 1-indexed }) doc.close() logger.info(f"Extracted text from {len(pages_content)} pages") return pages_content # ============================================================================= # Text Chunking # ============================================================================= def chunk_documents(pages_content: list[dict]) -> list[dict]: """ Split extracted text into chunks using LangChain. Args: pages_content: List of page dicts from extract_text_from_pdf Returns: List of chunk dicts with 'text', 'page', and 'is_priority' keys """ # Combine all text with page markers full_text = "" page_boundaries = [] for page_data in pages_content: start_idx = len(full_text) full_text += page_data["text"] + "\n\n" page_boundaries.append({ "start": start_idx, "end": len(full_text), "page": page_data["page"] }) # Create text splitter # Approximate tokens to characters (1 token ≈ 4 characters) chunk_size_chars = CHUNK_SIZE * 4 chunk_overlap_chars = CHUNK_OVERLAP * 4 splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size_chars, chunk_overlap=chunk_overlap_chars, separators=["\n\n", "\n", ". ", " ", ""], length_function=len ) # Split text chunks_text = splitter.split_text(full_text) logger.info(f"Split into {len(chunks_text)} raw chunks") # Map chunks back to pages and filter chunks = [] filtered_count = 0 for chunk_text in chunks_text: # Skip excluded content if should_exclude_text(chunk_text): filtered_count += 1 continue # Find which page this chunk came from (approximate) chunk_start = full_text.find(chunk_text[:100]) # Find by first 100 chars page_num = 1 for boundary in page_boundaries: if boundary["start"] <= chunk_start < boundary["end"]: page_num = boundary["page"] break chunks.append({ "text": chunk_text.strip(), "page": page_num, "is_priority": is_priority_content(chunk_text) }) logger.info(f"Filtered out {filtered_count} low-value chunks") logger.info(f"Final chunk count: {len(chunks)}") # Log priority content stats priority_count = sum(1 for c in chunks if c["is_priority"]) logger.info(f"Priority chunks (symptom-relevant): {priority_count}") return chunks # ============================================================================= # Embedding and Index Creation # ============================================================================= def create_faiss_index(chunks: list[dict]) -> tuple[faiss.Index, list[str], list[dict]]: """ Create FAISS index from document chunks. Args: chunks: List of chunk dicts from chunk_documents Returns: Tuple of (faiss_index, documents_list, metadata_list) """ # Load embedding model logger.info(f"Loading embedding model: {EMBEDDING_MODEL}") model = SentenceTransformer(EMBEDDING_MODEL) # Extract texts texts = [chunk["text"] for chunk in chunks] # Generate embeddings logger.info(f"Generating embeddings for {len(texts)} chunks...") embeddings = model.encode(texts, show_progress_bar=True, convert_to_numpy=True) embeddings = embeddings.astype("float32") # Create FAISS index dimension = embeddings.shape[1] logger.info(f"Creating FAISS index (dimension: {dimension})") index = faiss.IndexFlatL2(dimension) index.add(embeddings) # Prepare metadata metadata = [{"page": chunk["page"], "is_priority": chunk["is_priority"]} for chunk in chunks] return index, texts, metadata def save_index(index: faiss.Index, documents: list[str], metadata: list[dict]): """Save FAISS index and document store to disk.""" # Ensure data directory exists os.makedirs(os.path.dirname(FAISS_INDEX_PATH), exist_ok=True) # Save FAISS index logger.info(f"Saving FAISS index to {FAISS_INDEX_PATH}") faiss.write_index(index, FAISS_INDEX_PATH) # Save document store logger.info(f"Saving document store to {FAISS_DOCSTORE_PATH}") docstore = { "documents": documents, "metadata": metadata } with open(FAISS_DOCSTORE_PATH, "wb") as f: pickle.dump(docstore, f) logger.info("Index saved successfully!") # ============================================================================= # Main # ============================================================================= def main(): if len(sys.argv) < 2: print("Usage: python ingest.py ") print("Example: python ingest.py manual/bronco_2023_manual.pdf") sys.exit(1) pdf_path = sys.argv[1] if not os.path.exists(pdf_path): logger.error(f"PDF file not found: {pdf_path}") sys.exit(1) logger.info("=" * 60) logger.info("Insta-AutoApp Ingestion Pipeline") logger.info("=" * 60) # Step 1: Extract text from PDF pages_content = extract_text_from_pdf(pdf_path) # Step 2: Chunk documents chunks = chunk_documents(pages_content) # Step 3: Create FAISS index index, documents, metadata = create_faiss_index(chunks) # Step 4: Save to disk save_index(index, documents, metadata) logger.info("=" * 60) logger.info("Ingestion complete!") logger.info(f"Total chunks indexed: {len(documents)}") logger.info(f"Index file: {FAISS_INDEX_PATH}") logger.info(f"Document store: {FAISS_DOCSTORE_PATH}") logger.info("=" * 60) if __name__ == "__main__": main()