|
|
|
|
|
|
|
|
| """Common objects shared by __init__.py and _ps*.py modules.
|
|
|
| Note: this module is imported by setup.py, so it should not import
|
| psutil or third-party modules.
|
| """
|
|
|
| import collections
|
| import enum
|
| import functools
|
| import os
|
| import socket
|
| import stat
|
| import sys
|
| import threading
|
| import warnings
|
| from collections import namedtuple
|
| from socket import AF_INET
|
| from socket import SOCK_DGRAM
|
| from socket import SOCK_STREAM
|
|
|
| try:
|
| from socket import AF_INET6
|
| except ImportError:
|
| AF_INET6 = None
|
| try:
|
| from socket import AF_UNIX
|
| except ImportError:
|
| AF_UNIX = None
|
|
|
|
|
| PSUTIL_DEBUG = bool(os.getenv('PSUTIL_DEBUG'))
|
| _DEFAULT = object()
|
|
|
|
|
| __all__ = [
|
|
|
| 'FREEBSD', 'BSD', 'LINUX', 'NETBSD', 'OPENBSD', 'MACOS', 'OSX', 'POSIX',
|
| 'SUNOS', 'WINDOWS',
|
|
|
| 'CONN_CLOSE', 'CONN_CLOSE_WAIT', 'CONN_CLOSING', 'CONN_ESTABLISHED',
|
| 'CONN_FIN_WAIT1', 'CONN_FIN_WAIT2', 'CONN_LAST_ACK', 'CONN_LISTEN',
|
| 'CONN_NONE', 'CONN_SYN_RECV', 'CONN_SYN_SENT', 'CONN_TIME_WAIT',
|
|
|
| 'NIC_DUPLEX_FULL', 'NIC_DUPLEX_HALF', 'NIC_DUPLEX_UNKNOWN',
|
|
|
| 'STATUS_DEAD', 'STATUS_DISK_SLEEP', 'STATUS_IDLE', 'STATUS_LOCKED',
|
| 'STATUS_RUNNING', 'STATUS_SLEEPING', 'STATUS_STOPPED', 'STATUS_SUSPENDED',
|
| 'STATUS_TRACING_STOP', 'STATUS_WAITING', 'STATUS_WAKE_KILL',
|
| 'STATUS_WAKING', 'STATUS_ZOMBIE', 'STATUS_PARKED',
|
|
|
| 'ENCODING', 'ENCODING_ERRS', 'AF_INET6',
|
|
|
| 'pconn', 'pcputimes', 'pctxsw', 'pgids', 'pio', 'pionice', 'popenfile',
|
| 'pthread', 'puids', 'sconn', 'scpustats', 'sdiskio', 'sdiskpart',
|
| 'sdiskusage', 'snetio', 'snicaddr', 'snicstats', 'sswap', 'suser',
|
|
|
| 'conn_tmap', 'deprecated_method', 'isfile_strict', 'memoize',
|
| 'parse_environ_block', 'path_exists_strict', 'usage_percent',
|
| 'supports_ipv6', 'sockfam_to_enum', 'socktype_to_enum', "wrap_numbers",
|
| 'open_text', 'open_binary', 'cat', 'bcat',
|
| 'bytes2human', 'conn_to_ntuple', 'debug',
|
|
|
| 'hilite', 'term_supports_colors', 'print_color',
|
| ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| POSIX = os.name == "posix"
|
| WINDOWS = os.name == "nt"
|
| LINUX = sys.platform.startswith("linux")
|
| MACOS = sys.platform.startswith("darwin")
|
| OSX = MACOS
|
| FREEBSD = sys.platform.startswith(("freebsd", "midnightbsd"))
|
| OPENBSD = sys.platform.startswith("openbsd")
|
| NETBSD = sys.platform.startswith("netbsd")
|
| BSD = FREEBSD or OPENBSD or NETBSD
|
| SUNOS = sys.platform.startswith(("sunos", "solaris"))
|
| AIX = sys.platform.startswith("aix")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| STATUS_RUNNING = "running"
|
| STATUS_SLEEPING = "sleeping"
|
| STATUS_DISK_SLEEP = "disk-sleep"
|
| STATUS_STOPPED = "stopped"
|
| STATUS_TRACING_STOP = "tracing-stop"
|
| STATUS_ZOMBIE = "zombie"
|
| STATUS_DEAD = "dead"
|
| STATUS_WAKE_KILL = "wake-kill"
|
| STATUS_WAKING = "waking"
|
| STATUS_IDLE = "idle"
|
| STATUS_LOCKED = "locked"
|
| STATUS_WAITING = "waiting"
|
| STATUS_SUSPENDED = "suspended"
|
| STATUS_PARKED = "parked"
|
|
|
|
|
| CONN_ESTABLISHED = "ESTABLISHED"
|
| CONN_SYN_SENT = "SYN_SENT"
|
| CONN_SYN_RECV = "SYN_RECV"
|
| CONN_FIN_WAIT1 = "FIN_WAIT1"
|
| CONN_FIN_WAIT2 = "FIN_WAIT2"
|
| CONN_TIME_WAIT = "TIME_WAIT"
|
| CONN_CLOSE = "CLOSE"
|
| CONN_CLOSE_WAIT = "CLOSE_WAIT"
|
| CONN_LAST_ACK = "LAST_ACK"
|
| CONN_LISTEN = "LISTEN"
|
| CONN_CLOSING = "CLOSING"
|
| CONN_NONE = "NONE"
|
|
|
|
|
|
|
| class NicDuplex(enum.IntEnum):
|
| NIC_DUPLEX_FULL = 2
|
| NIC_DUPLEX_HALF = 1
|
| NIC_DUPLEX_UNKNOWN = 0
|
|
|
|
|
| globals().update(NicDuplex.__members__)
|
|
|
|
|
|
|
| class BatteryTime(enum.IntEnum):
|
| POWER_TIME_UNKNOWN = -1
|
| POWER_TIME_UNLIMITED = -2
|
|
|
|
|
| globals().update(BatteryTime.__members__)
|
|
|
|
|
|
|
| ENCODING = sys.getfilesystemencoding()
|
| ENCODING_ERRS = sys.getfilesystemencodeerrors()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| sswap = namedtuple('sswap', ['total', 'used', 'free', 'percent', 'sin',
|
| 'sout'])
|
|
|
| sdiskusage = namedtuple('sdiskusage', ['total', 'used', 'free', 'percent'])
|
|
|
| sdiskio = namedtuple('sdiskio', ['read_count', 'write_count',
|
| 'read_bytes', 'write_bytes',
|
| 'read_time', 'write_time'])
|
|
|
| sdiskpart = namedtuple('sdiskpart', ['device', 'mountpoint', 'fstype', 'opts'])
|
|
|
| snetio = namedtuple('snetio', ['bytes_sent', 'bytes_recv',
|
| 'packets_sent', 'packets_recv',
|
| 'errin', 'errout',
|
| 'dropin', 'dropout'])
|
|
|
| suser = namedtuple('suser', ['name', 'terminal', 'host', 'started', 'pid'])
|
|
|
| sconn = namedtuple('sconn', ['fd', 'family', 'type', 'laddr', 'raddr',
|
| 'status', 'pid'])
|
|
|
| snicaddr = namedtuple('snicaddr',
|
| ['family', 'address', 'netmask', 'broadcast', 'ptp'])
|
|
|
| snicstats = namedtuple('snicstats',
|
| ['isup', 'duplex', 'speed', 'mtu', 'flags'])
|
|
|
| scpustats = namedtuple(
|
| 'scpustats', ['ctx_switches', 'interrupts', 'soft_interrupts', 'syscalls'])
|
|
|
| scpufreq = namedtuple('scpufreq', ['current', 'min', 'max'])
|
|
|
| shwtemp = namedtuple(
|
| 'shwtemp', ['label', 'current', 'high', 'critical'])
|
|
|
| sbattery = namedtuple('sbattery', ['percent', 'secsleft', 'power_plugged'])
|
|
|
| sfan = namedtuple('sfan', ['label', 'current'])
|
|
|
|
|
|
|
|
|
|
|
| pcputimes = namedtuple(
|
| 'pcputimes', ['user', 'system', 'children_user', 'children_system']
|
| )
|
|
|
| popenfile = namedtuple('popenfile', ['path', 'fd'])
|
|
|
| pthread = namedtuple('pthread', ['id', 'user_time', 'system_time'])
|
|
|
| puids = namedtuple('puids', ['real', 'effective', 'saved'])
|
|
|
| pgids = namedtuple('pgids', ['real', 'effective', 'saved'])
|
|
|
| pio = namedtuple(
|
| 'pio', ['read_count', 'write_count', 'read_bytes', 'write_bytes']
|
| )
|
|
|
| pionice = namedtuple('pionice', ['ioclass', 'value'])
|
|
|
| pctxsw = namedtuple('pctxsw', ['voluntary', 'involuntary'])
|
|
|
| pconn = namedtuple(
|
| 'pconn', ['fd', 'family', 'type', 'laddr', 'raddr', 'status']
|
| )
|
|
|
|
|
| addr = namedtuple('addr', ['ip', 'port'])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| conn_tmap = {
|
| "all": ([AF_INET, AF_INET6, AF_UNIX], [SOCK_STREAM, SOCK_DGRAM]),
|
| "tcp": ([AF_INET, AF_INET6], [SOCK_STREAM]),
|
| "tcp4": ([AF_INET], [SOCK_STREAM]),
|
| "udp": ([AF_INET, AF_INET6], [SOCK_DGRAM]),
|
| "udp4": ([AF_INET], [SOCK_DGRAM]),
|
| "inet": ([AF_INET, AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
|
| "inet4": ([AF_INET], [SOCK_STREAM, SOCK_DGRAM]),
|
| "inet6": ([AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
|
| }
|
|
|
| if AF_INET6 is not None:
|
| conn_tmap.update({
|
| "tcp6": ([AF_INET6], [SOCK_STREAM]),
|
| "udp6": ([AF_INET6], [SOCK_DGRAM]),
|
| })
|
|
|
| if AF_UNIX is not None and not SUNOS:
|
| conn_tmap.update({"unix": ([AF_UNIX], [SOCK_STREAM, SOCK_DGRAM])})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| class Error(Exception):
|
| """Base exception class. All other psutil exceptions inherit
|
| from this one.
|
| """
|
|
|
| __module__ = 'psutil'
|
|
|
| def _infodict(self, attrs):
|
| info = collections.OrderedDict()
|
| for name in attrs:
|
| value = getattr(self, name, None)
|
| if value or (name == "pid" and value == 0):
|
| info[name] = value
|
| return info
|
|
|
| def __str__(self):
|
|
|
| info = self._infodict(("pid", "ppid", "name"))
|
| if info:
|
| details = "({})".format(
|
| ", ".join([f"{k}={v!r}" for k, v in info.items()])
|
| )
|
| else:
|
| details = None
|
| return " ".join([x for x in (getattr(self, "msg", ""), details) if x])
|
|
|
| def __repr__(self):
|
|
|
| info = self._infodict(("pid", "ppid", "name", "seconds", "msg"))
|
| details = ", ".join([f"{k}={v!r}" for k, v in info.items()])
|
| return f"psutil.{self.__class__.__name__}({details})"
|
|
|
|
|
| class NoSuchProcess(Error):
|
| """Exception raised when a process with a certain PID doesn't
|
| or no longer exists.
|
| """
|
|
|
| __module__ = 'psutil'
|
|
|
| def __init__(self, pid, name=None, msg=None):
|
| Error.__init__(self)
|
| self.pid = pid
|
| self.name = name
|
| self.msg = msg or "process no longer exists"
|
|
|
| def __reduce__(self):
|
| return (self.__class__, (self.pid, self.name, self.msg))
|
|
|
|
|
| class ZombieProcess(NoSuchProcess):
|
| """Exception raised when querying a zombie process. This is
|
| raised on macOS, BSD and Solaris only, and not always: depending
|
| on the query the OS may be able to succeed anyway.
|
| On Linux all zombie processes are querable (hence this is never
|
| raised). Windows doesn't have zombie processes.
|
| """
|
|
|
| __module__ = 'psutil'
|
|
|
| def __init__(self, pid, name=None, ppid=None, msg=None):
|
| NoSuchProcess.__init__(self, pid, name, msg)
|
| self.ppid = ppid
|
| self.msg = msg or "PID still exists but it's a zombie"
|
|
|
| def __reduce__(self):
|
| return (self.__class__, (self.pid, self.name, self.ppid, self.msg))
|
|
|
|
|
| class AccessDenied(Error):
|
| """Exception raised when permission to perform an action is denied."""
|
|
|
| __module__ = 'psutil'
|
|
|
| def __init__(self, pid=None, name=None, msg=None):
|
| Error.__init__(self)
|
| self.pid = pid
|
| self.name = name
|
| self.msg = msg or ""
|
|
|
| def __reduce__(self):
|
| return (self.__class__, (self.pid, self.name, self.msg))
|
|
|
|
|
| class TimeoutExpired(Error):
|
| """Raised on Process.wait(timeout) if timeout expires and process
|
| is still alive.
|
| """
|
|
|
| __module__ = 'psutil'
|
|
|
| def __init__(self, seconds, pid=None, name=None):
|
| Error.__init__(self)
|
| self.seconds = seconds
|
| self.pid = pid
|
| self.name = name
|
| self.msg = f"timeout after {seconds} seconds"
|
|
|
| def __reduce__(self):
|
| return (self.__class__, (self.seconds, self.pid, self.name))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| def usage_percent(used, total, round_=None):
|
| """Calculate percentage usage of 'used' against 'total'."""
|
| try:
|
| ret = (float(used) / total) * 100
|
| except ZeroDivisionError:
|
| return 0.0
|
| else:
|
| if round_ is not None:
|
| ret = round(ret, round_)
|
| return ret
|
|
|
|
|
| def memoize(fun):
|
| """A simple memoize decorator for functions supporting (hashable)
|
| positional arguments.
|
| It also provides a cache_clear() function for clearing the cache:
|
|
|
| >>> @memoize
|
| ... def foo()
|
| ... return 1
|
| ...
|
| >>> foo()
|
| 1
|
| >>> foo.cache_clear()
|
| >>>
|
|
|
| It supports:
|
| - functions
|
| - classes (acts as a @singleton)
|
| - staticmethods
|
| - classmethods
|
|
|
| It does NOT support:
|
| - methods
|
| """
|
|
|
| @functools.wraps(fun)
|
| def wrapper(*args, **kwargs):
|
| key = (args, frozenset(sorted(kwargs.items())))
|
| try:
|
| return cache[key]
|
| except KeyError:
|
| try:
|
| ret = cache[key] = fun(*args, **kwargs)
|
| except Exception as err:
|
| raise err from None
|
| return ret
|
|
|
| def cache_clear():
|
| """Clear cache."""
|
| cache.clear()
|
|
|
| cache = {}
|
| wrapper.cache_clear = cache_clear
|
| return wrapper
|
|
|
|
|
| def memoize_when_activated(fun):
|
| """A memoize decorator which is disabled by default. It can be
|
| activated and deactivated on request.
|
| For efficiency reasons it can be used only against class methods
|
| accepting no arguments.
|
|
|
| >>> class Foo:
|
| ... @memoize
|
| ... def foo()
|
| ... print(1)
|
| ...
|
| >>> f = Foo()
|
| >>> # deactivated (default)
|
| >>> foo()
|
| 1
|
| >>> foo()
|
| 1
|
| >>>
|
| >>> # activated
|
| >>> foo.cache_activate(self)
|
| >>> foo()
|
| 1
|
| >>> foo()
|
| >>> foo()
|
| >>>
|
| """
|
|
|
| @functools.wraps(fun)
|
| def wrapper(self):
|
| try:
|
|
|
| ret = self._cache[fun]
|
| except AttributeError:
|
|
|
| try:
|
| return fun(self)
|
| except Exception as err:
|
| raise err from None
|
| except KeyError:
|
|
|
|
|
| try:
|
| ret = fun(self)
|
| except Exception as err:
|
| raise err from None
|
| try:
|
| self._cache[fun] = ret
|
| except AttributeError:
|
|
|
|
|
| pass
|
| return ret
|
|
|
| def cache_activate(proc):
|
| """Activate cache. Expects a Process instance. Cache will be
|
| stored as a "_cache" instance attribute.
|
| """
|
| proc._cache = {}
|
|
|
| def cache_deactivate(proc):
|
| """Deactivate and clear cache."""
|
| try:
|
| del proc._cache
|
| except AttributeError:
|
| pass
|
|
|
| wrapper.cache_activate = cache_activate
|
| wrapper.cache_deactivate = cache_deactivate
|
| return wrapper
|
|
|
|
|
| def isfile_strict(path):
|
| """Same as os.path.isfile() but does not swallow EACCES / EPERM
|
| exceptions, see:
|
| http://mail.python.org/pipermail/python-dev/2012-June/120787.html.
|
| """
|
| try:
|
| st = os.stat(path)
|
| except PermissionError:
|
| raise
|
| except OSError:
|
| return False
|
| else:
|
| return stat.S_ISREG(st.st_mode)
|
|
|
|
|
| def path_exists_strict(path):
|
| """Same as os.path.exists() but does not swallow EACCES / EPERM
|
| exceptions. See:
|
| http://mail.python.org/pipermail/python-dev/2012-June/120787.html.
|
| """
|
| try:
|
| os.stat(path)
|
| except PermissionError:
|
| raise
|
| except OSError:
|
| return False
|
| else:
|
| return True
|
|
|
|
|
| def supports_ipv6():
|
| """Return True if IPv6 is supported on this platform."""
|
| if not socket.has_ipv6 or AF_INET6 is None:
|
| return False
|
| try:
|
| with socket.socket(AF_INET6, socket.SOCK_STREAM) as sock:
|
| sock.bind(("::1", 0))
|
| return True
|
| except OSError:
|
| return False
|
|
|
|
|
| def parse_environ_block(data):
|
| """Parse a C environ block of environment variables into a dictionary."""
|
|
|
|
|
| ret = {}
|
| pos = 0
|
|
|
|
|
| WINDOWS_ = WINDOWS
|
| while True:
|
| next_pos = data.find("\0", pos)
|
|
|
| if next_pos <= pos:
|
| break
|
|
|
| equal_pos = data.find("=", pos, next_pos)
|
| if equal_pos > pos:
|
| key = data[pos:equal_pos]
|
| value = data[equal_pos + 1 : next_pos]
|
|
|
| if WINDOWS_:
|
| key = key.upper()
|
| ret[key] = value
|
| pos = next_pos + 1
|
|
|
| return ret
|
|
|
|
|
| def sockfam_to_enum(num):
|
| """Convert a numeric socket family value to an IntEnum member.
|
| If it's not a known member, return the numeric value itself.
|
| """
|
| try:
|
| return socket.AddressFamily(num)
|
| except ValueError:
|
| return num
|
|
|
|
|
| def socktype_to_enum(num):
|
| """Convert a numeric socket type value to an IntEnum member.
|
| If it's not a known member, return the numeric value itself.
|
| """
|
| try:
|
| return socket.SocketKind(num)
|
| except ValueError:
|
| return num
|
|
|
|
|
| def conn_to_ntuple(fd, fam, type_, laddr, raddr, status, status_map, pid=None):
|
| """Convert a raw connection tuple to a proper ntuple."""
|
| if fam in {socket.AF_INET, AF_INET6}:
|
| if laddr:
|
| laddr = addr(*laddr)
|
| if raddr:
|
| raddr = addr(*raddr)
|
| if type_ == socket.SOCK_STREAM and fam in {AF_INET, AF_INET6}:
|
| status = status_map.get(status, CONN_NONE)
|
| else:
|
| status = CONN_NONE
|
| fam = sockfam_to_enum(fam)
|
| type_ = socktype_to_enum(type_)
|
| if pid is None:
|
| return pconn(fd, fam, type_, laddr, raddr, status)
|
| else:
|
| return sconn(fd, fam, type_, laddr, raddr, status, pid)
|
|
|
|
|
| def broadcast_addr(addr):
|
| """Given the address ntuple returned by ``net_if_addrs()``
|
| calculates the broadcast address.
|
| """
|
| import ipaddress
|
|
|
| if not addr.address or not addr.netmask:
|
| return None
|
| if addr.family == socket.AF_INET:
|
| return str(
|
| ipaddress.IPv4Network(
|
| f"{addr.address}/{addr.netmask}", strict=False
|
| ).broadcast_address
|
| )
|
| if addr.family == socket.AF_INET6:
|
| return str(
|
| ipaddress.IPv6Network(
|
| f"{addr.address}/{addr.netmask}", strict=False
|
| ).broadcast_address
|
| )
|
|
|
|
|
| def deprecated_method(replacement):
|
| """A decorator which can be used to mark a method as deprecated
|
| 'replcement' is the method name which will be called instead.
|
| """
|
|
|
| def outer(fun):
|
| msg = (
|
| f"{fun.__name__}() is deprecated and will be removed; use"
|
| f" {replacement}() instead"
|
| )
|
| if fun.__doc__ is None:
|
| fun.__doc__ = msg
|
|
|
| @functools.wraps(fun)
|
| def inner(self, *args, **kwargs):
|
| warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
|
| return getattr(self, replacement)(*args, **kwargs)
|
|
|
| return inner
|
|
|
| return outer
|
|
|
|
|
| class _WrapNumbers:
|
| """Watches numbers so that they don't overflow and wrap
|
| (reset to zero).
|
| """
|
|
|
| def __init__(self):
|
| self.lock = threading.Lock()
|
| self.cache = {}
|
| self.reminders = {}
|
| self.reminder_keys = {}
|
|
|
| def _add_dict(self, input_dict, name):
|
| assert name not in self.cache
|
| assert name not in self.reminders
|
| assert name not in self.reminder_keys
|
| self.cache[name] = input_dict
|
| self.reminders[name] = collections.defaultdict(int)
|
| self.reminder_keys[name] = collections.defaultdict(set)
|
|
|
| def _remove_dead_reminders(self, input_dict, name):
|
| """In case the number of keys changed between calls (e.g. a
|
| disk disappears) this removes the entry from self.reminders.
|
| """
|
| old_dict = self.cache[name]
|
| gone_keys = set(old_dict.keys()) - set(input_dict.keys())
|
| for gone_key in gone_keys:
|
| for remkey in self.reminder_keys[name][gone_key]:
|
| del self.reminders[name][remkey]
|
| del self.reminder_keys[name][gone_key]
|
|
|
| def run(self, input_dict, name):
|
| """Cache dict and sum numbers which overflow and wrap.
|
| Return an updated copy of `input_dict`.
|
| """
|
| if name not in self.cache:
|
|
|
| self._add_dict(input_dict, name)
|
| return input_dict
|
|
|
| self._remove_dead_reminders(input_dict, name)
|
|
|
| old_dict = self.cache[name]
|
| new_dict = {}
|
| for key in input_dict:
|
| input_tuple = input_dict[key]
|
| try:
|
| old_tuple = old_dict[key]
|
| except KeyError:
|
|
|
|
|
| new_dict[key] = input_tuple
|
| continue
|
|
|
| bits = []
|
| for i in range(len(input_tuple)):
|
| input_value = input_tuple[i]
|
| old_value = old_tuple[i]
|
| remkey = (key, i)
|
| if input_value < old_value:
|
|
|
| self.reminders[name][remkey] += old_value
|
| self.reminder_keys[name][key].add(remkey)
|
| bits.append(input_value + self.reminders[name][remkey])
|
|
|
| new_dict[key] = tuple(bits)
|
|
|
| self.cache[name] = input_dict
|
| return new_dict
|
|
|
| def cache_clear(self, name=None):
|
| """Clear the internal cache, optionally only for function 'name'."""
|
| with self.lock:
|
| if name is None:
|
| self.cache.clear()
|
| self.reminders.clear()
|
| self.reminder_keys.clear()
|
| else:
|
| self.cache.pop(name, None)
|
| self.reminders.pop(name, None)
|
| self.reminder_keys.pop(name, None)
|
|
|
| def cache_info(self):
|
| """Return internal cache dicts as a tuple of 3 elements."""
|
| with self.lock:
|
| return (self.cache, self.reminders, self.reminder_keys)
|
|
|
|
|
| def wrap_numbers(input_dict, name):
|
| """Given an `input_dict` and a function `name`, adjust the numbers
|
| which "wrap" (restart from zero) across different calls by adding
|
| "old value" to "new value" and return an updated dict.
|
| """
|
| with _wn.lock:
|
| return _wn.run(input_dict, name)
|
|
|
|
|
| _wn = _WrapNumbers()
|
| wrap_numbers.cache_clear = _wn.cache_clear
|
| wrap_numbers.cache_info = _wn.cache_info
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| FILE_READ_BUFFER_SIZE = 32 * 1024
|
|
|
|
|
| def open_binary(fname):
|
| return open(fname, "rb", buffering=FILE_READ_BUFFER_SIZE)
|
|
|
|
|
| def open_text(fname):
|
| """Open a file in text mode by using the proper FS encoding and
|
| en/decoding error handlers.
|
| """
|
|
|
|
|
|
|
| fobj = open(
|
| fname,
|
| buffering=FILE_READ_BUFFER_SIZE,
|
| encoding=ENCODING,
|
| errors=ENCODING_ERRS,
|
| )
|
| try:
|
|
|
|
|
| fobj._CHUNK_SIZE = FILE_READ_BUFFER_SIZE
|
| except AttributeError:
|
| pass
|
| except Exception:
|
| fobj.close()
|
| raise
|
|
|
| return fobj
|
|
|
|
|
| def cat(fname, fallback=_DEFAULT, _open=open_text):
|
| """Read entire file content and return it as a string. File is
|
| opened in text mode. If specified, `fallback` is the value
|
| returned in case of error, either if the file does not exist or
|
| it can't be read().
|
| """
|
| if fallback is _DEFAULT:
|
| with _open(fname) as f:
|
| return f.read()
|
| else:
|
| try:
|
| with _open(fname) as f:
|
| return f.read()
|
| except OSError:
|
| return fallback
|
|
|
|
|
| def bcat(fname, fallback=_DEFAULT):
|
| """Same as above but opens file in binary mode."""
|
| return cat(fname, fallback=fallback, _open=open_binary)
|
|
|
|
|
| def bytes2human(n, format="%(value).1f%(symbol)s"):
|
| """Used by various scripts. See: https://code.activestate.com/recipes/578019-bytes-to-human-human-to-bytes-converter/?in=user-4178764.
|
|
|
| >>> bytes2human(10000)
|
| '9.8K'
|
| >>> bytes2human(100001221)
|
| '95.4M'
|
| """
|
| symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
|
| prefix = {}
|
| for i, s in enumerate(symbols[1:]):
|
| prefix[s] = 1 << (i + 1) * 10
|
| for symbol in reversed(symbols[1:]):
|
| if abs(n) >= prefix[symbol]:
|
| value = float(n) / prefix[symbol]
|
| return format % locals()
|
| return format % dict(symbol=symbols[0], value=n)
|
|
|
|
|
| def get_procfs_path():
|
| """Return updated psutil.PROCFS_PATH constant."""
|
| return sys.modules['psutil'].PROCFS_PATH
|
|
|
|
|
| def decode(s):
|
| return s.decode(encoding=ENCODING, errors=ENCODING_ERRS)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| @memoize
|
| def term_supports_colors(file=sys.stdout):
|
| if os.name == 'nt':
|
| return True
|
| try:
|
| import curses
|
|
|
| assert file.isatty()
|
| curses.setupterm()
|
| assert curses.tigetnum("colors") > 0
|
| except Exception:
|
| return False
|
| else:
|
| return True
|
|
|
|
|
| def hilite(s, color=None, bold=False):
|
| """Return an highlighted version of 'string'."""
|
| if not term_supports_colors():
|
| return s
|
| attr = []
|
| colors = dict(
|
| blue='34',
|
| brown='33',
|
| darkgrey='30',
|
| green='32',
|
| grey='37',
|
| lightblue='36',
|
| red='91',
|
| violet='35',
|
| yellow='93',
|
| )
|
| colors[None] = '29'
|
| try:
|
| color = colors[color]
|
| except KeyError:
|
| msg = f"invalid color {color!r}; choose amongst {list(colors.keys())}"
|
| raise ValueError(msg) from None
|
| attr.append(color)
|
| if bold:
|
| attr.append('1')
|
| return f"\x1b[{';'.join(attr)}m{s}\x1b[0m"
|
|
|
|
|
| def print_color(
|
| s, color=None, bold=False, file=sys.stdout
|
| ):
|
| """Print a colorized version of string."""
|
| if not term_supports_colors():
|
| print(s, file=file)
|
| elif POSIX:
|
| print(hilite(s, color, bold), file=file)
|
| else:
|
| import ctypes
|
|
|
| DEFAULT_COLOR = 7
|
| GetStdHandle = ctypes.windll.Kernel32.GetStdHandle
|
| SetConsoleTextAttribute = (
|
| ctypes.windll.Kernel32.SetConsoleTextAttribute
|
| )
|
|
|
| colors = dict(green=2, red=4, brown=6, yellow=6)
|
| colors[None] = DEFAULT_COLOR
|
| try:
|
| color = colors[color]
|
| except KeyError:
|
| msg = (
|
| f"invalid color {color!r}; choose between"
|
| f" {list(colors.keys())!r}"
|
| )
|
| raise ValueError(msg) from None
|
| if bold and color <= 7:
|
| color += 8
|
|
|
| handle_id = -12 if file is sys.stderr else -11
|
| GetStdHandle.restype = ctypes.c_ulong
|
| handle = GetStdHandle(handle_id)
|
| SetConsoleTextAttribute(handle, color)
|
| try:
|
| print(s, file=file)
|
| finally:
|
| SetConsoleTextAttribute(handle, DEFAULT_COLOR)
|
|
|
|
|
| def debug(msg):
|
| """If PSUTIL_DEBUG env var is set, print a debug message to stderr."""
|
| if PSUTIL_DEBUG:
|
| import inspect
|
|
|
| fname, lineno, _, _lines, _index = inspect.getframeinfo(
|
| inspect.currentframe().f_back
|
| )
|
| if isinstance(msg, Exception):
|
| if isinstance(msg, OSError):
|
|
|
| msg = f"ignoring {msg}"
|
| else:
|
| msg = f"ignoring {msg!r}"
|
| print(
|
| f"psutil-debug [{fname}:{lineno}]> {msg}", file=sys.stderr
|
| )
|
|
|