"""
app/services/parser.py
PDF text extraction and article chunking for both English and Arabic.
Pure functions — no global state.
English path: reads from pre-extracted text file (extracted_text_en.txt)
placed in the same directory as main.py.
The uploaded PDF is accepted by the route but ignored for extraction.
OCR code is kept below but commented out.
Arabic path: native PyMuPDF text extraction (unchanged).
"""
from __future__ import annotations
import re
import unicodedata
import os
import fitz # PyMuPDF
# ──────────────────────────────────────────────────────────────────────────────
# ENGLISH — pre-extracted text file
# ──────────────────────────────────────────────────────────────────────────────
# Path to the pre-extracted OCR text file.
# Place extracted_text_en.txt next to main.py (i.e. inside the lexai/ folder).
# Walk up from this module (app/services/parser.py) to the lexai/ folder:
# parser.py -> services/ -> app/ -> lexai/.
_SERVICES_DIR = os.path.dirname(__file__)
_APP_DIR = os.path.dirname(_SERVICES_DIR)
_LEXAI_DIR = os.path.dirname(_APP_DIR)

# Full path of the pre-extracted English text, located in the lexai/ folder.
_EN_TEXT_FILE = os.path.join(_LEXAI_DIR, "extracted_text_en.txt")
def _load_english_text() -> str:
    """Return the full contents of the pre-extracted English text file.

    The file is read as UTF-8 from the path held in ``_EN_TEXT_FILE``.

    Raises:
        FileNotFoundError: if no file exists at ``_EN_TEXT_FILE``; the
            message includes the path that was checked.
    """
    if os.path.exists(_EN_TEXT_FILE):
        with open(_EN_TEXT_FILE, "r", encoding="utf-8") as handle:
            contents = handle.read()
        return contents

    raise FileNotFoundError(
        f"Pre-extracted English text not found at: {_EN_TEXT_FILE}\n"
        "Place 'extracted_text_en.txt' inside the lexai/ folder."
    )
# ── OCR extraction (commented out — kept for reference) ──────────────────────
# def extract_text_ocr(pdf_bytes: bytes, gpu: bool = False) -> str:
# """Render every page at 300 DPI and run EasyOCR over it."""
# import easyocr
# import numpy as np
#
# reader = easyocr.Reader(["en"], gpu=gpu)
# doc = fitz.open(stream=pdf_bytes, filetype="pdf")
# raw_text = ""
#
# for page in doc:
# mat = fitz.Matrix(300 / 72, 300 / 72)
# pix = page.get_pixmap(matrix=mat)
# img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
# pix.height, pix.width, pix.n
# )
# if pix.n == 4:
# img = img[:, :, :3]
#
# results = reader.readtext(img, detail=0, paragraph=True)
# raw_text += "\n".join(results) + "\n"
#
# doc.close()
# return raw_text
# ─────────────────────────────────────────────────────────────────────────────
def clean_ocr_english(text: str) -> str:
    """Repair common OCR misreads of "Article" and flatten whitespace.

    Whole-word, case-insensitive occurrences of the known misreads
    (``Artic1e``, ``Articie``, ``Art1cle``) are replaced with the literal
    ``Article``. Every run of whitespace is then reduced to a single space
    and the result is trimmed at both ends.
    """
    misreads = r'\bArtic1e\b|\bArticie\b|\bARTIC1E\b|\bArt1cle\b'
    repaired = re.sub(misreads, "Article", text, flags=re.IGNORECASE)
    # str.split() with no argument splits on the same Unicode whitespace that
    # the regex class \s matches, and drops leading/trailing runs.
    return " ".join(repaired.split())
def split_articles_english(text: str) -> list[dict]:
    """Split cleaned English text into one record per ``Article (N)`` section.

    Headers are matched case-insensitively. Sections whose body is shorter
    than 20 characters are dropped. When an article number occurs more than
    once, the occurrence with the longest text is kept (the earliest one on
    a tie).

    Returns:
        Records sorted by ``article_num``, each with the keys
        ``article_id``, ``article_num``, ``header``, ``text`` and ``lang``.
    """
    header_re = r'(Article\s*\(\s*\d+\s*\))'
    # With one capture group, re.split yields
    # [preamble, header, body, header, body, ...].
    pieces = re.split(header_re, text, flags=re.IGNORECASE)

    longest: dict[int, dict] = {}
    for raw_header, raw_body in zip(pieces[1::2], pieces[2::2]):
        body = raw_body.strip()
        if len(body) < 20:
            continue

        header = raw_header.strip()
        digits = re.search(r'\d+', header)
        number = int(digits.group()) if digits else -1

        record = {
            "article_id" : f"en_{number}",
            "article_num": number,
            "header"     : header,
            "text"       : f"{header} {body}",
            "lang"       : "en",
        }

        current = longest.get(number)
        if current is None or len(record["text"]) > len(current["text"]):
            longest[number] = record

    return sorted(longest.values(), key=lambda rec: rec["article_num"])
def parse_english_pdf(pdf_bytes: bytes, gpu: bool = False) -> list[dict]:
    """Return the English articles parsed from the pre-extracted text file.

    Args:
        pdf_bytes: Accepted only so the route signature stays the same;
            not read.
        gpu: Kept for API compatibility; not read.

    Returns:
        The article records produced by ``split_articles_english`` from the
        cleaned contents of the pre-extracted text file.
    """
    return split_articles_english(clean_ocr_english(_load_english_text()))
# ──────────────────────────────────────────────────────────────────────────────
# ARABIC — native PDF text extraction
# ──────────────────────────────────────────────────────────────────────────────
# Translation table mapping the Arabic-Indic digits U+0660..U+0669 to the
# ASCII digits 0..9. Both arguments must have the same length (10).
_AR_INDIC = str.maketrans("٠١٢٣٤٥٦٧٨٩", "0123456789")

# Article header whose number is wrapped in parentheses: the word "مادة"
# (an optional single whitespace character is tolerated between its letters),
# then a parenthesis, the number, and a parenthesis. Either "(" or ")" is
# accepted on both sides. Group 1 captures the digits (Arabic-Indic or any
# \d digit), which may be separated from each other by whitespace/newlines.
_PAT_WITH_PAREN = re.compile(
    r"م\s?ا\s?د\s?ة\s*[\(\)]\s*"
    r"((?:[٠-٩]|\d)(?:[\n\s]*(?:[٠-٩]|\d))*)"
    r"\s*[\(\)]"
)

# Header variant with no parenthesis before the number: "مادة" immediately
# followed by digits (group 1), a newline, and a ")".
_PAT_NO_OPEN_PAREN = re.compile(r"مادة(\d+)\n\)")
def _extract_text_native(pdf_bytes: bytes) -> str:
    """Return the text of every page of a PDF, joined with newlines.

    Args:
        pdf_bytes: The raw bytes of the PDF document.

    Returns:
        The ``get_text()`` output of each page, in page order, separated by
        a single newline.
    """
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        return "\n".join(page.get_text() for page in doc)
    finally:
        # Close the document even when text extraction raises.
        doc.close()
def _clean_arabic(text: str) -> str:
"""Normalize Arabic text and tidy its spacing.

Applies, in order: NFC normalization, removal of directional formatting
characters and of the marks in U+0610-U+061A / U+064B-U+065F, a fixed list
of literal and word-boundary substitutions, and spacing clean-up around
punctuation. Returns the result with runs of spaces collapsed and the
ends stripped.

NOTE(review): the Arabic string literals in this function appear
mis-encoded in this copy of the file, and the character class of the
single-letter substitution is split across two lines. Restore them from
the original UTF-8 source; their exact characters cannot be determined
from here.
"""
text = unicodedata.normalize("NFC", text)
# Remove directional formatting characters (U+202A-U+202E, U+200E, U+200F,
# U+2066-U+2069).
for ch in '\u202a\u202b\u202c\u202d\u202e\u200e\u200f\u2066\u2067\u2068\u2069':
text = text.replace(ch, '')
# Drop the marks in U+0610-U+061A and U+064B-U+065F.
text = re.sub(r'[\u0610-\u061A\u064B-\u065F]', '', text)
# Three-character literal replacements; each moves the last character of the
# matched sequence to the middle position.
text = text.replace("ุงุฅู", "ุงูุฅ")
text = text.replace("ุงุฃู", "ุงูุฃ")
text = text.replace("ุงุขู", "ุงูุข")
# Whole-word substitutions (anchored with \b on both sides).
text = re.sub(r'\bุงู\b', 'ูุง', text)
text = re.sub(r'\bูุงู\b', 'ููุง', text)
text = re.sub(r'\bุฅุงู\b', 'ุฅูุง', text)
text = re.sub(r'\bุฃุงู\b', 'ุฃูุง', text)
text = re.sub(r'\bูุงู\b', 'ููุง', text)
# Remove the whitespace that follows a single listed letter standing at a
# word boundary, attaching that letter to whatever comes next.
text = re.sub(r'\b([ุจุชุซุฌุญุฎุฏุฐุฑุฒุณุดุตุถุทุธุนุบููููู
ููู])\s+', r'\1', text)
# Remove whitespace before a listed punctuation character.
text = re.sub(r'\s+([ุ\.ุ:ุ!])', r'\1', text)
# Insert one space after a listed punctuation character when the next
# character is neither whitespace nor a digit.
text = re.sub(r'([ุ\.ุ:ุ!])(?=[^\s\d])', r'\1 ', text)
# Collapse runs of two or more spaces (newlines are left alone) and trim.
return re.sub(r' {2,}', ' ', text).strip()
def _normalize_num(raw: str) -> int:
    """Convert a captured article number to an ``int``.

    All whitespace (including newlines between digits) is removed first.
    A string made up solely of Arabic-Indic digits (U+0660..U+0669) is then
    reversed before conversion; ASCII or mixed digit strings are converted
    in the order given.

    Args:
        raw: The digit text captured from an article header.

    Returns:
        The article number.

    Raises:
        ValueError: if the cleaned text is not a valid integer.
    """
    digits = "".join(raw.split())
    # NOTE(review): the reversal presumably compensates for the extractor
    # emitting Arabic-Indic digits in right-to-left order; confirm against
    # sample documents.
    if re.fullmatch(r"[٠-٩]+", digits):
        digits = digits[::-1]
    return int(digits.translate(_AR_INDIC))
def _normalize_body(body: str) -> str:
"""Re-join wrapped lines of an article body into flowing text.

Single newlines become spaces, blank-line breaks (two or more consecutive
newlines) are kept, runs of spaces are collapsed and the ends stripped.

NOTE(review): the Arabic characters inside the first pattern appear
mis-encoded in this copy of the file; restore them from the original
UTF-8 source.
"""
# Join a character in U+0600-U+06FF with one of two listed characters that
# was pushed onto the next line, when that character is followed by a newline,
# a space, listed punctuation, or the end of the string.
body = re.sub(r"([\u0600-\u06FF])\n([ูุง])(?=[\n ุ.ุุ!]|$)", r"\1\2", body)
# Replace a newline that has no newline directly before or after it.
body = re.sub(r"(?<!\n)\n(?!\n)", " ", body)
return re.sub(r" +", " ", body).strip()
def _chunk_arabic(text: str, noise_phrases: list[str] | None = None) -> list[dict]:
    """Split cleaned Arabic text into one record per article.

    Article headers are located with ``_PAT_WITH_PAREN`` and
    ``_PAT_NO_OPEN_PAREN``. For each article number only the first header
    occurrence is used. An article's body runs from its header to the next
    header (or to the end of the text).

    Args:
        text: The cleaned document text.
        noise_phrases: Optional strings removed verbatim from every body
            after normalization.

    Returns:
        Records sorted by ``article_num``, each with the keys
        ``article_id``, ``article_num``, ``header``, ``text`` and ``lang``.
        Articles whose body is shorter than 10 characters once the header
        is stripped are omitted.
    """
    noise_phrases = noise_phrases or []

    # article number -> offset of the first header seen for that number
    first_offset: dict[int, int] = {}

    for m in _PAT_WITH_PAREN.finditer(text):
        try:
            n = _normalize_num(m.group(1))
        except ValueError:
            continue
        first_offset.setdefault(n, m.start())

    for m in _PAT_NO_OPEN_PAREN.finditer(text):
        try:
            n = int(m.group(1))
        except ValueError:
            continue
        first_offset.setdefault(n, m.start())

    # Process headers in document order so each body ends where the next
    # header begins.
    headers = sorted((offset, num) for num, offset in first_offset.items())

    # Strips the leading header: the word "مادة" plus up to 20 characters,
    # lazily, through the first ")".
    leading_header = re.compile(r"^م\s?ا\s?د\s?ة[\s\S]{0,20}?\)\s*")

    articles: list[dict] = []
    for i, (start, num) in enumerate(headers):
        end = headers[i + 1][0] if i + 1 < len(headers) else len(text)
        raw_body = leading_header.sub("", text[start:end]).strip()
        if len(raw_body) < 10:
            continue

        body = _normalize_body(raw_body)
        for noise in noise_phrases:
            body = body.replace(noise, "")

        header = f"مادة ({num})"
        articles.append({
            "article_id" : f"ar_{num}",
            "article_num": num,
            "header"     : header,
            # U+200F (right-to-left mark) is prepended to the text.
            "text"       : "\u200f" + header + "\n" + body.strip(),
            "lang"       : "ar",
        })

    articles.sort(key=lambda a: a["article_num"])
    return articles
def parse_arabic_pdf(
    pdf_bytes: bytes,
    noise_phrases: list[str] | None = None,
) -> list[dict]:
    """Extract, clean and chunk the Arabic articles of a PDF.

    Args:
        pdf_bytes: The raw bytes of the PDF document.
        noise_phrases: Optional strings passed through to ``_chunk_arabic``.

    Returns:
        The article records produced by ``_chunk_arabic``.
    """
    cleaned = _clean_arabic(_extract_text_native(pdf_bytes))
    return _chunk_arabic(cleaned, noise_phrases)