"""Streamlit app for reviewing and editing remittance JSONL records with OCR.

Page 1 uploads a JSONL file plus the document images; page 2 shows each
record next to its image, lets the user edit fields by hand or fill them by
drawing a rectangle on the image (Tesseract OCR), and offers JSONL downloads.
"""

import os
from pathlib import Path

# -----------------------------
# Environment hardening (HF Spaces, /.cache issue)
# -----------------------------
_home = os.environ.get("HOME", "")
if _home in ("", "/", None):
    repo_dir = os.getcwd()
    safe_home = repo_dir if os.access(repo_dir, os.W_OK) else "/tmp"
    os.environ["HOME"] = safe_home
    print(f"[startup] HOME not set or unwritable — setting HOME={safe_home}")

streamlit_dir = Path(os.environ["HOME"]) / ".streamlit"
try:
    streamlit_dir.mkdir(parents=True, exist_ok=True)
    print(f"[startup] ensured {streamlit_dir}")
except Exception as e:
    # Best effort only: the app can still start without the directory.
    print(f"[startup] WARNING: could not create {streamlit_dir}: {e}")

import copy
import io
import json
import time

import numpy as np
import pandas as pd
import pytesseract
import streamlit as st
from PIL import Image
from streamlit_drawable_canvas import st_canvas

# Set Tesseract path - auto-detect based on OS
if os.name == 'nt':  # Windows
    pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
else:  # Linux/Mac (HF Spaces uses Linux)
    # On HF Spaces with packages.txt, tesseract is in system PATH
    # No need to set path explicitly
    pass

# Page configuration
st.set_page_config(page_title="Remittance Data Viewer", layout="wide")

# Custom CSS to reduce gaps between form fields and style buttons
# NOTE(review): the CSS payload is whitespace-only in this copy of the file.
st.markdown(""" """, unsafe_allow_html=True)

# Placeholder entry of every "field to populate via OCR" selector.
NO_FIELD = 'Select fields'


def load_jsonl(file):
    """Load a JSONL upload and return its records as a list.

    ``file`` must expose ``getvalue()`` returning UTF-8 bytes. Blank lines are
    skipped; a malformed line raises ``json.JSONDecodeError``.
    """
    content = file.getvalue().decode('utf-8')
    return [json.loads(line) for line in content.strip().split('\n') if line.strip()]


def save_to_jsonl(data):
    """Serialize a list of records to JSONL text (one JSON document per line)."""
    return '\n'.join(json.dumps(record) for record in data)


def perform_ocr(image, bbox):
    """Run Tesseract on the ``bbox`` region of ``image`` and return the text.

    ``bbox`` is ``[x1, y1, x2, y2]`` in image pixel coordinates; it is clamped
    to the image bounds. Failures never raise: they are returned as a string
    starting with ``"OCR Error"``, which callers use to detect them.
    """
    try:
        x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])

        # Ensure coordinates are within image bounds
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(image.width, x2), min(image.height, y2)

        # A rectangle drawn entirely in the padding (or with zero area) leaves
        # nothing to crop; report that instead of handing Tesseract an empty image.
        if x2 <= x1 or y2 <= y1:
            return "OCR Error: selected region is empty or outside the image"

        cropped = image.crop((x1, y1, x2, y2))
        return pytesseract.image_to_string(cropped, config='--psm 6').strip()
    except Exception as e:
        return f"OCR Error: {str(e)}"


def scale_image_to_fixed_size(image, target_width=700, target_height=900):
    """Fit ``image`` onto a white ``target_width`` x ``target_height`` canvas.

    The aspect ratio is preserved and the scaled image is centered.

    Returns:
        ``(final_image, ratio, paste_x, paste_y)`` where ``ratio`` is the scale
        factor applied and ``paste_x``/``paste_y`` the top-left offset of the
        scaled image inside the canvas (needed to map canvas coordinates back
        to the original image).
    """
    # Normalize to RGB; transparent images are flattened onto white.
    if image.mode not in ('RGB', 'RGBA'):
        image = image.convert('RGB')
    elif image.mode == 'RGBA':
        background = Image.new('RGB', image.size, (255, 255, 255))
        background.paste(image, mask=image.split()[3])  # alpha channel as mask
        image = background

    # The smaller ratio guarantees the image fits within both constraints.
    ratio = min(target_width / image.width, target_height / image.height)

    # Clamp to 1px so extreme aspect ratios cannot produce a zero-sized resize.
    new_width = max(1, int(image.width * ratio))
    new_height = max(1, int(image.height * ratio))

    # LANCZOS for downscaling and mild upscaling; BICUBIC for significant
    # upscaling (and the exact 1.0 case), where it can look sharper.
    if ratio < 1.0 or (ratio > 1.0 and ratio < 1.5):
        resample = Image.Resampling.LANCZOS
    else:
        resample = Image.Resampling.BICUBIC
    resized_image = image.resize((new_width, new_height), resample)

    final_image = Image.new('RGB', (target_width, target_height), (255, 255, 255))
    paste_x = (target_width - new_width) // 2
    paste_y = (target_height - new_height) // 2
    final_image.paste(resized_image, (paste_x, paste_y))

    return final_image, ratio, paste_x, paste_y


# Initialize session state. The dict literal is rebuilt on every script run,
# so the mutable defaults are never shared between sessions.
_SESSION_DEFAULTS = {
    'data': None,
    'current_index': 0,
    'edited_data': None,
    'page': 'upload',
    'images': {},
    'modified_indices': set(),
    'ocr_active_section': None,
    'ocr_active_field': None,
    'ocr_line_item_row': None,
    'canvas_key': 0,
    'line_items_temp': [],
    'button_clicked': False,
    'save_message': None,
    'save_message_time': None,
    'just_saved': False,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default


# Auto-save function
def auto_save(index):
    """Snapshot the edited record at ``index`` into ``data`` and mark it modified.

    Only the saved record is copied (deeply), so records that were merely
    viewed keep their original content in ``st.session_state.data``.
    """
    edited = st.session_state.edited_data
    if not edited:
        return

    saved = st.session_state.data
    if saved is not None and len(saved) == len(edited) and 0 <= index < len(edited):
        saved[index] = copy.deepcopy(edited[index])
    else:
        # Lists out of sync (should not happen): fall back to a full snapshot.
        st.session_state.data = copy.deepcopy(edited)
    st.session_state.modified_indices.add(index)


# Save button callback
def save_changes_callback():
    """Callback for a save button: save the current record and queue a message."""
    auto_save(st.session_state.current_index)
    st.session_state.save_message = "✅ Changes saved successfully!"
    st.session_state.save_message_time = time.time()


def _update_ocr_target(section, selected_field, row=None, enabled=True):
    """Point the OCR target at ``section``/``selected_field`` or release it.

    When a real field is selected (and ``enabled``), the section becomes the
    active OCR target. Otherwise the target is cleared, but only if this
    section currently owns it, so one tab never clears another tab's choice.
    """
    if selected_field != NO_FIELD and enabled:
        st.session_state.ocr_active_section = section
        st.session_state.ocr_active_field = selected_field
        st.session_state.ocr_line_item_row = row
    elif st.session_state.ocr_active_section == section:
        st.session_state.ocr_active_section = None
        st.session_state.ocr_active_field = None
        st.session_state.ocr_line_item_row = None


# PAGE 1: Upload Page
if st.session_state.page == 'upload':
    st.title("📤 Remittance Data Viewer with OCR")
    st.markdown("### Upload your files to begin")

    # Step 1: Upload JSONL
    st.markdown("**Step 1: Upload JSONL File**")
    uploaded_file = st.file_uploader("Choose a JSONL file", type=['jsonl', 'json'])

    if uploaded_file is not None:
        try:
            data = load_jsonl(uploaded_file)
            st.session_state.data = data
            # Deep copy: edits must not leak into the original records, which
            # back the "Download Unmodified" export.
            st.session_state.edited_data = copy.deepcopy(data)
            st.success(f"✅ Successfully loaded {len(data)} records!")
        except Exception as e:
            st.error(f"Error loading file: {str(e)}")

    # Step 2: Upload Images
    st.markdown("**Step 2: Upload Images Folder**")
    uploaded_images = st.file_uploader(
        "Choose image files",
        type=['png', 'jpg', 'jpeg', 'tiff', 'tif', 'bmp'],
        accept_multiple_files=True,
        help="Select all images from your folder at once"
    )

    if uploaded_images:
        # Load images into session state
        images_dict = {}
        for img_file in uploaded_images:
            try:
                image = Image.open(img_file)
                # Image.open is lazy; decode now so the stored image does not
                # depend on the upload buffer staying open on later reruns.
                image.load()
                images_dict[img_file.name] = image
            except Exception as e:
                st.warning(f"Could not load image {img_file.name}: {str(e)}")

        st.session_state.images = images_dict

        # Show summary of loaded images and matches with ground truth
        if st.session_state.data is not None:
            gt_file_names = [rec.get('file_name', '') for rec in st.session_state.data]

            # CASE SENSITIVE EXACT MATCH ONLY
            matched_images = {fname for fname in gt_file_names if fname and fname in images_dict}
            unmatched_gt_files = [
                fname for fname in gt_file_names if fname and fname not in matched_images
            ]

            st.success(f"✅ Successfully loaded {len(images_dict)} images!")
            st.info(f"🔎 Exact matches: {len(matched_images)}/{len([f for f in gt_file_names if f])}")

            # Show unmatched files
            if unmatched_gt_files:
                st.warning(f"⚠️ {len(unmatched_gt_files)} file(s) from JSONL not matched to images:")
                with st.expander(f"Show {len(unmatched_gt_files)} unmatched file names"):
                    for fname in unmatched_gt_files:
                        st.text(f" • {fname}")
            else:
                st.success("✅ All JSONL file names matched to images!")
        else:
            st.success(f"✅ Successfully loaded {len(images_dict)} images!")
            st.info("ℹ️ Upload a JSONL file to see how many images match the ground truth 'file_name' field.")

    # Continue Button
    if st.session_state.data is not None:
        col1, col2, col3 = st.columns([1, 1, 1])
        with col2:
            if st.button("Continue to Viewer →", type="primary", use_container_width=True):
                st.session_state.page = 'viewer'
                st.session_state.modified_indices = set()
                st.rerun()

# PAGE 2: Viewer Page
elif st.session_state.page == 'viewer':
    # Clear old save messages (after 3 seconds)
    if st.session_state.save_message_time is not None:
        if time.time() - st.session_state.save_message_time > 3:
            st.session_state.save_message = None
            st.session_state.save_message_time = None

    # Header with back button and download options
    col1, col2, col3, col4 = st.columns([1, 2, 2, 2])

    with col1:
        if st.button("← Back to Upload"):
            st.session_state.page = 'upload'
            st.session_state.ocr_active_section = None
            st.session_state.ocr_active_field = None
            st.session_state.save_message = None
            st.session_state.save_message_time = None
            st.rerun()

    # Download modified records and unmodified records separately
    with col2:
        if st.session_state.modified_indices:
            modified_data = [
                st.session_state.edited_data[i]
                for i in sorted(st.session_state.modified_indices)
            ]
            jsonl_modified = save_to_jsonl(modified_data)
            st.download_button(
                label=f"⬇️ Download Modified ({len(modified_data)})",
                data=jsonl_modified,
                file_name="modified_remittance_data.jsonl",
                mime="application/jsonl",
                type="primary",
                use_container_width=True
            )
        else:
            st.button(
                "⬇️ No Modified Records",
                disabled=True,
                use_container_width=True
            )

    # Download unmodified records (original data excluding modified)
    with col3:
        if st.session_state.modified_indices:
            unmodified_data = [
                record
                for i, record in enumerate(st.session_state.data)
                if i not in st.session_state.modified_indices
            ]
            jsonl_unmodified = save_to_jsonl(unmodified_data)
            st.download_button(
                label=f"⬇️ Download Unmodified ({len(unmodified_data)})",
                data=jsonl_unmodified,
                file_name="unmodified_remittance_data.jsonl",
                mime="application/jsonl",
                use_container_width=True
            )
        else:
            st.button(
                "⬇️ No Unmodified Records",
                disabled=True,
                use_container_width=True
            )

    # Download all edited data
    with col4:
        jsonl_all = save_to_jsonl(st.session_state.edited_data)
        st.download_button(
            label=f"⬇️ Download All ({len(st.session_state.edited_data)})",
            data=jsonl_all,
            file_name="all_remittance_data.jsonl",
            mime="application/jsonl",
            use_container_width=True
        )

    # File selector dropdown
    file_names = [
        record.get('file_name', f'Record {i}')
        for i, record in enumerate(st.session_state.data)
    ]

    # An empty JSONL leaves nothing to select; stop before indexing records.
    if not file_names:
        st.warning("The loaded JSONL file contains no records. Go back and upload another file.")
        st.stop()

    # Clamp in case a previously selected index no longer exists (e.g. a
    # shorter file was loaded after navigating back to the upload page).
    start_index = min(max(st.session_state.current_index, 0), len(file_names) - 1)

    selected_file = st.selectbox(
        "Select a file to view:",
        options=range(len(file_names)),
        format_func=lambda x: f"{'✏️ ' if x in st.session_state.modified_indices else ''}{file_names[x]}",
        index=start_index
    )

    st.session_state.current_index = selected_file
    current_record = st.session_state.edited_data[selected_file]

    # Main layout: LHS (Image) and RHS (Details) - REDUCED GAP
    left_col, right_col = st.columns([1.3, 1], gap="small")

    # LEFT SIDE: Image Display with OCR Canvas
    with left_col:
        st.markdown("### 🖼️ Document Image")

        file_name = current_record.get('file_name', '')

        if file_name:
            st.caption(f"**File:** {file_name}")

            # Try to find matching image - CASE SENSITIVE EXACT MATCH ONLY
            current_image = None
            if file_name in st.session_state.images:
                current_image = st.session_state.images[file_name]
            else:
                st.error(f"❌ Image '{file_name}' not found in uploaded images")
                st.info("💡 Available images:")
                with st.expander("Show available images"):
                    for img_name in list(st.session_state.images.keys())[:20]:
                        st.text(f" • {img_name}")
                    if len(st.session_state.images) > 20:
                        st.text(f" ... and {len(st.session_state.images) - 20} more")

            if current_image is not None:
                # Scale image to fixed size
                scaled_image, scale_ratio, paste_x, paste_y = scale_image_to_fixed_size(current_image)

                # Always show canvas for drawing rectangles
                canvas_result = st_canvas(
                    fill_color="rgba(255, 165, 0, 0.3)",
                    stroke_width=2,
                    stroke_color="#FF0000",
                    background_image=scaled_image,
                    update_streamlit=True,
                    height=scaled_image.height,
                    width=scaled_image.width,
                    drawing_mode="rect",
                    key=f"canvas_{selected_file}_{st.session_state.canvas_key}",
                )

                # Process OCR when rectangle is drawn and field is selected
                if canvas_result.json_data is not None and st.session_state.ocr_active_field:
                    objects = canvas_result.json_data["objects"]
                    if len(objects) > 0:
                        # Get the last drawn rectangle
                        rect = objects[-1]

                        # Undo the centering offset and the scaling to get
                        # coordinates in the original image.
                        bbox = [
                            (rect["left"] - paste_x) / scale_ratio,
                            (rect["top"] - paste_y) / scale_ratio,
                            (rect["left"] + rect["width"] - paste_x) / scale_ratio,
                            (rect["top"] + rect["height"] - paste_y) / scale_ratio
                        ]

                        # Perform OCR on original image
                        with st.spinner("Performing OCR..."):
                            ocr_text = perform_ocr(current_image, bbox)

                        if ocr_text.startswith("OCR Error"):
                            st.error(ocr_text)
                        elif not ocr_text:
                            # Avoid rendering an empty error box.
                            st.warning("No text detected in the selected region.")
                        else:
                            st.success(f"✅ OCR Result: {ocr_text}")

                            # Update the field value
                            gt_parse = st.session_state.edited_data[selected_file].get('gt_parse', {})

                            if st.session_state.ocr_active_section == 'Line_items':
                                # Handle line items
                                line_items = gt_parse.get('Line_items', [])
                                row_idx = st.session_state.ocr_line_item_row
                                if row_idx is not None and row_idx < len(line_items):
                                    line_items[row_idx][st.session_state.ocr_active_field] = ocr_text
                                    gt_parse['Line_items'] = line_items
                            else:
                                # Handle other sections
                                section = st.session_state.ocr_active_section
                                field = st.session_state.ocr_active_field
                                if section not in gt_parse:
                                    gt_parse[section] = {}
                                gt_parse[section][field] = ocr_text

                            st.session_state.edited_data[selected_file]['gt_parse'] = gt_parse

                            # A new canvas key clears the drawn rectangle.
                            st.session_state.canvas_key += 1
                            time.sleep(0.3)
                            st.rerun()
        else:
            st.warning("No file name specified in record")

    # RIGHT SIDE: Editable Details
    with right_col:
        st.markdown("### 📝 Document Details")

        gt_parse = current_record.get('gt_parse', {})

        # Create tabs for each section
        tab1, tab2, tab3, tab4 = st.tabs([
            "📄 Remittance Details",
            "👥 Party Details",
            "🏦 Bank Details",
            "📋 Line Items"
        ])

        # TAB 1: Remittance Details
        with tab1:
            # (field, label, widget-key prefix) in display order.
            remittance_inputs = [
                ('Remittance_adv_no', "Remittance Advice No", "rem_adv_no"),
                ('Remittance_adv_date', "Remittance Advice Date", "rem_adv_date"),
                ('Payment_method', "Payment Method", "payment_method"),
                ('FCY', "FCY (Foreign Currency)", "fcy"),
                ('Total_payment_amt_FCY', "Total Payment Amount (FCY)", "total_payment"),
                ('Payment_date', "Payment Date", "payment_date"),
                ('Payment_ref_no', "Payment Reference No", "payment_ref"),
            ]

            # OCR Field Selector
            remittance_fields = [NO_FIELD] + [name for name, _, _ in remittance_inputs]
            selected_rem_field = st.selectbox(
                "🔍 Select field to populate via OCR:",
                options=remittance_fields,
                key=f"rem_ocr_select_{selected_file}"
            )
            _update_ocr_target('Remittance_details', selected_rem_field)

            remittance = gt_parse.get('Remittance_details', {})
            for field_name, label, key_prefix in remittance_inputs:
                remittance[field_name] = st.text_input(
                    label,
                    value=remittance.get(field_name, ''),
                    key=f"{key_prefix}_{selected_file}"
                )
            gt_parse['Remittance_details'] = remittance

        # TAB 2: Customer/Supplier Details
        with tab2:
            # OCR Field Selector
            customer_fields = [
                NO_FIELD,
                'Customer_name',
                'Customer_address',
                'Customer_contact_info',
                'Supplier_name',
                'Supplier_address',
                'Supplier_contact_info'
            ]
            selected_cust_field = st.selectbox(
                "🔍 Select field to populate via OCR:",
                options=customer_fields,
                key=f"cust_ocr_select_{selected_file}"
            )
            _update_ocr_target('Customer_supplier_details', selected_cust_field)

            st.markdown("**Customer Details**")
            customer_supplier = gt_parse.get('Customer_supplier_details', {})

            customer_supplier['Customer_name'] = st.text_input(
                "Customer Name",
                value=customer_supplier.get('Customer_name', ''),
                key=f"cust_name_{selected_file}"
            )
            customer_supplier['Customer_address'] = st.text_area(
                "Customer Address",
                value=customer_supplier.get('Customer_address', ''),
                key=f"cust_addr_{selected_file}",
                height=60
            )
            customer_supplier['Customer_contact_info'] = st.text_input(
                "Customer Contact Info",
                value=customer_supplier.get('Customer_contact_info', ''),
                key=f"cust_contact_{selected_file}"
            )

            st.markdown("**Supplier Details**")

            customer_supplier['Supplier_name'] = st.text_input(
                "Supplier Name",
                value=customer_supplier.get('Supplier_name', ''),
                key=f"supp_name_{selected_file}"
            )
            customer_supplier['Supplier_address'] = st.text_area(
                "Supplier Address",
                value=customer_supplier.get('Supplier_address', ''),
                key=f"supp_addr_{selected_file}",
                height=60
            )
            customer_supplier['Supplier_contact_info'] = st.text_input(
                "Supplier Contact Info",
                value=customer_supplier.get('Supplier_contact_info', ''),
                key=f"supp_contact_{selected_file}"
            )

            gt_parse['Customer_supplier_details'] = customer_supplier

        # TAB 3: Bank Details
        with tab3:
            # (field, label, widget-key prefix) in display order.
            bank_inputs = [
                ('Bank_name', "Bank Name", "bank_name"),
                ('Bank_acc_no', "Bank Account No", "bank_acc"),
                ('Bank_routing_no', "Bank Routing No", "bank_routing"),
                ('Swift_code', "SWIFT Code", "swift"),
            ]

            # OCR Field Selector
            bank_fields = [NO_FIELD] + [name for name, _, _ in bank_inputs]
            selected_bank_field = st.selectbox(
                "🔍 Select field to populate via OCR:",
                options=bank_fields,
                key=f"bank_ocr_select_{selected_file}"
            )
            _update_ocr_target('Bank_details', selected_bank_field)

            bank = gt_parse.get('Bank_details', {})
            for field_name, label, key_prefix in bank_inputs:
                bank[field_name] = st.text_input(
                    label,
                    value=bank.get(field_name, ''),
                    key=f"{key_prefix}_{selected_file}"
                )
            gt_parse['Bank_details'] = bank

        # TAB 4: Line Items
        with tab4:
            # OCR Controls for Line Items - Fixed layout
            line_items = gt_parse.get('Line_items', [])

            # Adjusted column widths - all controls in single compact line
            col_field, col_row, col_add, col_remove = st.columns([1.5, 0.7, 0.30, 0.30])

            line_item_fields = [
                NO_FIELD,
                'Po_number',
                'Invoice_no',
                'Other_doc_ref_no',
                'Invoice_date',
                'Invoice_amount_FCY',
                'Amount_paid_for_each_invoice',
                'Outstanding_balance_FCY',
                'Discounts_taken_FCY',
                'Adjustments(without_holding_tax)_FCY',
                'Descriptions'
            ]

            with col_field:
                selected_line_field = st.selectbox(
                    "🔍 Field:",
                    options=line_item_fields,
                    key=f"line_ocr_field_{selected_file}"
                )

            with col_row:
                if len(line_items) > 0:
                    selected_row = st.selectbox(
                        "Row:",
                        options=list(range(len(line_items))),
                        format_func=lambda x: f"Row {x + 1}",
                        key=f"line_ocr_row_{selected_file}"
                    )
                else:
                    st.selectbox("Row:", options=[], disabled=True, key=f"line_ocr_row_empty_{selected_file}")
                    selected_row = None

            with col_add:
                # The button_clicked flag guards against handling the same
                # click twice.
                if st.button("➕", key=f"add_row_{selected_file}", help="Add new row"):
                    if not st.session_state.button_clicked:
                        st.session_state.button_clicked = True
                        # Blank row with every line-item field present.
                        new_row = {name: "" for name in line_item_fields[1:]}
                        line_items.append(new_row)
                        gt_parse['Line_items'] = line_items
                        st.session_state.edited_data[selected_file]['gt_parse'] = gt_parse
                        st.session_state.modified_indices.add(selected_file)
                        st.rerun()

            with col_remove:
                if st.button("➖", key=f"remove_row_{selected_file}", help="Remove selected row",
                             disabled=(len(line_items) == 0)):
                    if not st.session_state.button_clicked and len(line_items) > 0 and selected_row is not None:
                        st.session_state.button_clicked = True
                        line_items.pop(selected_row)
                        gt_parse['Line_items'] = line_items
                        st.session_state.edited_data[selected_file]['gt_parse'] = gt_parse
                        st.session_state.modified_indices.add(selected_file)
                        st.rerun()

            # Reset button clicked flag after processing
            if st.session_state.button_clicked:
                st.session_state.button_clicked = False

            # Set OCR state for line items (needs a concrete row to target)
            _update_ocr_target(
                'Line_items',
                selected_line_field,
                row=selected_row,
                enabled=selected_row is not None,
            )

            # Display line items table
            if line_items:
                df = pd.DataFrame(line_items)

                # Convert amount fields to numeric
                amount_fields = ['Invoice_amount_FCY', 'Amount_paid_for_each_invoice',
                                 'Outstanding_balance_FCY', 'Discounts_taken_FCY',
                                 'Adjustments(without_holding_tax)_FCY']

                for field in amount_fields:
                    if field in df.columns:
                        # errors='coerce' already maps '' to NaN. Do not call
                        # Series.replace('', None) first: on older pandas that
                        # forward-fills, copying the previous row's amount.
                        # NOTE(review): values such as "1,234.56" also coerce to
                        # NaN and are written back as '' below — confirm intent.
                        df[field] = pd.to_numeric(df[field], errors='coerce')

                column_config = {
                    "Po_number": st.column_config.TextColumn("PO Number", width="small"),
                    "Invoice_no": st.column_config.TextColumn("Invoice No", width="small"),
                    "Other_doc_ref_no": st.column_config.TextColumn("Other Doc Ref No", width="small"),
                    "Invoice_date": st.column_config.TextColumn("Invoice Date", width="small"),
                    "Invoice_amount_FCY": st.column_config.NumberColumn("Invoice Amt FCY", width="small", format="%.2f"),
                    "Amount_paid_for_each_invoice": st.column_config.NumberColumn("Amount Paid", width="small", format="%.2f"),
                    "Outstanding_balance_FCY": st.column_config.NumberColumn("Outstanding FCY", width="small", format="%.2f"),
                    "Discounts_taken_FCY": st.column_config.NumberColumn("Discounts FCY", width="small", format="%.2f"),
                    "Adjustments(without_holding_tax)_FCY": st.column_config.NumberColumn("Adjustments FCY", width="small", format="%.2f"),
                    "Descriptions": st.column_config.TextColumn("Descriptions", width="medium"),
                }

                edited_df = st.data_editor(
                    df,
                    column_config=column_config,
                    num_rows="fixed",
                    use_container_width=True,
                    key=f"line_items_table_{selected_file}",
                    hide_index=False
                )

                # Convert back to string
                for field in amount_fields:
                    if field in edited_df.columns:
                        edited_df[field] = edited_df[field].apply(lambda x: str(x) if pd.notna(x) else '')

                gt_parse['Line_items'] = edited_df.to_dict('records')
            else:
                st.info("No line items. Click ➕ to add a new row.")

        # Update the edited data
        st.session_state.edited_data[selected_file]['gt_parse'] = gt_parse

        # Save button
        st.markdown("---")
        col1, col2 = st.columns([1, 1])
        with col1:
            if st.button("💾 Save Changes", type="primary", use_container_width=True,
                         key=f"save_btn_{selected_file}"):
                if not st.session_state.just_saved:
                    st.session_state.just_saved = True
                    auto_save(selected_file)
                    st.session_state.save_message = "✅ Changes saved successfully!"
                    st.session_state.save_message_time = time.time()
                    st.rerun()

        # Reset the just_saved flag after rerun
        if st.session_state.just_saved:
            st.session_state.just_saved = False

        # Display save message under the button (appears after rerun)
        if st.session_state.save_message:
            st.success(st.session_state.save_message)