Spaces:
Sleeping
Sleeping
File size: 8,721 Bytes
ca4cce7 e1242e7 ca4cce7 e1242e7 ca4cce7 e1242e7 ca4cce7 e1242e7 ca4cce7 e1242e7 ca4cce7 e1242e7 ca4cce7 e1242e7 9873f3c e1242e7 ca4cce7 9873f3c e1242e7 ca4cce7 e1242e7 9873f3c e1242e7 ca4cce7 e1242e7 ca4cce7 e1242e7 ca4cce7 9873f3c ca4cce7 9873f3c ca4cce7 e1242e7 ca4cce7 e565aa7 ca4cce7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 | import base64
import hashlib
import io
import textwrap
import zipfile
from collections import OrderedDict
from pathlib import Path
from xml.etree import ElementTree
from PIL import Image, ImageDraw, ImageFilter, ImageFont, ImageOps
from backend.glossary_service import GlossaryService
from backend.ocr_service import OCRService
class DocumentService:
MAX_BYTES = 15 * 1024 * 1024
MAX_PDF_PAGES = 12
def __init__(self, ocr: OCRService, glossary: GlossaryService):
self.ocr = ocr
self.glossary = glossary
self._cache: OrderedDict[str, dict] = OrderedDict()
def analyze(self, encoded: str, filename: str, language: str) -> dict:
try:
raw = base64.b64decode(encoded, validate=True)
except Exception as exc:
raise ValueError("Invalid uploaded document") from exc
if not raw or len(raw) > self.MAX_BYTES:
raise ValueError("Document must be between 1 byte and 15 MB")
cache_key = hashlib.sha256(raw + filename.lower().encode() + language.encode()).hexdigest()
if cache_key in self._cache:
self._cache.move_to_end(cache_key)
return {**self._cache[cache_key], "cached": True}
extension = Path(filename).suffix.lower()
if extension in {".jpg", ".jpeg", ".png", ".webp"}:
result = self._image(raw, language)
elif extension == ".pdf":
result = self._pdf(raw)
elif extension == ".docx":
result = self._docx(raw)
else:
raise ValueError("Supported formats are JPG, PNG, WebP, PDF, and DOCX")
self._cache[cache_key] = result
while len(self._cache) > 16:
self._cache.popitem(last=False)
return {**result, "cached": False}
def _image(self, raw: bytes, language: str) -> dict:
try:
image = Image.open(io.BytesIO(raw)).convert("RGB")
image.load()
except Exception as exc:
raise ValueError("The uploaded image could not be opened") from exc
image.thumbnail((1000, 1000), Image.Resampling.BICUBIC)
prepared = ImageOps.autocontrast(image.convert("L")).filter(ImageFilter.SHARPEN).convert("RGB")
detections = self.ocr.extract(prepared, language)
terms = self.glossary.match_regions(detections)
if len(terms) < 3:
terms.extend(self.glossary.contextual_fallbacks(detections, 3 - len(terms), terms))
_, unknown = self.glossary.match_ocr(detections)
return self._result(image, terms, detections, unknown, "image_ocr")
def _pdf(self, raw: bytes) -> dict:
try:
import fitz
except ImportError as exc:
raise RuntimeError("PDF support is not installed") from exc
try:
document = fitz.open(stream=raw, filetype="pdf")
if document.page_count < 1:
raise ValueError("The PDF has no pages")
page_count = min(document.page_count, self.MAX_PDF_PAGES)
rendered = []
canvas_width = 0
for index in range(page_count):
page = document[index]
zoom = min(1.2, 800 / max(1, page.rect.width))
pixmap = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
page_image = Image.frombytes("RGB", [pixmap.width, pixmap.height], pixmap.samples)
rendered.append((page, page_image, zoom))
canvas_width = max(canvas_width, page_image.width)
gap = 18
total_height = sum(item[1].height for item in rendered) + gap * max(0, page_count - 1)
image = Image.new("RGB", (canvas_width, total_height), "#dfe3df")
regions = []
y_offset = 0
for page, page_image, zoom in rendered:
x_offset = (canvas_width - page_image.width) // 2
image.paste(page_image, (x_offset, y_offset))
page_regions = []
for block in page.get_text("blocks"):
x1, y1, x2, y2, text = block[:5]
if text.strip():
page_regions.append({
"text": text,
"bbox": [x_offset + x1 * zoom, y_offset + y1 * zoom,
x_offset + x2 * zoom, y_offset + y2 * zoom],
"confidence": 1.0,
"language": self._language(text),
})
if page_regions:
regions.extend(page_regions)
else:
page_detections = self.ocr.extract(page_image, "en")
for item in page_detections:
item["bbox"] = [item["bbox"][0] + x_offset, item["bbox"][1] + y_offset,
item["bbox"][2] + x_offset, item["bbox"][3] + y_offset]
regions.extend(page_detections)
y_offset += page_image.height + gap
terms = self.glossary.match_regions(regions)
if len(terms) < 3:
terms.extend(self.glossary.contextual_fallbacks(regions, 3 - len(terms), terms))
result = self._result(image, terms, regions, [], "pdf_multpage")
result.update({"page_count": document.page_count, "pages_analyzed": page_count,
"truncated": document.page_count > page_count})
return result
except (ValueError, RuntimeError):
raise
except Exception as exc:
raise ValueError("The PDF could not be processed") from exc
def _docx(self, raw: bytes) -> dict:
try:
with zipfile.ZipFile(io.BytesIO(raw)) as archive:
xml = archive.read("word/document.xml")
except Exception as exc:
raise ValueError("The Word document could not be opened") from exc
root = ElementTree.fromstring(xml)
paragraphs = []
for paragraph in root.iter("{http://schemas.openxmlformats.org/wordprocessingml/2006/main}p"):
text = "".join(node.text or "" for node in paragraph.iter("{http://schemas.openxmlformats.org/wordprocessingml/2006/main}t"))
if text.strip():
paragraphs.append(text.strip())
if not paragraphs:
raise ValueError("The Word document contains no readable text")
return self._render_text(paragraphs)
def _render_text(self, paragraphs: list[str]) -> dict:
width, margin, line_height = 1000, 80, 36
wrapped = []
for paragraph in paragraphs:
wrapped.extend(textwrap.wrap(paragraph, width=65, break_long_words=False) or [""])
wrapped.append("")
height = max(900, margin * 2 + len(wrapped) * line_height)
image = Image.new("RGB", (width, height), "#ffffff")
draw = ImageDraw.Draw(image)
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 28)
except OSError:
font = ImageFont.load_default()
regions = []
y = margin
for line in wrapped:
if y + line_height > height - margin:
break
if line:
draw.text((margin, y), line, fill="#17211e", font=font)
regions.append({"text": line, "bbox": [margin, y, width - margin, y + line_height],
"confidence": 1.0, "language": self._language(line)})
y += line_height
terms = self.glossary.match_regions(regions)
if len(terms) < 3:
terms.extend(self.glossary.contextual_fallbacks(regions, 3 - len(terms), terms))
return self._result(image, terms, regions, [], "docx_text")
@staticmethod
def _language(text: str) -> str:
return "ar" if any("\u0600" <= character <= "\u06ff" for character in text) else "en"
@staticmethod
def _result(image: Image.Image, terms: list, detections: list, unknown: list, method: str) -> dict:
output = io.BytesIO()
image.save(output, format="JPEG", quality=70, optimize=True)
return {
"detected_terms": terms,
"ocr_items": detections,
"unknown_terms": unknown[:10],
"frame_width": image.width,
"frame_height": image.height,
"preview_base64": "data:image/jpeg;base64," + base64.b64encode(output.getvalue()).decode(),
"ocr_available": True,
"analysis_method": method,
"mean_ocr_confidence": round(sum(float(item.get("confidence", 1)) for item in detections) / len(detections), 3) if detections else 0,
}
|