File size: 3,257 Bytes
a721dfa
 
 
9734b71
a721dfa
 
9734b71
a721dfa
 
9734b71
a721dfa
9734b71
a721dfa
 
 
 
 
 
 
9734b71
 
 
 
a721dfa
 
 
 
 
 
 
9734b71
 
 
 
 
 
 
 
 
 
 
 
 
 
a721dfa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9734b71
 
 
a721dfa
9734b71
a721dfa
 
 
9734b71
 
a721dfa
 
 
9734b71
a721dfa
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from __future__ import annotations

import unittest
from pathlib import Path

import numpy as np
import socket

from backend.runtime_utils import (
    can_bind_tcp_port,
    clone_cache_payload,
    find_free_tcp_port,
    make_json_compatible,
    parse_cors_origins,
    resolve_runtime_paths,
)


class RuntimeUtilsTests(unittest.TestCase):
    """Exercise the helper functions imported from ``backend.runtime_utils``."""

    @staticmethod
    def _module_file() -> str:
        # Path of a ``main.py`` entry sitting in the same directory as this
        # test file; used as the ``module_file`` argument in the path tests.
        this_file = Path(__file__).resolve()
        return str(this_file.with_name("main.py"))

    def test_parse_cors_origins_splits_and_trims(self) -> None:
        # Input mixes padded entries with blank ones between the commas.
        raw_origins = " https://a.com,https://b.com , , "
        expected_origins = ["https://a.com", "https://b.com"]
        self.assertEqual(parse_cors_origins(raw_origins), expected_origins)

    def test_parse_cors_origins_defaults_to_wildcard(self) -> None:
        # A whitespace-only setting is expected to yield the wildcard origin.
        origins = parse_cors_origins("   ")
        self.assertEqual(origins, ["*"])

    def test_can_bind_tcp_port_detects_busy_listener(self) -> None:
        loopback = "127.0.0.1"
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
            # Port 0 lets the OS pick an unused port for the listener.
            listener.bind((loopback, 0))
            listener.listen(1)
            _bound_host, bound_port = listener.getsockname()
            busy_port = int(bound_port)
            # While the listener is open the port must be reported as taken.
            self.assertFalse(can_bind_tcp_port(loopback, busy_port))

        # Leaving the ``with`` block closed the listener, freeing the port.
        self.assertTrue(can_bind_tcp_port(loopback, busy_port))

    def test_find_free_tcp_port_returns_bindable_port(self) -> None:
        loopback = "127.0.0.1"
        free_port = find_free_tcp_port(loopback)
        self.assertGreater(free_port, 0)
        self.assertTrue(can_bind_tcp_port(loopback, free_port))

    def test_clone_cache_payload_returns_independent_copy(self) -> None:
        source = {"nested": {"value": 1}, "items": [1, 2, 3]}
        duplicate = clone_cache_payload(source)

        # Mutate the clone inside a nested dict and inside a list ...
        duplicate["nested"]["value"] = 7
        duplicate["items"].append(4)

        # ... and verify that the source payload kept its initial contents.
        self.assertEqual(source["nested"]["value"], 1)
        self.assertEqual(source["items"], [1, 2, 3])

    def test_make_json_compatible_converts_numpy_scalars_and_arrays(self) -> None:
        numpy_payload = {
            "flag": np.bool_(True),
            "score": np.float64(12.5),
            "count": np.int64(7),
            "series": np.array([1.0, 2.0]),
        }

        converted = make_json_compatible(numpy_payload)

        # ``assertIs`` demands the builtin ``True`` singleton, not ``np.bool_``.
        self.assertIs(converted["flag"], True)
        self.assertEqual(converted["score"], 12.5)
        self.assertEqual(converted["count"], 7)
        self.assertEqual(converted["series"], [1.0, 2.0])

    def test_resolve_runtime_paths_for_dev_mode(self) -> None:
        module_file = self._module_file()
        module_dir = Path(module_file).resolve().parent

        resolved = resolve_runtime_paths(
            module_file=module_file,
            is_frozen=False,
            bundle_dir=r"D:\Bundle",
        )

        # Not frozen: paths are expected to derive from the module location.
        self.assertEqual(resolved.current_dir, str(module_dir))
        self.assertEqual(resolved.project_root, str(module_dir.parent))

    def test_resolve_runtime_paths_for_frozen_mode(self) -> None:
        bundle_dir = r"D:\Bundle"

        resolved = resolve_runtime_paths(
            module_file=self._module_file(),
            is_frozen=True,
            bundle_dir=bundle_dir,
        )

        # Frozen: both paths are expected to equal the bundle directory.
        self.assertEqual(resolved.current_dir, bundle_dir)
        self.assertEqual(resolved.project_root, bundle_dir)


# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()