import re
import os
import numpy as np
import pandas as pd
import logging
import warnings
from collections import OrderedDict
from yggdrasil import serialize, platform, constants
from yggdrasil.drivers.InterpretedModelDriver import InterpretedModelDriver
from yggdrasil.drivers.PythonModelDriver import PythonModelDriver
from yggdrasil.drivers.CModelDriver import CModelDriver
from yggdrasil.languages.R import install
logger = logging.getLogger(__name__)
[docs]class RModelDriver(InterpretedModelDriver): # pragma: R
r"""Base class for running R models.
Args:
name (str): Driver name.
args (str or list): Argument(s) for running the model in matlab.
Generally, this should be the full path to a Matlab script.
**kwargs: Additional keyword arguments are passed to parent class's
__init__ method.
"""
_schema_subtype_description = ('Model is written in R.')
language = 'R'
language_aliases = ['r']
language_ext = '.R'
base_languages = ['python']
default_interpreter = 'Rscript'
# Dynamically setting the interface library causes circular a import so
# it is defined here statically. For dynamic import, use the following:
# interface_library = PythonModelDriver.interface_library
interface_library = 'yggdrasil'
interface_dependencies = [x.split()[0] for x in
install.requirements_from_description()]
# The Batch version causes output to saved to a file rather than directed to
# stdout so use Rscript instead. For the batch version, use the following:
# default_interpreter_flags = ['CMD', 'BATCH' '--vanilla', '--silent']
default_interpreter_flags = ['--default-packages=methods,utils']
send_converters = {'table': serialize.consolidate_array}
type_map = {
'int': 'integer, bit64::integer64',
'float': 'double',
'string': 'character',
'array': 'list',
'object': 'list',
'boolean': 'logical',
'null': 'NA',
'uint': 'integer',
'number': 'double',
'complex': 'complex',
'bytes': 'char (utf-8)',
'unicode': 'char',
'1darray': 'list',
'ndarray': 'list',
'ply': 'PlyDict',
'obj': 'ObjDict',
'schema': 'list'}
interface_map = {
'import': 'library(yggdrasil)',
'input': 'YggInterface(\'YggInput\', \'{channel_name}\')',
'output': 'YggInterface(\'YggOutput\', \'{channel_name}\')',
'server': 'YggInterface(\'YggRpcServer\', \'{channel_name}\')',
'client': 'YggInterface(\'YggRpcClient\', \'{channel_name}\')',
'timesync': 'YggInterface(\'YggTimesync\', \'{channel_name}\')',
'send': 'flag = {channel_obj}$send({outputs})',
'recv': 'c(flag, {inputs}) %<-% {channel_obj}$recv()',
'call': 'c(flag, {inputs}) %<-% {channel_obj}$call({outputs})',
}
function_param = {
'import': 'reticulate::py_config(); source(\"{filename}\")',
'istype': 'is({variable}, \"{type}\")',
'len': 'length({variable})',
'index': '{variable}[[{index}]]',
'first_index': 1,
'interface': 'library(yggdrasil)',
'input': '{channel} <- YggInterface(\"YggInput\", \"{channel_name}\")',
'output': '{channel} <- YggInterface(\"YggOutput\", \"{channel_name}\")',
'python_interface': ('{channel} <- YggInterface(\"{python_interface}\", '
'\"{channel_name}\")'),
'python_interface_format': ('{channel} <- YggInterface('
'\"{python_interface}\", '
'\"{channel_name}\", '
'\"{format_str}\")'),
'recv_function': '{channel}$recv',
'send_function': '{channel}$send',
'multiple_outputs': 'c({outputs})',
'multiple_outputs_def': 'list({outputs})',
'true': 'TRUE',
'false': 'FALSE',
'not': '!',
'and': '&&',
'comment': '#',
'indent': 2 * ' ',
'quote': '\"',
'print_generic': 'print({object})',
'print': 'print(\"{message}\")',
'fprintf': 'print(sprintf(\"{message}\", {variables}))',
'error': 'stop(\"{error_msg}\")',
'block_end': '}',
'if_begin': 'if ({cond}) {{',
'if_elif': '}} else if ({cond}) {{',
'if_else': '}} else {{',
'for_begin': 'for ({iter_var} in {iter_begin}:{iter_end}) {{',
'while_begin': 'while ({cond}) {{',
'try_begin': 'tryCatch({',
'try_except': '}}, error = function({error_var}) {{',
'try_end': '})',
'assign': '{name} <- {value}',
'assign_mult': '{name} %<-% {value}',
'function_def_begin': '{function_name} <- function({input_var}) {{',
'return': 'return({output_var})',
'function_def_regex': (
r'{function_name} *(?:(?:\<-)|(?:=)) *function'
r'\((?P<inputs>(?:.|(?:\r?\n))*?)\)\s*\{{'
r'(?P<body>(?:.|(?:\r?\n))*?)'
r'(?:return\((list\()?'
r'(?P<outputs>(?:.|(?:\r?\n))*?)(?(3)\))\)'
r'(?:.|(?:\r?\n))*?\}})'
r'|(?:\}})'),
'inputs_def_regex': r'\s*(?P<name>.+?)\s*(?:,|$)',
'outputs_def_regex': r'\s*(?P<name>.+?)\s*(?:,|$)',
'interface_regex': (
r'(?P<indent>[ \t]*)'
r'(?P<variable>[a-zA-Z\_]+[a-zA-Z\_0-9]*)\s*\<\-\s*'
r'YggInterface\(\'(?P<class>[a-zA-Z\_]+[a-zA-Z\_0-9]*)\'\s*\,\s*'
r'\'(?P<channel>[a-zA-Z\_]+[a-zA-Z\_0-9]*)\'\s*'
r'(?P<args>[^\)]*)\)')}
brackets = (r'{', r'}')
zero_based = False
[docs] @classmethod
def is_library_installed(cls, lib, **kwargs):
r"""Determine if a dependency is installed.
Args:
lib (str): Name of the library that should be checked.
**kwargs: Additional keyword arguments are ignored.
Returns:
bool: True if the library is installed, False otherwise.
"""
if lib not in cls._library_cache:
try:
cls.run_executable(['-e', 'library(%s)' % lib.split()[0]])
cls._library_cache[lib] = True
except RuntimeError as e:
logger.info('Error checking for R library %s: %s' % (lib, e))
cls._library_cache[lib] = False
return cls._library_cache[lib]
[docs] @classmethod
def language_version(cls, **kwargs):
r"""Determine the version of this language.
Args:
**kwargs: Keyword arguments are passed to the parent class's
method.
Returns:
str: Version of compiler/interpreter for this language.
"""
kwargs.setdefault('skip_interpreter_flags', True)
return super(RModelDriver, cls).language_version(**kwargs)
[docs] def run_model(self, *args, **kwargs):
r"""Run the model. Unless overridden, the model will be run using
run_executable.
Args:
*args: Arguments are passed to the parent class's method.
**kwargs: Keyword arguments are passed to the parent class's
method.
"""
if self.with_debugger:
# This dosn't appear to work as all arguments besides the
# debugger argument (-d) are passed to the debugger and there
# does not appear to be a method of running anything from
# the command line where the debugger is called.
# cmd = self.model_command()
# assert len(cmd) == 1
# interp = kwargs.pop('interpreter', self.get_interpreter())
# interp_flags = kwargs.pop('interpreter_flags', [])
# if 'Rscript' in interp:
# interp = 'R'.join(interp.rsplit('Rscript', 1))
# interp_flags = []
# kwargs['skip_interpreter_flags'] = True
# interp_flags += ['--vanilla', '-d']
# if ' ' in self.with_debugger:
# interp_flags += [f"\"{self.with_debugger}\""]
# else:
# interp_flags += [f"{self.with_debugger}"]
# interp_flags += ['-f']
# kwargs['command'] = [interp] + interp_flags
# # kwargs['interpreter'] = interp
# # kwargs['interpreter_flags'] = interp_flags
# kwargs['debug_flags'] = []
kwargs['debug_flags'] = []
warnings.warn("R does not support command line debugging.")
return super(RModelDriver, self).run_model(*args, **kwargs)
[docs] def set_env(self, **kwargs):
r"""Get environment variables that should be set for the model process.
Returns:
dict: Environment variables for the model process.
"""
out = super(RModelDriver, self).set_env(**kwargs)
out['RETICULATE_PYTHON'] = PythonModelDriver.get_interpreter()
if CModelDriver.is_language_installed():
c_linker = CModelDriver.get_tool('linker')
search_dirs = c_linker.get_search_path(env_only=True)
out = CModelDriver.update_ld_library_path(out, paths_to_add=search_dirs,
add_to_front=True)
# TODO: Set DYLD_INSERT_LIBRARIES to clang dynamic library
# on OSX when with_asan is True. R_TESTS may need to be
# modified so this variable is set when the tests are run.
# $(clang -print-file-name=libclang_rt.asan_osx_dynamic.dylib)
return out
[docs] @classmethod
def comm_atexit(cls, comm): # pragma: no cover
r"""Operations performed on comm at exit including draining receive.
Args:
comm (CommBase): Communication object.
"""
if comm.direction == 'recv':
while comm.recv(timeout=0)[0]:
comm.sleep()
else:
comm.send_eof()
comm.linger()
if not getattr(comm, 'dont_backlog', True):
comm.linger_close()
[docs] @classmethod
def python2language(cls, pyobj):
r"""Prepare a python object for transformation in R.
Args:
pyobj (object): Python object.
Returns:
object: Python object in a form that is R friendly.
"""
logger.debug("python2language: %s, %s" % (pyobj, type(pyobj)))
if isinstance(pyobj, tuple):
return tuple([cls.python2language(x) for x in pyobj])
elif isinstance(pyobj, list):
return [cls.python2language(x) for x in pyobj]
elif isinstance(pyobj, OrderedDict):
return OrderedDict([(cls.python2language(k),
cls.python2language(v))
for k, v in pyobj.items()])
elif isinstance(pyobj, dict):
return {cls.python2language(k): cls.python2language(v)
for k, v in pyobj.items()}
elif isinstance(pyobj, np.string_):
return pyobj.decode("utf-8")
elif isinstance(pyobj, pd.DataFrame):
# R dosn't have int64 and will cast 64bit ints as floats if passed
# without casting them to int32 first
for n in pyobj.columns:
if pyobj[n].dtype == np.dtype('int64'):
pyobj[n] = pyobj[n].astype('int32')
return pyobj
[docs] @classmethod
def write_model_wrapper(cls, model_file, model_function, **kwargs):
r"""Return the lines required to wrap a model function as an integrated
model.
Args:
model_file (str): Full path to the file containing the model
function's declaration.
model_function (str): Name of the model function.
**kwargs: Additional keyword arguments are passed to the parent
class's method.
Returns:
list: Lines of code wrapping the provided model with the necessary
code to run it as part of an integration.
"""
if platform._is_win and isinstance(model_file, str): # pragma: windows
model_file = model_file.replace('\\', '/')
return super(RModelDriver, cls).write_model_wrapper(
model_file, model_function, **kwargs)
[docs] @classmethod
def write_executable_import(cls, **kwargs):
r"""Add import statements to executable lines.
Args:
**kwargs: Keyword arguments for import statement.
Returns:
list: Lines required to complete the import.
"""
if kwargs.get('filename', None) and os.path.isfile(kwargs['filename']):
with open(kwargs['filename'], 'r') as fd:
contents = fd.read()
# For functions that contain interface calls, split them out
# and assign them to the global level
if re.search(cls.function_param['interface_regex'], contents):
new_contents = contents
for m in re.finditer(cls.function_param['interface_regex'],
contents):
mdict = m.groupdict()
assert 'global_scope' in mdict['args']
# if 'global_scope' not in mdict['args']:
# mdict['args'] += ', global_scope=TRUE'
new_channel = (
'{indent}if (!exists("{variable}")) {{\n'
'{indent} assign("{variable}", YggInterface('
'\'{class}\', \'{channel}\'{args}), '
'envir = .GlobalEnv)\n'
'{indent}}}').format(**mdict)
new_contents = new_contents.replace(m.group(0),
new_channel)
out = new_contents.splitlines()
return out
if platform._is_win: # pragma: windows
kwargs['filename'] = kwargs['filename'].replace('\\', '/')
return super(RModelDriver, cls).write_executable_import(**kwargs)
# The following is only provided for the yggcc CLI for building R
# packages and should not be used directly
[docs] @classmethod
def call_compiler(cls, package_dir, toolname=None, flags=None,
language='c++', verbose=False,
use_ccache=False, disable_python_c_api=False,
with_asan=False): # pragma: no cover
r"""Build an R package w/ the yggdrasil compilers.
Args:
package_dir (str): Full path to the package directory.
toolname (str, optional): Compilation tool that should be used.
Defaults to None and the default tools will be used.
flags (list, optional): Additional flags that should be passed
to the build command. Defaults to [].
language (str, optional): Language that the package is written in
(e.g. c, fortran). Defautls to 'c++'.
verbose (bool, optional): If True, information about the build
process will be displayed. Defaults to False.
use_ccache (bool, optional): If True, ccache will be added to
the compilation executable. Defaults to False.
disable_python_c_api (bool, optional): If True, the Python C
API will be disabled. Defaults to False.
"""
import shutil
import subprocess
from yggdrasil.components import import_component
if flags is None:
flags = []
if isinstance(package_dir, list):
assert len(package_dir) == 1
package_dir = package_dir[0]
cexec_vars = {'c': ['CC', 'CC_FOR_BUILD'],
'c++': ['CXX', 'CPP', 'CXX98', 'CXX11', 'CXX14',
'CXX17', 'CXX20', 'CXX_FOR_BUILD'],
'fortran': ['FC', 'F77']}
cflag_vars = {'c': ['CFLAGS'],
'c++': ['CXXFLAGS', 'CXX98FLAGS', 'CXX11FLAGS',
'CXX14FLAGS', 'CXX17FLAGS', 'CXX20FLAGS'],
'fortran': ['FFFLAGS', 'FCFLAGS']}
cstd_vars = {'c': [],
'c++': ['CXXSTD', 'CXX11STD', 'CXX14STD',
'CXX17STD', 'CXX20STD'],
'fortran': []}
env = os.environ.copy()
new_env = {}
for x in constants.LANGUAGES['compiled']:
# for x in [language]:
drv = import_component('model', x)
try:
compiler = drv.get_tool('compiler', toolname=toolname)
except NotImplementedError:
if language in [drv.language] + drv.language_aliases:
raise
continue
# archiver = compiler.archiver()
env = drv.set_env_compiler(existing=env, compiler=compiler)
cexec = compiler.get_executable(full_path=True)
if use_ccache and shutil.which('ccache'):
cexec = '%s %s' % ('ccache', cexec)
env['CCACHE_NOHASHDIR'] = 'true'
kws = {}
if x == 'c++':
kws['skip_standard_flag'] = True
cflags0 = drv.get_compiler_flags(
for_model=True, toolname=toolname,
dry_run=True, compiler=compiler,
disable_python_c_api=disable_python_c_api,
with_asan=with_asan, dont_link=True, **kws)
lflags = drv.get_linker_flags(
for_model=True, toolname=toolname,
dry_run=True, libtype='shared', with_asan=with_asan,
disable_python_c_api=disable_python_c_api)
if language in ([drv.language] + drv.language_aliases):
drv.compile_dependencies(
toolname=toolname, with_asan=with_asan,
disable_python_c_api=disable_python_c_api)
# Remove flags that are unnecessary
cflags = []
stdflags = []
for k in cflags0:
if k.startswith("-std="):
stdflags.append(k)
else:
cflags.append(k)
for k in ['-c']:
if k in cflags:
cflags.remove(k)
for k in ['-shared']:
if k in lflags:
lflags.remove(k)
for k in cexec_vars[x]:
env[k] = cexec
new_env[k] = cexec
for k in cflag_vars[x]:
env[k] = ''
new_env[k] = ' '.join(cflags)
for k in cstd_vars[x]:
env[k] = ''
new_env[k] = ' '.join(stdflags)
if language == x:
env['LDFLAGS'] = ''
new_env['LDFLAGS'] = ' '.join(lflags)
cmd = ['R', 'CMD', 'INSTALL', '--no-html', '--no-help', '--no-docs',
'--no-demo', '--no-multiarch', package_dir] + flags
makevar = os.path.expanduser(os.path.join('~', '.R', 'Makevars'))
makevar_copy = makevar + '_copy'
try:
if os.path.isfile(makevar):
shutil.move(makevar, makevar_copy)
if not os.path.isdir(os.path.dirname(makevar)):
os.mkdir(os.path.dirname(makevar))
with open(makevar, 'w') as fd:
for k, v in new_env.items():
if k.startswith('PKG_'):
fd.write('%s+=%s\n' % (k, v))
else:
fd.write('%s=%s\n' % (k, v))
if verbose:
with open(makevar, 'r') as fd:
print(fd.read())
# TODO: Set DYLD_INSERT_LIBRARIES to clang dynamic library
# on OSX when with_asan is True. R_TESTS may need to be
# modified so this variable is set when the tests are run.
# $(clang -print-file-name=libclang_rt.asan_osx_dynamic.dylib)
subprocess.check_call(cmd, env=env)
finally:
os.remove(makevar)
if os.path.isfile(makevar_copy):
shutil.move(makevar_copy, makevar)
[docs] @classmethod
def write_initialize_oiter(cls, var, value=None, **kwargs):
r"""Get the lines necessary to initialize an array for iteration
output.
Args:
var (dict, str): Name or information dictionary for the variable
being initialized.
value (str, optional): Value that should be assigned to the
variable.
**kwargs: Additional keyword arguments are passed to the
parent class's method.
Returns:
list: The lines initializing the variable.
"""
value = 'vector(mode = "list", length = %s)' % var['iter_var']['end']
out = super(RModelDriver, cls).write_initialize_oiter(
var, value=value, **kwargs)
return out
[docs] @classmethod
def write_finalize_oiter(cls, var):
r"""Get the lines necessary to finalize an array after iteration.
Args:
var (dict, str): Name or information dictionary for the variable
being initialized.
Returns:
list: The lines finalizing the variable.
"""
out = super(RModelDriver, cls).write_finalize_oiter(var)
out += [
'if (is({name}[[1]], "units")) {{'.format(name=var['name']),
' {name}_units = units::deparse_unit({name}[[1]])'.format(
name=var['name']),
'} else {',
' {name}_units = NULL'.format(name=var['name']),
'}',
'{name} = array(as.numeric(unlist({name})))'.format(name=var['name']),
'if (!(is.null({name}_units))) {{'.format(name=var['name']),
(' {name} = units::set_units({name}, {name}_units, '
' mode="standard")').format(name=var['name']),
'}',
]
return out
[docs] @classmethod
def write_expand_single_element(cls, output_var, add_cond=False):
r"""Write lines allowing extraction of the only element from a single
element array as a stand-alone variable if the variable is an array
and only has one element.
Args:
output_var (str): Name of the variable that should be conditionally
expanded.
add_cond (str, optional): Additional condition that must be
satisfied for the array element to be extracted. Defaults to
False and is ignored.
Returns:
list: Lines added the conditional expansion of single element
arrays.
"""
if not add_cond:
add_cond = []
add_cond.append(f"is.null(names({output_var}))")
return super(RModelDriver, cls).write_expand_single_element(
output_var, add_cond)
[docs] @classmethod
def install_dependency(cls, package, package_manager=None, **kwargs):
r"""Install a dependency.
Args:
package (str): Name of the package that should be installed. If
the package manager supports it, this can include version
requirements.
package_manager (str, optional): Package manager that should be
used to install the package.
**kwargs: Additional keyword arguments are passed to the parent
class.
"""
# if cls.compiled_with_asan():
# kwargs.setdefault('command_kwargs', {})
# kwargs['command_kwargs'].setdefault(
# 'env', copy.deepcopy(os.environ))
# cls.set_asan_env(kwargs['command_kwargs']['env'])
if package_manager in [None, 'cran', 'CRAN']:
if isinstance(package, list):
package = (
'c(' + ', '.join([f"\"{x}\"" for x in package]) + ')')
else:
package = f'\"{package}\"'
kwargs.setdefault(
'command',
[cls.get_interpreter(), '-e',
(f'\'install.packages({package}, dep=TRUE, '
f'repos=\"http://cloud.r-project.org\")\'')])
return super(RModelDriver, cls).install_dependency(
package, package_manager=package_manager, **kwargs)
[docs] @classmethod
def get_testing_options(cls, **kwargs):
r"""Method to return a dictionary of testing options for this class.
Args:
**kwargs: Additional keyword arguments are passed to the parent
class.
Returns:
dict: Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for driver instance.
deps (list): Dependencies to install.
"""
out = super(RModelDriver, cls).get_testing_options(**kwargs)
out['kwargs']['interpreter_flags'] = ['--vanilla']
out.update(
deps=['units', 'zeallot',
{'package': 'units', 'arguments': '-v'}],
python2language=[
(np.string_('hello'), 'hello'),
((np.string_('hello'), ), ('hello', )),
([np.string_('hello')], ['hello']),
({np.string_('hello'): np.string_('hello')},
{'hello': 'hello'}),
(OrderedDict([(np.string_('hello'), np.string_('hello'))]),
OrderedDict([('hello', 'hello')])),
(pd.DataFrame.from_dict({'a': np.zeros(5, dtype='int64')}),
pd.DataFrame.from_dict({'a': np.zeros(5, dtype='int32')}))],
invalid_libraries=['invalid_unicorn'])
return out