# setaprojectv / app.py
# gopichandra's picture
# Update app.py
# 0dec35e verified
import os
from paddleocr import PaddleOCR
from PIL import Image
import gradio as gr
import requests
import re
from simple_salesforce import Salesforce
import pandas as pd
import matplotlib.pyplot as plt
from io import BytesIO
from fuzzywuzzy import process
import kaleido
# Lookup table: human-readable label (as searched for in the OCR text and
# shown in the editable table) -> Salesforce field API name.
# Insertion order matters: extract_attributes walks this dict in order, so it
# also determines the row order of the editable table.
ATTRIBUTE_MAPPING = {
    "Product name": "Productname__c",
    "Colour": "Colour__c",
    "Motortype": "Motortype__c",
    "Frequency": "Frequency__c",
    "Grossweight": "Grossweight__c",
    "Ratio": "Ratio__c",
    "MotorFrame": "Motorframe__c",
    "Model": "Model__c",
    "Speed": "Speed__c",
    "Quantity": "Quantity__c",
    "Voltage": "Voltage__c",
    "Material": "Material__c",
    "Type": "Type__c",
    "Horsepower": "Horsepower__c",
    "Consignee": "Consignee__c",
    "LOT": "LOT__c",
    "Stage": "Stage__c",
    "Outlet": "Outlet__c",
    "Serialnumber": "Serialnumber__c",
    "HeadSize": "Headsize__c",
    "Deliverysize": "Deliverysize__c",
    "Phase": "Phase__c",
    "Size": "Size__c",
    "MRP": "MRP__c",
    "Usebefore": "Usebefore__c",
    "Height": "Height__c",
    "MaximumDischarge Flow": "Maximumdischargeflow__c",
    "DischargeRange": "Dischargeflow__c",
    # NOTE(review): shares Manufacturer__c with "Manufacturer" below; when both
    # labels are present only one value survives the API-name keyed filter.
    # Confirm this aliasing is intended.
    "Assembledby": "Manufacturer__c",
    "Manufacturedate": "Manufacturedate__c",
    "Companyname": "Companyname__c",
    "Customercarenumber": "Customercarenumber__c",
    "SellerAddress": "Selleraddress__c",
    "Selleremail": "Selleremail__c",
    "GSTIN": "GSTIN__c",
    "Totalamount": "Totalamount__c",
    "Paymentstatus": "Paymentstatus__c",
    # NOTE(review): maps to the same field as "Paymentstatus" -- looks like a
    # possible copy/paste slip; verify against the Salesforce schema.
    "Paymentmethod": "Paymentstatus__c",
    # NOTE(review): maps to the same field as "Manufacturedate" -- verify.
    "Invoicedate": "Manufacturedate__c",
    "Warranty": "Warranty__c",
    "Brand": "Brand__c",
    "Motorhorsepower": "Motorhorsepower__c",
    "Power": "Power__c",
    "Motorphase": "Motorphase__c",
    "Enginetype": "Enginetype__c",
    "Tankcapacity": "Tankcapacity__c",
    "Head": "Head__c",
    "Usage/Application": "Usage_Application__c",
    "Volts": "volts__c",
    "Hertz": "Hertz__c",
    "Frame": "frame__c",
    "Mounting": "Mounting__c",
    "Tollfreenumber": "Tollfreenumber__c",
    "Pipesize": "Pipesize__c",
    "Manufacturer": "Manufacturer__c",
    "Office": "Office__c",
    "SRnumber": "SRnumber__c",
    "TypeOfEndUse": "TypeOfEndUse__c",
    "Model Name": "Model_Name_Number__c",
    "coolingmethod": "coolingmethod__c",
    "H.P.": "H_p__c",
}
# Known product names; OCR lines are fuzzy-matched against this list.
PRODUCT_NAMES = [
    "Fusion",
    "Agroking",
    "CG commercial motors",
    "Jaguar",
    "Gaurav",
]
# Salesforce credentials
# Each value is read from the environment variable of the same name so that
# deployments can supply secrets without editing this file.
# SECURITY(review): the literal fallbacks below are real credentials committed
# to source control. They are kept only so existing deployments keep starting;
# rotate them and delete the fallbacks once the environment variables are set.
SALESFORCE_USERNAME = os.environ.get("SALESFORCE_USERNAME", "venkatramana@sandbox.com")
SALESFORCE_PASSWORD = os.environ.get("SALESFORCE_PASSWORD", "Seta12345@")
SALESFORCE_SECURITY_TOKEN = os.environ.get("SALESFORCE_SECURITY_TOKEN", "Drl0jchCwLBfvX4ODMeFDksP")
# Initialize PaddleOCR
# Module-level engine, constructed once when this file is imported and reused
# by extract_text for every request. It is configured with lang='en' and
# use_angle_cls=True (presumably enabling PaddleOCR's text-angle classifier --
# confirm against the installed PaddleOCR version).
ocr = PaddleOCR(use_angle_cls=True, lang='en')
# Function to extract text using PaddleOCR
def extract_text(image):
    """Run OCR on *image* and return the recognised text, one line per detection.

    Args:
        image: Image passed straight to ``ocr.ocr`` (the UI supplies a numpy
            array).

    Returns:
        The recognised strings joined with newlines, or ``""`` when the OCR
        engine reports no detections, so callers can test the result for
        emptiness instead of handling an exception.
    """
    result = ocr.ocr(image)
    # The first element holds the detections for the (single) input image; it
    # can be None or empty when nothing was recognised, which previously made
    # the loop below raise a TypeError.
    if not result or not result[0]:
        return ""
    # Each detection is (box, (text, confidence)); only the text is kept.
    return "\n".join(line[1][0] for line in result[0])
# Function to match product name using fuzzy matching
def match_product_name(extracted_text):
    """Return the known product name that best matches any line of the text.

    Every line of *extracted_text* is fuzzy-matched against PRODUCT_NAMES.
    The highest-scoring candidate wins (the earliest line wins ties) and is
    returned only when its score is at least 70; otherwise None is returned.
    """
    candidates = [
        process.extractOne(text_line, PRODUCT_NAMES)
        for text_line in extracted_text.split("\n")
    ]

    top_name, top_score = None, 0
    for name, score in candidates:
        # Strict comparison keeps the first of several equally good matches.
        if score > top_score:
            top_name, top_score = name, score

    if top_score < 70:
        return None
    return top_name
# Function to extract attributes and their values
def extract_attributes(extracted_text, attribute_mapping=None):
    """Pull ``label: value`` pairs for known attribute labels out of OCR text.

    For every label in the mapping the text is searched case-insensitively
    for the label, optionally followed by whitespace, a ``:`` or ``-``
    separator and more whitespace (which may include a line break, so a value
    printed on the line after its label is still picked up). The rest of that
    line is taken as the value.

    Compared with a plain substring search this avoids two kinds of false
    hits:

    * a label is not matched when it is glued to other letters, so ``Head``
      no longer fires inside ``HeadSize`` nor ``Frame`` inside ``MotorFrame``;
    * longer labels are matched first and own the text they cover, so
      ``Model`` no longer captures ``"Name: X200"`` from ``Model Name: X200``.

    Trade-off: a label fused to a letter-initial value with no separator
    (e.g. ``ModelX200``) is no longer matched.

    Args:
        extracted_text: Newline-separated OCR output.
        attribute_mapping: Optional mapping whose keys are the labels to look
            for. Defaults to the module-level ATTRIBUTE_MAPPING.

    Returns:
        Dict of label -> stripped value for every label found, ordered like
        the mapping. Empty when *extracted_text* is empty or None.
    """
    if attribute_mapping is None:
        attribute_mapping = ATTRIBUTE_MAPPING
    if not extracted_text:
        return {}

    found = {}
    claimed_spans = []  # (start, end) of label text already owned by a longer label
    for label in sorted(attribute_mapping, key=len, reverse=True):
        # Only guard edges that are letters; labels such as "H.P." end in
        # punctuation and need no trailing guard.
        lead = r"(?<![A-Za-z])" if label[:1].isalpha() else ""
        trail = r"(?![A-Za-z])" if label[-1:].isalpha() else ""
        pattern = re.compile(
            rf"{lead}(?P<label>{re.escape(label)}){trail}\s*[:\-]?\s*(?P<value>.+)",
            re.IGNORECASE,
        )
        for match in pattern.finditer(extracted_text):
            start, end = match.span("label")
            overlaps = any(
                start < taken_end and taken_start < end
                for taken_start, taken_end in claimed_spans
            )
            if overlaps:
                # This occurrence is part of a longer label; try the next one.
                continue
            found[label] = match.group("value").strip()
            claimed_spans.append((start, end))
            break

    # Report results in mapping order, independent of the search order above.
    return {label: found[label] for label in attribute_mapping if label in found}
# Function to filter attributes for valid Salesforce fields
def filter_valid_attributes(attributes, valid_fields):
    """Translate readable attribute labels to Salesforce API names.

    Args:
        attributes: Dict of readable label -> value (typically the rows of the
            user-edited table).
        valid_fields: Collection of field API names that exist on the target
            Salesforce object.

    Returns:
        Dict of API name -> value containing only the attributes whose label
        is present in ATTRIBUTE_MAPPING and whose API name is in
        *valid_fields*. Labels that are not in the mapping (for example rows
        the user added or renamed) are skipped instead of raising KeyError.
        When two labels map to the same API name, the later one wins.
    """
    filtered = {}
    for label, value in attributes.items():
        api_name = ATTRIBUTE_MAPPING.get(label)
        if api_name is not None and api_name in valid_fields:
            filtered[api_name] = value
    return filtered
def _soql_quote(value):
    """Return *value* as a single-quoted SOQL string literal with \\ and ' escaped."""
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _resolve_salesforce_target(mode, entry_type):
    """Map (mode, entry_type) to (object_name, quantity_fields); None if unsupported."""
    targets = {
        ("Entry", "Sales"): ("VENKATA_RAMANA_MOTORS__c", ["Quantity__c"]),
        ("Entry", "Non-Sales"): ("UNBILLING_DATA__c", ["TotalQuantity__c"]),
        ("Exit", "Sales"): ("Inventory_Management__c", ["Quantity_Sold__c", "soldstock__c"]),
        ("Exit", "Non-Sales"): ("Un_Billable__c", ["Sold_Out__c", "soldstock__c"]),
    }
    return targets.get((mode, entry_type))


# Function to interact with Salesforce based on mode and type
def interact_with_salesforce(mode, entry_type, quantity, attributes):
    """Create or update a Salesforce record from the extracted attributes.

    * ``mode == "Entry"``: creates a new record on the object selected by
      *entry_type*, holding every attribute that maps to a field on that
      object plus *quantity* in the object's quantity field.
    * ``mode == "Exit"``: looks up one existing record by product name (and,
      when present, "Model Name", "Stage" and "H.P.") and writes *quantity*
      into each of the object's sold-quantity fields. The stored values are
      overwritten, not incremented.
      NOTE(review): confirm that overwriting rather than adding is intended,
      and that filtering Model__c by the "Model Name" attribute (rather than
      "Model") is intended.

    Args:
        mode: "Entry" or "Exit".
        entry_type: "Sales" or "Non-Sales".
        quantity: Value written to the quantity field(s).
        attributes: Dict of readable attribute label -> value.

    Returns:
        A human-readable status message. This function never raises; any
        failure is reported in the returned message.
    """
    try:
        # Validate the request before logging in so bad input costs no API call.
        target = _resolve_salesforce_target(mode, entry_type)
        if target is None:
            return "Invalid mode or entry type."
        object_name, quantity_fields = target

        product_name = attributes.get("Product name")
        if mode == "Exit" and not product_name:
            # Without a product name the lookup would filter on an empty
            # string and could update an unrelated record.
            return "❌ Product name is required to update an existing record."

        sf = Salesforce(
            username=SALESFORCE_USERNAME,
            password=SALESFORCE_PASSWORD,
            security_token=SALESFORCE_SECURITY_TOKEN
        )
        sf_object = getattr(sf, object_name)

        if mode == "Exit":
            # Values come from a user-editable table, so they are escaped
            # before being embedded in the SOQL text.
            conditions = [f"Productname__c = {_soql_quote(product_name)}"]
            optional_filters = (
                ("Model Name", "Model__c"),
                ("Stage", "Stage__c"),
                ("H.P.", "H_p__c"),
            )
            for label, api_field in optional_filters:
                value = attributes.get(label)
                if value:
                    conditions.append(f"{api_field} = {_soql_quote(value)}")

            query = (
                f"SELECT Id, {', '.join(quantity_fields)} FROM {object_name} "
                f"WHERE {' AND '.join(conditions)} LIMIT 1"
            )
            records = sf.query(query)["records"]
            if not records:
                return f"❌ No matching record found for product '{product_name}' in {object_name}."

            updated_fields = {field: quantity for field in quantity_fields}
            sf_object.update(records[0]["Id"], updated_fields)
            return f"βœ… Updated record for product '{product_name}' in {object_name}. Updated fields: {updated_fields}."

        # Entry mode: keep only attributes that exist as fields on the target
        # object, then add the quantity and create the record.
        schema = sf_object.describe()
        valid_fields = {field["name"] for field in schema["fields"]}
        new_record = filter_valid_attributes(attributes, valid_fields)
        new_record[quantity_fields[0]] = quantity
        sf_object.create(new_record)
        return f"βœ… Data successfully exported to Salesforce object {object_name}."
    except Exception as e:
        # UI boundary: surface the failure as text rather than crashing the app.
        return f"❌ Error interacting with Salesforce: {str(e)}"
# Function to process image, extract attributes, and allow editing
def process_image(image, mode, entry_type, quantity):
    """OCR an uploaded image and build the editable attribute table.

    Args:
        image: Uploaded image, or None when nothing was uploaded.
        mode, entry_type, quantity: Accepted because the UI wires them in;
            they are not used here.

    Returns:
        A 3-tuple ``(text, table, result)``:
        * text: the OCR output prefixed with "Extracted Text:", or an
          explanatory message when there is no image or no text;
        * table: DataFrame with "Attribute"/"Value" columns, or None;
        * result: always None (clears the result box).
    """
    # Clicking "extract" without an upload passes None; report that instead of
    # handing None to the OCR engine.
    if image is None:
        return "No image provided.", None, None

    extracted_text = extract_text(image)
    if not extracted_text:
        return "No text detected in the image.", None, None

    product_name = match_product_name(extracted_text)
    attributes = extract_attributes(extracted_text)
    if product_name:
        # A fuzzy match against the known product list overrides whatever the
        # label search captured.
        attributes["Product name"] = product_name

    # Ensure fixed attributes are present so the user can always fill them in.
    for fixed_attr in ("Stage", "H.P.", "Product name", "Model"):
        attributes.setdefault(fixed_attr, "")

    # Convert attributes to DataFrame for editing
    df = pd.DataFrame(list(attributes.items()), columns=["Attribute", "Value"])
    return f"Extracted Text:\n{extracted_text}", df, None
# Function to handle edited attributes and export to Salesforce
def export_to_salesforce(mode, entry_type, quantity, edited_df):
    """Export the user-edited attributes to Salesforce and append a price note.

    The rows of *edited_df* are turned back into an attribute dict and passed
    to ``interact_with_salesforce``. Afterwards ``Price__c`` is looked up on
    ``Inventory_Management__c`` using whichever of product name, model name
    and stage are filled in, and the outcome of that lookup is appended to
    the export message.

    Args:
        mode: "Entry" or "Exit".
        entry_type: "Sales" or "Non-Sales".
        quantity: Quantity forwarded to the export.
        edited_df: Table with "Attribute" and "Value" columns.

    Returns:
        The export status message followed by a blank line and the price
        information (or the reason it is unavailable). Never raises.
    """

    def soql_literal(value):
        # Escape backslashes and quotes: values come from a user-edited table.
        return "'" + str(value).replace("\\", "\\\\").replace("'", "\\'") + "'"

    try:
        # The table is missing or has other columns when the user presses OK
        # before extracting anything; say so instead of surfacing a KeyError.
        if edited_df is None or "Attribute" not in edited_df or "Value" not in edited_df:
            return "❌ No attributes to export. Extract attributes from an image first."

        # Convert edited DataFrame back to dictionary
        edited_attributes = dict(zip(edited_df["Attribute"], edited_df["Value"]))

        # Export to Salesforce
        message = interact_with_salesforce(mode, entry_type, quantity, edited_attributes)

        # Fetch the price from Inventory_Management__c based on attributes
        try:
            product_name = edited_attributes.get("Product name", "")
            model_name = edited_attributes.get("Model Name", "")
            stage = edited_attributes.get("Stage", "")

            # Build the query from the filters that are actually filled in.
            candidate_filters = (
                ("Productname__c", product_name),
                ("Model__c", model_name),
                ("Stage__c", stage),
            )
            query_conditions = [
                f"{api_field} = {soql_literal(value)}"
                for api_field, value in candidate_filters
                if value
            ]
            if not query_conditions:
                # Nothing to search by, so do not bother logging in.
                return f"{message}\n\nInsufficient data to fetch price information."

            sf = Salesforce(
                username=SALESFORCE_USERNAME,
                password=SALESFORCE_PASSWORD,
                security_token=SALESFORCE_SECURITY_TOKEN
            )
            query = f"SELECT Price__c FROM Inventory_Management__c WHERE {' AND '.join(query_conditions)} LIMIT 1"
            response = sf.query(query)
            if not response["records"]:
                return f"{message}\n\nNo matching record found for the specified product."

            price = response["records"][0].get("Price__c", None)
            if not price:
                return f"{message}\n\nPrice information not available for the specified product."

            price_message = f"The estimated price for the {product_name} with {model_name} at {stage} is β‚Ή{price:,}."
            return f"{message}\n\n{price_message}"
        except Exception as e:
            # A failed price lookup must not hide the result of the export.
            return f"{message}\n\nError fetching price information: {str(e)}"
    except Exception as e:
        return f"❌ Error exporting to Salesforce: {str(e)}"
import pytz
# Function to pull structured data from Salesforce and display as a table
def pull_data_from_salesforce(data_type):
    """Fetch stock records from Salesforce as a table, an Excel file and a chart.

    Args:
        data_type: "Inventory" reads ``Inventory_Management__c``; any other
            value reads ``Un_Billable__c``.

    Returns:
        Always a 3-tuple ``(table, excel_path, image)`` matching the three UI
        outputs this function is wired to:
        * success: the renamed DataFrame, the path of the written
          ``salesforce_data.xlsx`` and a PIL image of the bar chart;
        * no data / failure: a one-cell DataFrame (column "Message") holding
          the explanation, then ``None, None``. Previously these paths
          returned four values, one more than the success path.
    """

    def failure(message):
        # The first output is a table component, so the message is wrapped in
        # a DataFrame rather than returned as a bare string.
        return pd.DataFrame({"Message": [message]}), None, None

    try:
        sf = Salesforce(
            username=SALESFORCE_USERNAME,
            password=SALESFORCE_PASSWORD,
            security_token=SALESFORCE_SECURITY_TOKEN
        )
        if data_type == "Inventory":
            query = "SELECT Productname__c, Model__c, H_p__c, Stage__c, Current_Stocks__c, soldstock__c,Last_Modified_Date__c FROM Inventory_Management__c LIMIT 100"
        else:
            query = "SELECT Productname__c, Model__c, H_p__c, Stage__c, Current_Stock__c, soldstock__c, Last_Modified_Date__c FROM Un_Billable__c LIMIT 100"

        response = sf.query_all(query)
        records = response.get("records", [])
        if not records:
            return failure("No data found in Salesforce.")

        # Each record carries an 'attributes' metadata entry that is not data.
        df = pd.DataFrame(records).drop(columns=['attributes'], errors='ignore')

        # Format the Last_Modified_Date__c field to show only the date
        if "Last_Modified_Date__c" in df.columns:
            df["Last_Modified_Date__c"] = pd.to_datetime(df["Last_Modified_Date__c"]).dt.date

        # Rename columns for better readability; the two objects name their
        # stock column differently, so both spellings map to one header.
        df.rename(columns={
            "Productname__c": "Product Name",
            "Model__c": "Model",
            "H_p__c": "H.P",
            "Stage__c": "Stage",
            "Current_Stocks__c": "Current Stocks",
            "Current_Stock__c": "Current Stocks",
            "soldstock__c": "Sold Stock",
            "Last_Modified_Date__c": "Last Modified Date"
        }, inplace=True)

        excel_path = "salesforce_data.xlsx"
        df.to_excel(excel_path, index=False)

        # Render a static vertical bar chart to an in-memory PNG.
        buffer = BytesIO()
        fig, ax = plt.subplots(figsize=(12, 8))
        try:
            df.plot(kind='bar', x="Product Name", y="Current Stocks", ax=ax, legend=False)
            ax.set_title("Stock Distribution by Product Name")
            ax.set_xlabel("Product Name")
            ax.set_ylabel("Current Stocks")
            # Operate on this figure's own axes instead of pyplot's global
            # "current figure".
            plt.setp(ax.get_xticklabels(), rotation=45, ha="right", fontsize=10)
            fig.tight_layout()
            fig.savefig(buffer, format="png")
        finally:
            # pyplot keeps figures alive until closed; without this every call
            # would leave one more figure in memory.
            plt.close(fig)

        buffer.seek(0)
        img = Image.open(buffer)
        return df, excel_path, img
    except Exception as e:
        return failure(f"Error fetching data: {str(e)}")
# Gradio Interface
def app():
    """Build the two-tab Gradio UI and return the (not yet launched) Blocks object.

    Tab 1 takes an image plus mode / entry type / quantity, shows the OCR text
    and an editable attribute table, and exports the table on "OK".
    Tab 2 pulls stock data from Salesforce as a table, an Excel download and a
    chart.

    NOTE(review): the original indentation of this function was lost, so the
    nesting of widgets inside the Row / Tabs below is a reconstruction --
    confirm the intended layout.
    """
    with gr.Blocks() as ui:
        with gr.Tab("πŸ“₯ OCR Processing"):
            with gr.Row():
                image_box = gr.Image(type="numpy", label="πŸ“„ Upload Image")
                mode_box = gr.Dropdown(label="πŸ“Œ Mode", choices=["Entry", "Exit"], value="Entry")
                entry_type_box = gr.Radio(label="πŸ“¦ Entry Type", choices=["Sales", "Non-Sales"], value="Sales")
                quantity_box = gr.Number(label="πŸ”’ Quantity", value=1, interactive=True)
            extract_btn = gr.Button("Extract Text and Attributes")
            ocr_text_box = gr.Text(label="πŸ“ Extracted Image Data")
            attributes_table = gr.Dataframe(label="✏️ Edit Attributes (Key-Value Pairs)", interactive=True)
            confirm_btn = gr.Button("OK")
            status_box = gr.Text(label="πŸš€ Result")

        with gr.Tab("πŸ“Š Salesforce Data"):
            data_type_box = gr.Dropdown(label="Select Data Type", choices=["Inventory", "Unbilling"], value="Inventory")
            pull_btn = gr.Button("Pull Data from Salesforce")
            records_table = gr.Dataframe(label="πŸ“Š Salesforce Data")
            excel_file = gr.File(label="πŸ“₯ Download Excel")
            stock_chart = gr.Image(label="πŸ“ˆ Stock Distribution Graph")

        # Wire each button to its handler.
        request_inputs = [mode_box, entry_type_box, quantity_box]
        extract_btn.click(
            fn=process_image,
            inputs=[image_box] + request_inputs,
            outputs=[ocr_text_box, attributes_table, status_box],
        )
        confirm_btn.click(
            fn=export_to_salesforce,
            inputs=request_inputs + [attributes_table],
            outputs=[status_box],
        )
        pull_btn.click(
            fn=pull_data_from_salesforce,
            inputs=[data_type_box],
            outputs=[records_table, excel_file, stock_chart],
        )
    return ui
# Script entry point: build the UI and serve it. share=True is passed through
# to Gradio's launch().
if __name__ == "__main__":
    demo = app()
    demo.launch(share=True)