|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""POSIX specific tests."""
|
|
|
|
|
|
import datetime
|
|
|
import errno
|
|
|
import os
|
|
|
import re
|
|
|
import shutil
|
|
|
import subprocess
|
|
|
import time
|
|
|
from unittest import mock
|
|
|
|
|
|
import psutil
|
|
|
from psutil import AIX
|
|
|
from psutil import BSD
|
|
|
from psutil import LINUX
|
|
|
from psutil import MACOS
|
|
|
from psutil import OPENBSD
|
|
|
from psutil import POSIX
|
|
|
from psutil import SUNOS
|
|
|
from psutil.tests import AARCH64
|
|
|
from psutil.tests import HAS_NET_IO_COUNTERS
|
|
|
from psutil.tests import PYTHON_EXE
|
|
|
from psutil.tests import PsutilTestCase
|
|
|
from psutil.tests import pytest
|
|
|
from psutil.tests import retry_on_failure
|
|
|
from psutil.tests import sh
|
|
|
from psutil.tests import skip_on_access_denied
|
|
|
from psutil.tests import spawn_testproc
|
|
|
from psutil.tests import terminate
|
|
|
|
|
|
|
|
|
if POSIX:
|
|
|
import mmap
|
|
|
import resource
|
|
|
|
|
|
from psutil._psutil_posix import getpagesize
|
|
|
|
|
|
|
|
|
def ps(fmt, pid=None):
    """Wrapper for calling the ps command with a little bit of
    cross-platform support for a narrow range of features.

    *fmt* is handed to ``ps -o``. When *pid* is given only that process
    is queried and a single value is returned; otherwise all processes
    are listed and a list with one value per output line is returned.
    Each value is an ``int`` when the stripped line parses as one,
    else the stripped string.
    """
    argv = ['ps']
    if LINUX:
        argv.append('--no-headers')

    # Process selection: a single pid, or the platform's "everything" flag.
    if pid is not None:
        argv += ['-p', str(pid)]
    elif SUNOS or AIX:
        argv.append('-A')
    else:
        argv.append('ax')

    if SUNOS:
        fmt = fmt.replace("start", "stime")
    argv += ['-o', fmt]

    rows = sh(argv).splitlines()
    if not LINUX:
        # Everywhere except Linux (where --no-headers was passed) the
        # first line is discarded; presumably it is the column header.
        rows = rows[1:]

    def convert(text):
        # Numeric columns become ints, anything else stays a string.
        text = text.strip()
        try:
            return int(text)
        except ValueError:
            return text

    values = [convert(row) for row in rows]
    return values if pid is None else values[0]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ps_name(pid):
    """Return the first whitespace-separated token of the command that
    ``ps`` reports for *pid* ("comm" column on SunOS, "command"
    elsewhere).
    """
    column = "comm" if SUNOS else "command"
    return ps(column, pid).split()[0]
|
|
|
|
|
|
|
|
|
def ps_args(pid):
    """Return the command line that ``ps`` reports for *pid* ("args"
    column on AIX and SunOS, "command" elsewhere), stripped of
    surrounding whitespace.
    """
    column = "args" if (AIX or SUNOS) else "command"
    raw = ps(column, pid)
    # Drop a trailing "(python...)" annotation if ps appended one.
    cleaned = re.sub(r"\(python.*?\)$", "", raw)
    return cleaned.strip()
|
|
|
|
|
|
|
|
|
def ps_rss(pid):
    """Return the value ``ps`` reports for the RSS column of *pid*
    ("rssize" on AIX, "rss" elsewhere).
    """
    column = "rssize" if AIX else "rss"
    return ps(column, pid)
|
|
|
|
|
|
|
|
|
def ps_vsz(pid):
    """Return the value ``ps`` reports for the virtual size column of
    *pid* ("vsize" on AIX, "vsz" elsewhere).
    """
    column = "vsize" if AIX else "vsz"
    return ps(column, pid)
|
|
|
|
|
|
|
|
|
def df(device):
    """Return ``(total, used, free, percent)`` for *device* as reported
    by ``df -k``.

    The second line of the output is parsed: columns 2-4 are multiplied
    by 1024 and column 5 has its ``%`` sign removed and is returned as
    a float.

    A ``RuntimeError`` from the command whose message contains
    "device busy" causes the calling test to be skipped; any other
    ``RuntimeError`` is re-raised.
    """
    try:
        # Pass an argv list (like ps() does) instead of interpolating the
        # device into a command string, so a device path containing
        # whitespace or shell metacharacters reaches df as one argument.
        out = sh(["df", "-k", device]).strip()
    except RuntimeError as err:
        if "device busy" in str(err).lower():
            raise pytest.skip("df returned EBUSY")
        raise
    line = out.split('\n')[1]
    fields = line.split()
    sys_total = int(fields[1]) * 1024
    sys_used = int(fields[2]) * 1024
    sys_free = int(fields[3]) * 1024
    sys_percent = float(fields[4].replace('%', ''))
    return (sys_total, sys_used, sys_free, sys_percent)
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not POSIX, reason="POSIX only")
class TestProcess(PsutilTestCase):
    """Compare psutil results against 'ps' command line utility (mainly)."""

    @classmethod
    def setUpClass(cls):
        # A single child interpreter is spawned for the whole class (stdin
        # attached to a pipe) and terminated in tearDownClass().
        cls.pid = spawn_testproc(
            [PYTHON_EXE, "-E", "-O"], stdin=subprocess.PIPE
        ).pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    def test_ppid(self):
        ppid_ps = ps('ppid', self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid()
        assert ppid_ps == ppid_psutil

    def test_uid(self):
        uid_ps = ps('uid', self.pid)
        uid_psutil = psutil.Process(self.pid).uids().real
        assert uid_ps == uid_psutil

    def test_gid(self):
        gid_ps = ps('rgid', self.pid)
        gid_psutil = psutil.Process(self.pid).gids().real
        assert gid_ps == gid_psutil

    def test_username(self):
        username_ps = ps('user', self.pid)
        username_psutil = psutil.Process(self.pid).username()
        assert username_ps == username_psutil

    def test_username_no_resolution(self):
        """When the uid cannot be resolved (getpwuid raises KeyError),
        username() is expected to return the real uid as a string.
        """
        p = psutil.Process()
        with mock.patch("psutil.pwd.getpwuid", side_effect=KeyError) as fun:
            assert p.username() == str(p.uids().real)
            assert fun.called

    @skip_on_access_denied()
    @retry_on_failure()
    def test_rss_memory(self):
        # Short pause before sampling; presumably lets the child settle so
        # that ps and psutil observe the same value.
        time.sleep(0.1)
        rss_ps = ps_rss(self.pid)
        rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
        assert rss_ps == rss_psutil

    @skip_on_access_denied()
    @retry_on_failure()
    def test_vsz_memory(self):
        # Short pause before sampling; presumably lets the child settle so
        # that ps and psutil observe the same value.
        time.sleep(0.1)
        vsz_ps = ps_vsz(self.pid)
        vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
        assert vsz_ps == vsz_psutil

    def test_name(self):
        name_ps = ps_name(self.pid)
        # Keep only the last path component of what ps printed.
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name().lower()
        # Strip digit-char-digit groups (e.g. the "3.9" in "python3.9")
        # from both names before comparing...
        name_ps = re.sub(r"\d.\d", "", name_ps)
        name_psutil = re.sub(r"\d.\d", "", name_psutil)
        # ...and then any remaining single digits.
        name_ps = re.sub(r"\d", "", name_ps)
        name_psutil = re.sub(r"\d", "", name_psutil)
        assert name_ps == name_psutil

    def test_name_long(self):
        """With the platform name() mocked to "long-program-name" and
        cmdline() starting with "long-program-name-extended", name() is
        expected to return the longer, cmdline-derived value.
        """
        name = "long-program-name"
        cmdline = ["long-program-name-extended", "foo", "bar"]
        with mock.patch("psutil._psplatform.Process.name", return_value=name):
            with mock.patch(
                "psutil._psplatform.Process.cmdline", return_value=cmdline
            ):
                p = psutil.Process()
                assert p.name() == "long-program-name-extended"

    def test_name_long_cmdline_ad_exc(self):
        """Same scenario as test_name_long, but cmdline() raises
        AccessDenied: name() is expected to fall back to the value
        returned by the platform name().
        """
        name = "long-program-name"
        with mock.patch("psutil._psplatform.Process.name", return_value=name):
            with mock.patch(
                "psutil._psplatform.Process.cmdline",
                side_effect=psutil.AccessDenied(0, ""),
            ):
                p = psutil.Process()
                assert p.name() == "long-program-name"

    def test_name_long_cmdline_nsp_exc(self):
        """Same scenario as test_name_long, but cmdline() raises
        NoSuchProcess: the exception is expected to propagate out of
        name().
        """
        name = "long-program-name"
        with mock.patch("psutil._psplatform.Process.name", return_value=name):
            with mock.patch(
                "psutil._psplatform.Process.cmdline",
                side_effect=psutil.NoSuchProcess(0, ""),
            ):
                p = psutil.Process()
                with pytest.raises(psutil.NoSuchProcess):
                    p.name()

    @pytest.mark.skipif(MACOS or BSD, reason="ps -o start not available")
    def test_create_time(self):
        time_ps = ps('start', self.pid)
        time_psutil = psutil.Process(self.pid).create_time()
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil
        ).strftime("%H:%M:%S")
        # Accept both the truncated and the rounded timestamp, so a
        # sub-second difference in how ps rounds does not fail the test.
        round_time_psutil = round(time_psutil)
        round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
            round_time_psutil
        ).strftime("%H:%M:%S")
        assert time_ps in {time_psutil_tstamp, round_time_psutil_tstamp}

    def test_exe(self):
        ps_pathname = ps_name(self.pid)
        psutil_pathname = psutil.Process(self.pid).exe()
        # An exact match is not required: one tool may report a more
        # specific path than the other (e.g. ".../python3.7" versus
        # ".../python"), so the shorter path only has to be a prefix of
        # the longer one. The previous fallback compared ps_pathname with
        # a slice of itself, which could never fail.
        assert psutil_pathname.startswith(
            ps_pathname
        ) or ps_pathname.startswith(psutil_pathname), (
            f"ps: {ps_pathname!r}, psutil: {psutil_pathname!r}"
        )

    @retry_on_failure()
    def test_cmdline(self):
        ps_cmdline = ps_args(self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
        if AARCH64 and len(ps_cmdline) < len(psutil_cmdline):
            # On AARCH64 a shorter ps value is accepted as long as it is
            # a prefix of what psutil returns.
            assert psutil_cmdline.startswith(ps_cmdline)
        else:
            assert ps_cmdline == psutil_cmdline

    @pytest.mark.skipif(SUNOS, reason="not reliable on SUNOS")
    @pytest.mark.skipif(AIX, reason="not reliable on AIX")
    def test_nice(self):
        ps_nice = ps('nice', self.pid)
        # Query the same process ps was asked about (the spawned child),
        # not the test runner itself.
        psutil_nice = psutil.Process(self.pid).nice()
        assert ps_nice == psutil_nice
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not POSIX, reason="POSIX only")
class TestSystemAPIs(PsutilTestCase):
    """Test some system APIs."""

    @retry_on_failure()
    def test_pids(self):
        """Compare the pid list from ``ps`` with psutil.pids()."""
        pids_ps = sorted(ps("pid"))
        pids_psutil = psutil.pids()

        # pid 0 is added to the ps list on macOS (always) and on OpenBSD
        # (when missing); presumably ps does not list it there.
        if MACOS or (OPENBSD and 0 not in pids_ps):
            pids_ps.insert(0, 0)

        # One extra pid on the ps side is tolerated (presumably the ps
        # process itself).
        if len(pids_ps) - len(pids_psutil) > 1:
            difference = [x for x in pids_psutil if x not in pids_ps] + [
                x for x in pids_ps if x not in pids_psutil
            ]
            raise self.fail("difference: " + str(difference))

    @pytest.mark.skipif(SUNOS, reason="unreliable on SUNOS")
    @pytest.mark.skipif(not shutil.which("ifconfig"), reason="no ifconfig cmd")
    @pytest.mark.skipif(not HAS_NET_IO_COUNTERS, reason="not supported")
    def test_nic_names(self):
        """Every NIC name psutil reports must be the start of some
        whitespace-separated token of the ``ifconfig -a`` output.
        """
        output = sh("ifconfig -a")
        for nic in psutil.net_io_counters(pernic=True):
            for line in output.split():
                if line.startswith(nic):
                    break
            else:
                raise self.fail(
                    f"couldn't find {nic} nic in 'ifconfig -a'"
                    f" output\n{output}"
                )

    @retry_on_failure()
    def test_users(self):
        """Compare user names and terminals against ``who -u``."""
        out = sh("who -u")
        if not out.strip():
            raise pytest.skip("no users on this system")
        lines = out.split('\n')
        users = [x.split()[0] for x in lines]
        terminals = [x.split()[1] for x in lines]
        # Take a single snapshot: calling psutil.users() again for the
        # loop could return a different list than the one whose length
        # was checked, if a user logs in or out in between.
        psutil_users = psutil.users()
        assert len(users) == len(psutil_users)
        with self.subTest(psutil=psutil_users, who=out):
            for idx, u in enumerate(psutil_users):
                assert u.name == users[idx]
                assert u.terminal == terminals[idx]
                if u.pid is not None:
                    # Only checks that a Process can be instantiated for
                    # the reported pid.
                    psutil.Process(u.pid)

    @retry_on_failure()
    def test_users_started(self):
        """Compare the login time of each user against ``who -u``."""
        out = sh("who -u")
        if not out.strip():
            raise pytest.skip("no users on this system")

        # (regex, strftime format, capitalize matches) tried in order;
        # the first regex that matches anything decides the format.
        candidates = (
            # e.g. "2014-04-12 12:34"
            (r"\d\d\d\d-\d\d-\d\d \d\d:\d\d", "%Y-%m-%d %H:%M", False),
            # e.g. "Apr 10 22:27"
            (r"[A-Z][a-z][a-z] \d\d \d\d:\d\d", "%b %d %H:%M", False),
            # e.g. "Apr 10"
            (r"[A-Z][a-z][a-z] \d\d", "%b %d", False),
            # e.g. "apr 10"; capitalized so it compares equal to "%b"
            (r"[a-z][a-z][a-z] \d\d", "%b %d", True),
        )
        tstamp = None
        started = []
        for pattern, fmt, capitalize in candidates:
            started = re.findall(pattern, out)
            if started:
                tstamp = fmt
                if capitalize:
                    started = [x.capitalize() for x in started]
                break

        if not tstamp:
            raise pytest.skip(f"cannot interpret tstamp in who output\n{out}")

        # Single snapshot so the subTest parameters describe exactly the
        # entries that are compared below.
        psutil_users = psutil.users()
        with self.subTest(psutil=psutil_users, who=out):
            for idx, u in enumerate(psutil_users):
                psutil_value = datetime.datetime.fromtimestamp(
                    u.started
                ).strftime(tstamp)
                assert psutil_value == started[idx]

    def test_pid_exists_let_raise(self):
        """With os.kill() mocked to raise OSError(EBADF), pid_exists()
        is expected to let the OSError propagate.
        """
        with mock.patch(
            "psutil._psposix.os.kill", side_effect=OSError(errno.EBADF, "")
        ) as m:
            with pytest.raises(OSError):
                psutil._psposix.pid_exists(os.getpid())
            assert m.called

    def test_os_waitpid_let_raise(self):
        """With os.waitpid() mocked to raise OSError(EBADF), wait_pid()
        is expected to let the OSError propagate.
        """
        with mock.patch(
            "psutil._psposix.os.waitpid", side_effect=OSError(errno.EBADF, "")
        ) as m:
            with pytest.raises(OSError):
                psutil._psposix.wait_pid(os.getpid())
            assert m.called

    def test_os_waitpid_eintr(self):
        """With os.waitpid() mocked to always raise OSError(EINTR),
        wait_pid() with a short timeout is expected to raise
        TimeoutExpired.
        """
        with mock.patch(
            "psutil._psposix.os.waitpid", side_effect=OSError(errno.EINTR, "")
        ) as m:
            with pytest.raises(psutil._psposix.TimeoutExpired):
                psutil._psposix.wait_pid(os.getpid(), timeout=0.01)
            assert m.called

    def test_os_waitpid_bad_ret_status(self):
        """With os.waitpid() mocked to return (1, -1), wait_pid() is
        expected to raise ValueError.
        """
        with mock.patch(
            "psutil._psposix.os.waitpid", return_value=(1, -1)
        ) as m:
            with pytest.raises(ValueError):
                psutil._psposix.wait_pid(os.getpid())
            assert m.called

    @pytest.mark.skipif(AIX, reason="unreliable on AIX")
    @retry_on_failure()
    def test_disk_usage(self):
        """Compare psutil.disk_usage() with ``df`` for each partition."""
        tolerance = 4 * 1024 * 1024  # 4 MiB, in bytes
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            try:
                sys_total, sys_used, sys_free, sys_percent = df(part.device)
            except RuntimeError as err:
                # Partitions for which df fails with one of these
                # messages are skipped; anything else is a real error.
                msg = str(err).lower()
                if (
                    "no such file or directory" in msg
                    or "raw devices not supported" in msg
                    or "permission denied" in msg
                ):
                    continue
                raise
            else:
                assert abs(usage.total - sys_total) < tolerance
                assert abs(usage.used - sys_used) < tolerance
                assert abs(usage.free - sys_free) < tolerance
                assert abs(usage.percent - sys_percent) <= 1
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not POSIX, reason="POSIX only")
class TestMisc(PsutilTestCase):
    """Miscellaneous POSIX-only checks."""

    def test_getpagesize(self):
        """psutil's getpagesize() must be positive and agree with both
        resource.getpagesize() and mmap.PAGESIZE.
        """
        size = getpagesize()
        assert size > 0
        assert size == resource.getpagesize()
        assert size == mmap.PAGESIZE
|
|
|
|