ReceiptSplitAI / process_images_gemini_multithread.py
valentynliubchenko
merging
eba303d
import os
import base64
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from utils import encode_image_to_jpeg_base64
# Import your VertexAIService class
from vertex_ai_service import VertexAIService # Replace with the module name where VertexAIService is defined
def process_image_file(file_path, client):
"""
Processes an image file and returns the path to the JSON file with results.
:param file_path: Path to the image file
:param client: Instance of VertexAIService
:return: Path to the saved JSON file
"""
input_image64 = encode_image_to_jpeg_base64(file_path)
# Processing settings
prompt = "Read the text"
system = "You are receipt recognizer"
# Call image processing
result_img = client.process_image(input_image64, "gemini-1.5-pro", prompt, system, 0.0)
# Create path for JSON file
json_file_path = os.path.splitext(file_path)[0] + ".json"
# Write the result to a JSON file
with open(json_file_path, 'w', encoding='utf-8') as json_file:
json_file.write(result_img)
return json_file_path
def process_folder_images(folder_path, json_key_path, project_id):
"""
Processes all images in the specified folder concurrently.
:param folder_path: Path to the folder containing images
:param json_key_path: Path to the JSON key for Vertex AI authentication
"""
# Initialize Vertex AI client
client = VertexAIService(json_key_path=json_key_path, project=project_id)
# Get a list of all images in the folder
image_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(('.jpg', '.jpeg', '.png','webp'))]
# Use ThreadPoolExecutor for parallel processing
with ThreadPoolExecutor() as executor:
# future_to_file = {executor.submit(process_image_file, file_path, client): file_path for file_path in image_files}
future_to_file = {executor.submit(process_image_file_with_retry, file_path, client, 5): file_path for file_path in image_files}
for future in as_completed(future_to_file):
file_path = future_to_file[future]
try:
json_file_path = future.result()
print(f"Processed: {file_path}, result saved to: {json_file_path}")
except Exception as exc:
print(f"Failed to process file {file_path}. Error: {exc}")
import random
def process_image_file_with_retry(file_path, client, max_retries=5):
"""
Processes an image file with retry logic and returns the path to the JSON file with results.
:param file_path: Path to the image file
:param client: Instance of VertexAIService
:param max_retries: Maximum number of retries for handling quota errors
:return: Path to the saved JSON file
"""
retries = 0
while retries < max_retries:
try:
json_file_path = process_image_file(file_path, client)
return json_file_path
except Exception as exc:
if "429" in str(exc):
retries += 1
wait_time = 2 ** retries + random.uniform(0, 1)
print(f"Quota exceeded for {file_path}. Retrying in {wait_time:.2f} seconds... (Attempt {retries}/{max_retries})")
time.sleep(wait_time)
else:
raise exc
raise Exception(f"Max retries exceeded for file {file_path}")
# Call the function to process the folder
if __name__ == '__main__':
folder_path = './examples_sl' # Set the path to your folder
project_id = 'igneous-spanner-441609-h6'
json_key_path = 'secrets/GOOGLE_VERTEX_AI_KEY_SYTOSS-441609.json' # Set the path to your JSON key
start_time = time.time()
process_folder_images(folder_path, json_key_path, project_id)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Execution time: {elapsed_time:.2f} seconds")