lifesee committed on
Commit
c20c287
·
verified ·
1 Parent(s): 06efc4c

add app.py

Browse files
Files changed (1) hide show
  1. app.py +179 -0
app.py ADDED
@@ -0,0 +1,179 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+ import io, os, uuid, zipfile, tempfile, subprocess
3
+ from pydub import AudioSegment
4
+ from pydub.silence import split_on_silence
5
+
6
+ # ---------- helpers ----------
7
def _load(file_or_bytes):
    """Decode audio into a pydub ``AudioSegment``.

    Raw ``bytes``/``bytearray`` input is wrapped in an in-memory stream
    first; any other value (a file-like object or a path) is handed to
    ``AudioSegment.from_file`` unchanged.
    """
    source = file_or_bytes
    if isinstance(source, (bytes, bytearray)):
        source = io.BytesIO(source)
    return AudioSegment.from_file(source)
13
+
14
def _export(seg: AudioSegment, fmt="mp3") -> io.BytesIO:
    """Encode ``seg`` into an in-memory buffer.

    Args:
        seg: Audio to encode.
        fmt: Output format as a file-extension style name
            (e.g. ``"mp3"``, ``"wav"``, ``"m4a"``, ``"ogg"``).

    Returns:
        A ``BytesIO`` holding the encoded audio, rewound to position 0.
    """
    # pydub forwards ``format`` to ffmpeg as the muxer name. ffmpeg has no
    # muxer called "m4a"; the M4A/MP4-audio container is exposed as "ipod".
    muxer = {"m4a": "ipod"}.get(fmt, fmt)
    buf = io.BytesIO()
    seg.export(buf, format=muxer)
    buf.seek(0)
    return buf
19
+
20
def remove_silence(seg: AudioSegment, keep_ms=50, min_silence_ms=100, thresh_db=-45):
    """Split ``seg`` on silence and glue the resulting chunks back together.

    ``min_silence_ms``, ``thresh_db`` and ``keep_ms`` are coerced to
    int/float/int and passed to ``split_on_silence`` as ``min_silence_len``,
    ``silence_thresh`` and ``keep_silence``. If the split yields no chunks,
    the input segment is returned as-is.
    """
    pieces = split_on_silence(
        seg,
        min_silence_len=int(min_silence_ms),
        silence_thresh=float(thresh_db),
        keep_silence=int(keep_ms),
    )
    if not pieces:
        return seg

    # Concatenate left to right, starting from an empty (0 ms) segment.
    joined = AudioSegment.silent(duration=0)
    for piece in pieces:
        joined = joined + piece
    return joined
28
+
29
def trim_to_seconds(seg: AudioSegment, target_s: float):
    """Return ``seg`` cut or padded with silence to ``target_s`` seconds.

    The target is converted to whole milliseconds and clamped at zero.
    Longer (or equal-length) audio is sliced; shorter audio gets trailing
    silence appended to make up the difference.
    """
    target_ms = max(0, int(float(target_s) * 1000))
    shortfall_ms = target_ms - len(seg)
    if shortfall_ms > 0:
        return seg + AudioSegment.silent(duration=shortfall_ms)
    return seg[:target_ms]
34
+
35
+ def _atempo_chain(factor: float) -> str:
36
+ # Build a chain so each step stays within [0.5, 2.0] for better quality.
37
+ steps = []
38
+ f = max(0.1, min(10.0, float(factor)))
39
+ while f < 0.5:
40
+ steps.append(0.5); f /= 0.5
41
+ while f > 2.0:
42
+ steps.append(2.0); f /= 2.0
43
+ steps.append(f)
44
+ return ",".join([f"atempo={s:.5f}" for s in steps])
45
+
46
def fit_to_seconds(seg: AudioSegment, target_s: float, fmt_out="mp3") -> io.BytesIO:
    """Pitch-preserving time stretch to ``target_s`` seconds via FFmpeg atempo.

    Args:
        seg: Audio to stretch or compress.
        target_s: Desired duration in seconds. Values below 0.01 are
            treated as 0.01 so the tempo computation never divides by zero.
        fmt_out: Output file extension; ffmpeg infers the container from it.
            For ``"mp3"`` the encoder is pinned to libmp3lame at 128k.

    Returns:
        A ``BytesIO`` with the encoded result, positioned at 0.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.

    Note:
        ``_atempo_chain`` clamps the tempo to [0.1, 10.0], so extreme targets
        are approximated rather than hit exactly.
    """
    orig_s = max(0.01, len(seg) / 1000)
    wanted_s = max(0.01, float(target_s))
    # atempo is a *speed* multiplier: 2.0 plays twice as fast and halves the
    # duration. Going from orig_s to wanted_s therefore needs orig/wanted,
    # not wanted/orig.
    tempo = orig_s / wanted_s
    af = _atempo_chain(tempo)
    codec = ["-c:a", "libmp3lame", "-b:a", "128k"] if fmt_out == "mp3" else []

    with tempfile.TemporaryDirectory() as d:
        inp = os.path.join(d, "in.wav")
        outp = os.path.join(d, f"out.{fmt_out}")
        seg.export(inp, format="wav")
        cmd = ["ffmpeg", "-y", "-i", inp, "-vn", "-af", af, *codec, outp]
        subprocess.run(cmd, check=True)
        # Read the result fully before the temporary directory is removed.
        with open(outp, "rb") as f:
            return io.BytesIO(f.read())
60
+
61
def normalize_lufs(seg: AudioSegment, target_lufs=-14.0):
    """Apply a gain so the segment's RMS level lands on ``target_lufs`` dBFS.

    This is a lightweight RMS-based approximation of loudness normalization
    (it keeps dependencies minimal), not a true LUFS measurement.

    Args:
        seg: Audio to normalize.
        target_lufs: Target level in dB relative to full scale.

    Returns:
        The gain-adjusted segment, or ``seg`` unchanged when its RMS is 0
        (no level to measure, and boosting silence is pointless).
    """
    import math

    rms = seg.rms
    if not rms:
        return seg
    # Measure against the segment's own full-scale value instead of a fixed
    # 16-bit constant, so 8/24/32-bit audio gets the correct gain too.
    full_scale = seg.max_possible_amplitude
    current_db = 20 * math.log10(rms / full_scale)
    gain_db = float(target_lufs) - current_db
    return seg.apply_gain(gain_db)
68
+
69
+ # ---------- processors ----------
70
def process_single(file, mode, target_seconds, keep_silence_s,
                   min_silence_ms, silence_thresh_db, do_normalize, fmt):
    """Run the full pipeline on one input and return ``(encoded, report)``.

    ``file`` is either raw bytes or an object with ``read()``. The audio goes
    through silence removal, optional normalization to -14.0, and then the
    timing step chosen by ``mode`` ("trim", "fit", or anything else for no
    timing change). ``target_seconds`` must be truthy for trim/fit to apply.
    The report string gives the duration before and after processing.
    """
    if isinstance(file, (bytes, bytearray)):
        payload = file
    else:
        payload = file.read()
    source_seg = _load(payload)

    # 1) silence removal / pause control
    work = remove_silence(
        source_seg,
        keep_ms=int(float(keep_silence_s) * 1000),
        min_silence_ms=int(min_silence_ms),
        thresh_db=float(silence_thresh_db),
    )

    # 2) optional loudness normalize
    if do_normalize:
        work = normalize_lufs(work, -14.0)

    # 3) timing mode: "fit" encodes via ffmpeg itself; every other path
    #    ends with a plain export of the (possibly trimmed) segment.
    has_target = bool(target_seconds)
    if has_target and mode == "fit":
        out = fit_to_seconds(work, target_seconds, fmt_out=fmt)
    else:
        if has_target and mode == "trim":
            work = trim_to_seconds(work, target_seconds)
        out = _export(work, fmt)

    # Durations for the report; the output is re-decoded to measure it.
    after = len(_load(out.getvalue())) / 1000
    before = len(source_seg) / 1000
    report = f"Before: {before:.2f}s | After: {after:.2f}s"
    return out, report
100
+
101
def process_batch(files, **kwargs) -> io.BytesIO:
    """Process every input with ``process_single`` and bundle results in a ZIP.

    Args:
        files: Iterable of inputs accepted by ``process_single``.
        **kwargs: Forwarded unchanged to ``process_single``; must include
            ``fmt``, which is also used as the extension of each entry.

    Returns:
        A ``BytesIO`` containing the ZIP archive, rewound to position 0.
        Entries are named ``<stem>_processed.<fmt>``.
    """
    zbuf = io.BytesIO()
    used_names = set()
    with zipfile.ZipFile(zbuf, "w", zipfile.ZIP_DEFLATED) as z:
        for f in files:
            single, _ = process_single(f, **kwargs)

            # ``f.name`` is usually a full path (or missing / an int fd).
            # Keep only the base name so temp directories do not end up
            # inside the archive.
            name = getattr(f, "name", None)
            stem = ""
            if isinstance(name, str):
                stem = os.path.splitext(os.path.basename(name))[0]
            if not stem:
                stem = f"audio_{uuid.uuid4().hex}"

            # Uploads from different folders may share a file name; suffix
            # duplicates so no entry is shadowed.
            ext = kwargs["fmt"]
            arcname = f"{stem}_processed.{ext}"
            serial = 1
            while arcname in used_names:
                serial += 1
                arcname = f"{stem}_processed_{serial}.{ext}"
            used_names.add(arcname)

            z.writestr(arcname, single.getvalue())
    zbuf.seek(0)
    return zbuf
111
+
112
def write_temp_for_preview(blob: io.BytesIO, fmt: str) -> str:
    """Dump ``blob`` into a persistent temp file and return its path.

    The file gets a ``.<fmt>`` suffix and is not deleted on close, so the
    returned path can be handed to Gradio components that want a file path.
    """
    payload = blob.getvalue()
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{fmt}") as handle:
        handle.write(payload)
        return handle.name
118
+
119
+ # ---------- UI ----------
120
with gr.Blocks(title="AI Voice Studio – Pause Control, Trim, Fit") as demo:
    gr.Markdown(
        "### Remove or normalize pauses, **set pause length**, **trim to exact time**, or **fit length (pitch preserved)**.\n"
        "_Outputs: mp3 / wav / m4a / ogg. Single file → direct download. Multiple files → ZIP._"
    )

    with gr.Row():
        # Left column: inputs and processing options.
        with gr.Column():
            files = gr.Files(label="Upload audio (one or many)", file_types=["audio"], type="filepath")
            mode = gr.Radio(["none", "trim", "fit"], value="none", label="Timing mode")
            target = gr.Number(value=30, label="Target seconds (for trim/fit)")
            keep = gr.Number(value=0.25, label="Set pause length (seconds kept at cuts)")
            min_sil = gr.Slider(50, 1000, 120, step=10, label="Count a pause if silence ≥ (ms)")
            thresh = gr.Slider(-80, -10, -45, step=1, label="Silence threshold (dBFS)")
            do_norm = gr.Checkbox(True, label="Normalize loudness (~-14 LUFS)")
            fmt = gr.Dropdown(["mp3","wav","m4a","ogg"], value="mp3", label="Output format")
            go = gr.Button("Process")

        # Right column: results.
        with gr.Column():
            preview = gr.Audio(label="Preview (first file)", type="filepath", interactive=False)
            direct = gr.File(label="Download processed file (single)")
            zip_out = gr.File(label="Download ZIP (if multiple)")
            rep = gr.Textbox(label="Report", lines=1)

    def run(files, mode, target, keep, min_sil, thresh, do_norm, fmt):
        """Click handler: process the uploads and fill the four outputs.

        Returns ``(preview_path, direct_path, zip_path, report)``. The
        download slots receive file paths (or ``None``), because the
        ``gr.File`` outputs take paths rather than in-memory buffers.
        """
        files = files or []
        if not files:
            return None, None, None, "Please upload at least one audio file."

        options = dict(
            mode=mode, target_seconds=target, keep_silence_s=keep,
            min_silence_ms=min_sil, silence_thresh_db=thresh,
            do_normalize=do_norm, fmt=fmt,
        )

        # Process first file for preview & (if single) for direct download.
        with open(files[0], "rb") as first:
            single_blob, report = process_single(first, **options)
        preview_path = write_temp_for_preview(single_blob, fmt)

        if len(files) == 1:
            # The preview temp file already holds the processed audio, so it
            # doubles as the direct download.
            return preview_path, preview_path, None, report

        # ZIP for multiple files; always close every handle we opened.
        opened = []
        try:
            for path in files:
                opened.append(open(path, "rb"))
            zipped = process_batch(opened, **options)
        finally:
            for handle in opened:
                handle.close()
        zip_path = write_temp_for_preview(zipped, "zip")
        return preview_path, None, zip_path, report

    go.click(
        run,
        [files, mode, target, keep, min_sil, thresh, do_norm, fmt],
        [preview, direct, zip_out, rep]
    )
177
+
178
# Script entry point: enable request queuing, then start the Gradio server.
if __name__ == "__main__":
    queued_app = demo.queue()
    queued_app.launch()