Spaces:
Running
Running
| """Ingestion Manager — orchestrates the full source processing pipeline.""" | |
| import logging | |
| from ingestion_engine import pdf_extractor, text_extractor, url_scrapper, transcripter | |
| from ingestion_engine.chunker import chunk_text | |
| from ingestion_engine.embedding_generator import generate | |
| from persistence.vector_store import VectorStore | |
| logger = logging.getLogger(__name__) | |
# Dispatch table: source.file_type -> extractor(file_path, source_url) -> raw text.
# Every entry takes both arguments so process_source can call them uniformly;
# file-based extractors ignore the URL and URL-based extractors ignore the path.
EXTRACTORS = {
    "pdf": lambda fp, _url: pdf_extractor.extract(fp),
    "txt": lambda fp, _url: text_extractor.extract(fp),
    "pptx": lambda fp, _url: _extract_pptx(fp),
    "url": lambda _fp, url: url_scrapper.extract(url),
    "youtube": lambda _fp, url: transcripter.extract(url),
}
def _extract_pptx(file_path: str) -> str:
    """Extract all paragraph text from a PPTX file.

    Args:
        file_path: Path to the .pptx file on disk.

    Returns:
        Non-empty, stripped paragraph texts from every shape on every
        slide, joined by blank lines.
    """
    # Lazy import keeps python-pptx an optional dependency: it is only
    # needed when a PPTX source is actually ingested.
    from pptx import Presentation

    prs = Presentation(file_path)
    paragraphs = (
        paragraph.text.strip()
        for slide in prs.slides
        for shape in slide.shapes
        if shape.has_text_frame
        for paragraph in shape.text_frame.paragraphs
    )
    return "\n\n".join(text for text in paragraphs if text)
class IngestionManager:
    """Orchestrates: extract -> chunk -> embed -> store in Pinecone."""

    def __init__(self):
        # Single vector-store client reused across all process_source calls.
        self.vector_store = VectorStore()

    def process_source(self, source, file_path: str | None, notebook_id: str) -> tuple[int, str | None]:
        """
        Run the full ingestion pipeline for a single source.

        Args:
            source: state.Source object
            file_path: local file path (None for URL/YouTube sources)
            notebook_id: used as Pinecone namespace

        Returns:
            (chunk_count, error_message) — error_message is None on success
        """
        try:
            # Step 1: Extract text
            extractor = EXTRACTORS.get(source.file_type)
            if not extractor:
                return 0, f"Unsupported file type: {source.file_type}"
            raw_text = extractor(file_path, source.source_url)
            if not raw_text or not raw_text.strip():
                return 0, "No text could be extracted from this source."
            logger.info("Extracted %d characters from %s", len(raw_text), source.filename)

            # Step 2: Chunk
            chunks = chunk_text(raw_text, chunk_size=500, chunk_overlap=50)
            if not chunks:
                return 0, "Text was extracted but produced no usable chunks."
            logger.info("Created %d chunks from %s", len(chunks), source.filename)

            # Step 3: Embed
            chunk_texts = [c["text"] for c in chunks]
            vectors = generate(chunk_texts)
            logger.info("Generated %d embeddings for %s", len(vectors), source.filename)
            # Guard: zip() below would silently drop trailing chunks if the
            # embedder returned fewer vectors than requested, and we would
            # then report full success on partially indexed data.
            if len(vectors) != len(chunks):
                return 0, f"Embedding count mismatch: got {len(vectors)} vectors for {len(chunks)} chunks."

            # Step 4: Prepare records and upsert to Pinecone
            records = []
            for chunk, vector in zip(chunks, vectors):
                records.append({
                    "id": f"{source.id}_{chunk['chunk_index']}",
                    "values": vector,
                    "metadata": {
                        "source_id": source.id,
                        "source_filename": source.filename,
                        "chunk_index": chunk["chunk_index"],
                        # Truncated copy of the chunk text kept in metadata.
                        "text": chunk["text"][:1000],
                    },
                })
            self.vector_store.upsert(records, namespace=notebook_id)
            logger.info("Stored %d vectors for %s in namespace %s", len(records), source.filename, notebook_id)
            return len(chunks), None
        except Exception as e:
            # Top-level pipeline boundary: log with traceback and convert the
            # failure into an error message for the caller instead of raising.
            logger.exception("Ingestion failed for %s", source.filename)
            return 0, f"Ingestion error: {e}"