"""Vendor Performance Scoring Tool — Gradio UI backed by Salesforce."""
| import os | |
| import logging | |
| from datetime import datetime | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| from simple_salesforce import Salesforce | |
| import gradio as gr | |
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables (Salesforce credentials) from a local .env file.
load_dotenv()
SF_USERNAME = os.getenv("SF_USERNAME")
SF_PASSWORD = os.getenv("SF_PASSWORD")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
# "login" targets production orgs; set SF_DOMAIN=test for a sandbox.
SF_DOMAIN = os.getenv("SF_DOMAIN", "login")
# Connect to Salesforce
def connect_to_salesforce():
    """Authenticate against Salesforce with the env-var credentials.

    Returns:
        A `Salesforce` client on success, or None when any credential is
        missing or the login attempt raises.
    """
    # Guard clause: bail out early rather than attempting a doomed login.
    if not all([SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN]):
        logger.error("Salesforce credentials are missing in environment variables.")
        return None
    try:
        connection = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_SECURITY_TOKEN,
            domain=SF_DOMAIN,
        )
    except Exception as e:
        logger.error(f"Salesforce login failed: {e}")
        return None
    logger.info("Salesforce connected successfully.")
    return connection
# Lookup Vendor ID by Name
def get_vendor_id_by_name(sf, vendor_name):
    """Return the Salesforce record Id of the Vendor__c named `vendor_name`.

    Args:
        sf: An authenticated `Salesforce` client, or None.
        vendor_name: Exact Name of the Vendor__c record to find.

    Returns:
        The record Id string, or None when the connection is missing,
        no vendor matches, or the query fails.
    """
    if not sf:
        logger.error("Cannot query vendor ID: Salesforce connection is not established.")
        return None
    try:
        # Escape backslashes and single quotes so a name like "O'Brien"
        # cannot break the query or inject SOQL.
        safe_name = str(vendor_name).replace("\\", "\\\\").replace("'", "\\'")
        query = f"SELECT Id FROM Vendor__c WHERE Name = '{safe_name}' LIMIT 1"
        result = sf.query(query)
        if result["totalSize"] > 0:
            return result["records"][0]["Id"]
        logger.error(f"No vendor found with name: {vendor_name}")
        return None
    except Exception as e:
        logger.error(f"Error querying vendor ID for {vendor_name}: {e}")
        return None
# Save to Salesforce
def save_to_salesforce(data):
    """Persist one vendor-performance evaluation as a Vendor_Performance__c.

    Args:
        data: Result dict produced by `score_vendor`.

    Returns:
        A human-readable status string for the UI ("Saved successfully"
        or an error description).
    """
    connection = connect_to_salesforce()
    if not connection:
        return "Salesforce connection failed. Please check credentials and try again."
    try:
        name = data["vendor_name"]
        sf_vendor_id = get_vendor_id_by_name(connection, name)
        if not sf_vendor_id:
            return f"Error: No vendor found with name '{name}'"
        payload = {
            "Vendor_ID__c": int(data["vendor_id"]),
            # Presumably a lookup field, so it receives the record Id — TODO confirm.
            "Vendor_Name__c": sf_vendor_id,
            "Score__c": data["score"],
            "Timeliness_Score__c": data["timeliness_score"],
            "Issue_Count__c": data["issue_count"],
            "Feedback_Rating__c": data["feedback_score"],
            "Evaluation_Date__c": data["evaluation_date"],
            "Rationale__c": data["rationale"],
            "Performance_Level__c": data["performance_level"],
        }
        result = connection.Vendor_Performance__c.create(payload)
        logger.info(f"Record saved to Salesforce: {result}")
        return "Saved successfully"
    except Exception as e:
        logger.error(f"Save failed: {str(e)}")
        return f"Error: {str(e)}"
# Score vendor
def score_vendor(vendor_id, vendor_name, delivery_records, issue_counts, nps_values):
    """Compute a 0-100 vendor performance score and persist it to Salesforce.

    Scoring: timeliness = delivery % scaled to max 40; issues = 30 minus
    3 per issue (floor 0); feedback = NPS scaled to max 30.

    Args:
        vendor_id: External vendor identifier (required, truthy).
        vendor_name: Vendor__c Name used for the Salesforce lookup.
        delivery_records: On-time delivery percentage, 0-100.
        issue_counts: Number of issues, 0-10.
        nps_values: NPS rating, 0-10.

    Returns:
        Result dict including a "status" message from the save, or
        {"error": ...} when validation fails.
    """
    if not vendor_id or not vendor_name:
        return {"error": "Vendor ID and Vendor Name are required."}
    # gr.Number yields None for an empty field; a chained comparison with
    # None would raise TypeError, so reject missing metrics explicitly.
    if delivery_records is None or issue_counts is None or nps_values is None:
        return {"error": "Delivery, Issue Count and NPS are required."}
    if not (0 <= delivery_records <= 100):
        return {"error": "Delivery must be 0–100"}
    if not (0 <= issue_counts <= 10):
        return {"error": "Issue Count must be 0–10"}
    if not (0 <= nps_values <= 10):
        return {"error": "NPS must be 0–10"}
    timeliness_score = min(40, (delivery_records / 100) * 40)
    issue_score = max(0, 30 - (issue_counts * 3))
    feedback_score = min(30, (nps_values / 10) * 30)
    total_score = round(timeliness_score + issue_score + feedback_score, 2)
    rationale = f"Timeliness: {timeliness_score:.1f}/40, Issues: {issue_score:.1f}/30, Feedback: {feedback_score:.1f}/30"
    # Band boundaries: [0, 40) Low, [40, 70] Average, (70, 100] High.
    if total_score < 40:
        performance_level = "Low"
    elif total_score <= 70:
        performance_level = "Average"
    else:
        performance_level = "High"
    eval_date = datetime.today().strftime('%Y-%m-%d')
    result = {
        "vendor_id": vendor_id,
        "vendor_name": vendor_name,
        "score": total_score,
        "timeliness_score": timeliness_score,
        "issue_score": issue_score,
        "issue_count": issue_counts,
        "feedback_score": feedback_score,
        "evaluation_date": eval_date,
        "rationale": rationale,
        "performance_level": performance_level
    }
    save_status = save_to_salesforce(result)
    result["status"] = save_status
    return result
# Format for Gradio
def format_output(vendor_id, vendor_name, delivery_records, issue_counts, nps_values):
    """Adapt `score_vendor`'s dict into (status_text, DataFrame) for the UI.

    Returns:
        (error_message, None) when scoring fails, otherwise
        (save_status, one-row DataFrame of display columns).
    """
    scored = score_vendor(vendor_id, vendor_name, delivery_records, issue_counts, nps_values)
    if "error" in scored:
        return scored["error"], None
    display_columns = [
        "vendor_id", "vendor_name", "score", "timeliness_score",
        "issue_score", "feedback_score", "evaluation_date", "rationale", "performance_level",
    ]
    frame = pd.DataFrame([scored])
    return scored["status"], frame[display_columns]
# Batch CSV Upload
def process_uploaded_file(file):
    """Score every vendor row of an uploaded CSV and persist each result.

    Args:
        file: Gradio file wrapper; `file.name` is the temp path on disk.

    Returns:
        (status_message, DataFrame_or_None) for the Gradio outputs. Rows
        that fail validation are silently skipped.
    """
    try:
        df = pd.read_csv(file.name)
        required = {"vendor_id", "vendor_name", "delivery_records", "issue_counts", "nps_values"}
        # Report only the columns actually absent — the previous message
        # listed every required column, which was misleading.
        missing = required - set(df.columns)
        if missing:
            return f"Missing required columns: {', '.join(sorted(missing))}", None
        results = []
        for _, row in df.iterrows():
            res = score_vendor(
                row["vendor_id"], row["vendor_name"],
                row["delivery_records"], row["issue_counts"], row["nps_values"],
            )
            if "error" not in res:
                results.append(res)
        if not results:
            return "No valid records found", None
        display_columns = [
            "vendor_id", "vendor_name", "score", "timeliness_score",
            "issue_score", "feedback_score", "evaluation_date", "rationale", "performance_level",
        ]
        return "Batch processed", pd.DataFrame(results)[display_columns]
    except Exception as e:
        logger.error(f"Error processing file: {e}")
        return f"Error processing file: {e}", None
# Gradio UI
# Built at import time so `demo` exists for the launch in the __main__ guard.
try:
    with gr.Blocks(title="Vendor Performance App") as demo:
        gr.Markdown("## ๐ Vendor Performance Scoring Tool")
        with gr.Tabs():
            # Tab 1: score one vendor from manually entered metrics.
            with gr.Tab("Single Vendor"):
                vendor_id = gr.Textbox(label="Vendor ID")
                vendor_name = gr.Textbox(label="Vendor Name")
                delivery = gr.Number(label="Delivery ", minimum=0, maximum=100)
                issues = gr.Number(label="Issue Count", minimum=0, maximum=10)
                nps = gr.Number(label="NPS", minimum=0, maximum=10)
                status = gr.Textbox(label="Status", interactive=False)
                result_table = gr.Dataframe(headers=[
                    "Vendor ID", "Vendor Name", "Score", "Timeliness",
                    "Issues", "Feedback", "Date", "Rationale", "Performance Level"
                ])
                submit = gr.Button("Submit")
                # Score and render a one-row result table on click.
                submit.click(
                    fn=format_output,
                    inputs=[vendor_id, vendor_name, delivery, issues, nps],
                    outputs=[status, result_table]
                )
            # Tab 2: batch scoring — fires as soon as a CSV is selected.
            with gr.Tab("Upload CSV"):
                file_upload = gr.File(label="Upload CSV (vendor_name should contain vendor names)", file_types=[".csv"])
                file_status = gr.Textbox(label="Upload Status")
                file_result = gr.Dataframe(headers=[
                    "Vendor ID", "Vendor Name", "Score", "Timeliness",
                    "Issues", "Feedback", "Date", "Rationale", "Performance Level"
                ])
                file_upload.change(
                    fn=process_uploaded_file,
                    inputs=file_upload,
                    outputs=[file_status, file_result]
                )
except Exception as e:
    logger.error(f"Failed to initialize Gradio app: {e}")
    raise

if __name__ == "__main__":
    logger.info("Starting Gradio app on port 7860...")
    # Bind to all interfaces so the app is reachable from inside a container.
    demo.launch(server_name="0.0.0.0", server_port=7860)