from collections import OrderedDict
import shutil
from yggdrasil.components import import_component
from yggdrasil.drivers.ModelDriver import ModelDriver
from yggdrasil.multitasking import MPI, MPIRequestWrapper
class MPIPartnerModel(ModelDriver):
    r"""Class for shadowing a model run on another MPI process.

    An instance of this driver lives on the root process and stands in for
    a model that is actually executed on a partner MPI rank. It does not
    run anything itself (see ``run_model``); instead it exchanges
    START/STOP messages with the partner over MPI and uses the partner
    driver class named in ``yml['partner_driver']`` for partner-specific
    initialization and cleanup hooks.
    """
    _schema_subtype_description = ('Model is being run on another MPI '
                                   'process and this driver is used as '
                                   'a stand-in to monitor it on the root '
                                   'process.')
    executable_type = 'other'
    language = 'mpi'
    full_language = False
    base_languages = []
    language_ext = []
    comms_implicit = True

    def __init__(self, *args, **kwargs):
        # The 'function' keyword is discarded before delegating to
        # ModelDriver; the wrapped function (if any) belongs to the partner
        # model running on the other rank.
        kwargs.pop('function', None)
        super(MPIPartnerModel, self).__init__(*args, **kwargs)
        # Resolve the driver class that the partner model uses so its
        # MPI partner hooks can be invoked from this process.
        self.partner_driver = import_component('model',
                                               self.yml['partner_driver'],
                                               without_schema=True)
        self.partner_driver.mpi_partner_init(self)

    def cleanup(self, *args, **kwargs):
        r"""Run the partner driver's MPI cleanup hook, then the base cleanup.

        Args:
            *args: Arguments are passed to the parent class's cleanup.
            **kwargs: Keyword arguments are passed to the parent class's
                cleanup.

        """
        self.partner_driver.mpi_partner_cleanup(self)
        super(MPIPartnerModel, self).cleanup(*args, **kwargs)

    @classmethod
    def is_language_installed(cls):
        r"""Determine if this model driver is installed on the current
        machine.

        Returns:
            bool: True if the MPI interface imported from
                yggdrasil.multitasking is available (i.e. not None).

        """
        return MPI is not None

    @classmethod
    def configuration_steps(cls):
        r"""Get a list of configuration steps with tuples of flags and
        boolean values.

        Returns:
            OrderedDict: Pairs of descriptions and states for
                different steps in the configuration; all steps must be
                True for the language to be configured. This driver has
                no configuration steps, so the result is always empty.

        """
        return OrderedDict()

    @classmethod
    def is_comm_installed(cls, **kwargs):
        r"""Determine if a comm is installed for the associated programming
        language.

        Args:
            **kwargs: Keyword arguments are ignored.

        Returns:
            bool: Always True for this driver.

        """
        return True

    @classmethod
    def language_version(cls, **kwargs):
        r"""Determine the version of this language.

        Args:
            **kwargs: Keyword arguments are ignored by this implementation.

        Returns:
            str: 'mpi4py <version>' for the installed mpi4py package.

        Raises:
            ImportError: If mpi4py cannot be imported.

        """
        import mpi4py
        return 'mpi4py %s' % mpi4py.__version__

    @classmethod
    def language_executable(cls, **kwargs):
        r"""Command required to compile/run a model written in this language
        from the command line.

        Args:
            **kwargs: Keyword arguments are ignored by this implementation.

        Returns:
            str or None: Full path to the ``mpiexec`` executable found on
                PATH, or None if it cannot be found.

        """
        return shutil.which('mpiexec')

    def run_model(self, **kwargs):
        r"""Dummy stand-in for ModelDriver run_model method.

        No process is started on this rank; the model itself runs on the
        partner MPI process.

        Returns:
            None

        """
        return None

    def before_start(self, **kwargs):
        r"""Actions to perform before the run starts.

        Forces ``no_queue_thread=True`` (overriding any value supplied by
        the caller) before delegating to the parent class.
        """
        kwargs['no_queue_thread'] = True
        super(MPIPartnerModel, self).before_start(**kwargs)

    # def init_mpi_env(self):
    #     r"""Send env information to the partner model."""
    #     env = copy.deepcopy(self.env)
    #     env.update(self.get_io_env())
    #     self.send_mpi(env, tag=self._mpi_tags['ENV'])

    def init_mpi(self):
        r"""Initialize MPI communication with the partner process.

        Sends a 'START' message and posts a non-blocking receive for the
        partner's STOP_RANK0 message, stored as the 'stopped' request so
        other methods can poll or wait on it.
        """
        self.send_mpi('START', tag=self._mpi_tags['START'])
        self._mpi_requests['stopped'] = MPIRequestWrapper(
            self.recv_mpi(tag=self._mpi_tags['STOP_RANK0'], dont_block=True))

    def stop_mpi_partner(self, **kwargs):
        r"""Send a message to stop the MPI partner model.

        The destination, tag, and message are always set here (overriding
        any caller-supplied values): the message goes to the partner rank
        with the STOP_RANKX tag and carries the number of messages this
        driver has sent.

        Args:
            **kwargs: Additional keyword arguments are passed to the parent
                class's stop_mpi_partner.

        """
        kwargs.update(dest=self._mpi_partner_rank,
                      tag=self._mpi_tags['STOP_RANKX'],
                      msg=self.n_sent_messages)
        super(MPIPartnerModel, self).stop_mpi_partner(**kwargs)

    def run_loop(self):
        r"""Loop to check if model is still running.

        Sets the break flag once the partner's 'stopped' request has
        completed; otherwise sleeps before the next iteration.
        """
        if self.check_mpi_request('stopped'):
            self.set_break_flag()
        else:
            self.sleep()

    def graceful_stop(self):
        r"""Gracefully stop the driver.

        If any messages have been sent, waits (timeout=10) on the partner's
        'stopped' request before setting the break flag and delegating to
        the parent class.
        """
        if self.has_sent_messages:
            self.wait_on_mpi_request('stopped', timeout=10)
        self.set_break_flag()
        super(MPIPartnerModel, self).graceful_stop()

    def kill_process(self):
        r"""Kill the process running the model, checking return code.

        Sets the break flag and, if the partner has not reported stopping,
        marks the 'stopped' request as completed before delegating to the
        parent class.
        """
        self.set_break_flag()
        if not self.model_process_complete:  # pragma: debug
            # Force the pending stop request to read as finished so later
            # checks do not keep waiting on a partner that is being killed.
            self._mpi_requests['stopped'].completed = True
        super(MPIPartnerModel, self).kill_process()

    @property
    def model_process_complete(self):
        r"""bool: True once the 'stopped' MPI request from the partner
        process has completed (as reported by check_mpi_request).

        NOTE(review): the behavior before init_mpi has posted the 'stopped'
        request depends on check_mpi_request, which is defined elsewhere —
        confirm whether this returns True when the partner has not started.
        """
        return self.check_mpi_request('stopped')