""" Вспомогательные утилиты: процессы, архивы, сеть, чтение потоков. """ import os import re import threading import time from os import name as os_name from pathlib import Path from shutil import move as shutil_move from subprocess import ( PIPE, Popen, STDOUT, TimeoutExpired, check_output, CalledProcessError, ) from sys import stdout as sys_stdout from typing import TypedDict from urllib.parse import unquote, urlparse from requests import get as get_url, head as get_head from requests.structures import CaseInsensitiveDict from ._config import WORK_FOLDER, HEADERS from ._logger import logger # --------------------------------------------------------------------------- # Типы # --------------------------------------------------------------------------- class RunResult(TypedDict): status_code: int output: str # --------------------------------------------------------------------------- # Процессы # --------------------------------------------------------------------------- def run( command: str, cwd: str | Path | None = None, live_output: bool = False, timeout: float | None = None, ) -> RunResult: """ Выполняет shell-команду, возвращает код выхода и вывод. Args: command: Shell-команда. cwd: Рабочая директория (None — текущая). live_output: Выводить прогресс в реальном времени. timeout: Таймаут в секундах (None — без ограничений). Returns: RunResult: {'status_code': int, 'output': str}. При таймауте status_code == -1. """ _encodings = ['utf-8', 'iso-8859-5', 'windows-1251', 'cp866', 'koi8-r', 'mac_cyrillic'] _progress_re = re.compile(r'\d+%|\d+/\d+') def _decode(data: bytes) -> str: for enc in _encodings: try: return data.decode(enc) except UnicodeDecodeError: continue return data.decode('utf-8', errors='replace') process = Popen(command, shell=True, cwd=cwd, stdout=PIPE, stderr=STDOUT) # Режим без живого вывода: communicate() с поддержкой таймаута if not live_output: try: stdout_data, _ = process.communicate(timeout=timeout) except TimeoutExpired: process.kill() process.communicate() msg = f'Таймаут ({timeout}с): {command}' logger.warning(msg) return RunResult(status_code=-1, output=msg) lines = [ln.strip() for ln in _decode(stdout_data).splitlines() if ln.strip()] return RunResult(status_code=process.returncode, output='\n'.join(lines)) # Режим живого вывода (для ручного использования, таймаут приблизительный) final_output: list[str] = [] last_progress = '' deadline = time.time() + timeout if timeout else None for raw_line in iter(process.stdout.readline, b''): # type: ignore[union-attr] if deadline and time.time() > deadline: process.kill() process.wait() final_output.append(f'[ТАЙМАУТ после {timeout}с]') break line = _decode(raw_line.strip()) if _progress_re.search(line): last_progress = line sys_stdout.write('\r' + line) sys_stdout.flush() else: final_output.append(line) sys_stdout.write(line + '\n') sys_stdout.flush() try: process.wait(timeout=5) except TimeoutExpired: process.kill() process.wait() if last_progress: final_output.append(last_progress) return RunResult(status_code=process.returncode, output='\n'.join(final_output)) def kill_process_by_name(name: str) -> None: """ Завершает процессы текущего пользователя по подстроке имени. Ограничение по UID предотвращает случайное завершение чужих процессов. """ run(f'pkill -TERM -u "$(id -u)" -f "{name}" 2>/dev/null; true', timeout=5) def is_process_running(process_name: str | Path) -> bool: try: output = check_output(['pgrep', '-f', str(process_name)], text=True) return bool(output.strip()) except CalledProcessError: return False def terminate_process(process_name: str, process_obj: Popen) -> None: """ Корректно завершает процесс туннеля. Сначала SIGTERM + wait(5с), при зависании — SIGKILL. """ for stream in (process_obj.stdout, process_obj.stderr, process_obj.stdin): if stream: try: stream.close() except Exception: pass try: process_obj.terminate() process_obj.wait(timeout=5) except TimeoutExpired: process_obj.kill() process_obj.wait() except Exception: pass kill_process_by_name(process_name) def _drain_raw(raw_stream: object) -> None: """Сливает данные из сырого потока в никуда, предотвращая блокировку пайпа.""" try: os.set_blocking(raw_stream.fileno(), True) # type: ignore[attr-defined] except Exception: pass try: while True: data = raw_stream.read(4096) # type: ignore[attr-defined] if not data: break except Exception: pass def drain_process_output(process: Popen) -> None: """ Запускает daemon-потоки для слива stdout/stderr долгоживущего процесса. Без этого туннельный процесс заблокируется, когда системный буфер пайпа (обычно 64 КБ) заполнится непрочитанными логами. """ for stream in (process.stdout, process.stderr): if stream: raw = getattr(stream, 'raw', stream) t = threading.Thread(target=_drain_raw, args=(raw,), daemon=True) t.start() def read_until_pattern( process: Popen, url_pattern: str, timeout: float = 20.0, read_from_stderr: bool = False, read_both_streams: bool = False, ) -> tuple[str, str]: """ Читает вывод процесса с таймаутом, ища URL-паттерн. Использует блокирующие read() в daemon-потоках вместо select() + неблокирующих read(). select()-подход даёт гонку при быстрых коротких записях (< 1мс): select сигнализирует «readable», но к моменту вызова read() данные ещё не попали в буфер пайпа — read возвращает пусто. Блокирующий read в потоке этой гонки не имеет: он просто ждёт данных. После нахождения URL потоки автоматически переходят в режим дренирования: продолжают читать и отбрасывать данные, предотвращая переполнение системного буфера пайпа (обычно 64 КБ) за время жизни туннеля. Args: process: Запущенный Popen с открытыми потоками PIPE. url_pattern: Regex для поиска URL. timeout: Максимальное время ожидания (сек). read_from_stderr: Читать из stderr (иначе stdout). read_both_streams: Читать из обоих потоков одновременно (для SSH). Returns: Кортеж (matched_url, full_output_so_far). Raises: RuntimeError: Паттерн не найден за timeout секунд. """ import queue as _queue if read_both_streams: active = [s for s in (process.stdout, process.stderr) if s] elif read_from_stderr: active = [process.stderr] if process.stderr else [] else: active = [process.stdout] if process.stdout else [] if not active: raise RuntimeError('Потоки вывода процесса недоступны.') q: _queue.Queue[bytes | None] = _queue.Queue() drain_mode = threading.Event() def _reader(stream) -> None: """ Блокирующий читатель в daemon-потоке. Пока URL не найден — пушит чанки в очередь. После установки drain_mode — читает и отбрасывает (дренирование). """ raw = getattr(stream, 'raw', stream) try: # Явно возвращаем в блокирующий режим на случай, # если где-то раньше FD был переведён в неблокирующий os.set_blocking(raw.fileno(), True) except Exception: pass try: while True: chunk = raw.read(4096) if not chunk: # EOF — процесс закрыл поток break if drain_mode.is_set(): pass # Дренирование: читаем и отбрасываем else: q.put(chunk) # Ищем URL: передаём в очередь except Exception: pass finally: # Сигнал EOF только если мы всё ещё ищем URL if not drain_mode.is_set(): q.put(None) for stream in active: t = threading.Thread(target=_reader, args=(stream,), daemon=True) t.start() buffer = b'' deadline = time.time() + timeout n_sentinels = 0 while time.time() < deadline: remaining = max(0.01, deadline - time.time()) try: item = q.get(timeout=min(remaining, 0.5)) except _queue.Empty: if process.poll() is not None: # Процесс завершился — дочитываем остатки из очереди while True: try: item = q.get_nowait() except _queue.Empty: break if item is None: n_sentinels += 1 else: buffer += item break continue if item is None: n_sentinels += 1 if n_sentinels >= len(active): break # Все читатели достигли EOF continue buffer += item text = buffer.decode('utf-8', errors='ignore') match = re.search(url_pattern, text) if match: drain_mode.set() # Переключаем потоки в режим дренирования return match.group(), text raise RuntimeError( f'URL не найден за {timeout} сек. Вывод процесса:\n' + buffer.decode('utf-8', errors='ignore') ) def move_path(old_path: Path | str, new_path: Path | str) -> None: old, new = Path(old_path), Path(new_path) if not old.exists(): raise RuntimeError(f'Не найден исходный путь для перемещения: {old}') try: old.replace(new) except Exception: try: shutil_move(str(old), str(new)) except Exception: if os_name == 'posix': run(f'mv "{old}" "{new}"') else: run(f'move "{old}" "{new}"') # --------------------------------------------------------------------------- # Файлы и архивы # --------------------------------------------------------------------------- def _snapshot(folder: Path) -> set[Path]: """Рекурсивный снимок всех файлов в директории (включая вложенные папки).""" if not folder.exists(): return set() return {f for f in folder.rglob('*') if f.is_file()} def determine_archive_format(filepath: str | Path) -> str | None: """Определяет формат архива по сигнатуре байт (magic bytes).""" filepath = Path(filepath) zip_sig = bytes([0x50, 0x4B, 0x03, 0x04]) seven_z = bytes([0x37, 0x7A, 0xBC, 0xAF, 0x27, 0x1C]) lzma_xz = bytes([0xFD, 0x37, 0x7A, 0x58, 0x5A]) tgz = bytes([0x1F, 0x8B]) tbz = bytes([0x42, 0x5A, 0x68]) ustar = bytes([0x75, 0x73, 0x74, 0x61, 0x72]) with filepath.open('rb') as fh: header = fh.read(262) if header.startswith(zip_sig): return 'zip' if header.startswith(seven_z): return '7z' if header.startswith(lzma_xz): return '7z' if header.startswith(tgz): return 'tar.gz' if header.startswith(tbz): return 'tar.bz2' if header[0x101:0x101 + len(ustar)] == ustar: return 'tar' return None def unpack_archive( archive_path: str | Path, dest_path: str | Path, rm_archive: bool = True, ) -> None: """ Распаковывает архив в указанную директорию. Поддерживает: tar, tar.gz, tar.bz2, tar.xz, gz (одиночный файл), zip, 7z. При неудаче основного метода делает fallback к 7z. Принимает URL — скачивает архив сам. """ if str(archive_path).startswith(('https://', 'http://')): archive_path = download(str(archive_path), save_path=WORK_FOLDER, progress=False) archive_path = Path(archive_path) dest_path = Path(dest_path) if not archive_path.exists(): raise RuntimeError(f'Архив не найден: {archive_path}') fmt = determine_archive_format(archive_path) dest_path.mkdir(parents=True, exist_ok=True) def _rm() -> None: if rm_archive: archive_path.unlink(missing_ok=True) def _try_7z() -> None: result = run( f'7z -bso0 -bd -mmt4 -slp -y x "{archive_path}" -o"{dest_path}"', timeout=180, ) if result['status_code'] != 0: raise RuntimeError( f'Не удалось распаковать {archive_path.name}.\n{result["output"]}' ) _rm() try: if fmt == 'zip' or archive_path.suffix == '.zip': r = run(f'unzip -o "{archive_path}" -d "{dest_path}"', timeout=180) if r['status_code'] != 0: raise RuntimeError(r['output']) _rm() elif fmt in ('7z',) or archive_path.suffix == '.7z': r = run( f'7z -bso0 -bd -slp -y x "{archive_path}" -o"{dest_path}"', timeout=180, ) if r['status_code'] != 0: raise RuntimeError(r['output']) _rm() elif fmt in ('tar', 'tar.gz', 'tar.bz2') or archive_path.suffix in ( '.tar', '.tar.gz', '.tgz', '.tar.bz2', '.tar.xz', '.txz', '.gz', ): # GNU tar определяет формат сжатия автоматически через -xpf r = run(f'tar -xpf "{archive_path}" -C "{dest_path}"', timeout=180) if r['status_code'] != 0: # Возможно, это одиночный gzip-файл без tar-обёртки (напр. tmole-linux.gz) gz_r = run(f'gzip -df "{archive_path}"', timeout=60) if gz_r['status_code'] != 0: raise RuntimeError( f'tar: {r["output"]}\ngzip: {gz_r["output"]}' ) # gzip -f уже удалил .gz и создал распакованный файл на месте return _rm() else: _try_7z() except RuntimeError as e: logger.debug(f'Основной метод распаковки не сработал ({e}), fallback к 7z.') _try_7z() # --------------------------------------------------------------------------- # Сеть # --------------------------------------------------------------------------- def is_ipv4(address: str) -> bool: pattern = re.compile( r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$' ) return bool(pattern.match(address)) def is_valid_url(url: str, custom_headers: dict | None = None) -> bool: """ Проверяет доступность URL (HEAD или GET запрос). Returns: True, если сервер вернул HTTP 2xx. """ req_headers = HEADERS.copy() if custom_headers: req_headers.update(custom_headers) for _ in range(2): try: with get_url(url, headers=req_headers, allow_redirects=True, stream=True, timeout=10) as resp: if 200 <= resp.status_code < 300: return True except Exception: try: resp = get_head(url, headers=req_headers, allow_redirects=True, timeout=10) if 200 <= resp.status_code < 300: return True except Exception: pass else: break return False def get_filename_from_headers(headers: CaseInsensitiveDict) -> str | None: cd = headers.get('content-disposition') if not cd: return headers.get('filename') for part in cd.split(';'): part = part.strip() if part.startswith('filename*='): encoding, _, encoded = part[len('filename*='):].partition("''") return unquote(encoded, encoding=encoding) if part.startswith('filename='): return part[len('filename='):].strip('"') return None def download( url: str, filename: str | Path | None = None, save_path: str | Path | None = None, progress: bool = True, extra_headers: dict[str, str] | None = None, ) -> Path: """ Скачивает файл по URL. Args: url: URL файла. filename: Имя для сохранения (None — из заголовков или URL). save_path: Папка сохранения (None — текущая директория). progress: Показывать прогресс загрузки. extra_headers: Дополнительные HTTP-заголовки. Returns: Path к скачанному файлу. Raises: RuntimeError: При ошибке сети или записи файла. """ save_path = Path(save_path) if save_path else Path.cwd() save_path.mkdir(parents=True, exist_ok=True) req_headers = HEADERS.copy() if extra_headers: req_headers.update(extra_headers) try: resp = get_url(url, stream=True, allow_redirects=True, headers=req_headers, timeout=60) resp.raise_for_status() except Exception as e: raise RuntimeError(f'Не удалось скачать файл по ссылке {url}:\n{e}') file_size = int(resp.headers.get('content-length', 0)) raw_name = ( str(filename) if filename else (get_filename_from_headers(resp.headers) or Path(urlparse(resp.url).path).name) ) file_name = Path(raw_name).name file_path = save_path / file_name chunk_size = max(4096, file_size // 2000) if file_size else 4096 downloaded = 0 start = time.time() try: with open(file_path, 'wb') as fp: for chunk in resp.iter_content(chunk_size=chunk_size): if chunk: fp.write(chunk) if progress and file_size: downloaded += len(chunk) elapsed = time.time() - start print( f'\rзагрузка {file_name}: ' f'{downloaded / file_size * 100:.2f}% | {elapsed:.2f} сек.', end='', ) except Exception as e: raise RuntimeError(f'Не удалось сохранить файл из {url}:\n{e}') if progress and file_size: print() logger.debug(f'Скачан: {file_path} ({file_size} байт)') return file_path def get_github_latest_release_url(repo: str, asset_pattern: str) -> str: """ Возвращает URL загрузки актуального релизного ассета с GitHub. Args: repo: Репозиторий в формате 'owner/repo'. asset_pattern: Регулярное выражение для имени ассета. Returns: browser_download_url для найденного ассета. Raises: RuntimeError: Если ассет не найден или GitHub API недоступен. """ api_url = f'https://api.github.com/repos/{repo}/releases/latest' resp = get_url(api_url, timeout=10) resp.raise_for_status() assets = resp.json().get('assets', []) for asset in assets: if re.search(asset_pattern, asset['name']): return asset['browser_download_url'] available = [a['name'] for a in assets] raise RuntimeError( f'Ассет "{asset_pattern}" не найден в {repo}. ' f'Доступные: {available}' )