davidtran999 committed on
Commit ed538dd · verified · Parent: 7e0d0df

Upload backend/core/etl/legal_document_loader.py with huggingface_hub

backend/core/etl/legal_document_loader.py ADDED
@@ -0,0 +1,489 @@
+ """
+ Utilities to ingest PDF/DOCX legal documents while preserving text, structure, and images.
+ """
+
+ from __future__ import annotations
+
+ import os
+ import re
+ from dataclasses import dataclass
+ from io import BytesIO
+ from pathlib import Path
+ from typing import BinaryIO, Iterable, List, Optional, Union
+
+ import fitz  # PyMuPDF
+ from docx import Document as DocxDocument
+ from PIL import Image as PILImage
+
+ try:
+     import pytesseract
+
+     OCR_AVAILABLE = True
+ except Exception:  # pragma: no cover - optional dependency
+     pytesseract = None
+     OCR_AVAILABLE = False
+
+ # Support for .doc files (Word 97-2003): convert .doc to .docx with LibreOffice
+ # when available; otherwise fall back to a crude plain-text scrape.
+ try:
+     import subprocess
+
+     SUBPROCESS_AVAILABLE = True
+ except ImportError:  # pragma: no cover - subprocess is stdlib; purely defensive
+     SUBPROCESS_AVAILABLE = False
+
+
+ @dataclass
+ class SectionChunk:
+     """Structured chunk extracted from a legal document."""
+
+     level: str
+     code: str
+     title: str
+     content: str
+     page_start: Optional[int] = None
+     page_end: Optional[int] = None
+     is_ocr: bool = False
+     metadata: Optional[dict] = None
+
+
+ @dataclass
+ class ExtractedImage:
+     """Image extracted from the source document."""
+
+     data: bytes
+     extension: str
+     content_type: str
+     page_number: Optional[int] = None
+     description: str = ""
+     width: Optional[int] = None
+     height: Optional[int] = None
+
+
+ @dataclass
+ class ExtractedDocument:
+     """Return value when parsing one document."""
+
+     text: str
+     page_count: int
+     sections: List[SectionChunk]
+     images: List[ExtractedImage]
+     ocr_text: Optional[str] = None
+
+
+ SECTION_REGEX = re.compile(
+     r"^(Chương\s+[IVXLC\d]+|Mục\s+[IVXLC\d]+|Điều\s+\d+[\w]*)",
+     re.IGNORECASE,
+ )
+
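+ # Illustrative matches for SECTION_REGEX (a minimal sketch; the heading
+ # strings below are hypothetical):
+ #     SECTION_REGEX.match("Chương II NHỮNG QUY ĐỊNH CHUNG").group(0) -> "Chương II"
+ #     SECTION_REGEX.match("Mục 1. Nguyên tắc áp dụng").group(0)     -> "Mục 1"
+ #     SECTION_REGEX.match("Điều 15. Phạm vi điều chỉnh").group(0)   -> "Điều 15"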
+
+ def _detect_level(header: str) -> str:
+     header_lower = header.lower()
+     if header_lower.startswith("chương"):
+         return "chapter"
+     if header_lower.startswith("mục"):
+         return "section"
+     if header_lower.startswith("điều"):
+         return "article"
+     return "other"
+
+
+ def _split_sections(paragraphs: Iterable[str], *, is_ocr: bool = False) -> List[SectionChunk]:
+     sections: List[SectionChunk] = []
+     current: Optional[SectionChunk] = None
+
+     for paragraph in paragraphs:
+         paragraph = paragraph.strip()
+         if not paragraph:
+             continue
+
+         match = SECTION_REGEX.match(paragraph)
+         if match:
+             header = match.group(0)
+             rest = paragraph[len(header):].strip()
+             level = _detect_level(header)
+             current = SectionChunk(
+                 level=level,
+                 code=header,
+                 title=rest,
+                 content=paragraph,
+                 is_ocr=is_ocr,
+             )
+             sections.append(current)
+         elif current:
+             current.content += "\n" + paragraph
+         else:
+             current = SectionChunk(
+                 level="other",
+                 code="Lời mở đầu",
+                 title="",
+                 content=paragraph,
+                 is_ocr=is_ocr,
+             )
+             sections.append(current)
+
+     return sections
+
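+ # Minimal sketch of the splitting behaviour (input lines are hypothetical):
+ #     _split_sections(["Điều 1. Phạm vi", "Luật này quy định ..."])
+ # yields a single "article" chunk whose content spans both lines; any text
+ # seen before the first recognised header is collected under "Lời mở đầu".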
+
+ def _extract_docx_images(doc: DocxDocument) -> List[ExtractedImage]:
+     images: List[ExtractedImage] = []
+     rels = doc.part._rels.values()
+     for rel in rels:
+         if "image" not in rel.reltype:
+             continue
+         part = rel.target_part
+         data = part.blob
+         # Determine extension and metadata
+         partname = Path(part.partname)
+         ext = partname.suffix.lstrip(".") or "bin"
+         content_type = getattr(part, "content_type", "application/octet-stream")
+         width = None
+         height = None
+         try:
+             with PILImage.open(BytesIO(data)) as pil_img:
+                 width, height = pil_img.size
+         except Exception:
+             pass
+         images.append(
+             ExtractedImage(
+                 data=data,
+                 extension=ext,
+                 content_type=content_type,
+                 page_number=None,
+                 width=width,
+                 height=height,
+             )
+         )
+     return images
+
+
+ def extract_from_docx(path: Optional[Path] = None, data: Optional[bytes] = None) -> ExtractedDocument:
+     """Parse a DOCX file (path or bytes), keeping paragraphs in order and capturing embedded images."""
+     if path is None and data is None:
+         raise ValueError("DOCX extraction requires path or bytes.")
+     if data is not None:
+         doc = DocxDocument(BytesIO(data))
+     else:
+         doc = DocxDocument(path)
+     paragraphs = [para.text for para in doc.paragraphs]
+     full_text = "\n".join(paragraphs)
+     sections = _split_sections(paragraphs, is_ocr=False)
+     images = _extract_docx_images(doc)
+     sections = _apply_chunk_strategy(sections, full_text)
+     return ExtractedDocument(
+         text=full_text,
+         # DOCX has no fixed page count; approximate by paragraph count
+         page_count=len(doc.paragraphs) or 1,
+         sections=sections,
+         images=images,
+         ocr_text=None,
+     )
+
+
+ def _pixmap_to_pil(pix: fitz.Pixmap) -> PILImage.Image:
+     mode = "RGB"
+     if pix.n == 1:
+         mode = "L"
+     elif pix.n == 4:
+         mode = "RGBA"
+     return PILImage.frombytes(mode, (pix.width, pix.height), pix.samples)
+
+
+ def _perform_ocr_on_page(page: fitz.Page) -> str:
+     if not OCR_AVAILABLE:
+         return ""
+     try:
+         zoom = os.getenv("OCR_PDF_ZOOM", "2.0")
+         try:
+             zoom_val = float(zoom)
+         except ValueError:
+             zoom_val = 2.0
+         matrix = fitz.Matrix(zoom_val, zoom_val)
+         pix = page.get_pixmap(matrix=matrix)
+         pil_img = _pixmap_to_pil(pix)
+         langs = os.getenv("OCR_LANGS", "vie+eng")
+         text = pytesseract.image_to_string(pil_img, lang=langs)
+         return text.strip()
+     except Exception:
+         return ""
+
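+ # Environment overrides for the OCR path (values shown are the defaults):
+ #     OCR_PDF_ZOOM=2.0   -> render scale before OCR; higher helps small print
+ #     OCR_LANGS=vie+eng  -> Tesseract language packs to use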
+
+ def _extract_pdf_images(pdf: fitz.Document) -> List[ExtractedImage]:
+     images: List[ExtractedImage] = []
+     for page_index in range(pdf.page_count):
+         page = pdf.load_page(page_index)
+         for image in page.get_images(full=True):
+             xref = image[0]
+             try:
+                 pix = fitz.Pixmap(pdf, xref)
+                 if pix.n - pix.alpha > 3:  # CMYK or similar: convert to RGB before encoding
+                     pix = fitz.Pixmap(fitz.csRGB, pix)
+                 img_bytes = pix.tobytes("png")
+                 images.append(
+                     ExtractedImage(
+                         data=img_bytes,
+                         extension="png",
+                         content_type="image/png",
+                         page_number=page_index + 1,
+                         width=pix.width,
+                         height=pix.height,
+                     )
+                 )
+                 pix = None  # release Pixmap resources promptly
+             except Exception:
+                 continue
+     return images
+
+
+ def extract_from_doc(path: Optional[Path] = None, data: Optional[bytes] = None) -> ExtractedDocument:
+     """
+     Parse a .doc file (Word 97-2003 format).
+     Converts .doc to .docx using LibreOffice if available, then processes it as .docx.
+     Otherwise falls back to a crude plain-text scrape of the binary file.
+     """
+     if path is None and data is None:
+         raise ValueError("DOC extraction requires path or bytes.")
+
+     import shutil
+     import tempfile
+
+     # If we only have bytes, persist them to a temp file for the converter
+     if data is not None:
+         with tempfile.NamedTemporaryFile(delete=False, suffix=".doc") as tmp:
+             tmp.write(data)
+             doc_path = Path(tmp.name)
+         temp_created = True
+     else:
+         doc_path = Path(path)
+         temp_created = False
+
+     try:
+         # Try to convert .doc to .docx using LibreOffice (soffice) if it is on PATH
+         if SUBPROCESS_AVAILABLE:
+             cmd = shutil.which("soffice") or shutil.which("libreoffice")
+             if cmd:
+                 try:
+                     with tempfile.TemporaryDirectory() as tmpdir:
+                         output_dir = Path(tmpdir)
+                         subprocess.run(
+                             [cmd, "--headless", "--convert-to", "docx", "--outdir", str(output_dir), str(doc_path)],
+                             check=True,
+                             capture_output=True,
+                             timeout=30,
+                         )
+                         # Find the converted file and process it as .docx
+                         converted_file = output_dir / (doc_path.stem + ".docx")
+                         if converted_file.exists():
+                             return extract_from_docx(path=converted_file)
+                 except (subprocess.SubprocessError, FileNotFoundError):
+                     pass  # TimeoutExpired is a SubprocessError; fall through to the text scrape
+
+         # Fallback: scrape printable ASCII runs out of the binary file.
+         # This is a last resort; it drops non-ASCII bytes, so Vietnamese
+         # diacritics will be lost and the result is usually poor.
+         try:
+             with open(doc_path, "rb") as f:
+                 content = f.read()
+             text_parts = []
+             current_text = ""
+             for byte in content:
+                 if 32 <= byte <= 126 or byte in (9, 10, 13):  # printable ASCII plus tab/newlines
+                     current_text += chr(byte)
+                 else:
+                     if len(current_text) > 10:
+                         text_parts.append(current_text)
+                     current_text = ""
+             if current_text:
+                 text_parts.append(current_text)
+
+             full_text = "\n".join(text_parts)
+             if len(full_text) > 100:  # only accept the scrape if it found reasonable text
+                 paragraphs = [p.strip() for p in full_text.split("\n") if p.strip()]
+                 sections = _split_sections(paragraphs, is_ocr=False)
+                 sections = _apply_chunk_strategy(sections, full_text)
+                 return ExtractedDocument(
+                     text=full_text,
+                     page_count=len(paragraphs) or 1,
+                     sections=sections,
+                     images=[],
+                     ocr_text=None,
+                 )
+         except Exception:
+             pass
+
+         # If all else fails, raise a helpful error
+         raise ValueError(
+             "File type .doc (Word 97-2003) is not fully supported. "
+             "Please convert the file to .docx format using Microsoft Word or LibreOffice, "
+             "or install LibreOffice command-line tools for automatic conversion."
+         )
+     finally:
+         if temp_created and doc_path.exists():
+             os.unlink(doc_path)
+
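+ # The conversion path above assumes the LibreOffice CLI is installed, e.g. on
+ # Debian-based images (the package name is an assumption about your base system):
+ #     apt-get install libreoffice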
+
+ def extract_from_pdf(path: Optional[Path] = None, data: Optional[bytes] = None) -> ExtractedDocument:
+     """Parse a PDF file using PyMuPDF (path or bytes) and capture page text + images."""
+     if path is None and data is None:
+         raise ValueError("PDF extraction requires path or bytes.")
+     if data is not None:
+         pdf = fitz.open(stream=data, filetype="pdf")
+     else:
+         pdf = fitz.open(path)
+
+     fragments: List[str] = []
+     ocr_fragments: List[str] = []
+     sections: List[SectionChunk] = []
+     current: Optional[SectionChunk] = None
+
+     for page_index in range(pdf.page_count):
+         page = pdf.load_page(page_index)
+         page_text = page.get_text("text").strip()
+         page_is_ocr = False
+         if not page_text:
+             # Empty text layer: the page is likely scanned, so fall back to OCR
+             ocr_text = _perform_ocr_on_page(page)
+             if ocr_text:
+                 page_text = ocr_text
+                 page_is_ocr = True
+                 ocr_fragments.append(ocr_text)
+         fragments.append(page_text)
+
+         for paragraph in page_text.splitlines():
+             paragraph = paragraph.strip()
+             if not paragraph:
+                 continue
+             match = SECTION_REGEX.match(paragraph)
+             if match:
+                 header = match.group(0)
+                 rest = paragraph[len(header):].strip()
+                 level = _detect_level(header)
+                 current = SectionChunk(
+                     level=level,
+                     code=header,
+                     title=rest,
+                     content=paragraph,
+                     page_start=page_index + 1,
+                     page_end=page_index + 1,
+                     is_ocr=page_is_ocr,
+                 )
+                 sections.append(current)
+             elif current:
+                 current.content += "\n" + paragraph
+                 current.page_end = page_index + 1
+                 current.is_ocr = current.is_ocr or page_is_ocr
+             else:
+                 current = SectionChunk(
+                     level="other",
+                     code="Trang đầu",
+                     title="",
+                     content=paragraph,
+                     page_start=page_index + 1,
+                     page_end=page_index + 1,
+                     is_ocr=page_is_ocr,
+                 )
+                 sections.append(current)
+
+     images = _extract_pdf_images(pdf)
+     full_text = "\n".join(fragments)
+     ocr_text = "\n".join(ocr_fragments) if ocr_fragments else None
+     sections = _apply_chunk_strategy(sections, full_text)
+     return ExtractedDocument(
+         text=full_text,
+         page_count=pdf.page_count,
+         sections=sections,
+         images=images,
+         ocr_text=ocr_text,
+     )
+
+
+ def _generate_semantic_chunks(text: str, chunk_size: int, overlap: int) -> List[SectionChunk]:
+     if chunk_size <= 0:
+         return []
+     overlap = max(0, min(overlap, chunk_size - 1))
+     chunks: List[SectionChunk] = []
+     length = len(text)
+     start = 0
+     idx = 1
+     while start < length:
+         end = min(length, start + chunk_size)
+         chunk_content = text[start:end].strip()
+         if chunk_content:
+             chunks.append(
+                 SectionChunk(
+                     level="chunk",
+                     code=f"Chunk {idx}",
+                     title="",
+                     content=chunk_content,
+                     metadata={"chunk_strategy": "semantic"},
+                 )
+             )
+             idx += 1
+         if end >= length:
+             break
+         start = max(0, end - overlap)
+     return chunks
+
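+ # Worked example: with chunk_size=1200 and overlap=200 the window stride is
+ # chunk_size - overlap = 1000, so a 2,500-character text yields chunks
+ # covering [0, 1200), [1000, 2200) and [2000, 2500).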
+
+ def _apply_chunk_strategy(sections: List[SectionChunk], full_text: str) -> List[SectionChunk]:
+     strategy = os.getenv("LEGAL_CHUNK_STRATEGY", "structure").lower()
+     if strategy != "hybrid":
+         return sections
+     try:
+         chunk_size = int(os.getenv("LEGAL_CHUNK_SIZE", "1200"))
+     except ValueError:
+         chunk_size = 1200
+     try:
+         overlap = int(os.getenv("LEGAL_CHUNK_OVERLAP", "200"))
+     except ValueError:
+         overlap = 200
+     new_sections = list(sections)
+     new_sections.extend(_generate_semantic_chunks(full_text, chunk_size, overlap))
+     return new_sections
+
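+ # The hybrid strategy is opt-in via environment variables, e.g.:
+ #     LEGAL_CHUNK_STRATEGY=hybrid
+ #     LEGAL_CHUNK_SIZE=1200
+ #     LEGAL_CHUNK_OVERLAP=200
+ # Any other LEGAL_CHUNK_STRATEGY value keeps only the structure-based sections.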
+
+ SourceType = Union[str, Path, BinaryIO]
+
+
+ def load_legal_document(source: SourceType, filename: Optional[str] = None) -> ExtractedDocument:
+     """
+     Dispatch to the right extractor based on the file extension.
+
+     Args:
+         source: path or binary handle.
+         filename: optional original filename (needed when source is a stream).
+
+     Raises:
+         ValueError: if the extension is unsupported.
+     """
+     path_obj: Optional[Path] = None
+     data: Optional[bytes] = None
+
+     if isinstance(source, (str, Path)):
+         path_obj = Path(source)
+         suffix = path_obj.suffix.lower()
+     else:
+         data = source.read()
+         if hasattr(source, "seek"):
+             source.seek(0)
+         suffix = Path(filename or "").suffix.lower()
+
+     if suffix == ".docx":
+         return extract_from_docx(path=path_obj, data=data)
+     if suffix == ".doc":
+         return extract_from_doc(path=path_obj, data=data)
+     if suffix == ".pdf":
+         return extract_from_pdf(path=path_obj, data=data)
+     raise ValueError(f"Unsupported file type: {suffix or 'unknown'}")
+
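+
+ # Illustrative smoke test; the file name is hypothetical and the block only
+ # runs when the module is executed directly:
+ if __name__ == "__main__":  # pragma: no cover - manual check only
+     demo = load_legal_document("sample_decree.pdf")
+     print(f"pages={demo.page_count} sections={len(demo.sections)} images={len(demo.images)}")
+     for chunk in demo.sections[:5]:
+         print(chunk.level, chunk.code, "-", chunk.title[:60])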