File size: 3,706 Bytes
d44b33d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""Background ingest worker invoked from FastAPI ``BackgroundTasks``.

For each temp file: load → chunk → embed → add to Chroma, then update job progress in SQLite.
Temp files are always deleted in a ``finally`` block.
"""

import asyncio
from pathlib import Path

from rag.chunker import chunk_documents
from rag.embedder import create_embedding_function
from rag.loader import load_documents
from rag.vector_store import add_documents, get_vector_store
from storage.job_store import (
    complete_ingest_job,
    fail_ingest_job,
    mark_job_processing,
    update_job_progress,
)


def _ingest_one_file_sync(temp_path: str, collection_name: str, chroma_persist_directory: str) -> tuple[list[str], int]:
    """Blocking ingest for one path; returns ``(chunk_vector_ids, chunk_count)``."""
    documents = load_documents(temp_path)
    chunks = chunk_documents(documents)
    if not chunks:
        raise ValueError("No content to ingest.")
    embedding_function = create_embedding_function()
    vector_store = get_vector_store(
        persist_directory=chroma_persist_directory,
        collection_name=collection_name,
        embedding_function=embedding_function,
    )
    document_ids = add_documents(vector_store, chunks)
    return document_ids, len(chunks)


async def run_ingest_job(
    job_id: str,
    files: list[tuple[str, str]],
    collection_name: str,
    jobs_db_path: str,
    chroma_persist_directory: str,
) -> None:
    """
    Process one or more temp files for a single job. ``files`` is (temp_path, display_name).
    """
    all_doc_ids: list[str] = []
    errors: list[str] = []
    processed = 0
    failed = 0
    total = len(files)
    if total == 0:
        await fail_ingest_job(jobs_db_path, job_id, message="No files to ingest.")
        return

    try:
        await mark_job_processing(jobs_db_path, job_id)
        for temp_path, display_name in files:
            try:
                doc_ids, num_chunks = await asyncio.to_thread(
                    _ingest_one_file_sync,
                    temp_path,
                    collection_name,
                    chroma_persist_directory,
                )
                all_doc_ids.extend(doc_ids)
                processed += 1
                await update_job_progress(
                    jobs_db_path,
                    job_id,
                    processed_files=processed,
                    failed_files=failed,
                    errors=errors,
                    message=f"Ingested {display_name} ({num_chunks} chunks).",
                )
            except Exception as exc:
                failed += 1
                errors.append(f"{display_name}: {exc}")
                await update_job_progress(
                    jobs_db_path,
                    job_id,
                    processed_files=processed,
                    failed_files=failed,
                    errors=errors,
                    message=f"Failed on {display_name}: {exc}",
                )
            finally:
                Path(temp_path).unlink(missing_ok=True)

        if processed == 0:
            await fail_ingest_job(
                jobs_db_path,
                job_id,
                message="All files failed ingestion.",
                errors=errors,
            )
            return

        chunk_note = f"{len(all_doc_ids)} chunk vector(s) across {processed} file(s)."
        await complete_ingest_job(
            jobs_db_path,
            job_id,
            document_ids=all_doc_ids,
            message=f"Ingestion completed. {chunk_note}",
        )
    except Exception as exc:
        await fail_ingest_job(jobs_db_path, job_id, message=str(exc), errors=errors + [str(exc)])