"""Background ingest worker invoked from FastAPI ``BackgroundTasks``.

For each temp file: load → chunk → embed → add to Chroma, then update job
progress in SQLite. Each temp file is deleted as soon as it has been handled,
and a final ``finally`` sweep removes any files the job never reached (e.g.
because it aborted on a job-store error or was cancelled).
"""

import asyncio
import logging
from pathlib import Path

from rag.chunker import chunk_documents
from rag.embedder import create_embedding_function
from rag.loader import load_documents
from rag.vector_store import add_documents, get_vector_store
from storage.job_store import (
    complete_ingest_job,
    fail_ingest_job,
    mark_job_processing,
    update_job_progress,
)

logger = logging.getLogger(__name__)


def _describe_error(exc: BaseException) -> str:
    """Return a non-empty description of *exc* for job messages.

    ``str(exc)`` is empty for exceptions raised without a message (e.g. a bare
    ``KeyError()``), which would otherwise produce blank error text.
    """
    return str(exc) or type(exc).__name__


def _remove_temp_file(temp_path: str) -> None:
    """Delete *temp_path* if it still exists; log instead of raising on failure.

    Cleanup is best-effort: a file that cannot be removed must not turn an
    otherwise successful ingest into a failed job.
    """
    try:
        Path(temp_path).unlink(missing_ok=True)
    except OSError:
        logger.warning("Could not delete temp file %s", temp_path, exc_info=True)


def _ingest_one_file_sync(
    temp_path: str, collection_name: str, chroma_persist_directory: str
) -> tuple[list[str], int]:
    """Load, chunk and add one file to the Chroma collection (blocking).

    Intended to run in a worker thread (``asyncio.to_thread``) so the event
    loop is not blocked.

    Returns:
        ``(chunk_vector_ids, chunk_count)``: the IDs returned by
        ``add_documents`` and the number of chunks produced for the file.

    Raises:
        ValueError: if the file yields no chunks. Errors raised by the loader,
            chunker, embedder or vector store propagate unchanged.
    """
    documents = load_documents(temp_path)
    chunks = chunk_documents(documents)
    if not chunks:
        raise ValueError("No content to ingest.")
    embedding_function = create_embedding_function()
    vector_store = get_vector_store(
        persist_directory=chroma_persist_directory,
        collection_name=collection_name,
        embedding_function=embedding_function,
    )
    document_ids = add_documents(vector_store, chunks)
    return document_ids, len(chunks)


async def run_ingest_job(
    job_id: str,
    files: list[tuple[str, str]],
    collection_name: str,
    jobs_db_path: str,
    chroma_persist_directory: str,
) -> None:
    """Process one or more temp files for a single ingest job.

    Args:
        job_id: Identifier of the job whose status is updated in the job store.
        files: ``(temp_path, display_name)`` pairs; ``display_name`` is used
            only in progress and error messages.
        collection_name: Chroma collection the chunks are added to.
        jobs_db_path: Path to the SQLite job-store database.
        chroma_persist_directory: Directory passed to ``get_vector_store``.

    Files are processed one at a time. A failure on one file is recorded and
    the remaining files are still attempted. The job is completed if at least
    one file succeeded, and failed otherwise (or if a job-store call raises).
    Every temp file in ``files`` is removed before this coroutine returns.
    """
    all_doc_ids: list[str] = []
    errors: list[str] = []
    processed = 0
    failed = 0

    if not files:
        await fail_ingest_job(jobs_db_path, job_id, message="No files to ingest.")
        return

    try:
        await mark_job_processing(jobs_db_path, job_id)

        for temp_path, display_name in files:
            try:
                doc_ids, num_chunks = await asyncio.to_thread(
                    _ingest_one_file_sync,
                    temp_path,
                    collection_name,
                    chroma_persist_directory,
                )
            except Exception as exc:
                # Only the ingest call is guarded here. A job-store error must
                # not be reported as a per-file failure, and a file must never
                # be counted as both processed and failed.
                reason = _describe_error(exc)
                failed += 1
                errors.append(f"{display_name}: {reason}")
                logger.warning(
                    "Ingest job %s: failed on %s", job_id, display_name, exc_info=True
                )
                await update_job_progress(
                    jobs_db_path,
                    job_id,
                    processed_files=processed,
                    failed_files=failed,
                    errors=errors,
                    message=f"Failed on {display_name}: {reason}",
                )
            else:
                all_doc_ids.extend(doc_ids)
                processed += 1
                await update_job_progress(
                    jobs_db_path,
                    job_id,
                    processed_files=processed,
                    failed_files=failed,
                    errors=errors,
                    message=f"Ingested {display_name} ({num_chunks} chunks).",
                )
            finally:
                # Free disk space as soon as this file has been handled.
                _remove_temp_file(temp_path)

        if processed == 0:
            await fail_ingest_job(
                jobs_db_path,
                job_id,
                message="All files failed ingestion.",
                errors=errors,
            )
            return

        chunk_note = f"{len(all_doc_ids)} chunk vector(s) across {processed} file(s)."
        if failed:
            # Keep partial failures visible in the final status message.
            chunk_note += f" {failed} file(s) failed."
        await complete_ingest_job(
            jobs_db_path,
            job_id,
            document_ids=all_doc_ids,
            message=f"Ingestion completed. {chunk_note}",
        )
    except Exception as exc:
        logger.exception("Ingest job %s aborted", job_id)
        reason = _describe_error(exc)
        await fail_ingest_job(
            jobs_db_path, job_id, message=reason, errors=errors + [reason]
        )
    finally:
        # Remove any temp files the loop never reached: a job-store call raised,
        # or the task was cancelled (CancelledError is not an ``Exception``, so
        # the handler above does not run). ``missing_ok`` makes already-deleted
        # files a no-op.
        for temp_path, _ in files:
            _remove_temp_file(temp_path)