|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| """Tests specific to all BSD platforms."""
|
|
|
| import datetime
|
| import os
|
| import re
|
| import shutil
|
| import time
|
|
|
| import psutil
|
| from psutil import BSD
|
| from psutil import FREEBSD
|
| from psutil import NETBSD
|
| from psutil import OPENBSD
|
| from psutil.tests import HAS_BATTERY
|
| from psutil.tests import TOLERANCE_SYS_MEM
|
| from psutil.tests import PsutilTestCase
|
| from psutil.tests import pytest
|
| from psutil.tests import retry_on_failure
|
| from psutil.tests import sh
|
| from psutil.tests import spawn_subproc
|
| from psutil.tests import terminate
|
|
|
| if BSD:
|
| PAGESIZE = psutil._psplatform.cext.getpagesize()
|
|
|
| MUSE_AVAILABLE = os.getuid() == 0 and shutil.which("muse")
|
| else:
|
| PAGESIZE = None
|
| MUSE_AVAILABLE = False
|
|
|
|
|
| def sysctl(cmdline):
|
| """Expects a sysctl command with an argument and parse the result
|
| returning only the value of interest.
|
| """
|
| result = sh("sysctl " + cmdline)
|
| if FREEBSD:
|
| result = result[result.find(": ") + 2 :]
|
| elif OPENBSD or NETBSD:
|
| result = result[result.find("=") + 1 :]
|
| try:
|
| return int(result)
|
| except ValueError:
|
| return result
|
|
|
|
|
| def muse(field):
|
| """Thin wrapper around 'muse' cmdline utility."""
|
| out = sh('muse')
|
| for line in out.split('\n'):
|
| if line.startswith(field):
|
| break
|
| else:
|
| raise ValueError("line not found")
|
| return int(line.split()[1])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| @pytest.mark.skipif(not BSD, reason="BSD only")
|
| class BSDTestCase(PsutilTestCase):
|
| """Generic tests common to all BSD variants."""
|
|
|
| @classmethod
|
| def setUpClass(cls):
|
| cls.pid = spawn_subproc().pid
|
|
|
| @classmethod
|
| def tearDownClass(cls):
|
| terminate(cls.pid)
|
|
|
| @pytest.mark.skipif(NETBSD, reason="-o lstart doesn't work on NETBSD")
|
| def test_process_create_time(self):
|
| output = sh(f"ps -o lstart -p {self.pid}")
|
| start_ps = output.replace('STARTED', '').strip()
|
| start_psutil = psutil.Process(self.pid).create_time()
|
| start_psutil = time.strftime(
|
| "%a %b %e %H:%M:%S %Y", time.localtime(start_psutil)
|
| )
|
| assert start_ps == start_psutil
|
|
|
| def test_disks(self):
|
|
|
|
|
| def df(path):
|
| out = sh(f'df -k "{path}"').strip()
|
| lines = out.split('\n')
|
| lines.pop(0)
|
| line = lines.pop(0)
|
| dev, total, used, free = line.split()[:4]
|
| if dev == 'none':
|
| dev = ''
|
| total = int(total) * 1024
|
| used = int(used) * 1024
|
| free = int(free) * 1024
|
| return dev, total, used, free
|
|
|
| for part in psutil.disk_partitions(all=False):
|
| usage = psutil.disk_usage(part.mountpoint)
|
| dev, total, used, free = df(part.mountpoint)
|
| assert part.device == dev
|
| assert usage.total == total
|
|
|
| if abs(usage.free - free) > 10 * 1024 * 1024:
|
| return pytest.fail(f"psutil={usage.free}, df={free}")
|
| if abs(usage.used - used) > 10 * 1024 * 1024:
|
| return pytest.fail(f"psutil={usage.used}, df={used}")
|
|
|
| @pytest.mark.skipif(
|
| not shutil.which("sysctl"), reason="sysctl cmd not available"
|
| )
|
| def test_cpu_count_logical(self):
|
| syst = sysctl("hw.ncpu")
|
| assert psutil.cpu_count(logical=True) == syst
|
|
|
| @pytest.mark.skipif(
|
| not shutil.which("sysctl"), reason="sysctl cmd not available"
|
| )
|
| @pytest.mark.skipif(
|
| NETBSD, reason="skipped on NETBSD"
|
| )
|
| def test_virtual_memory_total(self):
|
| num = sysctl('hw.physmem')
|
| assert num == psutil.virtual_memory().total
|
|
|
| @pytest.mark.skipif(
|
| not shutil.which("ifconfig"), reason="ifconfig cmd not available"
|
| )
|
| def test_net_if_stats(self):
|
| for name, stats in psutil.net_if_stats().items():
|
| try:
|
| out = sh(f"ifconfig {name}")
|
| except RuntimeError:
|
| pass
|
| else:
|
| assert stats.isup == ('RUNNING' in out)
|
| if "mtu" in out:
|
| assert stats.mtu == int(re.findall(r'mtu (\d+)', out)[0])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| @pytest.mark.skipif(not FREEBSD, reason="FREEBSD only")
|
| class FreeBSDPsutilTestCase(PsutilTestCase):
|
| @classmethod
|
| def setUpClass(cls):
|
| cls.pid = spawn_subproc().pid
|
|
|
| @classmethod
|
| def tearDownClass(cls):
|
| terminate(cls.pid)
|
|
|
| @retry_on_failure()
|
| def test_memory_maps(self):
|
| out = sh(f"procstat -v {self.pid}")
|
| maps = psutil.Process(self.pid).memory_maps(grouped=False)
|
| lines = out.split('\n')[1:]
|
| while lines:
|
| line = lines.pop()
|
| fields = line.split()
|
| _, start, stop, _perms, res = fields[:5]
|
| map = maps.pop()
|
| assert f"{start}-{stop}" == map.addr
|
| assert int(res) == map.rss
|
| if not map.path.startswith('['):
|
| assert fields[10] == map.path
|
|
|
| def test_exe(self):
|
| out = sh(f"procstat -b {self.pid}")
|
| assert psutil.Process(self.pid).exe() == out.split('\n')[1].split()[-1]
|
|
|
| def test_cmdline(self):
|
| out = sh(f"procstat -c {self.pid}")
|
| assert ' '.join(psutil.Process(self.pid).cmdline()) == ' '.join(
|
| out.split('\n')[1].split()[2:]
|
| )
|
|
|
| def test_uids_gids(self):
|
| out = sh(f"procstat -s {self.pid}")
|
| euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8]
|
| p = psutil.Process(self.pid)
|
| uids = p.uids()
|
| gids = p.gids()
|
| assert uids.real == int(ruid)
|
| assert uids.effective == int(euid)
|
| assert uids.saved == int(suid)
|
| assert gids.real == int(rgid)
|
| assert gids.effective == int(egid)
|
| assert gids.saved == int(sgid)
|
|
|
| @retry_on_failure()
|
| def test_ctx_switches(self):
|
| tested = []
|
| out = sh(f"procstat -r {self.pid}")
|
| p = psutil.Process(self.pid)
|
| for line in out.split('\n'):
|
| line = line.lower().strip()
|
| if ' voluntary context' in line:
|
| pstat_value = int(line.split()[-1])
|
| psutil_value = p.num_ctx_switches().voluntary
|
| assert pstat_value == psutil_value
|
| tested.append(None)
|
| elif ' involuntary context' in line:
|
| pstat_value = int(line.split()[-1])
|
| psutil_value = p.num_ctx_switches().involuntary
|
| assert pstat_value == psutil_value
|
| tested.append(None)
|
| if len(tested) != 2:
|
| raise RuntimeError("couldn't find lines match in procstat out")
|
|
|
| @retry_on_failure()
|
| def test_cpu_times(self):
|
| tested = []
|
| out = sh(f"procstat -r {self.pid}")
|
| p = psutil.Process(self.pid)
|
| for line in out.split('\n'):
|
| line = line.lower().strip()
|
| if 'user time' in line:
|
| pstat_value = float('0.' + line.split()[-1].split('.')[-1])
|
| psutil_value = p.cpu_times().user
|
| assert pstat_value == psutil_value
|
| tested.append(None)
|
| elif 'system time' in line:
|
| pstat_value = float('0.' + line.split()[-1].split('.')[-1])
|
| psutil_value = p.cpu_times().system
|
| assert pstat_value == psutil_value
|
| tested.append(None)
|
| if len(tested) != 2:
|
| raise RuntimeError("couldn't find lines match in procstat out")
|
|
|
|
|
| @pytest.mark.skipif(not FREEBSD, reason="FREEBSD only")
|
| class FreeBSDSystemTestCase(PsutilTestCase):
|
| @staticmethod
|
| def parse_swapinfo():
|
|
|
| output = sh("swapinfo -k").splitlines()[-1]
|
| parts = re.split(r'\s+', output)
|
|
|
| if not parts:
|
| raise ValueError(f"Can't parse swapinfo: {output}")
|
|
|
|
|
| total, used, free = (int(p) * 1024 for p in parts[1:4])
|
| return total, used, free
|
|
|
| def test_cpu_frequency_against_sysctl(self):
|
|
|
|
|
| sensor = "dev.cpu.0.freq"
|
| try:
|
| sysctl_result = int(sysctl(sensor))
|
| except RuntimeError:
|
| return pytest.skip("frequencies not supported by kernel")
|
| assert psutil.cpu_freq().current == sysctl_result
|
|
|
| sensor = "dev.cpu.0.freq_levels"
|
| sysctl_result = sysctl(sensor)
|
|
|
|
|
|
|
| max_freq = int(sysctl_result.split()[0].split("/")[0])
|
| min_freq = int(sysctl_result.split()[-1].split("/")[0])
|
| assert psutil.cpu_freq().max == max_freq
|
| assert psutil.cpu_freq().min == min_freq
|
|
|
|
|
|
|
| @retry_on_failure()
|
| def test_vmem_active(self):
|
| syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
|
| assert abs(psutil.virtual_memory().active - syst) < TOLERANCE_SYS_MEM
|
|
|
| @retry_on_failure()
|
| def test_vmem_inactive(self):
|
| syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
|
| assert abs(psutil.virtual_memory().inactive - syst) < TOLERANCE_SYS_MEM
|
|
|
| @retry_on_failure()
|
| def test_vmem_wired(self):
|
| syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
|
| assert abs(psutil.virtual_memory().wired - syst) < TOLERANCE_SYS_MEM
|
|
|
| @retry_on_failure()
|
| def test_vmem_cached(self):
|
| syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
|
| assert abs(psutil.virtual_memory().cached - syst) < TOLERANCE_SYS_MEM
|
|
|
| @retry_on_failure()
|
| def test_vmem_free(self):
|
| syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
|
| assert abs(psutil.virtual_memory().free - syst) < TOLERANCE_SYS_MEM
|
|
|
| @retry_on_failure()
|
| def test_vmem_buffers(self):
|
| syst = sysctl("vfs.bufspace")
|
| assert abs(psutil.virtual_memory().buffers - syst) < TOLERANCE_SYS_MEM
|
|
|
|
|
|
|
| @pytest.mark.skipif(not MUSE_AVAILABLE, reason="muse not installed")
|
| def test_muse_vmem_total(self):
|
| num = muse('Total')
|
| assert psutil.virtual_memory().total == num
|
|
|
| @pytest.mark.skipif(not MUSE_AVAILABLE, reason="muse not installed")
|
| @retry_on_failure()
|
| def test_muse_vmem_active(self):
|
| num = muse('Active')
|
| assert abs(psutil.virtual_memory().active - num) < TOLERANCE_SYS_MEM
|
|
|
| @pytest.mark.skipif(not MUSE_AVAILABLE, reason="muse not installed")
|
| @retry_on_failure()
|
| def test_muse_vmem_inactive(self):
|
| num = muse('Inactive')
|
| assert abs(psutil.virtual_memory().inactive - num) < TOLERANCE_SYS_MEM
|
|
|
| @pytest.mark.skipif(not MUSE_AVAILABLE, reason="muse not installed")
|
| @retry_on_failure()
|
| def test_muse_vmem_wired(self):
|
| num = muse('Wired')
|
| assert abs(psutil.virtual_memory().wired - num) < TOLERANCE_SYS_MEM
|
|
|
| @pytest.mark.skipif(not MUSE_AVAILABLE, reason="muse not installed")
|
| @retry_on_failure()
|
| def test_muse_vmem_cached(self):
|
| num = muse('Cache')
|
| assert abs(psutil.virtual_memory().cached - num) < TOLERANCE_SYS_MEM
|
|
|
| @pytest.mark.skipif(not MUSE_AVAILABLE, reason="muse not installed")
|
| @retry_on_failure()
|
| def test_muse_vmem_free(self):
|
| num = muse('Free')
|
| assert abs(psutil.virtual_memory().free - num) < TOLERANCE_SYS_MEM
|
|
|
| @pytest.mark.skipif(not MUSE_AVAILABLE, reason="muse not installed")
|
| @retry_on_failure()
|
| def test_muse_vmem_buffers(self):
|
| num = muse('Buffer')
|
| assert abs(psutil.virtual_memory().buffers - num) < TOLERANCE_SYS_MEM
|
|
|
| def test_cpu_stats_ctx_switches(self):
|
| assert (
|
| abs(
|
| psutil.cpu_stats().ctx_switches
|
| - sysctl('vm.stats.sys.v_swtch')
|
| )
|
| < 1000
|
| )
|
|
|
| def test_cpu_stats_interrupts(self):
|
| assert (
|
| abs(psutil.cpu_stats().interrupts - sysctl('vm.stats.sys.v_intr'))
|
| < 1000
|
| )
|
|
|
| def test_cpu_stats_soft_interrupts(self):
|
| assert (
|
| abs(
|
| psutil.cpu_stats().soft_interrupts
|
| - sysctl('vm.stats.sys.v_soft')
|
| )
|
| < 1000
|
| )
|
|
|
| @retry_on_failure()
|
| def test_cpu_stats_syscalls(self):
|
|
|
| assert (
|
| abs(psutil.cpu_stats().syscalls - sysctl('vm.stats.sys.v_syscall'))
|
| < 200000
|
| )
|
|
|
|
|
|
|
| def test_swapmem_free(self):
|
| _total, _used, free = self.parse_swapinfo()
|
| assert abs(psutil.swap_memory().free - free) < TOLERANCE_SYS_MEM
|
|
|
| def test_swapmem_used(self):
|
| _total, used, _free = self.parse_swapinfo()
|
| assert abs(psutil.swap_memory().used - used) < TOLERANCE_SYS_MEM
|
|
|
| def test_swapmem_total(self):
|
| total, _used, _free = self.parse_swapinfo()
|
| assert abs(psutil.swap_memory().total - total) < TOLERANCE_SYS_MEM
|
|
|
|
|
|
|
| def test_boot_time(self):
|
| s = sysctl('sysctl kern.boottime')
|
| s = s[s.find(" sec = ") + 7 :]
|
| s = s[: s.find(',')]
|
| btime = int(s)
|
| assert btime == psutil.boot_time()
|
|
|
|
|
|
|
| @pytest.mark.skipif(not HAS_BATTERY, reason="no battery")
|
| def test_sensors_battery(self):
|
| def secs2hours(secs):
|
| m, _s = divmod(secs, 60)
|
| h, m = divmod(m, 60)
|
| return f"{int(h)}:{int(m):02}"
|
|
|
| out = sh("acpiconf -i 0")
|
| fields = {x.split('\t')[0]: x.split('\t')[-1] for x in out.split("\n")}
|
| metrics = psutil.sensors_battery()
|
| percent = int(fields['Remaining capacity:'].replace('%', ''))
|
| remaining_time = fields['Remaining time:']
|
| assert metrics.percent == percent
|
| if remaining_time == 'unknown':
|
| assert metrics.secsleft == psutil.POWER_TIME_UNLIMITED
|
| else:
|
| assert secs2hours(metrics.secsleft) == remaining_time
|
|
|
| @pytest.mark.skipif(not HAS_BATTERY, reason="no battery")
|
| def test_sensors_battery_against_sysctl(self):
|
| assert psutil.sensors_battery().percent == sysctl(
|
| "hw.acpi.battery.life"
|
| )
|
| assert psutil.sensors_battery().power_plugged == (
|
| sysctl("hw.acpi.acline") == 1
|
| )
|
| secsleft = psutil.sensors_battery().secsleft
|
| if secsleft < 0:
|
| assert sysctl("hw.acpi.battery.time") == -1
|
| else:
|
| assert secsleft == sysctl("hw.acpi.battery.time") * 60
|
|
|
| @pytest.mark.skipif(HAS_BATTERY, reason="has battery")
|
| def test_sensors_battery_no_battery(self):
|
|
|
|
|
|
|
| with pytest.raises(RuntimeError):
|
| sysctl("hw.acpi.battery.life")
|
| sysctl("hw.acpi.battery.time")
|
| sysctl("hw.acpi.acline")
|
| assert psutil.sensors_battery() is None
|
|
|
|
|
|
|
| def test_sensors_temperatures_against_sysctl(self):
|
| num_cpus = psutil.cpu_count(True)
|
| for cpu in range(num_cpus):
|
| sensor = f"dev.cpu.{cpu}.temperature"
|
|
|
| try:
|
| sysctl_result = int(float(sysctl(sensor)[:-1]))
|
| except RuntimeError:
|
| return pytest.skip("temperatures not supported by kernel")
|
| assert (
|
| abs(
|
| psutil.sensors_temperatures()["coretemp"][cpu].current
|
| - sysctl_result
|
| )
|
| < 10
|
| )
|
|
|
| sensor = f"dev.cpu.{cpu}.coretemp.tjmax"
|
| sysctl_result = int(float(sysctl(sensor)[:-1]))
|
| assert (
|
| psutil.sensors_temperatures()["coretemp"][cpu].high
|
| == sysctl_result
|
| )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| @pytest.mark.skipif(not OPENBSD, reason="OPENBSD only")
|
| class OpenBSDTestCase(PsutilTestCase):
|
| def test_boot_time(self):
|
| s = sysctl('kern.boottime')
|
| sys_bt = datetime.datetime.strptime(s, "%a %b %d %H:%M:%S %Y")
|
| psutil_bt = datetime.datetime.fromtimestamp(psutil.boot_time())
|
| assert sys_bt == psutil_bt
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| @pytest.mark.skipif(not NETBSD, reason="NETBSD only")
|
| class NetBSDTestCase(PsutilTestCase):
|
| @staticmethod
|
| def parse_meminfo(look_for):
|
| with open('/proc/meminfo') as f:
|
| for line in f:
|
| if line.startswith(look_for):
|
| return int(line.split()[1]) * 1024
|
| raise ValueError(f"can't find {look_for}")
|
|
|
|
|
|
|
| def test_vmem_total(self):
|
| assert psutil.virtual_memory().total == self.parse_meminfo("MemTotal:")
|
|
|
| def test_vmem_free(self):
|
| assert (
|
| abs(psutil.virtual_memory().free - self.parse_meminfo("MemFree:"))
|
| < TOLERANCE_SYS_MEM
|
| )
|
|
|
| def test_vmem_buffers(self):
|
| assert (
|
| abs(
|
| psutil.virtual_memory().buffers
|
| - self.parse_meminfo("Buffers:")
|
| )
|
| < TOLERANCE_SYS_MEM
|
| )
|
|
|
| def test_vmem_shared(self):
|
| assert (
|
| abs(
|
| psutil.virtual_memory().shared
|
| - self.parse_meminfo("MemShared:")
|
| )
|
| < TOLERANCE_SYS_MEM
|
| )
|
|
|
| def test_vmem_cached(self):
|
| assert (
|
| abs(psutil.virtual_memory().cached - self.parse_meminfo("Cached:"))
|
| < TOLERANCE_SYS_MEM
|
| )
|
|
|
|
|
|
|
| def test_swapmem_total(self):
|
| assert (
|
| abs(psutil.swap_memory().total - self.parse_meminfo("SwapTotal:"))
|
| < TOLERANCE_SYS_MEM
|
| )
|
|
|
| def test_swapmem_free(self):
|
| assert (
|
| abs(psutil.swap_memory().free - self.parse_meminfo("SwapFree:"))
|
| < TOLERANCE_SYS_MEM
|
| )
|
|
|
| def test_swapmem_used(self):
|
| smem = psutil.swap_memory()
|
| assert smem.used == smem.total - smem.free
|
|
|
|
|
|
|
| def test_cpu_stats_interrupts(self):
|
| with open('/proc/stat', 'rb') as f:
|
| for line in f:
|
| if line.startswith(b'intr'):
|
| interrupts = int(line.split()[1])
|
| break
|
| else:
|
| raise ValueError("couldn't find line")
|
| assert abs(psutil.cpu_stats().interrupts - interrupts) < 1000
|
|
|
| def test_cpu_stats_ctx_switches(self):
|
| with open('/proc/stat', 'rb') as f:
|
| for line in f:
|
| if line.startswith(b'ctxt'):
|
| ctx_switches = int(line.split()[1])
|
| break
|
| else:
|
| raise ValueError("couldn't find line")
|
| assert abs(psutil.cpu_stats().ctx_switches - ctx_switches) < 1000
|
|
|