File size: 9,965 Bytes
6a45c3a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import importlib
import importlib.abc
import importlib.machinery
import os
import sys
import time
import types
import warnings
from pathlib import Path

import torch

# Stub speechbrain optional integrations (numba, k2_fsa) to prevent import errors on Windows
# These are optional dependencies that may fail to import but are not required for inference


class SpeechbrainIntegrationStubLoader(importlib.abc.Loader):
    def create_module(self, spec):
        return types.ModuleType(spec.name)

    def exec_module(self, module):
        spec = module.__spec__
        module.__file__ = "<stub>"
        module.__package__ = spec.name if spec.submodule_search_locations is not None else spec.name.rpartition(".")[0]
        if spec.submodule_search_locations is not None:
            module.__path__ = []
        module.__all__ = []


class SpeechbrainIntegrationStubFinder(importlib.abc.MetaPathFinder):
    NAMESPACE = "speechbrain.integrations"

    def find_spec(self, fullname, path, target=None):
        if not fullname.startswith(self.NAMESPACE):
            return None
        if fullname in sys.modules:
            return None

        real_spec = importlib.machinery.PathFinder.find_spec(fullname, path)
        if real_spec is not None:
            return None

        is_pkg = "." not in fullname
        spec = importlib.machinery.ModuleSpec(fullname, SpeechbrainIntegrationStubLoader(), is_package=is_pkg)
        if is_pkg:
            spec.submodule_search_locations = []
        sys.modules[fullname] = types.ModuleType(fullname)
        sys.modules[fullname].__all__ = []
        return spec


def _install_speechbrain_optional_integration_stub_finder():
    if not any(isinstance(finder, SpeechbrainIntegrationStubFinder) for finder in sys.meta_path):
        sys.meta_path.insert(0, SpeechbrainIntegrationStubFinder())
    if "speechbrain.integrations" not in sys.modules:
        base_mod = types.ModuleType("speechbrain.integrations")
        base_mod.__path__ = []
        sys.modules["speechbrain.integrations"] = base_mod
    for submodule in ["huggingface", "numba", "k2_fsa", "nlp"]:
        fullname = f"speechbrain.integrations.{submodule}"
        if fullname not in sys.modules:
            submod = types.ModuleType(fullname)
            submod.__path__ = []
            submod.__package__ = "speechbrain.integrations"
            submod.__all__ = []
            sys.modules[fullname] = submod


_install_speechbrain_optional_integration_stub_finder()


SEPFORNER_MODEL_SOURCE = os.environ.get("SEPFORNER_MODEL_SOURCE", "speechbrain/sepformer-libri3mix")
SEPFORNER_MODEL_REVISION = os.environ.get("SEPFORNER_MODEL_REVISION", "main")
SEPFORNER_REQUIRED_FILES = ("hyperparams.yaml", "encoder.ckpt", "decoder.ckpt", "masknet.ckpt")


def _local_sepformer_dir() -> Path:
    return Path(os.path.abspath("./pretrained_sepformer"))


def _missing_sepformer_files(local_dir: Path):
    return [
        filename
        for filename in SEPFORNER_REQUIRED_FILES
        if not (local_dir / filename).is_file() or (local_dir / filename).stat().st_size == 0
    ]


def _download_missing_sepformer_files(local_dir: Path) -> None:
    local_dir.mkdir(parents=True, exist_ok=True)

    try:
        from huggingface_hub import hf_hub_download
    except ModuleNotFoundError as exc:
        raise ModuleNotFoundError(
            "huggingface_hub is required to download SepFormer assets. Install it with `pip install huggingface_hub`."
        ) from exc

    print(f"SepFormer source: {SEPFORNER_MODEL_SOURCE}@{SEPFORNER_MODEL_REVISION}")
    for filename in SEPFORNER_REQUIRED_FILES:
        local_path = local_dir / filename
        is_file = local_path.is_file()
        file_size = local_path.stat().st_size if is_file else 0

        if is_file and file_size > 0:
            print(f"Using existing SepFormer asset: {local_path}")
            continue

        status_msg = "missing" if not is_file else "empty"
        print(f"Local asset '{filename}' is {status_msg}. Downloading from '{SEPFORNER_MODEL_SOURCE}' to '{local_dir}'...")

        max_retries = 3
        last_error = None
        for attempt in range(max_retries):
            try:
                hf_hub_download(
                    repo_id=SEPFORNER_MODEL_SOURCE,
                    filename=filename,
                    revision=SEPFORNER_MODEL_REVISION,
                    local_dir=str(local_dir),
                    local_dir_use_symlinks=False,
                )
                break
            except Exception as exc:
                last_error = exc
                wait_time = 2 ** attempt
                print(f"Attempt {attempt + 1}/{max_retries} failed for '{filename}'. Retrying in {wait_time}s...")
                time.sleep(wait_time)
        else:
            raise RuntimeError(
                f"Failed to download '{filename}' from '{SEPFORNER_MODEL_SOURCE}' after {max_retries} attempts. "
                f"Check network connectivity or Hugging Face Hub status. Original error: {last_error}"
            ) from last_error


def ensure_local_sepformer_assets() -> Path:
    local_dir = _local_sepformer_dir()
    missing = _missing_sepformer_files(local_dir)
    if missing:
        _download_missing_sepformer_files(local_dir)

    missing = _missing_sepformer_files(local_dir)
    if missing:
        raise FileNotFoundError(
            f"Local pretrained SepFormer directory '{local_dir}' is missing required files: {missing}. "
            f"Set SEPFORNER_MODEL_SOURCE to a valid SpeechBrain SepFormer model and rerun the application."
        )

    return local_dir


class UnifiedSepFormer(torch.nn.Module):
    def __init__(self, modules_dict):
        super().__init__()

        self.encoder = modules_dict['encoder']
        self.masknet = modules_dict['masknet']
        self.decoder = modules_dict['decoder']

    def forward(self, mix):
        mix_w = self.encoder(mix)
        est_mask = self.masknet(mix_w)

        decoded_sources = []

        for i in range(est_mask.shape[0]):
            sep_h_i = mix_w * est_mask[i]
            est_source_i = self.decoder(sep_h_i)
            decoded_sources.append(est_source_i.unsqueeze(-1))

        est_source = torch.cat(decoded_sources, dim=-1)
        return est_source


def load_model(checkpoint_path=None):
    try:
        speechbrain_inference = importlib.import_module("speechbrain.inference.separation")
        speechbrain_fetching = importlib.import_module("speechbrain.utils.fetching")
    except ModuleNotFoundError as exc:
        raise ModuleNotFoundError(
            "SpeechBrain is required for SepFormer model loading. Install it with `pip install speechbrain` and a compatible `k2` package, or use a separate environment where SpeechBrain is supported."
        ) from exc

    _install_speechbrain_optional_integration_stub_finder()
    SepformerSeparation = getattr(speechbrain_inference, "SepformerSeparation")
    LocalStrategy = getattr(speechbrain_fetching, "LocalStrategy")

    local_sepformer_dir = ensure_local_sepformer_assets()

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=DeprecationWarning)
            model_hub = SepformerSeparation.from_hparams(
                source=str(local_sepformer_dir),
                savedir=str(local_sepformer_dir),
                local_strategy=LocalStrategy.COPY_SKIP_CACHE,
            )
    except ImportError as exc:
        msg = str(exc)
        if "speechbrain.integrations.k2_fsa" in msg or "Please install k2 to use k2" in msg or "No module named '_k2'" in msg:
            raise ImportError(
                "SpeechBrain attempted to load the optional k2 integration and failed. "
                "This often happens on Windows because k2 is not available or the installed wheel is incompatible. "
                "If you do not need k2 features, use a SpeechBrain install that does not require k2 or run this project on Linux. "
                "Original error: " + msg
            ) from exc
        raise

    model = UnifiedSepFormer(model_hub.mods)

    if checkpoint_path is None:
        model.eval()
        return model

    if not os.path.exists(checkpoint_path):
        print(f"WARNING: checkpoint '{checkpoint_path}' not found. Using local pretrained model instead.")
        model.eval()
        return model

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", message=r"TypedStorage is deprecated.*")
        checkpoint = torch.load(
            checkpoint_path,
            map_location="cpu"
        )

    if isinstance(checkpoint, dict):
        if "model_state_dict" in checkpoint:
            state_dict = checkpoint["model_state_dict"]
        elif "state_dict" in checkpoint:
            state_dict = checkpoint["state_dict"]
        else:
            state_dict = checkpoint
    else:
        state_dict = checkpoint

    try:
        model.load_state_dict(state_dict)
    except RuntimeError as err:
        print("WARNING: checkpoint is incompatible with the local SepFormer architecture.")
        print("Attempting relaxed load with strict=False.")
        try:
            load_result = model.load_state_dict(state_dict, strict=False)
            missing = getattr(load_result, "missing_keys", None)
            unexpected = getattr(load_result, "unexpected_keys", None)
            if missing:
                print("Missing keys from checkpoint:", missing)
            if unexpected:
                print("Unexpected keys in checkpoint:", unexpected)
            print("Relaxed checkpoint load succeeded. Using loaded weights where possible.")
            model.eval()
            return model
        except RuntimeError as err2:
            print("Relaxed checkpoint load also failed. Using local pretrained SepFormer weights from './pretrained_sepformer' instead.")
            print(err2)
            model.eval()
            return model

    model.eval()
    return model