File size: 6,054 Bytes
9c4b1c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import os
from tqdm import tqdm
import torch
import pandas as pd
import json
import time
import numpy as np
from sklearn.metrics import roc_auc_score, accuracy_score
from networks import create_architecture, count_parameters
from utils.dataset import create_dataloader
from utils.processing import add_processing_arguments
from parser import get_parser

def test(loader, model, settings, device):
    """Evaluate ``model`` on ``loader`` and persist per-image scores and metrics.

    Three files are written to
    ``./results/{settings.name}/data/{settings.data_keys}``:

    * ``results.csv`` - a ``name,pro,flag`` header followed by one
      ``path, score, label`` row per image (layout kept unchanged for
      backward compatibility).
    * ``metrics.json`` - TPR, TNR, overall accuracy, AUC and execution time.
    * ``image_results.json`` - a list of ``{'path', 'score', 'label'}`` records.

    Metrics follow the convention stated in this file: label 1 is a fake
    image, label 0 is a real image, and a raw score above 0 is a "fake"
    prediction. A metric that is undefined for the data at hand (no fake
    images, no real images, a single class, or no samples at all) is reported
    as 0.0.

    Args:
        loader: Iterable of batches; each batch is a mapping with the keys
            ``'img'``, ``'target'`` and ``'path'``.
        model: Network called as ``model(img)``; dimension 1 of its output is
            squeezed away to obtain one raw score per image.
        settings: Object exposing ``name`` and ``data_keys``, used to build
            the output directory.
        device: Device the image batch is moved to before the forward pass.

    Returns:
        None. Results are written to disk and a summary is printed.
    """
    model.eval()

    start_time = time.time()

    # File paths
    output_dir = f'./results/{settings.name}/data/{settings.data_keys}'
    os.makedirs(output_dir, exist_ok=True)

    csv_filename = os.path.join(output_dir, 'results.csv')
    metrics_filename = os.path.join(output_dir, 'metrics.json')
    image_results_filename = os.path.join(output_dir, 'image_results.json')

    # Collected over the whole run
    all_scores = []
    all_labels = []
    image_results = []

    # The CSV is opened once for the whole run instead of being re-opened in
    # append mode for every batch; the bytes written are the same.
    with open(csv_filename, 'w') as csv_file:
        csv_file.write('name,pro,flag\n')

        with torch.no_grad():
            with tqdm(loader, unit='batch', mininterval=0.5) as tbatch:
                tbatch.set_description('Validation')
                for data_dict in tbatch:
                    data = data_dict['img'].to(device)
                    # Labels are only read back as Python numbers, so they
                    # never need to be transferred to the compute device.
                    labels = data_dict['target']
                    paths = data_dict['path']

                    # One bulk copy to the host, so the per-element .item()
                    # calls below do not each wait on the device.
                    scores = model(data).squeeze(1).cpu()

                    for score, label, path in zip(scores, labels, paths):
                        score_val = score.item()
                        label_val = label.item()

                        all_scores.append(score_val)
                        all_labels.append(label_val)
                        image_results.append({
                            'path': path,
                            'score': score_val,
                            'label': label_val
                        })
                        csv_file.write(f"{path}, {score_val}, {label_val}\n")

    # Calculate metrics
    all_scores = np.array(all_scores)
    all_labels = np.array(all_labels)

    # Threshold at 0 (the original comment states train.py uses y_pred > 0.0)
    predictions = (all_scores > 0).astype(int)

    # Overall accuracy; guarded so that an empty loader still yields a report.
    if all_labels.size > 0:
        total_accuracy = accuracy_score(all_labels, predictions)
    else:
        total_accuracy = 0.0

    # TPR = TP / (TP + FN) = accuracy restricted to fake images (label == 1)
    fake_mask = all_labels == 1
    if fake_mask.any():
        tpr = accuracy_score(all_labels[fake_mask], predictions[fake_mask])
    else:
        tpr = 0.0

    # TNR = TN / (TN + FP) = accuracy restricted to real images (label == 0)
    real_mask = all_labels == 0
    if real_mask.any():
        tnr = accuracy_score(all_labels[real_mask], predictions[real_mask])
    else:
        tnr = 0.0

    # AUC only depends on the ranking of the scores, so the raw scores are
    # used directly. Squashing them through a sigmoid first adds nothing and
    # can saturate large logits to exactly 1.0, creating artificial ties.
    if len(np.unique(all_labels)) > 1:  # both classes are required
        auc = roc_auc_score(all_labels, all_scores)
    else:
        auc = 0.0

    execution_time = time.time() - start_time

    # Prepare metrics JSON
    metrics = {
        'TPR': float(tpr),
        'TNR': float(tnr),
        'Acc total': float(total_accuracy),
        'AUC': float(auc),
        'execution time': float(execution_time)
    }

    # Write metrics JSON
    with open(metrics_filename, 'w') as f:
        json.dump(metrics, f, indent=2)

    # Write individual image results JSON
    with open(image_results_filename, 'w') as f:
        json.dump(image_results, f, indent=2)

    print(f'\nMetrics saved to {metrics_filename}')
    print(f'Image results saved to {image_results_filename}')
    print('\nMetrics:')
    print(f'  TPR: {tpr:.4f}')
    print(f'  TNR: {tnr:.4f}')
    print(f'  Accuracy: {total_accuracy:.4f}')
    print(f'  AUC: {auc:.4f}')
    print(f'  Execution time: {execution_time:.2f} seconds')

if __name__ == '__main__':
    # Command line: base parser extended with the processing options.
    settings = add_processing_arguments(get_parser()).parse_args()

    # Honour the requested device only when CUDA is usable; otherwise use CPU.
    if torch.cuda.is_available():
        device = torch.device(settings.device)
    else:
        device = torch.device('cpu')

    loader = create_dataloader(settings, split='test')

    # Single-output network; the stored weights are loaded on top of it below.
    model = create_architecture(settings.arch, pretrained=True, num_classes=1).to(device)
    n_params = count_parameters(model)
    print(f"Arch: {settings.arch} with #parameters {n_params}")

    checkpoint_path = f'./checkpoint/{settings.name}/weights/best.pt'

    print(f'loading the model from {checkpoint_path}')
    checkpoint = torch.load(checkpoint_path, map_location=device)
    # The state dict is stored under the 'model' key of the checkpoint.
    model.load_state_dict(checkpoint['model'])
    model.to(device)

    test(loader, model, settings, device)