"""Background ingest worker invoked from FastAPI ``BackgroundTasks``.

For each temp file: load -> chunk -> embed -> add to Chroma, then update job progress in SQLite.
Temp files are always deleted in a ``finally`` block.
"""
|
|
| import asyncio |
| from pathlib import Path |
|
|
| from rag.chunker import chunk_documents |
| from rag.embedder import create_embedding_function |
| from rag.loader import load_documents |
| from rag.vector_store import add_documents, get_vector_store |
| from storage.job_store import ( |
| complete_ingest_job, |
| fail_ingest_job, |
| mark_job_processing, |
| update_job_progress, |
| ) |
|
|
|
|
def _ingest_one_file_sync(
    temp_path: str,
    collection_name: str,
    chroma_persist_directory: str,
) -> tuple[list[str], int]:
    """Load, chunk, embed and store a single file (blocking; meant for a worker thread).

    Args:
        temp_path: Path of the temp file handed to ``load_documents``.
        collection_name: Chroma collection passed to ``get_vector_store``.
        chroma_persist_directory: Passed to ``get_vector_store`` as ``persist_directory``.

    Returns:
        ``(chunk_vector_ids, chunk_count)``: the IDs returned by ``add_documents``
        and the number of chunks produced for this file.

    Raises:
        ValueError: If chunking produced nothing to ingest.
    """
    chunked = chunk_documents(load_documents(temp_path))
    # Refuse to create a store / call the embedder for an empty file.
    if not chunked:
        raise ValueError("No content to ingest.")

    # NOTE(review): a fresh embedding function and store handle are built per
    # file; callers ingesting many files may want to reuse them.
    store = get_vector_store(
        persist_directory=chroma_persist_directory,
        collection_name=collection_name,
        embedding_function=create_embedding_function(),
    )
    vector_ids = add_documents(store, chunked)
    return vector_ids, len(chunked)
|
|
|
|
def _discard_temp_files(paths: list[str]) -> None:
    """Delete each temp file if it still exists; never raises.

    ``FileNotFoundError`` is ignored via ``missing_ok``. Any other ``OSError`` is
    logged and skipped so that cleanup can never mask the job's real outcome.
    """
    import logging  # local import: this module does not define a logger

    for path in paths:
        try:
            Path(path).unlink(missing_ok=True)
        except OSError as exc:
            logging.getLogger(__name__).warning(
                "Could not delete temp file %s: %s", path, exc
            )


async def run_ingest_job(
    job_id: str,
    files: list[tuple[str, str]],
    collection_name: str,
    jobs_db_path: str,
    chroma_persist_directory: str,
) -> None:
    """Ingest one or more temp files for a single job and record the outcome.

    Args:
        job_id: Job whose status/progress is updated via ``storage.job_store``.
        files: ``(temp_path, display_name)`` pairs; ``display_name`` is only used
            in progress and error messages.
        collection_name: Chroma collection that receives the chunks.
        jobs_db_path: Database path passed to the ``storage.job_store`` helpers.
        chroma_persist_directory: Chroma persistence directory.

    Each file is ingested in a worker thread (``asyncio.to_thread``) so the event
    loop is not blocked. A file that fails is recorded in ``errors`` and the loop
    moves on to the next file. The job is completed if at least one file
    succeeded, and failed otherwise. Any other unexpected error (for example from
    a job-store call) also fails the job instead of propagating. Every temp file
    in ``files`` is deleted before this coroutine returns, even if the loop never
    reached it.
    """
    all_doc_ids: list[str] = []
    errors: list[str] = []
    processed = 0
    failed = 0
    if not files:
        await fail_ingest_job(jobs_db_path, job_id, message="No files to ingest.")
        return

    try:
        await mark_job_processing(jobs_db_path, job_id)
        for temp_path, display_name in files:
            # Only the ingest itself is guarded per file. A job-store error in the
            # success path must not be miscounted as a failed file.
            try:
                doc_ids, num_chunks = await asyncio.to_thread(
                    _ingest_one_file_sync,
                    temp_path,
                    collection_name,
                    chroma_persist_directory,
                )
            except Exception as exc:
                failed += 1
                errors.append(f"{display_name}: {exc}")
                await update_job_progress(
                    jobs_db_path,
                    job_id,
                    processed_files=processed,
                    failed_files=failed,
                    errors=errors,
                    message=f"Failed on {display_name}: {exc}",
                )
            else:
                all_doc_ids.extend(doc_ids)
                processed += 1
                await update_job_progress(
                    jobs_db_path,
                    job_id,
                    processed_files=processed,
                    failed_files=failed,
                    errors=errors,
                    message=f"Ingested {display_name} ({num_chunks} chunks).",
                )
            finally:
                # Free disk space as soon as each file is done.
                _discard_temp_files([temp_path])

        if processed == 0:
            await fail_ingest_job(
                jobs_db_path,
                job_id,
                message="All files failed ingestion.",
                errors=errors,
            )
            return

        chunk_note = f"{len(all_doc_ids)} chunk vector(s) across {processed} file(s)."
        await complete_ingest_job(
            jobs_db_path,
            job_id,
            document_ids=all_doc_ids,
            message=f"Ingestion completed. {chunk_note}",
        )
    except Exception as exc:
        await fail_ingest_job(jobs_db_path, job_id, message=str(exc), errors=errors + [str(exc)])
    finally:
        # Covers files the loop never reached: mark_job_processing or a progress
        # update raised, or the task was cancelled. Already-deleted paths are
        # no-ops thanks to missing_ok.
        _discard_temp_files([path for path, _ in files])
|
|