1
Fork 0
mirror of git://git.sv.gnu.org/emacs.git synced 2026-01-04 11:00:45 -08:00
emacs/mps/tool/monitor
Gareth Rees df41d66de5 Don't represent capacities internally in kilobytes, convert once on initialization.
In the monitor, start average mortality time series with the initial predicted value.

Copied from Perforce
 Change: 194565
2018-07-10 13:10:00 +01:00

864 lines
31 KiB
Python
Executable file

#!/usr/bin/env python
#
# $Id$
# Copyright (c) 2018 Ravenbrook Limited. See end of file for license.
#
# Read a telemetry stream from a program using the MPS, construct a
# model of the MPS data structures in the progam, and display selected
# time series from the model in a graphical user interface.
#
# Requirements: Python 3.6, Matplotlib, PyQt5.
import argparse
from collections import defaultdict, deque, namedtuple
from itertools import count, cycle
import os
import queue
from struct import Struct
import sys
import threading
import time
from matplotlib.backends.qt_compat import QtCore, QtGui, QtWidgets
from matplotlib.backends.backend_qt5agg import (
FigureCanvas, NavigationToolbar2QT as NavigationToolbar)
from matplotlib.figure import Figure
import mpsevent
# Mapping from event code to a namedtuple for that event.
EVENT_NAMEDTUPLE = {
code: namedtuple(desc.name, ['header'] + [p.name for p in desc.params])
for code, desc in mpsevent.EVENT.items()
}
# Mapping from event code to event name.
EVENT_NAME = {code:desc.name for code, desc in mpsevent.EVENT.items()}
# Unpack function for event header.
HEADER_UNPACK = Struct(mpsevent.HEADER_FORMAT).unpack
# Unpack function for each event code.
EVENT_UNPACK = {c:Struct(d.format).unpack for c, d in mpsevent.EVENT.items()}
# Icon for the toolbar pause button.
PAUSE_ICON = os.path.abspath(os.path.join(os.path.dirname(__file__), 'pause'))
def telemetry_decoder(read):
"""Decode the events in an I/O stream and generate batches of events
as lists of pairs (time, event) in time order, where time is CPU
time in seconds and event is a tuple.
Unknown event codes are read but ignored.
The 'read' argument must be a function implementing the
io.RawIOBase.read specification (that is, it takes a size and
returns up to size bytes from the I/O stream).
"""
# Cache frequently-used values in local variables.
header_desc = mpsevent.HeaderDesc
header_size = mpsevent.HEADER_SIZE
event_dict = mpsevent.EVENT
event_namedtuple = EVENT_NAMEDTUPLE
event_unpack = EVENT_UNPACK
header_unpack = HEADER_UNPACK
EventClockSync_code = mpsevent.Event.EventClockSync.code
EventInit_code = mpsevent.Event.EventInit.code
# Special handling for Intern events.
Intern_desc = mpsevent.Event.Intern
Intern_code = Intern_desc.code
Intern_struct = Struct(Intern_desc.format)
Intern_size = Intern_struct.size
Intern_unpack = Intern_struct.unpack
Intern_namedtuple = event_namedtuple[Intern_code]
batch = [] # Current batch of (unordered) events.
eventclocks = deque(maxlen=2) # Eventclocks from last two EventClockSync.
clocks = deque(maxlen=2) # Clocks from last two EventClockSync.
clocks_per_sec = None # CLOCKS_PER_SEC value from EventInit event.
def key(event):
# Key function for sorting events into time order.
return event.header.clock
def decoder(n=None):
# Generate up to n batches of events decoded from the I/O stream.
nonlocal clocks_per_sec
for _ in (count() if n is None else range(n)):
header_data = read(header_size)
if not header_data:
break
header = header_desc(*header_unpack(header_data))
code = header.code
size = header.size - header_size
if code == Intern_code:
event_desc = event_dict[code]
assert size <= event_desc.maxsize
event = Intern_namedtuple(
header,
*Intern_unpack(read(Intern_size)),
read(size - Intern_size).rstrip(b'\0'))
elif code in event_dict:
event_desc = event_dict[code]
assert size == event_desc.maxsize
event = event_namedtuple[code](
header, *event_unpack[code](read(size)))
else:
# Unknown code might indicate a new event added since
# mpsevent.py was updated, so just read and ignore.
read(size)
continue
batch.append(event)
if event.header.code == EventClockSync_code:
# Events are output in batches terminated by an EventClockSync
# event. So when we see an EventClockSync event, we know that
# we've received all events up to that one and can sort and
# emit the batch.
#
# The Time Stamp Counter frequency can vary due to thermal
# throttling, turbo boost etc., so linearly interpolate within
# each batch to convert to clocks and thence to seconds. (This
# requires at least two EventClockSync events.)
#
# In theory the Time Stamp Counter can wrap around, but it is
# a 64-bit register even on IA-32, and at 2.5 GHz it will take
# hundreds of years to do so, so we ignore this possibility.
#
# TODO: on 32-bit platforms at 1 MHz, clock values will wrap
# around in about 72 minutes and so this needs to be handled.
eventclocks.append(event.header.clock)
clocks.append(event.clock)
if len(clocks) == 2:
batch.sort(key=key)
# Solve for: seconds = m * eventclocks + c
m = ((clocks[1] - clocks[0])
/ ((eventclocks[1] - eventclocks[0]) * clocks_per_sec))
c = clocks[0] / clocks_per_sec - m * eventclocks[0]
yield [(m * e.header.clock + c, e) for e in batch]
batch.clear()
elif event.header.code == EventInit_code:
stream_version = event.major, event.median, event.minor
if stream_version[:2] != mpsevent.__version__[:2]:
raise RuntimeError(
"Monitor version {} is incompatible with "
"telemetry stream version {}.".format(
'.'.join(map(str, mpsevent.__version__)),
'.'.join(map(str, stream_version))))
clocks_per_sec = event.clocksPerSec
return decoder
def bits_of_word(w, n):
"Generate the bits in the word w, which has n bits."
for _ in range(n):
w, bit = divmod(w, 2)
yield bit
class TimeSeries:
"Series of data points in time order."
def __init__(self):
self.t = []
self.y = []
def append(self, t, y):
"Append data y at time t."
assert not self.t or t >= self.t[-1]
self.t.append(t)
self.y.append(y)
class Accumulator(TimeSeries):
"Time series that is always non-negative and updates by accumulation."
def __init__(self, initial=0):
super().__init__()
self.value = initial
def add(self, t, delta):
"Add delta to the accumulator at time t."
assert self.value >= -delta
self.append(t, self.value)
self.value += delta
self.append(t, self.value)
def sub(self, t, delta):
"Subtract delta from the accumulator at time t."
assert self.value >= delta
self.append(t, self.value)
self.value -= delta
self.append(t, self.value)
class MovingAverageRatio(TimeSeries):
"Exponentially weighted moving average on/off ratio."
def __init__(self, t, alpha=0.99):
super().__init__()
self._on = 0.0
self._off = 0.0
self._last = t
self._alpha = alpha
self._beta = 1 - alpha
def off(self, t):
self._on = self._on * self._alpha + (t - self._last) * self._beta
self._last = t
self._ratio = self._on / (self._on + self._off)
self.append(t, self._ratio)
def on(self, t):
self._off = self._off * self._alpha + (t - self._last) * self._beta
self._last = t
class EventHandler:
"""Object that handles a telemetry event by dispatching to the method
with the same name as the event.
"""
def ignore(self, t, event):
"Handle a telemetry event at time t by doing nothing."
def handle(self, t, event):
"Handle a telemetry event at time t by dispatching."
getattr(self, EVENT_NAME[event.header.code], self.ignore)(t, event)
class Pool(EventHandler):
"Model of an MPS pool."
def __init__(self, arena, pointer, t):
"Create Pool owned by arena, at pointer, at time t."
self._arena = arena # Owning arena.
self._model = arena.model # Owning model.
self._pointer = pointer # Pool's pointer.
self._pool_class = None # Pool's class pointer.
self._serial = None # Pool's serial number within arena.
self._alloc = Accumulator()
self._model.add_time_series(
self, self._alloc, "bytes", "alloc",
"memory allocated by the pool from the arena")
@property
def name(self):
name = self._model.label(self._pointer)
if not name:
class_name = self._model.label(self._pool_class) or 'Pool'
if self._serial is not None:
name = f"{class_name}[{self._serial}]"
else:
name = f"{class_name}[{self._pointer:x}]"
return f"{self._arena.name}.{name}"
def ArenaAlloc(self, t, event):
self._alloc.add(t, event.size)
def ArenaFree(self, t, event):
self._alloc.sub(t, event.size)
def PoolInit(self, t, event):
self._pool_class = event.poolClass
self._serial = event.serial
class Gen(EventHandler):
"Model of an MPS generation."
def __init__(self, arena, pointer):
self._arena = arena # Owning arena.
self._model = arena.model # Owning model.
self._pointer = pointer # Gen's pointer.
self._serial = None # Gen's serial number.
self.zone_set = 0 # Gen's current zone set.
def update_ref_size(self, t, seg_summary, seg_size):
"""Update the size of segments referencing this generation.
seg_summary must be a mapping from segment to its summary, and
seg_size a mapping from segment to its size in bytes.
"""
ref_size = 0
for seg, summary in seg_summary.items():
if self.zone_set & summary:
ref_size += seg_size[seg]
self._ref_size.append(t, ref_size)
@property
def name(self):
name = self._model.label(self._pointer)
if not name:
if self._serial is not None:
name = f"gen-{self._serial}"
else:
name = f"gen-{self._pointer:x}"
return f"{self._arena.name}.{name}"
def GenZoneSet(self, t, event):
self.zone_set = event.zoneSet
def GenInit(self, t, event):
self._serial = serial = event.serial
self._mortality_trace = mortality_trace = TimeSeries()
self._model.add_time_series(
self, mortality_trace, "fraction", f"mortality.trace",
f"mortality of data in generation, per trace")
self._mortality_average = mortality_average = TimeSeries()
self._model.add_time_series(
self, mortality_average, "fraction", f"mortality.avg",
f"mortality of data in generation, moving average")
mortality_average.append(t, event.mortality);
self._ref_size = ref_size = TimeSeries()
self._model.add_time_series(
self, ref_size, "bytes", f"ref",
f"size of segments referencing generation")
def TraceEndGen(self, t, event):
self._mortality_trace.append(t, event.mortalityTrace)
self._mortality_average.append(t, event.mortalityAverage)
class Arena(EventHandler):
"Model of an MPS arena."
# Number of pools that are internal to the arena; see the list in
# global.c:GlobalsPrepareToDestroy.
_internal_pools = 4
def __init__(self, model, pointer, t):
"Create Arena owned by model, at pointer, at time t."
self.model = model # Owning model.
self._pointer = pointer # Arena's pointer.
self._arena_class = None # Arena's class pointer.
self._serial = None # Arena's serial number.
self._pools = [] # List of Pools ever belonging to arena.
self._pool = {} # pointer -> Pool (for live pools)
self._gens = [] # List of Gens ever belonging to arena.
self._gen = {} # pointer -> Gen (for live gens)
self._alloc = Accumulator()
self.model.add_time_series(
self, self._alloc, "bytes", "alloc",
"total allocation by client pools")
self._poll = MovingAverageRatio(t)
self.model.add_time_series(
self, self._poll, "fraction", "poll",
"polling time moving average")
self._seg_size = {} # segment pointer -> size
self._seg_summary = {} # segment pointer -> summary
self._zone_ref_size = {} # zone -> refsize Accumulator
self._univ_ref_size = Accumulator()
self.model.add_time_series(
self, self._univ_ref_size, "bytes", "zone-univ.ref",
"size of segments referencing the universe")
self._condemned_gens = TimeSeries()
self.model.add_time_series(
self, self._condemned_gens, "fraction", "condemned.gens",
"proportion of chain condemned by trace", marker='x',
linestyle='None')
self._condemned_size = TimeSeries()
self.model.add_time_series(
self, self._condemned_size, "bytes", "condemned.size",
"size of segments condemned by trace", marker='+',
linestyle='None')
@property
def name(self):
if len(self.model.arenas) <= 1:
# No need to distinguish arenas if there's just one.
return ""
name = self.model.label(self._pointer)
if not name:
class_name = self.model.label(self._arena_class) or 'Arena'
if self._serial is not None:
name = f"{class_name}[{self._serial}]"
else:
name = f"{class_name}[{self._pointer:x}]"
return name
def delegate_to_pool(self, t, event):
"Handle a telemetry event by delegating to the pool model."
pointer = event.pool
try:
pool = self._pool[pointer]
except KeyError:
self._pool[pointer] = pool = Pool(self, pointer, t)
self._pools.append(pool)
pool.handle(t, event)
def ArenaAlloc(self, t, event):
self.delegate_to_pool(t, event)
if self._pool[event.pool]._serial >= self._internal_pools:
self._alloc.add(t, event.size)
def ArenaFree(self, t, event):
self.delegate_to_pool(t, event)
if self._pool[event.pool]._serial >= self._internal_pools:
self._alloc.sub(t, event.size)
PoolInit = \
delegate_to_pool
def delegate_to_gen(self, t, event):
"Handle a telemetry event by delegating to the generation model."
pointer = event.gen
try:
gen = self._gen[pointer]
except KeyError:
self._gen[pointer] = gen = Gen(self, pointer)
self._gens.append(gen)
gen.handle(t, event)
GenInit = \
GenZoneSet = \
TraceEndGen = \
delegate_to_gen
def ArenaCreateVM(self, t, event):
self._arena_class = event.arenaClass
self._serial = event.serial
ArenaCreateCL = ArenaCreateVM
def PoolFinish(self, t, event):
del self._pool[event.pool]
def GenFinish(self, t, event):
del self._gen[event.gen]
def ArenaPollBegin(self, t, event):
self._poll.on(t)
def ArenaPollEnd(self, t, event):
self._poll.off(t)
def ChainCondemnAuto(self, t, event):
f = (event.topCondemnedGenIndex + 1) / (event.genCount + 1)
self._condemned_gens.append(t, f)
def TraceCondemnAll(self, t, event):
self._condemned_gens.append(t, 1.0)
def TraceStart(self, t, event):
self._condemned_size.append(t, event.condemned)
if self._seg_summary:
for gen in self._gen.values():
gen.update_ref_size(t, self._seg_summary, self._seg_size)
def SegSetSummary(self, t, event):
size = event.size
self._seg_summary[event.seg] = event.newSummary
self._seg_size[event.seg] = size
n = self.model.word_width
univ = (1 << n) - 1
new_univ = event.newSummary == univ
old_univ = event.oldSummary == univ
self._univ_ref_size.add(t, (new_univ - old_univ) * size)
old_summary = 0 if old_univ else event.oldSummary
new_summary = 0 if new_univ else event.newSummary
for zone, old, new in zip(reversed(range(n)),
bits_of_word(old_summary, n),
bits_of_word(new_summary, n)):
if new == old:
continue
if zone not in self._zone_ref_size:
self._zone_ref_size[zone] = ref_size = Accumulator()
self.model.add_time_series(
self, ref_size, "bytes", f"zone-{zone}.ref",
f"size of segments referencing zone {zone}")
self._zone_ref_size[zone].add(t, (new - old) * size)
class Line:
"A line in a Matplotlib plot wrapping a TimeSeries."
colors = cycle('blue orange green red purple brown pink gray olive cyan'
.split())
def __init__(self, owner, series, unit, name, desc, **kwargs):
self.owner = owner # Owning object.
self.series = series # Time series.
self.unit = unit # Unit.
self._name = name # Brief description.
self.desc = desc # Brief description.
self.draw = True # Plot this line?
self.color = next(self.colors)
self.axes = None # Currently plotted on axes.
self.line = None # Matplotlib Line2D object.
self._kwargs = kwargs # Keyword arguments for Axes.plot.
@property
def name(self):
return f"{self.owner.name}.{self._name}"
@property
def ready(self):
return len(self.series.t) >= 1
def unplot(self):
if self.axes:
self.line.remove()
self.axes = None
def plot(self, axes):
"Plot or update line on axes."
x = self.series.t
y = self.series.y
if self.line is None:
self.axes = axes
self.line, = axes.plot(x, y, color=self.color, label=self.name,
**self._kwargs)
else:
if self.axes != axes:
self.unplot()
axes.add_line(self.line)
self.axes = axes
self.line.set_data(x, y)
self.line.set_label(self.name)
def contains(self, event):
if self.line is None:
return False, None
return self.line.contains(event)
class Model(EventHandler):
"Model of an application using the MPS."
def __init__(self, event_queue):
"Create model based on queue of batches of telemetry events."
self._queue = event_queue
self._intern = {} # stringId -> string
self._label = {} # address or pointer -> stringId
self._arena = {} # pointer -> Arena (for live arenas)
self.arenas = [] # All arenas created in the model.
self.lines = [] # All Lines available for plotting.
self._needs_redraw = True # Plot needs redrawing?
def add_time_series(self, *args, **kwargs):
"Add a time series to the model."
line = Line(*args, **kwargs)
self.lines.append(line)
def label(self, pointer):
"Return string labelling address or pointer, or None if unlabelled."
return self._intern.get(self._label.get(pointer))
def plot(self, axes_list):
"Draw time series on the given axes."
if not self._needs_redraw:
return
self._needs_redraw = False
# Collate drawable lines by unit.
unit_lines = defaultdict(list)
for line in self.lines:
if line.ready and line.draw:
unit_lines[line.unit].append(line)
else:
line.unplot()
# Assign units to axes.
axes_iter = iter(axes_list)
lines_iter = iter(sorted(unit_lines.items()))
for (_, lines), axes in zip(lines_iter, axes_iter):
axes.set_axis_on()
axes.set_xlabel("time (seconds)")
axes.set_ylabel(lines[0].unit)
for line in lines:
line.plot(axes)
axes.relim()
axes.autoscale_view()
# Remaining axes are not needed.
for axes in axes_iter:
axes.set_axis_off()
def update(self):
"Consume available telemetry events and update the model."
while True:
try:
batch = self._queue.get_nowait()
except queue.Empty:
break
else:
for t, event in batch:
self.handle(t, event)
def needs_redraw(self):
"Call this when the model needs redrawing."
self._needs_redraw = True
def delegate_to_arena(self, t, event):
"Handle a telemetry event by delegating to the arena model."
addr = event.arena
try:
arena = self._arena[addr]
except KeyError:
self._arena[addr] = arena = Arena(self, addr, t)
self.arenas.append(arena)
arena.handle(t, event)
ArenaAlloc = \
ArenaCreateCL = \
ArenaCreateVM = \
ArenaFree = \
ArenaPollBegin = \
ArenaPollEnd = \
ChainCondemnAuto = \
GenInit = \
GenFinish = \
GenZoneSet = \
PoolFinish = \
PoolInit = \
SegSetSummary = \
TraceCondemnAll = \
TraceEndGen = \
TraceStart = \
delegate_to_arena
def EventClockSync(self, t, event):
self.needs_redraw()
def Intern(self, t, event):
self._intern[event.stringId] = event.string.decode('ascii', 'replace')
def Label(self, t, event):
self._label[event.address] = event.stringId
def LabelPointer(self, t, event):
self._label[event.pointer] = event.stringId
def ArenaDestroy(self, t, event):
del self._arena[event.arena]
def EventInit(self, t, event):
self.word_width = event.wordWidth
class ApplicationToolbar(NavigationToolbar):
"Subclass of Matplotlib's navigation toolbar adding a pause button."
def __init__(self, *args):
self.toolitems += (('Pause', 'Pause', PAUSE_ICON, 'pause'),)
super().__init__(*args)
self._actions['pause'].setCheckable(True)
self.paused = False
def pause(self):
"Toggle the pause button."
self.paused = not self.paused
self._actions['pause'].setChecked(self.paused)
class ApplicationWindow(QtWidgets.QMainWindow):
"""PyQt5 application displaying time series derived from MPS telemetry
output.
"""
def __init__(self, model : Model, title : str):
"""Create application. 'model' is the MPS model whose time series are
to be displayed, and 'title' is the main window title.
"""
super().__init__()
self._model = model # The MPS model.
self._home_limits = None # Limits of the graph in "home" position.
self._line_checkbox = {} # Line -> QCheckbox
self.setWindowTitle(title)
# Control-W (Command-W on macOS) shortcut for closing the window.
shortcut = QtWidgets.QShortcut(QtGui.QKeySequence("Ctrl+W"), self)
shortcut.activated.connect(self.close)
main = QtWidgets.QWidget()
self.setCentralWidget(main)
main_layout = QtWidgets.QHBoxLayout(main)
# Scrollable list of checkboxes, one for each time series.
self._lines = QtWidgets.QVBoxLayout()
self._lines_scroll = QtWidgets.QScrollArea(
horizontalScrollBarPolicy=QtCore.Qt.ScrollBarAlwaysOff)
self._lines_widget = QtWidgets.QWidget()
lines_layout = QtWidgets.QVBoxLayout(self._lines_widget)
lines_layout.addLayout(self._lines)
lines_layout.addStretch(1)
self._lines_scroll.setWidget(self._lines_widget)
self._lines_scroll.setWidgetResizable(True)
main_layout.addWidget(self._lines_scroll)
# Matplot canvas and toolbar.
self._canvas = FigureCanvas(Figure(figsize=(10, 6)))
self._axes = self._canvas.figure.subplots()
self._axes2 = self._axes.twinx()
main_layout.addWidget(self._canvas)
self._toolbar = ApplicationToolbar(self._canvas, self)
self.addToolBar(QtCore.Qt.BottomToolBarArea, self._toolbar)
# Line annotations.
self._line_annotation = self._axes.annotate(
"", xy=(0, 0), xytext=(-20, 20),
textcoords='offset points',
bbox=dict(boxstyle='round', fc='w'),
arrowprops=dict(arrowstyle='->'),
visible=False)
self._line_annotation.get_bbox_patch().set_alpha(0.8)
self._annotated_line = None
self._canvas.mpl_connect("button_release_event", self._click)
# Call self._update in a loop forever.
self._update()
self._timer = self._canvas.new_timer(100, [(self._update, (), {})])
self._timer.start()
def _click(self, event):
"Handle left mouse click by annotating line clicked on."
if event.button != 1 or not event.inaxes:
return
a = self._line_annotation
for line in self._model.lines:
if not (line.ready and line.draw):
continue
contains, index = line.contains(event)
if contains:
x, y = line.line.get_data()
i = index['ind'][0]
a.remove()
line.axes.add_artist(a)
a.xy = x[i], y[i]
a.set_text(line.name)
a.set_visible(True)
self._annotated_line = line
break
else:
a.set_visible(False)
self._annotated_line = None
@property
def _limits(self):
"Current x and y limits of the Matplotlib graph."
return self._axes.get_xlim(), self._axes.get_ylim()
def _update(self):
"Update the model and redraw if not paused."
if (not self._toolbar.paused
and self._home_limits not in (None, self._limits)):
# Limits changed (for example, because user zoomed in), so pause
# further updates to give user a chance to explore.
self._toolbar.pause()
self._home_limits = None
self._model.update()
if not self._toolbar.paused:
self._model.plot([self._axes, self._axes2])
self._home_limits = self._limits
self._canvas.draw()
# Find new time series and create corresponding checkboxes.
checkboxes_changed = False
for line in self._model.lines:
if not line.ready:
continue
new_name = line.name
if line in self._line_checkbox:
# A line's name can change dynamically (for example, because
# of the creation of a second arena, or a Label event), so
# ensure that it is up to date.
old_name = self._line_checkbox[line].text()
if old_name != new_name:
self._line_checkbox[line].setText(new_name)
checkboxes_changed = True
else:
checkboxes_changed = True
checkbox = QtWidgets.QCheckBox(new_name)
self._line_checkbox[line] = checkbox
checkbox.setChecked(True)
checkbox.setToolTip(f"{line.desc} ({line.unit})")
self._lines.addWidget(checkbox)
def state_changed(state, line=line):
line.draw = bool(state)
if line == self._annotated_line:
self._line_annotation.set_visible(False)
self._model.needs_redraw()
checkbox.stateChanged.connect(state_changed)
checkbox.setStyleSheet(f"color:{line.color}")
# Sort checkboxes into order by name and update width.
if checkboxes_changed:
checkboxes = self._line_checkbox.values()
for checkbox in checkboxes:
self._lines.removeWidget(checkbox)
for checkbox in sorted(checkboxes, key=lambda c:c.text()):
self._lines.addWidget(checkbox)
self._lines_scroll.setFixedWidth(
self._lines_widget.sizeHint().width())
def main():
parser = argparse.ArgumentParser(description="Memory Pool System Monitor.")
parser.add_argument(
'telemetry', metavar='FILENAME', nargs='?', type=str,
default=os.environ.get('MPS_TELEMETRY_FILENAME', 'mpsio.log'),
help="telemetry output from the MPS instance")
args = parser.parse_args()
with open(args.telemetry, 'rb') as telemetry_file:
event_queue = queue.Queue()
model = Model(event_queue)
decoder = telemetry_decoder(telemetry_file.read)
for batch in decoder(1):
event_queue.put(batch)
model.update()
stop = threading.Event()
def decoder_thread():
while not stop.isSet():
for batch in decoder():
if stop.isSet():
break
event_queue.put(batch)
thread = threading.Thread(target=decoder_thread)
thread.start()
qapp = QtWidgets.QApplication([])
app = ApplicationWindow(model, args.telemetry)
app.show()
result = qapp.exec_()
stop.set()
thread.join()
return result
if __name__ == '__main__':
exit(main())
# C. COPYRIGHT AND LICENCE
#
# Copyright (c) 2018 Ravenbrook Ltd. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#
# $Id$