|
|
from __future__ import annotations |
|
|
|
|
|
import os |
|
|
import sys |
|
|
from contextlib import suppress |
|
|
from errno import EACCES |
|
|
from pathlib import Path |
|
|
from typing import cast |
|
|
|
|
|
from ._api import BaseFileLock |
|
|
from ._util import ensure_directory_exists, raise_on_not_writable_file |
|
|
|
|
|
if sys.platform == "win32": |
|
|
import ctypes |
|
|
import msvcrt |
|
|
from ctypes import wintypes |
|
|
|
|
|
|
|
|
# Win32 file-attribute values used when probing the lock file path.
FILE_ATTRIBUTE_REPARSE_POINT = 0x400  # attribute bit tested to detect reparse points
INVALID_FILE_ATTRIBUTES = 0xFFFF_FFFF  # value compared against the lookup result to detect failure

# Handle to kernel32; use_last_error=True makes the failure code readable through ctypes.get_last_error().
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
# Declare the foreign signature: a single wide-string argument and a DWORD result.
_kernel32.GetFileAttributesW.restype = wintypes.DWORD
_kernel32.GetFileAttributesW.argtypes = [wintypes.LPCWSTR]
|
|
|
|
|
def _is_reparse_point(path: str) -> bool:
    """
    Check if a path is a reparse point (symlink, junction, etc.) on Windows.

    A failed attribute lookup is never raised to the caller: whether the path does not exist or
    ``GetFileAttributesW`` failed for any other reason, the function reports ``False``.

    :param path: Path to check
    :return: True if the attributes could be read and include ``FILE_ATTRIBUTE_REPARSE_POINT``, False otherwise
        (including when the attributes cannot be read at all)
    """
    attrs = _kernel32.GetFileAttributesW(path)
    if attrs == INVALID_FILE_ATTRIBUTES:
        # The lookup failed. The usual cause is that the lock file has not been created yet
        # (ERROR_FILE_NOT_FOUND / ERROR_PATH_NOT_FOUND), which is fine. Every other failure is
        # likewise treated as "not a reparse point", so there is no need to inspect the error code.
        return False
    return bool(attrs & FILE_ATTRIBUTE_REPARSE_POINT)
|
|
|
|
|
class WindowsFileLock(BaseFileLock):
    """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""

    def _acquire(self) -> None:
        """
        Make one non-blocking attempt to take the lock.

        On success the open file descriptor is stored in ``self._context.lock_file_fd``. When opening or locking
        the file fails with ``EACCES`` the method returns normally without storing a descriptor.

        :raises OSError: If the lock file is a reparse point, or if opening/locking fails with an errno other than
            ``EACCES``. The ``._util`` helper checks called first may raise as well (not visible here).
        """
        # Pre-flight helpers from ._util; presumably they validate writability and create the parent directory.
        raise_on_not_writable_file(self.lock_file)
        ensure_directory_exists(self.lock_file)

        # Refuse symlinks/junctions before opening: the open below uses O_TRUNC, so following a
        # link would truncate whatever file it points at.
        if _is_reparse_point(self.lock_file):
            msg = f"Lock file is a reparse point (symlink/junction): {self.lock_file}"
            raise OSError(msg)

        flags = (
            os.O_RDWR  # open for reading and writing
            | os.O_CREAT  # create the file if it does not exist
            | os.O_TRUNC  # truncate the file if it is not empty
        )
        try:
            fd = os.open(self.lock_file, flags, self._context.mode)
        except OSError as exception:
            # EACCES is swallowed: the lock simply was not acquired on this attempt.
            if exception.errno != EACCES:
                raise
        else:
            try:
                # LK_NBLCK: non-blocking lock of 1 byte at the current file position.
                msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
            except OSError as exception:
                os.close(fd)  # never keep a descriptor we failed to lock
                if exception.errno != EACCES:
                    raise
            else:
                self._context.lock_file_fd = fd

    def _release(self) -> None:
        """
        Unlock and close the lock file, then try to delete it.

        The descriptor is closed even if unlocking fails, so it cannot leak. Failure to delete the file is ignored.

        :raises OSError: If unlocking or closing the descriptor fails
        """
        fd = cast("int", self._context.lock_file_fd)
        # Cleared first: from here on this method is the only holder of the descriptor.
        self._context.lock_file_fd = None
        try:
            msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
        finally:
            # Always close; the context no longer references fd, so nobody else could.
            os.close(fd)

        # Best-effort cleanup; presumably unlink fails when another instance has already grabbed the file.
        with suppress(OSError):
            Path(self.lock_file).unlink()
|
|
|
|
|
else: |
|
|
|
|
|
class WindowsFileLock(BaseFileLock):
    """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""

    def _acquire(self) -> None:
        """Placeholder defined when ``sys.platform`` is not ``win32``; always raises."""
        raise NotImplementedError

    def _release(self) -> None:
        """Placeholder defined when ``sys.platform`` is not ``win32``; always raises."""
        raise NotImplementedError
|
|
|
|
|
|
|
|
# Public names exported by this module.
__all__ = ["WindowsFileLock"]
|
|
|