import os
import re
import warnings
import copy
import shutil
import subprocess
import numpy as np
import sysconfig
from collections import OrderedDict
from yggdrasil import platform, tools, constants, rapidjson
from yggdrasil.drivers.CompiledModelDriver import (
CompiledModelDriver, CompilerBase, LinkerBase, ArchiverBase)
from yggdrasil.languages import get_language_dir
from yggdrasil.config import ygg_cfg
# Library type assigned to the 'libtype' entry of the compiled internal
# libraries (regex_win32, datatypes) declared on CModelDriver.
_default_internal_libtype = 'object'
# Directory holding the C language sources, as returned by
# yggdrasil.languages.get_language_dir('c').
_top_lang_dir = get_language_dir('c')
def get_OSX_SYSROOT():
    r"""Determine the path to the OSX SDK.

    Candidate directories are checked in the following order and the
    first one that exists is returned:

    1. The 'macos_sdkroot' option in the 'c' section of the yggdrasil
       config.
    2. The SDKROOT environment variable.
    3. SDK directories inside the developer directory printed by
       ``xcode-select -p``, trying the version from the
       MACOSX_DEPLOYMENT_TARGET environment variable (if set), then
       11.0, then the unversioned SDK.

    Returns:
        str: Full path to the SDK directory if one is located. None
            otherwise (always None when not on a Mac).

    """
    if not platform._is_mac:
        return None
    try:
        # Call the tool directly (no shell) so that a failure raises
        # instead of silently producing an empty string.
        xcode_dir = subprocess.check_output(
            ['xcode-select', '-p']).decode("utf-8").strip()
    except Exception:  # pragma: debug
        # Missing tool, non-zero exit status or undecodable output; do not
        # swallow KeyboardInterrupt/SystemExit here.
        xcode_dir = ''
    fname_try = []
    cfg_sdkroot = ygg_cfg.get('c', 'macos_sdkroot', None)
    if cfg_sdkroot:
        fname_try.append(cfg_sdkroot)
    if os.environ.get('SDKROOT', False):
        fname_try.append(os.environ['SDKROOT'])
    # An empty developer directory would turn the candidates below into
    # paths relative to the current working directory, so skip it.
    if xcode_dir:
        bases_try = [
            os.path.join(xcode_dir, 'SDKs', 'MacOSX%s.sdk'),
            os.path.join(xcode_dir, 'Platforms',
                         'MacOSX.platform', 'Developer',
                         'SDKs', 'MacOSX%s.sdk')]
        vers_try = ['11.0', '']  # 11.0 used by conda-forge
        if os.environ.get('MACOSX_DEPLOYMENT_TARGET', False):
            vers_try.insert(0, os.environ['MACOSX_DEPLOYMENT_TARGET'])
        for v in vers_try:
            fname_try += [x % v for x in bases_try]
    for fcheck in fname_try:
        if os.path.isdir(fcheck):
            return fcheck
    return None
# Resolved once at import time; None when not on a Mac or when no SDK
# directory could be located.
_osx_sysroot = get_OSX_SYSROOT()
class CCompilerBase(CompilerBase):
    r"""Base class for C compilers."""
    languages = ['c']
    default_executable_env = 'CC'
    default_flags_env = 'CFLAGS'
    default_flags = ['-g', '-Wall']
    # GCC & CLANG have similar call patterns
    linker_attributes = {'default_executable_env': 'LD',
                         'default_flags_env': 'LDFLAGS',
                         'search_path_envvar': ['LIBRARY_PATH',
                                                'LD_LIBRARY_PATH']}
    search_path_envvar = ['C_INCLUDE_PATH']
    # Preprocess an empty C file verbosely so that the compiler prints the
    # include directories it searches; the regexes below pick the
    # directories out of that output.
    search_path_flags = ['-E', '-v', '-xc', '/dev/null']
    search_regex_begin = '#include "..." search starts here:'
    search_regex_end = 'End of search list.'
    search_regex = [r'(?:#include <...> search starts here:)|'
                    r'(?: ([^\n]+?)(?: \(framework directory\))?)\n']

    # This is only needed if one of the linkers is created from the compiler
    # @staticmethod
    # def before_registration(cls):
    #     r"""Operations that should be performed to modify class attributes prior
    #     to registration including things like platform dependent properties and
    #     checking environment variables for default settings.
    #     """
    #     if platform._is_mac:
    #         cls.linker_attributes = dict(cls.linker_attributes,
    #                                      search_path_flags=['-Xlinker', '-v'],
    #                                      search_regex=[r'\t([^\t\n]+)\n'],
    #                                      search_regex_begin='Library search paths:')
    #     elif platform._is_linux:
    #         cls.linker_attributes = dict(cls.linker_attributes,
    #                                      search_path_flags=['-Xlinker', '--verbose'],
    #                                      search_regex=[r'SEARCH_DIR\("=([^"]+)"\);'])
    #     CompilerBase.before_registration(cls)

    @classmethod
    def set_env(cls, *args, **kwargs):
        r"""Set environment variables required for compilation.

        When an OSX SDK was located at import time, CONDA_BUILD_SYSROOT
        and SDKROOT are pointed at it and, if the SDK directory name
        carries a version (e.g. MacOSX11.0.sdk), MACOSX_DEPLOYMENT_TARGET
        is set to that version.

        Args:
            *args: Arguments are passed to the parent class's method.
            **kwargs: Keyword arguments are passed to the parent class's
                method.

        Returns:
            dict: Environment variables for the model process.

        """
        out = super(CCompilerBase, cls).set_env(*args, **kwargs)
        if _osx_sysroot is not None:
            out['CONDA_BUILD_SYSROOT'] = _osx_sysroot
            out['SDKROOT'] = _osx_sysroot
            # The SDK root may come from the config file or the SDKROOT
            # environment variable and need not contain 'MacOSX' at all,
            # in which case there is no match object to query.
            match = re.search(r'MacOSX(?P<target>[0-9]+\.[0-9]+)?',
                              _osx_sysroot)
            target = match.group('target') if match else None
            # This is only utilized on local installs where a
            # non-default SDK is installed in addition to the default
            if target:  # pragma: debug
                out['MACOSX_DEPLOYMENT_TARGET'] = target
        return out

    @classmethod
    def call(cls, args, **kwargs):
        r"""Call the compiler with the provided arguments. For |yggdrasil| C
        models will always be linked using the C++ linker since some parts of
        the interface library are written in C++.

        Args:
            args (list): Arguments passed on to the parent class's method.
            **kwargs: Keyword arguments passed on to the parent class's
                method. Unless 'dont_link' is set, 'linker_language'
                defaults to 'c++'.

        Returns:
            object: Whatever the parent class's method returns.

        """
        if not kwargs.get('dont_link', False):
            kwargs.setdefault('linker_language', 'c++')
        return super(CCompilerBase, cls).call(args, **kwargs)

    @classmethod
    def get_search_path(cls, *args, **kwargs):
        r"""Determine the paths searched by the tool for external library files.

        Args:
            *args: Additional arguments are passed to the parent
                class's method.
            **kwargs: Additional keyword arguments are passed to the
                parent class's method.

        Returns:
            list: List of paths that the tools will search.

        """
        # NOTE(review): super(CompilerBase, cls) starts the lookup *after*
        # CompilerBase in the MRO, so any get_search_path defined on
        # CompilerBase itself is bypassed - confirm this is intentional.
        return super(CompilerBase, cls).get_search_path(*args, **kwargs)
class GCCCompiler(CCompilerBase):
    r"""Interface class for gcc compiler/linker."""
    toolname = 'gcc'
    platforms = ['MacOS', 'Linux', 'Windows']
    default_archiver = 'ar'
    default_linker = 'gcc'
    is_linker = False
    # linker_attributes = dict(
    #     CCompilerBase.linker_attributes,
    #     flag_options=OrderedDict(
    #         list(LinkerBase.flag_options.items())
    #         + list(CCompilerBase.linker_attributes.get('flag_options', {}).items())
    #         + [('library_rpath', '-Wl,-rpath')]))
    toolset = 'gnu'
    aliases = ['gnu-cc', 'gnu-gcc']
    asan_flags = ['-fsanitize=address']
    preload_envvar = 'LD_PRELOAD'
    object_tool = "ldd"

    @classmethod
    def is_installed(cls):
        r"""Determine if this tool is installed by looking for the executable.

        Returns:
            bool: True if the tool is installed, False otherwise.

        """
        out = super(GCCCompiler, cls).is_installed()
        # Disable gcc when it is an alias for clang
        if out and platform._is_mac:  # pragma: debug
            ver = cls.tool_version()
            if 'clang' in ver:
                out = False
        return out

    @classmethod
    def get_flags(cls, *args, **kwargs):
        r"""Get a list of compiler flags.

        Args:
            *args: Arguments are passed to the parent class's method.
            **kwargs: Keyword arguments are passed to the parent class's
                method.

        Returns:
            list: Compiler flags. On Windows, '-Wa,-mbig-obj' is added
                when the tool version string mentions mingw or msys.

        """
        out = super(GCCCompiler, cls).get_flags(*args, **kwargs)
        if platform._is_win:  # pragma: windows
            ver = cls.tool_version().lower()
            if 'mingw' in ver or 'msys' in ver:
                out.append('-Wa,-mbig-obj')
        return out

    # The first parameter is named cls like every other method on this
    # class, so the method is declared as a classmethod; without the
    # decorator a call on the class would bind the dll path to cls.
    @classmethod
    def dll2a(cls, dll, dst=None, overwrite=False):
        r"""Convert a Windows .dll library into a static import library.

        Args:
            dll (str): Full path to .dll library to convert.
            dst (str, optional): Full path to location where the new
                library should be saved. Defaults to None and the
                library is placed in the same directory as dll with the
                name 'lib<name>.dll.a' (the 'lib' prefix is only added
                if the dll name does not already have it).
            overwrite (bool, optional): If True, the static file will
                be created even if it already exists. Defaults to False.

        Returns:
            str: Full path to new .a static library, or dll itself if
                the library needs to be generated but the gendef and
                dlltool executables cannot both be located.

        Raises:
            AssertionError: If the returned path is not an existing file.

        """
        # https://sourceforge.net/p/mingw-w64/wiki2/
        # Answer%20generation%20of%20DLL%20import%20library/
        base = os.path.splitext(os.path.basename(dll))[0]
        if dst is None:
            libbase = base
            if not libbase.startswith('lib'):
                libbase = 'lib' + libbase
            libdir = os.path.dirname(dll)
            dst = os.path.join(libdir, libbase + '.dll.a')
        if (not os.path.isfile(dst)) or overwrite:
            gendef = shutil.which("gendef")
            dlltool = shutil.which("dlltool")
            if gendef and dlltool:
                # NOTE(review): dlltool is pointed at '<base>.def' relative
                # to the current working directory, which presumably is
                # where gendef writes it - confirm.
                subprocess.check_call([gendef, dll])
                subprocess.check_call(
                    [dlltool, '-D', dll, '-d', '%s.def' % base, '-l', dst])
            else:
                # Tools unavailable; fall back to the dll itself
                dst = dll
        assert os.path.isfile(dst)
        return dst
class ClangCompiler(CCompilerBase):
    r"""Interface class for clang compiler/linker."""
    toolname = 'clang'
    platforms = ['MacOS', 'Linux', 'Windows']
    default_linker = 'clang'
    default_archiver = 'libtool'
    # Extend the inherited flag options with the sysroot and minimum OSX
    # version flags.
    flag_options = OrderedDict(list(CCompilerBase.flag_options.items())
                               + [('sysroot', '--sysroot'),
                                  ('isysroot', {'key': '-isysroot',
                                                'prepend': True}),
                                  ('mmacosx-version-min',
                                   '-mmacosx-version-min=%s')])
    asan_flags = ['-fsanitize=address']
    preload_envvar = 'DYLD_INSERT_LIBRARIES'
    object_tool = "otool -L"
    # Set to False since ClangLinker has its own class to handle
    # conflict between versions of clang and ld.
    is_linker = False
    toolset = 'llvm'

    @classmethod
    def get_flags(cls, *args, **kwargs):
        r"""Get a list of compiler flags.

        The first '-fopenmp' flag is prefixed with '-Xclang' unless it
        already is.

        Args:
            *args: Arguments are passed to the parent class's method.
            **kwargs: Keyword arguments are passed to the parent class's
                method.

        Returns:
            list: Compiler flags.

        """
        out = super(ClangCompiler, cls).get_flags(*args, **kwargs)
        if '-fopenmp' in out:
            idx = out.index('-fopenmp')
            # new_flag = '-Xpreprocessor'
            new_flag = '-Xclang'
            # When -fopenmp is the very first flag there is no preceding
            # flag to inspect (out[-1] would wrap around to the last
            # element), so the prefix is always required in that case.
            if (idx == 0) or (out[idx - 1] != new_flag):
                out.insert(idx, new_flag)
        return out
class MSVCCompiler(CCompilerBase):
    r"""Compiler interface for the Microsoft Visual Studio C compiler (cl)."""
    toolname = "cl"
    platforms = ["Windows"]
    # TODO: Currently everything compiled as C++ on windows to allow use
    # of complex types. Use '/TC' instead of '/TP' for strictly C
    #
    # Flags that are currently disabled:
    #   /MD
    #   /MTd  Use LIBCMTD.lib to create multithreaded .exe
    #   /Z7   Symbolic debug in .obj (implies debug)
    default_flags = [
        "/W4",  # Display all errors
        "/Zi",  # Symbolic debug in .pdb (implies debug)
        "/FS",  # Allow synchronous write to .pdb
        "/EHsc",  # Catch C++ exceptions only (C don't throw C++)
        "/bigobj",  # Allow big files
        "/TP",  # Treat all files as C++
        "/nologo",  # Suppress startup banner
        # Don't show errors from using scanf, strcpy, etc.
        "-D_CRT_SECURE_NO_WARNINGS",
    ]
    output_key = "/Fo%s"
    output_first = True
    default_linker = "LINK"
    default_archiver = "LIB"
    linker_switch = "/link"
    search_path_envvar = ["INCLUDE"]
    search_path_flags = None
    version_flags = []
    product_exts = [
        ".dir", ".ilk", ".pdb", ".sln", ".vcxproj", ".vcxproj.filters",
        ".exp", ".lib",
    ]
    combine_with_linker = True  # Must be explicit; linker is separate .exe
    # Start from the GCC linker attributes and override the MSVC specifics
    linker_attributes = {
        **GCCCompiler.linker_attributes,
        "default_executable": None,
        "default_executable_env": "LINK",
        "output_key": "/OUT:%s",
        "output_first": True,
        "output_first_library": False,
        "flag_options": OrderedDict([
            ("library_libs", ""),
            ("library_libs_nonstd", ""),
            ("library_dirs", "/LIBPATH:%s"),
            ("import_lib", "/IMPLIB:%s"),
        ]),
        "shared_library_flag": "/DLL",
        "search_path_envvar": ["LIB"],
        "search_path_flags": None,
    }
    toolset = "msvc"

    @staticmethod
    def before_registration(cls):
        r"""Adjust class attributes before the class is registered.

        If both cl.exe and link.exe are found on the PATH but live in
        different directories, the default linker executable is set to
        the file with link.exe's name located next to cl.exe. The parent
        class's hook is then run.

        Args:
            cls (type): Class being registered.

        """
        cl_exe = shutil.which("cl.exe")
        link_exe = shutil.which("link.exe")
        if cl_exe and link_exe:
            cl_dir = os.path.dirname(cl_exe)
            link_dir = os.path.dirname(link_exe)
            if cl_dir.lower() != link_dir.lower():
                cls.linker_attributes["default_executable"] = os.path.join(
                    cl_dir, os.path.basename(link_exe))
                assert os.path.isfile(
                    cls.linker_attributes["default_executable"])
        CCompilerBase.before_registration(cls)
# C Linkers
class LDLinker(LinkerBase):
    r"""Linker interface for the ld tool."""
    toolname = "ld"
    # Only C is enabled for ld (rather than ['c', 'c++', 'fortran']) to
    # prevent it being selected instead of the default linker, which seems
    # to be happening on the CI
    languages = ["c"]
    default_executable_env = "LD"
    default_flags_env = "LDFLAGS"
    version_flags = ["-v"]
    search_path_envvar = ["LIBRARY_PATH", "LD_LIBRARY_PATH"]
    asan_flags = ["-fsanitize=address"]

    @classmethod
    def get_flags(cls, *args, **kwargs):
        r"""Get a list of linker flags.

        Args:
            *args: Arguments are passed to the parent class's method.
            **kwargs: Keyword arguments are passed to the parent class's
                method.

        Returns:
            list: Linker flags from the parent class with '-lstdc++'
                added at the end if it is not already present.

        """
        flags = super(LDLinker, cls).get_flags(*args, **kwargs)
        if '-lstdc++' in flags:
            return flags
        flags.append('-lstdc++')
        return flags
class GCCLinker(LDLinker):
    r"""Linker interface for gcc (which calls ld)."""
    # Identity of the tool is shared with the gcc compiler class
    toolname = GCCCompiler.toolname
    aliases = GCCCompiler.aliases
    languages = GCCCompiler.languages
    platforms = GCCCompiler.platforms
    default_executable = GCCCompiler.default_executable
    toolset = GCCCompiler.toolset
    # Flags and pattern used to find the library search directories
    search_path_flags = ["-Xlinker", "--verbose"]
    search_regex = [r'SEARCH_DIR\("=([^"]+)"\);']
    # ld's flag options plus the gcc spelling of the rpath flag
    flag_options = OrderedDict(LDLinker.flag_options,
                               library_rpath="-Wl,-rpath")
[docs]class ClangLinker(LDLinker):
r"""Interface class for clang linker (calls to ld)."""
# Identity attributes are copied from the clang compiler class
toolname = ClangCompiler.toolname
aliases = ClangCompiler.aliases
languages = ClangCompiler.languages
platforms = ClangCompiler.platforms
default_executable = ClangCompiler.default_executable
toolset = ClangCompiler.toolset
# Flags passed to list the library search paths and the patterns used to
# pull the paths out of the resulting output
search_path_flags = ['-Xlinker', '-v']
search_regex = [r'\t([^\t\n]+)\n']
search_regex_begin = 'Library search paths:'
# ld's flag options extended with a linker version flag, the clang
# spelling of rpath and an empty flag for non-standard libraries
flag_options = OrderedDict(LDLinker.flag_options,
**{'linker-version': '-mlinker-version=%s',
'library_rpath': '-rpath',
'library_libs_nonstd': ''})
asan_flags = ['-fsanitize=address', '-shared-libasan']
[docs] @staticmethod
def before_registration(cls):
r"""Operations that should be performed to modify class attributes prior
to registration including things like platform dependent properties and
checking environment variables for default settings.
"""
LDLinker.before_registration(cls)
if platform._is_win: # pragma: windows
# On windows clang calls the MSVC linker LINK.exe which does not
# accept rpath. Runtime libraries must be in the same directory
# as the executable or a directory in the PATH env variable.
cls.flag_options.pop('library_rpath', None)
[docs] @classmethod
def get_flags(cls, *args, **kwargs):
r"""Get a list of linker flags."""
# Handle case where clang (10.0.0) is trying to pass
# -platform_version to a version of ld64 that doesn't support
# it (<520).
# https://bugs.llvm.org/show_bug.cgi?id=44813
# https://reviews.llvm.org/D71579
# https://reviews.llvm.org/D74784
ver = cls.tool_version()
# Only the leading (major) component of each version string is compared
if int(ver.split('.')[0]) >= 10:
ld_version = LDLinker.tool_version()
if float(ld_version.split('.')[0]) < 520: # pragma: version
# No longer covered as the default conda
# install no longer causes this configuration
# to occur, but this will not be deprecated
# as existing installs still have this mismatch
kwargs['linker-version'] = ld_version
out = super(ClangLinker, cls).get_flags(*args, **kwargs)
# Link against libomp explicitly in place of the -fopenmp flag
if '-fopenmp' in out:
out[out.index('-fopenmp')] = '-lomp'
# Search /usr/local for libomp and add the directory of a matching
# libomp.dylib/libomp.a as a library search directory.
# NOTE(review): the original indentation is not visible here;
# presumably this search only runs when '-fopenmp' was present and the
# loop stops at the first matching file - confirm against the upstream
# source.
if 'conda' not in cls.get_executable(full_path=True):
result = subprocess.check_output(
"find /usr/local -xdev -name '*libomp*'",
shell=True).splitlines()
for x in result:
x_dir, x_file = os.path.split(x.decode("utf-8"))
if x_file.endswith(('libomp.dylib', 'libomp.a')):
out.append(f'-L{x_dir}')
break
return out
# C Archivers
class ARArchiver(ArchiverBase):
    r"""Archiver interface for the ar tool."""
    toolname = "ar"
    languages = ["c", "c++", "fortran"]
    # Environment variables consulted for the executable and its flags
    default_executable_env = "AR"
    default_flags_env = None
    # Command line layout
    static_library_flag = "rcs"
    output_key = ""
    output_first_library = True
    # Toolset membership
    toolset = "gnu"
    compatible_toolsets = ["llvm"]
    search_path_envvar = ["LIBRARY_PATH"]
    asan_flags = []
class MSVCArchiver(ArchiverBase):
    r"""Archiver interface for the Microsoft Visual Studio LIB tool."""
    toolname = "LIB"
    languages = ["c", "c++"]
    platforms = ["Windows"]
    # Command line layout
    static_library_flag = None
    output_key = "/OUT:%s"
    # Toolset membership
    toolset = "msvc"
    compatible_toolsets = ["llvm"]
    search_path_envvar = ["LIB"]

    # @classmethod
    # def is_import_lib(cls, libpath):
    #     r"""Determine if a library is an import library or a static
    #     library.
    #     Args:
    #         libpath (str): Full path to library.
    #     Returns:
    #         bool: True if the library is an import library, False otherwise.
    #     """
    #     if (not os.path.isfile(libpath)) or (not libpath.endswith('.lib')):
    #         return False
    #     out = subprocess.check_output([cls.get_executable(full_path=True),
    #                                    '/list', libpath])
    #     files = set(out.splitlines())
    #     if any([f.endswith('.obj') for f in files]):
    #         return False
    #     return True
# Include directories for the C interface and its sub-packages
_incl_interface = _top_lang_dir
_incl_seri = os.path.join(_incl_interface, 'serialize')
_incl_comm = os.path.join(_incl_interface, 'communication')

# Python include directory: the directory of the configured header if that
# file exists, otherwise the directory reported by sysconfig.
_python_inc = ygg_cfg.get('c', 'python_include', None)
if (_python_inc is not None) and os.path.isfile(_python_inc):
    _python_inc = os.path.dirname(_python_inc)
else:  # pragma: no cover
    _python_inc = sysconfig.get_paths()['include']

# Python C library: prefer the configured library of the first type in
# libtype_order, then the configured library of the second type, and
# finally ask yggdrasil.tools to locate one of the first type.
try:
    libtype_order = (['static', 'shared'] if platform._is_win
                     else ['shared', 'static'])
    _python_lib = ygg_cfg.get(
        'c', 'python_%s' % libtype_order[0],
        ygg_cfg.get('c', 'python_%s' % libtype_order[1], None))
    if not ((_python_lib is not None)
            and os.path.isfile(_python_lib)):  # pragma: no cover
        _python_lib = tools.get_python_c_library(
            libtype=libtype_order[0], allow_failure=True)
except BaseException as err:  # pragma: debug
    warnings.warn("ERROR LOCATING PYTHON LIBRARY: %s" % err)
    _python_lib = None

# Numpy include directories, falling back on numpy.distutils if
# np.get_include is not available.
try:
    _numpy_inc = [np.get_include()]
except AttributeError:  # pragma: debug
    from numpy import distutils as numpy_distutils
    _numpy_inc = numpy_distutils.misc_util.get_numpy_include_dirs()
_numpy_lib = None  # os.path.join(os.path.dirname(_numpy_inc[0]), 'lib', 'npymath.lib')
[docs]class CModelDriver(CompiledModelDriver):
r"""Class for running C models."""
_schema_subtype_description = ('Model is written in C.')
# Driver names listed as deprecated (presumably older names for this
# driver - confirm against the registration code)
_deprecated_drivers = ['GCCModelDriver']
language = 'c'
# File extensions associated with this language
language_ext = ['.c', '.h']
interface_library = 'ygg'
# Communication mechanisms and their constraints: ipc is limited to the
# listed platforms, zmq lists the zmq and czmq libraries
supported_comms = ['ipc', 'zmq']
supported_comm_options = {
'ipc': {'platforms': ['MacOS', 'Linux']},
'zmq': {'libraries': ['zmq', 'czmq']}}
interface_dependencies = ['rapidjson']
interface_directories = [_incl_interface]
standard_libraries = ['m']
# Third-party libraries, keyed by name. Each entry names the header used
# to identify the library ('include'), its language and (where set) its
# library type; numpy and python are additionally flagged as being for the
# Python API.
external_libraries = {
'rapidjson': {'include': os.path.join(os.path.dirname(tools.__file__),
'rapidjson', 'include',
'rapidjson', 'rapidjson.h'),
'libtype': 'header_only',
'language': 'c'},
'zmq': {'include': 'zmq.h',
'libtype': 'shared',
'language': 'c'},
'czmq': {'include': 'czmq.h',
'libtype': 'shared',
'language': 'c'},
'numpy': {'include': os.path.join(_numpy_inc[0], 'numpy',
'arrayobject.h'),
'libtype': 'header_only',
'language': 'c',
'for_python_api': True},
'python': {'include': os.path.join(_python_inc, 'Python.h'),
'language': 'c',
'for_python_api': True,
'standard': True}}
# Libraries built from sources in the C language directory, keyed by name.
# The 'regex' name listed as a dependency below is not defined here; it is
# assigned in after_registration to either regex_win32 or regex_posix.
internal_libraries = {
'ygg': {'source': os.path.join(_incl_interface, 'YggInterface.c'),
'language': 'c',
'linker_language': 'c++', # Some dependencies are C++
'internal_dependencies': ['regex', 'datatypes'],
'external_dependencies': ['rapidjson',
'python', 'numpy'],
'include_dirs': [_incl_comm, _incl_seri],
'compiler_flags': []},
'regex_win32': {'source': 'regex_win32.cpp',
'directory': os.path.join(_top_lang_dir, 'regex'),
'language': 'c++',
'libtype': _default_internal_libtype,
'internal_dependencies': [],
'external_dependencies': []},
'regex_posix': {'source': 'regex_posix.h',
'directory': os.path.join(_top_lang_dir, 'regex'),
'language': 'c',
'libtype': 'header_only',
'internal_dependencies': [],
'external_dependencies': []},
'datatypes': {'directory': os.path.join(_top_lang_dir, 'datatypes'),
'language': 'c++',
'libtype': _default_internal_libtype,
'internal_dependencies': ['regex'],
'external_dependencies': ['rapidjson',
'python', 'numpy'],
'include_dirs': []}}
# Mapping from yggdrasil datatype names to C type names. The 'X' in
# entries such as 'intX_t' looks like a placeholder that is substituted
# elsewhere (presumably with the precision - TODO confirm).
type_map = {
'comm': 'comm_t*',
'dtype': 'dtype_t*',
'int': 'intX_t',
'float': 'double',
'string': 'char*', # string_t',
'array': 'json_array_t',
'object': 'json_object_t',
'integer': 'int',
'number': 'double',
'boolean': 'bool',
'length': 'size_t',
'null': 'void*',
'uint': 'uintX_t',
'complex': 'complex_X',
'bytes': 'string_t', # char*',
'unicode': 'unicode_t',
'1darray': '*',
'ndarray': '*',
'ply': 'ply_t',
'obj': 'obj_t',
'schema': 'schema_t',
'flag': 'int',
'class': 'python_class_t',
'function': 'python_function_t',
'instance': 'python_instance_t',
'any': 'generic_t'}
# Templates for the calls made to the C interface library; names in braces
# are placeholders
interface_map = {
'import': '#include "YggInterface.h"',
'input': 'yggInputType("{channel_name}", {datatype})',
'output': 'yggOutputType("{channel_name}", {datatype})',
'server': 'yggRpcServerType("{channel_name}", {datatype_in}, {datatype_out})',
'client': 'yggRpcClientType("{channel_name}", {datatype_out}, {datatype_in})',
'timesync': 'yggTimesync("{channel_name}", "{time_units}")',
'send': 'flag = yggSend({channel_obj}, {outputs})',
'recv': 'flag = yggRecv({channel_obj}, {input_refs})',
'call': 'flag = rpcCall({channel_obj}, {outputs}, {inputs})',
}
type_class_map = {}
# Templates used to generate and parse C source code. Names in single
# braces are placeholders; most templates double their literal braces
# (presumably because they are filled in with str.format - TODO confirm).
function_param = {
# Includes and channel creation
'import': '#include \"{filename}\"',
'index': '{variable}[{index}]',
'interface': '#include \"{interface_library}\"',
'input': ('{channel} = yggInputType('
'\"{channel_name}\", {channel_type});'),
'output': ('{channel} = yggOutputType('
'\"{channel_name}\", {channel_type});'),
# Names of the interface send/receive functions and the conditions
# applied to the flag they return
'recv_heap': 'yggRecvRealloc',
'recv_stack': 'yggRecv',
'recv_function': 'yggRecvRealloc',
'send_function': 'yggSend',
'not_flag_cond': '{flag_var} < 0',
'flag_cond': '{flag_var} >= 0',
'declare': '{type_name} {variable};',
# Initial values for variables of non-scalar types
'init_array': 'init_json_array()',
'init_object': 'init_json_object()',
'init_schema': 'init_schema()',
'init_ply': 'init_ply()',
'init_obj': 'init_obj()',
'init_class': 'init_python()',
'init_function': 'init_python()',
'init_instance': 'init_generic()',
'init_any': 'init_generic()',
# Calls that create datatype objects
'init_type_from_schema': ('create_dtype_from_schema(\"{schema}\",'
' {use_generic})'),
'init_type_array': ('create_dtype_json_array({nitems}, '
'{items}, {use_generic})'),
'init_type_object': ('create_dtype_json_object({nitems}, '
'{keys}, {values}, {use_generic})'),
'init_type_ply': 'create_dtype_ply({use_generic})',
'init_type_obj': 'create_dtype_obj({use_generic})',
'init_type_1darray': ('create_dtype_1darray(\"{subtype}\", '
'{precision}, {length}, \"{units}\", '
'{use_generic})'),
'init_type_ndarray': ('create_dtype_ndarray(\"{subtype}\", '
'{precision}, {ndim}, {shape}, '
'\"{units}\", {use_generic})'),
'init_type_ndarray_arr': ('create_dtype_ndarray_arr(\"{subtype}\", '
'{precision}, {ndim}, {shape}, '
'\"{units}\", {use_generic})'),
'init_type_scalar': ('create_dtype_scalar(\"{subtype}\", '
'{precision}, \"{units}\", '
'{use_generic})'),
'init_type_default': ('create_dtype_default(\"{type}\", '
'{use_generic})'),
'init_type_pyobj': ('create_dtype_pyobj(\"{type}\", '
'{use_generic})'),
'init_type_empty': ('create_dtype_empty({use_generic})'),
'init_type_schema': ('create_dtype_schema({use_generic})'),
# Statements that copy a value of the given type
'copy_array': 'copy_generic_into(&{name}, {value});',
'copy_object': 'copy_generic_into(&{name}, {value});',
'copy_schema': 'copy_generic_into(&{name}, {value});',
'copy_ply': '{name} = copy_ply({value});',
'copy_obj': '{name} = copy_obj({value});',
'copy_class': '{name} = copy_python({value});',
'copy_function': '{name} = copy_python({value});',
'copy_instance': 'copy_generic_into(&{name}, {value});',
'copy_any': 'copy_generic_into(&{name}, {value});',
# Statements that release a variable of the given type
'free_array': 'free_json_array({variable});',
'free_object': 'free_json_object({variable});',
'free_schema': 'free_schema({variable});',
'free_ply': 'free_ply({variable});',
'free_obj': 'free_obj({variable});',
'free_class': 'destroy_python({variable});',
'free_function': 'destroy_python({variable});',
'free_instance': 'free_generic({variable});',
'free_any': 'free_generic({variable});',
# Statements that print a value of the given type
'print_float': 'printf("%f\\n", {object});',
'print_int': 'printf("%i\\n", {object});',
'print_uint': 'printf("%u\\n", {object});',
'print_string': 'printf("%s\\n", {object});',
'print_unicode': 'printf("%s\\n", {object});',
'print_bytes': 'printf("%s\\n", {object});',
'print_complex': 'print_complex({object});',
'print_array': 'display_json_array({object});',
'print_object': 'display_json_object({object});',
'print_schema': 'display_schema({object});',
'print_ply': 'display_ply({object});',
'print_obj': 'display_obj({object});',
'print_class': 'display_python({object});',
'print_function': 'display_python({object});',
'print_instance': 'display_generic({object});',
'print_any': 'display_generic({object});',
# Basic syntax: assignment, literals, operators and control flow
'assign': '{name} = {value};',
'assign_copy': 'memcpy({name}, {value}, {N}*sizeof({native_type}));',
'comment': '//',
'true': 'true',
'false': 'false',
'true_flag': '1',
'false_flag': '0',
'null': 'NULL',
'not': '!',
'and': '&&',
'indent': 2 * ' ',
'quote': '\"',
'print': 'printf(\"{message}\\n\");',
'fprintf': 'printf(\"{message}\\n\", {variables});',
'error': 'printf(\"{error_msg}\\n\"); return -1;',
'block_end': '}',
'line_end': ';',
'if_begin': 'if ({cond}) {{',
'if_elif': '}} else if ({cond}) {{',
'if_else': '}} else {{',
'for_begin': ('for ({iter_var} = {iter_begin}; {iter_var} < {iter_end}; '
'{iter_var}++) {{'),
'while_begin': 'while ({cond}) {{',
'break': 'break;',
# Executable and function scaffolding
'exec_begin': 'int main() {',
'exec_end': ' return 0;\n}',
'exec_prefix': '#include <stdbool.h>',
'python_init': 'init_python_API();',
'free': 'if ({variable} != NULL) {{ free({variable}); {variable} = NULL; }}',
'function_def_begin': '{output_type} {function_name}({input_var}) {{',
'return': 'return {output_var};',
# Regular expressions for parsing a C function definition and the
# variables in its argument list. The named groups (flag_type, inputs,
# body, flag_var, native_type, ptr, name, shape, ...) identify the
# extracted pieces.
'function_def_regex': (
r'(?P<flag_type>.+?)\s*{function_name}\s*'
r'\((?P<inputs>(?:[^{{])*?)\)\s*\{{'
r'(?P<body>(?:.*?\n?)*?)'
r'(?:(?:return *(?P<flag_var>.+?)?;(?:.*?\n?)*?\}})'
r'|(?:\}}))'),
'inputs_def_regex': (
r'\s*(?P<native_type>(?:[^\s\*])+(\s+)?'
r'(?P<ptr>\*+)?)(?(ptr)(?(1)(?:\s*)|(?:\s+)))'
r'(\((?P<name_ptr>\*+)?)?(?P<name>.+?)(?(4)(?:\)))'
r'(?P<shape>(?:\[.+?\])+)?\s*(?:,|$)(?:\n)?'),
'outputs_def_regex': (
r'\s*(?P<native_type>(?:[^\s\*])+(\s+)?'
r'(?P<ptr>\*+)?)(?(ptr)(?(1)(?:\s*)|(?:\s+)))'
r'(?P<name>.+?)(?P<shape>(?:\[.+?\])+)?\s*(?:,|$)(?:\n)?')}
# Language traits consumed by the parent driver classes.
# NOTE(review): the flag names suggest that outputs are passed among the
# function inputs and that variables are typed; confirm the exact
# semantics against CompiledModelDriver.
outputs_in_inputs = True
include_channel_obj = True
is_typed = True
# Characters that open and close a code block in C
brackets = (r'{', r'}')
locked_buildfile = 'datatypes.o'
[docs] @staticmethod
def after_registration(cls, **kwargs):
r"""Operations that should be performed to modify class attributes after
registration."""
# Pick a per-platform default compiler only when none has been set
if cls.default_compiler is None:
if platform._is_linux:
cls.default_compiler = 'gcc'
elif platform._is_mac:
cls.default_compiler = 'clang'
elif platform._is_win: # pragma: windows
cls.default_compiler = 'cl'
# Run the generic hook of the parent driver class
CompiledModelDriver.after_registration(cls, **kwargs)
# The library set-up below is skipped on a second registration pass
if kwargs.get('second_pass', False):
return
# Record the Python library located at import time: files ending in
# .lib/.a (but not .dll.a) are treated as static, anything else as shared
if _python_lib:
if ((_python_lib.endswith(('.lib', '.a'))
and not _python_lib.endswith('.dll.a'))):
cls.external_libraries['python']['libtype'] = 'static'
cls.external_libraries['python']['static'] = _python_lib
else:
cls.external_libraries['python']['libtype'] = 'shared'
cls.external_libraries['python']['shared'] = _python_lib
# On Windows, look up whichever of the static/shared Python libraries has
# not been recorded yet.
# NOTE(review): the original indentation is not visible here; presumably
# the loop that marks zmq/czmq/python as 'windows_import' is also limited
# to Windows - confirm against the upstream source.
if platform._is_win: # pragma: windows
for libtype in ['static', 'shared']:
if libtype in cls.external_libraries['python']:
continue
cls.external_libraries['python'][libtype] = tools.get_python_c_library(
allow_failure=True, libtype=libtype)
for x in ['zmq', 'czmq', 'python']:
if x in cls.external_libraries:
cls.external_libraries[x]['libtype'] = 'windows_import'
# Platform specific regex internal library
if platform._is_win: # pragma: windows
regex_lib = cls.internal_libraries['regex_win32']
else:
regex_lib = cls.internal_libraries['regex_posix']
cls.internal_libraries['regex'] = regex_lib
# Platform specific internal library options
cls.internal_libraries['ygg']['include_dirs'] += [_top_lang_dir]
if platform._is_win: # pragma: windows
cls.internal_libraries['datatypes']['include_dirs'] += [_top_lang_dir]
# On Linux, make sure the ygg and datatypes libraries are compiled with
# -fPIC and list 'm' among their external dependencies
if platform._is_linux:
for x in ['ygg', 'datatypes']:
if 'compiler_flags' not in cls.internal_libraries[x]:
cls.internal_libraries[x]['compiler_flags'] = []
if '-fPIC' not in cls.internal_libraries[x]['compiler_flags']:
cls.internal_libraries[x]['compiler_flags'].append('-fPIC')
if 'm' not in cls.internal_libraries[x]['external_dependencies']:
cls.internal_libraries[x]['external_dependencies'].append('m')
@classmethod
def call_linker(cls, obj, language=None, **kwargs):
    r"""Link object files into an executable or library (shared or
    static), checking for errors.

    When linking a model for the C driver without an explicit language
    (and interface flags have not been skipped), the link is performed
    as C++ and the interface linker keyword arguments are merged in
    before deferring to the parent class.

    Args:
        obj (list): Object files that should be linked.
        language (str, optional): Language that should be used to link
            the files. Defaults to None and the language of the current
            driver is used.
        **kwargs: Additional keyword arguments are passed to run_executable.

    Returns:
        str: Full path to compiled source.

    """
    link_model_as_cxx = (
        (language is None)
        and (cls.language == 'c')
        and bool(kwargs.get('for_model', False))
        and (not kwargs.get('skip_interface_flags', False)))
    if link_model_as_cxx:
        language = 'c++'
        interface_kwargs = cls.update_linker_kwargs(**kwargs)
        kwargs.update(interface_kwargs)
        # The interface flags are now part of kwargs; do not add them again
        kwargs['skip_interface_flags'] = True
    return super(CModelDriver, cls).call_linker(
        obj, language=language, **kwargs)
@classmethod
def update_ld_library_path(cls, env, paths_to_add=None,
                           add_to_front=False, add_libpython_dir=False,
                           toolname=None, env_var=None, **kwargs):
    r"""Add library directories to the search path variable in a
    dictionary of environment variables.

    The language directory is always among the directories added.

    Args:
        env (dict): Dictionary of environment variables to be updated.
        paths_to_add (list, optional): Paths that should be added in
            addition to cls.get_language_dir(). Defaults to None.
        add_to_front (bool, optional): If True, new paths are added to the
            front, rather than the end. Defaults to False.
        add_libpython_dir (bool, optional): If True, the directory
            containing the Python C library will be added. Defaults
            to False.
        toolname (str, optional): Name of compiler tool that should be used.
            Defaults to None and the default compiler for the language will
            be used.
        env_var (str, optional): Environment variable where the paths
            should be added. Defaults to None and is only set for
            linux (LD_LIBRARY_PATH) and windows (PATH).
        **kwargs: Additional keyword arguments are ignored.

    Returns:
        dict: Updated dictionary of environment variables.

    Raises:
        NotImplementedError: If a vcpkg directory is configured on
            Windows and Python is not 64bit.

    """
    # Concatenation creates a new list so the caller's list is untouched
    new_paths = (([] if paths_to_add is None else paths_to_add)
                 + [cls.get_language_dir()])
    if add_libpython_dir:
        libpython = cls.get_dependency_library('python', toolname=toolname)
        if os.path.isfile(libpython):
            new_paths.append(os.path.dirname(libpython))
    if platform._is_win and ygg_cfg.get('c', 'vcpkg_dir', None):
        if not platform._is_64bit:  # pragma: debug
            raise NotImplementedError("Not yet tested on 32bit Python")
        new_paths.append(os.path.join(ygg_cfg.get('c', 'vcpkg_dir'),
                                      'installed', 'x64-windows', 'bin'))
    if env_var is None:
        if platform._is_linux:
            env_var = 'LD_LIBRARY_PATH'
        elif platform._is_win:
            env_var = 'PATH'
    if env_var is None:
        # No variable applies on this platform
        return env
    existing = env.pop(env_var, '')
    existing_entries = existing.split(os.pathsep)
    merged = [existing] if existing else []
    for candidate in new_paths:
        if candidate in existing_entries:
            continue
        if add_to_front:
            merged.insert(0, candidate)
        else:
            merged.append(candidate)
    if merged:
        env[env_var] = os.pathsep.join(merged)
    return env
@classmethod
def update_python_path(cls, env):
    r"""Set PYTHONHOME and PYTHONPATH in a dictionary of environment
    variables if they are needed and not already present (Windows only).

    Args:
        env (dict): Dictionary of environment variables to be updated.

    Returns:
        dict: Updated dictionary of environment variables.

    """
    if not platform._is_win:
        return env
    # pragma: windows
    prefix = sysconfig.get_config_var('prefix')
    env.setdefault('PYTHONHOME', prefix)
    search_dirs = [
        sysconfig.get_path('stdlib'),
        sysconfig.get_path('purelib'),
        os.path.join(sysconfig.get_config_var('prefix'), 'DLLs'),
    ]
    env.setdefault('PYTHONPATH', os.pathsep.join(search_dirs))
    return env
@classmethod
def set_env_class(cls, **kwargs):
    r"""Build the environment variables that do not depend on a driver
    instance.

    Args:
        **kwargs: Additional keyword arguments are passed to the parent
            class's method and update_ld_library_path.

    Returns:
        dict: Environment variables for the model process.

    """
    env = super(CModelDriver, cls).set_env_class(**kwargs)
    env = cls.update_ld_library_path(env, **kwargs)
    return cls.update_python_path(env)
@classmethod
def parse_var_definition(cls, io, value, **kwargs):
    r"""Extract information about input/output variables from a
    string definition.

    After the parent class has parsed the definition, each variable is
    associated with a companion length variable or ndim/shape variables
    when variables with the matching names are present, or has an
    explicit C array shape converted into array datatype information.

    Args:
        io (str): Description of variables contained in the provided
            string. Must be 'inputs' or 'outputs'.
        value (str): String containing one or more variable definitions.
        **kwargs: Additional keyword arguments are passed to the
            parent class's method.

    Returns:
        list: List of information about the variables contained in
            the provided string.

    Raises:
        AssertionError: If io is not 'inputs' or 'outputs'.
        NotImplementedError: If the def_regex for the specified
            io is not defined.

    """
    out = super(CModelDriver, cls).parse_var_definition(io, value, **kwargs)
    known_names = {v['name'] for v in out}
    for var in out:
        name = var['name']
        # Companion length variable (suffix form takes precedence)
        length_var = next(
            (cand for cand in (name + '_length', 'length_' + name)
             if cand in known_names), None)
        if length_var is not None:
            var['length_var'] = length_var
            continue
        # Companion ndim/shape variables (suffix form takes precedence)
        ndim_shape = next(
            ((ndim, shape) for ndim, shape in (
                (name + '_ndim', name + '_shape'),
                ('ndim_' + name, 'shape_' + name))
             if (ndim in known_names) and (shape in known_names)), None)
        if ndim_shape is not None:
            var['ndim_var'], var['shape_var'] = ndim_shape
            var['datatype']['type'] = 'ndarray'
            continue
        if 'shape' not in var:
            continue
        # Explicit shape in the declaration, e.g. "[3][4]"
        dims = [int(float(dim.strip('[]')))
                for dim in var.pop('shape').split('][')]
        var['datatype']['shape'] = dims
        assert var['datatype']['subtype'] in (constants.SCALAR_TYPES
                                              + ['string'])
        if len(var['datatype']['shape']) == 1:
            var['datatype']['length'] = var['datatype'].pop('shape')[0]
            var['datatype']['type'] = '1darray'
        else:
            var['datatype']['type'] = 'ndarray'
    return out
# if direction == 'input':
# if not x.get('ndim_var', False):
# x['ndim_var'] = {
# 'name': x['name'] + '_ndim',
# 'datatype': {
# 'type': 'uint',
# 'precision': rapidjson.SIZE_OF_SIZE_T},
# 'is_length_var': True}
# x.setdefault('extra_vars', {})
# x['extra_vars']['ndim'] = x['ndim_var']
# if not x.get('shape_var', False):
# x['shape_var'] = {
# 'name': x['name'] + '_ndim',
# 'datatype': {
# 'type': '1darray',
# 'subtype': 'uint',
# 'precision': rapidjson.SIZE_OF_SIZE_T},
# 'is_length_var': True}
# x.setdefault('extra_vars', {})
# x['extra_vars']['shape'] = x['shape_var']
[docs] @classmethod
def finalize_function_io(cls, direction, x):
    r"""Complete the info for a channel once function parsing is done.

    The parent class's method is applied first. Variables in input
    channels are then flagged with 'allow_realloc' where permitted and
    channels with an 'array' datatype are flagged with 'use_generic'.

    Args:
        direction (str): Direction of channel ('input' or 'output')
        x (dict): Channel info. Modified in place.

    """
    super(CModelDriver, cls).finalize_function_io(direction, x)
    if direction == 'input':
        # Variables are only flagged when every variable in the channel
        # allows reallocation.
        realloc_ok = [cls.allows_realloc(v) for v in x['vars']]
        if all(realloc_ok):
            string_types = ['char*', 'string_t', 'bytes_t', 'unicode_t']
            excluded_types = ['any', 'object', 'array', 'schema',
                              'instance', '1darray', 'ndarray',
                              'ply', 'obj']
            for v in x['vars']:
                if v['native_type'] in string_types:
                    continue
                if v.get('is_length_var', False):
                    continue
                # TODO: allow ply_t/obj_t to be passed by pointer
                if v['datatype']['type'] in excluded_types:
                    continue
                if (cls.function_param['recv_function']
                        == cls.function_param['recv_heap']):
                    v['allow_realloc'] = True
    if x['datatype']['type'] == 'array':
        # The generic interface is used unless there is exactly one
        # (non-length) variable per item in the array datatype.
        n_items = len(x['datatype'].get('items', []))
        n_data_vars = sum(1 for ix in x['vars']
                          if not ix.get('is_length_var', False))
        x['use_generic'] = (n_items != n_data_vars)
[docs] @classmethod
def allows_realloc(cls, var):
r"""Determine if a variable allows the receive call to perform
realloc.
Args:
var (dict): Dictionary of variable properties.
Returns:
bool: True if the variable allows realloc, False otherwise.
"""
if isinstance(var, dict):
datatype = var.get('datatype', var)
if ('shape' in datatype) or ('length' in datatype):
return False
return True
[docs] @classmethod
def requires_length_var(cls, var):
r"""Determine if a variable requires a separate length variable.
Args:
var (dict): Dictionary of variable properties.
Returns:
bool: True if a length variable is required, False otherwise.
"""
if ((isinstance(var, dict)
and ((cls.get_native_type(**var) in ['char*', 'string_t',
'bytes_t', 'unicode_t'])
or var.get('datatype', {}).get(
'type', var.get('type', None)) in ['1darray'])
and (not var.get('is_length_var', False))
and ('length' not in var.get('datatype', {})))):
return True
return False
[docs] @classmethod
def requires_shape_var(cls, var):
r"""Determine if a variable requires a separate shape variable.
Args:
var (dict): Dictionary of variable properties.
Returns:
bool: True if a shape variable is required, False otherwise.
"""
if ((isinstance(var, dict)
and (var.get('datatype', {}).get(
'type', var.get('type', None)) == 'ndarray')
and (not var.get('is_length_var', False))
and ('shape' not in var.get('datatype', {})))):
return True
return False
[docs] @classmethod
def get_native_type(cls, const=False, **kwargs):
    r"""Get the native type.

    The parent class's result is used directly unless it is a
    placeholder: '*' (array of some scalar type), a name containing
    'X' (size to be filled in) or 'double' (which becomes 'float' for
    a precision of 4).

    Args:
        type (str, optional): Name of |yggdrasil| extended JSON
            type or JSONSchema dictionary defining a datatype.
        const (bool, optional): If True, the native type will be
            marked as constant. Defaults to False.
        **kwargs: Additional keyword arguments may be used in
            determining the precise declaration that should be used.

    Returns:
        str: The native type.

    Raises:
        ValueError: If the type is complex and the precision is not
            one of 8, 16 or 32.

    """
    out, json_type = super(CModelDriver, cls).get_native_type(
        return_json=True, **kwargs)
    prefix = 'const ' if const else ''
    is_array = (out == '*')
    is_sized = ('X' in out)
    if not (is_array or is_sized or (out == 'double')):
        return prefix + out
    if is_array:
        # Resolve the element type as a scalar, then add the pointer
        # unless the array has a fixed size.
        element_type = copy.deepcopy(json_type)
        element_type['type'] = 'scalar'
        out = cls.get_native_type(datatype=element_type)
        if not (('length' in json_type) or ('shape' in json_type)):
            out += '*'
    elif is_sized:
        precision = json_type['precision']
        if json_type['subtype'] == 'complex':
            complex_names = {8: 'float',
                             16: 'double',
                             32: 'long_double'}
            if precision not in complex_names:  # pragma: debug
                raise ValueError("Unsupported precision for complex types: %d"
                                 % precision)
            out = out.replace('X', complex_names[precision])
        else:
            out = out.replace('X', str(8 * precision))
    elif json_type.get('precision', 8) == 4:
        # Only reachable when out == 'double'
        out = 'float'
    return prefix + out.replace(' ', '')
[docs] @classmethod
def get_json_type(cls, native_type):
    r"""Get the JSON type from the native language type.

    Args:
        native_type (str): The native language type.

    Returns:
        dict: The JSON type, containing 'type' and, depending on the
            native type, 'subtype', 'precision' (in bytes) and
            'encoding'.

    Raises:
        ValueError: If the precision of a complex type cannot be
            determined from its name.

    """
    out = {}
    regex_var = r'(?P<type>.+?(?P<precision>\d*)(?:_t)?)\s*(?P<pointer>\**)'
    grp = re.fullmatch(regex_var, native_type).groupdict()
    if grp.get('precision', False):
        # Sized names (e.g. int64_t) state the size in bits, but
        # precision is in bytes everywhere else (double -> 8 below,
        # get_native_type builds the name from 8 * precision).
        out['precision'] = int(grp['precision']) // 8
        grp['type'] = grp['type'].replace(grp['precision'], 'X')
    if grp['type'] == 'char':
        out['type'] = 'string'
    elif grp['type'] == 'void':
        out['type'] = 'null'
    elif grp['type'].startswith('complex'):
        out['type'] = 'complex'
        precision_map = {'long_double': 32,
                         'double': 16,
                         'float': 8}
        prec_str = grp['type'].split('complex_')[-1]
        if prec_str in precision_map:
            out['precision'] = precision_map[prec_str]
        else:  # pragma: debug
            raise ValueError("Cannot determine precision for complex type '%s'"
                             % grp['type'])
    elif grp['type'] == 'double':
        out['type'] = 'float'
        out['precision'] = 8
    elif grp['type'] == 'float':
        out['type'] = 'float'
        out['precision'] = 4
    else:
        if grp['type'] in ['int', 'uint']:
            # Unsized integers take the size of a C int on this platform
            grp['type'] += 'X_t'
            out['precision'] = np.dtype('intc').itemsize
        out['type'] = super(CModelDriver, cls).get_json_type(grp['type'])
    if grp.get('pointer', False):
        nptr = len(grp['pointer'])
        if grp['type'] in ['char', 'void']:
            # The first pointer level is part of the base type
            # (char* is a string), so it does not make an array.
            nptr -= 1
        if nptr > 0:
            out['subtype'] = out['type']
            out['type'] = '1darray'
    if out['type'] in constants.SCALAR_TYPES:
        out['subtype'] = out['type']
        out['type'] = 'scalar'
    if out.get('subtype', None) == 'unicode':
        out['subtype'] = 'string'
        out['encoding'] = 'UCS4'
    return out
[docs] @classmethod
def write_model_function_call(cls, model_function, flag_var,
                              inputs, outputs, **kwargs):
    r"""Write lines necessary to call the model function.

    Input variables flagged with 'allow_realloc' are dereferenced (their
    names are prefixed with '*') in a copy of the inputs before the
    parent class's method is called.

    Args:
        model_function (str): Handle of the model function that should be
            called.
        flag_var (str): Name of variable that should be used as a flag.
        inputs (list): List of dictionaries describing inputs to the model.
        outputs (list): List of dictionaries describing outputs from the
            model.
        **kwargs: Additional keyword arguments are passed to the parent
            class's method.

    Returns:
        list: Lines required to carry out a call to a model function in
            this language.

    """
    # Work on a copy so that the caller's inputs are not renamed
    deref_inputs = copy.deepcopy(inputs)
    realloc_vars = (v for channel in deref_inputs
                    for v in channel['vars']
                    if v.get('allow_realloc', False))
    for v in realloc_vars:
        v['name'] = '*' + v['name']
    return super(CModelDriver, cls).write_model_function_call(
        model_function, flag_var, deref_inputs, outputs, **kwargs)
[docs] @classmethod
def write_model_recv(cls, channel, recv_var, **kwargs):
    r"""Write a model receive call including checking the return flag.

    When recv_var is not a string, the receive function is selected
    based on whether all of the variables allow reallocation
    ('recv_heap') or not ('recv_stack').

    Args:
        channel (str): Name of variable that the channel being received
            from was stored in.
        recv_var (dict, list): Information of one or more variables that
            received information should be stored in.
        **kwargs: Additional keyword arguments are passed to the parent
            class's method.

    Returns:
        list: Lines required to carry out a receive call in this language.

    """
    if isinstance(recv_var, str):
        recv_var_str = recv_var
    else:
        recv_var_par = cls.channels2vars(recv_var)
        realloc_ok = [cls.allows_realloc(v) for v in recv_var_par]
        recv_key = 'recv_heap' if all(realloc_ok) else 'recv_stack'
        kwargs['alt_recv_function'] = cls.function_param[recv_key]
        recv_var_str = cls.prepare_output_variables(
            recv_var_par, in_inputs=cls.outputs_in_inputs,
            for_yggdrasil=True)
    return super(CModelDriver, cls).write_model_recv(
        channel, recv_var_str, **kwargs)
[docs] @classmethod
def write_initialize_oiter(cls, var, value=None, **kwargs):
    r"""Get the lines necessary to initialize an array for iteration
    output.

    Args:
        var (dict, str): Name or information dictionary for the variable
            being initialized.
        value (str, optional): Value that should be assigned to the
            variable. Any value provided is replaced by a realloc call
            sized using var['iter_var']['end'].
        **kwargs: Additional keyword arguments are passed to the
            parent class's method.

    Returns:
        list: The lines initializing the variable.

    """
    element_type = cls.get_native_type(**var['iter_datatype'])
    realloc_template = '({type}*)realloc({name}, {length}*sizeof({type}))'
    value = realloc_template.format(
        type=element_type,
        name=var['name'],
        length=var['iter_var']['end'])
    return super(CModelDriver, cls).write_initialize_oiter(
        var, value=value, **kwargs)
[docs] @classmethod
def write_declaration(cls, var, **kwargs):
    r"""Return the lines required to declare a variable with a certain
    type.

    Pointer and string types without a fixed length/shape default to a
    value of 'NULL' and are added to the 'requires_freeing' list if one
    was provided; length variables default to a value of '0'.

    Args:
        var (dict, str): Name or information dictionary for the variable
            being declared.
        **kwargs: Additional keyword arguments are passed to the parent
            class's method.

    Returns:
        list: The lines declaring the variable.

    """
    if isinstance(var, str):  # pragma: no cover
        var = {'name': var}
    type_name = cls.get_native_type(**var)
    if var.get('allow_realloc', False):
        # Variables that may be reallocated are declared as pointers
        type_name += '*'
        var = dict(var, native_type=type_name)
    needs_free = (
        (type_name.endswith('*')
         or (type_name in ['bytes_t', 'string_t', 'unicode_t']))
        and (not type_name.startswith(('comm_t', 'dtype_t'))))
    if needs_free:
        # Fixed size arrays are not dynamically allocated
        datatype = var.get('datatype', var)
        needs_free = (('length' not in datatype)
                      and ('shape' not in datatype))
    if needs_free:
        if 'requires_freeing' in kwargs:
            kwargs['requires_freeing'].append(var)
        kwargs.setdefault('value', 'NULL')
    elif var.get('is_length_var', False):
        kwargs.setdefault('value', '0')
    return super(CModelDriver, cls).write_declaration(var, **kwargs)
[docs] @classmethod
def get_name_declare(cls, var):
r"""Determine the name that should be used for declaration.
Args:
var (str, dict): Name of variable or dictionary of information.
Returns:
str: Modified name for declaration.
"""
if isinstance(var, str): # pragma: no cover
return var
assert isinstance(var, dict)
out = var['name']
if isinstance(var.get('datatype', None), dict):
if 'length' in var['datatype']:
out += '[%d]' % var['datatype']['length']
elif 'shape' in var['datatype']:
for s in var['datatype']['shape']:
out += '[%d]' % s
return out
[docs] @classmethod
def write_free(cls, var, **kwargs):
    r"""Return the lines required to free a variable with a certain type.

    Args:
        var (dict, str): Name or information dictionary for the variable
            being freed.
        **kwargs: Additional keyword arguments are passed to the parent
            class's method.

    Returns:
        list: The lines freeing the variable.

    """
    parent = super(CModelDriver, cls)
    lines = []
    if isinstance(var, str):  # pragma: no cover
        var = {'name': var}
    datatype = var.get('datatype', False)
    has_typed_free = (
        isinstance(datatype, dict)
        and (('free_%s' % datatype['type']) in cls.function_param))
    if has_typed_free:
        if var.get('allow_realloc', False):
            # Free using the typed info first, then again below using
            # only the name.
            lines += parent.write_free(var, **kwargs)
            var = {'name': var['name']}
        else:
            # The type specific free is passed the variable's address
            var = dict(var, name=('&' + var['name']))
    lines += parent.write_free(var, **kwargs)
    return lines
[docs] @classmethod
def prepare_variables(cls, vars_list, in_definition=False,
                      for_yggdrasil=False):
    r"""Concatenate a set of input variables such that it can be passed
    as a single string to the function_call parameter.

    Args:
        vars_list (list): List of variable dictionaries containing info
            (e.g. names) that should be used to prepare a string
            representing input/output to/from a function call.
        in_definition (bool, optional): If True, the returned sequence
            will be of the format required for specifying variables
            in a function definition. Defaults to False.
        for_yggdrasil (bool, optional): If True, the variables will be
            prepared in the format expected by calls to yggdrasil
            send/recv methods. Defaults to False.

    Returns:
        str: Concatenated variables list.

    """
    if not isinstance(vars_list, list):
        vars_list = [vars_list]
    expanded = []
    for x in vars_list:
        if isinstance(x, str):
            expanded.append(x)
            continue
        assert isinstance(x, dict)
        if not for_yggdrasil:
            expanded.append(x)
            continue
        # Length variables are not listed on their own; they are added
        # directly after the variable that refers to them.
        if x.get('is_length_var', False):
            continue
        expanded.append(x)
        for kvar in ('length_var', 'ndim_var', 'shape_var'):
            size_var = x.get(kvar, False)
            if not size_var:
                continue
            if x['name'].startswith(('*', '&')):
                # Apply the same leading '*' or '&' to the size variable
                size_var = dict(size_var,
                                name=x['name'][0] + size_var['name'])
            expanded.append(size_var)
    if in_definition:
        declared = []
        for x in expanded:
            if x['name'].startswith('*'):
                # Move the leading '*' characters onto the type
                native_type = cls.get_native_type(**x)
                stars, base_name = x['name'].rsplit('*', 1)
                name = '%s%s* %s' % (native_type, stars, base_name)
            else:
                name = '%s %s' % (cls.get_native_type(**x), x['name'])
            new_var = dict(x, name=name)
            new_var['name'] = cls.get_name_declare(new_var)
            declared.append(new_var)
        expanded = declared
    return super(CModelDriver, cls).prepare_variables(
        expanded, in_definition=in_definition,
        for_yggdrasil=for_yggdrasil)
[docs] @classmethod
def prepare_output_variables(cls, vars_list, in_definition=False,
                             in_inputs=False, for_yggdrasil=False):
    r"""Concatenate a set of output variables such that it can be passed
    as a single string to the function_call parameter.

    Args:
        vars_list (list): List of variable names to concatenate as output
            from a function call.
        in_definition (bool, optional): If True, the returned sequence
            will be of the format required for specifying output
            variables in a function definition. Ignored unless in_inputs
            is True. Defaults to False.
        in_inputs (bool, optional): If True, the output variables should
            be formatted to be included as input variables. Defaults to
            False.
        for_yggdrasil (bool, optional): If True, the variables will be
            prepared in the format expected by calls to yggdrasil
            send/recv methods. Defaults to False.

    Returns:
        str: Concatenated variables list.

    """
    # A true output (one that is not passed as an input parameter)
    # should not include the type information that in_definition adds.
    definition_flag = in_definition if in_inputs else False
    return super(CModelDriver, cls).prepare_output_variables(
        vars_list, in_definition=definition_flag, in_inputs=in_inputs,
        for_yggdrasil=for_yggdrasil)
[docs] @classmethod
def write_print_output_var(cls, var, in_inputs=False, **kwargs):
    r"""Get the lines necessary to print an output variable in this
    language.

    Args:
        var (dict, str): Variable information or name.
        in_inputs (bool, optional): If True, the output variable
            is passed in as an input variable to be populated.
            Defaults to False.
        **kwargs: Additional keyword arguments are passed to the parent
            class's method.

    Returns:
        list: Lines printing the specified variable.

    """
    # Outputs passed as inputs are accessed through their first
    # element, except when the language is 'c++'.
    use_first_element = in_inputs and (cls.language != 'c++')
    if use_first_element and isinstance(var, dict):
        var = dict(var, name='%s[0]' % var['name'])
    elif use_first_element:
        var = '%s[0]' % var
    return super(CModelDriver, cls).write_print_output_var(
        var, in_inputs=in_inputs, **kwargs)
[docs] @classmethod
def write_function_def(cls, function_name, dont_add_lengths=False,
                       use_length_prefix=False, **kwargs):
    r"""Write a function definition.

    Unless dont_add_lengths is True, length variables are added for
    inputs/outputs that require them and ndim/shape variables are added
    for those that require a shape. The return type is then determined
    and everything is passed to the parent class's method.

    Args:
        function_name (str): Name of the function being defined.
        dont_add_lengths (bool, optional): If True, length variables
            are not added for arrays. Defaults to False.
        use_length_prefix (bool, optional): If True and length variables
            are added, they will be named using prefixes. Otherwise,
            suffixes will be used. Defaults to False.
        **kwargs: Additional keyword arguments are passed to the
            parent class's method. 'input_var'/'output_var' (strings
            parsed with parse_var_definition), 'inputs'/'outputs',
            'outputs_in_inputs' and 'function_contents' are read here.

    Returns:
        list: Lines returned by the parent class's write_function_def.

    Raises:
        ValueError: If outputs_in_inputs is not True and more than
            one output variable is specified.

    """
    if not dont_add_lengths:
        for io in ['input', 'output']:
            # Variables may be provided as a definition string or as
            # a list of dictionaries (which is modified in place).
            if io + '_var' in kwargs:
                io_var = cls.parse_var_definition(
                    io + 's', kwargs.pop(io + '_var'))
            else:
                io_var = kwargs.get(io + 's', [])
            # NOTE: io_var is appended to during iteration. Every
            # appended entry is flagged with is_length_var and is
            # skipped by the check below when the loop reaches it.
            for x in io_var:
                if use_length_prefix:
                    v_length = 'length_' + x['name']
                    v_ndim = 'ndim_' + x['name']
                    v_shape = 'shape_' + x['name']
                else:
                    v_length = x['name'] + '_length'
                    v_ndim = x['name'] + '_ndim'
                    v_shape = x['name'] + '_shape'
                if x.get('is_length_var', False):
                    continue
                if cls.requires_length_var(x):
                    if not x.get('length_var', False):
                        x['length_var'] = {
                            'name': v_length,
                            'datatype': {
                                'type': 'uint',
                                'precision': rapidjson.SIZE_OF_SIZE_T},
                            'is_length_var': True}
                        io_var.append(x['length_var'])
                elif cls.requires_shape_var(x):
                    if not x.get('ndim_var', False):
                        x['ndim_var'] = {
                            'name': v_ndim,
                            'datatype': {
                                'type': 'uint',
                                'precision': rapidjson.SIZE_OF_SIZE_T},
                            'is_length_var': True}
                        io_var.append(x['ndim_var'])
                    if not x.get('shape_var', False):
                        x['shape_var'] = {
                            'name': v_shape,
                            'datatype': {
                                'type': '1darray',
                                'subtype': 'uint',
                                'precision': rapidjson.SIZE_OF_SIZE_T},
                            'is_length_var': True}
                        io_var.append(x['shape_var'])
                    # The length variable is not added as a parameter
                    # here; it is declared at the start of the
                    # function contents instead.
                    length_var = {
                        'name': v_length,
                        'datatype': {
                            'type': 'uint',
                            'precision': rapidjson.SIZE_OF_SIZE_T},
                        'is_length_var': True}
                    kwargs['function_contents'] = (
                        cls.write_declaration(length_var)
                        + kwargs.get('function_contents', []))
            kwargs[io + 's'] = io_var
    # Determine the return type of the function
    output_type = None
    if kwargs.get('outputs_in_inputs', False):
        # Outputs are passed as parameters so the function returns the
        # native type for the 'flag' datatype.
        output_type = cls.get_native_type(datatype='flag')
    else:
        if 'output_var' in kwargs:
            kwargs['outputs'] = cls.parse_var_definition(
                'outputs', kwargs.pop('output_var'))
        outputs = kwargs.get('outputs', [])
        nout = len(outputs)
        if nout == 0:
            output_type = 'void'
        elif nout == 1:
            output_type = cls.get_native_type(**(outputs[0]))
        else:  # pragma: debug
            raise ValueError("C does not support more than one "
                             "output variable.")
    kwargs['output_type'] = output_type
    return super(CModelDriver, cls).write_function_def(
        function_name, **kwargs)
[docs] @classmethod
def write_assign_to_output(cls, dst_var, src_var,
                           outputs_in_inputs=False,
                           dont_add_lengths=False,
                           use_length_prefix=False, **kwargs):
    r"""Write lines assigning a value to an output variable.

    Three cases are handled before the parent class's method is called:
    destinations that require a length variable, destinations that
    require a shape variable and dictionaries with a fixed shape. In
    the first two cases this method calls itself to assign the
    length/ndim/shape variables and to realloc the destination.

    Args:
        dst_var (str, dict): Name or information dictionary for
            variable being assigned to.
        src_var (str, dict): Name or information dictionary for
            value being assigned to dst_var.
        outputs_in_inputs (bool, optional): If True, outputs are passed
            as input parameters. In some languages, this means that a
            pointer or reference is passed (e.g. C) and so the assignment
            should be to the memory indicated rather than the variable.
            Defaults to False.
        dont_add_lengths (bool, optional): If True, length variables
            are not added for arrays. Defaults to False.
        use_length_prefix (bool, optional): If True and length variables
            are added, they will be named using prefixes. Otherwise,
            suffixes will be used. Defaults to False.
        **kwargs: Additional keyword arguments are passed to the parent
            class's method.

    Returns:
        list: Lines achieving assignment.

    Raises:
        RuntimeError: If dont_add_lengths is True and an array that
            requires a length or shape variable is being assigned
            without one.

    """
    out = []
    if cls.requires_length_var(dst_var):
        src_var_length = None
        dst_var_length = None
        if isinstance(src_var, dict):
            src_var_length = src_var.get('length_var', None)
        if isinstance(dst_var, dict):
            dst_var_length = dst_var.get('length_var', None)
        if not dont_add_lengths:
            # Fall back on the default names for the length variables
            # and assign the length before the data.
            if src_var_length is None:
                if use_length_prefix:
                    src_var_length = 'length_' + src_var['name']
                else:
                    src_var_length = src_var['name'] + '_length'
            if dst_var_length is None:
                if use_length_prefix:
                    dst_var_length = 'length_' + dst_var['name']
                else:
                    dst_var_length = dst_var['name'] + '_length'
            out += cls.write_assign_to_output(
                dst_var_length, src_var_length,
                outputs_in_inputs=outputs_in_inputs)
        elif src_var_length is None:
            # Without a length variable, the length can only be
            # determined for strings (via strlen).
            if ((dst_var['datatype']['type']
                 in ['1darray', 'ndarray'])):  # pragma: debug
                raise RuntimeError("Length must be set in order "
                                   "to write array assignments.")
            else:
                subtype = dst_var['datatype'].get(
                    'subtype', dst_var['datatype']['type'])
                assert subtype in ['bytes', 'string', 'unicode']
                # if subtype == 'unicode':
                #     dst_var['datatype'].setdefault('encoding', "UCS4")
                encoding_size = constants.FIXED_ENCODING_SIZES.get(
                    dst_var['datatype'].get('encoding', 'ASCII'), 4)
                if encoding_size == 1:
                    src_var_length = 'strlen(%s)' % src_var['name']
                else:
                    src_var_length = f"strlen{encoding_size}({src_var['name']})"
        # NOTE(review): one extra element is allocated for string
        # types, presumably for a terminating character - confirm.
        if ((dst_var['datatype'].get('subtype', dst_var['datatype']['type'])
             in ['bytes', 'string', 'unicode'])):
            src_var_length = f"({src_var_length}+1)"
        src_var_dtype = cls.get_native_type(**src_var)
        if src_var_dtype in ['bytes_t', 'unicode_t', 'string_t']:
            src_var_dtype = 'char*'
        # Strip one pointer level to get the element type
        src_var_dtype = src_var_dtype.rsplit('*', 1)[0]
        # Resize the destination to hold N elements
        out += cls.write_assign_to_output(
            dst_var['name'], 'value',
            outputs_in_inputs=outputs_in_inputs,
            replacement=('{name} = ({native_type}*)realloc({name}, '
                         '{N}*sizeof({native_type}));'),
            native_type=src_var_dtype, N=src_var_length)
        kwargs.update(copy=True, native_type=src_var_dtype,
                      N=src_var_length)
    elif cls.requires_shape_var(dst_var):
        if dont_add_lengths:  # pragma: debug
            raise RuntimeError("Shape must be set in order "
                               "to write ND array assignments.")
        # Dimensions
        src_var_ndim = None
        dst_var_ndim = None
        if isinstance(src_var, dict):
            src_var_ndim = src_var.get('ndim_var', None)
        if isinstance(dst_var, dict):
            dst_var_ndim = dst_var.get('ndim_var', None)
        if src_var_ndim is None:
            if use_length_prefix:
                src_var_ndim = 'ndim_' + src_var['name']
            else:
                src_var_ndim = src_var['name'] + '_ndim'
        if dst_var_ndim is None:
            if use_length_prefix:
                dst_var_ndim = 'ndim_' + dst_var['name']
            else:
                dst_var_ndim = dst_var['name'] + '_ndim'
        # Names are expanded into variable dictionaries
        if isinstance(src_var_ndim, str):
            src_var_ndim = {'name': src_var_ndim,
                            'datatype': {
                                'type': 'uint',
                                'precision': rapidjson.SIZE_OF_SIZE_T}}
        if isinstance(dst_var_ndim, str):
            dst_var_ndim = {'name': dst_var_ndim,
                            'datatype': {
                                'type': 'uint',
                                'precision': rapidjson.SIZE_OF_SIZE_T}}
        out += cls.write_assign_to_output(
            dst_var_ndim, src_var_ndim,
            outputs_in_inputs=outputs_in_inputs)
        # Shape
        src_var_shape = None
        dst_var_shape = None
        if isinstance(src_var, dict):
            src_var_shape = src_var.get('shape_var', None)
        if isinstance(dst_var, dict):
            dst_var_shape = dst_var.get('shape_var', None)
        if src_var_shape is None:
            if use_length_prefix:
                src_var_shape = 'shape_' + src_var['name']
            else:
                src_var_shape = src_var['name'] + '_shape'
        if dst_var_shape is None:
            if use_length_prefix:
                dst_var_shape = 'shape_' + dst_var['name']
            else:
                dst_var_shape = dst_var['name'] + '_shape'
        # The shape arrays use the ndim variables as their lengths
        if isinstance(src_var_shape, str):
            src_var_shape = {'name': src_var_shape,
                             'datatype': {
                                 'type': '1darray',
                                 'subtype': 'uint',
                                 'precision': rapidjson.SIZE_OF_SIZE_T},
                             'length_var': src_var_ndim['name']}
        if isinstance(dst_var_shape, str):
            dst_var_shape = {'name': dst_var_shape,
                             'datatype': {
                                 'type': '1darray',
                                 'subtype': 'uint',
                                 'precision': rapidjson.SIZE_OF_SIZE_T},
                             'length_var': dst_var_ndim['name']}
        out += cls.write_assign_to_output(
            dst_var_shape, src_var_shape,
            outputs_in_inputs=outputs_in_inputs,
            dont_add_lengths=True)
        src_var_dtype = cls.get_native_type(**src_var).rsplit('*', 1)[0]
        if use_length_prefix:
            src_var_length = 'length_' + src_var['name']
        else:
            src_var_length = src_var['name'] + '_length'
        # Generated code computes the total number of elements as the
        # product of the shape.
        out += (('{length} = 1;\n'
                 'size_t cdim;\n'
                 'for (cdim = 0; cdim < {ndim}; cdim++) {{\n'
                 ' {length} = {length}*{shape}[cdim];\n'
                 '}}\n').format(length=src_var_length,
                                ndim=src_var_ndim['name'],
                                shape=src_var_shape['name'])).splitlines()
        # Resize the destination to hold N elements
        out += cls.write_assign_to_output(
            dst_var['name'], 'value',
            outputs_in_inputs=outputs_in_inputs,
            replacement=('{name} = ({native_type}*)realloc({name}, '
                         '{N}*sizeof({native_type}));'),
            native_type=src_var_dtype, N=src_var_length)
        kwargs.update(copy=True, native_type=src_var_dtype,
                      N=src_var_length)
    elif isinstance(dst_var, dict):
        if 'shape' in dst_var.get('datatype', {}):
            # Fixed shape: the number of elements is known here
            nele = 1
            for s in dst_var['datatype']['shape']:
                nele *= s
            dst_var_type = cls.get_native_type(**dst_var)
            kwargs.update(copy=True, N=nele,
                          native_type=dst_var_type)
    if outputs_in_inputs and (cls.language != 'c++'):
        # Outputs passed as inputs are assigned through their first
        # element, except when the language is 'c++'.
        if isinstance(dst_var, dict):
            dst_var = dict(dst_var,
                           name='%s[0]' % dst_var['name'])
        else:
            dst_var = '%s[0]' % dst_var
    out += super(CModelDriver, cls).write_assign_to_output(
        dst_var, src_var, outputs_in_inputs=outputs_in_inputs,
        **kwargs)
    return out
[docs] @classmethod
def get_testing_options(cls, **kwargs):
    r"""Method to return a dictionary of testing options for this class.

    Args:
        **kwargs: Additional keyword arguments are passed to the parent
            class.

    Returns:
        dict: Dictionary of variables to use for testing. In addition to
            the parent class's entries, 'replacement_code_types' is set
            and entries are added to 'write_function_def_params'.

    """
    out = super(CModelDriver, cls).get_testing_options(**kwargs)
    # Test types
    replacements = {}
    out['replacement_code_types'] = replacements
    for k, v in cls.type_map.items():
        if v == '*':
            entry = ({'type': k, 'subtype': 'float', 'precision': 4},
                     'float*')
        elif 'X' in v:
            knew = {'type': 'scalar', 'subtype': k, 'precision': 8}
            size_name = 'float' if k == 'complex' else '64'
            entry = (knew, v.replace('X', size_name))
        else:
            continue
        replacements[(k, v)] = entry

    def float_cm():
        # New dictionary for each use so that entries are not shared
        return {'type': 'float',
                'precision': 4,
                'units': 'cm'}

    def string20():
        return {'type': 'string',
                'precision': 20,
                'units': ''}

    def size_type():
        return {'type': 'uint',
                'precision': rapidjson.SIZE_OF_SIZE_T}

    # Code composition parameters
    single_output = {
        'inputs': [{'name': 'x', 'value': 1.0,
                    'datatype': float_cm()}],
        'outputs': [{'name': 'y',
                     'datatype': float_cm()}],
        'outputs_in_inputs': False,
        'dont_add_lengths': True}
    no_output = {
        'inputs': [{'name': 'x', 'value': 1.0,
                    'datatype': float_cm()}],
        'outputs': [],
        'outputs_in_inputs': False}
    # Length variables are provided explicitly rather than added
    no_length_variable = {
        'inputs': [{'name': 'x', 'value': '"hello"',
                    'length_var': 'length_x',
                    'datatype': string20()},
                   {'name': 'length_x', 'value': 5,
                    'datatype': size_type(),
                    'is_length_var': True}],
        'outputs': [{'name': 'y',
                     'length_var': 'length_y',
                     'datatype': string20()},
                    {'name': 'length_y',
                     'datatype': size_type(),
                     'is_length_var': True}],
        'dont_add_lengths': True}
    out.setdefault('write_function_def_params', [])
    out['write_function_def_params'] += [
        single_output, no_output, no_length_variable]
    return out