ikram98ai commited on
Commit
d8dedbc
·
1 Parent(s): 4eadf6e

refactoring ingest function

Browse files
Files changed (4) hide show
  1. src/app.py +11 -3
  2. src/core/ingest.py +20 -7
  3. src/core/rag.ipynb +127 -12
  4. src/core/retrieval.py +3 -0
src/app.py CHANGED
@@ -10,7 +10,7 @@ _project_root = Path(__file__).resolve().parents[1]
10
  if str(_project_root) not in sys.path:
11
  sys.path.insert(0, str(_project_root))
12
 
13
- from src.core.ingest import ingest
14
  from src.core.retrieval import generate, retrieval
15
  from src.core.index import MetaData
16
  from src.core.synthetic_data import EVAL_QUERIES, SYNTHETIC_DOCUMENTS
@@ -48,8 +48,16 @@ def ingest_files(files:List[str], index_name:str, lang:Literal["en", "ja"], doma
48
  filter_data = MetaData(
49
  language=lang, domain=domain, section=section, topic=topic, doc_type=doc_type
50
  )
51
- result = ingest(index_name, filter_data, files)
52
- return {"status": "success", "message": result}
 
 
 
 
 
 
 
 
53
 
54
  def _add_metric(doc):
55
  return (f"\n### source: {doc.metadata.get('source_name','None')}"
 
10
  if str(_project_root) not in sys.path:
11
  sys.path.insert(0, str(_project_root))
12
 
13
+ from src.core.ingest import load_documents, get_chunks, ingest_documents
14
  from src.core.retrieval import generate, retrieval
15
  from src.core.index import MetaData
16
  from src.core.synthetic_data import EVAL_QUERIES, SYNTHETIC_DOCUMENTS
 
48
  filter_data = MetaData(
49
  language=lang, domain=domain, section=section, topic=topic, doc_type=doc_type
50
  )
51
+ try:
52
+ docs = load_documents(files)
53
+ chunks = get_chunks(docs, filter_data)
54
+ message = ingest_documents(chunks, index_name)
55
+ except Exception as e:
56
+ message = f"Error during ingestion: {str(e)}"
57
+ print(message)
58
+ return {"status": "error", "message": message}
59
+
60
+ return {"status": "success", "message": message}
61
 
62
  def _add_metric(doc):
63
  return (f"\n### source: {doc.metadata.get('source_name','None')}"
src/core/ingest.py CHANGED
@@ -5,7 +5,9 @@ from langchain_openai import ChatOpenAI
5
  from dotenv import load_dotenv, find_dotenv
6
  from typing import List
7
  import uuid
8
- from .index import get_vectorstore, MetaData
 
 
9
  from .utils import mask_pii
10
 
11
  find_dotenv()
@@ -14,18 +16,24 @@ load_dotenv()
14
  model = ChatOpenAI(model="gpt-5-nano")
15
 
16
 
17
- def ingest(file_paths: List[str], collection_name: str, metadata: MetaData):
 
18
  documents: list[Document] = []
19
  for file_path in file_paths:
20
  if file_path.endswith(".txt"):
21
  docs = TextLoader(file_path, encoding="utf-8").load()
22
  elif file_path.endswith(".pdf"):
23
  docs = PDFMinerLoader(file_path).load()
 
 
 
24
  documents.extend(docs)
25
- for doc in docs:
26
- doc.metadata["source"] = file_path.split("/")[-1]
27
-
28
  print(f"loaded {len(documents)} documents from {len(file_paths)} files.")
 
 
 
 
29
  text_splitter = RecursiveCharacterTextSplitter(
30
  chunk_size=1200, # chunk size (characters)
31
  chunk_overlap=200, # chunk overlap (characters)
@@ -35,23 +43,28 @@ def ingest(file_paths: List[str], collection_name: str, metadata: MetaData):
35
  print(f"generated {len(chunks)} chunks.")
36
 
37
  doc_id = str(uuid.uuid4())
38
- docs = [
39
  Document(
40
  page_content=mask_pii(chunk.page_content),
41
  metadata={
42
  "doc_id": doc_id,
43
  "chunk_id": str(uuid.uuid4()),
44
- "source_name": chunk.metadata["source"],
45
  "start_index": chunk.metadata["start_index"],
46
  **metadata.model_dump(),
47
  },
48
  )
49
  for chunk in chunks
50
  ]
 
51
 
 
 
 
52
  vectorstore = get_vectorstore(collection_name)
53
  ids = [str(uuid.uuid4()) for _ in range(len(docs))]
54
  vectorstore.add_documents(docs, ids=ids)
55
  success_message = f"Ingested {len(docs)} documents into {collection_name} index."
56
  print(success_message)
57
  return success_message
 
 
5
  from dotenv import load_dotenv, find_dotenv
6
  from typing import List
7
  import uuid
8
+
9
+ from src.core.index import get_vectorstore
10
+ from .index import MetaData
11
  from .utils import mask_pii
12
 
13
  find_dotenv()
 
16
  model = ChatOpenAI(model="gpt-5-nano")
17
 
18
 
19
+ def load_documents(file_paths: List[str]):
20
+ """Load .txt and .pdf files from the given paths into Document objects."""
21
  documents: list[Document] = []
22
  for file_path in file_paths:
23
  if file_path.endswith(".txt"):
24
  docs = TextLoader(file_path, encoding="utf-8").load()
25
  elif file_path.endswith(".pdf"):
26
  docs = PDFMinerLoader(file_path).load()
27
+ else:
28
+ print(f"Unsupported file format: {file_path}")
29
+ continue
30
  documents.extend(docs)
31
+
 
 
32
  print(f"loaded {len(documents)} documents from {len(file_paths)} files.")
33
+ return documents
34
+
35
+ def get_chunks(documents: List[Document], metadata: MetaData):
36
+ """Split documents into chunks and mask PII."""
37
  text_splitter = RecursiveCharacterTextSplitter(
38
  chunk_size=1200, # chunk size (characters)
39
  chunk_overlap=200, # chunk overlap (characters)
 
43
  print(f"generated {len(chunks)} chunks.")
44
 
45
  doc_id = str(uuid.uuid4())
46
+ chunks = [
47
  Document(
48
  page_content=mask_pii(chunk.page_content),
49
  metadata={
50
  "doc_id": doc_id,
51
  "chunk_id": str(uuid.uuid4()),
52
+ "source_name": chunk.metadata["source"].split("/")[-1],
53
  "start_index": chunk.metadata["start_index"],
54
  **metadata.model_dump(),
55
  },
56
  )
57
  for chunk in chunks
58
  ]
59
+ return chunks
60
 
61
+
62
+ def ingest_documents(docs: List[Document], collection_name: str):
63
+ """Ingest documents into the specified vectorstore collection."""
64
  vectorstore = get_vectorstore(collection_name)
65
  ids = [str(uuid.uuid4()) for _ in range(len(docs))]
66
  vectorstore.add_documents(docs, ids=ids)
67
  success_message = f"Ingested {len(docs)} documents into {collection_name} index."
68
  print(success_message)
69
  return success_message
70
+
src/core/rag.ipynb CHANGED
@@ -51,7 +51,7 @@
51
  },
52
  {
53
  "cell_type": "code",
54
- "execution_count": 3,
55
  "id": "db72701e",
56
  "metadata": {},
57
  "outputs": [],
@@ -78,12 +78,12 @@
78
  },
79
  {
80
  "cell_type": "code",
81
- "execution_count": 4,
82
  "id": "f6037cfd",
83
  "metadata": {},
84
  "outputs": [],
85
  "source": [
86
- "from langchain_community.document_loaders import PDFMinerLoader\n",
87
  "from langchain_text_splitters import RecursiveCharacterTextSplitter\n",
88
  "from langchain_core.documents import Document\n",
89
  "from langchain_openai import ChatOpenAI\n",
@@ -96,17 +96,81 @@
96
  "load_dotenv()\n",
97
  "\n",
98
  "model = ChatOpenAI(model=\"gpt-5-nano\")\n",
 
 
 
 
 
 
 
 
 
 
99
  "\n",
100
- "\n",
101
- "def ingest(file_paths: List[str], collection_name: str, metadata: MetaData):\n",
102
  " documents: list[Document] = []\n",
103
  " for file_path in file_paths:\n",
104
- " docs = PDFMinerLoader(file_path).load()\n",
 
 
 
105
  " documents.extend(docs)\n",
106
- " for doc in docs:\n",
107
- " doc.metadata[\"source\"] = file_path.split(\"/\")[-1]\n",
108
- " \n",
109
  " print(f\"loaded {len(documents)} documents from {len(file_paths)} files.\")\n",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
  " text_splitter = RecursiveCharacterTextSplitter(\n",
111
  " chunk_size=1200, # chunk size (characters)\n",
112
  " chunk_overlap=200, # chunk overlap (characters)\n",
@@ -116,21 +180,72 @@
116
  " print(f\"generated {len(chunks)} chunks.\")\n",
117
  "\n",
118
  " doc_id = str(uuid.uuid4())\n",
119
- " docs = [\n",
120
  " Document(\n",
121
  " page_content=mask_pii(chunk.page_content),\n",
122
  " metadata={\n",
123
  " \"doc_id\": doc_id,\n",
124
  " \"chunk_id\": str(uuid.uuid4()),\n",
125
- " \"source_name\": chunk.metadata[\"source\"],\n",
126
- " \"total_pages\": chunk.metadata[\"total_pages\"],\n",
127
  " \"start_index\": chunk.metadata[\"start_index\"],\n",
128
  " **metadata.model_dump(),\n",
129
  " },\n",
130
  " )\n",
131
  " for chunk in chunks\n",
132
  " ]\n",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
133
  "\n",
 
 
134
  " vectorstore = get_vectorstore(collection_name)\n",
135
  " ids = [str(uuid.uuid4()) for _ in range(len(docs))]\n",
136
  " vectorstore.add_documents(docs, ids=ids)\n",
 
51
  },
52
  {
53
  "cell_type": "code",
54
+ "execution_count": 2,
55
  "id": "db72701e",
56
  "metadata": {},
57
  "outputs": [],
 
78
  },
79
  {
80
  "cell_type": "code",
81
+ "execution_count": 3,
82
  "id": "f6037cfd",
83
  "metadata": {},
84
  "outputs": [],
85
  "source": [
86
+ "from langchain_community.document_loaders import PDFMinerLoader, TextLoader\n",
87
  "from langchain_text_splitters import RecursiveCharacterTextSplitter\n",
88
  "from langchain_core.documents import Document\n",
89
  "from langchain_openai import ChatOpenAI\n",
 
96
  "load_dotenv()\n",
97
  "\n",
98
  "model = ChatOpenAI(model=\"gpt-5-nano\")\n",
99
+ "\n"
100
+ ]
101
+ },
102
+ {
103
+ "cell_type": "code",
104
+ "execution_count": 4,
105
+ "id": "03501b3b",
106
+ "metadata": {},
107
+ "outputs": [],
108
+ "source": [
109
  "\n",
110
+ "def load_documents(file_paths: List[str]):\n",
111
+ "    \"\"\"Load .txt and .pdf files from the given paths into Document objects.\"\"\"\n",
112
  " documents: list[Document] = []\n",
113
  " for file_path in file_paths:\n",
114
+ " if file_path.endswith(\".txt\"):\n",
115
+ " docs = TextLoader(file_path, encoding=\"utf-8\").load()\n",
116
+ " elif file_path.endswith(\".pdf\"):\n",
117
+ " docs = PDFMinerLoader(file_path).load()\n",
118
  " documents.extend(docs)\n",
119
+ "\n",
 
 
120
  " print(f\"loaded {len(documents)} documents from {len(file_paths)} files.\")\n",
121
+ " return documents\n"
122
+ ]
123
+ },
124
+ {
125
+ "cell_type": "code",
126
+ "execution_count": 10,
127
+ "id": "0b901011",
128
+ "metadata": {},
129
+ "outputs": [
130
+ {
131
+ "name": "stdout",
132
+ "output_type": "stream",
133
+ "text": [
134
+ "loaded 1 documents from 1 files.\n"
135
+ ]
136
+ }
137
+ ],
138
+ "source": [
139
+ "doc = load_documents([\"../../data/gemma.pdf\",])"
140
+ ]
141
+ },
142
+ {
143
+ "cell_type": "code",
144
+ "execution_count": 15,
145
+ "id": "3381a9e7",
146
+ "metadata": {},
147
+ "outputs": [
148
+ {
149
+ "data": {
150
+ "text/plain": [
151
+ "'../../data/gemma.pdf'"
152
+ ]
153
+ },
154
+ "execution_count": 15,
155
+ "metadata": {},
156
+ "output_type": "execute_result"
157
+ }
158
+ ],
159
+ "source": [
160
+ "doc[0].metadata[\"source\"]"
161
+ ]
162
+ },
163
+ {
164
+ "cell_type": "code",
165
+ "execution_count": 16,
166
+ "id": "67879c7d",
167
+ "metadata": {},
168
+ "outputs": [],
169
+ "source": [
170
+ "\n",
171
+ "\n",
172
+ "def get_chunks(documents: List[Document], metadata: MetaData):\n",
173
+ " \"\"\"Split documents into chunks and mask PII.\"\"\"\n",
174
  " text_splitter = RecursiveCharacterTextSplitter(\n",
175
  " chunk_size=1200, # chunk size (characters)\n",
176
  " chunk_overlap=200, # chunk overlap (characters)\n",
 
180
  " print(f\"generated {len(chunks)} chunks.\")\n",
181
  "\n",
182
  " doc_id = str(uuid.uuid4())\n",
183
+ " chunks = [\n",
184
  " Document(\n",
185
  " page_content=mask_pii(chunk.page_content),\n",
186
  " metadata={\n",
187
  " \"doc_id\": doc_id,\n",
188
  " \"chunk_id\": str(uuid.uuid4()),\n",
189
+ " \"source_name\": chunk.metadata[\"source\"].split(\"/\")[-1],\n",
 
190
  " \"start_index\": chunk.metadata[\"start_index\"],\n",
191
  " **metadata.model_dump(),\n",
192
  " },\n",
193
  " )\n",
194
  " for chunk in chunks\n",
195
  " ]\n",
196
+ " return chunks\n"
197
+ ]
198
+ },
199
+ {
200
+ "cell_type": "code",
201
+ "execution_count": 18,
202
+ "id": "8ab1d37f",
203
+ "metadata": {},
204
+ "outputs": [
205
+ {
206
+ "name": "stdout",
207
+ "output_type": "stream",
208
+ "text": [
209
+ "generated 72 chunks.\n"
210
+ ]
211
+ }
212
+ ],
213
+ "source": [
214
+ "chunks = get_chunks(doc, MetaData(language=\"en\"))"
215
+ ]
216
+ },
217
+ {
218
+ "cell_type": "code",
219
+ "execution_count": 20,
220
+ "id": "f739cda1",
221
+ "metadata": {},
222
+ "outputs": [
223
+ {
224
+ "data": {
225
+ "text/plain": [
226
+ "dict_items([('doc_id', '8cf509fb-56d0-436b-b712-3a12e092c60f'), ('chunk_id', '2fe270b3-4d8c-4629-9d6b-e36e93f3294b'), ('source_name', 'gemma.pdf'), ('start_index', 0), ('language', 'en'), ('domain', None), ('section', None), ('topic', None), ('doc_type', None)])"
227
+ ]
228
+ },
229
+ "execution_count": 20,
230
+ "metadata": {},
231
+ "output_type": "execute_result"
232
+ }
233
+ ],
234
+ "source": [
235
+ "chunks[0].metadata.items()"
236
+ ]
237
+ },
238
+ {
239
+ "cell_type": "code",
240
+ "execution_count": null,
241
+ "id": "df9c6181",
242
+ "metadata": {},
243
+ "outputs": [],
244
+ "source": [
245
+ "\n",
246
  "\n",
247
+ "def ingest_to_vectorstore(docs: List[Document], collection_name: str):\n",
248
+ " \"\"\"Ingest documents into the specified vectorstore collection.\"\"\"\n",
249
  " vectorstore = get_vectorstore(collection_name)\n",
250
  " ids = [str(uuid.uuid4()) for _ in range(len(docs))]\n",
251
  " vectorstore.add_documents(docs, ids=ids)\n",
src/core/retrieval.py CHANGED
@@ -12,6 +12,7 @@ model = ChatOpenAI(model="gpt-5-nano")
12
 
13
 
14
  def reranker(query: str, docs: List[Document]) -> List[Document]:
 
15
  print(f"Retrieved {len(docs)} documents")
16
  if len(docs) <= 1:
17
  return docs
@@ -24,6 +25,7 @@ def reranker(query: str, docs: List[Document]) -> List[Document]:
24
  def retrieval(
25
  query: str, collection_name: str, filter_data: MetaData
26
  ) -> List[tuple[Document, float]]:
 
27
  vectorstore = get_vectorstore(collection_name)
28
  print(
29
  f"RETRIEVAL query: {query[:40]}, for {collection_name} collection, with filters: {filter_data}"
@@ -57,6 +59,7 @@ def retrieval(
57
 
58
 
59
  def generate(query: str, ctx_docs: List[Document]) -> str:
 
60
  context = "\n".join([doc.page_content for doc in ctx_docs])
61
  prompt = f"""Answer shortly to the user question according to the given context. Only answer if the context is given to you.
62
  question: {query}
 
12
 
13
 
14
  def reranker(query: str, docs: List[Document]) -> List[Document]:
15
+ """Rerank documents using BM25Retriever"""
16
  print(f"Retrieved {len(docs)} documents")
17
  if len(docs) <= 1:
18
  return docs
 
25
  def retrieval(
26
  query: str, collection_name: str, filter_data: MetaData
27
  ) -> List[tuple[Document, float]]:
28
+ """Retrieve relevant documents from the vector store based on the query and filters."""
29
  vectorstore = get_vectorstore(collection_name)
30
  print(
31
  f"RETRIEVAL query: {query[:40]}, for {collection_name} collection, with filters: {filter_data}"
 
59
 
60
 
61
  def generate(query: str, ctx_docs: List[Document]) -> str:
62
+ """Generate answer using the language model based on the query and context documents."""
63
  context = "\n".join([doc.page_content for doc in ctx_docs])
64
  prompt = f"""Answer shortly to the user question according to the given context. Only answer if the context is given to you.
65
  question: {query}