File size: 6,326 Bytes
eeef81e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from abc import ABCMeta, abstractmethod
from pathlib import Path
from textwrap import dedent
from typing import ClassVar, Iterable, List, Optional, Tuple, Type

from pants.core.goals.check import Check, CheckRequest, CheckResult, CheckResults, check
from pants.core.util_rules.distdir import DistDir
from pants.engine.addresses import Address
from pants.engine.fs import Workspace
from pants.engine.target import FieldSet, MultipleSourcesField, Target, Targets
from pants.engine.unions import UnionMembership
from pants.testutil.option_util import create_options_bootstrapper
from pants.testutil.rule_runner import MockGet, RuleRunner, mock_console, run_rule_with_mocks
from pants.util.logging import LogLevel


class MockTarget(Target):
    alias = "mock_target"
    core_fields = (MultipleSourcesField,)


class MockCheckFieldSet(FieldSet):
    required_fields = (MultipleSourcesField,)


class MockCheckRequest(CheckRequest, metaclass=ABCMeta):
    field_set_type = MockCheckFieldSet
    checker_name: ClassVar[str]

    @staticmethod
    @abstractmethod
    def exit_code(_: Iterable[Address]) -> int:
        pass

    @property
    def check_results(self) -> CheckResults:
        addresses = [config.address for config in self.field_sets]
        return CheckResults(
            [
                CheckResult(
                    self.exit_code(addresses),
                    "",
                    "",
                )
            ],
            checker_name=self.checker_name,
        )


class SuccessfulRequest(MockCheckRequest):
    checker_name = "SuccessfulChecker"

    @staticmethod
    def exit_code(_: Iterable[Address]) -> int:
        return 0


class FailingRequest(MockCheckRequest):
    checker_name = "FailingChecker"

    @staticmethod
    def exit_code(_: Iterable[Address]) -> int:
        return 1


class ConditionallySucceedsRequest(MockCheckRequest):
    checker_name = "ConditionallySucceedsChecker"

    @staticmethod
    def exit_code(addresses: Iterable[Address]) -> int:
        if any(address.target_name == "bad" for address in addresses):
            return 127
        return 0


class SkippedRequest(MockCheckRequest):
    @staticmethod
    def exit_code(_) -> int:
        return 0

    @property
    def check_results(self) -> CheckResults:
        return CheckResults([], checker_name="SkippedChecker")


class InvalidField(MultipleSourcesField):
    pass


class InvalidFieldSet(MockCheckFieldSet):
    required_fields = (InvalidField,)


class InvalidRequest(MockCheckRequest):
    field_set_type = InvalidFieldSet
    checker_name = "InvalidChecker"

    @staticmethod
    def exit_code(_: Iterable[Address]) -> int:
        return -1


def make_target(address: Optional[Address] = None) -> Target:
    if address is None:
        address = Address("", target_name="tests")
    return MockTarget({}, address)


def run_typecheck_rule(
    *, request_types: List[Type[CheckRequest]], targets: List[Target]
) -> Tuple[int, str]:
    union_membership = UnionMembership({CheckRequest: request_types})
    with mock_console(create_options_bootstrapper()) as (console, stdio_reader):
        rule_runner = RuleRunner()
        result: Check = run_rule_with_mocks(
            check,
            rule_args=[
                console,
                Workspace(rule_runner.scheduler, _enforce_effects=False),
                Targets(targets),
                DistDir(relpath=Path("dist")),
                union_membership,
            ],
            mock_gets=[
                MockGet(
                    output_type=CheckResults,
                    input_type=CheckRequest,
                    mock=lambda field_set_collection: field_set_collection.check_results,
                ),
            ],
            union_membership=union_membership,
        )
        assert not stdio_reader.get_stdout()
        return result.exit_code, stdio_reader.get_stderr()


def test_invalid_target_noops() -> None:
    exit_code, stderr = run_typecheck_rule(request_types=[InvalidRequest], targets=[make_target()])
    assert exit_code == 0
    assert stderr == ""


def test_summary() -> None:
    good_address = Address("", target_name="good")
    bad_address = Address("", target_name="bad")
    exit_code, stderr = run_typecheck_rule(
        request_types=[
            ConditionallySucceedsRequest,
            FailingRequest,
            SkippedRequest,
            SuccessfulRequest,
        ],
        targets=[make_target(good_address), make_target(bad_address)],
    )
    assert exit_code == FailingRequest.exit_code([bad_address])
    assert stderr == dedent(
        """\

        𐄂 ConditionallySucceedsChecker failed.
        𐄂 FailingChecker failed.
        - SkippedChecker skipped.
        ✓ SuccessfulChecker succeeded.
        """
    )


def test_streaming_output_skip() -> None:
    results = CheckResults([], checker_name="typechecker")
    assert results.level() == LogLevel.DEBUG
    assert results.message() == "typechecker skipped."


def test_streaming_output_success() -> None:
    results = CheckResults([CheckResult(0, "stdout", "stderr")], checker_name="typechecker")
    assert results.level() == LogLevel.INFO
    assert results.message() == dedent(
        """\
        typechecker succeeded.
        stdout
        stderr

        """
    )


def test_streaming_output_failure() -> None:
    results = CheckResults([CheckResult(18, "stdout", "stderr")], checker_name="typechecker")
    assert results.level() == LogLevel.ERROR
    assert results.message() == dedent(
        """\
        typechecker failed (exit code 18).
        stdout
        stderr

        """
    )


def test_streaming_output_partitions() -> None:
    results = CheckResults(
        [
            CheckResult(21, "", "", partition_description="ghc8.1"),
            CheckResult(0, "stdout", "stderr", partition_description="ghc9.2"),
        ],
        checker_name="typechecker",
    )
    assert results.level() == LogLevel.ERROR
    assert results.message() == dedent(
        """\
        typechecker failed (exit code 21).
        Partition #1 - ghc8.1:

        Partition #2 - ghc9.2:
        stdout
        stderr

        """
    )