| | import pandas as pd |
| | import numpy as np |
| | from docx import Document |
| | import tempfile |
| | import os |
| | import re |
| | from typing import List, Dict, Any, Union, Optional |
| | import io |
| | import matplotlib.pyplot as plt |
| | |
| |
|
| | class FileProcessor: |
| | """Utility class for handling file operations""" |
| | |
| | @staticmethod |
| | def read_excel_file(file) -> pd.DataFrame: |
| | """ |
| | Read Excel or CSV file and return DataFrame |
| | """ |
| | try: |
| | if file.name.endswith('.csv'): |
| | df = pd.read_csv(file) |
| | elif file.name.endswith(('.xlsx', '.xls')): |
| | df = pd.read_excel(file) |
| | else: |
| | raise ValueError(f"Unsupported file format: {file.name}") |
| | |
| | |
| | df.columns = df.columns.str.strip() |
| | return df |
| | |
| | except Exception as e: |
| | raise Exception(f"Error reading file {file.name}: {str(e)}") |
| | |
| | @staticmethod |
| | def save_temp_docx(uploaded_file) -> str: |
| | """ |
| | Save uploaded Word document to temporary file and return path |
| | """ |
| | with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as tmp_file: |
| | tmp_file.write(uploaded_file.read()) |
| | return tmp_file.name |
| | |
| | @staticmethod |
| | def cleanup_temp_file(file_path: str): |
| | """ |
| | Clean up temporary file |
| | """ |
| | try: |
| | if os.path.exists(file_path): |
| | os.unlink(file_path) |
| | except Exception as e: |
| | print(f"Warning: Could not delete temp file {file_path}: {e}") |
| |
|
| | class DataValidator: |
| | """Utility class for data validation""" |
| | |
| | @staticmethod |
| | def validate_supplier_data(df: pd.DataFrame) -> Dict[str, Any]: |
| | """ |
| | Validate supplier data DataFrame and return validation results |
| | """ |
| | validation_results = { |
| | "is_valid": True, |
| | "errors": [], |
| | "warnings": [], |
| | "summary": {} |
| | } |
| | |
| | required_columns = ['Supplier', 'OnTimeDeliveryRate', 'DefectRate'] |
| | |
| | |
| | missing_columns = [col for col in required_columns if col not in df.columns] |
| | if missing_columns: |
| | validation_results["is_valid"] = False |
| | validation_results["errors"].append(f"Missing required columns: {missing_columns}") |
| | |
| | |
| | if 'OnTimeDeliveryRate' in df.columns: |
| | invalid_rates = df[ |
| | (df['OnTimeDeliveryRate'] < 0) | (df['OnTimeDeliveryRate'] > 100) |
| | ] |
| | if not invalid_rates.empty: |
| | validation_results["warnings"].append( |
| | f"OnTimeDeliveryRate should be between 0-100. Found {len(invalid_rates)} invalid values." |
| | ) |
| | |
| | if 'DefectRate' in df.columns: |
| | invalid_defects = df[(df['DefectRate'] < 0) | (df['DefectRate'] > 100)] |
| | if not invalid_defects.empty: |
| | validation_results["warnings"].append( |
| | f"DefectRate should be between 0-100. Found {len(invalid_defects)} invalid values." |
| | ) |
| | |
| | |
| | validation_results["summary"] = { |
| | "total_rows": len(df), |
| | "total_columns": len(df.columns), |
| | "duplicate_suppliers": df.duplicated(subset=['Supplier']).sum() if 'Supplier' in df.columns else 0, |
| | "missing_values": df.isnull().sum().to_dict() |
| | } |
| | |
| | return validation_results |
| | |
| | @staticmethod |
| | def clean_supplier_data(df: pd.DataFrame) -> pd.DataFrame: |
| | """ |
| | Clean and preprocess supplier data |
| | """ |
| | df_clean = df.copy() |
| | |
| | |
| | string_columns = df_clean.select_dtypes(include=['object']).columns |
| | for col in string_columns: |
| | df_clean[col] = df_clean[col].astype(str).str.strip() |
| | |
| | |
| | numeric_columns = df_clean.select_dtypes(include=[np.number]).columns |
| | for col in numeric_columns: |
| | df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce') |
| | |
| | |
| | if 'Supplier' in df_clean.columns: |
| | df_clean = df_clean.drop_duplicates(subset=['Supplier'], keep='first') |
| | |
| | return df_clean |
| |
|
| | class AuditProcessor: |
| | """Utility class for processing audit documents""" |
| | |
| | @staticmethod |
| | def extract_text_from_docx(file_path: str) -> str: |
| | """ |
| | Extract text from Word document |
| | """ |
| | try: |
| | doc = Document(file_path) |
| | return "\n".join([para.text for para in doc.paragraphs]) |
| | except Exception as e: |
| | raise Exception(f"Error reading document: {str(e)}") |
| | |
| | @staticmethod |
| | def extract_supplier_name(content: str, filename: str) -> str: |
| | """ |
| | Extract supplier name from document content or filename |
| | """ |
| | |
| | patterns = [ |
| | r'SUPPLIER\s+(\d+)S?', |
| | r'Supplier\s+(\d+)', |
| | r'supplier\s+(\d+)', |
| | ] |
| | |
| | for pattern in patterns: |
| | match = re.search(pattern, content, re.IGNORECASE) |
| | if match: |
| | return f"Supplier {match.group(1)}" |
| | |
| | |
| | filename_patterns = [ |
| | r'Audit.*Report.*?(\d+)', |
| | r'supplier.*?(\d+)', |
| | r'audit.*?(\d+)', |
| | ] |
| | |
| | for pattern in filename_patterns: |
| | match = re.search(pattern, filename, re.IGNORECASE) |
| | if match: |
| | return f"Supplier {match.group(1)}" |
| | |
| | |
| | return f"Supplier from {filename}" |
| | |
| | @staticmethod |
| | def count_audit_findings(content: str) -> int: |
| | """ |
| | Count audit findings in document content using multiple approaches |
| | """ |
| | content_lower = content.lower() |
| | |
| | |
| | total_findings = 0 |
| | |
| | |
| | explicit_patterns = [ |
| | r'(\d+)\s+major\s+finding', |
| | r'(\d+)\s+minor\s+finding', |
| | r'(\d+)\s+finding', |
| | r'no\s+findings?\s+observed', |
| | r'zero\s+findings?' |
| | ] |
| | |
| | for pattern in explicit_patterns: |
| | matches = re.findall(pattern, content_lower) |
| | if matches: |
| | if 'no' in pattern or 'zero' in pattern: |
| | return 0 |
| | for match in matches: |
| | total_findings += int(match) |
| | |
| | |
| | if total_findings == 0: |
| | finding_keywords = [ |
| | 'major finding', 'minor finding', 'critical finding', |
| | 'observation', 'non-conformance', 'deficiency' |
| | ] |
| | |
| | for keyword in finding_keywords: |
| | count = content_lower.count(keyword) |
| | |
| | if 'major' in keyword or 'critical' in keyword: |
| | total_findings += count * 2 |
| | else: |
| | total_findings += count |
| | |
| | |
| | if 'no findings' in content_lower: |
| | return 0 |
| | |
| | return total_findings |
| | |
| | @staticmethod |
| | def extract_audit_metadata(content: str) -> Dict[str, Any]: |
| | """ |
| | Extract metadata from audit document |
| | """ |
| | metadata = { |
| | 'audit_type': 'Unknown', |
| | 'location': 'Unknown', |
| | 'auditor': 'Unknown', |
| | 'date': 'Unknown', |
| | 'iso_standard': None |
| | } |
| | |
| | |
| | if re.search(r'surveillance\s+audit', content, re.IGNORECASE): |
| | metadata['audit_type'] = 'Surveillance' |
| | elif re.search(r'certification\s+audit', content, re.IGNORECASE): |
| | metadata['audit_type'] = 'Certification' |
| | |
| | |
| | location_match = re.search(r'located\s+in\s+([^.]+)', content, re.IGNORECASE) |
| | if location_match: |
| | metadata['location'] = location_match.group(1).strip() |
| | |
| | |
| | iso_match = re.search(r'ISO\s+(\d+:\d+)', content) |
| | if iso_match: |
| | metadata['iso_standard'] = iso_match.group(1) |
| | |
| | return metadata |
| |
|
| | class ChartGenerator: |
| | """Utility class for generating charts and visualizations""" |
| | |
| | @staticmethod |
| | def create_performance_chart(scores: Dict[str, float], |
| | title: str = "Supplier Performance Scores", |
| | figsize: tuple = (10, 6)) -> str: |
| | """ |
| | Create a performance chart and return the file path |
| | """ |
| | plt.figure(figsize=figsize) |
| | |
| | |
| | sorted_scores = dict(sorted(scores.items(), key=lambda x: x[1], reverse=True)) |
| | |
| | bars = plt.bar(sorted_scores.keys(), sorted_scores.values(), |
| | color='skyblue', edgecolor='navy', alpha=0.7) |
| | |
| | |
| | for bar in bars: |
| | height = bar.get_height() |
| | plt.text(bar.get_x() + bar.get_width()/2., height + 1, |
| | f'{height:.1f}', ha='center', va='bottom', fontweight='bold') |
| | |
| | plt.title(title, fontsize=16, fontweight='bold') |
| | plt.ylabel("Performance Score", fontsize=12) |
| | plt.xlabel("Suppliers", fontsize=12) |
| | plt.xticks(rotation=45, ha='right') |
| | plt.grid(axis='y', alpha=0.3) |
| | plt.tight_layout() |
| | |
| | |
| | chart_path = tempfile.mktemp(suffix='.png') |
| | plt.savefig(chart_path, dpi=300, bbox_inches='tight') |
| | plt.close() |
| | |
| | return chart_path |
| | |
| | @staticmethod |
| | def create_distribution_chart(data: Dict[str, Any], |
| | title: str = "Distribution Analysis") -> str: |
| | """ |
| | Create distribution charts for various metrics |
| | """ |
| | fig, axes = plt.subplots(2, 2, figsize=(12, 8)) |
| | fig.suptitle(title, fontsize=16, fontweight='bold') |
| | |
| | |
| | |
| | |
| | chart_path = tempfile.mktemp(suffix='.png') |
| | plt.savefig(chart_path, dpi=300, bbox_inches='tight') |
| | plt.close() |
| | |
| | return chart_path |
| |
|
| | class ScoreCalculator: |
| | """Utility class for calculating various performance scores""" |
| | |
| | @staticmethod |
| | def calculate_weighted_score(metrics: Dict[str, float], |
| | weights: Dict[str, float]) -> float: |
| | """ |
| | Calculate weighted performance score |
| | """ |
| | total_score = 0.0 |
| | total_weight = 0.0 |
| | |
| | for metric, value in metrics.items(): |
| | if metric in weights: |
| | weight = weights[metric] |
| | total_score += value * weight |
| | total_weight += weight |
| | |
| | return total_score / total_weight if total_weight > 0 else 0.0 |
| | |
| | @staticmethod |
| | def normalize_score(score: float, min_val: float = 0, max_val: float = 100) -> float: |
| | """ |
| | Normalize score to a specific range |
| | """ |
| | return max(min_val, min(max_val, score)) |
| | |
| | @staticmethod |
| | def calculate_performance_tier(score: float) -> str: |
| | """ |
| | Determine performance tier based on score |
| | """ |
| | if score >= 90: |
| | return "Excellent" |
| | elif score >= 80: |
| | return "Good" |
| | elif score >= 70: |
| | return "Satisfactory" |
| | elif score >= 60: |
| | return "Needs Improvement" |
| | else: |
| | return "Poor" |
| |
|
| | class ReportFormatter: |
| | """Utility class for formatting reports and outputs""" |
| | |
| | @staticmethod |
| | def format_score_table(scores: Dict[str, float]) -> pd.DataFrame: |
| | """ |
| | Format scores into a nicely structured DataFrame |
| | """ |
| | df = pd.DataFrame(list(scores.items()), columns=['Supplier', 'Score']) |
| | df = df.sort_values('Score', ascending=False) |
| | df['Rank'] = range(1, len(df) + 1) |
| | df['Performance Tier'] = df['Score'].apply(ScoreCalculator.calculate_performance_tier) |
| | df['Score'] = df['Score'].round(2) |
| | |
| | return df[['Rank', 'Supplier', 'Score', 'Performance Tier']] |
| | |
| | @staticmethod |
| | def format_findings_summary(findings: Dict[str, int]) -> pd.DataFrame: |
| | """ |
| | Format audit findings into a structured DataFrame |
| | """ |
| | df = pd.DataFrame(list(findings.items()), |
| | columns=['Supplier', 'Findings Count']) |
| | df['Risk Level'] = df['Findings Count'].apply( |
| | lambda x: 'High' if x > 3 else 'Medium' if x > 1 else 'Low' |
| | ) |
| | |
| | return df.sort_values('Findings Count', ascending=False) |
| |
|
| | |
| | def safe_divide(numerator: float, denominator: float, default: float = 0.0) -> float: |
| | """ |
| | Safely divide two numbers, returning default if denominator is zero |
| | """ |
| | return numerator / denominator if denominator != 0 else default |
| |
|
| | def extract_numeric_value(text: str, default: float = 0.0) -> float: |
| | """ |
| | Extract numeric value from text string |
| | """ |
| | if pd.isna(text): |
| | return default |
| | |
| | |
| | numbers = re.findall(r'\d+\.?\d*', str(text)) |
| | if numbers: |
| | return float(numbers[0]) |
| | |
| | return default |
| |
|
| | def clean_text(text: str) -> str: |
| | """ |
| | Clean and normalize text |
| | """ |
| | if pd.isna(text): |
| | return "" |
| | |
| | |
| | cleaned = re.sub(r'\s+', ' ', str(text).strip()) |
| | return cleaned |
| |
|