| import re |
| import numpy as np |
| import time |
| import cv2 |
| import random |
| import string |
| from typing import List, Dict, Any, Tuple, Optional, Union |
|
|
| import httpx |
| from PIL import Image |
| import io |
| import json5 |
| from urllib.parse import parse_qs, urlparse |
|
|
| |
| |
| |
|
|
| from .base import register_OCR, OCRBase, TextBlock |
|
|
|
|
| class LensCore: |
| LENS_UPLOAD_ENDPOINT = "https://lens.google.com/v3/upload" |
| LENS_METADATA_ENDPOINT = "https://lens.google.com/qfmetadata" |
| |
| HEADERS = { |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", |
| "Accept-Language": "ru", |
| "Cache-Control": "max-age=0", |
| "Sec-Ch-Ua": '"Not-A.Brand";v="8", "Chromium";v="135", "Google Chrome";v="135"', |
| "Sec-Ch-Ua-Mobile": "?0", |
| "Sec-Ch-Ua-Platform": '"Windows"', |
| "Upgrade-Insecure-Requests": "1", |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36", |
| "Origin": "https://www.google.com", |
| "Referer": "https://www.google.com/", |
| "Sec-Fetch-Site": "same-site", |
| "Sec-Fetch-Mode": "navigate", |
| "Sec-Fetch-Dest": "document", |
| "Sec-Fetch-User": "?1", |
| "Priority": "u=0, i", |
| } |
| SUPPORTED_MIMES = [ |
| "image/x-icon", |
| "image/bmp", |
| "image/jpeg", |
| "image/png", |
| "image/tiff", |
| "image/webp", |
| "image/heic", |
| ] |
|
|
| def __init__(self, proxy: Optional[Union[str, Dict[str, str]]] = None): |
| self.proxy = proxy |
| |
|
|
| @staticmethod |
| def _extract_ids_from_url(url_string: str) -> Tuple[Optional[str], Optional[str]]: |
| try: |
| parsed_url = urlparse(url_string) |
| query_params = parse_qs(parsed_url.query) |
| vsrid = query_params.get("vsrid", [None])[0] |
| lsessionid = query_params.get("lsessionid", [None])[0] |
| return vsrid, lsessionid |
| except Exception as e: |
| |
| print( |
| f"Error extracting IDs from URL {url_string}: {e}" |
| ) |
| return None, None |
|
|
| def _create_httpx_client(self) -> httpx.Client: |
| """Создает httpx клиент с настройками прокси.""" |
| client_kwargs = { |
| "follow_redirects": True, |
| "timeout": httpx.Timeout(30.0, connect=10.0), |
| "limits": httpx.Limits(max_keepalive_connections=5, max_connections=10), |
| "http2": False, |
| "verify": True, |
| } |
|
|
| if self.proxy: |
| mounts = {} |
| try: |
| if isinstance(self.proxy, str): |
| |
| |
| proxy_url = self.proxy |
| if "://" not in proxy_url: |
| proxy_url = f"http://{proxy_url}" |
|
|
| |
| https_proxy_url = proxy_url |
| if urlparse(proxy_url).scheme in ["http", "socks4", "socks5"]: |
| |
| pass |
| |
| |
| |
|
|
| mounts["http://"] = httpx.HTTPTransport(proxy=proxy_url) |
| mounts["https://"] = httpx.HTTPTransport(proxy=https_proxy_url) |
|
|
| elif isinstance(self.proxy, dict): |
| |
| if "http://" in self.proxy: |
| mounts["http://"] = httpx.HTTPTransport( |
| proxy=self.proxy["http://"] |
| ) |
| if "https://" in self.proxy: |
| |
| |
| https_proxy_spec = self.proxy["https://"] |
| if ( |
| isinstance(https_proxy_spec, str) |
| and urlparse(https_proxy_spec).scheme == "https" |
| ): |
| |
| mounts["https://"] = httpx.HTTPTransport( |
| proxy=https_proxy_spec |
| ) |
| elif isinstance(https_proxy_spec, str): |
| |
| mounts["https://"] = httpx.HTTPTransport( |
| proxy=https_proxy_spec |
| ) |
| |
| |
| |
|
|
| if mounts: |
| client_kwargs["mounts"] = mounts |
| except Exception as e: |
| |
| print(f"Error setting up proxy: {e}") |
| |
|
|
| return httpx.Client(**client_kwargs) |
|
|
| def scan_by_data( |
| self, data: bytes, mime: str, dimensions: Tuple[int, int] |
| ) -> Dict[str, Any]: |
| """ |
| Отправляет данные изображения в Google Lens и получает результат OCR. |
| Использует двухэтапный процесс: загрузка, затем получение метаданных. |
| """ |
| random_filename = "".join(random.choices(string.ascii_letters, k=8)) |
| |
| ext = ".jpg" |
| if mime == "image/png": |
| ext = ".png" |
| elif mime == "image/webp": |
| ext = ".webp" |
| elif mime == "image/gif": |
| ext = ".gif" |
| |
|
|
| filename = f"{random_filename}{ext}" |
|
|
| files = {"encoded_image": (filename, data, mime)} |
| |
| params_upload = { |
| "hl": "ru", |
| "re": "av", |
| "vpw": "1903", |
| "vph": "953", |
| "ep": "gsbubb", |
| "st": str(int(time.time() * 1000)), |
| } |
|
|
| try: |
| |
| with self._create_httpx_client() as client: |
| |
| upload_headers = self.HEADERS.copy() |
| response_upload = client.post( |
| self.LENS_UPLOAD_ENDPOINT, |
| headers=upload_headers, |
| files=files, |
| params=params_upload, |
| ) |
| response_upload.raise_for_status() |
|
|
| final_url = str(response_upload.url) |
|
|
| |
| vsrid, lsessionid = self._extract_ids_from_url(final_url) |
| if not vsrid or not lsessionid: |
| raise Exception( |
| "Failed to extract vsrid or lsessionid from upload redirect URL." |
| ) |
|
|
| |
| metadata_params = { |
| "vsrid": vsrid, |
| "lsessionid": lsessionid, |
| "hl": params_upload["hl"], |
| "qf": "CAI%3D", |
| "st": str(int(time.time() * 1000)), |
| "vpw": params_upload["vpw"], |
| "vph": params_upload["vph"], |
| "source": "lens", |
| } |
| |
| metadata_headers = self.HEADERS.copy() |
| metadata_headers.update( |
| { |
| "Accept": "*/*", |
| "Referer": final_url, |
| "Sec-Fetch-Site": "same-origin", |
| "Sec-Fetch-Mode": "cors", |
| "Sec-Fetch-Dest": "empty", |
| "Priority": "u=1, i", |
| } |
| ) |
| |
| metadata_headers.pop("Upgrade-Insecure-Requests", None) |
| metadata_headers.pop("Sec-Fetch-User", None) |
| metadata_headers.pop("Cache-Control", None) |
| metadata_headers.pop("Origin", None) |
|
|
| response_metadata = client.get( |
| self.LENS_METADATA_ENDPOINT, |
| headers=metadata_headers, |
| params=metadata_params, |
| ) |
| response_metadata.raise_for_status() |
|
|
| |
| response_text = response_metadata.text |
| |
| if response_text.startswith(")]}'\n"): |
| response_text = response_text[5:] |
| elif response_text.startswith(")]}'"): |
| response_text = response_text[4:] |
|
|
| |
| metadata_json = json5.loads(response_text) |
| return metadata_json |
|
|
| except httpx.HTTPStatusError as e: |
| |
| print(f"HTTP error: {e.response.status_code} for URL {e.request.url}") |
| raise Exception(f"HTTP Error {e.response.status_code}") from e |
| except httpx.RequestError as e: |
| print(f"Request error: {e}") |
| raise Exception(f"Request Error: {e}") from e |
| except Exception as e: |
| print(f"Unexpected error in scan_by_data: {e}") |
| raise |
|
|
|
|
| class Lens(LensCore): |
| def __init__(self, proxy: Optional[Union[str, Dict[str, str]]] = None): |
| super().__init__(proxy=proxy) |
|
|
| @staticmethod |
| def resize_image( |
| image: Image.Image, max_size: Tuple[int, int] = (1000, 1000) |
| ) -> Tuple[bytes, Tuple[int, int]]: |
| """Изменяет размер изображения и конвертирует в JPEG байты.""" |
| |
| image.thumbnail(max_size) |
| if image.mode in ("RGBA", "P"): |
| image = image.convert("RGB") |
| buffer = io.BytesIO() |
| image.save(buffer, format="JPEG", quality=95) |
| return buffer.getvalue(), image.size |
|
|
| def scan_by_file(self, file_path: str) -> Dict[str, Any]: |
| """Сканирует изображение из файла.""" |
| try: |
| with Image.open(file_path) as img: |
| img_data, dimensions = self.resize_image(img) |
| |
| mime = Image.MIME.get(img.format) or "image/jpeg" |
| return self.scan_by_data(img_data, mime, dimensions) |
| except FileNotFoundError: |
| raise FileNotFoundError(f"Image file not found: {file_path}") |
| except Exception as e: |
| raise Exception(f"Error processing image file {file_path}: {e}") from e |
|
|
| def scan_by_buffer(self, buffer: bytes) -> Dict[str, Any]: |
| """Сканирует изображение из байтового буфера.""" |
| try: |
| img = Image.open(io.BytesIO(buffer)) |
| img_data, dimensions = self.resize_image(img) |
| mime = Image.MIME.get(img.format) or "image/jpeg" |
| return self.scan_by_data(img_data, mime, dimensions) |
| except Exception as e: |
| raise Exception(f"Error processing image buffer: {e}") from e |
|
|
|
|
| class LensAPI: |
| def __init__(self, proxy: Optional[Union[str, Dict[str, str]]] = None): |
| self.lens = Lens(proxy=proxy) |
|
|
| @staticmethod |
| def adaptive_parse_text_and_language( |
| metadata_json: Any, |
| ) -> Tuple[Optional[str], List[str], List[Dict[str, Any]]]: |
| """ |
| Адаптивно парсит JSON для извлечения языка, текстовых блоков и аннотаций слов. |
| Это синхронная версия функции из нового скрипта. |
| """ |
| language = None |
| all_word_annotations = [] |
| reconstructed_blocks = [] |
|
|
| try: |
| if not isinstance(metadata_json, list) or not metadata_json: |
| |
| print("Invalid JSON structure: metadata_json is not a non-empty list.") |
| return None, [], [] |
|
|
| response_container = next( |
| ( |
| item |
| for item in metadata_json |
| if isinstance(item, list) |
| and item |
| and item[0] == "fetch_query_formulation_metadata_response" |
| ), |
| None, |
| ) |
| if response_container is None: |
| print( |
| "Could not find 'fetch_query_formulation_metadata_response' container." |
| ) |
| return None, [], [] |
|
|
| |
| try: |
| |
| |
| if len(response_container) > 2 and isinstance( |
| response_container[2], list |
| ): |
| lang_section = response_container[2] |
| |
| language = next( |
| ( |
| element |
| for element in lang_section |
| if isinstance(element, str) and len(element) == 2 |
| ), |
| None, |
| ) |
| |
| |
| |
| |
|
|
| except (IndexError, TypeError, StopIteration): |
| print("Could not find language code in expected structure.") |
| pass |
|
|
| |
| segments_iterable = None |
| |
| |
| possible_paths_to_segments_list = [ |
| lambda rc: ( |
| rc[1][0][0][0] |
| if len(rc) > 1 |
| and isinstance(rc[1], list) |
| and rc[1] |
| and isinstance(rc[1][0], list) |
| and rc[1][0] |
| and isinstance(rc[1][0][0], list) |
| and rc[1][0][0] |
| and isinstance(rc[1][0][0][0], list) |
| else None |
| ), |
| lambda rc: ( |
| rc[2][0][0][0] |
| if len(rc) > 2 |
| and isinstance(rc[2], list) |
| and rc[2] |
| and isinstance(rc[2][0], list) |
| and rc[2][0] |
| and isinstance(rc[2][0][0], list) |
| and rc[2][0][0] |
| and isinstance(rc[2][0][0][0], list) |
| else None |
| ), |
| lambda rc: ( |
| rc[1][0][0] |
| if len(rc) > 1 |
| and isinstance(rc[1], list) |
| and rc[1] |
| and isinstance(rc[1][0], list) |
| and rc[1][0] |
| and isinstance(rc[1][0][0], list) |
| else None |
| ), |
| lambda rc: ( |
| rc[2][0][0] |
| if len(rc) > 2 |
| and isinstance(rc[2], list) |
| and rc[2] |
| and isinstance(rc[2][0], list) |
| and rc[2][0] |
| and isinstance(rc[2][0][0], list) |
| else None |
| ), |
| ] |
|
|
| for path_func in possible_paths_to_segments_list: |
| try: |
| candidate_iterable = path_func(response_container) |
| |
| if isinstance(candidate_iterable, list) and candidate_iterable: |
| |
| first_segment = candidate_iterable[0] |
| if ( |
| isinstance(first_segment, list) |
| and len(first_segment) > 1 |
| and isinstance(first_segment[1], list) |
| ): |
| |
| if ( |
| first_segment[1] |
| and isinstance(first_segment[1][0], list) |
| and first_segment[1][0] |
| and isinstance(first_segment[1][0][0], list) |
| ): |
| segments_iterable = candidate_iterable |
| break |
| except (IndexError, TypeError, AttributeError): |
| continue |
|
|
| if segments_iterable is None: |
| print(f"Could not identify valid text segments list.") |
| |
| try: |
| full_text_alt = response_container[2][0][ |
| 0 |
| ] |
| if isinstance(full_text_alt, list): |
| reconstructed_blocks = full_text_alt |
| elif isinstance(full_text_alt, str): |
| reconstructed_blocks = [full_text_alt] |
| print("Found alternative full text structure.") |
| return ( |
| language, |
| reconstructed_blocks, |
| [], |
| ) |
| except (IndexError, TypeError): |
| print("Could not find alternative full text structure either.") |
| return language, [], [] |
|
|
| |
| for segment_list in segments_iterable: |
| current_block_word_annotations = [] |
| block_text_builder = io.StringIO() |
| last_word_ends_with_space = False |
|
|
| if ( |
| not isinstance(segment_list, list) |
| or len(segment_list) < 2 |
| or not isinstance(segment_list[1], list) |
| ): |
| print( |
| f"Skipping segment: Invalid structure or word group list not found at index [1]. Segment: {segment_list}" |
| ) |
| continue |
|
|
| try: |
| word_groups_list = segment_list[1] |
|
|
| for group_count, word_group in enumerate(word_groups_list, 1): |
| |
| if not ( |
| isinstance(word_group, list) |
| and len(word_group) > 0 |
| and isinstance(word_group[0], list) |
| ): |
| print( |
| f"Skipping word group: Invalid structure. Group: {word_group}" |
| ) |
| continue |
|
|
| word_list = word_group[0] |
|
|
| if ( |
| group_count > 1 |
| and block_text_builder.tell() > 0 |
| and not last_word_ends_with_space |
| ): |
| block_text_builder.write(" ") |
| last_word_ends_with_space = True |
|
|
| for word_info in word_list: |
| |
| try: |
| if ( |
| isinstance(word_info, list) |
| and len(word_info) > 3 |
| and isinstance(word_info[1], str) |
| and isinstance(word_info[2], str) |
| and isinstance(word_info[3], list) |
| and word_info[3] |
| and isinstance(word_info[3][0], list) |
| ): |
|
|
| text = word_info[1] |
| space_indicator = word_info[2] |
| bbox = word_info[3][ |
| 0 |
| ] |
| |
| |
|
|
| current_block_word_annotations.append( |
| {"text": text, "bbox": bbox} |
| ) |
|
|
| block_text_builder.write(text) |
| if ( |
| space_indicator |
| ): |
| block_text_builder.write(space_indicator) |
| last_word_ends_with_space = ( |
| space_indicator == " " |
| ) |
| else: |
| last_word_ends_with_space = False |
| else: |
| print( |
| f"Skipping word info: Invalid structure. Info: {word_info}" |
| ) |
|
|
| except (IndexError, TypeError): |
| print(f"Error processing word info: {word_info}") |
| continue |
|
|
| except (IndexError, TypeError) as e: |
| print(f"Error processing word groups in segment: {e}") |
| except Exception as e: |
| print(f"Unexpected error processing segment: {e}") |
|
|
| reconstructed_text = block_text_builder.getvalue().rstrip( |
| " " |
| ) |
| block_text_builder.close() |
|
|
| if ( |
| reconstructed_text or current_block_word_annotations |
| ): |
| reconstructed_blocks.append(reconstructed_text) |
| all_word_annotations.extend(current_block_word_annotations) |
|
|
| except Exception as e: |
| print(f"Critical error during adaptive text extraction: {e}") |
| |
| return language, reconstructed_blocks, all_word_annotations |
|
|
| |
| return language, reconstructed_blocks, all_word_annotations |
|
|
| @staticmethod |
| def stitch_text_sequential(word_annotations: List[Dict[str, Any]]) -> str: |
| """Просто соединяет текст из аннотаций слов.""" |
| |
| |
| |
| text = " ".join([item["text"] for item in word_annotations]) |
| |
| text = re.sub(r"\s+([,.!?])", r"\1", text) |
| return text.strip() |
|
|
| @staticmethod |
| def stitch_text_smart(word_annotations: List[Dict[str, Any]]) -> str: |
| if ( |
| not word_annotations |
| or "bbox" not in word_annotations[0] |
| or not isinstance(word_annotations[0]["bbox"], list) |
| or not word_annotations[0]["bbox"] |
| ): |
| print("Cannot perform smart stitching: bbox data missing or invalid.") |
| return LensAPI.stitch_text_sequential( |
| word_annotations |
| ) |
|
|
| try: |
| |
| def get_sort_key(element): |
| try: |
| |
| bbox = element["bbox"] |
| if isinstance(bbox[0], list) and len(bbox[0]) >= 2: |
| y = bbox[0][1] |
| x = bbox[0][0] |
| return (round(y, 2), x) |
| |
| elif len(bbox) >= 2: |
| y = bbox[1] |
| x = bbox[0] |
| return (round(y, 2), x) |
| else: |
| return (0, 0) |
| except (IndexError, TypeError): |
| return (0, 0) |
|
|
| sorted_elements = sorted(word_annotations, key=get_sort_key) |
|
|
| stitched_lines = [] |
| current_line = [] |
| current_y = None |
| |
| |
| y_threshold = 10 |
|
|
| for element in sorted_elements: |
| element_y, _ = get_sort_key(element) |
| text = element["text"] |
|
|
| if current_y is None or abs(element_y - current_y) > y_threshold: |
| if current_line: |
| stitched_lines.append(" ".join(current_line)) |
| current_line = [text] |
| current_y = element_y |
| else: |
| |
| if text in [",", ".", "!", "?", ";", ":"] and current_line: |
| current_line[-1] += text |
| else: |
| current_line.append(text) |
|
|
| if current_line: |
| stitched_lines.append(" ".join(current_line)) |
|
|
| return "\n".join(stitched_lines).strip() |
|
|
| except Exception as e: |
| print(f"Error during smart stitching: {e}") |
| return LensAPI.stitch_text_sequential( |
| word_annotations |
| ) |
|
|
| def process_image( |
| self, |
| image_path: Optional[str] = None, |
| image_buffer: Optional[bytes] = None, |
| response_method: str = "Full Text", |
| ) -> Dict[str, Any]: |
| """Обрабатывает изображение и возвращает результат OCR.""" |
| if image_path: |
| raw_result_json = self.lens.scan_by_file(image_path) |
| elif image_buffer: |
| raw_result_json = self.lens.scan_by_buffer(image_buffer) |
| else: |
| raise ValueError("Either image_path or image_buffer must be provided") |
|
|
| |
| language, reconstructed_blocks, word_annotations = ( |
| self.adaptive_parse_text_and_language(raw_result_json) |
| ) |
|
|
| |
| full_text = "" |
| if response_method == "Full Text": |
| |
| full_text = "\n".join(reconstructed_blocks).strip() |
| if not full_text and word_annotations: |
| full_text = self.stitch_text_sequential(word_annotations) |
|
|
| elif response_method == "Coordinate sequence": |
| |
| full_text = self.stitch_text_sequential(word_annotations) |
|
|
| elif response_method == "Location coordinates": |
| |
| |
| |
| full_text = "\n".join(reconstructed_blocks).strip() |
| if not full_text and word_annotations: |
| full_text = self.stitch_text_sequential(word_annotations) |
| else: |
| raise ValueError(f"Invalid response method: {response_method}") |
|
|
| return { |
| "full_text": ( |
| full_text if full_text else "" |
| ), |
| "language": language if language else "und", |
| "text_with_coordinates": word_annotations, |
| } |
|
|
|
|
| def format_ocr_result(result: Dict[str, Any]) -> str: |
| """Форматирует результат OCR для вывода (например, в debug).""" |
| |
| formatted_result = { |
| "language": result.get("language", "und"), |
| "full_text_preview": result.get("full_text", "")[:100] |
| + "...", |
| "word_annotations_count": len(result.get("text_with_coordinates", [])), |
| |
| "word_annotations_preview": [ |
| f"{item['text']} (bbox: {item.get('bbox', 'N/A')})" |
| for item in result.get("text_with_coordinates", [])[:5] |
| ], |
| } |
| try: |
| |
| return json5.dumps( |
| formatted_result, indent=2, ensure_ascii=False, quote_keys=True |
| ) |
| except Exception: |
| return str(formatted_result) |
|
|
|
|
| @register_OCR("google_lens") |
| class OCRLensAPI(OCRBase): |
| |
| params = { |
| "delay": 1.0, |
| "newline_handling": { |
| "type": "selector", |
| "options": ["preserve", "remove"], |
| "value": "preserve", |
| "description": "Handle newline characters in OCR result (only affects final string)", |
| }, |
| "no_uppercase": { |
| "type": "checkbox", |
| "value": False, |
| "description": "Convert text to lowercase except first letter of sentences", |
| }, |
| "response_method": { |
| "type": "selector", |
| "options": ["Full Text", "Coordinate sequence", "Location coordinates"], |
| "value": "Full Text", |
| "description": "Method for extracting/stitching text from image data", |
| }, |
| "proxy": { |
| "value": "", |
| "description": 'Proxy address (e.g., http://user:pass@host:port, socks5://host:port, or dict {"http://": ..., "https://": ...})', |
| }, |
| "description": "OCR using Google Lens API (New Engine)", |
| } |
|
|
| |
| @property |
| def request_delay(self): |
| try: |
| return float(self.get_param_value("delay")) |
| except (ValueError, TypeError): |
| return 1.0 |
|
|
| @property |
| def newline_handling(self): |
| return self.get_param_value("newline_handling") |
|
|
| @property |
| def no_uppercase(self): |
| return self.get_param_value("no_uppercase") |
|
|
| @property |
| def response_method(self): |
| return self.get_param_value("response_method") |
|
|
| @property |
| def proxy(self) -> Optional[Union[str, Dict[str, str]]]: |
| val = self.get_param_value("proxy") |
| |
| if isinstance(val, str) and val.strip().startswith("{"): |
| try: |
| parsed_dict = json5.loads(val) |
| if isinstance(parsed_dict, dict): |
| return parsed_dict |
| except Exception: |
| pass |
| return val if val else None |
|
|
| def __init__(self, **params) -> None: |
| |
| super().__init__(**params) |
| |
| self.api = LensAPI(proxy=self.proxy) |
| self.last_request_time = 0 |
|
|
| def _ocr_blk_list( |
| self, img: np.ndarray, blk_list: List[TextBlock], *args, **kwargs |
| ): |
| im_h, im_w = img.shape[:2] |
| if self.debug_mode: |
| self.logger.debug(f"Image size: {im_h}x{im_w}") |
| for blk in blk_list: |
| x1, y1, x2, y2 = blk.xyxy |
| if self.debug_mode: |
| self.logger.debug(f"Processing block: ({x1, y1, x2, y2})") |
| if 0 <= y1 < y2 <= im_h and 0 <= x1 < x2 <= im_w: |
| cropped_img = img[y1:y2, x1:x2] |
| if cropped_img.size > 0: |
| if self.debug_mode: |
| self.logger.debug(f"Cropped image size: {cropped_img.shape}") |
| blk.text = self.ocr(cropped_img) |
| else: |
| if self.debug_mode: |
| self.logger.warning( |
| f"Empty cropped image for block: ({x1, y1, x2, y2})" |
| ) |
| blk.text = "" |
| else: |
| if self.debug_mode: |
| self.logger.warning( |
| f"Invalid text bbox {blk.xyxy} for image size {im_h}x{im_w}" |
| ) |
| blk.text = "" |
|
|
| def ocr_img(self, img: np.ndarray) -> str: |
| if self.debug_mode: |
| self.logger.debug(f"ocr_img shape: {img.shape}") |
| return self.ocr(img) |
|
|
| def ocr(self, img: np.ndarray) -> str: |
| if self.debug_mode: |
| self.logger.debug(f"Starting OCR on image shape: {img.shape}") |
| self._respect_delay() |
| try: |
| if img.size == 0: |
| if self.debug_mode: |
| self.logger.warning("Empty image provided for OCR") |
| return "" |
|
|
| is_success, buffer = cv2.imencode( |
| ".jpg", img, [int(cv2.IMWRITE_JPEG_QUALITY), 95] |
| ) |
| if not is_success: |
| if self.debug_mode: |
| self.logger.error("Failed to encode image to JPEG buffer") |
| return "" |
|
|
| |
| result = self.api.process_image( |
| image_buffer=buffer.tobytes(), response_method=self.response_method |
| ) |
|
|
| if self.debug_mode: |
| formatted_res_str = format_ocr_result(result) |
| self.logger.debug(f"OCR raw result:\n{formatted_res_str}") |
|
|
| full_text = result.get("full_text", "") |
|
|
| if self.newline_handling == "remove": |
| full_text = full_text.replace("\n", " ").replace( |
| "\r", "" |
| ) |
|
|
| full_text = self._apply_punctuation_and_spacing(full_text) |
|
|
| if self.no_uppercase: |
| full_text = self._apply_no_uppercase(full_text) |
|
|
| return str(full_text) if full_text is not None else "" |
|
|
| except Exception as e: |
| if self.debug_mode: |
| self.logger.error(f"OCR error occurred", exc_info=True) |
| else: |
| self.logger.error(f"OCR error: {type(e).__name__} - {e}") |
| return "" |
|
|
| def _apply_no_uppercase(self, text: str) -> str: |
| def process_sentence(sentence): |
| sentence = sentence.strip() |
| if not sentence: |
| return "" |
| return sentence[0].upper() + sentence[1:].lower() |
|
|
| sentences = re.split(r"(?<=[.!?…])\s+", text) |
| processed_sentences = [process_sentence(s) for s in sentences if s] |
|
|
| return " ".join(processed_sentences) |
|
|
| def _apply_punctuation_and_spacing(self, text: str) -> str: |
| text = re.sub(r"\s+([,.!?…:;])", r"\1", text) |
| text = re.sub(r"([,.!?…:;])(?![,.!?…:;\s])(?!\Z)", r"\1 ", text) |
| text = re.sub(r"\s+", " ", text) |
| return text.strip() |
|
|
| def _respect_delay(self): |
| current_time = time.time() |
| time_since_last_request = current_time - self.last_request_time |
| delay = self.request_delay |
|
|
| if time_since_last_request < delay: |
| sleep_time = delay - time_since_last_request |
| if self.debug_mode: |
| self.logger.info( |
| f"Delay active. Sleeping for {sleep_time:.3f} seconds (Delay: {delay:.1f}s, Since last: {time_since_last_request:.3f}s)" |
| ) |
| time.sleep(sleep_time) |
| self.last_request_time = time.time() |
|
|
| def updateParam(self, param_key: str, param_content: Any): |
| if param_key == "delay": |
| try: |
| param_content = float(param_content) |
| except (ValueError, TypeError): |
| param_content = 1.0 |
|
|
| super().updateParam(param_key, param_content) |
|
|
| if param_key == "proxy": |
| if self.debug_mode: |
| self.logger.info( |
| f"Proxy parameter updated to: {self.proxy}. Reinitializing LensAPI." |
| ) |
| try: |
| self.api = LensAPI(proxy=self.proxy) |
| except Exception as e: |
| self.logger.error( |
| f"Failed to reinitialize LensAPI after proxy update: {e}", |
| exc_info=True, |
| ) |
|
|