Spaces:
Running
Running
Antigravity AI Agent
Optimize document loading performance, make document info circular icon-only, align header button fonts, and move selection hint slightly down
9873f3c | import base64 | |
| import hashlib | |
| import io | |
| import textwrap | |
| import zipfile | |
| from collections import OrderedDict | |
| from pathlib import Path | |
| from xml.etree import ElementTree | |
| from PIL import Image, ImageDraw, ImageFilter, ImageFont, ImageOps | |
| from backend.glossary_service import GlossaryService | |
| from backend.ocr_service import OCRService | |
class DocumentService:
    """Convert an uploaded image, PDF, or DOCX into a preview plus glossary terms.

    Every supported format is reduced to the same result dictionary (see
    ``_result``): a JPEG preview, the text regions found on it, and the
    glossary terms matched against those regions. Results are memoised in a
    small least-recently-used cache held on the instance.
    """

    MAX_BYTES = 15 * 1024 * 1024
    MAX_PDF_PAGES = 12
    # Number of analysed documents kept in the in-memory LRU cache.
    MAX_CACHE_ENTRIES = 16
    # Below this many matched terms, contextual fallbacks are requested.
    MIN_TERMS = 3
    # XML namespace prefix of WordprocessingML elements in word/document.xml.
    _WORD_NS = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"

    def __init__(self, ocr: OCRService, glossary: GlossaryService):
        """Store the collaborators and start with an empty result cache."""
        self.ocr = ocr
        self.glossary = glossary
        self._cache: OrderedDict[str, dict] = OrderedDict()

    def analyze(self, encoded: str, filename: str, language: str) -> dict:
        """Analyse a base64-encoded upload and return its result dictionary.

        Args:
            encoded: The document bytes, base64-encoded.
            filename: Original file name; only its extension selects the parser.
            language: Language code forwarded to OCR for image uploads.

        Returns:
            The dictionary built by ``_result`` plus a ``"cached"`` flag that
            is ``True`` when the answer came from the cache.

        Raises:
            ValueError: If the payload is not valid base64, is empty or larger
                than ``MAX_BYTES``, has an unsupported extension, or cannot be
                parsed.
            RuntimeError: If a PDF is uploaded but PDF support is missing.
        """
        try:
            raw = base64.b64decode(encoded, validate=True)
        except (ValueError, TypeError) as exc:
            # binascii.Error (bad alphabet/padding) is a ValueError subclass.
            raise ValueError("Invalid uploaded document") from exc
        if not raw or len(raw) > self.MAX_BYTES:
            raise ValueError("Document must be between 1 byte and 15 MB")

        cache_key = self._cache_key(raw, filename, language)
        cached = self._cache.get(cache_key)
        if cached is not None:
            # Mark as most recently used so it is evicted last.
            self._cache.move_to_end(cache_key)
            return {**cached, "cached": True}

        extension = Path(filename).suffix.lower()
        if extension in {".jpg", ".jpeg", ".png", ".webp"}:
            result = self._image(raw, language)
        elif extension == ".pdf":
            result = self._pdf(raw)
        elif extension == ".docx":
            result = self._docx(raw)
        else:
            raise ValueError("Supported formats are JPG, PNG, WebP, PDF, and DOCX")

        self._cache[cache_key] = result
        while len(self._cache) > self.MAX_CACHE_ENTRIES:
            self._cache.popitem(last=False)  # drop the least recently used
        return {**result, "cached": False}

    @staticmethod
    def _cache_key(raw: bytes, filename: str, language: str) -> str:
        """Return a hex digest identifying (content, filename, language).

        The payload is hashed incrementally (no multi-megabyte concatenation)
        and length-prefixed so that bytes at its tail can never be confused
        with the start of the file name.
        """
        digest = hashlib.sha256()
        digest.update(len(raw).to_bytes(8, "big"))
        digest.update(raw)
        digest.update(filename.lower().encode())
        digest.update(b"\x00")
        digest.update(language.encode())
        return digest.hexdigest()

    def _match_terms(self, regions: list) -> list:
        """Match glossary terms in *regions*, topping up to ``MIN_TERMS``.

        When fewer than ``MIN_TERMS`` terms match, the glossary is asked for
        as many contextual fallbacks as are missing.
        """
        terms = self.glossary.match_regions(regions)
        missing = self.MIN_TERMS - len(terms)
        if missing > 0:
            terms.extend(self.glossary.contextual_fallbacks(regions, missing, terms))
        return terms

    def _image(self, raw: bytes, language: str) -> dict:
        """Run OCR on an uploaded image and build its result dictionary.

        Raises:
            ValueError: If the bytes cannot be opened as an image.
        """
        try:
            image = Image.open(io.BytesIO(raw)).convert("RGB")
            image.load()
        except Exception as exc:
            raise ValueError("The uploaded image could not be opened") from exc

        # Shrink in place; the same downsized image is used for the preview.
        image.thumbnail((1000, 1000), Image.Resampling.BICUBIC)
        # OCR runs on a contrast-stretched, sharpened greyscale copy.
        grey = ImageOps.autocontrast(image.convert("L"))
        prepared = grey.filter(ImageFilter.SHARPEN).convert("RGB")

        detections = self.ocr.extract(prepared, language)
        terms = self._match_terms(detections)
        _, unknown = self.glossary.match_ocr(detections)
        return self._result(image, terms, detections, unknown, "image_ocr")

    def _pdf(self, raw: bytes) -> dict:
        """Render the first pages of a PDF onto one canvas and analyse them.

        At most ``MAX_PDF_PAGES`` pages are rendered and stacked vertically.
        Pages with embedded text blocks use those blocks directly; pages
        without any fall back to OCR on the rendered page image.

        Returns:
            The ``_result`` dictionary extended with ``page_count``,
            ``pages_analyzed`` and ``truncated``.

        Raises:
            RuntimeError: If PyMuPDF (``fitz``) is not installed.
            ValueError: If the PDF cannot be opened, has no pages, or fails
                during processing.
        """
        try:
            import fitz
        except ImportError as exc:
            raise RuntimeError("PDF support is not installed") from exc

        # Opening is guarded on its own so an unreadable upload always surfaces
        # as ValueError, even if the library reports it with a RuntimeError
        # subclass (which the handler further down deliberately lets through).
        try:
            document = fitz.open(stream=raw, filetype="pdf")
        except Exception as exc:
            raise ValueError("The PDF could not be processed") from exc

        try:
            total_pages = document.page_count
            if total_pages < 1:
                raise ValueError("The PDF has no pages")
            page_count = min(total_pages, self.MAX_PDF_PAGES)

            # Pass 1: rasterise every page and find the widest one.
            rendered = []
            canvas_width = 0
            for index in range(page_count):
                page = document[index]
                # Cap the rendered width near 800 px and never zoom past 1.2x.
                zoom = min(1.2, 800 / max(1, page.rect.width))
                pixmap = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
                page_image = Image.frombytes(
                    "RGB", (pixmap.width, pixmap.height), pixmap.samples
                )
                rendered.append((page, page_image, zoom))
                canvas_width = max(canvas_width, page_image.width)

            gap = 18  # vertical spacing between stacked pages, in pixels
            total_height = (
                sum(page_image.height for _, page_image, _ in rendered)
                + gap * max(0, page_count - 1)
            )
            image = Image.new("RGB", (canvas_width, total_height), "#dfe3df")

            # Pass 2: paste pages centred on the canvas and collect text
            # regions in canvas coordinates.
            regions = []
            y_offset = 0
            for page, page_image, zoom in rendered:
                x_offset = (canvas_width - page_image.width) // 2
                image.paste(page_image, (x_offset, y_offset))

                page_regions = []
                for block in page.get_text("blocks"):
                    x1, y1, x2, y2, text = block[:5]
                    if text.strip():
                        page_regions.append({
                            "text": text,
                            # Page coordinates -> canvas pixels.
                            "bbox": [x_offset + x1 * zoom, y_offset + y1 * zoom,
                                     x_offset + x2 * zoom, y_offset + y2 * zoom],
                            "confidence": 1.0,
                            "language": self._language(text),
                        })

                if page_regions:
                    regions.extend(page_regions)
                else:
                    # No embedded text on this page: OCR the rendered image
                    # and shift its boxes into canvas coordinates.
                    page_detections = self.ocr.extract(page_image, "en")
                    for item in page_detections:
                        left, top, right, bottom = item["bbox"][:4]
                        item["bbox"] = [left + x_offset, top + y_offset,
                                        right + x_offset, bottom + y_offset]
                    regions.extend(page_detections)

                y_offset += page_image.height + gap

            terms = self._match_terms(regions)
            # NOTE(review): "pdf_multpage" looks like a typo of "pdf_multipage";
            # left unchanged because consumers may match on the exact value.
            result = self._result(image, terms, regions, [], "pdf_multpage")
            result.update({
                "page_count": total_pages,
                "pages_analyzed": page_count,
                "truncated": total_pages > page_count,
            })
            return result
        except (ValueError, RuntimeError):
            raise
        except Exception as exc:
            raise ValueError("The PDF could not be processed") from exc
        finally:
            # Release the document whether or not processing succeeded.
            document.close()

    def _docx(self, raw: bytes) -> dict:
        """Extract paragraph text from a DOCX file and render it as an image.

        Raises:
            ValueError: If the archive cannot be read, its main document part
                is missing or is not well-formed XML, or it contains no text.
        """
        try:
            with zipfile.ZipFile(io.BytesIO(raw)) as archive:
                xml = archive.read("word/document.xml")
            # Parsed inside the guard so malformed XML is reported like any
            # other unreadable upload instead of escaping as ParseError.
            root = ElementTree.fromstring(xml)
        except Exception as exc:
            raise ValueError("The Word document could not be opened") from exc

        paragraph_tag = self._WORD_NS + "p"
        text_tag = self._WORD_NS + "t"
        paragraphs = []
        for paragraph in root.iter(paragraph_tag):
            text = "".join(node.text or "" for node in paragraph.iter(text_tag)).strip()
            if text:
                paragraphs.append(text)
        if not paragraphs:
            raise ValueError("The Word document contains no readable text")
        return self._render_text(paragraphs)

    def _render_text(self, paragraphs: list[str]) -> dict:
        """Draw *paragraphs* onto a white page and analyse the drawn lines.

        Each paragraph is wrapped at 65 characters and followed by one blank
        line. Every non-empty line becomes a region spanning the page width
        between the margins.
        """
        width, margin, line_height = 1000, 80, 36

        wrapped = []
        for paragraph in paragraphs:
            wrapped.extend(textwrap.wrap(paragraph, width=65, break_long_words=False) or [""])
            wrapped.append("")  # blank line between paragraphs

        height = max(900, margin * 2 + len(wrapped) * line_height)
        image = Image.new("RGB", (width, height), "#ffffff")
        draw = ImageDraw.Draw(image)
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 28)
        except OSError:
            # Font file not available on this host: use Pillow's built-in font.
            font = ImageFont.load_default()

        regions = []
        y = margin
        for line in wrapped:
            # With the height computed above every wrapped line fits, so this
            # guard does not fire today; it is kept as a safety net.
            if y + line_height > height - margin:
                break
            if line:
                draw.text((margin, y), line, fill="#17211e", font=font)
                regions.append({
                    "text": line,
                    "bbox": [margin, y, width - margin, y + line_height],
                    "confidence": 1.0,
                    "language": self._language(line),
                })
            y += line_height

        terms = self._match_terms(regions)
        return self._result(image, terms, regions, [], "docx_text")

    @staticmethod
    def _language(text: str) -> str:
        """Return ``"ar"`` if *text* has any character in U+0600-U+06FF, else ``"en"``."""
        return "ar" if any("\u0600" <= character <= "\u06ff" for character in text) else "en"

    @staticmethod
    def _result(image: Image.Image, terms: list, detections: list, unknown: list, method: str) -> dict:
        """Assemble the response dictionary shared by all document types.

        Args:
            image: Preview image; encoded as a JPEG data URI.
            terms: Matched glossary terms, returned as ``detected_terms``.
            detections: Text regions, returned as ``ocr_items``.
            unknown: Unmatched terms; only the first ten are returned.
            method: Label stored under ``analysis_method``.

        Returns:
            A dictionary with the preview, its dimensions, the terms and
            regions, and the mean region confidence (0 when there are no
            regions; a region without a confidence counts as 1).
        """
        output = io.BytesIO()
        image.save(output, format="JPEG", quality=70, optimize=True)
        preview = "data:image/jpeg;base64," + base64.b64encode(output.getvalue()).decode()

        if detections:
            total = sum(float(item.get("confidence", 1)) for item in detections)
            mean_confidence = round(total / len(detections), 3)
        else:
            mean_confidence = 0

        return {
            "detected_terms": terms,
            "ocr_items": detections,
            "unknown_terms": unknown[:10],
            "frame_width": image.width,
            "frame_height": image.height,
            "preview_base64": preview,
            "ocr_available": True,
            "analysis_method": method,
            "mean_ocr_confidence": mean_confidence,
        }