"""Vendor performance scoring.

Scores individual vendor log records on quality, timeliness, safety and
communication, combines the four into a weighted final score, and packages
the result as a per-month report dictionary.
"""

import logging
import re
from datetime import datetime
from functools import lru_cache

import pandas as pd

try:
    from transformers import pipeline
except ImportError:
    # transformers is only needed for the communication score. Without it the
    # module still imports and calculate_communication_score falls back to
    # its neutral value instead of making every other score unusable.
    pipeline = None

logger = logging.getLogger(__name__)

# --- Constants ---
ALERT_THRESHOLD = 60  # Threshold for flagging low-performing vendors
DAYS_PER_MONTH = 30

_LOG_DATE_FORMAT = "%Y-%m-%d"

# (keywords, penalty per keyword present) for the quality score.
_SEVERITY_PENALTIES = (
    (("major", "critical", "severe", "fatality"), 20),
    (("minor", "moderate", "injury"), 10),
    (("near miss", "warning", "caution"), 5),
)

_DELAY_REPORT_PENALTY = 15  # Points deducted per delay report
_LATE_BASE_SCORE = 50  # Starting score when the details say the work was late
_NEUTRAL_COMMUNICATION_SCORE = 80

# Word boundaries stop "completion time" from counting as "on time" and
# "insulated"/"related" from counting as "late". "not on time" is treated as
# late rather than as on time.
_ON_TIME_RE = re.compile(r"(?<!not )\bon[ -]time\b")
_LATE_RE = re.compile(r"\b(?:late|delayed)\b|\bnot on[ -]time\b")


# --- Helper Functions ---
def _count_entries(value):
    """Return the number of non-blank, comma-separated entries in *value*."""
    if not value:
        return 0
    return sum(1 for part in value.split(",") if part.strip())


@lru_cache(maxsize=1)
def _get_sentiment_analyzer():
    """Build the sentiment-analysis pipeline once and reuse it afterwards."""
    if pipeline is None:
        raise RuntimeError(
            "transformers is not installed; sentiment analysis is unavailable"
        )
    # The first positional argument of pipeline() is a task identifier.
    return pipeline("sentiment-analysis")


def calculate_quality_score(incident_logs):
    """
    Calculates a quality score from severity keywords found in incident logs.

    Each keyword is counted at most once (presence, not occurrences) and the
    match is a case-insensitive substring test.

    Args:
        incident_logs (str): A string containing incident log details.

    Returns:
        float: A score between 0 and 100, where 100 is the highest quality.
    """
    if not incident_logs:
        return 100  # Perfect score if no incidents

    text = incident_logs.lower()
    penalty = sum(
        weight
        for keywords, weight in _SEVERITY_PENALTIES
        for keyword in keywords
        if keyword in text
    )
    return max(0, 100 - penalty)  # Ensure score doesn't go below 0


def calculate_timeliness_score(work_completion_details, delay_reports, log_date):
    """
    Calculates a timeliness score from completion details and delay reports.

    Args:
        work_completion_details (str): Details of work completion.
        delay_reports (str): Comma-separated reports of delays. Blank entries
            (for example after a trailing comma) are ignored.
        log_date (str): The date of the log (YYYY-MM-DD). Only validated; it
            does not influence the score.

    Returns:
        float: A score between 0 and 100, where 100 is perfectly on time.

    Raises:
        ValueError: If work_completion_details is non-empty and log_date is
            not a valid YYYY-MM-DD date.
    """
    if not work_completion_details:
        return 100

    # Parsed purely to reject malformed dates; the value itself is not used.
    datetime.strptime(log_date, _LOG_DATE_FORMAT)

    details = work_completion_details.lower()

    # An explicit "on time" statement wins over any delay reports.
    if _ON_TIME_RE.search(details):
        return 100

    delay_penalty = _count_entries(delay_reports) * _DELAY_REPORT_PENALTY
    base_score = _LATE_BASE_SCORE if _LATE_RE.search(details) else 100
    return max(0, base_score - delay_penalty)


def calculate_safety_score(incident_logs):
    """
    Calculates a safety score from the number of logged incidents.

    Args:
        incident_logs (str): Comma-separated incident log details. Blank
            entries are ignored.

    Returns:
        float: 100 if there are no incidents, otherwise 80 minus 10 per
            incident, never below 0.
    """
    incident_count = _count_entries(incident_logs)
    if incident_count == 0:
        return 100
    return max(0, 80 - incident_count * 10)


def calculate_communication_score(work_completion_details):
    """
    Calculates a communication score using sentiment analysis of the details.

    Any failure in the sentiment step (transformers missing, model cannot be
    loaded, inference error) yields the neutral score rather than an
    exception.

    Args:
        work_completion_details (str): Details of work completion.

    Returns:
        float: A score between 0 and 100. 100 for empty or positive text, 80
            for neutral text or on error, and between 60 and 80 for negative
            text (lower as the model is more confident).
    """
    if not work_completion_details:
        return 100

    try:
        result = _get_sentiment_analyzer()(work_completion_details)
        sentiment = result[0]["label"]
        confidence = result[0]["score"]
    except Exception as exc:  # Best effort: scoring must not fail on the model
        logger.warning("Error in sentiment analysis: %s", exc)
        return _NEUTRAL_COMMUNICATION_SCORE

    if sentiment == "POSITIVE":
        return 100
    if sentiment == "NEGATIVE":
        # The penalty grows with confidence: a 50/50 call stays at neutral
        # (80) and a fully confident negative drops to 60.
        penalty = 40 * max(0.0, confidence - 0.5)
        return max(0, _NEUTRAL_COMMUNICATION_SCORE - penalty)
    return _NEUTRAL_COMMUNICATION_SCORE


def calculate_final_score(quality_score, timeliness_score, safety_score, communication_score):
    """
    Calculates the final score as a weighted average of the individual scores.

    Args:
        quality_score (float): The quality score (weight 0.4).
        timeliness_score (float): The timeliness score (weight 0.3).
        safety_score (float): The safety score (weight 0.2).
        communication_score (float): The communication score (weight 0.1).

    Returns:
        float: The final score, between 0 and 100 when all inputs are.
    """
    quality_weight = 0.4
    timeliness_weight = 0.3
    safety_weight = 0.2
    communication_weight = 0.1

    return (
        quality_weight * quality_score
        + timeliness_weight * timeliness_score
        + safety_weight * safety_score
        + communication_weight * communication_score
    )


def generate_performance_report(vendor_id, scores, month, trend_data=None):
    """
    Generates a performance report (as a dictionary).

    Args:
        vendor_id (str): The ID of the vendor.
        scores (dict): Scores under the keys 'quality', 'timeliness',
            'safety', 'communication' and 'final_score'.
        month (str): The month for the report (e.g., "2024-01").
        trend_data (dict, optional): Trend data for the vendor; only its
            'trend_deviation' entry is read. Defaults to None.

    Returns:
        dict: The performance report. 'alert_flag' is True when the final
            score is below ALERT_THRESHOLD, and 'certificate_url' is a
            placeholder path.
    """
    final_score = scores["final_score"]
    return {
        "vendor_id": vendor_id,
        "month": month,
        "quality": scores["quality"],
        "timeliness": scores["timeliness"],
        "safety": scores["safety"],
        "communication": scores["communication"],
        "final_score": final_score,
        "alert_flag": final_score < ALERT_THRESHOLD,
        "certificate_url": f"/certificates/{vendor_id}_{month}.pdf",  # Placeholder URL
        "trend_deviation": trend_data.get("trend_deviation", 0) if trend_data else 0,
    }


def _build_report(log):
    """Score a single vendor log record and return its report dictionary."""
    vendor_id = log["vendor_id"]
    work_completion_details = log["work_completion_details"]
    delay_reports = log["delay_reports"]
    incident_logs = log["incident_logs"]
    log_date = log["log_date"]

    # Derive the month from the parsed date, not by slicing the string:
    # strptime accepts "2024-3-05", for which log_date[:7] would be "2024-3-".
    parsed_date = datetime.strptime(log_date, _LOG_DATE_FORMAT)
    month = f"{parsed_date.year:04d}-{parsed_date.month:02d}"

    quality_score = calculate_quality_score(incident_logs)
    timeliness_score = calculate_timeliness_score(
        work_completion_details, delay_reports, log_date
    )
    safety_score = calculate_safety_score(incident_logs)
    communication_score = calculate_communication_score(work_completion_details)
    scores = {
        "quality": quality_score,
        "timeliness": timeliness_score,
        "safety": safety_score,
        "communication": communication_score,
        "final_score": calculate_final_score(
            quality_score, timeliness_score, safety_score, communication_score
        ),
    }

    # TODO: trend detection is a placeholder. The intended logic is to fetch
    # the vendor's previous-month score (from Salesforce) and report
    # {'trend_deviation': -1} when the current final score is more than 10
    # points lower, {'trend_deviation': 1} when more than 10 points higher,
    # and {'trend_deviation': 0} otherwise.
    trend_data = None

    return generate_performance_report(vendor_id, scores, month, trend_data)


def process_vendor_logs(vendor_logs):
    """
    Processes vendor logs, calculates scores, and generates performance reports.

    Records that cannot be processed (missing keys, malformed date, wrong
    type) are logged and skipped; the remaining records are still processed.

    Args:
        vendor_logs (list): A list of dictionaries, each containing the keys
            'vendor_id', 'work_completion_details', 'delay_reports',
            'incident_logs', and 'log_date' (YYYY-MM-DD).

    Returns:
        list: A list of performance report dictionaries, one per
            successfully processed log, in input order.
    """
    reports = []
    for log in vendor_logs:
        try:
            reports.append(_build_report(log))
        except Exception as exc:  # Per-record boundary: one bad log must not stop the batch
            # The record itself may be the problem (e.g. None), so do not
            # assume it has a .get method while reporting the failure.
            getter = getattr(log, "get", None)
            vendor_id = getter("vendor_id", "Unknown") if callable(getter) else "Unknown"
            logger.error("Error processing log for vendor %s: %s", vendor_id, exc)
    return reports