File size: 8,081 Bytes
c20c287
8593d59
c20c287
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab32289
89f7924
 
 
 
 
c20c287
 
 
 
 
 
 
 
 
 
 
 
89f7924
c20c287
 
 
89f7924
c20c287
 
 
 
 
 
 
 
 
 
89f7924
c20c287
 
 
 
 
 
 
 
 
 
 
 
 
 
89f7924
c20c287
 
 
 
 
 
 
 
 
 
 
 
89f7924
c20c287
 
 
 
 
 
 
89f7924
c20c287
 
 
89f7924
c20c287
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89f7924
c20c287
 
 
 
 
89f7924
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab32289
 
89f7924
 
c20c287
89f7924
8593d59
89f7924
ab32289
89f7924
ab32289
 
 
8593d59
 
ab32289
 
 
c20c287
ab32289
c20c287
8593d59
89f7924
 
c20c287
 
 
 
8593d59
c20c287
 
 
 
89f7924
c20c287
 
 
 
 
 
 
 
 
8593d59
c20c287
 
 
 
 
 
 
8593d59
c20c287
89f7924
c20c287
 
8593d59
c20c287
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import gradio as gr
import io, os, uuid, zipfile, tempfile, subprocess
from pydub import AudioSegment
from pydub.silence import split_on_silence

# ---------- helpers ----------
def _load(file_or_bytes):
    """Decode an uploaded audio payload into a pydub AudioSegment.

    Accepts raw bytes/bytearray, a readable file object, or a filesystem
    path; pydub's from_file handles the latter two forms directly.
    """
    if isinstance(file_or_bytes, (bytes, bytearray)):
        file_or_bytes = io.BytesIO(file_or_bytes)
    return AudioSegment.from_file(file_or_bytes)

def _export(seg: AudioSegment, fmt="mp3") -> io.BytesIO:
    """Encode *seg* in the given format and return an in-memory buffer rewound to 0."""
    out = io.BytesIO()
    seg.export(out, format=fmt)
    out.seek(0)
    return out

def remove_silence(seg: AudioSegment, keep_ms=250, min_silence_ms=120, thresh_db=-45):
    """Cut long pauses out of *seg*, keeping a short gap at each cut.

    keep_ms:        silence retained at each cut (the final pause length)
    min_silence_ms: only silence at least this long is treated as a pause
    thresh_db:      dBFS level below which audio counts as silence
                    (e.g. -45 works well for voiceovers)
    """
    pieces = split_on_silence(
        seg,
        min_silence_len=int(min_silence_ms),
        silence_thresh=float(thresh_db),
        keep_silence=int(keep_ms),
    )
    if not pieces:
        # nothing detected as non-silence: return the input unchanged
        return seg
    joined = AudioSegment.silent(duration=0)
    for piece in pieces:
        joined += piece
    return joined

def trim_to_seconds(seg: AudioSegment, target_s: float):
    """Force *seg* to exactly target_s seconds: hard-cut if longer, pad with silence if shorter."""
    target_ms = max(0, int(float(target_s) * 1000))
    if len(seg) < target_ms:
        # too short: append silence to make up the difference
        return seg + AudioSegment.silent(duration=target_ms - len(seg))
    return seg[:target_ms]

def _atempo_chain(factor: float) -> str:
    # Split large/small adjustments into steps within [0.5, 2.0] for quality
    steps = []
    f = max(0.1, min(10.0, float(factor)))
    while f < 0.5:
        steps.append(0.5); f /= 0.5
    while f > 2.0:
        steps.append(2.0); f /= 2.0
    steps.append(f)
    return ",".join([f"atempo={s:.5f}" for s in steps])

def fit_to_seconds(seg: AudioSegment, target_s: float, fmt_out="mp3") -> io.BytesIO:
    """Stretch or compress *seg* to target_s seconds without changing pitch.

    Round-trips through FFmpeg's ``atempo`` filter via temporary files and
    returns the re-encoded audio as an in-memory buffer.
    """
    with tempfile.TemporaryDirectory() as workdir:
        src = os.path.join(workdir, "in.wav")
        dst = os.path.join(workdir, f"out.{fmt_out}")
        seg.export(src, format="wav")

        duration_s = max(0.01, len(seg) / 1000)  # guard against zero-length input
        filter_spec = _atempo_chain(float(target_s) / duration_s)

        args = ["ffmpeg", "-y", "-i", src, "-vn", "-af", filter_spec]
        if fmt_out == "mp3":
            args += ["-c:a", "libmp3lame", "-b:a", "128k"]
        args.append(dst)
        subprocess.run(args, check=True)

        with open(dst, "rb") as fh:
            return io.BytesIO(fh.read())

def normalize_lufs(seg: AudioSegment, target_lufs=-14.0):
    # Lightweight RMS-based normalization (minimal deps)
    import math
    rms = seg.rms or 1
    current_db = 20 * math.log10(rms / (1 << 15))
    gain_db = float(target_lufs) - current_db
    return seg.apply_gain(gain_db)

# ---------- processors ----------
def process_single(file, mode, target_seconds, keep_silence_s,
                   min_silence_ms, silence_thresh_db, do_normalize, fmt):
    """Run the full pipeline on one upload: de-silence, normalize, time-adjust.

    *file* may be raw bytes or a readable file object.  Returns a tuple of
    (BytesIO with the encoded result, human-readable before/after report).
    """
    if isinstance(file, (bytes, bytearray)):
        payload = file
    else:
        payload = file.read()
    source = _load(payload)

    # 1) collapse long pauses down to the requested gap length
    cleaned = remove_silence(
        source,
        keep_ms=int(float(keep_silence_s) * 1000),
        min_silence_ms=int(min_silence_ms),
        thresh_db=float(silence_thresh_db),
    )

    # 2) optional loudness normalization (~-14 LUFS target)
    if do_normalize:
        cleaned = normalize_lufs(cleaned, -14.0)

    # 3) timing: hard trim, pitch-preserving fit, or leave duration as-is
    if mode == "trim" and target_seconds:
        out = _export(trim_to_seconds(cleaned, target_seconds), fmt)
    elif mode == "fit" and target_seconds:
        out = fit_to_seconds(cleaned, target_seconds, fmt_out=fmt)
    else:
        out = _export(cleaned, fmt)

    # Re-decode the output to report the true post-encode duration.
    before = len(source) / 1000
    after = len(_load(out.getvalue())) / 1000
    report = f"Before: {before:.2f}s | After: {after:.2f}s"
    return out, report

def process_batch(files, **kwargs) -> io.BytesIO:
    """Run process_single on every upload and bundle the results into a ZIP buffer."""
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        for upload in files:
            blob, _ = process_single(upload, **kwargs)
            # keep the original file name when available; otherwise invent one
            original = getattr(upload, "name", f"audio_{uuid.uuid4().hex}")
            base = os.path.splitext(original)[0]
            zf.writestr(f"{base}_processed.{kwargs['fmt']}", blob.getvalue())
    archive.seek(0)
    return archive

def write_temp_for_preview(blob: io.BytesIO, fmt: str) -> str:
    """Persist *blob* to a temp file and return its path.

    Gradio's audio preview widget prefers a filesystem path, so the buffer
    is written out to a named temporary file that outlives this call.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{fmt}") as tmp:
        tmp.write(blob.getvalue())
    return tmp.name

# ---------- UI (force two-column, compact) ----------
# Custom CSS injected into gr.Blocks: widens the page, lays the two main
# columns out as a CSS grid with sane minimum widths, tightens component
# spacing, and collapses to a single column below 600px.
CSS = """
/* wider canvas */
.gradio-container { max-width: 1200px !important; margin: 0 auto !important; padding: 8px 10px !important; }

/* force two columns with sane minimums */
#twocol {
  display: grid;
  grid-template-columns: minmax(320px, 1fr) minmax(320px, 1fr);
  gap: 12px;
  align-items: start;
}

/* tighten component spacing */
#twocol .block, #twocol .form, #twocol .gap { gap: 8px !important; }
#twocol .gr-button { height: 40px; }
#twocol .gr-number input { height: 36px; }
#twocol .gr-textbox textarea { min-height: 40px; }

/* compact audio bar */
#preview-audio audio { width: 100%; height: 36px; }

/* Only stack on very small screens */
@media (max-width: 600px) {
  #twocol { grid-template-columns: 1fr; }
}
"""

with gr.Blocks(title="AI Voice Studio – Simple", css=CSS) as demo:
    gr.Markdown("### AI Voice Studio — Set pause length; optionally **Trim** or **Fit** to exact time. Export MP3/WAV/M4A/OGG.")

    with gr.Row(elem_id="twocol"):
        # Left column: controls
        with gr.Column():
            files = gr.Files(label="Upload audio", file_types=["audio"], type="filepath")
            mode = gr.Radio(["none", "trim", "fit"], value="none", label="Timing mode")
            target = gr.Number(value=30, label="Target seconds (used for trim/fit)")
            keep = gr.Number(value=0.25, label="Set pause length (seconds)")

            with gr.Accordion("Advanced options", open=False):
                min_sil = gr.Slider(50, 1000, 120, step=10, label="Pause if silence ≥ (ms)")
                thresh = gr.Slider(-80, -10, -45, step=1, label="Silence threshold (dBFS)")
                do_norm = gr.Checkbox(True, label="Normalize loudness (~-14 LUFS)")

            fmt = gr.Dropdown(["mp3","wav","m4a","ogg"], value="mp3", label="Output format")
            go = gr.Button("Process", variant="primary")

        # Right column: outputs
        with gr.Column():
            preview = gr.Audio(label="Preview (first file)", type="filepath", interactive=False, elem_id="preview-audio")
            direct = gr.File(label="Download processed file (single)")
            zip_out = gr.File(label="Download ZIP (if multiple)")
            rep = gr.Textbox(label="Report", lines=1)

    def run(files, mode, target, keep, min_sil, thresh, do_norm, fmt):
        """Click handler: returns (preview_path, single_path, zip_path, report)."""
        files = files or []
        if not files:
            return None, None, None, "Please upload at least one audio file."

        # Process the first file for the preview + single-file download.
        # Read the bytes inside a context manager so no handle is leaked
        # (previously the opened handles were never closed).
        with open(files[0], "rb") as fh:
            first_bytes = fh.read()
        single_blob, report = process_single(
            first_bytes,
            mode=mode, target_seconds=target, keep_silence_s=keep,
            min_silence_ms=min_sil, silence_thresh_db=thresh,
            do_normalize=do_norm, fmt=fmt
        )
        # gr.Audio and gr.File both expect filesystem paths, not BytesIO
        # (previously the raw BytesIO was handed to gr.File), so write the
        # blob out once and give the same path to both components.
        preview_path = write_temp_for_preview(single_blob, fmt)

        if len(files) == 1:
            return preview_path, preview_path, None, report

        # Batch: pass open named handles so process_batch can keep the
        # original file names, and close every handle when done.
        opened = [open(p, "rb") for p in files]
        try:
            zipped = process_batch(
                opened, mode=mode, target_seconds=target, keep_silence_s=keep,
                min_silence_ms=min_sil, silence_thresh_db=thresh,
                do_normalize=do_norm, fmt=fmt
            )
        finally:
            for fh in opened:
                fh.close()
        zip_path = write_temp_for_preview(zipped, "zip")
        return preview_path, None, zip_path, report

    # wire UI: one click handler drives all four outputs
    go.click(
        run,
        [files, mode, target, keep, min_sil, thresh, do_norm, fmt],
        [preview, direct, zip_out, rep]
    )

if __name__ == "__main__":
    # queue() enables request queuing so long-running ffmpeg jobs don't
    # block concurrent users before launching the web UI
    demo.queue().launch()