# NotebookLM — ingestion_engine/ingestion_manager.py
# Branch: internomega-terrablue — "ingestion changes" (commit 9f911b3)
"""Ingestion Manager — orchestrates the full source processing pipeline."""
import logging
from ingestion_engine import pdf_extractor, text_extractor, url_scrapper, transcripter
from ingestion_engine.chunker import chunk_text
from ingestion_engine.embedding_generator import generate
from persistence.vector_store import VectorStore
logger = logging.getLogger(__name__)
# Dispatch table: source.file_type -> callable(file_path, source_url) -> raw text.
# Every entry accepts both arguments so the caller can invoke them uniformly;
# each lambda simply ignores the argument it does not need.
EXTRACTORS = {
    "pdf": lambda path, _unused: pdf_extractor.extract(path),
    "txt": lambda path, _unused: text_extractor.extract(path),
    "pptx": lambda path, _unused: _extract_pptx(path),
    "url": lambda _unused, link: url_scrapper.extract(link),
    "youtube": lambda _unused, link: transcripter.extract(link),
}
def _extract_pptx(file_path: str) -> str:
    """Extract text from a PPTX file (lazy import to keep python-pptx optional)."""
    from pptx import Presentation

    # Collect every non-empty paragraph from every text-bearing shape,
    # flattening slides -> shapes -> paragraphs in document order.
    snippets = [
        stripped
        for slide in Presentation(file_path).slides
        for shape in slide.shapes
        if shape.has_text_frame
        for para in shape.text_frame.paragraphs
        if (stripped := para.text.strip())
    ]
    return "\n\n".join(snippets)
class IngestionManager:
    """Orchestrates the ingestion pipeline: extract -> chunk -> embed -> store.

    Vectors are upserted into Pinecone via ``VectorStore``, using the
    notebook id as the namespace so each notebook's data is isolated.
    """

    # Chunking parameters (characters); overlap preserves context across boundaries.
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50
    # Pinecone metadata has a size budget, so stored chunk text is truncated.
    METADATA_TEXT_LIMIT = 1000

    def __init__(self):
        self.vector_store = VectorStore()

    def process_source(self, source, file_path: str | None, notebook_id: str) -> tuple[int, str | None]:
        """
        Run the full ingestion pipeline for a single source.

        Args:
            source: state.Source object (provides id, filename, file_type, source_url)
            file_path: local file path (None for URL/YouTube sources)
            notebook_id: used as Pinecone namespace

        Returns:
            (chunk_count, error_message) — error_message is None on success
        """
        try:
            # Step 1: Extract text via the dispatch table.
            extractor = EXTRACTORS.get(source.file_type)
            if not extractor:
                return 0, f"Unsupported file type: {source.file_type}"
            raw_text = extractor(file_path, source.source_url)
            if not raw_text or not raw_text.strip():
                return 0, "No text could be extracted from this source."
            logger.info("Extracted %d characters from %s", len(raw_text), source.filename)

            # Step 2: Chunk
            chunks = chunk_text(raw_text, chunk_size=self.CHUNK_SIZE, chunk_overlap=self.CHUNK_OVERLAP)
            if not chunks:
                return 0, "Text was extracted but produced no usable chunks."
            logger.info("Created %d chunks from %s", len(chunks), source.filename)

            # Step 3: Embed
            vectors = generate([c["text"] for c in chunks])
            logger.info("Generated %d embeddings for %s", len(vectors), source.filename)
            # BUG FIX: the previous zip() silently dropped chunks whenever the
            # embedding service returned fewer vectors than requested, storing a
            # partial index while reporting full success. Fail loudly instead.
            if len(vectors) != len(chunks):
                return 0, (
                    f"Embedding count mismatch: {len(chunks)} chunks "
                    f"but {len(vectors)} vectors."
                )

            # Step 4: Prepare records and upsert to Pinecone
            records = [
                {
                    "id": f"{source.id}_{chunk['chunk_index']}",
                    "values": vector,
                    "metadata": {
                        "source_id": source.id,
                        "source_filename": source.filename,
                        "chunk_index": chunk["chunk_index"],
                        "text": chunk["text"][:self.METADATA_TEXT_LIMIT],
                    },
                }
                for chunk, vector in zip(chunks, vectors)
            ]
            self.vector_store.upsert(records, namespace=notebook_id)
            logger.info("Stored %d vectors for %s in namespace %s", len(records), source.filename, notebook_id)
            return len(chunks), None
        except Exception as e:
            # BUG FIX: logger.exception preserves the traceback; logger.error lost it,
            # making ingestion failures very hard to debug from logs.
            logger.exception("Ingestion failed for %s", source.filename)
            return 0, f"Ingestion error: {str(e)}"