File size: 2,239 Bytes
bd90acc 40e9531 bd90acc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
import numpy as np
import faunanet.preprocessor_base as ppb
class Preprocessor(ppb.PreprocessorBase):
    """Preprocessor that cuts raw audio into fixed-length chunks.

    The instance registers itself with the base class under the name
    "google_perch_lite" and forwards the sampling configuration to it.
    """

    def __init__(
        self,
        sample_rate: int = 32000,
        sample_secs: float = 5.0,
        resample_type: str = "kaiser_fast",
        **kwargs
    ):
        """Forward the configuration to ``PreprocessorBase``.

        Args:
            sample_rate: Desired sampling rate; defaults to 32000.
            sample_secs: Length of one chunk in seconds; defaults to 5.0.
            resample_type: Resampling method name handed to the base class;
                defaults to "kaiser_fast".
            **kwargs: Any further keyword arguments, passed through to the
                base class unchanged.
        """
        super().__init__(
            "google_perch_lite",
            sample_rate=sample_rate,
            sample_secs=sample_secs,
            resample_type=resample_type,
            **kwargs
        )

    def process_audio_data(self, rawdata: np.ndarray) -> list:
        """Split ``rawdata`` into chunks of ``sample_secs`` seconds.

        Consecutive chunks start ``(sample_secs - overlap)`` seconds apart.
        Iteration stops at the first chunk shorter than 1.5 seconds; a chunk
        that is at least that long but shorter than ``sample_secs`` is
        zero-padded at its end to the full length.

        Args:
            rawdata: The audio samples to split.
                NOTE(review): assumed to be a 1-D array already loaded at
                ``self.actual_sampling_rate`` -- confirm against the caller.

        Returns:
            The list of chunks, which is also stored in ``self.chunks``.

        Raises:
            RuntimeError: If ``self.actual_sampling_rate`` differs from
                ``self.sample_rate``.
        """
        # Raise when the sampling rate is not the desired one.
        if self.actual_sampling_rate != self.sample_rate:
            # The f-prefix is required so that the rates are interpolated
            # into the message instead of being printed as literal braces.
            raise RuntimeError(
                f"Sampling rate is not the desired one. Desired sampling rate: {self.sample_rate}, actual sampling rate: {self.actual_sampling_rate}"
            )

        seconds = self.sample_secs
        minlen = 1.5  # chunks shorter than this many seconds are dropped

        # Loop-invariant lengths (in samples), computed once.
        rate = self.actual_sampling_rate
        chunk_len = int(seconds * rate)
        min_chunk_len = int(minlen * rate)
        step = int((seconds - self.overlap) * self.sample_rate)

        self.chunks = []

        for start in range(0, len(rawdata), step):
            split = rawdata[start : start + chunk_len]

            # End of signal?
            if len(split) < min_chunk_len:
                break

            # Signal chunk too short? Fill with zeros.
            if len(split) < chunk_len:
                padded = np.zeros(chunk_len)
                padded[: len(split)] = split
                split = padded

            self.chunks.append(split)

        print(
            "process audio data google: complete, read ",
            str(len(self.chunks)),
            "chunks.",
            flush=True,
        )

        return self.chunks

    @classmethod
    def from_cfg(cls, cfg: dict):
        """Build a ``Preprocessor`` from a configuration dictionary.

        Args:
            cfg: Keyword arguments for the constructor. Only the keys
                "sample_rate", "sample_secs", "resample_type", "duration"
                and "actual_sampling_rate" are accepted.

        Returns:
            A new instance created via ``cls(**cfg)``.

        Raises:
            RuntimeError: If ``cfg`` contains any key outside the allowed set.
        """
        # Make sure there are no more than the allowed keyword arguments
        # in the cfg.
        allowed = [
            "sample_rate",
            "sample_secs",
            "resample_type",
            "duration",
            "actual_sampling_rate",
        ]

        unexpected = [key for key in cfg if key not in allowed]
        if unexpected:
            raise RuntimeError("Erroneous keyword arguments in preprocessor config")

        return cls(**cfg)
|