import os
import sys
import importlib
from yggdrasil import tools
from yggdrasil.drivers.InterpretedModelDriver import InterpretedModelDriver
[docs]class PythonModelDriver(InterpretedModelDriver):
r"""Class for running Python models."""
_schema_subtype_description = ('Model is written in Python.')
language = 'python'
language_ext = '.py'
default_interpreter = sys.executable
interface_library = 'yggdrasil.interface.YggInterface'
# supported_comms = ['ipc', 'zmq', 'rmq']
supported_comm_options = {
'ipc': {'platforms': ['MacOS', 'Linux'],
'libraries': ['sysv_ipc']},
'zmq': {'libraries': ['zmq']},
'rmq': {'libraries': ['pika']},
'mpi': {'libraries': ['mpi4py']},
'rest': {'libraries': ['requests']}}
type_map = {
'int': 'numpy.intX',
'float': 'numpy.floatX',
'string': 'str',
'array': 'list',
'object': 'dict',
'boolean': 'bool',
'null': 'None',
'uint': 'numpy.uintX',
'complex': 'numpy.complexX',
'bytes': 'bytes',
'unicode': 'str',
'1darray': 'numpy.ndarray',
'ndarray': 'numpy.ndarray',
'ply': 'PlyDict',
'obj': 'ObjDict',
'schema': 'dict'}
interface_map = {
'import': 'from yggdrasil.languages.Python.YggInterface import {commtype}',
'input': 'YggInput("{channel_name}")',
'output': 'YggOutput("{channel_name}")',
'server': 'YggRpcServer("{channel_name}")',
'client': 'YggRpcClient("{channel_name}")',
'timesync': 'YggTimesync("{channel_name}")',
'send': 'flag = {channel_obj}.send({outputs})',
'recv': 'flag, {inputs} = {channel_obj}.recv()',
'call': 'flag, {inputs} = {channel_obj}.call({outputs})',
}
function_param = {
'import_nofile': 'import {function}',
'import': 'from {filename} import {function}',
'istype': 'isinstance({variable}, {type})',
'len': 'len({variable})',
'index': '{variable}[{index}]',
'interface': 'import {interface_library} as ygg',
'input': '{channel} = ygg.YggInput(\'{channel_name}\')',
'output': '{channel} = ygg.YggOutput(\'{channel_name}\')',
'python_interface': '{channel} = ygg.{python_interface}(\'{channel_name}\')',
'python_interface_format': ('{channel} = ygg.{python_interface}'
'(\'{channel_name}\', \'{format_str}\')'),
'recv_function': '{channel}.recv',
'send_function': '{channel}.send',
'multiple_outputs': '[{outputs}]',
'comment': '#',
'true': 'True',
'false': 'False',
'not': 'not',
'and': 'and',
'indent': 4 * ' ',
'quote': '\"',
'print_generic': ('from yggdrasil.tools import print_encoded; '
'print_encoded({object})'),
'print': 'print(\"{message}\")',
'fprintf': 'print(\"{message}\" % ({variables}))',
'error': 'raise Exception("{error_msg}")',
'block_end': '',
'if_begin': 'if ({cond}):',
'if_elif': 'elif ({cond}):',
'if_else': 'else:',
'for_begin': 'for {iter_var} in range({iter_begin}, {iter_end}):',
'while_begin': 'while ({cond}):',
'break': 'break',
'try_begin': 'try:',
'try_error_type': 'BaseException',
'try_except': 'except {error_type} as {error_var}:',
'assign': '{name} = {value}',
'exec_begin': 'def main():',
'exec_suffix': ('if __name__ == "__main__":\n'
' main()'),
'function_def_begin': 'def {function_name}({input_var}):',
'return': 'return {output_var}',
'function_def_regex': (
r'\n?( *)def +{function_name}'
r' *\((?P<inputs>(?:.|\n)*?)\)? *:'
r'(?P<body>(?:\1(?: )+(?!return).*\n)|(?: *\n))*'
r'(?:\1(?: )+'
r'return *(?P<outputs>.*)?)?'),
'inputs_def_regex': r'\s*(?P<name>.+?)\s*(?:,|$)',
'outputs_def_regex': r'\s*(?P<name>.+?)\s*(?:,|$)'}
[docs] @staticmethod
def finalize_registration(cls):
r"""Operations that should be performed after a class has been fully
initialized and registered."""
cls.supported_comms = tools.get_supported_comm()
InterpretedModelDriver.finalize_registration(cls)
[docs] def set_env(self, **kwargs):
r"""Get environment variables that should be set for the model process.
Args:
**kwargs: Additional keyword arguments are passed to the parent class's
method.
Returns:
dict: Environment variables for the model process.
"""
out = super(PythonModelDriver, self).set_env(**kwargs)
if self.with_valgrind:
out['PYTHONMALLOC'] = 'malloc'
return out
[docs] @classmethod
def is_language_installed(self):
r"""Determine if this model driver is installed on the current
machine.
Returns:
bool: Truth of if this model driver can be run on the current
machine.
"""
# This is being run so python exists
return True
[docs] @classmethod
def is_library_installed(cls, lib, **kwargs):
r"""Determine if a dependency is installed.
Args:
lib (str): Name of the library that should be checked.
**kwargs: Additional keyword arguments are ignored.
Returns:
bool: True if the library is installed, False otherwise.
"""
try:
importlib.import_module(lib)
except ImportError:
return False
return True
[docs] @classmethod
def write_initialize_oiter(cls, var, value=None, **kwargs):
r"""Get the lines necessary to initialize an array for iteration
output.
Args:
var (dict, str): Name or information dictionary for the variable
being initialized.
value (str, optional): Value that should be assigned to the
variable.
**kwargs: Additional keyword arguments are passed to the
parent class's method.
Returns:
list: The lines initializing the variable.
"""
value = '[None for _ in range(%s)]' % var['iter_var']['end']
out = super(PythonModelDriver, cls).write_initialize_oiter(
var, value=value, **kwargs)
return out
[docs] @classmethod
def write_finalize_oiter(cls, var):
r"""Get the lines necessary to finalize an array after iteration.
Args:
var (dict, str): Name or information dictionary for the variable
being initialized.
Returns:
list: The lines finalizing the variable.
"""
out = super(PythonModelDriver, cls).write_finalize_oiter(var)
out += ['import numpy as np',
'from yggdrasil import units',
'dtype = np.dtype(units.get_data({name}[0]))'.format(
name=var['name']),
('{name} = units.add_units(np.array({name}, dtype=dtype),'
'units.get_units({name}[0]))').format(
name=var['name'])]
return out
[docs] @classmethod
def install_dependency(cls, package=None, package_manager=None, **kwargs):
r"""Install a dependency.
Args:
package (str): Name of the package that should be installed. If
the package manager supports it, this can include version
requirements.
package_manager (str, optional): Package manager that should be
used to install the package.
**kwargs: Additional keyword arguments are passed to the parent
class.
"""
if package_manager in [None, 'pip']:
if isinstance(package, str):
package = package.split()
kwargs.setdefault(
'command',
[cls.get_interpreter(), '-m', 'pip', 'install'] + package)
return super(PythonModelDriver, cls).install_dependency(
package, package_manager=package_manager, **kwargs)
[docs] def run_validation(self):
r"""Run the validation script for the model."""
if ((self.validation_command
and (self.validation_command.split()[0].endswith('.py')))):
self.validation_command = (
f"{self.get_interpreter()} {self.validation_command}")
return super(PythonModelDriver, self).run_validation()
[docs] @classmethod
def get_testing_options(cls, **kwargs):
r"""Method to return a dictionary of testing options for this class.
Args:
**kwargs: Additional keyword arguments are passed to the parent
class.
Returns:
dict: Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for driver instance.
deps (list): Dependencies to install.
"""
out = super(PythonModelDriver, cls).get_testing_options(**kwargs)
out['deps'] = [{'package': 'numpy', 'arguments': '-v'},
'requests', 'pyyaml']
out['kwargs']['interpreter'] = sys.executable
return out