Spaces:
Sleeping
Sleeping
Update utils/ingestion.py
Browse files- utils/ingestion.py +26 -17
utils/ingestion.py
CHANGED
|
@@ -23,7 +23,7 @@ class DocumentProcessor:
|
|
| 23 |
"""Initialize document processor with necessary components"""
|
| 24 |
self.setup_document_converter()
|
| 25 |
self.embed_model = FastEmbedEmbeddings()
|
| 26 |
-
self.client = chromadb.PersistentClient(path="chroma_db") #
|
| 27 |
|
| 28 |
def setup_document_converter(self):
|
| 29 |
"""Configure document converter with advanced processing capabilities"""
|
|
@@ -33,9 +33,17 @@ class DocumentProcessor:
|
|
| 33 |
pipeline_options.table_structure_options.do_cell_matching = True
|
| 34 |
pipeline_options.ocr_options.lang = ["en"]
|
| 35 |
pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE
|
| 36 |
-
|
| 37 |
-
|
| 38 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 39 |
|
| 40 |
self.converter = DocumentConverter(
|
| 41 |
format_options={
|
|
@@ -49,7 +57,7 @@ class DocumentProcessor:
|
|
| 49 |
def extract_chunk_metadata(self, chunk) -> Dict[str, Any]:
|
| 50 |
"""Extract essential metadata from a chunk"""
|
| 51 |
metadata = {
|
| 52 |
-
"text": chunk.text,
|
| 53 |
"headings": [],
|
| 54 |
"page_info": None,
|
| 55 |
"content_type": None
|
|
@@ -73,7 +81,7 @@ class DocumentProcessor:
|
|
| 73 |
|
| 74 |
def process_document(self, pdf_path: str):
|
| 75 |
"""Process document and create searchable index with metadata"""
|
| 76 |
-
print(f"Processing document: {pdf_path}")
|
| 77 |
start_time = time.time()
|
| 78 |
|
| 79 |
result = self.converter.convert(pdf_path)
|
|
@@ -87,7 +95,7 @@ class DocumentProcessor:
|
|
| 87 |
metadata = self.extract_chunk_metadata(chunk)
|
| 88 |
processed_chunks.append(metadata)
|
| 89 |
|
| 90 |
-
print("
|
| 91 |
collection = self.client.get_or_create_collection(name="document_chunks")
|
| 92 |
|
| 93 |
documents = []
|
|
@@ -98,10 +106,10 @@ class DocumentProcessor:
|
|
| 98 |
for idx, chunk in enumerate(processed_chunks):
|
| 99 |
text = chunk.get('text', '').strip()
|
| 100 |
if not text:
|
| 101 |
-
print(f"Skipping empty chunk at index {idx}")
|
| 102 |
continue # Skip empty chunks
|
| 103 |
|
| 104 |
-
embedding = self.embed_model.embed_documents([text])[0] # ✅
|
| 105 |
documents.append(text)
|
| 106 |
embeddings.append(embedding)
|
| 107 |
metadata_list.append({
|
|
@@ -111,14 +119,15 @@ class DocumentProcessor:
|
|
| 111 |
})
|
| 112 |
ids.append(str(idx))
|
| 113 |
|
| 114 |
-
|
| 115 |
-
|
| 116 |
-
|
| 117 |
-
|
| 118 |
-
|
| 119 |
-
|
| 120 |
-
|
|
|
|
| 121 |
|
| 122 |
processing_time = time.time() - start_time
|
| 123 |
-
print(f"
|
| 124 |
return collection
|
|
|
|
| 23 |
"""Initialize document processor with necessary components"""
|
| 24 |
self.setup_document_converter()
|
| 25 |
self.embed_model = FastEmbedEmbeddings()
|
| 26 |
+
self.client = chromadb.PersistentClient(path="chroma_db") # Persistent Storage
|
| 27 |
|
| 28 |
def setup_document_converter(self):
|
| 29 |
"""Configure document converter with advanced processing capabilities"""
|
|
|
|
| 33 |
pipeline_options.table_structure_options.do_cell_matching = True
|
| 34 |
pipeline_options.ocr_options.lang = ["en"]
|
| 35 |
pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE
|
| 36 |
+
|
| 37 |
+
# ✅ Automatically handle CPU fallback
|
| 38 |
+
try:
|
| 39 |
+
pipeline_options.accelerator_options = AcceleratorOptions(
|
| 40 |
+
num_threads=8, device=AcceleratorDevice.MPS
|
| 41 |
+
)
|
| 42 |
+
except Exception as e:
|
| 43 |
+
print("⚠️ MPS is not available. Falling back to CPU.")
|
| 44 |
+
pipeline_options.accelerator_options = AcceleratorOptions(
|
| 45 |
+
num_threads=8, device=AcceleratorDevice.CPU
|
| 46 |
+
)
|
| 47 |
|
| 48 |
self.converter = DocumentConverter(
|
| 49 |
format_options={
|
|
|
|
| 57 |
def extract_chunk_metadata(self, chunk) -> Dict[str, Any]:
|
| 58 |
"""Extract essential metadata from a chunk"""
|
| 59 |
metadata = {
|
| 60 |
+
"text": chunk.text.strip(),
|
| 61 |
"headings": [],
|
| 62 |
"page_info": None,
|
| 63 |
"content_type": None
|
|
|
|
| 81 |
|
| 82 |
def process_document(self, pdf_path: str):
|
| 83 |
"""Process document and create searchable index with metadata"""
|
| 84 |
+
print(f"📄 Processing document: {pdf_path}")
|
| 85 |
start_time = time.time()
|
| 86 |
|
| 87 |
result = self.converter.convert(pdf_path)
|
|
|
|
| 95 |
metadata = self.extract_chunk_metadata(chunk)
|
| 96 |
processed_chunks.append(metadata)
|
| 97 |
|
| 98 |
+
print("✅ Chunking completed. Creating vector database...")
|
| 99 |
collection = self.client.get_or_create_collection(name="document_chunks")
|
| 100 |
|
| 101 |
documents = []
|
|
|
|
| 106 |
for idx, chunk in enumerate(processed_chunks):
|
| 107 |
text = chunk.get('text', '').strip()
|
| 108 |
if not text:
|
| 109 |
+
print(f"⚠️ Skipping empty chunk at index {idx}")
|
| 110 |
continue # Skip empty chunks
|
| 111 |
|
| 112 |
+
embedding = self.embed_model.embed_documents([text])[0] # ✅ Corrected method
|
| 113 |
documents.append(text)
|
| 114 |
embeddings.append(embedding)
|
| 115 |
metadata_list.append({
|
|
|
|
| 119 |
})
|
| 120 |
ids.append(str(idx))
|
| 121 |
|
| 122 |
+
if documents:
|
| 123 |
+
collection.add(
|
| 124 |
+
ids=ids,
|
| 125 |
+
embeddings=embeddings,
|
| 126 |
+
documents=documents,
|
| 127 |
+
metadatas=metadata_list
|
| 128 |
+
)
|
| 129 |
+
print(f"✅ Successfully added {len(documents)} chunks to the database.")
|
| 130 |
|
| 131 |
processing_time = time.time() - start_time
|
| 132 |
+
print(f"✅ Document processing completed in {processing_time:.2f} seconds")
|
| 133 |
return collection
|