SkyNait committed on
Commit
8f78162
·
1 Parent(s): 99cd3b7

s3 bucket

Browse files
__pycache__/inference_svm_model.cpython-310.pyc CHANGED
Binary files a/__pycache__/inference_svm_model.cpython-310.pyc and b/__pycache__/inference_svm_model.cpython-310.pyc differ
 
__pycache__/mineru_single.cpython-310.pyc CHANGED
Binary files a/__pycache__/mineru_single.cpython-310.pyc and b/__pycache__/mineru_single.cpython-310.pyc differ
 
__pycache__/table_row_extraction.cpython-310.pyc CHANGED
Binary files a/__pycache__/table_row_extraction.cpython-310.pyc and b/__pycache__/table_row_extraction.cpython-310.pyc differ
 
__pycache__/worker.cpython-310.pyc CHANGED
Binary files a/__pycache__/worker.cpython-310.pyc and b/__pycache__/worker.cpython-310.pyc differ
 
topic_extraction.py CHANGED
@@ -5,9 +5,12 @@ import gc
5
  import json
6
  import logging
7
  import fitz
 
8
  import base64
9
  import time
10
  import asyncio
 
 
11
  from io import BytesIO
12
  from typing import List, Dict, Any
13
 
@@ -32,10 +35,57 @@ logger.addHandler(file_handler)
32
 
33
  _GEMINI_CLIENT = None
34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  def preprocess_image(image_data: bytes, max_dim: int = 600, quality: int = 60) -> bytes:
36
- """
37
- Downscale the image to reduce payload size.
38
- """
39
  arr = np.frombuffer(image_data, np.uint8)
40
  img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
41
  if img is not None:
@@ -52,9 +102,6 @@ def preprocess_image(image_data: bytes, max_dim: int = 600, quality: int = 60) -
52
  return image_data
53
 
54
  def call_gemini_for_table_classification(image_data: bytes, api_key: str, max_retries: int = 1) -> str:
55
- """
56
- Synchronously call the Gemini API to classify a table image.
57
- """
58
  for attempt in range(max_retries + 1):
59
  try:
60
  prompt = """You are given an image. Determine if it shows a table that has exactly 2 or 3 columns.
@@ -72,10 +119,7 @@ The two-column 'table' image include such key features:
72
  If the image is a relevant table with 2 columns, respond with 'TWO_COLUMN'.
73
  If the image is a relevant table with 3 columns, respond with 'THREE_COLUMN'.
74
  If the image does not show a table at all, respond with 'NO_TABLE'.
75
- Return only one of these exact labels as your entire response:
76
- TWO_COLUMN
77
- THREE_COLUMN
78
- NO_TABLE
79
  """
80
  global _GEMINI_CLIENT
81
  client = _GEMINI_CLIENT
@@ -94,7 +138,7 @@ NO_TABLE
94
  ]
95
  }
96
  ],
97
- config=types.GenerateContentConfig(temperature=0.0)
98
  )
99
  if resp and resp.text:
100
  classification = resp.text.strip().upper()
@@ -104,54 +148,246 @@ NO_TABLE
104
  return "TWO_COLUMN"
105
  return "NO_TABLE"
106
  except Exception as e:
107
- error_msg = str(e)
108
- logger.error(f"Gemini table classification error: {error_msg}")
109
- if "503" in error_msg:
110
  return "NO_TABLE"
111
  if attempt < max_retries:
112
- logger.warning("Retrying classification due to error... attempt %d", attempt + 1)
113
  time.sleep(0.5)
114
  else:
115
  return "NO_TABLE"
116
 
117
  async def classify_image_async(image_data: bytes, api_key: str, max_retries: int = 1) -> str:
118
- """
119
- Asynchronous wrapper for image classification.
120
- """
121
  loop = asyncio.get_event_loop()
122
  preprocessed = preprocess_image(image_data)
123
  return await loop.run_in_executor(None, call_gemini_for_table_classification, preprocessed, api_key, max_retries)
124
 
125
- def unify_whitespace(text: str) -> str:
126
- return re.sub(r"\s+", " ", text).strip().lower()
 
 
 
 
 
127
 
128
- def find_all_occurrences(pdf_bytes: bytes, search_text: str) -> List[int]:
129
- doc = fitz.open(stream=pdf_bytes, filetype="pdf")
130
- st_norm = unify_whitespace(search_text)
131
- found = []
132
- for i in range(doc.page_count):
133
- raw = doc[i].get_text("raw")
134
- norm = unify_whitespace(raw)
135
- if st_norm in norm:
136
- found.append(i)
137
- doc.close()
138
- return sorted(found)
139
 
140
- def create_subset_pdf(original_pdf_bytes: bytes, page_indices: List[int]) -> bytes:
141
- if not page_indices:
142
- raise ValueError("No page indices provided for subset creation.")
143
- doc = fitz.open(stream=original_pdf_bytes, filetype="pdf")
144
- new_doc = fitz.open()
145
- for p in sorted(set(page_indices)):
146
- if 0 <= p < doc.page_count:
147
- new_doc.insert_pdf(doc, from_page=p, to_page=p)
148
- else:
149
- logger.error(f"Page index {p} out of range (0..{doc.page_count - 1}).")
150
- raise ValueError(f"Page index {p} out of range.")
151
- subset_bytes = new_doc.tobytes()
152
- new_doc.close()
153
- doc.close()
154
- return subset_bytes
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
 
156
  class GeminiTopicExtractor:
157
  def __init__(self, api_key: str = None, num_pages: int = 10):
@@ -163,7 +399,7 @@ class GeminiTopicExtractor:
163
  if not first_pages_text.strip():
164
  logger.error("No text from first pages => cannot extract subtopics.")
165
  return {}
166
-
167
  prompt = f"""
168
  You have the first pages of a PDF specification, including a table of contents.
169
 
@@ -306,8 +542,6 @@ Now, extract topics from this text:
306
  {first_pages_text}
307
  """
308
  global _GEMINI_CLIENT
309
- if _GEMINI_CLIENT is None:
310
- _GEMINI_CLIENT = genai.Client(api_key=self.api_key)
311
  client = _GEMINI_CLIENT
312
  try:
313
  response = client.models.generate_content(
@@ -318,7 +552,6 @@ Now, extract topics from this text:
318
  if not response or not response.text:
319
  logger.warning("No text from LLM => returning empty subtopics.")
320
  return {}
321
-
322
  raw_json = response.text.strip()
323
  cleaned = raw_json.replace("```json", "").replace("```", "")
324
  try:
@@ -348,7 +581,16 @@ Now, extract topics from this text:
348
  def _read_first_pages_raw(self, pdf_path: str, num_pages: int) -> str:
349
  text_parts = []
350
  try:
351
- doc = fitz.open(pdf_path)
 
 
 
 
 
 
 
 
 
352
  pages_to_read = min(num_pages, doc.page_count)
353
  for i in range(pages_to_read):
354
  raw_text = doc[i].get_text("raw")
@@ -358,123 +600,6 @@ Now, extract topics from this text:
358
  logger.error(f"Could not open PDF: {e}")
359
  return "\n".join(text_parts)
360
 
361
- class LocalImageWriter(DataWriter):
362
- def __init__(self, output_folder: str, gemini_api_key: str):
363
- self.output_folder = output_folder
364
- os.makedirs(self.output_folder, exist_ok=True)
365
- self.descriptions = {}
366
- self._img_count = 0
367
- self.gemini_api_key = gemini_api_key
368
-
369
- def write(self, path: str, data: bytes) -> None:
370
- self._img_count += 1
371
- unique_id = f"img_{self._img_count}.jpg"
372
- self.descriptions[path] = {
373
- "data": data,
374
- "relative_path": unique_id,
375
- "table_classification": "NO_TABLE",
376
- "final_alt": ""
377
- }
378
-
379
- async def post_process_async(self, key: str, md_content: str) -> str:
380
- logger.info("Classifying images to detect tables (async).")
381
- tasks = []
382
- for p, info in self.descriptions.items():
383
- tasks.append((p, classify_image_async(info["data"], self.gemini_api_key)))
384
- for p, task in tasks:
385
- try:
386
- classification = await task
387
- self.descriptions[p]['table_classification'] = classification
388
- except Exception as e:
389
- logger.error(f"Table classification error: {e}")
390
- self.descriptions[p]['table_classification'] = "NO_TABLE"
391
- for p, info in self.descriptions.items():
392
- cls = info['table_classification']
393
- if cls == "TWO_COLUMN":
394
- info['final_alt'] = "HAS TO BE PROCESSED - two column table"
395
- elif cls == "THREE_COLUMN":
396
- info['final_alt'] = "HAS TO BE PROCESSED - three column table"
397
- else:
398
- info['final_alt'] = "NO_TABLE image"
399
- for p, info in self.descriptions.items():
400
- old_md = f"![]({key}{p})"
401
- new_md = f"![{info['final_alt']}]({info['relative_path']})"
402
- md_content = md_content.replace(old_md, new_md)
403
-
404
- md_content = self._process_table_images_in_markdown(md_content)
405
- final_lines = []
406
- for line in md_content.split("\n"):
407
- if re.match(r"^\!\[.*\]\(.*\)", line.strip()):
408
- final_lines.append(line.strip())
409
- return "\n".join(final_lines)
410
-
411
- def post_process(self, key: str, md_content: str) -> str:
412
- """
413
- Synchronous wrapper around the asynchronous post_process_async.
414
- """
415
- return asyncio.run(self.post_process_async(key, md_content))
416
-
417
- def _process_table_images_in_markdown(self, md_content: str) -> str:
418
- pat = r"!\[HAS TO BE PROCESSED - (two|three) column table\]\(([^)]+)\)"
419
- matches = re.findall(pat, md_content, flags=re.IGNORECASE)
420
- if not matches:
421
- return md_content
422
- for (col_type, image_id) in matches:
423
- logger.info(f"Processing table image => {image_id}, columns={col_type}")
424
- temp_path = os.path.join(self.output_folder, image_id)
425
- desc_item = None
426
- for k, val in self.descriptions.items():
427
- if val["relative_path"] == image_id:
428
- desc_item = val
429
- break
430
- if not desc_item:
431
- logger.warning(f"No matching image data for {image_id}, skipping extraction.")
432
- continue
433
- if not os.path.exists(temp_path):
434
- with open(temp_path, "wb") as f:
435
- f.write(desc_item["data"])
436
- try:
437
- if col_type.lower() == 'two':
438
- extractor = TableExtractor(
439
- skip_header=True,
440
- merge_two_col_rows=True,
441
- enable_subtopic_merge=True,
442
- subtopic_threshold=0.2
443
- )
444
- else:
445
- extractor = TableExtractor(
446
- skip_header=True,
447
- merge_two_col_rows=False,
448
- enable_subtopic_merge=False,
449
- subtopic_threshold=0.2
450
- )
451
- row_boxes = extractor.process_image(temp_path)
452
-
453
- out_folder = temp_path + "_rows"
454
- os.makedirs(out_folder, exist_ok=True)
455
-
456
- extractor.save_extracted_cells(temp_path, row_boxes, out_folder)
457
-
458
- snippet = ["**Extracted table cells:**"]
459
- for i, row in enumerate(row_boxes):
460
- row_dir = os.path.join(out_folder, f"row_{i}")
461
- for j, _ in enumerate(row):
462
- cell_file = f"col_{j}.jpg"
463
- cell_path = os.path.join(row_dir, cell_file)
464
- relp = os.path.relpath(cell_path, self.output_folder)
465
- snippet.append(f"![Row {i} Col {j}]({relp})")
466
- new_snip = "\n".join(snippet)
467
-
468
- old_line = f"![HAS TO BE PROCESSED - {col_type} column table]({image_id})"
469
-
470
- md_content = md_content.replace(old_line, new_snip)
471
- except Exception as e:
472
- logger.error(f"Error processing table image {image_id}: {e}")
473
- finally:
474
- if os.path.exists(temp_path):
475
- os.remove(temp_path)
476
- return md_content
477
-
478
  class MineruNoTextProcessor:
479
  def __init__(self, output_folder: str, gemini_api_key: str = None):
480
  self.output_folder = output_folder
@@ -486,6 +611,20 @@ class MineruNoTextProcessor:
486
  self.subtopic_extractor = GeminiTopicExtractor(api_key=gemini_api_key, num_pages=10)
487
  self.gemini_api_key = gemini_api_key or os.getenv("GEMINI_API_KEY", "")
488
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
489
  def cleanup_gpu(self):
490
  try:
491
  gc.collect()
@@ -498,18 +637,27 @@ class MineruNoTextProcessor:
498
  logger.info(f"Processing PDF: {pdf_path}")
499
  try:
500
  subtopics = self.subtopic_extractor.extract_subtopics(pdf_path)
501
-
502
  logger.info(f"Gemini returned subtopics: {subtopics}")
503
- with open(pdf_path, "rb") as f:
504
- pdf_bytes = f.read()
505
-
 
 
 
 
 
 
 
 
 
 
506
  doc = fitz.open(stream=pdf_bytes, filetype="pdf")
507
  total_pages = doc.page_count
508
  doc.close()
509
-
510
  final_pages = set()
511
  if not subtopics:
512
- logger.warning("No subtopics found. We'll process the entire PDF as fallback.")
513
  final_pages = set(range(total_pages))
514
  else:
515
  for subname, rng in subtopics.items():
@@ -529,16 +677,10 @@ class MineruNoTextProcessor:
529
  chosen_page = p
530
  break
531
  if chosen_page is None:
532
- if occs:
533
- chosen_page = occs[-1]
534
- logger.warning(f"No occurrence >= {doc_start_0} for '{subname}'. Using last => {chosen_page}")
535
- else:
536
- chosen_page = 0
537
- logger.warning(f"No occurrences for '{subname}'. Using page 0.")
538
-
539
  raw_offset = chosen_page - doc_start_0
540
  offset = max(0, raw_offset)
541
- logger.info(f"Subtopic '{subname}': doc_start={start_p}, chosen_page={chosen_page}, raw_offset={raw_offset}, offset={offset}")
542
  s0 = (start_p - 1) + offset
543
  e0 = (end_p - 1) + offset
544
  s0 = max(0, min(total_pages - 1, s0))
@@ -546,12 +688,11 @@ class MineruNoTextProcessor:
546
  for pp in range(s0, e0 + 1):
547
  final_pages.add(pp)
548
  if not final_pages:
549
- logger.warning("No valid pages after offset. We'll process entire PDF.")
550
  final_pages = set(range(total_pages))
551
  logger.info(f"Processing pages (0-based): {sorted(final_pages)}")
552
 
553
  subset_pdf_bytes = create_subset_pdf(pdf_bytes, sorted(final_pages))
554
-
555
  dataset = PymuDocDataset(subset_pdf_bytes)
556
  inference = doc_analyze(
557
  dataset,
@@ -562,18 +703,24 @@ class MineruNoTextProcessor:
562
  table_enable=self.table_enable
563
  )
564
  logger.info("doc_analyze complete. Extracting images.")
565
- writer = LocalImageWriter(self.output_folder, self.gemini_api_key)
 
 
 
 
 
566
  pipe_result = inference.pipe_ocr_mode(writer, lang=self.language)
567
- md_content = pipe_result.get_markdown("local-unique-prefix/")
568
-
569
- final_markdown = writer.post_process("local-unique-prefix/", md_content)
570
- out_path = os.path.join(self.output_folder, "final_output.md")
571
-
572
- with open(out_path, "w", encoding="utf-8") as f:
573
- f.write(final_markdown)
574
-
 
 
575
  return final_markdown
576
-
577
  finally:
578
  self.cleanup_gpu()
579
 
@@ -584,5 +731,6 @@ if __name__ == "__main__":
584
  try:
585
  processor = MineruNoTextProcessor(output_folder=output_dir, gemini_api_key=gemini_key)
586
  md_output = processor.process(input_pdf)
 
587
  except Exception as e:
588
  logger.error(f"Processing failed: {e}")
 
5
  import json
6
  import logging
7
  import fitz
8
+ import boto3
9
  import base64
10
  import time
11
  import asyncio
12
+ import tempfile
13
+ import requests
14
  from io import BytesIO
15
  from typing import List, Dict, Any
16
 
 
35
 
36
  _GEMINI_CLIENT = None
37
 
38
def unify_whitespace(text: str) -> str:
    """Collapse each run of whitespace in *text* to one space and trim both ends.

    Letter case is left untouched.
    """
    # split() with no separator breaks on whitespace runs and discards
    # leading/trailing whitespace, so re-joining with a single space yields
    # the normalised string.
    return " ".join(text.split())
40
+
41
def find_all_occurrences(pdf_bytes: bytes, search_text: str) -> List[int]:
    """Return the 0-based indices of the pages whose text contains *search_text*.

    Both the search text and each page's extracted text are normalised with
    ``unify_whitespace`` before the substring test, so differences in line
    breaks and spacing do not prevent a match.

    Args:
        pdf_bytes: The PDF document as raw bytes.
        search_text: Text to look for on every page.

    Returns:
        Matching page indices in ascending order; empty if nothing matches.
    """
    needle = unify_whitespace(search_text)
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        # Pages are visited in ascending order, so the list is already sorted.
        return [
            page_no
            for page_no in range(doc.page_count)
            if needle in unify_whitespace(doc[page_no].get_text("raw"))
        ]
    finally:
        # Release the document even when text extraction raises.
        doc.close()
52
+
53
def create_subset_pdf(original_pdf_bytes: bytes, page_indices: List[int]) -> bytes:
    """Build a new PDF that contains only the requested pages of the original.

    Duplicate indices are ignored and pages are copied in ascending order.

    Args:
        original_pdf_bytes: The source PDF as raw bytes.
        page_indices: 0-based page indices to keep.

    Returns:
        The bytes of the newly assembled PDF.

    Raises:
        ValueError: If ``page_indices`` is empty, or if any index lies outside
            ``0 .. page_count - 1`` of the source document.
    """
    if not page_indices:
        raise ValueError("No page indices provided for subset creation.")

    doc = fitz.open(stream=original_pdf_bytes, filetype="pdf")
    try:
        new_doc = fitz.open()
        try:
            for p in sorted(set(page_indices)):
                if not 0 <= p < doc.page_count:
                    logger.error(f"Page index {p} out of range (0..{doc.page_count - 1}).")
                    raise ValueError(f"Page index {p} out of range.")
                new_doc.insert_pdf(doc, from_page=p, to_page=p)
            return new_doc.tobytes()
        finally:
            # Close the output document on success and on the ValueError path.
            new_doc.close()
    finally:
        # The source document must be released on every exit path as well.
        doc.close()
68
+
69
class s3Writer:
    """Small helper that uploads byte payloads into a single S3 bucket."""

    def __init__(self, ak: str, sk: str, bucket: str, endpoint_url: str):
        """Create the boto3 S3 client used for all uploads.

        Args:
            ak: Access key id passed to boto3.
            sk: Secret access key passed to boto3.
            bucket: Name of the bucket every upload goes to.
            endpoint_url: Endpoint URL passed to boto3.
        """
        client_options = {
            "aws_access_key_id": ak,
            "aws_secret_access_key": sk,
            "endpoint_url": endpoint_url,
        }
        self.client = boto3.client("s3", **client_options)
        self.bucket = bucket

    def write(self, path: str, data: bytes) -> None:
        """Upload *data* to the bucket under the object key *path*."""
        payload = BytesIO(data)
        self.client.upload_fileobj(payload, self.bucket, path)
        logger.info(f"Uploaded to S3: {path}")
86
+
87
+ # Reduce the image size to cut the payload and speed up the Gemini call.
88
  def preprocess_image(image_data: bytes, max_dim: int = 600, quality: int = 60) -> bytes:
 
 
 
89
  arr = np.frombuffer(image_data, np.uint8)
90
  img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
91
  if img is not None:
 
102
  return image_data
103
 
104
  def call_gemini_for_table_classification(image_data: bytes, api_key: str, max_retries: int = 1) -> str:
 
 
 
105
  for attempt in range(max_retries + 1):
106
  try:
107
  prompt = """You are given an image. Determine if it shows a table that has exactly 2 or 3 columns.
 
119
  If the image is a relevant table with 2 columns, respond with 'TWO_COLUMN'.
120
  If the image is a relevant table with 3 columns, respond with 'THREE_COLUMN'.
121
  If the image does not show a table at all, respond with 'NO_TABLE'.
122
+ Return only one of these exact labels.
 
 
 
123
  """
124
  global _GEMINI_CLIENT
125
  client = _GEMINI_CLIENT
 
138
  ]
139
  }
140
  ],
141
+ config=types.GenerateContentConfig(temperature=0.)
142
  )
143
  if resp and resp.text:
144
  classification = resp.text.strip().upper()
 
148
  return "TWO_COLUMN"
149
  return "NO_TABLE"
150
  except Exception as e:
151
+ logger.error(f"Gemini table classification error: {e}")
152
+ if "503" in str(e):
 
153
  return "NO_TABLE"
154
  if attempt < max_retries:
 
155
  time.sleep(0.5)
156
  else:
157
  return "NO_TABLE"
158
 
159
  async def classify_image_async(image_data: bytes, api_key: str, max_retries: int = 1) -> str:
 
 
 
160
  loop = asyncio.get_event_loop()
161
  preprocessed = preprocess_image(image_data)
162
  return await loop.run_in_executor(None, call_gemini_for_table_classification, preprocessed, api_key, max_retries)
163
 
164
class S3ImageWriter(DataWriter):
    """DataWriter that uploads extracted images to S3 and rewrites the markdown.

    Each image passed to ``write`` is uploaded under ``base_path`` straight
    away and recorded in ``descriptions``. ``post_process`` later classifies
    every recorded image, points the markdown at the S3 keys, and replaces
    images classified as two/three column tables with their individual cells.
    """

    def __init__(self, s3_writer: s3Writer, base_path: str, gemini_api_key: str):
        """Store the uploader, the key prefix and the Gemini API key.

        Args:
            s3_writer: Uploader used for every object written by this class.
            base_path: Key prefix; a trailing "/" is appended when missing.
            gemini_api_key: API key forwarded to the image classifier.
        """
        self.s3_writer = s3_writer
        # Normalise the prefix so keys can be built by plain concatenation.
        self.base_path = base_path if base_path.endswith("/") else base_path + "/"
        self.gemini_api_key = gemini_api_key
        # Maps the path given to write() to that image's record
        # (raw bytes, S3 key, classification label, alt text).
        self.descriptions = {}
        self._img_count = 0

    def write(self, path: str, data: bytes) -> None:
        """Upload one image and remember it under its original *path*."""
        self._img_count += 1
        unique_id = f"img_{self._img_count}.jpg"
        s3_key = f"{self.base_path}{unique_id}"
        self.s3_writer.write(s3_key, data)
        self.descriptions[path] = {
            "data": data,
            "s3_path": s3_key,
            "table_classification": "NO_TABLE",
            "final_alt": ""
        }

    async def post_process_async(self, key: str, md_content: str) -> str:
        """Classify all images and return markdown made of image lines only.

        Args:
            key: Prefix that precedes each image path in ``md_content``.
            md_content: Markdown whose image links look like ``![](<key><path>)``.

        Returns:
            The lines of the rewritten markdown that start with an image
            link, joined by newlines. All other lines are dropped.
        """
        logger.info("Classifying images to detect tables.")
        paths = list(self.descriptions)
        # gather() runs the classifications concurrently; awaiting the
        # coroutines one after another would serialise the API calls.
        results = await asyncio.gather(
            *(
                classify_image_async(self.descriptions[p]["data"], self.gemini_api_key)
                for p in paths
            ),
            return_exceptions=True,
        )
        for p, result in zip(paths, results):
            if isinstance(result, BaseException):
                # A failed classification falls back to NO_TABLE.
                logger.error(f"Table classification error: {result}")
                result = "NO_TABLE"
            self.descriptions[p]['table_classification'] = result

        for p, info in self.descriptions.items():
            cls = info['table_classification']
            if cls == "TWO_COLUMN":
                info['final_alt'] = "HAS TO BE PROCESSED - two column table"
            elif cls == "THREE_COLUMN":
                info['final_alt'] = "HAS TO BE PROCESSED - three column table"
            else:
                info['final_alt'] = "NO_TABLE image"
            md_content = md_content.replace(f"![]({key}{p})", f"![{info['final_alt']}]({info['s3_path']})")

        md_content = await self._process_table_images_in_markdown(key, md_content)

        final_lines = []
        for line in md_content.split("\n"):
            if re.match(r"^\!\[.*\]\(.*\)", line.strip()):
                final_lines.append(line.strip())

        return "\n".join(final_lines)

    async def _process_table_images_in_markdown(self, key: str, md_content: str) -> str:
        """Replace each table placeholder image with links to its cell images.

        Every image is written to a temporary directory, split into cells by
        ``TableExtractor``, and each cell file is uploaded under
        ``<base_path>cells/``. If processing one image fails, the error is
        logged and that image's placeholder is left unchanged.

        Args:
            key: Unused; kept so the call signature stays unchanged.
            md_content: Markdown containing "HAS TO BE PROCESSED" image links.

        Returns:
            The markdown with processed placeholders replaced.
        """
        pat = r"!\[HAS TO BE PROCESSED - (two|three) column table\]\(([^)]+)\)"
        matches = re.findall(pat, md_content, flags=re.IGNORECASE)

        if not matches:
            return md_content

        # dict.fromkeys drops duplicate matches while keeping their order, so
        # an image referenced twice is extracted and uploaded only once.
        for (col_type, s3_key) in dict.fromkeys(matches):
            logger.info(f"Processing table image: {s3_key}, columns={col_type}")
            img_data = next(
                (
                    desc.get("data")
                    for desc in self.descriptions.values()
                    if desc.get("s3_path") == s3_key
                ),
                None,
            )
            if img_data is None:
                logger.warning(f"No image data found for S3 key {s3_key}. Skipping.")
                continue
            try:
                # The directory and everything in it is removed on exit, on
                # both the success and the failure path.
                with tempfile.TemporaryDirectory() as work_dir:
                    temp_path = os.path.join(work_dir, "table.jpg")
                    with open(temp_path, "wb") as temp_file:
                        temp_file.write(img_data)

                    if col_type.lower() == 'two':
                        extractor = TableExtractor(
                            skip_header=True,
                            merge_two_col_rows=True,
                            enable_subtopic_merge=True,
                            subtopic_threshold=0.2
                        )
                    else:
                        extractor = TableExtractor(
                            skip_header=True,
                            merge_two_col_rows=False,
                            enable_subtopic_merge=False,
                            subtopic_threshold=0.2
                        )
                    row_boxes = extractor.process_image(temp_path)

                    # Save the cropped cells to disk so the bytes uploaded for
                    # each cell are that cell, not the whole table image.
                    # Assumes the row_<i>/col_<j>.jpg layout that
                    # LocalImageWriter reads back - confirm against
                    # TableExtractor.save_extracted_cells.
                    cells_dir = os.path.join(work_dir, "rows")
                    os.makedirs(cells_dir, exist_ok=True)
                    extractor.save_extracted_cells(temp_path, row_boxes, cells_dir)

                    snippet = ["**Extracted table cells:**"]
                    stem = os.path.basename(s3_key).split('.')[0]
                    for i, row in enumerate(row_boxes):
                        for j, _ in enumerate(row):
                            cell_path = os.path.join(cells_dir, f"row_{i}", f"col_{j}.jpg")
                            with open(cell_path, "rb") as cell_file:
                                cell_bytes = cell_file.read()
                            cell_unique_key = f"{self.base_path}cells/{stem}_row{i}_col{j}.jpg"
                            self.s3_writer.write(cell_unique_key, cell_bytes)
                            snippet.append(f"![Row {i} Col {j}]({cell_unique_key})")
                    new_snip = "\n".join(snippet)

                    old_line = f"![HAS TO BE PROCESSED - {col_type} column table]({s3_key})"
                    md_content = md_content.replace(old_line, new_snip)
            except Exception as e:
                # Best effort per image: log and move on to the next one.
                logger.error(f"Error processing table image {s3_key}: {e}")
        return md_content

    def post_process(self, key: str, md_content: str) -> str:
        """Synchronous wrapper that runs ``post_process_async`` via ``asyncio.run``."""
        return asyncio.run(self.post_process_async(key, md_content))
277
+ #test
278
class LocalImageWriter(DataWriter):
    """DataWriter that keeps extracted images in memory and rewrites the markdown.

    Images passed to ``write`` are recorded in ``descriptions``.
    ``post_process`` classifies them, renames the markdown links to
    ``img_<n>.jpg`` and replaces images classified as two/three column tables
    with links to cell images saved below ``output_folder``.
    """

    def __init__(self, output_folder: str, gemini_api_key: str):
        """Create ``output_folder`` if needed and initialise the image registry.

        Args:
            output_folder: Directory that receives the extracted cell images.
            gemini_api_key: API key forwarded to the image classifier.
        """
        self.output_folder = output_folder
        os.makedirs(self.output_folder, exist_ok=True)
        # Maps the path given to write() to that image's record
        # (raw bytes, relative file name, classification label, alt text).
        self.descriptions = {}
        self._img_count = 0
        self.gemini_api_key = gemini_api_key

    def write(self, path: str, data: bytes) -> None:
        """Record one image under its original *path*; nothing is written to disk here."""
        self._img_count += 1
        unique_id = f"img_{self._img_count}.jpg"
        self.descriptions[path] = {
            "data": data,
            "relative_path": unique_id,
            "table_classification": "NO_TABLE",
            "final_alt": ""
        }

    async def post_process_async(self, key: str, md_content: str) -> str:
        """Classify all images and return markdown made of image lines only.

        Args:
            key: Prefix that precedes each image path in ``md_content``.
            md_content: Markdown whose image links look like ``![](<key><path>)``.

        Returns:
            The lines of the rewritten markdown that start with an image
            link, joined by newlines. All other lines are dropped.
        """
        logger.info("Classifying images to detect tables.")
        paths = list(self.descriptions)
        # gather() runs the classifications concurrently; awaiting the
        # coroutines one after another would serialise the API calls.
        results = await asyncio.gather(
            *(
                classify_image_async(self.descriptions[p]["data"], self.gemini_api_key)
                for p in paths
            ),
            return_exceptions=True,
        )
        for p, result in zip(paths, results):
            if isinstance(result, BaseException):
                # A failed classification falls back to NO_TABLE.
                logger.error(f"Table classification error: {result}")
                result = "NO_TABLE"
            self.descriptions[p]['table_classification'] = result

        for p, info in self.descriptions.items():
            cls = info['table_classification']
            if cls == "TWO_COLUMN":
                info['final_alt'] = "HAS TO BE PROCESSED - two column table"
            elif cls == "THREE_COLUMN":
                info['final_alt'] = "HAS TO BE PROCESSED - three column table"
            else:
                info['final_alt'] = "NO_TABLE image"
            md_content = md_content.replace(f"![]({key}{p})", f"![{info['final_alt']}]({info['relative_path']})")

        md_content = self._process_table_images_in_markdown(md_content)

        final_lines = []
        for line in md_content.split("\n"):
            if re.match(r"^\!\[.*\]\(.*\)", line.strip()):
                final_lines.append(line.strip())
        return "\n".join(final_lines)

    def _process_table_images_in_markdown(self, md_content: str) -> str:
        """Replace each table placeholder image with links to its cell images.

        Each table image is written to ``output_folder`` for the duration of
        the extraction and removed afterwards. The cells are saved under
        ``<output_folder>/<image>_rows`` and linked relative to
        ``output_folder``. If processing one image fails, the error is logged
        and that image's placeholder is left unchanged.

        Args:
            md_content: Markdown containing "HAS TO BE PROCESSED" image links.

        Returns:
            The markdown with processed placeholders replaced.
        """
        pat = r"!\[HAS TO BE PROCESSED - (two|three) column table\]\(([^)]+)\)"
        matches = re.findall(pat, md_content, flags=re.IGNORECASE)
        if not matches:
            return md_content

        # dict.fromkeys drops duplicate matches while keeping their order, so
        # an image referenced twice is extracted only once.
        for (col_type, image_id) in dict.fromkeys(matches):
            logger.info(f"Processing table image => {image_id}, columns={col_type}")
            temp_path = os.path.join(self.output_folder, image_id)
            desc_item = next(
                (
                    val
                    for val in self.descriptions.values()
                    if val["relative_path"] == image_id
                ),
                None,
            )
            if not desc_item:
                logger.warning(f"No matching image data for {image_id}, skipping extraction.")
                continue
            if not os.path.exists(temp_path):
                with open(temp_path, "wb") as f:
                    f.write(desc_item["data"])
            try:
                if col_type.lower() == 'two':
                    extractor = TableExtractor(
                        skip_header=True,
                        merge_two_col_rows=True,
                        enable_subtopic_merge=True,
                        subtopic_threshold=0.2
                    )
                else:
                    extractor = TableExtractor(
                        skip_header=True,
                        merge_two_col_rows=False,
                        enable_subtopic_merge=False,
                        subtopic_threshold=0.2
                    )
                row_boxes = extractor.process_image(temp_path)

                out_folder = temp_path + "_rows"
                os.makedirs(out_folder, exist_ok=True)

                extractor.save_extracted_cells(temp_path, row_boxes, out_folder)

                snippet = ["**Extracted table cells:**"]
                for i, row in enumerate(row_boxes):
                    row_dir = os.path.join(out_folder, f"row_{i}")
                    for j, _ in enumerate(row):
                        cell_file = f"col_{j}.jpg"
                        cell_path = os.path.join(row_dir, cell_file)
                        relp = os.path.relpath(cell_path, self.output_folder)
                        snippet.append(f"![Row {i} Col {j}]({relp})")
                new_snip = "\n".join(snippet)
                old_line = f"![HAS TO BE PROCESSED - {col_type} column table]({image_id})"
                md_content = md_content.replace(old_line, new_snip)

            except Exception as e:
                # Best effort per image: log and move on to the next one.
                logger.error(f"Error processing table image {image_id}: {e}")
            finally:
                # The full table image is only needed during extraction.
                if os.path.exists(temp_path):
                    os.remove(temp_path)
        return md_content

    def post_process(self, key: str, md_content: str) -> str:
        """Synchronous wrapper that runs ``post_process_async`` via ``asyncio.run``."""
        return asyncio.run(self.post_process_async(key, md_content))
391
 
392
  class GeminiTopicExtractor:
393
  def __init__(self, api_key: str = None, num_pages: int = 10):
 
399
  if not first_pages_text.strip():
400
  logger.error("No text from first pages => cannot extract subtopics.")
401
  return {}
402
+
403
  prompt = f"""
404
  You have the first pages of a PDF specification, including a table of contents.
405
 
 
542
  {first_pages_text}
543
  """
544
  global _GEMINI_CLIENT
 
 
545
  client = _GEMINI_CLIENT
546
  try:
547
  response = client.models.generate_content(
 
552
  if not response or not response.text:
553
  logger.warning("No text from LLM => returning empty subtopics.")
554
  return {}
 
555
  raw_json = response.text.strip()
556
  cleaned = raw_json.replace("```json", "").replace("```", "")
557
  try:
 
581
  def _read_first_pages_raw(self, pdf_path: str, num_pages: int) -> str:
582
  text_parts = []
583
  try:
584
+ if pdf_path.startswith("http://") or pdf_path.startswith("https://"):
585
+ response = requests.get(pdf_path)
586
+ if response.status_code != 200:
587
+ logger.error("Failed to download PDF from %s. Status code: %d", pdf_path, response.status_code)
588
+ return ""
589
+ pdf_bytes = response.content
590
+ else:
591
+ with open(pdf_path, "rb") as f:
592
+ pdf_bytes = f.read()
593
+ doc = fitz.open(stream=pdf_bytes, filetype="pdf")
594
  pages_to_read = min(num_pages, doc.page_count)
595
  for i in range(pages_to_read):
596
  raw_text = doc[i].get_text("raw")
 
600
  logger.error(f"Could not open PDF: {e}")
601
  return "\n".join(text_parts)
602
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
603
  class MineruNoTextProcessor:
604
  def __init__(self, output_folder: str, gemini_api_key: str = None):
605
  self.output_folder = output_folder
 
611
  self.subtopic_extractor = GeminiTopicExtractor(api_key=gemini_api_key, num_pages=10)
612
  self.gemini_api_key = gemini_api_key or os.getenv("GEMINI_API_KEY", "")
613
 
614
+ if (os.getenv("S3_ACCESS_KEY") and os.getenv("S3_SECRET_KEY") and
615
+ os.getenv("S3_BUCKET_NAME") and os.getenv("S3_ENDPOINT")):
616
+ self.use_s3 = True
617
+ self.s3_writer = s3Writer(
618
+ ak=os.getenv("S3_ACCESS_KEY"),
619
+ sk=os.getenv("S3_SECRET_KEY"),
620
+ bucket=os.getenv("S3_BUCKET_NAME"),
621
+ endpoint_url=os.getenv("S3_ENDPOINT")
622
+ )
623
+ self.base_path = "topic_extraction/"
624
+ else:
625
+ self.use_s3 = False
626
+
627
+
628
  def cleanup_gpu(self):
629
  try:
630
  gc.collect()
 
637
  logger.info(f"Processing PDF: {pdf_path}")
638
  try:
639
  subtopics = self.subtopic_extractor.extract_subtopics(pdf_path)
 
640
  logger.info(f"Gemini returned subtopics: {subtopics}")
641
+
642
+ if pdf_path.startswith("http://") or pdf_path.startswith("https://"):
643
+ response = requests.get(pdf_path)
644
+ if response.status_code != 200:
645
+ logger.error("Failed to download PDF from %s. Status code: %d", pdf_path, response.status_code)
646
+ raise Exception(f"Failed to download PDF: {pdf_path}")
647
+ pdf_bytes = response.content
648
+ logger.info("Downloaded %d bytes for pdf_url='%s'", len(pdf_bytes), pdf_path)
649
+ else:
650
+ with open(pdf_path, "rb") as f:
651
+ pdf_bytes = f.read()
652
+ logger.info("Loaded %d bytes from local file '%s'", len(pdf_bytes), pdf_path)
653
+
654
  doc = fitz.open(stream=pdf_bytes, filetype="pdf")
655
  total_pages = doc.page_count
656
  doc.close()
657
+
658
  final_pages = set()
659
  if not subtopics:
660
+ logger.warning("No subtopics found. Processing entire PDF as fallback.")
661
  final_pages = set(range(total_pages))
662
  else:
663
  for subname, rng in subtopics.items():
 
677
  chosen_page = p
678
  break
679
  if chosen_page is None:
680
+ chosen_page = occs[-1] if occs else 0
681
+ logger.warning(f"No suitable occurrence for '{subname}'. Using page {chosen_page}.")
 
 
 
 
 
682
  raw_offset = chosen_page - doc_start_0
683
  offset = max(0, raw_offset)
 
684
  s0 = (start_p - 1) + offset
685
  e0 = (end_p - 1) + offset
686
  s0 = max(0, min(total_pages - 1, s0))
 
688
  for pp in range(s0, e0 + 1):
689
  final_pages.add(pp)
690
  if not final_pages:
691
+ logger.warning("No valid pages after offset. Processing entire PDF.")
692
  final_pages = set(range(total_pages))
693
  logger.info(f"Processing pages (0-based): {sorted(final_pages)}")
694
 
695
  subset_pdf_bytes = create_subset_pdf(pdf_bytes, sorted(final_pages))
 
696
  dataset = PymuDocDataset(subset_pdf_bytes)
697
  inference = doc_analyze(
698
  dataset,
 
703
  table_enable=self.table_enable
704
  )
705
  logger.info("doc_analyze complete. Extracting images.")
706
+ if self.use_s3:
707
+ writer = S3ImageWriter(self.s3_writer, self.base_path, self.gemini_api_key)
708
+ md_prefix = self.base_path
709
+ else:
710
+ writer = LocalImageWriter(self.output_folder, self.gemini_api_key)
711
+ md_prefix = "local-unique-prefix/"
712
  pipe_result = inference.pipe_ocr_mode(writer, lang=self.language)
713
+ md_content = pipe_result.get_markdown(md_prefix)
714
+ final_markdown = writer.post_process(md_prefix, md_content)
715
+ if self.use_s3:
716
+ final_md_key = f"{self.base_path}final_output.md"
717
+ self.s3_writer.write(final_md_key, final_markdown.encode("utf-8"))
718
+ logger.info(f"Final markdown uploaded to S3 at {final_md_key}")
719
+ else:
720
+ out_path = os.path.join(self.output_folder, "final_output.md")
721
+ with open(out_path, "w", encoding="utf-8") as f:
722
+ f.write(final_markdown)
723
  return final_markdown
 
724
  finally:
725
  self.cleanup_gpu()
726
 
 
731
  try:
732
  processor = MineruNoTextProcessor(output_folder=output_dir, gemini_api_key=gemini_key)
733
  md_output = processor.process(input_pdf)
734
+ logger.info("Processing completed successfully.")
735
  except Exception as e:
736
  logger.error(f"Processing failed: {e}")
worker.py CHANGED
@@ -121,7 +121,24 @@ class RabbitMQWorker:
121
  logger.error("[Worker %s] Failed to publish results.", thread_id)
122
 
123
  logger.info("[Worker %s] Contexts: %s", thread_id, contexts)
124
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
125
  else:
126
  ch.basic_ack(delivery_tag=method.delivery_tag, requeue=False)
127
  logger.warning("[Worker %s] Unknown pattern type in headers: %s", thread_id, pattern)
 
121
  logger.error("[Worker %s] Failed to publish results.", thread_id)
122
 
123
  logger.info("[Worker %s] Contexts: %s", thread_id, contexts)
124
+
125
+ elif pattern == "extract_topics":
126
+ data = body_dict.get("data")
127
+ pdf_path = data.get("pdf_path") #url
128
+ topic_processor = MineruNoTextProcessor(gemini_api_key=os.getenv("GEMINI_API_KEY"))
129
+ try:
130
+ topics_markdown = topic_processor.process(pdf_path)
131
+ data["topics_markdown"] = topics_markdown
132
+ body_dict["pattern"] = "topic_extraction_update_from_gpu_server"
133
+ body_dict["data"] = data
134
+ if self.publish_message(body_dict, headers):
135
+ ch.basic_ack(delivery_tag=method.delivery_tag)
136
+ else:
137
+ ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
138
+ except Exception as e:
139
+ logger.error(f"Error processing topic extraction: {e}")
140
+ ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
141
+
142
  else:
143
  ch.basic_ack(delivery_tag=method.delivery_tag, requeue=False)
144
  logger.warning("[Worker %s] Unknown pattern type in headers: %s", thread_id, pattern)