Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import logging | |
| import pandas as pd | |
| from datetime import datetime | |
| import os | |
| from simple_salesforce import Salesforce | |
| from dotenv import load_dotenv | |
| from transformers import pipeline | |
# Logging: INFO level on the root logger, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Read a .env file (when present) so the SF_* settings below can be found
# through the process environment.
load_dotenv()

# Hugging Face sentiment model. If loading fails for any reason the failure is
# logged and `sentiment_analyzer` is left as None so the rest of the module
# can still be imported.
try:
    sentiment_analyzer = pipeline(
        "sentiment-analysis",
        model="distilbert-base-uncased-finetuned-sst-2-english",
    )
except Exception as e:
    logger.error(f"Failed to load Hugging Face model: {str(e)}")
    sentiment_analyzer = None
else:
    logger.info("Hugging Face sentiment analyzer loaded")

# Salesforce session built from the SF_USERNAME / SF_PASSWORD /
# SF_SECURITY_TOKEN environment variables. On failure `sf` is None and the
# error is logged.
try:
    sf = Salesforce(
        username=os.environ.get('SF_USERNAME'),
        password=os.environ.get('SF_PASSWORD'),
        security_token=os.environ.get('SF_SECURITY_TOKEN'),
    )
except Exception as e:
    logger.error(f"Failed to connect to Salesforce: {str(e)}")
    sf = None
else:
    logger.info("Connected to Salesforce")
# Module-level holding area for the most recent payload of each tab.
# Every slot starts out as None; the form handlers fill a slot and
# submit_to_salesforce() reads it and resets it to None.
global_data = dict.fromkeys(("calculate_scores", "contact_us", "upload_file"))
def score_vendor(vendor_id: str, delivery_records: float, issue_counts: int, nps_values: float) -> dict:
    """Compute the composite performance score for a single vendor.

    The composite score is the sum of three parts, rounded to two decimals:

    * timeliness: ``delivery_records * 0.4``
    * issues: ``(10 - min(issue_counts, 10)) * 3``
    * feedback: ``nps_values * 0.3``

    Args:
        vendor_id: Vendor identifier. Surrounding whitespace is ignored and
            the remaining text must not be empty.
        delivery_records: Number in the range 0-100.
        issue_counts: Non-negative number; values above 10 score like 10.
        nps_values: Number in the range 0-100.

    Returns:
        On success, a dict with the keys ``vendor_id``, ``score``,
        ``timeliness_score``, ``issue_score``, ``feedback_score``,
        ``evaluation_date`` (``YYYY-MM-DD``), ``rationale`` and
        ``alert_flag`` (true when the score is below 50). On a validation
        or runtime failure, a dict with the single key ``error``.
    """
    try:
        # A string made only of whitespace is as unusable as an empty one.
        if isinstance(vendor_id, str):
            vendor_id = vendor_id.strip()
        if not vendor_id:
            return {"error": "Vendor ID cannot be empty. Please provide a valid ID (e.g., V001)."}

        numeric_inputs = (
            ("Delivery Records", delivery_records),
            ("Issue Counts", issue_counts),
            ("NPS Value", nps_values),
        )
        for label, value in numeric_inputs:
            # None (nothing entered) would otherwise surface as a raw
            # TypeError message; NaN (the only value where value != value)
            # can slip past the `< 0` check and poison the score.
            if value is None or value != value:
                return {"error": f"{label} is required and must be a number."}

        if not (0 <= delivery_records <= 100):
            return {"error": f"Delivery Records ({delivery_records}) must be between 0 and 100."}
        if issue_counts < 0:
            return {"error": f"Issue Counts ({issue_counts}) must be non-negative."}
        if not (0 <= nps_values <= 100):
            return {"error": f"NPS Value ({nps_values}) must be between 0 and 100."}

        timeliness_score = delivery_records * 0.4
        # Issues are capped at 10, so this part ranges from 0 to 30.
        issue_score = (10 - min(issue_counts, 10)) * 3
        feedback_score = nps_values * 0.3
        composite_score = round(timeliness_score + issue_score + feedback_score, 2)

        rationale = (
            f"Score = Timeliness ({timeliness_score:.2f}) + "
            f"Issues ({issue_score:.2f}) + Feedback ({feedback_score:.2f})"
        )
        # The sentiment label is computed on the rationale text itself;
        # without a loaded model the label is reported as "Unknown".
        sentiment = sentiment_analyzer(rationale)[0]["label"] if sentiment_analyzer else "Unknown"
        rationale_with_sentiment = f"{rationale} [Sentiment: {sentiment}]"

        evaluation_date = datetime.now().strftime("%Y-%m-%d")
        alert_flag = composite_score < 50

        result = {
            "vendor_id": vendor_id,
            "score": composite_score,
            "timeliness_score": timeliness_score,
            "issue_score": issue_score,
            "feedback_score": feedback_score,
            "evaluation_date": evaluation_date,
            "rationale": rationale_with_sentiment,
            "alert_flag": alert_flag,
        }
        logger.info(f"Processed vendor: {vendor_id} on {evaluation_date}")
        return result
    except Exception as e:
        # Boundary handler: callers expect an error dict, never an exception.
        logger.exception("Error processing vendor %s", vendor_id)
        return {"error": f"An unexpected error occurred: {str(e)}"}
def format_output(vendor_id, delivery_records, issue_counts, nps_values):
    """Score one vendor and shape the outcome for the two UI outputs.

    Returns a ``(error_text, table)`` pair: on failure the error message and
    ``None``; on success ``None`` and a one-row DataFrame. A successful
    result is also remembered in ``global_data["calculate_scores"]``.
    """
    outcome = score_vendor(vendor_id, delivery_records, issue_counts, nps_values)
    if "error" in outcome:
        return outcome["error"], None

    # Keep the latest successful result for a later Salesforce submission.
    global_data["calculate_scores"] = outcome

    shown_columns = [
        "vendor_id",
        "score",
        "timeliness_score",
        "issue_score",
        "feedback_score",
        "evaluation_date",
        "rationale",
        "alert_flag",
    ]
    table = pd.DataFrame([outcome])
    return None, table[shown_columns]
def process_uploaded_file(file):
    """Score every vendor row of an uploaded CSV file.

    The CSV must contain the columns ``vendor_id``, ``delivery_records``,
    ``issue_counts`` and ``nps_values``. Processing stops at the first row
    that cannot be converted or scored.

    Args:
        file: A path string, or an object exposing the path as ``.name``.
            A falsy value means nothing was uploaded.

    Returns:
        A ``(status_message, table)`` pair. ``table`` is a DataFrame of the
        scored rows on success and ``None`` otherwise. On success the list
        of result dicts is also stored in ``global_data["upload_file"]``.
    """
    if not file:
        return "Upload a file to see status.", None

    # Resolve the path once. It doubles as the display name, so a plain path
    # string is reported by name instead of as "Unknown file".
    file_path = file.name if hasattr(file, 'name') else file
    display_name = str(file_path)

    try:
        df = pd.read_csv(file_path)

        required_columns = ["vendor_id", "delivery_records", "issue_counts", "nps_values"]
        if any(col not in df.columns for col in required_columns):
            return (
                f"File uploaded: {display_name} - Error: File must contain columns: vendor_id, delivery_records, issue_counts, nps_values.",
                None
            )

        results = []
        # Number rows by position (1-based) rather than by index label, which
        # is not guaranteed to be an integer.
        for row_number, (_, row) in enumerate(df.iterrows(), start=1):
            try:
                # A blank vendor_id cell is read as NaN; pass it on as an
                # empty ID instead of the literal text "nan".
                vendor = "" if pd.isna(row["vendor_id"]) else str(row["vendor_id"])
                delivery = float(row["delivery_records"])
                issues = int(row["issue_counts"])
                nps = float(row["nps_values"])
            except (TypeError, ValueError) as e:
                return (
                    f"File uploaded: {display_name} - Error in row {row_number}: {e}",
                    None
                )

            result = score_vendor(vendor, delivery, issues, nps)
            if "error" in result:
                return (
                    f"File uploaded: {display_name} - Error in row {row_number}: {result['error']}",
                    None
                )
            results.append(result)

        if not results:
            return (
                f"File uploaded: {display_name} - Error: No valid data to process.",
                None
            )

        # Remember the scored rows for a later Salesforce submission.
        global_data["upload_file"] = results
        result_df = pd.DataFrame(results)
        return (
            f"File uploaded successfully: {display_name}",
            result_df[["vendor_id", "score", "timeliness_score", "issue_score", "feedback_score", "evaluation_date", "rationale", "alert_flag"]]
        )
    except Exception as e:
        # Boundary handler: unreadable or malformed files end up here.
        logger.error(f"Error processing file: {str(e)}")
        return (
            f"File uploaded: {display_name} - Error processing file: {str(e)}",
            None
        )
def handle_contact_form(name, email, message):
    """Validate the contact form and stash it for a later Salesforce submit.

    All three fields must be non-empty. The message is stored with a
    sentiment label appended ("Unknown" when no model is loaded). Returns a
    status string for the UI.
    """
    if not (name and email and message):
        return "Error: All fields (Name, Email, Message) are required."

    if sentiment_analyzer:
        sentiment = sentiment_analyzer(message)[0]["label"]
    else:
        sentiment = "Unknown"

    global_data["contact_us"] = {
        "name": name,
        "email": email,
        "message": f"{message} [Sentiment: {sentiment}]",
    }
    return "Data saved locally. Click 'Submit to Salesforce' to create record."
def _find_vendor_sf_id(soql_template, value):
    """Return the Salesforce Id of the first Vendor__c matched by the query, or None."""
    # Function-scope import keeps this block self-contained. format_soql
    # quotes and escapes the value, so user-entered text (for example an
    # apostrophe in an e-mail address) cannot alter the query.
    from simple_salesforce import format_soql

    found = sf.query(format_soql(soql_template, value))
    if found['totalSize'] == 0:
        return None
    return found['records'][0]['Id']


def _submit_performance_record(result):
    """Create one Vendor_Performance__c record from a score dict; return a status line."""
    vendor_id = result["vendor_id"]
    vendor_id_sf = _find_vendor_sf_id(
        "SELECT Id FROM Vendor__c WHERE Vendor_ID__c = {}", vendor_id
    )
    if vendor_id_sf is None:
        return f"Error: Vendor with ID {vendor_id} not found in Salesforce."

    response = sf.Vendor_Performance__c.create({
        'Vendor_ID__c': vendor_id,
        'Timeliness_Score__c': result["timeliness_score"],
        # issue_score is (10 - capped_count) * 3, so the count is recovered
        # as 10 - issue_score / 3 (the bare issue_score / 3 is the inverse).
        # NOTE(review): counts above 10 come back as 10 because the scoring
        # caps them - confirm that is acceptable for this field.
        'Issue_Count__c': 10 - result["issue_score"] / 3,
        # Undo the 0.3 weight; rounding strips floating-point noise such as
        # 50.00000000000001.
        'Feedback_Rating__c': round(result["feedback_score"] / 0.3, 6),
        'Evaluation_Date__c': result["evaluation_date"],
        'Score__c': result["score"],
        'Rationale__c': result["rationale"],
        'Vendor__c': vendor_id_sf,
        'Alert_Flag__c': result["alert_flag"]
    })
    logger.info(f"Created Vendor_Performance__c record for vendor: {vendor_id}, Response: {response}")
    return f"Vendor_Performance__c created for {vendor_id}. Trigger fired."


def submit_to_salesforce():
    """Push everything held in ``global_data`` to Salesforce.

    Handles, in order: the last calculated score, the saved contact form
    (updates the Vendor__c with the same e-mail or creates a new one), and
    the rows of the last uploaded file. Each slot is cleared once it has
    been processed.

    Returns:
        One status line per processed item joined by newlines,
        "No data to submit." when nothing was pending, or an error message.
        If a failure happens part-way, the lines for items already
        processed precede the error line.
    """
    if not sf:
        return "Error: Salesforce connection not established."
    if not sentiment_analyzer:
        return "Error: Hugging Face model not loaded."

    results = []
    try:
        # Calculate Scores tab
        pending_score = global_data["calculate_scores"]
        if pending_score and "error" not in pending_score:
            results.append(_submit_performance_record(pending_score))
            global_data["calculate_scores"] = None

        # Contact Us tab
        contact = global_data["contact_us"]
        if contact:
            email = contact["email"]
            vendor_fields = {
                'Vendor_Name__c': contact["name"],
                'Contact_Email__c': email,
                'Comments__c': contact["message"]
            }
            vendor_id_sf = _find_vendor_sf_id(
                "SELECT Id FROM Vendor__c WHERE Contact_Email__c = {}", email
            )
            if vendor_id_sf is not None:
                response = sf.Vendor__c.update(vendor_id_sf, vendor_fields)
                logger.info(f"Updated Vendor__c record for email: {email}, Response: {response}")
                results.append(f"Vendor__c updated for {email}. Trigger fired.")
            else:
                # New vendors get a timestamp-based ID.
                vendor_fields['Vendor_ID__c'] = f"V{datetime.now().strftime('%Y%m%d%H%M%S')}"
                response = sf.Vendor__c.create(vendor_fields)
                logger.info(f"Created Vendor__c record for email: {email}, Response: {response}")
                results.append(f"Vendor__c created for {email}. Trigger fired.")
            global_data["contact_us"] = None

        # Upload File tab
        pending_rows = global_data["upload_file"]
        if pending_rows:
            for position, row_result in enumerate(pending_rows):
                try:
                    results.append(_submit_performance_record(row_result))
                except Exception:
                    # Keep only the rows not yet written, so a retry does
                    # not duplicate the records that were already created.
                    global_data["upload_file"] = pending_rows[position:]
                    raise
            global_data["upload_file"] = None

        return "\n".join(results) if results else "No data to submit."
    except Exception as e:
        logger.error(f"Error submitting to Salesforce: {str(e)}")
        # Report what succeeded before the failure together with the error.
        return "\n".join([*results, f"Error submitting to Salesforce: {str(e)}"])
# UI definition. Components are created in the same order as before.
# NOTE(review): the nesting of rows and tabs below is reconstructed from the
# statement order - confirm the layout against the running app.
with gr.Blocks(title="Vendor Performance Scoring") as demo:
    # Column titles shared by both result tables.
    RESULT_HEADERS = (
        "Vendor ID",
        "Score",
        "Timeliness Score",
        "Issue Score",
        "Feedback Score",
        "Evaluation Date",
        "Rationale",
        "Alert Flag",
    )

    gr.Markdown("# Vendor Performance Scoring System")
    gr.Markdown("Enter data in any tab and click 'Submit to Salesforce' to save records with Hugging Face sentiment analysis and trigger Apex actions.")

    # Page-level submit button and its status box, shared by all tabs.
    submit_salesforce_btn = gr.Button("Submit to Salesforce", variant="primary")
    salesforce_output = gr.Textbox(
        label="Salesforce Submission Status",
        placeholder="Click 'Submit to Salesforce' to save data",
        interactive=False,
    )

    with gr.Tabs():
        # --- Tab 1: score a single vendor entered by hand ---
        with gr.Tab("Calculate Scores"):
            gr.Markdown("### Enter Vendor Details")
            gr.Markdown("Provide the vendor details below to calculate performance scores. All fields are required.")
            with gr.Row():
                vendor_id = gr.Textbox(label="Vendor ID", placeholder="e.g., V001", value="")
                delivery_records = gr.Slider(
                    minimum=0,
                    maximum=100,
                    step=1,
                    label="Delivery Records (% On-Time)",
                    value=0,
                )
            with gr.Row():
                issue_counts = gr.Number(label="Issue Counts", value=0, step=1, minimum=0)
                nps_values = gr.Slider(minimum=0, maximum=100, step=1, label="NPS Value", value=0)
            with gr.Row():
                calculate_btn = gr.Button("Calculate Scores", variant="primary")
                clear_btn = gr.Button("Clear Inputs")
            error_output = gr.Textbox(label="Error Message", placeholder="No errors", interactive=False)
            score_output = gr.Dataframe(
                label="Scoring Results",
                headers=list(RESULT_HEADERS),
                wrap=True,
            )

            score_inputs = [vendor_id, delivery_records, issue_counts, nps_values]
            calculate_btn.click(
                fn=format_output,
                inputs=score_inputs,
                outputs=[error_output, score_output],
            )
            # Reset the four inputs, the error box and the result table.
            clear_btn.click(
                fn=lambda: ("", 0, 0, 0, "No errors", None),
                inputs=None,
                outputs=[*score_inputs, error_output, score_output],
            )

        # --- Tab 2: contact form, stored until the page-level submit ---
        with gr.Tab("Contact Us"):
            gr.Markdown("### Contact Us")
            gr.Markdown("Fill out the form below. Data will be saved when you click 'Submit to Salesforce'.")
            with gr.Row():
                name_input = gr.Textbox(label="Name", placeholder="Your name")
                email_input = gr.Textbox(label="Email", placeholder="your.email@example.com")
            message_input = gr.Textbox(label="Message", placeholder="Your message or feedback", lines=3)
            contact_save_btn = gr.Button("Save Contact Data", variant="primary")
            contact_output = gr.Textbox(
                label="Contact Form Status",
                placeholder="Click 'Save Contact Data' to save locally",
                interactive=False,
            )
            contact_save_btn.click(
                fn=handle_contact_form,
                inputs=[name_input, email_input, message_input],
                outputs=contact_output,
            )

        # --- Tab 3: bulk scoring from a CSV upload ---
        with gr.Tab("Upload File"):
            gr.Markdown("### Upload Supporting Documents")
            gr.Markdown("Upload a CSV file with columns: vendor_id, delivery_records, issue_counts, nps_values.")
            file_input = gr.File(label="Upload File")
            file_output = gr.Textbox(label="Upload Status", placeholder="Upload a file to see status", interactive=False)
            upload_score_output = gr.Dataframe(
                label="Scoring Results",
                headers=list(RESULT_HEADERS),
                wrap=True,
            )
            # Scoring runs whenever the file input changes.
            file_input.change(
                fn=process_uploaded_file,
                inputs=file_input,
                outputs=[file_output, upload_score_output],
            )

    submit_salesforce_btn.click(
        fn=submit_to_salesforce,
        inputs=None,
        outputs=salesforce_output,
    )
if __name__ == "__main__":
    # Script entry point: serve the app on every network interface, port 7860.
    demo.launch(
        server_port=7860,
        server_name="0.0.0.0",
    )