1
Fork 0
mirror of git://git.sv.gnu.org/emacs.git synced 2026-01-16 08:10:43 -08:00
emacs/mps/tool/monitor
Gareth Rees 25ee1ea3ae Linearly interpolate clocks for each batch once.
Copied from Perforce
 Change: 194150
2018-06-25 15:43:49 +01:00

460 lines
15 KiB
Python
Executable file

#!/usr/bin/env python
#
# $Id$
# Copyright (c) 2018 Ravenbrook Limited. See end of file for license.
#
# This program reads a telemetry stream from a program using the MPS,
# and constructs a model of the MPS data structures in the progam.
import argparse
from collections import namedtuple
from itertools import cycle
import os
from struct import Struct
import sys
import time
from matplotlib.backends.qt_compat import QtCore, QtGui, QtWidgets
from matplotlib.backends.backend_qt5agg import (
FigureCanvas, NavigationToolbar2QT as NavigationToolbar)
from matplotlib.figure import Figure
import numpy as np
import mpsevent
# Mapping from event code to a namedtuple for that event.
EVENT_NAMEDTUPLE = {
code: namedtuple(desc.name, ['header'] + [p.name for p in desc.params])
for code, desc in mpsevent.EVENT.items()
}
# Mapping from event code to event name.
EVENT_NAME = {code:desc.name for code, desc in mpsevent.EVENT.items()}
def batch_decoder(read):
"""Decode the events in an I/O stream and generate batches as lists of
tuples in eventclock order.
The argument must be a function implementing the io.RawIOBase.read
specification (that is, it takes a size and returns up to size
bytes from the I/O stream).
"""
# Cache frequently-used values in local variables.
header_desc = mpsevent.HeaderDesc
header_size = mpsevent.HEADER_SIZE
event_dict = mpsevent.EVENT
event_namedtuple = EVENT_NAMEDTUPLE
EventClockSync_code = mpsevent.Event.EventClockSync.code
# Special handling for Intern.
Intern_desc = mpsevent.Event.Intern
Intern_code = Intern_desc.code
Intern_struct = Struct(Intern_desc.format)
Intern_size = Intern_struct.size
Intern_unpack = Intern_struct.unpack
Intern_namedtuple = event_namedtuple[Intern_code]
# Build unpacker functions for each type of event.
header_struct = Struct(mpsevent.HEADER_FORMAT)
assert header_struct.size == header_size
header_unpack = header_struct.unpack
event_unpack = {}
for code, desc in event_dict.items():
assert code == desc.code
s = Struct(desc.format)
assert code == Intern_code or s.size == desc.maxsize
event_unpack[code] = s.unpack
def key(event):
return event.header.clock
batch = []
def decoder():
# Events are output in batches terminated by an EventClockSync
# event. When we see this, we know that we've received all
# events up to that one and can sort the batch.
while True:
header_data = read(header_size)
if not header_data:
break
header = header_desc(*header_unpack(header_data))
code = header.code
event_desc = event_dict[code]
size = header.size - header_size
if code == Intern_code:
assert size <= event_desc.maxsize
event = Intern_namedtuple(
header,
*Intern_unpack(read(Intern_size)),
read(size - Intern_size).rstrip(b'\0'))
else:
assert size == event_desc.maxsize
event = event_namedtuple[code](
header, *event_unpack[code](read(size)))
batch.append(event)
if event.header.code == EventClockSync_code:
batch.sort(key=key)
yield batch
batch.clear()
return decoder
class TimeSeries:
"Series of data points in time order."
def __init__(self):
self.t = []
self.y = []
def append(self, t, y):
"Append data y at time t."
assert not self.t or t >= self.t[-1]
self.t.append(t)
self.y.append(y)
class Accumulator(TimeSeries):
"Time series that is always non-negative and updates by accumulation."
def __init__(self, initial=0):
super().__init__()
self.value = initial
def add(self, t, delta):
"Add delta to the accumulator at time t."
self.append(t, self.value)
self.value += delta
self.append(t, self.value)
def sub(self, t, delta):
"Subtract delta from the accumulator at time t."
assert self.value >= delta
self.append(t, self.value)
self.value -= delta
self.append(t, self.value)
class EventHandler:
"""Object that handles a telemetry event by dispatching to the method
with the same name as the event.
"""
def ignore(self, t, event):
"Handle a telemetry event by doing nothing."
def handle(self, t, event):
"Handle a telemetry event by dispatching."
getattr(self, EVENT_NAME[event.header.code], self.ignore)(t, event)
class Pool(EventHandler):
"Model of an MPS pool."
def __init__(self, arena, pointer):
self.arena = arena
self.model = arena.model
self.pointer = pointer
self.pool_class = None
self.serial = None
self.alloc = Accumulator()
self.model.add_time_series(self, "alloc", self.alloc)
@property
def name(self):
name = self.model.label(self.pointer)
if not name:
class_name = self.model.label(self.pool_class) or 'Pool'
if self.serial is not None:
name = f"{class_name}[{self.serial}]"
else:
name = f"{class_name}[{self.pointer:x}]"
return f"{self.arena.name}.{name}"
def ArenaAlloc(self, t, event):
self.alloc.add(t, event.size)
def ArenaFree(self, t, event):
self.alloc.sub(t, event.size)
def PoolInit(self, t, event):
self.pool_class = event.poolClass
self.serial = event.serial
class Arena(EventHandler):
"Model of an MPS arena."
def __init__(self, model, pointer):
self.model = model
self.pointer = pointer
self.arena_class = None
self.serial = None
self.pools = []
self.pool = {} # pointer -> Pool
@property
def name(self):
if len(self.model.arenas) <= 1:
# No need to distinguish arenas if there's just one.
return ""
name = self.model.label(self.pointer)
if not name:
class_name = self.model.label(self.arena_class) or 'Arena'
if self.serial is not None:
name = f"{class_name}[{self.serial}]"
else:
name = f"{class_name}[{self.pointer:x}]"
return name
def delegate_to_pool(self, t, event):
"Handle a telemetry event by delegating to the pool model."
pointer = event.pool
try:
pool = self.pool[pointer]
except KeyError:
self.pool[pointer] = pool = Pool(self, pointer)
self.pools.append(pool)
pool.handle(t, event)
ArenaAlloc = ArenaFree = PoolInit = delegate_to_pool
def ArenaCreateVM(self, t, event):
self.arena_class = event.arenaClass
self.serial = event.serial
ArenaCreateCL = ArenaCreateVM
def PoolFinish(self, t, event):
del self.pool[event.pool]
class Line:
colors = cycle('blue orange green red purple brown pink gray olive cyan'
.split())
"A line in a Matplotlib plot wrapping a TimeSeries."
def __init__(self, owner, desc, series):
self.owner = owner
self.desc = desc
self.series = series
self.color = next(self.colors)
self.draw = True
@property
def label(self):
return f"{self.owner.name}.{self.desc}"
@property
def ready(self):
return len(self.series.t) >= 2
def plot(self, axes):
"Plot line on axes."
if self.ready and self.draw:
x = self.series.t
y = self.series.y
axes.plot(x, y, color=self.color, label=self.label)
class Model(EventHandler):
"Model of an application using the MPS."
def __init__(self, batches):
self._batches = batches
self._intern = {} # stringId -> string
self._label = {} # address or pointer -> stringId
self.arena = {} # pointer -> Arena
self.arenas = []
self.lines = [] # list(Line)
self.clocks_per_second = 1000000.0
self.sync = TimeSeries()
self._needs_update = False
def add_time_series(self, owner, desc, series):
"Add a time series to the model."
self.lines.append(Line(owner, desc, series))
def label(self, pointer):
"Return string labelling address or pointer, or None if unlabelled."
return self._intern.get(self._label.get(pointer))
@property
def eventclocks_to_seconds(self):
"Return function converting eventclocks to seconds."
def plot(self, axes):
"Draw time series on the given axes."
axes.clear()
axes.set_xlabel("time (seconds)")
axes.set_ylabel("bytes")
for line in self.lines:
line.plot(axes)
axes.figure.canvas.draw()
def update(self, axes):
EventClockSync_code = mpsevent.Event.EventClockSync.code
try:
for batch in self._batches():
sync = batch[-1]
assert sync.header.code == EventClockSync_code
self.sync.append(sync.header.clock, sync.clock)
eventclocks = self.sync.t[-2:]
clocks = self.sync.y[-2:]
cps = self.clocks_per_second
# The cycle counter frequency can vary due to thermal
# throttling, turbo boost etc., so use piecewise linear
# interpolation to convert to clocks and thence to seconds.
seconds = lambda e: np.interp(e, eventclocks, clocks) / cps
for event in batch:
self.handle(seconds(event.header.clock), event)
if len(self.sync.t) >= 2:
self.needs_update()
except BlockingIOError:
pass
if self._needs_update:
self._needs_update = False
self.plot(axes)
def needs_update(self, *args):
self._needs_update = True
def delegate_to_arena(self, t, event):
"Handle a telemetry event by delegating to the arena model."
addr = event.arena
try:
arena = self.arena[addr]
except KeyError:
self.arena[addr] = arena = Arena(self, addr)
self.arenas.append(arena)
arena.handle(t, event)
ArenaCreateVM = ArenaCreateCL = ArenaAlloc = ArenaFree = PoolInit = \
PoolFinish = delegate_to_arena
def EventInit(self, t, event):
stream_version = event.major, event.median
monitor_version = mpsevent.__version__[:2]
if stream_version != monitor_version:
raise RuntimeError("Monitor version {} is incompatible with "
"telemetry stream version {}.".format(
'.'.join(map(str, monitor_version)),
'.'.join(map(str, stream_version))))
self.clocks_per_second = event.clocksPerSec
def Intern(self, t, event):
self._intern[event.stringId] = event.string.decode('ascii', 'replace')
def Label(self, t, event):
self._label[event.address] = event.stringId
def LabelPointer(self, t, event):
self._label[event.pointer] = event.stringId
def ArenaDestroy(self, t, event):
del self.arena[event.arena]
class ApplicationWindow(QtWidgets.QMainWindow):
def __init__(self, model, title):
super().__init__()
self._model = model
self._main = QtWidgets.QWidget()
self.setWindowTitle(title)
self.setCentralWidget(self._main)
shortcut = QtWidgets.QShortcut(QtGui.QKeySequence("Ctrl+W"), self)
shortcut.activated.connect(self.close)
layout = QtWidgets.QHBoxLayout(self._main)
self._line_checkbox = {}
left_panel = QtWidgets.QVBoxLayout()
layout.addLayout(left_panel)
self._lines = QtWidgets.QVBoxLayout()
left_panel.addLayout(self._lines)
left_panel.addStretch(1)
canvas = FigureCanvas(Figure(figsize=(10, 6)))
layout.addWidget(canvas)
self.addToolBar(QtCore.Qt.BottomToolBarArea,
NavigationToolbar(canvas, self))
self._axes = canvas.figure.subplots()
self._update()
self._timer = canvas.new_timer(100, [(self._update, (), {})])
self._timer.start()
def _update(self):
self._model.update(self._axes)
for line in self._model.lines:
if line.ready:
label = line.label
if line in self._line_checkbox:
self._line_checkbox[line].setText(label)
else:
checkbox = QtWidgets.QCheckBox(label)
self._line_checkbox[line] = checkbox
checkbox.setChecked(True)
self._lines.addWidget(checkbox)
def state_changed(state, line=line):
line.draw = bool(state)
self._model.needs_update()
checkbox.stateChanged.connect(state_changed)
checkbox.setStyleSheet(f"color:{line.color}")
def main():
parser = argparse.ArgumentParser(description="Memory Pool System Monitor.")
parser.add_argument(
'telemetry', metavar='FILENAME', nargs='?', type=str,
default=os.environ.get('MPS_TELEMETRY_FILENAME', 'mpsio.log'),
help="telemetry output from the MPS instance")
args = parser.parse_args()
# TODO: O_NONBLOCK does not work on Windows.
fd = os.open(args.telemetry, os.O_NONBLOCK, os.O_RDONLY)
read = os.fdopen(fd, 'rb').read
model = Model(batch_decoder(read))
qapp = QtWidgets.QApplication([])
app = ApplicationWindow(model, args.telemetry)
app.show()
return qapp.exec_()
if __name__ == '__main__':
exit(main())
# C. COPYRIGHT AND LICENCE
#
# Copyright (c) 2018 Ravenbrook Ltd. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#
# $Id$