Source code for amsatop.utils.__status_file

import logging
from dataclasses import dataclass, field
from typing import List


[docs] @dataclass(frozen=True) class StatusFile: """ Represents the parsed contents of a `/proc/[pid or tgid]/status` file on a Linux system. All fields map to their corresponding values in the status file. """ Name: str State: str Tgid: int Ngid: int Pid: int PPid: int TracerPid: int Uid: List[int] Gid: List[int] FDSize: int Groups: List[int] = field(default_factory=list) NStgid: int = 0 NSpid: int = 0 NSpgid: int = 0 NSsid: int = 0 Kthread: int = 0 VmPeak: int = 0 VmSize: int = 0 VmLck: int = 0 VmPin: int = 0 VmHWM: int = 0 VmRSS: int = 0 RssAnon: int = 0 RssFile: int = 0 RssShmem: int = 0 VmData: int = 0 VmStk: int = 0 VmExe: int = 0 VmLib: int = 0 VmPTE: int = 0 VmSwap: int = 0 HugetlbPages: int = 0 CoreDumping: int = 0 THP_enabled: int = 0 untag_mask: str = '' Threads: int = 0 SigQ: str = '' SigPnd: str = '' ShdPnd: str = '' SigBlk: str = '' SigIgn: str = '' SigCgt: str = '' CapInh: str = '' CapPrm: str = '' CapEff: str = '' CapBnd: str = '' CapAmb: str = '' NoNewPrivs: int = 0 Seccomp: int = 0 Seccomp_filters: int = 0 Speculation_Store_Bypass: str = '' SpeculationIndirectBranch: str = '' Cpus_allowed: str = '' Cpus_allowed_list: str = '' Mems_allowed: str = '' Mems_allowed_list: str = '' voluntary_ctxt_switches: int = 0 nonvoluntary_ctxt_switches: int = 0 x86_Thread_features: str | None = '' x86_Thread_features_locked: str | None = '' Umask: str | None = None
[docs] @classmethod def from_lines(cls, lines: List[str]) -> "StatusFile": """ Get a StatusFile from the lines of the file :meta private: """ def parse_value(key: str, value: str): value = value.strip() if not value: return "" if "x86_Thread" in key else [] if key in {"Uid", "Gid", "Groups"}: return list(map(int, value.split())) if key.startswith("Vm") or key in { "HugetlbPages", "voluntary_ctxt_switches", "nonvoluntary_ctxt_switches" }: return int(value.replace("kB", "").strip()) if key in { "Tgid", "Ngid", "Pid", "PPid", "TracerPid", "FDSize", "NStgid", "NSpid", "NSpgid", "NSsid", "Kthread", "CoreDumping", "THP_enabled", "Threads", "NoNewPrivs", "Seccomp", "Seccomp_filters" }: return int(value) return value data = {} for line in lines: if not line.strip() or ':' not in line or line.strip() == "EOF": continue key, value = line.split(":", 1) field_key = key.strip().replace("-", "_") parsed_value = parse_value(key.strip(), value) data[field_key] = parsed_value return cls(**data)
[docs] def get_status_file_from_path(path: str) -> StatusFile | None: """ Given a path, returns a StatusFile, which can be useful for analyzing a process or thread. :param path: path to the status file (will be usually in the form of /proc/[pid or tgid]/status :returns: StatusFile or None if the path doesn't exist (or other unhandled exception) :raises: ValueError if the path isn't valid. """ if not path.endswith("/status"): raise ValueError("Status file, should end with /status.....") try: with open(path, "r", encoding="utf-8") as f: return StatusFile.from_lines(f.readlines()) except FileNotFoundError: return None except Exception as e: logging.error( f"Unexpected error in amsatop library (not caused by this code). " f"Please report it to your instructors. Error: {e}" ) return None