# mvsepless_zero_gpu / extra_utils.py
# Author: noblebarkrr — "Google Drive support added + minifixes" (commit 65f2f6a, verified)
import os
from typing import List, Optional, Tuple, Union, Any, Dict, Literal, Callable
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import json
from datetime import datetime
import requests
from pathlib import Path
import gradio as gr
import gc
from i18n import _i18n
from namer import Namer
import time
from datetime import timezone, timedelta
from audio import get_audio_files_from_list
import psutil
import os
import ctypes
import platform
import numpy as np
import yt_dlp
import subprocess
try:
    import spaces
except ImportError:
    spaces = None

# Flipped to True below once a real ZeroGPU decorator is detected in `spaces`.
zerogpu_available = False

def hf_spaces_gpu(*args, **kwargs):
    """GPU decorator for HF Spaces with a no-op fallback for local runs.

    Supports both the bare form (``@hf_spaces_gpu``) and the parameterised
    form (``@hf_spaces_gpu(duration=...)``); either way the wrapped function
    is returned unchanged.
    """
    if len(args) == 1 and callable(args[0]):
        # Bare decorator usage: @hf_spaces_gpu
        return args[0]
    # Parameterised usage: @hf_spaces_gpu(...) -> identity decorator
    return lambda f: f

if spaces is not None:
    try:
        if hasattr(spaces, 'GPU'):
            zerogpu_available = True
            print(_i18n("zerogpu=true"))
            # Replace the stub with the real ZeroGPU decorator.
            hf_spaces_gpu = spaces.GPU
    except Exception:
        # Best-effort probe: keep the stub, but (fix) no longer swallow
        # SystemExit/KeyboardInterrupt via a bare except.
        pass
import torch

# Fixed UTC+3 offset (Moscow time); presumably used for timestamping
# elsewhere in the project — not referenced in this chunk, confirm usage.
tz = timezone(timedelta(hours=3))
def get_gdrive_dir(mount_output: Optional[str] = None):
    """
    Find the mount point of a FUSE-mounted Google Drive.

    Scans the output of ``/bin/mount`` for a ``type fuse.drive`` entry
    (the signature of a Colab/Drive FUSE mount).

    Args:
        mount_output: Pre-captured ``mount`` output to parse instead of
            invoking ``/bin/mount`` (new optional parameter; default keeps
            the original behavior and makes the parser testable).

    Returns:
        The mount point path, or None if no Drive mount is found.
    """
    try:
        if mount_output is None:
            result = subprocess.run(['/bin/mount'], capture_output=True, text=True)
            mount_output = result.stdout
        for line in mount_output.strip().split('\n'):
            if 'type fuse.drive' in line:
                # Line shape: "<source> on <mount_point> type fuse.drive (...)"
                parts = line.split(' type ')
                if len(parts) >= 2:
                    # Fix: partition instead of a strict 2-way unpack, so a
                    # source containing " on " no longer raises ValueError
                    # and silently aborts the whole scan.
                    source, sep, mount_point = parts[0].partition(' on ')
                    if sep:
                        return mount_point
    except (OSError, subprocess.SubprocessError):
        # mount binary missing / not executable — treat as "no Drive".
        pass
    return None
def easy_check_is_colab() -> bool:
    """
    Detect whether the code is running inside Google Colab.

    Returns:
        True only on x86_64 Linux where the ``google.colab`` package is
        importable from its standard dist-packages location.
    """
    # Colab runtimes are x86_64 Linux; anything else is an immediate no.
    if platform.machine() != "x86_64" or "Linux" not in platform.platform():
        return False
    try:
        import google.colab
    except ImportError:
        return False
    module_path: str = google.colab.__file__
    return (
        module_path.startswith("/usr/local/lib/python")
        and module_path.endswith("/dist-packages/google/colab/__init__.py")
    )
# NOTE(review): duplicate — DownloadError is declared again later in this file
# with a docstring; that later definition rebinds the name. Consider removing
# one of the two declarations.
class DownloadError(Exception): pass
# Shared Gradio component keyword-argument presets, keyed by component role.
# Spread into component constructors / gr.update calls elsewhere in the app.
base_c_params = {
    # Single-file upload input
    "input_file": {
        "interactive": True,
        "type": "filepath",
        "file_count": "single"
    },
    # Multi-file upload input
    "input_files_multi": {
        "interactive": True,
        "type": "filepath",
        "file_count": "multiple"
    },
    # Read-only audio output with a download button
    "output_audio": {
        "interactive": False,
        "type": "filepath",
        "show_download_button": True
    },
    # Generic interactive control
    "base": {
        "interactive": True
    },
    # Multi-select dropdown
    "dropdown_multi": {
        "interactive": True,
        "multiselect": True
    },
    # Single-select dropdown
    "dropdown": {
        "interactive": True,
        "multiselect": False
    }
}
def size_readable(size_bytes: int):
    """Return *size_bytes* as a human-readable string with localized units."""
    if size_bytes == 0:
        return f"0 {_i18n('bytes')}"
    unit_names = (_i18n('bytes'), _i18n('kbytes'), _i18n('mbytes'), _i18n('gbytes'), _i18n('tbytes'))
    value = size_bytes
    idx = 0
    # Scale down by 1024 until the value fits the current unit (or we run out).
    while value >= 1024 and idx < len(unit_names) - 1:
        value /= 1024
        idx += 1
    return f"{value:.2f} {unit_names[idx]}"
def get_size_folder(folder: str | Path):
folder_path = Path(folder)
return sum([file.stat().st_size for file in folder_path.rglob('*') if file.is_file()])
def get_disk_usage(path="/content/drive/MyDrive", user_dir="", user_gdrive_dir="", list_subdirs=None):
    """
    Return a localized multi-line total/used/free space summary for *path*.

    Args:
        path: Filesystem path whose disk usage is reported.
        user_dir: Unused; kept for caller compatibility.
        user_gdrive_dir: Unused; kept for caller compatibility.
        list_subdirs: Unused; kept for caller compatibility. (Fix: was a
            mutable default ``[]``; now ``None`` to avoid the shared mutable
            default pitfall.)

    Returns:
        Human-readable usage string, or "" if the path is unavailable.
    """
    try:
        usage = shutil.disk_usage(path)
        total_gb = size_readable(usage.total)
        used_gb = size_readable(usage.used)
        free_gb = size_readable(usage.free)
        return f"""{_i18n("all_space")}: {total_gb}
{_i18n("used_space")}: {used_gb}
{_i18n("free_space")}: {free_gb}"""
    except Exception:
        # Path missing / drive not mounted: show nothing rather than crash UI.
        return ""
def define_audio_with_size(basename: bool = False, **kwargs):
    """
    Build a gr.Audio component, prefixing its label with the file size.

    Args:
        basename: If True, use the file's stem as the label instead of the
            supplied "label" kwarg.
        **kwargs: Passed through to gr.Audio (or gr.update when no value).
    """
    value = kwargs.get("value", None)
    if not value:
        # Nothing to size — fall back to a plain update with the given kwargs.
        return gr.update(**kwargs)
    audio_path = Path(value)
    if "label" in kwargs:
        size_prefix = f"[{size_readable(audio_path.stat().st_size)}] "
        kwargs["label"] = size_prefix + (audio_path.stem if basename else kwargs["label"])
    return gr.Audio(**kwargs)
def update_audio_with_size(basename: bool = False, **kwargs):
    """
    Build a gr.update payload for an audio component, prefixing the label
    with the file size (same contract as define_audio_with_size but always
    returns an update rather than a new component).
    """
    value = kwargs.get("value", None)
    if not value:
        return gr.update(**kwargs)
    audio_path = Path(value)
    if "label" in kwargs:
        size_prefix = f"[{size_readable(audio_path.stat().st_size)}] "
        kwargs["label"] = size_prefix + (audio_path.stem if basename else kwargs["label"])
    return gr.update(**kwargs)
class DownloadError(Exception):
    """Custom exception for download errors"""
    # NOTE(review): this re-declares DownloadError (also defined earlier in
    # the file); this later definition is the one the module ends up exporting.
    pass
def format_size(size_bytes: int) -> str:
    """
    Format file size with appropriate units using i18n

    Args:
        size_bytes: Size in bytes

    Returns:
        Formatted string with units
    """
    # Bytes keep integer formatting; every larger unit uses two decimals.
    if size_bytes < 1024:
        return f"{size_bytes} {_i18n('bytes')}"
    for power, unit_key in ((1, 'kbytes'), (2, 'mbytes'), (3, 'gbytes')):
        if size_bytes < 1024 ** (power + 1):
            return f"{size_bytes / 1024 ** power:.2f} {_i18n(unit_key)}"
    return f"{size_bytes / (1024**4):.2f} {_i18n('tbytes')}"
def dw_file(
    url_model: str,
    local_path: Union[str, Path],
    retries: int = 180,
    timeout: int = 300,
    chunk_size: int = 8192,
    progress_callback: Optional[Callable[[int, int], None]] = None
) -> None:
    """
    Download a file over HTTP with retries and a progress bar.

    Args:
        url_model: File URL
        local_path: Local path for saving (parent directories are created)
        retries: Number of retry attempts
        timeout: Request timeout in seconds
        chunk_size: Download chunk size in bytes
        progress_callback: Optional callback for progress updates (current, total)

    Raises:
        DownloadError: If a non-200 status is returned or every attempt fails

    Note (fix): the previous docstring documented parameters that do not
    exist (resume, expected_hash, hash_algorithm, auto_detect_hash,
    verify_after_download) — resume and hash verification are NOT implemented.
    """
    local_path_ = Path(local_path)
    local_path_.parent.mkdir(parents=True, exist_ok=True)
    for attempt in range(retries):
        try:
            with requests.Session() as session:
                session.headers.update({
                    "User-Agent": "Mozilla/5.0 (compatible; MVSepless/1.0)"
                })
                response = session.get(
                    url_model,
                    stream=True,
                    timeout=timeout
                )
                if response.status_code != 200:
                    raise DownloadError(f"HTTP {response.status_code}")
                # content-length may be absent -> 0; tqdm then shows an
                # open-ended bar and the callback receives total=0.
                total_size = int(response.headers.get("content-length", 0))
                print(f"[{_i18n('status')}] {_i18n('download_start')} {format_size(total_size)}")
                with tqdm(
                    total=total_size,
                    desc=local_path_.name,
                    unit="B",
                    unit_scale=True,
                    unit_divisor=1024,
                    initial=0,
                    bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]"
                ) as progress_bar:
                    # Full overwrite every attempt — no resume support.
                    with open(local_path_, "wb") as f:
                        for chunk in response.iter_content(chunk_size=chunk_size):
                            if chunk:
                                f.write(chunk)
                                progress_bar.update(len(chunk))
                                if progress_callback:
                                    progress_callback(progress_bar.n, total_size)
                print(f"[{_i18n('status')}] ✓ {_i18n('download_complete')}: {local_path_}")
                return
        except (requests.RequestException, DownloadError) as e:
            print(_i18n(
                "download_attempt_failed",
                attempt=attempt + 1,
                retries=retries,
                error=str(e)
            ))
            if attempt < retries - 1:
                print(_i18n("retrying"))
            else:
                print(_i18n("all_download_attempts_failed"))
                raise DownloadError(f"{_i18n('download_error', error=str(e))}")
def dw_yt_dlp(
    url: str,
    output_dir: str | Path = None,
    cookie: str | Path = None,
    output_format: str = "mp3",
    output_bitrate: str = "320",
    title: str = None,
) -> str:
    """
    Download the audio track of a video with yt-dlp.

    Args:
        url: Video URL
        output_dir: Directory to save into (defaults to the current dir)
        cookie: Path to a cookies file
        output_format: Output audio format
        output_bitrate: Output bitrate
        title: Output file name override

    Returns:
        POSIX path of the downloaded file, or None on failure
    """
    target_dir = Path(output_dir) if output_dir else Path(".")
    target_dir.mkdir(parents=True, exist_ok=True)
    # Use the video's own title unless an explicit name was supplied.
    name_template = "%(title)s.%(ext)s" if title is None else f"{title}.%(ext)s"
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": str(target_dir / name_template),
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": output_format,
                "preferredquality": output_bitrate,
            }
        ],
        "noplaylist": True,
        "quiet": True,
        "no_warnings": True,
    }
    if cookie and Path(cookie).exists():
        ydl_opts["cookiefile"] = cookie
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info = ydl.extract_info(url, process=True, download=True)
            if "_type" in info and info["_type"] == "playlist":
                # Playlist result despite noplaylist: take the first entry.
                filename = ydl.prepare_filename(info["entries"][0])
            else:
                filename = ydl.prepare_filename(info)
            # prepare_filename reports the pre-postprocessing extension;
            # the FFmpeg postprocessor rewrote it to output_format.
            return Path(filename).with_suffix(f".{output_format}").as_posix()
        except Exception as e:
            print(_i18n("download_error", error=str(e)))
            return None
def one_element_list_to_value(input_list: list | tuple):
if input_list:
return input_list[0]
else:
return None
def emergency_ram_clear():
    """
    Emergency cleanup — zero and drop ALL large module-global arrays/tensors.
    Use only when nuclear_clear_model() is not enough.

    Returns:
        Approximate freed RSS in MB (never negative).
    """
    process = psutil.Process(os.getpid())
    mem_before = process.memory_info().rss
    # Zero and unbind every numpy array / torch tensor held at this module's
    # top level (globals() of this module only).
    for name in list(globals().keys()):
        try:
            obj = globals()[name]
            if isinstance(obj, (np.ndarray, torch.Tensor)):
                if isinstance(obj, np.ndarray):
                    obj.fill(0)
                del globals()[name]
        except Exception:
            pass
    # Fix: removed the former locals() loop — inside a function,
    # `del locals()[name]` is a no-op in CPython (locals() is a snapshot),
    # and this function's own locals never hold arrays/tensors anyway.
    for _ in range(5):
        gc.collect()
    system = platform.system()
    if system == "Linux":
        try:
            # Ask glibc to return freed heap pages to the OS.
            ctypes.CDLL('libc.so.6').malloc_trim(0)
        except Exception:
            pass
    elif system == "Windows":
        try:
            # Trim the process working set via the Win32 pseudo-handle (-1).
            ctypes.windll.psapi.EmptyWorkingSet(ctypes.c_void_p(-1))
            ctypes.windll.kernel32.SetProcessWorkingSetSize(
                ctypes.c_void_p(-1), ctypes.c_size_t(-1), ctypes.c_size_t(-1)
            )
        except Exception:
            pass
    mem_after = process.memory_info().rss
    freed = (mem_before - mem_after) / (1024 * 1024)
    # NOTE(review): 'emeergency_ram' looks misspelled but presumably matches
    # the key in the i18n table — fix both together if ever renamed.
    print(f"[EMERGENCY] {_i18n('emeergency_ram')}: {freed:.1f} MB")
    return max(0, freed)
def linux_nuclear_ram_clear():
    """Linux-specific cleanup: malloc_trim + madvise(MADV_DONTNEED) on big arrays."""
    try:
        libc = ctypes.CDLL('libc.so.6')
        # malloc_trim(0) — return free heap memory from the allocator to the OS
        libc.malloc_trim(0)
        # Attempt to drop the kernel page cache (requires root; errors ignored)
        try:
            with open('/proc/sys/vm/drop_caches', 'w') as f:
                f.write('1')
        except (PermissionError, IOError):
            pass
        # MADV_DONTNEED for large numpy arrays in this process
        # NOTE(review): madvise(MADV_DONTNEED) discards the contents of the
        # advised pages — data in still-live arrays is effectively lost.
        # Presumably intentional for this "nuclear" cleanup, but confirm no
        # caller relies on those arrays afterwards.
        MADV_DONTNEED = 4
        for obj in gc.get_objects():
            try:
                if isinstance(obj, np.ndarray) and obj.size > 100000:
                    # Mark the array's backing pages as "not needed"
                    addr = obj.ctypes.data
                    size = obj.size * obj.itemsize
                    libc.madvise(
                        ctypes.c_void_p(addr),
                        ctypes.c_size_t(size),
                        MADV_DONTNEED
                    )
            except:
                pass
    except Exception as e:
        pass  # Not critical if this did not work
def windows_nuclear_ram_clear():
    """Windows-specific cleanup: EmptyWorkingSet + SetProcessWorkingSetSize."""
    try:
        kernel32 = ctypes.windll.kernel32
        psapi = ctypes.windll.psapi
        # -1 is the Win32 pseudo-handle for the current process
        current_process = ctypes.c_void_p(-1)
        try:
            psapi.EmptyWorkingSet(current_process)
        except:
            pass
        try:
            # (-1, -1) asks the OS to trim the working set to its minimum
            kernel32.SetProcessWorkingSetSize(
                current_process,
                ctypes.c_size_t(-1),
                ctypes.c_size_t(-1)
            )
        except:
            pass
        try:
            # Release free blocks from the CRT heap back to the OS
            msvcrt = ctypes.cdll.msvcrt
            msvcrt._heapmin()
        except:
            pass
        MEM_RESET = 0x00080000
        PAGE_READWRITE = 0x04
        # NOTE(review): VirtualAlloc with MEM_RESET tells the OS the data in
        # these pages is no longer of interest — contents of still-live numpy
        # arrays may be lost. Confirm this is intended before relying on them.
        for obj in gc.get_objects():
            try:
                if isinstance(obj, np.ndarray) and obj.size > 100000:
                    addr = obj.ctypes.data
                    size = obj.size * obj.itemsize
                    kernel32.VirtualAlloc(
                        ctypes.c_void_p(addr),
                        ctypes.c_size_t(size),
                        MEM_RESET,
                        PAGE_READWRITE
                    )
            except:
                pass
    except Exception as e:
        pass
def nuclear_clear_model():
    """
    Aggressive RAM cleanup: eager full GC plus platform-specific heap trimming.

    Returns:
        Approximate freed RSS in MB (never negative).

    Note (fix): the previous docstring documented an ``aggressive`` parameter
    that does not exist — platform-specific trimming always runs.
    """
    process = psutil.Process(os.getpid())
    mem_before = process.memory_info().rss
    # Temporarily drop GC thresholds so every generation collects eagerly.
    gc.set_threshold(1, 1, 1)
    for generation in range(3):
        gc.collect(generation)
    gc.set_threshold(700, 10, 10)  # restore CPython defaults
    system = platform.system()
    if system == "Linux":
        linux_nuclear_ram_clear()
    elif system == "Windows":
        windows_nuclear_ram_clear()
    gc.collect()
    gc.collect()
    mem_after = process.memory_info().rss
    freed_mb = (mem_before - mem_after) / (1024 * 1024)
    if freed_mb > 0:
        print(f"[NUCLEAR] {_i18n('freed_ram')}: {freed_mb:.1f} MB")
    return max(0, freed_mb)
def extra_clear_torch_cache():
    """
    Aggressively drop torch-internal caches: the CUDA allocator and cuBLAS
    workspaces (when CUDA is available), the torch.compile/dynamo caches,
    CPU caches, the autocast cache, and JIT class/ONNX-scope registries.
    Every probe is guarded with hasattr/getattr since these are private
    torch._C entry points that vary across torch versions.
    """
    gc.collect()
    _C = torch._C
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
        if hasattr(_C, "_cuda_clearCublasWorkspaces"):
            try:
                _C._cuda_clearCublasWorkspaces()
            except Exception:
                pass
    compiler_reset = getattr(getattr(torch, "compiler", None), "reset", None)
    if compiler_reset is not None:
        try:
            compiler_reset()
        except Exception:
            pass
    if hasattr(_C, "_clear_dynamo_cache"):
        try:
            _C._clear_dynamo_cache()
        except Exception:
            pass
    if hasattr(_C, "_clear_cpu_caches"):
        _C._clear_cpu_caches()
    if hasattr(_C, "clear_autocast_cache"):
        _C.clear_autocast_cache()
    if hasattr(_C, "_jit_clear_class_registry"):
        _C._jit_clear_class_registry()
    if hasattr(_C, "_jit_pass_onnx_clear_scope_records"):
        try:
            _C._jit_pass_onnx_clear_scope_records()
        except Exception:
            pass