Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import time | |
| from loguru import logger | |
| from openai import OpenAI | |
| from common.enum.ai_service_error import AiServiceError | |
| from common.exceptions import AiServiceException | |
| from common.utils import encode_image_to_webp_base64 | |
| from image_processing_interface import ImageProcessingInterface | |
| class OpenAIService(ImageProcessingInterface): | |
| _instance = None | |
| TOOLS = [ | |
| { | |
| "type": "function", | |
| "function": { | |
| "name": "parse_image", | |
| "description": "Parses receipt data from image into a structured JSON format.", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "store_name": {"type": "string"}, | |
| "country": {"type": "string"}, | |
| "receipt_type": {"type": "string"}, | |
| "address": {"type": "string"}, | |
| "datetime": {"type": "string"}, | |
| "currency": {"type": "string"}, | |
| "sub_total_amount": {"type": "number"}, | |
| "total_price": {"type": "number"}, | |
| "total_discount": {"type": "number"}, | |
| "all_items_price_with_tax": {"type": "boolean"}, | |
| "payment_method": { | |
| "type": "string", | |
| "enum": ["card", "cash", "unknown"] | |
| }, | |
| "rounding": {"type": "number"}, | |
| "tax": {"type": "number"}, | |
| "taxes_not_included_sum": {"type": "number"}, | |
| "tips": {"type": "number"}, | |
| "items": { | |
| "type": "array", | |
| "items": { | |
| "type": "object", | |
| "properties": { | |
| "name": {"type": "string"}, | |
| "unit_price": {"type": "number"}, | |
| "quantity": {"type": "number"}, | |
| "measurement_unit": {"type": "string"}, | |
| "total_price_without_discount": {"type": "number"}, | |
| "total_price_with_discount": {"type": "number"}, | |
| "discount": {"type": "number"}, | |
| "category": {"type": "string"}, | |
| "item_price_with_tax": {"type": "boolean"}, | |
| }, | |
| "required": ["name", "unit_price", "quantity", "total_price_without_discount"] | |
| } | |
| }, | |
| "taxs_items": { | |
| "type": "array", | |
| "items": { | |
| "type": "object", | |
| "properties": { | |
| "tax_name": {"type": "string"}, | |
| "percentage": {"type": "number"}, | |
| "tax_from_amount": {"type": "number"}, | |
| "tax": {"type": "number"}, | |
| "total": {"type": "number"}, | |
| "tax_included": {"type": "boolean"}, | |
| }, | |
| } | |
| } | |
| }, | |
| "required": ["total_price", "items"] | |
| } | |
| } | |
| } | |
| ] | |
| def __new__(cls, *args, **kwargs): | |
| if cls._instance is None: | |
| cls._instance = super(OpenAIService, cls).__new__(cls) | |
| return cls._instance | |
| def __init__(self, api_key=None): | |
| if not hasattr(self, "_initialized"): | |
| self.api_key = api_key or os.environ.get("OPENAI_API_KEY") | |
| if self.api_key: | |
| logger.info("OPENAI_API_KEY was found.") | |
| else: | |
| raise ValueError("OPENAI_API_KEY not found.") | |
| self.client = OpenAI(api_key=self.api_key) | |
| self._initialized = True | |
| def process_image(self, input_image64, model_name, prompt, system="You are a receipt recognizer!", temperature=0.0): | |
| if not input_image64: | |
| raise ValueError("No image provided.") | |
| try: | |
| start_time = time.time() | |
| response = self.client.chat.completions.create( | |
| model=model_name, | |
| messages=[ | |
| {"role": "system", "content": system}, | |
| {"role": "user", "content": [ | |
| {"type": "text", "text": prompt}, | |
| {"type": "image_url", "image_url": { | |
| "url": f"data:image/webp;base64,{input_image64}"} | |
| } | |
| ]} | |
| ], | |
| tools=self.TOOLS, | |
| temperature=temperature, | |
| response_format={"type": "json_object"} | |
| ) | |
| end_time = time.time() | |
| logger.info(f"Recognition spent {end_time - start_time:.2f} seconds.") | |
| # Extract the function call result | |
| if not response.choices: | |
| raise ValueError("The API response does not contain valid choices.") | |
| choice = response.choices[0] | |
| if not choice.message.tool_calls: | |
| raise ValueError(choice.message.content or "No tool calls found in the API response.") | |
| function_call = choice.message.tool_calls[0] | |
| if not (function_call.function and function_call.function.arguments): | |
| raise ValueError("No valid function call data found in the API response.") | |
| logger.debug(f"Raw API Response: {function_call.function.arguments}") | |
| try: | |
| json_content = json.loads(function_call.function.arguments) | |
| except json.JSONDecodeError: | |
| error_message = f"The receipt could not be recognized. Please retake the photo." | |
| logger.error(error_message) | |
| raise AiServiceException(AiServiceError.RETAKE_PHOTO, error_message) | |
| if not self._validate_receipt_data(json_content): | |
| error_message = f"The receipt is empty or contains no valid items. Please ensure the receipt is correctly scanned and try again" | |
| logger.error(error_message) | |
| raise AiServiceException(AiServiceError.RETAKE_PHOTO, error_message) | |
| json_content = self._add_total_price(json_content) | |
| json_content = self._add_discount_item(json_content) | |
| json_content = self._add_rounding_item(json_content) | |
| # Add token usage information | |
| json_content['input_tokens'] = response.usage.prompt_tokens | |
| json_content['output_tokens'] = response.usage.completion_tokens | |
| json_content['total_tokens'] = response.usage.total_tokens | |
| json_content['time'] = end_time - start_time | |
| model_input = { | |
| "system": system, | |
| "prompt": prompt | |
| } | |
| return json.dumps(json_content, indent=4), model_input | |
| except Exception as e: | |
| raise RuntimeError(f"Failed to process the image: {str(e)}") | |
| if __name__ == "__main__": | |
| try: | |
| processor = OpenAIService() | |
| # Image processing | |
| image_path = "./examples/fatlouis.webp" | |
| input_image64 = encode_image_to_webp_base64(image_path) | |
| system = "You are a receipt recognizer." | |
| with open('common/prompt_v1.txt', 'r', encoding='utf-8') as file: | |
| prompt = file.read() | |
| result = processor.process_image(input_image64, "gpt-4o-mini", prompt, system, 0.0) | |
| print(f'Image processing result: {result}') | |
| except Exception as e: | |
| print(f"Error: {e}") | |