"""
This module provides functions for platform integration. Most of the functionality here
could also be achieved with psutils but we want to avoid the large dependency.
"""
import os
import platform
import enum
import resource
import requests
import time
import logging
from pathlib import Path
from typing import Union, Tuple, Optional
from urllib.parse import urlparse
__all__ = [
"get_ac_state",
"ACState",
"get_inotify_limits",
"CPU_COUNT",
"cpu_usage_percent",
"check_connection",
]
[docs]CPU_COUNT = os.cpu_count() or 1 # os.cpu_count can return None
LINUX_POWER_SUPPLY_PATH = "/sys/class/power_supply"
def multi_cat(*paths: Path) -> Union[int, bytes, None]:
"""
Attempts to read the content of multiple files which may not exist. Returns the
content of the first file which can be read. If none of them can be read return
None. Returns an integer if the content is a digit.
"""
for path in paths:
try:
ret = path.read_bytes().strip()
except OSError:
pass
else:
return int(ret) if ret.isdigit() else ret
return None
[docs]class ACState(enum.Enum):
"""Enumeration of AC power states"""
[docs] Connected = "Connected"
[docs] Disconnected = "Disconnected"
[docs] Undetermined = "Undetermined"
[docs]def get_ac_state() -> ACState:
"""
Checks if the current device has AC power or is running on battery.
:returns: ``True`` if the device has AC power, ``False`` otherwise.
"""
if platform.system() == "Darwin":
from ctypes import c_double
from rubicon.objc.runtime import load_library
iokit = load_library("IOKit")
kIOPSTimeRemainingUnlimited = -2.0
iokit.IOPSGetTimeRemainingEstimate.restype = c_double
remaining_time = iokit.IOPSGetTimeRemainingEstimate()
if remaining_time == kIOPSTimeRemainingUnlimited:
return ACState.Connected
else:
return ACState.Disconnected
elif platform.system() == "Linux":
# taken from https://github.com/giampaolo/psutil
supply_entry = list(os.scandir(LINUX_POWER_SUPPLY_PATH))
ac_paths = [
Path(entry.path)
for entry in supply_entry
if entry.name.startswith("A") or "ac" in entry.name.lower()
]
battery_paths = [
Path(entry.path)
for entry in supply_entry
if entry.name.startswith("B") or "battery" in entry.name.lower()
]
online = multi_cat(*iter(path / "online" for path in ac_paths))
if online is not None:
if online == 1:
return ACState.Connected
else:
return ACState.Disconnected
elif len(battery_paths) > 0:
# Get the first available battery. Usually this is "BAT0", except
# some rare exceptions:
# https://github.com/giampaolo/psutil/issues/1238
bat0 = sorted(battery_paths)[0]
try:
status = (bat0 / "status").read_text().strip().lower()
except OSError:
status = ""
if status == "discharging":
return ACState.Disconnected
elif status in ("charging", "full"):
return ACState.Connected
return ACState.Undetermined
[docs]def get_inotify_limits() -> Tuple[int, int, int]:
"""
Returns the current inotify limit settings as tuple.
:returns: ``(max_user_watches, max_user_instances, max_queued_events)``
:raises OSError: if the settings cannot be read from /proc/sys/fs/inotify. This may
happen if /proc/sys is left out of the kernel image or simply not mounted.
"""
root = Path("/proc/sys/fs/inotify")
max_user_watches_path = root / "max_user_watches"
max_user_instances_path = root / "max_user_instances"
max_queued_events_path = root / "max_queued_events"
max_user_watches = int(max_user_watches_path.read_bytes().strip())
max_user_instances = int(max_user_instances_path.read_bytes().strip())
max_queued_events = int(max_queued_events_path.read_bytes().strip())
return max_user_watches, max_user_instances, max_queued_events
[docs]def cpu_usage_percent(interval: float = 0.1) -> float:
"""
Returns a float representing the CPU utilization of the current process as a
percentage. This duplicates the similar method from psutil to avoid the psutil
dependency.
Compares process times to system CPU times elapsed before and after the interval
(blocking). It is recommended for accuracy that this function be called with an
interval of at least 0.1 sec.
A value > 100.0 can be returned in case of processes running multiple threads on
different CPU cores. The returned value is explicitly NOT split evenly between all
available logical CPUs. This means that a busy loop process running on a system with
2 logical CPUs will be reported as having 100% CPU utilization instead of 50%.
:param interval: Interval in sec between comparisons of CPU times.
:returns: CPU usage during interval in percent.
"""
if interval <= 0:
raise ValueError(f"interval is not positive (got {interval!r})")
def timer():
return time.monotonic() * CPU_COUNT
st1 = timer()
rt1 = resource.getrusage(resource.RUSAGE_SELF)
time.sleep(interval)
st2 = timer()
rt2 = resource.getrusage(resource.RUSAGE_SELF)
delta_proc = (rt2.ru_utime - rt1.ru_utime) + (rt2.ru_stime - rt1.ru_stime)
delta_time = st2 - st1
try:
overall_cpus_percent = (delta_proc / delta_time) * 100
except ZeroDivisionError:
return 0.0
else:
single_cpu_percent = overall_cpus_percent * CPU_COUNT
return round(single_cpu_percent, 1)
[docs]def check_connection(
hostname: str, timeout: int = 2, logger: Optional[logging.Logger] = None
) -> bool:
"""
A low latency check for an internet connection.
:param hostname: Hostname to use for connection check.
:param timeout: Timeout in seconds for connection check.
:param logger: If provided, log output for connection failures will be logged to
this logger with the level DEBUG.
:returns: Connection availability.
"""
if urlparse(hostname).scheme not in ["http", "https"]:
hostname = "http://" + hostname
try:
requests.head(hostname, timeout=timeout)
return True
except Exception:
if logger:
logger.debug("Could not reach %s", hostname, exc_info=True)
return False