| """
|
| Process Lock Handler for VPN Server
|
| Ensures only one instance of the server is running
|
| """
|
|
|
| import os
|
| import fcntl
|
| import errno
|
| import atexit
|
| import logging
|
| from typing import Optional
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
| class ProcessLock:
|
| def __init__(self, lock_file: str = "/tmp/outline_vpn.lock"):
|
| self.lock_file = lock_file
|
| self.lock_fd: Optional[int] = None
|
| atexit.register(self.release)
|
|
|
| def acquire(self) -> bool:
|
| """Acquire process lock. Returns True if successful, False if already locked."""
|
| try:
|
|
|
| self.lock_fd = os.open(self.lock_file, os.O_CREAT | os.O_RDWR)
|
| fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
|
|
|
| os.truncate(self.lock_fd, 0)
|
| os.write(self.lock_fd, str(os.getpid()).encode())
|
|
|
| return True
|
|
|
| except (IOError, OSError) as e:
|
| if e.errno == errno.EWOULDBLOCK:
|
|
|
| logger.warning("Another instance of the VPN server is already running")
|
| else:
|
| logger.error(f"Failed to acquire process lock: {e}")
|
| return False
|
|
|
| def release(self):
|
| """Release the process lock"""
|
| if self.lock_fd is not None:
|
| try:
|
| fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
|
| os.close(self.lock_fd)
|
| os.unlink(self.lock_file)
|
| self.lock_fd = None
|
| except (IOError, OSError) as e:
|
| logger.error(f"Failed to release process lock: {e}")
|
|
|