NeilDriscoll's picture
Upload 13 files
3c3e122 verified
"""
Insta-AutoApp Ingestion Pipeline
Preprocesses the OEM manual PDF into a FAISS vector index.
Usage:
python ingest.py <path_to_pdf>
Example:
python ingest.py manual/bronco_2023_manual.pdf
"""
import os
import sys
import re
import pickle
import logging
from pathlib import Path
import fitz # PyMuPDF
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from langchain_text_splitters import RecursiveCharacterTextSplitter
from config import (
FAISS_INDEX_PATH,
FAISS_DOCSTORE_PATH,
EMBEDDING_MODEL,
CHUNK_SIZE,
CHUNK_OVERLAP
)
# Module logger; the root logger is configured to emit INFO-and-above records
# formatted as "<timestamp> - <LEVEL> - <message>".
logger = logging.getLogger(__name__)
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# =============================================================================
# Content Filtering
# =============================================================================
# Regexes marking a chunk as low-value (navigation, legal boilerplate, URLs).
# should_exclude_text() applies each one with re.search to the whole
# lower-cased, stripped chunk (no re.MULTILINE), so a match anywhere in a chunk
# drops the ENTIRE chunk -- including any useful text that shares the chunk.
# NOTE(review): because ^/$ anchor the whole chunk (not individual lines) and
# chunks shorter than 50 characters are rejected before these patterns run,
# the anchored patterns below can only match a 50+ character chunk consisting
# of nothing but that token, i.e. practically never. Bare page numbers and
# "index" headings are in effect caught by the length check instead.
EXCLUDE_PATTERNS = [
    r"table of contents",
    r"^\s*index\s*$",
    r"alphabetical index",
    r"copyright.*ford",  # "." does not match newlines: same-line match only
    r"all rights reserved",
    r"^\s*page\s+\d+\s*$",
    r"^\s*\d+\s*$", # Just page numbers
    r"www\.ford\.com",
    r"owner\.ford\.com",
]
# Keywords that flag a chunk as symptom-relevant ("priority") content.
# is_priority_content() tests plain substring containment against the
# lower-cased chunk text, so every entry must itself be lower-case, and short
# entries also match inside longer words (e.g. "abs" in "absorb", "light" in
# "flight", "lamp" in "clamp").
# In this script the resulting flag is only logged and stored in each chunk's
# metadata; how it is used at query time is not visible here.
PRIORITY_KEYWORDS = [
    "warning",
    "indicator",
    "light",
    "lamp",
    "symptom",
    "troubleshoot",
    "problem",
    "issue",
    "check engine",
    "drivetrain",
    "4x4",
    "four-wheel",
    "trail",
    "goat mode",
    "terrain",
    "brake",
    "steering",
    "overheat",
    "temperature",
    "oil pressure",
    "battery",
    "transmission",
    "traction control",
    "stability control",
    "abs",
    "tire pressure",
    "tpms",
]
def should_exclude_text(text: str) -> bool:
    """Return True when a text chunk is low-value and should be dropped.

    The chunk is lower-cased and trimmed first. It is rejected when the
    trimmed text is shorter than 50 characters, or when any regex in
    EXCLUDE_PATTERNS matches anywhere in it (re.search, case-insensitive,
    without re.MULTILINE -- so ``^``/``$`` anchor the whole chunk).
    """
    normalized = text.lower().strip()

    # Tiny fragments (stray headers, page numbers) carry no useful content.
    if len(normalized) < 50:
        return True

    return any(
        re.search(pattern, normalized, re.IGNORECASE)
        for pattern in EXCLUDE_PATTERNS
    )
def is_priority_content(text: str) -> bool:
    """Return True if the text mentions any symptom-related keyword.

    Matching is case-insensitive plain substring containment of each entry of
    PRIORITY_KEYWORDS in the lower-cased text; the first hit short-circuits.
    """
    haystack = text.lower()
    for keyword in PRIORITY_KEYWORDS:
        if keyword in haystack:
            return True
    return False
# =============================================================================
# PDF Extraction
# =============================================================================
def extract_text_from_pdf(pdf_path: str) -> list[dict]:
    """
    Extract text from PDF using PyMuPDF.

    Pages whose extracted text is empty or whitespace-only are skipped, so the
    result may contain fewer entries than the document has pages.

    Args:
        pdf_path: Path to the PDF file
    Returns:
        List of dicts with 'text' and 'page' keys, where 'page' is the
        1-indexed page number in the PDF
    """
    logger.info(f"Opening PDF: {pdf_path}")
    pages_content = []
    # The context manager guarantees the document is closed even if text
    # extraction raises part-way through.
    with fitz.open(pdf_path) as doc:
        for page_num, page in enumerate(doc, start=1):
            text = page.get_text("text")
            if text.strip():
                pages_content.append({"text": text, "page": page_num})
    logger.info(f"Extracted text from {len(pages_content)} pages")
    return pages_content
# =============================================================================
# Text Chunking
# =============================================================================
def chunk_documents(pages_content: list[dict]) -> list[dict]:
    """
    Split extracted text into chunks using LangChain.

    All pages are concatenated (each followed by a blank line) and split with
    a RecursiveCharacterTextSplitter sized from CHUNK_SIZE / CHUNK_OVERLAP
    (tokens approximated as 4 characters each). Chunks rejected by
    should_exclude_text are dropped; every surviving chunk is attributed to
    the page on which it starts.

    Args:
        pages_content: List of page dicts from extract_text_from_pdf
            (each with 'text' and 'page' keys)
    Returns:
        List of chunk dicts with 'text', 'page', and 'is_priority' keys
    """
    import bisect

    # Combine all text, remembering the offset at which each page starts so
    # chunks can be mapped back to their source page.
    page_texts = [page_data["text"] + "\n\n" for page_data in pages_content]
    page_numbers = [page_data["page"] for page_data in pages_content]
    page_starts = []
    offset = 0
    for page_text in page_texts:
        page_starts.append(offset)
        offset += len(page_text)
    full_text = "".join(page_texts)

    # Create text splitter
    # Approximate tokens to characters (1 token ≈ 4 characters)
    chunk_size_chars = CHUNK_SIZE * 4
    chunk_overlap_chars = CHUNK_OVERLAP * 4
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size_chars,
        chunk_overlap=chunk_overlap_chars,
        separators=["\n\n", "\n", ". ", " ", ""],
        length_function=len,
    )

    # Split text
    chunks_text = splitter.split_text(full_text)
    logger.info(f"Split into {len(chunks_text)} raw chunks")

    # Map chunks back to pages and filter
    chunks = []
    filtered_count = 0
    # The splitter emits chunks in document order, so each chunk is searched
    # for starting at the previous chunk's position. Searching from offset 0
    # every time would attribute repeated text (recurring warnings, running
    # headers) to its first occurrence, and would rescan the whole manual for
    # every chunk.
    search_from = 0
    for chunk_text in chunks_text:
        probe = chunk_text[:100]  # locate the chunk by its first 100 chars
        chunk_start = full_text.find(probe, search_from)
        if chunk_start == -1:
            # Defensive fallback: search the whole document.
            chunk_start = full_text.find(probe)
        if chunk_start > search_from:
            search_from = chunk_start

        # Skip excluded content (after locating it, so filtered chunks still
        # advance the search cursor).
        if should_exclude_text(chunk_text):
            filtered_count += 1
            continue

        # Page whose [start, next page start) range contains chunk_start;
        # fall back to page 1 if the chunk could not be located.
        page_num = 1
        if chunk_start >= 0:
            page_idx = bisect.bisect_right(page_starts, chunk_start) - 1
            if page_idx >= 0:
                page_num = page_numbers[page_idx]

        chunks.append({
            "text": chunk_text.strip(),
            "page": page_num,
            "is_priority": is_priority_content(chunk_text),
        })

    logger.info(f"Filtered out {filtered_count} low-value chunks")
    logger.info(f"Final chunk count: {len(chunks)}")

    # Log priority content stats
    priority_count = sum(1 for c in chunks if c["is_priority"])
    logger.info(f"Priority chunks (symptom-relevant): {priority_count}")
    return chunks
# =============================================================================
# Embedding and Index Creation
# =============================================================================
def create_faiss_index(chunks: list[dict]) -> tuple[faiss.Index, list[str], list[dict]]:
    """
    Create FAISS index from document chunks.

    Each chunk's text is embedded with the SentenceTransformer named by
    EMBEDDING_MODEL and added, in order, to a flat L2 index (IndexFlatL2).
    Vector id ``i`` therefore corresponds to ``documents[i]`` and
    ``metadata[i]``.

    Args:
        chunks: List of chunk dicts from chunk_documents
    Returns:
        Tuple of (faiss_index, documents_list, metadata_list)
    Raises:
        ValueError: if ``chunks`` is empty (there is nothing to embed).
    """
    # Fail fast, before the (slow) model load, with a clear message rather
    # than an obscure shape error further down.
    if not chunks:
        raise ValueError("Cannot create a FAISS index from an empty chunk list")

    # Load embedding model
    logger.info(f"Loading embedding model: {EMBEDDING_MODEL}")
    model = SentenceTransformer(EMBEDDING_MODEL)

    # Extract texts
    texts = [chunk["text"] for chunk in chunks]

    # Generate embeddings
    logger.info(f"Generating embeddings for {len(texts)} chunks...")
    embeddings = model.encode(texts, show_progress_bar=True, convert_to_numpy=True)
    # FAISS expects a C-contiguous float32 matrix of shape (n, dimension).
    embeddings = np.ascontiguousarray(embeddings, dtype="float32")

    # Create FAISS index
    dimension = embeddings.shape[1]
    logger.info(f"Creating FAISS index (dimension: {dimension})")
    index = faiss.IndexFlatL2(dimension)
    index.add(embeddings)

    # Prepare metadata (parallel to texts / index ids)
    metadata = [{"page": chunk["page"], "is_priority": chunk["is_priority"]} for chunk in chunks]
    return index, texts, metadata
def save_index(index: faiss.Index, documents: list[str], metadata: list[dict]):
    """
    Save FAISS index and document store to disk.

    The index is written to FAISS_INDEX_PATH via ``faiss.write_index``. The
    documents and metadata are pickled together as
    ``{"documents": ..., "metadata": ...}`` to FAISS_DOCSTORE_PATH. The parent
    directory of each output path is created if it does not exist.
    """
    # Ensure the parent directory of *each* output exists. Path.parent is "."
    # for a bare filename, which mkdir(exist_ok=True) accepts, whereas
    # os.makedirs("") would raise.
    for output_path in (FAISS_INDEX_PATH, FAISS_DOCSTORE_PATH):
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Save FAISS index (str() in case the config supplies a Path object;
    # the faiss binding takes a plain string filename).
    logger.info(f"Saving FAISS index to {FAISS_INDEX_PATH}")
    faiss.write_index(index, str(FAISS_INDEX_PATH))

    # Save document store.
    # NOTE: this is a pickle file; whatever loads it must trust its location.
    logger.info(f"Saving document store to {FAISS_DOCSTORE_PATH}")
    docstore = {
        "documents": documents,
        "metadata": metadata,
    }
    with open(FAISS_DOCSTORE_PATH, "wb") as f:
        pickle.dump(docstore, f)
    logger.info("Index saved successfully!")
# =============================================================================
# Main
# =============================================================================
def main():
    """
    Command-line entry point: ingest the PDF given as the first argument.

    Runs extraction -> chunking -> embedding/indexing -> saving. Exits with
    status 1 on a usage error, when the input is not an existing file, or
    when the PDF yields nothing to index.
    """
    if len(sys.argv) < 2:
        print("Usage: python ingest.py <path_to_pdf>")
        print("Example: python ingest.py manual/bronco_2023_manual.pdf")
        sys.exit(1)

    pdf_path = sys.argv[1]
    # isfile (not exists) so a directory argument is rejected here with a
    # clear message rather than failing inside the PDF library.
    if not os.path.isfile(pdf_path):
        logger.error(f"PDF file not found: {pdf_path}")
        sys.exit(1)

    logger.info("=" * 60)
    logger.info("Insta-AutoApp Ingestion Pipeline")
    logger.info("=" * 60)

    # Step 1: Extract text from PDF
    pages_content = extract_text_from_pdf(pdf_path)
    if not pages_content:
        logger.error(f"No extractable text found in PDF: {pdf_path}")
        sys.exit(1)

    # Step 2: Chunk documents
    chunks = chunk_documents(pages_content)
    if not chunks:
        # Every chunk was filtered out; an empty index would be useless.
        logger.error("No chunks left after filtering; nothing to index")
        sys.exit(1)

    # Step 3: Create FAISS index
    index, documents, metadata = create_faiss_index(chunks)

    # Step 4: Save to disk
    save_index(index, documents, metadata)

    logger.info("=" * 60)
    logger.info("Ingestion complete!")
    logger.info(f"Total chunks indexed: {len(documents)}")
    logger.info(f"Index file: {FAISS_INDEX_PATH}")
    logger.info(f"Document store: {FAISS_DOCSTORE_PATH}")
    logger.info("=" * 60)


if __name__ == "__main__":
    main()