manga / modules /ocr /ocr_google_lens.py
sayed555's picture
Upload 140 files
82f073c verified
import re
import numpy as np
import time
import cv2
import random
import string
from typing import List, Dict, Any, Tuple, Optional, Union
import httpx
from PIL import Image
import io
import json5 # Используем json5 напрямую
from urllib.parse import parse_qs, urlparse
# Убираем lxml.html и http.cookiejar, так как они больше не нужны
# import lxml.html
# import http.cookiejar as cookielib
from .base import register_OCR, OCRBase, TextBlock
class LensCore:
LENS_UPLOAD_ENDPOINT = "https://lens.google.com/v3/upload"
LENS_METADATA_ENDPOINT = "https://lens.google.com/qfmetadata"
# Обновленные заголовки из нового скрипта
HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "ru", # Используем 'ru' как в новом скрипте, можно сделать параметром при желании
"Cache-Control": "max-age=0",
"Sec-Ch-Ua": '"Not-A.Brand";v="8", "Chromium";v="135", "Google Chrome";v="135"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
"Origin": "https://www.google.com", # Изменено
"Referer": "https://www.google.com/", # Изменено
"Sec-Fetch-Site": "same-site", # Изменено
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-User": "?1",
"Priority": "u=0, i", # Добавлено
}
SUPPORTED_MIMES = [ # Оставляем из старого кода, хотя новый код определяет сам
"image/x-icon",
"image/bmp",
"image/jpeg",
"image/png",
"image/tiff",
"image/webp",
"image/heic",
]
def __init__(self, proxy: Optional[Union[str, Dict[str, str]]] = None):
self.proxy = proxy
# self.cookie_jar = cookielib.CookieJar() # Убрали, httpx управляет куками внутри клиента
@staticmethod
def _extract_ids_from_url(url_string: str) -> Tuple[Optional[str], Optional[str]]:
try:
parsed_url = urlparse(url_string)
query_params = parse_qs(parsed_url.query)
vsrid = query_params.get("vsrid", [None])[0]
lsessionid = query_params.get("lsessionid", [None])[0]
return vsrid, lsessionid
except Exception as e:
# Логирование можно добавить сюда при необходимости, если self.logger доступен
print(
f"Error extracting IDs from URL {url_string}: {e}"
) # Просто вывод для примера
return None, None
def _create_httpx_client(self) -> httpx.Client:
"""Создает httpx клиент с настройками прокси."""
client_kwargs = {
"follow_redirects": True,
"timeout": httpx.Timeout(30.0, connect=10.0),
"limits": httpx.Limits(max_keepalive_connections=5, max_connections=10),
"http2": False,
"verify": True, # Можно сделать параметром, если нужно отключать проверку SSL
}
if self.proxy:
mounts = {}
try:
if isinstance(self.proxy, str):
# Если прокси строка, применяем ко всем протоколам
# Проверяем схему, чтобы использовать http:// для https:// туннелирования, если не указано явно
proxy_url = self.proxy
if "://" not in proxy_url:
proxy_url = f"http://{proxy_url}" # По умолчанию http
# Проверяем схему для https://
https_proxy_url = proxy_url
if urlparse(proxy_url).scheme in ["http", "socks4", "socks5"]:
# Для https запросов чаще всего используется http прокси или socks
pass # Используем тот же прокси
# Если указан https прокси, используем его, хотя это редкость
# elif urlparse(proxy_url).scheme == 'https':
# https_proxy_url = proxy_url
mounts["http://"] = httpx.HTTPTransport(proxy=proxy_url)
mounts["https://"] = httpx.HTTPTransport(proxy=https_proxy_url)
elif isinstance(self.proxy, dict):
# Если словарь, используем указанные транспорты
if "http://" in self.proxy:
mounts["http://"] = httpx.HTTPTransport(
proxy=self.proxy["http://"]
)
if "https://" in self.proxy:
# Применяем "gotcha" из документации httpx:
# URL прокси для https:// обычно должен быть http://, если не указано иное
https_proxy_spec = self.proxy["https://"]
if (
isinstance(https_proxy_spec, str)
and urlparse(https_proxy_spec).scheme == "https"
):
# Если пользователь явно указал https:// для https-прокси, используем это
mounts["https://"] = httpx.HTTPTransport(
proxy=https_proxy_spec
)
elif isinstance(https_proxy_spec, str):
# Иначе, предполагаем, что нужен http:// или socks для https-трафика
mounts["https://"] = httpx.HTTPTransport(
proxy=https_proxy_spec
)
# Можно добавить обработку если https_proxy_spec это не строка, а объект Transport
# else:
# mounts["https://"] = https_proxy_spec # Если передали готовый транспорт
if mounts:
client_kwargs["mounts"] = mounts
except Exception as e:
# Логирование или обработка ошибки парсинга/установки прокси
print(f"Error setting up proxy: {e}") # Пример вывода
# Можно либо упасть с ошибкой, либо продолжить без прокси
return httpx.Client(**client_kwargs)
def scan_by_data(
self, data: bytes, mime: str, dimensions: Tuple[int, int]
) -> Dict[str, Any]:
"""
Отправляет данные изображения в Google Lens и получает результат OCR.
Использует двухэтапный процесс: загрузка, затем получение метаданных.
"""
random_filename = "".join(random.choices(string.ascii_letters, k=8))
# Определяем расширение на основе mime-типа для имени файла
ext = ".jpg" # По умолчанию
if mime == "image/png":
ext = ".png"
elif mime == "image/webp":
ext = ".webp"
elif mime == "image/gif":
ext = ".gif"
# Добавьте другие типы при необходимости
filename = f"{random_filename}{ext}"
files = {"encoded_image": (filename, data, mime)}
# Параметры для запроса загрузки из нового скрипта
params_upload = {
"hl": "ru", # Язык интерфейса Lens
"re": "av",
"vpw": "1903", # Используем фиксированные значения из нового скрипта
"vph": "953", # Вместо dimensions, как в новом скрипте
"ep": "gsbubb",
"st": str(int(time.time() * 1000)),
}
try:
# Создаем клиент для сессии (включая обработку прокси)
with self._create_httpx_client() as client:
# 1. Загрузка изображения
upload_headers = self.HEADERS.copy()
response_upload = client.post(
self.LENS_UPLOAD_ENDPOINT,
headers=upload_headers,
files=files,
params=params_upload,
)
response_upload.raise_for_status() # Проверка на HTTP ошибки
final_url = str(response_upload.url) # URL после редиректов
# 2. Извлечение ID сессии
vsrid, lsessionid = self._extract_ids_from_url(final_url)
if not vsrid or not lsessionid:
raise Exception(
"Failed to extract vsrid or lsessionid from upload redirect URL."
)
# 3. Запрос метаданных
metadata_params = {
"vsrid": vsrid,
"lsessionid": lsessionid,
"hl": params_upload["hl"],
"qf": "CAI%3D",
"st": str(int(time.time() * 1000)),
"vpw": params_upload["vpw"],
"vph": params_upload["vph"],
"source": "lens",
}
# Модифицированные заголовки для запроса метаданных из нового скрипта
metadata_headers = self.HEADERS.copy()
metadata_headers.update(
{
"Accept": "*/*",
"Referer": final_url, # Важно указать реферер
"Sec-Fetch-Site": "same-origin", # Изменено
"Sec-Fetch-Mode": "cors", # Изменено
"Sec-Fetch-Dest": "empty", # Изменено
"Priority": "u=1, i", # Изменено
}
)
# Удаляем ненужные для этого запроса заголовки
metadata_headers.pop("Upgrade-Insecure-Requests", None)
metadata_headers.pop("Sec-Fetch-User", None)
metadata_headers.pop("Cache-Control", None)
metadata_headers.pop("Origin", None) # Origin не нужен для GET
response_metadata = client.get(
self.LENS_METADATA_ENDPOINT,
headers=metadata_headers,
params=metadata_params,
)
response_metadata.raise_for_status()
# 4. Парсинг ответа метаданных
response_text = response_metadata.text
# Убираем префикс, если он есть
if response_text.startswith(")]}'\n"):
response_text = response_text[5:]
elif response_text.startswith(")]}'"):
response_text = response_text[4:]
# Используем json5 для парсинга
metadata_json = json5.loads(response_text)
return metadata_json # Возвращаем распарсенный JSON
except httpx.HTTPStatusError as e:
# Логирование можно улучшить, передавая self.logger
print(f"HTTP error: {e.response.status_code} for URL {e.request.url}")
raise Exception(f"HTTP Error {e.response.status_code}") from e
except httpx.RequestError as e:
print(f"Request error: {e}")
raise Exception(f"Request Error: {e}") from e
except Exception as e:
print(f"Unexpected error in scan_by_data: {e}")
raise # Перевыбрасываем другие ошибки
class Lens(LensCore):
def __init__(self, proxy: Optional[Union[str, Dict[str, str]]] = None):
super().__init__(proxy=proxy)
@staticmethod
def resize_image(
image: Image.Image, max_size: Tuple[int, int] = (1000, 1000)
) -> Tuple[bytes, Tuple[int, int]]:
"""Изменяет размер изображения и конвертирует в JPEG байты."""
# Эта логика остается полезной
image.thumbnail(max_size)
if image.mode in ("RGBA", "P"): # Добавим P для палитры
image = image.convert("RGB")
buffer = io.BytesIO()
image.save(buffer, format="JPEG", quality=95) # Можно настроить качество
return buffer.getvalue(), image.size
def scan_by_file(self, file_path: str) -> Dict[str, Any]:
"""Сканирует изображение из файла."""
try:
with Image.open(file_path) as img:
img_data, dimensions = self.resize_image(img)
# Определяем mime тип (хотя scan_by_data пока использует только имя файла)
mime = Image.MIME.get(img.format) or "image/jpeg"
return self.scan_by_data(img_data, mime, dimensions)
except FileNotFoundError:
raise FileNotFoundError(f"Image file not found: {file_path}")
except Exception as e:
raise Exception(f"Error processing image file {file_path}: {e}") from e
def scan_by_buffer(self, buffer: bytes) -> Dict[str, Any]:
"""Сканирует изображение из байтового буфера."""
try:
img = Image.open(io.BytesIO(buffer))
img_data, dimensions = self.resize_image(img)
mime = Image.MIME.get(img.format) or "image/jpeg"
return self.scan_by_data(img_data, mime, dimensions)
except Exception as e:
raise Exception(f"Error processing image buffer: {e}") from e
class LensAPI:
def __init__(self, proxy: Optional[Union[str, Dict[str, str]]] = None):
self.lens = Lens(proxy=proxy)
@staticmethod
def adaptive_parse_text_and_language(
metadata_json: Any,
) -> Tuple[Optional[str], List[str], List[Dict[str, Any]]]:
"""
Адаптивно парсит JSON для извлечения языка, текстовых блоков и аннотаций слов.
Это синхронная версия функции из нового скрипта.
"""
language = None
all_word_annotations = []
reconstructed_blocks = []
try:
if not isinstance(metadata_json, list) or not metadata_json:
# Логгирование или вывод ошибки
print("Invalid JSON structure: metadata_json is not a non-empty list.")
return None, [], []
response_container = next(
(
item
for item in metadata_json
if isinstance(item, list)
and item
and item[0] == "fetch_query_formulation_metadata_response"
),
None,
)
if response_container is None:
print(
"Could not find 'fetch_query_formulation_metadata_response' container."
)
return None, [], []
# --- Извлечение языка ---
try:
# Путь к языку может варьироваться, пробуем наиболее вероятный
# Примерный путь: response_container[2] содержит список, где один из элементов - код языка 'xx'
if len(response_container) > 2 and isinstance(
response_container[2], list
):
lang_section = response_container[2]
# Ищем строку из 2 символов
language = next(
(
element
for element in lang_section
if isinstance(element, str) and len(element) == 2
),
None,
)
# Альтернативный путь, если первый не сработал (гипотетический)
# if not language and len(response_container) > 1 and isinstance(response_container[1], list):
# lang_section = response_container[1]
# language = next((element for element in lang_section if isinstance(element, str) and len(element) == 2), None)
except (IndexError, TypeError, StopIteration):
print("Could not find language code in expected structure.")
pass # Продолжаем без языка
# --- Извлечение текста/слов ---
segments_iterable = None
# Пробуем разные пути к списку сегментов/блоков текста
# Эти пути основаны на анализе JSON ответа Lens и могут потребовать корректировки
possible_paths_to_segments_list = [
lambda rc: (
rc[1][0][0][0]
if len(rc) > 1
and isinstance(rc[1], list)
and rc[1]
and isinstance(rc[1][0], list)
and rc[1][0]
and isinstance(rc[1][0][0], list)
and rc[1][0][0]
and isinstance(rc[1][0][0][0], list)
else None
), # Самый частый путь из нового скрипта
lambda rc: (
rc[2][0][0][0]
if len(rc) > 2
and isinstance(rc[2], list)
and rc[2]
and isinstance(rc[2][0], list)
and rc[2][0]
and isinstance(rc[2][0][0], list)
and rc[2][0][0]
and isinstance(rc[2][0][0][0], list)
else None
), # Другой возможный путь
lambda rc: (
rc[1][0][0]
if len(rc) > 1
and isinstance(rc[1], list)
and rc[1]
and isinstance(rc[1][0], list)
and rc[1][0]
and isinstance(rc[1][0][0], list)
else None
), # Более короткий путь
lambda rc: (
rc[2][0][0]
if len(rc) > 2
and isinstance(rc[2], list)
and rc[2]
and isinstance(rc[2][0], list)
and rc[2][0]
and isinstance(rc[2][0][0], list)
else None
),
]
for path_func in possible_paths_to_segments_list:
try:
candidate_iterable = path_func(response_container)
# Проверяем, что это не пустой список списков (сегментов)
if isinstance(candidate_iterable, list) and candidate_iterable:
# Проверяем структуру первого элемента на наличие списка слов
first_segment = candidate_iterable[0]
if (
isinstance(first_segment, list)
and len(first_segment) > 1
and isinstance(first_segment[1], list)
):
# Дополнительная проверка на вложенность, характерную для списка слов
if (
first_segment[1]
and isinstance(first_segment[1][0], list)
and first_segment[1][0]
and isinstance(first_segment[1][0][0], list)
):
segments_iterable = candidate_iterable
break # Нашли подходящую структуру
except (IndexError, TypeError, AttributeError):
continue # Пробуем следующий путь
if segments_iterable is None:
print(f"Could not identify valid text segments list.")
# Попытка найти полный текст в другом месте (как в старом коде)
try:
full_text_alt = response_container[2][0][
0
] # Путь из старого кода data[3][4][0][0] адаптирован
if isinstance(full_text_alt, list):
reconstructed_blocks = full_text_alt
elif isinstance(full_text_alt, str):
reconstructed_blocks = [full_text_alt]
print("Found alternative full text structure.")
return (
language,
reconstructed_blocks,
[],
) # Возвращаем без координат
except (IndexError, TypeError):
print("Could not find alternative full text structure either.")
return language, [], [] # Ничего не найдено
# Обработка найденных сегментов
for segment_list in segments_iterable:
current_block_word_annotations = []
block_text_builder = io.StringIO()
last_word_ends_with_space = False
if (
not isinstance(segment_list, list)
or len(segment_list) < 2
or not isinstance(segment_list[1], list)
):
print(
f"Skipping segment: Invalid structure or word group list not found at index [1]. Segment: {segment_list}"
)
continue
try:
word_groups_list = segment_list[1]
for group_count, word_group in enumerate(word_groups_list, 1):
# Ожидаемая структура: [[word_list]]
if not (
isinstance(word_group, list)
and len(word_group) > 0
and isinstance(word_group[0], list)
):
print(
f"Skipping word group: Invalid structure. Group: {word_group}"
)
continue
word_list = word_group[0]
if (
group_count > 1
and block_text_builder.tell() > 0
and not last_word_ends_with_space
):
block_text_builder.write(" ")
last_word_ends_with_space = True
for word_info in word_list:
# Ожидаемая структура слова: [?, text, space_indicator, [bbox], ...]
try:
if (
isinstance(word_info, list)
and len(word_info) > 3
and isinstance(word_info[1], str)
and isinstance(word_info[2], str)
and isinstance(word_info[3], list)
and word_info[3]
and isinstance(word_info[3][0], list)
):
text = word_info[1]
space_indicator = word_info[2] # Обычно " " или ""
bbox = word_info[3][
0
] # Сам bbox [[x,y], [x,y], [x,y], [x,y]] ? или [x,y,w,h] ? Проверить!
# Новый парсер возвращает bbox, сохраняем его. Формат может потребовать уточнения.
# Пример из нового скрипта предполагает bbox = word_info[3][0], оставляем так.
current_block_word_annotations.append(
{"text": text, "bbox": bbox}
)
block_text_builder.write(text)
if (
space_indicator
): # Добавляем пробел только если он есть
block_text_builder.write(space_indicator)
last_word_ends_with_space = (
space_indicator == " "
)
else:
last_word_ends_with_space = False
else:
print(
f"Skipping word info: Invalid structure. Info: {word_info}"
)
except (IndexError, TypeError):
print(f"Error processing word info: {word_info}")
continue # К следующему слову
except (IndexError, TypeError) as e:
print(f"Error processing word groups in segment: {e}")
except Exception as e:
print(f"Unexpected error processing segment: {e}")
reconstructed_text = block_text_builder.getvalue().rstrip(
" "
) # Убираем лишний пробел в конце блока
block_text_builder.close()
if (
reconstructed_text or current_block_word_annotations
): # Добавляем блок если есть текст или аннотации
reconstructed_blocks.append(reconstructed_text)
all_word_annotations.extend(current_block_word_annotations)
except Exception as e:
print(f"Critical error during adaptive text extraction: {e}")
# Возвращаем то, что успели собрать
return language, reconstructed_blocks, all_word_annotations
# print(f"Adaptive parsing complete. Lang: '{language}'. Blocks: {len(reconstructed_blocks)}. Words: {len(all_word_annotations)}.")
return language, reconstructed_blocks, all_word_annotations
@staticmethod
def stitch_text_sequential(word_annotations: List[Dict[str, Any]]) -> str:
"""Просто соединяет текст из аннотаций слов."""
# Новый парсер уже содержит space_indicator, но для совместимости сделаем просто join
# Возможно, лучше использовать reconstructed_blocks из парсера?
# Пока оставим так для имитации старого поведения 'Coordinate sequence'
text = " ".join([item["text"] for item in word_annotations])
# Применим простое удаление пробелов перед знаками препинания
text = re.sub(r"\s+([,.!?])", r"\1", text)
return text.strip()
@staticmethod
def stitch_text_smart(word_annotations: List[Dict[str, Any]]) -> str:
if (
not word_annotations
or "bbox" not in word_annotations[0]
or not isinstance(word_annotations[0]["bbox"], list)
or not word_annotations[0]["bbox"]
):
print("Cannot perform smart stitching: bbox data missing or invalid.")
return LensAPI.stitch_text_sequential(
word_annotations
) # Возврат к простому соединению
try:
# Пытаемся извлечь координаты для сортировки (берем Y первой точки)
def get_sort_key(element):
try:
# Допустим bbox = [[x1,y1], [x2,y1], [x2,y2], [x1,y2]]
bbox = element["bbox"]
if isinstance(bbox[0], list) and len(bbox[0]) >= 2:
y = bbox[0][1] # Y координата первой точки
x = bbox[0][0] # X координата первой точки
return (round(y, 2), x) # Сортируем по Y, затем по X
# Допустим bbox = [x, y, w, h] (менее вероятно для Lens)
elif len(bbox) >= 2:
y = bbox[1]
x = bbox[0]
return (round(y, 2), x)
else:
return (0, 0) # Не удалось извлечь
except (IndexError, TypeError):
return (0, 0) # Ошибка при извлечении
sorted_elements = sorted(word_annotations, key=get_sort_key)
stitched_lines = []
current_line = []
current_y = None
# Порог для определения новой строки (нужно подбирать экспериментально)
# Зависит от масштаба координат bbox
y_threshold = 10 # Примерное значение, если координаты в пикселях
for element in sorted_elements:
element_y, _ = get_sort_key(element)
text = element["text"]
if current_y is None or abs(element_y - current_y) > y_threshold:
if current_line:
stitched_lines.append(" ".join(current_line))
current_line = [text]
current_y = element_y
else:
# Простая логика склеивания знаков препинания
if text in [",", ".", "!", "?", ";", ":"] and current_line:
current_line[-1] += text
else:
current_line.append(text)
if current_line:
stitched_lines.append(" ".join(current_line))
return "\n".join(stitched_lines).strip()
except Exception as e:
print(f"Error during smart stitching: {e}")
return LensAPI.stitch_text_sequential(
word_annotations
) # Возврат к простому
def process_image(
self,
image_path: Optional[str] = None,
image_buffer: Optional[bytes] = None,
response_method: str = "Full Text",
) -> Dict[str, Any]:
"""Обрабатывает изображение и возвращает результат OCR."""
if image_path:
raw_result_json = self.lens.scan_by_file(image_path)
elif image_buffer:
raw_result_json = self.lens.scan_by_buffer(image_buffer)
else:
raise ValueError("Either image_path or image_buffer must be provided")
# Парсим результат с помощью новой функции
language, reconstructed_blocks, word_annotations = (
self.adaptive_parse_text_and_language(raw_result_json)
)
# Формируем результат в зависимости от выбранного метода
full_text = ""
if response_method == "Full Text":
# Используем текст, реконструированный парсером по блокам
full_text = "\n".join(reconstructed_blocks).strip()
if not full_text and word_annotations: # Если блоки пустые, но слова есть
full_text = self.stitch_text_sequential(word_annotations)
elif response_method == "Coordinate sequence":
# Соединяем слова последовательно
full_text = self.stitch_text_sequential(word_annotations)
elif response_method == "Location coordinates":
# Пытаемся соединить умнее (или возвращаем реконструированные блоки)
# full_text = self.stitch_text_smart(word_annotations) # Можно раскомментировать для теста
# Пока возвращаем реконструированный текст, т.к. парсер уже пытается это делать
full_text = "\n".join(reconstructed_blocks).strip()
if not full_text and word_annotations:
full_text = self.stitch_text_sequential(word_annotations)
else:
raise ValueError(f"Invalid response method: {response_method}")
return {
"full_text": (
full_text if full_text else ""
), # Возвращаем пустую строку если текста нет
"language": language if language else "und", # 'und' - неопределенный
"text_with_coordinates": word_annotations, # Возвращаем список аннотаций слов
}
def format_ocr_result(result: Dict[str, Any]) -> str:
"""Форматирует результат OCR для вывода (например, в debug)."""
# Адаптируем под новый формат 'text_with_coordinates' (список словарей)
formatted_result = {
"language": result.get("language", "und"),
"full_text_preview": result.get("full_text", "")[:100]
+ "...", # Добавим превью текста
"word_annotations_count": len(result.get("text_with_coordinates", [])),
# Выводим первые несколько аннотаций для примера
"word_annotations_preview": [
f"{item['text']} (bbox: {item.get('bbox', 'N/A')})"
for item in result.get("text_with_coordinates", [])[:5] # Первые 5 слов
],
}
try:
# Используем json5 для красивого вывода
return json5.dumps(
formatted_result, indent=2, ensure_ascii=False, quote_keys=True
)
except Exception: # Если json5 не справится со сложными объектами bbox
return str(formatted_result) # Простой вывод строки
@register_OCR("google_lens")
class OCRLensAPI(OCRBase):
# Параметры остаются в основном те же
params = {
"delay": 1.0,
"newline_handling": {
"type": "selector",
"options": ["preserve", "remove"],
"value": "preserve",
"description": "Handle newline characters in OCR result (only affects final string)", # Уточнение
},
"no_uppercase": {
"type": "checkbox",
"value": False,
"description": "Convert text to lowercase except first letter of sentences",
},
"response_method": {
"type": "selector",
"options": ["Full Text", "Coordinate sequence", "Location coordinates"],
"value": "Full Text",
"description": "Method for extracting/stitching text from image data", # Уточнение
},
"proxy": {
"value": "",
"description": 'Proxy address (e.g., http://user:pass@host:port, socks5://host:port, or dict {"http://": ..., "https://": ...})', # Обновлено описание
},
"description": "OCR using Google Lens API (New Engine)", # Обновлено
}
# Свойства остаются те же
@property
def request_delay(self):
try:
return float(self.get_param_value("delay"))
except (ValueError, TypeError):
return 1.0
@property
def newline_handling(self):
return self.get_param_value("newline_handling")
@property
def no_uppercase(self):
return self.get_param_value("no_uppercase")
@property
def response_method(self):
return self.get_param_value("response_method")
@property
def proxy(self) -> Optional[Union[str, Dict[str, str]]]:
val = self.get_param_value("proxy")
# Пробуем распарсить как JSON-словарь, если строка начинается с {
if isinstance(val, str) and val.strip().startswith("{"):
try:
parsed_dict = json5.loads(val)
if isinstance(parsed_dict, dict):
return parsed_dict
except Exception:
pass # Если не парсится как словарь, оставляем строкой
return val if val else None # Возвращаем None если пустая строка
def __init__(self, **params) -> None:
# Инициализация базового класса
super().__init__(**params)
# Инициализация API с текущим прокси
self.api = LensAPI(proxy=self.proxy)
self.last_request_time = 0
def _ocr_blk_list(
self, img: np.ndarray, blk_list: List[TextBlock], *args, **kwargs
):
im_h, im_w = img.shape[:2]
if self.debug_mode:
self.logger.debug(f"Image size: {im_h}x{im_w}")
for blk in blk_list:
x1, y1, x2, y2 = blk.xyxy
if self.debug_mode:
self.logger.debug(f"Processing block: ({x1, y1, x2, y2})")
if 0 <= y1 < y2 <= im_h and 0 <= x1 < x2 <= im_w:
cropped_img = img[y1:y2, x1:x2]
if cropped_img.size > 0: # Доп. проверка, что кроп не пустой
if self.debug_mode:
self.logger.debug(f"Cropped image size: {cropped_img.shape}")
blk.text = self.ocr(cropped_img) # Вызываем основной метод OCR
else:
if self.debug_mode:
self.logger.warning(
f"Empty cropped image for block: ({x1, y1, x2, y2})"
)
blk.text = ""
else:
if self.debug_mode:
self.logger.warning(
f"Invalid text bbox {blk.xyxy} for image size {im_h}x{im_w}"
)
blk.text = ""
def ocr_img(self, img: np.ndarray) -> str:
if self.debug_mode:
self.logger.debug(f"ocr_img shape: {img.shape}")
return self.ocr(img)
def ocr(self, img: np.ndarray) -> str:
if self.debug_mode:
self.logger.debug(f"Starting OCR on image shape: {img.shape}")
self._respect_delay()
try:
if img.size == 0:
if self.debug_mode:
self.logger.warning("Empty image provided for OCR")
return ""
is_success, buffer = cv2.imencode(
".jpg", img, [int(cv2.IMWRITE_JPEG_QUALITY), 95]
)
if not is_success:
if self.debug_mode:
self.logger.error("Failed to encode image to JPEG buffer")
return ""
# Вызываем API
result = self.api.process_image(
image_buffer=buffer.tobytes(), response_method=self.response_method
)
if self.debug_mode:
formatted_res_str = format_ocr_result(result)
self.logger.debug(f"OCR raw result:\n{formatted_res_str}")
full_text = result.get("full_text", "")
if self.newline_handling == "remove":
full_text = full_text.replace("\n", " ").replace(
"\r", ""
) # Убираем и \r
full_text = self._apply_punctuation_and_spacing(full_text)
if self.no_uppercase:
full_text = self._apply_no_uppercase(full_text)
return str(full_text) if full_text is not None else ""
except Exception as e:
if self.debug_mode:
self.logger.error(f"OCR error occurred", exc_info=True)
else:
self.logger.error(f"OCR error: {type(e).__name__} - {e}")
return ""
def _apply_no_uppercase(self, text: str) -> str:
def process_sentence(sentence):
sentence = sentence.strip()
if not sentence:
return ""
return sentence[0].upper() + sentence[1:].lower()
sentences = re.split(r"(?<=[.!?…])\s+", text)
processed_sentences = [process_sentence(s) for s in sentences if s]
return " ".join(processed_sentences)
def _apply_punctuation_and_spacing(self, text: str) -> str:
text = re.sub(r"\s+([,.!?…:;])", r"\1", text)
text = re.sub(r"([,.!?…:;])(?![,.!?…:;\s])(?!\Z)", r"\1 ", text)
text = re.sub(r"\s+", " ", text)
return text.strip()
def _respect_delay(self):
current_time = time.time()
time_since_last_request = current_time - self.last_request_time
delay = self.request_delay
if time_since_last_request < delay:
sleep_time = delay - time_since_last_request
if self.debug_mode:
self.logger.info(
f"Delay active. Sleeping for {sleep_time:.3f} seconds (Delay: {delay:.1f}s, Since last: {time_since_last_request:.3f}s)"
)
time.sleep(sleep_time)
self.last_request_time = time.time()
def updateParam(self, param_key: str, param_content: Any):
if param_key == "delay":
try:
param_content = float(param_content)
except (ValueError, TypeError):
param_content = 1.0
super().updateParam(param_key, param_content)
if param_key == "proxy":
if self.debug_mode:
self.logger.info(
f"Proxy parameter updated to: {self.proxy}. Reinitializing LensAPI."
)
try:
self.api = LensAPI(proxy=self.proxy)
except Exception as e:
self.logger.error(
f"Failed to reinitialize LensAPI after proxy update: {e}",
exc_info=True,
)