import os
import base64
import json
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

from utils import encode_image_to_jpeg_base64
# Import your VertexAIService class
from vertex_ai_service import VertexAIService  # Replace with the module name where VertexAIService is defined

# Image extensions accepted by process_folder_images.
# NOTE: each entry carries a leading dot and matching is case-insensitive —
# the original filter used 'webp' (no dot), which also matched names such as
# 'photo_webp' and missed upper-case extensions like '.JPG'.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.webp')


def process_image_file(file_path, client):
    """
    Process a single image file and save the model response next to it.

    :param file_path: Path to the image file
    :param client: Instance of VertexAIService
    :return: Path to the saved JSON file
    """
    input_image64 = encode_image_to_jpeg_base64(file_path)

    # Processing settings
    prompt = "Read the text"
    system = "You are receipt recognizer"

    # Call image processing (temperature 0.0 for deterministic OCR output)
    result_img = client.process_image(input_image64, "gemini-1.5-pro", prompt, system, 0.0)

    # Save the raw response alongside the image, e.g. photo.jpg -> photo.json
    json_file_path = os.path.splitext(file_path)[0] + ".json"
    with open(json_file_path, 'w', encoding='utf-8') as json_file:
        json_file.write(result_img)

    return json_file_path


def process_image_file_with_retry(file_path, client, max_retries=5):
    """
    Process an image file with exponential-backoff retries on quota errors.

    :param file_path: Path to the image file
    :param client: Instance of VertexAIService
    :param max_retries: Maximum number of attempts when hitting quota (429) errors
    :return: Path to the saved JSON file
    :raises Exception: non-quota errors are re-raised immediately; a final
        Exception is raised after max_retries consecutive quota errors.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return process_image_file(file_path, client)
        except Exception as exc:
            # Only retry on quota exhaustion; anything else is a real failure.
            if "429" not in str(exc):
                raise
            # Exponential backoff with random jitter so parallel workers
            # don't all retry at the same instant.
            wait_time = 2 ** attempt + random.uniform(0, 1)
            print(
                f"Quota exceeded for {file_path}. Retrying in {wait_time:.2f} seconds... "
                f"(Attempt {attempt}/{max_retries})"
            )
            time.sleep(wait_time)
    raise Exception(f"Max retries exceeded for file {file_path}")


def process_folder_images(folder_path, json_key_path, project_id):
    """
    Process all images in the specified folder concurrently.

    :param folder_path: Path to the folder containing images
    :param json_key_path: Path to the JSON key for Vertex AI authentication
    :param project_id: Google Cloud project id used to initialize Vertex AI
    """
    # Initialize Vertex AI client (shared by all worker threads)
    client = VertexAIService(json_key_path=json_key_path, project=project_id)

    # Collect all images in the folder; extension check is case-insensitive.
    image_files = [
        os.path.join(folder_path, f)
        for f in os.listdir(folder_path)
        if f.lower().endswith(IMAGE_EXTENSIONS)
    ]

    # Use ThreadPoolExecutor for parallel processing (I/O-bound API calls)
    with ThreadPoolExecutor() as executor:
        future_to_file = {
            executor.submit(process_image_file_with_retry, file_path, client, 5): file_path
            for file_path in image_files
        }
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                json_file_path = future.result()
                print(f"Processed: {file_path}, result saved to: {json_file_path}")
            except Exception as exc:
                print(f"Failed to process file {file_path}. Error: {exc}")


# Call the function to process the folder
if __name__ == '__main__':
    folder_path = './examples_sl'  # Set the path to your folder
    project_id = 'igneous-spanner-441609-h6'
    json_key_path = 'secrets/GOOGLE_VERTEX_AI_KEY_SYTOSS-441609.json'  # Set the path to your JSON key

    start_time = time.time()
    process_folder_images(folder_path, json_key_path, project_id)
    end_time = time.time()

    elapsed_time = end_time - start_time
    print(f"Execution time: {elapsed_time:.2f} seconds")