# Rekham1110's picture
# Update app.py
# ae78ca7 verified
import os
import logging
from datetime import datetime
import pandas as pd
from dotenv import load_dotenv
from simple_salesforce import Salesforce
import gradio as gr
# Logging: INFO level on the root logger, plus a logger named after this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Salesforce settings are read from the process environment.
# load_dotenv() runs first so that anything it loads is visible to the lookups below.
load_dotenv()
SF_USERNAME = os.environ.get("SF_USERNAME")
SF_PASSWORD = os.environ.get("SF_PASSWORD")
SF_SECURITY_TOKEN = os.environ.get("SF_SECURITY_TOKEN")
# Falls back to "login" when SF_DOMAIN is not set
SF_DOMAIN = os.environ.get("SF_DOMAIN", "login")
# Connect to Salesforce
def connect_to_salesforce():
    """Build a Salesforce client from the module-level SF_* settings.

    Returns:
        The ``Salesforce`` client on success. ``None`` when the username,
        password or security token is missing/empty, or when creating the
        client raises; both cases are logged as errors.
    """
    client = None
    try:
        credentials = (SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN)
        if all(credentials):
            client = Salesforce(
                username=SF_USERNAME,
                password=SF_PASSWORD,
                security_token=SF_SECURITY_TOKEN,
                domain=SF_DOMAIN,
            )
            logger.info("Salesforce connected successfully.")
        else:
            logger.error("Salesforce credentials are missing in environment variables.")
    except Exception as exc:
        logger.error(f"Salesforce login failed: {exc}")
        # Any failure inside the try block yields None, never a half-usable client
        client = None
    return client
# Lookup Vendor ID by Name
# Characters that must be backslash-escaped inside a single-quoted SOQL string literal
_SOQL_ESCAPES = str.maketrans({
    "\\": "\\\\",
    "'": "\\'",
    '"': '\\"',
    "\n": "\\n",
    "\r": "\\r",
    "\t": "\\t",
    "\b": "\\b",
    "\f": "\\f",
})


def _escape_soql_literal(value):
    """Return *value* as text that is safe to place inside a quoted SOQL literal."""
    return str(value).translate(_SOQL_ESCAPES)


def get_vendor_id_by_name(sf, vendor_name):
    """Look up the Id of the Vendor__c record whose Name equals *vendor_name*.

    Args:
        sf: Object exposing ``query(soql)`` that returns a mapping with
            ``totalSize`` and ``records``; a falsy value means "not connected".
        vendor_name: Vendor name to match. It is user-supplied, so it is
            escaped before being embedded in the query.

    Returns:
        The ``Id`` of the first matching record, or ``None`` when there is no
        connection, no match, or the query raises (each case is logged).
    """
    if not sf:
        logger.error("Cannot query vendor ID: Salesforce connection is not established.")
        return None
    try:
        # Escaping keeps names such as "O'Brien" valid and prevents SOQL injection
        safe_name = _escape_soql_literal(vendor_name)
        query = f"SELECT Id FROM Vendor__c WHERE Name = '{safe_name}' LIMIT 1"
        result = sf.query(query)
        if result["totalSize"] > 0:
            return result["records"][0]["Id"]
        logger.error(f"No vendor found with name: {vendor_name}")
        return None
    except Exception as e:
        logger.error(f"Error querying vendor ID for {vendor_name}: {e}")
        return None
# Save to Salesforce
def save_to_salesforce(data):
    """Create a Vendor_Performance__c record from a scored-vendor mapping.

    Args:
        data: Mapping providing ``vendor_name``, ``vendor_id``, ``score``,
            ``timeliness_score``, ``issue_count``, ``feedback_score``,
            ``evaluation_date``, ``rationale`` and ``performance_level``.

    Returns:
        A status string: ``"Saved successfully"`` on success, otherwise a
        message explaining why nothing was saved. Exceptions raised while
        preparing or creating the record are logged and reported in the
        returned string instead of propagating.
    """
    client = connect_to_salesforce()
    if not client:
        return "Salesforce connection failed. Please check credentials and try again."

    try:
        name = data["vendor_name"]
        vendor_record_id = get_vendor_id_by_name(client, name)
        if not vendor_record_id:
            return f"Error: No vendor found with name '{name}'"

        create_record = client.Vendor_Performance__c.create
        payload = {
            "Vendor_ID__c": int(data["vendor_id"]),
            # Vendor_Name__c is given the Id returned by the vendor lookup
            "Vendor_Name__c": vendor_record_id,
            "Score__c": data["score"],
            "Timeliness_Score__c": data["timeliness_score"],
            "Issue_Count__c": data["issue_count"],
            "Feedback_Rating__c": data["feedback_score"],
            "Evaluation_Date__c": data["evaluation_date"],
            "Rationale__c": data["rationale"],
            "Performance_Level__c": data["performance_level"],
        }
        created = create_record(payload)
        logger.info(f"Record saved to Salesforce: {created}")
        return "Saved successfully"
    except Exception as exc:
        logger.error(f"Save failed: {exc}")
        return f"Error: {exc}"
# Score vendor
def _in_range(value, low, high):
    """Return True only when *value* compares as a number within [low, high]."""
    try:
        return bool(low <= value <= high)
    except TypeError:
        # None (a blank numeric field) or a non-numeric value such as a string
        return False


def score_vendor(vendor_id, vendor_name, delivery_records, issue_counts, nps_values):
    """Score a vendor out of 100 and save the evaluation to Salesforce.

    The score is the sum of three parts:
      * timeliness: ``delivery_records`` (0-100) scaled to 0-40,
      * issues: 30 minus 3 per issue (``issue_counts`` 0-10), floored at 0,
      * feedback: ``nps_values`` (0-10) scaled to 0-30.
    A total below 40 is "Low", up to and including 70 is "Average", and
    anything higher is "High".

    Args:
        vendor_id: Vendor identifier; must be truthy.
        vendor_name: Vendor name; must be truthy.
        delivery_records: Number in [0, 100].
        issue_counts: Number in [0, 10].
        nps_values: Number in [0, 10].

    Returns:
        ``{"error": message}`` when an input is missing, non-numeric or out
        of range. Otherwise a dict with the inputs, the component scores,
        ``score``, ``evaluation_date`` (today, ``YYYY-MM-DD``), ``rationale``,
        ``performance_level`` and ``status`` (the string returned by
        ``save_to_salesforce``).
    """
    if not vendor_id or not vendor_name:
        return {"error": "Vendor ID and Vendor Name are required."}
    # _in_range also rejects None/non-numeric input instead of raising TypeError
    if not _in_range(delivery_records, 0, 100):
        return {"error": "Delivery must be 0–100"}
    if not _in_range(issue_counts, 0, 10):
        return {"error": "Issue Count must be 0–10"}
    if not _in_range(nps_values, 0, 10):
        return {"error": "NPS must be 0–10"}

    timeliness_score = min(40, (delivery_records / 100) * 40)
    issue_score = max(0, 30 - (issue_counts * 3))
    feedback_score = min(30, (nps_values / 10) * 30)
    total_score = round(timeliness_score + issue_score + feedback_score, 2)
    rationale = (
        f"Timeliness: {timeliness_score:.1f}/40, "
        f"Issues: {issue_score:.1f}/30, "
        f"Feedback: {feedback_score:.1f}/30"
    )

    if total_score < 40:
        performance_level = "Low"
    elif total_score <= 70:
        performance_level = "Average"
    else:
        performance_level = "High"

    eval_date = datetime.today().strftime('%Y-%m-%d')
    result = {
        "vendor_id": vendor_id,
        "vendor_name": vendor_name,
        "score": total_score,
        "timeliness_score": timeliness_score,
        "issue_score": issue_score,
        "issue_count": issue_counts,
        "feedback_score": feedback_score,
        "evaluation_date": eval_date,
        "rationale": rationale,
        "performance_level": performance_level,
    }
    # Saving happens as part of scoring; its outcome is reported via "status"
    result["status"] = save_to_salesforce(result)
    return result
# Format for Gradio
def format_output(vendor_id, vendor_name, delivery_records, issue_counts, nps_values):
    """Score one vendor and shape the outcome for the status box and results table.

    Returns:
        A ``(message, table)`` pair. When scoring reports an error, the
        message is that error and the table is ``None``. Otherwise the
        message is the save status and the table is a one-row DataFrame
        restricted to the display columns.
    """
    scored = score_vendor(vendor_id, vendor_name, delivery_records, issue_counts, nps_values)
    if "error" in scored:
        return scored["error"], None

    display_columns = [
        "vendor_id", "vendor_name", "score", "timeliness_score",
        "issue_score", "feedback_score", "evaluation_date", "rationale", "performance_level",
    ]
    frame = pd.DataFrame([scored])
    return scored["status"], frame[display_columns]
# Batch CSV Upload
def process_uploaded_file(file):
    """Score every row of an uploaded CSV and return the combined results.

    Args:
        file: The upload to read: an object exposing the path as ``.name``,
            a plain path string, or ``None`` when nothing is uploaded.

    Returns:
        A ``(status, table)`` pair. ``table`` is a DataFrame of the display
        columns for the rows that scored without error, or ``None`` when
        there is nothing to show (no file, missing columns, no valid rows,
        or a read/processing failure, which is logged).
    """
    if file is None:
        # NOTE(review): the change handler can presumably be called with no file
        # (e.g. after the upload is cleared); report that instead of failing.
        return "No file uploaded", None

    required = ("vendor_id", "vendor_name", "delivery_records", "issue_counts", "nps_values")
    output_columns = [
        "vendor_id", "vendor_name", "score", "timeliness_score",
        "issue_score", "feedback_score", "evaluation_date", "rationale", "performance_level",
    ]
    try:
        # Works for upload objects carrying the path in .name and for plain paths
        path = getattr(file, "name", file)
        df = pd.read_csv(path)

        # Name only the columns that are actually absent, in a stable order
        missing = [column for column in required if column not in df.columns]
        if missing:
            return f"Missing required columns: {', '.join(missing)}", None

        results = []
        skipped = 0
        for index, row in df.iterrows():
            res = score_vendor(
                row["vendor_id"],
                row["vendor_name"],
                row["delivery_records"],
                row["issue_counts"],
                row["nps_values"],
            )
            if "error" in res:
                # Keep going, but make the dropped row visible in logs and status
                skipped += 1
                logger.warning(f"Skipping CSV row {index}: {res['error']}")
            else:
                results.append(res)

        if not results:
            return "No valid records found", None

        status = "Batch processed"
        if skipped:
            status += f" ({skipped} of {len(df)} rows skipped as invalid)"
        return status, pd.DataFrame(results)[output_columns]
    except Exception as e:
        logger.error(f"Error processing file: {e}")
        return f"Error processing file: {e}", None
# Gradio UI
# Build the Gradio interface at import time so `demo` exists at module level.
# Components are created in display order inside their enclosing tab.
try:
    with gr.Blocks(title="Vendor Performance App") as demo:
        gr.Markdown("## 📊 Vendor Performance Scoring Tool")
        with gr.Tabs():
            # Tab 1: score a single vendor from form fields
            with gr.Tab("Single Vendor"):
                vendor_id = gr.Textbox(label="Vendor ID")
                vendor_name = gr.Textbox(label="Vendor Name")
                delivery = gr.Number(label="Delivery ", minimum=0, maximum=100)
                issues = gr.Number(label="Issue Count", minimum=0, maximum=10)
                nps = gr.Number(label="NPS", minimum=0, maximum=10)
                status = gr.Textbox(label="Status", interactive=False)
                result_table = gr.Dataframe(headers=[
                    "Vendor ID", "Vendor Name", "Score", "Timeliness",
                    "Issues", "Feedback", "Date", "Rationale", "Performance Level"
                ])
                submit = gr.Button("Submit")
                # format_output returns (status text, one-row table)
                submit.click(
                    fn=format_output,
                    inputs=[vendor_id, vendor_name, delivery, issues, nps],
                    outputs=[status, result_table]
                )
            # Tab 2: score many vendors from an uploaded CSV
            with gr.Tab("Upload CSV"):
                file_upload = gr.File(label="Upload CSV (vendor_name should contain vendor names)", file_types=[".csv"])
                file_status = gr.Textbox(label="Upload Status")
                file_result = gr.Dataframe(headers=[
                    "Vendor ID", "Vendor Name", "Score", "Timeliness",
                    "Issues", "Feedback", "Date", "Rationale", "Performance Level"
                ])
                # Processing starts as soon as the selected file changes
                file_upload.change(
                    fn=process_uploaded_file,
                    inputs=file_upload,
                    outputs=[file_status, file_result]
                )
except Exception as e:
    # Log for diagnostics, then re-raise: the app cannot run without its UI
    logger.error(f"Failed to initialize Gradio app: {e}")
    raise
if __name__ == "__main__":
    # Serve on every network interface, fixed port 7860
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    logger.info("Starting Gradio app on port 7860...")
    demo.launch(**launch_options)