Spaces:
Build error
Build error
| import os | |
| from paddleocr import PaddleOCR | |
| from PIL import Image | |
| import gradio as gr | |
| import requests | |
| import re | |
| from simple_salesforce import Salesforce | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| from io import BytesIO | |
| from fuzzywuzzy import process | |
| import kaleido | |
# Lookup table: label expected in the OCR text -> Salesforce field API name.
# Insertion order is significant: extract_attributes() walks the labels in
# this order, and that order becomes the row order of the editable table.
#
# NOTE(review): several labels share one API name ("Assembledby" and
# "Manufacturer" -> Manufacturer__c, "Paymentstatus" and "Paymentmethod" ->
# Paymentstatus__c, "Manufacturedate" and "Invoicedate" -> Manufacturedate__c).
# When both labels of a pair are extracted, the later one overwrites the
# earlier one in filter_valid_attributes(). Confirm against the Salesforce
# schema whether these are intentional aliases or copy-paste slips.
ATTRIBUTE_MAPPING = {
    "Product name": "Productname__c",
    "Colour": "Colour__c",
    "Motortype": "Motortype__c",
    "Frequency": "Frequency__c",
    "Grossweight": "Grossweight__c",
    "Ratio": "Ratio__c",
    "MotorFrame": "Motorframe__c",
    "Model": "Model__c",
    "Speed": "Speed__c",
    "Quantity": "Quantity__c",
    "Voltage": "Voltage__c",
    "Material": "Material__c",
    "Type": "Type__c",
    "Horsepower": "Horsepower__c",
    "Consignee": "Consignee__c",
    "LOT": "LOT__c",
    "Stage": "Stage__c",
    "Outlet": "Outlet__c",
    "Serialnumber": "Serialnumber__c",
    "HeadSize": "Headsize__c",
    "Deliverysize": "Deliverysize__c",
    "Phase": "Phase__c",
    "Size": "Size__c",
    "MRP": "MRP__c",
    "Usebefore": "Usebefore__c",
    "Height": "Height__c",
    "MaximumDischarge Flow": "Maximumdischargeflow__c",
    "DischargeRange": "Dischargeflow__c",
    "Assembledby": "Manufacturer__c",
    "Manufacturedate": "Manufacturedate__c",
    "Companyname": "Companyname__c",
    "Customercarenumber": "Customercarenumber__c",
    "SellerAddress": "Selleraddress__c",
    "Selleremail": "Selleremail__c",
    "GSTIN": "GSTIN__c",
    "Totalamount": "Totalamount__c",
    "Paymentstatus": "Paymentstatus__c",
    "Paymentmethod": "Paymentstatus__c",
    "Invoicedate": "Manufacturedate__c",
    "Warranty": "Warranty__c",
    "Brand": "Brand__c",
    "Motorhorsepower": "Motorhorsepower__c",
    "Power": "Power__c",
    "Motorphase": "Motorphase__c",
    "Enginetype": "Enginetype__c",
    "Tankcapacity": "Tankcapacity__c",
    "Head": "Head__c",
    "Usage/Application": "Usage_Application__c",
    "Volts": "volts__c",
    "Hertz": "Hertz__c",
    "Frame": "frame__c",
    "Mounting": "Mounting__c",
    "Tollfreenumber": "Tollfreenumber__c",
    "Pipesize": "Pipesize__c",
    "Manufacturer": "Manufacturer__c",
    "Office": "Office__c",
    "SRnumber": "SRnumber__c",
    "TypeOfEndUse": "TypeOfEndUse__c",
    "Model Name": "Model_Name_Number__c",
    "coolingmethod": "coolingmethod__c",
    "H.P.": "H_p__c",
}
# Candidate product names that OCR lines are fuzzy-matched against
PRODUCT_NAMES = [
    "Fusion",
    "Agroking",
    "CG commercial motors",
    "Jaguar",
    "Gaurav",
]
# Salesforce credentials.
# SECURITY: these were previously committed as plaintext literals. They are
# now read from environment variables of the same name (set them as secrets
# in the deployment), so nothing sensitive lives in the source. Any value that
# was ever committed must be treated as leaked and rotated.
# A missing variable yields an empty string; the Salesforce login then fails
# and that failure is reported by the callers' existing error handling.
SALESFORCE_USERNAME = os.environ.get("SALESFORCE_USERNAME", "")
SALESFORCE_PASSWORD = os.environ.get("SALESFORCE_PASSWORD", "")
SALESFORCE_SECURITY_TOKEN = os.environ.get("SALESFORCE_SECURITY_TOKEN", "")
# Module-wide PaddleOCR engine, built once at import time and reused by
# extract_text(); configured for English with the angle classifier enabled.
ocr = PaddleOCR(
    use_angle_cls=True,
    lang='en',
)
# Function to extract text using PaddleOCR
def extract_text(image):
    """Run OCR on *image* and return the recognised lines joined by newlines.

    Args:
        image: Image handed to ``ocr.ocr`` (the UI passes a NumPy array), or
            ``None`` when nothing was uploaded.

    Returns:
        The recognised text, one detected line per output line, or an empty
        string when there is no image or nothing was detected.
    """
    if image is None:
        return ""

    result = ocr.ocr(image)
    # A page without detections can come back as ``[None]`` (or an empty
    # result) depending on the PaddleOCR release; iterating it would raise.
    if not result or not result[0]:
        return ""

    # Each detection is ``[box, (text, confidence)]``; keep only the text.
    return "\n".join(detection[1][0] for detection in result[0])
# Function to match product name using fuzzy matching
def match_product_name(extracted_text):
    """Return the known product name that best matches any OCR line.

    Every line of *extracted_text* is fuzzy-matched against ``PRODUCT_NAMES``
    and the highest-scoring candidate across all lines is kept (the earliest
    one wins ties). The candidate is returned only if its score is at least
    70; otherwise ``None`` is returned.
    """
    top_name, top_score = None, 0
    for text_line in extracted_text.split("\n"):
        candidate, candidate_score = process.extractOne(text_line, PRODUCT_NAMES)
        if candidate_score > top_score:
            top_name, top_score = candidate, candidate_score

    if top_score < 70:
        return None
    return top_name
# Function to extract attributes and their values
def extract_attributes(extracted_text, attribute_mapping=None):
    """Pull ``label: value`` pairs for the known labels out of OCR text.

    For each label, the first place in the text where the label occurs
    (case-insensitively) is located and the remainder of that line, after an
    optional ``:`` or ``-`` and whitespace, is taken as the value.

    Two guards stop a label from matching where it does not belong:

    * the label must not be glued to a preceding letter or digit, so "Type"
      is not found inside "Motortype";
    * the label must not be the beginning of a longer known label at that
      position, so "Model" is not found in "Model Name" nor "Head" in
      "HeadSize".

    Args:
        extracted_text: Newline-separated OCR text.
        attribute_mapping: Mapping whose keys are the labels to search for.
            Defaults to the module-level ``ATTRIBUTE_MAPPING``.

    Returns:
        Dict of label -> stripped value for every label found, in mapping
        order. Empty when the text is empty.
    """
    if attribute_mapping is None:
        attribute_mapping = ATTRIBUTE_MAPPING
    if not extracted_text:
        return {}

    labels = list(attribute_mapping)
    attributes = {}
    for label in labels:
        lowered = label.lower()
        # Tails of longer labels that begin with this one, e.g. " Name" for
        # "Model" because "Model Name" is also a label.
        longer_tails = [
            other[len(label):]
            for other in labels
            if len(other) > len(label) and other.lower().startswith(lowered)
        ]
        guard = ""
        if longer_tails:
            guard = "(?!" + "|".join(re.escape(tail) for tail in longer_tails) + ")"

        pattern = rf"(?<![A-Za-z0-9]){re.escape(label)}{guard}[:\-]?\s*(.+)"
        match = re.search(pattern, extracted_text, re.IGNORECASE)
        if match:
            attributes[label] = match.group(1).strip()
    return attributes
# Function to filter attributes for valid Salesforce fields
def filter_valid_attributes(attributes, valid_fields):
    """Translate readable attribute names to the API names an object accepts.

    Args:
        attributes: Dict of readable attribute name -> value.
        valid_fields: Collection of field API names present on the target
            Salesforce object.

    Returns:
        Dict of Salesforce API name -> value, containing only attributes that
        are known to ``ATTRIBUTE_MAPPING`` and whose API name is in
        *valid_fields*. If two attributes map to the same API name, the later
        one wins.
    """
    filtered = {}
    for readable_name, value in attributes.items():
        # The attribute table is user-editable, so it may contain names that
        # are not in the mapping (renamed or blank rows); skip those instead
        # of raising KeyError.
        api_name = ATTRIBUTE_MAPPING.get(readable_name)
        if api_name is not None and api_name in valid_fields:
            filtered[api_name] = value
    return filtered
# Function to interact with Salesforce based on mode and type

# (mode, entry type) -> (Salesforce object, quantity field(s) written on it)
_SALESFORCE_TARGETS = {
    ("Entry", "Sales"): ("VENKATA_RAMANA_MOTORS__c", ("Quantity__c",)),
    ("Entry", "Non-Sales"): ("UNBILLING_DATA__c", ("TotalQuantity__c",)),
    ("Exit", "Sales"): ("Inventory_Management__c", ("Quantity_Sold__c", "soldstock__c")),
    ("Exit", "Non-Sales"): ("Un_Billable__c", ("Sold_Out__c", "soldstock__c")),
}

# Optional Exit-mode filters: attribute key -> field it is compared against.
# NOTE(review): the model filter reads the "Model Name" attribute but compares
# it with Model__c, while ATTRIBUTE_MAPPING maps "Model" to Model__c. Kept as
# in the original; confirm which attribute is intended.
_EXIT_OPTIONAL_FILTERS = (
    ("Model Name", "Model__c"),
    ("Stage", "Stage__c"),
    ("H.P.", "H_p__c"),
)


def _soql_literal(value):
    """Return *value* as a single-quoted SOQL literal with ``\\`` and ``'`` escaped."""
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def interact_with_salesforce(mode, entry_type, quantity, attributes):
    """Create (Entry) or update (Exit) a Salesforce record from OCR attributes.

    Entry mode creates a record on the target object from the attributes the
    object actually has fields for, plus the quantity. Exit mode looks up one
    record by product name (and, when provided, model, stage and H.P.) and
    writes *quantity* into the object's sold-quantity fields.

    NOTE(review): Exit mode overwrites the sold-quantity fields with
    *quantity* rather than adding to the stored values, as in the original.

    Args:
        mode: ``"Entry"`` or ``"Exit"``.
        entry_type: ``"Sales"`` or ``"Non-Sales"``.
        quantity: Quantity to store.
        attributes: Dict of readable attribute name -> value.

    Returns:
        A human-readable status message. Errors are reported in the message
        rather than raised.
    """
    # Validate before logging in so bad input never costs a network round trip.
    target = _SALESFORCE_TARGETS.get((mode, entry_type))
    if target is None:
        return "Invalid mode or entry type."
    object_name, quantity_fields = target

    product_name = attributes.get("Product name", "")
    if mode == "Exit" and not product_name:
        return f"❌ Product name is required to update a record in {object_name}."

    try:
        sf = Salesforce(
            username=SALESFORCE_USERNAME,
            password=SALESFORCE_PASSWORD,
            security_token=SALESFORCE_SECURITY_TOKEN
        )
        sf_object = getattr(sf, object_name)

        if mode == "Exit":
            # Values come from OCR / user edits, so escape them before they
            # are embedded in the query text.
            conditions = [f"Productname__c = {_soql_literal(product_name)}"]
            for attribute_key, field in _EXIT_OPTIONAL_FILTERS:
                value = attributes.get(attribute_key)
                if value:
                    conditions.append(f"{field} = {_soql_literal(value)}")

            query = (
                f"SELECT Id, {', '.join(quantity_fields)} FROM {object_name} "
                f"WHERE {' AND '.join(conditions)} LIMIT 1"
            )
            response = sf.query(query)
            if not response["records"]:
                return f"❌ No matching record found for product '{product_name}' in {object_name}."

            record_id = response["records"][0]["Id"]
            updated_fields = {field: quantity for field in quantity_fields}
            sf_object.update(record_id, updated_fields)
            return (
                f"✅ Updated record for product '{product_name}' in {object_name}. "
                f"Updated fields: {updated_fields}."
            )

        # Entry mode: only send fields that exist on the target object.
        schema = sf_object.describe()
        valid_fields = {field["name"] for field in schema["fields"]}
        record = filter_valid_attributes(attributes, valid_fields)
        record[quantity_fields[0]] = quantity
        sf_object.create(record)
        return f"✅ Data successfully exported to Salesforce object {object_name}."
    except Exception as e:
        return f"❌ Error interacting with Salesforce: {str(e)}"
# Function to process image, extract attributes, and allow editing
def process_image(image, mode, entry_type, quantity):
    """OCR an uploaded image and return its text plus an editable attribute table.

    Args:
        image: Uploaded image, or ``None`` when nothing was uploaded.
        mode: Unused here; accepted because the UI wires it as an input.
        entry_type: Unused here; accepted for the same reason.
        quantity: Unused here; accepted for the same reason.

    Returns:
        A 3-tuple ``(text_message, attributes_dataframe, result)``. The
        DataFrame has the columns "Attribute" and "Value"; it is ``None`` when
        there is no image or no text. The third element is always ``None``.
    """
    if image is None:
        return "No image provided. Please upload an image first.", None, None

    extracted_text = extract_text(image)
    if not extracted_text:
        return "No text detected in the image.", None, None

    attributes = extract_attributes(extracted_text)
    product_name = match_product_name(extracted_text)
    if product_name:
        # A fuzzy-matched catalogue name takes precedence over the raw OCR value.
        attributes["Product name"] = product_name

    # Ensure fixed attributes are present so the user can always fill them in
    for fixed_attr in ("Stage", "H.P.", "Product name", "Model"):
        attributes.setdefault(fixed_attr, "")

    # Convert attributes to DataFrame for editing
    df = pd.DataFrame(list(attributes.items()), columns=["Attribute", "Value"])
    return f"Extracted Text:\n{extracted_text}", df, None
# Function to handle edited attributes and export to Salesforce
def export_to_salesforce(mode, entry_type, quantity, edited_df):
    """Export the edited attribute table to Salesforce and append a price lookup.

    The table is converted back to a dict and passed to
    ``interact_with_salesforce``. Afterwards ``Price__c`` is looked up on
    ``Inventory_Management__c`` using whichever of product name, model name
    and stage are filled in, and the outcome is appended to the export message.

    NOTE(review): the model filter reads the "Model Name" attribute but
    compares it with Model__c, as in the original; confirm this is intended.

    Args:
        mode: ``"Entry"`` or ``"Exit"``.
        entry_type: ``"Sales"`` or ``"Non-Sales"``.
        quantity: Quantity to store.
        edited_df: DataFrame with "Attribute" and "Value" columns, as produced
            by ``process_image`` and edited in the UI.

    Returns:
        A human-readable status message; errors are reported in the message
        rather than raised.
    """

    def quote(value):
        # Escape backslashes and single quotes so OCR / user text cannot
        # break out of the SOQL string literal.
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    try:
        # Before any extraction the table is None or a placeholder without
        # the expected columns; report that instead of a raw lookup error.
        columns = getattr(edited_df, "columns", ())
        if "Attribute" not in columns or "Value" not in columns:
            return "❌ No attributes to export. Extract text from an image first."

        # Convert edited DataFrame back to dictionary
        edited_attributes = dict(zip(edited_df["Attribute"], edited_df["Value"]))
        # Export to Salesforce
        message = interact_with_salesforce(mode, entry_type, quantity, edited_attributes)
    except Exception as e:
        return f"❌ Error exporting to Salesforce: {str(e)}"

    # Fetch the price from Inventory_Management__c based on attributes
    try:
        product_name = edited_attributes.get("Product name", "")
        model_name = edited_attributes.get("Model Name", "")
        stage = edited_attributes.get("Stage", "")

        query_conditions = []
        if product_name:
            query_conditions.append(f"Productname__c = {quote(product_name)}")
        if model_name:
            query_conditions.append(f"Model__c = {quote(model_name)}")
        if stage:
            query_conditions.append(f"Stage__c = {quote(stage)}")
        if not query_conditions:
            return f"{message}\n\nInsufficient data to fetch price information."

        sf = Salesforce(
            username=SALESFORCE_USERNAME,
            password=SALESFORCE_PASSWORD,
            security_token=SALESFORCE_SECURITY_TOKEN
        )
        query = (
            "SELECT Price__c FROM Inventory_Management__c "
            f"WHERE {' AND '.join(query_conditions)} LIMIT 1"
        )
        response = sf.query(query)
        if not response["records"]:
            return f"{message}\n\nNo matching record found for the specified product."

        price = response["records"][0].get("Price__c", None)
        if not price:
            return f"{message}\n\nPrice information not available for the specified product."

        price_message = (
            f"The estimated price for the {product_name} with {model_name} "
            f"at {stage} is ₹{price:,}."
        )
        return f"{message}\n\n{price_message}"
    except Exception as e:
        return f"{message}\n\nError fetching price information: {str(e)}"
| import pytz | |
# Function to pull structured data from Salesforce and display as a table
def pull_data_from_salesforce(data_type):
    """Fetch stock records from Salesforce as a table, an Excel file and a chart.

    Args:
        data_type: ``"Inventory"`` queries ``Inventory_Management__c``; any
            other value queries ``Un_Billable__c``.

    Returns:
        Always a 3-tuple ``(dataframe, excel_path, image)``, matching the
        three UI outputs it is wired to. On success these are the renamed
        records, the path of the written ``salesforce_data.xlsx`` and a PIL
        image of a bar chart of current stock per product. When there is no
        data or an error occurs, the DataFrame holds a single "Message" cell
        and the other two elements are ``None``.
    """
    fig = None
    try:
        sf = Salesforce(
            username=SALESFORCE_USERNAME,
            password=SALESFORCE_PASSWORD,
            security_token=SALESFORCE_SECURITY_TOKEN
        )
        if data_type == "Inventory":
            query = "SELECT Productname__c, Model__c, H_p__c, Stage__c, Current_Stocks__c, soldstock__c,Last_Modified_Date__c FROM Inventory_Management__c LIMIT 100"
        else:
            query = "SELECT Productname__c, Model__c, H_p__c, Stage__c, Current_Stock__c, soldstock__c, Last_Modified_Date__c FROM Un_Billable__c LIMIT 100"

        response = sf.query_all(query)
        records = response.get("records", [])
        if not records:
            return pd.DataFrame({"Message": ["No data found in Salesforce."]}), None, None

        df = pd.DataFrame(records)
        # Drop the per-record metadata column returned alongside the fields.
        df = df.drop(columns=['attributes'], errors='ignore')

        # Format the Last_Modified_Date__c field to show only the date
        if "Last_Modified_Date__c" in df.columns:
            df["Last_Modified_Date__c"] = pd.to_datetime(df["Last_Modified_Date__c"]).dt.date

        # Rename columns for better readability; the two objects name the
        # stock field differently, so both map to the same display name.
        df.rename(columns={
            "Productname__c": "Product Name",
            "Model__c": "Model",
            "H_p__c": "H.P",
            "Stage__c": "Stage",
            "Current_Stocks__c": "Current Stocks",
            "Current_Stock__c": "Current Stocks",
            "soldstock__c": "Sold Stock",
            "Last_Modified_Date__c": "Last Modified Date"
        }, inplace=True)

        excel_path = "salesforce_data.xlsx"
        df.to_excel(excel_path, index=False)

        # Plot from a copy with numeric stock values so blank or non-numeric
        # cells are drawn as zero instead of breaking the chart; the table
        # shown to the user keeps the raw values.
        plot_df = df.copy()
        plot_df["Current Stocks"] = pd.to_numeric(
            plot_df["Current Stocks"], errors="coerce"
        ).fillna(0)

        # Generate a vertical bar graph using Matplotlib (rendered to a PNG)
        fig, ax = plt.subplots(figsize=(12, 8))
        plot_df.plot(kind='bar', x="Product Name", y="Current Stocks", ax=ax, legend=False)
        ax.set_title("Stock Distribution by Product Name")
        ax.set_xlabel("Product Name")
        ax.set_ylabel("Current Stocks")
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right", fontsize=10)
        fig.tight_layout()

        buffer = BytesIO()
        fig.savefig(buffer, format="png")
        buffer.seek(0)
        img = Image.open(buffer)
        # Decode now so the returned image does not depend on later reads
        # from the buffer.
        img.load()
        return df, excel_path, img
    except Exception as e:
        return pd.DataFrame({"Message": [f"Error fetching data: {str(e)}"]}), None, None
    finally:
        # pyplot keeps every figure alive until it is closed; without this,
        # each button press would leak one figure.
        if fig is not None:
            plt.close(fig)
# Gradio Interface
def app():
    """Build the two-tab Gradio UI and return the ``gr.Blocks`` object.

    The first tab takes an image plus mode, entry type and quantity, shows the
    extracted text and an editable attribute table, and exports on "OK". The
    second tab pulls Salesforce data and shows it as a table, a downloadable
    Excel file and a graph.

    NOTE(review): the source reached review with its indentation stripped, so
    the nesting below (which components sit inside the ``gr.Row``, and the
    event wiring living inside the ``gr.Blocks`` context) is reconstructed.
    Confirm against the deployed layout.
    """
    with gr.Blocks() as demo:
        with gr.Tab("π₯ OCR Processing"):
            with gr.Row():
                image_box = gr.Image(type="numpy", label="π Upload Image")
                mode_box = gr.Dropdown(label="π Mode", choices=["Entry", "Exit"], value="Entry")
                entry_type_box = gr.Radio(label="π¦ Entry Type", choices=["Sales", "Non-Sales"], value="Sales")
                quantity_box = gr.Number(label="π’ Quantity", value=1, interactive=True)
            extract_btn = gr.Button("Extract Text and Attributes")
            text_box = gr.Text(label="π Extracted Image Data")
            attributes_table = gr.Dataframe(label="βοΈ Edit Attributes (Key-Value Pairs)", interactive=True)
            confirm_btn = gr.Button("OK")
            result_box = gr.Text(label="π Result")

        with gr.Tab("π Salesforce Data"):
            data_type_box = gr.Dropdown(label="Select Data Type", choices=["Inventory", "Unbilling"], value="Inventory")
            pull_btn = gr.Button("Pull Data from Salesforce")
            data_table = gr.Dataframe(label="π Salesforce Data")
            excel_file = gr.File(label="π₯ Download Excel")
            stock_graph = gr.Image(label="π Stock Distribution Graph")

        # Wire each button to its handler.
        ocr_inputs = [image_box, mode_box, entry_type_box, quantity_box]
        extract_btn.click(
            fn=process_image,
            inputs=ocr_inputs,
            outputs=[text_box, attributes_table, result_box],
        )

        export_inputs = [mode_box, entry_type_box, quantity_box, attributes_table]
        confirm_btn.click(
            fn=export_to_salesforce,
            inputs=export_inputs,
            outputs=[result_box],
        )

        pull_btn.click(
            fn=pull_data_from_salesforce,
            inputs=[data_type_box],
            outputs=[data_table, excel_file, stock_graph],
        )
    return demo
# Script entry point: build the UI and serve it with a public share link.
if __name__ == "__main__":
    demo = app()
    demo.launch(share=True)