1
Fork 0
mirror of git://git.sv.gnu.org/emacs.git synced 2026-03-26 08:41:47 -07:00
emacs/mps/tool/monitor
2018-08-07 12:29:55 +01:00

1558 lines
56 KiB
Python
Executable file

#!/usr/bin/env python
#
# $Id$
# Copyright (c) 2018 Ravenbrook Limited. See end of file for license.
#
# Read a telemetry stream from a program using the MPS, construct a
# model of the MPS data structures in the progam, and display selected
# time series from the model in a graphical user interface.
#
# Requirements: Python 3.6, Matplotlib, PyQt5.
import argparse
from collections import defaultdict, deque, namedtuple
from itertools import count, cycle
import os
import queue
from struct import Struct
import sys
import threading
import time
import math
import bisect
from matplotlib.backends.qt_compat import QtCore, QtGui, QtWidgets
from matplotlib.backend_bases import key_press_handler
from matplotlib.backends.backend_qt5agg import (
FigureCanvas, NavigationToolbar2QT as NavigationToolbar)
from matplotlib.figure import Figure
from matplotlib import ticker
import mpsevent
class YAxis:
"The Y-axis of a plot."
def __init__(self, label, fmt):
self._label = label
self.fmt = fmt
def label(self): return self._label
def bytesFormat(y):
"Format a number of bytes as a string."
return with_SI_prefix(y) + (' bytes' if y < 10000 else 'B')
def with_SI_prefix(y,unit=''):
if y < 0:
return '-'+with_SI_prefix(-y)
i = 0
while y >= 10000:
y /= 1000
i += 1
return ('{:.0f} {}'.format(y,' KMGTPE'[i])+unit).strip()
def bytesTickFormatter(y,pos):
return with_SI_prefix(y)
def cyc(n):
return with_SI_prefix(n, unit='c')
# The three y axes which we support
bytesAxis = YAxis('bytes', bytesFormat)
fractionAxis = YAxis('fraction', lambda v: f'{v:.5f}')
traceAxis = YAxis('gens', lambda v: f'{v} gens')
countAxis = YAxis('count', lambda v: f'{v:,d}')
# Names of scanning ranks
rank_name = {0: 'Ambig',
1: 'Exact',
2: 'Final',
3: 'Weak',
}
# Names of access modes
access_mode = {1: 'Read',
2: 'Write',
3: 'Read/Write',
}
trace_why = {1: 'gen 0 capacity',
2: 'dynamic criterion',
3: 'opportunitism',
4: 'full incremental',
5: 'full',
6: 'walking',
7: 'extension'
}
# Mapping from event code to a namedtuple for that event.
EVENT_NAMEDTUPLE = {
code: namedtuple(desc.name, ['header'] + [p.name for p in desc.params])
for code, desc in mpsevent.EVENT.items()
}
# Mapping from event code to event name.
EVENT_NAME = {code:desc.name for code, desc in mpsevent.EVENT.items()}
# Unpack function for event header.
HEADER_UNPACK = Struct(mpsevent.HEADER_FORMAT).unpack
# Unpack function for each event code.
EVENT_UNPACK = {c:Struct(d.format).unpack for c, d in mpsevent.EVENT.items()}
# Icon for the toolbar pause button.
PAUSE_ICON = os.path.abspath(os.path.join(os.path.dirname(__file__), 'pause'))
def telemetry_decoder(read):
"""Decode the events in an I/O stream and generate batches of events
as lists of pairs (time, event) in time order, where time is CPU
time in seconds and event is a tuple.
Unknown event codes are read but ignored.
The 'read' argument must be a function implementing the
io.RawIOBase.read specification (that is, it takes a size and
returns up to size bytes from the I/O stream).
"""
# Cache frequently-used values in local variables.
header_desc = mpsevent.HeaderDesc
header_size = mpsevent.HEADER_SIZE
event_dict = mpsevent.EVENT
event_namedtuple = EVENT_NAMEDTUPLE
event_unpack = EVENT_UNPACK
header_unpack = HEADER_UNPACK
EventClockSync_code = mpsevent.Event.EventClockSync.code
EventInit_code = mpsevent.Event.EventInit.code
# Special handling for Intern events.
Intern_desc = mpsevent.Event.Intern
Intern_code = Intern_desc.code
Intern_struct = Struct(Intern_desc.format)
Intern_size = Intern_struct.size
Intern_unpack = Intern_struct.unpack
Intern_namedtuple = event_namedtuple[Intern_code]
batch = [] # Current batch of (unordered) events.
clocks_per_sec = None # CLOCKS_PER_SEC value from EventInit event.
# last two EventClockSync events with distinct clock values
eventclocks = deque(maxlen=2) # header clock values
clocks = deque(maxlen=2) # clock values
def key(event):
# Key function for sorting events into time order.
return event.header.clock
def decoder(n=None):
# Generate up to n batches of events decoded from the I/O stream.
nonlocal clocks_per_sec
for _ in (count() if n is None else range(n)):
header_data = read(header_size)
if not header_data:
break
header = header_desc(*header_unpack(header_data))
code = header.code
size = header.size - header_size
if code == Intern_code:
event_desc = event_dict[code]
assert size <= event_desc.maxsize
event = Intern_namedtuple(
header,
*Intern_unpack(read(Intern_size)),
read(size - Intern_size).rstrip(b'\0'))
elif code in event_dict:
event_desc = event_dict[code]
assert size == event_desc.maxsize
event = event_namedtuple[code](
header, *event_unpack[code](read(size)))
else:
# Unknown code might indicate a new event added since
# mpsevent.py was updated, so just read and ignore.
read(size)
continue
batch.append(event)
if event.header.code == EventClockSync_code:
# Events are output in batches terminated by an
# EventClockSync event. So when we see an
# EventClockSync event with a new clock value, we know
# that we've received all events up to that one and
# can sort and emit the batch.
#
# The Time Stamp Counter frequency can vary due to thermal
# throttling, turbo boost etc., so linearly interpolate within
# each batch to convert to clocks and thence to seconds. (This
# requires at least two EventClockSync events.)
#
# In theory the Time Stamp Counter can wrap around, but it is
# a 64-bit register even on IA-32, and at 2.5 GHz it will take
# hundreds of years to do so, so we ignore this possibility.
#
# TODO: on 32-bit platforms at 1 MHz, clock values will wrap
# around in about 72 minutes and so this needs to be handled.
#
# TODO: reduce problems caused by discretized clock
# values. See job004100.
if clocks and event.clock == clocks[-1]:
# The clock value hasn't changed since the last
# EventClockSync (because clocks_per_sec isn't
# high enough) so we disregard this event,
# otherwise linearising gives us loads of events
# with identical timestamps.
continue
clocks.append(event.clock)
eventclocks.append(event.header.clock)
if len(clocks) == 2:
batch.sort(key=key)
dt = (clocks[1] - clocks[0]) / clocks_per_sec
dTSC = eventclocks[1] - eventclocks[0]
m = dt / dTSC # gradient
t0 = clocks[0] / clocks_per_sec
c = t0 - m * eventclocks[0] # y-intercept
yield [(m * e.header.clock + c, e) for e in batch]
batch.clear()
elif event.header.code == EventInit_code:
stream_version = event.major, event.median, event.minor
if stream_version[:2] != mpsevent.__version__[:2]:
raise RuntimeError(
"Monitor version {} is incompatible with "
"telemetry stream version {}.".format(
'.'.join(map(str, mpsevent.__version__)),
'.'.join(map(str, stream_version))))
clocks_per_sec = event.clocksPerSec
return decoder
def bits_of_word(w, n):
"Generate the bits in the word w, which has n bits."
for _ in range(n):
w, bit = divmod(w, 2)
yield bit
class TimeSeries:
"Series of data points in time order."
def __init__(self, note=None, zoom=None, draw=None):
self._note_fn = note
self._zoom_fn = zoom
self._draw_fn = draw
self.t = []
self.y = []
def __len__(self):
return len(self.t)
# Doesn't handle slices
def __getitem__(self, key):
return (self.t[key], self.y[key])
def append(self, t, y):
"Append data y at time t."
assert not self.t or t >= self.t[-1]
self.t.append(t)
self.y.append(y)
def closest(self, t):
i = bisect.bisect(self.t, t)
if (i == len(self) or
(i > 0 and (self.t[i] - t) > (t - self.t[i-1]))):
i = i-1
return i
def recompute(self, f):
pass
def note(self, line, t, index, verbose=False):
if self._note_fn:
return self._note_fn(line, t, index, verbose=verbose)
return None, None
def zoom(self, line, t, index):
if self._zoom_fn:
return self._zoom_fn(line, t, index)
return None
def draw(self, line, t, index, axes_dict):
if self._draw_fn:
return self._draw_fn(line, t, index, axes_dict)
return None
class Accumulator(TimeSeries):
"Time series that is always non-negative and updates by accumulation."
def __init__(self, initial=0):
super().__init__()
self.value = initial
def add(self, t, delta):
"Add delta to the accumulator at time t."
assert self.value >= -delta
self.append(t, self.value)
self.value += delta
self.append(t, self.value)
def sub(self, t, delta):
"Subtract delta from the accumulator at time t."
assert self.value >= delta
self.append(t, self.value)
self.value -= delta
self.append(t, self.value)
class RateSeries(TimeSeries):
"Time series that counts events within consecutive periods."
def __init__(self, t, period=1):
super().__init__()
self._period = period
self._count = 0
self._start = t
self.ts=[]
self._limit = ((t // period) + 1) * period
def inc(self, t):
self.null(t)
self.ts.append(t)
self._count += 1
def null(self, t):
while t >= self._limit:
self.append(self._limit - self._period/2, self._count)
self._count = 0
self._limit += self._period
def recompute(self, f):
ts = self.ts
self.__init__(self._start, self._period * f)
for t in ts:
self.inc(t)
return f'period {self._period:.3f} s'
class OnOffSeries(TimeSeries):
"""Series of on/off events; can draw as an exponentially weighted moving
average on/off ratio or (potentially) as shading bars."""
def __init__(self, t, k=1):
super().__init__()
self._ons = []
self._start = self._last = t
self._k = k
self._ratio = 0.0
def off(self, t):
dt = t - self._last
f = math.exp(-self._k * dt)
self._ratio = 1 - f * (1 - self._ratio)
self._ons.append((self._last, t))
self._last = t
self.append(t, self._ratio)
def on(self, t):
dt = t - self._last
f = math.exp(-self._k * dt)
self._ratio = f * self._ratio
self._last = t
self.append(t, self._ratio)
def recompute(self, f):
ts = self.t
self.__init__(self._start, self._k / f)
for i in range(len(ts) // 2):
self.on(ts[i*2])
self.off(ts[i*2+1])
return f'time constant: {1/self._k:.3f} s'
def note(self, line, t, index, verbose=False):
on = self._ons[index // 2]
l = on[1]-on[0]
note = f"{line.name}: {on[0]:.3f} + {l * 1000:.3f} ms"
return note, note
def zoom(self, line, t, index):
on = self._ons[index // 2]
return (on[0], on[1])
def draw(self, line, t, index, axes_dict):
axes_to_draw = {ax.bbox.bounds: ax for ax in axes_dict.values()}.values()
on = self._ons[index // 2]
return [ax.axvspan(on[0], on[1], alpha=0.5, facecolor='g')
for ax in axes_to_draw]
class EventHandler:
"""Object that handles a telemetry event by dispatching to the method
with the same name as the event.
"""
def ignore(self, t, event):
"Handle a telemetry event at time t by doing nothing."
def handle(self, t, event):
"Handle a telemetry event at time t by dispatching."
getattr(self, EVENT_NAME[event.header.code], self.ignore)(t, event)
class Pool(EventHandler):
"Model of an MPS pool."
def __init__(self, arena, pointer, t):
"Create Pool owned by arena, at pointer, at time t."
self._arena = arena # Owning arena.
self._model = arena.model # Owning model.
self._pointer = pointer # Pool's pointer.
self._pool_class = None # Pool's class pointer.
self._serial = None # Pool's serial number within arena.
self._alloc = Accumulator()
self._model.add_time_series(
self, self._alloc, bytesAxis, "alloc",
"memory allocated by the pool from the arena",
draw=False)
@property
def name(self):
name = self._model.label(self._pointer)
if not name:
class_name = self._model.label(self._pool_class) or 'Pool'
if self._serial is not None:
name = f"{class_name}[{self._serial}]"
else:
name = f"{class_name}[{self._pointer:x}]"
return f"{self._arena.name}.{name}"
def ArenaAlloc(self, t, event):
self._alloc.add(t, event.size)
def ArenaFree(self, t, event):
self._alloc.sub(t, event.size)
def PoolInit(self, t, event):
self._pool_class = event.poolClass
self._serial = event.serial
class Gen(EventHandler):
"Model of an MPS generation."
def __init__(self, arena, pointer):
self._arena = arena # Owning arena.
self._model = arena.model # Owning model.
self._pointer = pointer # Gen's pointer.
self._serial = None # Gen's serial number.
self.zone_set = 0 # Gen's current zone set.
def update_ref_size(self, t, seg_summary, seg_size):
"""Update the size of segments referencing this generation.
seg_summary must be a mapping from segment to its summary, and
seg_size a mapping from segment to its size in bytes.
"""
ref_size = 0
for seg, summary in seg_summary.items():
if self.zone_set & summary:
ref_size += seg_size[seg]
self._ref_size.append(t, ref_size)
@property
def name(self):
name = self._model.label(self._pointer)
if not name:
if self._serial is not None:
name = f"gen-{self._serial}"
else:
name = f"gen-{self._pointer:x}"
return f"{self._arena.name}.{name}"
def GenZoneSet(self, t, event):
self.zone_set = event.zoneSet
def GenInit(self, t, event):
self._serial = serial = event.serial
self._mortality_trace = mortality_trace = TimeSeries()
per_trace_line = self._model.add_time_series(
self, mortality_trace, fractionAxis, f"mortality.trace",
f"mortality of data in generation, per trace",
draw=False, marker='+', linestyle='None')
self._mortality_average = mortality_average = TimeSeries()
self._model.add_time_series(
self, mortality_average, fractionAxis, f"mortality.avg",
f"mortality of data in generation, moving average",
draw=False, color_as=per_trace_line)
mortality_average.append(t, event.mortality);
self._ref_size = ref_size = TimeSeries()
self._model.add_time_series(
self, ref_size, bytesAxis, f"ref",
f"size of segments referencing generation")
def TraceEndGen(self, t, event):
self._mortality_trace.append(t, event.mortalityTrace)
self._mortality_average.append(t, event.mortalityAverage)
class Trace(EventHandler):
def __init__(self, arena, t, event):
self._arena = arena
self.create = t
self.pauses = (0, 0, 0)
self.why = trace_why[event.why]
self.gens = 'none'
self.times = [(t, event.header.clock, 'create')]
self.sizes = []
self.counts = []
self.accesses = defaultdict(int)
self.pause_start = None
self.begin_pause(t, event)
def add_time(self, name, t, event):
self.times.append((t, event.header.clock, name))
def add_size(self, name, s):
self.sizes.append((name, s))
def add_count(self, name, c):
self.counts.append((name, c))
def begin_pause(self, t, event):
assert self.pause_start is None
self.pause_start = (t, event.header.clock)
def end_pause(self, t, event):
assert self.pause_start is not None
st, sc = self.pause_start
tn, tt, tc = self.pauses
self.pauses = (tn+1, tt + t-st, tc + event.header.clock-sc)
self.pause_start = None
def TraceStart(self, t, event):
self.add_time("start", t, event)
self.add_size("condemned", event.condemned)
self.add_size("notCondemned", event.notCondemned)
self.add_size("foundation", event.foundation)
self.whiteRefSet = event.white
self.whiteZones = bin(self.whiteRefSet).count('1')
def TraceFlipBegin(self, t, event):
self.add_time("flip begin", t, event)
def TraceFlipEnd(self, t, event):
self.add_time("flip end", t, event)
def TraceBandAdvance(self, t, event):
self.add_time(f"band advance {rank_name[event.rank]}", t, event)
def TraceReclaim(self, t, event):
self.add_time("reclaim", t, event)
def TraceDestroy(self, t, event):
self.add_time("destroy", t, event)
self.end_pause(t, event)
def TraceStatScan(self, t, event):
self.add_count('roots scanned', event.rootScanCount)
self.add_size('roots scanned', event.rootScanSize)
self.add_size('copied during root scan', event.rootCopiedSize)
self.add_count('segments scanned', event.segScanCount)
self.add_size('segments scanned', event.segScanSize)
self.add_size('copied during segment scan', event.segCopiedSize)
self.add_count('single ref scan', event.singleScanCount)
self.add_size('single refs scanned', event.singleScanSize)
self.add_size('copied during scan of single refs', event.singleCopiedSize)
self.add_count('read barrier hits', event.readBarrierHitCount)
self.add_count('max grey segments', event.greySegMax)
self.add_count('segments scanned without finding refs to white segments', event.pointlessScanCount)
def TraceStatFix(self, t, event):
self.add_count('fixed refs', event.fixRefCount)
self.add_count('fixed refs referring to segs', event.segRefCount)
self.add_count('fixed white refs', event.whiteSegRefCount)
self.add_count('nailboards', event.nailCount)
self.add_count('snaps', event.snapCount)
self.add_count('forwarded', event.forwardedCount)
self.add_size('forwarded', event.forwardedSize)
self.add_count('preseved in place', event.preservedInPlaceCount)
self.add_size('preserved in place', event.preservedInPlaceSize)
def TraceStatReclaim(self, t, event):
self.add_count('segs reclaimed', event.reclaimCount)
self.add_size('reclaimed', event.reclaimSize)
def ChainCondemnAuto(self, t, event):
self.gens = event.topCondemnedGenIndex + 1
def TraceCondemnAll(self, t, event):
self.gens = "all"
def ArenaAccess(self, t, event):
self.accesses[event.mode] += 1
def ArenaPollBegin(self, t, event):
self.begin_pause(t, event)
def ArenaPollEnd(self, t, event):
self.end_pause(t, event)
def note(self, verbose=False):
base_t, base_cycles, _ = self.times[0]
if verbose:
log = f"Trace of {self.gens} gens at {base_t:.6f} ({self.why}):\nTimes: \n"
ot, oc = base_t, base_cycles
for t,c,n in self.times[1:]:
log += " {:20} +{:.3f} ms ({}): ({:.3f} ms, {})\n".format(
n, (t-ot)*1000, cyc(c-oc), (t-base_t)*1000, cyc(c-base_cycles))
ot, oc = t, c
final_t, final_cycles,_ = self.times[-1]
elapsed_t = final_t - base_t
elapsed_cycles = final_cycles - base_cycles
pn, pt, pc = self.pauses
if pc < elapsed_cycles:
log += f"{pn:,d} Pauses ({pt*1000:,.3f} ms, {cyc(pc)}). Mark/space: {pt/elapsed_t:,.3f}/{pc/elapsed_cycles:,.3f}\n"
log += "Sizes:\n"
for (n, s) in self.sizes:
log += f" {n}: {bytesFormat(s)}\n"
log += "Counts:\n"
for (n, c) in self.counts:
log += f" {n}: {c:,d}\n"
for (mode, count) in sorted(self.accesses.items()):
log += f" {access_mode[mode]} barrier hits: {count:,d}\n"
log += f"white zones: {self.whiteZones}: "
log += ' '.join(f'{((self.whiteRefSet >> (64-8*i)) & 255):08b}'
for i in range(1,9))
else:
log = f"Trace of {self.gens} gens at {base_t:.6f} ({self.why})"
return f"trace\n{self.create:f} s\n{self.gens}", log
def zoom(self):
return (self.times[0][0], self.times[-1][0])
def draw(self, axes_dict):
# uniquify axes based on bounding boxes
axes_to_draw = {ax.bbox.bounds: ax for ax in axes_dict.values()}.values()
return ([ax.axvline(t)
for ax in axes_to_draw
for (t,_,_) in self.times] +
[ax.axvspan(self.times[0][0], self.times[-1][0], alpha=0.5, facecolor='r')
for ax in axes_to_draw])
class Arena(EventHandler):
"Model of an MPS arena."
# Number of pools that are internal to the arena; see the list in
# global.c:GlobalsPrepareToDestroy.
_internal_pools = 4
def __init__(self, model, pointer, t):
"Create Arena owned by model, at pointer, at time t."
self.model = model # Owning model.
self._pointer = pointer # Arena's pointer.
self._arena_class = None # Arena's class pointer.
self._serial = None # Arena's serial number.
self._pools = [] # List of Pools ever belonging to arena.
self._pool = {} # pointer -> Pool (for live pools)
self._gens = [] # List of Gens ever belonging to arena.
self._gen = {} # pointer -> Gen (for live gens)
self._alloc = Accumulator()
self.model.add_time_series(
self, self._alloc, bytesAxis, "alloc",
"total allocation by client pools")
self._poll = OnOffSeries(t)
self.model.add_time_series(
self, self._poll, fractionAxis, "poll",
"polling time moving average",
clickdraw=True)
self._access = {}
for am, name in sorted(access_mode.items()):
self._access[am] = RateSeries(t)
self.model.add_time_series(
self, self._access[am], countAxis, f"{name} barrier",
f"{name} barrier hits per second")
self._last_access = None
self._seg_size = {} # segment pointer -> size
self._seg_summary = {} # segment pointer -> summary
self._zone_ref_size = {} # zone -> refsize Accumulator
self._univ_ref_size = Accumulator()
self.model.add_time_series(
self, self._univ_ref_size, bytesAxis, "zone-univ.ref",
"size of segments referencing the universe")
self._live_traces = {} # trace pointer -> dictionary
self._all_traces = {} # start time -> dictionary
self._traces = TimeSeries(note=self.trace_note,
zoom=self.trace_zoom,
draw=self.trace_draw)
self.model.add_time_series(
self, self._traces, traceAxis, "trace",
"generations condemned by trace", clickdraw=True,
marker='x', linestyle='None')
self._condemned_size = TimeSeries()
self.model.add_time_series(
self, self._condemned_size, bytesAxis, "condemned.size",
"size of segments condemned by trace", marker='+',
linestyle='None')
@property
def name(self):
if len(self.model.arenas) <= 1:
# No need to distinguish arenas if there's just one.
return ""
name = self.model.label(self._pointer)
if not name:
class_name = self.model.label(self._arena_class) or 'Arena'
if self._serial is not None:
name = f"{class_name}[{self._serial}]"
else:
name = f"{class_name}[{self._pointer:x}]"
return name
def delegate_to_pool(self, t, event):
"Handle a telemetry event by delegating to the pool model."
pointer = event.pool
try:
pool = self._pool[pointer]
except KeyError:
self._pool[pointer] = pool = Pool(self, pointer, t)
self._pools.append(pool)
pool.handle(t, event)
def ArenaAlloc(self, t, event):
self.delegate_to_pool(t, event)
if self._pool[event.pool]._serial >= self._internal_pools:
self._alloc.add(t, event.size)
def ArenaFree(self, t, event):
self.delegate_to_pool(t, event)
if self._pool[event.pool]._serial >= self._internal_pools:
self._alloc.sub(t, event.size)
PoolInit = \
delegate_to_pool
def delegate_to_gen(self, t, event):
"Handle a telemetry event by delegating to the generation model."
pointer = event.gen
try:
gen = self._gen[pointer]
except KeyError:
self._gen[pointer] = gen = Gen(self, pointer)
self._gens.append(gen)
gen.handle(t, event)
GenInit = \
GenZoneSet = \
TraceEndGen = \
delegate_to_gen
def ArenaCreateVM(self, t, event):
self._arena_class = event.arenaClass
self._serial = event.serial
ArenaCreateCL = ArenaCreateVM
def PoolFinish(self, t, event):
del self._pool[event.pool]
def GenFinish(self, t, event):
del self._gen[event.gen]
def ArenaPollBegin(self, t, event):
for trace in self._live_traces.values():
trace.ArenaPollBegin(t, event)
self._poll.on(t)
def ArenaPollEnd(self, t, event):
for trace in self._live_traces.values():
trace.ArenaPollEnd(t, event)
self._poll.off(t)
def ArenaAccess(self, t, event):
if self._last_access is None or event.count != self._last_access.count:
self._last_access = event
self._access[event.mode].inc(t)
for trace in self._live_traces.values():
trace.ArenaAccess(t, event)
def null(self, t):
"""Update anything in the model which depends on the passage of time,
such as anything tracking rates."""
for series in self._access.values():
series.null(t)
def trace_note(self, line, t, index, verbose=False):
if t not in self._all_traces:
return f'no trace {t}', f'no trace {t}'
return self._all_traces[t].note(verbose=verbose)
def trace_draw(self, line, t, index, axes_dict):
if t not in self._all_traces:
return []
return self._all_traces[t].draw(axes_dict)
def trace_zoom(self, line, t, index):
if t not in self._all_traces:
return None
return self._all_traces[t].zoom()
def TraceCreate(self, t, event):
assert event.trace not in self._live_traces
assert t not in self._all_traces
trace = Trace(self, t, event)
self._live_traces[event.trace] = self._all_traces[t] = trace
# seems like a reasonable time to call this
self.null(t)
def delegate_to_trace(self, t, event):
"Handle a telemetry event by delegating to the trace model."
trace = self._live_traces[event.trace]
trace.handle(t, event)
return trace
TraceFlipBegin = \
TraceFlipEnd = \
TraceBandAdvance = \
TraceReclaim = \
TraceStatScan = \
TraceStatFix = \
TraceStatReclaim = \
delegate_to_trace
def ChainCondemnAuto(self, t, event):
trace = self.delegate_to_trace(t, event)
self._traces.append(trace.create, event.topCondemnedGenIndex + 1)
def TraceCondemnAll(self, t, event):
trace = self.delegate_to_trace(t, event)
self._traces.append(trace.create, len(self._gens)) # TODO what's the right number here??!
def TraceDestroy(self, t, event):
self.delegate_to_trace(t, event)
del self._live_traces[event.trace]
def TraceStart(self, t, event):
self.delegate_to_trace(t, event)
self._condemned_size.append(t, event.condemned)
if self._seg_summary:
for gen in self._gen.values():
gen.update_ref_size(t, self._seg_summary, self._seg_size)
def SegSetSummary(self, t, event):
size = event.size
self._seg_summary[event.seg] = event.newSummary
self._seg_size[event.seg] = size
n = self.model.word_width
univ = (1 << n) - 1
new_univ = event.newSummary == univ
old_univ = event.oldSummary == univ
self._univ_ref_size.add(t, (new_univ - old_univ) * size)
old_summary = 0 if old_univ else event.oldSummary
new_summary = 0 if new_univ else event.newSummary
for zone, old, new in zip(reversed(range(n)),
bits_of_word(old_summary, n),
bits_of_word(new_summary, n)):
if new == old:
continue
if zone not in self._zone_ref_size:
self._zone_ref_size[zone] = ref_size = Accumulator()
self.model.add_time_series(
self, ref_size, bytesAxis, f"zone-{zone}.ref",
f"size of segments referencing zone {zone}")
self._zone_ref_size[zone].add(t, (new - old) * size)
class Line:
"A line in a Matplotlib plot wrapping a TimeSeries."
colors = cycle('blue orange green red purple brown pink gray olive cyan'
.split())
def __init__(self, owner, series, yaxis, name, desc,
draw=True, color_as=None, clickdraw=False,
**kwargs):
self.owner = owner # Owning object.
self.series = series # Time series.
self.yaxis = yaxis # Y Axis
self._name = name # Brief description.
self.desc = desc # Brief description.
self.draw = draw # Plot this line?
self.clickdraw = clickdraw # should a click on a data point draw something on the axes?
if color_as:
self.color = color_as.color
else:
self.color = next(self.colors)
self.axes = None # Currently plotted on axes.
self.line = None # Matplotlib Line2D object.
self._kwargs = kwargs # Keyword arguments for Axes.plot.
self._marker = 'marker' in self._kwargs
def __len__(self):
return len(self.series)
# doesn't handle slices
def __getitem__(self, key):
return self.series[key]
@property
def name(self):
return f"{self.owner.name}.{self._name}"
@property
def ready(self):
return len(self) >= 1
def unplot(self):
if self.axes:
self.line.remove()
self.axes = None
def plot(self, axes):
"Plot or update line on axes."
x = self.series.t
y = self.series.y
if self.line is None:
self.axes = axes
# lines without markers should have markers if they have a singleton point.
if not self._marker:
if len(self) == 1:
self._kwargs['marker']='x'
else:
self._kwargs.pop('marker',None)
self.line, = axes.plot(x, y, color=self.color, label=self.name,
**self._kwargs)
else:
if self.axes != axes:
self.unplot()
axes.add_line(self.line)
self.axes = axes
self.line.set_data(x, y)
self.line.set_label(self.name)
def contains(self, event):
if self.line is None:
return False, None
return self.line.contains(event)
def dispxy(self, i):
t, y = self[i]
return self.line.axes.transData.transform((t,y))
def closest(self, t, dispx, range=10):
if self.draw and self.ready:
i = self.series.closest(t)
dx, _ = self.dispxy(i)
if abs(dispx-dx) < range:
return i
return None
def note(self, index, verbose=False):
"Return annotation text and log box text for a selected point."
t, _ = self.series[index]
return self.series.note(self, t, index, verbose=verbose)
def zoom(self, index):
"Return (low, high) limits for the point of interest, or None."
t, _ = self.series[index]
return self.series.zoom(self, t, index)
def drawPoint(self, index, axes_dict):
"Draw in response to a click on a data point, and return a list of drawn items."
t,_ = self.series[index]
drawn = self.series.draw(self, t, index, axes_dict)
# Could just draw on axes_dict[self.yaxis] ??
if drawn is None:
if self.clickdraw:
drawn = [ax.axvline(t) for ax in axes_dict.values()]
else:
drawn = []
return drawn
def recompute(self, f):
return self.series.recompute(f)
class Model(EventHandler):
"Model of an application using the MPS."
def __init__(self, event_queue):
"Create model based on queue of batches of telemetry events."
self._queue = event_queue
self._intern = {} # stringId -> string
self._label = {} # address or pointer -> stringId
self._arena = {} # pointer -> Arena (for live arenas)
self.arenas = [] # All arenas created in the model.
self.lines = [] # All Lines available for plotting.
self._needs_redraw = True # Plot needs redrawing?
def add_time_series(self, *args, **kwargs):
"Add a time series to the model."
line = Line(*args, **kwargs)
self.lines.append(line)
return line
def label(self, pointer):
"Return string labelling address or pointer, or None if unlabelled."
return self._intern.get(self._label.get(pointer))
def plot(self, axes_dict, keep_limits=False):
"Draw time series on the given axes."
if not self._needs_redraw:
return
self._needs_redraw = False
# Collate drawable lines by y axis
yaxis_lines = defaultdict(list)
for line in self.lines:
if line.ready and line.draw:
yaxis_lines[line.yaxis].append(line)
else:
line.unplot()
bounds_axes = defaultdict(list) # axes drawn in each area
# Draw the lines
for yax in yaxis_lines:
axes = axes_dict[yax]
axes.set_axis_on()
for line in yaxis_lines[yax]:
line.plot(axes)
if not keep_limits:
axes.relim(visible_only=True)
axes.autoscale_view()
bounds_axes[axes.bbox.bounds].append((axes, yax))
# Set the format_coord method for each axes
for bounds, ax_list in bounds_axes.items():
if len(ax_list) > 1:
# If format_coord iterates of ax_list, it may iterate
# over the wrong value of ax_list (the last value
# bound by the bounds_axes iteration). So build this
# separate yax_trans_list here.
yax_trans_list = [(yax, ax.transData)
for ax, yax in ax_list]
for ax, yax in ax_list:
tData = ax.transData
def format_coord(x, y):
# x, y are data coordinates
_, axy = tData.transform((0, y)) # y in display coordinates
# Invert the transforms here. If you invert them at plotting time
# and cache them so we don't have to invert them every time format_coord
# is called, then you get the wrong answer.
return (f'{x:f} s, ' +
', '.join(inner_yax.fmt(t.inverted().transform((0, axy))[1])
for inner_yax, t in yax_trans_list))
ax.format_coord = format_coord
else:
ax, yax = ax_list[0]
def format_coord(x, y):
return f'{x:f} s, {yax.fmt(y)}'
ax.format_coord = format_coord
def update(self):
"Consume available telemetry events and update the model."
while True:
try:
batch = self._queue.get_nowait()
except queue.Empty:
break
else:
for t, event in batch:
self.handle(t, event)
def needs_redraw(self):
"Call this when the model needs redrawing."
self._needs_redraw = True
def delegate_to_arena(self, t, event):
"Handle a telemetry event by delegating to the arena model."
addr = event.arena
try:
arena = self._arena[addr]
except KeyError:
self._arena[addr] = arena = Arena(self, addr, t)
self.arenas.append(arena)
arena.handle(t, event)
ArenaAlloc = \
ArenaCreateCL = \
ArenaCreateVM = \
ArenaFree = \
ArenaPollBegin = \
ArenaPollEnd = \
ChainCondemnAuto = \
GenInit = \
GenFinish = \
GenZoneSet = \
PoolFinish = \
PoolInit = \
SegSetSummary = \
TraceCondemnAll = \
TraceEndGen = \
TraceStart = \
TraceCreate = \
TraceDestroy = \
TraceStart = \
TraceFlipBegin = \
TraceFlipEnd = \
TraceBandAdvance = \
TraceReclaim = \
TraceStatScan = \
TraceStatFix = \
TraceStatReclaim = \
ArenaAccess = \
delegate_to_arena
def EventClockSync(self, t, event):
self.needs_redraw()
def Intern(self, t, event):
self._intern[event.stringId] = event.string.decode('ascii', 'replace')
def Label(self, t, event):
self._label[event.address] = event.stringId
def LabelPointer(self, t, event):
self._label[event.pointer] = event.stringId
def ArenaDestroy(self, t, event):
del self._arena[event.arena]
def EventInit(self, t, event):
self.word_width = event.wordWidth
class ApplicationToolbar(NavigationToolbar):
"Subclass of Matplotlib's navigation toolbar adding a pause button."
def __init__(self, *args):
self.toolitems += (('Pause', 'Pause', PAUSE_ICON, 'pause'),)
super().__init__(*args)
self._actions['pause'].setCheckable(True)
self.paused = False
def pause(self):
"Toggle the pause button."
self.paused = not self.paused
self._actions['pause'].setChecked(self.paused)
class ApplicationWindow(QtWidgets.QMainWindow):
"""PyQt5 application displaying time series derived from MPS telemetry
output.
"""
def __init__(self, model : Model, title : str):
"""Create application. 'model' is the MPS model whose time series are
to be displayed, and 'title' is the main window title.
"""
super().__init__()
self._model = model # The MPS model.
self._home_limits = None # Limits of the graph in "home" position.
self._line_checkbox = {} # Line -> QCheckbox
self.setWindowTitle(title)
main = QtWidgets.QWidget()
self.setCentralWidget(main)
# make a splitter and a layout to contain it
main_layout = QtWidgets.QHBoxLayout()
splitter = QtWidgets.QSplitter(QtCore.Qt.Vertical)
main_layout.addWidget(splitter)
main.setLayout(main_layout)
# Above the splitter, an hbox layout
upper = QtWidgets.QWidget()
upper_layout = QtWidgets.QHBoxLayout()
upper.setLayout(upper_layout)
splitter.addWidget(upper)
# Scrollable list of checkboxes, one for each time series.
self._lines = QtWidgets.QVBoxLayout()
self._lines_scroll = QtWidgets.QScrollArea(
horizontalScrollBarPolicy=QtCore.Qt.ScrollBarAlwaysOff)
self._lines_widget = QtWidgets.QWidget()
lines_layout = QtWidgets.QVBoxLayout(self._lines_widget)
lines_layout.addLayout(self._lines)
lines_layout.addStretch(1)
self._lines_scroll.setWidget(self._lines_widget)
self._lines_scroll.setWidgetResizable(True)
upper_layout.addWidget(self._lines_scroll)
# Matplot canvas.
self._canvas = FigureCanvas(Figure(figsize=(10, 8)))
upper_layout.addWidget(self._canvas)
# Create all axes, set up tickmarks etc
bytes_axes, trace_axes = self._canvas.figure.subplots(nrows=2, sharex=True,
gridspec_kw={'hspace':0,
'height_ratios':(5, 2)})
fraction_axes = bytes_axes.twinx()
count_axes = trace_axes.twinx()
self._axes_dict = {bytesAxis: bytes_axes,
fractionAxis: fraction_axes,
traceAxis: trace_axes,
countAxis: count_axes}
for yax in self._axes_dict:
self._axes_dict[yax].set_ylabel(yax.label())
self._axes_dict[yax].set_xlabel("time (seconds)")
# bytes tick labels in megabytes etc.
bytes_axes.ticklabel_format(style='plain')
bytes_axes.yaxis.set_major_formatter(ticker.FuncFormatter(bytesTickFormatter))
# make a toolbar and put it on the top of the whole layout.
self._toolbar = ApplicationToolbar(self._canvas, self)
self.addToolBar(QtCore.Qt.TopToolBarArea, self._toolbar)
# Below the splitter, a text box for logging
self._logbox = QtWidgets.QTextEdit()
self._logbox.setReadOnly(True)
self._logbox.setLineWrapMode(True)
splitter.addWidget(self._logbox)
# Line annotations.
self._line_annotation = bytes_axes.annotate(
"", xy=(0, 0), xytext=(-20, 20),
textcoords='offset points',
bbox=dict(boxstyle='round', fc='w'),
arrowprops=dict(arrowstyle='->'),
annotation_clip=False,
visible=False)
self._line_annotation.get_bbox_patch().set_alpha(0.8)
self._canvas.mpl_connect("button_release_event", self._click)
# points close to time of most recent selection, on each line, in increasing y order
# (line, index, ...)
self._close_points = None
# Map from line to index into self._close_points
self._close_line = None
# index of currently selected point in self._close_points
self._selected = None
# Things drawn for the current selection
self._drawn = []
# pass all keystrokes to on_key_press, where we can capture them or pass them on to
# the toolbar.
self._canvas.mpl_connect('key_press_event', self._on_key_press)
self._canvas.setFocusPolicy(QtCore.Qt.StrongFocus)
self._canvas.setFocus()
# keyboard shortcuts (see on_key_press()).
self._shortcuts = {'ctrl+W': self.close,
'right': self._next_point,
'left': self._previous_point,
'up': self._up_line,
'down': self._down_line,
'pageup': self._slower,
'pagedown': self._faster,
'pause': self._toolbar.pause,
'+': self._zoom_in,
'-': self._zoom_out,
'z': self._zoom,
'i': self._info,
}
# Call self._update in a loop forever.
self._update()
self._timer = self._canvas.new_timer(100, [(self._update, (), {})])
self._timer.start()
def _log(self, msg):
"Append msg to the log box."
self._logbox.append(msg)
def _on_key_press(self, event):
"""Handle a keyboard event, either one of our shortcuts or something for the
navigation toolbar."""
if event.key in self._shortcuts:
self._shortcuts[event.key]()
else:
key_press_handler(event, self._canvas, self._toolbar)
def _next_point(self):
"""Select the next point on the selected line."""
if self._close_points is None:
return
for d in self._drawn:
d.set_visible(False)
line, index = self._close_points[self._selected]
self._select(line, index + 1)
def _previous_point(self):
"""Select the previous point on the selected line."""
if self._close_points is None:
return
for d in self._drawn:
d.set_visible(False)
line, index = self._close_points[self._selected]
self._select(line, index - 1)
def _up_line(self):
"""Select the point on the line above the currently selected point."""
if self._selected is None:
return
self._annotate(self._selected + 1)
def _down_line(self):
"""Select the point on the line below the currently selected point."""
if self._selected is None:
return
self._annotate(self._selected - 1)
def _select(self, line, index):
"""Select the point with index `index` on `line`, if it exists."""
if index < 0 or index >= len(line):
return
t, y = line[index]
self._recentre(mid=t)
dispx, _ = line.dispxy(index)
self._find_close(t, dispx, on_line=line, index=index)
self._annotate(self._close_line[line])
def _clear(self):
"Remove annotations."
self._line_annotation.set_visible(False)
for d in self._drawn:
d.set_visible(False)
self._drawn = []
def _unselect(self, line=None):
"Undo selection. If `line` is currently selected, remove annotations."
if self._selected is not None and line is not None:
selected_line, index = self._close_points[self._selected]
if line == selected_line:
self._clear()
self._selected = self._close_points = None
def _annotate(self, line_index):
"Select the closest point on line `line_index`."
if line_index < 0 or line_index >= len(self._close_points):
return
self._selected = line_index
line, index = self._close_points[self._selected]
x, y = line[index]
note, log = line.note(index)
if note is None:
note = [f"{line.name}",f"{x:f} s",f"{line.yaxis.fmt(y)}"]
log = ' '.join(note)
note = '\n'.join(note)
self._log(log)
self._clear()
a = self._line_annotation
if a.figure is not None:
a.remove()
line.axes.add_artist(a)
a.xy = x, y
a.set_text(note)
a.set_visible(True)
self._drawn += line.drawPoint(index, self._axes_dict)
def _info(self):
"Report more information about the currently selected point."
if self._close_points is None:
self._log('No selected data point')
return
line, index = self._close_points[self._selected]
note, log = line.note(index, verbose=True)
if log is not None:
self._log(log)
def _find_close(self, t, dispx, on_line=None, index=None):
"Find all the points at times close to `t`, so we can select one."
pts = []
for line in self._model.lines:
if line == on_line:
closest = index
else:
closest = line.closest(t, dispx)
if closest is not None:
_, dispy = line.dispxy(closest)
pts.append((dispy, line, closest))
self._close_points = []
self._close_line = {}
for dispy, line, index in sorted(pts, key=lambda pt:pt[0]):
self._close_line[line] = len(self._close_points)
self._close_points.append((line, index))
def _recompute(self, factor):
self._log(f'Scaling time constants by a factor {factor}:...')
for line in self._model.lines:
log = line.recompute(factor)
if log:
self._log(f' {line.name}: {log}')
self._model.needs_redraw()
def _slower(self):
self._recompute(2)
def _faster(self):
self._recompute(0.5)
def _click(self, event):
"Handle left mouse click by annotating line clicked on."
if event.button != 1 or not event.inaxes:
return
# if we want control-click, shift-click, and so on:
# modifiers = QtGui.QGuiApplication.keyboardModifiers()
# if (modifiers & QtCore.Qt.ControlModifier): ...
for line in self._model.lines:
if not (line.ready and line.draw):
continue
contains, index = line.contains(event)
if contains:
i = index['ind'][0]
t, y = line[i]
dispx, _ = line.dispxy(i)
self._find_close(t, dispx)
self._annotate(self._close_line[line])
break
else:
self._unselect()
self._clear()
def _zoom_in(self):
self._recentre(zoom=2)
def _zoom_out(self):
self._recentre(zoom=0.5)
def _zoom(self):
if self._close_points is None:
self._log('No selected data point')
return
line, index = self._close_points[self._selected]
lim = line.zoom(index)
if lim is None:
self._recentre(zoom=2, mid=line[index][0])
else: # make a bit of slack
lo, hi = lim
width = hi-lo
self._zoom_to(lo - width/4, hi + width/4)
def _recentre(self, zoom=1.0, mid=None):
xlim, _ = self._limits
lo, hi = xlim
if mid is None:
mid = (hi+lo)/2
half_width = (hi-lo)/2
newlo, newhi = mid- half_width/zoom, mid + half_width/zoom
self._zoom_to(newlo, newhi)
def _zoom_to(self, lo, hi):
ax = self._axes_dict[bytesAxis]
ax.set_xlim(lo, hi)
@property
def _limits(self):
"Current x and y limits of the Matplotlib graph."
ax = self._axes_dict[bytesAxis]
return ax.get_xlim(), ax.get_ylim()
def _update(self):
"Update the model and redraw if not paused."
if (not self._toolbar.paused
and self._home_limits not in (None, self._limits)):
# Limits changed (for example, because user zoomed in), so
# pause further updates to the limits of all axes, to give
# user a chance to explore.
self._toolbar.pause()
self._home_limits = None
self._model.update()
self._model.plot(self._axes_dict, keep_limits = self._toolbar.paused)
if not self._toolbar.paused:
self._home_limits = self._limits
self._canvas.draw()
# Find new time series and create corresponding checkboxes.
checkboxes_changed = False
for line in self._model.lines:
if not line.ready:
continue
new_name = line.name
if line in self._line_checkbox:
# A line's name can change dynamically (for example, because
# of the creation of a second arena, or a Label event), so
# ensure that it is up to date.
old_name = self._line_checkbox[line].text()
if old_name != new_name:
self._line_checkbox[line].setText(new_name)
checkboxes_changed = True
else:
checkboxes_changed = True
checkbox = QtWidgets.QCheckBox(new_name)
self._line_checkbox[line] = checkbox
checkbox.setChecked(line.draw)
checkbox.setToolTip(f"{line.desc} ({line.yaxis.label()})")
self._lines.addWidget(checkbox)
def state_changed(state, line=line):
self._unselect(line)
line.draw = bool(state)
self._model.needs_redraw()
checkbox.stateChanged.connect(state_changed)
checkbox.setStyleSheet(f"color:{line.color}")
# Sort checkboxes into order by name and update width.
if checkboxes_changed:
checkboxes = self._line_checkbox.values()
for checkbox in checkboxes:
self._lines.removeWidget(checkbox)
for checkbox in sorted(checkboxes, key=lambda c:c.text()):
self._lines.addWidget(checkbox)
self._lines_scroll.setFixedWidth(
self._lines_widget.sizeHint().width())
def main():
parser = argparse.ArgumentParser(description="Memory Pool System Monitor.")
parser.add_argument(
'telemetry', metavar='FILENAME', nargs='?', type=str,
default=os.environ.get('MPS_TELEMETRY_FILENAME', 'mpsio.log'),
help="telemetry output from the MPS instance")
args = parser.parse_args()
with open(args.telemetry, 'rb') as telemetry_file:
event_queue = queue.Queue()
model = Model(event_queue)
decoder = telemetry_decoder(telemetry_file.read)
for batch in decoder(1):
event_queue.put(batch)
model.update()
stop = threading.Event()
def decoder_thread():
while not stop.isSet():
for batch in decoder():
if stop.isSet():
break
event_queue.put(batch)
thread = threading.Thread(target=decoder_thread)
thread.start()
qapp = QtWidgets.QApplication([])
app = ApplicationWindow(model, args.telemetry)
app.show()
result = qapp.exec_()
stop.set()
thread.join()
return result
if __name__ == '__main__':
exit(main())
# C. COPYRIGHT AND LICENCE
#
# Copyright (c) 2018 Ravenbrook Ltd. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#
# $Id$