File size: 3,706 Bytes
"""Background ingest worker invoked from FastAPI ``BackgroundTasks``.
For each temp file: load → chunk → embed → add to Chroma, then update job progress in SQLite.
Temp files are always deleted in a ``finally`` block.
"""
import asyncio
from pathlib import Path
from rag.chunker import chunk_documents
from rag.embedder import create_embedding_function
from rag.loader import load_documents
from rag.vector_store import add_documents, get_vector_store
from storage.job_store import (
complete_ingest_job,
fail_ingest_job,
mark_job_processing,
update_job_progress,
)
def _ingest_one_file_sync(temp_path: str, collection_name: str, chroma_persist_directory: str) -> tuple[list[str], int]:
    """Run the blocking load → chunk → store pipeline for a single path.

    Returns a ``(ids, chunk_count)`` pair: the value handed back by
    ``add_documents`` for the stored chunks, and the number of chunks produced.

    Raises ``ValueError`` when chunking yields nothing to ingest.
    """
    pieces = chunk_documents(load_documents(temp_path))
    if not pieces:
        raise ValueError("No content to ingest.")

    # The embedding function is created per call and handed straight to the store.
    store = get_vector_store(
        persist_directory=chroma_persist_directory,
        collection_name=collection_name,
        embedding_function=create_embedding_function(),
    )
    stored_ids = add_documents(store, pieces)
    return stored_ids, len(pieces)
async def run_ingest_job(
    job_id: str,
    files: list[tuple[str, str]],
    collection_name: str,
    jobs_db_path: str,
    chroma_persist_directory: str,
) -> None:
    """Process one or more temp files for a single job.

    ``files`` is a list of ``(temp_path, display_name)`` pairs. Each file is
    ingested in a worker thread via ``_ingest_one_file_sync`` and the job's
    progress is written with ``update_job_progress`` after every file.

    The job ends with ``complete_ingest_job`` when at least one file was
    ingested, and with ``fail_ingest_job`` when ``files`` is empty, when every
    file failed, or when an unexpected error (for example from the job store)
    aborts the run.

    Every temp path in ``files`` is unlinked before this coroutine exits, also
    when the run aborts early or the task is cancelled.
    """
    if not files:
        await fail_ingest_job(jobs_db_path, job_id, message="No files to ingest.")
        return

    all_doc_ids: list[str] = []
    errors: list[str] = []
    processed = 0
    failed = 0

    try:
        await mark_job_processing(jobs_db_path, job_id)

        for temp_path, display_name in files:
            try:
                # Guard only the ingest call: a job-store error raised by the
                # progress update below must not be recorded as an ingestion
                # failure of a file whose vectors were already stored.
                try:
                    doc_ids, num_chunks = await asyncio.to_thread(
                        _ingest_one_file_sync,
                        temp_path,
                        collection_name,
                        chroma_persist_directory,
                    )
                except Exception as exc:
                    failed += 1
                    errors.append(f"{display_name}: {exc}")
                    progress_message = f"Failed on {display_name}: {exc}"
                else:
                    all_doc_ids.extend(doc_ids)
                    processed += 1
                    progress_message = f"Ingested {display_name} ({num_chunks} chunks)."

                await update_job_progress(
                    jobs_db_path,
                    job_id,
                    processed_files=processed,
                    failed_files=failed,
                    errors=errors,
                    message=progress_message,
                )
            finally:
                # Remove each temp file as soon as it has been handled.
                Path(temp_path).unlink(missing_ok=True)

        if processed == 0:
            await fail_ingest_job(
                jobs_db_path,
                job_id,
                message="All files failed ingestion.",
                errors=errors,
            )
            return

        chunk_note = f"{len(all_doc_ids)} chunk vector(s) across {processed} file(s)."
        await complete_ingest_job(
            jobs_db_path,
            job_id,
            document_ids=all_doc_ids,
            message=f"Ingestion completed. {chunk_note}",
        )
    except Exception as exc:
        await fail_ingest_job(jobs_db_path, job_id, message=str(exc), errors=errors + [str(exc)])
    finally:
        # Sweep up temp files the loop never reached (early abort, job-store
        # failure, cancellation). Paths already removed are skipped by
        # ``missing_ok``.
        for leftover_path, _ in files:
            try:
                Path(leftover_path).unlink(missing_ok=True)
            except OSError:
                # Best effort: a cleanup problem must not replace the job's
                # outcome or mask the exception that is already propagating.
                pass
|