Source code for cis_interface.drivers.ModelDriver

import os
import sys
import copy
import warnings
from pprint import pformat
from cis_interface import platform, tools
from cis_interface.drivers.Driver import Driver
from threading import Event
try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty  # python 3.x
from cis_interface.schema import register_component


[docs]@register_component class ModelDriver(Driver): r"""Base class for Model drivers and for running executable based models. Args: name (str): Driver name. args (str or list): Argument(s) for running the model on the command line. This should be a complete command including the necessary executable and command line arguments to that executable. is_server (bool, optional): If True, the model is assumed to be a server and an instance of :class:`cis_interface.drivers.ServerDriver` is started. Defaults to False. client_of (str, list, optional): The names of ne or more servers that this model is a client of. Defaults to empty list. with_strace (bool, optional): If True, the command is run with strace (on Linux) or dtrace (on MacOS). Defaults to False. strace_flags (list, optional): Flags to pass to strace (or dtrace). Defaults to []. with_valgrind (bool, optional): If True, the command is run with valgrind. Defaults to False. valgrind_flags (list, optional): Flags to pass to valgrind. Defaults to []. model_index (int, optional): Index of model in list of models being run. Defaults to 0. **kwargs: Additional keyword arguments are passed to parent class. Attributes: args (list): Argument(s) for running the model on the command line. process (:class:`cis_interface.tools.CisPopen`): Process used to run the model. is_server (bool): If True, the model is assumed to be a server and an instance of :class:`cis_interface.drivers.ServerDriver` is started. client_of (list): The names of server models that this model is a client of. with_strace (bool): If True, the command is run with strace or dtrace. strace_flags (list): Flags to pass to strace/dtrace. with_valgrind (bool): If True, the command is run with valgrind. valgrind_flags (list): Flags to pass to valgrind. model_index (int): Index of model in list of models being run. Raises: RuntimeError: If both with_strace and with_valgrind are True. """ _language = 'executable' _schema_type = 'model' _schema_required = ['name', 'language', 'args', 'working_dir'] _schema_properties = { 'name': {'type': 'string'}, 'language': {'type': 'string'}, 'args': {'type': 'array', 'items': {'type': 'string'}}, 'inputs': {'type': 'array', 'default': [], 'items': {'$ref': '#/definitions/comm'}}, 'outputs': {'type': 'array', 'default': [], 'items': {'$ref': '#/definitions/comm'}}, 'working_dir': {'type': 'string'}, 'is_server': {'type': 'boolean', 'default': False}, 'client_of': {'type': 'array', 'items': {'type': 'string'}, 'default': []}, 'with_strace': {'type': 'boolean', 'default': False}, 'strace_flags': {'type': 'array', 'default': [], 'items': {'type': 'string'}}, 'with_valgrind': {'type': 'boolean', 'default': False}, 'valgrind_flags': {'type': 'array', 'default': ['--leak-check=full'], # '-v' 'items': {'type': 'string'}}} def __init__(self, name, args, model_index=0, **kwargs): for k, v in self._schema_properties.items(): if k in ['name', 'language', 'args', 'inputs', 'outputs', 'working_dir']: continue default = v.get('default', None) setattr(self, k, kwargs.pop(k, default)) super(ModelDriver, self).__init__(name, **kwargs) self.debug(str(args)) if not isinstance(args, list): args = [args] self.args = [] for a in args: self.args.append(str(a)) self.model_process = None self.queue = Queue() self.queue_thread = None self.event_process_kill_called = Event() self.event_process_kill_complete = Event() # Strace/valgrind if self.with_strace and self.with_valgrind: raise RuntimeError("Trying to run with strace and valgrind.") if (((self.with_strace or self.with_valgrind) and platform._is_win)): # pragma: windows raise RuntimeError("strace/valgrind options invalid on windows.") self.model_index = model_index self.env_copy = ['LANG', 'PATH', 'USER'] self._exit_line = b'EXIT' # print(os.environ.keys()) for k in self.env_copy: if k in os.environ: self.env[k] = os.environ[k]
[docs] @classmethod def is_installed(self): r"""Determine if this model driver is installed on the current machine. Returns: bool: Truth of if this model driver can be run on the current machine. """ return False
[docs] def set_env(self): env = copy.deepcopy(self.env) env.update(os.environ) env['CIS_SUBPROCESS'] = "True" env['CIS_MODEL_INDEX'] = str(self.model_index) return env
[docs] def before_start(self): r"""Actions to perform before the run starts.""" env = self.set_env() pre_args = [] if self.with_strace: if platform._is_linux: pre_cmd = 'strace' elif platform._is_mac: pre_cmd = 'dtrace' pre_args += [pre_cmd] + self.strace_flags elif self.with_valgrind: pre_args += ['valgrind'] + self.valgrind_flags # print(pre_args + self.args) self.model_process = tools.CisPopen(pre_args + self.args, env=env, cwd=self.working_dir, forward_signals=False, shell=platform._is_win) # Start thread to queue output self.queue_thread = tools.CisThreadLoop(target=self.enqueue_output_loop, name=self.name + '.EnqueueLoop') self.queue_thread.start()
[docs] def enqueue_output_loop(self): r"""Keep passing lines to queue.""" # if self.model_process_complete: # self.debug("Process complete") # self.queue_thread.set_break_flag() # self.queue.put(self._exit_line) # return try: line = self.model_process.stdout.readline() except BaseException as e: # pragma: debug print(e) line = "" if len(line) == 0: # self.info("%s: Empty line from stdout" % self.name) self.queue_thread.set_break_flag() self.queue.put(self._exit_line) self.debug("End of model output") try: self.model_process.stdout.close() except BaseException: # pragma: debug pass else: try: self.queue.put(line.decode('utf-8')) except BaseException as e: # pragma: debug warnings.warn("Error in printing output: %s" % e)
[docs] def before_loop(self): r"""Actions before loop.""" self.debug('Running %s from %s with cwd %s and env %s', self.args, os.getcwd(), self.working_dir, pformat(self.env))
[docs] def run_loop(self): r"""Loop to check if model is still running and forward output.""" # Continue reading until there is not any output try: line = self.queue.get_nowait() except Empty: # if self.queue_thread.was_break: # self.debug("No more output") # self.set_break_flag() # This sleep is necessary to allow changes in queue without lock self.sleep() return else: if (line == self._exit_line): self.debug("No more output") self.set_break_flag() else: self.print_encoded(line, end="") sys.stdout.flush()
[docs] def after_loop(self): r"""Actions to perform after run_loop has finished. Mainly checking if there was an error and then handling it.""" self.debug('') if self.queue_thread is not None: self.queue_thread.join(self.sleeptime) if self.queue_thread.is_alive(): self.info("Queue thread still alive") # Loop was broken from outside, kill the queueing thread self.kill_process() # self.queue_thread.set_break_flag() # try: # self.model_process.stdout.close() # except BaseException: # pragma: debug # self.error("Close during concurrent operation") return self.wait_process(self.timeout, key_suffix='.after_loop') self.kill_process()
@property def model_process_complete(self): r"""bool: Has the process finished or not. Returns True if the process has not started.""" if self.model_process is None: # pragma: debug return True return (self.model_process.poll() is not None)
[docs] def wait_process(self, timeout=None, key=None, key_suffix=None): r"""Wait for some amount of time for the process to finish. Args: timeout (float, optional): Time (in seconds) that should be waited. Defaults to None and is infinite. key (str, optional): Key that should be used to register the timeout. Defaults to None and set based on the stack trace. Returns: bool: True if the process completed. False otherwise. """ if not self.was_started: # pragma: debug return True T = self.start_timeout(timeout, key_level=1, key=key, key_suffix=key_suffix) while ((not T.is_out) and (not self.model_process_complete)): # pragma: debug self.sleep() self.stop_timeout(key_level=1, key=key, key_suffix=key_suffix) return self.model_process_complete
[docs] def kill_process(self): r"""Kill the process running the model, checking return code.""" if not self.was_started: # pragma: debug self.debug('Process was never started.') self.set_break_flag() self.event_process_kill_called.set() self.event_process_kill_complete.set() if self.event_process_kill_called.is_set(): # pragma: debug self.debug('Process has already been killed.') return self.event_process_kill_called.set() with self.lock: self.debug('') if not self.model_process_complete: # pragma: debug self.error("Process is still running. Killing it.") try: self.model_process.kill() self.debug("Waiting %f s for process to be killed", self.timeout) self.wait_process(self.timeout, key_suffix='.kill_process') except BaseException: # pragma: debug self.exception("Error killing model process") assert(self.model_process_complete) if self.model_process.returncode != 0: self.error("return code of %s indicates model error.", str(self.model_process.returncode)) self.event_process_kill_complete.set() if self.queue_thread is not None: if not self.was_break: # pragma: debug # Wait for messages to be printed self.debug("Waiting for queue_thread to finish up.") self.queue_thread.wait(self.timeout) if self.queue_thread.is_alive(): # pragma: debug self.debug("Setting break flag for queue_thread to finish up.") self.queue_thread.set_break_flag() self.queue_thread.wait(self.timeout) try: self.model_process.stdout.close() self.queue_thread.wait(self.timeout) except BaseException: # pragma: debug self.exception("Closed during concurrent action") if self.queue_thread.is_alive(): # pragma: debug self.error("Queue thread was not terminated.")
[docs] def graceful_stop(self): r"""Gracefully stop the driver.""" self.debug('') self.wait_process(self.timeout, key_suffix='.graceful_stop') super(ModelDriver, self).graceful_stop()
# def do_terminate(self): # r"""Terminate the process running the model.""" # self.debug('') # self.kill_process() # super(ModelDriver, self).do_terminate()