Source code for yggdrasil.drivers.ModelDriver

import os
import re
import sys
import copy
import logging
import warnings
import subprocess
import shutil
import uuid
import tempfile
import asyncio
from collections import OrderedDict
from pprint import pformat
from yggdrasil import (
    platform, tools, languages, multitasking, constants, rapidjson)
from yggdrasil.components import import_component
from yggdrasil.drivers.Driver import Driver
from queue import Empty
logger = logging.getLogger(__name__)

_map_language_ext = OrderedDict()

[docs]def remove_product(product, check_for_source=False, **kwargs): r"""Delete a single product after checking that the product is not (or does not contain, in the case of directories), source files. Args: product (str): Full path to a file or directory that should be removed. check_for_source (bool, optional): If True, the specified product will be checked to ensure that no source files are present. If a source file is present, a RuntimeError will be raised. Defaults to False. **kwargs: Additional keyword arguments are passed to tools.remove_path. Raises: RuntimeError: If the specified product is a source file and check_for_source is False. RuntimeError: If the specified product is a directory that contains a source file and check_for_source is False. RuntimeError: If the product cannot be removed. """ tools.import_all_modules('yggdrasil.drivers') source_keys = list(_map_language_ext.keys()) if '.exe' in source_keys: # pragma: windows source_keys.remove('.exe') if check_for_source: if os.path.isdir(product): ext_tuple = tuple(source_keys) for root, dirs, files in os.walk(product): for f in files: if f.endswith(ext_tuple): raise RuntimeError(("%s contains a source file " "(%s)") % (product, f)) elif os.path.isfile(product): ext = os.path.splitext(product)[-1] if ext in source_keys: raise RuntimeError("%s is a source file." % product) tools.remove_path(product, **kwargs)
[docs]def remove_products(products, source_products): r"""Delete products produced during the process of running the model. Args: products (list): List of products that should be removed after checking that they are not source files. source_products (list): List of products that should be removed without checking that they are not source files. """ for p in source_products: remove_product(p) for p in products: remove_product(p, check_for_source=True)
[docs]class ModelDriver(Driver): r"""Base class for Model drivers and for running executable based models. Args: name (str): Unique name used to identify the model. This will be used to report errors associated with the model. args (str or list): The path to the file containing the model program that will be run by the driver for the model's language and/or a list of arguments that should be passed as input to the model program or language executable (e.g. source code or configuration file for a domain specific language). products (list, optional): Paths to files created by the model that should be cleaned up when the model exits. Entries can be absolute paths or paths relative to the working directory. Defaults to []. function (str, optional): If provided, an integrated model is created by wrapping the function named here. The function must be located within the file specified by the source file listed in the first argument. If not provided, the model must contain it's own calls to the |yggdrasil| interface. iter_function_over (array, optional): Variable(s) that should be received or sent as an array, but iterated over. Defaults to an empty array and is ignored. source_products (list, optional): Files created by running the model that are source files. These files will be removed without checking their extension so users should avoid adding files to this list unless they are sure they should be deleted. Defaults to []. is_server (bool, dict, optional): If `True`, the model is assumed to be a server for one or more client models and an instance of :class:`yggdrasil.drivers.ServerDriver` is started. The corresponding channel that should be passed to the yggdrasil API will be the name of the model. If is_server is a dictionary, it should contain an 'input' key and an 'output' key. These are required to be the names of existing input and output channels in the model that will be co-opted by the server. (Note: This requires that the co-opted output channel's send method is called once for each time the co-opted input channel's recv method is called. If used with the `function` parameter, `is_server` must be a dictionary. Defaults to False. client_of (str, list, optional): The names of one or more models that this model will call as a server. If there are more than one, this should be specified as a sequence collection (list). The corresponding channel(s) that should be passed to the yggdrasil API will be the name of the server model joined with the name of the client model with an underscore `<server_model>_<client_model>`. There will be one channel created for each server the model is a client of. Defaults to empty list. Use of `client_of` with `function` is not currently supported. timesync (bool, str, optional): If set, the model is assumed to call a send then receive of the state at each timestep for syncronization with other models that are also integrating in time. If a string is provided, it is assumed to be the name of the server that will handle timestep synchronization. If a boolean is provided, the name of the server will be assumed to be 'timestep'. Defaults to False. overwrite (bool, optional): If True, any existing model products (compilation products, wrapper scripts, etc.) are removed prior to the run. If False, the products are not removed. Defaults to True. Setting this to False can improve the performance, particularly for models that take a long time to compile, but this should only be done once the model has been fully debugged to ensure that each run is tested on a clean copy of the model. The value of this keyword also determines whether or not products are removed after a run. preserve_cache (bool, optional): If True model products will be kept following the run, otherwise all products will be cleaned up. Defaults to False. This keyword is superceeded by overwrite. with_strace (bool, optional): If True, the command is run with strace (on Linux) or dtrace (on MacOS). Defaults to False. strace_flags (list, optional): Flags to pass to strace (or dtrace). Defaults to []. with_valgrind (bool, optional): If True, the command is run with valgrind. Defaults to False. valgrind_flags (list, optional): Flags to pass to valgrind. Defaults to []. model_index (int, optional): Index of model in list of models being run. Defaults to 0. copy_index (int, optional): Index of model in set of copies. Defaults to -1 indicating there is only one copy of the model. outputs_in_inputs (bool, optional): If True, outputs from wrapped model functions are passed by pointer as inputs for modification and the return value will be a flag. If False, outputs are limited to return values. Defaults to the value of the class attribute outputs_in_inputs. logging_level (str, optional): The level of logging messages that should be displayed by the model. Defaults to the logging level as determined by the configuration file and environment variables. allow_threading (bool, optional): If True, comm connections will be set up so that the model-side comms can be used by more than one thread. Defaults to False. copies (int, optional): The number of copies of the model that should be created. Defaults to 1. repository_url (str, optional): URL for the git repository containing the model source code. If provided, relative paths in the model YAML definition will be considered relative to the repository root directory. repository_commit (str, optional): Commit that should be checked out in the model repository specified by repository_url. If not provided, the most recent commit on the default branch will be used. description (str, optional): Description of the model. This parameter is only used in the model repository or when providing the model as a service. contact_email (str, optional): Email address that should be used to contact the maintainer of the model. This parameter is only used in the model repository. validation_command (str, optional): Path to a validation command that can be used to verify that the model ran as expected. A non-zero return code is taken to indicate failure. dependencies (list, optional): A list of packages required by the model that are written in the same language as the model. If the package requires dependencies outside the language of the model. use the additional_dependencies parameter to provide them. If you need a version of the package from a specific package manager, a mapping with 'package' and 'package_manager' fields can be provided instead of just the name of the package. additional_dependencies (dict, optional): A mapping between languages and lists of packages in those languages that are required by the model. with_debugger (str, optional): Debugger tool that should be used to run models. This string should include the tool executable and any flags that should be passed to it. **kwargs: Additional keyword arguments are passed to parent class. Class Attributes: language (str): Primary name for the programming language that this compiler should be used for. [REQUIRED] language_aliases (list): Additional/alternative names that the language may be known by. language_ext (list): Extensions for programs written in the target language. [REQUIRED] base_languages (list): Other programming languages that this driver and the interpreter for the target language are dependent on (e.g. Matlab models require Python). executable_type (str): 'compiler' or 'interpreter' to indicate the type of the executable for the language. [AUTOMATED] interface_library (list): Name of the library containing the yggdrasil interface for the target language. [REQUIRED] interface_directories (list): Directories containing code in the interface library for the target language. interface_dependencies (list): List of names of libraries that are required to use the interface on the current platform. This dosn't include libraries required by specific communication types which are described by supported_comm_options. supported_comms (list): Name of comms supported in the target language. [REQUIRED] supported_comm_options (dict): Options for the supported comms like the platforms they are available on and the external libraries required to use them. [REQUIRED] external_libraries (dict): Information on external libraries required for running models in the target language using yggdrasil. internal_libraries (dict): Information on internal libraries required for running models in the target language using yggdrasil. type_map (dict): Mapping of |yggdrasil| extended JSON types to datatypes in the target programming language. [REQUIRED] function_param (dict): Options specifying how different operations would be encoded in the target language (e.g. if statements, for loops, while loops). [REQUIRED] version_flags (list): Flags that should be called with the language executable to determine the version of the compiler/interpreter. Defaults to ['--version']. outputs_in_inputs (bool): If True, outputs are passed by pointer as inputs for modification and the return value should be a flag. Defaults to False. include_arg_count (bool): If True, the number of arguments passed to send/recv calls is prepended to the arguments to the function. Defaults to False. include_channel_obj (bool): If True, the channel object is passed as input to the send/recv calls (after the argument count if it is also present due to include_arg_count being True). Defaults to False. is_typed (bool): True if the language is typed, False otherwise. brackets (tuple): A pair of opening and clossing characters that are used by the language to mark blocks. Set to None and ignored by default. no_executable (bool): True if there is not an executable associated with the language driver. Defaults to False. comms_implicit (bool): True if the comms installed for this driver are not explicitly defined (depend on input parameters). Defaults to False. Attributes: args (list): Argument(s) for running the model on the command line. model_file (str): Full path to the model executable or interpretable script. model_args (list): Runtime arguments for running the model on the command line. model_src (str): Full path to the model source code. For interpreted languages, this will be the same as model_file. model_function_info (dict): Parameters recovered by parsing the provided model function definition. overwrite (bool): If True, any existing compilation products will be overwritten by compilation and cleaned up following the run. Otherwise, existing products will be used and will remain after the run. products (list): Files created by running the model. This includes compilation products such as executables and/or object files. source_products (list): Files created by running the model that are source files. These files will be removed without checking their extension so users should avoid adding files to this list unless they are sure they should be deleted. wrapper_products (list): Files created in order to wrap the model. process (:class:``): Process used to run the model. function (str): The name of the model function that should be wrapped. iter_function_over (array): Variable(s) that should be received or sent as an array, but iterated over. is_server (bool, dict): If True, the model is assumed to be a server and an instance of :class:`yggdrasil.drivers.ServerDriver` is started. If a dict, the input/output channels with the specified names in the dict will be replaced with a server. client_of (list): The names of server models that this model is a client of. timesync (str): If set, the name of the server performing timestep synchronization for the model. with_strace (bool): If True, the command is run with strace or dtrace. strace_flags (list): Flags to pass to strace/dtrace. with_valgrind (bool): If True, the command is run with valgrind. valgrind_flags (list): Flags to pass to valgrind. model_index (int): Index of model in list of models being run. copy_index (int): Index of model in set of copies. modified_files (list): List of pairs of originals and copies of files that should be restored during cleanup. allow_threading (bool): If True, comm connections will be set up so that the model-side comms can be used by more than one thread. copies (int): The number of copies of the model that should be created. repository_url (str): URL for the git repository containing the model source code. If provided, relative paths in the model YAML definition will be considered relative to the repository root directory. repository_commit (str): Commit that should be checked out in the model repository specified by repository_url. description (str): Description of the model. This parameter is only used in the model repository or when providing the model as a service. contact_email (str): Email address that should be used to contact the maintainer of the model. This parameter is only used in the model repository. validation_command (str): Path to a validation command that can be used to verify that the model ran as expected. A non-zero return code is taken to indicate failure. dependencies (list): A list of packages required by the model that are written in the same language as the model. If the package requires dependencies outside the language of the model, use the additional_dependencies parameter to provide them. If you need a version of the package from a specific package manager, a mapping with 'package' and 'package_manager' fields can be provided instead of just the name of the package. additional_dependencies (dict): A mapping between languages and lists of packages in those languages that are required by the model. Raises: RuntimeError: If both with_strace and with_valgrind are True. """ _schema_type = 'model' _schema_subtype_key = 'language' _schema_required = ['name', 'language', 'args', 'working_dir', 'inputs', 'outputs'] _schema_properties = { 'name': {'type': 'string'}, 'language': {'type': 'string', 'default': 'executable', 'description': ( 'The programming language that the model ' 'is written in. A list of available ' 'languages can be found :ref:`here <' 'schema_table_model_subtype_rst>`.')}, 'args': {'type': 'array', 'items': {'type': ['string', 'number']}, 'allowSingular': True}, 'inputs': {'type': 'array', 'default': [{'name': 'input', 'commtype': 'default', 'datatype': constants.DEFAULT_DATATYPE, 'is_default': True}], 'items': {'anyOf': [{'$ref': '#/definitions/comm'}, {'$ref': '#/definitions/comm_driver'}, {'$ref': '#/definitions/file_driver'}]}, 'allowSingular': True, 'aliases': ['input', 'input_file', 'input_files'], 'description': ( 'Zero or more channels carrying input to the model. ' 'A full description of channel entries and the ' 'options available for channels can be found ' ':ref:`here<yaml_comm_options>`.')}, 'outputs': {'type': 'array', 'default': [{'name': 'output', 'commtype': 'default', 'datatype': constants.DEFAULT_DATATYPE, 'is_default': True}], 'items': {'anyOf': [{'$ref': '#/definitions/comm'}, {'$ref': '#/definitions/comm_driver'}, {'$ref': '#/definitions/file_driver'}]}, 'allowSingular': True, 'aliases': ['output', 'output_file', 'output_files'], 'description': ( 'Zero or more channels carrying output from the ' 'model. A full description of channel entries and ' 'the options available for channels can be found ' ':ref:`here<yaml_comm_options>`.')}, 'env': {'type': 'object', 'default': {}, 'additional_properties': {'type': 'string'}}, 'products': {'type': 'array', 'default': [], 'items': {'type': 'string'}}, 'source_products': {'type': 'array', 'default': [], 'items': {'type': 'string'}}, 'working_dir': {'type': 'string'}, 'overwrite': {'type': 'boolean'}, 'preserve_cache': {'type': 'boolean', 'default': False}, 'function': {'type': 'string'}, 'iter_function_over': {'type': 'array', 'default': [], 'items': {'type': 'string'}}, 'is_server': {'anyOf': [{'type': 'boolean'}, {'type': 'object', 'properties': {'input': {'type': 'string'}, 'output': {'type': 'string'}}, 'additionalProperties': False}], 'default': False}, 'client_of': {'type': 'array', 'items': {'type': 'string'}, 'default': [], 'allowSingular': True}, 'timesync': { 'type': 'array', 'allowSingular': True, 'items': { 'anyOf': [ {'type': 'string'}, {'type': 'boolean'}, {'type': 'object', 'required': ['name'], 'properties': { 'name': {'type': 'string', 'default': 'timesync'}, 'inputs': {'type': 'array', 'allowSingular': True, 'aliases': ['input'], 'items': {'type': 'string'}}, 'outputs': {'type': 'array', 'allowSingular': True, 'aliases': ['output'], 'items': {'type': 'string'}}}}]}, 'default': [False]}, 'with_debugger': {'type': 'string'}, 'with_strace': {'type': 'boolean', 'default': False}, 'strace_flags': {'type': 'array', 'default': ['-e', 'trace=memory'], 'items': {'type': 'string'}}, 'with_valgrind': {'type': 'boolean', 'default': False}, 'valgrind_flags': {'type': 'array', 'default': ['--leak-check=full', '--show-leak-kinds=all'], # '-v' 'items': {'type': 'string'}}, 'outputs_in_inputs': {'type': 'boolean'}, 'logging_level': {'type': 'string', 'default': ''}, 'allow_threading': {'type': 'boolean'}, 'copies': {'type': 'integer', 'default': 1, 'minimum': 1}, 'repository_url': {'type': 'string'}, 'repository_commit': {'type': 'string'}, 'description': {'type': 'string'}, 'contact_email': {'type': 'string'}, 'validation_command': {'type': 'string'}, 'dependencies': { 'type': 'array', 'items': {'oneOf': [ {'type': 'string'}, {'type': 'object', 'required': ['package'], 'properties': { 'package': {'type': 'string'}, 'package_manager': {'type': 'string'}, 'arguments': {'type': 'string'}}, 'additionalProperties': False}]}}, 'additional_dependencies': { 'type': 'object', 'additionalProperties': { 'type': 'array', 'items': {'oneOf': [ {'type': 'string'}, {'type': 'object', 'required': ['package'], 'properties': { 'package': {'type': 'string'}, 'package_manager': {'type': 'string'}, 'arguments': {'type': 'string'}}, 'additionalProperties': False}]}}}} _schema_excluded_from_class = ['name', 'language', 'args', 'working_dir'] _schema_excluded_from_class_validation = ['inputs', 'outputs'] _schema_additional_kwargs_base = { 'pushProperties': { ('$properties/inputs/items/anyOf/0/allOf/0/properties/' 'default_file'): ['working_dir'], ('$properties/outputs/items/anyOf/0/allOf/0/properties/' 'default_file'): ['working_dir'], '$properties/inputs/items/anyOf/2': ['working_dir'], '$properties/outputs/items/anyOf/2': ['working_dir']}} language = None language_ext = None language_aliases = [] base_languages = [] executable_type = None interface_library = None interface_directories = [] interface_dependencies = [] supported_comms = [] supported_comm_options = {} external_libraries = {} internal_libraries = {} type_map = None inverse_type_map = None function_param = None version_flags = ['--version'] full_language = True outputs_in_inputs = False include_arg_count = False include_channel_obj = False is_typed = False types_in_funcdef = True interface_inside_exec = False dont_declare_channel = False is_dsl = False brackets = None zero_based = True max_line_width = None no_executable = False comms_implicit = False python_interface = {'table_input': 'YggAsciiTableInput', 'table_output': 'YggAsciiTableOutput', 'array_input': 'YggArrayInput', 'array_output': 'YggArrayOutput', 'pandas_input': 'YggPandasInput', 'pandas_output': 'YggPandasOutput'} _library_cache = {} _config_keys = [] _config_attr_map = [] _executable_search_dirs = None _disconnect_attr = (Driver._disconnect_attr + ['queue', 'queue_thread', 'event_process_kill_called', 'event_process_kill_complete', 'model_process']) _mpi_tags = {'ENV': 1, 'START': 2, 'STOP_RANK0': 3, # Stopped by partner 'STOP_RANKX': 4, # Stopped by root 'BUILDFILE': 5, 'LOCK_BUILDFILE': 6, 'UNLOCK_BUILDFILE': 7} def __init__(self, name, args, model_index=0, copy_index=-1, clients=[], preparsed_function=None, outputs_in_inputs=None, mpi_rank=0, mpi_tag_start=None, **kwargs): self._inv_mpi_tags = {v: k for k, v in self._mpi_tags.items()} self.model_outputs_in_inputs = outputs_in_inputs self.preparsed_function = preparsed_function kwargs['additional_component_properties'] = {'args': args} super(ModelDriver, self).__init__(name, **kwargs) if self.overwrite is None: self.overwrite = (not self.preserve_cache) # Setup process things self.model_process = None self.queue = multitasking.Queue() self.queue_thread = None self.event_process_kill_called = multitasking.Event() self.event_process_kill_complete = multitasking.Event() # Tools if self.with_debugger == 'valgrind': self.with_debugger += ' --leak-check=full --show-leak-kinds=all' elif self.with_debugger == 'strace': self.with_debugger += ' -e trace=memory' elif self.with_debugger == 'asan': self.with_asan = True self.with_debugger = None if self.with_strace: # TODO: deprecate with_strace, strace_flags assert not (self.with_debugger or self.with_valgrind) self.with_debugger = 'strace ' + ' '.join(self.strace_flags) if self.with_valgrind: # TODO: deprecate with_valgrind, valgrind_flags assert not (self.with_debugger or self.with_strace) self.with_debugger = 'valgrind ' + ' '.join(self.valgrind_flags) if ((platform._is_win and self.with_debugger and self.with_debugger.startswith(('strace', 'valgrind')))): raise RuntimeError("strace and valgrind are not available " "on Windows") self.model_index = model_index self.copy_index = copy_index self.clients = clients self.env_copy = ['LANG', 'PATH', 'USER'] self._exit_line = b'EXIT' for k in self.env_copy: if k in os.environ: self.env[k] = os.environ[k] if not self.is_installed(): raise RuntimeError("%s is not installed" % self.language) self.raw_model_file = None self.model_function_file = None self.model_function_info = None self.model_function_inputs = None self.model_function_outputs = None self.model_file = None self.model_args = [] self.model_dir = None self.model_src = None self.args = args self.modified_files = [] self.wrapper_products = [] self._mpi_comm = False self._mpi_rank = 0 self._mpi_size = 1 self._mpi_requests = {} self._mpi_tag = (len(self._mpi_tags) * self.model_index) if mpi_tag_start is not None: self._mpi_tag += mpi_tag_start if multitasking._on_mpi: self._mpi_comm = multitasking.MPI.COMM_WORLD self._mpi_rank = self._mpi_comm.Get_rank() self._mpi_size = self._mpi_comm.Get_size() self._mpi_partner_rank = mpi_rank # Update for function if self.function: args = [self.init_from_function(args)] # Parse arguments self.debug(str(args)) self.parse_arguments(args) assert self.model_file is not None # Remove products if self.overwrite: self.remove_products() # Write wrapper if self.function: self.wrapper_products.append(args[0]) self.wrapper_products += self.write_wrappers() # Install dependencies if self.dependencies: self.install_model_dependencies(self.dependencies) if self.additional_dependencies: for language, v in self.additional_dependencies.items(): drv = import_component('model', language) drv.install_model_dependencies(v)
[docs] @staticmethod def before_registration(cls): r"""Operations that should be performed to modify class attributes prior to registration including things like platform dependent properties and checking environment variables for default settings. """ Driver.before_registration(cls) cls.inverse_type_map = None cls._language = cls.language cls._language_aliases = cls.language_aliases if (((cls.language_ext is not None) and (not isinstance(cls.language_ext, (list, tuple))))): cls.language_ext = [cls.language_ext]
[docs] @staticmethod def after_registration(cls, cfg=None, second_pass=False): r"""Operations that should be performed to modify class attributes after registration. For compiled languages this includes selecting the default compiler. The order of precedence is the config file 'compiler' option for the language, followed by the environment variable set by _compiler_env, followed by the existing class attribute. Args: cfg (YggConfigParser, optional): Config class that should be used to set options for the driver. Defaults to None and yggdrasil.config.ygg_cfg is used. second_pass (bool, optional): If True, the class as already been registered. Defaults to False. """ if cfg is None: from yggdrasil.config import ygg_cfg cfg = ygg_cfg cfg.reload() Driver.after_registration(cls) cls.cfg = cfg for x in cls._config_attr_map: ka = x['attr'] k0 = x.get('key', ka) setattr(cls, ka, cls.cfg.get(cls.language, k0, getattr(cls, ka)))
[docs] @staticmethod def finalize_registration(cls): r"""Operations that should be performed after a class has been fully initialized and registered.""" global _map_language_ext for x in cls.get_language_ext(): if x not in _map_language_ext: _map_language_ext[x] = [] _map_language_ext[x].append(cls.language)
[docs] @classmethod def mpi_partner_init(cls, self): r"""Actions initializing an MPIPartnerModel.""" pass
[docs] @classmethod def mpi_partner_cleanup(cls, self): r"""Actions cleaning up an MPIPartnerModel.""" pass
[docs] @classmethod def get_inverse_type_map(cls): r"""Get the inverse type map. Returns: dict: Mapping from native type to JSON type. """ if cls.inverse_type_map is None: cls.inverse_type_map = {} for k, v in cls.type_map.items(): if k != 'flag': cls.inverse_type_map[v] = k return cls.inverse_type_map
[docs] @classmethod def get_language_for_source(cls, fname, languages=None, early_exit=False, **kwargs): r"""Determine the language that can be used with the provided source file(s). If more than one language applies to a set of multiple files, the language that applies to the most files is returned. Args: fname (str, list): The full path to one or more files. If more than one languages (list, optional): The list of languages that are acceptable. Defaults to None and any language will be acceptable. early_exit (bool, optional): If True, the first language identified will be returned if fname is a list of files. Defaults to False. **kwargs: Additional keyword arguments are passed to recursive calls. Returns: str: The language that can operate on the specified file. """ if isinstance(fname, list): lang_dict = {} for f in fname: try: ilang = cls.get_language_for_source(f, languages=languages, **kwargs) if early_exit: return ilang except ValueError: continue lang_dict.setdefault(ilang, 0) lang_dict[ilang] += 1 if lang_dict: return max(lang_dict, key=lang_dict.get) else: ext = os.path.splitext(fname)[-1] for ilang in cls.get_map_language_ext().get(ext, []): if (languages is None) or (ilang in languages): return ilang raise ValueError("Cannot determine language for file(s): '%s'" % fname)
[docs] @classmethod def get_map_language_ext(cls): r"""Return the mapping of all language extensions.""" return _map_language_ext
[docs] @classmethod def get_all_language_ext(cls): r"""Return the list of all language extensions.""" return list(_map_language_ext.keys())
[docs] @classmethod def get_language_dir(cls): r"""Return the langauge directory.""" return languages.get_language_dir(cls.language)
[docs] @classmethod def get_language_ext(cls): r"""Return the language extension, including from the base classes.""" out = cls.language_ext if out is None: out = [] for x in cls.base_languages: out += import_component('model', x).get_language_ext() return out
[docs] def parse_arguments(self, args, default_model_dir=None): r"""Sort model arguments to determine which one is the executable and which ones are arguments. Args: args (list): List of arguments provided. default_model_dir (str, optional): Path to directory that should be used to normalize the model file path if it is not absolute. Defaults to None and is set to the working_dir. """ if isinstance(args, (str, bytes)): args = args.split() for i in range(len(args)): args[i] = str(args[i]) assert isinstance(args, list) if default_model_dir is None: default_model_dir = self.working_dir self.raw_model_file = args[0] self.model_file = self.raw_model_file self.model_args = args[1:] if (self.language != 'executable') and (not os.path.isabs(self.model_file)): model_file = os.path.normpath(os.path.join(default_model_dir, self.model_file)) self.model_file = model_file self.model_dir = os.path.dirname(self.model_file) self.debug("model_file = '%s', model_dir = '%s', model_args = '%s'", self.model_file, self.model_dir, self.model_args)
[docs] def init_from_function(self, args): r"""Initialize model parameters based on the wrapped function.""" if not self.preparsed_function: yml_mock = dict(self.yml,, args=self.args, function=self.function, is_server=self.is_server, client_of=self.client_of, inputs=self.inputs, outputs=self.outputs, iter_function_over=self.iter_function_over, copies=self.copies) self.preparsed_function = self.preparse_function(yml_mock) self.model_function_info = self.preparsed_function['model_file'] self.model_function_file = self.model_function_info['model_file'] self.model_function_inputs = self.preparsed_function['inputs'] self.model_function_outputs = self.preparsed_function['outputs'] self.model_outputs_in_inputs = self.preparsed_function['outputs_in_inputs'] model_dir, model_base = os.path.split(self.model_function_file) model_base = os.path.splitext(model_base)[0] wrapper_fname = os.path.join(model_dir, 'ygg_%s_%s%s' % (model_base,, self.language_ext[0])) lines = self.write_model_wrapper(, **self.preparsed_function) # Write file if (not os.path.isfile(wrapper_fname)) or self.overwrite: with open(wrapper_fname, 'w') as fd: fd.write('\n'.join(lines)) return wrapper_fname
@property def numeric_logging_level(self): r"""int: Logging level for the model.""" out = self.logger.getEffectiveLevel() if self.logging_level: out = logging.getLevelName(self.logging_level) return out @property def n_sent_messages(self): r"""dict: Number of messages sent by the model via each connection.""" if (self._mpi_rank > 0) and self.check_mpi_request('stopped'): out = self._mpi_requests['stopped'].result return out out = {} for x in self.yml.get('output_drivers', []): x_inst = x.get('instance', None) if x_inst: out[] = x_inst.models_recvd.get(, 0) if self.is_server: for x in self.yml.get('input_drivers', []): x_inst = x.get('instance', None) if x_inst and (x_inst._connection_type == 'rpc_request'): out[] = x_inst.servers_recvd.get(, 0) return out @property def has_sent_messages(self): r"""bool: True if output has been received from the model.""" n_msg = self.n_sent_messages if not n_msg: return True return bool(sum(n_msg.values()))
[docs] def write_wrappers(self, **kwargs): r"""Write any wrappers needed to compile and/or run a model. Args: **kwargs: Keyword arguments are ignored (only included to allow cascade from child classes). Returns: list: Full paths to any created wrappers. """ return []
[docs] @classmethod def install_model_dependencies(cls, dependencies, always_yes=False): r"""Install any dependencies required by the model. Args: dependencies (list): Dependencies that should be installed. always_yes (bool, optional): If True, the package manager will not ask users for input during installation. Defaults to False. """ packages = {} for x in dependencies: if isinstance(x, str): x = {'package': x} if x.get('arguments', None): cls.install_dependency(always_yes=always_yes, **x) else: packages.setdefault(x.get('package_manager', None), []) packages[x.get('package_manager', None)].append( x['package']) for k, v in packages.items(): cls.install_dependency(v, package_manager=k, always_yes=always_yes)
[docs] @classmethod def install_dependency(cls, package=None, package_manager=None, arguments=None, command=None, always_yes=False, command_kwargs=None): r"""Install a dependency. Args: package (str): Name of the package that should be installed. If the package manager supports it, this can include version requirements. package_manager (str, optional): Package manager that should be used to install the package. arguments (str, optional): Additional arguments that should be passed to the package manager. command (list, optional): Command that should be used to install the package. always_yes (bool, optional): If True, the package manager will not ask users for input during installation. Defaults to False. command_kwargs (dict, optional): Keyword arguments that should be passed to the subprocess call for the installation. """ assert package if isinstance(package, str): package = package.split() if package_manager is None: if tools.get_conda_prefix(): package_manager = 'conda' if shutil.which('mamba'): package_manager = 'mamba' elif platform._is_mac: package_manager = 'brew' elif platform._is_linux: package_manager = 'apt' elif platform._is_win: package_manager = 'choco' yes_cmd = [] if command_kwargs is None: command_kwargs = {} if command: cmd = copy.copy(command) elif package_manager in ('conda', 'mamba'): cmd = [package_manager, 'install'] + package if platform._is_win: # pragma: windows # Conda/mamba commands must be run on the shell on # windows as it is implemented as a batch script cmd.insert(0, 'call') command_kwargs['shell'] = True yes_cmd = ['-y'] elif package_manager == 'brew': cmd = ['brew', 'install'] + package elif package_manager == 'apt': cmd = ['apt-get', 'install'] + package if bool(os.environ.get('GITHUB_ACTIONS', False)): # Only enable sudo for testing, otherwise allow the user to # decide if they want to run yggdrasil with sudo, or just # install the dependencies themselves cmd.insert(0, 'sudo') yes_cmd = ['-y'] elif package_manager == 'choco': cmd = ['choco', 'install'] + package elif package_manager == 'vcpkg': cmd = ['vcpkg.exe', 'install', '--triplet', 'x64-windows'] cmd += package else: package_managers = {'pip': 'python', 'cran': 'r'} if package_manager in package_managers: drv = import_component( 'model', package_managers[package_manager]) return drv.install_dependency( package=package, package_manager=package_manager, arguments=arguments, always_yes=always_yes) raise NotImplementedError(f"Unsupported package manager: " f"{package_manager}") if arguments: cmd += arguments.split() if always_yes: cmd += yes_cmd if command_kwargs.get('shell', False): cmd = ' '.join(cmd) subprocess.check_call(cmd, **command_kwargs)
[docs] def model_command(self): r"""Return the command that should be used to run the model. Returns: list: Any commands/arguments needed to run the model from the command line. """ return [self.model_file] + self.model_args
[docs] @classmethod def language_executable(cls, **kwargs): r"""Command required to compile/run a model written in this language from the command line. Returns: str: Name of (or path to) compiler/interpreter executable required to run the compiler/interpreter from the command line. """ if cls.no_executable: return '' raise NotImplementedError("language_executable not implemented for '%s'" % cls.language)
# @classmethod # def compiled_with_asan(cls): # r"""Returns true if the compiled_with_asan flag is set.""" # return (cls.cfg.get( # cls.language, 'compiled_with_asan', 'false').lower() == 'true')
[docs] @classmethod def executable_command(cls, args, unused_kwargs=None, **kwargs): r"""Compose a command for running a program using the exectuable for this language (compiler/interpreter) with the provided arguments. Args: args (list): The program that returned command should run and any arguments that should be provided to it. unused_kwargs (dict, optional): Existing dictionary that unused keyword arguments should be added to. Defaults to None and is ignored. **kwargs: Additional keyword arguments are ignored. Returns: list: Arguments composing the command required to run the program from the command line using the executable for this language. """ raise NotImplementedError("executable_command not implemented for '%s'" % cls.language)
[docs] @classmethod def run_executable(cls, args, return_process=False, debug_flags=None, **kwargs): r"""Run a program using the executable for this language and the provided arguments. Args: args (list): The program that should be run and any arguments that should be provided to it. return_process (bool, optional): If True, the process class is returned without checking the process output. If False, communicate is called on the process and the output is parsed for errors. Defaults to False. debug_flags (list, optional): Debug executable and flags that should be prepended to the executable command. Defaults to None and is ignored. **kwargs: Additional keyword arguments are passed to cls.executable_command and tools.popen_nobuffer. Returns: str: Output to stdout from the run command if return_process is False, the process if return_process is True. Raises: RuntimeError: If the language is not installed. RuntimeError: If there is an error when running the command. """ unused_kwargs = {} cmd = cls.executable_command(args, unused_kwargs=unused_kwargs, **kwargs) if isinstance(debug_flags, list): cmd = debug_flags + cmd try: # Add default keyword arguments if 'working_dir' in unused_kwargs: unused_kwargs.setdefault('cwd', unused_kwargs.pop('working_dir')) unused_kwargs.setdefault('shell', platform._is_win) # Call command logger.debug("Running '%s' from %s" % (' '.join(cmd), unused_kwargs.get('cwd', os.getcwd()))) logger.debug("Process keyword arguments:\n%s\n", ' ' + pformat(unused_kwargs).replace('\n', '\n ')) print(' '.join(cmd)) proc = tools.popen_nobuffer(cmd, **unused_kwargs) if return_process: return proc out, err = proc.communicate() if proc.returncode != 0: if out:'\n%s' % out.decode('utf-8')) if err: # pragma: debug'\n%s' % err.decode('utf-8')) raise RuntimeError("Command '%s' failed with code %d." % (' '.join(cmd), proc.returncode)) out = out.decode("utf-8") logger.debug('%s\n%s' % (' '.join(cmd), out)) return out except (subprocess.CalledProcessError, OSError) as e: # pragma: debug raise RuntimeError("Could not call command '%s': %s" % (' '.join(cmd), e))
[docs] def run_validation(self): r"""Run the validation script for the model.""" if not self.validation_command: return subprocess.check_call(self.validation_command.split(), cwd=self.working_dir)
[docs] def run_model(self, command=None, return_process=True, **kwargs): r"""Run the model. Unless overridden, the model will be run using run_executable. Args: command (list, optional): Command to run. Defaults to None and is created by the model_command method. return_process (bool, optional): If True, the process running the model is returned. If False, the process will block until the model finishes running. Defaults to True. **kwargs: Keyword arguments are passed to run_executable. """ env = self.set_env() if command is None: command = self.model_command() if self.with_debugger: kwargs.setdefault('debug_flags', self.with_debugger.split()) self.debug('Working directory: %s', self.working_dir) self.debug('Command: %s', ' '.join(command)) self.debug('Environment Variables:\n%s', self.pprint(env, block_indent=1)) # Update keywords # NOTE: Setting forward_signals to False allows faster debugging # but should not be used in deployment for cases where models are # not running locally. default_kwargs = dict(env=env, working_dir=self.working_dir, forward_signals=False, shell=platform._is_win) for k, v in default_kwargs.items(): kwargs.setdefault(k, v) return self.run_executable(command, return_process=return_process, **kwargs)
[docs] @classmethod def language_version(cls, version_flags=None, **kwargs): r"""Determine the version of this language. Args: **kwargs: Keyword arguments are passed to cls.run_executable. Returns: str: Version of compiler/interpreter for this language. """ if version_flags is None: version_flags = cls.version_flags return cls.run_executable(version_flags, **kwargs).splitlines()[0].strip()
[docs] @classmethod def is_installed(cls): r"""Determine if this model driver is installed on the current machine. Returns: bool: Truth of if this model driver can be run on the current machine. """ return (cls.is_language_installed() and cls.are_base_languages_installed() and cls.are_dependencies_installed() and cls.is_interface_installed() and cls.is_comm_installed() and cls.is_configured() and (not cls.is_disabled()))
[docs] @classmethod def are_base_languages_installed(cls, missing=None): r"""Determine if the base languages are installed. Args: missing (list, optional): A pre-existing list that missing base languages should be appended to. Returns: bool: True if the base langauges are installed. False otherwise. """ out = True for x in cls.base_languages: if (not out) and (not isinstance(missing, list)): # pragma: no cover break out = import_component('model', x).is_installed() if isinstance(missing, list) and (not out): missing.append(x) if missing: out = False return out
[docs] @classmethod def are_dependencies_installed(cls): r"""Determine if the dependencies are installed for the interface (not including dependencies needed by a particular communication type). Returns: bool: True if the dependencies are installed. False otherwise. """ out = (cls.language is not None) for x in cls.interface_dependencies: if not out: # pragma: no cover break out = cls.is_library_installed(x) return out
[docs] @classmethod def is_interface_installed(cls): r"""Determine if the interface library for the associated programming language is installed. Returns: bool: True if the interface library is installed. """ out = (cls.language is not None) if out and (cls.interface_library is not None): out = cls.is_library_installed(cls.interface_library) return out
[docs] @classmethod def is_language_installed(cls): r"""Determine if the interpreter/compiler for the associated programming language is installed. Returns: bool: True if the language interpreter/compiler is installed. """ out = False if cls.language is not None: try: out = (shutil.which(cls.language_executable()) is not None) except NotImplementedError: # pragma: debug out = False return out
[docs] @classmethod def identify_source_files(cls, args=None, working_dir=None, **kwargs): r"""Determine the source file based on model arguments. Args: args (list, optional): Arguments provided. working_dir (str, optional): Working directory. **kwargs: Additional keyword arguments are ignored. Returns: list: Source files. """ out = [] if args: src = args[0] if (((not cls.is_source_file(src)) and (cls.language_ext is not None) and (os.path.splitext(src)[-1] not in cls.get_all_language_ext()))): src = os.path.splitext(src)[0] + cls.language_ext[0] if working_dir and (not os.path.isabs(src)): src = os.path.normpath(os.path.join(working_dir, src)) if os.path.isfile(src): out.append(src) return out
[docs] @classmethod def is_source_file(cls, fname): r"""Determine if the provided file name points to a source files for the associated programming language by checking the extension. Args: fname (str): Path to file. Returns: bool: True if the provided file is a source file, False otherwise. """ out = False model_ext = os.path.splitext(fname)[-1] if len(model_ext) > 0: out = (model_ext in cls.get_language_ext()) return out
[docs] @classmethod def is_library_installed(cls, lib, **kwargs): r"""Determine if a dependency is installed. Args: lib (str): Name of the library that should be checked. **kwargs: Additional keyword arguments are ignored. Returns: bool: True if the library is installed, False otherwise. """ raise NotImplementedError("Method is_library_installed missing for '%s'" % cls.language)
[docs] @classmethod def is_disabled(cls): return (cls.cfg.get(cls.language, 'disable', 'false').lower() == 'true')
[docs] @classmethod def configuration_steps(cls): r"""Get a list of configuration steps with tuples of flags and boolean values. Returns: OrderedDict: Pairs of descriptions and states for different steps in the configuration all steps must be True for the language to be configured. """ out = OrderedDict([ ('Not Disabled', not cls.is_disabled()), ('Config Section', cls.cfg.has_section(cls.language))]) config_keys = list(cls._config_keys) if out['Config Section'] and len(cls.base_languages) == 0: config_keys.insert(0, 'commtypes') for k in config_keys: out[k] = (cls.cfg.get(cls.language, k, None) is not None) return out
[docs] @classmethod def is_configured(cls): r"""Determine if the appropriate configuration has been performed (e.g. installation of supporting libraries etc.) Returns: bool: True if the language has been configured. """ return all(v for v in cls.configuration_steps().values())
[docs] @classmethod def is_comm_installed(cls, commtype=None, skip_config=False, **kwargs): r"""Determine if a comm is installed for the associated programming language. Args: commtype (str, optional): If provided, this method will only test for installation of the specified communication type. Defaults to None and will check for any installed comm. skip_config (bool, optional): If True, the config list of comms installed for this language will not be used to determine if the comm is installed and the class attribute supported_comm_options will be processed. Defaults to False and config options are used in order to improve performance after initial configuration. platforms (list, optional): Platforms on which the comm can be installed. Defaults to None and is ignored unless there is a value for the commtype in supported_comm_options. This keyword argument is ignored if skip_config is False. libraries (list, optional): External libraries that are required by the specified commtype. Defaults to None and is ignored unless there is a value for the commtype in supported_comm_options. This keyword argument is ignored if skip_config is False. **kwargs: Additional keyword arguments are passed to either is_comm_installed for the base languages, supported languages, or is_library_installed as appropriate. Returns: bool: True if a comm is installed for this language. """ # If there are base_languages for this language, use that language's # driver to check for comm installation. if len(cls.base_languages) > 0: out = True for x in cls.base_languages: if not out: # pragma: no cover break out = import_component('model', x).is_comm_installed( commtype=commtype, skip_config=skip_config, **kwargs) return out if cls.comms_implicit: if commtype is None: return True return (commtype in tools.get_supported_comm()) # Check for installation based on config option if not skip_config: installed_comms = cls.cfg.get(cls.language, 'commtypes', []) if commtype is None: return (len(installed_comms) > 0) else: return (commtype in installed_comms) # Check for any comm if commtype is None: for c in cls.supported_comms: if cls.is_comm_installed(commtype=c, skip_config=skip_config, **kwargs): return True # Check that comm is explicitly supported if commtype not in cls.supported_comms: return False # Set & pop keywords for k, v in cls.supported_comm_options.get(commtype, {}).items(): if kwargs.get(k, None) is None: kwargs[k] = v platforms = kwargs.pop('platforms', None) libraries = kwargs.pop('libraries', []) # Check platforms if (platforms is not None) and (platform._platform not in platforms): return False # pragma: windows # Check libraries if (libraries is not None): for lib in libraries: if not cls.is_library_installed(lib, **kwargs): return False # Check for server on RabbitMQ if commtype in ['rmq', 'rmq_async']: from yggdrasil.communication.RMQComm import check_rmq_server if not check_rmq_server(): return False return True
[docs] @classmethod def configure(cls, cfg, **kwargs): r"""Add configuration options for this language. Args: cfg (CisConfigParser): Config class that options should be set for. Returns: list: Section, option, description tuples for options that could not be set. """ out = [] # Section and executable if (cls.language is not None) and (not cfg.has_section(cls.language)): cfg.add_section(cls.language) # Executable type configuration out += cls.configure_executable_type(cfg) # Locate executable if (((not cls.is_language_installed()) and (cls.executable_type is not None))): # pragma: debug try: exec_file = cls.language_executable() if exec_file is not None: fpath = tools.locate_file( exec_file, directory_list=cls._executable_search_dirs) if fpath: cfg.set(cls.language, cls.executable_type, fpath) except NotImplementedError: pass # Configure libraries out += cls.configure_libraries(cfg) # Only do additional configuration if no base languages if not cls.base_languages: # Installed comms comms = [] for c in cls.supported_comms: if cls.is_comm_installed(commtype=c, cfg=cfg, skip_config=True): comms.append(c) cfg.set(cls.language, 'commtypes', comms) for k in cls._config_keys: if k in kwargs: # pragma: config cfg.set(cls.language, k, kwargs[k]) cls.after_registration(cls, cfg=cfg, second_pass=True) return out
[docs] @classmethod def configure_executable_type(cls, cfg): r"""Add configuration options specific in the executable type before the libraries are configured. Args: cfg (YggConfigParser): Config class that options should be set for. Returns: list: Section, option, description tuples for options that could not be set. """ return []
[docs] @classmethod def configure_libraries(cls, cfg): r"""Add configuration options for external libraries in this language. Args: cfg (YggConfigParser): Config class that options should be set for. Returns: list: Section, option, description tuples for options that could not be set. """ return []
[docs] def get_io_env(self, input_drivers=None, output_drivers=None): r"""Get environment variables set by the input/output drivers. Args: input_drivers (list, optional): Input drivers. Defaults to the yaml entry if not provided. output_drivers (list, optional): Output drivers. Defaults to the yaml entry if not provided. Returns: dict: Environment variables. """ if input_drivers is None: input_drivers = self.yml.get('input_drivers', []) if output_drivers is None: output_drivers = self.yml.get('output_drivers', []) out = {} if self.copies > 1: from yggdrasil.drivers.DuplicatedModelDriver import ( DuplicatedModelDriver) base_name = DuplicatedModelDriver.get_base_name( else: base_name = for x in input_drivers + output_drivers: if 'instance' in x: model_env = x['instance'].model_env if in model_env: out.update(model_env[]) elif base_name in model_env: out.update(model_env[base_name]) return out
# @classmethod # def set_asan_env(cls, env, compiler=None): # r"""Add flags in the case that the program being run links against # a shared ASAN library. # Args: # env (dict): Environment variables dictionary to add library to. # compiler (Compiler): Compiler that should be used to determine # the location of the ASAN library. Defaults to the C compiler. # Returns: # dict: Environment variables dictionary. # """ # if compiler is None: # drv = cls # if cls._language != 'c': # drv = import_component('model', 'c') # compiler = drv.get_tool('compiler') # compiler.init_asan_env(env) # return env
[docs] @classmethod def set_env_class(cls, existing=None, **kwargs): r"""Set environment variables that are instance independent. Args: existing (dict, optional): Existing dictionary of environment variables that new variables should be added to. Defaults to a copy of os.environ. **kwargs: Additional keyword arguments are ignored. Returns: dict: Environment variables for the model process. """ if existing is None: # pragma: no cover existing = {} existing.update(os.environ) # if cls.compiled_with_asan(): # cls.set_asan_env(existing) return existing
[docs] def set_env(self, existing=None, **kwargs): r"""Get environment variables that should be set for the model process. Args: existing (dict, optional): Existing dictionary of environment variables that new variables should be added to. Defaults to a copy of os.environ. **kwargs: Additional keyword arguments are passed to set_env_class. Returns: dict: Environment variables for the model process. """ from yggdrasil.config import ygg_cfg if existing is None: existing = {} existing.update(copy.deepcopy(self.env)) existing.update(self.get_io_env()) env = self.set_env_class(existing=existing, **kwargs) env.update(YGG_SUBPROCESS="True", YGG_MODEL_INDEX=str(self.model_index), YGG_MODEL_LANGUAGE=self.language, YGG_MODEL_COPIES=str(self.copies), # YGG_PYTHON_EXEC=sys.executable, YGG_DEFAULT_COMM=tools.get_default_comm(), YGG_NCLIENTS=str(len(self.clients))) if multitasking._on_mpi: env['YGG_MPI_RANK'] = str(multitasking._mpi_rank) if self.copies > 1: from yggdrasil.drivers.DuplicatedModelDriver import ( DuplicatedModelDriver) env['YGG_MODEL_COPY'] = str(self.copy_index) env['YGG_MODEL_NAME'] = DuplicatedModelDriver.get_base_name( else: env['YGG_MODEL_NAME'] = if self.allow_threading or (self.copies > 1): env['YGG_THREADING'] = '1' if isinstance(self.is_server, dict): env['YGG_SERVER_INPUT'] = self.is_server['input'] env['YGG_SERVER_OUTPUT'] = self.is_server['output'] if self.logging_level: env['YGG_MODEL_DEBUG'] = self.logging_level replace = [k for k in env.keys() if ':' in k] for k in replace: env[k.replace(':', '__COLON__')] = env.pop(k) if ygg_cfg.get('general', 'allow_multiple_omp', False): env['KMP_DUPLICATE_LIB_OK'] = 'True' return env
[docs] def before_start(self, no_queue_thread=False, **kwargs): r"""Actions to perform before the run starts. Args: no_queue_thread (bool, optional): If True, the queue_thread is not created/started. Defaults to False. **kwargs: Keyword arguments are pased to run_model. """ # if multitasking._on_mpi: # self.init_mpi_env() self.model_process = self.run_model(**kwargs) # Start thread to queue output if not no_queue_thread: self.queue_thread = multitasking.YggTaskLoop( target=self.enqueue_output_loop, + '.EnqueueLoop') self.queue_thread.start() if multitasking._on_mpi: self.init_mpi()
[docs] def queue_close(self): r"""Close the queue for messages from the model process.""" self.model_process.stdout.close()
[docs] def queue_recv(self): r"""Receive a message from the model process.""" return self.model_process.stdout.readline()
[docs] def enqueue_output_loop(self): r"""Keep passing lines to queue.""" try: line = self.queue_recv() except BaseException as e: # pragma: debug print(e) line = "" if (len(line) == 0): self.queue_thread.set_break_flag() try: self.queue.put(self._exit_line) except multitasking.AliasDisconnectError: # pragma: debug self.error("Queue disconnected") self.debug("End of model output") try: self.queue_close() except BaseException: # pragma: debug pass else: try: self.queue.put(line.decode('utf-8')) except BaseException as e: # pragma: debug warnings.warn("Error in printing output: %s" % e)
[docs] def before_loop(self): r"""Actions before loop.""" self.debug('Running %s from %s with cwd %s and env %s', self.model_command(), os.getcwd(), self.working_dir, pformat(self.env))
# def init_mpi_env(self): # r"""Receive env information to the partner model.""" # self.env = self.recv_mpi(tag=self._mpi_tags['ENV'])
[docs] def init_mpi(self): r"""Initialize MPI communicator.""" if self._mpi_rank == 0: self._mpi_comm = None else: self.recv_mpi(tag=self._mpi_tags['START']) self._mpi_requests['stopped'] = multitasking.MPIRequestWrapper( self.recv_mpi(tag=self._mpi_tags['STOP_RANKX'], dont_block=True))
[docs] def send_mpi(self, msg, tag=0, dont_block=False): r"""Send an MPI message.""" self.debug("send %d (%d) [%s]: %s (blocking=%s)", tag, self._mpi_tag + tag, self._inv_mpi_tags[tag], msg, not dont_block) kws = {'dest': self._mpi_partner_rank, 'tag': (self._mpi_tag + tag)} if dont_block: # pragma: debug # return self._mpi_comm.isend(msg, **kws) raise NotImplementedError("Non-blocking MPI send not tested.") else: return self._mpi_comm.send(msg, **kws)
[docs] def recv_mpi(self, tag=0, dont_block=False): r"""Receive an MPI message.""" self.debug('recv %d (%d) [%s] (blocking=%s)', tag, self._mpi_tag + tag, self._inv_mpi_tags[tag], not dont_block) kws = {'source': self._mpi_partner_rank, 'tag': (self._mpi_tag + tag)} if dont_block: return self._mpi_comm.irecv(**kws) else: return self._mpi_comm.recv(**kws)
[docs] def stop_mpi_partner(self, msg=None, dest=0, tag=None): r"""Send a message to stop the MPI partner model on the main process.""" if self._mpi_comm and (not self.check_mpi_request('stopping')): if tag is None: tag = self._mpi_tags['STOP_RANK0'] if msg is None: if self.errors or self.model_process_returncode: msg = 'ERROR' else: msg = 'STOPPING' self.debug("stop_mpi_partner: %d, %s", tag, msg) # Don't call test() self._mpi_requests['stopping'] = multitasking.MPIRequestWrapper( self.send_mpi(msg, tag=tag), completed=True)
[docs] def wait_on_mpi_request(self, name, timeout=False): r"""Wait for a request to be completed. Args: name (str): Name that request was registered under. Returns: bool, str: Received message or False if the request does not exist or is not complete. """ self.debug("Waiting on '%s' (timeout=%s)", name, timeout) try: out = self._mpi_requests[name].wait(timeout=timeout) if out == 'ERROR': # pragma: debug self.errors.append(out) return out except asyncio.TimeoutError: # pragma: debug"Timeout for MPI '%s' request", name)
[docs] def check_mpi_request(self, name): r"""Check if a request has been completed. Args: name (str): Name that request was registered under. Returns: bool, str: Received message or False if the request does not exist or is not complete. """ if self._mpi_comm and (name in self._mpi_requests): out, msg = self._mpi_requests[name].test() if out and (msg == 'ERROR'): # pragma: debug self.errors.append(msg) return out return False
[docs] def set_break_flag(self, *args, **kwargs): r"""Stop the model loop.""" self.stop_mpi_partner() super(ModelDriver, self).set_break_flag(*args, **kwargs)
[docs] def run_loop(self): r"""Loop to check if model is still running and forward output.""" # Continue reading until there is not any output if self.model_process_returncode: self.errors.append(self.model_process_returncode) if self.check_mpi_request('stopped'): self.debug("Stop requested by MPI partner.") self.set_break_flag() try: line = self.queue.get_nowait() except Empty: # This sleep is necessary to allow changes in queue without lock self.sleep() return except multitasking.AliasDisconnectError: # pragma: debug self.error("Queue disconnected") self.set_break_flag() else: if (line == self._exit_line) or self.check_mpi_request('stopped'): self.debug("No more output") self.set_break_flag() else: self.print_encoded(line, end="") sys.stdout.flush()
[docs] def run_finally(self): r"""Actions to perform in finally clause of try/except wrapping run.""" # Ensure the MPI partner gets cleaned up following an error self.stop_mpi_partner() super(ModelDriver, self).run_finally()
[docs] def after_loop(self): r"""Actions to perform after run_loop has finished. Mainly checking if there was an error and then handling it.""" self.debug('') self.stop_mpi_partner() if self.queue_thread is not None: self.queue_thread.join(self.sleeptime) if self.queue_thread.is_alive(): self.debug("Queue thread still alive") # Loop was broken from outside, kill the queueing thread self.kill_process() return self.wait_process(self.timeout, key_suffix='.after_loop') self.kill_process() self.debug(("Closing input/output drivers:\n" "\tinput: %s\n\toutput: %s") % ([drv['name'] for drv in self.yml.get('input_drivers', [])], [drv['name'] for drv in self.yml.get('output_drivers', [])])) for drv in self.yml.get('input_drivers', []): if 'instance' in drv: if self.language == 'mpi': drv['instance'].wait(self.timeout) drv['instance'].on_model_exit('output',, errors=self.errors) for drv in self.yml.get('output_drivers', []): if 'instance' in drv: if self.language == 'mpi': drv['instance'].wait(self.timeout) drv['instance'].on_model_exit('input',, errors=self.errors)
@property def io_errors(self): r"""list: Errors produced by input/output drivers to this model.""" errors = [] for drv in self.yml.get('input_drivers', []): if 'instance' in drv: errors += drv['instance'].errors for drv in self.yml.get('output_drivers', []): if 'instance' in drv: errors += drv['instance'].errors return errors @property def model_process_complete(self): r"""bool: Has the process finished or not. Returns True if the process has not started.""" if self.model_process is None: # pragma: debug return True return (self.model_process.poll() is not None) @property def model_process_returncode(self): r"""int: Return code for the model process where non-zero values indicate that there was an error.""" if self.model_process_complete and (self.model_process is not None): return self.model_process.returncode return 0
[docs] def wait_process(self, timeout=None, key=None, key_suffix=None): r"""Wait for some amount of time for the process to finish. Args: timeout (float, optional): Time (in seconds) that should be waited. Defaults to None and is infinite. key (str, optional): Key that should be used to register the timeout. Defaults to None and set based on the stack trace. Returns: bool: True if the process completed. False otherwise. """ if not self.was_started: # pragma: debug return True return self.wait_on_function(lambda: self.model_process_complete, timeout=timeout, key_level=1, key=key, key_suffix=key_suffix)
[docs] def kill_process(self): r"""Kill the process running the model, checking return code.""" if not self.was_started: # pragma: debug self.debug('Process was never started.') self.set_break_flag() self.event_process_kill_called.set() self.event_process_kill_complete.set() if self.event_process_kill_called.is_set(): # pragma: debug self.debug('Process has already been killed.') return self.event_process_kill_called.set() with self.lock: self.debug('') ignore_error_code = False if not self.model_process_complete: # pragma: debug self.debug("Process is still running. Killing it.") try: self.model_process.kill() self.debug("Waiting %f s for process to be killed", self.timeout) self.wait_process(self.timeout, key_suffix='.kill_process') except BaseException: # pragma: debug self.exception("Error killing model process") if not self.has_sent_messages: ignore_error_code = True assert self.model_process_complete if (((self.model_process_returncode != 0) and (not ignore_error_code))): self.error(("return code of %s indicates model error. " "(sent messages: %s)"), str(self.model_process_returncode), self.n_sent_messages) self.event_process_kill_complete.set() if self.queue_thread is not None: if not self.was_break: # pragma: debug # Wait for messages to be printed self.debug("Waiting for queue_thread to finish up.") self.queue_thread.wait(self.timeout) if self.queue_thread.is_alive(): # pragma: debug self.debug("Setting break flag for queue_thread to finish up.") self.queue_thread.set_break_flag() self.queue_thread.wait(self.timeout) try: self.queue_close() self.queue_thread.wait(self.timeout) except BaseException: # pragma: debug self.exception("Closed during concurrent action") if self.queue_thread.is_alive(): # pragma: debug self.error("Queue thread was not terminated.")
[docs] def graceful_stop(self): r"""Gracefully stop the driver.""" self.debug('') if self.has_sent_messages: self.wait_process(self.timeout, key_suffix='.graceful_stop') super(ModelDriver, self).graceful_stop()
[docs] def cleanup_products(self): r"""Remove products created in order to run the model.""" if self.overwrite and (not self.preserve_cache): self.remove_products() self.restore_files()
[docs] def cleanup(self): r"""Remove compile executable.""" self.cleanup_products() super(ModelDriver, self).cleanup()
[docs] def restore_files(self): r"""Restore modified files to their original form.""" for (original, modified) in self.modified_files: if os.path.isfile(original): os.remove(modified) shutil.move(original, modified)
[docs] def remove_products(self): r"""Delete products produced during the process of running the model.""" products = self.products source_products = self.source_products + self.wrapper_products remove_products(products, source_products)
[docs] @classmethod def cleanup_dependencies(cls, products=[], verbose=False, **kws): r"""Cleanup dependencies.""" for x in products: if os.path.isfile(x): if verbose: # pragma: debug print("Removing %s" % x) os.remove(x)
# Methods for automated model wrapping
[docs] @classmethod def run_code(cls, lines, process_kwargs={}, **kwargs): r"""Run code by first writing it as an executable and then calling the driver. Args: lines (list): Lines of code to be wrapped as an executable. process_kwargs (dict, optional): Keyword arguments that should be passed to run_model. Defaults to {}. **kwargs: Additional keyword arguments are passed to the write_executable method. """ name = 'test_code_%s' % str(uuid.uuid4())[:13].replace('-', '_') working_dir = os.getcwd() code_dir = tempfile.gettempdir() # code_dir = working_dir fname = os.path.join(code_dir, name + cls.get_language_ext()[0]) lines = cls.write_executable(lines, **kwargs) with open(fname, 'w') as fd: fd.write('\n'.join(lines)) inst = None try: assert os.path.isfile(fname) inst = cls(name, [fname], working_dir=working_dir) inst.run_model(return_process=False, **process_kwargs) except BaseException: # pragma: debug logger.error('Failed generated code:\n%s' % '\n'.join(lines)) raise finally: if os.path.isfile(fname): os.remove(fname) if inst is not None: inst.cleanup()
[docs] @classmethod def format_function_param(cls, key, default=None, replacement=None, ignore_method=False, **kwargs): r"""Return the formatted version of the specified key. Args: key (str): Key in cls.function_param mapping that should be formatted. default (str, optional): Format that should be returned if key is not in cls.function_param. Defaults to None. replacement (str, optional): Format that should be used instead of the one in cls.function_param. Defaults to None. **kwargs: Additional keyword arguments are used in formatting the request function parameter. Returns: str: Formatted string. Raises: NotImplementedError: If key is not in cls.function_param and default is not set. """ if replacement is not None: fmt = replacement elif (not ignore_method) and hasattr(cls, 'format_function_param_%s' % key): return getattr(cls, 'format_function_param_%s' % key)(**kwargs) else: if (key not in cls.function_param) and (default is None): raise NotImplementedError(("Language %s dosn't have an entry in " "function_param for key '%s'") % (cls.language, key)) fmt = cls.function_param.get(key, default) return fmt.format(**kwargs)
[docs] @classmethod def parse_var_definition(cls, io, value, outputs_in_inputs=None): r"""Extract information about input/output variables from a string definition. Args: io (str): Description of variables contained in the provided string. Must be 'inputs' or 'outputs'. value (str): String containing one or more variable definitions. outputs_in_inputs (bool, optional): If True, the outputs are presented in the function definition as inputs. Defaults to False. Returns: list: List of information about the variables contained in the provided string. Raises: AssertionError: If io is not 'inputs' or 'outputs'. NotImplementedError: If the def_regex for the specified io is not defined. """ if outputs_in_inputs is None: outputs_in_inputs = cls.outputs_in_inputs assert io in ['inputs', 'outputs'] if f'{io}_def_regex' not in cls.function_param: # pragma: debug raise NotImplementedError( f"'{io}_def_regex' not defined for " f"language {cls.language}.") if 'multiple_outputs' in cls.function_param: multi_re = cls.function_param['multiple_outputs'] for x in '[]()': multi_re = multi_re.replace(x, '\\' + x) multi_re = multi_re.format(outputs='(.*?)') match =, value) if match is not None: value = new_val = [] io_re = r'^' + cls.format_function_param(f'{io}_def_regex') + r'$' for i, ivar in enumerate(cls.split_variables(value)): igrp = {'name': ivar} x =, ivar) if x is not None: igrp = x.groupdict() for k in list(igrp.keys()): if igrp[k] is None: del igrp[k] if 'native_type' in igrp: igrp['native_type'] = igrp['native_type'].replace(' ', '') igrp['datatype'] = cls.get_json_type(igrp['native_type']) igrp['position'] = i if (io == 'outputs') and outputs_in_inputs: igrp = cls.input2output(igrp) new_val.append(igrp) return new_val
[docs] @classmethod def parse_function_definition(cls, model_file, model_function, contents=None, match=None, expected_outputs=[], outputs_in_inputs=None): r"""Get information about the inputs & outputs to a model from its defintition if possible. Args: model_file (str): Full path to the file containing the model function's declaration. model_function (str): Name of the model function. contents (str, optional): String containing the function definition. If not provided, the function definition is read from model_file. match (re.Match, optional): Match object for the function regex. If not provided, a search is performed using function_def_regex. expected_outputs (list, optional): List of names or variable information dictionaries for outputs that are expected to be extracted from the function's definition. This variable is only used if outputs_in_inputs is True and outputs are not extracted from the function's defintion using the regex for this language. Defaults to []. outputs_in_inputs (bool, optional): If True, the outputs are presented in the function definition as inputs. Defaults to False. Returns: dict: Parameters extracted from the function definitions. """ if outputs_in_inputs is None: outputs_in_inputs = cls.outputs_in_inputs out = {} if match or ('function_def_regex' in cls.function_param): if not match: function_regex = cls.format_function_param( 'function_def_regex', function_name=model_function) if contents is None: with open(model_file, 'r') as fd: contents = match =, contents) if not match: # pragma: debug raise RuntimeError(("Could not find function match in file:\n" "%s\nfor regex:\nr'%s'") % (pformat(contents), function_regex)) # Match brackets to determine where the function definition is if isinstance(cls.brackets, tuple): assert len(cls.brackets) == 2 contents = counts = {k: 0 for k in cls.brackets} first_zero = 0 re_brackets = r'[\%s\%s]' % cls.brackets for x in re.finditer(re_brackets, contents): counts[] += 1 if (((counts[cls.brackets[0]] > 0) and (counts[cls.brackets[0]] == counts[cls.brackets[1]]))): first_zero = x.span(0)[1] break assert ((first_zero == 0) or (first_zero == len(contents))) # This is currently commented as regex's are # sufficient so far, but this may be needed in the # future to isolate single definitions. # if (first_zero != 0) and first_zero != len(contents): # contents = contents[:first_zero] # match =, contents) # assert match out = match.groupdict() for k in list(out.keys()): if out[k] is None: del out[k] for io in ['inputs', 'outputs']: if io in out: out[io] = cls.parse_var_definition( io, out[io], outputs_in_inputs=outputs_in_inputs) out['model_file'] = model_file if outputs_in_inputs and expected_outputs and (not out.get('outputs', False)): missing_expected_outputs = [] for o in expected_outputs: if isinstance(o, dict): o = o['name'] missing_expected_outputs.append(o) out['outputs'] = [] for x in out['inputs']: if x['name'] not in missing_expected_outputs: continue missing_expected_outputs.remove(x['name']) out['outputs'].append(cls.input2output(x)) if missing_expected_outputs: # pragma: debug raise ValueError(("Could not locate %d output " "variable(s) in input: %s") % (len(missing_expected_outputs), missing_expected_outputs)) for x in out['outputs']: out['inputs'].remove(x) if out.get('flag_var', None): flag_var = {'name': out.pop('flag_var'), 'datatype': {'type': 'flag'}} if out.get('flag_type', None): flag_var['native_type'] = out.pop('flag_type').replace(' ', '') flag_var['datatype'] = cls.get_json_type(flag_var['native_type']) out['flag_var'] = flag_var cls.check_flag_var(out, outputs_in_inputs=outputs_in_inputs) return out
[docs] @classmethod def check_flag_var(cls, info, outputs_in_inputs=None): r"""Check if the flag variable should be treated as an output. Args: info (dict): Information about the function. outputs_in_inputs (bool, optional): If True, the outputs are presented in the function definition as inputs. Defaults to False. """ if outputs_in_inputs is None: # pragma: debug outputs_in_inputs = cls.outputs_in_inputs flag_t = cls.type_map['flag'] if (((info.get('flag_var', {}).get('native_type', flag_t) != flag_t) or (not outputs_in_inputs))): if info.get('outputs', []): # pragma: debug logger.warn("Support for returning outputs via parameter(s) " "and return value is not yet support. The return " "value will be assumed to be a flag indicating " "the success of the model.") info['outputs_in_inputs'] = True else: info['outputs'] = [info.pop('flag_var')] info['outputs_in_inputs'] = False
[docs] @classmethod def channels2vars(cls, channels): r"""Convert a list of channels to a list of variables. Args: channels (list): List of channel dictionaries. Returns: list: List of variables. """ if not isinstance(channels, list): channels = [channels] variables = [] for x in channels: variables += x['vars'] def get_pos(x): return x.get('position', 0) variables = sorted(variables, key=get_pos) return variables
[docs] @classmethod def expand_server_io(cls, inputs, outputs, client_comms=[]): r"""Update inputs/outputs w/ information about server that will be using them. Args: inputs (list): List of model inputs including types. outputs (list): List of model outputs including types. client_comms (list, optional): List of the names of client comms that should be removed from the list of outputs. Defaults to []. """ if client_comms: warnings.warn("When wrapping a model function, client comms " "must either be initialized outside the function, " "pass a 'global_scope' parameter to the " "comm initialization (e.g. Python, R, Matlab), " "or use a 'WITH_GLOBAL_SCOPE' macro " "(e.g. C, C++, Fortran) around the initialization " "so that they are persistent " "across calls and the call or recv/send methods " "must be called explicitly (as opposed to the " "function inputs/outputs which will be handled " "by the wrapper). This model's client comms are:\n" "\t%s" % client_comms) # Replace server input w/ split input/output and remove client # connections from inputs for i, x in enumerate(inputs): if x.get('server_replaces', False): inputs[x['server_replaces']['input_index']] = ( x['server_replaces']['input']) outputs.insert(x['server_replaces']['output_index'], x['server_replaces']['output']) rm_outputs = [i for i, x in enumerate(outputs) if x['name'] in client_comms] for i in rm_outputs[::-1]: outputs.pop(i)
[docs] @classmethod def preparse_function(cls, yml): r"""Extract information about inputs and outputs based on the function being wrapped. Args: yml (dict): Options that will be used to initialize the model. Returns: dict: Information about the parsed function. """ if 'function' not in yml: return if yml.get('is_server', False): assert isinstance(yml['is_server'], dict) if cls.function_param is None: raise ValueError(f"Language {cls.language} is not parameterized " f"and so functions cannot be automatically " f"wrapped as a model.") source_files = cls.identify_source_files(**yml) if not source_files: # pragma: debug raise ValueError("Could not identify any source files.") model_function_file = source_files[0] if not os.path.isfile(model_function_file): # pragma: debug raise ValueError("Source file does not exist: '%s'" % model_function_file) # Update input/outputs based on parsed source code client_comms = ['%s:%s_%s' % (yml['name'], x, yml['name']) for x in yml.get('client_of', [])] model_function_inputs = copy.copy(yml.get('inputs', [])) model_function_outputs = copy.copy(yml.get('outputs', [])) cls.expand_server_io( model_function_inputs, model_function_outputs, client_comms=client_comms) expected_outputs = [] for x in model_function_outputs: expected_outputs += x.get('vars', []) model_outputs_in_inputs = yml.get('outputs_in_inputs', None) model_function_info = cls.parse_function_definition( model_function_file, yml['function'], expected_outputs=expected_outputs, outputs_in_inputs=model_outputs_in_inputs) if model_outputs_in_inputs is None: model_outputs_in_inputs = model_function_info.get( 'outputs_in_inputs', None) model_flag = cls.update_io_from_function( model_function_info, yml['function'], inputs=model_function_inputs, outputs=model_function_outputs, iter_function_over=yml.get('iter_function_over', [])) yml['preparsed_function'] = { 'model_file': model_function_info, 'model_function': yml['function'], 'inputs': model_function_inputs, 'outputs': model_function_outputs, 'model_flag': model_flag, 'outputs_in_inputs': model_outputs_in_inputs, 'copies': yml.get('copies', 1), 'iter_function_over': yml.get('iter_function_over', []), 'skip_update_io': True} return yml['preparsed_function']
[docs] @classmethod def update_io_from_function(cls, model_file, model_function, inputs=[], outputs=[], contents=None, outputs_in_inputs=None, iter_function_over=[]): r"""Update inputs/outputs from the function definition. Args: model_file (str): Full path to the file containing the model function's declaration. model_function (str): Name of the model function. inputs (list, optional): List of model inputs including types. Defaults to []. outputs (list, optional): List of model outputs including types. Defaults to []. contents (str, optional): Contents of file to parse rather than re-reading the file. Defaults to None and is ignored. outputs_in_inputs (bool, optional): If True, the outputs are presented in the function definition as inputs. Defaults to False. iter_function_over (array, optional): Variable(s) that should be received or sent as an array, but iterated over. Defaults to an empty array and is ignored. Returns: dict, None: Flag variable used by the model. If None, the model does not use a flag variable. """ from yggdrasil.datatypes import is_default_typedef # Read info from the source code if (((isinstance(model_file, str) and os.path.isfile(model_file)) or (contents is not None))): # pragma: debug expected_outputs = [] for x in outputs: expected_outputs += x.get('vars', []) info = cls.parse_function_definition(model_file, model_function, contents=contents, expected_outputs=expected_outputs) logger.warn("The new execution pattern reuses the parsed " "source code parameters. Double check results:\n%s." % pformat(info)) elif isinstance(model_file, dict): info = model_file else: info = {"inputs": [], "outputs": []} if outputs_in_inputs is None: # pragma: debug outputs_in_inputs = info.get('outputs_in_inputs', cls.outputs_in_inputs) info_map = {io: OrderedDict([(x['name'], x) for x in info.get(io, [])]) for io in ['inputs', 'outputs']} # Determine flag variable flag_var = None if info.get('flag_var', None): flag_var = dict(info['flag_var'], name='model_flag') # Check for vars matching names of input/output channels for io, io_var in zip(['inputs', 'outputs'], [inputs, outputs]): if (io == 'outputs') and outputs_in_inputs: io_map = info_map['inputs'] else: io_map = info_map[io] for x in io_var: if x.get('vars', []): continue var_name = x['name'].split(':')[-1] if var_name in io_map: x['vars'] = [{'name': var_name}] for k in ['length', 'shape', 'ndim']: kvar = '%s_var' % k if kvar in io_map[var_name]: x['vars'].append({"name": io_map[var_name][kvar]}) # Move variables if outputs in inputs if outputs_in_inputs: if ((((len(inputs) + len(outputs)) == len(info.get('inputs', []))) and (len(info.get('outputs', [])) == 0))): for i, vdict in enumerate(info['inputs'][:len(inputs)]): inputs[i].setdefault('vars', [{'name': vdict['name']}]) assert len(inputs[i]['vars']) == 1 assert inputs[i]['vars'][0]['name'] == vdict['name'] for i, vdict in enumerate(info['inputs'][len(inputs):]): outputs[i].setdefault('vars', [{'name': vdict['name']}]) assert len(outputs[i]['vars']) == 1 assert outputs[i]['vars'][0]['name'] == vdict['name'] for x in outputs: for i, v in enumerate(x.get('vars', [])): if v['name'] in info_map['inputs']: info_map['outputs'][v['name']] = cls.input2output( info_map['inputs'].pop(v['name'])) for io, io_var in zip(['inputs', 'outputs'], [inputs, outputs]): for x in io_var: x['channel_name'] = x['name'] x['channel'] = (x['name'].split(':', 1)[-1] + '_%s_channel' % io[:-1]) for i, v in enumerate(x.get('vars', [])): if v['name'] in info_map[io]: info_map[io][v['name']].update(v) x['vars'][i] = info_map[io][v['name']] if (len(io_var) == 1) and info_map.get(io, False): io_var[0].setdefault('vars', list(info_map[io].values())) for x in io_var: if 'vars' not in x: x['vars'] = [copy.deepcopy(x)] x['vars'][0]['name'] = x['name'].split(':', 1)[-1] for v in x['vars']: if isinstance(v.get('datatype', None), str): v['datatype'] = {'type': v['datatype']} if isinstance(x.get('datatype', None), str): x['datatype'] = {'type': x['datatype']} # Check for user defined length variables and add flag to # length variables for x in io_var: for k in ['length', 'shape', 'ndim']: for v in x['vars']: if k + '_var' in v: v[k + '_var'] = info_map[io][v[k + '_var']] v[k + '_var']['is_length_var'] = True else: v[k + '_var'] = False # Update datatypes for x in io_var: non_length = [] for v in x['vars']: if not v.get('is_length_var', False): non_length.append(v) if ((x.get('datatype', None) and (not is_default_typedef(x['datatype'])))): if (len(non_length) == 1): non_length[0]['datatype'] = x['datatype'] else: # TODO: Remove types associated with length? assert x['datatype']['type'] == 'array' assert len(x['datatype']['items']) == len(non_length) for v, t in zip(non_length, x['datatype']['items']): v['datatype'] = t elif any(xx.get('datatype', None) for xx in non_length): if (len(non_length) == 1): x['datatype'] = non_length[0]['datatype'] x['datatype']['allowWrapped'] = True else: x['datatype'] = { 'type': 'array', 'items': [v['datatype'] for v in non_length]} x['datatype']['from_function'] = cls.is_typed if cls.is_typed: for v in x['vars']: if 'native_type' not in v: v['native_type'] = cls.get_native_type(**v) # Update types based on iteration for x in io_var: for v in x.get('vars', [x]): if v['name'] in iter_function_over: v['iter_datatype'] = copy.deepcopy(v.get('datatype', {})) if v.get('datatype', {}): assert v['datatype']['type'] == 'scalar' v['datatype']['type'] = '1darray' v.pop('native_type', None) v['native_type'] = cls.get_native_type(**v) # Finalize io variables for x in inputs: cls.finalize_function_io('input', x) for x in outputs: cls.finalize_function_io('output', x) return flag_var
[docs] @classmethod def add_extra_vars(cls, direction, x): r"""Add extra variables required for communication. Args: direction (str): Direction of channel ('input' or 'output') x (dict): Dictionary describing the variable. """ x.setdefault('extra_vars', {})
[docs] @classmethod def finalize_function_io(cls, direction, x): r"""Finalize info for an input/output channel following function parsing. Args: direction (str): Direction of channel ('input' or 'output') x (dict): Channel info. """ assert direction in ['input', 'output'] for v in x['vars']: cls.add_extra_vars(direction, v)
[docs] @classmethod def write_model_wrapper(cls, model_file, model_function, inputs=[], outputs=[], model_flag=None, outputs_in_inputs=None, verbose=False, copies=1, iter_function_over=[], verbose_model=False, skip_update_io=False, model_name=None): r"""Return the lines required to wrap a model function as an integrated model. Args: model_file (str): Full path to the file containing the model function's declaration. model_function (str): Name of the model function. inputs (list, optional): List of model inputs including types. Defaults to []. outputs (list, optional): List of model outputs including types. Defaults to []. model_flag (dict, optional): Information about the flag that should be used to track the success of yggdrasil send/recv calls. This should only be provided if update_io_from_function has already been called. Defaults to None and is determined by update_io_from_function. outputs_in_inputs (bool, optional): If True, the outputs are presented in the function definition as inputs. Defaults to the class attribute outputs_in_inputs. verbose (bool, optional): If True, the contents of the created file are displayed. Defaults to False. copies (int, optional): Number of times the model driver is duplicated. If more than one, no error will be raised in the event there is never a call the the function. Defaults to 1. iter_function_over (array, optional): Variable(s) that should be received or sent as an array, but iterated over. Defaults to an empty array and is ignored. skip_update_io (bool, optional): If True, update_io_from_function will not be called. Defaults to False. verbose_model (bool, optional): If True, print statements will be added after every line in the model. Defaults to False. model_name (str, optional): Name given to the model. Defaults to None. Returns: list: Lines of code wrapping the provided model with the necessary code to run it as part of an integration. """ if outputs_in_inputs is None: outputs_in_inputs = cls.outputs_in_inputs # TODO: Determine how to encode dependencies on external variables in models if cls.function_param is None: raise NotImplementedError("function_param attribute not set for" "language '%s'" % cls.language) # Update types based on the function definition for typed languages if not skip_update_io: model_flag = cls.update_io_from_function( model_file, model_function, inputs=inputs, outputs=outputs, outputs_in_inputs=outputs_in_inputs, iter_function_over=iter_function_over) if isinstance(model_file, dict): model_file = model_file['model_file'] # Update types based on iteration iter_function_idx = None iter_ivars = [] iter_ovars = [] if iter_function_over: iter_function_idx = {'name': 'idx_func_iter', 'datatype': {'type': 'integer'}} if cls.zero_based: iter_function_idx['begin'] = int(0) else: iter_function_idx['begin'] = int(1) for x in inputs: iter_ivars += [v for v in x.get('vars', [x]) if v['name'] in iter_function_over] if not iter_ivars: # pragma: debug raise RuntimeError("The iter_function_over model " "parameter must include an input to " "iterate over. To expand output arrays " "into component elements, use the " "'iterate' transformation.") for x in outputs: iter_ovars += [v for v in x.get('vars', [x]) if v['name'] in iter_function_over] if iter_ivars[0].get('length_var', False): iter_function_idx['end'] = iter_ivars[0]['length_var'] for v in iter_ovars: v['length_var'] = iter_ivars[0]['length_var']['name'] if isinstance(iter_function_idx['end'], dict): iter_function_idx['end'] = iter_function_idx['end']['name'] else: iter_function_idx['end'] = cls.format_function_param( 'len', variable=iter_ivars[0]['name'], extra=iter_ivars[0]) for v in iter_ivars + iter_ovars: v['iter_var'] = iter_function_idx # Declare variables and flag, then define flag lines = [] flag_var = {'name': 'flag', 'datatype': {'type': 'flag'}} iter_var = {'name': 'first_iter', 'datatype': {'type': 'flag'}} free_vars = [] definitions = [] if 'declare' in cls.function_param: for x in inputs + outputs: lines += cls.write_channel_decl( x, definitions=definitions, requires_freeing=free_vars) lines += cls.write_declaration(flag_var, definitions=definitions, requires_freeing=free_vars) lines += cls.write_declaration(iter_var, definitions=definitions, requires_freeing=free_vars) if model_flag: lines += cls.write_declaration( model_flag, definitions=definitions, requires_freeing=free_vars) if iter_function_idx: lines += cls.write_declaration( iter_function_idx, definitions=definitions, requires_freeing=free_vars) for x in inputs + outputs: for v in x.get('vars', [x]): lines += cls.write_declaration( v, definitions=definitions, requires_freeing=free_vars) lines += definitions if 'python_init' in cls.function_param: # TODO: Check if python is used? lines += [cls.function_param['python_init'], ''] nline_preamble = len(lines) lines.append(cls.format_function_param( 'assign', name=flag_var['name'], value=cls.function_param.get( 'true_flag', cls.function_param['true']))) lines.append(cls.format_function_param( 'assign', name=iter_var['name'], value=cls.function_param.get( 'true_flag', cls.function_param['true']))) # Declare/define input and output channels for x in inputs: lines += cls.write_channel_def('input', requires_freeing=free_vars, **x) for x in outputs: lines += cls.write_channel_def('output', requires_freeing=free_vars, **x) # Receive inputs before loop for x in inputs: if x.get('outside_loop', False): lines += cls.write_model_recv(x['channel'], x, flag_var=flag_var) # Loop loop_lines = [] # Receive inputs any_loop_inputs = False loop_iter_var = iter_var if copies > 1: loop_iter_var = None for x in inputs: if not x.get('outside_loop', False): any_loop_inputs = True loop_lines += cls.write_model_recv(x['channel'], x, flag_var=flag_var, iter_var=loop_iter_var, allow_failure=True) # Prepare output array if iter_function_over: for v in iter_ivars: if v['name'] in iter_function_over: loop_lines += cls.write_finalize_iiter(v) for v in iter_ovars: if v['name'] in iter_function_over: loop_lines += cls.write_initialize_oiter(v) # Call model loop_lines += cls.write_model_function_call( model_function, model_flag, inputs, outputs, outputs_in_inputs=outputs_in_inputs, iter_function_idx=iter_function_idx) # Finalize output array if iter_function_over: for v in iter_ovars: if v['name'] in iter_function_over: loop_lines += cls.write_finalize_oiter(v) # Send outputs for x in outputs: if not x.get('outside_loop', False): loop_lines += cls.write_model_send(x['channel'], x, flag_var=flag_var) loop_lines.append(cls.format_function_param( 'assign', name=iter_var['name'], value=cls.function_param.get('false_flag', cls.function_param['false']))) # Add break if there are not any inputs inside the loop if not any_loop_inputs: loop_lines.append(cls.format_function_param( 'assign', name=flag_var['name'], value=cls.function_param.get( 'false_flag', cls.function_param['false']))) # Add loop in while block flag_cond = cls.format_function_param('flag_cond', default='{flag_var}', flag_var=flag_var['name']) lines += cls.write_while_loop(flag_cond, loop_lines) # Send outputs after loop for x in outputs: if x.get('outside_loop', False): lines += cls.write_model_send(x['channel'], x, flag_var=flag_var) # Free variables for x in free_vars: lines += cls.write_free(x) # Add prints if verbose_model: # pragma: debug idx = len(lines) - 1 while (idx > nline_preamble): if 'else' not in lines[idx]: indent = ' ' * (len(lines[idx]) - len(lines[idx].lstrip())) lines.insert(idx, indent + cls.format_function_param( 'print', message=("%s: line %d" % (model_file, idx)))) idx -= 1 # Wrap as executable with interface & model import prefix = None if 'interface' in cls.function_param: ygglib = cls.interface_library if ygglib in cls.internal_libraries: ygglib = cls.internal_libraries[ygglib]['source'] if cls.interface_inside_exec: lines.insert(0, cls.format_function_param( 'interface', interface_library=ygglib)) else: prefix = [cls.format_function_param( 'interface', interface_library=ygglib)] out = cls.write_executable(lines, prefix=prefix, model_name=model_name, imports={'filename': model_file, 'function': model_function}) if verbose: # pragma: debug'\n' + '\n'.join( [f"{i + 1:3}: {x}" for i, x in enumerate(out)])) else: logger.debug('\n' + '\n'.join( [f"{i + 1:3}: {x}" for i, x in enumerate(out)])) return out
[docs] @classmethod def write_channel_decl(cls, var, **kwargs): r"""Write a channel declaration. Args: var (dict): Information dictionary for the channel. being declared. **kwargs: Additional keyword arguments are passed to class's write_declaration. Returns: list: The lines declaring the variable. """ out = [] if not cls.dont_declare_channel: out = cls.write_declaration( {'name': var['channel'], 'type': 'comm'}, **kwargs) if (((var.get('datatype', None) is not None) and ('{channel_type}' in cls.function_param['input']))): var['channel_type'] = '%s_type' % var['channel'] out += cls.write_type_decl( var['channel_type'], var['datatype'], definitions=kwargs.get('definitions', None), requires_freeing=kwargs.get('requires_freeing', None)) return out
[docs] @classmethod def write_type_decl(cls, name, datatype, name_base=None, requires_freeing=None, definitions=None, no_decl=False): r"""Get lines declaring the datatype within the language. Args: name (str): Name of variable that should be declared. datatype (dict): Type definition. requires_freeing (list, optional): List that variables requiring freeing should be appended to. Defaults to None. definitions (list, optional): Existing list that variable definitions should be added to. Defaults to None if not provided and definitions will be included in the returned lines. no_decl (bool, optional): If True, the variable is not declared, but supporting variables will be. Defaults to False. Returns: list: Lines required to define a type declaration. """ out = [] if name_base is None: name_base = name # if datatype['type'] == 'array': # if 'items' in datatype: # assert isinstance(datatype['items'], list) # out += cls.write_declaration( # {'name': '%s_items' % name_base, # 'datatype': { # 'type': '1darray', 'subtype': 'dtype', # 'length': len(datatype['items'])}}, # definitions=definitions, # requires_freeing=requires_freeing) # for i, x in enumerate(datatype['items']): # # Prevent recusion # x_copy = copy.deepcopy(x) # x_copy.pop('items', None) # x_copy.pop('properties', None) # out += cls.write_type_decl( # None, x_copy, # name_base=('%s_item%d' % (name_base, i)), # definitions=definitions, # requires_freeing=requires_freeing, # no_decl=True) # elif datatype['type'] == 'object': # if 'properties' in datatype: # assert isinstance(datatype['properties'], dict) # precision = 0 # if datatype['properties']: # precision = max([len(k) for k in # datatype['properties'].keys()]) # precision = max(80, precision) # out += cls.write_declaration( # {'name': '%s_keys' % name_base, # 'datatype': { # 'type': '1darray', 'subtype': 'string', # 'length': len(datatype['properties']), # 'precision': precision}}, # definitions=definitions, # requires_freeing=requires_freeing) # out += cls.write_declaration( # {'name': '%s_vals' % name_base, # 'datatype': { # 'type': '1darray', 'subtype': 'dtype', # 'length': len(datatype['properties'])}}, # definitions=definitions, # requires_freeing=requires_freeing) # for i, (k, v) in enumerate(datatype['properties'].items()): # # Prevent recusion # v_copy = copy.deepcopy(v) # v_copy.pop('items', None) # v_copy.pop('properties', None) # out += cls.write_type_decl( # None, v_copy, # name_base=('%s_prop%d' % (name_base, i)), # requires_freeing=requires_freeing, # definitions=definitions, # no_decl=True) if datatype['type'] == 'ndarray': if 'shape' in datatype: out += cls.write_declaration( {'name': '%s_shape' % name_base, 'datatype': { 'type': '1darray', 'subtype': 'int', 'precision': rapidjson.SIZE_OF_SIZE_T, 'length': len(datatype['shape'])}}, definitions=definitions, requires_freeing=requires_freeing) elif datatype['type'] in (['ply', 'obj', '1darray', 'scalar', 'boolean', 'null', 'number', 'integer', 'string', 'class', 'function', 'instance', 'schema', 'any', 'array', 'object'] + list(constants.VALID_TYPES.keys())): pass else: # pragma: debug raise ValueError(("Cannot create %s version of type " "'%s'") % (cls.language, datatype['type'])) if not no_decl: out += cls.write_declaration( {'name': name, 'type': 'dtype'}) return out
[docs] @classmethod def write_type_def(cls, name, datatype, name_base=None, use_generic=False): r"""Get lines declaring the data type within the language. Args: name (str): Name of variable that definition should be stored in. datatype (dict): Type definition. use_generic (bool, optional): If True variables serialized and/or deserialized by the type will be assumed to be generic objects. Defaults to False. Returns: list: Lines required to define a type definition. """ out = [] fmt = None keys = {} if use_generic: keys['use_generic'] = cls.function_param['true'] else: keys['use_generic'] = cls.function_param['false'] assert 'init_type_from_schema' in cls.function_param datatype_ = copy.deepcopy(datatype) datatype_.pop('from_function', None) for k in ['precision', 'length', 'ndim', 'shape']: if k in datatype_ and not datatype_[k]: datatype_.pop(k) if datatype_.get('type', None) == '1darray': datatype_['type'] = 'ndarray' if 'length' in datatype_: datatype_['shape'] = [datatype_.pop('length')] if 'shape' not in datatype_: datatype_['ndim'] = 1 keys['schema'] = rapidjson.dumps(datatype_).replace( '"', cls.function_param.get('escaped_double_quote', '\\"')) fmt = cls.format_function_param( 'init_type_from_schema', **keys) out.append(cls.format_function_param('assign', name=name, value=fmt)) return out
[docs] @classmethod def write_channel_def(cls, key, datatype=None, **kwargs): r"""Write an channel definition. Args: key (str): Entry in cls.function_param that should be used. datatype (dict, optional): Data type associated with the channel. Defaults to None and is ignored. **kwargs: Additional keyword arguments are passed as parameters to format_function_param. Returns: list: Lines required to declare and define an output channel. """ out = [] if (datatype is not None) and ('{channel_type}' in cls.function_param[key]): kwargs['channel_type'] = '%s_type' % kwargs['channel'] out += cls.write_type_def( kwargs['channel_type'], datatype, use_generic=kwargs.get('use_generic', False)) dir_map = {'input': 'recv', 'output': 'send'} try_keys = [dir_map[key] + '_converter', 'transform'] try_vals = [] if all([bool(kwargs.get(k, False)) for k in try_keys]): # pragma: debug # TODO: Handling merger of the transforms in yaml or # remove the *_converter options entirely raise RuntimeError(("Transforms are specified in multiple " "locations for this input: %s") % str(try_keys)) for k in try_keys: if k in kwargs: v = kwargs[k] try_vals += v # This last transform is used because the others are assumed # to be applied by the connection driver if try_vals: transform_type = try_vals[-1] if isinstance(transform_type, dict): transform_type = transform_type.get('transformtype', None) try_key = f"{transform_type}_{key}" if ((isinstance(transform_type, str) and 'python_interface' in cls.function_param and try_key in cls.python_interface)): kwargs['python_interface'] = cls.python_interface[try_key] if ((('format_str' in kwargs) and ('python_interface_format' in cls.function_param))): key = 'python_interface_format' kwargs['format_str'] = kwargs['format_str'].encode( "unicode_escape").decode('utf-8') else: key = 'python_interface' out += [cls.format_function_param(key, **kwargs)] return out
[docs] @classmethod def write_model_function_call(cls, model_function, flag_var, inputs, outputs, outputs_in_inputs=None, on_failure=None, format_not_flag_cond=None, format_flag_cond=None, iter_function_idx=None): r"""Write lines necessary to call the model function. Args: model_function (str): Handle of the model function that should be called. flag_var (str): Name of variable that should be used as a flag. inputs (list): List of dictionaries describing inputs to the model. outputs (list): List of dictionaries describing outputs from the model. outputs_in_inputs (bool, optional): If True, the outputs are presented in the function definition as inputs. Defaults to the class attribute outputs_in_inputs. on_failure (list, optional): Lines to be executed if the model call fails. Defaults to an error message. This variable is only used if flag_var is not None and outputs_in_inputs is True. format_not_flag_cond (str, optional): Format string that produces a conditional expression that evaluates to False when the model flag indicates a failure. Defaults to None and the class's value for 'not_flag_cond' in function_param is used if it exists. If it does not exist, format_flag_cond is used. format_flag_cond (str, optional): Format string that produces a conditional expression that evaluates to True when the model flag indicates a success. Defaults to None and the defaults class's value for 'flag_cond' in function_param is used if it exists. If it does not exist, the flag is directly evaluated as if it were a boolean. iter_function_idx (dict, optional): Variable that serves as an index to iterate over variables. Defaults to None. Returns: list: Lines required to carry out a call to a model function in this language. """ if outputs_in_inputs is None: # pragma: debug outputs_in_inputs = cls.outputs_in_inputs func_inputs = cls.channels2vars(inputs) func_outputs = cls.channels2vars(outputs) if iter_function_idx: for src in [func_inputs, func_outputs]: for i, x in enumerate(src): if 'iter_datatype' in x: src[i] = dict( x, datatype=x['iter_datatype'], name=cls.format_function_param( 'index', variable=x['name'], index=iter_function_idx['name'], extra=x), length_var=False) if isinstance(flag_var, dict): flag_var = flag_var['name'] out = cls.write_function_call( model_function, inputs=func_inputs, outputs=func_outputs, flag_var=flag_var, outputs_in_inputs=outputs_in_inputs) if flag_var and outputs_in_inputs: if (not format_flag_cond) and ('not_flag_cond' in cls.function_param): flag_cond = cls.format_function_param( 'not_flag_cond', flag_var=flag_var, replacement=format_not_flag_cond) else: # pragma: debug # flag_cond = '%s (%s)' % ( # cls.function_param['not'], # cls.format_function_param( # 'flag_cond', default='{flag_var}', flag_var=flag_var, # replacement=format_flag_cond)) raise RuntimeError("Untested code below. Uncomment " "at your own risk if you find " "use case for it.") if on_failure is None: on_failure = [cls.format_function_param( 'error', error_msg="Model call failed.")] out += cls.write_if_block(flag_cond, on_failure) if iter_function_idx: out = cls.write_for_loop(iter_function_idx['name'], iter_function_idx['begin'], iter_function_idx['end'], out) return out
[docs] @classmethod def write_model_recv(cls, channel, recv_var, flag_var='flag', iter_var=None, allow_failure=False, alt_recv_function=None, include_arg_count=None): r"""Write a model receive call include checking the return flag. Args: channel (str): Name of variable that the channel being received from was stored in. recv_var (dict, list): Information of one or more variables that receieved information should be stored in. flag_var (str, optional): Name of flag variable that the flag should be stored in. Defaults to 'flag', iter_var (str, optional): Name of flag signifying when the model is in it's first iteration. If allow_failure is True and iter_var is provided, an error will be raised if iter_var is True. Defaults to None. allow_failure (bool, optional): If True, the returned lines will call a break if the flag is False. Otherwise, the returned lines will issue an error. Defaults to False. alt_recv_function (str, optional): Alternate receive function format string. Defaults to None and is ignored. include_arg_count (bool, optional): If True, the arguments to the receive call will be proceeded with an argument count. If not provided, the class attribute of the same name will be used. Returns: list: Lines required to carry out a receive call in this language. """ if cls.function_param is None: raise NotImplementedError("function_param attribute not set for" "language '%s'" % cls.language) if include_arg_count is None: include_arg_count = cls.include_arg_count recv_var_str = recv_var if not isinstance(recv_var, str): recv_var_par = cls.channels2vars(recv_var) recv_var_str = cls.prepare_output_variables( recv_var_par, in_inputs=cls.outputs_in_inputs, for_yggdrasil=True) else: recv_var_par = cls.split_variables(recv_var_str) expanded_recv_var = None if isinstance(flag_var, dict): flag_var = flag_var['name'] if isinstance(iter_var, dict): iter_var = iter_var['name'] if cls.outputs_in_inputs: inputs = [recv_var_str] outputs = [flag_var] else: if (len(recv_var_par) > 1): # and ('multiple_outputs' in cls.function_param): expanded_recv_var = recv_var_str recv_var_str = 'temp_%s' % recv_var_par[0]['name'] inputs = [] outputs = [flag_var, recv_var_str] if cls.include_channel_obj: inputs.insert(0, channel) lines = cls.write_function_call( cls.format_function_param('recv_function', channel=channel, replacement=alt_recv_function), inputs=inputs, outputs=outputs, include_arg_count=include_arg_count) if 'not_flag_cond' in cls.function_param: flag_cond = cls.format_function_param('not_flag_cond', flag_var=flag_var) else: flag_cond = '%s (%s)' % ( cls.function_param['not'], cls.format_function_param('flag_cond', default='{flag_var}', flag_var=flag_var)) fail_message = cls.escape_quotes( "Could not receive %s." % recv_var_str) if allow_failure: fail_message = cls.escape_quotes( 'End of input from %s.' % recv_var_str) if_block = [cls.format_function_param('print', message=fail_message), cls.function_param.get('break', 'break')] if iter_var is not None: if_block = cls.write_if_block( iter_var, [cls.format_function_param( 'error', error_msg=cls.escape_quotes( 'No input from %s.' % recv_var_str))], if_block) else: if_block = [cls.format_function_param('error', error_msg=fail_message)] lines += cls.write_if_block(flag_cond, if_block) # Check if single element should be expanded if expanded_recv_var: # lines.append(cls.format_function_param( # 'print_generic', object=recv_var_str)) if 'expand_mult' in cls.function_param: # pragma: matlab lines.append(cls.format_function_param( 'expand_mult', name=expanded_recv_var, value=recv_var_str)) elif 'assign_mult' in cls.function_param: lines.append(cls.format_function_param( 'assign_mult', name=expanded_recv_var, value=recv_var_str)) else: lines.append(cls.format_function_param( 'assign', name=expanded_recv_var, value=recv_var_str)) elif len(recv_var_par) == 1: lines += cls.write_expand_single_element(recv_var_str) return lines
[docs] @classmethod def write_model_send(cls, channel, send_var, flag_var='flag', allow_failure=False, alt_send_function=None, include_arg_count=None): r"""Write a model send call include checking the return flag. Args: channel (str): Name of variable that the channel being sent to was stored in. send_var (dict, list): Information on one or more variables containing information that will be sent. flag_var (str, optional): Name of flag variable that the flag should be stored in. Defaults to 'flag', allow_failure (bool, optional): If True, the returned lines will call a break if the flag is False. Otherwise, the returned lines will issue an error. Defaults to False. alt_send_function (str, optional): Alternate send function format string. Defaults to None and is ignored. include_arg_count (bool, optional): If True, the arguments to the send call will be proceeded with an argument count. If not provided, the class attribute of the same name will be used. Returns: list: Lines required to carry out a send call in this language. """ if cls.function_param is None: raise NotImplementedError("function_param attribute not set for" "language '%s'" % cls.language) if include_arg_count is None: include_arg_count = cls.include_arg_count send_var_str = send_var if not isinstance(send_var_str, str): send_var_par = cls.channels2vars(send_var) send_var_str = cls.prepare_input_variables( send_var_par, for_yggdrasil=True) if isinstance(flag_var, dict): flag_var = flag_var['name'] if cls.include_channel_obj: send_var_str = [channel, send_var_str] lines = cls.write_function_call( cls.format_function_param('send_function', channel=channel, replacement=alt_send_function), inputs=send_var_str, outputs=flag_var, include_arg_count=include_arg_count) flag_cond = '%s (%s)' % ( cls.function_param['not'], cls.format_function_param('flag_cond', default='{flag_var}', flag_var=flag_var)) fail_message = cls.escape_quotes( "Could not send %s." % send_var_str) if allow_failure: # pragma: no cover # This is not particularly useful, but is included for completion if_block = [cls.format_function_param('print', message=fail_message), cls.function_param.get('break', 'break')] else: if_block = [cls.format_function_param('error', error_msg=fail_message)] lines += cls.write_if_block(flag_cond, if_block) return lines
[docs] @classmethod def write_print_var(cls, var, prefix_msg=None): r"""Get the lines necessary to print a variable in this language. Args: var (dict): Variable information. prefix_msg (str, optional): Message that should be printed before the variable. Defaults to None and is ignored. Returns: list: Lines printing the specified variable. """ out = [] print_key = None varname = var if isinstance(var, dict): varname = var['name'] typename = var.get( 'datatype', {'type': var.get('type', None)}).get('type', None) if ('print_%s' % typename) in cls.function_param: print_key = ('print_%s' % typename) elif 'print_generic' in cls.function_param: print_key = 'print_generic' elif 'print_generic' in cls.function_param: print_key = 'print_generic' if print_key: if prefix_msg is not None: out.append(cls.format_function_param( 'print', message=prefix_msg)) out += [cls.format_function_param( print_key, object=varname)] return out
[docs] @classmethod def write_print_input_var(cls, var, **kwargs): r"""Get the lines necessary to print an input variable in this language. Args: var (dict): Variable information. **kwargs: Additional keyword arguments are passed to write_print_var. Returns: list: Lines printing the specified variable. """ return cls.write_print_var(var, **kwargs)
[docs] @classmethod def write_print_output_var(cls, var, in_inputs=False, **kwargs): r"""Get the lines necessary to print an output variable in this language. Args: var (dict): Variable information. in_inputs (bool, optional): If True, the output variable is passed in as an input variable to be populated. Defaults to False. **kwargs: Additional keyword arguments are passed to write_print_var. Returns: list: Lines printing the specified variable. """ return cls.write_print_var(var, **kwargs)
[docs] @classmethod def write_function_def(cls, function_name, inputs=[], outputs=[], input_var=None, output_var=None, function_contents=[], outputs_in_inputs=False, opening_msg=None, closing_msg=None, print_inputs=False, print_outputs=False, skip_interface=False, function_keys=None, verbose=False, **kwargs): r"""Write a function definition. Args: function_name (str): Name fo the function being defined. inputs (list, optional): List of inputs to the function. Defaults to []. Ignored if input_var provided. outputs (list, optional): List of outputs from the function. Defaults to []. If not provided, no return call is added to the function body. Ignored if output_var provided. input_var (str, optional): Full string specifying input in the function definition. If not provided, this will be created based on the contents of the inputs variable. output_var (str, optional): Full string specifying output in the function definition. If not provided, this will be created based on the contents of the outputs variable. function_contents (list, optional): List of lines comprising the body of the function. Defaults to []. outputs_in_inputs (bool, optional): If True, the outputs are presented in the function definition as inputs. Defaults to False. opening_msg (str, optional): String that should be printed before the function contents (and inputs if print_inputs is True). Defaults to None and is ignored. closing_msg (str, optional): String that should be printed after the function contents (and outputs if print_outputs is True). Defaults to None and is ignored. print_inputs (bool, optional): If True, the input variables will be printed before the function contents. Defaults to False. print_outputs (bool, optional): If True, the output variables will be printed after the function contents. Defaults to False. skip_interface (bool, optional): If True, the line including the interface will be skipped. Defaults to False. function_keys (tuple, optional): 2 element tuple that specifies the keys for the function_param entries that should be used to begin & end a function definition. Defaults to ('function_def_begin', function_def_end'). verbose (bool, optional): If True, the contents of the created file are displayed. Defaults to False. **kwargs: Additional keyword arguments are passed to cls.format_function_param. Returns: list: Lines completing the function call. Raises: NotImplementedError: If the function_param attribute for the class is not defined. """ if cls.function_param is None: raise NotImplementedError("function_param attribute not set for" "language '%s'" % cls.language) if function_keys is None: function_keys = ('function_def_begin', 'function_def_end') out = [] interface_lines = [] if ('interface' in cls.function_param) and (not skip_interface): ygglib = cls.interface_library if ygglib in cls.internal_libraries: ygglib = cls.internal_libraries[ygglib]['source'] interface_lines.append(cls.format_function_param( 'interface', interface_library=ygglib)) if not cls.interface_inside_exec: out += interface_lines flag_var = {} if input_var is None: input_var = cls.prepare_input_variables( inputs, in_definition=True) if output_var is None: output_var = cls.prepare_output_variables( outputs, in_inputs=outputs_in_inputs, in_definition=True) print_input_lines = [] if print_inputs and inputs: for x in inputs: print_input_lines += cls.write_print_input_var( x, prefix_msg=('INPUT[%s]:' % x['name'])) print_output_lines = [] if print_outputs and outputs: for x in outputs: print_output_lines += cls.write_print_output_var( x, prefix_msg=('OUTPUT[%s]:' % x['name']), in_inputs=outputs_in_inputs) old_outputs = [] if outputs_in_inputs: if output_var: input_var = cls.prepare_input_variables( [input_var, output_var]) flag_var = kwargs.get('flag_var', 'flag') if isinstance(flag_var, str): flag_var = {'name': flag_var} flag_var.setdefault('datatype', 'flag') flag_var.setdefault('value', cls.function_param.get( 'true_flag', cls.function_param['true'])) old_outputs = outputs outputs = [flag_var] output_var = cls.prepare_output_variables(outputs) out.append(cls.format_function_param( function_keys[0], function_name=function_name, input_var=input_var, output_var=output_var, **kwargs)) if cls.interface_inside_exec: out += [cls.function_param['indent'] + x for x in interface_lines] free_vars = [] if 'declare' in cls.function_param: definitions = [] if not cls.types_in_funcdef: for o in (inputs + old_outputs): out += [cls.function_param['indent'] + x for x in cls.write_declaration( o, definitions=definitions, requires_freeing=free_vars, is_argument=True)] for o in outputs: out += [cls.function_param['indent'] + x for x in cls.write_declaration( o, definitions=definitions, requires_freeing=free_vars)] out += [cls.function_param['indent'] + x for x in definitions] if outputs_in_inputs: out.append(cls.function_param['indent'] + cls.format_function_param( 'assign', **flag_var)) if opening_msg: out.append(cls.function_param['indent'] + cls.format_function_param( 'print', message=opening_msg)) if print_inputs: for x in print_input_lines: out.append(cls.function_param['indent'] + x) for x in function_contents: out.append(cls.function_param['indent'] + x) if print_outputs: for x in print_output_lines: out.append(cls.function_param['indent'] + x) if closing_msg: out.append(cls.function_param['indent'] + cls.format_function_param( 'print', message=closing_msg)) # This is not currently used by the tests, but may be # needed in the future assert not free_vars # for x in free_vars: # out += [cls.function_param['indent'] + line # for line in cls.write_free(x)] if output_var and ('return' in cls.function_param): out.append(cls.function_param['indent'] + cls.format_function_param( 'return', output_var=output_var)) if function_keys[1] in cls.function_param: out.append(cls.format_function_param( function_keys[1], function_name=function_name)) else: out.append(cls.function_param.get('block_end', '')) if verbose: # pragma: debug'\n' + '\n'.join(out)) else: logger.debug('\n' + '\n'.join(out)) return out
[docs] @classmethod def write_function_call(cls, function_name, inputs=[], outputs=[], include_arg_count=False, outputs_in_inputs=False, **kwargs): r"""Write a function call. Args: function_name (str): Name of the function being called. inputs (list, optional): List of inputs to the function. Defaults to []. outputs (list, optional): List of outputs from the function. Defaults to []. include_arg_count (bool, optional): If True, the count of input arguments is included as the first argument. Defaults to False. outputs_in_inputs (bool, optional): If True, the outputs are presented in the function definition as inputs. Defaults to False. **kwargs: Additional keyword arguments are passed to cls.format_function_param. Returns: list: Lines completing the function call. """ if outputs_in_inputs: inputs = inputs + [cls.prepare_output_variables( outputs, in_inputs=outputs_in_inputs)] flag_var = kwargs.get('flag_var', None) if (flag_var is None) and ('function_call_noout' not in cls.function_param): flag_var = 'flag' outputs = [] if flag_var: outputs.append(flag_var) kwargs.setdefault('input_var', cls.prepare_input_variables(inputs)) kwargs.setdefault('output_var', cls.prepare_output_variables(outputs)) nout = max(len(outputs), len(cls.split_variables(kwargs['output_var']))) if include_arg_count: narg = len(cls.split_variables(kwargs['input_var'])) kwargs['input_var'] = cls.prepare_input_variables( [str(narg), kwargs['input_var']]) if (nout == 0) and ('function_call_noout' in cls.function_param): call_str = cls.format_function_param( 'function_call_noout', function_name=function_name, **kwargs) else: call_str = cls.format_function_param( 'function_call', default='{function_name}({input_var})', function_name=function_name, **kwargs) if nout == 0: out = [call_str + cls.function_param.get('line_end', '')] elif (nout > 1) and ('assign_mult' in cls.function_param): out = [cls.format_function_param( 'assign_mult', name=kwargs['output_var'], value=call_str)] else: out = [cls.format_function_param( 'assign', name=kwargs['output_var'], value=call_str)] return out
[docs] @classmethod def write_executable_import(cls, model_name=None, **kwargs): r"""Add import statements to executable lines. Args: **kwargs: Keyword arguments for import statement. Returns: list: Lines required to complete the import. """ # This code is currently unused, but may be needed in the # future to import a dependency directly # if ('filename' not in kwargs) and ('import_nofile' in cls.function_param): # key = 'import_nofile' # else: # key = 'import' # return [cls.format_function_param(key, **kwargs)] out = [] if 'import' in cls.function_param: out = [cls.format_function_param('import', **kwargs)] return out
[docs] @classmethod def write_executable(cls, lines, prefix=None, suffix=None, function_definitions=None, imports=None, model_name=None): r"""Return the lines required to complete a program that will run the provided lines. Args: lines (list): Lines of code to be wrapped as an executable. prefix (list, optional): Lines of code that should proceed the wrapped code. Defaults to None and is ignored. (e.g. C/C++ include statements). suffix (list, optional): Lines of code that should follow the wrapped code. Defaults to None and is ignored. function_definitions (list, optional): Lines of code defining functions that will beused by the code contained in lines. Defaults to None and is ignored. imports (list, optional): Kwargs for packages that should be imported for use by the executable. Defaults to None and is ignored. model_name (str, optional): Name given to the model. Defaults to None. Returns: lines: Lines of code wrapping the provided lines with the necessary code to run it as an executable (e.g. C/C++'s main). """ if cls.function_param is None: raise NotImplementedError("function_param attribute not set for" "language '%s'" % cls.language) out = [] # Add imports if imports is not None: if not isinstance(imports, list): imports = [imports] import_lines = [] for kws in imports: import_lines += cls.write_executable_import(**kws) if prefix is None: prefix = [] prefix += import_lines # Add standard & user defined prefixes if ((('exec_prefix' in cls.function_param) and (cls.function_param['exec_prefix'] not in lines))): out.append(cls.function_param['exec_prefix']) out.append('') if prefix is not None: if not isinstance(prefix, (list, tuple)): prefix = [prefix] out += prefix out.append('') if (((not cls.function_param.get('functions_defined_last', False)) and (function_definitions is not None))): out += function_definitions out.append('') # Add code with begin/end book ends if ((('exec_begin' in cls.function_param) and (cls.function_param['exec_begin'] not in '\n'.join(lines)))): out.append(cls.function_param['exec_begin']) if not isinstance(lines, (list, tuple)): lines = [lines] for x in lines: out.append(cls.function_param['indent'] + x) out.append(cls.function_param.get('exec_end', cls.function_param.get( 'block_end', ''))) else: out += lines if out[-1]: out.append('') # Add standard & user defined suffixes if suffix is not None: if not isinstance(suffix, (list, tuple)): suffix = [suffix] out += suffix out.append('') if ((('exec_suffix' in cls.function_param) and (cls.function_param['exec_suffix'] not in lines))): out.append(cls.function_param['exec_suffix']) out.append('') if (((cls.function_param.get('functions_defined_last', False)) and (function_definitions is not None))): # pragma: matlab out += function_definitions out.append('') if cls.max_line_width: new_out = [] for iout in out: new_out += cls.split_line(iout) out = new_out return out
[docs] @classmethod def escape_quotes(cls, x): r"""Escape quotes in a string. Args: x (str): String to escape quotes in. Returns: str: x with escaped quotes. """ out = x.replace('"', '\\\"') out = out.replace("'", "\\\'") return out
[docs] @classmethod def split_line(cls, line, length=None, force_split=False): r"""Split a line as close to (or before) a given character as possible. Args: line (str): Line to split. length (int, optional): Maximum length of split lines. Defaults to cls.max_line_width if not provided. force_split (bool, optional): If True, force a split to occur at the specified length. Defauts to False. Returns: list: Set of lines resulting from spliting the provided line. """ out = [] if not line.lstrip(): return [line] nindent = line.index(line.lstrip()[0]) block_end = cls.function_param['block_end'].lower() if '\n' in line: out = line.split('\n') for i in range(1, len(out)): if out[i].lstrip().lower().startswith(block_end): nindent -= len(cls.function_param['indent']) out[i] = (nindent * ' ') + out[i] new_out = [] for x in out: new_out += cls.split_line(x, length=length, force_split=force_split) return new_out if length is None: length = cls.max_line_width if (length is None) or (len(line) < length): return [line] length_allow = (length - len(cls.function_param.get( 'continuation_before', ''))) if force_split: isplit = length_allow else: char_break = cls.function_param.get('continuation_break', ' ') for c in char_break: if c in line[:length_allow]: isplit = line[:length_allow].rindex(c) + 1 break else: # pragma: debug isplit = len(line) if (isplit < nindent + 1) or (isplit >= len(line)): out = [line] else: out.append(line[:isplit] + cls.function_param.get( 'continuation_before', '')) out += cls.split_line( ((nindent * ' ') + cls.function_param.get( 'continuation_after', '') + line[isplit:]), length=length, force_split=force_split) return out
[docs] @classmethod def input2output(cls, var): r"""Perform conversion necessary to turn a variable extracted from a function definition from an input to an output. Args: var (dict): Variable definition. Returns: dict: Updated variable definition. """ return var
[docs] @classmethod def output2input(cls, var, in_definition=True): r"""Perform conversion necessary to turn an output variable into an corresponding input that can be used to format a function definition. Args: var (dict): Variable definition. in_definition (bool, optional): If True, the returned dictionary corresponds to an input variable in a function definition. If False, the returned value will correspond to an input to a function. Defaults to True. Returns: dict: Updated variable definition. """ return var
[docs] @classmethod def get_native_type(cls, return_json=False, **kwargs): r"""Get the native type. Args: return_json (bool, optional): If True, the returned value will also include the JSON datatype. **kwargs: Additional keyword arguments may be used in determining the precise declaration that should be used. Returns: str: The native type. """ if 'native_type' in kwargs: out = kwargs['native_type'] if return_json: out = (out, kwargs.get('json_type', kwargs)) return out assert 'json_type' not in kwargs json_type = copy.deepcopy(kwargs.get('datatype', kwargs)) if isinstance(json_type, dict): type_name = json_type.get('type', 'bytes') else: type_name = json_type json_type = kwargs if type_name == 'scalar': type_name = json_type['subtype'] if type_name == 'string': type_name = 'unicode' if json_type.get('encoding', 'ASCII') == 'ASCII': type_name = 'bytes' if (type_name == 'flag') and (type_name not in cls.type_map): type_name = 'boolean' if ((kwargs.get('is_length_var', False) and 'length' in cls.type_map and type_name != '1darray')): type_name = 'length' if return_json: json_type.setdefault('type', type_name) if not (any(x in ['comm', 'dtype', '1darray_pointer', 'ndarray_pointer', 'flag'] for x in [json_type.get('subtype', None), json_type['type']])): length = json_type.pop('length', None) if json_type.get('type', None) != 'length': try: json_type = rapidjson.normalize( json_type, {'type': 'schema'}) except rapidjson.NormalizationError: # pragma: debug print(f"Invalid schema: {json_type}") raise if length is not None: json_type['length'] = length return cls.type_map[type_name], json_type return cls.type_map[type_name]
[docs] @classmethod def get_json_type(cls, native_type): r"""Get the JSON type from the native language type. Args: native_type (str): The native language type. Returns: str, dict: The JSON type. """ return cls.get_inverse_type_map()[native_type]
[docs] @classmethod def write_finalize_iiter(cls, var): r"""Get the lines necessary to finalize an input array for iteration. Args: var (dict, str): Name or information dictionary for the variable finalized. Returns: list: The lines finalizing the variable. """ return []
[docs] @classmethod def write_initialize_oiter(cls, var, value=None, requires_freeing=None): r"""Get the lines necessary to initialize an array for iteration output. Args: var (dict, str): Name or information dictionary for the variable being initialized. value (str, optional): Value that should be assigned to the variable. requires_freeing (list, optional): Existing list that variables requiring freeing should be appended to. Defaults to None and is ignored. Returns: list: The lines initializing the variable. """ return cls.write_initialize(var, value=value, requires_freeing=requires_freeing)
[docs] @classmethod def write_finalize_oiter(cls, var, value=None, requires_freeing=None): r"""Get the lines necessary to finalize an array after iteration. Args: var (dict, str): Name or information dictionary for the variable being initialized. value (str, optional): Value that should be assigned to the variable. requires_freeing (list, optional): Existing list of variables requiring freeing. Defaults to None and is ignored. Returns: list: The lines finalizing the variable. """ return []
[docs] @classmethod def write_initialize(cls, var, value=None, requires_freeing=None): r"""Get the code necessary to initialize a variable. Args: var (dict, str): Name or information dictionary for the variable being declared. value (str, optional): Value that should be assigned to the variable after it is declared. requires_freeing (list, optional): Existing list that variables requiring freeing should be appended to. Defaults to None and is ignored. Returns: list: The lines initializing the variable. """ out = [] if isinstance(var, str): # pragma: no cover var = {'name': var} if (value is None) and isinstance(var.get('datatype', False), dict): init_type = 'init_%s' % var['datatype']['type'] free_type = 'free_%s' % var['datatype']['type'] if init_type in cls.function_param: assert free_type in cls.function_param # value = cls.format_function_param(init_type, **var['datatype']) value = cls.function_param[init_type] if requires_freeing is not None: requires_freeing.append(var) if value is not None: out.append(cls.format_function_param( 'assign', name=var['name'], value=value)) return out
[docs] @classmethod def write_declaration(cls, var, value=None, requires_freeing=None, definitions=None, is_argument=False): r"""Return the lines required to declare a variable with a certain type. Args: var (dict, str): Name or information dictionary for the variable being declared. value (str, optional): Value that should be assigned to the variable after it is declared. requires_freeing (list, optional): Existing list that variables requiring freeing should be appended to. Defaults to None and is ignored. definitions (list, optional): Existing list that variable definitions should be added to. Defaults to None if not provided and definitions will be included in the returned lines. dont_define (bool, optional): If True, the variable will not be defined. Defaults to False. is_argument (bool, optional): If True, the variable being declared is an input argument. Defaults to False. Returns: list: The lines declaring the variable. """ if isinstance(var, str): # pragma: no cover var = {'name': var} type_name = cls.get_native_type(**var) out = [cls.format_function_param( 'declare', type_name=type_name, variable=cls.get_name_declare(var))] if var.get('extra_vars', {}): for v in var['extra_vars'].values(): out += cls.write_declaration( v, requires_freeing=requires_freeing, definitions=definitions, is_argument=is_argument) if is_argument: return out if definitions is None: definitions = out definitions += cls.write_initialize( var, value=value, requires_freeing=requires_freeing) return out
[docs] @classmethod def get_name_declare(cls, var): r"""Determine the name that should be used for declaration. Args: var (str, dict): Name of variable or dictionary of information. Returns: str: Modified name for declaration. """ if isinstance(var, str): # pragma: no cover return var assert isinstance(var, dict) out = var['name'] return out
[docs] @classmethod def write_free(cls, var, **kwargs): r"""Return the lines required to free a variable with a certain type. Args: var (dict, str): Name or information dictionary for the variable being declared. **kwargs: Additional keyword arguments are passed to format_function_param. Returns: list: The lines freeing the variable. """ if isinstance(var, str): # pragma: no cover var = {'name': var} out = [] if not var.get('dont_free', False): if ((isinstance(var.get('datatype', False), dict) and (('free_%s' % var['datatype']['type']) in cls.function_param))): out = [cls.format_function_param( 'free_%s' % var['datatype']['type'], variable=var['name'], **kwargs)] else: out = [cls.format_function_param( 'free', variable=var['name'], **kwargs)] return out
[docs] @classmethod def write_assign_to_output(cls, dst_var, src_var, copy=False, outputs_in_inputs=False, **kwargs): r"""Write lines assigning a value to an output variable. Args: dst_var (str, dict): Name or information dictionary for variable being assigned to. src_var (str, dict): Name or information dictionary for value being assigned to dst_var. copy (bool, optional): If True, the assigned value is copied during assignment. Defaults to False. outputs_in_inputs (bool, optional): If True, outputs are passed as input parameters. In some languages, this means that a pointer or reference is passed (e.g. C) and so the assignment should be to the memory indicated rather than the variable. Defaults to False. Returns: list: Lines achieving assignment. """ datatype = None if isinstance(dst_var, dict): kwargs['name'] = dst_var['name'] datatype = dst_var['datatype'] else: kwargs['name'] = dst_var if isinstance(src_var, dict): kwargs['value'] = src_var['name'] datatype = src_var['datatype'] else: kwargs['value'] = src_var if ((outputs_in_inputs and isinstance(dst_var, dict) and isinstance(dst_var['datatype'], dict) and ('copy_' + dst_var['datatype']['type'] in cls.function_param))): copy = True if copy: if ((isinstance(datatype, dict) and ('copy_' + datatype['type'] in cls.function_param))): return [cls.format_function_param( 'copy_' + datatype['type'], **kwargs)] else: return [cls.format_function_param('assign_copy', **kwargs)] else: return [cls.format_function_param('assign', **kwargs)]
[docs] @classmethod def write_expand_single_element(cls, output_var, add_cond=False): r"""Write lines allowing extraction of the only element from a single element array as a stand-alone variable if the variable is an array and only has one element. Args: output_var (str): Name of the variable that should be conditionally expanded. add_cond (list, optional): Additional conditions that must be satisfied for the array element to be extracted. Defaults to False and is ignored. Returns: list: Lines added the conditional expansion of single element arrays. """ if 'istype' not in cls.function_param: return [] cond = ('(%s) %s (%s %s 1)' % ( cls.format_function_param('istype', variable=output_var, type=cls.type_map['array']), cls.function_param.get('and', '&&'), cls.format_function_param('len', variable=output_var), cls.function_param.get('equ', '=='))) if add_cond: for x in add_cond: cond += f" {cls.function_param.get('and', '&&')} {x}" out = cls.write_if_block( cond, cls.format_function_param( 'assign', name=output_var, value=cls.format_function_param( 'index', variable=output_var, index=int(cls.function_param.get('first_index', 0))))) return out
[docs] @classmethod def split_variables(cls, var_str): r"""Split variable string include individual variables. Args: var_str (str): String containing multiple variables. Returns: list: Split variables. """ out = [] if var_str: pairs = [('[', ']'), ('(', ')'), ('{', '}'), ('<', '>'), ("'", "'"), ('"', '"')] out = [x.strip() for x in var_str.split(',')] i = 0 while i < len(out): for p in pairs: if out[i].count(p[0]) != out[i].count(p[1]): out[i] += ", " + out[i + 1] del out[i + 1] break else: i += 1 return out
[docs] @classmethod def prepare_variables(cls, vars_list, in_definition=False, for_yggdrasil=False): r"""Concatenate a set of input variables such that it can be passed as a single string to the function_call parameter. Args: vars_list (list): List of variable dictionaries containing info (e.g. names) that should be used to prepare a string representing input/output to/from a function call. in_definition (bool, optional): If True, the returned sequence will be of the format required for specifying variables in a function definition. Defaults to False. for_yggdrasil (bool, optional): If True, the variables will be prepared in the formated expected by calls to yggdarsil send/recv methods. Defaults to False. Returns: str: Concatentated variables list. """ name_list = [] if not isinstance(vars_list, list): vars_list = [vars_list] for x in vars_list: if isinstance(x, str): name_list.append(x) else: assert isinstance(x, dict) name_list.append(x['name']) return ', '.join(name_list)
[docs] @classmethod def prepare_input_variables(cls, vars_list, in_definition=False, for_yggdrasil=False): r"""Concatenate a set of input variables such that it can be passed as a single string to the function_call parameter. Args: vars_list (list): List of variable dictionaries containing info (e.g. names) that should be used to prepare a string representing input to a function call. in_definition (bool, optional): If True, the returned sequence will be of the format required for specifying input variables in a function definition. Defaults to False. for_yggdrasil (bool, optional): If True, the variables will be prepared in the formated expected by calls to yggdarsil send/recv methods. Defaults to False. Returns: str: Concatentated variables list. """ return cls.prepare_variables(vars_list, in_definition=in_definition, for_yggdrasil=for_yggdrasil)
[docs] @classmethod def prepare_output_variables(cls, vars_list, in_definition=False, in_inputs=False, for_yggdrasil=False): r"""Concatenate a set of output variables such that it can be passed as a single string to the function_call parameter. Args: vars_list (list): List of variable dictionaries containing info (e.g. names) that should be used to prepare a string representing output from a function call. in_definition (bool, optional): If True, the returned sequence will be of the format required for specifying output variables in a function definition. Defaults to False. in_inputs (bool, optional): If True, the output variables should be formated to be included as input variables. Defaults to False. for_yggdrasil (bool, optional): If True, the variables will be prepared in the formated expected by calls to yggdarsil send/recv methods. Defaults to False. Returns: str: Concatentated variables list. """ if in_inputs: vars_list = [cls.output2input(x, in_definition=in_definition) for x in vars_list] out = cls.prepare_variables(vars_list, in_definition=in_definition, for_yggdrasil=for_yggdrasil) if isinstance(vars_list, list) and (len(vars_list) > 1): if in_definition and ('multiple_outputs_def' in cls.function_param): out = cls.format_function_param('multiple_outputs_def', outputs=out) elif 'multiple_outputs' in cls.function_param: out = cls.format_function_param('multiple_outputs', outputs=out) return out
[docs] @classmethod def write_if_block(cls, cond, block_contents, else_block_contents=False): r"""Return the lines required to complete a conditional block. Args: cond (str): Conditional that should determine block execution. block_contents (list): Lines of code that should be executed inside the block. else_block_contents (list, optional): Lines of code that should be executed inside the else clause of the block. Defaults to False if not provided and an else clause is omitted. Returns: list: Lines of code performing conditional execution of a block. """ if cls.function_param is None: raise NotImplementedError("function_param attribute not set for" "language '%s'" % cls.language) out = [] if not isinstance(cond, list): cond = [cond] block_contents = [block_contents] assert len(cond) == len(block_contents) for i, (icond, iblock_contents) in enumerate(zip(cond, block_contents)): if i == 0: out.append(cls.format_function_param('if_begin', cond=icond)) else: out.append(cls.format_function_param('if_elif', cond=icond)) if not isinstance(iblock_contents, (list, tuple)): iblock_contents = [iblock_contents] for x in iblock_contents: out.append(cls.function_param['indent'] + x) if else_block_contents: out.append(cls.format_function_param('if_else')) if not isinstance(else_block_contents, (list, tuple)): else_block_contents = [else_block_contents] for x in else_block_contents: out.append(cls.function_param['indent'] + x) # Close block out.append(cls.function_param.get('if_end', cls.function_param.get( 'block_end', ''))) return out
[docs] @classmethod def write_for_loop(cls, iter_var, iter_begin, iter_end, loop_contents): r"""Return the lines required to complete a for loop. Args: iter_var (str): Name of variable that iterator should use. iter_begin (int): Beginning of iteration. iter_end (int): End of iteration. loop_contents (list): Lines of code that should be executed inside the loop. Returns: list: Lines of code performing a loop. """ if cls.function_param is None: raise NotImplementedError("function_param attribute not set for" "language '%s'" % cls.language) out = [] # Opening for statement line out.append(cls.format_function_param('for_begin', iter_var=iter_var, iter_begin=iter_begin, iter_end=iter_end)) # Indent loop contents if not isinstance(loop_contents, (list, tuple)): loop_contents = [loop_contents] for x in loop_contents: out.append(cls.function_param['indent'] + x) # Close block out.append(cls.function_param.get('for_end', cls.function_param.get( 'block_end', ''))) return out
[docs] @classmethod def write_while_loop(cls, cond, loop_contents): r"""Return the lines required to complete a for loop. Args: cond (str): Conditional that should determine loop execution. loop_contents (list): Lines of code that should be executed inside the loop. Returns: list: Lines of code performing a loop. """ if cls.function_param is None: raise NotImplementedError("function_param attribute not set for" "language '%s'" % cls.language) out = [] # Opening for statement line out.append(cls.format_function_param('while_begin', cond=cond)) # Indent loop contents if not isinstance(loop_contents, (list, tuple)): loop_contents = [loop_contents] for x in loop_contents: out.append(cls.function_param['indent'] + x) # Close block out.append(cls.function_param.get('while_end', cls.function_param.get( 'block_end', ''))) return out
[docs] @classmethod def write_try_except(cls, try_contents, except_contents, error_var='e', error_type=None): r"""Return the lines required to complete a try/except block. Args: try_contents (list): Lines of code that should be executed inside the try block. except_contents (list): Lines of code that should be executed inside the except block. error_var (str, optional): Name of variable where the caught error should be stored. Defaults to 'e'. error_type (str, optional): Name of error type that should be caught. If not provided, defaults to None and will be set based on the class function_param entry for 'try_error_type'. Returns: Lines of code perfoming a try/except block. """ if (cls.function_param is None) or ('try_begin' not in cls.function_param): raise NotImplementedError("function_param attribute not set for" "language '%s'" % cls.language) if error_type is None: error_type = cls.function_param.get('try_error_type', None) out = [] # Try block contents if not isinstance(try_contents, (list, tuple)): try_contents = [try_contents] out.append(cls.function_param['try_begin']) for x in try_contents: out.append(cls.function_param['indent'] + x) # Except block contents if not isinstance(except_contents, (list, tuple)): except_contents = [except_contents] out.append(cls.format_function_param('try_except', error_var=error_var, error_type=error_type)) for x in except_contents: out.append(cls.function_param['indent'] + x) # Close block out.append(cls.function_param.get('try_end', cls.function_param.get( 'block_end', ''))) return out
[docs] @classmethod def get_testing_options(cls, **kwargs): r"""Method to return a dictionary of testing options for this class. Returns: dict: Dictionary of variables to use for testing. Key/value pairs: kwargs (dict): Keyword arguments for driver instance. deps (list): Dependencies to install. """ out = dict( kwargs={}, deps=[], write_function_def_params=[ {'inputs': [{'name': 'x', 'value': 1.0, 'datatype': {'type': 'float', 'precision': 4, 'units': 'cm'}}], 'outputs': [{'name': 'y', 'datatype': {'type': 'float', 'precision': 4, 'units': 'cm'}}]}], split_lines=[('abcdef', {'length': 3, 'force_split': True}, ['abc', 'def']), (' abc', {'length': 3, 'force_split': True}, [' abc'])]) return out