ReceiptSplitAI / utils.py
valentynliubchenko
merging
eba303d
import base64
import json
import logging
from io import BytesIO
import numpy as np
import pandas as pd
from PIL import Image, ExifTags
from openpyxl import load_workbook
from openpyxl.drawing.image import Image as XLSImage
from openpyxl.styles import Font, Alignment, Border, Side
from algorithm import receipt_calculation
def read_prompt_from_file(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
def encode_image(image_path):
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def encode_image_from_gradio(input_image):
if input_image is None:
return None
pil_image = Image.fromarray(np.uint8(input_image))
buffered = BytesIO()
pil_image.save(buffered, format="JPEG")
base64_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
return base64_image
def encode_image(image_path):
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def encode_image_to_jpeg_base64(filepath):
"""
Encodes an image file to JPEG and then to a Base64 string.
:param filepath: Path to the image file
:return: Base64 encoded string of the JPEG image or an error message
"""
if filepath is None:
return None, "File path is None."
try:
pil_image = Image.open(filepath)
if pil_image.mode == 'RGBA':
pil_image = pil_image.convert('RGB')
buffered = BytesIO()
pil_image.save(buffered, format="JPEG")
base64_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
return base64_image
except Exception as e:
return None, str(e)
def encode_image_to_webp_base64(filepath):
"""
Encodes an image file to JPEG and then to a Base64 string.
:param filepath: Path to the image file
:return: Base64 encoded string of the JPEG image or an error message
"""
if filepath is None:
return None, "File path is None."
try:
pil_image = Image.open(filepath)
if pil_image.mode == 'RGBA':
pil_image = pil_image.convert('RGB')
buffered = BytesIO()
pil_image.save(buffered, format="WEBP")
base64_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
return base64_image
except Exception as e:
return None, str(e)
def process_receipt_json(json_input):
try:
# Parse the JSON string
data = json.loads(json_input)
except json.JSONDecodeError:
return "Error: Invalid JSON format", None, None, None
# Initialize the output information
store_info = ""
items_table = []
# Try to extract store information, if available
file_name = data.get("file_name", "Not specified")
receipt_type = data.get("receipt_type", "Not specified")
store_name = data.get("store_name", "Not specified")
country = data.get("country", "Not specified")
store_address = data.get("store_address", "Not specified")
currency = data.get("currency", "Not specified")
date_time = data.get("date_time", "Not specified")
type = data.get("payment_method", "Not specified")
sub_total_amount = data.get("sub_total_amount", "Not specified")
total_amount = data.get("total_amount", "Not specified")
discount = data.get("total_discount", "Not specified")
all_items_price_with_tax = data.get("all_items_price_with_tax", "Not specified")
taxes_included_sum = data.get("tax", "Not specified")
taxes_not_included_sum = data.get("taxes_not_included_sum", "Not specified")
tips = data.get("tips", "Not specified")
rounding = data.get("rounding", "Not specified")
photo_quality = data.get("photo_quality", "Not specified")
# Format store information
store_info = \
(
f"\nFile name: {file_name}"
f"\nReceipt type: {receipt_type}"
f"\nStore: {store_name}"
f"\nCountry: {country}"
f"\nAddress: {store_address}"
f"\nCurrency: {currency}"
f"\nSubTotal Amount: {sub_total_amount}"
f"\nTotal Amount: {total_amount}"
f"\nDate time: {date_time}"
f"\nPayment method: {type}"
f"\nTotal discount: {discount}"
f"\nALL items price with tax: {all_items_price_with_tax}"
f"\nTaxes included (sum): {taxes_included_sum}"
f"\nTaxes not included (sum): {taxes_not_included_sum}"
f"\nTips: {tips}"
f"\nRounding: {rounding}"
f"\nPhoto quality: {photo_quality}"
)
# Extract items, if available in JSON
items = data.get("items", [])
if items:
items_table = [[item.get("name", "Not specified"),
item.get("category", "Not specified"),
item.get("unit_price", "Not specified"),
item.get("quantity", "Not specified"),
item.get("unit_of_measurement", "Not specified"),
item.get("total_price", "Not specified"),
item.get("discount", "Not specified"),
item.get("item_price_with_tax", "Not specified"),]
for item in items]
else:
items_table = []
taxs_table = []
return store_info, items_table, taxs_table, "No items"
taxs_items = data.get("taxs_items", [])
taxs_table = []
if taxs_items:
taxs_table = [[tax.get("tax_name", "Not specified"),
tax.get("percentage", "Not specified"),
tax.get("tax_from_amount", "Not specified"),
tax.get("tax", "Not specified"),
tax.get("total", "Not specified"),
tax.get("tax_included", "Not specified")]
for tax in taxs_items]
try:
total_product_prices, total_sum = receipt_calculation.calculate_tips_and_taxes(items_table, total_amount, taxes_included_sum,
tips)
message = create_message(total_sum, total_amount, items_table)
except Exception as e:
logging.exception(e)
message = "Something went wrong"
if items_table[0][0] != "No items":
for item in items_table:
item.append(0.0)
if items_table[0][0] != "No items" and message != "Something went wrong":
for idx, item in enumerate(items_table):
item.append(total_product_prices[idx].price)
return store_info, items_table, taxs_table, message
def clean_and_convert_to_float(value, return_value):
return return_value if validate_is_unknown(value) or value is None else clean_value(value)
def validate_is_unknown(value):
return value == "unknown" or value == "not available" or value == "Not specified"
def clean_value(value):
clean_price = ''.join(c for c in str(value) if c.isdigit() or c in ",.")
return float(clean_price.replace(",", "."))
def create_message(total_sum, total_amount, table):
message = ""
if total_amount in ["Not specified", "unknown", "not available"] or total_amount is None or round(
float(str(total_sum).replace(",", ".")), 2) != round(
float(str(total_amount).replace(",", ".")), 2):
message = message + (
f"The recognized total sum and product total sum are not equal. Check if the AI model correctly created a JSON.\n"
f"Recognized total sum: {total_amount},\n"
f"Calculated total sum: {total_sum}.\n"
)
if table[0][0] == "No items":
message = message + "Receipt hasn't items!\n"
else:
for i in range(len(table)):
price = clean_and_convert_to_float(table[i][2], -1.111)
amount = clean_and_convert_to_float(table[i][3], 1)
total_price = clean_value(table[i][5])
total_price_calculated = round(0 if price == -1.111 else price * amount, 2)
discount = 0 if validate_is_unknown or table[i][6] is None else clean_value(table[i][6])
total_price_calculated = round(total_price_calculated - discount, 2)
if validate_is_unknown(table[i][2]):
message = message + f" {table[i][0]} have {table[i][2]} price! Please retry!\n"
elif total_price != total_price_calculated:
message = message + f"{table[i][0]} has incorrect 'Total Price'. Recognized total: {total_price} VS Calculated total: {total_price_calculated}. \n"
return "Everything is okay!" if message == "" else message
def save_to_excel(json_output, excel_file_path, image_file_path):
store_info, items_table, taxs_table, _ = process_receipt_json(json_output)
if isinstance(store_info, str) and store_info.startswith("Error:"):
return store_info
store_info_lines = store_info.split('\n')
store_info_parsed = [(line.split(': ')[0], line.split(': ')[1]) for line in store_info_lines if ': ' in line]
store_info_df = pd.DataFrame(store_info_parsed, columns=["Label", "Value"])
items_headers = [
"Item Name", "Item category", "Unit Price", "Quantity", "Unit of measurement", "Item Price", "Discount",
"Item price with tax \n(якщо в чеку є item без включених податків - необхідно позначити як FALSE)", "Calculated Total"
]
if not items_table:
print("Warning: items_table is empty. No items to save.")
items_df = pd.DataFrame(columns=items_headers)
else:
items_df = pd.DataFrame(items_table, columns=items_headers)
items_headers.remove("Calculated Total")
items_df = items_df.drop(columns=["Calculated Total"])
workbook = load_workbook('./template/template_receipt.xlsx')
worksheet = workbook['Receipt']
for index, row in enumerate(store_info_df.itertuples(index=False), start=5): # Start from row 1
for col_index, value in enumerate(row, start=1):
worksheet.cell(row=index, column=col_index, value=value)
top_table = 5
items_header_cell = worksheet.cell(row=len(store_info_df) + top_table + 1, column=1, value="Items")
items_header_cell.font = Font(bold=True, size=12)
items_df_start_row = len(store_info_df) + 2 + top_table
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
for col_index, header in enumerate(items_headers, start=1):
cell = worksheet.cell(row=items_df_start_row, column=col_index, value=header)
cell.font = Font(size=14, bold=True)
cell.alignment = Alignment(wrap_text=True, horizontal='center')
cell.border = thin_border
for index, row in enumerate(items_df.itertuples(index=False), start=items_df_start_row + 1):
for col_index, value in enumerate(row, start=1):
cell = worksheet.cell(row=index, column=col_index, value=value)
cell.border = thin_border
# worksheet.column_dimensions['A'].width = 30
taxs_headers = [
"Tax name", "%", "tax from amount", "tax", "total", "tax included"
]
if not taxs_table:
print("Warning: items_table is empty. No items to save.")
taxs_df = pd.DataFrame(columns=taxs_headers)
else:
taxs_df = pd.DataFrame(taxs_table, columns=taxs_headers)
taxs_df_start_row = items_df_start_row + len(items_df) + 2
items_header_cell = worksheet.cell(row=taxs_df_start_row, column=1, value="Tax list")
items_header_cell.font = Font(bold=True, size=12)
for col_index, header in enumerate(taxs_headers, start=1):
cell = worksheet.cell(row=taxs_df_start_row+1, column=col_index, value=header)
cell.font = Font(size=14, bold=True)
cell.alignment = Alignment(wrap_text=True, horizontal='center')
cell.border = thin_border
for index, row in enumerate(taxs_df.itertuples(index=False), start=taxs_df_start_row + 2):
for col_index, value in enumerate(row, start=1):
cell = worksheet.cell(row=index, column=col_index, value=value)
cell.border = thin_border
try:
with Image.open(image_file_path) as img:
try:
exif = img._getexif()
orientation_tag = next((tag for tag, value in ExifTags.TAGS.items() if value == 'Orientation'), None)
if orientation_tag is not None and exif is not None:
orientation = exif.get(orientation_tag, 1)
if orientation == 3:
img = img.rotate(180, expand=True)
elif orientation == 6:
img = img.rotate(270, expand=True)
elif orientation == 8:
img = img.rotate(90, expand=True)
except Exception as e:
print(f"EXIF not found: {e}")
img_byte_arr = BytesIO()
img.save(img_byte_arr, format='PNG')
img_byte_arr.seek(0)
img = XLSImage(img_byte_arr)
max_height = 27 * 15
aspect_ratio = img.width / img.height
new_height = max_height
new_width = int(new_height * aspect_ratio)
img.width = new_width
img.height = new_height
worksheet.add_image(img, 'E2')
except FileNotFoundError:
print(f"Error: File '{image_file_path}' not found.")
except OSError as e:
print(f"Error: Unable to open image '{image_file_path}'. {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
workbook.save(excel_file_path)
return excel_file_path