generated from egvimo/template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapt_info.py
140 lines (105 loc) · 4.7 KB
/
apt_info.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python3
"""
Description: Expose metrics from apt. This is inspired by and
intended to be a replacement for the original apt.sh.
This script deliberately does *not* update the apt cache. You need
something else to run `apt update` regularly for the metrics to be
up to date. This can be done in numerous ways, but the canonical way
is to use the normal `APT::Periodic::Update-Package-Lists`
setting.
This, for example, will enable a nightly job that runs `apt update`:
echo 'APT::Periodic::Update-Package-Lists "1";' > /etc/apt/apt.conf.d/99_auto_apt_update.conf
See /usr/lib/apt/apt.systemd.daily for details.
Dependencies: python3-apt, python3-prometheus-client
Authors: Kyle Fazzari <[email protected]>
Daniel Swarbrick <[email protected]>
"""
import argparse
import collections
import os
import apt
import apt_pkg
from prometheus_client import CollectorRegistry, Gauge, generate_latest
_UpgradeInfo = collections.namedtuple("_UpgradeInfo", ["labels", "count"])
def _convert_candidates_to_upgrade_infos(candidates):
changes_dict = collections.defaultdict(lambda: collections.defaultdict(int))
for candidate in candidates:
origins = sorted(
{f"{o.origin}:{o.codename}/{o.archive}" for o in candidate.origins}
)
changes_dict[",".join(origins)][candidate.architecture] += 1
changes_list = []
for origin in sorted(changes_dict.keys()):
for arch in sorted(changes_dict[origin].keys()):
changes_list.append(
_UpgradeInfo(
labels={"origin": origin, "arch": arch},
count=changes_dict[origin][arch],
)
)
return changes_list
def _write_pending_upgrades(registry, cache):
candidates = {
p.candidate for p in cache if p.is_upgradable
}
upgrade_list = _convert_candidates_to_upgrade_infos(candidates)
if upgrade_list:
g = Gauge('apt_upgrades_pending', "Apt packages pending updates by origin",
['origin', 'arch'], registry=registry)
for change in upgrade_list:
g.labels(change.labels['origin'], change.labels['arch']).set(change.count)
def _write_held_upgrades(registry, cache):
held_candidates = {
p.candidate for p in cache
if p.is_upgradable and p._pkg.selected_state == apt_pkg.SELSTATE_HOLD # pylint: disable=protected-access
}
upgrade_list = _convert_candidates_to_upgrade_infos(held_candidates)
if upgrade_list:
g = Gauge('apt_upgrades_held', "Apt packages pending updates but held back.",
['origin', 'arch'], registry=registry)
for change in upgrade_list:
g.labels(change.labels['origin'], change.labels['arch']).set(change.count)
def _write_autoremove_pending(registry, cache):
autoremovable_packages = {p for p in cache if p.is_auto_removable}
g = Gauge('apt_autoremove_pending', "Apt packages pending autoremoval.",
registry=registry)
g.set(len(autoremovable_packages))
def _write_cache_timestamps(registry):
g = Gauge('apt_package_cache_timestamp_seconds', "Apt update last run time.", registry=registry)
apt_pkg.init_config()
if (
apt_pkg.config.find_b("APT::Periodic::Update-Package-Lists") and
os.path.isfile("/var/lib/apt/periodic/update-success-stamp")
):
# if we run updates automatically with APT::Periodic, we can
# check this timestamp file if it exists
stamp_file = "/var/lib/apt/periodic/update-success-stamp"
else:
# if not, let's just fallback on the partial file of the lists directory
stamp_file = '/var/lib/apt/lists/partial'
try:
g.set(os.stat(stamp_file).st_mtime)
except OSError:
pass
def _write_reboot_required(registry, root_dir):
g = Gauge('node_reboot_required', "Node reboot is required for software updates.",
registry=registry)
g.set(int(os.path.isfile(os.path.join(root_dir, 'run/reboot-required'))))
def generate_metrics(root_dir: str = '/') -> bytes:
cache = apt.cache.Cache(rootdir=root_dir)
registry = CollectorRegistry()
_write_pending_upgrades(registry, cache)
_write_held_upgrades(registry, cache)
_write_autoremove_pending(registry, cache)
_write_cache_timestamps(registry)
_write_reboot_required(registry, root_dir)
return generate_latest(registry)
def _main():
parser = argparse.ArgumentParser()
parser.add_argument('-r', '--root-dir', dest='root_dir', type=str, default='/',
help="Set root directory to a different path than /")
args = parser.parse_args()
metrics = generate_metrics(args.root_dir)
print(metrics.decode(), end='')
if __name__ == "__main__":
_main()