from pdf2image import convert_from_path import numpy as np import cv2 from PIL import Image import json import sqlite3 from datetime import datetime from transformers import PaliGemmaProcessor, PaliGemmaForConditionalGeneration from dotenv import load_dotenv import os from huggingface_hub import login import torch # from main import predict as predict_main # # # Load environment variables # # load_dotenv() # # # Set the cache directory to a writable path # # os.environ["TORCHINDUCTOR_CACHE_DIR"] = "/tmp/torch_inductor_cache" # # token = os.getenv("huggingface_ankit") # # # Login to the Hugging Face Hub # # login(token) # with open("ocr/VGG Image Annotator_files/mach_labeler.json", "r") as f: # data = json.load(f) # def center_pad_image(image, target_size=448): # # Get original dimensions # original_h, original_w = image.shape[:2] # # If image is larger, resize while maintaining aspect ratio # if original_h > target_size or original_w > target_size: # scale = target_size / max(original_h, original_w) # new_h, new_w = int(original_h * scale), int(original_w * scale) # image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA) # else: # new_h, new_w = original_h, original_w # # Calculate padding # pad_h = (target_size - new_h) // 2 # pad_w = (target_size - new_w) // 2 # # Create black background # new_image = np.ones((target_size, target_size, 3), dtype=np.uint8) * 255 # # Place the resized image at the center # new_image[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = image # return new_image # def predict_check(cropped_image, threshold=0.0870): # gray_image = np.mean(cropped_image, axis=2) # Convert to grayscale # # remove noise # gray_image = cv2.GaussianBlur(gray_image, (5, 5), 0) # pixel_density = np.count_nonzero(gray_image < 128) / gray_image.size # Count dark pixels # # print("pixel threshold ",pixel_density) # if pixel_density > threshold: # return "Ticked" # else: # return "NotTicked" # def make_batch(ocr_regions, batch_size = 6): # for i in range(0, 
len(ocr_regions), batch_size): # yield ocr_regions[i:i + batch_size] # Yield a batch of size `batch_size` # import requests # def save_images(images,save_dir): # os.makedirs(save_dir, exist_ok=True) # Ensure directory exists # saved_paths = [] # for i, img in enumerate(images): # file_path = os.path.join(save_dir, f"image_{i}.png") # Save as PNG # img.save(file_path) # saved_paths.append(file_path) # Store the file path # return saved_paths # import shutil # def delete_saved_images(save_dir): # if os.path.exists(save_dir): # shutil.rmtree(save_dir) # Deletes the entire folder and its contents # print(f"Deleted all images in {save_dir}") # else: # print(f"Directory {save_dir} does not exist") # def batch_predict_ext(image_batch,save_path): # file_paths = save_images(image_batch,save_path) # # files = [("files", (img, open(img, "rb"), "image/jpeg" if img.endswith(".jpg") else "image/png")) for img in file_paths] # files = [] # for img in file_paths: # with open(img, "rb") as f: # file_content = f.read() # Read the file into memory # file_type = "image/jpeg" if img.endswith(".jpg") else "image/png" # files.append(("files", (img, file_content, file_type))) # Pass the file content # url = "https://aioverlords-amnil-ocr-test-pali.hf.space/batch_extract_text" # headers = {"accept": "application/json"} # response = requests.post(url, files=files, headers=headers) # delete_saved_images(save_path) # if response.status_code == 200: # return response.json() # Returns extracted text as JSON # else: # return {"error": f"Request failed with status code {response.status_code}"} # import uuid # def batch_ocr_ext(file_name,task_id,batch_size): # try: # with open("ocr/VGG Image Annotator_files/mach_labeler.json", "r") as f: # data = json.load(f) # start_time = datetime.now() # check_regions = [] # ocr_regions = [] # blank_regions=[] # # final = [] # j = 0 # for k,v in data['_via_img_metadata'].items(): # # k is the pages in the form # print(k) # # regions is the list of regions in a 
single page. # # it is a list of dictionary with each dictionary having shape_attributes and region_attributes # regions = data['_via_img_metadata'][k]['regions'] # file = file_name # # Check if the file is pdf # if file.endswith("pdf"): # # Extracts the j-th page from a PDF as an image. # # .convert("L") converts the image to grayscale # # then convert the image to numpy array to process it with opencv # targ_img = np.array(convert_from_path(file)[j].convert("L")) # else: # targ_img = cv2.imread(file, cv2.IMREAD_GRAYSCALE) # # Used for feature detection and image matching # # Possible to optimize? # MAX_NUM_FEATURES = 10000 # orb = cv2.ORB_create(MAX_NUM_FEATURES) # # Load the blank form of j-th page # orig_img = np.array(Image.open(f"ocr/VGG Image Annotator_files/mach_bank_form_page{j}.jpg").convert("L")) # # Detects keypoints (corner-like features) in orig_img and targ_img. # # and computes descriptors, which are binary feature representations for each keypoint. # keypoints1, descriptors1 = orb.detectAndCompute(orig_img, None) # keypoints2, descriptors2 = orb.detectAndCompute(targ_img, None) # # ORB typically works on grayscale images. # # Converts images back to BGR for displaying colored keypoints. # # just for visualization or any other use-case? # img1 = cv2.cvtColor(orig_img, cv2.COLOR_GRAY2BGR) # img2 = cv2.cvtColor(targ_img, cv2.COLOR_GRAY2BGR) # # Match features. # # ORB uses binary descriptors, and Hamming distance counts the number of differing bits. # # Faster than Euclidean distance for binary descriptors. # matcher = cv2.DescriptorMatcher_create(cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_HAMMING) # # match() finds the best match for each descriptor in descriptor1 and descriptor2 # # matches stores a list of cv2.DMatch objects where # # .queryIdx --> index of the keypoint in orig_img # # .trainIdx --> index of matching keypoint in trag_img # # .distance --> Hamming distance # # Converting to list for sorting as tuples are immutable objects. 
# matches = list(matcher.match(descriptors1, descriptors2, None)) # # Sort matches by score # # Sorting the matches based on hamming distance # matches.sort(key = lambda x: x.distance, reverse = False) # # Remove not so good matches # numGoodMatches = int(0.1*len(matches)) # matches = matches[:numGoodMatches] # # matches = matches[:len(matches)//10] # # Initialize arrays to store Keypoint locations # # float32 used for compatibility with cv2.findHomography() # points1 = np.zeros((len(matches), 2), dtype = np.float32) # points2 = np.zeros((len(matches), 2), dtype = np.float32) # # Extract location of good matches # for i, match in enumerate(matches): # points1[i, :] = keypoints1[match.queryIdx].pt # points2[i, :] = keypoints2[match.trainIdx].pt # # Find homography # h, mask = cv2.findHomography(points2, points1, cv2.RANSAC) # height, width, channels = img1.shape # # Warp img2 to align with img1 # img2_reg = cv2.warpPerspective(img2, h, (width, height)) # region_data = [] # for region in regions: # x, y, width, height = ( # region['shape_attributes']['x'], # region['shape_attributes']['y'], # region['shape_attributes']['width'], # region['shape_attributes']['height'] # ) # name = ( # f"{region['region_attributes']['parent']}_" # f"{region['region_attributes']['key']}_" # f"{region['region_attributes'].get('group', '')}" # ) # name_type = region['region_attributes']['type'] # region_data.append({ # "x": x, # "y": y, # "width": width, # "height": height, # "name": name, # "type": name_type # }) # # iterate through the region_data and crop the images portion and if type is check call predict_check function else call predict function # for region in region_data: # x, y, width, height = region["x"], region["y"], region["width"], region["height"] # cropped_image = img2_reg[y:y+height, x:x+width] # Assuming 'image' is defined # # plt.imshow(cropped_image, cmap='gray') # # plt.axis("off") # # plt.show() # # IF Checkbox, then run checkbox function # # else Check if the 
cropped image contains any significant edges suggesting there is text and send it to OCR # # If no significant edges are found then not found is returned # # if region["type"] == "check": # # pred = predict_check(cropped_image,threshold=0.0850) # # print(check_status) # if region["type"] == "check": # region["page"] = f"page_{j}" # check_regions.append((region, cropped_image)) # else: # cedge = cv2.Canny(cropped_image[7:-7, 7:-7], 100, 200) # cex_ = cedge.astype(float).sum(axis=0)/255 # cey_ = cedge.astype(float).sum(axis=1)/255 # cex_ = np.count_nonzero(cex_>5) # cey_ = np.count_nonzero(cey_>5) # colr = (0,0,255) # if cex_ > 7 and cey_ > 7: # # Image.fromarray(im).convert('RGB') # im = Image.fromarray(center_pad_image(cropped_image)) # region["page"] = f"page_{j}" # ocr_regions.append((region, im)) # else: # pred = "not found" # region["status"] = pred # region["page"] = f"page_{j}" # blank_regions.append(region) # # if len(check_regions) >= BATCH_SIZE: # # batch_checkpoint(check_regions) # # check_regions = [] # # if len(ocr_regions) >= BATCH_SIZE: # # batch_ocr(ocr_regions) # # ocr_regions =[] # j += 1 # print("Check Regions Started") # # return check_regions,ocr_regions,blank_regions # check_region_data = [] # for check_region in check_regions: # check_region[0]["status"] = predict_check(check_region[1]) # check_region_data.append(check_region[0]) # print("Check Regions End") # print("OCR Regions Started") # region_data = [] # count = 0 # for batch in make_batch(ocr_regions,batch_size): # images = [] # for data in batch: # images.append(data[1]) # print(f"-----Batch {count}------") # save_path = f"{str(uuid.uuid4())}" # response = batch_predict_ext(images,save_path) # extracted_texts = response["extracted_texts"] # print(f"-----Batch {count} Completed------") # for text,region in zip(extracted_texts,batch): # region[0]["status"] = text # region_data.append(region[0]) # count = count + 1 # # Combine all region data # region_data.extend(check_region_data) # 
# (tail of the commented-out legacy batch_ocr_ext; original indentation was lost)
# region_data.extend(blank_regions)
# string_data = json.dumps(region_data)
# print(type(string_data))
# # Store the time take for the process to complete
# end_time = datetime.now()
# time_elapsed = end_time-start_time
# time_elapsed_str = str(time_elapsed) # Convert seconds to string
# os.remove(file_name)
# # Update database
# conn = sqlite3.connect('/mnt/data//mnt/data/translations.db')
# cursor = conn.cursor()
# cursor.execute('UPDATE OCR SET region = ?, time_elapsed = ?, status=?, updated_at = ? WHERE task_id = ? ',
# (string_data,time_elapsed_str,"completed",datetime.now(),task_id))
# conn.commit()
# conn.close()
# print("SUCESSFUL")
# except Exception as e:
# print(f"OCR Failed : {e}")
# try:
# conn = sqlite3.connect('/mnt/data//mnt/data/translations.db')
# cursor = conn.cursor()
# cursor.execute('UPDATE OCR SET status = ? WHERE task_id = ?', ("failed", task_id))
# conn.commit()
# conn.close()
# except Exception as exec:
# print(f"Updating status to database failed: {exec}")

from io import BytesIO
import requests
import cv2
import numpy as np
from PIL import Image, ImageDraw
import json
import sqlite3
from datetime import datetime
import uuid
import aiohttp

# Region annotations for the form template, loaded once at import time and
# read by the OCR entry points below through the module-level name ``data``.
with open("ocr/VGG Image Annotator_files/mach_labeler.json", "r") as f:
    data = json.load(f)


async def check_health(timeout_seconds=10.0):
    """Report whether the remote OCR service answers its ``/health`` endpoint.

    Args:
        timeout_seconds: Total time allowed for the request. Without an
            explicit limit a stalled service would keep the probe waiting for
            aiohttp's default timeout.

    Returns:
        ``"healthy"`` when the endpoint answers with HTTP 200, otherwise
        ``"unhealthy"``. Connection failures and timeouts are reported as
        ``"unhealthy"`` instead of being raised, so a probe never crashes its
        caller because the service is down.
    """
    # Local import: asyncio is not among this module's top-level imports.
    import asyncio

    url = "https://aioverlords-amnil-ocr-test-pali.hf.space/health"
    timeout = aiohttp.ClientTimeout(total=timeout_seconds)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as response:
                return "healthy" if response.status == 200 else "unhealthy"
    except (aiohttp.ClientError, asyncio.TimeoutError):
        return "unhealthy"


def create_at_image(image_height, text="AAA"):
    """Render the separator marker placed between stacked OCR crops.

    Despite the function name, the marker is the string ``text`` ("AAA" by
    default), not an "@" sign. ``batch_ocr_ext_async_stack`` splits the OCR
    output on the literal "AAA", so only change ``text`` together with that
    split.

    Args:
        image_height: Height of the marker image in pixels.
        text: Marker text, drawn in black with PIL's default font.

    Returns:
        An RGB ``numpy`` array of shape
        ``(image_height, int(image_height * 0.8), 3)`` with a white background.
    """
    # The canvas width is 80% of the height (the original called this
    # "font_size", but no font size is ever set).
    canvas_width = int(image_height * 0.8)
    image = Image.new("RGB", (canvas_width, image_height), color="white")
    draw = ImageDraw.Draw(image)

    # textbbox replaces the removed ImageDraw.textsize().
    left, top, right, bottom = draw.textbbox((0, 0), text)
    text_x = (canvas_width - (right - left)) // 2
    text_y = (image_height - (bottom - top)) // 2
    draw.text((text_x, text_y), text, fill="black")

    return np.array(image)
import cv2
import numpy as np


def center_pad_image(image, target_size=448):
    """Fit ``image`` into a white ``target_size`` x ``target_size`` square.

    Images larger than the square on either axis are shrunk with their aspect
    ratio preserved; smaller images keep their size. The result is centred
    and the remaining border is filled with white (255).

    Args:
        image: Image array whose first two axes are (height, width).
        target_size: Side length of the output square in pixels.

    Returns:
        The padded image, ``target_size`` pixels on both axes.
    """
    h, w = image.shape[:2]
    if h > target_size or w > target_size:
        scale = target_size / max(h, w)
        # int() truncates, so a very elongated image could end up with a
        # zero-sized side, which cv2.resize rejects; keep at least one pixel.
        new_h = max(1, int(h * scale))
        new_w = max(1, int(w * scale))
        image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
    else:
        new_h, new_w = h, w

    pad_top = (target_size - new_h) // 2
    pad_left = (target_size - new_w) // 2
    return cv2.copyMakeBorder(
        image,
        pad_top,
        target_size - new_h - pad_top,
        pad_left,
        target_size - new_w - pad_left,
        cv2.BORDER_CONSTANT,
        value=(255, 255, 255),
    )


def resize_or_pad(image, target_height, target_width=448):
    """Force ``image`` to exactly ``target_height`` x ``target_width``.

    Height is handled first: a taller image is shrunk (aspect ratio kept), a
    shorter one is padded top and bottom with white. Width is handled second:
    a wider image is squeezed horizontally to ``target_width`` (aspect ratio
    is NOT kept in this step), a narrower one is padded left and right.

    Args:
        image: Image array whose first two axes are (height, width).
        target_height: Output height in pixels; must be positive.
        target_width: Output width in pixels; must be positive.

    Returns:
        The resized/padded image.

    Raises:
        ValueError: If either target dimension is not positive.
    """
    if target_height < 1 or target_width < 1:
        raise ValueError(
            f"target size must be positive, got {target_width}x{target_height}"
        )

    h, w = image.shape[:2]
    if h > target_height:
        scale = target_height / h
        # Keep at least one column; int() truncation could otherwise give 0.
        new_w = max(1, int(w * scale))
        image = cv2.resize(image, (new_w, target_height), interpolation=cv2.INTER_AREA)
    else:
        pad_top = (target_height - h) // 2
        image = cv2.copyMakeBorder(
            image, pad_top, target_height - h - pad_top, 0, 0,
            cv2.BORDER_CONSTANT, value=(255, 255, 255),
        )

    h, w = image.shape[:2]
    if w > target_width:
        image = cv2.resize(image, (target_width, h), interpolation=cv2.INTER_AREA)
    elif w < target_width:
        pad_left = (target_width - w) // 2
        image = cv2.copyMakeBorder(
            image, 0, 0, pad_left, target_width - w - pad_left,
            cv2.BORDER_CONSTANT, value=(255, 255, 255),
        )
    return image
def stack_images_vertically(stack_size, ocr_buffer, target_size=448, separator_height=30):
    """Stack several crops into one square image with marker strips between them.

    The first ``stack_size`` images of ``ocr_buffer`` are each forced to the
    same row height and to ``target_size`` width, joined top to bottom with a
    ``create_at_image`` marker strip between neighbours, and the result is
    centred in a ``target_size`` square.

    Args:
        stack_size: Number of images to take from ``ocr_buffer``.
        ocr_buffer: Sequence of image arrays (3-channel, as produced by the
            callers in this module).
        target_size: Side length of the output image in pixels.
        separator_height: Height of each marker strip in pixels.

    Returns:
        The stacked image as a ``numpy`` array.

    Raises:
        ValueError: If ``stack_size`` is not between 1 and ``len(ocr_buffer)``,
            or if the stack leaves less than one pixel of height per image.
    """
    if stack_size < 1 or stack_size > len(ocr_buffer):
        raise ValueError(
            f"stack_size must be between 1 and {len(ocr_buffer)}, got {stack_size}"
        )

    occupied_height = target_size - separator_height * (stack_size - 1)
    row_height = occupied_height // stack_size
    if row_height < 1:
        raise ValueError(
            f"cannot fit {stack_size} images into a {target_size}px tall stack"
        )

    rows = []
    separator = None
    for index, crop in enumerate(ocr_buffer[:stack_size]):
        if index:
            if separator is None:
                # Built once and reused; it is identical for every gap.
                separator = resize_or_pad(
                    create_at_image(separator_height), separator_height, target_size
                )
            rows.append(separator)
        # Pass target_size explicitly: relying on resize_or_pad's default
        # width only worked when target_size happened to be 448.
        rows.append(resize_or_pad(crop, row_height, target_size))

    return center_pad_image(np.vstack(rows), target_size)


def predict_check(cropped_image, threshold=0.0870):
    """Classify a checkbox crop as ticked or not by its share of dark pixels.

    The crop is reduced to grayscale, blurred with a 5x5 Gaussian to suppress
    noise, and the fraction of pixels darker than 128 is compared with
    ``threshold``.

    Args:
        cropped_image: Checkbox crop, either 3-D (height, width, channels) or
            already 2-D grayscale.
        threshold: Minimum dark-pixel fraction that counts as ticked.

    Returns:
        ``"Ticked"`` or ``"NotTicked"``. An empty crop has no dark pixels and
        is reported as ``"NotTicked"``.
    """
    if cropped_image.size == 0:
        return "NotTicked"

    if cropped_image.ndim == 2:
        gray_image = np.asarray(cropped_image, dtype=np.float64)
    else:
        gray_image = np.mean(cropped_image, axis=2)
    gray_image = cv2.GaussianBlur(gray_image, (5, 5), 0)
    pixel_density = np.count_nonzero(gray_image < 128) / gray_image.size
    return "Ticked" if pixel_density > threshold else "NotTicked"


def make_batch(ocr_regions, batch_size=6):
    """Yield consecutive slices of ``ocr_regions`` of at most ``batch_size`` items.

    Args:
        ocr_regions: Any sliceable sequence.
        batch_size: Maximum number of items per batch; must be positive.

    Yields:
        Slices of ``ocr_regions`` in order; the last one may be shorter.

    Raises:
        ValueError: If ``batch_size`` is not positive. A negative size would
            otherwise silently yield nothing and drop every item.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
    for start in range(0, len(ocr_regions), batch_size):
        yield ocr_regions[start:start + batch_size]


async def batch_predict_ext_async(image_batch, batch_size):
    """Send images to the remote OCR service and return its JSON reply.

    Each image is PNG-encoded in memory and posted as a multipart ``files``
    field to the ``batch_extract_text`` endpoint.

    Args:
        image_batch: Iterable of PIL images (anything with ``save(fp, format=)``).
        batch_size: Forwarded to the service as the ``batch_size`` query
            parameter.

    Returns:
        The decoded JSON body on HTTP 200. On any other status, or when the
        request itself fails, a dict with a single ``"error"`` key. The
        function never returns ``None`` and never raises for network errors,
        so callers can always inspect the result as a dict. An empty
        ``image_batch`` returns ``{"extracted_texts": []}`` without contacting
        the service.
    """
    if not image_batch:
        return {"extracted_texts": []}

    files = aiohttp.FormData()
    for i, img in enumerate(image_batch):
        buffer = BytesIO()
        img.save(buffer, format="PNG")
        files.add_field(
            "files",
            buffer.getvalue(),
            filename=f"image_{i}.png",
            content_type="image/png",
        )
    print("Files added to form data")

    url = f"https://aioverlords-amnil-ocr-test-pali.hf.space/batch_extract_text?batch_size={batch_size}"
    headers = {"accept": "application/json"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=files, headers=headers) as response:
                if response.status != 200:
                    return {"error": f"Request failed with status code {response.status}"}
                print("OCR Success")
                return await response.json()
    except Exception as e:  # service boundary: report the failure, do not raise
        print("Error: ", e)
        return {"error": f"Request failed: {e}"}
async def batch_predict_ext_async_vllm(image_batch):
    """Send images to the remote vLLM OCR endpoint and return its JSON reply.

    Each image is PNG-encoded in memory and posted as a multipart ``files``
    field to the ``batch_extract_text_vllm`` endpoint.

    Args:
        image_batch: Iterable of PIL images (anything with ``save(fp, format=)``).

    Returns:
        The decoded JSON body on HTTP 200. On any other status, or when the
        request itself fails, a dict with a single ``"error"`` key. The
        function never returns ``None``. An empty ``image_batch`` returns
        ``{"extracted_texts": []}`` without contacting the service.
    """
    if not image_batch:
        return {"extracted_texts": []}

    files = aiohttp.FormData()
    for i, img in enumerate(image_batch):
        buffer = BytesIO()
        img.save(buffer, format="png")
        files.add_field(
            "files",
            buffer.getvalue(),
            filename=f"image_{i}.png",
            content_type="image/png",
        )
    print("Files added to form data")

    url = "https://aioverlords-amnil-ocr-test-pali.hf.space/batch_extract_text_vllm"
    headers = {"accept": "application/json"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=files, headers=headers) as response:
                if response.status != 200:
                    return {"error": f"Request failed with status code {response.status}"}
                print("OCR Success")
                # Decode the body once and reuse it for logging and the return.
                payload = await response.json()
                print(payload)
                return payload
    except Exception as e:  # service boundary: report the failure, do not raise
        print("Error: ", e)
        return {"error": f"Request failed: {e}"}


async def batch_ocr_ext_async_vllm(file_name, task_id):
    """Run the form-OCR pipeline through the vLLM endpoint and store the result.

    For every template page in the module-level ``data`` annotations the
    uploaded scan is aligned to the blank form (ORB features + RANSAC
    homography) and each annotated region is cropped and routed:

    * ``type == "check"`` regions are classified with ``predict_check``;
    * other regions with enough Canny edges are sent to
      ``batch_predict_ext_async_vllm``;
    * the rest are recorded with status ``"not found"``.

    The combined region list is stored as JSON in the ``OCR`` row identified
    by ``task_id`` with status ``"completed"``. Any exception is printed and
    recorded as status ``"failed"``; nothing is raised to the caller.

    NOTE: the image alignment runs synchronously inside this coroutine.

    Args:
        file_name: Path of the uploaded scan (PDF or image). The file is
            deleted once processing has succeeded.
        task_id: Key of the ``OCR`` row to update.
    """
    db_path = '/mnt/data/translations.db'
    try:
        start_time = datetime.now()
        check_regions = []
        ocr_regions = []
        blank_regions = []

        # Rasterise the PDF once; converting inside the page loop re-rendered
        # the whole document for every template page.
        pdf_pages = None
        if file_name.lower().endswith("pdf"):
            pdf_pages = convert_from_path(file_name)

        MAX_NUM_FEATURES = 10000
        orb = cv2.ORB_create(MAX_NUM_FEATURES)
        # ORB descriptors are binary, hence Hamming-distance matching.
        matcher = cv2.DescriptorMatcher_create(cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_HAMMING)

        for j, page_meta in enumerate(data['_via_img_metadata'].values()):
            page_label = f"page_{j}"
            if pdf_pages is not None:
                targ_img = np.array(pdf_pages[j].convert("L"))
            else:
                targ_img = cv2.imread(file_name, cv2.IMREAD_GRAYSCALE)
                if targ_img is None:
                    raise ValueError(f"Could not read image file: {file_name}")

            # Blank template of the j-th page, used as the alignment target.
            orig_img = np.array(
                Image.open(f"ocr/VGG Image Annotator_files/mach_bank_form_page{j}.jpg").convert("L")
            )

            keypoints1, descriptors1 = orb.detectAndCompute(orig_img, None)
            keypoints2, descriptors2 = orb.detectAndCompute(targ_img, None)
            if descriptors1 is None or descriptors2 is None:
                raise ValueError(f"No ORB features found for {page_label}; cannot align the scan")

            # Keep the best 10% of matches (smallest Hamming distance).
            matches = sorted(
                matcher.match(descriptors1, descriptors2, None), key=lambda m: m.distance
            )
            matches = matches[:int(0.1 * len(matches))]
            if len(matches) < 4:
                # A homography needs at least four point correspondences.
                raise ValueError(f"Too few feature matches to align {page_label}")

            # float32 for compatibility with cv2.findHomography().
            points1 = np.array([keypoints1[m.queryIdx].pt for m in matches], dtype=np.float32)
            points2 = np.array([keypoints2[m.trainIdx].pt for m in matches], dtype=np.float32)
            homography, _ = cv2.findHomography(points2, points1, cv2.RANSAC)
            if homography is None:
                raise ValueError(f"Could not estimate a homography for {page_label}")

            # Warp the scan onto the template so region coordinates line up.
            template_h, template_w = orig_img.shape[:2]
            img2 = cv2.cvtColor(targ_img, cv2.COLOR_GRAY2BGR)
            img2_reg = cv2.warpPerspective(img2, homography, (template_w, template_h))

            for region in page_meta['regions']:
                shape = region['shape_attributes']
                attrs = region['region_attributes']
                x, y = shape['x'], shape['y']
                region_w, region_h = shape['width'], shape['height']
                entry = {
                    "x": x,
                    "y": y,
                    "width": region_w,
                    "height": region_h,
                    "name": f"{attrs['parent']}_{attrs['key']}_{attrs.get('group', '')}",
                    "type": attrs['type'],
                }
                cropped_image = img2_reg[y:y + region_h, x:x + region_w]

                if entry["type"] == "check":
                    entry["page"] = page_label
                    check_regions.append((entry, cropped_image))
                    continue

                # A field counts as filled when, ignoring a 7px frame, more
                # than 7 columns and more than 7 rows each contain more than
                # 5 edge pixels.
                cedge = cv2.Canny(cropped_image[7:-7, 7:-7], 100, 200)
                edge_cols = np.count_nonzero(cedge.astype(float).sum(axis=0) / 255 > 5)
                edge_rows = np.count_nonzero(cedge.astype(float).sum(axis=1) / 255 > 5)
                if edge_cols > 7 and edge_rows > 7:
                    im = Image.fromarray(center_pad_image(cropped_image))
                    entry["page"] = page_label
                    ocr_regions.append((entry, im))
                else:
                    entry["status"] = "not found"
                    entry["page"] = page_label
                    blank_regions.append(entry)

        # Process check regions
        check_region_data = []
        for entry, crop in check_regions:
            entry["status"] = predict_check(crop)
            check_region_data.append(entry)

        # Process OCR regions
        region_data = []
        print("Retrieving images")
        ocr_images = [image for _, image in ocr_regions]
        print("Images retrieved")
        print("Sending request vllm")
        response = await batch_predict_ext_async_vllm(ocr_images)
        print("Request completed")
        if not isinstance(response, dict) or "extracted_texts" not in response:
            raise RuntimeError(f"OCR service returned no extracted texts: {response}")
        extracted_texts = response["extracted_texts"]
        if len(extracted_texts) != len(ocr_regions):
            # zip() would silently drop regions and hide the mismatch.
            raise RuntimeError(
                f"OCR service returned {len(extracted_texts)} texts for {len(ocr_regions)} regions"
            )
        print("Text Extracted")
        for text, (entry, _) in zip(extracted_texts, ocr_regions):
            entry["status"] = text
            region_data.append(entry)
        print("text appended")

        # Combine and store results
        region_data.extend(check_region_data)
        print("Check region data appended")
        region_data.extend(blank_regions)
        print("Blank region data appended")
        string_data = json.dumps(region_data)
        time_elapsed_str = str(datetime.now() - start_time)
        print(time_elapsed_str)
        os.remove(file_name)

        conn = sqlite3.connect(db_path)
        try:
            conn.execute(
                'UPDATE OCR SET region = ?, time_elapsed = ?, status=?, updated_at = ? WHERE task_id = ?',
                # isoformat(" ") is what sqlite3's default datetime adapter
                # stored; passing the string avoids that deprecated adapter.
                (string_data, time_elapsed_str, "completed", datetime.now().isoformat(" "), task_id),
            )
            conn.commit()
        finally:
            conn.close()
        print("SUCCESSFUL vllm")
    except Exception as e:
        print(f"OCR Failed vllm: {e}")
        try:
            conn = sqlite3.connect(db_path)
            try:
                conn.execute('UPDATE OCR SET status = ? WHERE task_id = ?', ("failed", task_id))
                conn.commit()
            finally:
                conn.close()
        except Exception as db_error:
            print(f"Updating vllm status to database failed: {db_error}")
async def batch_ocr_ext_async(file_name, task_id, batch_size):
    """Run the form-OCR pipeline for one upload and store the result.

    For every template page in the module-level ``data`` annotations the
    uploaded scan is aligned to the blank form (ORB features + RANSAC
    homography) and each annotated region is cropped and routed:

    * ``type == "check"`` regions are classified with ``predict_check``;
    * other regions with enough Canny edges are sent to
      ``batch_predict_ext_async``;
    * the rest are recorded with status ``"not found"``.

    The combined list of OCR, checkbox and blank regions is stored as JSON in
    the ``OCR`` row identified by ``task_id`` with status ``"completed"``.
    Any exception is printed and recorded as status ``"failed"``; nothing is
    raised to the caller.

    NOTE: the image alignment runs synchronously inside this coroutine.

    Args:
        file_name: Path of the uploaded scan (PDF or image). The file is
            deleted once processing has succeeded.
        task_id: Key of the ``OCR`` row to update.
        batch_size: Forwarded to ``batch_predict_ext_async``.
    """
    db_path = '/mnt/data/translations.db'
    try:
        start_time = datetime.now()
        check_regions = []
        ocr_regions = []
        blank_regions = []

        # Rasterise the PDF once; converting inside the page loop re-rendered
        # the whole document for every template page.
        pdf_pages = None
        if file_name.lower().endswith("pdf"):
            pdf_pages = convert_from_path(file_name)

        MAX_NUM_FEATURES = 10000
        orb = cv2.ORB_create(MAX_NUM_FEATURES)
        # ORB descriptors are binary, hence Hamming-distance matching.
        matcher = cv2.DescriptorMatcher_create(cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_HAMMING)

        for j, page_meta in enumerate(data['_via_img_metadata'].values()):
            page_label = f"page_{j}"
            if pdf_pages is not None:
                targ_img = np.array(pdf_pages[j].convert("L"))
            else:
                targ_img = cv2.imread(file_name, cv2.IMREAD_GRAYSCALE)
                if targ_img is None:
                    raise ValueError(f"Could not read image file: {file_name}")

            # Blank template of the j-th page, used as the alignment target.
            orig_img = np.array(
                Image.open(f"ocr/VGG Image Annotator_files/mach_bank_form_page{j}.jpg").convert("L")
            )

            keypoints1, descriptors1 = orb.detectAndCompute(orig_img, None)
            keypoints2, descriptors2 = orb.detectAndCompute(targ_img, None)
            if descriptors1 is None or descriptors2 is None:
                raise ValueError(f"No ORB features found for {page_label}; cannot align the scan")

            # Keep the best 10% of matches (smallest Hamming distance).
            matches = sorted(
                matcher.match(descriptors1, descriptors2, None), key=lambda m: m.distance
            )
            matches = matches[:int(0.1 * len(matches))]
            if len(matches) < 4:
                # A homography needs at least four point correspondences.
                raise ValueError(f"Too few feature matches to align {page_label}")

            # float32 for compatibility with cv2.findHomography().
            points1 = np.array([keypoints1[m.queryIdx].pt for m in matches], dtype=np.float32)
            points2 = np.array([keypoints2[m.trainIdx].pt for m in matches], dtype=np.float32)
            homography, _ = cv2.findHomography(points2, points1, cv2.RANSAC)
            if homography is None:
                raise ValueError(f"Could not estimate a homography for {page_label}")

            # Warp the scan onto the template so region coordinates line up.
            template_h, template_w = orig_img.shape[:2]
            img2 = cv2.cvtColor(targ_img, cv2.COLOR_GRAY2BGR)
            img2_reg = cv2.warpPerspective(img2, homography, (template_w, template_h))

            for region in page_meta['regions']:
                shape = region['shape_attributes']
                attrs = region['region_attributes']
                x, y = shape['x'], shape['y']
                region_w, region_h = shape['width'], shape['height']
                entry = {
                    "x": x,
                    "y": y,
                    "width": region_w,
                    "height": region_h,
                    "name": f"{attrs['parent']}_{attrs['key']}_{attrs.get('group', '')}",
                    "type": attrs['type'],
                }
                cropped_image = img2_reg[y:y + region_h, x:x + region_w]

                if entry["type"] == "check":
                    entry["page"] = page_label
                    check_regions.append((entry, cropped_image))
                    continue

                # A field counts as filled when, ignoring a 7px frame, more
                # than 7 columns and more than 7 rows each contain more than
                # 5 edge pixels.
                cedge = cv2.Canny(cropped_image[7:-7, 7:-7], 100, 200)
                edge_cols = np.count_nonzero(cedge.astype(float).sum(axis=0) / 255 > 5)
                edge_rows = np.count_nonzero(cedge.astype(float).sum(axis=1) / 255 > 5)
                if edge_cols > 7 and edge_rows > 7:
                    im = Image.fromarray(center_pad_image(cropped_image))
                    entry["page"] = page_label
                    ocr_regions.append((entry, im))
                else:
                    entry["status"] = "not found"
                    entry["page"] = page_label
                    blank_regions.append(entry)

        # Process check regions
        check_region_data = []
        for entry, crop in check_regions:
            entry["status"] = predict_check(crop)
            check_region_data.append(entry)

        # Process OCR regions
        region_data = []
        print("Retrieving images")
        ocr_images = [image for _, image in ocr_regions]
        print("Images retrieved")
        if ocr_images:
            print("Sending request")
            response = await batch_predict_ext_async(ocr_images, batch_size)
            print("Request completed")
            print(response)
            # The predictor may hand back None or an {"error": ...} dict.
            if not isinstance(response, dict) or "extracted_texts" not in response:
                raise RuntimeError(f"OCR service returned no extracted texts: {response}")
            extracted_texts = response["extracted_texts"]
        else:
            # Nothing to recognise: skip the network round trip entirely.
            extracted_texts = []
        if len(extracted_texts) != len(ocr_regions):
            # zip() would silently drop regions and hide the mismatch.
            raise RuntimeError(
                f"OCR service returned {len(extracted_texts)} texts for {len(ocr_regions)} regions"
            )
        print("Text Extracted")
        for text, (entry, _) in zip(extracted_texts, ocr_regions):
            entry["status"] = text
            region_data.append(entry)
        print("text appended")

        # Combine and store results
        region_data.extend(check_region_data)
        print("Check region data appended")
        region_data.extend(blank_regions)
        print("Blank region data appended")
        # Serialise the combined list. The previous code dumped only
        # check_region_data, so OCR texts and blank regions were never stored.
        string_data = json.dumps(region_data)
        time_elapsed_str = str(datetime.now() - start_time)
        print(time_elapsed_str)
        os.remove(file_name)

        conn = sqlite3.connect(db_path)
        try:
            conn.execute(
                'UPDATE OCR SET region = ?, time_elapsed = ?, status=?, updated_at = ? WHERE task_id = ?',
                # isoformat(" ") is what sqlite3's default datetime adapter
                # stored; passing the string avoids that deprecated adapter.
                (string_data, time_elapsed_str, "completed", datetime.now().isoformat(" "), task_id),
            )
            conn.commit()
        finally:
            conn.close()
        print("SUCCESSFUL")
    except Exception as e:
        print(f"OCR Failed: {e}")
        try:
            conn = sqlite3.connect(db_path)
            try:
                conn.execute('UPDATE OCR SET status = ? WHERE task_id = ?', ("failed", task_id))
                conn.commit()
            finally:
                conn.close()
        except Exception as db_error:
            print(f"Updating status to database failed: {db_error}")
async def batch_ocr_ext_async_stack(file_name, task_id, batch_size, stack_size):
    """Run the form-OCR pipeline with several crops stacked into each OCR image.

    Works like ``batch_ocr_ext_async``, except that text crops are not sent
    one per image: up to ``stack_size`` crops from the same page are combined
    by ``stack_images_vertically`` into one image, and the recognised text of
    each stack is split on the literal ``"AAA"`` marker to recover one text
    per region.

    The combined list of OCR, checkbox and blank regions is stored as JSON in
    the ``OCR`` row identified by ``task_id`` with status ``"completed"``.
    Any exception is printed and recorded as status ``"failed"``; nothing is
    raised to the caller.

    NOTE: the image alignment runs synchronously inside this coroutine.

    Args:
        file_name: Path of the uploaded scan (PDF or image). The file is
            deleted once processing has succeeded.
        task_id: Key of the ``OCR`` row to update.
        batch_size: Forwarded to ``batch_predict_ext_async``.
        stack_size: Maximum number of crops per stacked image; must be >= 1.
    """
    db_path = '/mnt/data/translations.db'
    try:
        if stack_size < 1:
            raise ValueError(f"stack_size must be a positive integer, got {stack_size!r}")

        start_time = datetime.now()
        check_regions = []
        ocr_regions = []
        blank_regions = []

        # Rasterise the PDF once; converting inside the page loop re-rendered
        # the whole document for every template page.
        pdf_pages = None
        if file_name.lower().endswith("pdf"):
            pdf_pages = convert_from_path(file_name)

        MAX_NUM_FEATURES = 10000
        orb = cv2.ORB_create(MAX_NUM_FEATURES)
        # ORB descriptors are binary, hence Hamming-distance matching.
        matcher = cv2.DescriptorMatcher_create(cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_HAMMING)

        for j, page_meta in enumerate(data['_via_img_metadata'].values()):
            page_label = f"page_{j}"
            if pdf_pages is not None:
                targ_img = np.array(pdf_pages[j].convert("L"))
            else:
                targ_img = cv2.imread(file_name, cv2.IMREAD_GRAYSCALE)
                if targ_img is None:
                    raise ValueError(f"Could not read image file: {file_name}")

            # Blank template of the j-th page, used as the alignment target.
            orig_img = np.array(
                Image.open(f"ocr/VGG Image Annotator_files/mach_bank_form_page{j}.jpg").convert("L")
            )

            keypoints1, descriptors1 = orb.detectAndCompute(orig_img, None)
            keypoints2, descriptors2 = orb.detectAndCompute(targ_img, None)
            if descriptors1 is None or descriptors2 is None:
                raise ValueError(f"No ORB features found for {page_label}; cannot align the scan")

            # Keep the best 10% of matches (smallest Hamming distance).
            matches = sorted(
                matcher.match(descriptors1, descriptors2, None), key=lambda m: m.distance
            )
            matches = matches[:int(0.1 * len(matches))]
            if len(matches) < 4:
                # A homography needs at least four point correspondences.
                raise ValueError(f"Too few feature matches to align {page_label}")

            # float32 for compatibility with cv2.findHomography().
            points1 = np.array([keypoints1[m.queryIdx].pt for m in matches], dtype=np.float32)
            points2 = np.array([keypoints2[m.trainIdx].pt for m in matches], dtype=np.float32)
            homography, _ = cv2.findHomography(points2, points1, cv2.RANSAC)
            if homography is None:
                raise ValueError(f"Could not estimate a homography for {page_label}")

            # Warp the scan onto the template so region coordinates line up.
            template_h, template_w = orig_img.shape[:2]
            img2 = cv2.cvtColor(targ_img, cv2.COLOR_GRAY2BGR)
            img2_reg = cv2.warpPerspective(img2, homography, (template_w, template_h))

            # Crops waiting to be stacked, and their region entries in the
            # same order. Both are reset for every page.
            ocr_buffer = []
            buffer_metadata = []
            for region in page_meta['regions']:
                shape = region['shape_attributes']
                attrs = region['region_attributes']
                x, y = shape['x'], shape['y']
                region_w, region_h = shape['width'], shape['height']
                entry = {
                    "x": x,
                    "y": y,
                    "width": region_w,
                    "height": region_h,
                    "name": f"{attrs['parent']}_{attrs['key']}_{attrs.get('group', '')}",
                    "type": attrs['type'],
                }
                cropped_image = img2_reg[y:y + region_h, x:x + region_w]

                if entry["type"] == "check":
                    entry["page"] = page_label
                    check_regions.append((entry, cropped_image))
                    continue

                # A field counts as filled when, ignoring a 7px frame, more
                # than 7 columns and more than 7 rows each contain more than
                # 5 edge pixels.
                cedge = cv2.Canny(cropped_image[7:-7, 7:-7], 100, 200)
                edge_cols = np.count_nonzero(cedge.astype(float).sum(axis=0) / 255 > 5)
                edge_rows = np.count_nonzero(cedge.astype(float).sum(axis=1) / 255 > 5)
                if edge_cols > 7 and edge_rows > 7:
                    ocr_buffer.append(cropped_image)
                    entry["page"] = page_label
                    buffer_metadata.append(entry)
                    if len(ocr_buffer) == stack_size:
                        img = Image.fromarray(stack_images_vertically(stack_size, ocr_buffer))
                        ocr_regions.append((buffer_metadata, img))
                        ocr_buffer = []
                        buffer_metadata = []
                else:
                    entry["status"] = "not found"
                    entry["page"] = page_label
                    blank_regions.append(entry)

            # Flush the partially filled stack before the buffers are reset
            # for the next page, so no page loses its trailing crops.
            if ocr_buffer:
                img = Image.fromarray(stack_images_vertically(len(ocr_buffer), ocr_buffer))
                ocr_regions.append((buffer_metadata, img))

        # Process check regions
        check_region_data = []
        for entry, crop in check_regions:
            entry["status"] = predict_check(crop)
            check_region_data.append(entry)

        # Process OCR regions
        region_data = []
        print("Retrieving images")
        ocr_images = [image for _, image in ocr_regions]
        print("Images retrieved")
        if ocr_images:
            print("Sending request")
            response = await batch_predict_ext_async(ocr_images, batch_size)
            print("Request completed")
            print(response)
            # The predictor may hand back None or an {"error": ...} dict.
            if not isinstance(response, dict) or "extracted_texts" not in response:
                raise RuntimeError(f"OCR service returned no extracted texts: {response}")
            extracted_texts = response["extracted_texts"]
        else:
            # Nothing to recognise: skip the network round trip entirely.
            extracted_texts = []
        if len(extracted_texts) != len(ocr_regions):
            # zip() would silently drop whole stacks and hide the mismatch.
            raise RuntimeError(
                f"OCR service returned {len(extracted_texts)} texts for {len(ocr_regions)} stacked images"
            )
        print("Text Extracted")
        for text, (stack_entries, _) in zip(extracted_texts, ocr_regions):
            parts = text.split("AAA")
            for index, entry in enumerate(stack_entries):
                # If a marker was not recognised there are fewer parts than
                # regions. Keep the region in the output instead of dropping
                # it; surplus parts are ignored as before.
                entry["status"] = parts[index] if index < len(parts) else "not found"
                region_data.append(entry)
        print("text appended")

        # Combine and store results
        region_data.extend(check_region_data)
        print("Check region data appended")
        region_data.extend(blank_regions)
        print("Blank region data appended")
        string_data = json.dumps(region_data)
        time_elapsed_str = str(datetime.now() - start_time)
        print(time_elapsed_str)
        os.remove(file_name)

        conn = sqlite3.connect(db_path)
        try:
            conn.execute(
                'UPDATE OCR SET region = ?, time_elapsed = ?, status=?, updated_at = ? WHERE task_id = ?',
                # isoformat(" ") is what sqlite3's default datetime adapter
                # stored; passing the string avoids that deprecated adapter.
                (string_data, time_elapsed_str, "completed", datetime.now().isoformat(" "), task_id),
            )
            conn.commit()
        finally:
            conn.close()
        print("SUCCESSFUL")
    except Exception as e:
        print(f"OCR Failed: {e}")
        try:
            conn = sqlite3.connect(db_path)
            try:
                conn.execute('UPDATE OCR SET status = ? WHERE task_id = ?', ("failed", task_id))
                conn.commit()
            finally:
                conn.close()
        except Exception as db_error:
            print(f"Updating status to database failed: {db_error}")
print("text appended") # print(region_data) # Combine and store results region_data.extend(check_region_data) print("Check region data appended") region_data.extend(blank_regions) print("Blank region data appended") string_data = json.dumps(region_data) end_time = datetime.now() time_elapsed_str = str(end_time - start_time) print(time_elapsed_str) os.remove(file_name) conn = sqlite3.connect('/mnt/data/translations.db') cursor = conn.cursor() cursor.execute( 'UPDATE OCR SET region = ?, time_elapsed = ?, status=?, updated_at = ? WHERE task_id = ?', (string_data, time_elapsed_str, "completed", datetime.now(), task_id) ) conn.commit() conn.close() print("SUCCESSFUL") except Exception as e: print(f"OCR Failed: {e}") try: conn = sqlite3.connect('/mnt/data/translations.db') # For local cursor = conn.cursor() cursor.execute('UPDATE OCR SET status = ? WHERE task_id = ?', ("failed", task_id)) conn.commit() conn.close() except Exception as exec: print(f"Updating status to database failed: {exec}") def batch_ocr_ext(file_name, task_id, batch_size): try: start_time = datetime.now() check_regions = [] ocr_regions = [] blank_regions = [] j = 0 for k, v in data['_via_img_metadata'].items(): regions = data['_via_img_metadata'][k]['regions'] file = file_name if file.endswith("pdf"): targ_img = np.array(convert_from_path(file)[j].convert("L")) else: targ_img = cv2.imread(file, cv2.IMREAD_GRAYSCALE) # ORB and alignment code remains unchanged for brevity # Used for feature detection and image matching # Possible to optimize? MAX_NUM_FEATURES = 10000 orb = cv2.ORB_create(MAX_NUM_FEATURES) # Load the blank form of j-th page orig_img = np.array(Image.open(f"ocr/VGG Image Annotator_files/mach_bank_form_page{j}.jpg").convert("L")) # Detects keypoints (corner-like features) in orig_img and targ_img. # and computes descriptors, which are binary feature representations for each keypoint. 
keypoints1, descriptors1 = orb.detectAndCompute(orig_img, None) keypoints2, descriptors2 = orb.detectAndCompute(targ_img, None) # ORB typically works on grayscale images. # Converts images back to BGR for displaying colored keypoints. # just for visualization or any other use-case? img1 = cv2.cvtColor(orig_img, cv2.COLOR_GRAY2BGR) img2 = cv2.cvtColor(targ_img, cv2.COLOR_GRAY2BGR) # Match features. # ORB uses binary descriptors, and Hamming distance counts the number of differing bits. # Faster than Euclidean distance for binary descriptors. matcher = cv2.DescriptorMatcher_create(cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_HAMMING) # match() finds the best match for each descriptor in descriptor1 and descriptor2 # matches stores a list of cv2.DMatch objects where # .queryIdx --> index of the keypoint in orig_img # .trainIdx --> index of matching keypoint in trag_img # .distance --> Hamming distance # Converting to list for sorting as tuples are immutable objects. matches = list(matcher.match(descriptors1, descriptors2, None)) # Sort matches by score # Sorting the matches based on hamming distance matches.sort(key = lambda x: x.distance, reverse = False) # Remove not so good matches numGoodMatches = int(0.1*len(matches)) matches = matches[:numGoodMatches] # matches = matches[:len(matches)//10] # Initialize arrays to store Keypoint locations # float32 used for compatibility with cv2.findHomography() points1 = np.zeros((len(matches), 2), dtype = np.float32) points2 = np.zeros((len(matches), 2), dtype = np.float32) # Extract location of good matches for i, match in enumerate(matches): points1[i, :] = keypoints1[match.queryIdx].pt points2[i, :] = keypoints2[match.trainIdx].pt # Find homography h, mask = cv2.findHomography(points2, points1, cv2.RANSAC) height, width, channels = img1.shape # Warp img2 to align with img1 img2_reg = cv2.warpPerspective(img2, h, (width, height)) # For brevity, assume img2_reg is computed as in original # img2_reg = targ_img # Placeholder; replace with 
actual aligned image region_data = [] for region in regions: x, y, width, height = ( region['shape_attributes']['x'], region['shape_attributes']['y'], region['shape_attributes']['width'], region['shape_attributes']['height'] ) name = ( f"{region['region_attributes']['parent']}_" f"{region['region_attributes']['key']}_" f"{region['region_attributes'].get('group', '')}" ) name_type = region['region_attributes']['type'] region_data.append({"x": x, "y": y, "width": width, "height": height, "name": name, "type": name_type}) for region in region_data: x, y, width, height = region["x"], region["y"], region["width"], region["height"] cropped_image = img2_reg[y:y+height, x:x+width] if region["type"] == "check": region["page"] = f"page_{j}" check_regions.append((region, cropped_image)) else: cedge = cv2.Canny(cropped_image[7:-7, 7:-7], 100, 200) cex_ = cedge.astype(float).sum(axis=0) / 255 cey_ = cedge.astype(float).sum(axis=1) / 255 cex_ = np.count_nonzero(cex_ > 5) cey_ = np.count_nonzero(cey_ > 5) if cex_ > 7 and cey_ > 7: im = Image.fromarray(center_pad_image(cropped_image)) region["page"] = f"page_{j}" ocr_regions.append((region, im)) else: region["status"] = "not found" region["page"] = f"page_{j}" blank_regions.append(region) j += 1 # Process check regions check_region_data = [] for check_region in check_regions: check_region[0]["status"] = predict_check(check_region[1]) check_region_data.append(check_region[0]) # Process OCR regions region_data = [] for batch in make_batch(ocr_regions, batch_size): i = 0 print(task_id,"_s_",i) images = [data[1] for data in batch] response = batch_predict_ext(images) extracted_texts = response["extracted_texts"] for text, region in zip(extracted_texts, batch): region[0]["status"] = text region_data.append(region[0]) print(task_id,"_c_",i) i += 1 # Combine and store results region_data.extend(check_region_data) region_data.extend(blank_regions) string_data = json.dumps(region_data) end_time = datetime.now() time_elapsed_str = 
str(end_time - start_time) print(time_elapsed_str) os.remove(file_name) conn = sqlite3.connect('/mnt/data/translations.db') # For local cursor = conn.cursor() cursor.execute( 'UPDATE OCR SET region = ?, time_elapsed = ?, status=?, updated_at = ? WHERE task_id = ?', (string_data, time_elapsed_str, "completed", datetime.now(), task_id) ) conn.commit() conn.close() print("SUCCESSFUL") except Exception as e: print(f"OCR Failed: {e}") try: conn = sqlite3.connect('/mnt/data/translations.db') cursor = conn.cursor() cursor.execute('UPDATE OCR SET status = ? WHERE task_id = ?', ("failed", task_id)) conn.commit() conn.close() except Exception as exec: print(f"Updating status to database failed: {exec}") # Example call # batch_ocr_ext("example.pdf", "task123", 10, data)