|
|
|
|
|
""" |
|
|
Final Menu OCR -> Excel (Batch) Gradio app |
|
|
- Robust handling of Gradio temp file paths |
|
|
- Parses filename (<StoreName>_<StoreCode> <BranchName>.<ext>) |
|
|
- Produces one Excel per image (mapped to template A..S, row 3 onward) |
|
|
- Returns a ZIP of all Excels and also individual Excel files for download |
|
|
""" |
|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
import pytesseract |
|
|
from pytesseract import Output |
|
|
import cv2 |
|
|
import re |
|
|
import tempfile |
|
|
import shutil |
|
|
import os |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
from io import BytesIO |
|
|
from zipfile import ZipFile |
|
|
from openpyxl import load_workbook |
|
|
|
|
|
# Matches a price at the END of a line: an optional currency marker (₹, Rs, Rs.,
# INR; case-insensitive), then 1-6 digits with an optional 1-2 digit decimal part,
# then an optional "/-" suffix and trailing whitespace. Group 1 is the number only.
PRICE_REGEX = re.compile(r"(?:₹|Rs\.?|INR)?\s*([0-9]{1,6}(?:\.[0-9]{1,2})?)(?:\s*/-)?\s*$", flags=re.IGNORECASE)

# Lowercase keywords; a line containing any of them (substring match on the
# lowercased line) is treated as a category heading by looks_like_category().
CATEGORY_HINTS = ["maggi", "noodles", "pizza", "burger", "rice", "continental", "beverages", "coffee", "tea"]

# Constant values copied into every row built by parse_menu_lines() for the
# template columns that are not derived from the OCR text.
DEFAULTS = {
    "Active": "1",
    "Priority": "",
    "Image": "",
    "Food type": "",
    "NoOfMains": "1",
    "OnlineName": "",
    "AlternateClassification": "",
    "ItemTaxInclusive": "0",
    "TaxPct": "",
    "BrandName": "",
    "ClassificationCode": "",
    "HSN Code": ""
}
|
|
|
|
|
def safe_read_bytes(uploaded_file):
    """Return the raw content of an uploaded file, or None if it cannot be read.

    Accepted inputs, tried in this order:

    * a filesystem path given as ``str`` / ``os.PathLike`` (some Gradio
      versions pass uploads as plain path strings rather than file objects);
    * an object whose ``.name`` attribute is a path to an existing file
      (e.g. a temp-file wrapper);
    * any object with a ``read()`` method, rewound with ``seek(0)`` when
      possible.

    Reading is best-effort: I/O failures are not raised, the function just
    falls through to the next strategy and finally returns None.
    """
    if uploaded_file is None:
        return None

    # A bare path has no .name/.read, so it must be recognised explicitly.
    if isinstance(uploaded_file, (str, os.PathLike)):
        path = uploaded_file
    else:
        path = getattr(uploaded_file, "name", None)

    if isinstance(path, (str, os.PathLike)) and os.path.isfile(path):
        try:
            with open(path, "rb") as f:
                return f.read()
        except OSError:
            # Unreadable on disk; the object itself may still be a readable stream.
            pass

    reader = getattr(uploaded_file, "read", None)
    if not callable(reader):
        return None

    try:
        uploaded_file.seek(0)
    except (AttributeError, OSError, ValueError):
        # Not seekable (or already closed): read from the current position.
        pass

    try:
        return reader()
    except (OSError, ValueError):
        return None
|
|
|
|
|
def get_original_basename(uploaded_file):
    """Return the file name (last path component) of an uploaded file.

    ``uploaded_file`` may be a path (``str`` / ``os.PathLike``) or an object
    exposing the path through a ``.name`` attribute (e.g. a temp-file
    wrapper). Returns the literal string ``"unknown"`` when no name can be
    determined.
    """
    # A plain path string has no .name attribute, so it is handled first;
    # otherwise every such upload would collapse to "unknown".
    if isinstance(uploaded_file, (str, os.PathLike)):
        name_attr = uploaded_file
    else:
        name_attr = getattr(uploaded_file, "name", "")

    if not name_attr:
        return "unknown"

    # basename is empty for paths ending in a separator.
    return os.path.basename(name_attr) or "unknown"
|
|
|
|
|
def parse_filename(filename: str):
    """Split an image file name into ``(store_name, store_code, branch_name)``.

    The directory and extension are discarded first. Two layouts are
    recognised:

    * ``<StoreName>_<StoreCode> <BranchName>`` - split on the first
      underscore, then on the first space after it;
    * ``<StoreName> (<BranchName>)`` - used when there is no underscore;
      the store code is empty.

    Anything else is returned whole as the store name with empty code and
    branch.
    """
    stem, _extension = os.path.splitext(os.path.basename(filename))

    if "_" not in stem:
        bracketed = re.match(r"(.+?)\s*\((.+?)\)", stem)
        if bracketed is None:
            return stem, "", ""
        return bracketed.group(1).strip(), "", bracketed.group(2).strip()

    # Only the first underscore separates the name from the code/branch part.
    name_side, _sep, remainder = stem.partition("_")
    code_side, _sep, branch_side = remainder.strip().partition(" ")
    return name_side.strip(), code_side.strip(), branch_side.strip()
|
|
|
|
|
def preprocess_image(np_img):
    """Prepare an RGB image array for OCR and return the binarised result.

    Steps: convert to grayscale, upscale when the shorter side is under
    1000 px, apply Gaussian adaptive thresholding, then a morphological
    opening.
    """
    grayscale = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY)

    shortest_side = min(grayscale.shape[:2])
    if shortest_side < 1000:
        # Scale by at least 1.5x, or by more if that is needed to reach 1000 px.
        factor = max(1.5, 1000.0 / shortest_side)
        grayscale = cv2.resize(grayscale, None, fx=factor, fy=factor, interpolation=cv2.INTER_CUBIC)

    binary = cv2.adaptiveThreshold(
        grayscale,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        41,
        11,
    )

    # NOTE(review): with a 1x1 structuring element the opening should leave the
    # image unchanged - confirm whether a larger kernel was intended.
    structuring_element = np.ones((1, 1), np.uint8)
    return cv2.morphologyEx(binary, cv2.MORPH_OPEN, structuring_element)
|
|
|
|
|
def ocr_with_confidence(pil_img, lang="eng"):
    """Run Tesseract on an image and return its text grouped into lines.

    Args:
        pil_img: image passed straight to ``pytesseract.image_to_data``.
        lang: Tesseract language code(s); defaults to ``"eng"``.

    Returns:
        ``(full_text, lines)`` where ``lines`` is a list of
        ``{"line": str, "conf": float}`` dicts ordered by
        (block, paragraph, line) number, and ``full_text`` is the line texts
        joined with newlines. ``conf`` is the mean of the non-negative token
        confidences on that line rounded to 2 decimals, or ``0.0`` when the
        line has none.

    Raises:
        RuntimeError: if the Tesseract call fails for any reason; the
            original exception is attached as ``__cause__``.
    """
    try:
        data = pytesseract.image_to_data(pil_img, output_type=Output.DICT, lang=lang)
    except Exception as e:
        raise RuntimeError(
            f"Tesseract OCR failed: {e}. Ensure Tesseract is installed on the host."
        ) from e

    columns = zip(
        data.get("text", []),
        data.get("conf", []),
        data.get("block_num", []),
        data.get("par_num", []),
        data.get("line_num", []),
    )

    # Tokens sharing (block, paragraph, line) numbers belong to one text line.
    grouped = {}
    for token, conf, block, par, line in columns:
        if token is None or not str(token).strip():
            continue
        entry = grouped.setdefault(
            (int(block), int(par), int(line)), {"tokens": [], "confs": []}
        )
        entry["tokens"].append(str(token))
        try:
            conf_val = float(conf)
        except (TypeError, ValueError):
            # Unparseable confidence: keep the token, skip it in the average.
            continue
        # Negative values (presumably Tesseract's "no confidence" marker) are
        # excluded from the average.
        if conf_val >= 0:
            entry["confs"].append(conf_val)

    lines = []
    for key in sorted(grouped):
        confs_line = grouped[key]["confs"]
        avg_conf = round(sum(confs_line) / len(confs_line), 2) if confs_line else 0.0
        lines.append({"line": " ".join(grouped[key]["tokens"]).strip(), "conf": avg_conf})

    full_text = "\n".join(entry["line"] for entry in lines)
    return full_text, lines
|
|
|
|
|
def split_lines(text: str):
    """Normalise OCR text and return its non-empty lines, stripped.

    Bullet characters and tabs become spaces, runs of two or more spaces
    collapse to one, and lines that are empty after stripping are dropped.
    """
    normalised = re.sub(r"[ ]{2,}", " ", re.sub(r"[•·●\t]", " ", text))

    result = []
    for raw_line in normalised.splitlines():
        stripped = raw_line.strip()
        if stripped:
            result.append(stripped)
    return result
|
|
|
|
|
def looks_like_category(line: str):
    """Return True when an OCR line looks like a category heading, not an item.

    A line is a heading when it does not end in a price and either contains
    one of ``CATEGORY_HINTS`` (case-insensitive substring match) or has no
    digits and at most six whitespace-separated words.
    """
    # A line ending in a price is a menu item even if it contains a hint word
    # ("Veg Burger 99"); calling it a heading would make the caller drop it.
    if PRICE_REGEX.search(line):
        return False

    low = line.lower()
    if any(hint in low for hint in CATEGORY_HINTS):
        return True

    # Short, digit-free lines are assumed to be headings.
    return not re.search(r"\d", line) and len(line.split()) <= 6
|
|
|
|
|
def _menu_row(parent_category, category, item_name, price):
    """Build one template row: the item-specific fields followed by DEFAULTS."""
    return {
        "Parent Category": parent_category,
        "Category": category,
        "Name": item_name,
        "Item Code": "",
        "Master Item Name": item_name,
        "EAN Code": "",
        "Price": price,
        **DEFAULTS,
    }


def parse_menu_lines(lines):
    """Turn cleaned OCR lines into a list of row dicts for the Excel template.

    Lines accepted by ``looks_like_category`` update the running headings
    and produce no row: all-uppercase lines or lines containing a
    ``CATEGORY_HINTS`` word set the parent category, the rest set the
    category. Any other line ending in a price becomes a row with the price
    split off the name; a line with digits but no trailing price becomes a
    row with an empty price; lines with neither are ignored.
    """
    parsed = []
    parent_heading = ""
    sub_heading = ""

    for text in lines:
        if looks_like_category(text):
            heading = text.strip(":- ")
            if text.isupper() or any(hint in text.lower() for hint in CATEGORY_HINTS):
                parent_heading = heading
            else:
                sub_heading = heading
            continue

        price_match = PRICE_REGEX.search(text)
        if price_match:
            # Drop the price from the line and trim leftover separators.
            item_name = PRICE_REGEX.sub("", text).strip(" -:.")
            parsed.append(
                _menu_row(parent_heading, sub_heading, item_name, price_match.group(1).strip())
            )
        elif re.search(r"\d", text):
            parsed.append(_menu_row(parent_heading, sub_heading, text.strip(), ""))

    return parsed
|
|
|
|
|
def fill_template_bytes(template_path, rows, store_name, store_code, branch_name):
    """Fill the Excel template with menu rows and return it as an in-memory file.

    The store name, store code and branch name go into A1, B1 and C1 of the
    active sheet. Each entry of ``rows`` is written to one sheet row starting
    at row 3, using columns 1-19 (A..S) in the key order listed below;
    missing keys are written as empty strings.

    Returns:
        A ``BytesIO`` holding the saved workbook, positioned at offset 0.
    """
    # Position in this tuple + 1 is the target column number.
    column_keys = (
        "Parent Category",
        "Category",
        "Name",
        "Item Code",
        "Master Item Name",
        "EAN Code",
        "Price",
        "Active",
        "Priority",
        "Image",
        "Food type",
        "NoOfMains",
        "OnlineName",
        "AlternateClassification",
        "ItemTaxInclusive",
        "TaxPct",
        "BrandName",
        "ClassificationCode",
        "HSN Code",
    )

    workbook = load_workbook(template_path)
    sheet = workbook.active

    for address, header_value in (("A1", store_name), ("B1", store_code), ("C1", branch_name)):
        sheet[address] = header_value

    for row_number, item in enumerate(rows, start=3):
        for column_number, key in enumerate(column_keys, start=1):
            sheet.cell(row=row_number, column=column_number, value=item.get(key, ""))

    buffer = BytesIO()
    workbook.save(buffer)
    buffer.seek(0)
    return buffer
|
|
|
|
|
def sanitize_filename(name):
    """Replace every character that is not a word character, hyphen,
    underscore, dot or space with an underscore and return the result."""
    disallowed = re.compile(r"[^\w\-_\. ]")
    return disallowed.sub("_", name)
|
|
|
|
|
def _unique_output_path(directory, filename, taken):
    """Return a path in ``directory`` for ``filename`` that is not yet in ``taken``.

    A numeric suffix (``_2``, ``_3``, ...) is appended before the extension
    on collision. Names are compared case-insensitively, and the chosen name
    is recorded in ``taken``.
    """
    stem, ext = os.path.splitext(filename)
    candidate = filename
    counter = 1
    while candidate.lower() in taken:
        counter += 1
        candidate = f"{stem}_{counter}{ext}"
    taken.add(candidate.lower())
    return os.path.join(directory, candidate)


def process_batch(images, template):
    """OCR every uploaded menu image into its own Excel file and zip the results.

    Args:
        images: the uploaded image files (a list, or a single upload). Each
            is handed to ``get_original_basename`` and ``safe_read_bytes``.
        template: the Excel template, either as a path or as an object whose
            ``.name`` is the path.

    Returns:
        ``(zip_path, generated_paths)``. ``generated_paths`` has one entry
        per input: an ``.xlsx`` file on success, or a ``*_ERROR.txt`` file
        holding the exception message on failure. ``zip_path`` is a ZIP
        containing all of them. Everything is written to a fresh temporary
        directory, which is not deleted here because the caller still needs
        the files.

    Raises:
        gr.Error: if no images or no template were supplied.
    """
    # "not images" also rejects an empty list, which would give an empty ZIP.
    if not images or template is None:
        raise gr.Error("Please upload images and a template file.")

    if not isinstance(images, (list, tuple)):
        images = [images]

    # The template may arrive as a plain path string or as an object with .name.
    template_path = template if isinstance(template, (str, os.PathLike)) else template.name

    tempdir = tempfile.mkdtemp()
    generated_paths = []
    used_names = set()

    for uploaded in images:
        # Set before the try so the error handler never re-runs the failing call.
        orig_basename = "unknown"
        try:
            orig_basename = get_original_basename(uploaded)
            store_name, store_code, branch_name = parse_filename(orig_basename)

            data = safe_read_bytes(uploaded)
            if data is None:
                raise RuntimeError("Could not read uploaded image bytes.")

            with Image.open(BytesIO(data)) as opened:
                pil = opened.convert("RGB")
            pre = preprocess_image(np.array(pil))
            full_text, _ = ocr_with_confidence(Image.fromarray(pre))
            rows = parse_menu_lines(split_lines(full_text))

            out_buf = fill_template_bytes(template_path, rows, store_name, store_code, branch_name)
            out_name = sanitize_filename(os.path.splitext(orig_basename)[0]) + ".xlsx"
            # Same-named uploads must not overwrite each other or duplicate a ZIP entry.
            out_path = _unique_output_path(tempdir, out_name, used_names)
            with open(out_path, "wb") as f:
                f.write(out_buf.read())
            generated_paths.append(out_path)
        except Exception as e:
            # Per-image boundary: one bad image must not abort the batch, so
            # the failure is reported as a text file next to the results.
            err_name = sanitize_filename(os.path.splitext(orig_basename)[0]) + "_ERROR.txt"
            err_path = _unique_output_path(tempdir, err_name, used_names)
            with open(err_path, "w", encoding="utf-8") as ef:
                ef.write(str(e))
            generated_paths.append(err_path)

    zip_path = os.path.join(tempdir, "Menu_Results.zip")
    with ZipFile(zip_path, "w") as zf:
        for p in generated_paths:
            zf.write(p, arcname=os.path.basename(p))

    return zip_path, generated_paths
|
|
|
|
|
# Gradio UI definition. Components are created inside the Blocks context in
# the order they appear here.
# NOTE(review): the nesting below was reconstructed from a source whose
# indentation had been stripped - confirm the Row contents against the original.
with gr.Blocks() as demo:
    gr.Markdown("## Menu OCR → Excel (Batch)\nUpload multiple images and an Excel template. The app will parse filename metadata, OCR the menu, and produce one Excel per image.")
    # Inputs: the menu images and the Excel template, placed in one row.
    with gr.Row():
        images_in = gr.File(file_count="multiple", label="Upload menu images", file_types=["image"])
        template_in = gr.File(file_count="single", label="Upload Excel template (.xlsx)", file_types=[".xlsx"])
    run_btn = gr.Button("Process all images")
    # Outputs: the combined ZIP, the individual result files, and a status line.
    zip_out = gr.File(label="Download ZIP of all Excel outputs")
    files_out = gr.File(label="Download individual Excel files (multiple)", file_count="multiple")
    status = gr.Textbox(label="Status")

    def run(images, template):
        """Button callback: run process_batch and map its result to the three outputs.

        Any exception (including the gr.Error raised for missing inputs) is
        caught and shown in the status box instead of being raised.
        """
        try:
            zip_path, files = process_batch(images, template)
            return zip_path, files, f"Processed {len(files)} files. Download ZIP or individual files."
        except Exception as e:
            return None, [], f"Error: {e}"

    run_btn.click(fn=run, inputs=[images_in, template_in], outputs=[zip_out, files_out, status])

# Runs when the module is executed or imported.
# NOTE(review): there is no `if __name__ == "__main__"` guard - confirm this is
# intended for the deployment target.
demo.launch()
|
|
|