import logging
import os
import re
from io import BytesIO

import gradio as gr
import kaleido
import matplotlib.pyplot as plt
import pandas as pd
import requests
from fuzzywuzzy import process
from paddleocr import PaddleOCR
from PIL import Image
from simple_salesforce import Salesforce
|
|
| |
# Maps the human-readable label searched for in the OCR text (and shown in the
# editable table) to the Salesforce API field name that receives its value.
# Insertion order is the order in which labels are searched and displayed.
ATTRIBUTE_MAPPING = {
    "Product name": "Productname__c",
    "Colour": "Colour__c",
    "Motortype": "Motortype__c",
    "Frequency": "Frequency__c",
    "Grossweight": "Grossweight__c",
    "Ratio": "Ratio__c",
    "MotorFrame": "Motorframe__c",
    "Model": "Model__c",
    "Speed": "Speed__c",
    "Quantity": "Quantity__c",
    "Voltage": "Voltage__c",
    "Material": "Material__c",
    "Type": "Type__c",
    "Horsepower": "Horsepower__c",
    "Consignee": "Consignee__c",
    "LOT": "LOT__c",
    "Stage": "Stage__c",
    "Outlet": "Outlet__c",
    "Serialnumber": "Serialnumber__c",
    "HeadSize": "Headsize__c",
    "Deliverysize": "Deliverysize__c",
    "Phase": "Phase__c",
    "Size": "Size__c",
    "MRP": "MRP__c",
    "Usebefore": "Usebefore__c",
    "Height": "Height__c",
    "MaximumDischarge Flow": "Maximumdischargeflow__c",
    "DischargeRange": "Dischargeflow__c",
    # NOTE(review): shares Manufacturer__c with "Manufacturer" below; when both
    # labels are present the later one wins -- confirm this is intended.
    "Assembledby": "Manufacturer__c",
    "Manufacturedate": "Manufacturedate__c",
    "Companyname": "Companyname__c",
    "Customercarenumber": "Customercarenumber__c",
    "SellerAddress": "Selleraddress__c",
    "Selleremail": "Selleremail__c",
    "GSTIN": "GSTIN__c",
    "Totalamount": "Totalamount__c",
    "Paymentstatus": "Paymentstatus__c",
    # NOTE(review): same target field as "Paymentstatus" -- looks like a
    # copy/paste slip; verify against the Salesforce schema.
    "Paymentmethod": "Paymentstatus__c",
    # NOTE(review): same target field as "Manufacturedate" -- verify.
    "Invoicedate": "Manufacturedate__c",
    "Warranty": "Warranty__c",
    "Brand": "Brand__c",
    "Motorhorsepower": "Motorhorsepower__c",
    "Power": "Power__c",
    "Motorphase": "Motorphase__c",
    "Enginetype": "Enginetype__c",
    "Tankcapacity": "Tankcapacity__c",
    "Head": "Head__c",
    "Usage/Application": "Usage_Application__c",
    "Volts": "volts__c",
    "Hertz": "Hertz__c",
    "Frame": "frame__c",
    "Mounting": "Mounting__c",
    "Tollfreenumber": "Tollfreenumber__c",
    "Pipesize": "Pipesize__c",
    "Manufacturer": "Manufacturer__c",
    "Office": "Office__c",
    "SRnumber": "SRnumber__c",
    "TypeOfEndUse": "TypeOfEndUse__c",
    "Model Name": "Model_Name_Number__c",
    "coolingmethod": "coolingmethod__c",
    "H.P.": "H_p__c",
}
|
|
| |
# Known product names that OCR lines are fuzzy-matched against.
PRODUCT_NAMES = [
    "Fusion",
    "Agroking",
    "CG commercial motors",
    "Jaguar",
    "Gaurav",
]
|
|
| |
# Salesforce credentials come from the environment so that secrets are not
# kept in source control. A missing variable yields an empty string, which
# makes the Salesforce login fail with an authentication error instead of
# breaking the import.
# NOTE(review): the credentials that used to be committed in this file should
# be treated as exposed and rotated.
SALESFORCE_USERNAME = os.environ.get("SALESFORCE_USERNAME", "")
SALESFORCE_PASSWORD = os.environ.get("SALESFORCE_PASSWORD", "")
SALESFORCE_SECURITY_TOKEN = os.environ.get("SALESFORCE_SECURITY_TOKEN", "")
# NOTE(review): not referenced by any code visible in this file; presumably
# meant to be passed as ``domain=`` to ``Salesforce(...)`` -- confirm.
domain = "login"
|
|
| |
# Shared OCR engine, created once at import time and used by extract_text().
ocr = PaddleOCR(lang="en", use_angle_cls=True)
|
|
| |
def extract_text(image):
    """Run OCR on ``image`` and return the recognised text, one detection per line.

    Args:
        image: Image to read, in a form accepted by ``ocr.ocr`` (the UI passes
            a NumPy array). ``None`` is treated as "no image".

    Returns:
        The recognised strings joined with newlines, or ``""`` when there is
        no image or nothing was recognised.
    """
    if image is None:
        return ""

    result = ocr.ocr(image)
    # An image without any detected text comes back as ``[None]`` (or an empty
    # list), so guard before iterating over the first page.
    if not result or not result[0]:
        return ""

    # Each detection is indexed as [1][0] to pull out its text.
    return "\n".join(detection[1][0] for detection in result[0])
|
|
| |
def match_product_name(extracted_text):
    """Return the known product name that best matches any line of the text.

    Every line of ``extracted_text`` is fuzzy-matched against
    ``PRODUCT_NAMES``. The highest-scoring name is returned when its score is
    at least 70; otherwise ``None`` is returned.
    """
    candidates = (
        process.extractOne(text_line, PRODUCT_NAMES)
        for text_line in extracted_text.split("\n")
    )
    # max() keeps the first of equally scored candidates, which is what a
    # strictly-greater scan over the lines produces as well.
    winner, top_score = max(candidates, key=lambda pair: pair[1])
    return winner if top_score >= 70 else None
|
|
| |
def _attribute_pattern(label):
    """Compile the regex that locates ``label`` and captures the text after it."""
    lowered = label.lower()
    # Tails of longer known labels that begin with this one (for example
    # "Size" for "Head", because of "HeadSize"). Matching in front of such a
    # tail would capture the longer label's text as this label's value.
    longer_tails = [
        other[len(label):]
        for other in ATTRIBUTE_MAPPING
        if len(other) > len(label) and other.lower().startswith(lowered)
    ]
    not_longer_label = "".join(f"(?!{re.escape(tail)})" for tail in longer_tails)
    # The lookbehind stops a label from matching inside a longer word, such as
    # "Type" inside "Motortype" or "Power" inside "Horsepower".
    return re.compile(
        rf"(?<![A-Za-z0-9]){re.escape(label)}{not_longer_label}[:\-]?\s*(.+)",
        re.IGNORECASE,
    )


def extract_attributes(extracted_text):
    """Pull ``label: value`` pairs for the known labels out of OCR text.

    For every label in ``ATTRIBUTE_MAPPING`` the text is searched
    case-insensitively for the label, optionally followed by ``:`` or ``-``
    and whitespace (which may include a line break). The rest of that line is
    taken as the value.

    Args:
        extracted_text: OCR output, one detection per line.

    Returns:
        Dict mapping each label that was found to its stripped value, in
        ``ATTRIBUTE_MAPPING`` order. Labels that do not occur are omitted.
    """
    attributes = {}
    for readable_attr in ATTRIBUTE_MAPPING:
        match = _attribute_pattern(readable_attr).search(extracted_text)
        if match:
            attributes[readable_attr] = match.group(1).strip()
    return attributes
|
|
| |
def filter_valid_attributes(attributes, valid_fields):
    """Translate readable labels to Salesforce field names, keeping valid ones.

    Args:
        attributes: Dict of readable label -> value.
        valid_fields: Collection of field names that exist on the target
            Salesforce object.

    Returns:
        Dict of Salesforce field name -> value for every label that has an
        entry in ``ATTRIBUTE_MAPPING`` whose field is in ``valid_fields``.
        Labels without a mapping are skipped rather than raising ``KeyError``.
    """
    filtered = {}
    for label, value in attributes.items():
        sf_field = ATTRIBUTE_MAPPING.get(label)
        # Rows added or renamed by the user in the editable table have no
        # mapping; they cannot be sent to Salesforce, so drop them.
        if sf_field is not None and sf_field in valid_fields:
            filtered[sf_field] = value
    return filtered
|
|
| |
def _soql_literal(value):
    """Return ``value`` as a single-quoted, escaped SOQL string literal."""
    # Escape backslashes first so the backslashes added for quotes are not
    # themselves doubled.
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def interact_with_salesforce(mode, entry_type, quantity, attributes):
    """Create (Entry) or update (Exit) a Salesforce record from the attributes.

    Entry mode creates a record on the object selected by ``entry_type`` from
    the attributes that map to fields of that object, plus the quantity.
    Exit mode looks up one existing record by product name (and, when given,
    model, stage and H.P.) and writes ``quantity`` into its sold-stock fields.

    Args:
        mode: ``"Entry"`` or ``"Exit"``.
        entry_type: ``"Sales"`` or ``"Non-Sales"``.
        quantity: Value written to the quantity field(s).
        attributes: Dict of readable label -> value.

    Returns:
        A human-readable status message. Failures are reported in the message
        instead of being raised, because the result is shown directly in the UI.
    """
    # (mode, entry type) -> (Salesforce object, field(s) that receive quantity).
    targets = {
        ("Entry", "Sales"): ("VENKATA_RAMANA_MOTORS__c", ["Quantity__c"]),
        ("Entry", "Non-Sales"): ("UNBILLING_DATA__c", ["TotalQuantity__c"]),
        ("Exit", "Sales"): ("Inventory_Management__c", ["Quantity_Sold__c", "soldstock__c"]),
        ("Exit", "Non-Sales"): ("Un_Billable__c", ["Sold_Out__c", "soldstock__c"]),
    }
    target = targets.get((mode, entry_type))
    if target is None:
        return "Invalid mode or entry type."
    object_name, quantity_fields = target

    try:
        product_name = ""
        if mode == "Exit":
            raw_name = attributes.get("Product name")
            product_name = "" if raw_name is None else str(raw_name).strip()
            # Checked before connecting: an empty name would otherwise become
            # a lookup on Productname__c = '' and could hit an unrelated record.
            if not product_name:
                return "❌ A product name is required to record an exit."

        sf = Salesforce(
            username=SALESFORCE_USERNAME,
            password=SALESFORCE_PASSWORD,
            security_token=SALESFORCE_SECURITY_TOKEN,
        )
        sf_object = getattr(sf, object_name)

        if mode == "Exit":
            lookup = [
                ("Productname__c", product_name),
                # "Model Name" keeps its original precedence; "Model" (the row
                # the review table always shows) is used when it is absent.
                ("Model__c", attributes.get("Model Name") or attributes.get("Model")),
                ("Stage__c", attributes.get("Stage")),
                ("H_p__c", attributes.get("H.P.")),
            ]
            # Values come from OCR / user edits, so they are escaped before
            # being embedded in the query.
            where_clause = " AND ".join(
                f"{field} = {_soql_literal(value)}" for field, value in lookup if value
            )
            query = (
                f"SELECT Id, {', '.join(quantity_fields)} FROM {object_name} "
                f"WHERE {where_clause} LIMIT 1"
            )
            records = sf.query(query)["records"]
            if not records:
                return f"❌ No matching record found for product '{product_name}' in {object_name}."

            # NOTE(review): this overwrites the sold-stock fields with the
            # entered quantity rather than adding to them -- confirm intended.
            updated_fields = {field: quantity for field in quantity_fields}
            sf_object.update(records[0]["Id"], updated_fields)
            return (
                f"✅ Updated record for product '{product_name}' in {object_name}. "
                f"Updated fields: {updated_fields}."
            )

        # Entry: only send fields that actually exist on the target object.
        valid_fields = {field["name"] for field in sf_object.describe()["fields"]}
        new_record = filter_valid_attributes(attributes, valid_fields)
        new_record[quantity_fields[0]] = quantity
        sf_object.create(new_record)
        return f"✅ Data successfully exported to Salesforce object {object_name}."

    except Exception as e:
        # UI boundary: surface the problem to the user instead of crashing.
        return f"❌ Error interacting with Salesforce: {str(e)}"
|
|
|
|
|
|
| |
def export_to_salesforce(mode, entry_type, quantity, edited_df):
    """Send the user-reviewed attribute table to Salesforce.

    Args:
        mode: ``"Entry"`` or ``"Exit"``; forwarded unchanged.
        entry_type: ``"Sales"`` or ``"Non-Sales"``; forwarded unchanged.
        quantity: Quantity to record; forwarded unchanged.
        edited_df: Table with ``"Attribute"`` and ``"Value"`` columns.

    Returns:
        The status message produced by ``interact_with_salesforce``, or an
        error message if the table could not be read.
    """
    try:
        edited_attributes = {}
        for name, value in zip(edited_df["Attribute"], edited_df["Value"]):
            # Blank rows (e.g. ones the user added but left empty) carry no
            # attribute name and cannot be mapped to a field.
            if pd.isna(name):
                continue
            label = str(name).strip()
            if label:
                edited_attributes[label] = value

        return interact_with_salesforce(mode, entry_type, quantity, edited_attributes)
    except Exception as e:
        # UI boundary: report the failure in the result box.
        return f"❌ Error exporting to Salesforce: {str(e)}"
|
|
| |
|
|
| |
def process_image(image, mode, entry_type, quantity):
    """OCR an uploaded image and prepare its attributes for review.

    Args:
        image: The uploaded image, or ``None`` when nothing was uploaded.
        mode: Accepted because the UI passes it; not used here.
        entry_type: Accepted because the UI passes it; not used here.
        quantity: Accepted because the UI passes it; not used here.

    Returns:
        A 3-tuple ``(text, table, result)``: a message with the extracted text
        (or why there is none), a DataFrame with ``"Attribute"`` and
        ``"Value"`` columns, and ``""`` for the third output.
    """
    no_attributes = pd.DataFrame(columns=["Attribute", "Value"])

    # Clicking the button without an upload gives None; do not send it to OCR.
    if image is None:
        return "No image provided. Please upload an image first.", no_attributes, ""

    extracted_text = extract_text(image)
    if not extracted_text:
        return "No text detected in the image.", no_attributes, ""

    attributes = extract_attributes(extracted_text)
    product_name = match_product_name(extracted_text)
    if product_name:
        # A fuzzy match against the known products overrides any regex hit.
        attributes["Product name"] = product_name

    # Always offer these rows for manual entry, even when OCR missed them.
    for fixed_attr in ["Stage", "H.P.", "Product name", "Model"]:
        attributes.setdefault(fixed_attr, "")

    df = pd.DataFrame(list(attributes.items()), columns=["Attribute", "Value"])
    return f"Extracted Text:\n{extracted_text}", df, ""
|
|
| |
def _empty_stock_table():
    """Return an empty table that has the display columns shown in the UI."""
    return pd.DataFrame(
        columns=["Product Name", "Model", "H.P", "Stage", "Current Stocks", "Sold Stock"]
    )


def _render_stock_chart(df):
    """Draw current stock per product as a bar chart and return it as an image."""
    # pandas refuses to plot a column without numeric data, so null or
    # non-numeric stock values are charted as zero. ``df`` itself is untouched.
    chart_df = df.assign(
        **{"Current Stocks": pd.to_numeric(df["Current Stocks"], errors="coerce").fillna(0)}
    )
    fig, ax = plt.subplots(figsize=(12, 8))
    try:
        chart_df.plot(kind="bar", x="Product Name", y="Current Stocks", ax=ax, legend=False)
        ax.set_title("Stock Distribution by Product Name")
        ax.set_xlabel("Product Name")
        ax.set_ylabel("Current Stocks")
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right", fontsize=10)
        fig.tight_layout()
        buffer = BytesIO()
        fig.savefig(buffer, format="png")
    finally:
        # pyplot keeps every figure alive until it is closed; without this
        # each button click would leak one figure.
        plt.close(fig)
    buffer.seek(0)
    return Image.open(buffer)


def pull_data_from_salesforce(data_type):
    """Fetch stock records from Salesforce as a table, an Excel file and a chart.

    Args:
        data_type: ``"Inventory"`` reads ``Inventory_Management__c``; any other
            value reads ``Un_Billable__c``.

    Returns:
        A 3-tuple ``(table, excel_path, chart)``. On success: the records as a
        DataFrame with display column names, the path of the Excel file that
        was written, and the bar chart as a PIL image. When there are no
        records or anything fails: an empty table, ``None`` and ``None``
        (failures are logged).
    """
    try:
        sf = Salesforce(
            username=SALESFORCE_USERNAME,
            password=SALESFORCE_PASSWORD,
            security_token=SALESFORCE_SECURITY_TOKEN,
        )

        if data_type == "Inventory":
            query = "SELECT Productname__c,Model__c, H_p__c, Stage__c, Current_Stocks__c, soldstock__c FROM Inventory_Management__c LIMIT 100"
        else:
            query = "SELECT Productname__c, Model__c, H_p__c, Stage__c, Current_Stock__c, soldstock__c FROM Un_Billable__c LIMIT 100"

        records = sf.query_all(query).get("records", [])
        if not records:
            return _empty_stock_table(), None, None

        # Each record carries an "attributes" entry that is not a data column.
        df = pd.DataFrame(records).drop(columns=["attributes"], errors="ignore")
        # The two objects name their stock field differently; both end up
        # under the same display column.
        df = df.rename(columns={
            "Productname__c": "Product Name",
            "Model__c": "Model",
            "H_p__c": "H.P",
            "Stage__c": "Stage",
            "Current_Stocks__c": "Current Stocks",
            "Current_Stock__c": "Current Stocks",
            "soldstock__c": "Sold Stock",
        })

        excel_path = "salesforce_data.xlsx"
        df.to_excel(excel_path, index=False)

        return df, excel_path, _render_stock_chart(df)

    except Exception:
        # UI boundary: keep the page working, but leave a trace of what failed
        # instead of silently discarding the error.
        logging.getLogger(__name__).exception(
            "Failed to pull %s data from Salesforce", data_type
        )
        return _empty_stock_table(), None, None
|
|
| |
def app():
    """Build the Gradio interface and return it without launching it.

    The interface has two tabs. The first takes an image plus mode, entry type
    and quantity, shows the extracted text and an editable attribute table,
    and exports the table to Salesforce. The second pulls stock data from
    Salesforce and shows it as a table, a downloadable Excel file and a chart.

    Returns:
        The ``gr.Blocks`` object with all components and click handlers wired.
    """
    # NOTE(review): the nesting of components inside the row is reconstructed;
    # confirm it matches the intended layout.
    # NOTE(review): labels without an emoji had a decorative prefix that was
    # corrupted beyond recovery; re-add one if desired.
    with gr.Blocks() as demo:
        with gr.Tab("📥 OCR Processing"):
            with gr.Row():
                image_input = gr.Image(type="numpy", label="Upload Image")
                mode_input = gr.Dropdown(label="Mode", choices=["Entry", "Exit"], value="Entry")
                entry_type_input = gr.Radio(label="📦 Entry Type", choices=["Sales", "Non-Sales"], value="Sales")
                quantity_input = gr.Number(label="🔢 Quantity", value=1, interactive=True)
            extract_button = gr.Button("Extract Text and Attributes")
            extracted_text_output = gr.Text(label="Extracted Image Data")
            editable_df_output = gr.Dataframe(label="✏️ Edit Attributes (Key-Value Pairs)", interactive=True)
            ok_button = gr.Button("OK")
            result_output = gr.Text(label="Result")

        with gr.Tab("Salesforce Data"):
            data_type_input = gr.Dropdown(label="Select Data Type", choices=["Inventory", "Unbilling"], value="Inventory")
            pull_button = gr.Button("Pull Data from Salesforce")
            salesforce_data_output = gr.Dataframe(label="Salesforce Data")
            excel_download_output = gr.File(label="📥 Download Excel")
            graph_output = gr.Image(label="Stock Distribution Graph")

        # Extraction fills the text box and the editable table, and clears any
        # previous result message.
        extract_button.click(
            fn=process_image,
            inputs=[image_input, mode_input, entry_type_input, quantity_input],
            outputs=[extracted_text_output, editable_df_output, result_output],
        )
        # "OK" exports the (possibly edited) table using the current mode,
        # entry type and quantity.
        ok_button.click(
            fn=export_to_salesforce,
            inputs=[mode_input, entry_type_input, quantity_input, editable_df_output],
            outputs=[result_output],
        )
        pull_button.click(
            fn=pull_data_from_salesforce,
            inputs=[data_type_input],
            outputs=[salesforce_data_output, excel_download_output, graph_output],
        )

    return demo
|
|
if __name__ == "__main__":
    # Build the interface, then start the server; share=True additionally
    # requests a public Gradio share link.
    demo_app = app()
    demo_app.launch(share=True)