import os import logging from datetime import datetime import pandas as pd from dotenv import load_dotenv from simple_salesforce import Salesforce import gradio as gr # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() SF_USERNAME = os.getenv("SF_USERNAME") SF_PASSWORD = os.getenv("SF_PASSWORD") SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN") SF_DOMAIN = os.getenv("SF_DOMAIN", "login") # Connect to Salesforce def connect_to_salesforce(): try: if not all([SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN]): logger.error("Salesforce credentials are missing in environment variables.") return None sf = Salesforce( username=SF_USERNAME, password=SF_PASSWORD, security_token=SF_SECURITY_TOKEN, domain=SF_DOMAIN ) logger.info("Salesforce connected successfully.") return sf except Exception as e: logger.error(f"Salesforce login failed: {e}") return None # Lookup Vendor ID by Name def get_vendor_id_by_name(sf, vendor_name): if not sf: logger.error("Cannot query vendor ID: Salesforce connection is not established.") return None try: query = f"SELECT Id FROM Vendor__c WHERE Name = '{vendor_name}' LIMIT 1" result = sf.query(query) if result["totalSize"] > 0: return result["records"][0]["Id"] else: logger.error(f"No vendor found with name: {vendor_name}") return None except Exception as e: logger.error(f"Error querying vendor ID for {vendor_name}: {e}") return None # Save to Salesforce def save_to_salesforce(data): sf = connect_to_salesforce() if not sf: return "Salesforce connection failed. Please check credentials and try again." try: vendor_name = data["vendor_name"] vendor_id = get_vendor_id_by_name(sf, vendor_name) if not vendor_id: return f"Error: No vendor found with name '{vendor_name}'" result = sf.Vendor_Performance__c.create({ "Vendor_ID__c": int(data["vendor_id"]), "Vendor_Name__c": vendor_id, "Score__c": data["score"], "Timeliness_Score__c": data["timeliness_score"], "Issue_Count__c": data["issue_count"], "Feedback_Rating__c": data["feedback_score"], "Evaluation_Date__c": data["evaluation_date"], "Rationale__c": data["rationale"], "Performance_Level__c": data["performance_level"] }) logger.info(f"Record saved to Salesforce: {result}") return "Saved successfully" except Exception as e: logger.error(f"Save failed: {str(e)}") return f"Error: {str(e)}" # Score vendor def score_vendor(vendor_id, vendor_name, delivery_records, issue_counts, nps_values): if not vendor_id or not vendor_name: return {"error": "Vendor ID and Vendor Name are required."} if not (0 <= delivery_records <= 100): return {"error": "Delivery must be 0–100"} if not (0 <= issue_counts <= 10): return {"error": "Issue Count must be 0–10"} if not (0 <= nps_values <= 10): return {"error": "NPS must be 0–10"} timeliness_score = min(40, (delivery_records / 100) * 40) issue_score = max(0, 30 - (issue_counts * 3)) feedback_score = min(30, (nps_values / 10) * 30) total_score = round(timeliness_score + issue_score + feedback_score, 2) rationale = f"Timeliness: {timeliness_score:.1f}/40, Issues: {issue_score:.1f}/30, Feedback: {feedback_score:.1f}/30" if total_score < 40: performance_level = "Low" elif total_score <= 70: performance_level = "Average" else: performance_level = "High" eval_date = datetime.today().strftime('%Y-%m-%d') result = { "vendor_id": vendor_id, "vendor_name": vendor_name, "score": total_score, "timeliness_score": timeliness_score, "issue_score": issue_score, "issue_count": issue_counts, "feedback_score": feedback_score, "evaluation_date": eval_date, "rationale": rationale, "performance_level": performance_level } save_status = save_to_salesforce(result) result["status"] = save_status return result # Format for Gradio def format_output(vendor_id, vendor_name, delivery_records, issue_counts, nps_values): result = score_vendor(vendor_id, vendor_name, delivery_records, issue_counts, nps_values) if "error" in result: return result["error"], None df = pd.DataFrame([result]) return result["status"], df[[ "vendor_id", "vendor_name", "score", "timeliness_score", "issue_score", "feedback_score", "evaluation_date", "rationale", "performance_level" ]] # Batch CSV Upload def process_uploaded_file(file): try: df = pd.read_csv(file.name) required = {"vendor_id", "vendor_name", "delivery_records", "issue_counts", "nps_values"} if not required.issubset(df.columns): return f"Missing required columns: {', '.join(required)}", None results = [] for _, row in df.iterrows(): res = score_vendor(row["vendor_id"], row["vendor_name"], row["delivery_records"], row["issue_counts"], row["nps_values"]) if "error" not in res: results.append(res) if not results: return "No valid records found", None return "Batch processed", pd.DataFrame(results)[[ "vendor_id", "vendor_name", "score", "timeliness_score", "issue_score", "feedback_score", "evaluation_date", "rationale", "performance_level" ]] except Exception as e: logger.error(f"Error processing file: {e}") return f"Error processing file: {e}", None # Gradio UI try: with gr.Blocks(title="Vendor Performance App") as demo: gr.Markdown("## 📊 Vendor Performance Scoring Tool") with gr.Tabs(): with gr.Tab("Single Vendor"): vendor_id = gr.Textbox(label="Vendor ID") vendor_name = gr.Textbox(label="Vendor Name") delivery = gr.Number(label="Delivery ", minimum=0, maximum=100) issues = gr.Number(label="Issue Count", minimum=0,maximum=10) nps = gr.Number(label="NPS", minimum=0, maximum=10) status = gr.Textbox(label="Status", interactive=False) result_table = gr.Dataframe(headers=[ "Vendor ID", "Vendor Name", "Score", "Timeliness", "Issues", "Feedback", "Date", "Rationale", "Performance Level" ]) submit = gr.Button("Submit") submit.click( fn=format_output, inputs=[vendor_id, vendor_name, delivery, issues, nps], outputs=[status, result_table] ) with gr.Tab("Upload CSV"): file_upload = gr.File(label="Upload CSV (vendor_name should contain vendor names)", file_types=[".csv"]) file_status = gr.Textbox(label="Upload Status") file_result = gr.Dataframe(headers=[ "Vendor ID", "Vendor Name", "Score", "Timeliness", "Issues", "Feedback", "Date", "Rationale", "Performance Level" ]) file_upload.change( fn=process_uploaded_file, inputs=file_upload, outputs=[file_status, file_result] ) except Exception as e: logger.error(f"Failed to initialize Gradio app: {e}") raise if __name__ == "__main__": logger.info("Starting Gradio app on port 7860...") demo.launch(server_name="0.0.0.0", server_port=7860)