|
|
|
|
|
|
|
|
|
|
|
|
| """Tests for psutil.Process class."""
|
|
|
| import collections
|
| import contextlib
|
| import errno
|
| import getpass
|
| import io
|
| import itertools
|
| import os
|
| import signal
|
| import socket
|
| import stat
|
| import string
|
| import subprocess
|
| import sys
|
| import textwrap
|
| import time
|
| from unittest import mock
|
|
|
| import psutil
|
| from psutil import AIX
|
| from psutil import BSD
|
| from psutil import LINUX
|
| from psutil import MACOS
|
| from psutil import NETBSD
|
| from psutil import OPENBSD
|
| from psutil import OSX
|
| from psutil import POSIX
|
| from psutil import WINDOWS
|
| from psutil._common import open_text
|
| from psutil.tests import CI_TESTING
|
| from psutil.tests import GITHUB_ACTIONS
|
| from psutil.tests import GLOBAL_TIMEOUT
|
| from psutil.tests import HAS_CPU_AFFINITY
|
| from psutil.tests import HAS_ENVIRON
|
| from psutil.tests import HAS_IONICE
|
| from psutil.tests import HAS_MEMORY_MAPS
|
| from psutil.tests import HAS_PROC_CPU_NUM
|
| from psutil.tests import HAS_PROC_IO_COUNTERS
|
| from psutil.tests import HAS_RLIMIT
|
| from psutil.tests import HAS_THREADS
|
| from psutil.tests import MACOS_11PLUS
|
| from psutil.tests import PYPY
|
| from psutil.tests import PYTHON_EXE
|
| from psutil.tests import PYTHON_EXE_ENV
|
| from psutil.tests import PsutilTestCase
|
| from psutil.tests import ThreadTask
|
| from psutil.tests import call_until
|
| from psutil.tests import copyload_shared_lib
|
| from psutil.tests import create_c_exe
|
| from psutil.tests import create_py_exe
|
| from psutil.tests import process_namespace
|
| from psutil.tests import pytest
|
| from psutil.tests import reap_children
|
| from psutil.tests import retry_on_failure
|
| from psutil.tests import sh
|
| from psutil.tests import skip_on_access_denied
|
| from psutil.tests import skip_on_not_implemented
|
| from psutil.tests import wait_for_pid
|
|
|
|
|
|
|
|
|
|
|
|
|
| class TestProcess(PsutilTestCase):
|
| """Tests for psutil.Process class."""
|
|
|
| def test_pid(self):
|
| p = psutil.Process()
|
| assert p.pid == os.getpid()
|
| with pytest.raises(AttributeError):
|
| p.pid = 33
|
|
|
| def test_kill(self):
|
| p = self.spawn_psproc()
|
| p.kill()
|
| code = p.wait()
|
| if WINDOWS:
|
| assert code == signal.SIGTERM
|
| else:
|
| assert code == -signal.SIGKILL
|
| self.assert_proc_gone(p)
|
|
|
| def test_terminate(self):
|
| p = self.spawn_psproc()
|
| p.terminate()
|
| code = p.wait()
|
| if WINDOWS:
|
| assert code == signal.SIGTERM
|
| else:
|
| assert code == -signal.SIGTERM
|
| self.assert_proc_gone(p)
|
|
|
| def test_send_signal(self):
|
| sig = signal.SIGKILL if POSIX else signal.SIGTERM
|
| p = self.spawn_psproc()
|
| p.send_signal(sig)
|
| code = p.wait()
|
| if WINDOWS:
|
| assert code == sig
|
| else:
|
| assert code == -sig
|
| self.assert_proc_gone(p)
|
|
|
| @pytest.mark.skipif(not POSIX, reason="not POSIX")
|
| def test_send_signal_mocked(self):
|
| sig = signal.SIGTERM
|
| p = self.spawn_psproc()
|
| with mock.patch('psutil.os.kill', side_effect=ProcessLookupError):
|
| with pytest.raises(psutil.NoSuchProcess):
|
| p.send_signal(sig)
|
|
|
| p = self.spawn_psproc()
|
| with mock.patch('psutil.os.kill', side_effect=PermissionError):
|
| with pytest.raises(psutil.AccessDenied):
|
| p.send_signal(sig)
|
|
|
| def test_wait_exited(self):
|
|
|
|
|
| cmd = [PYTHON_EXE, "-c", "pass"]
|
| p = self.spawn_psproc(cmd)
|
| code = p.wait()
|
| assert code == 0
|
| self.assert_proc_gone(p)
|
|
|
| cmd = [PYTHON_EXE, "-c", "1 / 0"]
|
| p = self.spawn_psproc(cmd, stderr=subprocess.PIPE)
|
| code = p.wait()
|
| assert code == 1
|
| self.assert_proc_gone(p)
|
|
|
| cmd = [PYTHON_EXE, "-c", "import sys; sys.exit(5);"]
|
| p = self.spawn_psproc(cmd)
|
| code = p.wait()
|
| assert code == 5
|
| self.assert_proc_gone(p)
|
|
|
| cmd = [PYTHON_EXE, "-c", "import os; os._exit(5);"]
|
| p = self.spawn_psproc(cmd)
|
| code = p.wait()
|
| assert code == 5
|
| self.assert_proc_gone(p)
|
|
|
| @pytest.mark.skipif(NETBSD, reason="fails on NETBSD")
|
| def test_wait_stopped(self):
|
| p = self.spawn_psproc()
|
| if POSIX:
|
|
|
|
|
| p.send_signal(signal.SIGSTOP)
|
| with pytest.raises(psutil.TimeoutExpired):
|
| p.wait(timeout=0.001)
|
| p.send_signal(signal.SIGCONT)
|
| with pytest.raises(psutil.TimeoutExpired):
|
| p.wait(timeout=0.001)
|
| p.send_signal(signal.SIGTERM)
|
| assert p.wait() == -signal.SIGTERM
|
| assert p.wait() == -signal.SIGTERM
|
| else:
|
| p.suspend()
|
| with pytest.raises(psutil.TimeoutExpired):
|
| p.wait(timeout=0.001)
|
| p.resume()
|
| with pytest.raises(psutil.TimeoutExpired):
|
| p.wait(timeout=0.001)
|
| p.terminate()
|
| assert p.wait() == signal.SIGTERM
|
| assert p.wait() == signal.SIGTERM
|
|
|
| def test_wait_non_children(self):
|
|
|
|
|
| child, grandchild = self.spawn_children_pair()
|
| with pytest.raises(psutil.TimeoutExpired):
|
| child.wait(0.01)
|
| with pytest.raises(psutil.TimeoutExpired):
|
| grandchild.wait(0.01)
|
|
|
|
|
| child.terminate()
|
| grandchild.terminate()
|
| child_ret = child.wait()
|
| grandchild_ret = grandchild.wait()
|
| if POSIX:
|
| assert child_ret == -signal.SIGTERM
|
|
|
|
|
| assert grandchild_ret is None
|
| else:
|
| assert child_ret == signal.SIGTERM
|
| assert child_ret == signal.SIGTERM
|
|
|
| def test_wait_timeout(self):
|
| p = self.spawn_psproc()
|
| p.name()
|
| with pytest.raises(psutil.TimeoutExpired):
|
| p.wait(0.01)
|
| with pytest.raises(psutil.TimeoutExpired):
|
| p.wait(0)
|
| with pytest.raises(ValueError):
|
| p.wait(-1)
|
|
|
| def test_wait_timeout_nonblocking(self):
|
| p = self.spawn_psproc()
|
| with pytest.raises(psutil.TimeoutExpired):
|
| p.wait(0)
|
| p.kill()
|
| stop_at = time.time() + GLOBAL_TIMEOUT
|
| while time.time() < stop_at:
|
| try:
|
| code = p.wait(0)
|
| break
|
| except psutil.TimeoutExpired:
|
| pass
|
| else:
|
| return pytest.fail('timeout')
|
| if POSIX:
|
| assert code == -signal.SIGKILL
|
| else:
|
| assert code == signal.SIGTERM
|
| self.assert_proc_gone(p)
|
|
|
| def test_cpu_percent(self):
|
| p = psutil.Process()
|
| p.cpu_percent(interval=0.001)
|
| p.cpu_percent(interval=0.001)
|
| for _ in range(100):
|
| percent = p.cpu_percent(interval=None)
|
| assert isinstance(percent, float)
|
| assert percent >= 0.0
|
| with pytest.raises(ValueError):
|
| p.cpu_percent(interval=-1)
|
|
|
| def test_cpu_percent_numcpus_none(self):
|
|
|
| with mock.patch('psutil.cpu_count', return_value=None) as m:
|
| psutil.Process().cpu_percent()
|
| assert m.called
|
|
|
| def test_cpu_times(self):
|
| times = psutil.Process().cpu_times()
|
| assert times.user >= 0.0, times
|
| assert times.system >= 0.0, times
|
| assert times.children_user >= 0.0, times
|
| assert times.children_system >= 0.0, times
|
| if LINUX:
|
| assert times.iowait >= 0.0, times
|
|
|
| for name in times._fields:
|
| time.strftime("%H:%M:%S", time.localtime(getattr(times, name)))
|
|
|
| @pytest.mark.skipif(not HAS_PROC_CPU_NUM, reason="not supported")
|
| def test_cpu_num(self):
|
| p = psutil.Process()
|
| num = p.cpu_num()
|
| assert num >= 0
|
| if psutil.cpu_count() == 1:
|
| assert num == 0
|
| assert p.cpu_num() in range(psutil.cpu_count())
|
|
|
| def test_create_time(self):
|
| p = self.spawn_psproc()
|
| now = time.time()
|
|
|
| assert abs(p.create_time() - now) < 2
|
|
|
| time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time()))
|
|
|
| @pytest.mark.skipif(not POSIX, reason="POSIX only")
|
| def test_terminal(self):
|
| terminal = psutil.Process().terminal()
|
| if terminal is not None:
|
| try:
|
| tty = os.path.realpath(sh('tty'))
|
| except RuntimeError:
|
|
|
| return pytest.skip("can't rely on `tty` CLI")
|
| else:
|
| assert terminal == tty
|
|
|
| @pytest.mark.skipif(not HAS_PROC_IO_COUNTERS, reason="not supported")
|
| @skip_on_not_implemented(only_if=LINUX)
|
| def test_io_counters(self):
|
| p = psutil.Process()
|
|
|
| io1 = p.io_counters()
|
| with open(PYTHON_EXE, 'rb') as f:
|
| f.read()
|
| io2 = p.io_counters()
|
| if not BSD and not AIX:
|
| assert io2.read_count > io1.read_count
|
| assert io2.write_count == io1.write_count
|
| if LINUX:
|
| assert io2.read_chars > io1.read_chars
|
| assert io2.write_chars == io1.write_chars
|
| else:
|
| assert io2.read_bytes >= io1.read_bytes
|
| assert io2.write_bytes >= io1.write_bytes
|
|
|
|
|
| io1 = p.io_counters()
|
| with open(self.get_testfn(), 'wb') as f:
|
| f.write(bytes("x" * 1000000, 'ascii'))
|
| io2 = p.io_counters()
|
| assert io2.write_count >= io1.write_count
|
| assert io2.write_bytes >= io1.write_bytes
|
| assert io2.read_count >= io1.read_count
|
| assert io2.read_bytes >= io1.read_bytes
|
| if LINUX:
|
| assert io2.write_chars > io1.write_chars
|
| assert io2.read_chars >= io1.read_chars
|
|
|
|
|
| for i in range(len(io2)):
|
| if BSD and i >= 2:
|
|
|
| continue
|
| assert io2[i] >= 0
|
| assert io2[i] >= 0
|
|
|
| @pytest.mark.skipif(not HAS_IONICE, reason="not supported")
|
| @pytest.mark.skipif(not LINUX, reason="linux only")
|
| def test_ionice_linux(self):
|
| def cleanup(init):
|
| ioclass, value = init
|
| if ioclass == psutil.IOPRIO_CLASS_NONE:
|
| value = 0
|
| p.ionice(ioclass, value)
|
|
|
| p = psutil.Process()
|
| if not CI_TESTING:
|
| assert p.ionice()[0] == psutil.IOPRIO_CLASS_NONE
|
| assert psutil.IOPRIO_CLASS_NONE == 0
|
| assert psutil.IOPRIO_CLASS_RT == 1
|
| assert psutil.IOPRIO_CLASS_BE == 2
|
| assert psutil.IOPRIO_CLASS_IDLE == 3
|
| init = p.ionice()
|
| self.addCleanup(cleanup, init)
|
|
|
|
|
| p.ionice(psutil.IOPRIO_CLASS_IDLE)
|
| assert tuple(p.ionice()) == (psutil.IOPRIO_CLASS_IDLE, 0)
|
| with pytest.raises(ValueError):
|
| p.ionice(psutil.IOPRIO_CLASS_IDLE, value=7)
|
|
|
| p.ionice(psutil.IOPRIO_CLASS_BE)
|
| assert tuple(p.ionice()) == (psutil.IOPRIO_CLASS_BE, 0)
|
| p.ionice(psutil.IOPRIO_CLASS_BE, value=7)
|
| assert tuple(p.ionice()) == (psutil.IOPRIO_CLASS_BE, 7)
|
| with pytest.raises(ValueError):
|
| p.ionice(psutil.IOPRIO_CLASS_BE, value=8)
|
| try:
|
| p.ionice(psutil.IOPRIO_CLASS_RT, value=7)
|
| except psutil.AccessDenied:
|
| pass
|
|
|
| with pytest.raises(ValueError, match="ioclass accepts no value"):
|
| p.ionice(psutil.IOPRIO_CLASS_NONE, 1)
|
| with pytest.raises(ValueError, match="ioclass accepts no value"):
|
| p.ionice(psutil.IOPRIO_CLASS_IDLE, 1)
|
| with pytest.raises(
|
| ValueError, match="'ioclass' argument must be specified"
|
| ):
|
| p.ionice(value=1)
|
|
|
| @pytest.mark.skipif(not HAS_IONICE, reason="not supported")
|
| @pytest.mark.skipif(
|
| not WINDOWS, reason="not supported on this win version"
|
| )
|
| def test_ionice_win(self):
|
| p = psutil.Process()
|
| if not CI_TESTING:
|
| assert p.ionice() == psutil.IOPRIO_NORMAL
|
| init = p.ionice()
|
| self.addCleanup(p.ionice, init)
|
|
|
|
|
| p.ionice(psutil.IOPRIO_VERYLOW)
|
| assert p.ionice() == psutil.IOPRIO_VERYLOW
|
| p.ionice(psutil.IOPRIO_LOW)
|
| assert p.ionice() == psutil.IOPRIO_LOW
|
| try:
|
| p.ionice(psutil.IOPRIO_HIGH)
|
| except psutil.AccessDenied:
|
| pass
|
| else:
|
| assert p.ionice() == psutil.IOPRIO_HIGH
|
|
|
| with pytest.raises(
|
| TypeError, match="value argument not accepted on Windows"
|
| ):
|
| p.ionice(psutil.IOPRIO_NORMAL, value=1)
|
| with pytest.raises(ValueError, match="is not a valid priority"):
|
| p.ionice(psutil.IOPRIO_HIGH + 1)
|
|
|
| @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported")
|
| def test_rlimit_get(self):
|
| import resource
|
|
|
| p = psutil.Process(os.getpid())
|
| names = [x for x in dir(psutil) if x.startswith('RLIMIT')]
|
| assert names, names
|
| for name in names:
|
| value = getattr(psutil, name)
|
| assert value >= 0
|
| if name in dir(resource):
|
| assert value == getattr(resource, name)
|
|
|
|
|
|
|
| if PYPY:
|
| continue
|
| assert p.rlimit(value) == resource.getrlimit(value)
|
| else:
|
| ret = p.rlimit(value)
|
| assert len(ret) == 2
|
| assert ret[0] >= -1
|
| assert ret[1] >= -1
|
|
|
| @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported")
|
| def test_rlimit_set(self):
|
| p = self.spawn_psproc()
|
| p.rlimit(psutil.RLIMIT_NOFILE, (5, 5))
|
| assert p.rlimit(psutil.RLIMIT_NOFILE) == (5, 5)
|
|
|
|
|
| if LINUX:
|
| with pytest.raises(ValueError, match="can't use prlimit"):
|
| psutil._psplatform.Process(0).rlimit(0)
|
| with pytest.raises(ValueError):
|
| p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5))
|
|
|
| @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported")
|
| def test_rlimit(self):
|
| p = psutil.Process()
|
| testfn = self.get_testfn()
|
| soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
|
| try:
|
| p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
|
| with open(testfn, "wb") as f:
|
| f.write(b"X" * 1024)
|
|
|
|
|
| with pytest.raises(OSError) as exc:
|
| with open(testfn, "wb") as f:
|
| f.write(b"X" * 1025)
|
| assert exc.value.errno == errno.EFBIG
|
| finally:
|
| p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
|
| assert p.rlimit(psutil.RLIMIT_FSIZE) == (soft, hard)
|
|
|
| @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported")
|
| def test_rlimit_infinity(self):
|
|
|
|
|
| p = psutil.Process()
|
| soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
|
| try:
|
| p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
|
| p.rlimit(psutil.RLIMIT_FSIZE, (psutil.RLIM_INFINITY, hard))
|
| with open(self.get_testfn(), "wb") as f:
|
| f.write(b"X" * 2048)
|
| finally:
|
| p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
|
| assert p.rlimit(psutil.RLIMIT_FSIZE) == (soft, hard)
|
|
|
| @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported")
|
| def test_rlimit_infinity_value(self):
|
|
|
|
|
|
|
|
|
|
|
| p = psutil.Process()
|
| soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
|
| assert hard == psutil.RLIM_INFINITY
|
| p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
|
|
|
| @pytest.mark.xdist_group(name="serial")
|
| def test_num_threads(self):
|
|
|
|
|
|
|
| p = psutil.Process()
|
| if OPENBSD:
|
| try:
|
| step1 = p.num_threads()
|
| except psutil.AccessDenied:
|
| return pytest.skip("on OpenBSD this requires root access")
|
| else:
|
| step1 = p.num_threads()
|
|
|
| with ThreadTask():
|
| step2 = p.num_threads()
|
| assert step2 == step1 + 1
|
|
|
| @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
|
| def test_num_handles(self):
|
|
|
| p = psutil.Process()
|
| assert p.num_handles() > 0
|
|
|
| @pytest.mark.skipif(not HAS_THREADS, reason="not supported")
|
| def test_threads(self):
|
| p = psutil.Process()
|
| if OPENBSD:
|
| try:
|
| step1 = p.threads()
|
| except psutil.AccessDenied:
|
| return pytest.skip("on OpenBSD this requires root access")
|
| else:
|
| step1 = p.threads()
|
|
|
| with ThreadTask():
|
| step2 = p.threads()
|
| assert len(step2) == len(step1) + 1
|
| athread = step2[0]
|
|
|
| assert athread.id == athread[0]
|
| assert athread.user_time == athread[1]
|
| assert athread.system_time == athread[2]
|
|
|
| @retry_on_failure()
|
| @skip_on_access_denied(only_if=MACOS)
|
| @pytest.mark.skipif(not HAS_THREADS, reason="not supported")
|
| def test_threads_2(self):
|
| p = self.spawn_psproc()
|
| if OPENBSD:
|
| try:
|
| p.threads()
|
| except psutil.AccessDenied:
|
| return pytest.skip("on OpenBSD this requires root access")
|
| assert (
|
| abs(p.cpu_times().user - sum(x.user_time for x in p.threads()))
|
| < 0.1
|
| )
|
| assert (
|
| abs(p.cpu_times().system - sum(x.system_time for x in p.threads()))
|
| < 0.1
|
| )
|
|
|
| @retry_on_failure()
|
| def test_memory_info(self):
|
| p = psutil.Process()
|
|
|
|
|
| rss1, vms1 = p.memory_info()[:2]
|
| percent1 = p.memory_percent()
|
| assert rss1 > 0
|
| assert vms1 > 0
|
|
|
|
|
| memarr = [None] * 1500000
|
|
|
| rss2, vms2 = p.memory_info()[:2]
|
| percent2 = p.memory_percent()
|
|
|
|
|
| assert rss2 > rss1
|
| assert vms2 >= vms1
|
| assert percent2 > percent1
|
| del memarr
|
|
|
| if WINDOWS:
|
| mem = p.memory_info()
|
| assert mem.rss == mem.wset
|
| assert mem.vms == mem.pagefile
|
|
|
| mem = p.memory_info()
|
| for name in mem._fields:
|
| assert getattr(mem, name) >= 0
|
|
|
| def test_memory_full_info(self):
|
| p = psutil.Process()
|
| total = psutil.virtual_memory().total
|
| mem = p.memory_full_info()
|
| for name in mem._fields:
|
| value = getattr(mem, name)
|
| assert value >= 0
|
| if (name == "vms" and OSX) or LINUX:
|
| continue
|
| assert value <= total
|
| if LINUX or WINDOWS or MACOS:
|
| assert mem.uss >= 0
|
| if LINUX:
|
| assert mem.pss >= 0
|
| assert mem.swap >= 0
|
|
|
| @pytest.mark.skipif(not HAS_MEMORY_MAPS, reason="not supported")
|
| def test_memory_maps(self):
|
| p = psutil.Process()
|
| maps = p.memory_maps()
|
| assert len(maps) == len(set(maps))
|
| ext_maps = p.memory_maps(grouped=False)
|
|
|
| for nt in maps:
|
| if nt.path.startswith('['):
|
| continue
|
| if BSD and nt.path == "pvclock":
|
| continue
|
| assert os.path.isabs(nt.path), nt.path
|
|
|
| if POSIX:
|
| try:
|
| assert os.path.exists(nt.path) or os.path.islink(
|
| nt.path
|
| ), nt.path
|
| except AssertionError:
|
| if not LINUX:
|
| raise
|
|
|
| with open_text('/proc/self/smaps') as f:
|
| data = f.read()
|
| if f"{nt.path} (deleted)" not in data:
|
| raise
|
| elif '64' not in os.path.basename(nt.path):
|
|
|
|
|
|
|
| try:
|
| st = os.stat(nt.path)
|
| except FileNotFoundError:
|
| pass
|
| else:
|
| assert stat.S_ISREG(st.st_mode), nt.path
|
|
|
| for nt in ext_maps:
|
| for fname in nt._fields:
|
| value = getattr(nt, fname)
|
| if fname == 'path':
|
| continue
|
| if fname in {'addr', 'perms'}:
|
| assert value, value
|
| else:
|
| assert isinstance(value, int)
|
| assert value >= 0, value
|
|
|
| @pytest.mark.skipif(not HAS_MEMORY_MAPS, reason="not supported")
|
| def test_memory_maps_lists_lib(self):
|
|
|
| p = psutil.Process()
|
| with copyload_shared_lib() as path:
|
|
|
| def normpath(p):
|
| return os.path.realpath(os.path.normcase(p))
|
|
|
| libpaths = [normpath(x.path) for x in p.memory_maps()]
|
| assert normpath(path) in libpaths
|
|
|
| def test_memory_percent(self):
|
| p = psutil.Process()
|
| p.memory_percent()
|
| with pytest.raises(ValueError):
|
| p.memory_percent(memtype="?!?")
|
| if LINUX or MACOS or WINDOWS:
|
| p.memory_percent(memtype='uss')
|
|
|
| def test_is_running(self):
|
| p = self.spawn_psproc()
|
| assert p.is_running()
|
| assert p.is_running()
|
| p.kill()
|
| p.wait()
|
| assert not p.is_running()
|
| assert not p.is_running()
|
|
|
| def test_exe(self):
|
| p = self.spawn_psproc()
|
| exe = p.exe()
|
| try:
|
| assert exe == PYTHON_EXE
|
| except AssertionError:
|
| if WINDOWS and len(exe) == len(PYTHON_EXE):
|
|
|
| normcase = os.path.normcase
|
| assert normcase(exe) == normcase(PYTHON_EXE)
|
| else:
|
|
|
|
|
|
|
|
|
|
|
|
|
| ver = f"{sys.version_info[0]}.{sys.version_info[1]}"
|
| try:
|
| assert exe.replace(ver, '') == PYTHON_EXE.replace(ver, '')
|
| except AssertionError:
|
|
|
| pass
|
|
|
| out = sh([exe, "-c", "import os; print('hey')"])
|
| assert out == 'hey'
|
|
|
| def test_cmdline(self):
|
| cmdline = [
|
| PYTHON_EXE,
|
| "-c",
|
| "import time; [time.sleep(0.1) for x in range(100)]",
|
| ]
|
| p = self.spawn_psproc(cmdline)
|
|
|
| if NETBSD and p.cmdline() == []:
|
|
|
| return pytest.skip("OPENBSD: returned EBUSY")
|
|
|
|
|
|
|
|
|
|
|
|
|
| if NETBSD or OPENBSD or AIX:
|
| assert p.cmdline()[0] == PYTHON_EXE
|
| else:
|
| if MACOS and CI_TESTING:
|
| pyexe = p.cmdline()[0]
|
| if pyexe != PYTHON_EXE:
|
| assert ' '.join(p.cmdline()[1:]) == ' '.join(cmdline[1:])
|
| return None
|
| assert ' '.join(p.cmdline()) == ' '.join(cmdline)
|
|
|
| def test_long_cmdline(self):
|
| cmdline = [PYTHON_EXE]
|
| cmdline.extend(["-v"] * 50)
|
| cmdline.extend(
|
| ["-c", "import time; [time.sleep(0.1) for x in range(100)]"]
|
| )
|
| p = self.spawn_psproc(cmdline)
|
|
|
|
|
|
|
| cmdline = cmdline[1:]
|
|
|
| if OPENBSD:
|
|
|
|
|
| try:
|
| assert p.cmdline()[1:] == cmdline
|
| except psutil.ZombieProcess:
|
| return pytest.skip("OPENBSD: process turned into zombie")
|
| else:
|
| ret = p.cmdline()[1:]
|
| if NETBSD and ret == []:
|
|
|
| return pytest.skip("OPENBSD: returned EBUSY")
|
| assert ret == cmdline
|
|
|
| def test_name(self):
|
| p = self.spawn_psproc()
|
| name = p.name().lower()
|
| if name.endswith("t"):
|
| name = name[:-1]
|
| pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
|
| assert pyexe.startswith(name), (pyexe, name)
|
|
|
| @retry_on_failure()
|
| def test_long_name(self):
|
| pyexe = create_py_exe(self.get_testfn(suffix=string.digits * 2))
|
| cmdline = [
|
| pyexe,
|
| "-c",
|
| "import time; [time.sleep(0.1) for x in range(100)]",
|
| ]
|
| p = self.spawn_psproc(cmdline)
|
| if OPENBSD:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| try:
|
| assert p.name() == os.path.basename(pyexe)
|
| except AssertionError:
|
| if p.status() == psutil.STATUS_ZOMBIE:
|
| assert os.path.basename(pyexe).startswith(p.name())
|
| else:
|
| raise
|
| else:
|
| assert p.name() == os.path.basename(pyexe)
|
|
|
| @pytest.mark.skipif(not POSIX, reason="POSIX only")
|
| def test_uids(self):
|
| p = psutil.Process()
|
| real, effective, _saved = p.uids()
|
|
|
| assert real == os.getuid()
|
|
|
| assert effective == os.geteuid()
|
|
|
|
|
| if hasattr(os, "getresuid"):
|
| assert os.getresuid() == p.uids()
|
|
|
| @pytest.mark.skipif(not POSIX, reason="POSIX only")
|
| def test_gids(self):
|
| p = psutil.Process()
|
| real, effective, _saved = p.gids()
|
|
|
| assert real == os.getgid()
|
|
|
| assert effective == os.getegid()
|
|
|
|
|
| if hasattr(os, "getresuid"):
|
| assert os.getresgid() == p.gids()
|
|
|
| def test_nice(self):
|
| def cleanup(init):
|
| try:
|
| p.nice(init)
|
| except psutil.AccessDenied:
|
| pass
|
|
|
| p = psutil.Process()
|
| with pytest.raises(TypeError):
|
| p.nice("str")
|
| init = p.nice()
|
| self.addCleanup(cleanup, init)
|
|
|
| if WINDOWS:
|
| highest_prio = None
|
| for prio in [
|
| psutil.IDLE_PRIORITY_CLASS,
|
| psutil.BELOW_NORMAL_PRIORITY_CLASS,
|
| psutil.NORMAL_PRIORITY_CLASS,
|
| psutil.ABOVE_NORMAL_PRIORITY_CLASS,
|
| psutil.HIGH_PRIORITY_CLASS,
|
| psutil.REALTIME_PRIORITY_CLASS,
|
| ]:
|
| with self.subTest(prio=prio):
|
| try:
|
| p.nice(prio)
|
| except psutil.AccessDenied:
|
| pass
|
| else:
|
| new_prio = p.nice()
|
|
|
|
|
|
|
|
|
| if prio in {
|
| psutil.ABOVE_NORMAL_PRIORITY_CLASS,
|
| psutil.HIGH_PRIORITY_CLASS,
|
| psutil.REALTIME_PRIORITY_CLASS,
|
| }:
|
| if new_prio == prio or highest_prio is None:
|
| highest_prio = prio
|
| assert new_prio == highest_prio
|
| else:
|
| assert new_prio == prio
|
| else:
|
| try:
|
| if hasattr(os, "getpriority"):
|
| assert (
|
| os.getpriority(os.PRIO_PROCESS, os.getpid())
|
| == p.nice()
|
| )
|
| p.nice(1)
|
| assert p.nice() == 1
|
| if hasattr(os, "getpriority"):
|
| assert (
|
| os.getpriority(os.PRIO_PROCESS, os.getpid())
|
| == p.nice()
|
| )
|
|
|
|
|
| if not MACOS:
|
| p.nice(0)
|
| assert p.nice() == 0
|
| except psutil.AccessDenied:
|
| pass
|
|
|
| def test_status(self):
|
| p = psutil.Process()
|
| assert p.status() == psutil.STATUS_RUNNING
|
|
|
| def test_username(self):
|
| p = self.spawn_psproc()
|
| username = p.username()
|
| if WINDOWS:
|
| domain, username = username.split('\\')
|
| getpass_user = getpass.getuser()
|
| if getpass_user.endswith('$'):
|
|
|
|
|
|
|
| return pytest.skip('running as service account')
|
| assert username == getpass_user
|
| if 'USERDOMAIN' in os.environ:
|
| assert domain == os.environ['USERDOMAIN']
|
| else:
|
| assert username == getpass.getuser()
|
|
|
| def test_cwd(self):
|
| p = self.spawn_psproc()
|
| assert p.cwd() == os.getcwd()
|
|
|
| def test_cwd_2(self):
|
| cmd = [
|
| PYTHON_EXE,
|
| "-c",
|
| (
|
| "import os, time; os.chdir('..'); [time.sleep(0.1) for x in"
|
| " range(100)]"
|
| ),
|
| ]
|
| p = self.spawn_psproc(cmd)
|
| call_until(lambda: p.cwd() == os.path.dirname(os.getcwd()))
|
|
|
| @pytest.mark.skipif(not HAS_CPU_AFFINITY, reason="not supported")
|
| def test_cpu_affinity(self):
|
| p = psutil.Process()
|
| initial = p.cpu_affinity()
|
| assert initial, initial
|
| self.addCleanup(p.cpu_affinity, initial)
|
|
|
| if hasattr(os, "sched_getaffinity"):
|
| assert initial == list(os.sched_getaffinity(p.pid))
|
| assert len(initial) == len(set(initial))
|
|
|
| all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
|
| for n in all_cpus:
|
| p.cpu_affinity([n])
|
| assert p.cpu_affinity() == [n]
|
| if hasattr(os, "sched_getaffinity"):
|
| assert p.cpu_affinity() == list(os.sched_getaffinity(p.pid))
|
|
|
| if hasattr(p, "num_cpu"):
|
| assert p.cpu_affinity()[0] == p.num_cpu()
|
|
|
|
|
|
|
|
|
| p.cpu_affinity([])
|
| if LINUX:
|
| assert p.cpu_affinity() == p._proc._get_eligible_cpus()
|
| else:
|
| assert p.cpu_affinity() == all_cpus
|
| if hasattr(os, "sched_getaffinity"):
|
| assert p.cpu_affinity() == list(os.sched_getaffinity(p.pid))
|
|
|
| with pytest.raises(TypeError):
|
| p.cpu_affinity(1)
|
| p.cpu_affinity(initial)
|
|
|
| p.cpu_affinity(set(all_cpus))
|
| p.cpu_affinity(tuple(all_cpus))
|
|
|
| @pytest.mark.skipif(not HAS_CPU_AFFINITY, reason="not supported")
|
| def test_cpu_affinity_errs(self):
|
| p = self.spawn_psproc()
|
| invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
|
| with pytest.raises(ValueError):
|
| p.cpu_affinity(invalid_cpu)
|
| with pytest.raises(ValueError):
|
| p.cpu_affinity(range(10000, 11000))
|
| with pytest.raises((TypeError, ValueError)):
|
| p.cpu_affinity([0, "1"])
|
| with pytest.raises(ValueError):
|
| p.cpu_affinity([0, -1])
|
|
|
| @pytest.mark.skipif(not HAS_CPU_AFFINITY, reason="not supported")
|
| def test_cpu_affinity_all_combinations(self):
|
| p = psutil.Process()
|
| initial = p.cpu_affinity()
|
| assert initial, initial
|
| self.addCleanup(p.cpu_affinity, initial)
|
|
|
|
|
| if len(initial) > 12:
|
| initial = initial[:12]
|
| combos = []
|
| for i in range(len(initial) + 1):
|
| combos.extend(
|
| list(subset)
|
| for subset in itertools.combinations(initial, i)
|
| if subset
|
| )
|
|
|
| for combo in combos:
|
| p.cpu_affinity(combo)
|
| assert sorted(p.cpu_affinity()) == sorted(combo)
|
|
|
|
|
| @pytest.mark.skipif(BSD, reason="broken on BSD")
|
| def test_open_files(self):
|
| p = psutil.Process()
|
| testfn = self.get_testfn()
|
| files = p.open_files()
|
| assert testfn not in files
|
| with open(testfn, 'wb') as f:
|
| f.write(b'x' * 1024)
|
| f.flush()
|
|
|
| call_until(lambda: len(p.open_files()) != len(files))
|
| files = p.open_files()
|
| filenames = [os.path.normcase(x.path) for x in files]
|
| assert os.path.normcase(testfn) in filenames
|
| if LINUX:
|
| for file in files:
|
| if file.path == testfn:
|
| assert file.position == 1024
|
| for file in files:
|
| assert os.path.isfile(file.path), file
|
|
|
|
|
| cmdline = (
|
| f"import time; f = open(r'{testfn}', 'r'); [time.sleep(0.1) for x"
|
| " in range(100)];"
|
| )
|
| p = self.spawn_psproc([PYTHON_EXE, "-c", cmdline])
|
|
|
| for x in range(100):
|
| filenames = [os.path.normcase(x.path) for x in p.open_files()]
|
| if testfn in filenames:
|
| break
|
| time.sleep(0.01)
|
| else:
|
| assert os.path.normcase(testfn) in filenames
|
| for file in filenames:
|
| assert os.path.isfile(file), file
|
|
|
|
|
| @pytest.mark.skipif(BSD, reason="broken on BSD")
|
| def test_open_files_2(self):
|
|
|
| p = psutil.Process()
|
| normcase = os.path.normcase
|
| testfn = self.get_testfn()
|
| with open(testfn, 'w') as fileobj:
|
| for file in p.open_files():
|
| if (
|
| normcase(file.path) == normcase(fileobj.name)
|
| or file.fd == fileobj.fileno()
|
| ):
|
| break
|
| else:
|
| return pytest.fail(f"no file found; files={p.open_files()!r}")
|
| assert normcase(file.path) == normcase(fileobj.name)
|
| if WINDOWS:
|
| assert file.fd == -1
|
| else:
|
| assert file.fd == fileobj.fileno()
|
|
|
| ntuple = p.open_files()[0]
|
| assert ntuple[0] == ntuple.path
|
| assert ntuple[1] == ntuple.fd
|
|
|
| assert fileobj.name not in p.open_files()
|
|
|
| @pytest.mark.skipif(not POSIX, reason="POSIX only")
|
| @pytest.mark.xdist_group(name="serial")
|
| def test_num_fds(self):
|
| p = psutil.Process()
|
| testfn = self.get_testfn()
|
| start = p.num_fds()
|
| with open(testfn, 'w'):
|
| assert p.num_fds() == start + 1
|
| with socket.socket():
|
| assert p.num_fds() == start + 2
|
| assert p.num_fds() == start
|
|
|
| @skip_on_not_implemented(only_if=LINUX)
|
| @pytest.mark.skipif(
|
| OPENBSD or NETBSD, reason="not reliable on OPENBSD & NETBSD"
|
| )
|
| def test_num_ctx_switches(self):
|
| p = psutil.Process()
|
| before = sum(p.num_ctx_switches())
|
| for _ in range(2):
|
| time.sleep(0.05)
|
| after = sum(p.num_ctx_switches())
|
| if after > before:
|
| return None
|
| return pytest.fail(
|
| "num ctx switches still the same after 2 iterations"
|
| )
|
|
|
| def test_ppid(self):
|
| p = psutil.Process()
|
| if hasattr(os, 'getppid'):
|
| assert p.ppid() == os.getppid()
|
| p = self.spawn_psproc()
|
| assert p.ppid() == os.getpid()
|
|
|
| def test_parent(self):
|
| p = self.spawn_psproc()
|
| assert p.parent().pid == os.getpid()
|
|
|
| lowest_pid = psutil.pids()[0]
|
| assert psutil.Process(lowest_pid).parent() is None
|
|
|
| def test_parent_mocked_ctime(self):
|
|
|
|
|
|
|
|
|
|
|
| p = self.spawn_psproc()
|
| p.create_time()
|
| assert p._create_time
|
| p._create_time = 1
|
| assert p.parent().pid == os.getpid()
|
|
|
| def test_parent_multi(self):
|
| parent = psutil.Process()
|
| child, grandchild = self.spawn_children_pair()
|
| assert grandchild.parent() == child
|
| assert child.parent() == parent
|
|
|
| @retry_on_failure()
|
| def test_parents(self):
|
| parent = psutil.Process()
|
| assert parent.parents()
|
| child, grandchild = self.spawn_children_pair()
|
| assert child.parents()[0] == parent
|
| assert grandchild.parents()[0] == child
|
| assert grandchild.parents()[1] == parent
|
|
|
| def test_children(self):
|
| parent = psutil.Process()
|
| assert not parent.children()
|
| assert not parent.children(recursive=True)
|
|
|
|
|
|
|
| child = self.spawn_psproc(creationflags=0)
|
| children1 = parent.children()
|
| children2 = parent.children(recursive=True)
|
| for children in (children1, children2):
|
| assert len(children) == 1
|
| assert children[0].pid == child.pid
|
| assert children[0].ppid() == parent.pid
|
|
|
| def test_children_mocked_ctime(self):
|
|
|
|
|
|
|
|
|
|
|
| parent = psutil.Process()
|
| parent.create_time()
|
| assert parent._create_time
|
| parent._create_time += 100000
|
|
|
| assert not parent.children()
|
| assert not parent.children(recursive=True)
|
|
|
|
|
|
|
| child = self.spawn_psproc(creationflags=0)
|
| children1 = parent.children()
|
| children2 = parent.children(recursive=True)
|
| for children in (children1, children2):
|
| assert len(children) == 1
|
| assert children[0].pid == child.pid
|
| assert children[0].ppid() == parent.pid
|
|
|
| def test_children_recursive(self):
|
|
|
|
|
| parent = psutil.Process()
|
| child, grandchild = self.spawn_children_pair()
|
| assert parent.children() == [child]
|
| assert parent.children(recursive=True) == [child, grandchild]
|
|
|
|
|
| child.terminate()
|
| child.wait()
|
| assert not parent.children(recursive=True)
|
|
|
| def test_children_duplicates(self):
|
|
|
| table = collections.defaultdict(int)
|
| for p in psutil.process_iter():
|
| try:
|
| table[p.ppid()] += 1
|
| except psutil.Error:
|
| pass
|
|
|
| pid = max(table.items(), key=lambda x: x[1])[0]
|
| if LINUX and pid == 0:
|
| return pytest.skip("PID 0")
|
| p = psutil.Process(pid)
|
| try:
|
| c = p.children(recursive=True)
|
| except psutil.AccessDenied:
|
| pass
|
| else:
|
| assert len(c) == len(set(c))
|
|
|
| def test_parents_and_children(self):
|
| parent = psutil.Process()
|
| child, grandchild = self.spawn_children_pair()
|
|
|
| children = parent.children(recursive=True)
|
| assert len(children) == 2
|
| assert children[0] == child
|
| assert children[1] == grandchild
|
|
|
| parents = grandchild.parents()
|
| assert parents[0] == child
|
| assert parents[1] == parent
|
|
|
| def test_suspend_resume(self):
|
| p = self.spawn_psproc()
|
| p.suspend()
|
| for _ in range(100):
|
| if p.status() == psutil.STATUS_STOPPED:
|
| break
|
| time.sleep(0.01)
|
| p.resume()
|
| assert p.status() != psutil.STATUS_STOPPED
|
|
|
| def test_invalid_pid(self):
|
| with pytest.raises(TypeError):
|
| psutil.Process("1")
|
| with pytest.raises(ValueError):
|
| psutil.Process(-1)
|
|
|
| def test_as_dict(self):
|
| p = psutil.Process()
|
| d = p.as_dict(attrs=['exe', 'name'])
|
| assert sorted(d.keys()) == ['exe', 'name']
|
|
|
| p = psutil.Process(min(psutil.pids()))
|
| d = p.as_dict(attrs=['net_connections'], ad_value='foo')
|
| if not isinstance(d['net_connections'], list):
|
| assert d['net_connections'] == 'foo'
|
|
|
|
|
| with mock.patch(
|
| 'psutil.Process.nice', create=True, side_effect=psutil.AccessDenied
|
| ):
|
| assert p.as_dict(attrs=["nice"], ad_value=1) == {"nice": 1}
|
|
|
|
|
| with mock.patch(
|
| 'psutil.Process.nice',
|
| create=True,
|
| side_effect=psutil.NoSuchProcess(p.pid, "name"),
|
| ):
|
| with pytest.raises(psutil.NoSuchProcess):
|
| p.as_dict(attrs=["nice"])
|
|
|
|
|
| with mock.patch(
|
| 'psutil.Process.nice',
|
| create=True,
|
| side_effect=psutil.ZombieProcess(p.pid, "name"),
|
| ):
|
| assert p.as_dict(attrs=["nice"], ad_value="foo") == {"nice": "foo"}
|
|
|
|
|
|
|
| with mock.patch(
|
| 'psutil.Process.nice', create=True, side_effect=NotImplementedError
|
| ):
|
| d = p.as_dict()
|
| assert 'nice' not in list(d.keys())
|
|
|
| with pytest.raises(NotImplementedError):
|
| p.as_dict(attrs=["nice"])
|
|
|
|
|
| with pytest.raises(TypeError):
|
| p.as_dict('name')
|
| with pytest.raises(ValueError):
|
| p.as_dict(['foo'])
|
| with pytest.raises(ValueError):
|
| p.as_dict(['foo', 'bar'])
|
|
|
| def test_oneshot(self):
|
| p = psutil.Process()
|
| with mock.patch("psutil._psplatform.Process.cpu_times") as m:
|
| with p.oneshot():
|
| p.cpu_times()
|
| p.cpu_times()
|
| assert m.call_count == 1
|
|
|
| with mock.patch("psutil._psplatform.Process.cpu_times") as m:
|
| p.cpu_times()
|
| p.cpu_times()
|
| assert m.call_count == 2
|
|
|
| def test_oneshot_twice(self):
|
|
|
|
|
| p = psutil.Process()
|
| with mock.patch("psutil._psplatform.Process.cpu_times") as m1:
|
| with mock.patch("psutil._psplatform.Process.oneshot_enter") as m2:
|
| with p.oneshot():
|
| p.cpu_times()
|
| p.cpu_times()
|
| with p.oneshot():
|
| p.cpu_times()
|
| p.cpu_times()
|
| assert m1.call_count == 1
|
| assert m2.call_count == 1
|
|
|
| with mock.patch("psutil._psplatform.Process.cpu_times") as m:
|
| p.cpu_times()
|
| p.cpu_times()
|
| assert m.call_count == 2
|
|
|
| def test_oneshot_cache(self):
|
|
|
|
|
|
|
| p1, p2 = self.spawn_children_pair()
|
| p1_ppid = p1.ppid()
|
| p2_ppid = p2.ppid()
|
| assert p1_ppid != p2_ppid
|
| with p1.oneshot():
|
| assert p1.ppid() == p1_ppid
|
| assert p2.ppid() == p2_ppid
|
| with p2.oneshot():
|
| assert p1.ppid() == p1_ppid
|
| assert p2.ppid() == p2_ppid
|
|
|
| def test_halfway_terminated_process(self):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| def assert_raises_nsp(fun, fun_name):
|
| try:
|
| ret = fun()
|
| except psutil.ZombieProcess:
|
| raise
|
| except psutil.NoSuchProcess:
|
| pass
|
| except psutil.AccessDenied:
|
| if OPENBSD and fun_name in {'threads', 'num_threads'}:
|
| return None
|
| raise
|
| else:
|
|
|
| if WINDOWS and fun_name in {'exe', 'name'}:
|
| return None
|
| return pytest.fail(
|
| f"{fun!r} didn't raise NSP and returned {ret!r} instead"
|
| )
|
|
|
| p = self.spawn_psproc()
|
| p.terminate()
|
| p.wait()
|
| if WINDOWS:
|
| call_until(lambda: p.pid not in psutil.pids())
|
| self.assert_proc_gone(p)
|
|
|
| ns = process_namespace(p)
|
| for fun, name in ns.iter(ns.all):
|
| assert_raises_nsp(fun, name)
|
|
|
| @pytest.mark.skipif(not POSIX, reason="POSIX only")
|
| def test_zombie_process(self):
|
| _parent, zombie = self.spawn_zombie()
|
| self.assert_proc_zombie(zombie)
|
| if hasattr(psutil._psplatform.cext, "proc_is_zombie"):
|
| assert not psutil._psplatform.cext.proc_is_zombie(os.getpid())
|
| assert psutil._psplatform.cext.proc_is_zombie(zombie.pid)
|
|
|
| @pytest.mark.skipif(not POSIX, reason="POSIX only")
|
| def test_zombie_process_is_running_w_exc(self):
|
|
|
|
|
| p = psutil.Process()
|
| with mock.patch(
|
| "psutil.Process", side_effect=psutil.ZombieProcess(0)
|
| ) as m:
|
| assert p.is_running()
|
| assert m.called
|
|
|
| @pytest.mark.skipif(not POSIX, reason="POSIX only")
|
| def test_zombie_process_status_w_exc(self):
|
|
|
|
|
| p = psutil.Process()
|
| with mock.patch(
|
| "psutil._psplatform.Process.status",
|
| side_effect=psutil.ZombieProcess(0),
|
| ) as m:
|
| assert p.status() == psutil.STATUS_ZOMBIE
|
| assert m.called
|
|
|
| def test_reused_pid(self):
|
|
|
| subp = self.spawn_subproc()
|
| p = psutil.Process(subp.pid)
|
| p._ident = (p.pid, p.create_time() + 100)
|
|
|
| list(psutil.process_iter())
|
| assert p.pid in psutil._pmap
|
| assert not p.is_running()
|
|
|
|
|
|
|
| with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True):
|
| with contextlib.redirect_stderr(io.StringIO()) as f:
|
| list(psutil.process_iter())
|
| assert (
|
| f"refreshing Process instance for reused PID {p.pid}"
|
| in f.getvalue()
|
| )
|
| assert p.pid not in psutil._pmap
|
|
|
| assert p != psutil.Process(subp.pid)
|
| msg = "process no longer exists and its PID has been reused"
|
| ns = process_namespace(p)
|
| for fun, name in ns.iter(ns.setters + ns.killers, clear_cache=False):
|
| with self.subTest(name=name):
|
| with pytest.raises(psutil.NoSuchProcess, match=msg):
|
| fun()
|
|
|
| assert "terminated + PID reused" in str(p)
|
| assert "terminated + PID reused" in repr(p)
|
|
|
| with pytest.raises(psutil.NoSuchProcess, match=msg):
|
| p.ppid()
|
| with pytest.raises(psutil.NoSuchProcess, match=msg):
|
| p.parent()
|
| with pytest.raises(psutil.NoSuchProcess, match=msg):
|
| p.parents()
|
| with pytest.raises(psutil.NoSuchProcess, match=msg):
|
| p.children()
|
|
|
| def test_pid_0(self):
|
|
|
| if 0 not in psutil.pids():
|
| with pytest.raises(psutil.NoSuchProcess):
|
| psutil.Process(0)
|
|
|
|
|
| assert not psutil.pid_exists(0)
|
| assert psutil.Process(1).ppid() == 0
|
| return
|
|
|
| p = psutil.Process(0)
|
| exc = psutil.AccessDenied if WINDOWS else ValueError
|
| with pytest.raises(exc):
|
| p.wait()
|
| with pytest.raises(exc):
|
| p.terminate()
|
| with pytest.raises(exc):
|
| p.suspend()
|
| with pytest.raises(exc):
|
| p.resume()
|
| with pytest.raises(exc):
|
| p.kill()
|
| with pytest.raises(exc):
|
| p.send_signal(signal.SIGTERM)
|
|
|
|
|
| ns = process_namespace(p)
|
| for fun, name in ns.iter(ns.getters + ns.setters):
|
| try:
|
| ret = fun()
|
| except psutil.AccessDenied:
|
| pass
|
| else:
|
| if name in {"uids", "gids"}:
|
| assert ret.real == 0
|
| elif name == "username":
|
| user = 'NT AUTHORITY\\SYSTEM' if WINDOWS else 'root'
|
| assert p.username() == user
|
| elif name == "name":
|
| assert name, name
|
|
|
| if not OPENBSD:
|
| assert 0 in psutil.pids()
|
| assert psutil.pid_exists(0)
|
|
|
| @pytest.mark.skipif(not HAS_ENVIRON, reason="not supported")
|
| def test_environ(self):
|
| def clean_dict(d):
|
| exclude = {"PLAT", "HOME"}
|
| if MACOS:
|
| exclude.update([
|
| "__CF_USER_TEXT_ENCODING",
|
| "VERSIONER_PYTHON_PREFER_32_BIT",
|
| "VERSIONER_PYTHON_VERSION",
|
| ])
|
| for name in list(d.keys()):
|
| if name in exclude or name.startswith("PYTEST_"):
|
| d.pop(name)
|
| return {
|
| k.replace("\r", "").replace("\n", ""): (
|
| v.replace("\r", "").replace("\n", "")
|
| )
|
| for k, v in d.items()
|
| }
|
|
|
| self.maxDiff = None
|
| p = psutil.Process()
|
| d1 = clean_dict(p.environ())
|
| d2 = clean_dict(os.environ.copy())
|
| if not OSX and GITHUB_ACTIONS:
|
| assert d1 == d2
|
|
|
| @pytest.mark.skipif(not HAS_ENVIRON, reason="not supported")
|
| @pytest.mark.skipif(not POSIX, reason="POSIX only")
|
| @pytest.mark.skipif(
|
| MACOS_11PLUS,
|
| reason="macOS 11+ can't get another process environment, issue #2084",
|
| )
|
| @pytest.mark.skipif(
|
| NETBSD, reason="sometimes fails on `assert is_running()`"
|
| )
|
| def test_weird_environ(self):
|
|
|
| code = textwrap.dedent("""
|
| #include <unistd.h>
|
| #include <fcntl.h>
|
|
|
| char * const argv[] = {"cat", 0};
|
| char * const envp[] = {"A=1", "X", "C=3", 0};
|
|
|
| int main(void) {
|
| // Close stderr on exec so parent can wait for the
|
| // execve to finish.
|
| if (fcntl(2, F_SETFD, FD_CLOEXEC) != 0)
|
| return 0;
|
| return execve("/bin/cat", argv, envp);
|
| }
|
| """)
|
| cexe = create_c_exe(self.get_testfn(), c_code=code)
|
| sproc = self.spawn_subproc(
|
| [cexe], stdin=subprocess.PIPE, stderr=subprocess.PIPE
|
| )
|
| p = psutil.Process(sproc.pid)
|
| wait_for_pid(p.pid)
|
| assert p.is_running()
|
|
|
| assert sproc.stderr.read() == b""
|
| if MACOS and CI_TESTING:
|
| try:
|
| env = p.environ()
|
| except psutil.AccessDenied:
|
|
|
|
|
| return
|
| else:
|
| env = p.environ()
|
| assert env == {"A": "1", "C": "3"}
|
| sproc.communicate()
|
| assert sproc.returncode == 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| class TestPopen(PsutilTestCase):
|
| """Tests for psutil.Popen class."""
|
|
|
| @classmethod
|
| def tearDownClass(cls):
|
| reap_children()
|
|
|
| @pytest.mark.skipif(MACOS and GITHUB_ACTIONS, reason="hangs on OSX + CI")
|
| def test_misc(self):
|
|
|
|
|
|
|
| cmd = [
|
| PYTHON_EXE,
|
| "-c",
|
| "import time; [time.sleep(0.1) for x in range(100)];",
|
| ]
|
| with psutil.Popen(
|
| cmd,
|
| stdout=subprocess.PIPE,
|
| stderr=subprocess.PIPE,
|
| env=PYTHON_EXE_ENV,
|
| ) as proc:
|
| proc.name()
|
| proc.cpu_times()
|
| proc.stdin
|
| assert dir(proc)
|
| with pytest.raises(AttributeError):
|
| proc.foo
|
| proc.terminate()
|
| if POSIX:
|
| assert proc.wait(5) == -signal.SIGTERM
|
| else:
|
| assert proc.wait(5) == signal.SIGTERM
|
|
|
| def test_ctx_manager(self):
|
| with psutil.Popen(
|
| [PYTHON_EXE, "-V"],
|
| stdout=subprocess.PIPE,
|
| stderr=subprocess.PIPE,
|
| stdin=subprocess.PIPE,
|
| env=PYTHON_EXE_ENV,
|
| ) as proc:
|
| proc.communicate()
|
| assert proc.stdout.closed
|
| assert proc.stderr.closed
|
| assert proc.stdin.closed
|
| assert proc.returncode == 0
|
|
|
| def test_kill_terminate(self):
|
|
|
|
|
|
|
| cmd = [
|
| PYTHON_EXE,
|
| "-c",
|
| "import time; [time.sleep(0.1) for x in range(100)];",
|
| ]
|
| with psutil.Popen(
|
| cmd,
|
| stdout=subprocess.PIPE,
|
| stderr=subprocess.PIPE,
|
| env=PYTHON_EXE_ENV,
|
| ) as proc:
|
| proc.terminate()
|
| proc.wait()
|
| with pytest.raises(psutil.NoSuchProcess):
|
| proc.terminate()
|
| with pytest.raises(psutil.NoSuchProcess):
|
| proc.kill()
|
| with pytest.raises(psutil.NoSuchProcess):
|
| proc.send_signal(signal.SIGTERM)
|
| if WINDOWS:
|
| with pytest.raises(psutil.NoSuchProcess):
|
| proc.send_signal(signal.CTRL_C_EVENT)
|
| with pytest.raises(psutil.NoSuchProcess):
|
| proc.send_signal(signal.CTRL_BREAK_EVENT)
|
|
|
| def test__getattribute__(self):
|
| cmd = [
|
| PYTHON_EXE,
|
| "-c",
|
| "import time; [time.sleep(0.1) for x in range(100)];",
|
| ]
|
| with psutil.Popen(
|
| cmd,
|
| stdout=subprocess.PIPE,
|
| stderr=subprocess.PIPE,
|
| env=PYTHON_EXE_ENV,
|
| ) as proc:
|
| proc.terminate()
|
| proc.wait()
|
| with pytest.raises(AttributeError):
|
| proc.foo
|
|
|