| """ |
| Insta-AutoApp Ingestion Pipeline |
| Preprocesses the OEM manual PDF into a FAISS vector index. |
| |
| Usage: |
| python ingest.py <path_to_pdf> |
| |
| Example: |
| python ingest.py manual/bronco_2023_manual.pdf |
| """ |
|
|
| import os |
| import sys |
| import re |
| import pickle |
| import logging |
| from pathlib import Path |
|
|
| import fitz |
| import faiss |
| import numpy as np |
| from sentence_transformers import SentenceTransformer |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
|
| from config import ( |
| FAISS_INDEX_PATH, |
| FAISS_DOCSTORE_PATH, |
| EMBEDDING_MODEL, |
| CHUNK_SIZE, |
| CHUNK_OVERLAP |
| ) |
|
|
# Emit timestamped, level-tagged records at INFO and above for the whole run.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
|
|
| |
| |
| |
|
|
| |
# Regular expressions that mark a chunk as boilerplate. They are matched
# against lower-cased chunk text by should_exclude_text().
EXCLUDE_PATTERNS = [
    # Navigation / index pages
    r"table of contents",
    r"^\s*index\s*$",
    r"alphabetical index",
    # Legal boilerplate
    r"copyright.*ford",
    r"all rights reserved",
    # Bare page markers
    r"^\s*page\s+\d+\s*$",
    r"^\s*\d+\s*$",
    # Manufacturer web addresses
    r"www\.ford\.com",
    r"owner\.ford\.com",
]
|
|
| |
# Lower-case substrings that flag a chunk as symptom-relevant. They are
# checked with plain substring containment by is_priority_content().
PRIORITY_KEYWORDS = [
    # Dashboard warnings and indicators
    "warning", "indicator", "light", "lamp",
    # Diagnostics vocabulary
    "symptom", "troubleshoot", "problem", "issue", "check engine",
    # Drivetrain and off-road modes
    "drivetrain", "4x4", "four-wheel", "trail", "goat mode", "terrain",
    # Core vehicle systems
    "brake", "steering", "overheat", "temperature", "oil pressure",
    "battery", "transmission",
    # Driver-assist and tire monitoring
    "traction control", "stability control", "abs", "tire pressure", "tpms",
]
|
|
|
|
def should_exclude_text(text: str) -> bool:
    """Return True when a chunk is low-value and should be dropped.

    A chunk is excluded when its lower-cased, stripped form is shorter than
    50 characters, or when any regex in EXCLUDE_PATTERNS matches it.

    NOTE: re.MULTILINE is not set, so the ``^...$`` anchored patterns are
    tested against the whole chunk rather than against each line in it.

    Args:
        text: Raw chunk text.

    Returns:
        True if the chunk should be filtered out, False otherwise.
    """
    normalized = text.lower().strip()

    # Very short fragments are rejected before any pattern is tried.
    if len(normalized) < 50:
        return True

    return any(
        re.search(pattern, normalized, re.IGNORECASE) is not None
        for pattern in EXCLUDE_PATTERNS
    )
|
|
|
|
def is_priority_content(text: str) -> bool:
    """Report whether a chunk mentions any symptom-relevant keyword.

    Matching is case-insensitive substring containment against
    PRIORITY_KEYWORDS.

    Args:
        text: Chunk text to inspect.

    Returns:
        True if at least one keyword occurs in the text, False otherwise.
    """
    lowered = text.lower()
    for keyword in PRIORITY_KEYWORDS:
        if keyword in lowered:
            return True
    return False
|
|
|
|
| |
| |
| |
|
|
def extract_text_from_pdf(pdf_path: str) -> list[dict]:
    """
    Extract text from PDF using PyMuPDF.

    Pages whose extracted text is empty or whitespace-only are skipped.

    Args:
        pdf_path: Path to the PDF file

    Returns:
        List of dicts with 'text' (raw page text) and 'page' (1-based page
        number) keys, in document order
    """
    logger.info(f"Opening PDF: {pdf_path}")

    pages_content = []

    # The context manager closes the document even if a page fails to parse,
    # so the file handle is not leaked on error.
    with fitz.open(pdf_path) as doc:
        for page_num, page in enumerate(doc, start=1):
            text = page.get_text("text")
            if text.strip():
                pages_content.append({"text": text, "page": page_num})

    logger.info(f"Extracted text from {len(pages_content)} pages")
    return pages_content
|
|
|
|
| |
| |
| |
|
|
def _page_for_offset(page_boundaries: list[dict], offset: int, default: int) -> int:
    """Return the page whose [start, end) span contains offset, else default."""
    for boundary in page_boundaries:
        if boundary["start"] <= offset < boundary["end"]:
            return boundary["page"]
    return default


def chunk_documents(pages_content: list[dict]) -> list[dict]:
    """
    Split extracted text into chunks using LangChain.

    All pages are concatenated (separated by a blank line) and split with a
    RecursiveCharacterTextSplitter. Low-value chunks are dropped, and each
    remaining chunk is tagged with the page on which it starts.

    Args:
        pages_content: List of page dicts from extract_text_from_pdf

    Returns:
        List of chunk dicts with 'text', 'page', and 'is_priority' keys
    """
    # Concatenate pages once with join, recording each page's character span.
    parts = []
    page_boundaries = []
    offset = 0
    for page_data in pages_content:
        segment = page_data["text"] + "\n\n"
        page_boundaries.append({
            "start": offset,
            "end": offset + len(segment),
            "page": page_data["page"],
        })
        parts.append(segment)
        offset += len(segment)
    full_text = "".join(parts)

    # CHUNK_SIZE / CHUNK_OVERLAP are scaled by 4 to obtain character counts
    # (presumably ~4 characters per token; confirm against config).
    chunk_size_chars = CHUNK_SIZE * 4
    chunk_overlap_chars = CHUNK_OVERLAP * 4

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size_chars,
        chunk_overlap=chunk_overlap_chars,
        separators=["\n\n", "\n", ". ", " ", ""],
        length_function=len,
    )

    chunks_text = splitter.split_text(full_text)
    logger.info(f"Split into {len(chunks_text)} raw chunks")

    chunks = []
    filtered_count = 0
    search_from = 0  # lower bound for where the next chunk can start
    last_page = 1    # used when a chunk cannot be located at all

    for chunk_text in chunks_text:
        # Locate every chunk, including ones filtered below, so the cursor
        # keeps pace with the document. Chunks are emitted in order, so
        # searching forward stops repeated text (running headers, recurring
        # warnings) from being attributed to the page of its first occurrence.
        probe = chunk_text[:100]
        chunk_start = full_text.find(probe, search_from)
        if chunk_start != -1:
            # The next chunk can begin no earlier than this chunk's end
            # minus the configured overlap.
            search_from = max(0, chunk_start + len(chunk_text) - chunk_overlap_chars)
        else:
            # Fall back to a whole-document search without moving the cursor.
            chunk_start = full_text.find(probe)

        if chunk_start != -1:
            last_page = _page_for_offset(page_boundaries, chunk_start, last_page)

        if should_exclude_text(chunk_text):
            filtered_count += 1
            continue

        chunks.append({
            "text": chunk_text.strip(),
            "page": last_page,
            "is_priority": is_priority_content(chunk_text),
        })

    logger.info(f"Filtered out {filtered_count} low-value chunks")
    logger.info(f"Final chunk count: {len(chunks)}")

    priority_count = sum(1 for c in chunks if c["is_priority"])
    logger.info(f"Priority chunks (symptom-relevant): {priority_count}")

    return chunks
|
|
|
|
| |
| |
| |
|
|
def create_faiss_index(chunks: list[dict]) -> tuple[faiss.Index, list[str], list[dict]]:
    """
    Create FAISS index from document chunks.

    Args:
        chunks: List of chunk dicts from chunk_documents

    Returns:
        Tuple of (faiss_index, documents_list, metadata_list); the i-th
        document and metadata entry correspond to the i-th indexed vector

    Raises:
        ValueError: If chunks is empty
    """
    # Fail fast with a clear message: an empty input would otherwise load the
    # model and then crash with an opaque IndexError on embeddings.shape[1].
    if not chunks:
        raise ValueError("Cannot build a FAISS index from an empty chunk list")

    logger.info(f"Loading embedding model: {EMBEDDING_MODEL}")
    model = SentenceTransformer(EMBEDDING_MODEL)

    texts = [chunk["text"] for chunk in chunks]

    logger.info(f"Generating embeddings for {len(texts)} chunks...")
    embeddings = model.encode(texts, show_progress_bar=True, convert_to_numpy=True)
    # FAISS consumes C-contiguous float32 matrices.
    embeddings = np.ascontiguousarray(embeddings, dtype="float32")

    dimension = embeddings.shape[1]
    logger.info(f"Creating FAISS index (dimension: {dimension})")
    index = faiss.IndexFlatL2(dimension)
    index.add(embeddings)

    metadata = [
        {"page": chunk["page"], "is_priority": chunk["is_priority"]}
        for chunk in chunks
    ]

    return index, texts, metadata
|
|
|
|
def save_index(index: faiss.Index, documents: list[str], metadata: list[dict]) -> None:
    """Save FAISS index and document store to disk.

    The index is written to FAISS_INDEX_PATH; the documents and metadata are
    pickled together as a dict to FAISS_DOCSTORE_PATH.

    Args:
        index: FAISS index to persist
        documents: Chunk texts, in the same order as the indexed vectors
        metadata: Per-chunk metadata dicts, in the same order as documents
    """
    # Create the parent directory of BOTH outputs. Path.parent is "." for a
    # bare filename, which avoids the os.makedirs("") failure that
    # os.path.dirname() causes for paths with no directory component.
    for target in (FAISS_INDEX_PATH, FAISS_DOCSTORE_PATH):
        Path(target).parent.mkdir(parents=True, exist_ok=True)

    logger.info(f"Saving FAISS index to {FAISS_INDEX_PATH}")
    faiss.write_index(index, str(FAISS_INDEX_PATH))

    logger.info(f"Saving document store to {FAISS_DOCSTORE_PATH}")
    docstore = {
        "documents": documents,
        "metadata": metadata,
    }
    # NOTE: pickle is only safe to load back from a trusted location; whoever
    # reads this docstore must not unpickle files from untrusted sources.
    with open(FAISS_DOCSTORE_PATH, "wb") as f:
        pickle.dump(docstore, f)

    logger.info("Index saved successfully!")
|
|
|
|
| |
| |
| |
|
|
def main():
    """Run the ingestion pipeline for the PDF named on the command line.

    Steps: extract page text, chunk and filter it, embed the chunks into a
    FAISS index, then save the index and document store. Exits with status 1
    on a missing argument, a missing or non-file path, or a PDF that yields
    no indexable text.
    """
    if len(sys.argv) < 2:
        print("Usage: python ingest.py <path_to_pdf>")
        print("Example: python ingest.py manual/bronco_2023_manual.pdf")
        sys.exit(1)

    pdf_path = sys.argv[1]

    # isfile (not exists) so that a directory path is rejected up front
    # instead of failing later when the PDF is opened.
    if not os.path.isfile(pdf_path):
        logger.error(f"PDF file not found: {pdf_path}")
        sys.exit(1)

    logger.info("=" * 60)
    logger.info("Insta-AutoApp Ingestion Pipeline")
    logger.info("=" * 60)

    pages_content = extract_text_from_pdf(pdf_path)

    chunks = chunk_documents(pages_content)

    # A scanned or image-only PDF produces no chunks; stop with a clear
    # message rather than crashing while building the index.
    if not chunks:
        logger.error(f"No indexable text could be extracted from: {pdf_path}")
        sys.exit(1)

    index, documents, metadata = create_faiss_index(chunks)

    save_index(index, documents, metadata)

    logger.info("=" * 60)
    logger.info("Ingestion complete!")
    logger.info(f"Total chunks indexed: {len(documents)}")
    logger.info(f"Index file: {FAISS_INDEX_PATH}")
    logger.info(f"Document store: {FAISS_DOCSTORE_PATH}")
    logger.info("=" * 60)


if __name__ == "__main__":
    main()
|
|