Source code for yggdrasil.drivers.Driver

import os
from yggdrasil import multitasking

class Driver(multitasking.YggTaskLoop):
    r"""Base class for all drivers.

    Args:
        name (str): Driver name.
        yml (dict, optional): Dictionary of yaml specification options for
            this driver. Defaults to an empty dict.
        env (dict, optional): Dictionary of environment variables that
            should be set when the driver starts. Defaults to {}.
        comm_env (dict, optional): Dictionary of environment variables for
            paired IO communication drivers. Defaults to {}.
        namespace (str, optional): Namespace for set of drivers running
            together. If not provided, the config option
            ('rmq', 'namespace') is used.
        rank (int, optional): Rank of the integration. Defaults to None.
        **kwargs: Additional keyword arguments are passed to the parent
            class constructor.

    Attributes:
        name (str): Driver name.
        env (dict): Dictionary of environment variables.
        comm_env (dict): Dictionary of environment variables for paired IO
            communication drivers.
        yml (dict): Dictionary of yaml specification options for this
            driver.
        namespace (str): Namespace for set of drivers running together.
        rank (int): Rank of the integration.
        working_dir (str): Working directory. Assigned by this class only
            when the 'working_dir' keyword is absent or None, in which
            case it is yml['working_dir'] if yml is a dict containing that
            key and the current working directory otherwise.

    Raises:
        Exception: If the instance already has a true
            '_thread_initialized' attribute when __init__ runs.

    """

    # =====================================================================
    # The methods below may be overridden by child classes to add driver
    # functionality. Overrides must call super at the beginning, and all
    # child classes must keep compatible formats (the same named
    # arguments).

    def __init__(self, name, yml=None, env=None, comm_env=None,
                 namespace=None, rank=None, **kwargs):
        # Fresh containers per instance rather than shared defaults
        yml = {} if yml is None else yml
        env = {} if env is None else env
        comm_env = {} if comm_env is None else comm_env
        # Drivers using multiple inheritance may reach this constructor
        # more than once; refuse to initialize the thread a second time.
        already_initialized = getattr(self, '_thread_initialized', False)
        if already_initialized:  # pragma: debug
            raise Exception("Thread already initialized. "
                            "Check multiple inheritance")
        super(Driver, self).__init__(name, **kwargs)
        self._thread_initialized = True
        self.debug('')
        # Fall back on the configured namespace; the config module is only
        # imported when it is actually needed.
        if namespace is None:
            from yggdrasil.config import ygg_cfg
            namespace = ygg_cfg.get('rmq', 'namespace')
            self.debug("Setting namespace to %s", namespace)
        # An explicit, non-None 'working_dir' keyword was already forwarded
        # to the parent constructor above, so only fill in the fallback.
        if kwargs.get('working_dir') is None:
            yml_has_dir = isinstance(yml, dict) and ('working_dir' in yml)
            self.working_dir = (yml['working_dir'] if yml_has_dir
                                else os.getcwd())
        # Store the remaining state on the instance
        self.yml = yml
        self.env = env
        self.comm_env = comm_env
        self.namespace = namespace
        self.rank = rank
        # Name of the method that ended the driver; stop() changes it
        self._term_meth = "terminate"

    @property
    def is_valid(self):
        r"""bool: True if the driver is functional."""
        return True

    def stop(self):
        r"""Stop the driver.

        Records 'stop' as the termination method, then calls graceful_stop
        followed by terminate. Only logs a message if the driver was
        already terminated.

        """
        if not self.was_terminated:
            self.debug('')
            self._term_meth = 'stop'
            self.graceful_stop()
            self.terminate()
        else:
            self.debug('Driver already terminated.')

    def graceful_stop(self):
        r"""Gracefully stop the driver.

        The base implementation only emits a debug message.

        """
        self.debug('')

    def do_terminate(self):
        r"""Actions that should stop the driver.

        The base implementation only emits a debug message.

        """
        self.debug('Returning')

    def terminate(self):
        r"""Stop the driver, without attempting to allow it to finish.

        Runs do_terminate and then the parent class terminate method. Only
        logs a message if the driver was already terminated.

        """
        if not self.was_terminated:
            self.do_terminate()
            self.debug('')
            super(Driver, self).terminate()
            self.debug('Returning')
        else:
            self.debug('Driver already terminated.')