File size: 8,721 Bytes
ca4cce7
e1242e7
ca4cce7
 
 
e1242e7
ca4cce7
 
 
e1242e7
ca4cce7
 
 
 
 
 
 
e1242e7
ca4cce7
 
 
 
e1242e7
ca4cce7
 
 
 
 
 
 
 
e1242e7
 
 
 
ca4cce7
 
e1242e7
 
 
 
 
 
 
 
9873f3c
e1242e7
 
ca4cce7
 
 
 
 
 
 
9873f3c
e1242e7
 
ca4cce7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e1242e7
 
 
 
 
9873f3c
e1242e7
 
 
 
 
 
 
ca4cce7
e1242e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca4cce7
 
 
e1242e7
 
 
 
ca4cce7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9873f3c
ca4cce7
 
9873f3c
ca4cce7
e1242e7
ca4cce7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e565aa7
ca4cce7
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import base64
import hashlib
import io
import textwrap
import zipfile
from collections import OrderedDict
from pathlib import Path
from xml.etree import ElementTree

from PIL import Image, ImageDraw, ImageFilter, ImageFont, ImageOps

from backend.glossary_service import GlossaryService
from backend.ocr_service import OCRService


class DocumentService:
    """Analyze an uploaded image, PDF, or DOCX and return glossary matches with a preview.

    Every supported format is reduced to the same shape: a preview image plus a
    list of text regions (``text``, ``bbox``, ``confidence``, ``language``) that
    is handed to the glossary service. Results are kept in a small in-memory LRU
    cache; no locking is performed around that cache here.
    """

    MAX_BYTES = 15 * 1024 * 1024
    MAX_PDF_PAGES = 12
    # Number of analysis results retained in the LRU cache.
    MAX_CACHE_ENTRIES = 16
    # Contextual fallbacks top the matched-term list up to this many entries.
    MIN_TERMS = 3
    # Cap on the decompressed size of word/document.xml, so a tiny archive
    # cannot expand into an arbitrarily large in-memory buffer.
    MAX_DOCX_XML_BYTES = 64 * 1024 * 1024
    # Rendered text pages are saved as JPEG, and JPEG encoders reject very tall
    # images (libjpeg caps each dimension at 65,500 px); stay safely below that.
    MAX_RENDER_HEIGHT = 60_000
    IMAGE_EXTENSIONS = frozenset({".jpg", ".jpeg", ".png", ".webp"})
    _WORD_NS = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
    # Arabic, Arabic Supplement, Arabic Extended-A, Presentation Forms-A and
    # Presentation Forms-B (stopping before U+FEFF, which is the byte-order mark).
    _ARABIC_RANGES = (
        (0x0600, 0x06FF),
        (0x0750, 0x077F),
        (0x08A0, 0x08FF),
        (0xFB50, 0xFDFF),
        (0xFE70, 0xFEFC),
    )

    def __init__(self, ocr: OCRService, glossary: GlossaryService):
        """Store the OCR and glossary collaborators and start with an empty cache."""
        self.ocr = ocr
        self.glossary = glossary
        self._cache: OrderedDict[str, dict] = OrderedDict()

    def analyze(self, encoded: str, filename: str, language: str) -> dict:
        """Decode a base64 upload and analyze it according to its file extension.

        Args:
            encoded: Strictly valid base64 of the document bytes.
            filename: Original file name; only its suffix selects the handler.
            language: Language hint forwarded to OCR.

        Returns:
            The analysis result with an extra ``cached`` flag.

        Raises:
            ValueError: If the payload is not valid base64, is empty or larger
                than ``MAX_BYTES``, has an unsupported extension, or cannot be
                parsed by the selected handler.
            RuntimeError: If PDF support is not installed.
        """
        try:
            raw = base64.b64decode(encoded, validate=True)
        except Exception as exc:
            raise ValueError("Invalid uploaded document") from exc
        if not raw or len(raw) > self.MAX_BYTES:
            limit_mb = self.MAX_BYTES // (1024 * 1024)
            raise ValueError(f"Document must be between 1 byte and {limit_mb} MB")

        extension = Path(filename).suffix.lower()
        if extension not in self.IMAGE_EXTENSIONS and extension not in {".pdf", ".docx"}:
            raise ValueError("Supported formats are JPG, PNG, WebP, PDF, and DOCX")

        # The result depends only on the bytes, the handler (extension) and the
        # language, so those form the key. Each text field is length-prefixed to
        # keep the fields unambiguous, and hashing incrementally avoids copying
        # the whole upload just to concatenate it.
        digest = hashlib.sha256()
        for field in (extension.encode(), language.encode()):
            digest.update(len(field).to_bytes(4, "big"))
            digest.update(field)
        digest.update(raw)
        cache_key = digest.hexdigest()

        cached = self._cache.get(cache_key)
        if cached is not None:
            self._cache.move_to_end(cache_key)
            return {**cached, "cached": True}

        if extension in self.IMAGE_EXTENSIONS:
            result = self._image(raw, language)
        elif extension == ".pdf":
            result = self._pdf(raw, language)
        else:
            result = self._docx(raw)

        self._cache[cache_key] = result
        while len(self._cache) > self.MAX_CACHE_ENTRIES:
            self._cache.popitem(last=False)  # evict the least recently used entry
        return {**result, "cached": False}

    def _match_terms(self, regions: list) -> list:
        """Match regions against the glossary, padding with contextual fallbacks up to MIN_TERMS."""
        terms = self.glossary.match_regions(regions)
        shortfall = self.MIN_TERMS - len(terms)
        if shortfall > 0:
            terms.extend(self.glossary.contextual_fallbacks(regions, shortfall, terms))
        return terms

    def _image(self, raw: bytes, language: str) -> dict:
        """Run OCR on an uploaded raster image.

        Raises:
            ValueError: If the bytes cannot be opened as an image.
        """
        try:
            opened = Image.open(io.BytesIO(raw))
            # Apply the EXIF orientation first; otherwise rotated phone photos
            # are handed to OCR sideways or upside down.
            image = ImageOps.exif_transpose(opened).convert("RGB")
            image.load()
        except Exception as exc:
            raise ValueError("The uploaded image could not be opened") from exc
        image.thumbnail((1000, 1000), Image.Resampling.BICUBIC)
        # OCR runs on a contrast-stretched, sharpened grayscale copy; the preview
        # keeps the original colors. Both have the same size, so boxes line up.
        grayscale = ImageOps.autocontrast(image.convert("L"))
        prepared = grayscale.filter(ImageFilter.SHARPEN).convert("RGB")
        detections = self.ocr.extract(prepared, language)
        terms = self._match_terms(detections)
        _, unknown = self.glossary.match_ocr(detections)
        return self._result(image, terms, detections, unknown, "image_ocr")

    def _pdf(self, raw: bytes, language: str = "en") -> dict:
        """Render the first ``MAX_PDF_PAGES`` pages onto one canvas and collect text regions.

        Pages with an embedded text layer contribute their text blocks; pages
        without one are passed through OCR using ``language``.

        Raises:
            RuntimeError: If PyMuPDF (``fitz``) is not installed.
            ValueError: If the PDF cannot be opened, has no pages, or fails
                during processing.
        """
        try:
            import fitz
        except ImportError as exc:
            raise RuntimeError("PDF support is not installed") from exc
        # Opened in its own try block: PyMuPDF reports corrupt files with
        # RuntimeError subclasses, which the handler further down deliberately
        # re-raises. A bad upload must surface as ValueError instead.
        try:
            document = fitz.open(stream=raw, filetype="pdf")
        except Exception as exc:
            raise ValueError("The PDF could not be processed") from exc
        try:
            total_pages = document.page_count
            if total_pages < 1:
                raise ValueError("The PDF has no pages")
            page_count = min(total_pages, self.MAX_PDF_PAGES)

            rendered = []
            for index in range(page_count):
                page = document[index]
                # Scale so the page is at most about 800 px wide, never above 1.2x.
                zoom = min(1.2, 800 / max(1, page.rect.width))
                pixmap = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
                page_image = Image.frombytes("RGB", [pixmap.width, pixmap.height], pixmap.samples)
                rendered.append((page, page_image, zoom))

            gap = 18  # vertical spacing between stacked pages, in pixels
            canvas_width = max(item[1].width for item in rendered)
            total_height = sum(item[1].height for item in rendered) + gap * max(0, page_count - 1)
            image = Image.new("RGB", (canvas_width, total_height), "#dfe3df")

            regions = []
            y_offset = 0
            for page, page_image, zoom in rendered:
                x_offset = (canvas_width - page_image.width) // 2  # center narrower pages
                image.paste(page_image, (x_offset, y_offset))
                page_regions = self._pdf_text_regions(page, zoom, x_offset, y_offset)
                if not page_regions:
                    # No text layer on this page: fall back to OCR. Build shifted
                    # copies rather than mutating the OCR service's own dicts.
                    for item in self.ocr.extract(page_image, language):
                        left, top, right, bottom = item["bbox"][:4]
                        page_regions.append({
                            **item,
                            "bbox": [left + x_offset, top + y_offset,
                                     right + x_offset, bottom + y_offset],
                        })
                regions.extend(page_regions)
                y_offset += page_image.height + gap

            terms = self._match_terms(regions)
            # NOTE(review): "pdf_multpage" looks like a typo for "pdf_multipage";
            # kept unchanged because clients may match on the exact value.
            result = self._result(image, terms, regions, [], "pdf_multpage")
            result.update({"page_count": total_pages, "pages_analyzed": page_count,
                           "truncated": total_pages > page_count})
            return result
        except (ValueError, RuntimeError):
            raise
        except Exception as exc:
            raise ValueError("The PDF could not be processed") from exc
        finally:
            document.close()

    def _pdf_text_regions(self, page, zoom: float, x_offset: int, y_offset: int) -> list:
        """Convert one page's text blocks into regions positioned on the stacked canvas."""
        regions = []
        for block in page.get_text("blocks"):
            # Blocks are (x0, y0, x1, y1, text, block_no, block_type). A non-zero
            # type marks a non-text block whose "text" is only metadata; counting
            # it would also suppress the OCR fallback for scanned pages.
            if len(block) > 6 and block[6] != 0:
                continue
            x1, y1, x2, y2, text = block[:5]
            if text.strip():
                regions.append({
                    "text": text,
                    "bbox": [x_offset + x1 * zoom, y_offset + y1 * zoom,
                             x_offset + x2 * zoom, y_offset + y2 * zoom],
                    "confidence": 1.0,
                    "language": self._language(text),
                })
        return regions

    def _docx(self, raw: bytes) -> dict:
        """Extract paragraph text from a DOCX archive and render it for analysis.

        Raises:
            ValueError: If the archive or its XML cannot be read, the XML exceeds
                ``MAX_DOCX_XML_BYTES``, or no paragraph contains text.
        """
        try:
            with zipfile.ZipFile(io.BytesIO(raw)) as archive, \
                    archive.open("word/document.xml") as member:
                # Read one byte past the cap so oversized members are detected
                # without decompressing them in full.
                xml = member.read(self.MAX_DOCX_XML_BYTES + 1)
        except Exception as exc:
            raise ValueError("The Word document could not be opened") from exc
        if len(xml) > self.MAX_DOCX_XML_BYTES:
            raise ValueError("The Word document is too large to process")
        # NOTE(security): xml.etree does not defend against maliciously
        # constructed XML (e.g. entity expansion); a hardened parser would be
        # preferable for untrusted uploads.
        try:
            root = ElementTree.fromstring(xml)
        except ElementTree.ParseError as exc:
            raise ValueError("The Word document could not be opened") from exc

        paragraphs = []
        for paragraph in root.iter(self._WORD_NS + "p"):
            text = "".join(node.text or "" for node in paragraph.iter(self._WORD_NS + "t")).strip()
            if text:
                paragraphs.append(text)
        if not paragraphs:
            raise ValueError("The Word document contains no readable text")
        return self._render_text(paragraphs)

    def _render_text(self, paragraphs: list[str]) -> dict:
        """Draw paragraphs onto a white page image and analyze one region per rendered line.

        The page height is capped at ``MAX_RENDER_HEIGHT``. Lines that do not fit
        are dropped, and the result's ``truncated`` flag reports whether any text
        was lost.
        """
        width, margin, line_height = 1000, 80, 36
        wrapped = []
        for paragraph in paragraphs:
            wrapped.extend(textwrap.wrap(paragraph, width=65, break_long_words=False) or [""])
            wrapped.append("")  # blank line between paragraphs
        needed_height = margin * 2 + len(wrapped) * line_height
        height = min(self.MAX_RENDER_HEIGHT, max(900, needed_height))
        image = Image.new("RGB", (width, height), "#ffffff")
        draw = ImageDraw.Draw(image)
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 28)
        except OSError:
            font = ImageFont.load_default()

        regions = []
        truncated = False
        y = margin
        for index, line in enumerate(wrapped):
            if y + line_height > height - margin:
                # Only report truncation when actual text was cut off, not just
                # trailing blank separator lines.
                truncated = any(wrapped[index:])
                break
            if line:
                draw.text((margin, y), line, fill="#17211e", font=font)
                regions.append({"text": line, "bbox": [margin, y, width - margin, y + line_height],
                                "confidence": 1.0, "language": self._language(line)})
            y += line_height

        terms = self._match_terms(regions)
        result = self._result(image, terms, regions, [], "docx_text")
        result["truncated"] = truncated
        return result

    @staticmethod
    def _language(text: str) -> str:
        """Return ``"ar"`` if *text* contains any Arabic-script character, else ``"en"``.

        Presentation-form code points are included as well as the basic Arabic
        block, so text stored in those forms is still recognized.
        """
        ranges = DocumentService._ARABIC_RANGES
        for character in text:
            code = ord(character)
            if any(low <= code <= high for low, high in ranges):
                return "ar"
        return "en"

    @staticmethod
    def _result(image: Image.Image, terms: list, detections: list, unknown: list, method: str) -> dict:
        """Assemble the response payload, embedding the preview as a JPEG data URL.

        ``unknown`` is limited to its first ten entries. ``mean_ocr_confidence``
        is the average ``confidence`` over ``detections`` (a missing value counts
        as 1), rounded to three decimals, or 0 when there are no detections.
        """
        output = io.BytesIO()
        image.save(output, format="JPEG", quality=70, optimize=True)
        preview = base64.b64encode(output.getvalue()).decode()
        if detections:
            total = sum(float(item.get("confidence", 1)) for item in detections)
            mean_confidence = round(total / len(detections), 3)
        else:
            mean_confidence = 0
        return {
            "detected_terms": terms,
            "ocr_items": detections,
            "unknown_terms": unknown[:10],
            "frame_width": image.width,
            "frame_height": image.height,
            "preview_base64": "data:image/jpeg;base64," + preview,
            "ocr_available": True,
            "analysis_method": method,
            "mean_ocr_confidence": mean_confidence,
        }