|
|
|
|
|
|
|
|
| """Routines common to all posix systems."""
|
|
|
| import enum
|
| import glob
|
| import os
|
| import signal
|
| import time
|
|
|
| from ._common import MACOS
|
| from ._common import TimeoutExpired
|
| from ._common import memoize
|
| from ._common import sdiskusage
|
| from ._common import usage_percent
|
|
|
| if MACOS:
|
| from . import _psutil_osx
|
|
|
|
|
| __all__ = ['pid_exists', 'wait_pid', 'disk_usage', 'get_terminal_map']
|
|
|
|
|
| def pid_exists(pid):
|
| """Check whether pid exists in the current process table."""
|
| if pid == 0:
|
|
|
|
|
|
|
|
|
|
|
| return True
|
| try:
|
| os.kill(pid, 0)
|
| except ProcessLookupError:
|
| return False
|
| except PermissionError:
|
|
|
| return True
|
|
|
|
|
| else:
|
| return True
|
|
|
|
|
| Negsignal = enum.IntEnum(
|
| 'Negsignal', {x.name: -x.value for x in signal.Signals}
|
| )
|
|
|
|
|
| def negsig_to_enum(num):
|
| """Convert a negative signal value to an enum."""
|
| try:
|
| return Negsignal(num)
|
| except ValueError:
|
| return num
|
|
|
|
|
| def wait_pid(
|
| pid,
|
| timeout=None,
|
| proc_name=None,
|
| _waitpid=os.waitpid,
|
| _timer=getattr(time, 'monotonic', time.time),
|
| _min=min,
|
| _sleep=time.sleep,
|
| _pid_exists=pid_exists,
|
| ):
|
| """Wait for a process PID to terminate.
|
|
|
| If the process terminated normally by calling exit(3) or _exit(2),
|
| or by returning from main(), the return value is the positive integer
|
| passed to *exit().
|
|
|
| If it was terminated by a signal it returns the negated value of the
|
| signal which caused the termination (e.g. -SIGTERM).
|
|
|
| If PID is not a children of os.getpid() (current process) just
|
| wait until the process disappears and return None.
|
|
|
| If PID does not exist at all return None immediately.
|
|
|
| If *timeout* != None and process is still alive raise TimeoutExpired.
|
| timeout=0 is also possible (either return immediately or raise).
|
| """
|
| if pid <= 0:
|
|
|
| msg = "can't wait for PID 0"
|
| raise ValueError(msg)
|
| interval = 0.0001
|
| flags = 0
|
| if timeout is not None:
|
| flags |= os.WNOHANG
|
| stop_at = _timer() + timeout
|
|
|
| def sleep(interval):
|
|
|
| if timeout is not None:
|
| if _timer() >= stop_at:
|
| raise TimeoutExpired(timeout, pid=pid, name=proc_name)
|
| _sleep(interval)
|
| return _min(interval * 2, 0.04)
|
|
|
|
|
| while True:
|
| try:
|
| retpid, status = os.waitpid(pid, flags)
|
| except InterruptedError:
|
| interval = sleep(interval)
|
| except ChildProcessError:
|
|
|
|
|
|
|
|
|
|
|
|
|
| while _pid_exists(pid):
|
| interval = sleep(interval)
|
| return None
|
| else:
|
| if retpid == 0:
|
|
|
| interval = sleep(interval)
|
| continue
|
|
|
| if os.WIFEXITED(status):
|
|
|
|
|
|
|
| return os.WEXITSTATUS(status)
|
| elif os.WIFSIGNALED(status):
|
|
|
|
|
| return negsig_to_enum(-os.WTERMSIG(status))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| else:
|
|
|
| msg = f"unknown process exit status {status!r}"
|
| raise ValueError(msg)
|
|
|
|
|
| def disk_usage(path):
|
| """Return disk usage associated with path.
|
| Note: UNIX usually reserves 5% disk space which is not accessible
|
| by user. In this function "total" and "used" values reflect the
|
| total and used disk space whereas "free" and "percent" represent
|
| the "free" and "used percent" user disk space.
|
| """
|
| st = os.statvfs(path)
|
|
|
|
|
| total = st.f_blocks * st.f_frsize
|
|
|
| avail_to_root = st.f_bfree * st.f_frsize
|
|
|
| avail_to_user = st.f_bavail * st.f_frsize
|
|
|
| used = total - avail_to_root
|
| if MACOS:
|
|
|
| used = _psutil_osx.disk_usage_used(path, used)
|
|
|
|
|
| total_user = used + avail_to_user
|
|
|
|
|
|
|
| usage_percent_user = usage_percent(used, total_user, round_=1)
|
|
|
|
|
|
|
|
|
| return sdiskusage(
|
| total=total, used=used, free=avail_to_user, percent=usage_percent_user
|
| )
|
|
|
|
|
| @memoize
|
| def get_terminal_map():
|
| """Get a map of device-id -> path as a dict.
|
| Used by Process.terminal().
|
| """
|
| ret = {}
|
| ls = glob.glob('/dev/tty*') + glob.glob('/dev/pts/*')
|
| for name in ls:
|
| assert name not in ret, name
|
| try:
|
| ret[os.stat(name).st_rdev] = name
|
| except FileNotFoundError:
|
| pass
|
| return ret
|
|
|