#!/usr/bin/env python # # $Id$ # Copyright (c) 2018-2020 Ravenbrook Limited. See end of file for license. # # Read a telemetry stream from a program using the MPS, construct a # model of the MPS data structures in the progam, and display selected # time series from the model in a graphical user interface. # # Requirements: Python 3.6, Matplotlib, PyQt5. import argparse import bisect from collections import defaultdict, deque, namedtuple from contextlib import redirect_stdout, ContextDecorator import decimal from itertools import count, cycle, product import math import os import queue from struct import Struct import sys import threading import time import traceback from matplotlib.backend_bases import key_press_handler from matplotlib.backends.qt_compat import QtCore, QtGui, QtWidgets from matplotlib.backends.backend_qt5agg import ( FigureCanvas, NavigationToolbar2QT as NavigationToolbar) from matplotlib.figure import Figure from matplotlib import ticker import mpsevent # Mapping from event code to a namedtuple for that event. EVENT_NAMEDTUPLE = { code: namedtuple(desc.name, ['header'] + [p.name for p in desc.params]) for code, desc in mpsevent.EVENT.items() } # Mapping from event code to event name. EVENT_NAME = {code:desc.name for code, desc in mpsevent.EVENT.items()} # Unpack function for event header. HEADER_UNPACK = Struct(mpsevent.HEADER_FORMAT).unpack # Unpack function for each event code. EVENT_UNPACK = {c:Struct(d.format).unpack for c, d in mpsevent.EVENT.items()} # Icon for the toolbar pause button. PAUSE_ICON = os.path.abspath(os.path.join(os.path.dirname(__file__), 'pause')) def telemetry_decoder(read): """Decode the events in an I/O stream and generate batches of events as lists of pairs (time, event) in time order, where time is CPU time in seconds and event is a tuple. Unknown event codes are read but ignored. The 'read' argument must be a function implementing the io.RawIOBase.read specification (that is, it takes a size and returns up to size bytes from the I/O stream). """ # Cache frequently-used values in local variables. header_desc = mpsevent.HeaderDesc header_size = mpsevent.HEADER_SIZE event_dict = mpsevent.EVENT event_namedtuple = EVENT_NAMEDTUPLE event_unpack = EVENT_UNPACK header_unpack = HEADER_UNPACK EventClockSync_code = mpsevent.Event.EventClockSync.code EventInit_code = mpsevent.Event.EventInit.code # Special handling for Intern events. Intern_desc = mpsevent.Event.Intern Intern_code = Intern_desc.code Intern_struct = Struct(Intern_desc.format) Intern_size = Intern_struct.size Intern_unpack = Intern_struct.unpack Intern_namedtuple = event_namedtuple[Intern_code] batch = [] # Current batch of (unordered) events. clocks_per_sec = None # CLOCKS_PER_SEC value from EventInit event. # Last two EventClockSync events with distinct clock values. eventclocks = deque(maxlen=2) # Eventclock values. clocks = deque([float('-inf')] * 2, maxlen=2) # Corresponding clock values. def key(event): # Key function for sorting events into time order. return event.header.clock def decoder(n=None): # Generate up to n batches of events decoded from the I/O stream. nonlocal clocks_per_sec for _ in (count() if n is None else range(n)): header_data = read(header_size) if not header_data: break header = header_desc(*header_unpack(header_data)) code = header.code size = header.size - header_size if code == Intern_code: event_desc = event_dict[code] assert size <= event_desc.maxsize event = Intern_namedtuple( header, *Intern_unpack(read(Intern_size)), read(size - Intern_size).rstrip(b'\0')) elif code in event_dict: event_desc = event_dict[code] assert size == event_desc.maxsize event = event_namedtuple[code]( header, *event_unpack[code](read(size))) else: # Unknown code might indicate a new event added since # mpsevent.py was updated, so just read and ignore. read(size) continue batch.append(event) if event.header.code == EventClockSync_code: # Events are output in batches terminated by an EventClockSync # event. So when we see an EventClockSync event with a new # clock value, we know that we've received all events up to # that one and can sort and emit the batch. # # The Time Stamp Counter frequency can vary due to thermal # throttling, turbo boost etc., so linearly interpolate within # each batch to convert to clocks and thence to seconds. (This # requires at least two EventClockSync events.) # # In theory the Time Stamp Counter can wrap around, but it is # a 64-bit register even on IA-32, and at 2.5 GHz it will take # hundreds of years to do so, so we ignore this possibility. # # TODO: on 32-bit platforms at 1 MHz, clock values will wrap # around in about 72 minutes and so this needs to be handled. # # TODO: reduce problems caused by discretized clock # values. See job004100. if event.clock == clocks[-1]: # The clock value hasn't changed since the last # EventClockSync (because clocks_per_sec isn't high # enough) so we disregard this event, otherwise # linearising gives us loads of events with identical # timestamps. continue clocks.append(event.clock) eventclocks.append(event.header.clock) if len(eventclocks) == 2: batch.sort(key=key) dt = (clocks[1] - clocks[0]) / clocks_per_sec d_eventclock = eventclocks[1] - eventclocks[0] m = dt / d_eventclock # Gradient. t0 = clocks[0] / clocks_per_sec c = t0 - m * eventclocks[0] # Y-intercept. yield [(m * e.header.clock + c, e) for e in batch] batch.clear() elif event.header.code == EventInit_code: stream_version = event.major, event.median, event.minor if stream_version[:2] != mpsevent.__version__[:2]: raise RuntimeError( "Monitor version {} is incompatible with " "telemetry stream version {}.".format( '.'.join(map(str, mpsevent.__version__)), '.'.join(map(str, stream_version)))) clocks_per_sec = event.clocksPerSec return decoder # SI_PREFIX[i] is the SI prefix for 10 to the power of 3(i-8). SI_PREFIX = list('yzafpnµm') + [''] + list('kMGTPEZY') def with_SI_prefix(y, precision=5, unit=''): "Turn the number y into a string using SI prefixes followed by unit." if y < 0: return '-' + with_SI_prefix(-y, precision, unit) y = decimal.Context(prec=precision).create_decimal(y) e = y.adjusted() # Exponent of leading digit. if e: e -= 1 + (e - 1) % 3 # Make exponent a multiple of 3. prefixed_unit = SI_PREFIX[e // 3 + 8] + unit return f"{y.scaleb(-e):f}" + " " * bool(prefixed_unit) + prefixed_unit def format_bytes(y): "Format a number of bytes as a string." return with_SI_prefix(y) + (' bytes' if y < 10000 else 'B') @ticker.FuncFormatter def format_tick_bytes(y, pos): "A tick formatter for matplotlib, for a number of bytes." return with_SI_prefix(y) def format_cycles(n): "Format a number of clock cycles as a string." return with_SI_prefix(n, unit='c') def format_seconds(t): "Format a duration in seconds as a string." return with_SI_prefix(t, unit='s') def bits_of_word(w, n): "Generate the bits in the word w, which has n bits." for _ in range(n): w, bit = divmod(w, 2) yield bit AxisDesc = namedtuple('AxisDesc', 'label format') AxisDesc.__doc__ = """Description of how to format an axis of a plot. label: str -- label for the whole axis. format -- function taking a value and returning it as a readable string. """ # The y-axes which we support. BYTES_AXIS = AxisDesc('bytes', format_bytes) FRACTION_AXIS = AxisDesc('fraction', '{:.5f}'.format) TRACE_AXIS = AxisDesc('gens', '{:,.2f} gens'.format) COUNT_AXIS = AxisDesc('count', '{:,.0f}'.format) class TimeSeries: "Series of data points in time order." def __init__(self): self.t = [] self.y = [] def __len__(self): return len(self.t) # Doesn't handle slices def __getitem__(self, key): return self.t[key], self.y[key] def append(self, t, y): "Append data y at time t." assert not self.t or t >= self.t[-1] self.t.append(t) self.y.append(y) def closest(self, t): "Return the index of the closest point in the series to time `t`." i = bisect.bisect(self.t, t) if (i == len(self) or (i > 0 and (self.t[i] - t) > (t - self.t[i - 1]))): i -= 1 return i def recompute(self, f): "Recompute the time series with a time constant changed by factor `f`" def note(self, line, index): "Return list of lines briefly describing the data point at index." t, y = self[index] return [line.name, format_seconds(t), line.yaxis.format(y)] def info(self, line, index): "Return list of lines describing the data point at index in detail." return self.note(line, index) def zoom(self, line, index): """Return minimum and maximum times for a zoom range around the data point at the given index, or None if there's no particular range. """ return None def draw(self, line, index, axes_dict): """Draw something on the axes in `axes_dict` when the data point at the given index is selected. """ return None class Accumulator(TimeSeries): "Time series that is always non-negative and updates by accumulation." def __init__(self, initial=0): super().__init__() self.value = initial def add(self, t, delta): "Add delta to the accumulator at time t." assert self.value >= -delta self.append(t, self.value) self.value += delta self.append(t, self.value) def sub(self, t, delta): "Subtract delta from the accumulator at time t." assert self.value >= delta self.append(t, self.value) self.value -= delta self.append(t, self.value) class RateSeries(TimeSeries): "Time series of periodized counts of events." def __init__(self, t, period=1): """Create a RateSeries. Argument t gives the start time, and period the length of periods in seconds (default 1). """ super().__init__() self._period = period self._count = 0 # Count of events within current period. # Consider a series starting near the beginning of time to be # starting at zero. if t < period / 16: self._start = 0 else: self._start = t self._event_t = [] # Timestamps of the individual events. self._limit = ((t // period) + 1) * period # End of current period. def inc(self, t): "A counted event took place." self.update_to(t) self._event_t.append(t) self._count += 1 def update_to(self, t): """Bring series up to timestamp t, possibly completing one or more periods. """ while t >= self._limit: self.append(self._limit - self._period / 2, self._count) self._count = 0 self._limit += self._period def recompute(self, f): "Recompute the series with a different period." event_t = self._event_t self.__init__(self._start, self._period * f) for t in event_t: self.inc(t) return f'period {format_seconds(self._period)}' def note(self, line, index): start = self._start + self._period * index end = start + self._period return [line.name, f"{format_seconds(start)} -- {format_seconds(end)}", line.yaxis.format(self.y[index])] def zoom(self, line, index): start = self._start + self._period * index end = start + self._period return start, end def draw(self, line, index, axes_dict): ax = axes_dict[line.yaxis] start = self._start + self._period * index end = start + self._period return [ax.axvspan(start, end, alpha=0.5, facecolor=line.color)] class OnOffSeries(TimeSeries): """Series of on/off events; can draw as an exponentially weighted moving average on/off ratio or (potentially) as shading bars. """ def __init__(self, t, k=1): super().__init__() self._ons = [] self._start = self._last = t self._k = k self._ratio = 0.0 def on(self, t): "Record the start of an event." dt = t - self._last f = math.exp(-self._k * dt) self._ratio = f * self._ratio self._last = t self.append(t, self._ratio) def off(self, t): "Record the end of an event." dt = t - self._last f = math.exp(-self._k * dt) self._ratio = 1 - f * (1 - self._ratio) self._ons.append((self._last, t)) self._last = t self.append(t, self._ratio) def recompute(self, f): ts = self.t self.__init__(self._start, self._k / f) for i in range(len(ts) // 2): self.on(ts[i * 2]) self.off(ts[i * 2 + 1]) return f'time constant: {format_seconds(1 / self._k)}' def note(self, line, index): on = self._ons[index // 2] return [f"{line.name}", f"{format_seconds(on[0])} + {format_seconds(on[1] - on[0])}"] def zoom(self, line, index): on = self._ons[index // 2] return on[0], on[1] def draw(self, line, index, axes_dict): axes_to_draw = {ax.bbox.bounds: ax for ax in axes_dict.values()}.values() on = self._ons[index // 2] return [ax.axvspan(on[0], on[1], alpha=0.5, facecolor=line.color) for ax in axes_to_draw] class TraceSeries(TimeSeries): "Time series of traces." def __init__(self, traces): """Create a time series of traces. The argument traces must be a mapping from start time to the Trace object that started at that time. """ super().__init__() self._traces = traces def delegate_to_trace(name): def wrapped(self, line, index, *args): t, _ = self[index] return getattr(self._traces[t], name)(*args) return wrapped note = delegate_to_trace('note') info = delegate_to_trace('info') zoom = delegate_to_trace('zoom') draw = delegate_to_trace('draw') class EventHandler: """Model of an MPS data structure that handles a telemetry event by dispatching to the method with the same name as the event. """ def ignore(self, t, event): "Handle a telemetry event at time t by doing nothing." def handle(self, t, event): "Handle a telemetry event at time t by dispatching." getattr(self, EVENT_NAME[event.header.code], self.ignore)(t, event) class Pool(EventHandler): "Model of an MPS pool." def __init__(self, arena, pointer, t): "Create Pool owned by arena, at pointer, at time t." self._arena = arena # Owning arena. self._model = arena.model # Owning model. self._pointer = pointer # Pool's pointer. self._pool_class = None # Pool's class pointer. self._serial = None # Pool's serial number within arena. self._alloc = Accumulator() self._model.add_time_series( self, self._alloc, BYTES_AXIS, "alloc", "memory allocated by the pool from the arena", draw=False) @property def name(self): name = self._model.label(self._pointer) if not name: class_name = self._model.label(self._pool_class) or 'Pool' if self._serial is not None: name = f"{class_name}[{self._serial}]" else: name = f"{class_name}[{self._pointer:x}]" return f"{self._arena.name}.{name}" def ArenaAlloc(self, t, event): self._alloc.add(t, event.size) def ArenaFree(self, t, event): self._alloc.sub(t, event.size) def PoolInit(self, t, event): self._pool_class = event.poolClass self._serial = event.serial class Gen(EventHandler): "Model of an MPS generation." def __init__(self, arena, pointer): self._arena = arena # Owning arena. self._model = arena.model # Owning model. self._pointer = pointer # Gen's pointer. self._serial = None # Gen's serial number. self.zone_set = 0 # Gen's current zone set. def update_ref_size(self, t, seg_summary, seg_size): """Update the size of segments referencing this generation. seg_summary must be a mapping from segment to its summary, and seg_size a mapping from segment to its size in bytes. """ ref_size = 0 for seg, summary in seg_summary.items(): if self.zone_set & summary: ref_size += seg_size[seg] self._ref_size.append(t, ref_size) @property def name(self): name = self._model.label(self._pointer) if not name: if self._serial is not None: name = f"gen-{self._serial}" else: name = f"gen-{self._pointer:x}" return f"{self._arena.name}.{name}" def GenZoneSet(self, t, event): self.zone_set = event.zoneSet def GenInit(self, t, event): self._serial = serial = event.serial self._mortality_trace = mortality_trace = TimeSeries() per_trace_line = self._model.add_time_series( self, mortality_trace, FRACTION_AXIS, f"mortality.trace", f"mortality of data in generation, per trace", draw=False, marker='+', linestyle='None') self._mortality_average = mortality_average = TimeSeries() self._model.add_time_series( self, mortality_average, FRACTION_AXIS, f"mortality.avg", f"mortality of data in generation, moving average", draw=False, color=per_trace_line.color) mortality_average.append(t, event.mortality); self._ref_size = ref_size = TimeSeries() self._model.add_time_series( self, ref_size, BYTES_AXIS, f"ref", f"size of segments referencing generation") def TraceEndGen(self, t, event): self._mortality_trace.append(t, event.mortalityTrace) self._mortality_average.append(t, event.mortalityAverage) class Trace(EventHandler): "Model of an MPS Trace." def __init__(self, arena, t, event): self._arena = arena self.create = t self.pauses = (0, 0, 0) self.why = mpsevent.TRACE_START_WHY[event.why] self.gens = 'none' self.times = [(t, event.header.clock, 'create')] self.sizes = [] self.counts = [] self.accesses = defaultdict(int) self.pause_start = None self.pause_begin(t, event) def add_time(self, name, t, event): "Log a particular event for this trace, e.g. beginning or end of a phase." self.times.append((t, event.header.clock, name)) def add_size(self, name, s): "Log a size related to this trace, so all sizes can be reported together." self.sizes.append((name, s)) def add_count(self, name, c): "Log a count related to this trace, so all counts can be reported together." self.counts.append((name, c)) def pause_begin(self, t, event): """Log the start of some MPS activity during this trace, so we can compute mark/space etc. """ assert self.pause_start is None self.pause_start = (t, event.header.clock) def pause_end(self, t, event): """Log the end of some MPS activity during this trace, so we can compute mark/space etc. """ assert self.pause_start is not None st, sc = self.pause_start tn, tt, tc = self.pauses self.pauses = (tn + 1, tt + t - st, tc + event.header.clock - sc) self.pause_start = None def TraceStart(self, t, event): self.add_time("start", t, event) self.add_size("condemned", event.condemned) self.add_size("notCondemned", event.notCondemned) self.add_size("foundation", event.foundation) self.whiteRefSet = event.white self.whiteZones = bin(self.whiteRefSet).count('1') def TraceFlipBegin(self, t, event): self.add_time("flip begin", t, event) def TraceFlipEnd(self, t, event): self.add_time("flip end", t, event) def TraceBandAdvance(self, t, event): self.add_time(f"{mpsevent.RANK[event.rank].lower()} band", t, event) def TraceReclaim(self, t, event): self.add_time("reclaim", t, event) def TraceDestroy(self, t, event): self.add_time("destroy", t, event) def TraceStatScan(self, t, event): self.add_count('roots scanned', event.rootScanCount) self.add_size('roots scanned', event.rootScanSize) self.add_size('copied during root scan', event.rootCopiedSize) self.add_count('segments scanned', event.segScanCount) self.add_size('segments scanned', event.segScanSize) self.add_size('copied during segment scan', event.segCopiedSize) self.add_count('single ref scan', event.singleScanCount) self.add_size('single refs scanned', event.singleScanSize) self.add_size('copied during scan of single refs', event.singleCopiedSize) self.add_count('read barrier hits', event.readBarrierHitCount) self.add_count('max grey segments', event.greySegMax) self.add_count('segments scanned without finding refs to white segments', event.pointlessScanCount) def TraceStatFix(self, t, event): self.add_count('fixed refs', event.fixRefCount) self.add_count('fixed refs referring to segs', event.segRefCount) self.add_count('fixed white refs', event.whiteSegRefCount) self.add_count('nailboards', event.nailCount) self.add_count('snaps', event.snapCount) self.add_count('forwarded', event.forwardedCount) self.add_size('forwarded', event.forwardedSize) self.add_count('preserved in place', event.preservedInPlaceCount) self.add_size('preserved in place', event.preservedInPlaceSize) def TraceStatReclaim(self, t, event): self.add_count('segs reclaimed', event.reclaimCount) self.add_size('reclaimed', event.reclaimSize) def ChainCondemnAuto(self, t, event): self.gens = event.topCondemnedGenIndex + 1 def TraceCondemnAll(self, t, event): self.gens = "all" def ArenaAccessBegin(self, t, event): self.accesses[event.mode] += 1 def ArenaPollBegin(self, t, event): self.pause_begin(t, event) def ArenaPollEnd(self, t, event): self.pause_end(t, event) def note(self): return ["trace", format_seconds(self.create), f"{self.gens} gens"] def info(self): info = [] log = info.append base_t, base_cycles, _ = self.times[0] log(f"Trace of {self.gens} gens at {format_seconds(base_t)}") log(f"Why: {self.why}") log("Times:") ot, oc = base_t, base_cycles for t, c, n in self.times[1:]: log(f" {n}\t+{format_seconds(t - ot)} " f"({format_cycles(c - oc)})" f"\t{format_seconds(t - base_t)} " f"({format_cycles(c - base_cycles)})") ot, oc = t, c final_t, final_cycles, _ = self.times[-1] elapsed_t = final_t - base_t elapsed_cycles = final_cycles - base_cycles pn, pt, pc = self.pauses if pc < elapsed_cycles: log(f"{pn:,d} Pauses ({format_seconds(pt)}, {format_cycles(pc)}). " f"Mark/space: {pt / elapsed_t:,.3f}/{pc / elapsed_cycles:,.3f}") log("Sizes:") for n, s in self.sizes: log(f" {n}: {format_bytes(s)}") log("Counts:") for n, c in self.counts: log(f" {n}: {c:,d}") for mode, count in sorted(self.accesses.items()): log(f" {mpsevent.ACCESS_MODE[mode]} barrier hits: {count:,d}") zones = " ".join(f"{((self.whiteRefSet >> (64 - 8 * i)) & 255):08b}" for i in range(1, 9)) log(f"white zones: {self.whiteZones}: {zones}") return info def zoom(self): "Return the period of interest for this trace." return self.times[0][0], self.times[-1][0] def draw(self, axes_dict): "Draw things related to the trace on all the axes." # Uniquify axes based on bounding boxes. axes = {ax.bbox.bounds: ax for ax in axes_dict.values()}.values() return [ ax.axvline(t) for ax, (t, _, _) in product(axes, self.times) ] + [ ax.axvspan(*self.zoom(), alpha=0.5, facecolor='r') for ax in axes ] class Arena(EventHandler): "Model of an MPS arena." def __init__(self, model, pointer, t): "Create Arena owned by model, at pointer, at time t." self.model = model # Owning model. self._pointer = pointer # Arena's pointer. self._arena_class = None # Arena's class pointer. self._serial = None # Arena's serial number. self._system_pools = 0 # Number of system pools. self._pools = [] # List of Pools ever belonging to arena. self._pool = {} # Pointer -> Pool (for live pools). self._gens = [] # List of Gens ever belonging to arena. self._gen = {} # Pointer -> Gen (for live gens). self._alloc = Accumulator() self.model.add_time_series( self, self._alloc, BYTES_AXIS, "alloc", "total allocation by client pools") self._poll = OnOffSeries(t) self.model.add_time_series( self, self._poll, FRACTION_AXIS, "poll", "polling time moving average", click_axis_draw=True) self._access = {} for am, name in sorted(mpsevent.ACCESS_MODE.items()): self._access[am] = RateSeries(t) self.model.add_time_series( self, self._access[am], COUNT_AXIS, f"{name} barrier", f"{name} barrier hits per second") self._seg_size = {} # Segment pointer -> size. self._seg_summary = {} # Segment pointer -> summary. self._zone_ref_size = {} # Zone -> refsize Accumulator. self._univ_ref_size = Accumulator() self.model.add_time_series( self, self._univ_ref_size, BYTES_AXIS, "zone-univ.ref", "size of segments referencing the universe") self._live_traces = {} # Trace pointer -> Trace. self._all_traces = {} # Start time -> Trace. self._traces = TraceSeries(self._all_traces) self.model.add_time_series( self, self._traces, TRACE_AXIS, "trace", "generations condemned by trace", click_axis_draw=True, marker='x', linestyle='None') self._condemned_size = TimeSeries() self.model.add_time_series( self, self._condemned_size, BYTES_AXIS, "condemned.size", "size of segments condemned by trace", marker='+', linestyle='None') @property def name(self): if len(self.model.arenas) <= 1: # No need to distinguish arenas if there's just one. return "" name = self.model.label(self._pointer) if not name: class_name = self.model.label(self._arena_class) or 'Arena' if self._serial is not None: name = f"{class_name}[{self._serial}]" else: name = f"{class_name}[{self._pointer:x}]" return name def delegate_to_pool(self, t, event): "Handle a telemetry event by delegating to the pool model." pointer = event.pool try: pool = self._pool[pointer] except KeyError: self._pool[pointer] = pool = Pool(self, pointer, t) self._pools.append(pool) pool.handle(t, event) def ArenaAlloc(self, t, event): self.delegate_to_pool(t, event) if self._pool[event.pool]._serial >= self._system_pools: self._alloc.add(t, event.size) def ArenaFree(self, t, event): self.delegate_to_pool(t, event) if self._pool[event.pool]._serial >= self._system_pools: self._alloc.sub(t, event.size) PoolInit = \ delegate_to_pool def delegate_to_gen(self, t, event): "Handle a telemetry event by delegating to the generation model." pointer = event.gen try: gen = self._gen[pointer] except KeyError: self._gen[pointer] = gen = Gen(self, pointer) self._gens.append(gen) gen.handle(t, event) GenInit = \ GenZoneSet = \ TraceEndGen = \ delegate_to_gen def ArenaCreateVM(self, t, event): self._arena_class = event.arenaClass self._serial = event.serial self._system_pools = event.systemPools ArenaCreateCL = ArenaCreateVM def PoolFinish(self, t, event): del self._pool[event.pool] def GenFinish(self, t, event): del self._gen[event.gen] def ArenaPollBegin(self, t, event): for trace in self._live_traces.values(): trace.ArenaPollBegin(t, event) self._poll.on(t) def ArenaPollEnd(self, t, event): for trace in self._live_traces.values(): trace.ArenaPollEnd(t, event) self._poll.off(t) def ArenaAccessBegin(self, t, event): self._access[event.mode].inc(t) for trace in self._live_traces.values(): trace.ArenaAccessBegin(t, event) def update_to(self, t): """Update anything in the model which depends on the passage of time, such as anything tracking rates. """ for series in self._access.values(): series.update_to(t) def TraceCreate(self, t, event): assert event.trace not in self._live_traces assert t not in self._all_traces trace = Trace(self, t, event) self._live_traces[event.trace] = self._all_traces[t] = trace # Seems like a reasonable time to call this. self.update_to(t) def delegate_to_trace(self, t, event): "Handle a telemetry event by delegating to the trace model." trace = self._live_traces[event.trace] trace.handle(t, event) return trace TraceBandAdvance = \ TraceFlipBegin = \ TraceFlipEnd = \ TraceReclaim = \ TraceStatFix = \ TraceStatReclaim = \ TraceStatScan = \ delegate_to_trace def ChainCondemnAuto(self, t, event): trace = self.delegate_to_trace(t, event) self._traces.append(trace.create, event.topCondemnedGenIndex + 1) def TraceCondemnAll(self, t, event): trace = self.delegate_to_trace(t, event) self._traces.append(trace.create, len(self._gens)) # TODO what's the right number here??! def TraceDestroy(self, t, event): self.delegate_to_trace(t, event) del self._live_traces[event.trace] def TraceStart(self, t, event): self.delegate_to_trace(t, event) self._condemned_size.append(t, event.condemned) if self._seg_summary: for gen in self._gen.values(): gen.update_ref_size(t, self._seg_summary, self._seg_size) def SegSetSummary(self, t, event): size = event.size self._seg_summary[event.seg] = event.newSummary self._seg_size[event.seg] = size n = self.model.word_width univ = (1 << n) - 1 new_univ = event.newSummary == univ old_univ = event.oldSummary == univ self._univ_ref_size.add(t, (new_univ - old_univ) * size) old_summary = 0 if old_univ else event.oldSummary new_summary = 0 if new_univ else event.newSummary for zone, old, new in zip(reversed(range(n)), bits_of_word(old_summary, n), bits_of_word(new_summary, n)): if new == old: continue if zone not in self._zone_ref_size: self._zone_ref_size[zone] = ref_size = Accumulator() self.model.add_time_series( self, ref_size, BYTES_AXIS, f"zone-{zone}.ref", f"size of segments referencing zone {zone}") self._zone_ref_size[zone].add(t, (new - old) * size) class Line: "A line in a Matplotlib plot wrapping a TimeSeries." COLORS = cycle('blue orange green red purple brown pink gray olive cyan' .split()) def __init__(self, owner, series, yaxis, name, desc, draw=True, color=None, click_axis_draw=False, marker=None, **kwargs): """Create a Line. Arguments: owner -- owning object (whose name prefixes the name of the line). series: TimeSeries -- object whose data is to be drawn. yaxis: AxisDesc -- description of Y-axis for the line. name: str -- short name of line. desc: str -- description of line (for tooltip). draw: bool -- plot this line? color: str -- Matplotlib name of color for line. click_axis_draw: bool -- should a click on a data point draw something on the axes? marker -- Matplotlib marker style. The remaining keyword arguments are passed to Axes.plot when the line is plotted. """ self.owner = owner self.series = series self.yaxis = yaxis self._name = name self.desc = desc self.draw = draw self.click_axis_draw = click_axis_draw self.color = color or next(self.COLORS) self._marker = marker self.axes = None # Currently plotted on axes. self.line = None # Matplotlib Line2D object. self._kwargs = kwargs def __len__(self): return len(self.series) # Doesn't handle slices. def __getitem__(self, key): return self.series[key] @property def marker(self): "Return current Matplotlib marker style for line." if self._marker: return self._marker elif len(self) == 1: return 'x' else: return None @property def name(self): return f"{self.owner.name}.{self._name}" @property def ready(self): return len(self) >= 1 def unplot(self): if self.axes: self.line.remove() self.axes = None def plot(self, axes): "Plot or update line on axes." x = self.series.t y = self.series.y if self.line is None: self.axes = axes self.line, = axes.plot(x, y, color=self.color, label=self.name, marker=self.marker, **self._kwargs) else: if self.axes != axes: self.unplot() axes.add_line(self.line) self.axes = axes self.line.set_data(x, y) self.line.set_label(self.name) self.line.set_marker(self.marker) def contains(self, event): """Test whether the event occurred within the pick radius of the line, returning a pair (False, None) if not, or (True, {'ind': set of points within the radius}) if so. """ if self.line is None: return False, None return self.line.contains(event) def display_coords(self, i): "Return the display coordinates of the point with index `i`." t, y = self[i] return self.line.axes.transData.transform((t, y)) def closest(self, t, dispx, range=10): """Return the index of the point closest to time `t`, if within `range` points of display coordinate `dispx`, otherwise None.""" if self.draw and self.ready: i = self.series.closest(t) dx, _ = self.display_coords(i) if abs(dispx - dx) < range: return i return None def draw_point(self, index, axes_dict): """Draw in response to a click on a data point, and return a list of drawn items. """ drawn = self.series.draw(self, index, axes_dict) # Could just draw on axes_dict[self.yaxis] ?? if drawn is None: if self.click_axis_draw: t, _ = self[index] drawn = [ax.axvline(t) for ax in axes_dict.values()] else: drawn = [] return drawn def recompute(self, f): """Recompute the line's time series with a time constant changed by factor `f`. """ return self.series.recompute(f) class Model(EventHandler): "Model of an application using the MPS." def __init__(self, event_queue): "Create model based on queue of batches of telemetry events." self._queue = event_queue self._intern = {} # stringId -> string self._label = {} # address or pointer -> stringId self._arena = {} # pointer -> Arena (for live arenas) self.arenas = [] # All arenas created in the model. self.lines = [] # All Lines available for plotting. self._needs_redraw = True # Plot needs redrawing? def add_time_series(self, *args, **kwargs): "Add a time series to the model." line = Line(*args, **kwargs) self.lines.append(line) return line def label(self, pointer): "Return string labelling address or pointer, or None if unlabelled." return self._intern.get(self._label.get(pointer)) def plot(self, axes_dict, keep_limits=False): "Draw time series on the given axes." if not self._needs_redraw: return self._needs_redraw = False # Collate drawable lines by y-axis. yaxis_lines = defaultdict(list) for line in self.lines: if line.ready and line.draw: yaxis_lines[line.yaxis].append(line) else: line.unplot() bounds_axes = defaultdict(list) # Axes drawn in each area. # Draw the lines. for yax in yaxis_lines: axes = axes_dict[yax] axes.set_axis_on() for line in yaxis_lines[yax]: line.plot(axes) if not keep_limits: axes.relim(visible_only=True) axes.autoscale_view() bounds_axes[axes.bbox.bounds].append((axes, yax)) # Set the format_coord method for each axis. for bounds, ax_list in bounds_axes.items(): if len(ax_list) > 1: for ax, yax in ax_list: # Capture the current values of ax_list and tData here. def format_coord(x, y, ax_list=ax_list, tData=ax.transData): # x, y are data coordinates. # axy is corresponding display coordinate. _, axy = tData.transform((0, y)) # Invert the transforms here. If you invert them at # plotting time and cache them so we don't have to # invert them every time format_coord is called, then # you get the wrong answer. We don't know why. return (f"{format_seconds(x)}, " + ", ".join(yax.format(ax.transData.inverted() .transform((0, axy))[1]) for ax, yax in ax_list)) ax.format_coord = format_coord else: ax, yax = ax_list[0] def format_coord(x, y): return f'{format_seconds(x)}, {yax.format(y)}' ax.format_coord = format_coord def update(self): "Consume available telemetry events and update the model." while True: try: batch = self._queue.get_nowait() except queue.Empty: break else: for t, event in batch: self.handle(t, event) def needs_redraw(self): "Call this when the model needs redrawing." self._needs_redraw = True def delegate_to_arena(self, t, event): "Handle a telemetry event by delegating to the arena model." addr = event.arena try: arena = self._arena[addr] except KeyError: self._arena[addr] = arena = Arena(self, addr, t) self.arenas.append(arena) arena.handle(t, event) ArenaAccessBegin = \ ArenaAlloc = \ ArenaCreateCL = \ ArenaCreateVM = \ ArenaFree = \ ArenaPollBegin = \ ArenaPollEnd = \ ChainCondemnAuto = \ GenFinish = \ GenInit = \ GenZoneSet = \ PoolFinish = \ PoolInit = \ SegSetSummary = \ TraceBandAdvance = \ TraceCondemnAll = \ TraceCreate = \ TraceDestroy = \ TraceEndGen = \ TraceFlipBegin = \ TraceFlipEnd = \ TraceReclaim = \ TraceStart = \ TraceStart = \ TraceStatFix = \ TraceStatReclaim = \ TraceStatScan = \ delegate_to_arena def EventClockSync(self, t, event): self.needs_redraw() def Intern(self, t, event): self._intern[event.stringId] = event.string.decode('ascii', 'replace') def Label(self, t, event): self._label[event.address] = event.stringId def LabelPointer(self, t, event): self._label[event.pointer] = event.stringId def ArenaDestroy(self, t, event): del self._arena[event.arena] def EventInit(self, t, event): self.word_width = event.wordWidth class ApplicationToolbar(NavigationToolbar): "Subclass of Matplotlib's navigation toolbar adding a pause button." def __init__(self, canvas, app): self.toolitems += (('Pause', 'Pause', PAUSE_ICON, 'pause'),) super().__init__(canvas, app) self._actions['pause'].setCheckable(True) self._app = app self.paused = False def pause(self, event=None): "Toggle the pause button." self.paused = not self.paused self._actions['pause'].setChecked(self.paused) def empty(self): "Is the stack of views empty?" return self._nav_stack.empty() class ErrorReporter(ContextDecorator): """Context manager which reports the traceback of any exception to the function provided to its constructor. Useful when exceptions are otherwise silently ignored or reported to a stream which is not promptly flushed. May also be used as a decorator. """ def __init__(self, writelines): self._writelines = writelines def __enter__(self): return self def __exit__(self, ty, val, tb): if ty is not None: self._writelines(traceback.format_exception(ty, val, tb)) # All keyboard shortcuts. Each one is a triple: # `(iterable, method name, documentation)`. # # If `iterable` is empty, `documentation` is a string output as part of # help documentation. # # Otherwise the members of `iterable` are presentation names of key # presses. After convertion via the event_key function, they are matched # against `event.key` for MPL key press events. So `iterable` may be a # single character, or a short string (whose individual characters are # the keys), or an iterable of strings. # # `method_name` should be the name of a method on ApplicationWindow, # without the preceding underscore. # # If method_name is None, there is no binding. Also later entries # over-ride earlier ones. The combination of these two facts allows # us to give all the built-in MPL bindings as the first entries in # this list, and just over-ride them, either with a disabling # None/None or with our own binding. While the monitor is in active # development this flexibility is good. SHORTCUTS = [ # First the shortcuts which come with the MPL navigation toolbar. ((), None, 'Navigation bar shortcuts:'), (('h', 'r', 'Home'), 'mpl_key', "Zoom out to the whole dataset"), (('c', 'Backspace', 'Left'), 'mpl_key', "Back to the previous view"), (('v', 'Right'), 'mpl_key', "Forward to the next view"), ('p', 'mpl_key', "Select the pan/zoom tool"), ('o', 'mpl_key', "Select the zoom-to-rectangle tool"), (('Ctrl+S', 'Cmd+S'), 'mpl_key', "Save the current view as a PNG file"), ('g', 'mpl_key', "Show major grid lines"), ('G', 'mpl_key', "Show minor grid lines"), ('Lk', 'mpl_key', "Toggle log/linear on time axis"), (('Ctrl+F', 'Ctrl+Alt+F'), 'mpl_key', "Toggle full-screen mode"), # Disable some of the MPL's shortcuts. (('Ctrl+F',), None, None), # Full-screen doesn't work. ('g', None, None), # No major grids. ('G', None, None), # No useful minor grids. ('L', None, None), # Log time axis not useful. ('k', None, None), # Log time axis not useful. # Our own shortcuts, some of which over-ride MPL ones. ((), None, "Other shortcuts:"), (('Ctrl+W', 'Cmd+W'), 'close', "Close the monitor"), ('l', 'toggle_log_linear', "Toggle log/linear byte scale"), (('Right',), 'next_point', "Select next point of selected series"), (('Left',), 'previous_point', "Select previous point of selected series"), (('Up',), 'up_line', "Select point on higher series"), (('Down',), 'down_line', "Select point on lower series"), (('PageUp',), 'slower', "Double time constant for time-dependent series"), (('PageDown',), 'faster', "Halve time constant for time-dependent series"), (('Pause',), 'pause', "Freeze/thaw axis limits"), ('+', 'zoom_in', "Zoom in"), ('-', 'zoom_out', "Zoom out"), ('z', 'zoom', "Zoom in to selected point"), ('i', 'info', "Show detail on selected point"), ('?h', 'help', "Show help"), ] # Set of keys whose presses are not logged. IGNORED_KEYS = { 'alt', 'cmd', 'control', 'ctrl', 'shift', 'super', # Windows key } def event_key(key): """Convert presentation name of key to a string that can be matched against a Matplotlib event.key. Names of length 1 are unchanged, but longer names are converted to lower case. """ if len(key) <= 1: return key else: return key.lower() class ApplicationWindow(QtWidgets.QMainWindow): """PyQt5 application displaying time series derived from MPS telemetry output. """ def __init__(self, model : Model, title : str): """Create application. 'model' is the MPS model whose time series are to be displayed, and 'title' is the main window title. """ super().__init__() self._model = model # The MPS model. self._home_limits = None # Limits of the graph in "home" position. self._line_checkbox = {} # Line -> QCheckbox. self.setWindowTitle(title) main = QtWidgets.QWidget() self.setCentralWidget(main) # Make a splitter and a layout to contain it. main_layout = QtWidgets.QHBoxLayout() splitter = QtWidgets.QSplitter(QtCore.Qt.Vertical) main_layout.addWidget(splitter) main.setLayout(main_layout) # Above the splitter, an hbox layout. upper = QtWidgets.QWidget() upper_layout = QtWidgets.QHBoxLayout() upper.setLayout(upper_layout) splitter.addWidget(upper) # Scrollable list of checkboxes, one for each time series. self._lines = QtWidgets.QVBoxLayout() self._lines_scroll = QtWidgets.QScrollArea( horizontalScrollBarPolicy=QtCore.Qt.ScrollBarAlwaysOff) self._lines_widget = QtWidgets.QWidget() lines_layout = QtWidgets.QVBoxLayout(self._lines_widget) lines_layout.addLayout(self._lines) lines_layout.addStretch(1) self._lines_scroll.setWidget(self._lines_widget) self._lines_scroll.setWidgetResizable(True) upper_layout.addWidget(self._lines_scroll) # Matplotlib canvas. self._canvas = FigureCanvas(Figure(figsize=(10, 8))) upper_layout.addWidget(self._canvas) # Create all axes, set up tickmarks etc bytes_axes, trace_axes = self._canvas.figure.subplots( nrows=2, sharex=True, gridspec_kw={'hspace': 0, 'height_ratios': (5, 2)}) fraction_axes = bytes_axes.twinx() count_axes = trace_axes.twinx() self._axes_dict = { BYTES_AXIS: bytes_axes, FRACTION_AXIS: fraction_axes, TRACE_AXIS: trace_axes, COUNT_AXIS: count_axes, } for yax in self._axes_dict: self._axes_dict[yax].set_ylabel(yax.label) self._axes_dict[yax].set_xlabel("time (seconds)") self._axes_dict[yax].set_yscale('linear') # Bytes tick labels in megabytes etc. bytes_axes.ticklabel_format(style='plain') bytes_axes.yaxis.set_major_formatter(format_tick_bytes) self._log_scale = False # Make a toolbar and put it on the top of the whole layout. self._toolbar = ApplicationToolbar(self._canvas, self) self.addToolBar(QtCore.Qt.TopToolBarArea, self._toolbar) # Below the splitter, a logging pane. self._logbox = QtWidgets.QTextEdit() self._logbox.setReadOnly(True) self._logbox.setLineWrapMode(True) splitter.addWidget(self._logbox) # Line annotations. self._line_annotation = bytes_axes.annotate( "", xy=(0, 0), xytext=(-20, 20), textcoords='offset points', bbox=dict(boxstyle='round', fc='w'), arrowprops=dict(arrowstyle='->'), annotation_clip=False, visible=False) self._line_annotation.get_bbox_patch().set_alpha(0.8) self._canvas.mpl_connect("button_release_event", self._click) # Points close in time to the most recent selection, on each line, in # increasing y order (line, index, ...). self._close_points = None # Map from line to index into self._close_points. self._close_line = None # Index of currently selected point in self._close_points. self._selected = None # Things drawn for the current selection. self._drawn = [] # Mapping from event key to (method, presentation name, # documentation) for keyboard shortcuts. self._shortcuts = {} for keys, method, doc in SHORTCUTS: for key in keys: if method is None: self._shortcuts.pop(event_key(key), None) else: self._shortcuts[event_key(key)] = getattr( self, '_' + method), key, doc # Pass all keystrokes to on_key_press, where we can capture them or # pass them on to the toolbar. self._canvas.mpl_connect('key_press_event', self._on_key_press) self._canvas.setFocusPolicy(QtCore.Qt.StrongFocus) self._canvas.setFocus() # Call self._update in a loop forever. self._update() self._timer = self._canvas.new_timer(100, [(self._update, (), {})]) self._timer.start() def _log(self, message): "Append message to the log box." self._logbox.append(message.rstrip("\n")) def _log_lines(self, messages): "Append messages to the log box." for message in messages: self._log(message) def _on_key_press(self, event): "Handle a keyboard event." with ErrorReporter(self._log_lines): if event.key in self._shortcuts: self._shortcuts[event.key][0](event) elif not set(event.key.split('+')).issubset(IGNORED_KEYS): self._log(f"Unknown key {event.key!r}") def _mpl_key(self, event): "Pass a key-press event to the toolbar." key_press_handler(event, self._canvas, self._toolbar) def _help(self, event): "Report keyboard help to the log pane." # Collate shortcut keys by their documentation string. doc_keys = defaultdict(list) for _, key, doc in self._shortcuts.values(): doc_keys[doc].append(key) for keys, method, doc in SHORTCUTS: if not keys: self._log(doc) elif doc in doc_keys: self._log(f"\t{'/'.join(doc_keys[doc])}\t{doc}") def _pause(self, event): "Toggle pausing of axis limit updates." self._toolbar.pause() def _close(self, event): "Close the monitor application." self.close() def _toggle_log_linear(self, event): "Toggle the bytes axis between log and linear scales." yscale = 'linear' if self._log_scale else 'log' self._axes_dict[BYTES_AXIS].set_yscale(yscale) self._axes_dict[BYTES_AXIS].yaxis.set_major_formatter( format_tick_bytes) self._log_scale = not self._log_scale self._log(f'Switched bytes axis to {yscale} scale.') def _next_point(self, event): "Select the next point on the selected line." if self._close_points is None: return line, index = self._close_points[self._selected] self._select(line, index + 1) def _previous_point(self, event): "Select the previous point on the selected line." if self._close_points is None: return line, index = self._close_points[self._selected] self._select(line, index - 1) def _up_line(self, event): "Select the point on the line above the currently selected point." if self._selected is None: return self._annotate(self._selected + 1) def _down_line(self, event): "Select the point on the line below the currently selected point." if self._selected is None: return self._annotate(self._selected - 1) def _select(self, line, index): "Select the point with index `index` on `line`, if it exists." if index < 0 or index >= len(line): return t, y = line[index] self._recentre(mid=t, force=False) dispx, _ = line.display_coords(index) self._find_close(t, dispx, on_line=line, index=index) self._annotate(self._close_line[line]) def _clear(self): "Remove all annotations and visible markings of selected points." self._line_annotation.set_visible(False) for d in self._drawn: d.set_visible(False) self._drawn = [] def _unselect(self, line=None): "Undo selection. If `line` is currently selected, remove annotations." if self._selected is not None and line is not None: selected_line, index = self._close_points[self._selected] if line == selected_line: self._clear() self._selected = self._close_points = None def _annotate(self, line_index): "Select the closest point on line `line_index`." if line_index < 0 or line_index >= len(self._close_points): return self._selected = line_index line, index = self._close_points[self._selected] note = line.series.note(line, index) self._log_lines(note) self._clear() a = self._line_annotation if a.figure is not None: a.remove() line.axes.add_artist(a) a.xy = line[index] a.set_text("\n".join(note)) a.set_visible(True) self._drawn += line.draw_point(index, self._axes_dict) def _info(self, event): "Report more information about the currently selected point." if self._close_points is None: self._log('No selected data point') return line, index = self._close_points[self._selected] self._log_lines(line.series.info(line, index)) def _find_close(self, t, dispx, on_line=None, index=None): "Find all the points at times close to `t`, so we can select one." pts = [] for line in self._model.lines: if line == on_line: closest = index else: closest = line.closest(t, dispx) if closest is not None: _, dispy = line.display_coords(closest) pts.append((dispy, line, closest)) self._close_points = [] self._close_line = {} for dispy, line, index in sorted(pts, key=lambda pt:pt[0]): self._close_line[line] = len(self._close_points) self._close_points.append((line, index)) def _recompute(self, factor): "Scale all time constants by some factor." self._log(f'Scaling time constants by a factor {factor}:...') selected_line, _ = self._close_points[self._selected] for line in self._model.lines: log = line.recompute(factor) if log: self._log(f' {line.name}: {log}') if line == selected_line: self._clear() self._model.needs_redraw() def _slower(self, event): "Double all time constants." self._recompute(2) def _faster(self, event): "Halve all time constants." self._recompute(0.5) def _click(self, event): "Handle left mouse click by annotating line clicked on." if event.button != 1 or not event.inaxes: return # If we want control-click, shift-click, and so on: # modifiers = QtGui.QGuiApplication.keyboardModifiers() # if (modifiers & QtCore.Qt.ControlModifier): ... for line in self._model.lines: if not (line.ready and line.draw): continue contains, index = line.contains(event) if contains: i = index['ind'][0] t, y = line[i] dispx, _ = line.display_coords(i) self._find_close(t, dispx) self._annotate(self._close_line[line]) break else: self._unselect() self._clear() def _zoom_in(self, event): "Zoom in by a factor of 2." self._recentre(zoom=2) def _zoom_out(self, event): "Zoom out by a factor of 2." self._recentre(zoom=0.5) def _zoom(self, event): """Zoom in to current data point, by a factor of two or to the point's natural limits. If there's no current point, zoom in by a factor of 2. """ if self._close_points is None: self._zoom_in(event) return line, index = self._close_points[self._selected] lim = line.series.zoom(line, index) if lim is None: self._recentre(zoom=2, mid=line[index][0]) else: # Make a bit of slack. lo, hi = lim width = hi - lo self._zoom_to(lo - width / 8, hi + width / 8) def _recentre(self, zoom=1.0, mid=None, force=True): """Recentre on `mid`, if given, and zoom in or out by factor `zoom`. If `force` is false, and `mid` is near the middle of the resulting box, or near the lowest time, or near the highest time, don't do it. """ xlim, _ = self._limits tmin, tmax = self._time_range lo, hi = xlim half_width = (hi - lo) / (2 * zoom) if mid is None: mid = (hi + lo) / 2 elif not force: if mid - lo > half_width / 4 and hi - mid > half_width / 4: # If data point is in centre half, don't shift. return if mid < lo + half_width / 4 and tmin > lo: # Don't shift left if lowest T is already displayed. return if mid > hi - half_width / 4 and tmax < hi: # Don't shift right if highest T is already displayed. return newlo = max(tmin - (tmax - tmin) / 16, mid - half_width) newhi = min(tmax + (tmax - tmin) / 16, mid + half_width) self._zoom_to(newlo, newhi) def _zoom_to(self, lo, hi): "Redraw with new limits on the time axis." ax = self._axes_dict[BYTES_AXIS] if self._toolbar.empty(): self._toolbar.push_current() ax.set_xlim(lo, hi) self._toolbar.push_current() @property def _time_range(self): "Pair (minimum time, maximum time) for any data point." return (min(line[0][0] for line in self._model.lines if line.ready), max(line[-1][0] for line in self._model.lines if line.ready)) @property def _limits(self): "Current x and y limits of the Matplotlib graph." ax = self._axes_dict[BYTES_AXIS] return ax.get_xlim(), ax.get_ylim() def _update(self): "Update the model and redraw if not paused." with ErrorReporter(self._log_lines): if (not self._toolbar.paused and self._home_limits not in (None, self._limits)): # Limits changed (for example, because user zoomed in), so # pause further updates to the limits of all axes, to give # user a chance to explore. self._toolbar.pause() self._home_limits = None self._model.update() self._model.plot(self._axes_dict, keep_limits=self._toolbar.paused) if not self._toolbar.paused: self._home_limits = self._limits self._canvas.draw() # Find new time series and create corresponding checkboxes. checkboxes_changed = False for line in self._model.lines: if not line.ready: continue new_name = line.name if line in self._line_checkbox: # A line's name can change dynamically (for example, # because of the creation of a second arena, or a Label # event), so ensure that it is up to date. old_name = self._line_checkbox[line].text() if old_name != new_name: self._line_checkbox[line].setText(new_name) checkboxes_changed = True else: checkboxes_changed = True checkbox = QtWidgets.QCheckBox(new_name) self._line_checkbox[line] = checkbox checkbox.setChecked(line.draw) checkbox.setToolTip(f"{line.desc} ({line.yaxis.label})") self._lines.addWidget(checkbox) def state_changed(state, line=line): self._unselect(line) line.draw = bool(state) self._model.needs_redraw() checkbox.stateChanged.connect(state_changed) checkbox.setStyleSheet(f"color:{line.color}") # Sort checkboxes into order by name and update width. if checkboxes_changed: checkboxes = self._line_checkbox.values() for checkbox in checkboxes: self._lines.removeWidget(checkbox) for checkbox in sorted(checkboxes, key=lambda c:c.text()): self._lines.addWidget(checkbox) self._lines_scroll.setFixedWidth( self._lines_widget.sizeHint().width()) def main(): parser = argparse.ArgumentParser(description="Memory Pool System Monitor.") parser.add_argument( 'telemetry', metavar='FILENAME', nargs='?', type=str, default=os.environ.get('MPS_TELEMETRY_FILENAME', 'mpsio.log'), help="telemetry output from the MPS instance") args = parser.parse_args() with open(args.telemetry, 'rb') as telemetry_file: event_queue = queue.Queue() model = Model(event_queue) decoder = telemetry_decoder(telemetry_file.read) for batch in decoder(1): event_queue.put(batch) model.update() stop = threading.Event() def decoder_thread(): while not stop.isSet(): for batch in decoder(): if stop.isSet(): break event_queue.put(batch) thread = threading.Thread(target=decoder_thread) thread.start() qapp = QtWidgets.QApplication([]) app = ApplicationWindow(model, args.telemetry) app.show() result = qapp.exec_() stop.set() thread.join() return result if __name__ == '__main__': exit(main()) # C. COPYRIGHT AND LICENSE # # Copyright (C) 2018-2020 Ravenbrook Limited . # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the # distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A # PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # # # $Id$