Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import cv2 | |
| import pytesseract | |
| from PIL import Image | |
| import io | |
| import base64 | |
| from datetime import datetime | |
| import pytz | |
| from simple_salesforce import Salesforce | |
| import logging | |
| import numpy as np | |
| import os | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| # Configure Tesseract path for Hugging Face | |
| try: | |
| pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract' | |
| pytesseract.get_tesseract_version() # Test Tesseract availability | |
| logging.info("Tesseract is available") | |
| except Exception as e: | |
| logging.error(f"Tesseract not found or misconfigured: {str(e)}") | |
| # Salesforce configuration (use environment variables in production) | |
| SF_USERNAME = os.getenv("SF_USERNAME", "your_salesforce_username") | |
| SF_PASSWORD = os.getenv("SF_PASSWORD", "your_salesforce_password") | |
| SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN", "your_salesforce_security_token") | |
| SF_DOMAIN = os.getenv("SF_DOMAIN", "login") # or "test" for sandbox | |
| def connect_to_salesforce(): | |
| """Connect to Salesforce with error handling.""" | |
| try: | |
| sf = Salesforce(username=SF_USERNAME, password=SF_PASSWORD, security_token=SF_SECURITY_TOKEN, domain=SF_DOMAIN) | |
| logging.info("Connected to Salesforce successfully") | |
| return sf | |
| except Exception as e: | |
| logging.error(f"Salesforce connection failed: {str(e)}") | |
| return None | |
| def resize_image(img, max_size_mb=5): | |
| """Resize image to ensure size < 5MB while preserving quality.""" | |
| try: | |
| img_bytes = io.BytesIO() | |
| img.save(img_bytes, format="PNG") | |
| size_mb = len(img_bytes.getvalue()) / (1024 * 1024) | |
| if size_mb <= max_size_mb: | |
| return img, img_bytes.getvalue() | |
| scale = 0.9 | |
| while size_mb > max_size_mb: | |
| w, h = img.size | |
| img = img.resize((int(w * scale), int(h * scale)), Image.Resampling.LANCZOS) | |
| img_bytes = io.BytesIO() | |
| img.save(img_bytes, format="PNG") | |
| size_mb = len(img_bytes.getvalue()) / (1024 * 1024) | |
| scale *= 0.9 | |
| logging.info(f"Resized image to {size_mb:.2f} MB") | |
| return img, img_bytes.getvalue() | |
| except Exception as e: | |
| logging.error(f"Image resizing failed: {str(e)}") | |
| return img, None | |
| def preprocess_image(img_cv): | |
| """Preprocess image for OCR: enhance contrast, reduce noise, and apply adaptive thresholding.""" | |
| try: | |
| # Convert to grayscale | |
| gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) | |
| # Enhance contrast with CLAHE (Contrast Limited Adaptive Histogram Equalization) | |
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) | |
| contrast = clahe.apply(gray) | |
| # Reduce noise with Gaussian blur | |
| blurred = cv2.GaussianBlur(contrast, (5, 5), 0) | |
| # Apply adaptive thresholding for better binary image representation | |
| thresh = cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2) | |
| # Sharpen the image to bring out more details in the numbers | |
| kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]]) | |
| sharpened = cv2.filter2D(thresh, -1, kernel) | |
| return sharpened | |
| except Exception as e: | |
| logging.error(f"Image preprocessing failed: {str(e)}") | |
| return gray | |
| def detect_roi(img_cv): | |
| """Detect the region of interest (ROI) containing the weight display.""" | |
| try: | |
| # Convert to grayscale for edge detection | |
| gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) | |
| # Apply edge detection | |
| edges = cv2.Canny(gray, 50, 150) | |
| # Find contours | |
| contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| if not contours: | |
| logging.warning("No contours detected for ROI") | |
| return img_cv # Return full image if no contours found | |
| # Find the largest contour (assuming it’s the display) | |
| largest_contour = max(contours, key=cv2.contourArea) | |
| x, y, w, h = cv2.boundingRect(largest_contour) | |
| # Add padding to the detected region to ensure weight is fully captured | |
| padding = 10 | |
| x = max(0, x - padding) | |
| y = max(0, y - padding) | |
| w = min(img_cv.shape[1] - x, w + 2 * padding) | |
| h = min(img_cv.shape[0] - y, h + 2 * padding) | |
| roi = img_cv[y:y+h, x:x+w] | |
| logging.info(f"ROI detected at ({x}, {y}, {w}, {h})") | |
| return roi | |
| except Exception as e: | |
| logging.error(f"ROI detection failed: {str(e)}") | |
| return img_cv | |
| def extract_weight(img): | |
| """Extract weight from image using Tesseract OCR with multiple PSM modes.""" | |
| try: | |
| if img is None: | |
| logging.error("No image provided for OCR") | |
| return "Not detected", 0.0 | |
| # Convert PIL image to OpenCV format | |
| img_cv = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) | |
| # Detect ROI | |
| roi_img = detect_roi(img_cv) | |
| # Preprocess the ROI | |
| processed_img = preprocess_image(roi_img) | |
| # Try multiple PSM modes for better detection | |
| psm_modes = [ | |
| ('--psm 7 digits', 'Single line, digits only'), | |
| ('--psm 6 digits', 'Single block, digits only'), | |
| ('--psm 10 digits', 'Single character, digits only'), | |
| ('--psm 8 digits', 'Single word, digits only') | |
| ] | |
| for config, desc in psm_modes: | |
| text = pytesseract.image_to_string(processed_img, config=config) | |
| logging.info(f"OCR attempt with {desc}: Raw text = '{text}'") | |
| weight = ''.join(filter(lambda x: x in '0123456789.', text.strip())) | |
| try: | |
| weight_float = float(weight) | |
| if weight_float >= 0: # Allow zero weights | |
| confidence = 95.0 # Simplified confidence for valid numbers | |
| logging.info(f"Weight detected: {weight} (Confidence: {confidence:.2f}%)") | |
| return weight, confidence | |
| except ValueError: | |
| logging.warning(f"Invalid number format: {weight}") | |
| continue | |
| logging.error("All OCR attempts failed to detect a valid weight") | |
| return "Not detected", 0.0 | |
| except Exception as e: | |
| logging.error(f"OCR processing failed: {str(e)}") | |
| return "Not detected", 0.0 | |
| def process_image(img): | |
| """Process uploaded or captured image and extract weight.""" | |
| if img is None: | |
| logging.error("No image provided") | |
| return "No image uploaded", None, None, None, gr.update(visible=False), gr.update(visible=False) | |
| ist_time = datetime.now(pytz.timezone("Asia/Kolkata")).strftime("%d-%m-%Y %I:%M:%S %p") | |
| img, img_bytes = resize_image(img) | |
| if img_bytes is None: | |
| logging.error("Image resizing failed") | |
| return "Image processing failed", ist_time, img, None, gr.update(visible=False), gr.update(visible=False) | |
| weight, confidence = extract_weight(img) | |
| if weight == "Not detected" or confidence < 95.0: | |
| logging.warning(f"Weight detection failed: {weight} (Confidence: {confidence:.2f}%)") | |
| return f"{weight} (Confidence: {confidence:.2f}%)", ist_time, img, None, gr.update(visible=True), gr.update(visible=False) | |
| img_buffer = io.BytesIO(img_bytes) | |
| img_base64 = base64.b64encode(img_buffer.getvalue()).decode() | |
| logging.info(f"Weight detected successfully: {weight} kg") | |
| return f"{weight} kg (Confidence: {confidence:.2f}%)", ist_time, img, img_base64, gr.update(visible=True), gr.update(visible=True) | |
| def save_to_salesforce(weight_text, img_base64): | |
| """Save weight and image to Salesforce Weight_Log__c object.""" | |
| try: | |
| sf = connect_to_salesforce() | |
| if sf is None: | |
| logging.error("Salesforce connection failed") | |
| return "Failed to connect to Salesforce" | |
| weight = float(weight_text.split(" ")[0]) | |
| ist_time = datetime.now(pytz.timezone("Asia/Kolkata")).strftime("%Y-%m-%d %H:%M:%S") | |
| record = { | |
| "Name": f"Weight_Log_{ist_time}", | |
| "Captured_Weight__c": weight, | |
| "Captured_At__c": ist_time, | |
| "Snapshot_Image__c": img_base64, | |
| "Status__c": "Confirmed" | |
| } | |
| result = sf.Weight_Log__c.create(record) | |
| logging.info(f"Salesforce record created: {result}") | |
| return "Successfully saved to Salesforce" | |
| except Exception as e: | |
| logging.error(f"Salesforce save failed: {str(e)}") | |
| return f"Failed to save to Salesforce: {str(e)}" | |
| # Gradio Interface | |
| with gr.Blocks(title="⚖️ Auto Weight Logger") as demo: | |
| gr.Markdown("## ⚖️ Auto Weight Logger") | |
| gr.Markdown("📷 Upload or capture an image of a digital weight scale (max 5MB).") | |
| with gr.Row(): | |
| image_input = gr.Image(type="pil", label="Upload / Capture Image", sources=["upload", "webcam"]) | |
| output_weight = gr.Textbox(label="⚖️ Detected Weight (in kg)") | |
| with gr.Row(): | |
| timestamp = gr.Textbox(label="🕒 Captured At (IST)") | |
| snapshot = gr.Image(label="📸 Snapshot Image") | |
| with gr.Row(): | |
| confirm_button = gr.Button("✅ Confirm and Save to Salesforce", visible=False) | |
| status = gr.Textbox(label="Save Status", visible=False) | |
| submit = gr.Button("🔍 Detect Weight") | |
| submit.click( | |
| fn=process_image, | |
| inputs=image_input, | |
| outputs=[output_weight, timestamp, snapshot, gr.State(), confirm_button, status] | |
| ) | |
| confirm_button.click( | |
| fn=save_to_salesforce, | |
| inputs=[output_weight, gr.State()], | |
| outputs=status | |
| ) | |
| gr.Markdown(""" | |
| ### Instructions | |
| - Upload a clear, well-lit image of a digital weight scale display (7-segment font preferred). | |
| - Ensure the image is < 5MB (automatically resized if larger). | |
| - Review the detected weight and click 'Confirm and Save to Salesforce' to log the data. | |
| - Works on desktop and mobile browsers. | |
| - If weight detection fails, check the image for glare, low contrast, or non-numeric characters and try again. | |
| """) | |
| if __name__ == "__main__": | |
| demo.launch() | |