File size: 10,903 Bytes
4e67a93
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
#!/usr/bin/env python3
"""
Test suite for verifying that kpi_correlation_app correctly matches emails
and retrieves corresponding values from KPI and scores CSV files.
"""

import os
import sys
import pandas as pd
import numpy as np
import tempfile
import unittest
from io import StringIO

# Add the current directory to Python path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from kpi_correlation_app import analyze_correlations, convert_percentage_to_numeric
from csv_utils import robust_csv_loader, find_email_column, find_ipm_columns


class TestEmailMatching(unittest.TestCase):
    """Test suite for email matching and value correspondence"""
    
    def setUp(self):
        """Set up test data files"""
        self.test_dir = tempfile.mkdtemp()
        
        # Create test scores data
        self.scores_data = pd.DataFrame({
            'email': [
                'john.doe@company.com',
                'jane.smith@company.com',
                'MIKE.JONES@company.com',  # Different case
                'sarah.williams@company.com',
                'tom.brown@company.com'
            ],
            'problem_score': [85.5, 92.0, 78.5, 88.0, 90.5],
            'ability_score': [4.2, 4.8, 3.9, 4.5, 4.6]
        })
        
        # Create test KPI data with known values
        self.kpi_data = pd.DataFrame({
            'Email': [
                'John.Doe@company.com',  # Different case than scores
                'jane.smith@company.com',  # Same case
                'mike.jones@company.com',  # Different case than scores
                'sarah.williams@company.com',
                'alice.wonder@company.com'  # Not in scores
            ],
            'FY23/24 全年IPM': ['85%', '90%', '75%', '88%', '92%'],
            'FY24/25  全年IPM': ['87%', '93%', '78%', '89%', '94%'],
            'Department': ['Sales', 'Marketing', 'IT', 'HR', 'Finance']
        })
        
        # Save test files
        self.scores_file = os.path.join(self.test_dir, 'lenovo-scores-0603.csv')
        self.kpi_file = os.path.join(self.test_dir, 'test_kpi.csv')
        
        self.scores_data.to_csv(self.scores_file, index=False)
        self.kpi_data.to_csv(self.kpi_file, index=False)
        
    def tearDown(self):
        """Clean up test files"""
        import shutil
        if os.path.exists(self.test_dir):
            shutil.rmtree(self.test_dir)
    
    def test_email_case_insensitive_matching(self):
        """Test that emails are matched case-insensitively"""
        # Temporarily change directory to test directory
        original_dir = os.getcwd()
        os.chdir(self.test_dir)
        
        try:
            # Analyze correlations
            results_text, _, _, _, _, correlation_results = analyze_correlations(self.kpi_file)
            
            # Check that case differences don't prevent matching
            self.assertIn("Matched emails: 4 records", results_text)
            
            # Verify that john.doe, jane.smith, mike.jones, and sarah.williams were matched
            # (alice.wonder and tom.brown should not match)
            
        finally:
            os.chdir(original_dir)
    
    def test_correct_value_correspondence(self):
        """Test that correct values are retrieved for matched emails"""
        # Create a simplified version for direct testing
        scores_df = self.scores_data.copy()
        kpi_df = self.kpi_data.copy()
        
        # Normalize emails
        scores_df['email_normalized'] = scores_df['email'].str.lower()
        kpi_df['email_normalized'] = kpi_df['Email'].str.lower()
        
        # Merge datasets
        merged_df = pd.merge(scores_df, kpi_df, on='email_normalized', how='inner')
        
        # Test specific email correspondences
        john_row = merged_df[merged_df['email_normalized'] == 'john.doe@company.com']
        self.assertEqual(len(john_row), 1)
        self.assertAlmostEqual(john_row['problem_score'].iloc[0], 85.5)
        self.assertAlmostEqual(john_row['ability_score'].iloc[0], 4.2)
        self.assertEqual(john_row['FY23/24 全年IPM'].iloc[0], '85%')
        self.assertEqual(john_row['FY24/25  全年IPM'].iloc[0], '87%')
        
        # Test another email
        jane_row = merged_df[merged_df['email_normalized'] == 'jane.smith@company.com']
        self.assertEqual(len(jane_row), 1)
        self.assertAlmostEqual(jane_row['problem_score'].iloc[0], 92.0)
        self.assertAlmostEqual(jane_row['ability_score'].iloc[0], 4.8)
        self.assertEqual(jane_row['FY23/24 全年IPM'].iloc[0], '90%')
        self.assertEqual(jane_row['FY24/25  全年IPM'].iloc[0], '93%')
        
    def test_percentage_conversion(self):
        """Test that percentage strings are correctly converted to numeric values"""
        # Test conversion function
        test_series = pd.Series(['85%', '90.5%', '100%', '0%', 'invalid'])
        converted = convert_percentage_to_numeric(test_series)
        
        self.assertAlmostEqual(converted.iloc[0], 0.85)
        self.assertAlmostEqual(converted.iloc[1], 0.905)
        self.assertAlmostEqual(converted.iloc[2], 1.0)
        self.assertAlmostEqual(converted.iloc[3], 0.0)
        self.assertTrue(pd.isna(converted.iloc[4]))  # 'invalid' should be NaN
        
    def test_missing_emails_handling(self):
        """Test that non-matching emails are handled correctly"""
        scores_df = self.scores_data.copy()
        kpi_df = self.kpi_data.copy()
        
        # Normalize emails
        scores_df['email_normalized'] = scores_df['email'].str.lower()
        kpi_df['email_normalized'] = kpi_df['Email'].str.lower()
        
        # Merge datasets
        merged_df = pd.merge(scores_df, kpi_df, on='email_normalized', how='inner')
        
        # tom.brown@company.com is in scores but not in KPI
        self.assertFalse('tom.brown@company.com' in merged_df['email_normalized'].values)
        
        # alice.wonder@company.com is in KPI but not in scores
        self.assertFalse('alice.wonder@company.com' in merged_df['email_normalized'].values)
        
        # Only 4 emails should match
        self.assertEqual(len(merged_df), 4)
        
    def test_excel_file_support(self):
        """Test that Excel files are correctly processed"""
        # Save KPI data as Excel
        excel_file = os.path.join(self.test_dir, 'test_kpi.xlsx')
        self.kpi_data.to_excel(excel_file, index=False)
        
        # Test loading Excel file
        loaded_df = robust_csv_loader(excel_file)
        
        # Verify data integrity
        self.assertEqual(len(loaded_df), len(self.kpi_data))
        self.assertListEqual(loaded_df.columns.tolist(), self.kpi_data.columns.tolist())
        
    def test_correlation_calculation_integrity(self):
        """Test that correlations are calculated on correctly matched data"""
        # Create specific test data with known correlations
        test_scores = pd.DataFrame({
            'email': ['user1@test.com', 'user2@test.com', 'user3@test.com', 'user4@test.com', 'user5@test.com'],
            'problem_score': [10, 20, 30, 40, 50],
            'ability_score': [1, 2, 3, 4, 5]
        })
        
        test_kpi = pd.DataFrame({
            'Email': ['user1@test.com', 'user2@test.com', 'user3@test.com', 'user4@test.com', 'user5@test.com'],
            'FY23/24 全年IPM': ['20%', '40%', '60%', '80%', '100%'],  # Perfect correlation
            'FY24/25  全年IPM': ['100%', '80%', '60%', '40%', '20%']   # Perfect negative correlation
        })
        
        # Save test files
        test_scores_file = os.path.join(self.test_dir, 'lenovo-scores-0603.csv')
        test_kpi_file = os.path.join(self.test_dir, 'perfect_correlation_kpi.csv')
        
        test_scores.to_csv(test_scores_file, index=False)
        test_kpi.to_csv(test_kpi_file, index=False)
        
        # Change to test directory and analyze
        original_dir = os.getcwd()
        os.chdir(self.test_dir)
        
        try:
            results_text, _, _, _, _, correlation_results = analyze_correlations(test_kpi_file)
            
            # Check AC correlation (problem_score vs FY23/24 IPM) - should be perfect positive
            if correlation_results and 'AC' in correlation_results:
                self.assertAlmostEqual(correlation_results['AC']['pearson'], 1.0, places=5)
                
            # Check AD correlation (problem_score vs FY24/25 IPM) - should be perfect negative
            if correlation_results and 'AD' in correlation_results:
                self.assertAlmostEqual(correlation_results['AD']['pearson'], -1.0, places=5)
                
        finally:
            os.chdir(original_dir)


class TestColumnFinding(unittest.TestCase):
    """Test suite for column finding utilities"""
    
    def test_find_email_column_variations(self):
        """Test finding email column with various naming conventions"""
        test_cases = [
            pd.DataFrame({'Email': [], 'Name': []}),
            pd.DataFrame({'email': [], 'Name': []}),
            pd.DataFrame({'EMAIL': [], 'Name': []}),
            pd.DataFrame({'Email Address': [], 'Name': []}),
            pd.DataFrame({'user_email': [], 'Name': []})
        ]
        
        for df in test_cases:
            email_col = find_email_column(df)
            self.assertIsNotNone(email_col, f"Failed to find email column in {df.columns.tolist()}")
            self.assertTrue('email' in email_col.lower())
    
    def test_find_ipm_columns(self):
        """Test finding IPM columns with exact naming"""
        df = pd.DataFrame({
            'Email': [],
            'FY23/24 全年IPM': [],
            'FY24/25  全年IPM': [],  # Note the extra space
            'Other Column': []
        })
        
        fy2425_col, fy2324_col = find_ipm_columns(df)
        
        self.assertEqual(fy2324_col, 'FY23/24 全年IPM')
        self.assertEqual(fy2425_col, 'FY24/25  全年IPM')


def run_detailed_tests():
    """Run tests with detailed output"""
    # Create a test suite
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    
    # Add all test classes
    suite.addTests(loader.loadTestsFromTestCase(TestEmailMatching))
    suite.addTests(loader.loadTestsFromTestCase(TestColumnFinding))
    
    # Run with verbose output
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)
    
    return result.wasSuccessful()


if __name__ == '__main__':
    print("=" * 70)
    print("Running Email Matching and Value Correspondence Tests")
    print("=" * 70)
    
    success = run_detailed_tests()
    
    if success:
        print("\n✅ All tests passed! The app correctly matches emails and retrieves corresponding values.")
    else:
        print("\n❌ Some tests failed. Please review the output above.")
    
    sys.exit(0 if success else 1)