Document-Audit-RAG / workers /ingest_worker.py
Mayank Chugh
Deploy DocuAudit AI to Hugging Face Space (no binaries)
d44b33d
"""Background ingest worker invoked from FastAPI ``BackgroundTasks``.
For each temp file: load → chunk → embed → add to Chroma, then update job progress in SQLite.
Temp files are always deleted in a ``finally`` block.
"""
import asyncio
from pathlib import Path
from rag.chunker import chunk_documents
from rag.embedder import create_embedding_function
from rag.loader import load_documents
from rag.vector_store import add_documents, get_vector_store
from storage.job_store import (
complete_ingest_job,
fail_ingest_job,
mark_job_processing,
update_job_progress,
)
def _ingest_one_file_sync(temp_path: str, collection_name: str, chroma_persist_directory: str) -> tuple[list[str], int]:
    """Load, chunk, and store the file at ``temp_path`` synchronously.

    The file is passed through ``load_documents`` and ``chunk_documents``; the
    resulting chunks are added to the vector store obtained for
    ``collection_name`` under ``chroma_persist_directory``.

    Returns:
        ``(ids, chunk_count)``: the value returned by ``add_documents`` for the
        stored chunks, and the number of chunks produced.

    Raises:
        ValueError: if chunking produced nothing to store.
    """
    pieces = chunk_documents(load_documents(temp_path))
    if not pieces:
        raise ValueError("No content to ingest.")

    # The embedding function is built only after we know there is something to embed.
    store = get_vector_store(
        persist_directory=chroma_persist_directory,
        collection_name=collection_name,
        embedding_function=create_embedding_function(),
    )
    stored_ids = add_documents(store, pieces)
    return stored_ids, len(pieces)
async def run_ingest_job(
    job_id: str,
    files: list[tuple[str, str]],
    collection_name: str,
    jobs_db_path: str,
    chroma_persist_directory: str,
) -> None:
    """Ingest every temp file of one job and record the outcome in the job store.

    Files are handled one after another. Each file is ingested in a worker
    thread via ``_ingest_one_file_sync``; a failure on one file is recorded and
    the loop moves on to the next. After each file the job's progress counters
    are written with ``update_job_progress``.

    Final state:
      * no files supplied, or no file ingested successfully -> ``fail_ingest_job``
      * at least one file ingested -> ``complete_ingest_job`` with all
        collected chunk ids
      * any other error (e.g. a job-store write raising) -> ``fail_ingest_job``
        with that error appended to the per-file errors

    Args:
        job_id: Identifier of the job whose status is updated.
        files: ``(temp_path, display_name)`` pairs; ``display_name`` is used
            only in progress and error messages.
        collection_name: Vector-store collection that receives the chunks.
        jobs_db_path: Location passed to every job-store call.
        chroma_persist_directory: Persist directory passed to the vector store.

    Every ``temp_path`` is removed before this coroutine finishes, including
    when the job aborts before reaching a file.
    """
    all_doc_ids: list[str] = []
    errors: list[str] = []
    processed = 0
    failed = 0

    if not files:
        await fail_ingest_job(jobs_db_path, job_id, message="No files to ingest.")
        return

    try:
        await mark_job_processing(jobs_db_path, job_id)

        for temp_path, display_name in files:
            # Only the ingestion itself is guarded here. The progress write
            # happens afterwards so that a failing job-store call is never
            # mistaken for a file failure (which would count the same file as
            # both processed and failed).
            try:
                doc_ids, num_chunks = await asyncio.to_thread(
                    _ingest_one_file_sync,
                    temp_path,
                    collection_name,
                    chroma_persist_directory,
                )
                all_doc_ids.extend(doc_ids)
            except Exception as exc:
                failed += 1
                errors.append(f"{display_name}: {exc}")
                progress_message = f"Failed on {display_name}: {exc}"
            else:
                processed += 1
                progress_message = f"Ingested {display_name} ({num_chunks} chunks)."
            finally:
                # Remove the temp file as soon as we are done with it.
                Path(temp_path).unlink(missing_ok=True)

            await update_job_progress(
                jobs_db_path,
                job_id,
                processed_files=processed,
                failed_files=failed,
                errors=errors,
                message=progress_message,
            )

        if processed == 0:
            await fail_ingest_job(
                jobs_db_path,
                job_id,
                message="All files failed ingestion.",
                errors=errors,
            )
            return

        chunk_note = f"{len(all_doc_ids)} chunk vector(s) across {processed} file(s)."
        await complete_ingest_job(
            jobs_db_path,
            job_id,
            document_ids=all_doc_ids,
            message=f"Ingestion completed. {chunk_note}",
        )
    except Exception as exc:
        await fail_ingest_job(jobs_db_path, job_id, message=str(exc), errors=errors + [str(exc)])
    finally:
        # Safety net: if the job aborted early (job-store error, cancellation),
        # files the loop never reached would otherwise be left on disk.
        # Cleanup is best-effort; the job status has already been decided, so
        # an undeletable file must not stop the remaining ones being removed.
        for leftover_path, _ in files:
            try:
                Path(leftover_path).unlink(missing_ok=True)
            except OSError:
                continue