| import os
|
| from typing import List, Optional, Tuple, Union, Any, Dict, Literal, Callable
|
| import shutil
|
| from concurrent.futures import ThreadPoolExecutor, as_completed
|
| from tqdm import tqdm
|
| import json
|
| from datetime import datetime
|
| import requests
|
| from pathlib import Path
|
| import gradio as gr
|
| import gc
|
| from i18n import _i18n
|
| from namer import Namer
|
| import time
|
| from datetime import timezone, timedelta
|
| from audio import get_audio_files_from_list
|
| import psutil
|
| import os
|
| import ctypes
|
| import platform
|
| import numpy as np
|
| import yt_dlp
|
| import subprocess
|
|
|
| try:
|
| import spaces
|
| except ImportError:
|
| spaces = None
|
|
|
# Tracks whether HF Spaces ZeroGPU is usable; flipped to True further down
# when the `spaces` package exposes a GPU decorator.
zerogpu_available = False
|
|
|
def hf_spaces_gpu(*args, **kwargs):
    """No-op stand-in for the HF Spaces GPU decorator (local-run fallback).

    Supports both usages: as a bare decorator (``@hf_spaces_gpu``) and as a
    decorator factory (``@hf_spaces_gpu(duration=...)``); in either case the
    wrapped function is returned unchanged.
    """
    used_as_bare_decorator = len(args) == 1 and callable(args[0])
    if used_as_bare_decorator:
        return args[0]

    def passthrough(func):
        return func

    return passthrough
|
|
|
# If the real `spaces` package is importable and provides a GPU decorator,
# replace the no-op fallback above with the genuine spaces.GPU decorator and
# record that ZeroGPU is available.
if spaces is not None:
    try:
        if hasattr(spaces, 'GPU'):
            zerogpu_available = True
            print(_i18n("zerogpu=true"))
            hf_spaces_gpu = spaces.GPU
    except:  # NOTE(review): bare except — consider narrowing to Exception
        pass
|
|
|
| import torch
|
# Fixed UTC+3 offset used for timestamps (presumably Moscow time — TODO confirm).
tz = timezone(timedelta(hours=3))
|
|
|
def get_gdrive_dir():
    """Return the Google Drive FUSE mount point, or None when not mounted.

    Parses ``/bin/mount`` output looking for a ``fuse.drive`` entry (the
    mount type Google Drive uses on Colab).

    Returns:
        The mount-point path as a string, or None when Drive is not mounted,
        ``/bin/mount`` is unavailable, or the line cannot be parsed.
    """
    try:
        result = subprocess.run(['/bin/mount'], capture_output=True, text=True)
        for line in result.stdout.strip().split('\n'):
            if 'type fuse.drive' in line:
                # mount line shape: "<source> on <mount_point> type fuse.drive (...)"
                parts = line.split(' type ')
                if len(parts) >= 2:
                    source_mount = parts[0]
                    # partition instead of a 2-way unpack: the old
                    # `split(' on ')` raised ValueError (and aborted the whole
                    # scan via the bare except) whenever a path contained ' on '.
                    _source, sep, mount_point = source_mount.partition(' on ')
                    if sep and mount_point:
                        return mount_point
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; any OS/parse failure means "not mounted".
        pass
    return None
|
|
|
def easy_check_is_colab() -> bool:
    """Heuristically detect whether the code runs inside Google Colab.

    Returns:
        True when ``google.colab`` is importable from the standard Colab
        dist-packages location on a Linux x86_64 host, False otherwise.
    """
    on_linux_x86 = platform.machine() == "x86_64" and "Linux" in platform.platform()
    if not on_linux_x86:
        return False
    try:
        import google.colab
    except ImportError:
        return False
    module_path: str = google.colab.__file__
    looks_like_colab = (
        module_path.startswith("/usr/local/lib/python")
        and module_path.endswith("/dist-packages/google/colab/__init__.py")
    )
    return looks_like_colab
|
|
|
# NOTE(review): duplicate — DownloadError is defined again later in this file
# (with a docstring); this earlier bare definition gets shadowed. Consider
# keeping a single definition.
class DownloadError(Exception): pass
|
|
|
# Shared keyword-argument presets for constructing Gradio components.
# Each key names a component role; the value is the kwargs dict passed to the
# corresponding gr.* constructor.
base_c_params = {
    # single-file upload input
    "input_file": {
        "interactive": True,
        "type": "filepath",
        "file_count": "single"
    },
    # multi-file upload input
    "input_files_multi": {
        "interactive": True,
        "type": "filepath",
        "file_count": "multiple"
    },
    # read-only audio output exposing a download button
    "output_audio": {
        "interactive": False,
        "type": "filepath",
        "show_download_button": True
    },
    # generic interactive widget
    "base": {
        "interactive": True
    },
    # multi-select dropdown
    "dropdown_multi": {
        "interactive": True,
        "multiselect": True
    },
    # single-select dropdown
    "dropdown": {
        "interactive": True,
        "multiselect": False
    }
}
|
|
|
def size_readable(size_bytes: int):
    """Render a byte count as a human-readable, localized string.

    Scales the value through bytes → KB → MB → GB → TB, formatting with two
    decimal places; zero is special-cased as "0 <bytes>".
    """
    if size_bytes == 0:
        return f"0 {_i18n('bytes')}"

    unit_labels = (_i18n('bytes'), _i18n('kbytes'), _i18n('mbytes'), _i18n('gbytes'), _i18n('tbytes'))
    value = size_bytes
    for idx, label in enumerate(unit_labels):
        if value < 1024 or idx == len(unit_labels) - 1:
            return f"{value:.2f} {label}"
        value /= 1024
|
|
|
| def get_size_folder(folder: str | Path):
|
| folder_path = Path(folder)
|
| return sum([file.stat().st_size for file in folder_path.rglob('*') if file.is_file()])
|
|
|
def get_disk_usage(path="/content/drive/MyDrive", user_dir="", user_gdrive_dir="", list_subdirs=None):
    """Return a localized multi-line disk-usage summary for *path*.

    Args:
        path: Filesystem path whose containing disk is inspected.
        user_dir: Unused; kept for caller compatibility.
        user_gdrive_dir: Unused; kept for caller compatibility.
        list_subdirs: Unused; kept for caller compatibility. Default changed
            from a shared mutable ``[]`` to None (the value is never read,
            so behavior is unchanged).

    Returns:
        Three lines (total / used / free space) formatted via size_readable,
        or an empty string when the path cannot be inspected.
    """
    try:
        usage = shutil.disk_usage(path)

        total_gb = size_readable(usage.total)
        used_gb = size_readable(usage.used)
        free_gb = size_readable(usage.free)
        return f"""{_i18n("all_space")}: {total_gb}
{_i18n("used_space")}: {used_gb}
{_i18n("free_space")}: {free_gb}"""
    except Exception:
        # Best-effort display helper: any failure (missing path, permission)
        # degrades to an empty string rather than crashing the UI.
        return ""
|
|
|
def define_audio_with_size(basename: bool = False, **kwargs):
    """Build a gr.Audio component whose label is prefixed with the file size.

    When kwargs carry a truthy "value" path and a "label", the label becomes
    "[<size>] <label>" (or "[<size>] <file stem>" when *basename* is True).
    Without a value path, falls back to ``gr.update(**kwargs)``.
    """
    audio_path = kwargs.get("value", None)
    if not audio_path:
        return gr.update(**kwargs)

    audio_file = Path(audio_path)
    if "label" in kwargs:
        size_tag = f"[{size_readable(audio_file.stat().st_size)}] "
        label_body = audio_file.stem if basename else kwargs["label"]
        kwargs["label"] = size_tag + label_body
    return gr.Audio(**kwargs)
|
|
|
def update_audio_with_size(basename: bool = False, **kwargs):
    """Produce a gr.update for an audio component, size-prefixing the label.

    Mirrors define_audio_with_size but always returns ``gr.update(**kwargs)``
    instead of constructing a fresh gr.Audio component.
    """
    audio_path = kwargs.get("value", None)
    if not audio_path:
        return gr.update(**kwargs)

    audio_file = Path(audio_path)
    if "label" in kwargs:
        size_tag = f"[{size_readable(audio_file.stat().st_size)}] "
        label_body = audio_file.stem if basename else kwargs["label"]
        kwargs["label"] = size_tag + label_body
    return gr.update(**kwargs)
|
|
|
|
|
# NOTE(review): this redefines DownloadError declared earlier in the file;
# the two definitions should be collapsed into one.
class DownloadError(Exception):
    """Custom exception for download errors"""
    pass
|
|
|
def format_size(size_bytes: int) -> str:
    """
    Format a byte count with localized units.

    Args:
        size_bytes: Size in bytes

    Returns:
        The raw integer with a bytes label for values under 1 KiB, otherwise
        the value scaled to KB/MB/GB/TB with two decimal places.
    """
    if size_bytes < 1024:
        return f"{size_bytes} {_i18n('bytes')}"

    scaled = size_bytes / 1024
    for unit_key in ('kbytes', 'mbytes', 'gbytes'):
        if scaled < 1024:
            return f"{scaled:.2f} {_i18n(unit_key)}"
        scaled /= 1024
    return f"{scaled:.2f} {_i18n('tbytes')}"
|
|
|
|
|
def dw_file(
    url_model: str,
    local_path: Union[str, Path],
    retries: int = 180,
    timeout: int = 300,
    chunk_size: int = 8192,
    progress_callback: Optional[Callable[[int, int], None]] = None
) -> None:
    """
    Download a file over HTTP with retries and a tqdm progress bar.

    The docstring previously advertised resume support and hash verification
    and documented several parameters (resume, expected_hash, hash_algorithm,
    auto_detect_hash, verify_after_download) that do not exist; neither
    feature is implemented — every attempt restarts from byte zero.

    Args:
        url_model: File URL
        local_path: Local path for saving (parent directories are created)
        retries: Number of retry attempts
        timeout: Request timeout in seconds
        chunk_size: Download chunk size in bytes
        progress_callback: Optional callback for progress updates
            (bytes_downloaded, total_bytes), invoked after each chunk

    Raises:
        DownloadError: If all attempts fail (non-200 status or network error)
    """
    local_path_ = Path(local_path)
    local_path_.parent.mkdir(parents=True, exist_ok=True)

    for attempt in range(retries):
        try:
            with requests.Session() as session:
                session.headers.update({
                    "User-Agent": "Mozilla/5.0 (compatible; MVSepless/1.0)"
                })

                # Removed the dead `headers = {}` that was passed on every call.
                response = session.get(
                    url_model,
                    stream=True,
                    timeout=timeout
                )

                if response.status_code != 200:
                    raise DownloadError(f"HTTP {response.status_code}")

                # content-length may be absent; tqdm then shows an open-ended bar.
                total_size = int(response.headers.get("content-length", 0))
                print(f"[{_i18n('status')}] {_i18n('download_start')} {format_size(total_size)}")

                with tqdm(
                    total=total_size,
                    desc=local_path_.name,
                    unit="B",
                    unit_scale=True,
                    unit_divisor=1024,
                    initial=0,
                    bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]"
                ) as progress_bar:
                    with open(local_path_, "wb") as f:
                        for chunk in response.iter_content(chunk_size=chunk_size):
                            if chunk:
                                f.write(chunk)
                                progress_bar.update(len(chunk))
                                if progress_callback:
                                    progress_callback(progress_bar.n, total_size)

            print(f"[{_i18n('status')}] ✓ {_i18n('download_complete')}: {local_path_}")
            return

        except (requests.RequestException, DownloadError) as e:
            print(_i18n(
                "download_attempt_failed",
                attempt=attempt + 1,
                retries=retries,
                error=str(e)
            ))

            if attempt < retries - 1:
                print(_i18n("retrying"))
            else:
                print(_i18n("all_download_attempts_failed"))
                raise DownloadError(f"{_i18n('download_error', error=str(e))}")
|
|
|
def dw_yt_dlp(
    url: str,
    output_dir: str | Path = None,
    cookie: str | Path = None,
    output_format: str = "mp3",
    output_bitrate: str = "320",
    title: str = None,
) -> str:
    """
    Download a video's audio track using yt-dlp.

    Args:
        url: Video URL
        output_dir: Destination directory (defaults to the current directory)
        cookie: Path to a cookies file
        output_format: Output audio format
        output_bitrate: Target bitrate
        title: Output filename (without extension)

    Returns:
        POSIX path to the downloaded file, or None on failure
    """
    target_dir = Path(output_dir) if output_dir else Path(".")
    target_dir.mkdir(parents=True, exist_ok=True)

    name_template = f"{title}.%(ext)s" if title is not None else "%(title)s.%(ext)s"

    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": str(target_dir / name_template),
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": output_format,
                "preferredquality": output_bitrate,
            }
        ],
        "noplaylist": True,
        "quiet": True,
        "no_warnings": True,
    }

    if cookie and Path(cookie).exists():
        ydl_opts["cookiefile"] = cookie

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info = ydl.extract_info(url, process=True, download=True)
            # Playlist results wrap the real entry; unwrap the first one.
            if "_type" in info and info["_type"] == "playlist":
                chosen = info["entries"][0]
            else:
                chosen = info
            raw_filename = ydl.prepare_filename(chosen)
            # The postprocessor re-encodes, so swap in the requested extension.
            audio_path = Path(raw_filename).with_suffix(f".{output_format}")
            return audio_path.as_posix()
        except Exception as e:
            print(_i18n("download_error", error=str(e)))
            return None
|
|
|
| def one_element_list_to_value(input_list: list | tuple):
|
| if input_list:
|
| return input_list[0]
|
| else:
|
| return None
|
|
|
def emergency_ram_clear():
    """
    Emergency cleanup — zero out and delete every module-level numpy array
    and torch tensor, force garbage collection, and ask the platform
    allocator to return freed pages to the OS. Use only when
    nuclear_clear_model is not enough.

    Returns:
        Approximate freed RSS in megabytes (never negative).
    """
    process = psutil.Process(os.getpid())
    mem_before = process.memory_info().rss

    # Zero then drop every module-level ndarray/Tensor so GC can reclaim it.
    for name in list(globals().keys()):
        try:
            obj = globals()[name]
            if isinstance(obj, (np.ndarray, torch.Tensor)):
                if isinstance(obj, np.ndarray):
                    obj.fill(0)
                del globals()[name]
        except Exception:  # narrowed from bare except
            pass

    # NOTE(review): the original also iterated locals() and deleted entries,
    # but mutating the locals() dict inside a function is a documented no-op
    # in CPython (and the only large objects it could see were already zeroed
    # above), so that dead loop was removed.

    for _ in range(5):
        gc.collect()

    # Ask the platform allocator to hand freed pages back to the OS.
    system = platform.system()
    if system == "Linux":
        try:
            ctypes.CDLL('libc.so.6').malloc_trim(0)
        except Exception:
            pass
    elif system == "Windows":
        try:
            # Pseudo-handle -1 == current process.
            ctypes.windll.psapi.EmptyWorkingSet(ctypes.c_void_p(-1))
            ctypes.windll.kernel32.SetProcessWorkingSetSize(
                ctypes.c_void_p(-1), ctypes.c_size_t(-1), ctypes.c_size_t(-1)
            )
        except Exception:
            pass

    mem_after = process.memory_info().rss
    freed = (mem_before - mem_after) / (1024 * 1024)
    # i18n key spelling ('emeergency_ram') kept as-is: it must match the
    # translation catalog.
    print(f"[EMERGENCY] {_i18n('emeergency_ram')}: {freed:.1f} MB")

    return max(0, freed)
|
|
|
def linux_nuclear_ram_clear():
    """Linux-specific cleanup: malloc_trim plus madvise(MADV_DONTNEED)."""
    try:
        libc = ctypes.CDLL('libc.so.6')

        # Return freed heap pages from the allocator back to the kernel.
        libc.malloc_trim(0)

        # Drop the kernel page cache; needs root, so failure is expected
        # and silently ignored.
        try:
            with open('/proc/sys/vm/drop_caches', 'w') as f:
                f.write('1')
        except (PermissionError, IOError):
            pass

        MADV_DONTNEED = 4  # value from <sys/mman.h>

        # WARNING(review): madvise(MADV_DONTNEED) on anonymous private memory
        # discards page contents. This is applied to EVERY gc-reachable numpy
        # array over 100k elements, which can silently zero data a caller
        # still needs — confirm this aggressive behavior is intended.
        for obj in gc.get_objects():
            try:
                if isinstance(obj, np.ndarray) and obj.size > 100000:
                    addr = obj.ctypes.data
                    size = obj.size * obj.itemsize
                    libc.madvise(
                        ctypes.c_void_p(addr),
                        ctypes.c_size_t(size),
                        MADV_DONTNEED
                    )
            except:
                pass

    except Exception as e:
        # Best-effort: any failure (non-glibc system, missing symbols) is ignored.
        pass
|
|
|
def windows_nuclear_ram_clear():
    """Windows-specific cleanup: EmptyWorkingSet + SetProcessWorkingSetSize."""
    try:
        kernel32 = ctypes.windll.kernel32
        psapi = ctypes.windll.psapi

        # Pseudo-handle -1 == GetCurrentProcess().
        current_process = ctypes.c_void_p(-1)

        # Trim the working set (pages move to standby, not actually freed).
        try:
            psapi.EmptyWorkingSet(current_process)
        except:
            pass

        # (-1, -1) asks the OS to shrink the working set as far as possible.
        try:
            kernel32.SetProcessWorkingSetSize(
                current_process,
                ctypes.c_size_t(-1),
                ctypes.c_size_t(-1)
            )
        except:
            pass

        # Return unused CRT heap blocks to the OS.
        try:
            msvcrt = ctypes.cdll.msvcrt
            msvcrt._heapmin()
        except:
            pass

        MEM_RESET = 0x00080000  # VirtualAlloc flag: "contents no longer needed"
        PAGE_READWRITE = 0x04

        # WARNING(review): MEM_RESET marks the pages of EVERY gc-reachable
        # numpy array over 100k elements as discardable — their contents may
        # be lost if the OS reclaims them. Confirm this aggressive behavior
        # is intended.
        for obj in gc.get_objects():
            try:
                if isinstance(obj, np.ndarray) and obj.size > 100000:
                    addr = obj.ctypes.data
                    size = obj.size * obj.itemsize
                    kernel32.VirtualAlloc(
                        ctypes.c_void_p(addr),
                        ctypes.c_size_t(size),
                        MEM_RESET,
                        PAGE_READWRITE
                    )
            except:
                pass

    except Exception as e:
        # Best-effort: any failure (non-Windows ctypes attrs) is ignored.
        pass
|
|
|
def nuclear_clear_model():
    """
    Aggressive RAM cleanup: force collection of every GC generation, then run
    the platform-specific page-release routine.

    (The previous docstring documented an ``aggressive`` parameter that does
    not exist; the platform-specific calls always run on Linux/Windows.)

    Returns:
        Freed RSS in megabytes (never negative).
    """
    process = psutil.Process(os.getpid())
    mem_before = process.memory_info().rss

    # Temporarily drop GC thresholds so each generation is collected eagerly.
    gc.set_threshold(1, 1, 1)
    for generation in (0, 1, 2):
        gc.collect(generation)
    gc.set_threshold(700, 10, 10)  # restore CPython defaults

    system = platform.system()
    if system == "Linux":
        linux_nuclear_ram_clear()
    elif system == "Windows":
        windows_nuclear_ram_clear()

    gc.collect()
    gc.collect()

    mem_after = process.memory_info().rss
    freed_mb = (mem_before - mem_after) / (1024 * 1024)

    if freed_mb > 0:
        print(f"[NUCLEAR] {_i18n('freed_ram')}: {freed_mb:.1f} MB")

    return max(0, freed_mb)
|
|
|
def extra_clear_torch_cache():
    """
    Free as much torch-held cache memory as possible: CUDA caches, compiler/
    dynamo state, CPU caches, autocast cache and JIT registries.

    Every private ``torch._C`` call is feature-detected with hasattr and
    wrapped in try/except — previously only some of them were guarded, so a
    raising private API on one torch version could abort the whole cleanup.

    Returns:
        None.
    """
    gc.collect()

    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
        if hasattr(torch._C, "_cuda_clearCublasWorkspaces"):
            try:
                torch._C._cuda_clearCublasWorkspaces()
            except Exception:
                pass

    if hasattr(torch, "compiler") and hasattr(torch.compiler, "reset"):
        try:
            torch.compiler.reset()
        except Exception:
            pass

    if hasattr(torch._C, "_clear_dynamo_cache"):
        try:
            torch._C._clear_dynamo_cache()
        except Exception:
            pass

    # The four calls below were previously unguarded; they are private APIs
    # that can raise across torch versions, so wrap them like the others.
    if hasattr(torch._C, "_clear_cpu_caches"):
        try:
            torch._C._clear_cpu_caches()
        except Exception:
            pass

    if hasattr(torch._C, "clear_autocast_cache"):
        try:
            torch._C.clear_autocast_cache()
        except Exception:
            pass

    if hasattr(torch._C, "_jit_clear_class_registry"):
        try:
            torch._C._jit_clear_class_registry()
        except Exception:
            pass

    if hasattr(torch._C, "_jit_pass_onnx_clear_scope_records"):
        try:
            torch._C._jit_pass_onnx_clear_scope_records()
        except Exception:
            pass