# model.py -- created by varshakolanu (commit af935dc, verified)
import re
from datetime import datetime

import pandas as pd
from transformers import pipeline
# --- Constants ---
# A report's alert flag is set when its final score is strictly below this value.
ALERT_THRESHOLD = 60 # Threshold for flagging low-performing vendors
# Nominal number of days in a month.
# NOTE(review): not referenced anywhere in the visible code -- confirm it is still needed.
DAYS_PER_MONTH = 30
# --- Helper Functions ---
def calculate_quality_score(incident_logs):
    """
    Derive a quality score from the severity keywords found in the incident logs.

    Keywords are matched as case-insensitive substrings, and each keyword is
    counted at most once no matter how often it appears in the text.

    Args:
        incident_logs (str): Free-text incident log details.

    Returns:
        int: A score between 0 and 100, where 100 is the highest quality.
    """
    if not incident_logs:
        return 100  # Nothing reported, so no deduction

    text = incident_logs.lower()

    # (points deducted per matched keyword, keywords of that severity tier)
    severity_tiers = (
        (20, ('major', 'critical', 'severe', 'fatality')),
        (10, ('minor', 'moderate', 'injury')),
        (5, ('near miss', 'warning', 'caution')),
    )

    deduction = 0
    for points, keywords in severity_tiers:
        matched = [word for word in keywords if word in text]
        deduction += points * len(matched)

    # Clamp so that many matches cannot push the score below zero
    return max(0, 100 - deduction)
def calculate_timeliness_score(work_completion_details, delay_reports, log_date):
    """
    Calculates a timeliness score based on work completion details, delay reports,
    and the log date.

    Scoring rules, applied in order:
      * no completion details            -> 100
      * details mention "on time"        -> 100 (delay reports are ignored)
      * details contain the word "late"
        or "delayed"                     -> 50 minus the delay penalty
      * otherwise                        -> 100 minus the delay penalty
    The delay penalty is 15 points per non-blank, comma-separated delay report.

    Args:
        work_completion_details (str): Details of work completion.
        delay_reports (str): Comma-separated reports of delays.
        log_date (str): The date of the log (YYYY-MM-DD).

    Returns:
        int: A score between 0 and 100, where 100 is perfectly on time.

    Raises:
        ValueError: If work_completion_details is non-empty and log_date is
            not in YYYY-MM-DD format.
    """
    if not work_completion_details:
        return 100

    # The date does not influence the score yet; it is parsed only so that a
    # malformed log_date is still rejected, as callers may rely on that.
    datetime.strptime(log_date, '%Y-%m-%d')

    details = work_completion_details.lower()

    # Check for explicit "on time" completion
    if "on time" in details:
        return 100

    # Penalize for delay reports; blank entries (e.g. from a trailing comma or
    # a whitespace-only string) are not reports and must not be counted.
    delay_penalty = 0
    if delay_reports:
        report_count = sum(1 for entry in delay_reports.split(',') if entry.strip())
        delay_penalty = report_count * 15  # 15 points per delay report

    # Match whole words only: a plain substring test would also fire on
    # unrelated words such as "template" or "related".
    is_late = re.search(r'\b(?:late|delayed)\b', details) is not None
    base_score = 50 if is_late else 100
    return max(0, base_score - delay_penalty)
def calculate_safety_score(incident_logs):
    """
    Calculates a safety score based on the number of logged incidents.

    Incidents are the non-blank, comma-separated entries of incident_logs.
    The first incident brings the score to 70 and each further incident
    removes another 10 points, down to a floor of 0.

    Args:
        incident_logs (str): Comma-separated incident log details.

    Returns:
        int: 100 if there are no incidents, otherwise a lower score.
    """
    if not incident_logs:
        return 100

    # Ignore blank entries so that a trailing comma or a whitespace-only
    # string is not mistaken for an incident.
    incident_count = sum(1 for entry in incident_logs.split(',') if entry.strip())
    if incident_count == 0:
        return 100

    # Further logic can be added to differentiate severity
    return max(0, 80 - incident_count * 10)
# Lazily created default analyzer, shared by all calls so the model is loaded once.
_default_sentiment_analyzer = None


def _get_default_sentiment_analyzer():
    """Return the shared sentiment-analysis pipeline, loading it on first use."""
    global _default_sentiment_analyzer
    if _default_sentiment_analyzer is None:
        # "sentiment-analysis" is the pipeline task identifier; the previous
        # value was not a valid task name and made pipeline() raise.
        _default_sentiment_analyzer = pipeline("sentiment-analysis")
    return _default_sentiment_analyzer


def calculate_communication_score(work_completion_details, sentiment_analyzer=None):
    """
    Calculates a communication score based on the work completion details.
    Uses a simple sentiment analysis.

    Scoring:
      * empty details       -> 100
      * POSITIVE sentiment  -> 100
      * NEGATIVE sentiment  -> 60 * (1 - confidence), so a more confident
        negative verdict gives a lower score (always at most 60)
      * any other label, or any error during analysis -> 80 (neutral)

    Args:
        work_completion_details (str): Details of work completion.
        sentiment_analyzer (callable, optional): Callable that takes the text
            and returns a sequence whose first item is a dict with 'label'
            and 'score' keys. Defaults to a shared sentiment-analysis
            pipeline that is loaded once on first use.

    Returns:
        float: A score between 0 and 100.
    """
    if not work_completion_details:
        return 100

    try:
        # Loading the model is inside the try block so that a load failure
        # falls back to the neutral score instead of aborting the caller.
        if sentiment_analyzer is None:
            sentiment_analyzer = _get_default_sentiment_analyzer()
        result = sentiment_analyzer(work_completion_details)
        sentiment = str(result[0]['label']).upper()
        confidence = result[0]['score']
    except Exception as e:
        print(f"Error in sentiment analysis: {e}")
        return 80  # Return a neutral score on error

    if sentiment == 'POSITIVE':
        return 100
    if sentiment == 'NEGATIVE':
        # Scale the negative impact by confidence: the surer the model is
        # that the text is negative, the closer the score gets to 0.
        return max(0, 60 * (1 - confidence))
    return 80  # NEUTRAL or unrecognised label
def calculate_final_score(quality_score, timeliness_score, safety_score, communication_score):
    """
    Combine the four component scores into one weighted overall score.

    The weights are 0.4 (quality), 0.3 (timeliness), 0.2 (safety) and
    0.1 (communication).

    Args:
        quality_score (float): The quality score.
        timeliness_score (float): The timeliness score.
        safety_score (float): The safety score.
        communication_score (float): The communication score.

    Returns:
        float: The weighted sum of the four scores; between 0 and 100 when
        every input is between 0 and 100.
    """
    # Contribution of each component (weight * score); weights can be adjusted
    quality_part = 0.4 * quality_score
    timeliness_part = 0.3 * timeliness_score
    safety_part = 0.2 * safety_score
    communication_part = 0.1 * communication_score
    return quality_part + timeliness_part + safety_part + communication_part
def generate_performance_report(vendor_id, scores, month, trend_data=None):
    """
    Build the performance report dictionary for one vendor and month.

    The report copies the individual scores, sets an alert flag when the final
    score is below ALERT_THRESHOLD, and carries a placeholder certificate URL.

    Args:
        vendor_id (str): The ID of the vendor.
        scores (dict): The vendor's scores, with the keys 'quality',
            'timeliness', 'safety', 'communication' and 'final_score'.
        month (str): The month for the report (e.g., "2024-01").
        trend_data (dict, optional): Trend data for the vendor. Defaults to None.

    Returns:
        dict: A dictionary containing the performance report.
    """
    report = {'vendor_id': vendor_id, 'month': month}

    # Copy the scores across in a fixed order
    for metric in ('quality', 'timeliness', 'safety', 'communication', 'final_score'):
        report[metric] = scores[metric]

    report['alert_flag'] = report['final_score'] < ALERT_THRESHOLD
    report['certificate_url'] = f"/certificates/{vendor_id}_{month}.pdf"  # Placeholder URL

    # Missing or empty trend data means "no deviation"
    report['trend_deviation'] = trend_data.get('trend_deviation', 0) if trend_data else 0
    return report
def process_vendor_logs(vendor_logs):
    """
    Score every vendor log and turn it into a performance report.

    A log that cannot be processed (missing key, bad date, scoring error) is
    reported on stdout and skipped; processing continues with the next log.

    Args:
        vendor_logs (list): A list of dictionaries, where each dictionary represents
            a vendor log and contains the keys 'vendor_id',
            'work_completion_details', 'delay_reports', 'incident_logs', and 'log_date'.

    Returns:
        list: A list of performance report dictionaries, ready for Salesforce.
    """
    required_keys = (
        'vendor_id',
        'work_completion_details',
        'delay_reports',
        'incident_logs',
        'log_date',  # Assuming YYYY-MM-DD format
    )
    reports = []
    for log in vendor_logs:
        try:
            # Read every field up front so a missing key fails before any scoring
            vendor_id, work_completion_details, delay_reports, incident_logs, log_date = (
                log[key] for key in required_keys
            )

            scores = {}
            scores['quality'] = calculate_quality_score(incident_logs)
            scores['timeliness'] = calculate_timeliness_score(
                work_completion_details, delay_reports, log_date
            )
            scores['safety'] = calculate_safety_score(incident_logs)
            scores['communication'] = calculate_communication_score(work_completion_details)
            scores['final_score'] = calculate_final_score(
                scores['quality'],
                scores['timeliness'],
                scores['safety'],
                scores['communication'],
            )

            # Trend detection is a placeholder: in a real scenario the previous
            # months' scores would be fetched from Salesforce and compared with
            # the current final score (e.g. a swing of more than 10 points in
            # either direction). Until then no trend data is passed on.
            trend_data = None

            month = log_date[:7]  # Use YYYY-MM
            reports.append(
                generate_performance_report(vendor_id, scores, month, trend_data)
            )
        except Exception as e:
            # Best effort: report the failure and carry on with the next log.
            # Consider logging to a file or database, or re-raising if critical.
            print(f"Error processing log for vendor {log.get('vendor_id', 'Unknown')}: {e}")
    return reports