# Uploaded by: kamalakn
# Commit message: Create agent/utils.py
# Commit: 6e2c94a (verified)
import pandas as pd
import numpy as np
from docx import Document
import tempfile
import os
import re
from typing import List, Dict, Any, Union, Optional
import io
import matplotlib.pyplot as plt
#import seaborn as sns
class FileProcessor:
    """Utility class for handling file operations"""

    @staticmethod
    def read_excel_file(file) -> pd.DataFrame:
        """
        Read an Excel or CSV file and return it as a DataFrame.

        Args:
            file: A file-like object exposing a ``name`` attribute, or a
                filesystem path. The extension of the name selects the
                parser; the comparison is case-insensitive.

        Returns:
            The parsed DataFrame, with surrounding whitespace removed from
            every string column label. Non-string labels are left as they are.

        Raises:
            Exception: If the extension is not .csv/.xlsx/.xls or the parser
                fails. The underlying error is chained as ``__cause__``.
        """
        # Objects without a ``name`` (e.g. plain path strings) name themselves.
        file_name = str(getattr(file, "name", file))
        lowered_name = file_name.lower()
        try:
            if lowered_name.endswith('.csv'):
                df = pd.read_csv(file)
            elif lowered_name.endswith(('.xlsx', '.xls')):
                df = pd.read_excel(file)
            else:
                raise ValueError(f"Unsupported file format: {file_name}")
            # Clean column names. The ``.str`` accessor fails on non-string
            # labels (e.g. numeric spreadsheet headers), so strip per label.
            df = df.rename(
                columns=lambda label: label.strip() if isinstance(label, str) else label
            )
            return df
        except Exception as e:
            raise Exception(f"Error reading file {file_name}: {str(e)}") from e

    @staticmethod
    def save_temp_docx(uploaded_file) -> str:
        """
        Save uploaded Word document to a temporary file and return its path.

        Args:
            uploaded_file: Object whose ``read()`` returns the document bytes.

        Returns:
            Path of the newly created ``.docx`` file. The file is created with
            ``delete=False``, so the caller is responsible for removing it
            (see ``cleanup_temp_file``).
        """
        with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as tmp_file:
            tmp_file.write(uploaded_file.read())
        # Returned after the ``with`` so the data is flushed and the handle closed.
        return tmp_file.name

    @staticmethod
    def cleanup_temp_file(file_path: str):
        """
        Delete a temporary file if it exists.

        This is best-effort: any failure is reported with a printed warning
        instead of being raised.
        """
        try:
            if os.path.exists(file_path):
                os.unlink(file_path)
        except Exception as e:
            print(f"Warning: Could not delete temp file {file_path}: {e}")
class DataValidator:
    """Utility class for data validation"""

    @staticmethod
    def validate_supplier_data(df: pd.DataFrame) -> Dict[str, Any]:
        """
        Validate supplier data DataFrame and return validation results.

        Args:
            df: Supplier table. 'Supplier', 'OnTimeDeliveryRate' and
                'DefectRate' are the required columns.

        Returns:
            A dict with:
              - "is_valid": False only when a required column is missing.
              - "errors": messages about missing required columns.
              - "warnings": messages about rate values that are non-numeric
                or outside 0-100.
              - "summary": row/column counts, duplicate supplier count and
                per-column missing-value counts (all plain Python ints).
        """
        validation_results = {
            "is_valid": True,
            "errors": [],
            "warnings": [],
            "summary": {}
        }
        required_columns = ['Supplier', 'OnTimeDeliveryRate', 'DefectRate']

        # Check for required columns
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            validation_results["is_valid"] = False
            validation_results["errors"].append(f"Missing required columns: {missing_columns}")

        # Check data types and ranges
        for rate_column in ('OnTimeDeliveryRate', 'DefectRate'):
            if rate_column not in df.columns:
                continue
            raw_values = df[rate_column]
            # Coerce first: comparing a text column with 0/100 raises TypeError.
            numeric_values = pd.to_numeric(raw_values, errors='coerce')
            non_numeric_count = int((numeric_values.isna() & raw_values.notna()).sum())
            if non_numeric_count:
                validation_results["warnings"].append(
                    f"{rate_column} contains {non_numeric_count} non-numeric values."
                )
            out_of_range_count = int(((numeric_values < 0) | (numeric_values > 100)).sum())
            if out_of_range_count:
                validation_results["warnings"].append(
                    f"{rate_column} should be between 0-100. Found {out_of_range_count} invalid values."
                )

        # Summary statistics (converted to built-in ints so the result can be
        # serialised without numpy-specific handling)
        validation_results["summary"] = {
            "total_rows": len(df),
            "total_columns": len(df.columns),
            "duplicate_suppliers": (
                int(df.duplicated(subset=['Supplier']).sum()) if 'Supplier' in df.columns else 0
            ),
            "missing_values": {
                column: int(count) for column, count in df.isnull().sum().items()
            }
        }
        return validation_results

    @staticmethod
    def clean_supplier_data(df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean and preprocess supplier data.

        Works on a copy; the input DataFrame is not modified.

        Steps:
          1. Convert non-missing values of text columns to ``str`` and strip
             surrounding whitespace. Missing values stay missing instead of
             becoming the literal text "nan"/"None".
          2. Pass numeric columns through ``pd.to_numeric``.
          3. Drop rows whose 'Supplier' repeats an earlier row (first kept).

        Returns:
            The cleaned DataFrame.
        """
        df_clean = df.copy()

        # Remove leading/trailing whitespace from string columns
        string_columns = [
            col for col in df_clean.columns
            if pd.api.types.is_object_dtype(df_clean[col].dtype)
            or pd.api.types.is_string_dtype(df_clean[col].dtype)
        ]
        for col in string_columns:
            # na_action='ignore' keeps NaN/None untouched rather than stringifying them.
            df_clean[col] = df_clean[col].map(
                lambda value: str(value).strip(), na_action='ignore'
            )

        # Handle missing values
        # NOTE: columns selected here are already numeric, so this leaves them
        # unchanged; text columns holding numbers are not converted by this step.
        numeric_columns = df_clean.select_dtypes(include=[np.number]).columns
        for col in numeric_columns:
            df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')

        # Remove duplicate suppliers (keep first occurrence)
        if 'Supplier' in df_clean.columns:
            df_clean = df_clean.drop_duplicates(subset=['Supplier'], keep='first')
        return df_clean
class AuditProcessor:
    """Utility class for processing audit documents"""

    @staticmethod
    def extract_text_from_docx(file_path: str) -> str:
        """
        Extract text from a Word document.

        Args:
            file_path: Path of the .docx file to open.

        Returns:
            The text of every entry in ``doc.paragraphs``, joined by newlines.

        Raises:
            Exception: If the document cannot be opened or read. The
                underlying error is chained as ``__cause__``.
        """
        try:
            doc = Document(file_path)
            return "\n".join(para.text for para in doc.paragraphs)
        except Exception as e:
            raise Exception(f"Error reading document: {str(e)}") from e

    @staticmethod
    def extract_supplier_name(content: str, filename: str) -> str:
        """
        Extract supplier name from document content or filename.

        The content is searched first for "supplier <number>" (any case, e.g.
        "SUPPLIER 1S", "Supplier 2"). If nothing is found, the filename is
        tried against several patterns.

        Returns:
            "Supplier <number>" when a number is found, otherwise
            "Supplier from <filename>".
        """
        # One case-insensitive pattern covers "SUPPLIER 1S", "Supplier 2" and
        # "supplier 3"; a trailing "S" never affects the captured number.
        content_match = re.search(r'supplier\s+(\d+)', content, re.IGNORECASE)
        if content_match:
            return f"Supplier {content_match.group(1)}"

        # Fallback to filename parsing
        filename_patterns = [
            r'Audit.*Report.*?(\d+)',  # Audit_Report_1.docx, etc.
            r'supplier.*?(\d+)',       # supplier_1_audit.docx, etc.
            r'audit.*?(\d+)',          # audit_1.docx, etc.
        ]
        for pattern in filename_patterns:
            match = re.search(pattern, filename, re.IGNORECASE)
            if match:
                return f"Supplier {match.group(1)}"

        # Final fallback
        return f"Supplier from {filename}"

    @staticmethod
    def count_audit_findings(content: str) -> int:
        """
        Count audit findings in document content using multiple approaches.

        Order of evaluation (matching is case-insensitive):
          1. An explicit negative statement ("no findings", "no findings
             observed", "zero findings") yields 0.
          2. Explicit counts such as "2 major findings", "1 minor finding" or
             "3 findings" are summed.
          3. If no explicit count was found, keyword mentions are counted,
             with 'major finding' and 'critical finding' weighted double.

        Returns:
            The finding count (weighted in step 3).
        """
        content_lower = content.lower()

        # Negative statements are tested on their own. Deciding this by looking
        # for 'no' inside the pattern text also hits the word "minor", which
        # made every "N minor findings" document report zero.
        if 'no findings' in content_lower:
            return 0
        negation_patterns = (
            r'no\s+findings?\s+observed',
            r'zero\s+findings?',
        )
        if any(re.search(pattern, content_lower) for pattern in negation_patterns):
            return 0

        # Method 1: Look for explicit numbers
        count_patterns = (
            r'(\d+)\s+major\s+finding',
            r'(\d+)\s+minor\s+finding',
            r'(\d+)\s+finding',
        )
        total_findings = 0
        for pattern in count_patterns:
            for number in re.findall(pattern, content_lower):
                total_findings += int(number)

        # Method 2: Count individual mentions if no explicit numbers found
        if total_findings == 0:
            finding_keywords = [
                'major finding', 'minor finding', 'critical finding',
                'observation', 'non-conformance', 'deficiency'
            ]
            for keyword in finding_keywords:
                count = content_lower.count(keyword)
                # Weight different types of findings
                if 'major' in keyword or 'critical' in keyword:
                    total_findings += count * 2  # Major findings count double
                else:
                    total_findings += count
        return total_findings

    @staticmethod
    def extract_audit_metadata(content: str) -> Dict[str, Any]:
        """
        Extract metadata from audit document.

        Returns:
            A dict with keys 'audit_type', 'location', 'auditor', 'date' and
            'iso_standard'. Only 'audit_type', 'location' and 'iso_standard'
            are ever filled in here; 'auditor' and 'date' always keep their
            'Unknown' default.
        """
        metadata = {
            'audit_type': 'Unknown',
            'location': 'Unknown',
            'auditor': 'Unknown',
            'date': 'Unknown',
            'iso_standard': None
        }

        # Extract audit type
        if re.search(r'surveillance\s+audit', content, re.IGNORECASE):
            metadata['audit_type'] = 'Surveillance'
        elif re.search(r'certification\s+audit', content, re.IGNORECASE):
            metadata['audit_type'] = 'Certification'

        # Extract location: everything after "located in" up to the next period
        location_match = re.search(r'located\s+in\s+([^.]+)', content, re.IGNORECASE)
        if location_match:
            metadata['location'] = location_match.group(1).strip()

        # Extract ISO standard written as "ISO <digits>:<digits>" (case-sensitive)
        iso_match = re.search(r'ISO\s+(\d+:\d+)', content)
        if iso_match:
            metadata['iso_standard'] = iso_match.group(1)
        return metadata
class ChartGenerator:
    """Utility class for generating charts and visualizations"""

    @staticmethod
    def _write_png(fig) -> str:
        """Save ``fig`` to a new temporary .png file and return the file path."""
        # mkstemp creates the file atomically; mktemp only picks a name, which
        # another process could claim before the chart is written.
        descriptor, chart_path = tempfile.mkstemp(suffix='.png')
        os.close(descriptor)
        try:
            fig.savefig(chart_path, dpi=300, bbox_inches='tight')
        except Exception:
            # Do not leave an empty placeholder behind when saving fails.
            if os.path.exists(chart_path):
                os.unlink(chart_path)
            raise
        return chart_path

    @staticmethod
    def create_performance_chart(scores: Dict[str, float],
                                 title: str = "Supplier Performance Scores",
                                 figsize: tuple = (10, 6)) -> str:
        """
        Create a performance bar chart and return the file path.

        Args:
            scores: Mapping of supplier name to score.
            title: Chart title.
            figsize: Figure size passed to matplotlib.

        Returns:
            Path of a temporary .png file containing the chart. The caller is
            responsible for deleting it.
        """
        fig = plt.figure(figsize=figsize)
        try:
            # Sort scores for better visualization
            sorted_scores = dict(sorted(scores.items(), key=lambda x: x[1], reverse=True))
            bars = plt.bar(list(sorted_scores.keys()), list(sorted_scores.values()),
                           color='skyblue', edgecolor='navy', alpha=0.7)

            # Add value labels on bars
            for bar in bars:
                height = bar.get_height()
                plt.text(bar.get_x() + bar.get_width() / 2., height + 1,
                         f'{height:.1f}', ha='center', va='bottom', fontweight='bold')

            plt.title(title, fontsize=16, fontweight='bold')
            plt.ylabel("Performance Score", fontsize=12)
            plt.xlabel("Suppliers", fontsize=12)
            plt.xticks(rotation=45, ha='right')
            plt.grid(axis='y', alpha=0.3)
            plt.tight_layout()

            # Save to temporary file
            return ChartGenerator._write_png(fig)
        finally:
            # Always release the figure, even when plotting or saving fails.
            plt.close(fig)

    @staticmethod
    def create_distribution_chart(data: Dict[str, Any],
                                  title: str = "Distribution Analysis") -> str:
        """
        Create distribution charts for various metrics.

        This is a template: it produces a titled 2x2 grid of empty axes and
        does not plot ``data`` yet.

        Returns:
            Path of a temporary .png file containing the figure. The caller is
            responsible for deleting it.
        """
        fig, _axes = plt.subplots(2, 2, figsize=(12, 8))
        try:
            fig.suptitle(title, fontsize=16, fontweight='bold')
            # Example: You can expand this based on your data structure
            # This is a template that can be customized
            return ChartGenerator._write_png(fig)
        finally:
            plt.close(fig)
class ScoreCalculator:
    """Helpers that turn raw metrics into scores and tier labels."""

    @staticmethod
    def calculate_weighted_score(metrics: Dict[str, float],
                                 weights: Dict[str, float]) -> float:
        """
        Return the weighted average of the metrics that have a weight.

        Metrics without an entry in ``weights`` are skipped. When the weights
        that were used do not sum to a positive number the result is 0.0.
        """
        weighted_sum = 0.0
        weight_sum = 0.0
        for name, value in metrics.items():
            if name not in weights:
                continue
            factor = weights[name]
            weighted_sum += value * factor
            weight_sum += factor

        if weight_sum > 0:
            return weighted_sum / weight_sum
        return 0.0

    @staticmethod
    def normalize_score(score: float, min_val: float = 0, max_val: float = 100) -> float:
        """
        Clamp ``score`` into the range [min_val, max_val].
        """
        upper_bounded = min(max_val, score)
        return max(min_val, upper_bounded)

    @staticmethod
    def calculate_performance_tier(score: float) -> str:
        """
        Map a score to its tier label.

        Thresholds: 90+ Excellent, 80+ Good, 70+ Satisfactory,
        60+ Needs Improvement, anything else Poor.
        """
        tier_floors = (
            (90, "Excellent"),
            (80, "Good"),
            (70, "Satisfactory"),
            (60, "Needs Improvement"),
        )
        # Floors are listed from highest to lowest, so the first hit wins.
        for floor, label in tier_floors:
            if score >= floor:
                return label
        return "Poor"
class ReportFormatter:
    """Builders for the tabular views shown in reports."""

    @staticmethod
    def format_score_table(scores: Dict[str, float]) -> pd.DataFrame:
        """
        Build a ranked score table from a supplier-to-score mapping.

        Rows are ordered by descending score and numbered from 1 in that
        order. The tier is derived from the score before it is rounded to two
        decimals. Columns: Rank, Supplier, Score, Performance Tier.
        """
        rows = list(scores.items())
        ranked = pd.DataFrame(rows, columns=['Supplier', 'Score'])
        ranked = ranked.sort_values('Score', ascending=False)

        # Tier must be computed from the unrounded score.
        ranked['Performance Tier'] = ranked['Score'].apply(
            ScoreCalculator.calculate_performance_tier
        )
        ranked['Rank'] = range(1, len(ranked) + 1)
        ranked['Score'] = ranked['Score'].round(2)

        column_order = ['Rank', 'Supplier', 'Score', 'Performance Tier']
        return ranked[column_order]

    @staticmethod
    def format_findings_summary(findings: Dict[str, int]) -> pd.DataFrame:
        """
        Build a findings table from a supplier-to-count mapping.

        Adds a 'Risk Level' column (more than 3 findings is High, more than 1
        is Medium, otherwise Low) and orders rows by descending count.
        """
        def risk_level(count):
            if count > 3:
                return 'High'
            if count > 1:
                return 'Medium'
            return 'Low'

        rows = list(findings.items())
        summary = pd.DataFrame(rows, columns=['Supplier', 'Findings Count'])
        summary['Risk Level'] = summary['Findings Count'].apply(risk_level)
        return summary.sort_values('Findings Count', ascending=False)
# Helper functions for common operations
def safe_divide(numerator: float, denominator: float, default: float = 0.0) -> float:
    """
    Divide ``numerator`` by ``denominator``; return ``default`` when the
    denominator is zero.
    """
    if denominator != 0:
        return numerator / denominator
    return default
def extract_numeric_value(text: str, default: float = 0.0) -> float:
    """
    Extract the first numeric value from a text string.

    Args:
        text: Value to scan. It is converted with ``str()`` first, so numbers
            are accepted as well. Missing values (``pd.isna``) yield ``default``.
        default: Returned when ``text`` is missing or contains no digits.

    Returns:
        The first number found, as a float. A leading '+' or '-' is kept when
        it is not directly preceded by a word character, so "-3.5" gives -3.5
        while the hyphens in "item-5" or "10-20" are treated as separators.
    """
    if pd.isna(text):
        return default
    # Only the first number is needed, so search instead of collecting all matches.
    match = re.search(r'(?:(?<!\w)[-+])?\d+\.?\d*', str(text))
    if match:
        return float(match.group(0))
    return default
def clean_text(text: str) -> str:
    """
    Return ``text`` with outer whitespace removed and every inner run of
    whitespace collapsed to a single space. Missing values give "".
    """
    is_missing = pd.isna(text)
    if is_missing:
        return ""

    trimmed = str(text).strip()
    whitespace_run = re.compile(r'\s+')
    return whitespace_run.sub(' ', trimmed)