File size: 6,854 Bytes
c8c12e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""Test Models on all MVTec AD Categories."""

# Copyright (C) 2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.

import itertools
import math
import multiprocessing
import random
import tempfile
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Union

import numpy as np
import pandas as pd
import torch
from omegaconf import DictConfig, ListConfig, OmegaConf
from pytorch_lightning import seed_everything

from anomalib.utils.sweep.config import flatten_sweep_params
from tests.helpers.dataset import get_dataset_path
from tests.helpers.model import model_load_test, setup_model_train


def get_model_nncf_cat() -> List:
    """Build the (model, nncf, category) triples used to drive the model tests.

    Three categories are drawn at random, without replacement, from the
    category pool below; each listed model/NNCF pairing is then combined with
    each of the drawn categories.

    Returns:
        List: ``(model, nncf, category)`` tuples, grouped by model in listing order.
    """
    category_pool = [
        "bottle",
        "cable",
        "capsule",
        "carpet",
        "grid",
        "hazelnut",
        "leather",
        "metal_nut",
        "pill",
        "screw",
        "tile",
        "toothbrush",
        "transistor",
        "wood",
        "zipper",
    ]
    sampled_categories = random.sample(category_pool, k=3)

    # Each entry is (model name, whether the NNCF variant is exercised).
    supported_models = [
        ("padim", False),
        ("dfkde", False),
        ("dfm", False),
        ("stfpm", False),
        # ("stfpm", True),
        ("patchcore", False),
        ("cflow", False),
        ("ganomaly", False),
    ]

    # Cartesian product: the model varies slowest, the category fastest.
    return [
        (model_name, uses_nncf, category)
        for model_name, uses_nncf in supported_models
        for category in sampled_categories
    ]


class TestModel:
    """Run Model on all categories."""

    @staticmethod
    def _check_metric(results, threshold, metric: str) -> None:
        """Raise ``AssertionError`` when ``results[metric]`` falls short of ``threshold[metric]``.

        A value passes when it is greater than or equal to the threshold, or
        when ``np.isclose`` considers it equal to the threshold with a relative
        tolerance of 0.02.

        Args:
            results: Mapping of metric name to the achieved value.
            threshold: Mapping of metric name to the reference value.
            metric (str): Key looked up in both mappings.

        Raises:
            AssertionError: If the achieved value is neither close to nor above the threshold.
        """
        achieved = results[metric]
        expected = threshold[metric]
        if np.isclose(achieved, expected, rtol=0.02) or achieved >= expected:
            return
        raise AssertionError(f"results['{metric}']:{achieved} >= threshold['{metric}']:{expected}")

    def _test_metrics(self, trainer, config, model, datamodule):
        """Tests the model metrics but also acts as a setup.

        Runs ``trainer.test`` and compares the first returned result against the
        thresholds stored in ``tests/nightly/models/performance_thresholds.yaml``
        for the configured model name and dataset category.

        Args:
            trainer: Object whose ``test(model=..., datamodule=...)`` returns a list of result dicts.
            config: Configuration providing ``model.name``, ``dataset.category``,
                ``dataset.task`` and, optionally, ``optimization.nncf.apply``.
            model: Model passed through to ``trainer.test``.
            datamodule: Datamodule passed through to ``trainer.test``.

        Returns:
            The first element of the list returned by ``trainer.test``.

        Raises:
            AssertionError: If ``image_AUROC`` (or ``pixel_AUROC`` for the
                segmentation task) is below its threshold.
        """
        results = trainer.test(model=model, datamodule=datamodule)[0]

        thresholds = OmegaConf.load("tests/nightly/models/performance_thresholds.yaml")

        threshold = thresholds[config.model.name][config.dataset.category]
        # NNCF runs are compared against the dedicated ``nncf`` sub-entry.
        if "optimization" in config.keys() and config.optimization.nncf.apply:
            threshold = threshold.nncf

        self._check_metric(results, threshold, "image_AUROC")

        # Pixel-level AUROC is only checked for the segmentation task.
        if config.dataset.task == "segmentation":
            self._check_metric(results, threshold, "pixel_AUROC")
        return results

    def _save_to_csv(self, config: Union[DictConfig, ListConfig], results: Dict):
        """Save model results to csv. Useful for tracking model drift.

        One row is written per call to ``tests/artifacts/<mm_dd_YYYY>.csv``; the
        file is created with a header on first use and appended to afterwards.

        Args:
            config (Union[DictConfig, ListConfig]): Model config which is also added to csv for complete picture.
            results (Dict): Metrics from trainer.test
        """
        # Save results in csv for tracking model drift
        model_metrics = flatten_sweep_params(config)
        # convert dict, list values to string
        for key, val in model_metrics.items():
            if isinstance(val, (list, dict, ListConfig, DictConfig)):
                model_metrics[key] = str(val)
        for metric, value in results.items():
            model_metrics[metric] = value
        model_metrics_df = pd.DataFrame([model_metrics])

        result_path = Path(f"tests/artifacts/{datetime.now().strftime('%m_%d_%Y')}.csv")
        result_path.parent.mkdir(parents=True, exist_ok=True)
        # NOTE(review): ``test_model`` runs ``runner`` in several worker processes
        # and these writes are not synchronized - confirm that is acceptable.
        if not result_path.is_file():
            model_metrics_df.to_csv(result_path)
        else:
            model_metrics_df.to_csv(result_path, mode="a", header=False)

    def runner(self, run_configs, path, score_type, device_id):
        """Train, evaluate and record every configuration in ``run_configs`` on one device.

        Each configuration gets a fresh temporary project directory and a fixed
        seed of 42 before the model is set up, checked against the metric
        thresholds, passed to ``model_load_test`` and logged to csv.

        Args:
            run_configs: Iterable of ``(model_name, nncf, category)`` tuples.
            path: Dataset path forwarded to ``setup_model_train``.
            score_type: Score type forwarded to ``setup_model_train``.
            device_id: Device index; forwarded as ``device=[device_id]``.

        Raises:
            Exception: Wraps any ``AssertionError`` raised while processing a
                configuration, naming the model, NNCF flag and category.
        """
        for model_name, nncf, category in run_configs:
            try:
                with tempfile.TemporaryDirectory() as project_path:
                    # Fix seed
                    seed_everything(42, workers=True)
                    config, datamodule, model, trainer = setup_model_train(
                        model_name=model_name,
                        dataset_path=path,
                        nncf=nncf,
                        project_path=project_path,
                        category=category,
                        score_type=score_type,
                        device=[device_id],
                    )

                    # test model metrics
                    results = self._test_metrics(trainer=trainer, config=config, model=model, datamodule=datamodule)

                    # test model load
                    model_load_test(config=config, datamodule=datamodule, results=results)

                    self._save_to_csv(config, results)
            except AssertionError as assertion_error:
                raise Exception(f"Model: {model_name} NNCF:{nncf} Category:{category}") from assertion_error

    def test_model(self, path=get_dataset_path(), score_type=None):
        """Split the run configurations across CUDA devices and run them in parallel.

        The configurations from ``get_model_nncf_cat`` are cut into contiguous
        chunks, one per device index, and each chunk is handed to ``runner`` in
        a worker process started with the ``spawn`` context.

        Args:
            path: Dataset path forwarded to ``runner``.
            score_type: Score type forwarded to ``runner``.

        Raises:
            ValueError: If ``torch.cuda.device_count()`` reports no devices.
            Exception: Whatever a worker raised, re-raised by ``Future.result``.
        """
        run_configs = get_model_nncf_cat()

        num_devices = torch.cuda.device_count()
        if num_devices < 1:
            # Without this guard the executor rejects ``max_workers=0`` with a
            # message that does not mention the missing devices.
            raise ValueError("No CUDA devices available: test_model needs at least one to schedule workers.")

        # Size of the contiguous slice of configurations given to each device.
        chunk_size = math.ceil(len(run_configs) / num_devices)

        with ProcessPoolExecutor(
            max_workers=num_devices, mp_context=multiprocessing.get_context("spawn")
        ) as executor:
            jobs = [
                executor.submit(
                    self.runner,
                    run_configs[start : start + chunk_size],
                    path,
                    score_type,
                    device_id,
                )
                for device_id, start in enumerate(range(0, len(run_configs), chunk_size))
            ]
            # ``result()`` re-raises any exception from the worker, failing the test.
            for job in jobs:
                job.result()