Spaces:
Sleeping
Sleeping
| import CDM.detect_text.ocr as ocr | |
| from CDM.detect_text.Text import Text | |
| import numpy as np | |
| import cv2 | |
| import json | |
| import time | |
| import os | |
| from os.path import join as pjoin | |
| # from paddleocr import PaddleOCR | |
| import pytesseract | |
| # paddle_model = PaddleOCR(use_angle_cls=True, lang="en") #'ch' for chinese and english, 'en' for english | |
def save_detection_json(file_path, texts, img_shape):
    '''
    Write the detected texts and the image shape to a JSON file.

    :param file_path: path of the JSON file to write (opened in 'w' mode)
    :param texts: iterable of text elements exposing ``id``, ``content``,
                  ``location`` (dict with 'left', 'top', 'right', 'bottom'),
                  ``width`` and ``height``
    :param img_shape: stored as-is under the 'img_shape' key
    '''
    output = {'img_shape': img_shape, 'texts': []}
    for text in texts:
        loc = text.location
        output['texts'].append({
            'id': text.id,
            'content': text.content,
            'column_min': loc['left'],
            'row_min': loc['top'],
            'column_max': loc['right'],
            'row_max': loc['bottom'],
            'width': text.width,
            'height': text.height,
        })
    # The context manager closes (and flushes) the file even if dumping fails;
    # the previous version never closed the handle.
    with open(file_path, 'w') as f_out:
        json.dump(output, f_out, indent=4)
def visualize_texts(org_img, texts, shown_resize_height=None, show=False, write_path=None):
    '''
    Draw every text element on a copy of the image, then optionally display
    and/or save the annotated copy.

    :param org_img: source image; it is copied, never drawn on directly
    :param texts: elements exposing ``visualize_element(img, line=...)``
    :param shown_resize_height: height of the displayed preview; None keeps the original size
    :param show: open a window with the preview and wait for a key press
    :param write_path: where to write the full-size annotated image; None skips writing
    '''
    canvas = org_img.copy()
    for element in texts:
        element.visualize_element(canvas, line=2)

    if shown_resize_height is None:
        preview = canvas
    else:
        # Keep the aspect ratio: the width follows the requested height
        aspect = canvas.shape[1] / canvas.shape[0]
        preview = cv2.resize(canvas, (int(shown_resize_height * aspect), shown_resize_height))

    if show:
        cv2.imshow('texts', preview)
        cv2.waitKey(0)
        cv2.destroyWindow('texts')

    # The image written to disk is the full-size one, not the resized preview
    if write_path is not None:
        cv2.imwrite(write_path, canvas)
def text_sentences_recognition(texts):
    '''
    Merge separately detected words that lie on the same horizontal line
    into sentences, then renumber the resulting texts from 0.

    Passes are repeated until one completes without any merge.
    :param texts: elements exposing ``is_on_same_line``, ``merge_text``, ``height``, ``word_width`` and ``id``
    :return: list of the remaining (merged) texts
    '''
    while True:
        merged_any = False
        kept = []
        for candidate in texts:
            for anchor in kept:
                justify = 0.2 * min(candidate.height, anchor.height)
                gap = 2 * max(candidate.word_width, anchor.word_width)
                if candidate.is_on_same_line(anchor, 'h', bias_justify=justify, bias_gap=gap):
                    # Fold the candidate into the text that was kept earlier
                    anchor.merge_text(candidate)
                    merged_any = True
                    break
            else:
                # No kept text accepted the candidate: it stays on its own
                kept.append(candidate)
        texts = kept
        if not merged_any:
            break

    for index, text in enumerate(texts):
        text.id = index
    return texts
def merge_intersected_texts(texts):
    '''
    Merge texts (sentences or words) that intersect each other.

    Passes are repeated until one completes without any merge.
    :param texts: elements exposing ``is_intersected(other, bias=...)`` and ``merge_text(other)``
    :return: list of the remaining (merged) texts
    '''
    while True:
        merged_any = False
        kept = []
        for candidate in texts:
            for anchor in kept:
                if candidate.is_intersected(anchor, bias=2):
                    # Absorb the candidate into the already kept text
                    anchor.merge_text(candidate)
                    merged_any = True
                    break
            else:
                kept.append(candidate)
        texts = kept
        if not merged_any:
            return texts
def text_cvt_orc_format(ocr_result):
    '''
    Convert OCR annotations (entries with 'boundingPoly' -> 'vertices' and
    'description') into Text elements.

    An entry is skipped when any of its vertices lacks an 'x' or a 'y' key.
    The Text id is the position of the entry in ``ocr_result``, so ids of
    skipped entries are not reused.
    :param ocr_result: iterable of annotation dicts, or None
    :return: list of Text (empty when ``ocr_result`` is None)
    '''
    if ocr_result is None:
        return []

    texts = []
    for index, entry in enumerate(ocr_result):
        vertices = entry['boundingPoly']['vertices']
        content = entry['description']
        # Incomplete vertices make the bounding box unusable
        if any('x' not in vertex or 'y' not in vertex for vertex in vertices):
            continue
        xs = [vertex['x'] for vertex in vertices]
        ys = [vertex['y'] for vertex in vertices]
        location = {'left': min(xs), 'top': min(ys),
                    'right': max(xs), 'bottom': max(ys)}
        texts.append(Text(index, content, location))
    return texts
def text_cvt_orc_format_paddle(paddle_result):
    '''
    Convert Paddle OCR lines into Text elements.

    Each line is indexed as ``line[0]`` (box corner points) and
    ``line[1][0]`` (recognised content); the location is the integer
    axis-aligned bounding box of the corner points.
    :param paddle_result: iterable of OCR lines
    :return: list of Text, ids following the order of the lines
    '''
    texts = []
    for index, line in enumerate(paddle_result):
        corners = np.array(line[0])
        xs = corners[:, 0]
        ys = corners[:, 1]
        location = {
            'left': int(min(xs)),
            'top': int(min(ys)),
            'right': int(max(xs)),
            'bottom': int(max(ys)),
        }
        texts.append(Text(index, line[1][0], location))
    return texts
def text_cvt_orc_format_tesseract(tesseract_result):
    '''
    Convert the level-4 rows of a Tesseract result dict into Text elements.

    Only rows whose 'level' is 4 and whose stripped 'text' is non-empty are
    kept; ids are assigned consecutively to the kept rows.
    NOTE(review): Tesseract appears to fill 'text' only on word rows
    (level 5), in which case this returns nothing - confirm before use.
    :param tesseract_result: dict of parallel lists with keys 'level', 'text', 'left', 'top', 'width', 'height'
    :return: list of Text
    '''
    texts = []
    for row in range(len(tesseract_result['level'])):
        if tesseract_result['level'][row] != 4:
            continue
        content = tesseract_result['text'][row].strip()
        if not content:
            continue
        left = int(tesseract_result['left'][row])
        top = int(tesseract_result['top'][row])
        location = {
            'left': left,
            'top': top,
            'right': left + int(tesseract_result['width'][row]),
            'bottom': top + int(tesseract_result['height'][row]),
        }
        # len(texts) is the running count of kept rows, i.e. the next id
        texts.append(Text(len(texts), content, location))
    return texts
def text_cvt_orc_format_tesseract_by_line(data):
    '''
    Group the word rows (level 5) of a Tesseract result dict into lines and
    convert each line into a Text element.

    Consecutive words belong to the same line when their page, block,
    paragraph and line numbers all match. 'line_num' alone is not enough:
    Tesseract restarts it inside every paragraph, so two adjacent one-line
    paragraphs would otherwise be glued together. The 'page_num',
    'block_num' and 'par_num' columns are used when present.
    Words whose text is empty or whitespace are ignored, and the line box is
    the union of the boxes of its words on all four sides.

    :param data: dict of parallel lists with keys 'level', 'line_num', 'text', 'left', 'top', 'width', 'height'
    :return: list of Text, one per line, ids numbered from 0 in reading order
    '''
    # Columns that, together, identify one physical line
    key_columns = [data[k] for k in ('page_num', 'block_num', 'par_num') if k in data]
    key_columns.append(data['line_num'])

    lines = []  # one (words, [left, top, right, bottom]) pair per line
    current_key = None
    for i, level in enumerate(data['level']):
        if level != 5:  # only word rows carry text
            continue
        word = data['text'][i].strip()
        if not word:
            # blank words would only add stray spaces and stretch the box
            continue
        left = int(data['left'][i])
        top = int(data['top'][i])
        right = left + int(data['width'][i])
        bottom = top + int(data['height'][i])

        key = tuple(column[i] for column in key_columns)
        if lines and key == current_key:
            words, box = lines[-1]
            words.append(word)
            # Grow the box in every direction: a later word can start higher
            # (or further left) than the first word of the line
            box[0] = min(box[0], left)
            box[1] = min(box[1], top)
            box[2] = max(box[2], right)
            box[3] = max(box[3], bottom)
        else:
            current_key = key
            lines.append(([word], [left, top, right, bottom]))

    return [
        Text(index, ' '.join(words),
             {'left': box[0], 'top': box[1], 'right': box[2], 'bottom': box[3]})
        for index, (words, box) in enumerate(lines)
    ]
def text_filter_noise(texts):
    '''
    Drop texts that are most likely OCR noise.

    A text is kept when its content is longer than one character, or when
    its lower-cased content is one of a few meaningful single characters.
    Empty content is dropped.
    :param texts: elements exposing ``content``
    :return: new list with the kept elements, order preserved
    '''
    meaningful_single = ('a', ',', '.', '!', '?', '$', '%', ':', '&', '+')
    return [
        text for text in texts
        if len(text.content) > 1 or text.content.lower() in meaningful_single
    ]
def text_detection(input_file='../data/input/30800.jpg', output_file='../data/output', show=False, method='google', paddle_model=None):
    '''
    Detect the texts of an image with the selected OCR engine, then write an
    annotated image (<name>.png) and the detection result (<name>.json) into
    the 'ocr' sub-directory of ``output_file``.

    :param input_file: path of the image to process
    :param output_file: output root directory; its 'ocr' sub-directory is created when missing
    :param show: display the annotated image in a window when True
    :param method: google or paddle or pytesseract
    :param paddle_model: the preload paddle model for paddle ocr (required when method is 'paddle')
    :return: the OCR time cost in seconds
    :raises ValueError: unknown method, missing paddle model for 'paddle', or unreadable image
    '''
    # Validate the arguments before doing any I/O or OCR work
    if method not in ('google', 'paddle', 'pytesseract'):
        raise ValueError('Method has to be "google" or "paddle" or "pytesseract"')
    if method == 'paddle' and paddle_model is None:
        raise ValueError('A preloaded paddle_model is required when method is "paddle"')

    start = time.time()
    # Strip the directory and the real extension, whatever its length
    # (the former [:-4] slice broke on '.jpeg' or extension-less names)
    name = os.path.splitext(os.path.basename(input_file))[0]
    ocr_root = pjoin(output_file, 'ocr')

    img = cv2.imread(input_file)
    if img is None:
        # imread gives None instead of raising; fail here with a clear
        # message rather than later on img.copy() / cvtColor
        raise ValueError('Cannot read an image from "%s"' % input_file)

    if method == 'google':
        print('*** Detect Text through Google OCR ***')
        ocr_result = ocr.ocr_detection_google(input_file)
        texts = text_cvt_orc_format(ocr_result)
        texts = merge_intersected_texts(texts)
        texts = text_filter_noise(texts)
        texts = text_sentences_recognition(texts)
        ocr_time_cost = time.time() - start
    elif method == 'paddle':
        print('*** Detect Text through Paddle OCR ***')
        result = paddle_model.ocr(input_file, cls=True)
        ocr_time_cost = time.time() - start
        texts = text_cvt_orc_format_paddle(result)
    else:  # 'pytesseract'
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        # Perform OCR using Tesseract
        result = pytesseract.image_to_data(img_rgb, output_type=pytesseract.Output.DICT)
        print("ocr result: ", result)
        ocr_time_cost = time.time() - start
        # Convert the Tesseract result to the desired format
        texts = text_cvt_orc_format_tesseract_by_line(result)
        print("texts: ", texts)

    # Neither cv2.imwrite nor open() creates missing directories
    os.makedirs(ocr_root, exist_ok=True)
    visualize_texts(img, texts, shown_resize_height=800, show=show, write_path=pjoin(ocr_root, name+'.png'))
    save_detection_json(pjoin(ocr_root, name+'.json'), texts, img.shape)
    print("[Text Detection Completed in %.3f s] Input: %s Output: %s" % (ocr_time_cost, input_file, pjoin(ocr_root, name+'.json')))
    return ocr_time_cost
| # text_detection() | |