1
Fork 0
mirror of git://git.sv.gnu.org/emacs.git synced 2026-03-25 08:12:11 -07:00

Catch-up merge from custom/cet/branch/branch/2018-06-20/monitor to branch/2018-06-20/monitor.

Copied from Perforce
 Change: 195052
This commit is contained in:
Gareth Rees 2018-09-12 09:25:26 +01:00
parent e12c5afaff
commit 3f0dd10bdd

View file

@ -9,7 +9,6 @@
#
# Requirements: Python 3.6, Matplotlib, PyQt5.
import argparse
from collections import defaultdict, deque, namedtuple
from itertools import count, cycle
@ -21,6 +20,8 @@ import threading
import time
import math
import bisect
import traceback
from contextlib import redirect_stdout, ContextDecorator
from matplotlib.backends.qt_compat import QtCore, QtGui, QtWidgets
from matplotlib.backend_bases import key_press_handler
@ -39,30 +40,61 @@ class YAxis:
def label(self): return self._label
def with_SI_prefix(y, precision=5, unit=None):
"Turn the number `y` into a string using SI prefixes followed by `unit`"
if y < 0:
return '-'+with_SI_prefix(-y, precision, unit)
elif y == 0:
s = '0 '
else:
l = math.floor(math.log(y, 1000))
if l < 0:
smalls = 'munpfazy'
if l < -len(smalls):
l = -len(smalls)
f = 1000 ** l
m = y / f
if l > -len(smalls) and m < 10: # up to 4 digits before the decimal
m *= 1000
l -= 1
s = f'{m:.{precision}g} {smalls[-l-1]}'
elif y > 10000:
bigs = 'kMGTPEZY'
if l > len(bigs):
l = len(bigs)
f = 1000 ** l
m = y / f
if m < 10: # up to 4 digits before the decimal
m *= 1000
l -= 1
s = f'{m:.{precision}g} {bigs[l-1]}'
else: # l = 0
s = f'{y:.{precision}g} '
if unit:
s += unit
return s.strip()
def bytesFormat(y):
"Format a number of bytes as a string."
return with_SI_prefix(y) + (' bytes' if y < 10000 else 'B')
def with_SI_prefix(y,unit=''):
if y < 0:
return '-'+with_SI_prefix(-y)
i = 0
while y >= 10000:
y /= 1000
i += 1
return ('{:.0f} {}'.format(y,' KMGTPE'[i])+unit).strip()
def bytesTickFormatter(y,pos):
"A tick formatter for matplotlib, for a number of bytes."
return with_SI_prefix(y)
def cyc(n):
"Format a number of clock cycles as a string."
return with_SI_prefix(n, unit='c')
# The three y axes which we support
def dur(t):
"Format a duration in seconds as a string."
return with_SI_prefix(t, unit='s')
# The y axes which we support
bytesAxis = YAxis('bytes', bytesFormat)
fractionAxis = YAxis('fraction', lambda v: f'{v:.5f}')
traceAxis = YAxis('gens', lambda v: f'{v} gens')
countAxis = YAxis('count', lambda v: f'{v:,d}')
traceAxis = YAxis('gens', lambda v: f'{v:,.2f} gens')
countAxis = YAxis('count', lambda v: f'{v:,.0f}')
# Names of scanning ranks
rank_name = {0: 'Ambig',
@ -77,6 +109,7 @@ access_mode = {1: 'Read',
3: 'Read/Write',
}
# Names of reasons for traces
trace_why = {1: 'gen 0 capacity',
2: 'dynamic criterion',
3: 'opportunitism',
@ -257,6 +290,8 @@ class TimeSeries:
self.y.append(y)
def closest(self, t):
"Return the index of the closest point in the series to time `t`."
i = bisect.bisect(self.t, t)
if (i == len(self) or
(i > 0 and (self.t[i] - t) > (t - self.t[i-1]))):
@ -264,21 +299,50 @@ class TimeSeries:
return i
def recompute(self, f):
pass
"Recompute the time series with a time constant changed by factor `f`"
def note(self, line, t, index, verbose=False):
"""Return text for a tooltip, and for the log pane, describing the
data point at time `t` (which has index `index`), or None,
None if there is nothing to say. Subclasses should override
`series_note` rather than this.
"""
if self._note_fn:
return self._note_fn(line, t, index, verbose=verbose)
return self.series_note(line, t, index, verbose=verbose)
def series_note(self, line, t, index, verbose=False):
"""As for `note`. Subclasses should override this rather than `note`."""
return None, None
def zoom(self, line, t, index):
"""Return minimum and maximum times for a zoom range around the data
point at time `t` (which has index `index`), or None if
there's no particular range. Subclasses should override
`series_zoom` rather than this.
"""
if self._zoom_fn:
return self._zoom_fn(line, t, index)
return self.series_zoom(line, t, index)
def series_zoom(self, line, t, index):
"""As for `zoom`. Subclasses should override this rather than `zoom`."""
return None
def draw(self, line, t, index, axes_dict):
"""Draw something on the axes in `axes_dict` when the data point at
time `t` (which has inddex `index`) is selected. Subclasses
should override `series_zoom` rather than this.
"""
if self._draw_fn:
return self._draw_fn(line, t, index, axes_dict)
return self.series_draw(line, t, index, axes_dict)
def series_draw(self, line, t, index, axes_dict):
"""As for `draw`. Subclasses should override this rather than `draw`."""
return None
class Accumulator(TimeSeries):
@ -307,11 +371,16 @@ class RateSeries(TimeSeries):
super().__init__()
self._period = period
self._count = 0
self._start = t
# Consider a series starting near the beginning of time to be starting at zero.
if t < period/16:
self._start = 0
else:
self._start = t
self.ts=[]
self._limit = ((t // period) + 1) * period
def inc(self, t):
"A counted event took place."
self.null(t)
self.ts.append(t)
self._count += 1
@ -323,11 +392,29 @@ class RateSeries(TimeSeries):
self._limit += self._period
def recompute(self, f):
"Recompute the series with a different period."
ts = self.ts
self.__init__(self._start, self._period * f)
for t in ts:
self.inc(t)
return f'period {self._period:.3f} s'
return f'period {dur(self._period)}'
def series_note(self, line, t, index, verbose=False):
start = self._start + self._period * index
end = start + self._period
note = f'{line.name}\n {dur(start)}-{dur(end)}\n{line.yaxis.fmt(self.y[index])}'
return note, note.replace('\n', ' ')
def series_zoom(self, line, t, index):
start = self._start + self._period * index
end = start + self._period
return start, end
def series_draw(self, line, t, index, axes_dict):
ax = axes_dict[line.yaxis]
start = self._start + self._period * index
end = start + self._period
return [ax.axvspan(start, end, alpha=0.5, facecolor=line.color)]
class OnOffSeries(TimeSeries):
"""Series of on/off events; can draw as an exponentially weighted moving
@ -339,7 +426,16 @@ average on/off ratio or (potentially) as shading bars."""
self._k = k
self._ratio = 0.0
def on(self, t):
"Record the start of an event."
dt = t - self._last
f = math.exp(-self._k * dt)
self._ratio = f * self._ratio
self._last = t
self.append(t, self._ratio)
def off(self, t):
"Record the end of an event."
dt = t - self._last
f = math.exp(-self._k * dt)
self._ratio = 1 - f * (1 - self._ratio)
@ -347,35 +443,28 @@ average on/off ratio or (potentially) as shading bars."""
self._last = t
self.append(t, self._ratio)
def on(self, t):
dt = t - self._last
f = math.exp(-self._k * dt)
self._ratio = f * self._ratio
self._last = t
self.append(t, self._ratio)
def recompute(self, f):
ts = self.t
self.__init__(self._start, self._k / f)
for i in range(len(ts) // 2):
self.on(ts[i*2])
self.off(ts[i*2+1])
return f'time constant: {1/self._k:.3f} s'
return f'time constant: {dur(1/self._k)}'
def note(self, line, t, index, verbose=False):
def series_note(self, line, t, index, verbose=False):
on = self._ons[index // 2]
l = on[1]-on[0]
note = f"{line.name}: {on[0]:.3f} + {l * 1000:.3f} ms"
note = f"{line.name}: {dur(on[0])} + {dur(l)}"
return note, note
def zoom(self, line, t, index):
def series_zoom(self, line, t, index):
on = self._ons[index // 2]
return (on[0], on[1])
def draw(self, line, t, index, axes_dict):
def series_draw(self, line, t, index, axes_dict):
axes_to_draw = {ax.bbox.bounds: ax for ax in axes_dict.values()}.values()
on = self._ons[index // 2]
return [ax.axvspan(on[0], on[1], alpha=0.5, facecolor='g')
return [ax.axvspan(on[0], on[1], alpha=0.5, facecolor=line.color)
for ax in axes_to_draw]
class EventHandler:
@ -485,6 +574,7 @@ class Gen(EventHandler):
self._mortality_average.append(t, event.mortalityAverage)
class Trace(EventHandler):
"Model of an MPS Trace."
def __init__(self, arena, t, event):
self._arena = arena
self.create = t
@ -499,19 +589,24 @@ class Trace(EventHandler):
self.begin_pause(t, event)
def add_time(self, name, t, event):
"Log a particular event for this trace, e.g. beginning or end of a phase."
self.times.append((t, event.header.clock, name))
def add_size(self, name, s):
"Log a size related to this trace, so all sizes can be reported together."
self.sizes.append((name, s))
def add_count(self, name, c):
"Log a count related to this trace, so all counts can be reported together."
self.counts.append((name, c))
def begin_pause(self, t, event):
"Log the start of some MPS activity during this trace, so we can compute mark/space etc."
assert self.pause_start is None
self.pause_start = (t, event.header.clock)
def end_pause(self, t, event):
"Log the end of some MPS activity during this trace, so we can compute mark/space etc."
assert self.pause_start is not None
st, sc = self.pause_start
tn, tt, tc = self.pauses
@ -533,7 +628,7 @@ class Trace(EventHandler):
self.add_time("flip end", t, event)
def TraceBandAdvance(self, t, event):
self.add_time(f"band advance {rank_name[event.rank]}", t, event)
self.add_time(f"{rank_name[event.rank]} band", t, event)
def TraceReclaim(self, t, event):
self.add_time("reclaim", t, event)
@ -587,20 +682,20 @@ class Trace(EventHandler):
self.end_pause(t, event)
def note(self, verbose=False):
"Describe this trace for tooltip and the log pane."
base_t, base_cycles, _ = self.times[0]
if verbose:
log = f"Trace of {self.gens} gens at {base_t:.6f} ({self.why}):\nTimes: \n"
log = f"Trace of {self.gens} gens at {dur(base_t)} ({self.why}):\nTimes: \n"
ot, oc = base_t, base_cycles
for t,c,n in self.times[1:]:
log += " {:20} +{:.3f} ms ({}): ({:.3f} ms, {})\n".format(
n, (t-ot)*1000, cyc(c-oc), (t-base_t)*1000, cyc(c-base_cycles))
log += f" {n:20} +{dur(t-ot)} ({cyc(c-oc)}): ({dur(t-base_t)}, {cyc(c-base_cycles)})\n"
ot, oc = t, c
final_t, final_cycles,_ = self.times[-1]
elapsed_t = final_t - base_t
elapsed_cycles = final_cycles - base_cycles
pn, pt, pc = self.pauses
if pc < elapsed_cycles:
log += f"{pn:,d} Pauses ({pt*1000:,.3f} ms, {cyc(pc)}). Mark/space: {pt/elapsed_t:,.3f}/{pc/elapsed_cycles:,.3f}\n"
log += f"{pn:,d} Pauses ({dur(pt)}, {cyc(pc)}). Mark/space: {pt/elapsed_t:,.3f}/{pc/elapsed_cycles:,.3f}\n"
log += "Sizes:\n"
for (n, s) in self.sizes:
log += f" {n}: {bytesFormat(s)}\n"
@ -613,13 +708,17 @@ class Trace(EventHandler):
log += ' '.join(f'{((self.whiteRefSet >> (64-8*i)) & 255):08b}'
for i in range(1,9))
else:
log = f"Trace of {self.gens} gens at {base_t:.6f} ({self.why})"
return f"trace\n{self.create:f} s\n{self.gens}", log
log = f"Trace of {self.gens} gens at {dur(base_t)} ({self.why})"
return f"trace\n{self.create:f} s\n{self.gens} gens", log
def zoom(self):
"Return the period of interest for this trace."
if self._zoom_fn:
return self._zoom_fn(line, t, index)
return (self.times[0][0], self.times[-1][0])
def draw(self, axes_dict):
"Draw things related to the trace on all the axes."
# uniquify axes based on bounding boxes
axes_to_draw = {ax.bbox.bounds: ax for ax in axes_dict.values()}.values()
return ([ax.axvline(t)
@ -921,10 +1020,14 @@ class Line:
return self.line.contains(event)
def dispxy(self, i):
"Return the display coordinates of the point with index `i`."
t, y = self[i]
return self.line.axes.transData.transform((t,y))
def closest(self, t, dispx, range=10):
"""Return the index of the point closest to time `t`, if within
`range` points of display coordinate `dispx`, otherwise None."""
if self.draw and self.ready:
i = self.series.closest(t)
dx, _ = self.dispxy(i)
@ -933,7 +1036,7 @@ class Line:
return None
def note(self, index, verbose=False):
"Return annotation text and log box text for a selected point."
"Return annotation text and log pane text for a selected point."
t, _ = self.series[index]
return self.series.note(self, t, index, verbose=verbose)
@ -1009,28 +1112,23 @@ class Model(EventHandler):
# Set the format_coord method for each axes
for bounds, ax_list in bounds_axes.items():
if len(ax_list) > 1:
# If format_coord iterates of ax_list, it may iterate
# over the wrong value of ax_list (the last value
# bound by the bounds_axes iteration). So build this
# separate yax_trans_list here.
yax_trans_list = [(yax, ax.transData)
for ax, yax in ax_list]
for ax, yax in ax_list:
tData = ax.transData
def format_coord(x, y):
# capture the current values of ax_list and tData here
def format_coord(x, y, ax_list=ax_list, tData=ax.transData):
# x, y are data coordinates
_, axy = tData.transform((0, y)) # y in display coordinates
# Invert the transforms here. If you invert them at plotting time
# and cache them so we don't have to invert them every time format_coord
# is called, then you get the wrong answer.
return (f'{x:f} s, ' +
', '.join(inner_yax.fmt(t.inverted().transform((0, axy))[1])
for inner_yax, t in yax_trans_list))
# is called, then you get the wrong answer. We don't know why.
return (f'{dur(x)}, ' +
', '.join(yax.fmt(ax.transData.inverted().transform((0, axy))
[1])
for ax, yax in ax_list))
ax.format_coord = format_coord
else:
ax, yax = ax_list[0]
def format_coord(x, y):
return f'{x:f} s, {yax.fmt(y)}'
return f'{dur(x)}, {yax.fmt(y)}'
ax.format_coord = format_coord
def update(self):
@ -1108,17 +1206,140 @@ class Model(EventHandler):
class ApplicationToolbar(NavigationToolbar):
"Subclass of Matplotlib's navigation toolbar adding a pause button."
def __init__(self, *args):
def __init__(self, canvas, app):
# def __init__(self, *args):
self.toolitems += (('Pause', 'Pause', PAUSE_ICON, 'pause'),)
super().__init__(*args)
super().__init__(canvas, app)
# super().__init__(*args)
self._actions['pause'].setCheckable(True)
self._app = app
self.paused = False
def pause(self):
def pause(self, event=None):
"Toggle the pause button."
self.paused = not self.paused
self._actions['pause'].setChecked(self.paused)
def empty(self):
"Is the stack of views empty?"
return self._nav_stack.empty()
class ErrorReporter(ContextDecorator):
"""Context manager which reports the traceback of any exception to the
stream provided to its constructor. Useful when exceptions are
otherwise silently ignored or reported to a stream which is not
promptly flushed.
May also be used as a decorator.
"""
def __init__(self, f):
self._f = f
def __enter__(self):
return self
def __exit__(self, ty, val, tb):
if ty is not None:
traceback.print_exception(ty, val, tb, file=self._f)
# All keyboard shortcuts. Each one is a triple:
# `(iterable, method name, documentation)`.
#
# If `iterable` is None, `documentation` is a string output as part of
# help documentation.
#
# Otherwise the members of `iterable` must match `event.key` for an
# MPL key press event. So `iterable` may be a single character, or a
# short string, or an iterable of strings.
#
# If a member has length > 1, it is downcased before matching against
# `event.key`. This is a hack to improve the help documentation, and
# may be a bad idea. Note that some length-1 keybindings are
# upper case.
#
# `method_name` should be the name of a method on ApplicationWindow,
# without the preceding underscore.
#
# If method_name is None, there is no binding. Also later entries
# over-ride earlier ones. The combination of these two facts allows
# us to give all the built-in MPL bindings as the first entries in
# this list, and just over-ride them, either with a disabling
# None/None or with our own binding. While the monitor is in active
# development this flexibility is good.
shortcuts = [
# first the shortcuts which come with the MPL navigation toolbar.
(None, None, 'Navigation bar shortcuts:'),
(('h','r', 'Home'),
'mpl_key', '"home": zoom out to the whole dataset'),
(('c', 'Backspace', 'Left'),
'mpl_key', '"back": go back to the previous view'),
(('v', 'Right'),
'mpl_key', '"forward": go forward to the next view'),
('p',
'mpl_key', '"pan/zoom": select the pan/zoom tool'),
('o',
'mpl_key', '"zoom": select the zoom-to-rectangle tool'),
(('Ctrl+S',),
'mpl_key', '"save": save the current view as a PNG file'),
('g',
'mpl_key', 'show major grid lines'),
('G',
'mpl_key', 'show minor grid lines'),
('Lk',
'mpl_key', 'toggle log/linear on time axis'),
(('Ctrl+F',),
'mpl_key', 'toggle full-screen mode'),
# Now, disabling some of the MPL's shortcuts
(('Ctrl+F',), None, None), # full-screen doesn't work
('g', None, None), # no major grids
('G', None, None), # no useful minor grids
('L', None, None), # log time axis not useful
('k', None, None), # log time axis not useful
# Now, our own shortcuts, some of which over-ride MPL ones.
(None, None, 'Other shortcuts:'),
(('Ctrl+W',),
'close', 'Close the monitor'),
('l',
'toggle_log_linear', 'Toggle log/linear byte scale'),
(('Right',),
'next_point', 'Select next point of selected series'),
(('Left',),
'previous_point', 'Select previous point of selected series'),
(('Up',),
'up_line', 'Select point on higher line'),
(('Down',),
'down_line', 'Select point on lower line'),
(('PageUp',),
'slower', 'Double time constant for time-dependent series'),
(('PageDown',),
'faster', 'Halve time constant for time-dependent series'),
(('Pause',),
'pause', 'Freeze/thaw axis limits'),
('+',
'zoom_in', 'Zoom in'),
('-',
'zoom_out', 'Zoom out'),
('z',
'zoom', 'Zoom in to selected point'),
('i',
'info', 'report detailed info on selected point'),
('?h',
'help', 'report help'),
]
# The set of keyboard modifiers, which we need to know so we avoid
# reporting their presses in the log pane.
modifiers = (
'shift',
'control',
'alt',
'super', # Windows key on my keyboard
'ctrl+alt' # AltGr on my keyboard
)
class ApplicationWindow(QtWidgets.QMainWindow):
"""PyQt5 application displaying time series derived from MPS telemetry
@ -1181,15 +1402,17 @@ class ApplicationWindow(QtWidgets.QMainWindow):
for yax in self._axes_dict:
self._axes_dict[yax].set_ylabel(yax.label())
self._axes_dict[yax].set_xlabel("time (seconds)")
self._axes_dict[yax].set_yscale('linear')
# bytes tick labels in megabytes etc.
bytes_axes.ticklabel_format(style='plain')
bytes_axes.yaxis.set_major_formatter(ticker.FuncFormatter(bytesTickFormatter))
self._log_scale = False
# make a toolbar and put it on the top of the whole layout.
self._toolbar = ApplicationToolbar(self._canvas, self)
self.addToolBar(QtCore.Qt.TopToolBarArea, self._toolbar)
# Below the splitter, a text box for logging
# Below the splitter, a logging pane
self._logbox = QtWidgets.QTextEdit()
self._logbox.setReadOnly(True)
self._logbox.setLineWrapMode(True)
@ -1206,8 +1429,8 @@ class ApplicationWindow(QtWidgets.QMainWindow):
self._line_annotation.get_bbox_patch().set_alpha(0.8)
self._canvas.mpl_connect("button_release_event", self._click)
# points close to time of most recent selection, on each line, in increasing y order
# (line, index, ...)
# points close in time to the most recent selection, on each
# line, in increasing y order (line, index, ...)
self._close_points = None
# Map from line to index into self._close_points
self._close_line = None
@ -1216,27 +1439,25 @@ class ApplicationWindow(QtWidgets.QMainWindow):
# Things drawn for the current selection
self._drawn = []
# shortcuts
self._shortcuts = {}
for kl, method, doc in shortcuts:
if kl is None:
continue
for k in kl:
if len(k) > 1:
k = k.lower()
if method is None:
self._shortcuts.pop(k, None)
else:
self._shortcuts[k] = getattr(self,'_'+method), doc
# pass all keystrokes to on_key_press, where we can capture them or pass them on to
# the toolbar.
self._canvas.mpl_connect('key_press_event', self._on_key_press)
self._canvas.setFocusPolicy(QtCore.Qt.StrongFocus)
self._canvas.setFocus()
# keyboard shortcuts (see on_key_press()).
self._shortcuts = {'ctrl+W': self.close,
'right': self._next_point,
'left': self._previous_point,
'up': self._up_line,
'down': self._down_line,
'pageup': self._slower,
'pagedown': self._faster,
'pause': self._toolbar.pause,
'+': self._zoom_in,
'-': self._zoom_out,
'z': self._zoom,
'i': self._info,
}
# Call self._update in a loop forever.
self._update()
self._timer = self._canvas.new_timer(100, [(self._update, (), {})])
@ -1244,41 +1465,79 @@ class ApplicationWindow(QtWidgets.QMainWindow):
def _log(self, msg):
"Append msg to the log box."
self._logbox.append(msg)
if msg:
self._logbox.append(msg)
# this pair of methods should let us redirect stdout to self,
# useful when debugging (see also ErrorReporter).
def write(self, msg):
self._log(msg.rstrip('\n'))
def flush(self):
pass
def _on_key_press(self, event):
"""Handle a keyboard event, either one of our shortcuts or something for the
navigation toolbar."""
if event.key in self._shortcuts:
self._shortcuts[event.key]()
else:
key_press_handler(event, self._canvas, self._toolbar)
"""Handle a keyboard event."""
with ErrorReporter(self):
if event.key in self._shortcuts:
self._shortcuts[event.key][0](event)
elif event.key not in modifiers:
self._log(f"Unknown key '{event.key}'")
def _next_point(self):
def _mpl_key(self, event):
"""Hand a key-press even on to the toolbar."""
key_press_handler(event, self._canvas, self._toolbar)
def _help(self, event):
"""Report keyboard help to the log pane."""
self._log('Keyboard shortcuts:')
for kl, method, doc in shortcuts:
if kl is None:
self._log(doc)
continue
if doc is not None:
ks = '/'.join(k for k in kl
if self._shortcuts.get(k if len(k) == 1 else k.lower(),
(None,None))[1] == doc)
if ks:
self._log(f' {ks}\t{doc}')
def _pause(self, event):
"""Toggle pausing of axis limit updates."""
self._toolbar.pause()
def _close(self, event):
"""Close the monitor application."""
self.close()
def _toggle_log_linear(self, event):
"""Toggle the bytes axis between log and linear scales."""
yscale = 'linear' if self._log_scale else 'log'
self._axes_dict[bytesAxis].set_yscale(yscale)
self._axes_dict[bytesAxis].yaxis.set_major_formatter(ticker.FuncFormatter(bytesTickFormatter))
self._log_scale = not self._log_scale
self._log(f'Switched bytes axis to {yscale} scale.')
def _next_point(self, event):
"""Select the next point on the selected line."""
if self._close_points is None:
return
for d in self._drawn:
d.set_visible(False)
line, index = self._close_points[self._selected]
self._select(line, index + 1)
def _previous_point(self):
def _previous_point(self, event):
"""Select the previous point on the selected line."""
if self._close_points is None:
return
for d in self._drawn:
d.set_visible(False)
line, index = self._close_points[self._selected]
self._select(line, index - 1)
def _up_line(self):
def _up_line(self, event):
"""Select the point on the line above the currently selected point."""
if self._selected is None:
return
self._annotate(self._selected + 1)
def _down_line(self):
def _down_line(self, event):
"""Select the point on the line below the currently selected point."""
if self._selected is None:
return
@ -1289,13 +1548,13 @@ class ApplicationWindow(QtWidgets.QMainWindow):
if index < 0 or index >= len(line):
return
t, y = line[index]
self._recentre(mid=t)
self._recentre(mid=t, force=False)
dispx, _ = line.dispxy(index)
self._find_close(t, dispx, on_line=line, index=index)
self._annotate(self._close_line[line])
def _clear(self):
"Remove annotations."
"Remove all annotations and visible markings of selected points."
self._line_annotation.set_visible(False)
for d in self._drawn:
d.set_visible(False)
@ -1318,7 +1577,7 @@ class ApplicationWindow(QtWidgets.QMainWindow):
x, y = line[index]
note, log = line.note(index)
if note is None:
note = [f"{line.name}",f"{x:f} s",f"{line.yaxis.fmt(y)}"]
note = [f"{line.name}",f"{dur(x)}",f"{line.yaxis.fmt(y)}"]
log = ' '.join(note)
note = '\n'.join(note)
self._log(log)
@ -1332,7 +1591,7 @@ class ApplicationWindow(QtWidgets.QMainWindow):
a.set_visible(True)
self._drawn += line.drawPoint(index, self._axes_dict)
def _info(self):
def _info(self, event):
"Report more information about the currently selected point."
if self._close_points is None:
self._log('No selected data point')
@ -1360,17 +1619,23 @@ class ApplicationWindow(QtWidgets.QMainWindow):
self._close_points.append((line, index))
def _recompute(self, factor):
"Scale all time constants by some factor."
self._log(f'Scaling time constants by a factor {factor}:...')
selected_line, _ = self._close_points[self._selected]
for line in self._model.lines:
log = line.recompute(factor)
if log:
self._log(f' {line.name}: {log}')
if line == selected_line:
self._clear()
self._model.needs_redraw()
def _slower(self):
def _slower(self, event):
"Double all time constants."
self._recompute(2)
def _faster(self):
def _faster(self, event):
"Halve all time constants."
self._recompute(0.5)
def _click(self, event):
@ -1395,15 +1660,20 @@ class ApplicationWindow(QtWidgets.QMainWindow):
self._unselect()
self._clear()
def _zoom_in(self):
def _zoom_in(self, event):
"Zoom in by a factor of 2."
self._recentre(zoom=2)
def _zoom_out(self):
def _zoom_out(self, event):
"Zoom out by a factor of 2."
self._recentre(zoom=0.5)
def _zoom(self):
def _zoom(self, event):
"""Zoom in to current data point, by a factor of two or to the point's
natural limits. If there's no current point, zoom in by a factor of 2."""
if self._close_points is None:
self._log('No selected data point')
self._zoom_in(event)
return
line, index = self._close_points[self._selected]
lim = line.zoom(index)
@ -1412,20 +1682,44 @@ class ApplicationWindow(QtWidgets.QMainWindow):
else: # make a bit of slack
lo, hi = lim
width = hi-lo
self._zoom_to(lo - width/4, hi + width/4)
self._zoom_to(lo - width/8, hi + width/8)
def _recentre(self, zoom=1.0, mid=None):
def _recentre(self, zoom=1.0, mid=None, force=True):
"""Recentre on `mid`, if given, and zoom in or out by factor `zoom`.
If `force` is false, and `mid` is near the middle of the resulting box,
or near the lowest time, or near the highest time, don't do it."""
xlim, _ = self._limits
tmin, tmax = self._time_range
lo, hi = xlim
half_width = (hi-lo)/2/zoom
if mid is None:
mid = (hi+lo)/2
half_width = (hi-lo)/2
newlo, newhi = mid- half_width/zoom, mid + half_width/zoom
elif not force:
if mid-lo > half_width/4 and hi-mid > half_width/4:
# if data point is in centre half, don't shift
return
if mid < lo + half_width/4 and tmin > lo:
# don't shift left if lowest T is already displayed
return
if mid > hi - half_width/4 and tmax < hi:
# don't shift right if highest T is already displayed
return
newlo = max(tmin - (tmax-tmin)/16, mid- half_width)
newhi = min(tmax + (tmax-tmin)/16, mid + half_width)
self._zoom_to(newlo, newhi)
def _zoom_to(self, lo, hi):
"""Redraw with new limits on the time axis."""
ax = self._axes_dict[bytesAxis]
if self._toolbar.empty():
self._toolbar.push_current()
ax.set_xlim(lo, hi)
self._toolbar.push_current()
@property
def _time_range(self):
return (min(line[0][0] for line in self._model.lines if line.ready),
max(line[0][0] for line in self._model.lines if line.ready))
@property
def _limits(self):
@ -1434,58 +1728,58 @@ class ApplicationWindow(QtWidgets.QMainWindow):
return ax.get_xlim(), ax.get_ylim()
def _update(self):
"Update the model and redraw if not paused."
if (not self._toolbar.paused
and self._home_limits not in (None, self._limits)):
# Limits changed (for example, because user zoomed in), so
# pause further updates to the limits of all axes, to give
# user a chance to explore.
self._toolbar.pause()
self._home_limits = None
self._model.update()
self._model.plot(self._axes_dict, keep_limits = self._toolbar.paused)
if not self._toolbar.paused:
self._home_limits = self._limits
self._canvas.draw()
with ErrorReporter(self):
"Update the model and redraw if not paused."
if (not self._toolbar.paused
and self._home_limits not in (None, self._limits)):
# Limits changed (for example, because user zoomed in), so
# pause further updates to the limits of all axes, to give
# user a chance to explore.
self._toolbar.pause()
self._home_limits = None
self._model.update()
self._model.plot(self._axes_dict, keep_limits = self._toolbar.paused)
if not self._toolbar.paused:
self._home_limits = self._limits
self._canvas.draw()
# Find new time series and create corresponding checkboxes.
checkboxes_changed = False
for line in self._model.lines:
if not line.ready:
continue
new_name = line.name
if line in self._line_checkbox:
# A line's name can change dynamically (for example, because
# of the creation of a second arena, or a Label event), so
# ensure that it is up to date.
old_name = self._line_checkbox[line].text()
if old_name != new_name:
self._line_checkbox[line].setText(new_name)
# Find new time series and create corresponding checkboxes.
checkboxes_changed = False
for line in self._model.lines:
if not line.ready:
continue
new_name = line.name
if line in self._line_checkbox:
# A line's name can change dynamically (for example, because
# of the creation of a second arena, or a Label event), so
# ensure that it is up to date.
old_name = self._line_checkbox[line].text()
if old_name != new_name:
self._line_checkbox[line].setText(new_name)
checkboxes_changed = True
else:
checkboxes_changed = True
else:
checkboxes_changed = True
checkbox = QtWidgets.QCheckBox(new_name)
self._line_checkbox[line] = checkbox
checkbox.setChecked(line.draw)
checkbox.setToolTip(f"{line.desc} ({line.yaxis.label()})")
self._lines.addWidget(checkbox)
def state_changed(state, line=line):
self._unselect(line)
line.draw = bool(state)
self._model.needs_redraw()
checkbox.stateChanged.connect(state_changed)
checkbox.setStyleSheet(f"color:{line.color}")
# Sort checkboxes into order by name and update width.
if checkboxes_changed:
checkboxes = self._line_checkbox.values()
for checkbox in checkboxes:
self._lines.removeWidget(checkbox)
for checkbox in sorted(checkboxes, key=lambda c:c.text()):
self._lines.addWidget(checkbox)
self._lines_scroll.setFixedWidth(
self._lines_widget.sizeHint().width())
checkbox = QtWidgets.QCheckBox(new_name)
self._line_checkbox[line] = checkbox
checkbox.setChecked(line.draw)
checkbox.setToolTip(f"{line.desc} ({line.yaxis.label()})")
self._lines.addWidget(checkbox)
def state_changed(state, line=line):
self._unselect(line)
line.draw = bool(state)
self._model.needs_redraw()
checkbox.stateChanged.connect(state_changed)
checkbox.setStyleSheet(f"color:{line.color}")
# Sort checkboxes into order by name and update width.
if checkboxes_changed:
checkboxes = self._line_checkbox.values()
for checkbox in checkboxes:
self._lines.removeWidget(checkbox)
for checkbox in sorted(checkboxes, key=lambda c:c.text()):
self._lines.addWidget(checkbox)
self._lines_scroll.setFixedWidth(
self._lines_widget.sizeHint().width())
def main():
parser = argparse.ArgumentParser(description="Memory Pool System Monitor.")