cl / colab_tunnel /_utils.py
rottenstuff's picture
Upload 13 files
c0e2219 verified
Raw
History Blame Contribute Delete
22.6 kB
"""
Вспомогательные утилиты: процессы, архивы, сеть, чтение потоков.
"""
import os
import re
import threading
import time
from os import name as os_name
from pathlib import Path
from shutil import move as shutil_move
from subprocess import (
PIPE, Popen, STDOUT, TimeoutExpired,
check_output, CalledProcessError,
)
from sys import stdout as sys_stdout
from typing import TypedDict
from urllib.parse import unquote, urlparse
from requests import get as get_url, head as get_head
from requests.structures import CaseInsensitiveDict
from ._config import WORK_FOLDER, HEADERS
from ._logger import logger
# ---------------------------------------------------------------------------
# Типы
# ---------------------------------------------------------------------------
class RunResult(TypedDict):
status_code: int
output: str
# ---------------------------------------------------------------------------
# Процессы
# ---------------------------------------------------------------------------
def run(
command: str,
cwd: str | Path | None = None,
live_output: bool = False,
timeout: float | None = None,
) -> RunResult:
"""
Выполняет shell-команду, возвращает код выхода и вывод.
Args:
command: Shell-команда.
cwd: Рабочая директория (None — текущая).
live_output: Выводить прогресс в реальном времени.
timeout: Таймаут в секундах (None — без ограничений).
Returns:
RunResult: {'status_code': int, 'output': str}.
При таймауте status_code == -1.
"""
_encodings = ['utf-8', 'iso-8859-5', 'windows-1251', 'cp866', 'koi8-r', 'mac_cyrillic']
_progress_re = re.compile(r'\d+%|\d+/\d+')
def _decode(data: bytes) -> str:
for enc in _encodings:
try:
return data.decode(enc)
except UnicodeDecodeError:
continue
return data.decode('utf-8', errors='replace')
process = Popen(command, shell=True, cwd=cwd, stdout=PIPE, stderr=STDOUT)
# Режим без живого вывода: communicate() с поддержкой таймаута
if not live_output:
try:
stdout_data, _ = process.communicate(timeout=timeout)
except TimeoutExpired:
process.kill()
process.communicate()
msg = f'Таймаут ({timeout}с): {command}'
logger.warning(msg)
return RunResult(status_code=-1, output=msg)
lines = [ln.strip() for ln in _decode(stdout_data).splitlines() if ln.strip()]
return RunResult(status_code=process.returncode, output='\n'.join(lines))
# Режим живого вывода (для ручного использования, таймаут приблизительный)
final_output: list[str] = []
last_progress = ''
deadline = time.time() + timeout if timeout else None
for raw_line in iter(process.stdout.readline, b''): # type: ignore[union-attr]
if deadline and time.time() > deadline:
process.kill()
process.wait()
final_output.append(f'[ТАЙМАУТ после {timeout}с]')
break
line = _decode(raw_line.strip())
if _progress_re.search(line):
last_progress = line
sys_stdout.write('\r' + line)
sys_stdout.flush()
else:
final_output.append(line)
sys_stdout.write(line + '\n')
sys_stdout.flush()
try:
process.wait(timeout=5)
except TimeoutExpired:
process.kill()
process.wait()
if last_progress:
final_output.append(last_progress)
return RunResult(status_code=process.returncode, output='\n'.join(final_output))
def kill_process_by_name(name: str) -> None:
"""
Завершает процессы текущего пользователя по подстроке имени.
Ограничение по UID предотвращает случайное завершение чужих процессов.
"""
run(f'pkill -TERM -u "$(id -u)" -f "{name}" 2>/dev/null; true', timeout=5)
def is_process_running(process_name: str | Path) -> bool:
try:
output = check_output(['pgrep', '-f', str(process_name)], text=True)
return bool(output.strip())
except CalledProcessError:
return False
def terminate_process(process_name: str, process_obj: Popen) -> None:
"""
Корректно завершает процесс туннеля.
Сначала SIGTERM + wait(5с), при зависании — SIGKILL.
"""
for stream in (process_obj.stdout, process_obj.stderr, process_obj.stdin):
if stream:
try:
stream.close()
except Exception:
pass
try:
process_obj.terminate()
process_obj.wait(timeout=5)
except TimeoutExpired:
process_obj.kill()
process_obj.wait()
except Exception:
pass
kill_process_by_name(process_name)
def _drain_raw(raw_stream: object) -> None:
"""Сливает данные из сырого потока в никуда, предотвращая блокировку пайпа."""
try:
os.set_blocking(raw_stream.fileno(), True) # type: ignore[attr-defined]
except Exception:
pass
try:
while True:
data = raw_stream.read(4096) # type: ignore[attr-defined]
if not data:
break
except Exception:
pass
def drain_process_output(process: Popen) -> None:
"""
Запускает daemon-потоки для слива stdout/stderr долгоживущего процесса.
Без этого туннельный процесс заблокируется, когда системный буфер
пайпа (обычно 64 КБ) заполнится непрочитанными логами.
"""
for stream in (process.stdout, process.stderr):
if stream:
raw = getattr(stream, 'raw', stream)
t = threading.Thread(target=_drain_raw, args=(raw,), daemon=True)
t.start()
def read_until_pattern(
process: Popen,
url_pattern: str,
timeout: float = 20.0,
read_from_stderr: bool = False,
read_both_streams: bool = False,
) -> tuple[str, str]:
"""
Читает вывод процесса с таймаутом, ища URL-паттерн.
Использует блокирующие read() в daemon-потоках вместо select() +
неблокирующих read(). select()-подход даёт гонку при быстрых коротких
записях (< 1мс): select сигнализирует «readable», но к моменту вызова
read() данные ещё не попали в буфер пайпа — read возвращает пусто.
Блокирующий read в потоке этой гонки не имеет: он просто ждёт данных.
После нахождения URL потоки автоматически переходят в режим дренирования:
продолжают читать и отбрасывать данные, предотвращая переполнение
системного буфера пайпа (обычно 64 КБ) за время жизни туннеля.
Args:
process: Запущенный Popen с открытыми потоками PIPE.
url_pattern: Regex для поиска URL.
timeout: Максимальное время ожидания (сек).
read_from_stderr: Читать из stderr (иначе stdout).
read_both_streams: Читать из обоих потоков одновременно (для SSH).
Returns:
Кортеж (matched_url, full_output_so_far).
Raises:
RuntimeError: Паттерн не найден за timeout секунд.
"""
import queue as _queue
if read_both_streams:
active = [s for s in (process.stdout, process.stderr) if s]
elif read_from_stderr:
active = [process.stderr] if process.stderr else []
else:
active = [process.stdout] if process.stdout else []
if not active:
raise RuntimeError('Потоки вывода процесса недоступны.')
q: _queue.Queue[bytes | None] = _queue.Queue()
drain_mode = threading.Event()
def _reader(stream) -> None:
"""
Блокирующий читатель в daemon-потоке.
Пока URL не найден — пушит чанки в очередь.
После установки drain_mode — читает и отбрасывает (дренирование).
"""
raw = getattr(stream, 'raw', stream)
try:
# Явно возвращаем в блокирующий режим на случай,
# если где-то раньше FD был переведён в неблокирующий
os.set_blocking(raw.fileno(), True)
except Exception:
pass
try:
while True:
chunk = raw.read(4096)
if not chunk: # EOF — процесс закрыл поток
break
if drain_mode.is_set():
pass # Дренирование: читаем и отбрасываем
else:
q.put(chunk) # Ищем URL: передаём в очередь
except Exception:
pass
finally:
# Сигнал EOF только если мы всё ещё ищем URL
if not drain_mode.is_set():
q.put(None)
for stream in active:
t = threading.Thread(target=_reader, args=(stream,), daemon=True)
t.start()
buffer = b''
deadline = time.time() + timeout
n_sentinels = 0
while time.time() < deadline:
remaining = max(0.01, deadline - time.time())
try:
item = q.get(timeout=min(remaining, 0.5))
except _queue.Empty:
if process.poll() is not None:
# Процесс завершился — дочитываем остатки из очереди
while True:
try:
item = q.get_nowait()
except _queue.Empty:
break
if item is None:
n_sentinels += 1
else:
buffer += item
break
continue
if item is None:
n_sentinels += 1
if n_sentinels >= len(active):
break # Все читатели достигли EOF
continue
buffer += item
text = buffer.decode('utf-8', errors='ignore')
match = re.search(url_pattern, text)
if match:
drain_mode.set() # Переключаем потоки в режим дренирования
return match.group(), text
raise RuntimeError(
f'URL не найден за {timeout} сек. Вывод процесса:\n'
+ buffer.decode('utf-8', errors='ignore')
)
def move_path(old_path: Path | str, new_path: Path | str) -> None:
old, new = Path(old_path), Path(new_path)
if not old.exists():
raise RuntimeError(f'Не найден исходный путь для перемещения: {old}')
try:
old.replace(new)
except Exception:
try:
shutil_move(str(old), str(new))
except Exception:
if os_name == 'posix':
run(f'mv "{old}" "{new}"')
else:
run(f'move "{old}" "{new}"')
# ---------------------------------------------------------------------------
# Файлы и архивы
# ---------------------------------------------------------------------------
def _snapshot(folder: Path) -> set[Path]:
"""Рекурсивный снимок всех файлов в директории (включая вложенные папки)."""
if not folder.exists():
return set()
return {f for f in folder.rglob('*') if f.is_file()}
def determine_archive_format(filepath: str | Path) -> str | None:
"""Определяет формат архива по сигнатуре байт (magic bytes)."""
filepath = Path(filepath)
zip_sig = bytes([0x50, 0x4B, 0x03, 0x04])
seven_z = bytes([0x37, 0x7A, 0xBC, 0xAF, 0x27, 0x1C])
lzma_xz = bytes([0xFD, 0x37, 0x7A, 0x58, 0x5A])
tgz = bytes([0x1F, 0x8B])
tbz = bytes([0x42, 0x5A, 0x68])
ustar = bytes([0x75, 0x73, 0x74, 0x61, 0x72])
with filepath.open('rb') as fh:
header = fh.read(262)
if header.startswith(zip_sig): return 'zip'
if header.startswith(seven_z): return '7z'
if header.startswith(lzma_xz): return '7z'
if header.startswith(tgz): return 'tar.gz'
if header.startswith(tbz): return 'tar.bz2'
if header[0x101:0x101 + len(ustar)] == ustar: return 'tar'
return None
def unpack_archive(
archive_path: str | Path,
dest_path: str | Path,
rm_archive: bool = True,
) -> None:
"""
Распаковывает архив в указанную директорию.
Поддерживает: tar, tar.gz, tar.bz2, tar.xz, gz (одиночный файл), zip, 7z.
При неудаче основного метода делает fallback к 7z.
Принимает URL — скачивает архив сам.
"""
if str(archive_path).startswith(('https://', 'http://')):
archive_path = download(str(archive_path), save_path=WORK_FOLDER, progress=False)
archive_path = Path(archive_path)
dest_path = Path(dest_path)
if not archive_path.exists():
raise RuntimeError(f'Архив не найден: {archive_path}')
fmt = determine_archive_format(archive_path)
dest_path.mkdir(parents=True, exist_ok=True)
def _rm() -> None:
if rm_archive:
archive_path.unlink(missing_ok=True)
def _try_7z() -> None:
result = run(
f'7z -bso0 -bd -mmt4 -slp -y x "{archive_path}" -o"{dest_path}"',
timeout=180,
)
if result['status_code'] != 0:
raise RuntimeError(
f'Не удалось распаковать {archive_path.name}.\n{result["output"]}'
)
_rm()
try:
if fmt == 'zip' or archive_path.suffix == '.zip':
r = run(f'unzip -o "{archive_path}" -d "{dest_path}"', timeout=180)
if r['status_code'] != 0:
raise RuntimeError(r['output'])
_rm()
elif fmt in ('7z',) or archive_path.suffix == '.7z':
r = run(
f'7z -bso0 -bd -slp -y x "{archive_path}" -o"{dest_path}"',
timeout=180,
)
if r['status_code'] != 0:
raise RuntimeError(r['output'])
_rm()
elif fmt in ('tar', 'tar.gz', 'tar.bz2') or archive_path.suffix in (
'.tar', '.tar.gz', '.tgz', '.tar.bz2', '.tar.xz', '.txz', '.gz',
):
# GNU tar определяет формат сжатия автоматически через -xpf
r = run(f'tar -xpf "{archive_path}" -C "{dest_path}"', timeout=180)
if r['status_code'] != 0:
# Возможно, это одиночный gzip-файл без tar-обёртки (напр. tmole-linux.gz)
gz_r = run(f'gzip -df "{archive_path}"', timeout=60)
if gz_r['status_code'] != 0:
raise RuntimeError(
f'tar: {r["output"]}\ngzip: {gz_r["output"]}'
)
# gzip -f уже удалил .gz и создал распакованный файл на месте
return
_rm()
else:
_try_7z()
except RuntimeError as e:
logger.debug(f'Основной метод распаковки не сработал ({e}), fallback к 7z.')
_try_7z()
# ---------------------------------------------------------------------------
# Сеть
# ---------------------------------------------------------------------------
def is_ipv4(address: str) -> bool:
pattern = re.compile(
r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)$'
)
return bool(pattern.match(address))
def is_valid_url(url: str, custom_headers: dict | None = None) -> bool:
"""
Проверяет доступность URL (HEAD или GET запрос).
Returns:
True, если сервер вернул HTTP 2xx.
"""
req_headers = HEADERS.copy()
if custom_headers:
req_headers.update(custom_headers)
for _ in range(2):
try:
with get_url(url, headers=req_headers, allow_redirects=True, stream=True, timeout=10) as resp:
if 200 <= resp.status_code < 300:
return True
except Exception:
try:
resp = get_head(url, headers=req_headers, allow_redirects=True, timeout=10)
if 200 <= resp.status_code < 300:
return True
except Exception:
pass
else:
break
return False
def get_filename_from_headers(headers: CaseInsensitiveDict) -> str | None:
cd = headers.get('content-disposition')
if not cd:
return headers.get('filename')
for part in cd.split(';'):
part = part.strip()
if part.startswith('filename*='):
encoding, _, encoded = part[len('filename*='):].partition("''")
return unquote(encoded, encoding=encoding)
if part.startswith('filename='):
return part[len('filename='):].strip('"')
return None
def download(
url: str,
filename: str | Path | None = None,
save_path: str | Path | None = None,
progress: bool = True,
extra_headers: dict[str, str] | None = None,
) -> Path:
"""
Скачивает файл по URL.
Args:
url: URL файла.
filename: Имя для сохранения (None — из заголовков или URL).
save_path: Папка сохранения (None — текущая директория).
progress: Показывать прогресс загрузки.
extra_headers: Дополнительные HTTP-заголовки.
Returns:
Path к скачанному файлу.
Raises:
RuntimeError: При ошибке сети или записи файла.
"""
save_path = Path(save_path) if save_path else Path.cwd()
save_path.mkdir(parents=True, exist_ok=True)
req_headers = HEADERS.copy()
if extra_headers:
req_headers.update(extra_headers)
try:
resp = get_url(url, stream=True, allow_redirects=True, headers=req_headers, timeout=60)
resp.raise_for_status()
except Exception as e:
raise RuntimeError(f'Не удалось скачать файл по ссылке {url}:\n{e}')
file_size = int(resp.headers.get('content-length', 0))
raw_name = (
str(filename)
if filename
else (get_filename_from_headers(resp.headers) or Path(urlparse(resp.url).path).name)
)
file_name = Path(raw_name).name
file_path = save_path / file_name
chunk_size = max(4096, file_size // 2000) if file_size else 4096
downloaded = 0
start = time.time()
try:
with open(file_path, 'wb') as fp:
for chunk in resp.iter_content(chunk_size=chunk_size):
if chunk:
fp.write(chunk)
if progress and file_size:
downloaded += len(chunk)
elapsed = time.time() - start
print(
f'\rзагрузка {file_name}: '
f'{downloaded / file_size * 100:.2f}% | {elapsed:.2f} сек.',
end='',
)
except Exception as e:
raise RuntimeError(f'Не удалось сохранить файл из {url}:\n{e}')
if progress and file_size:
print()
logger.debug(f'Скачан: {file_path} ({file_size} байт)')
return file_path
def get_github_latest_release_url(repo: str, asset_pattern: str) -> str:
"""
Возвращает URL загрузки актуального релизного ассета с GitHub.
Args:
repo: Репозиторий в формате 'owner/repo'.
asset_pattern: Регулярное выражение для имени ассета.
Returns:
browser_download_url для найденного ассета.
Raises:
RuntimeError: Если ассет не найден или GitHub API недоступен.
"""
api_url = f'https://api.github.com/repos/{repo}/releases/latest'
resp = get_url(api_url, timeout=10)
resp.raise_for_status()
assets = resp.json().get('assets', [])
for asset in assets:
if re.search(asset_pattern, asset['name']):
return asset['browser_download_url']
available = [a['name'] for a in assets]
raise RuntimeError(
f'Ассет "{asset_pattern}" не найден в {repo}. '
f'Доступные: {available}'
)