Spaces:
Running
Running
| import base64 | |
| import json | |
| import logging | |
| from io import BytesIO | |
| import numpy as np | |
| import pandas as pd | |
| from PIL import Image, ExifTags | |
| from openpyxl import load_workbook | |
| from openpyxl.drawing.image import Image as XLSImage | |
| from openpyxl.styles import Font, Alignment, Border, Side | |
| from algorithm import receipt_calculation | |
def read_prompt_from_file(file_path):
    """
    Load the full text of a prompt file.

    :param file_path: Path of the UTF-8 encoded text file to read
    :return: The complete file content as a string
    """
    with open(file_path, mode='r', encoding='utf-8') as prompt_file:
        prompt_text = prompt_file.read()
    return prompt_text
def encode_image(image_path):
    """
    Read a file in binary mode and return its content as Base64 text.

    :param image_path: Path to the file to encode
    :return: Base64 representation of the raw file bytes, as a str
    """
    with open(image_path, mode="rb") as binary_file:
        raw_bytes = binary_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
def encode_image_from_gradio(input_image):
    """
    Encode an in-memory image array as a Base64 JPEG string.

    :param input_image: Array-like pixel data (cast to uint8 before use), or None
    :return: Base64 encoded JPEG as a str, or None when no image was supplied
    """
    if input_image is None:
        return None
    pil_image = Image.fromarray(np.uint8(input_image))
    # JPEG has no alpha channel: arrays with 2 or 4 channels (LA / RGBA) would
    # make save() raise, so flatten them to RGB first.
    if pil_image.mode not in ("L", "RGB"):
        pil_image = pil_image.convert("RGB")
    buffered = BytesIO()
    pil_image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode('utf-8')
def encode_image(image_path):
    """
    Return the Base64 text of the file stored at ``image_path``.

    NOTE: a function with this same name and behavior is defined earlier in
    this file; this later definition is the one that stays bound at import.

    :param image_path: Path to the file to encode
    :return: Base64 encoded file content as a str
    """
    with open(image_path, "rb") as source:
        payload = base64.b64encode(source.read())
    return str(payload, 'utf-8')
def encode_image_to_jpeg_base64(filepath):
    """
    Re-encode an image file as JPEG and return it as a Base64 string.

    NOTE: the return shape differs between success and failure. It is kept
    that way for backward compatibility with existing callers.

    :param filepath: Path to the image file
    :return: Base64 encoded JPEG as a str on success; on failure a tuple
             ``(None, error_message)``
    """
    if filepath is None:
        return None, "File path is None."
    # Modes the JPEG writer accepts directly; anything else (RGBA, LA, P, ...)
    # must be converted or save() raises "cannot write mode ... as JPEG".
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    try:
        # The context manager closes the underlying file handle.
        with Image.open(filepath) as pil_image:
            if pil_image.mode not in jpeg_modes:
                pil_image = pil_image.convert('RGB')
            buffered = BytesIO()
            pil_image.save(buffered, format="JPEG")
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
    except Exception as e:
        # Best-effort API: report the failure to the caller instead of raising.
        return None, str(e)
def encode_image_to_webp_base64(filepath):
    """
    Re-encode an image file as WebP and return it as a Base64 string.

    NOTE: the return shape differs between success and failure. It is kept
    that way for backward compatibility with existing callers.

    :param filepath: Path to the image file
    :return: Base64 encoded WebP image as a str on success; on failure a
             tuple ``(None, error_message)``
    """
    if filepath is None:
        return None, "File path is None."
    try:
        # The context manager closes the underlying file handle.
        with Image.open(filepath) as pil_image:
            # Alpha is dropped on purpose to keep the original output unchanged.
            if pil_image.mode == 'RGBA':
                pil_image = pil_image.convert('RGB')
            buffered = BytesIO()
            pil_image.save(buffered, format="WEBP")
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
    except Exception as e:
        # Best-effort API: report the failure to the caller instead of raising.
        return None, str(e)
def process_receipt_json(json_input):
    """
    Turn a receipt JSON string into a text summary plus item and tax tables.

    :param json_input: JSON text expected to hold a single object describing a receipt
    :return: Tuple ``(store_info, items_table, taxs_table, message)``.
             For input that is not a JSON object the tuple is
             ``("Error: Invalid JSON format", None, None, None)``.
             When the receipt has no items, both tables are empty lists and the
             message is ``"No items"``. Otherwise each items row gets one extra
             trailing value: the calculated price, or 0.0 if the calculation failed.
    """
    invalid = ("Error: Invalid JSON format", None, None, None)
    try:
        data = json.loads(json_input)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers non-text input such as None.
        return invalid
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list, number, ...) has no fields to read.
        return invalid

    missing = "Not specified"

    # (label shown in the summary, key read from the JSON), in display order.
    summary_fields = (
        ("File name", "file_name"),
        ("Receipt type", "receipt_type"),
        ("Store", "store_name"),
        ("Country", "country"),
        ("Address", "store_address"),
        ("Currency", "currency"),
        ("SubTotal Amount", "sub_total_amount"),
        ("Total Amount", "total_amount"),
        ("Date time", "date_time"),
        ("Payment method", "payment_method"),
        ("Total discount", "total_discount"),
        ("ALL items price with tax", "all_items_price_with_tax"),
        ("Taxes included (sum)", "tax"),
        ("Taxes not included (sum)", "taxes_not_included_sum"),
        ("Tips", "tips"),
        ("Rounding", "rounding"),
        ("Photo quality", "photo_quality"),
    )
    store_info = "".join(
        f"\n{label}: {data.get(key, missing)}" for label, key in summary_fields
    )

    items = data.get("items", [])
    if not items:
        return store_info, [], [], "No items"

    item_keys = ("name", "category", "unit_price", "quantity",
                 "unit_of_measurement", "total_price", "discount",
                 "item_price_with_tax")
    items_table = [[item.get(key, missing) for key in item_keys] for item in items]

    tax_keys = ("tax_name", "percentage", "tax_from_amount", "tax", "total",
                "tax_included")
    taxs_items = data.get("taxs_items", [])
    taxs_table = (
        [[tax.get(key, missing) for key in tax_keys] for tax in taxs_items]
        if taxs_items else []
    )

    total_amount = data.get("total_amount", missing)
    taxes_included_sum = data.get("tax", missing)
    tips = data.get("tips", missing)

    try:
        total_product_prices, total_sum = receipt_calculation.calculate_tips_and_taxes(
            items_table, total_amount, taxes_included_sum, tips)
        message = create_message(total_sum, total_amount, items_table)
        # Looked up inside the try so a length/shape mismatch in the calculation
        # result degrades to the fallback below instead of raising to the caller.
        calculated_prices = [total_product_prices[idx].price
                             for idx in range(len(items_table))]
    except Exception as e:
        logging.exception(e)
        message = "Something went wrong"
        calculated_prices = [0.0] * len(items_table)

    # Legacy sentinel check kept as-is: a first row named "No items" gets no
    # extra column.
    if items_table[0][0] != "No items":
        for item, calculated in zip(items_table, calculated_prices):
            item.append(calculated)

    return store_info, items_table, taxs_table, message
def clean_and_convert_to_float(value, return_value):
    """
    Convert a recognized value to float, or fall back to a default.

    :param value: Raw value taken from the receipt data
    :param return_value: Result to use when ``value`` is None or one of the
                         "unknown" placeholder strings
    :return: ``return_value`` for missing values, otherwise ``clean_value(value)``
    """
    if value is None or validate_is_unknown(value):
        return return_value
    return clean_value(value)
def validate_is_unknown(value):
    """
    Tell whether a value is one of the placeholder strings for missing data.

    :param value: Value to inspect
    :return: True when ``value`` equals "unknown", "not available" or
             "Not specified" (exact, case-sensitive match); otherwise False
    """
    placeholders = ("unknown", "not available", "Not specified")
    return any(value == marker for marker in placeholders)
def clean_value(value):
    """
    Extract a float from a price-like value.

    Everything except digits, "," and "." is discarded, so currency symbols
    are ignored and a minus sign is dropped (the result is never negative).
    A single separator is read as the decimal mark. With several separators,
    a final separator that occurs only once is the decimal mark and the
    others are thousands separators ("1,234.56", "1.234,56"); a repeated
    separator is accepted as pure grouping only when every group after the
    first has three digits ("1,234,567").

    :param value: Number or text containing a number
    :return: The parsed float
    :raises ValueError: if no parsable number remains
    """
    text = ''.join(ch for ch in str(value) if ch.isdigit() or ch in ",.")
    # A trailing separator (e.g. a sentence-ending dot) carries no numeric meaning.
    text = text.rstrip(",.")
    separators = [ch for ch in text if ch in ",."]
    if len(separators) > 1:
        decimal_mark = separators[-1]
        if separators.count(decimal_mark) == 1:
            whole, _, fraction = text.rpartition(decimal_mark)
            whole = whole.replace(",", "").replace(".", "")
            return float(f"{whole}.{fraction}")
        groups = text.replace(",", ".").split(".")
        if all(len(group) == 3 for group in groups[1:]):
            return float("".join(groups))
        # Anything else is ambiguous; fall through so float() raises ValueError.
    return float(text.replace(",", "."))
def create_message(total_sum, total_amount, table):
    """
    Build a human-readable validation report for a recognized receipt.

    :param total_sum: Total calculated from the items
    :param total_amount: Total recognized on the receipt; may be None or a
                         placeholder such as "Not specified"
    :param table: Item rows; index 0 is the name, 2 the unit price, 3 the
                  quantity, 5 the recognized total price and 6 the discount
    :return: "Everything is okay!" when no problem was found, otherwise the
             concatenated problem descriptions
    :raises ValueError: if a total or an item's total price cannot be parsed
    """
    unknown_totals = ("Not specified", "unknown", "not available")
    unknown_price = -1.111  # sentinel marking a missing unit price
    message = ""

    if (total_amount is None
            or total_amount in unknown_totals
            or round(float(str(total_sum).replace(",", ".")), 2)
            != round(float(str(total_amount).replace(",", ".")), 2)):
        message = message + (
            f"The recognized total sum and product total sum are not equal. Check if the AI model correctly created a JSON.\n"
            f"Recognized total sum: {total_amount},\n"
            f"Calculated total sum: {total_sum}.\n"
        )

    # An empty table is treated like the "No items" sentinel row instead of
    # failing on table[0].
    if not table or table[0][0] == "No items":
        message = message + "Receipt hasn't items!\n"
        return message

    for row in table:
        price = clean_and_convert_to_float(row[2], unknown_price)
        amount = clean_and_convert_to_float(row[3], 1)
        total_price = clean_value(row[5])
        total_price_calculated = round(0 if price == unknown_price else price * amount, 2)
        # The helper is actually called here; the previous code tested the
        # function object itself, which is always truthy, so the discount was
        # always 0.
        try:
            discount = clean_and_convert_to_float(row[6], 0)
        except ValueError:
            # Unparsable discount text: keep the earlier "no discount" outcome.
            discount = 0
        total_price_calculated = round(total_price_calculated - discount, 2)

        if validate_is_unknown(row[2]):
            message = message + f" {row[0]} have {row[2]} price! Please retry!\n"
        elif total_price != total_price_calculated:
            message = message + f"{row[0]} has incorrect 'Total Price'. Recognized total: {total_price} VS Calculated total: {total_price_calculated}. \n"

    return "Everything is okay!" if message == "" else message
def _write_bordered_table(worksheet, header_row, headers, rows, border):
    """Write a bold, centered header row at ``header_row`` and bordered data rows below it."""
    for col_index, header in enumerate(headers, start=1):
        cell = worksheet.cell(row=header_row, column=col_index, value=header)
        cell.font = Font(size=14, bold=True)
        cell.alignment = Alignment(wrap_text=True, horizontal='center')
        cell.border = border
    for row_index, row in enumerate(rows, start=header_row + 1):
        for col_index, value in enumerate(row, start=1):
            cell = worksheet.cell(row=row_index, column=col_index, value=value)
            cell.border = border


def _load_receipt_image(image_file_path, max_height=27 * 15):
    """Open the photo, undo its EXIF rotation and return it as a scaled openpyxl image."""
    # EXIF orientation value -> counter-clockwise angle that makes the image upright.
    rotations = {3: 180, 6: 270, 8: 90}
    with Image.open(image_file_path) as img:
        try:
            orientation_tag = next(
                (tag for tag, name in ExifTags.TAGS.items() if name == 'Orientation'), None)
            # Public API; unlike the private _getexif() it is not limited to
            # formats that happen to define that method.
            exif = img.getexif()
            if orientation_tag is not None and exif is not None:
                angle = rotations.get(exif.get(orientation_tag, 1))
                if angle is not None:
                    img = img.rotate(angle, expand=True)
        except Exception as e:
            # Orientation handling is best-effort; the image is still embedded.
            print(f"EXIF not found: {e}")
        png_bytes = BytesIO()
        img.save(png_bytes, format='PNG')
        png_bytes.seek(0)

    picture = XLSImage(png_bytes)
    # Scale to a fixed height while keeping the aspect ratio.
    # NOTE(review): 27 * 15 is presumably 27 rows of ~15 px each; confirm against the template.
    aspect_ratio = picture.width / picture.height
    picture.width = int(max_height * aspect_ratio)
    picture.height = max_height
    return picture


def save_to_excel(json_output, excel_file_path, image_file_path,
                  template_path='./template/template_receipt.xlsx'):
    """
    Write a recognized receipt (summary, items, taxes, photo) into an Excel workbook.

    :param json_output: Receipt JSON text, parsed with ``process_receipt_json``
    :param excel_file_path: Destination path of the workbook to save
    :param image_file_path: Path of the receipt photo to embed at cell E2;
                            problems with the image are printed and the
                            workbook is saved without it
    :param template_path: Workbook template to load; it must contain a
                          'Receipt' sheet
    :return: ``excel_file_path`` on success, or the "Error: ..." string
             reported by ``process_receipt_json``
    """
    store_info, items_table, taxs_table, _ = process_receipt_json(json_output)
    if isinstance(store_info, str) and store_info.startswith("Error:"):
        return store_info

    # Split each "Label: value" line at the FIRST ': ' only, so values that
    # themselves contain ': ' are no longer truncated.
    store_info_parsed = [
        (label, value)
        for label, separator, value in (line.partition(': ') for line in store_info.split('\n'))
        if separator
    ]
    store_info_df = pd.DataFrame(store_info_parsed, columns=["Label", "Value"])

    # The long header's parenthesized part is Ukrainian for: "if the receipt
    # has an item without included taxes, it must be marked as FALSE".
    items_headers = [
        "Item Name", "Item category", "Unit Price", "Quantity", "Unit of measurement", "Item Price", "Discount",
        "Item price with tax \n(якщо в чеку є item без включених податків - необхідно позначити як FALSE)",
    ]
    calculated_header = "Calculated Total"
    if not items_table:
        print("Warning: items_table is empty. No items to save.")
        items_df = pd.DataFrame(columns=items_headers + [calculated_header])
    else:
        items_df = pd.DataFrame(items_table, columns=items_headers + [calculated_header])
    # The calculated column is an internal check value and is not exported.
    items_df = items_df.drop(columns=[calculated_header])

    taxs_headers = [
        "Tax name", "%", "tax from amount", "tax", "total", "tax included"
    ]
    if not taxs_table:
        print("Warning: taxs_table is empty. No taxes to save.")
        taxs_df = pd.DataFrame(columns=taxs_headers)
    else:
        taxs_df = pd.DataFrame(taxs_table, columns=taxs_headers)

    workbook = load_workbook(template_path)
    worksheet = workbook['Receipt']

    # Summary block: label/value pairs in columns 1-2, starting at row 5.
    top_table = 5
    for row_index, row in enumerate(store_info_df.itertuples(index=False), start=top_table):
        for col_index, value in enumerate(row, start=1):
            worksheet.cell(row=row_index, column=col_index, value=value)

    thin_border = Border(
        left=Side(style='thin'),
        right=Side(style='thin'),
        top=Side(style='thin'),
        bottom=Side(style='thin')
    )

    # Items: a title row directly above the table's header row.
    items_df_start_row = len(store_info_df) + 2 + top_table
    items_title_cell = worksheet.cell(row=items_df_start_row - 1, column=1, value="Items")
    items_title_cell.font = Font(bold=True, size=12)
    _write_bordered_table(worksheet, items_df_start_row, items_headers,
                          items_df.itertuples(index=False), thin_border)

    # Taxes: one blank row after the items, then title, header and rows.
    taxs_df_start_row = items_df_start_row + len(items_df) + 2
    taxs_title_cell = worksheet.cell(row=taxs_df_start_row, column=1, value="Tax list")
    taxs_title_cell.font = Font(bold=True, size=12)
    _write_bordered_table(worksheet, taxs_df_start_row + 1, taxs_headers,
                          taxs_df.itertuples(index=False), thin_border)

    # The photo is optional: any failure is reported and the workbook is still saved.
    try:
        worksheet.add_image(_load_receipt_image(image_file_path), 'E2')
    except FileNotFoundError:
        print(f"Error: File '{image_file_path}' not found.")
    except OSError as e:
        print(f"Error: Unable to open image '{image_file_path}'. {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

    workbook.save(excel_file_path)
    return excel_file_path