|
|
|
|
|
|
|
|
|
|
|
|
| """Iterate over all process PIDs and for each one of them invoke and
|
| test all psutil.Process() methods.
|
| """
|
|
|
| import enum
|
| import errno
|
| import multiprocessing
|
| import os
|
| import stat
|
| import time
|
| import traceback
|
|
|
| import psutil
|
| from psutil import AIX
|
| from psutil import BSD
|
| from psutil import FREEBSD
|
| from psutil import LINUX
|
| from psutil import MACOS
|
| from psutil import NETBSD
|
| from psutil import OPENBSD
|
| from psutil import OSX
|
| from psutil import POSIX
|
| from psutil import WINDOWS
|
| from psutil.tests import CI_TESTING
|
| from psutil.tests import PYTEST_PARALLEL
|
| from psutil.tests import VALID_PROC_STATUSES
|
| from psutil.tests import PsutilTestCase
|
| from psutil.tests import check_connection_ntuple
|
| from psutil.tests import create_sockets
|
| from psutil.tests import is_namedtuple
|
| from psutil.tests import is_win_secure_system_proc
|
| from psutil.tests import process_namespace
|
| from psutil.tests import pytest
|
|
|
|
|
|
|
| USE_PROC_POOL = LINUX and not CI_TESTING and not PYTEST_PARALLEL
|
|
|
|
|
| def proc_info(pid):
|
| tcase = PsutilTestCase()
|
|
|
| def check_exception(exc, proc, name, ppid):
|
| assert exc.pid == pid
|
| if exc.name is not None:
|
| assert exc.name == name
|
| if isinstance(exc, psutil.ZombieProcess):
|
| tcase.assert_proc_zombie(proc)
|
| if exc.ppid is not None:
|
| assert exc.ppid >= 0
|
| assert exc.ppid == ppid
|
| elif isinstance(exc, psutil.NoSuchProcess):
|
| tcase.assert_proc_gone(proc)
|
| str(exc)
|
| repr(exc)
|
|
|
| def do_wait():
|
| if pid != 0:
|
| try:
|
| proc.wait(0)
|
| except psutil.Error as exc:
|
| check_exception(exc, proc, name, ppid)
|
|
|
| try:
|
| proc = psutil.Process(pid)
|
| except psutil.NoSuchProcess:
|
| tcase.assert_pid_gone(pid)
|
| return {}
|
| try:
|
| d = proc.as_dict(['ppid', 'name'])
|
| except psutil.NoSuchProcess:
|
| tcase.assert_proc_gone(proc)
|
| else:
|
| name, ppid = d['name'], d['ppid']
|
| info = {'pid': proc.pid}
|
| ns = process_namespace(proc)
|
|
|
|
|
| for fun, fun_name in ns.iter(ns.getters, clear_cache=False):
|
| try:
|
| info[fun_name] = fun()
|
| except psutil.Error as exc:
|
| check_exception(exc, proc, name, ppid)
|
| continue
|
| do_wait()
|
| return info
|
|
|
|
|
| class TestFetchAllProcesses(PsutilTestCase):
|
| """Test which iterates over all running processes and performs
|
| some sanity checks against Process API's returned values.
|
| Uses a process pool to get info about all processes.
|
| """
|
|
|
| def setUp(self):
|
| psutil._set_debug(False)
|
|
|
|
|
| if USE_PROC_POOL:
|
|
|
|
|
|
|
|
|
|
|
|
|
| multiprocessing.set_start_method('fork')
|
| self.pool = multiprocessing.Pool()
|
|
|
| def tearDown(self):
|
| psutil._set_debug(True)
|
| if USE_PROC_POOL:
|
| self.pool.terminate()
|
| self.pool.join()
|
|
|
| def iter_proc_info(self):
|
|
|
|
|
| from psutil.tests.test_process_all import proc_info
|
|
|
| if USE_PROC_POOL:
|
| return self.pool.imap_unordered(proc_info, psutil.pids())
|
| else:
|
| ls = [proc_info(pid) for pid in psutil.pids()]
|
| return ls
|
|
|
| def test_all(self):
|
| failures = []
|
| for info in self.iter_proc_info():
|
| for name, value in info.items():
|
| meth = getattr(self, name)
|
| try:
|
| meth(value, info)
|
| except Exception:
|
| s = '\n' + '=' * 70 + '\n'
|
| s += (
|
| "FAIL: name=test_{}, pid={}, ret={}\ninfo={}\n".format(
|
| name,
|
| info['pid'],
|
| repr(value),
|
| info,
|
| )
|
| )
|
| s += '-' * 70
|
| s += f"\n{traceback.format_exc()}"
|
| s = "\n".join((" " * 4) + i for i in s.splitlines()) + "\n"
|
| failures.append(s)
|
| else:
|
| if value not in (0, 0.0, [], None, '', {}):
|
| assert value, value
|
| if failures:
|
| return pytest.fail(''.join(failures))
|
|
|
| def cmdline(self, ret, info):
|
| assert isinstance(ret, list)
|
| for part in ret:
|
| assert isinstance(part, str)
|
|
|
| def exe(self, ret, info):
|
| assert isinstance(ret, str)
|
| assert ret.strip() == ret
|
| if ret:
|
| if WINDOWS and not ret.endswith('.exe'):
|
| return
|
| assert os.path.isabs(ret), ret
|
|
|
|
|
|
|
| if POSIX and os.path.isfile(ret):
|
| if hasattr(os, 'access') and hasattr(os, "X_OK"):
|
|
|
| try:
|
| assert os.access(ret, os.X_OK)
|
| except AssertionError:
|
| if os.path.exists(ret) and not CI_TESTING:
|
| raise
|
|
|
| def pid(self, ret, info):
|
| assert isinstance(ret, int)
|
| assert ret >= 0
|
|
|
| def ppid(self, ret, info):
|
| assert isinstance(ret, int)
|
| assert ret >= 0
|
| proc_info(ret)
|
|
|
| def name(self, ret, info):
|
| assert isinstance(ret, str)
|
| if WINDOWS and not ret and is_win_secure_system_proc(info['pid']):
|
|
|
| return
|
|
|
| if not AIX:
|
| assert ret, repr(ret)
|
|
|
| def create_time(self, ret, info):
|
| assert isinstance(ret, float)
|
| try:
|
| assert ret >= 0
|
| except AssertionError:
|
|
|
| if OPENBSD and info['status'] == psutil.STATUS_ZOMBIE:
|
| pass
|
| else:
|
| raise
|
|
|
|
|
|
|
|
|
| time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))
|
|
|
| def uids(self, ret, info):
|
| assert is_namedtuple(ret)
|
| for uid in ret:
|
| assert isinstance(uid, int)
|
| assert uid >= 0
|
|
|
| def gids(self, ret, info):
|
| assert is_namedtuple(ret)
|
|
|
|
|
| for gid in ret:
|
| assert isinstance(gid, int)
|
| if not MACOS and not NETBSD:
|
| assert gid >= 0
|
|
|
| def username(self, ret, info):
|
| assert isinstance(ret, str)
|
| assert ret.strip() == ret
|
| assert ret.strip()
|
|
|
| def status(self, ret, info):
|
| assert isinstance(ret, str)
|
| assert ret, ret
|
| assert ret != '?'
|
| assert ret in VALID_PROC_STATUSES
|
|
|
| def io_counters(self, ret, info):
|
| assert is_namedtuple(ret)
|
| for field in ret:
|
| assert isinstance(field, int)
|
| if field != -1:
|
| assert field >= 0
|
|
|
| def ionice(self, ret, info):
|
| if LINUX:
|
| assert isinstance(ret.ioclass, int)
|
| assert isinstance(ret.value, int)
|
| assert ret.ioclass >= 0
|
| assert ret.value >= 0
|
| else:
|
| choices = [
|
| psutil.IOPRIO_VERYLOW,
|
| psutil.IOPRIO_LOW,
|
| psutil.IOPRIO_NORMAL,
|
| psutil.IOPRIO_HIGH,
|
| ]
|
| assert isinstance(ret, int)
|
| assert ret >= 0
|
| assert ret in choices
|
|
|
| def num_threads(self, ret, info):
|
| assert isinstance(ret, int)
|
| if WINDOWS and ret == 0 and is_win_secure_system_proc(info['pid']):
|
|
|
| return
|
| assert ret >= 1
|
|
|
| def threads(self, ret, info):
|
| assert isinstance(ret, list)
|
| for t in ret:
|
| assert is_namedtuple(t)
|
| assert t.id >= 0
|
| assert t.user_time >= 0
|
| assert t.system_time >= 0
|
| for field in t:
|
| assert isinstance(field, (int, float))
|
|
|
| def cpu_times(self, ret, info):
|
| assert is_namedtuple(ret)
|
| for n in ret:
|
| assert isinstance(n, float)
|
| assert n >= 0
|
|
|
|
|
| def cpu_percent(self, ret, info):
|
| assert isinstance(ret, float)
|
| assert 0.0 <= ret <= 100.0, ret
|
|
|
| def cpu_num(self, ret, info):
|
| assert isinstance(ret, int)
|
| if FREEBSD and ret == -1:
|
| return
|
| assert ret >= 0
|
| if psutil.cpu_count() == 1:
|
| assert ret == 0
|
| assert ret in list(range(psutil.cpu_count()))
|
|
|
| def memory_info(self, ret, info):
|
| assert is_namedtuple(ret)
|
| for value in ret:
|
| assert isinstance(value, int)
|
| assert value >= 0
|
| if WINDOWS:
|
| assert ret.peak_wset >= ret.wset
|
| assert ret.peak_paged_pool >= ret.paged_pool
|
| assert ret.peak_nonpaged_pool >= ret.nonpaged_pool
|
| assert ret.peak_pagefile >= ret.pagefile
|
|
|
| def memory_full_info(self, ret, info):
|
| assert is_namedtuple(ret)
|
| total = psutil.virtual_memory().total
|
| for name in ret._fields:
|
| value = getattr(ret, name)
|
| assert isinstance(value, int)
|
| assert value >= 0
|
| if LINUX or (OSX and name in {'vms', 'data'}):
|
|
|
|
|
| continue
|
| assert value <= total, name
|
|
|
| if LINUX:
|
| assert ret.pss >= ret.uss
|
|
|
| def open_files(self, ret, info):
|
| assert isinstance(ret, list)
|
| for f in ret:
|
| assert isinstance(f.fd, int)
|
| assert isinstance(f.path, str)
|
| assert f.path.strip() == f.path
|
| if WINDOWS:
|
| assert f.fd == -1
|
| elif LINUX:
|
| assert isinstance(f.position, int)
|
| assert isinstance(f.mode, str)
|
| assert isinstance(f.flags, int)
|
| assert f.position >= 0
|
| assert f.mode in {'r', 'w', 'a', 'r+', 'a+'}
|
| assert f.flags > 0
|
| elif BSD and not f.path:
|
|
|
| continue
|
| assert os.path.isabs(f.path), f
|
| try:
|
| st = os.stat(f.path)
|
| except FileNotFoundError:
|
| pass
|
| else:
|
| assert stat.S_ISREG(st.st_mode), f
|
|
|
| def num_fds(self, ret, info):
|
| assert isinstance(ret, int)
|
| assert ret >= 0
|
|
|
| def net_connections(self, ret, info):
|
| with create_sockets():
|
| assert len(ret) == len(set(ret))
|
| for conn in ret:
|
| assert is_namedtuple(conn)
|
| check_connection_ntuple(conn)
|
|
|
| def cwd(self, ret, info):
|
| assert isinstance(ret, str)
|
| assert ret.strip() == ret
|
| if ret:
|
| assert os.path.isabs(ret), ret
|
| try:
|
| st = os.stat(ret)
|
| except OSError as err:
|
| if WINDOWS and psutil._psplatform.is_permission_err(err):
|
| pass
|
|
|
| elif err.errno != errno.ENOENT:
|
| raise
|
| else:
|
| assert stat.S_ISDIR(st.st_mode)
|
|
|
| def memory_percent(self, ret, info):
|
| assert isinstance(ret, float)
|
| assert 0 <= ret <= 100, ret
|
|
|
| def is_running(self, ret, info):
|
| assert isinstance(ret, bool)
|
|
|
| def cpu_affinity(self, ret, info):
|
| assert isinstance(ret, list)
|
| assert ret != []
|
| cpus = list(range(psutil.cpu_count()))
|
| for n in ret:
|
| assert isinstance(n, int)
|
| assert n in cpus
|
|
|
| def terminal(self, ret, info):
|
| assert isinstance(ret, (str, type(None)))
|
| if ret is not None:
|
| assert os.path.isabs(ret), ret
|
| assert os.path.exists(ret), ret
|
|
|
| def memory_maps(self, ret, info):
|
| for nt in ret:
|
| assert isinstance(nt.addr, str)
|
| assert isinstance(nt.perms, str)
|
| assert isinstance(nt.path, str)
|
| for fname in nt._fields:
|
| value = getattr(nt, fname)
|
| if fname == 'path':
|
| if value.startswith(("[", "anon_inode:")):
|
| continue
|
| if BSD and value == "pvclock":
|
| continue
|
| assert os.path.isabs(nt.path), nt.path
|
|
|
|
|
|
|
| elif fname == 'addr':
|
| assert value, repr(value)
|
| elif fname == 'perms':
|
| if not WINDOWS:
|
| assert value, repr(value)
|
| else:
|
| assert isinstance(value, int)
|
| assert value >= 0
|
|
|
| def num_handles(self, ret, info):
|
| assert isinstance(ret, int)
|
| assert ret >= 0
|
|
|
| def nice(self, ret, info):
|
| assert isinstance(ret, int)
|
| if POSIX:
|
| assert -20 <= ret <= 20, ret
|
| else:
|
| priorities = [
|
| getattr(psutil, x)
|
| for x in dir(psutil)
|
| if x.endswith('_PRIORITY_CLASS')
|
| ]
|
| assert ret in priorities
|
| assert isinstance(ret, enum.IntEnum)
|
|
|
| def num_ctx_switches(self, ret, info):
|
| assert is_namedtuple(ret)
|
| for value in ret:
|
| assert isinstance(value, int)
|
| assert value >= 0
|
|
|
| def rlimit(self, ret, info):
|
| assert isinstance(ret, tuple)
|
| assert len(ret) == 2
|
| assert ret[0] >= -1
|
| assert ret[1] >= -1
|
|
|
| def environ(self, ret, info):
|
| assert isinstance(ret, dict)
|
| for k, v in ret.items():
|
| assert isinstance(k, str)
|
| assert isinstance(v, str)
|
|
|
|
|
| class TestPidsRange(PsutilTestCase):
|
| """Given pid_exists() return value for a range of PIDs which may or
|
| may not exist, make sure that psutil.Process() and psutil.pids()
|
| agree with pid_exists(). This guarantees that the 3 APIs are all
|
| consistent with each other. See:
|
| https://github.com/giampaolo/psutil/issues/2359
|
|
|
| XXX - Note about Windows: it turns out there are some "hidden" PIDs
|
| which are not returned by psutil.pids() and are also not revealed
|
| by taskmgr.exe and ProcessHacker, still they can be instantiated by
|
| psutil.Process() and queried. One of such PIDs is "conhost.exe".
|
| Running as_dict() for it reveals that some Process() APIs
|
| erroneously raise NoSuchProcess, so we know we have problem there.
|
| Let's ignore this for now, since it's quite a corner case (who even
|
| imagined hidden PIDs existed on Windows?).
|
| """
|
|
|
| def setUp(self):
|
| psutil._set_debug(False)
|
|
|
| def tearDown(self):
|
| psutil._set_debug(True)
|
|
|
| def test_it(self):
|
| def is_linux_tid(pid):
|
| try:
|
| f = open(f"/proc/{pid}/status", "rb")
|
| except FileNotFoundError:
|
| return False
|
| else:
|
| with f:
|
| for line in f:
|
| if line.startswith(b"Tgid:"):
|
| tgid = int(line.split()[1])
|
|
|
|
|
| return tgid != pid
|
| raise ValueError("'Tgid' line not found")
|
|
|
| def check(pid):
|
|
|
|
|
|
|
|
|
| x = 3
|
| while True:
|
| exists = psutil.pid_exists(pid)
|
| try:
|
| if exists:
|
| psutil.Process(pid)
|
| if not WINDOWS:
|
| assert pid in psutil.pids()
|
| else:
|
|
|
|
|
|
|
| if not OPENBSD:
|
| with pytest.raises(psutil.NoSuchProcess):
|
| psutil.Process(pid)
|
| if not WINDOWS:
|
| assert pid not in psutil.pids()
|
| except (psutil.Error, AssertionError):
|
| x -= 1
|
| if x == 0:
|
| raise
|
| else:
|
| return
|
|
|
| for pid in range(1, 3000):
|
| if LINUX and is_linux_tid(pid):
|
|
|
|
|
|
|
| continue
|
| check(pid)
|
|
|