#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ main.py — одностраничный Flask сервер для онлайн-обработки аудио с множеством эффектов. Запуск: gunicorn -b 0.0.0.0:7860 main:app Требования (requirements.txt): flask pydub gunicorn python-multipart ВАЖНО: - ffmpeg должен быть установлен в контейнере (apt-get install -y ffmpeg перед сменой пользователя). - Этот файл использует pydub и иногда вызывает ffmpeg через subprocess для некоторых эффектов (echo, pitch_shift, reverb). """ import os import io import uuid import time import threading import subprocess from pathlib import Path from typing import List, Dict, Any from flask import ( Flask, request, jsonify, url_for, send_file, render_template_string, abort ) from pydub import AudioSegment, effects, silence # ---------------------------- # Конфигурация # ---------------------------- TMP_DIR = Path("./tmp") TMP_DIR.mkdir(exist_ok=True) KEEP_FILES_SEC = 60 * 60 # храним файлы 1 час MAX_UPLOAD_MB = 200 app = Flask(__name__) app.config["MAX_CONTENT_LENGTH"] = MAX_UPLOAD_MB * 1024 * 1024 # job state job_logs: Dict[str, List[Dict[str, Any]]] = {} job_files: Dict[str, Path] = {} jobs_lock = threading.Lock() # ---------------------------- # Утилиты # ---------------------------- def random_filename(ext: str) -> Path: return TMP_DIR / (str(uuid.uuid4()) + "." + ext) def secure_extension_for_format(fmt: str) -> str: fmt = (fmt or "").lower().strip() if fmt in ("mp3", "mpeg"): return "mp3" if fmt in ("wav", "wave"): return "wav" if fmt in ("flac",): return "flac" if fmt in ("ogg", "vorbis"): return "ogg" if fmt in ("m4a", "mp4", "aac"): return "m4a" return "wav" def cleanup_old_files(): now = time.time() for p in list(TMP_DIR.iterdir()): try: if p.is_file() and (now - p.stat().st_mtime) > KEEP_FILES_SEC: p.unlink() except Exception: pass def load_audio_from_filestorage(filestorage): data = filestorage.read() buf = io.BytesIO(data) filename = getattr(filestorage, "filename", "") or "" ext = "" if "." in filename: ext = filename.rsplit(".", 1)[1].lower() # Try extension-guessing first if ext: try: buf.seek(0) return AudioSegment.from_file(buf, format=ext) except Exception: pass # Try common formats for fmt in ("mp3", "wav", "ogg", "flac", "m4a", "aac"): try: buf.seek(0) return AudioSegment.from_file(buf, format=fmt) except Exception: continue buf.seek(0) return AudioSegment.from_file(buf) # ---------------------------- # Pydub-based эффекты # ---------------------------- def apply_trim_silence(seg: AudioSegment, silence_thresh_db=-50, chunk_len_ms=10): non_silence = silence.detect_nonsilent(seg, min_silence_len=chunk_len_ms, silence_thresh=silence_thresh_db) if not non_silence: return seg[:0] start = non_silence[0][0] end = non_silence[-1][1] return seg[start:end] def apply_lowpass(seg: AudioSegment, cutoff_hz: float): if cutoff_hz and cutoff_hz > 0: return seg.low_pass_filter(int(cutoff_hz)) return seg def apply_highpass(seg: AudioSegment, cutoff_hz: float): if cutoff_hz and cutoff_hz > 0: return seg.high_pass_filter(int(cutoff_hz)) return seg def apply_speed_change(seg: AudioSegment, speed: float): if speed <= 0 or abs(speed - 1.0) < 1e-6: return seg try: return effects.speedup(seg, playback_speed=speed) except Exception: new_frame_rate = int(seg.frame_rate * speed) return seg._spawn(seg.raw_data, overrides={"frame_rate": new_frame_rate}).set_frame_rate(seg.frame_rate) def apply_normalize(seg: AudioSegment): return effects.normalize(seg) def apply_gain(seg: AudioSegment, db: float): try: return seg.apply_gain(float(db)) except Exception: return seg def apply_fade(seg: AudioSegment, fade_in_ms: int, fade_out_ms: int): if fade_in_ms and fade_in_ms > 0: seg = seg.fade_in(int(fade_in_ms)) if fade_out_ms and fade_out_ms > 0: seg = seg.fade_out(int(fade_out_ms)) return seg def apply_reverse(seg: AudioSegment): try: return seg.reverse() except Exception: return seg # ---------------------------- # FFmpeg-based helpers для echo/reverb/pitch shift # ---------------------------- def ffmpeg_apply_filter_on_segment(seg: AudioSegment, filter_str: str) -> AudioSegment: """ Экспорт сегмента во временный WAV, применить ffmpeg -af filter_str, загрузить обратно. """ in_tmp = random_filename("wav") out_tmp = random_filename("wav") try: seg.export(in_tmp.as_posix(), format="wav") cmd = [ "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", in_tmp.as_posix(), "-af", filter_str, out_tmp.as_posix() ] subprocess.run(cmd, check=True) result = AudioSegment.from_file(out_tmp.as_posix(), format="wav") except subprocess.CalledProcessError as e: raise RuntimeError(f"ffmpeg error: {e}") finally: try: if in_tmp.exists(): in_tmp.unlink() except Exception: pass # clean out_tmp later by caller (we'll try to remove now) try: if out_tmp.exists(): out_tmp.unlink() except Exception: pass return result def ffmpeg_apply_filter_files(input_path: Path, filter_str: str, out_path: Path): cmd = [ "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", input_path.as_posix(), "-af", filter_str, out_path.as_posix() ] subprocess.run(cmd, check=True) def apply_echo_with_ffmpeg(seg: AudioSegment, delay_ms: float = 500, decay: float = 0.5): # aecho=in_gain:out_gain:delays:decays (delays in ms, decays 0..1) delays = str(int(delay_ms)) decays = str(float(decay)) filter_str = f"aecho=0.8:0.9:{delays}:{decays}" return ffmpeg_apply_filter_on_segment(seg, filter_str) def apply_reverb_with_ffmpeg(seg: AudioSegment, reverb_level: float = 50.0): # Emulate reverb using multiple echoes — simple approach # Compose multiple delays and decays d1 = 60; d2 = 120; d3 = 180; d4 = 240 dec1 = max(0.1, min(0.9, reverb_level / 100.0)) # build aecho with multiple delays/decays: delays separated by '|' and decays same length filter_str = f"aecho=0.7:0.75:{d1}|{d2}|{d3}|{d4}:{dec1}|{dec1*0.6}|{dec1*0.4}|{dec1*0.2}" return ffmpeg_apply_filter_on_segment(seg, filter_str) def apply_pitch_shift_with_ffmpeg(seg: AudioSegment, semitones: float): """ Pitch shift preserving tempo using ffmpeg filters: asetrate=sr*2^(semitones/12),aresample=sr,atempo=1/2^(semitones/12) This approach preserves duration (attempt) but may produce artifacts for large shifts. """ # Export, run ffmpeg, reload if abs(semitones) < 0.001: return seg factor = 2 ** (semitones / 12.0) # Build filter string # Increase sample rate by factor, then resample back, then adjust tempo by inverse factor filter_str = f"asetrate=sample_rate*{factor},aresample=sample_rate,atempo={1.0/factor}" return ffmpeg_apply_filter_on_segment(seg, filter_str) # ---------------------------- # Логирование job # ---------------------------- def post_log(job_id: str, item: dict): with jobs_lock: if job_id in job_logs: job_logs[job_id].append(item) # ---------------------------- # Рабочий поток обработки # ---------------------------- def processing_worker(job_id: str, input_path: Path, original_filename: str, options: dict): try: post_log(job_id, {"type": "progress", "msg": "Получаю файл...", "percent": 2}) # load try: with open(input_path, "rb") as f: class _FS: def __init__(self, data, filename): self._data = data self.filename = filename def read(self): return self._data fs_wrapper = _FS(f.read(), original_filename) seg = load_audio_from_filestorage(fs_wrapper) except Exception as e: post_log(job_id, {"type": "error", "msg": f"Не удалось загрузить аудио: {e}"}) return post_log(job_id, {"type": "progress", "msg": "Анализ аудио...", "percent": 8}) time.sleep(0.08) # trim silence if options.get("trim_silence"): post_log(job_id, {"type": "progress", "msg": "Обрезаю тишину...", "percent": 18}) try: seg = apply_trim_silence(seg, silence_thresh_db=int(options.get("sil_thresh", -50))) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка при обрезке тишины: {exc}"}) return time.sleep(0.05) # low/high pass lp = float(options.get("lowpass", 0) or 0) hp = float(options.get("highpass", 0) or 0) if lp > 0: post_log(job_id, {"type": "progress", "msg": f"Применяю low-pass {int(lp)} Hz...", "percent": 30}) try: seg = apply_lowpass(seg, lp) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка low-pass: {exc}"}) return time.sleep(0.04) if hp > 0: post_log(job_id, {"type": "progress", "msg": f"Применяю high-pass {int(hp)} Hz...", "percent": 36}) try: seg = apply_highpass(seg, hp) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка high-pass: {exc}"}) return time.sleep(0.04) # gain in dB gain_db = float(options.get("gain_db", 0) or 0) if abs(gain_db) > 0.001: post_log(job_id, {"type": "progress", "msg": f"Применяю усиление {gain_db} dB...", "percent": 42}) try: seg = apply_gain(seg, gain_db) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка gain: {exc}"}) return time.sleep(0.03) # fade in/out fi = int(options.get("fade_in_ms", 0) or 0) fo = int(options.get("fade_out_ms", 0) or 0) if fi > 0 or fo > 0: post_log(job_id, {"type": "progress", "msg": f"Применяю fade in {fi} ms / fade out {fo} ms...", "percent": 48}) try: seg = apply_fade(seg, fi, fo) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка fade: {exc}"}) return time.sleep(0.03) # reverse if options.get("reverse"): post_log(job_id, {"type": "progress", "msg": "Реверс трека...", "percent": 52}) try: seg = apply_reverse(seg) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка reverse: {exc}"}) return time.sleep(0.02) # speed change speed = float(options.get("speed", 1.0) or 1.0) if abs(speed - 1.0) > 1e-6: post_log(job_id, {"type": "progress", "msg": f"Изменяю скорость x{speed}...", "percent": 60}) try: seg = apply_speed_change(seg, speed) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка изменения скорости: {exc}"}) return time.sleep(0.03) # normalize if options.get("normalize"): post_log(job_id, {"type": "progress", "msg": "Нормализация громкости...", "percent": 68}) try: seg = apply_normalize(seg) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка при нормализации: {exc}"}) return time.sleep(0.03) # pitch shift via ffmpeg (preserve tempo) pitch = float(options.get("pitch_semitones", 0) or 0) if abs(pitch) > 0.001: post_log(job_id, {"type": "progress", "msg": f"Сдвиг тона на {pitch} полутонов...", "percent": 75}) try: seg = apply_pitch_shift_with_ffmpeg(seg, pitch) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка pitch shift: {exc}"}) return time.sleep(0.04) # echo if options.get("echo"): delay_ms = float(options.get("echo_delay_ms", 500) or 500) decay = float(options.get("echo_decay", 0.5) or 0.5) post_log(job_id, {"type": "progress", "msg": f"Применяю echo: delay {int(delay_ms)} ms, decay {decay}...", "percent": 82}) try: seg = apply_echo_with_ffmpeg(seg, delay_ms=delay_ms, decay=decay) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка echo: {exc}"}) return time.sleep(0.03) # reverb if options.get("reverb"): rv_level = float(options.get("reverb_level", 50) or 50) post_log(job_id, {"type": "progress", "msg": f"Применяю reverb (level {rv_level})...", "percent": 88}) try: seg = apply_reverb_with_ffmpeg(seg, reverb_level=rv_level) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка reverb: {exc}"}) return time.sleep(0.03) # export to selected format out_fmt = secure_extension_for_format(options.get("out_format", "wav")) out_path = random_filename(out_fmt) post_log(job_id, {"type": "progress", "msg": "Экспортирую файл...", "percent": 94}) try: export_kwargs = {} if out_fmt == "mp3": export_kwargs["bitrate"] = "192k" seg.export(out_path.as_posix(), format=out_fmt, **export_kwargs) except Exception as exc: post_log(job_id, {"type": "error", "msg": f"Ошибка при экспорте: {exc}"}) return with jobs_lock: job_files[job_id] = out_path post_log(job_id, {"type": "done", "msg": "Готово", "percent": 100, "file": out_path.name}) except Exception as e: post_log(job_id, {"type": "error", "msg": f"Внутренняя ошибка: {e}"}) # ---------------------------- # Шаблон UI (улучшен контраст) # ---------------------------- INDEX_HTML = """
Добавлено много эффектов: gain, fade, reverse, pitch, echo, reverb, фильтры, speed, normalize и др. Результат — в реальном времени через AJAX-поллинг.