Mayank Chugh commited on
Commit
c841e94
·
1 Parent(s): 97a5277

Implement Milestone 7 by introducing background ingestion jobs. Update environment configuration to include job database path, enhance job tracking with new endpoints, and refactor ingestion logic to support asynchronous processing. Modify response models to include job details and ensure proper initialization of the jobs database on startup.

Browse files
.env.example CHANGED
@@ -5,6 +5,7 @@ EMBEDDING_MODEL_NAME=nomic-embed-text
5
  OLLAMA_BASE_URL=http://localhost:11434
6
  OPENAI_API_KEY=
7
  CHROMA_PERSIST_DIRECTORY=./data/chroma
 
8
  CHUNK_SIZE=1000
9
  CHUNK_OVERLAP=150
10
  RETRIEVAL_K=4
 
5
  OLLAMA_BASE_URL=http://localhost:11434
6
  OPENAI_API_KEY=
7
  CHROMA_PERSIST_DIRECTORY=./data/chroma
8
+ JOBS_DB_PATH=./data/jobs.db
9
  CHUNK_SIZE=1000
10
  CHUNK_OVERLAP=150
11
  RETRIEVAL_K=4
api/config.py CHANGED
@@ -5,7 +5,12 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
5
 
6
 
7
  class Settings(BaseSettings):
8
- model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
 
 
 
 
 
9
 
10
  app_name: str = Field(default="doc-audi-ai", description="The name of the application")
11
  app_version: str = Field(default="0.1.0", description="The version of the application")
@@ -36,12 +41,12 @@ class Settings(BaseSettings):
36
  top_k_results: int = Field(default=4, ge=1, le=20, description="Number of chunks to retrieve")
37
 
38
  audit_db_path: str = "./audit.db"
39
-
 
40
  max_file_size_mb: int = Field(default=50, ge=1, le=200, description="Max upload file size")
41
  max_documents_per_batch: int = Field(default=100, ge=1, le=1000, description="Max documents per batch")
42
 
43
- model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)
44
-
45
  @lru_cache
46
  def get_settings() -> Settings:
47
  return Settings()
 
5
 
6
 
7
  class Settings(BaseSettings):
8
+ model_config = SettingsConfigDict(
9
+ env_file=".env",
10
+ env_file_encoding="utf-8",
11
+ extra="ignore",
12
+ case_sensitive=False,
13
+ )
14
 
15
  app_name: str = Field(default="doc-audi-ai", description="The name of the application")
16
  app_version: str = Field(default="0.1.0", description="The version of the application")
 
41
  top_k_results: int = Field(default=4, ge=1, le=20, description="Number of chunks to retrieve")
42
 
43
  audit_db_path: str = "./audit.db"
44
+ jobs_db_path: str = Field(default="./data/jobs.db", description="SQLite path for ingest job tracking")
45
+
46
  max_file_size_mb: int = Field(default=50, ge=1, le=200, description="Max upload file size")
47
  max_documents_per_batch: int = Field(default=100, ge=1, le=1000, description="Max documents per batch")
48
 
49
+
 
50
  @lru_cache
51
  def get_settings() -> Settings:
52
  return Settings()
api/main.py CHANGED
@@ -2,6 +2,7 @@ from fastapi import FastAPI
2
 
3
  from api.config import get_settings
4
  from storage.audit_store import init_audit_db
 
5
  from .routes import audit, ingest, jobs, query
6
 
7
  app = FastAPI()
@@ -16,6 +17,7 @@ app.include_router(query.router)
16
  async def startup() -> None:
17
  settings = get_settings()
18
  await init_audit_db(settings.audit_db_path)
 
19
 
20
  @app.get("/health", tags=["Health"])
21
  def health() -> dict[str, str]:
 
2
 
3
  from api.config import get_settings
4
  from storage.audit_store import init_audit_db
5
+ from storage.job_store import init_jobs_db
6
  from .routes import audit, ingest, jobs, query
7
 
8
  app = FastAPI()
 
17
  async def startup() -> None:
18
  settings = get_settings()
19
  await init_audit_db(settings.audit_db_path)
20
+ await init_jobs_db(settings.jobs_db_path)
21
 
22
  @app.get("/health", tags=["Health"])
23
  def health() -> dict[str, str]:
api/routes/ingest.py CHANGED
@@ -2,14 +2,12 @@ from pathlib import Path
2
  from tempfile import NamedTemporaryFile
3
  from typing import Annotated
4
 
5
- from fastapi import APIRouter, File, Form, HTTPException, UploadFile, status
6
 
7
  from api.config import get_settings
8
  from models.responses import IngestUploadResponse
9
- from rag.chunker import chunk_documents
10
- from rag.embedder import create_embedding_function
11
- from rag.loader import load_documents
12
- from rag.vector_store import add_documents, get_vector_store
13
 
14
  router = APIRouter(prefix="/ingest", tags=["ingest"])
15
 
@@ -42,12 +40,14 @@ def _validate_file(file: UploadFile, max_bytes: int) -> str:
42
 
43
  @router.post("/upload", response_model=IngestUploadResponse)
44
  async def upload_endpoint(
 
45
  file: UploadFile = File(..., description="PDF/TXT/MD document to ingest"),
46
  collection_name: Annotated[str, Form(min_length=1, max_length=256)] = "default",
47
  ) -> IngestUploadResponse:
48
  settings = get_settings()
49
  max_bytes = settings.max_file_size_mb * 1024 * 1024
50
  suffix = _validate_file(file, max_bytes)
 
51
 
52
  temp_path = ""
53
  try:
@@ -56,28 +56,34 @@ async def upload_endpoint(
56
  temp_path = tmp.name
57
  tmp.write(file_bytes)
58
 
59
- documents = load_documents(temp_path)
60
- chunks = chunk_documents(documents)
61
- if not chunks:
62
- raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No content to ingest.")
63
-
64
- embedding_function = create_embedding_function()
65
- vector_store = get_vector_store(
66
- persist_directory=settings.chroma_persist_directory,
67
  collection_name=collection_name,
68
- embedding_function=embedding_function,
 
 
 
 
 
 
 
 
 
69
  )
70
- document_ids = add_documents(vector_store, chunks)
71
  return IngestUploadResponse(
72
- status="success",
73
- message=f"Ingested {len(chunks)} chunks into '{collection_name}'.",
74
- document_ids=document_ids,
 
75
  )
76
  except HTTPException:
 
 
77
  raise
78
  except Exception as exc:
 
 
79
  raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
80
  finally:
81
  await file.close()
82
- if temp_path:
83
- Path(temp_path).unlink(missing_ok=True)
 
2
  from tempfile import NamedTemporaryFile
3
  from typing import Annotated
4
 
5
+ from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, UploadFile, status
6
 
7
  from api.config import get_settings
8
  from models.responses import IngestUploadResponse
9
+ from storage.job_store import create_ingest_job
10
+ from workers.ingest_worker import run_ingest_job
 
 
11
 
12
  router = APIRouter(prefix="/ingest", tags=["ingest"])
13
 
 
40
 
41
  @router.post("/upload", response_model=IngestUploadResponse)
42
  async def upload_endpoint(
43
+ background_tasks: BackgroundTasks,
44
  file: UploadFile = File(..., description="PDF/TXT/MD document to ingest"),
45
  collection_name: Annotated[str, Form(min_length=1, max_length=256)] = "default",
46
  ) -> IngestUploadResponse:
47
  settings = get_settings()
48
  max_bytes = settings.max_file_size_mb * 1024 * 1024
49
  suffix = _validate_file(file, max_bytes)
50
+ display_name = (file.filename or "upload").strip()
51
 
52
  temp_path = ""
53
  try:
 
56
  temp_path = tmp.name
57
  tmp.write(file_bytes)
58
 
59
+ job_id = await create_ingest_job(
60
+ settings.jobs_db_path,
 
 
 
 
 
 
61
  collection_name=collection_name,
62
+ filename=display_name,
63
+ )
64
+
65
+ background_tasks.add_task(
66
+ run_ingest_job,
67
+ job_id,
68
+ temp_path,
69
+ collection_name,
70
+ settings.jobs_db_path,
71
+ settings.chroma_persist_directory,
72
  )
73
+
74
  return IngestUploadResponse(
75
+ status="queued",
76
+ message=f"Ingestion job accepted. Poll GET /jobs/{job_id} for status.",
77
+ job_id=job_id,
78
+ document_ids=[],
79
  )
80
  except HTTPException:
81
+ if temp_path:
82
+ Path(temp_path).unlink(missing_ok=True)
83
  raise
84
  except Exception as exc:
85
+ if temp_path:
86
+ Path(temp_path).unlink(missing_ok=True)
87
  raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
88
  finally:
89
  await file.close()
 
 
api/routes/jobs.py CHANGED
@@ -1,9 +1,11 @@
1
  from typing import Annotated
2
 
3
- from fastapi import APIRouter, Depends, Query
4
 
 
5
  from models.requests import JobsListParams
6
- from models.responses import JobListResponse
 
7
 
8
 
9
  def _jobs_list_params(
@@ -12,14 +14,36 @@ def _jobs_list_params(
12
  ) -> JobsListParams:
13
  return JobsListParams(limit=limit, offset=offset)
14
 
 
15
  router = APIRouter(tags=["jobs"])
16
 
17
- @router.get("/jobs", response_model=JobListResponse )
18
- def jobs_placeholder(
 
19
  params: Annotated[JobsListParams, Depends(_jobs_list_params)],
20
  ) -> JobListResponse:
 
 
 
 
 
 
 
21
  return JobListResponse(
22
- status="placeholder",
23
- message="Jobs not implemented yet.",
24
- jobs=[],
25
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from typing import Annotated
2
 
3
+ from fastapi import APIRouter, Depends, HTTPException, Query, status
4
 
5
+ from api.config import get_settings
6
  from models.requests import JobsListParams
7
+ from models.responses import IngestJobDetailResponse, JobListResponse, JobSummary
8
+ from storage.job_store import get_ingest_job, list_ingest_jobs
9
 
10
 
11
  def _jobs_list_params(
 
14
  ) -> JobsListParams:
15
  return JobsListParams(limit=limit, offset=offset)
16
 
17
+
18
  router = APIRouter(tags=["jobs"])
19
 
20
+
21
+ @router.get("/jobs", response_model=JobListResponse)
22
+ async def list_jobs(
23
  params: Annotated[JobsListParams, Depends(_jobs_list_params)],
24
  ) -> JobListResponse:
25
+ settings = get_settings()
26
+ rows = await list_ingest_jobs(
27
+ settings.jobs_db_path,
28
+ limit=params.limit,
29
+ offset=params.offset,
30
+ )
31
+ jobs = [JobSummary.model_validate(row) for row in rows]
32
  return JobListResponse(
33
+ status="success",
34
+ message=f"Returned {len(jobs)} job(s).",
35
+ jobs=jobs,
36
+ )
37
+
38
+
39
+ @router.get("/jobs/{job_id}", response_model=IngestJobDetailResponse)
40
+ async def get_job(job_id: str) -> IngestJobDetailResponse:
41
+ settings = get_settings()
42
+ job = await get_ingest_job(settings.jobs_db_path, job_id)
43
+ if job is None:
44
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found.")
45
+ return IngestJobDetailResponse(
46
+ status="success",
47
+ message="Job found.",
48
+ job=job,
49
+ )
models/requests.py CHANGED
@@ -15,6 +15,7 @@ class IngestUploadRequest(BaseModel):
15
  class JobsListParams(BaseModel):
16
  model_config = ConfigDict(extra="forbid")
17
  limit: int = Field(default=10, ge=1, le=100, description="The limit of the jobs to list")
 
18
 
19
  class AuditListParams(BaseModel):
20
  model_config = ConfigDict(extra="forbid")
 
15
  class JobsListParams(BaseModel):
16
  model_config = ConfigDict(extra="forbid")
17
  limit: int = Field(default=10, ge=1, le=100, description="The limit of the jobs to list")
18
+ offset: int = Field(default=0, ge=0, description="The offset of the jobs to list")
19
 
20
  class AuditListParams(BaseModel):
21
  model_config = ConfigDict(extra="forbid")
models/responses.py CHANGED
@@ -22,17 +22,38 @@ class QueryResponse(BaseModel):
22
  class IngestUploadResponse(BaseModel):
23
  status: str
24
  message: str
 
25
  document_ids: list[str] = Field(default_factory=list)
26
 
27
  class JobSummary(BaseModel):
28
  job_id: str
29
  status: str
 
 
 
30
 
31
  class JobListResponse(BaseModel):
32
  status: str
33
  message: str
34
  jobs: list[JobSummary] = Field(default_factory=list)
35
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36
  class AuditEvent(BaseModel):
37
  event_id: str
38
  action: str
 
22
  class IngestUploadResponse(BaseModel):
23
  status: str
24
  message: str
25
+ job_id: str
26
  document_ids: list[str] = Field(default_factory=list)
27
 
28
  class JobSummary(BaseModel):
29
  job_id: str
30
  status: str
31
+ collection_name: str | None = None
32
+ filename: str | None = None
33
+ created_at: str | None = None
34
 
35
  class JobListResponse(BaseModel):
36
  status: str
37
  message: str
38
  jobs: list[JobSummary] = Field(default_factory=list)
39
 
40
+
41
+ class IngestJobDetail(BaseModel):
42
+ job_id: str
43
+ status: str
44
+ collection_name: str
45
+ filename: str
46
+ message: str
47
+ document_ids: list[str] = Field(default_factory=list)
48
+ created_at: str
49
+ updated_at: str
50
+
51
+
52
+ class IngestJobDetailResponse(BaseModel):
53
+ status: str
54
+ message: str
55
+ job: IngestJobDetail | None = None
56
+
57
  class AuditEvent(BaseModel):
58
  event_id: str
59
  action: str
storage/job_store.py ADDED
@@ -0,0 +1,120 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from pathlib import Path
3
+ from typing import Any
4
+ from uuid import uuid4
5
+
6
+ import aiosqlite
7
+
8
+ from models.responses import IngestJobDetail
9
+
10
+
11
+ async def init_jobs_db(db_path: str) -> None:
12
+ db_file = Path(db_path)
13
+ db_file.parent.mkdir(parents=True, exist_ok=True)
14
+ async with aiosqlite.connect(db_file.as_posix()) as conn:
15
+ await conn.execute(
16
+ """
17
+ CREATE TABLE IF NOT EXISTS ingest_jobs (
18
+ job_id TEXT PRIMARY KEY,
19
+ status TEXT NOT NULL,
20
+ collection_name TEXT NOT NULL,
21
+ filename TEXT NOT NULL,
22
+ message TEXT NOT NULL DEFAULT '',
23
+ document_ids_json TEXT NOT NULL DEFAULT '[]',
24
+ created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
25
+ updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
26
+ )
27
+ """
28
+ )
29
+ await conn.commit()
30
+
31
+
32
+ async def create_ingest_job(
33
+ db_path: str,
34
+ *,
35
+ collection_name: str,
36
+ filename: str,
37
+ ) -> str:
38
+ job_id = str(uuid4())
39
+ await init_jobs_db(db_path)
40
+ async with aiosqlite.connect(db_path) as conn:
41
+ await conn.execute(
42
+ """
43
+ INSERT INTO ingest_jobs (
44
+ job_id, status, collection_name, filename, message, document_ids_json
45
+ ) VALUES (?, 'queued', ?, ?, '', '[]')
46
+ """,
47
+ (job_id, collection_name, filename),
48
+ )
49
+ await conn.commit()
50
+ return job_id
51
+
52
+
53
+ async def update_ingest_job(
54
+ db_path: str,
55
+ job_id: str,
56
+ *,
57
+ status: str,
58
+ message: str | None = None,
59
+ document_ids: list[str] | None = None,
60
+ ) -> None:
61
+ await init_jobs_db(db_path)
62
+ async with aiosqlite.connect(db_path) as conn:
63
+ if document_ids is not None:
64
+ await conn.execute(
65
+ """
66
+ UPDATE ingest_jobs
67
+ SET status = ?, message = COALESCE(?, message), document_ids_json = ?,
68
+ updated_at = CURRENT_TIMESTAMP
69
+ WHERE job_id = ?
70
+ """,
71
+ (status, message, json.dumps(document_ids), job_id),
72
+ )
73
+ else:
74
+ await conn.execute(
75
+ """
76
+ UPDATE ingest_jobs
77
+ SET status = ?, message = COALESCE(?, message),
78
+ updated_at = CURRENT_TIMESTAMP
79
+ WHERE job_id = ?
80
+ """,
81
+ (status, message, job_id),
82
+ )
83
+ await conn.commit()
84
+
85
+
86
+ async def get_ingest_job(db_path: str, job_id: str) -> IngestJobDetail | None:
87
+ await init_jobs_db(db_path)
88
+ async with aiosqlite.connect(db_path) as conn:
89
+ conn.row_factory = aiosqlite.Row
90
+ cursor = await conn.execute(
91
+ """
92
+ SELECT job_id, status, collection_name, filename, message, document_ids_json, created_at, updated_at
93
+ FROM ingest_jobs
94
+ WHERE job_id = ?
95
+ """,
96
+ (job_id,),
97
+ )
98
+ row = await cursor.fetchone()
99
+ if row is None:
100
+ return None
101
+ payload = dict(row)
102
+ payload["document_ids"] = json.loads(payload.pop("document_ids_json") or "[]")
103
+ return IngestJobDetail.model_validate(payload)
104
+
105
+
106
+ async def list_ingest_jobs(db_path: str, *, limit: int, offset: int) -> list[dict[str, Any]]:
107
+ await init_jobs_db(db_path)
108
+ async with aiosqlite.connect(db_path) as conn:
109
+ conn.row_factory = aiosqlite.Row
110
+ cursor = await conn.execute(
111
+ """
112
+ SELECT job_id, status, collection_name, filename, created_at
113
+ FROM ingest_jobs
114
+ ORDER BY datetime(updated_at) DESC, rowid DESC
115
+ LIMIT ? OFFSET ?
116
+ """,
117
+ (limit, offset),
118
+ )
119
+ rows = await cursor.fetchall()
120
+ return [dict(row) for row in rows]
workers/__init__.py ADDED
File without changes
workers/ingest_worker.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ from pathlib import Path
3
+
4
+ from rag.chunker import chunk_documents
5
+ from rag.embedder import create_embedding_function
6
+ from rag.loader import load_documents
7
+ from rag.vector_store import add_documents, get_vector_store
8
+ from storage.job_store import update_ingest_job
9
+
10
+
11
+ def _ingest_sync(temp_path: str, collection_name: str, chroma_persist_directory: str) -> tuple[list[str], int]:
12
+ documents = load_documents(temp_path)
13
+ chunks = chunk_documents(documents)
14
+ if not chunks:
15
+ raise ValueError("No content to ingest.")
16
+ embedding_function = create_embedding_function()
17
+ vector_store = get_vector_store(
18
+ persist_directory=chroma_persist_directory,
19
+ collection_name=collection_name,
20
+ embedding_function=embedding_function,
21
+ )
22
+ document_ids = add_documents(vector_store, chunks)
23
+ return document_ids, len(chunks)
24
+
25
+
26
+ async def run_ingest_job(
27
+ job_id: str,
28
+ temp_path: str,
29
+ collection_name: str,
30
+ jobs_db_path: str,
31
+ chroma_persist_directory: str,
32
+ ) -> None:
33
+ try:
34
+ await update_ingest_job(
35
+ jobs_db_path,
36
+ job_id,
37
+ status="processing",
38
+ message="Ingestion in progress.",
39
+ )
40
+ document_ids, num_chunks = await asyncio.to_thread(
41
+ _ingest_sync,
42
+ temp_path,
43
+ collection_name,
44
+ chroma_persist_directory,
45
+ )
46
+ await update_ingest_job(
47
+ jobs_db_path,
48
+ job_id,
49
+ status="completed",
50
+ message=f"Ingested {num_chunks} chunks.",
51
+ document_ids=document_ids,
52
+ )
53
+ except Exception as exc:
54
+ await update_ingest_job(
55
+ jobs_db_path,
56
+ job_id,
57
+ status="failed",
58
+ message=str(exc),
59
+ )
60
+ finally:
61
+ Path(temp_path).unlink(missing_ok=True)