| | """Utilities for identifying local IP addresses.""" |
| | |
| | |
| | from __future__ import annotations |
| |
|
| | import os |
| | import re |
| | import socket |
| | import subprocess |
| | from subprocess import PIPE, Popen |
| | from typing import Any, Callable, Iterable, Sequence |
| | from warnings import warn |
| |
|
| | LOCAL_IPS: list = [] |
| | PUBLIC_IPS: list = [] |
| |
|
| | LOCALHOST: str = "" |
| |
|
| |
|
| | def _uniq_stable(elems: Iterable) -> list: |
| | """uniq_stable(elems) -> list |
| | |
| | Return from an iterable, a list of all the unique elements in the input, |
| | maintaining the order in which they first appear. |
| | """ |
| | seen = set() |
| | value = [] |
| | for x in elems: |
| | if x not in seen: |
| | value.append(x) |
| | seen.add(x) |
| | return value |
| |
|
| |
|
| | def _get_output(cmd: str | Sequence[str]) -> str: |
| | """Get output of a command, raising IOError if it fails""" |
| | startupinfo = None |
| | if os.name == "nt": |
| | startupinfo = subprocess.STARTUPINFO() |
| | startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW |
| | p = Popen(cmd, stdout=PIPE, stderr=PIPE, startupinfo=startupinfo) |
| | stdout, stderr = p.communicate() |
| | if p.returncode: |
| | msg = "Failed to run {}: {}".format(cmd, stderr.decode("utf8", "replace")) |
| | raise OSError(msg) |
| | return stdout.decode("utf8", "replace") |
| |
|
| |
|
| | def _only_once(f: Callable) -> Callable: |
| | """decorator to only run a function once""" |
| | f.called = False |
| |
|
| | def wrapped(**kwargs: Any) -> Any: |
| | if f.called: |
| | return |
| | ret = f(**kwargs) |
| | f.called = True |
| | return ret |
| |
|
| | return wrapped |
| |
|
| |
|
| | def _requires_ips(f: Callable) -> Callable: |
| | """decorator to ensure load_ips has been run before f""" |
| |
|
| | def ips_loaded(*args: Any, **kwargs: Any) -> Any: |
| | _load_ips() |
| | return f(*args, **kwargs) |
| |
|
| | return ips_loaded |
| |
|
| |
|
| | |
| | class NoIPAddresses(Exception): |
| | pass |
| |
|
| |
|
| | def _populate_from_list(addrs: Sequence[str] | None) -> None: |
| | """populate local and public IPs from flat list of all IPs""" |
| | if not addrs: |
| | raise NoIPAddresses |
| |
|
| | global LOCALHOST |
| | public_ips = [] |
| | local_ips = [] |
| |
|
| | for ip in addrs: |
| | local_ips.append(ip) |
| | if not ip.startswith("127."): |
| | public_ips.append(ip) |
| | elif not LOCALHOST: |
| | LOCALHOST = ip |
| |
|
| | if not LOCALHOST or LOCALHOST == "127.0.0.1": |
| | LOCALHOST = "127.0.0.1" |
| | local_ips.insert(0, LOCALHOST) |
| |
|
| | local_ips.extend(["0.0.0.0", ""]) |
| |
|
| | LOCAL_IPS[:] = _uniq_stable(local_ips) |
| | PUBLIC_IPS[:] = _uniq_stable(public_ips) |
| |
|
| |
|
| | _ifconfig_ipv4_pat = re.compile(r"inet\b.*?(\d+\.\d+\.\d+\.\d+)", re.IGNORECASE) |
| |
|
| |
|
| | def _load_ips_ifconfig() -> None: |
| | """load ip addresses from `ifconfig` output (posix)""" |
| |
|
| | try: |
| | out = _get_output("ifconfig") |
| | except OSError: |
| | |
| | out = _get_output("/sbin/ifconfig") |
| |
|
| | lines = out.splitlines() |
| | addrs = [] |
| | for line in lines: |
| | m = _ifconfig_ipv4_pat.match(line.strip()) |
| | if m: |
| | addrs.append(m.group(1)) |
| | _populate_from_list(addrs) |
| |
|
| |
|
| | def _load_ips_ip() -> None: |
| | """load ip addresses from `ip addr` output (Linux)""" |
| | out = _get_output(["ip", "-f", "inet", "addr"]) |
| |
|
| | lines = out.splitlines() |
| | addrs = [] |
| | for line in lines: |
| | blocks = line.lower().split() |
| | if (len(blocks) >= 2) and (blocks[0] == "inet"): |
| | addrs.append(blocks[1].split("/")[0]) |
| | _populate_from_list(addrs) |
| |
|
| |
|
| | _ipconfig_ipv4_pat = re.compile(r"ipv4.*?(\d+\.\d+\.\d+\.\d+)$", re.IGNORECASE) |
| |
|
| |
|
| | def _load_ips_ipconfig() -> None: |
| | """load ip addresses from `ipconfig` output (Windows)""" |
| | out = _get_output("ipconfig") |
| |
|
| | lines = out.splitlines() |
| | addrs = [] |
| | for line in lines: |
| | m = _ipconfig_ipv4_pat.match(line.strip()) |
| | if m: |
| | addrs.append(m.group(1)) |
| | _populate_from_list(addrs) |
| |
|
| |
|
| | def _load_ips_netifaces() -> None: |
| | """load ip addresses with netifaces""" |
| | import netifaces |
| |
|
| | global LOCALHOST |
| | local_ips = [] |
| | public_ips = [] |
| |
|
| | |
| | for iface in netifaces.interfaces(): |
| | |
| | ipv4s = netifaces.ifaddresses(iface).get(netifaces.AF_INET, []) |
| | for entry in ipv4s: |
| | addr = entry.get("addr") |
| | if not addr: |
| | continue |
| | if not (iface.startswith("lo") or addr.startswith("127.")): |
| | public_ips.append(addr) |
| | elif not LOCALHOST: |
| | LOCALHOST = addr |
| | local_ips.append(addr) |
| | if not LOCALHOST: |
| | |
| | LOCALHOST = "127.0.0.1" |
| | local_ips.insert(0, LOCALHOST) |
| | local_ips.extend(["0.0.0.0", ""]) |
| | LOCAL_IPS[:] = _uniq_stable(local_ips) |
| | PUBLIC_IPS[:] = _uniq_stable(public_ips) |
| |
|
| |
|
| | def _load_ips_gethostbyname() -> None: |
| | """load ip addresses with socket.gethostbyname_ex |
| | |
| | This can be slow. |
| | """ |
| | global LOCALHOST |
| | try: |
| | LOCAL_IPS[:] = socket.gethostbyname_ex("localhost")[2] |
| | except OSError: |
| | |
| | LOCAL_IPS[:] = ["127.0.0.1"] |
| |
|
| | try: |
| | hostname = socket.gethostname() |
| | PUBLIC_IPS[:] = socket.gethostbyname_ex(hostname)[2] |
| | |
| | if not hostname.endswith(".local") and all(ip.startswith("127") for ip in PUBLIC_IPS): |
| | PUBLIC_IPS[:] = socket.gethostbyname_ex(socket.gethostname() + ".local")[2] |
| | except OSError: |
| | pass |
| | finally: |
| | PUBLIC_IPS[:] = _uniq_stable(PUBLIC_IPS) |
| | LOCAL_IPS.extend(PUBLIC_IPS) |
| |
|
| | |
| | LOCAL_IPS.extend(["0.0.0.0", ""]) |
| |
|
| | LOCAL_IPS[:] = _uniq_stable(LOCAL_IPS) |
| |
|
| | LOCALHOST = LOCAL_IPS[0] |
| |
|
| |
|
| | def _load_ips_dumb() -> None: |
| | """Fallback in case of unexpected failure""" |
| | global LOCALHOST |
| | LOCALHOST = "127.0.0.1" |
| | LOCAL_IPS[:] = [LOCALHOST, "0.0.0.0", ""] |
| | PUBLIC_IPS[:] = [] |
| |
|
| |
|
| | @_only_once |
| | def _load_ips(suppress_exceptions: bool = True) -> None: |
| | """load the IPs that point to this machine |
| | |
| | This function will only ever be called once. |
| | |
| | It will use netifaces to do it quickly if available. |
| | Then it will fallback on parsing the output of ifconfig / ip addr / ipconfig, as appropriate. |
| | Finally, it will fallback on socket.gethostbyname_ex, which can be slow. |
| | """ |
| |
|
| | try: |
| | |
| | try: |
| | return _load_ips_netifaces() |
| | except ImportError: |
| | pass |
| |
|
| | |
| |
|
| | if os.name == "nt": |
| | try: |
| | return _load_ips_ipconfig() |
| | except (OSError, NoIPAddresses): |
| | pass |
| | else: |
| | try: |
| | return _load_ips_ip() |
| | except (OSError, NoIPAddresses): |
| | pass |
| | try: |
| | return _load_ips_ifconfig() |
| | except (OSError, NoIPAddresses): |
| | pass |
| |
|
| | |
| |
|
| | return _load_ips_gethostbyname() |
| | except Exception as e: |
| | if not suppress_exceptions: |
| | raise |
| | |
| | warn("Unexpected error discovering local network interfaces: %s" % e, stacklevel=2) |
| | _load_ips_dumb() |
| |
|
| |
|
| | @_requires_ips |
| | def local_ips() -> list[str]: |
| | """return the IP addresses that point to this machine""" |
| | return LOCAL_IPS |
| |
|
| |
|
| | @_requires_ips |
| | def public_ips() -> list[str]: |
| | """return the IP addresses for this machine that are visible to other machines""" |
| | return PUBLIC_IPS |
| |
|
| |
|
| | @_requires_ips |
| | def localhost() -> str: |
| | """return ip for localhost (almost always 127.0.0.1)""" |
| | return LOCALHOST |
| |
|
| |
|
| | @_requires_ips |
| | def is_local_ip(ip: str) -> bool: |
| | """does `ip` point to this machine?""" |
| | return ip in LOCAL_IPS |
| |
|
| |
|
| | @_requires_ips |
| | def is_public_ip(ip: str) -> bool: |
| | """is `ip` a publicly visible address?""" |
| | return ip in PUBLIC_IPS |
| |
|