| import io | |
| import json | |
| from google.cloud import vision | |
| from base import OCROut, OCRTextOut, TextWithLanguage, ocr | |
| from nltk.tokenize.punkt import PunktParameters, PunktSentenceTokenizer | |
| from langdetect import detect | |
| import os | |
| from langdetect.lang_detect_exception import LangDetectException | |
| from typing import Any, Union, List | |
| from glob import glob | |
| from math import sqrt | |
| import pathlib | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
# Load GCP service-account credentials from the environment (.env files
# supported via python-dotenv) and build one module-level Vision client
# that is shared by every OCR call in this module.
load_dotenv()
gcp_credentials = os.getenv('gcp_credentials')
# NOTE(review): if the 'gcp_credentials' env var is unset this raises
# TypeError from json.loads(None) at import time — presumably deployment
# guarantees the variable exists; confirm.
creds = json.loads(gcp_credentials)
google_client = vision.ImageAnnotatorClient.from_service_account_info(creds)
def image_to_byte_array(image) -> bytes:
    """Serialize a PIL-style image to PNG-encoded bytes.

    The only requirement on *image* is a ``save(buffer, format=...)``
    method that writes encoded data into a binary buffer.
    """
    buffer = io.BytesIO()
    image.save(buffer, format="png")
    return buffer.getvalue()
def resize_to_megapixels(image, max_megapixels=20):
    """Downscale *image* so its pixel count fits the *max_megapixels* budget.

    Aspect ratio is preserved. Images already within budget are returned
    unchanged (the same object, no copy).
    """
    width, height = image.size
    pixel_budget = max_megapixels * 1000000
    # Within budget: nothing to do.
    if width * height <= pixel_budget:
        return image
    # Solve new_w * new_h ~= budget while keeping new_w / new_h == width / height.
    new_w = int(sqrt((width * pixel_budget) / height))
    new_h = int(sqrt((height * pixel_budget) / width))
    return image.resize((new_w, new_h))
def load_product_json_data(path):
    """Load every ``*.json`` file directly under *path* into a dict.

    Args:
        path: Directory prefix to search; globbed as ``path + "*.json"``,
            so callers are expected to pass a trailing separator
            (e.g. ``"data/"``).

    Returns:
        dict mapping each file's stem (the product name) to its parsed
        JSON content.
    """
    products = {}
    for json_path in glob(path + "*.json"):
        product_name = pathlib.Path(json_path).stem
        # Context manager closes the handle even if json.load raises;
        # the original opened files and never closed them.
        with open(json_path) as f:
            products[product_name] = json.load(f)
    return products
def load_product_dataframe_data(path):
    """Load every ``*.json`` under *path* as a pandas DataFrame.

    Files are globbed as ``path + "*.json"`` (pass a trailing separator)
    and parsed with ``orient='index'``; the result maps each file's stem
    (product name) to its DataFrame.
    """
    result = {}
    for file_path in glob(path + "*.json"):
        result[pathlib.Path(file_path).stem] = pd.read_json(file_path, orient='index')
    return result
def run_image_ocr(image_bytes: bytes, google_client) -> "tuple[ocr.Output, dict]":
    """Run Google Cloud Vision text detection on an encoded image.

    Args:
        image_bytes: The encoded image payload, sent as-is to the API.
        google_client: A ``vision.ImageAnnotatorClient`` instance.

    Returns:
        A pair of (parsed ``ocr.Output`` model, raw response dict).
    """
    # Send image to API
    google_response = google_client.text_detection(
        image=vision.Image(content=image_bytes)
    )
    # Round-trip the protobuf response through JSON to obtain a plain dict,
    # then validate/parse it into the project's ocr.Output model.
    google_response_json = json.loads(
        vision.AnnotateImageResponse.to_json(google_response)
    )
    return ocr.Output(**google_response_json), google_response_json
def get_block_language(block: ocr.Block) -> str:
    """Return the highest-confidence language code in *block*, or "unk".

    Falls back to "unk" when the block has no property metadata, no
    detected languages, or malformed language entries — instead of the
    original blanket ``except Exception`` which could also hide bugs.
    """
    if not block.property:
        return "unk"
    languages = getattr(block.property, "detectedLanguages", None)
    if not languages:
        # None or empty list: nothing detected (original relied on
        # max([]) raising ValueError into a broad except).
        return "unk"
    try:
        best = max(languages, key=lambda lang: lang.confidence)
        return best.languageCode
    except (TypeError, AttributeError):
        # Entries missing a comparable confidence or a languageCode.
        return "unk"
def block2text(block: ocr.Block) -> TextWithLanguage:
    """Flatten an OCR block into plain text tagged with its top language."""
    pieces = []
    for paragraph in block.paragraphs:
        for word in paragraph.words:
            for symbol in word.symbols:
                pieces.append(symbol.text)
                # Vision marks word/line breaks as a property on the
                # symbol; every break is rendered as a single space.
                if symbol.property and symbol.property.detectedBreak:
                    pieces.append(" ")
    return TextWithLanguage(
        text="".join(pieces).strip(),
        lang_code=get_block_language(block),
    )
# Takes a list of blocks, and parses the sentences in each block
def get_sentences(blocks: List[TextWithLanguage]) -> List[TextWithLanguage]:
    """
    Split the sentences of the blocks and return a list of sentences.

    Each sentence is language-detected independently; detection failures
    are tagged "unk", and Afrikaans is folded into Dutch.
    """
    punkt_param = PunktParameters()
    # Punkt stores abbreviations lowercased WITHOUT the final period
    # ("e.g", not "e.g."), so the trailing dots are stripped here; the
    # original dotted entries like "max." could never match.
    punkt_param.abbrev_types = {
        "dr",
        "vs",
        "mr",
        "mrs",
        "prof",
        "inc",
        "vit",
        "o.a",
        "o.b.v",
        "s.p",
        "m.u.v",
        "i.v.m",
        "a.k.a",
        "e.g",
        "m.b.v",
        "max",
        "min",
    }
    sentence_splitter = PunktSentenceTokenizer(punkt_param)
    sentences = []
    for block in blocks:
        for sentence in sentence_splitter.tokenize(block.text):
            try:
                lang_code = detect(str(sentence).lower())
                # Because of lack of context in a sentence, langdetect
                # often mistakes short Dutch for Afrikaans; fold it back.
                if lang_code == "af":
                    lang_code = "nl"
            except LangDetectException:
                lang_code = "unk"
            sentences.append(TextWithLanguage(text=str(sentence), lang_code=lang_code))
    return sentences
def run_ocr(image_bytes: bytes) -> "tuple[OCRTextOut, Any]":
    """OCR *image_bytes* via the module-level Vision client and post-process.

    Returns:
        A pair of (OCRTextOut with block texts / full text / sentences,
        raw API response dict).
    """
    # API response is of the type AnnotateImageResponse, see
    # https://cloud.google.com/vision/docs/reference/rest/v1/AnnotateImageResponse
    # for more details.
    ocr_image_annotation, response_json = run_image_ocr(image_bytes, google_client)
    # We assume we will only process pictures of one page,
    # and no documents of more than one page. Hence, we
    # take the first page here.
    # Check if the fullTextAnnotations are filled
    if ocr_image_annotation.fullTextAnnotation:
        ocr_blocks = ocr_image_annotation.fullTextAnnotation.pages[0].blocks
        text = ocr_image_annotation.fullTextAnnotation.text
        blocks = [block2text(block) for block in ocr_blocks]
        block_texts = [block.text for block in blocks]
        sentences = get_sentences(blocks)
    else:
        # No text detected: return empty placeholders rather than raising.
        block_texts = [""]
        text = ""
        sentences = [TextWithLanguage(text="", lang_code="")]
    ocr_text_out = OCRTextOut(blocks=block_texts, full_text=text, sentences=sentences)
    return ocr_text_out, response_json