SETA / app.py
gopichandra's picture
Update app.py
a795d87 verified
import os
from paddleocr import PaddleOCR
from PIL import Image
import gradio as gr
import requests
import re
from simple_salesforce import Salesforce
import pandas as pd
import matplotlib.pyplot as plt
from io import BytesIO
from fuzzywuzzy import process
import kaleido # Ensure kaleido is imported
# Attribute mappings: readable names to Salesforce API names
ATTRIBUTE_MAPPING = {
"Product name": "Productname__c",
"Colour": "Colour__c",
"Frequency": "Frequency__c",
"Model": "Model__c",
"Speed": "Speed__c",
"Quantity": "Quantity__c",
"Voltage": "Voltage__c",
"Type": "Type__c",
"Stage": "Stage__c",
"Outlet": "Outlet__c",
"Phase": "Phase__c",
"H.P.": "H_p__c"
}
# List of product names to match
PRODUCT_NAMES = [
"Fusion", "Agroking", "openwell", "CG commercial motors", "Jaguar", "Submersible pumps", "Gaurav"
]
# Salesforce credentials
SALESFORCE_USERNAME = "venkatramana@sandbox.com"
SALESFORCE_PASSWORD = "Venkat12345@"
SALESFORCE_SECURITY_TOKEN = "GhcJJmjBEefdnukJoz4CAQlR"
# Initialize PaddleOCR
ocr = PaddleOCR(use_angle_cls=True, lang='en')
# Function to extract text using PaddleOCR
def extract_text(image):
result = ocr.ocr(image)
extracted_text = []
for line in result[0]:
extracted_text.append(line[1][0])
return "\n".join(extracted_text)
# Function to match product name using fuzzy matching
def match_product_name(extracted_text):
best_match = None
best_score = 0
for line in extracted_text.split("\n"):
match, score = process.extractOne(line, PRODUCT_NAMES)
if score > best_score:
best_match = match
best_score = score
return best_match if best_score >= 70 else None # Threshold of 70 for a match
# Function to extract attributes and their values
def extract_attributes(extracted_text):
attributes = {}
for readable_attr, sf_attr in ATTRIBUTE_MAPPING.items():
pattern = rf"{re.escape(readable_attr)}[:\-]?\s*(.+)"
match = re.search(pattern, extracted_text, re.IGNORECASE)
if match:
attributes[readable_attr] = match.group(1).strip()
return attributes
# Function to filter attributes for valid Salesforce fields
def filter_valid_attributes(attributes, valid_fields):
return {ATTRIBUTE_MAPPING[key]: value for key, value in attributes.items() if ATTRIBUTE_MAPPING[key] in valid_fields}
#πŸ“Š Function to interact with Salesforce based on mode and type
def interact_with_salesforce(mode, entry_type, quantity, extracted_text):
try:
sf = Salesforce(
username=SALESFORCE_USERNAME,
password=SALESFORCE_PASSWORD,
security_token=SALESFORCE_SECURITY_TOKEN
)
# Mapping mode and entry_type to Salesforce object and field
object_name = None
field_name = None
field_names = []
product_field_name = "Productname__c"
model_field_name = "Model__c"
stage_field_name = "Stage__c"
hp_field_name = "H_p__c"
if mode == "Entry":
if entry_type == "Sales":
object_name = "VENKATA_RAMANA_MOTORS__c"
field_name = "Quantity__c"
elif entry_type == "Non-Sales":
object_name = "UNBILLING_DATA__c"
field_name = "TotalQuantity__c"
elif mode == "Exit":
if entry_type == "Sales":
object_name = "Inventory_Management__c"
field_names = ["Quantity_Sold__c", "soldstock__c"]
elif entry_type == "Non-Sales":
object_name = "Un_Billable__c"
field_names = ["Sold_Out__c", "soldstock__c"]
if not object_name or (not field_name and not field_names):
return "Invalid mode or entry type."
# Get valid fields for the specified Salesforce object
sf_object = sf.__getattr__(object_name)
schema = sf_object.describe()
valid_fields = {field["name"] for field in schema["fields"]}
# Extract product name and attributes
product_name = match_product_name(extracted_text)
attributes = extract_attributes(extracted_text)
model_name = attributes.get("Model Name", "").strip()
stage = attributes.get("Stage", "").strip()
hp = attributes.get("H.P.", "").strip()
if not product_name:
return "Product name could not be matched from the extracted text."
attributes["Product name"] = product_name
# Handling "Exit" Mode (Updating Records)
if mode == "Exit":
# Query should match exact product name, model name, stage, and hp if available
query_conditions = [f"{product_field_name} = '{product_name}'"]
if model_name:
query_conditions.append(f"{model_field_name} = '{model_name}'")
if stage:
query_conditions.append(f"{stage_field_name} = '{stage}'")
if hp:
query_conditions.append(f"{hp_field_name} = '{hp}'")
query = f"SELECT Id, {', '.join(field_names)} FROM {object_name} WHERE {' AND '.join(query_conditions)} LIMIT 1"
response = sf.query(query)
if response["records"]:
record_id = response["records"][0]["Id"]
updated_fields = {field: quantity for field in field_names}
sf_object.update(record_id, updated_fields)
return f"βœ… Updated record for product '{product_name}' ({model_name}) in {object_name}. Updated fields: {updated_fields}."
else:
# If no matching record found with all conditions, try with only product name
query_conditions = [f"{product_field_name} = '{product_name}'"]
query = f"SELECT Id, {', '.join(field_names)} FROM {object_name} WHERE {' AND '.join(query_conditions)} LIMIT 1"
response = sf.query(query)
if response["records"]:
record_id = response["records"][0]["Id"]
updated_fields = {field: quantity for field in field_names}
sf_object.update(record_id, updated_fields)
return f"βœ… Updated record for product '{product_name}' in {object_name}. Updated fields: {updated_fields}."
else:
return f"❌ No matching record found for product '{product_name}' in {object_name}."
# Handling "Entry" Mode (Creating Records)
else:
filtered_attributes = filter_valid_attributes(attributes, valid_fields)
filtered_attributes[field_name] = quantity
sf_object.create(filtered_attributes)
return f"βœ… Data successfully exported to Salesforce object {object_name}."
except Exception as e:
return f"❌ Error interacting with Salesforce: {str(e)}"
# Function to pull structured data from Salesforce and display as a table
def pull_data_from_salesforce():
try:
sf = Salesforce(
username=SALESFORCE_USERNAME,
password=SALESFORCE_PASSWORD,
security_token=SALESFORCE_SECURITY_TOKEN
)
query_inventory = "SELECT Product_Name__c, Current_Stocks__c, soldstock__c FROM Inventory_Management__c LIMIT 100"
query_unbillable = "SELECT Product_Name__c, Current_Stock__c, soldstock__c FROM Un_Billable__c LIMIT 100"
response_inventory = sf.query_all(query_inventory)
response_unbillable = sf.query_all(query_unbillable)
records_inventory = response_inventory.get("records", [])
records_unbillable = response_unbillable.get("records", [])
if not records_inventory and not records_unbillable:
return "No data found in Salesforce.", None, None, None
records = records_inventory + records_unbillable
df = pd.DataFrame(records)
df = df.drop(columns=['attributes'], errors='ignore')
# Rename columns for better readability
df.rename(columns={
"Product_Name__c": "Product Name",
"Modal_Name__c": "Model Name (Inventory)",
"Model_Name__c": "Model Name (Unbillable)",
"Current_Stocks__c": "Current Stocks",
"soldstock__c": "Sold Stock"
}, inplace=True)
excel_path = "salesforce_data.xlsx"
df.to_excel(excel_path, index=False)
# Generate interactive vertical bar graph using Matplotlib
fig, ax = plt.subplots(figsize=(12, 8))
df.plot(kind='bar', x="Product Name", y="Current Stocks", ax=ax, legend=False)
ax.set_title("Stock Distribution by Product Name")
ax.set_xlabel("Product Name")
ax.set_ylabel("Current Stocks")
plt.xticks(rotation=45, ha="right", fontsize=10)
plt.tight_layout()
buffer = BytesIO()
plt.savefig(buffer, format="png")
buffer.seek(0)
img = Image.open(buffer)
return "Data successfully retrieved.", df, excel_path, img
except Exception as e:
return f"Error fetching data: {str(e)}", None, None, None
# Unified function to handle image processing and Salesforce interaction
def process_image(image, mode, entry_type, quantity):
extracted_text = extract_text(image)
if not extracted_text:
return "No text detected in the image.", None
product_name = match_product_name(extracted_text)
attributes = extract_attributes(extracted_text)
if product_name:
attributes["Product name"] = product_name
# Interact with Salesforce
message = interact_with_salesforce(mode, entry_type, quantity, extracted_text)
numbered_output = "\n".join([f"{key}: {value}" for key, value in attributes.items()])
return f"Extracted Text:\n{extracted_text}\n\nAttributes and Values:\n{numbered_output}", message
# Gradio Interface
def app():
return gr.TabbedInterface([
gr.Interface(
fn=process_image,
inputs=[
gr.Image(type="numpy", label="πŸ“„α΄œα΄˜ΚŸα΄α΄€α΄… Ιͺᴍᴀɒᴇ"),
gr.Dropdown(label="πŸ“Œ Mode", choices=["Entry", "Exit"], value="Entry"),
gr.Radio(label="πŸ“¦ Entry Type", choices=["Sales", "Non-Sales"], value="Sales"),
gr.Number(label="πŸ”’ Quantity", value=1, interactive=True),
],
outputs=[
gr.Text(label="πŸ“ Extracted Image Data"),
gr.Text(label="πŸš€ Result")
],
title="🏒 𝑽𝑬𝑡𝑲𝑨𝑻𝑨𝑹𝑨𝑴𝑨𝑡𝑨 𝑴𝑢𝑻𝑢𝑹𝑺",
description="πŸ“¦ πˆππ•π„ππ“πŽπ‘π˜ πŒπ€ππ€π†π„πŒπ„ππ“"
),
gr.Interface(
fn=pull_data_from_salesforce,
inputs=[],
outputs=[
gr.Text(label="Status"),
gr.Dataframe(label="πŸ“¦ Salesforce Data Table"),
gr.File(label="Download Salesforce Data"),
gr.Image(label="πŸ“‰ Stock Distribution Bar Graph")
],
title="πŸ“Š Salesforce Data Export",
description="View, visualize (zoom-in/out), and download Salesforce data (Product Name, Model Name, Current Stocks)."
)
], ["πŸ“₯ OCR Processing", "πŸ“Š Salesforce Data Export"])
if __name__ == "__main__":
app().launch(share=True)