File size: 4,187 Bytes
fcd58ee 1f45d99 c0447ed d4ac08b 93d2288 fcd58ee 1f45d99 d4ac08b 1f45d99 9e66f7d 1f45d99 fcd58ee c0447ed 3ec4a4f 730ea7e 93d2288 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
import os
import textwrap
from pathlib import Path
import logging
import numpy as np
from scipy.io.wavfile import write
import config
import csv
import av
def log_block(key: str, value, unit=''):
    """Log a ``[ key ]: value unit`` line at INFO level.

    Args:
        key: Label for the entry; left-justified and padded to 25 characters.
        value: Value to report; rendered with ``str()`` formatting.
        unit: Optional unit suffix appended after the value.

    Returns:
        None. Nothing is logged when ``config.DEBUG`` is truthy.
    """
    # NOTE(review): output is suppressed when DEBUG is *on* -- confirm this
    # is the intended polarity and not an inverted condition.
    if config.DEBUG:
        return
    key_fmt = f"[ {key.ljust(25)}]"  # left-justify and pad the label
    val_fmt = f"{value} {unit}".strip()
    logging.info(f"{key_fmt}: {val_fmt}")
def clear_screen():
    """Wipe the terminal by running the platform's clear command."""
    if os.name == "nt":
        command = "cls"
    else:
        command = "clear"
    os.system(command)
def print_transcript(text):
    """Join the transcript pieces and print them wrapped at 60 columns.

    Args:
        text: Iterable of strings (or a single string) to concatenate.
    """
    joined = "".join(text)
    for wrapped_line in textwrap.wrap(joined, width=60):
        print(wrapped_line)
def format_time(s):
    """Convert a time offset in seconds to an SRT timestamp.

    Args:
        s: Offset in seconds (int or float). Negative values are clamped
            to zero.

    Returns:
        A string of the form ``HH:MM:SS,mmm``, rounded to the nearest
        millisecond.
    """
    # Work in whole milliseconds: truncating the fractional part after a
    # float subtraction loses a millisecond (1.001 s would render as ",000").
    total_ms = max(0, round(float(s) * 1000))
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"
def create_srt_file(segments, resampled_file):
    """Write the segments to ``resampled_file`` as numbered SRT cues.

    Args:
        segments: Iterable of mappings with ``'start'``, ``'end'`` and
            ``'text'`` keys; start/end must be convertible to float seconds.
        resampled_file: Path of the subtitle file to create (overwritten).
    """
    with open(resampled_file, 'w', encoding='utf-8') as out:
        # Cue numbers start at 1.
        for index, seg in enumerate(segments, start=1):
            begin = format_time(float(seg['start']))
            finish = format_time(float(seg['end']))
            caption = seg['text']
            out.writelines((
                f"{index}\n",
                f"{begin} --> {finish}\n",
                f"{caption}\n\n",
            ))
def resample(file: str, sr: int = 16000):
    """
    Decode the first audio stream of ``file`` and re-encode it as mono
    16-bit PCM WAV at ``sr`` Hz.

    The output is named ``<stem of file>_resampled.wav``; because only the
    stem is used, it is written relative to the current working directory.

    Args:
        file (str): The audio (or media) file to open
        sr (int): Target sample rate in Hz (default 16000)
    Returns:
        resampled_file (str): The name of the resampled WAV file
    Raises:
        StopIteration: If ``file`` contains no audio stream.
    """
    resampled_file = Path(file).stem + "_resampled.wav"
    resampler = av.AudioResampler(
        format='s16',
        layout='mono',
        rate=sr,
    )

    # Context managers make sure both containers are closed even when
    # decoding or encoding fails part-way through.
    with av.open(file) as container:
        # Look the stream up before creating the output file so a missing
        # audio track does not leave an empty WAV behind.
        stream = next(s for s in container.streams if s.type == 'audio')

        with av.open(resampled_file, mode='w') as output_container:
            output_stream = output_container.add_stream('pcm_s16le', rate=sr)
            output_stream.layout = 'mono'

            for frame in container.decode(stream):
                # Clear the source timestamp before resampling.
                # NOTE(review): presumably avoids pts mismatches in the
                # resampler -- confirm against the PyAV version in use.
                frame.pts = None
                resampled_frames = resampler.resample(frame)
                if resampled_frames is not None:
                    for resampled_frame in resampled_frames:
                        for packet in output_stream.encode(resampled_frame):
                            output_container.mux(packet)

            # Flush whatever the encoder is still buffering.
            for packet in output_stream.encode(None):
                output_container.mux(packet)

    return resampled_file
def save_to_wave(filename, data: np.ndarray, sample_rate=16000):
    """Write floating-point audio samples to a 16-bit PCM WAV file.

    Args:
        filename: Destination path or open binary file handle, passed
            through to ``scipy.io.wavfile.write``.
        data: Samples expected in the range [-1.0, 1.0]; values outside
            that range are clipped.
        sample_rate: Sample rate in Hz recorded in the WAV header.
    """
    # Clamp first: casting an out-of-range float to int16 wraps around and
    # turns loud peaks into full-scale noise.
    clipped = np.clip(np.asarray(data), -1.0, 1.0)
    pcm = (clipped * 32767).astype(np.int16)
    write(filename, sample_rate, pcm)
def pcm_bytes_to_np_array(pcm_bytes: bytes, dtype=np.float32, channels=1):
    """Decode raw 16-bit PCM bytes into a NumPy array.

    Args:
        pcm_bytes: Buffer of int16 samples (2 bytes per sample).
        dtype: Output dtype. Only ``np.float32`` output is scaled by
            1/32768; other dtypes keep the raw sample values.
        channels: Channel count; when greater than 1 the result is
            reshaped to ``(-1, channels)``.

    Returns:
        The decoded samples as a NumPy array of ``dtype``.
    """
    # Interpret the buffer as int16 samples, then cast to the requested dtype.
    samples = np.frombuffer(pcm_bytes, dtype=np.int16).astype(dtype=dtype)
    if dtype == np.float32:
        samples /= 32768.0
    # Multi-channel input is interleaved, so fold it into one row per frame.
    return samples.reshape(-1, channels) if channels > 1 else samples
class TestDataWriter:
    """Append debug results as rows of a CSV file.

    The header row is written once, when the file is missing or empty.
    The file is always read and written as UTF-8 so non-ASCII transcript
    text does not depend on the platform's locale encoding.
    """

    def __init__(self, file_path='test_data.csv'):
        """Remember the target path and make sure the CSV has a header.

        Args:
            file_path: Path of the CSV file to append to.
        """
        self.file_path = file_path
        # Column order of the CSV; keys must match what write() receives
        # from result.model_dump(by_alias=True).
        self.fieldnames = [
            'seg_id', 'transcrible_time', 'translate_time',
            'transcribleContent', 'from', 'to', 'translateContent', 'partial'
        ]
        self._ensure_file_has_header()

    def _ensure_file_has_header(self):
        """Create the file with a header row if it is missing or empty."""
        if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
            with open(self.file_path, mode='w', newline='', encoding='utf-8') as file:
                writer = csv.DictWriter(file, fieldnames=self.fieldnames)
                writer.writeheader()

    def write(self, result: 'DebugResult'):
        """Append one result as a CSV row.

        Args:
            result: Object exposing ``model_dump(by_alias=True)`` that
                returns a dict keyed by ``self.fieldnames``.
        """
        with open(self.file_path, mode='a', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=self.fieldnames)
            writer.writerow(result.model_dump(by_alias=True))