File size: 4,158 Bytes
ade4f6a
 
 
 
 
 
778443c
ade4f6a
778443c
 
ade4f6a
 
778443c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ade4f6a
 
 
 
 
 
 
 
 
 
 
 
69f7e51
ade4f6a
e8abd42
 
ade4f6a
 
 
 
 
 
 
 
 
 
 
 
 
42742c6
5627195
42742c6
79f9224
5627195
42742c6
 
 
 
5627195
 
 
ade4f6a
27d3144
ade4f6a
 
 
 
 
 
 
 
5627195
 
 
 
ade4f6a
 
5627195
ade4f6a
 
5627195
ade4f6a
5627195
 
ade4f6a
5627195
 
ade4f6a
5627195
 
 
 
 
e8abd42
 
 
 
 
 
778443c
 
 
 
 
 
 
 
 
e8abd42
 
778443c
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import csv
from datetime import datetime
import subprocess
from subprocess import CompletedProcess
from typing import Literal
import re
import time
import difflib
from functools import wraps
from pathlib import Path

import textdistance
import numpy as np
import soundfile as sf

def timer(func):
    """Decorator that prints how long each call to ``func`` takes.

    The wrapped function is invoked with the caller's positional and keyword
    arguments, and its return value is passed through unchanged. The elapsed
    wall-clock time is measured with ``time.perf_counter`` and printed after
    the call returns; nothing is printed if ``func`` raises.
    """

    @wraps(func)
    def timed_call(*args, **kwargs):
        began = time.perf_counter()
        outcome = func(*args, **kwargs)  # run the wrapped function
        run_time = time.perf_counter() - began
        print(f"函数 {func.__name__!r} 执行耗时: {run_time:.4f} 秒")
        return outcome

    return timed_call

class Timer:
    """Context manager that measures and prints the duration of a ``with`` body.

    On exit the elapsed time (in seconds, from ``time.perf_counter``) is stored
    in ``self.duration`` and printed, prefixed by the ``log`` label.
    Exceptions raised inside the body are not suppressed.
    """

    def __init__(self, log=""):
        # Label printed in front of the measured duration.
        self.log = log

    def __enter__(self):
        # Record the starting point and hand the instance to the ``as`` target.
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.duration = time.perf_counter() - self.start
        print(f"{self.log} cost: {self.duration:.4f} 秒")

def get_time_str(level:Literal["d","s","ms"]="d"):
    """Format the current local time at the requested granularity.

    ``"d"`` gives the date as ``YYYY-MM-DD``, ``"s"`` gives ``HHMMSS`` and
    ``"ms"`` gives ``HHMMSS.ffffff`` (``%f`` is microseconds). Any other value
    yields ``None``.
    """
    now = datetime.now()
    level_formats = (
        ("d", "%Y-%m-%d"),
        ("s", "%H%M%S"),
        ("ms", "%H%M%S.%f"),
    )
    for key, fmt in level_formats:
        if level == key:
            return now.strftime(fmt)
    return None


def save_csv(file_path, header, rows):
    """Write ``rows`` to ``file_path`` as UTF-8 CSV, overwriting any existing file.

    ``header`` is written as the first row only when it is truthy. A
    confirmation line is printed once all rows have been written.
    """
    # newline="" lets the csv module control line endings itself.
    with open(file_path, mode="w", newline="", encoding="utf-8") as out:
        csv_out = csv.writer(out)
        if header:
            csv_out.writerow(header)
        csv_out.writerows(rows)
        print(f"write csv to {file_path}")

def cmd(command: str, check=True, capture_output=False) -> CompletedProcess:
    """Run ``command`` through the shell and return the completed process.

    The command line is echoed to stdout before it is run.

    Args:
        command: Shell command line, passed to ``subprocess.run`` with
            ``shell=True``.
        check: When true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.
        capture_output: When true, stdout and stderr are merged, decoded as
            text, stored in ``ret.stdout`` and echoed. When false the child
            inherits this process's streams and ``ret.stdout`` is ``None``.

    Returns:
        The ``CompletedProcess`` produced by ``subprocess.run``.

    Raises:
        subprocess.CalledProcessError: If ``check`` is true and the command
            exits with a non-zero status.
    """
    # NOTE: shell=True means ``command`` is interpreted by the shell; callers
    # must never build it from untrusted input.
    print(command)
    if capture_output:
        ret = subprocess.run(
            command,
            shell=True,
            check=check,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # interleave stderr into the captured stdout
            text=True,
        )
        print(ret.stdout)
    else:
        # Nothing was captured, so there is no output to echo here; the child
        # has already written straight to the inherited streams.
        ret = subprocess.run(command, shell=True, check=check)
    return ret

import cn2an
# Characters stripped before comparing Chinese text. They are listed one by
# one (full-width forms as escapes) and passed through re.escape so that "-"
# is a literal hyphen and can never form an accidental character range.
_ZH_COMPARISON_PUNCTUATION = (
    " \u3000"                   # ASCII space, ideographic space
    ",\uff0c"                   # comma, full-width comma
    "\u3002\u3001"              # ideographic full stop, ideographic comma
    "!\uff01"                   # exclamation mark, full-width form
    "?\uff1f"                   # question mark, full-width form
    ":\uff1a"                   # colon, full-width form
    ";\uff1b"                   # semicolon, full-width form
    "\u2018\u2019\u201c\u201d"  # curly single and double quotes
    "-"                         # hyphen
    "\u300a\u300b"              # double angle brackets
    "\n"
)
_ZH_COMPARISON_PUNCTUATION_RE = re.compile(
    "[" + re.escape(_ZH_COMPARISON_PUNCTUATION) + "]"
)


def clean_text_for_comparison_zh(text):
    """Normalise Chinese text for comparison.

    Removes whitespace and punctuation (both ASCII and full-width forms),
    lower-cases the result and, if any digit remains, converts numbers to
    their Chinese form with ``cn2an.transform(..., "an2cn")``.

    Only the listed punctuation is removed. In particular the Chinese zero
    (U+3007) and other symbols between U+2019 and U+300A are preserved; an
    unescaped ``-`` inside the character class would turn the neighbouring
    characters into a range and strip them all.

    Args:
        text: The string to normalise.

    Returns:
        The cleaned string.
    """
    text = _ZH_COMPARISON_PUNCTUATION_RE.sub("", text).lower()
    if re.search(r"\d", text):
        text = cn2an.transform(text, "an2cn")
    return text

def clean_text_for_comparison_en(text):
    """Drop commas, full stops and newlines from ``text`` and lower-case it."""
    without_punctuation = re.sub("[,.\n]", "", text)
    return without_punctuation.lower()


def run_textdistance(text1, text2):
    """Return the Levenshtein distance between two texts and its normalised form.

    Args:
        text1: Reference text; its length is the normalisation denominator.
        text2: Text compared against the reference.

    Returns:
        A tuple ``(d, nd)`` where ``d`` is the Levenshtein distance and ``nd``
        is ``d / len(text1)``. ``nd`` may exceed 1 when ``text2`` is much
        longer than ``text1``. For an empty reference, ``d`` is
        ``len(text2)`` and ``nd`` is ``0.0`` when ``text2`` is also empty and
        ``1.0`` otherwise.
    """
    if len(text1) == 0:
        # Dividing by len(text1) would raise ZeroDivisionError. The distance
        # from an empty sequence is one insertion per element of text2.
        d = len(text2)
        return d, (1.0 if d else 0.0)
    d = textdistance.levenshtein.distance(text1, text2)
    return d, d / len(text1)

def highlight_diff(a, b, spliter=""):
    """Render the difference between ``a`` and ``b`` as inline markup.

    With an empty ``spliter`` the inputs are compared element by element as
    given (character by character for strings); otherwise both are first
    split on ``spliter`` and compared token by token. Unchanged runs are
    emitted verbatim, runs only in ``a`` as ``[-...-]`` and runs only in ``b``
    as ``{+...+}``; a replacement emits both, deletion first. The pieces are
    joined with ``spliter``.
    """
    if spliter:
        a, b = a.split(spliter), b.split(spliter)

    def mark_deleted(lo, hi):
        deleted = spliter.join(a[lo:hi])
        return f"[-{deleted}-]"

    def mark_inserted(lo, hi):
        inserted = spliter.join(b[lo:hi])
        return f"{{+{inserted}+}}"

    pieces = []
    opcodes = difflib.SequenceMatcher(None, a, b).get_opcodes()
    for op, a_lo, a_hi, b_lo, b_hi in opcodes:
        if op == 'replace':
            pieces.append(mark_deleted(a_lo, a_hi) + mark_inserted(b_lo, b_hi))
        elif op == 'insert':
            pieces.append(mark_inserted(b_lo, b_hi))
        elif op == 'delete':
            pieces.append(mark_deleted(a_lo, a_hi))
        elif op == 'equal':
            pieces.append(spliter.join(a[a_lo:a_hi]))

    return spliter.join(pieces)

def time_to_float(s: str):
    """Parse a duration string such as ``"1.5s"`` into a float.

    Every ``"s"`` character is removed before conversion. If nothing is left,
    ``0.0`` is returned.
    """
    number_text = s.replace("s", "")
    return float(number_text) if number_text else 0.0

def read_audio(file:Path)->np.ndarray:
    """Load an audio file with ``soundfile`` and return its samples as float32.

    Raises:
        ValueError: If the file's sample rate is not 16000.
    """
    samples, sr = sf.read(file)
    if sr == 16000:
        return samples.astype(np.float32)
    raise ValueError(f"只支持 16k 采样率的音频,当前采样率为 {sr}")

def write_audio(file:Path, audio:np.ndarray, sr=16000):
    """Write ``audio`` to ``file`` via ``soundfile`` at sample rate ``sr`` and print a confirmation."""
    sf.write(file=file, data=audio, samplerate=sr)
    print(f"写入音频文件 {file}")

if __name__ == '__main__':
    # Manual smoke test for Timer: time two short sleeps, first without a
    # label and then with one, printing the recorded duration after each.
    with Timer() as unlabeled_timer:
        print("开始操作 B...")
        time.sleep(0.4)
    print(unlabeled_timer.duration)

    with Timer("C") as labeled_timer:
        print("开始操作 C...")
        time.sleep(0.5)
    print(labeled_timer.duration)