# Vendor / app.py
# Rekham1110's picture
# Update app.py
# 4015b8c verified
import gradio as gr
import logging
import pandas as pd
from datetime import datetime
import os
from simple_salesforce import Salesforce
from dotenv import load_dotenv
from transformers import pipeline
# Set up logging: configure the root logger at INFO level and create the
# module-level logger used by every function in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables (python-dotenv). This runs before the Salesforce
# credentials are read with os.getenv further down.
load_dotenv()
# Build the Hugging Face sentiment-analysis pipeline once at import time.
# The name stays bound to None when loading fails; the rest of the file
# checks its truthiness before calling it.
sentiment_analyzer = None
try:
    sentiment_analyzer = pipeline(
        "sentiment-analysis",
        model="distilbert-base-uncased-finetuned-sst-2-english",
    )
except Exception as load_error:
    logger.error(f"Failed to load Hugging Face model: {load_error!s}")
else:
    logger.info("Hugging Face sentiment analyzer loaded")
# Log in to Salesforce with credentials taken from the environment.
# `sf` stays None when the login fails so later code can detect that.
sf = None
try:
    sf = Salesforce(**{
        argument: os.getenv(variable)
        for argument, variable in (
            ("username", "SF_USERNAME"),
            ("password", "SF_PASSWORD"),
            ("security_token", "SF_SECURITY_TOKEN"),
        )
    })
except Exception as connect_error:
    logger.error(f"Failed to connect to Salesforce: {connect_error!s}")
else:
    logger.info("Connected to Salesforce")
# Latest data staged by each tab, keyed by tab. Every slot starts as None;
# submit_to_salesforce skips empty slots and resets a slot after sending it.
global_data = dict.fromkeys(("calculate_scores", "contact_us", "upload_file"))
def score_vendor(vendor_id: str, delivery_records: float, issue_counts: int, nps_values: float):
    """Score one vendor from its delivery, issue and NPS figures.

    The composite score is the sum of three parts, rounded to two decimals:
    ``delivery_records * 0.4`` (0-40), ``(10 - min(issue_counts, 10)) * 3``
    (0-30) and ``nps_values * 0.3`` (0-30).

    Args:
        vendor_id: Vendor identifier; must not be empty or blank.
        delivery_records: Value between 0 and 100 inclusive.
        issue_counts: Non-negative number of issues; anything above 10
            scores the same as 10.
        nps_values: Value between 0 and 100 inclusive.

    Returns:
        On success, a dict with the keys ``vendor_id``, ``score``,
        ``timeliness_score``, ``issue_score``, ``feedback_score``,
        ``evaluation_date`` (today as ``YYYY-MM-DD``), ``rationale`` (the
        score breakdown tagged with the label ``sentiment_analyzer`` gives
        it, or "Unknown" when no analyzer is loaded) and ``alert_flag``
        (True when the score is below 50).
        On invalid input or any unexpected failure, ``{"error": message}``;
        the function never raises.
    """
    try:
        if not vendor_id or not str(vendor_id).strip():
            return {"error": "Vendor ID cannot be empty. Please provide a valid ID (e.g., V001)."}

        # None would make the range checks below raise TypeError and come
        # back as a generic "unexpected error"; name the missing field instead.
        for label, value in (
            ("Delivery Records", delivery_records),
            ("Issue Counts", issue_counts),
            ("NPS Value", nps_values),
        ):
            if value is None:
                return {"error": f"{label} is required."}

        if not (0 <= delivery_records <= 100):
            return {"error": f"Delivery Records ({delivery_records}) must be between 0 and 100."}
        # "not >= 0" rather than "< 0" so NaN is rejected along with negatives.
        if not issue_counts >= 0:
            return {"error": f"Issue Counts ({issue_counts}) must be non-negative."}
        if not (0 <= nps_values <= 100):
            return {"error": f"NPS Value ({nps_values}) must be between 0 and 100."}

        timeliness_score = delivery_records * 0.4
        # Each issue costs 3 points, up to a maximum of 10 issues.
        issue_score = (10 - min(issue_counts, 10)) * 3
        feedback_score = nps_values * 0.3
        composite_score = round(timeliness_score + issue_score + feedback_score, 2)

        rationale = (
            f"Score = Timeliness ({timeliness_score:.2f}) + "
            f"Issues ({issue_score:.2f}) + Feedback ({feedback_score:.2f})"
        )
        sentiment = sentiment_analyzer(rationale)[0]["label"] if sentiment_analyzer else "Unknown"
        rationale_with_sentiment = f"{rationale} [Sentiment: {sentiment}]"

        evaluation_date = datetime.now().strftime("%Y-%m-%d")
        alert_flag = composite_score < 50
        result = {
            "vendor_id": vendor_id,
            "score": composite_score,
            "timeliness_score": timeliness_score,
            "issue_score": issue_score,
            "feedback_score": feedback_score,
            "evaluation_date": evaluation_date,
            "rationale": rationale_with_sentiment,
            "alert_flag": alert_flag,
        }
        logger.info(f"Processed vendor: {vendor_id} on {evaluation_date}")
        return result
    except Exception as e:
        # Callers expect an error dict rather than an exception.
        logger.error(f"Error processing vendor {vendor_id}: {str(e)}")
        return {"error": f"An unexpected error occurred: {str(e)}"}
def format_output(vendor_id, delivery_records, issue_counts, nps_values):
    """Score a single vendor and shape the outcome for the two UI outputs.

    Returns:
        ``(error_message, None)`` when ``score_vendor`` reports an error.
        Otherwise ``(None, dataframe)``, where the one-row DataFrame holds
        the scoring result; that result is also stored in
        ``global_data["calculate_scores"]``.
    """
    outcome = score_vendor(vendor_id, delivery_records, issue_counts, nps_values)
    if "error" in outcome:
        return outcome["error"], None

    global_data["calculate_scores"] = outcome
    display_columns = [
        "vendor_id",
        "score",
        "timeliness_score",
        "issue_score",
        "feedback_score",
        "evaluation_date",
        "rationale",
        "alert_flag",
    ]
    table = pd.DataFrame([outcome])[display_columns]
    return None, table
def process_uploaded_file(file):
    """Score every vendor row of an uploaded CSV file.

    The CSV must contain the columns ``vendor_id``, ``delivery_records``,
    ``issue_counts`` and ``nps_values``. Processing stops at the first row
    that ``score_vendor`` rejects. On success the list of results is stored
    in ``global_data["upload_file"]``.

    Args:
        file: Path to the CSV, or an object whose ``name`` attribute is the
            path. A falsy value means nothing has been uploaded.

    Returns:
        ``(status_message, dataframe)`` on success, or
        ``(status_message, None)`` when there is no file or an error occurred.
    """
    if not file:
        return "Upload a file to see status.", None

    # Resolve the path once. It is used both for reading and in every status
    # message, so a plain path string is reported as itself instead of as
    # "Unknown file".
    source = file.name if hasattr(file, 'name') else file
    required_columns = ["vendor_id", "delivery_records", "issue_counts", "nps_values"]
    display_columns = [
        "vendor_id",
        "score",
        "timeliness_score",
        "issue_score",
        "feedback_score",
        "evaluation_date",
        "rationale",
        "alert_flag",
    ]

    try:
        df = pd.read_csv(source)
        if not all(col in df.columns for col in required_columns):
            return (
                f"File uploaded: {source} - Error: File must contain columns: vendor_id, delivery_records, issue_counts, nps_values.",
                None,
            )

        results = []
        for _, row in df.iterrows():
            result = score_vendor(
                str(row["vendor_id"]),
                float(row["delivery_records"]),
                int(row["issue_counts"]),
                float(row["nps_values"]),
            )
            if "error" in result:
                # row.name is the 0-based index label; report it 1-based.
                return (
                    f"File uploaded: {source} - Error in row {row.name + 1}: {result['error']}",
                    None,
                )
            results.append(result)

        if not results:
            return (
                f"File uploaded: {source} - Error: No valid data to process.",
                None,
            )

        global_data["upload_file"] = results
        return (
            f"File uploaded successfully: {source}",
            pd.DataFrame(results)[display_columns],
        )
    except Exception as e:
        # Unreadable files and unconvertible cell values end up here.
        logger.error(f"Error processing file: {str(e)}")
        return (
            f"File uploaded: {source} - Error processing file: {str(e)}",
            None,
        )
def handle_contact_form(name, email, message):
    """Stage a contact-form entry in ``global_data["contact_us"]``.

    All three fields must be non-empty. The stored message is suffixed with
    the label ``sentiment_analyzer`` gives it, or "Unknown" when no analyzer
    is loaded.

    Returns:
        A status string: either the validation error or the confirmation.
    """
    if not all((name, email, message)):
        return "Error: All fields (Name, Email, Message) are required."

    if sentiment_analyzer:
        label = sentiment_analyzer(message)[0]["label"]
    else:
        label = "Unknown"

    global_data["contact_us"] = {
        "name": name,
        "email": email,
        "message": f"{message} [Sentiment: {label}]",
    }
    return "Data saved locally. Click 'Submit to Salesforce' to create record."
def _escape_soql_literal(value):
    """Escape *value* for use inside a single-quoted SOQL string literal."""
    # Backslash first, so the backslashes added for quotes are not doubled.
    return str(value).replace("\\", "\\\\").replace("'", "\\'")


def _find_vendor_record_id(field, value):
    """Return the Id of the first Vendor__c whose *field* equals *value*, or None."""
    # `field` is always a constant supplied by this module; only `value`
    # comes from user input and needs escaping.
    found = sf.query(
        f"SELECT Id FROM Vendor__c WHERE {field} = '{_escape_soql_literal(value)}'"
    )
    if found['totalSize'] == 0:
        return None
    return found['records'][0]['Id']


def _submit_performance_record(result):
    """Create a Vendor_Performance__c from one scoring result; return a status line."""
    vendor_id = result["vendor_id"]
    vendor_id_sf = _find_vendor_record_id("Vendor_ID__c", vendor_id)
    if vendor_id_sf is None:
        return f"Error: Vendor with ID {vendor_id} not found in Salesforce."

    response = sf.Vendor_Performance__c.create({
        'Vendor_ID__c': vendor_id,
        'Timeliness_Score__c': result["timeliness_score"],
        # issue_score is (10 - min(count, 10)) * 3, so this recovers the
        # count itself (capped at 10) rather than "10 - count".
        'Issue_Count__c': 10 - result["issue_score"] / 3,
        'Feedback_Rating__c': result["feedback_score"] / 0.3,
        'Evaluation_Date__c': result["evaluation_date"],
        'Score__c': result["score"],
        'Rationale__c': result["rationale"],
        'Vendor__c': vendor_id_sf,
        'Alert_Flag__c': result["alert_flag"],
    })
    logger.info(f"Created Vendor_Performance__c record for vendor: {vendor_id}, Response: {response}")
    return f"Vendor_Performance__c created for {vendor_id}. Trigger fired."


def _submit_contact(contact):
    """Update the Vendor__c matching the contact's email, or create one; return a status line."""
    email = contact["email"]
    fields = {
        'Vendor_Name__c': contact["name"],
        'Contact_Email__c': email,
        'Comments__c': contact["message"],
    }
    vendor_id_sf = _find_vendor_record_id("Contact_Email__c", email)
    if vendor_id_sf is not None:
        response = sf.Vendor__c.update(vendor_id_sf, fields)
        logger.info(f"Updated Vendor__c record for email: {email}, Response: {response}")
        return f"Vendor__c updated for {email}. Trigger fired."

    # No vendor has this email yet: create one with a timestamp-based ID.
    fields['Vendor_ID__c'] = f"V{datetime.now().strftime('%Y%m%d%H%M%S')}"
    response = sf.Vendor__c.create(fields)
    logger.info(f"Created Vendor__c record for email: {email}, Response: {response}")
    return f"Vendor__c created for {email}. Trigger fired."


def submit_to_salesforce():
    """Send everything staged in ``global_data`` to Salesforce.

    Handles the three slots in order: the single scoring result, the contact
    form entry, then the uploaded batch of scoring results. A slot is reset
    to None once it has been handled. If the batch fails part-way, only the
    records not yet sent stay queued, so a retry does not duplicate the ones
    already created.

    Returns:
        One status line per record, joined by newlines, or
        "No data to submit." when nothing was staged. If Salesforce or the
        sentiment model is unavailable, a single error line. If a call fails
        part-way, the status lines collected so far followed by the error
        line.
    """
    if not sf:
        return "Error: Salesforce connection not established."
    if not sentiment_analyzer:
        return "Error: Hugging Face model not loaded."

    results = []
    try:
        # Calculate Scores tab
        score_result = global_data["calculate_scores"]
        if score_result and "error" not in score_result:
            results.append(_submit_performance_record(score_result))
            global_data["calculate_scores"] = None

        # Contact Us tab
        contact = global_data["contact_us"]
        if contact:
            results.append(_submit_contact(contact))
            global_data["contact_us"] = None

        # Upload File tab
        pending = global_data["upload_file"]
        if pending:
            handled = 0
            try:
                for result in pending:
                    results.append(_submit_performance_record(result))
                    handled += 1
            finally:
                # Keep only what was not handled; an empty remainder clears
                # the slot.
                global_data["upload_file"] = pending[handled:] or None

        return "\n".join(results) if results else "No data to submit."
    except Exception as e:
        logger.error(f"Error submitting to Salesforce: {str(e)}")
        # Keep the lines for records that were already written so the user
        # can see what reached Salesforce before the failure.
        return "\n".join(results + [f"Error submitting to Salesforce: {str(e)}"])
# Gradio UI: a shared "Submit to Salesforce" button and status box above three
# tabs. Each tab's handler stages its data in global_data; the shared button
# sends whatever is staged.
with gr.Blocks(title="Vendor Performance Scoring") as demo:
    gr.Markdown("# Vendor Performance Scoring System")
    gr.Markdown("Enter data in any tab and click 'Submit to Salesforce' to save records with Hugging Face sentiment analysis and trigger Apex actions.")
    # Shared controls; the button is wired to its handler at the end of this block.
    submit_salesforce_btn = gr.Button("Submit to Salesforce", variant="primary")
    salesforce_output = gr.Textbox(label="Salesforce Submission Status", placeholder="Click 'Submit to Salesforce' to save data", interactive=False)
    with gr.Tabs():
        # Tab 1: score a single vendor from manually entered values.
        with gr.Tab("Calculate Scores"):
            gr.Markdown("### Enter Vendor Details")
            gr.Markdown("Provide the vendor details below to calculate performance scores. All fields are required.")
            with gr.Row():
                vendor_id = gr.Textbox(label="Vendor ID", placeholder="e.g., V001", value="")
                delivery_records = gr.Slider(minimum=0, maximum=100, step=1, label="Delivery Records (% On-Time)", value=0)
            with gr.Row():
                issue_counts = gr.Number(label="Issue Counts", value=0, step=1, minimum=0)
                nps_values = gr.Slider(minimum=0, maximum=100, step=1, label="NPS Value", value=0)
            with gr.Row():
                calculate_btn = gr.Button("Calculate Scores", variant="primary")
                clear_btn = gr.Button("Clear Inputs")
            error_output = gr.Textbox(label="Error Message", placeholder="No errors", interactive=False)
            score_output = gr.Dataframe(
                label="Scoring Results",
                headers=["Vendor ID", "Score", "Timeliness Score", "Issue Score", "Feedback Score", "Evaluation Date", "Rationale", "Alert Flag"],
                wrap=True
            )
            # format_output returns (error message, dataframe), matching the
            # order of the two outputs.
            calculate_btn.click(
                fn=format_output,
                inputs=[vendor_id, delivery_records, issue_counts, nps_values],
                outputs=[error_output, score_output]
            )
            # Reset the four inputs and both outputs. The tuple order matches
            # `outputs`; the error box receives the literal text "No errors".
            clear_btn.click(
                fn=lambda: ("", 0, 0, 0, "No errors", None),
                inputs=None,
                outputs=[vendor_id, delivery_records, issue_counts, nps_values, error_output, score_output]
            )
        # Tab 2: contact form; handle_contact_form only stages the entry.
        with gr.Tab("Contact Us"):
            gr.Markdown("### Contact Us")
            gr.Markdown("Fill out the form below. Data will be saved when you click 'Submit to Salesforce'.")
            with gr.Row():
                name_input = gr.Textbox(label="Name", placeholder="Your name")
                email_input = gr.Textbox(label="Email", placeholder="your.email@example.com")
            message_input = gr.Textbox(label="Message", placeholder="Your message or feedback", lines=3)
            contact_save_btn = gr.Button("Save Contact Data", variant="primary")
            contact_output = gr.Textbox(label="Contact Form Status", placeholder="Click 'Save Contact Data' to save locally", interactive=False)
            contact_save_btn.click(
                fn=handle_contact_form,
                inputs=[name_input, email_input, message_input],
                outputs=contact_output
            )
        # Tab 3: batch scoring from a CSV file.
        with gr.Tab("Upload File"):
            gr.Markdown("### Upload Supporting Documents")
            gr.Markdown("Upload a CSV file with columns: vendor_id, delivery_records, issue_counts, nps_values.")
            file_input = gr.File(label="Upload File")
            file_output = gr.Textbox(label="Upload Status", placeholder="Upload a file to see status", interactive=False)
            upload_score_output = gr.Dataframe(
                label="Scoring Results",
                headers=["Vendor ID", "Score", "Timeliness Score", "Issue Score", "Feedback Score", "Evaluation Date", "Rationale", "Alert Flag"],
                wrap=True
            )
            # Runs whenever the file input changes; process_uploaded_file
            # returns its "Upload a file to see status." message for a falsy value.
            file_input.change(
                fn=process_uploaded_file,
                inputs=file_input,
                outputs=[file_output, upload_score_output]
            )
    # The shared button takes no inputs: submit_to_salesforce reads global_data.
    submit_salesforce_btn.click(
        fn=submit_to_salesforce,
        inputs=None,
        outputs=salesforce_output
    )
# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    # Bind to all network interfaces ("0.0.0.0") on port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)