import os
import sys
import time
import copy
import json
import yaml
import uuid
import pyperf
import subprocess
import warnings
import tempfile
import itertools
import contextlib
import numpy as np
import pandas as pd
import logging
import pickle
from yggdrasil.components import import_component
from yggdrasil import tools, runner, examples, platform, config
from yggdrasil import platform as ygg_platform
from yggdrasil.multitasking import wait_on_function
from yggdrasil.drivers import MatlabModelDriver
# Configure matplotlib (when available) before pyplot is imported; plotting
# support is optional and disabled with a warning when matplotlib is missing.
try:
    import matplotlib as mpl
    if os.environ.get('DISPLAY', '') == '':  # pragma: debug
        # Headless environment (no X display); use the file-only backend.
        mpl.use('Agg')
    elif ygg_platform._is_mac:  # pragma: travis
        # TkAgg avoids framework-build issues on Mac CI.
        mpl.use('TkAgg')
    import matplotlib.pyplot as plt  # noqa: E402
    mpl.rc('font', size=18)
except ImportError:  # pragma: debug
    warnings.warn("Plotting disabled")
    mpl = None
    plt = None
logger = logging.getLogger(__name__)
# Plotting defaults.
_linewidth = 2
_legend_fontsize = 14
# Number of pyperf warmup executions before timed values are recorded.
_pyperf_warmups = 0
# 'major.minor' version string for the running interpreter, used to tag
# timing entries and default statistics filenames.
_python_version = '%d.%d' % (sys.version_info[0], sys.version_info[1])
# TODO:
# - Use pandas with Seaborn for plotting?
# - Converting to using sparse benchmark data
# - Create separate classes for saving/loading benchmarks and running tests
# - Add functions for overwriting specific entries
def get_lang_list():
    r"""Get the list of testable languages on the current platform.

    Returns:
        list: Names of testable languages using timed_pipe.
    """
    # Only languages whose model driver supports the full language (not
    # just wrapped functions) can run the timed_pipe example.
    _lang_list = [k for k in tools.get_installed_lang()
                  if import_component('model', k).full_language]
    if not _lang_list:  # pragma: debug
        raise Exception(("Timings cannot be performed if there is not at least "
                         "one valid language. len(valid_languages) = %d")
                        % len(_lang_list))
    return _lang_list
def get_comm_list():
    r"""Get the list of testable communication methods on the current platform.

    Returns:
        list: Names of testable communication methods using timed_pipe.
    """
    # Restrict to comms installed for every testable language.
    _comm_list = tools.get_installed_comm(language=get_lang_list(),
                                          dont_include_value=True)
    if not _comm_list:  # pragma: debug
        raise Exception(("Timings cannot be performed if there is not at least "
                         "one valid communication mechanism. len(valid_comms) "
                         "= %d") % len(_comm_list))
    return _comm_list
def write_pyperf_script(script_file, nmsg, msg_size,
                        lang_src, lang_dst, comm_type,
                        nrep=10, max_errors=5, matlab_running=False):
    r"""Write a script to run pyperf.

    Args:
        script_file (str): Full path to the file where the script should be
            saved.
        nmsg (int): The number of messages that should be sent during the run.
        msg_size (int): The size (in bytes) of the test messages that should be
            sent during the run.
        lang_src (str): The language that the source program should be in.
        lang_dst (str): The language that the destination program should be in.
        comm_type (str): The type of communication channel that should be used
            for the test.
        nrep (int, optional): The number of times the test run should be
            repeated. Defaults to 10.
        max_errors (int, optional): Maximum number of errors that should be
            retried. Defaults to 5.
        matlab_running (bool, optional): If True, the test will assert that
            there is an existing Matlab engine before starting, otherwise the
            test will assert that there is not an existing Matlab engine.
            Defaults to False.
    """
    # The script reconstructs the TimedRun parameters as literals so pyperf
    # can execute each repetition in a fresh subprocess.
    lines = [
        'import pyperf',
        'import os',
        'from yggdrasil import timing',
        'nrep = %d' % nrep,
        'nmsg = %d' % nmsg,
        'msg_size = %d' % msg_size,
        'max_errors = %d' % max_errors,
        'lang_src = "%s"' % lang_src,
        'lang_dst = "%s"' % lang_dst,
        'comm_type = "%s"' % comm_type,
        'matlab_running = %s' % str(matlab_running)]
    lines += [
        'timer = timing.TimedRun(lang_src, lang_dst,'
        ' comm_type=comm_type,'
        ' matlab_running=matlab_running)',
        'runner = pyperf.Runner(values=1, processes=nrep)',
        'out = runner.bench_time_func(timer.entry_name(nmsg, msg_size),',
        '                             timing.pyperf_func,',
        '                             timer, nmsg, msg_size, max_errors)']
    # Refuse to clobber an existing script; callers are expected to remove
    # the previous script (see TimedRun.time_run_pyperf).
    assert not os.path.isfile(script_file)
    with open(script_file, 'w') as fd:
        fd.write('\n'.join(lines))
def pyperf_func(loops, timer, nmsg, msg_size, max_errors):
    r"""Function to do pyperf loops over function.

    Args:
        loops (int): Number of loops to perform.
        timer (TimedRun): Class with information about the run and methods
            required for setup/teardown.
        nmsg (int): Number of messages that should be sent in the test.
        msg_size (int): Size of messages that should be sent in the test.
        max_errors (int): Maximum number of errors that should be retried.

    Returns:
        float: Time (in seconds) required to perform the test the required
            number of times.
    """
    ttot = 0
    for _ in range(loops):
        with change_default_comm(timer.comm_type):
            run_uuid = timer.before_run(nmsg, msg_size)
            nerrors = 0
            # Retry the timed run on assertion failures up to max_errors.
            while True:
                try:
                    t0 = pyperf.perf_counter()
                    timer.run(run_uuid, timer=pyperf.perf_counter)
                    tdif = pyperf.perf_counter() - t0
                    timer.after_run(run_uuid, tdif)
                    ttot += tdif
                    break
                except AssertionError as e:  # pragma: debug
                    nerrors += 1
                    if nerrors >= max_errors:
                        raise
                    warnings.warn(
                        f"Error {nerrors}/{max_errors}. Trying again. "
                        f"(error = '{e}')", RuntimeWarning)
    return ttot
def get_source(lang, direction, test_name='timed_pipe'):
    r"""Get the path to the source file.

    Args:
        lang (str): Language that should be returned.
        direction (str): 'src' or 'dst'.
        test_name (str, optional): Name of the example. Defaults to 'timed_pipe'.

    Returns:
        str: Full path to the source file.
    """
    # Renamed from 'dir' to avoid shadowing the builtin.
    src_dir = os.path.join(examples._example_dir, test_name, 'src')
    return os.path.join(src_dir, '%s_%s%s' % (test_name, direction,
                                              examples.ext_map[lang.lower()]))
@contextlib.contextmanager
def change_default_comm(default_comm):
    r"""Temporarily set the YGG_DEFAULT_COMM environment variable.

    Args:
        default_comm (str): Comm type that should be made the default for
            the duration of the context. If None, the variable is only
            cleared.
    """
    from yggdrasil.communication.DefaultComm import DefaultComm
    old_default_comm = os.environ.pop('YGG_DEFAULT_COMM', None)
    if default_comm is not None:
        os.environ['YGG_DEFAULT_COMM'] = default_comm
    DefaultComm._reset_alias()
    try:
        yield
    finally:
        # Use pop with a default so this does not raise KeyError when
        # default_comm was None (the original 'del' did); restore the
        # previous value when there was one. try/finally guarantees the
        # environment is restored even if the managed block raises.
        os.environ.pop('YGG_DEFAULT_COMM', None)
        if old_default_comm is not None:  # pragma: debug
            os.environ['YGG_DEFAULT_COMM'] = old_default_comm
        DefaultComm._reset_alias()
@contextlib.contextmanager
def debug_log():  # pragma: debug
    r"""Set the log level to debug for the duration of the context."""
    from yggdrasil.config import ygg_cfg, cfg_logging
    loglevel = ygg_cfg.get('debug', 'ygg')
    ygg_cfg.set('debug', 'ygg', 'DEBUG')
    cfg_logging()
    try:
        yield
    finally:
        # Restore the previous level even if the managed block raises
        # (the original left DEBUG enabled on error).
        if loglevel is not None:
            ygg_cfg.set('debug', 'ygg', loglevel)
            cfg_logging()
class TimedRun(tools.YggClass):
    r"""Class to time sending messages from one language to another.

    Args:
        lang_src (str): Language that messages should be sent from.
        lang_dst (str): Language that messages should be sent to.
        test_name (str, optional): Name of the example. Defaults to 'timed_pipe'.
        filename (str, optional): Full path to the file where timing statistics
            will be saved. This can be a pyperf json or a Python pickle of run
            data. Defaults to 'scalings_{test_name}_{comm_type}.json' if
            dont_use_pyperf is False, otherwise the extension is '.dat'.
        comm_type (str, optional): Name of communication class that should be
            used for tests. Defaults to the current default comm class.
        platform (str, optional): Platform that the test should be run on. If
            the data doesn't already exist and this doesn't match the current
            platform, an error will be raised. Defaults to the current platform.
        python_ver (str, optional): Version of Python that the test should be
            run with. If the data doesn't already exist and this doesn't match
            the current version of python, an error will be raised. Defaults to
            the current version of python.
        max_errors (int, optional): Maximum number of errors that should be
            retried. Defaults to 5.
        matlab_running (bool, optional): If True, the test will assert that
            there is an existing Matlab engine before starting, otherwise the
            test will assert that there is not an existing Matlab engine.
            Defaults to False.
        dont_use_pyperf (bool, optional): If True, the timings will be run
            without using the pyperf package. Defaults to False.

    Attributes:
        lang_src (str): Language that messages should be sent from.
        lang_dst (str): Language that messages should be sent to.
        platform (str): Platform that the test is being run on.
        python_ver (str): Version of Python that the test should be run with.
        filename (str): Full path to the file where timing statistics will be
            saved. This can be a pyperf json or a Python pickle of run data.
        comm_type (str): Name of communication class that should be used for
            tests.
        max_errors (int): Maximum number of errors that should be retried.
        matlab_running (bool): True if there was a Matlab engine running when
            the test was created. False otherwise.
        dont_use_pyperf (bool): If True, the timings will be run without using
            the pyperf package.
    """
    def __init__(self, lang_src, lang_dst, test_name='timed_pipe', filename=None,
                 comm_type=None, platform=None, python_ver=None, max_errors=5,
                 matlab_running=False, dont_use_pyperf=False, **kwargs):
        # Fall back to the current environment for unspecified parameters.
        if comm_type is None:
            comm_type = tools.get_default_comm()
        if platform is None:
            # NOTE: the 'platform' parameter shadows the imported module;
            # the module is reached through the 'ygg_platform' alias.
            platform = ygg_platform._platform
        if python_ver is None:
            python_ver = _python_version
        # Suffix making the statistics file unique per test/platform/python
        # version (e.g. 'timed_pipe_Linux_py37').
        suffix = '%s_%s_py%s' % (test_name, platform, python_ver.replace('.', ''))
        self.dont_use_pyperf = dont_use_pyperf
        if filename is None:
            # pyperf appends to a JSON suite; the fallback timer stores a
            # pickle of run data (see class docstring) with a .dat extension.
            if self.dont_use_pyperf:
                filename = os.path.join(os.getcwd(), 'scaling_%s.dat' % suffix)
            else:
                filename = os.path.join(os.getcwd(), 'scaling_%s.json' % suffix)
        self.matlab_running = matlab_running
        self.filename = filename
        self.comm_type = comm_type
        self.platform = platform
        self.python_ver = python_ver
        self.max_errors = max_errors
        self.program_name = test_name
        self._lang_list = get_lang_list()
        self._comm_list = get_comm_list()
        name = '%s_%s_%s' % (test_name, lang_src, lang_dst)
        super(TimedRun, self).__init__(name, **kwargs)
        self.lang_src = lang_src
        self.lang_dst = lang_dst
        # Cached timing data; populated from self.filename by reload().
        self._data = None
        self.reload()
        # Per-run bookkeeping keyed by run uuid (see before_run/after_run).
        self.fyaml = dict()
        self.foutput = dict()
        self.entries = dict()
    @property
    def data(self):
        r"""dict or pyperf.BenchmarkSuite: Timing statistics data. None until
        data has been loaded (see reload())."""
        return self._data
[docs] def can_run(self, raise_error=False):
r"""Determine if the test can be run from the current platform and
python version.
Args:
raise_error (bool, optional): If True, an error will be raised if
the test cannot be completed from the current platform. Defaults
to False.
Returns:
bool: True if the test can be run, False otherwise.
"""
out = ((self.platform.lower() == ygg_platform._platform.lower())
and (self.python_ver == _python_version)
and (self.matlab_running == MatlabModelDriver.is_matlab_running())
and (self.lang_src in self._lang_list)
and (self.lang_dst in self._lang_list)
and (self.comm_type in self._comm_list))
if (not out) and raise_error:
from yggdrasil.command_line import ygginfo
msg_info = ygginfo(args=['--verbose'], return_str=True)
msg = ['Cannot run test with parameters:',
'\tOperating System: %s' % self.platform,
'\tPython Version: %s' % self.python_ver,
'\tMatlab Running: %s' % self.matlab_running,
'\tSource Language: %s' % self.lang_src,
'\tDestination Language: %s' % self.lang_dst,
'\tCommunication Method: %s' % self.comm_type,
'Because one or more platform properties are incompatible:',
'\tOperating System: %s' % ygg_platform._platform,
'\tPython Version: %s' % _python_version,
'\tMatlab Running: %s' % MatlabModelDriver.is_matlab_running(),
'\tSupported Languages: %s' % ', '.join(self._lang_list),
'\tSupported Communication: %s' % ', '.join(self._comm_list),
'\n\n' + msg_info]
raise RuntimeError('\n'.join(msg))
return out
[docs] def entry_name(self, nmsg, msg_size):
r"""Get a unique identifier for a run.
Args:
nmsg (int): Number of messages that should be sent.
msg_size (int): Size of each message that should be sent.
"""
# TODO: The translation to using the form XXXComm should be abandoned
# if/when the timing statistics are recomputed (only used here to make
# use of the existing data).
out = '%s(%s,%s,%s,%s,%s,%d,%d)' % (self.program_name,
self.platform, self.python_ver,
self.comm_type.upper() + 'Comm',
self.lang_src, self.lang_dst,
nmsg, msg_size)
if ((self.matlab_running
and ('matlab' in [self.lang_src, self.lang_dst]))): # pragma: matlab
out += '-MLStarted'
return out
    @property
    def max_msg_size(self):
        r"""int: Largest size of message that can be sent without being split."""
        # Limit depends on the comm type in use.
        return tools.get_YGG_MSG_MAX(comm_type=self.comm_type)
@property
def default_msg_size(self):
r"""list: Default message sizes for scaling tests. This will vary
depending on the comm type so that the maximum size is not more than 10x
larger than the maximum message size."""
if self.comm_type.lower().startswith('ipc'):
msg_size = [1, 1e2, 1e3, 1e4, 5e4, 1e5]
else:
msg_size = [1, 1e2, 1e3, 1e4, 1e5, 1e6, 5e6, 1e7]
return msg_size
@property
def default_msg_count(self):
r"""list: Default message count for scaling tests."""
return [5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
    @property
    def base_msg_size(self):
        r"""int: Message size to use for tests varying message count."""
        return 1000
    @property
    def base_msg_count(self):
        r"""int: Message count to use for tests varying message size."""
        return 5
@property
def time_per_byte(self):
return self.fit_scaling_size()[0]
    @property
    def time_per_message(self):
        r"""float: Time required to send a single message of 1000 bytes.
        Slope of the linear fit of run time against message count."""
        return self.fit_scaling_count()[0]
    @property
    def startup_time(self):
        r"""float: Time required to set up communications and start models.
        Intercept of the linear fit of run time against message count."""
        return self.fit_scaling_count()[1]
# @property
# def description_prefix(self):
# r"""Prefix message with test name."""
# return self.name
    @property
    def tempdir(self):
        r"""str: Temporary directory where run YAMLs and output files go."""
        return tempfile.gettempdir()
    @property
    def output_file_format(self):
        r"""str: Full path to the output file created by the run. The '%s'
        placeholder is filled with the run uuid (see before_run)."""
        return os.path.join(self.tempdir, 'output_%s.txt')
    def get_new_uuid(self):
        r"""Get a new unique ID.

        Returns:
            str: Unique identifier.
        """
        return str(uuid.uuid4())
[docs] def output_content(self, nmsg, msg_size):
r"""Get the result that should be output to file during the run.
Args:
nmsg: The number of messages that will be sent.
msg_sizze: The size of the the messages that will be sent.
Returns:
str: The contents expected in the file.
"""
siz = nmsg * msg_size
return '0' * siz
[docs] def check_output(self, fout, nmsg, msg_size):
r"""Assert that the output file contains the expected result.
Args:
fout (str): The file that should be checked.
nmsg (int): The number of messages that will be sent.
msg_size (int): The size of the the messages that will be sent.
"""
fres = self.output_content(nmsg, msg_size)
wait_on_function(lambda: os.path.isfile(fout), timeout=self.timeout,
on_timeout=f"File '{fout}' does not exist")
def on_timeout(): # pragma: debug
if (fres is not None) and (len(fres) < 200):
print(f"Expected:\n{fres}\n"
f"Actual:\n{open(fout, 'r').read()}")
raise AssertionError(f"File size ({os.stat(fout).st_size}), "
f"dosn't match expected size ({len(fres)}).")
wait_on_function(lambda: os.stat(fout).st_size == len(fres),
timeout=self.timeout, on_timeout=on_timeout)
ocont = open(fout, 'r').read()
assert ocont == fres
[docs] def cleanup_output(self, fout):
r"""Cleanup the output file.
Args:
fout (str): The file to be cleaned up.
"""
if os.path.isfile(fout):
os.remove(fout)
    @property
    def source_src(self):
        r"""str: Source file for language messages will be sent from."""
        return get_source(self.lang_src, 'src', test_name=self.program_name)
    @property
    def source_dst(self):
        r"""str: Source file for language messages will be sent to."""
        return get_source(self.lang_dst, 'dst', test_name=self.program_name)
    @property
    def yamlfile_format(self):
        r"""str: Format string for creating a yaml file. The '%s' placeholder
        is filled with the run uuid (see before_run)."""
        path = os.path.join(self.tempdir, '%s.yml')
        return path
    @property
    def pyperfscript(self):
        r"""str: Full path to the generated pyperf script (see
        write_pyperf_script and time_run_pyperf)."""
        return os.path.join(self.tempdir, 'runperf.py')
[docs] def make_yamlfile(self, path):
r"""Create a YAML file for running the test.
Args:
path (str): Full path to file where the YAML should be saved.
"""
out = {'models': [self.get_yaml_src(self.lang_src),
self.get_yaml_dst(self.lang_dst)],
'connections': [{'input': 'output_pipe',
'output': 'input_pipe'}]}
lines = yaml.dump(out, default_flow_style=False)
with open(path, 'w') as fd:
fd.write(lines)
[docs] def get_yaml_src(self, lang):
r"""Get the yaml entry for the source model.
Args:
lang (str): Language for the source model.
"""
out = {'name': 'timed_pipe_src',
'language': lang,
'args': [os.path.join('.', self.source_src),
"{{PIPE_MSG_COUNT}}", "{{PIPE_MSG_SIZE}}"],
'outputs': {'name': 'output_pipe'}}
return out
[docs] def get_yaml_dst(self, lang):
r"""Get the yaml entry for the destination model.
Args:
lang (str): Language for the destination model.
"""
out = {'name': 'timed_pipe_dst',
'language': lang,
'args': os.path.join('.', self.source_dst),
'inputs': {'name': 'input_pipe'},
'outputs': {'name': 'output_file',
'default_file': {
'name': "{{PIPE_OUT_FILE}}",
'filetype': 'ascii',
'in_temp': True}}}
return out
    def before_run(self, nmsg, msg_size):
        r"""Actions that should be performed before a run.

        Args:
            nmsg (int): Number of messages that should be sent.
            msg_size (int): Size of each message that should be sent.

        Returns:
            str: Unique identifier for the run.
        """
        # The Matlab engine state must match what this timer was configured
        # for so that results are comparable across reps.
        assert self.matlab_running == MatlabModelDriver.is_matlab_running()
        nmsg = int(nmsg)
        msg_size = int(msg_size)
        run_uuid = self.get_new_uuid()
        # Record per-run parameters and temp file paths by uuid so after_run
        # can validate and clean them up.
        self.entries[run_uuid] = (nmsg, msg_size)
        self.fyaml[run_uuid] = self.yamlfile_format % run_uuid
        self.foutput[run_uuid] = self.output_file_format % run_uuid
        if os.path.isfile(self.fyaml[run_uuid]):  # pragma: debug
            os.remove(self.fyaml[run_uuid])
        self.make_yamlfile(self.fyaml[run_uuid])
        # These variables are expanded in the generated YAML ({{...}}
        # templates) when the integration runs.
        env = {'PIPE_MSG_COUNT': str(nmsg),
               'PIPE_MSG_SIZE': str(msg_size),
               'PIPE_OUT_FILE': self.foutput[run_uuid]}
        os.environ.update(env)
        self.cleanup_output(self.foutput[run_uuid])
        self.info("Starting %s...", self.entry_name(nmsg, msg_size))
        return run_uuid
    def after_run(self, run_uuid, result):
        r"""Actions that should be performed after a run.

        Args:
            run_uuid (str): Unique identifier for the run, as returned by
                before_run.
            result (float): Time required (in seconds) to execute the program.
        """
        # (Docstring fixed: it previously documented nmsg/msg_size, which
        # are not parameters of this method.)
        nmsg, msg_size = self.entries[run_uuid]
        fout = self.foutput[run_uuid]
        self.info("Finished %s: %f s", self.entry_name(nmsg, msg_size), result)
        self.check_output(fout, nmsg, msg_size)
        self.cleanup_output(fout)
        # Verify the Matlab engine state did not change during the run.
        assert self.matlab_running == MatlabModelDriver.is_matlab_running()
        del self.entries[run_uuid], self.fyaml[run_uuid], self.foutput[run_uuid]
[docs] def run(self, run_uuid, timer=time.time, t0=None):
r"""Run test sending a set of messages between the designated models.
Args:
run_uuid (str): Unique ID for the run.
timer (function, optional): Function that should be called to get
intermediate timing statistics. Defaults to time.time if not
provided.
t0 (float, optional): Zero point for timing statistics. Is set
using the provided timer if not provided.
Returns:
dict: Intermediate times from the run.
"""
if t0 is None:
t0 = timer()
r = runner.get_runner(self.fyaml[run_uuid],
namespace=self.name + run_uuid)
times = r.run(timer=timer, t0=t0)
assert not r.error_flag
del r
return times
[docs] def time_run(self, nmsg, msg_size, nrep=10, overwrite=False):
r"""Time sending a set of messages between the designated models.
Args:
nmsg (int): Number of messages that should be sent.
msg_size (int): Size of each message that should be sent.
nrep (int, optional): Number of times the test should be repeated
to get an average execution time and standard deviation.
Defaults to 10.
overwrite (bool, optional): If True, any existing entry for this
run will be overwritten. Defaults to False.
Returns:
tuple: Best of, average and standard deviation in the time (in seconds)
required to execute the program.
"""
entry_name = self.entry_name(nmsg, msg_size)
if overwrite:
self.remove_entry(entry_name)
reps = self.get_entry(entry_name)
nrep_remain = nrep - len(reps)
# Only run if there are not enough existing reps to get stat
if nrep_remain > 0:
self.can_run(raise_error=True)
if self.dont_use_pyperf:
self.time_run_mine(nmsg, msg_size, nrep_remain)
else:
self.time_run_pyperf(nmsg, msg_size, nrep_remain)
# Reload after new runs have been added
self.reload()
reps = self.get_entry(entry_name)
# Calculate variables
if len(reps) < 2:
ret = (np.min(reps), np.mean(reps), 0.0)
else:
ret = (np.min(reps), np.mean(reps), np.std(reps))
# self.info(reps)
# self.info(ret)
self.info("Result for %s: %f +/- %f (%d runs)", entry_name,
ret[1], ret[2], len(reps))
return ret
[docs] def time_run_pyperf(self, nmsg, msg_size, nrep):
r"""Time sending a set of messages between the designated models using
the pyperf package.
Args:
nmsg (int): Number of messages that should be sent.
msg_size (int): Size of each message that should be sent.
nrep (int): Number of times the test should be repeated. These reps
will be appended to the existing reps for this entry.
"""
write_pyperf_script(self.pyperfscript, nmsg, msg_size,
self.lang_src, self.lang_dst, self.comm_type,
nrep=nrep, matlab_running=self.matlab_running,
max_errors=self.max_errors)
copy_env = ['TMPDIR', 'CONDA_PREFIX', 'PATHEXT'
'CC', 'CFLAGS', 'CXX', 'CXXFLAGS', 'CPPFLAGS',
'F77', 'F90', 'F95', 'FC', 'FFLAGS', 'FORTRANFLAGS',
'GFORTRAN', 'LD', 'LDFLAGS', 'LD_LIBRARY_PATH']
copy_env += [v['env'] for v in config._cfg_map.values()]
if platform._is_mac:
copy_env += ['SDKROOT', 'MACOSX_DEPLOYMENT_TARGET',
'CONDA_BUILD_SYSROOT']
if platform._is_win: # pragma: windows
copy_env += ['HOME', 'USERPROFILE', 'HOMEDRIVE', 'HOMEPATH',
'NUMBER_OF_PROCESSORS',
'INCLUDE', 'LIB', 'LIBPATH',
'ChocolateyInstall', 'VCPKG_ROOT']
cmd = [sys.executable, self.pyperfscript, '--append=' + self.filename,
'--inherit-environ=' + ','.join(copy_env),
'--warmups=%d' % _pyperf_warmups]
subprocess.call(cmd)
assert os.path.isfile(self.filename)
os.remove(self.pyperfscript)
[docs] def time_run_mine(self, nmsg, msg_size, nrep):
r"""Time sending a set of messages between the designated models without
using the pyperf package.
Args:
nmsg (int): Number of messages that should be sent.
msg_size (int): Size of each message that should be sent.
nrep (int): Number of times the test should be repeated. These reps
will be appended to the existing reps for this entry.
"""
entry_name = self.entry_name(nmsg, msg_size)
old_reps = self.get_entry(entry_name)
new_reps = []
for i in range(nrep):
with change_default_comm(self.comm_type):
run_uuid = self.before_run(nmsg, msg_size)
t0 = time.time()
self.run(run_uuid, timer=time.time)
t1 = time.time()
new_reps.append(t1 - t0)
self.after_run(run_uuid, new_reps[-1])
if self.data is None:
self._data = dict()
self._data[entry_name] = tuple(list(old_reps) + list(new_reps))
self.save(self.data, overwrite=True)
[docs] @classmethod
def class_plot(cls, lang_src='python', lang_dst='python', **kwargs):
"""Create the class for a given combo of languages and comm types, then
call plot_scaling_joint.
Args:
lang_src (str, optional): Language that messages should be sent from.
Defaults to 'python'.
lang_dst (str, optional): Language that messages should be sent to.
Defaults to 'python'.
**kwargs: Additional keywords are passed to either the class
constructor or plot_scaling_joint as appropriate.
Returns:
tuple(matplotlib.Axes, matplotlib.Axes): Pair of axes containing the
plotted scalings and the fit.
"""
cls_kwargs_keys = ['test_name', 'filename', 'matlab_running',
'comm_type', 'platform', 'python_ver',
'dont_use_pyperf', 'max_errors']
cls_kwargs = {}
for k in cls_kwargs_keys:
if k in kwargs:
cls_kwargs[k] = kwargs.pop(k)
x = TimedRun(lang_src, lang_dst, **cls_kwargs)
axs, fit = x.plot_scaling_joint(**kwargs)
return axs, fit
    def plot_scaling_joint(self, msg_size0=None, msg_count0=None,
                           msg_size=None, msg_count=None, axs=None, **kwargs):
        r"""Plot scaling of run time with both count and size, side by side.
        Anywhere data is exchanged as a tuple for each plot, the plot of
        scaling with count is first and the scaling with size is second.

        Args:
            msg_size0 (int, optional): Size of messages to use for count scaling.
                Defaults to self.base_msg_size.
            msg_count0 (int, optional): Number of messages to use for size
                scaling. Defaults to self.base_msg_count.
            msg_size (list, np.ndarray, optional): List of message sizes to use
                as x variable on the size scaling plot. Defaults to
                self.default_msg_size.
            msg_count (list, np.ndarray, optional): List of message counts to
                use as x variable on the count scaling plot. Defaults to
                self.default_msg_count.
            axs (tuple, optional): Pair of axes objects that lines should be
                added to. If not provided, they are created.
            **kwargs: Additional keyword arguments are passed to plot_scaling.

        Returns:
            tuple(matplotlib.Axes, matplotlib.Axes): Pair of axes containing the
                plotted scalings and the fit.
        """
        if msg_size0 is None:
            msg_size0 = self.base_msg_size
        if msg_count0 is None:
            msg_count0 = self.base_msg_count
        if msg_size is None:
            msg_size = self.default_msg_size
        if msg_count is None:
            msg_count = self.default_msg_count
        if axs is None and plt is not None:
            # Manually position the two axes with a fixed buffer (inches)
            # converted below to fractions of the figure dimensions.
            figure_size = (15.0, 6.0)
            figure_buff = 0.75
            fig, axs = plt.subplots(1, 2, figsize=figure_size, sharey=True)
            axs[0].set_xlabel('Message Count (size = %d)' % msg_size0)
            axs[0].set_ylabel('Time (s)')
            if kwargs.get('per_message', False):
                axs[0].set_ylabel('Time per Message (s)')
            axs[1].set_xlabel('Message Size (count = %d)' % msg_count0)
            axs_wbuffer = figure_buff / figure_size[0]
            axs_hbuffer = figure_buff / figure_size[1]
            # Two plots side by side: three horizontal buffers, two widths.
            axs_width = (1.0 - (3.0 * axs_wbuffer)) / 2.0
            axs_height = 1.0 - (2.0 * axs_hbuffer)
            pos1 = [axs_wbuffer, axs_hbuffer, axs_width, axs_height]
            pos2 = [2.0 * axs_wbuffer + axs_width, axs_hbuffer,
                    axs_width, axs_height]
            axs[0].set_position(pos1)
            axs[1].set_position(pos2)
        if plt is not None:
            self.plot_scaling(msg_size0, msg_count, axs=axs[0], **kwargs)
            self.plot_scaling(msg_size, msg_count0, axs=axs[1], **kwargs)
        # Get slopes
        fit = self.fit_scaling_count(msg_size=msg_size0, counts=msg_count)
        self.info('fit: slope = %f, intercept = %f', fit[0], fit[1])
        # m, b = self.fit_scaling_size()
        # xname = 'size'
        # self.info('%s: slope = %f, intercept = %f', xname, m, b)
        # Legend
        if plt is not None:
            axs[1].legend(loc='upper left', ncol=2,
                          fontsize=_legend_fontsize)
        return axs, fit
[docs] def plot_scaling(self, msg_size, msg_count, axs=None, label=None,
xscale=None, yscale='linear', plot_kws={},
time_method='average', per_message=False, **kwargs):
r"""Plot scaling of run time with a variable.
Args:
msg_size (int, list, np.ndarray): List of message sizes to use as
x variable, or message size to use when plotting dependent on
message count.
msg_count (int, list, np.ndarray): List of message counts to use as
x variable, or message count to use when plotting dependent on
message size.
axs (matplotlib.Axes, optional): Axes object that line should be
added to. If not provided, one is created.
label (str, optional): Label that should be used for the line.
Defaults to None.
xscale (str, optional): 'log' or 'linear' to indicate what scale
the x axis should use. Defaults to 'linear'.
yscale (str, optional): 'log' or 'linear' to indicate what scale
the y axis should use. Defaults to 'linear'.
plot_kws (dict, optional): Ploting keywords that should be passed.
Defaults to {}.
time_method (str, optional): Timing method that should be used.
Valid values include 'bestof' and 'average'. Defaults to
'average'.
per_message (bool, optional): If True, the time per message is
returned rather than the total time. Defaults to False.
**kwargs: Additional keyword arguments are passed to scaling_size or
scaling_count.
Returns:
matplotlib.Axes: Axes containing the plotted scaling.
"""
if plt is None: # pragma: debug
return axs
if isinstance(msg_size, list):
msg_size = np.array(msg_size)
if isinstance(msg_count, list):
msg_count = np.array(msg_count)
# Get data
if isinstance(msg_size, np.ndarray) and isinstance(msg_count, np.ndarray):
raise RuntimeError("Arrays provided for both msg_size & msg_count.")
elif isinstance(msg_size, np.ndarray):
xname = 'size'
x, mbo, avg, std = self.scaling_size(msg_count, sizes=msg_size,
per_message=per_message, **kwargs)
elif isinstance(msg_count, np.ndarray):
xname = 'count'
x, mbo, avg, std = self.scaling_count(msg_size, counts=msg_count,
per_message=per_message, **kwargs)
else:
raise RuntimeError("Array not provided for msg_size or msg_count.")
# Parse input values
if xscale is None:
if xname == 'size':
xscale = 'log'
else:
xscale = 'linear'
if time_method == 'bestof':
y = mbo
yerr = None
elif time_method == 'average':
y = avg
yerr = std
else:
raise ValueError("Invalid time_method: '%s'" % time_method)
# Ensure everything in array format
if isinstance(x, list):
x = np.array(x)
if isinstance(y, list):
y = np.array(y)
if isinstance(yerr, list):
yerr = np.array(yerr)
# Create axes if not provded
if axs is None:
fig, axs = plt.subplots()
axs.set_xlabel(xname)
axs.set_ylabel('Time (s)')
if per_message:
axs.set_ylabel('Time per Message (s)')
# Set axes scales
if xscale == 'log':
axs.set_xscale('log')
if yscale == 'log':
axs.set_yscale('log')
# Plot
if yerr is not None:
# Convert yscale to prevent negative values for log y
if yscale == 'log':
ylower = y - yerr
ylower[ylower <= 0] = 1.0e-6
# ylower = np.maximum(1e-6, y - yerr)
yerr_lower = y - ylower
yerr_upper = yerr
else:
yerr_lower = yerr
yerr_upper = yerr
axs.plot(x, y, label=label, **plot_kws)
plot_kws_fill = copy.deepcopy(plot_kws)
plot_kws_fill['linewidth'] = 0
plot_kws_fill['alpha'] = 0.2
axs.fill_between(x, y - yerr_lower, y + yerr_upper, **plot_kws_fill)
# axs.errorbar(x, y, yerr=[yerr_lower, yerr_upper],
# label=label, **plot_kws)
else:
axs.plot(x, y, label=label, **plot_kws)
return axs
[docs] def fit_scaling_count(self, msg_size=None, counts=None, **kwargs):
r"""Do a linear fit to the scaling of execution time with message count.
Args:
msg_size (int, optional): Size of each message that should be sent.
Defaults to self.base_msg_size.
counts (list, optional): List of counts to test. Defaults to
self.default_msg_count if not provided.
**kwargs: Additional keyword arguments are passed to scaling_count.
Returns:
tuple: The slope and intercept of the linear fit.
"""
if msg_size is None:
msg_size = self.base_msg_size
if counts is None:
counts = self.default_msg_count
out = self.scaling_count(msg_size, counts=counts, **kwargs)
x = out[0]
y = out[2]
return np.polyfit(x, y, 1)
[docs] def fit_scaling_size(self, msg_count=None, sizes=None, **kwargs):
r"""Do a linear fit to the scaling of execution time with message count.
Args:
msg_count (int, optional): Number of messages that should be sent
for each size. Defaults to self.base_msg_count.
sizes (list, optional): List of sizes to test. Defaults to
self.default_msg_size if not provided.
**kwargs: Additional keyword arguments are passed to scaling_size.
Returns:
tuple: The slope and intercept of the linear fit.
"""
if msg_count is None:
msg_count = self.base_msg_count
if sizes is None:
sizes = self.default_msg_size[:-2]
max_size = self.max_msg_size
sizes_limit = []
for s in sizes:
if s < max_size:
sizes_limit.append(s)
out = self.scaling_size(msg_count, sizes=sizes_limit, **kwargs)
x = out[0]
y = out[2]
return np.polyfit(x, y, 1)
def scaling_count(self, msg_size, counts=None, min_count=1, max_count=100,
                  nsamples=10, scaling='linear', per_message=False, **kwargs):
    r"""Get scaling of run time with message count.

    Args:
        msg_size (int): Size of each message that should be sent.
        counts (list, optional): List of counts to test. Defaults to None
            and a list is created based on the other keyword arguments.
        min_count (int, optional): Minimum message count that should be timed.
            Defaults to 1. This is ignored if 'counts' is provided.
        max_count (int, optional): Maximum message count that should be timed.
            Defaults to 100. This is ignored if 'counts' is provided.
        nsamples (int, optional): Number of samples that should be done
            between 'min_count' and 'max_count'. Defaults to 10. This is
            ignored if 'counts' is provided.
        scaling (str, optional): Scaling for sampling of message counts
            between 'min_count' and 'max_count'. Defaults to 'linear'. This
            is ignored if 'counts' is provided.
        per_message (bool, optional): If True, the time per message is
            returned rather than the total time. Defaults to False.
        **kwargs: Additional keyword arguments are passed to time_run.

    Returns:
        tuple: Lists of counts timed, minimum execution time, average
            execution times, and standard deviations.

    """
    if counts is None:
        # Sample message counts on the requested grid.
        if scaling == 'linear':
            counts = np.linspace(min_count, max_count, nsamples,
                                 dtype='int64')
        elif scaling == 'log':
            counts = np.logspace(np.log10(min_count), np.log10(max_count),
                                 nsamples, dtype='int64')
        else:
            raise ValueError("Scaling must be 'linear' or 'log'.")
    # Time one run per count; each run yields (min, average, stddev).
    timings = [self.time_run(n, msg_size, **kwargs) for n in counts]
    mbo = [t[0] for t in timings]
    avg = [t[1] for t in timings]
    std = [t[2] for t in timings]
    if per_message:
        # Subtract the fixed startup cost, then normalize by the count.
        t0 = self.startup_time
        mbo = [(m - t0) / n for m, n in zip(mbo, counts)]
        avg = [(a - t0) / n for a, n in zip(avg, counts)]
    return (list(counts), mbo, avg, std)
def scaling_size(self, nmsg, sizes=None, min_size=1, max_size=1e7,
                 nsamples=10, scaling='log', per_message=False, **kwargs):
    r"""Get scaling of run time with message size.

    Args:
        nmsg (int): Number of messages that should be sent.
        sizes (list, optional): List of sizes to test. Defaults to None
            and a list is created based on the other keyword arguments.
        min_size (int, optional): Minimum message size that should be timed.
            Defaults to 1. This is ignored if 'sizes' is provided.
        max_size (int, optional): Maximum message size that should be timed.
            Defaults to 1e7. This is ignored if 'sizes' is provided.
        nsamples (int, optional): Number of samples that should be done
            between 'min_size' and 'max_size'. Defaults to 10. This is
            ignored if 'sizes' is provided.
        scaling (str, optional): Scaling for sampling of message sizes
            between 'min_size' and 'max_size'. Defaults to 'log'. This
            is ignored if 'sizes' is provided.
        per_message (bool, optional): If True, the time per message is
            returned rather than the total time. Defaults to False.
        **kwargs: Additional keyword arguments are passed to time_run.

    Returns:
        tuple: Lists of sizes timed, minimum execution times, average
            execution times, and standard deviations.

    Raises:
        ValueError: If scaling is not 'linear' or 'log'.

    """
    if sizes is None:
        if scaling == 'linear':
            sizes = np.linspace(min_size, max_size, nsamples,
                                dtype='int64')
        elif scaling == 'log':
            sizes = np.logspace(np.log10(min_size), np.log10(max_size),
                                nsamples, dtype='int64')
        else:
            raise ValueError("Scaling must be 'linear' or 'log'.")
    mbo = []
    avg = []
    std = []
    for s in sizes:
        # time_run returns (minimum, average, stddev) for nmsg messages.
        imin, iavg, istd = self.time_run(nmsg, s, **kwargs)
        mbo.append(imin)
        avg.append(iavg)
        std.append(istd)
    if per_message:
        # Subtract the fixed startup cost before normalizing by the
        # number of messages sent.
        t0 = self.startup_time
        for i in range(len(mbo)):
            mbo[i] = (mbo[i] - t0) / nmsg
            avg[i] = (avg[i] - t0) / nmsg
    return (list(sizes), mbo, avg, std)
def get_entry(self, name):
    r"""Get values for an entry.

    Args:
        name (str): Name of the entry to return.

    Returns:
        tuple: Recorded values for the entry; empty tuple if there is no
            entry with the provided name.

    """
    if not self.has_entry(name):
        return tuple()
    if self.dont_use_pyperf:
        # Raw data is a plain mapping when pyperf is disabled.
        return self.data[name]
    return self.data.get_benchmark(name).get_values()
def has_entry(self, name):
    r"""Check to see if there is an entry with the provided name.

    Args:
        name (str): Name of the entry to check for.

    Returns:
        bool: True if an entry exists for the name, False otherwise
            (including when no data has been loaded).

    """
    if self.data is None:
        return False
    if self.dont_use_pyperf:
        return (name in self.data)
    return (name in self.data.get_benchmark_names())
def remove_entry(self, name):
    r"""Remove all runs associated with an entry.

    Args:
        name (str): Name of the entry to be removed.

    """
    if not self.has_entry(name):
        # Nothing to do for an unknown entry.
        return
    # Work on a copy so self.data is untouched until reload().
    data_out = copy.deepcopy(self.data)
    if self.dont_use_pyperf:
        del data_out[name]
    else:
        # Find the benchmark's index in the suite so it can be deleted.
        data_bench = data_out.get_benchmark(name)
        ibench = next((i for i, b in enumerate(data_out)
                       if b == data_bench), None)
        if ibench is None:  # pragma: debug
            raise Exception("Could not find run '%s'" % name)
        del data_out._benchmarks[ibench]
        if len(data_out) == 0:
            # Nothing left: save() writes nothing for None, so the stale
            # file must be removed here.
            data_out = None
            if os.path.isfile(self.filename):
                os.remove(self.filename)
    # Save
    self.save(data_out, overwrite=True)
    # Reload
    self.reload()
def reload(self):
    r"""Reload scalings data from file and store it in the data attribute."""
    # Refresh the cached copy from disk (None if the file is missing).
    self._data = self.load()
def load(self, as_json=False):
    r"""Load scalings data from a pyperf BenchmarkSuite json file or a
    Python pickle.

    Args:
        as_json (bool, optional): If True and self.dont_use_pyperf is False,
            the pyperf BenchmarkSuite data will be loaded as a dictionary from
            the json. Defaults to False.

    Returns:
        pyperf.BenchmarkSuite or dict: Loaded scalings data. None is returned
            if the file does not exist.

    """
    if not os.path.isfile(self.filename):
        return None
    if self.dont_use_pyperf:
        # latin1 allows unpickling data written by Python 2 — presumably
        # kept for legacy benchmark files.
        with open(self.filename, 'rb') as fd:
            return pickle.load(fd, encoding='latin1')
    assert self.filename.endswith('.json')
    if as_json:
        with open(self.filename, 'r') as fd:
            return json.load(fd)
    return pyperf.BenchmarkSuite.load(self.filename)
def save(self, data, overwrite=False):
    r"""Save scalings data to a new pyperf BenchmarkSuite json file or a
    Python pickle. If the file exists and overwrite is not set, an error
    will be raised. No file is written if data is None.

    Args:
        data (pyperf.BenchmarkSuite or dict): Data to be saved.
        overwrite (bool, optional): If True, any existing file will be
            overwritten. Defaults to False.

    Raises:
        RuntimeError: If the file already exists and overwrite is False.

    """
    if os.path.isfile(self.filename) and (not overwrite):
        raise RuntimeError("'%s' exists" % self.filename)
    if data is None:
        return
    if self.dont_use_pyperf:
        with open(self.filename, 'wb') as fd:
            pickle.dump(data, fd)
    elif isinstance(data, pyperf.BenchmarkSuite):
        # pyperf handles its own serialization and overwrite semantics.
        data.dump(self.filename, replace=overwrite)
    else:
        # Compact json terminated by a newline.
        with open(self.filename, 'w') as fd:
            json.dump(data, fd, sort_keys=True,
                      separators=(',', ':'))
            fd.write("\n")
def plot_scalings(compare='comm_type', compare_values=None,
                  plotfile=None, test_name='timed_pipe',
                  cleanup_plot=False, use_paper_values=False, **kwargs):
    r"""Plot comparison of scaling for chosen variable.

    Args:
        compare (str, optional): Name of variable that should be compared.
            Valid values are 'language', 'comm_type', 'platform', 'python_ver'.
            Defaults to 'comm_type'.
        compare_values (list, optional): Values that should be plotted.
            If not provided, the values will be determined based on the
            current platform.
        plotfile (str, optional): Full path to the file where the plot will be
            saved. If not provided, one is created based on the test parameters
            in the current working directory.
        test_name (str, optional): Name of the test that should be used. Defaults
            to 'timed_pipe'.
        cleanup_plot (bool, optional): If True, the create plotfile will be
            removed before the function returns. This is generally only useful
            for testing. Defaults to False.
        use_paper_values (bool, optional): If True, use the values from the paper.
            Defaults to False.
        **kwargs: Additional keyword arguments are passed to plot_scaling_joint.

    Raises:
        ValueError: If compare is not one of the supported variable names.
        RuntimeError: If one of the compared variables is also set in kwargs.

    Returns:
        str: Path where the figure was saved.

    """
    _lang_list = get_lang_list()
    _comm_list = get_comm_list()
    # Fixed values (default_vars) and swept values (default_vals) either
    # reproduce the published paper's setup or reflect this platform.
    if use_paper_values:
        default_vars = {'comm_type': 'zmq',
                        'lang_src': 'python',
                        'lang_dst': 'python',
                        'platform': 'Linux',
                        'python_ver': '2.7'}
        default_vals = {'comm_type': ['zmq', 'ipc'],
                        'language': ['c', 'cpp', 'python'],
                        'platform': ['Linux', 'MacOS', 'Windows'],
                        'python_ver': ['2.7', '3.5']}
        # Matlab results were only gathered on MacOS in the paper.
        if kwargs.get('platform', default_vars['platform']) == 'MacOS':
            default_vals['language'].append('matlab')
    else:
        default_vars = {'comm_type': tools.get_default_comm(),
                        'lang_src': 'python',
                        'lang_dst': 'python',
                        'platform': ygg_platform._platform,
                        'python_ver': _python_version}
        default_vals = {'comm_type': _comm_list,
                        'language': _lang_list,
                        'platform': ['Linux', 'MacOS', 'Windows'],
                        'python_ver': ['2.7', '3.5']}
    if compare_values is None:
        compare_values = default_vals.get(compare, None)
    else:
        assert isinstance(compare_values, list)
    per_message = kwargs.get('per_message', False)
    # Each branch defines: which variable picks the line color/style,
    # the per-line keyword sets (var_kws), a label formatter and y scale.
    if compare == 'comm_type':
        color_var = 'comm_type'
        color_map = {'zmq': 'b', 'ipc': 'r', 'rmq': 'g'}
        style_var = 'comm_type'
        style_map = {'zmq': '-', 'ipc': '--', 'rmq': ':'}
        var_list = compare_values
        var_kws = [{color_var: k} for k in var_list]
        kws2label = lambda x: x['comm_type'].split('Comm')[0]  # noqa: E731
        yscale = 'linear'
    elif compare == 'language':
        color_var = 'lang_src'
        color_map = {'python': 'b', 'matlab': 'm', 'c': 'g', 'cpp': 'r'}
        style_var = 'lang_dst'
        style_map = {'python': '-', 'matlab': '-.', 'c': '--', 'cpp': ':'}
        # All ordered source/destination language pairs.
        var_list = itertools.product(compare_values, repeat=2)
        var_kws = [{'lang_src': l1, 'lang_dst': l2} for l1, l2 in var_list]
        # Extra matlab->matlab case timed with an already-running engine.
        if 'matlab' in compare_values:  # pragma: matlab
            var_kws.append({'lang_src': 'matlab', 'lang_dst': 'matlab',
                            'matlab_running': True})
        kws2label = lambda x: '%s to %s' % (x['lang_src'], x['lang_dst'])  # noqa: E731
        yscale = 'linear'  # was log originally
    elif compare == 'platform':
        color_var = 'platform'
        color_map = {'Linux': 'b', 'Windows': 'r', 'MacOS': 'g'}
        style_var = None
        style_map = None
        var_list = compare_values
        var_kws = [{color_var: k} for k in var_list]
        kws2label = lambda x: x[color_var]  # noqa: E731
        yscale = 'linear'
    elif compare == 'python_ver':
        color_var = 'python_ver'
        color_map = {'2.7': 'b', '3.4': 'g', '3.5': 'orange', '3.6': 'r',
                     '3.7': 'm'}
        style_var = 'lang_src'
        style_map = {'python': '-', 'matlab': '-.', 'c': '--', 'cpp': ':'}
        var_list = compare_values
        var_kws = [{color_var: k} for k in var_list]
        # Also compare each Python version against C models when available.
        if 'c' in _lang_list:
            for k in var_list:
                var_kws.append({color_var: k, 'lang_src': 'c', 'lang_dst': 'c'})
        kws2label = lambda x: '%s (%s)' % (x[color_var], x[style_var])  # noqa: E731
        yscale = 'linear'
    else:
        raise ValueError("Invalid compare: '%s'" % compare)
    assert len(var_kws) > 0
    # Raise error if any of the varied keys are set in kwargs
    for k in var_kws[0].keys():
        if k in kwargs:
            raise RuntimeError("Cannot set variable '%s' when comparing '%s' " % (
                k, compare))
    # Create plotfile name with information in it
    if plotfile is None:
        plotbase = 'compare_%s_%s' % (test_name, compare.replace('_', ''))
        # Encode every fixed (non-compared) variable value in the name.
        for k in sorted(default_vars.keys()):
            v = kwargs.get(k, default_vars[k])
            if k not in var_kws[0]:
                plotbase += '_%s' % v.replace('.', '')
        plotbase += '_%s' % kwargs.get('time_method', 'average')
        if per_message:
            plotbase += '_per_message'
        plotfile = os.path.join(os.getcwd(), plotbase + '.png')
    # Iterate over variables
    axs = None  # shared axes; each iteration draws one line onto them
    fits = {}
    for kws in var_kws:
        # Fill in the fixed defaults for any key this line does not vary.
        for k, v in default_vars.items():
            if k not in kws:
                kws[k] = v
        label = kws2label(kws)
        clr = 'b'
        sty = '-'
        if color_map is not None:
            clr = color_map[kws[color_var]]
        if style_map is not None:
            sty = style_map[kws[style_var]]
        plot_kws = {'color': clr, 'linestyle': sty, 'linewidth': _linewidth}
        # User kwargs override the per-line/default values (in place).
        kws.update(kwargs)
        # Ensure no stray Matlab processes skew the timings.
        if MatlabModelDriver.is_matlab_running():  # pragma: debug
            MatlabModelDriver.kill_all()
            assert not MatlabModelDriver.is_matlab_running()
        if ((kws.get('matlab_running', False)
             and MatlabModelDriver._matlab_engine_installed)):  # pragma: matlab
            # Pre-start one engine per Matlab model so the run is timed
            # against existing sessions rather than engine startup.
            nml = 0
            for k in ['lang_src', 'lang_dst']:
                if kws[k] == 'matlab':
                    nml += 1
            ml_sessions = []
            for i in range(nml):
                ml_sessions.append(MatlabModelDriver.start_matlab_engine())
            label += ' (Existing)'
            plot_kws['color'] = 'orange'
        axs, fit = TimedRun.class_plot(test_name=test_name, axs=axs, label=label,
                                       yscale=yscale, plot_kws=plot_kws, **kws)
        fits[label] = fit
        if ((kws.get('matlab_running', False)
             and MatlabModelDriver._matlab_engine_installed)):  # pragma: matlab
            # Tear down the pre-started engines from the block above.
            for v in ml_sessions:
                MatlabModelDriver.stop_matlab_engine(*v)
            assert not MatlabModelDriver.is_matlab_running()
    # Print a table
    print('%-20s\t%-20s\t%-20s' % ('Label', 'Time per Message (s)', 'Overhead (s)'))
    print('%-20s\t%-20s\t%-20s' % (20 * '=', 20 * '=', 20 * '='))
    fmt_row = '%-20s\t%-20.5f\t%-20.5f'
    for k in sorted(fits.keys()):
        v = fits[k]
        print(fmt_row % (k, v[0], v[1]))
    # Save plot
    if plt is not None:
        plt.savefig(plotfile, dpi=600)
    logger.info('plotfile: %s', plotfile)
    if cleanup_plot:
        os.remove(plotfile)
    return plotfile
def pyperfjson_to_pandas(json_file):
    r"""Convert pyperf benchmarks json file to a Pandas data frame.

    Args:
        json_file (str): Full path to the JSON benchmarks file that should be
            added.

    Returns:
        pandas.DataFrame: Data frame version of pyperf benchmarks.

    """
    # Load benchmarks
    suite = pyperf.BenchmarkSuite.load(json_file)
    meta = copy.deepcopy(suite.get_metadata())
    columns = None
    # One data-frame row per recorded value in every benchmark; the
    # benchmark name encodes its parameters in parentheses.
    for bench_name in suite.get_benchmark_names():
        meta['test_name'], params = bench_name.split('(')
        fields = params.split(')')[0].split(',')
        meta['communication_type'] = fields[2]
        meta['language_src'] = fields[3]
        meta['language_dst'] = fields[4]
        meta['message_count'] = int(float(fields[5]))
        meta['message_size'] = int(float(fields[6]))
        for t in suite.get_benchmark(bench_name).get_values():
            meta['execution_time'] = t
            if columns is None:
                # First row establishes the column set.
                columns = {mk: [mv] for mk, mv in meta.items()}
            else:
                for mk, mv in meta.items():
                    columns[mk].append(mv)
    return pd.DataFrame(columns)