import os
import copy
import pprint
import chevron
import yaml
import json
import git
import io as sio
from yggdrasil import constants, rapidjson
from yggdrasil.schema import get_schema
from urllib.parse import urlparse
from yaml.constructor import (
    ConstructorError, BaseConstructor, Constructor, SafeConstructor)

class YAMLSpecificationError(RuntimeError):
    r"""Exception signalling that a YAML specification is not in the form
    the parser expects."""
def __display_progress(verbose, obj, msg):
    r"""Print a labelled, pretty-printed object when verbosity is enabled.

    Args:
        verbose (bool): If false, nothing is printed.
        obj (dict): Object to pretty-print below the message.
        msg (str): Label printed before the object.

    """
    if not verbose:
        return
    formatted = pprint.pformat(obj)  # pragma: no cover
    print(f"{msg}:\n{formatted}")  # pragma: no cover
def no_duplicates_constructor(loader, node, deep=False):
    r"""Construct a YAML mapping, rejecting mappings with repeated keys.

    Args:
        loader: YAML loader/constructor used to build the key and value
            objects and the final mapping.
        node: Mapping node; ``node.value`` is iterated as
            ``(key_node, value_node)`` pairs.
        deep (bool, optional): Passed through to the loader's construction
            methods. Defaults to False.

    Raises:
        ConstructorError: If the same key occurs more than once.

    Returns:
        The result of ``loader.construct_mapping(node, deep)``.

    """
    seen_keys = set()
    for key_node, value_node in node.value:
        key = loader.construct_object(key_node, deep=deep)
        # The value is constructed as well (result unused) so that the
        # loader is driven exactly as during a normal mapping construction.
        loader.construct_object(value_node, deep=deep)
        if key in seen_keys:
            raise ConstructorError("while constructing a mapping",
                                   node.start_mark,
                                   "found duplicate key (%s)" % key,
                                   key_node.start_mark)
        seen_keys.add(key)
    return loader.construct_mapping(node, deep)
# Install the duplicate-key check as the constructor for the default mapping
# tag on each of the imported PyYAML constructor classes.
for cls in (BaseConstructor, Constructor, SafeConstructor):
    cls.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
                        no_duplicates_constructor)
def clone_github_repo(fname, commit=None, local_directory=None):
    r"""Clone a GitHub repository, returning the path to the local copy of
    the file pointed to by the URL if there is one.

    Args:
        fname (str): URL to a GitHub repository or a file in a GitHub
            repository that should be cloned. If no ``http://``/``https://``
            scheme is given, the value is treated as ``owner/repo[/path]``
            on the default host.
        commit (str, optional): Commit that should be checked out. Defaults
            to None and the HEAD of the default branch is used.
        local_directory (str, optional): Local directory that the file
            should be cloned into. Defaults to None and the directory named
            by the service repository environment variable (or the current
            working directory if it is unset) will be used.

    Raises:
        RuntimeError: If a clone is required while the service host
            environment variable is set.

    Returns:
        str: Path to the local copy of the repository or file in the
            repository.

    """
    # NOTE(review): the module path of this import was missing from the text
    # this file was recovered from; 'yggdrasil.services' is assumed -- confirm.
    from yggdrasil.services import _service_host_env, _service_repo_dir
    if local_directory is None:
        local_directory = os.environ.get(_service_repo_dir, os.getcwd())
    # Make sure we start with a full url. Only an explicit scheme prefix
    # counts; a bare "owner/repo" that merely contains "http" (for example
    # "httpie/httpie") must still get the default host prepended.
    if fname.startswith(('http://', 'https://')):
        url = fname
    else:
        # NOTE(review): the default host literal was also missing from the
        # recovered text; GitHub over https is assumed -- confirm.
        url = 'https://github.com/' + fname
    # get the constituent url parts
    parsed = urlparse(url)
    # The path is "/<owner>/<repo>[/<file path>]", so element 0 of the split
    # is the empty string before the leading slash.
    splitpath = parsed.path.split('/')
    owner = splitpath[1]
    reponame = splitpath[2]
    # turn the file path into an os based format
    fname = os.path.join(local_directory, *splitpath)
    # check to see if the file already exists, and clone if it does not
    if not os.path.exists(fname):
        if os.environ.get(_service_host_env, False):
            raise RuntimeError("Cloning of unvetted git repo is "
                               "not permitted on a integration "
                               "service manager.")
        # create the url for cloning the repo
        cloneurl = (parsed.scheme + '://' + parsed.netloc + '/' + owner
                    + '/' + reponame)
        # clone the repo into the appropriate directory
        repo = git.Repo.clone_from(cloneurl, os.path.join(local_directory,
                                                          owner, reponame))
        try:
            if commit is not None:
                repo.git.checkout(commit)
        finally:
            # Release the repository handle even if the checkout fails
            repo.close()
    # now that it is cloned, just pass the yaml file (and path) onwards
    return os.path.realpath(fname)
def load_yaml(fname, yaml_param=None, directory_for_clones=None,
              model_submission=False, verbose=False):
    r"""Parse a yaml file defining a run.

    Args:
        fname (str, file, dict): Path to a YAML file, an open file descriptor
            to a file containing a YAML, or a loaded YAML document. If fname
            starts with 'git:' then the code will assume the file is in a
            remote git repository. The remainder of fname can be the full
            url to the YAML file or just the repo and YAML file (the server
            is then assumed to be GitHub).
        yaml_param (dict, optional): Parameters that should be used in
            mustache formatting of YAML files. Defaults to None and is
            ignored.
        directory_for_clones (str, optional): Directory that git
            repositories should be cloned into. Defaults to None and the
            current working directory will be used.
        model_submission (bool, optional): If True, the YAML will be
            evaluated as a submission to the yggdrasil model repository and
            returned without schema normalization. Defaults to False.
        verbose (bool, optional): If True, steps of the YAML parsing process
            will be printed. Defaults to False.

    Raises:
        IOError: If fname is a path that does not point to a file.
        YAMLSpecificationError: If the loaded document is not a mapping.

    Returns:
        dict: Contents of yaml file.

    """
    opened = False
    fd = None
    yamlparsed = None
    if isinstance(fname, dict):
        yamlparsed = copy.deepcopy(fname)
        yamlparsed.setdefault('working_dir', os.getcwd())
    elif isinstance(fname, str):
        # pull foreign file
        if fname.startswith('git:'):
            fname = clone_github_repo(fname[4:],
                                      local_directory=directory_for_clones)
        fname = os.path.realpath(fname)
        if not os.path.isfile(fname):
            raise IOError("Unable locate yaml file %s" % fname)
        fd = open(fname, 'r')
        opened = True
    else:
        fd = fname
        # Streams such as '<stdin>' have a placeholder name rather than a
        # path, so a stand-in path in the working directory is used instead.
        if (hasattr(fd, 'name') and (not fd.name.startswith('<'))):
            fname = fd.name
        else:
            fname = os.path.join(os.getcwd(), 'stream')
    # Mustache replace vars
    if not isinstance(yamlparsed, dict):
        if yaml_param is None:
            yaml_param = {}
        try:
            yamlparsed = fd.read()
        finally:
            # Only close files opened here; caller-supplied descriptors
            # remain the caller's responsibility. Closing directly after
            # the read keeps the handle from leaking if parsing fails.
            if opened:
                fd.close()
        yamlparsed = chevron.render(
            sio.StringIO(yamlparsed).getvalue(),
            dict(os.environ, **yaml_param))
        if fname.endswith('.json'):
            yamlparsed = json.loads(yamlparsed)
        else:
            yamlparsed = yaml.safe_load(yamlparsed)
        if not isinstance(yamlparsed, dict):  # pragma: debug
            raise YAMLSpecificationError("Loaded yaml is not a dictionary.")
        yamlparsed.setdefault('working_dir', os.path.dirname(fname))
    # Standardize models/model as list so that working directory can be set
    # from the repository URL
    yamlparsed.setdefault('models', yamlparsed.pop('model', []))
    if not isinstance(yamlparsed['models'], list):
        yamlparsed['models'] = [yamlparsed['models']]
    for x in yamlparsed['models']:
        if (isinstance(x, dict) and 'repository_url' in x
                and 'working_dir' not in x):
            repo_dir = clone_github_repo(
                x['repository_url'],
                commit=x.get('repository_commit', None),
                local_directory=directory_for_clones)
            x.setdefault('working_dir', repo_dir)
    s = get_schema()
    # TODO: Add support for turning off defaults?
    if model_submission:
        # Submissions are returned un-normalized with no connections
        yamlparsed['connections'] = []
        return yamlparsed
    __display_progress(verbose, yamlparsed, 'un-normalized')
    try:
        yml_norm = s.normalize(
            yamlparsed, partial=True,
            norm_kws={'relative_path_root': yamlparsed['working_dir']})
    except rapidjson.NormalizationError:
        # Always show the offending document when normalization fails
        __display_progress(True, yamlparsed, 'un-normalized')
        raise
    __display_progress(verbose, yml_norm, 'normalized')
    return yml_norm
def prep_yaml(files, yaml_param=None, directory_for_clones=None,
              model_submission=False, verbose=False):
    r"""Prepare yaml to be parsed by rapidjson including covering backwards
    compatible options.

    Args:
        files (str, list): Either the path to a single yaml file or a list
            of yaml files. Entries can also be opened file descriptors for
            files containing YAML documents or pre-loaded YAML documents.
        yaml_param (dict, optional): Parameters that should be used in
            mustache formatting of YAML files. Defaults to None and is
            ignored.
        directory_for_clones (str, optional): Directory that git
            repositories should be cloned into. Defaults to None and the
            current working directory will be used.
        model_submission (bool, optional): If True, the YAML will be
            evaluated as a submission to the yggdrasil model repository and
            model_only will be set to True. Defaults to False.
        verbose (bool, optional): If True, steps of the YAML parsing process
            will be printed. Defaults to False.

    Raises:
        AssertionError: If a service start request does not report the
            status 'complete'.

    Returns:
        dict: YAML ready to be parsed using schema.

    """
    # NOTE(review): the module path of this import was missing from the text
    # this file was recovered from; 'yggdrasil.services' is assumed -- confirm.
    from yggdrasil.services import IntegrationServiceManager
    # Load each file
    if not isinstance(files, list):
        files = [files]
    yamls = [load_yaml(f, yaml_param=yaml_param,
                       directory_for_clones=directory_for_clones,
                       model_submission=model_submission,
                       verbose=verbose)
             for f in files]
    # Load files pointed to. Each pass loads the files collected by the
    # previous pass and then gathers any 'include' entries that have not
    # been consumed yet, until no more includes are found.
    first = True
    files = []
    while first or files:
        for f in files:
            # NOTE(review): model_submission is not forwarded to included
            # files (unchanged from the original) -- confirm this is intended.
            yamls.append(load_yaml(f, yaml_param,
                                   directory_for_clones=directory_for_clones,
                                   verbose=verbose))
        first = False
        files = []
        for y in yamls:
            if 'include' in y:
                new_files = y.pop('include')
                if not isinstance(new_files, list):
                    new_files = [new_files]
                for f in new_files:
                    # Relative includes are resolved against the including
                    # document's working directory
                    if not os.path.isabs(f):
                        f = os.path.join(y['working_dir'], f)
                    files.append(f)
    # Replace references to services with service descriptions
    for y in yamls:
        services = y.pop('services', [])
        if 'service' in y:
            services.append(y.pop('service'))
        for x in services:
            request = {'action': 'start'}
            for k in ['name', 'yamls', 'yaml_param']:
                if k in x:
                    request[k] = x.pop(k)
            if 'type' in x:
                x.setdefault('service_type', x.pop('type'))
            x.setdefault('for_request', True)
            cli = IntegrationServiceManager(**x)
            response = cli.send_request(**request)
            response['working_dir'] = os.getcwd()
            # Checked explicitly (rather than with an assert statement) so
            # that 'status' is still removed and verified under 'python -O'.
            status = response.pop('status')
            if status != 'complete':
                raise AssertionError(
                    f"Service request did not complete (status: {status!r}).")
            y['models'].append(response)
    # Combine models & connections
    yml_all = {'models': [], 'connections': []}
    for yml in yamls:
        yml_all['models'] += yml['models']
        yml_all['connections'] += yml['connections']
    return yml_all
def parse_yaml(files, complete_partial=False, partial_commtype=None,
               model_only=False, model_submission=False, yaml_param=None,
               directory_for_clones=None, verbose=False):
    r"""Parse list of yaml files.

    Args:
        files (str, list): Either the path to a single yaml file or a list
            of yaml files.
        complete_partial (bool, optional): If True, unpaired input/output
            channels are allowed and reserved for use (e.g. for calling the
            model as a function). Defaults to False.
        partial_commtype (dict, optional): Communicator kwargs that should
            be used for the connections to the unpaired channels when
            complete_partial is True. Defaults to None and will be ignored.
        model_only (bool, optional): If True, the YAML will not be evaluated
            as a complete integration and only the individual components
            will be parsed. Defaults to False.
        model_submission (bool, optional): If True, the YAML will be
            evaluated as a submission to the yggdrasil model repository and
            model_only will be set to True. Defaults to False.
        yaml_param (dict, optional): Parameters that should be used in
            mustache formatting of YAML files. Defaults to None and is
            ignored.
        directory_for_clones (str, optional): Directory that git
            repositories should be cloned into. Defaults to None and the
            current working directory will be used.
        verbose (bool, optional): If True, steps of the YAML parsing process
            will be printed. Defaults to False.

    Raises:
        YAMLSpecificationError: If a server has no clients, a model is a
            client of a server that does not exist, or an I/O channel has no
            driver and no default to fall back on.

    Returns:
        dict: Dictionary of information parsed from the yamls. If model_only
            is True (or model_submission is True), the normalized YAML with
            'models' and 'connections' is returned instead.

    """
    s = get_schema()
    # Parse files using schema
    # TODO: only run backward_compat if deprecation warnings raised
    run_backwards_compat = True
    yml_norm = prep_yaml(files, yaml_param=yaml_param,
                         directory_for_clones=directory_for_clones,
                         model_submission=model_submission,
                         verbose=verbose)
    __display_progress(verbose, yml_norm, "prepped")
    if model_submission:
        models = []
        for yml in yml_norm['models']:
            # The working directory is set aside during validation and
            # restored on the validated result
            wd = yml.pop('working_dir', None)
            x = s.validate_model_submission(yml)
            if wd:
                x['working_dir'] = wd
            models.append(x)
        yml_norm['models'] = models
        model_only = True
    # Determine if any of the models require synchronization
    timesync_names = []
    for yml in yml_norm['models']:
        if yml.get('timesync', False):
            for i, tsync in enumerate(yml['timesync']):
                # Expand bool/str shorthands into {'name': ...} mappings
                if isinstance(tsync, bool):
                    tsync = {'name': 'timesync'}
                    yml['timesync'][i] = tsync
                elif isinstance(tsync, str):
                    tsync = {'name': tsync}
                    yml['timesync'][i] = tsync
                timesync_names.append(tsync['name'])
                yml.setdefault('timesync_client_of', [])
                yml['timesync_client_of'].append(tsync['name'])
    for tsync in set(timesync_names):
        for m in yml_norm['models']:
            if m['name'] == tsync:
                assert m['language'] == 'timesync'
                m.update(is_server=True, inputs=[], outputs=[])
                break
        else:
            # No model with this name exists; add a timesync server model
            yml_norm['models'].append({'name': tsync,
                                       'args': [],
                                       'language': 'timesync',
                                       'is_server': True,
                                       'working_dir': os.getcwd(),
                                       'inputs': [],
                                       'outputs': []})
    # Parse models, then connections to ensure connections can be processed
    existing = {k: {} for k in ['input', 'output', 'model', 'connection',
                                'server']}
    existing['aliases'] = {'inputs': {}, 'outputs': {}}
    existing['pairs'] = []
    existing['input_drivers'] = []
    existing['output_drivers'] = []
    existing['backward'] = run_backwards_compat
    for yml in yml_norm['models']:
        existing = parse_component(yml, 'model', existing=existing)
    backward_compat(yml_norm, existing)
    __display_progress(verbose, existing, "After parsing models")
    for yml in yml_norm['connections']:
        existing = parse_component(yml, 'connection', existing=existing)
    __display_progress(verbose, existing, "After parsing connections")
    # Exit early
    if model_only:
        for x in yml_norm['models']:
            for io in ['inputs', 'outputs']:
                # Drop default channels, and the key itself if nothing is left
                x[io] = [z for z in x[io] if not z.get('is_default', False)]
                if not x[io]:
                    del x[io]
        return yml_norm
    # Add stand-in model that uses unpaired channels
    if complete_partial:
        existing = complete_partial_integration(
            existing, complete_partial, partial_commtype=partial_commtype)
        __display_progress(verbose, existing,
                           "After completing partial integration")
    # Create server/client connections
    for srv, srv_info in existing['server'].items():
        clients = srv_info['clients']
        if srv not in existing['input']:
            continue
        yml = {'inputs': [{'name': x} for x in clients],
               'outputs': [{'name': srv}],
               'driver': 'RPCRequestDriver',
               'name': existing['input'][srv]['model_driver'][0]}
        if srv_info.get('replaces', None):
            yml['outputs'][0].update({
                k: v for k, v in srv_info['replaces']['input'].items()
                if k not in ['name']})
            yml['response_kwargs'] = {
                k: v for k, v in srv_info['replaces']['output'].items()
                if k not in ['name']}
        existing = parse_component(yml, 'connection', existing=existing)
        existing['model'][yml['dst_models'][0]]['clients'] = yml['src_models']
    existing.pop('server')
    # Make sure that servers have clients and clients have servers
    for k, v in existing['model'].items():
        if v.get('is_server', False):
            has_client = any(v['name'] in x.get('client_of', [])
                             for x in existing['model'].values())
            if not has_client:
                raise YAMLSpecificationError(
                    "Server '%s' does not have any clients." % k)
        elif v.get('client_of', False):
            # Collect every missing server so that the error lists them all
            missing_servers = [srv for srv in v['client_of']
                               if srv not in existing['model']]
            if missing_servers:
                raise YAMLSpecificationError(
                    f"Servers {missing_servers} do not exist, "
                    f"but '{v['name']}' is a client of them. "
                    f"(models: {list(existing['model'].keys())})")
    # Make sure that I/O channels initialized
    opp_map = {'input': 'output', 'output': 'input'}
    for io in ['input', 'output']:
        remove = []
        # Iterate over a snapshot of the keys; parse_component may register
        # additional channels while connections are created below
        for k in list(existing[io].keys()):
            v = existing[io][k]
            if v.get('__connection_count', 0) > 0:
                continue
            if 'driver' not in v:
                if v.get('is_default', False):
                    remove.append(k)
                elif 'default_file' in v:
                    new_conn = {io + 's': [v['default_file']],
                                opp_map[io] + 's': [v]}
                    existing = parse_component(new_conn, 'connection',
                                               existing=existing)
                elif (io == 'input') and ('default_value' in v):
                    # TODO: The keys moved should be automated based on schema
                    # if the ValueComm has anymore parameters added
                    vdef = {'name': v['name'],
                            'default_value': v.pop('default_value'),
                            'count': v.pop('count', 1),
                            'commtype': 'value'}
                    new_conn = {'inputs': [vdef],
                                'outputs': [v]}
                    existing = parse_component(new_conn, 'connection',
                                               existing=existing)
                else:
                    raise YAMLSpecificationError(
                        "No driver established for %s channel %s" % (io, k))
        # Remove unused default channels
        for k in remove:
            for m in existing[io][k]['model_driver']:
                for i, x in enumerate(existing['model'][m][io + 's']):
                    if x['name'] == k:
                        existing['model'][m][io + 's'].pop(i)
                        break
            existing[io].pop(k)
    __display_progress(verbose, existing, "After initializing model IO")
    # Link io drivers back to models
    existing = link_model_io(existing)
    __display_progress(verbose, existing, "Finalized yaml info")
    return existing
def complete_partial_integration(existing, name, partial_commtype=None):
    r"""Attach every unconnected input/output channel to a stand-in model.

    Args:
        existing (dict): Dictionary of existing components.
        name (str): Name that should be given to the new model. A boolean
            is replaced by 'dummy_model'.
        partial_commtype (dict, optional): Communicator kwargs that should
            be used for the connections to the unpaired channels. Defaults
            to None and will be ignored.

    Returns:
        dict: Updated dictionary of components.

    """
    if isinstance(name, bool):
        name = 'dummy_model'
    stand_in = {'name': name,
                'language': 'dummy',
                'args': 'dummy',
                'working_dir': os.getcwd(),
                'inputs': [],
                'outputs': []}
    opposite = {'input': 'output', 'output': 'input'}

    def _is_matched(direction, channel):
        # A channel needs no stand-in if it is a server input, a default
        # channel or already part of a connection
        if (direction == 'input') and (channel in existing['server']):
            return True
        info = existing[direction][channel]
        if info.get('is_default', False):
            return True
        return info.get('__connection_count', 0) > 0

    # Locate unmatched channels
    unmatched = {}
    for direction in opposite:
        unmatched[direction] = [
            channel for channel in existing[direction]
            if not _is_matched(direction, channel)]
    # Servers without clients get the stand-in model as a client
    for server in existing['server'].values():
        if not server['clients']:
            stand_in.setdefault('client_of', [])
            stand_in['client_of'].append(server['model_name'])
    # TODO: Check that there aren't any missing servers
    # Create connections to dummy model
    connections = []
    for direction, other in opposite.items():
        for channel in unmatched[direction]:
            dummy_name = f"{name}:dummy_{channel.replace(':', '-')}"
            dummy_comm = copy.deepcopy(existing[direction][channel])
            for key in ['address', 'for_service', 'commtype', 'host']:
                dummy_comm.pop(key, None)
            dummy_comm['name'] = dummy_name
            if partial_commtype is not None:
                dummy_comm.update(partial_commtype)
            stand_in[other + 's'].append(dummy_comm)
            connections.append({direction + 's': [{'name': dummy_name}],
                                other + 's': [{'name': channel}]})
    # Parse new components
    existing = parse_component(stand_in, 'model', existing=existing)
    for connection in connections:
        existing = parse_component(connection, 'connection',
                                   existing=existing)
    return existing
def parse_component(yml, ctype, existing=None):
    r"""Parse a yaml entry for a component and register it under its name
    in the dictionary of existing components.

    Args:
        yml (dict): YAML dictionary for a component.
        ctype (str): Component type. 'model' and 'connection' entries are
            parsed before being registered; other types (e.g. 'input',
            'output') are registered as they are.
        existing (dict): Dictionary of existing components.

    Raises:
        AssertionError: If existing is None or empty.
        YAMLSpecificationError: If yml is not a dictionary.
        YAMLSpecificationError: If a component of the same type and name is
            already registered.

    Returns:
        dict: All components identified.

    """
    assert existing
    if not isinstance(yml, dict):  # pragma: debug
        raise YAMLSpecificationError("Component entry in yml must be a dictionary.")
    # Parse based on type
    if ctype == 'model':
        existing = parse_model(yml, existing)
    elif ctype == 'connection':
        existing = parse_connection(yml, existing)
    # Ensure component doesn't already exist
    component_name = yml['name']
    registry = existing[ctype]
    if component_name in registry:
        # Dump the current state to aid debugging of the name clash
        pprint.pprint(existing)
        pprint.pprint(yml)
        raise YAMLSpecificationError("%s is already a registered '%s' component." % (
            component_name, ctype))
    registry[component_name] = yml
    return existing
def parse_model(yml, existing):
    r"""Parse a yaml entry for a model.

    The model entry is updated in place: its 'language' is replaced by the
    corresponding 'driver', server/client channels are added, its I/O
    channels are prefixed with the model name and registered, and
    non-string environment values are JSON encoded.

    Args:
        yml (dict): YAML dictionary for a model.
        existing (dict): Dictionary of existing components.

    Raises:
        YAMLSpecificationError: If 'is_server' is a boolean, 'function' is
            set and the model does not have exactly one input and one output.
        YAMLSpecificationError: If 'is_server' names a channel that the
            model does not have.

    Returns:
        dict: Updated log of all entries.

    """
    yml = backward_compat_models(yml, existing)
    # A GCC driver model with a '.cpp' argument is treated as a C++ model
    if ((yml.get('driver', None) == 'GCCModelDriver'
         and any(x.endswith('.cpp') for x in yml.get('args', [])))):
        yml['language'] = 'cpp'
    language = yml.pop('language')
    yml['driver'] = constants.COMPONENT_REGISTRY['model']['subtypes'][language]
    # Add server input
    if yml.get('is_server', False):
        srv = {'name': yml['name'] + ':' + yml['name'],
               'commtype': 'default',
               'datatype': {'type': 'bytes'},
               'args': yml['name'] + '_SERVER',
               'working_dir': yml['working_dir']}
        if yml.get('function', False) and isinstance(yml['is_server'], bool):
            # With a single input and output, the channels used for the
            # server can be determined without an explicit mapping
            if (len(yml['inputs']) == 1) and (len(yml['outputs']) == 1):
                yml['is_server'] = {'input': yml['inputs'][0]['name'],
                                    'output': yml['outputs'][0]['name']}
            else:
                raise YAMLSpecificationError(
                    "The 'is_server' parameter is boolean for the model '%s' "
                    "and the 'function' parameter is also set. "
                    "If the 'function' and 'is_server' parameters are used "
                    "together, the 'is_server' parameter must be a mapping "
                    "with 'input' and 'output' entries specifying which of "
                    "the function's input/output variables should be received"
                    "/sent from/to clients. e.g. \n"
                    "\t-input: input_variable\n"
                    "\t-output: output_variables\n" % yml['name'])
        replaces = None
        if isinstance(yml['is_server'], dict):
            replaces = {}
            for io in ['input', 'output']:
                replaces[io] = None
                # if not yml['is_server'][io].startswith('%s:' % yml['name']):
                #     yml['is_server'][io] = yml['name'] + ':' + yml['is_server'][io]
                # Remove the named channel from the model, remembering the
                # channel and the position it was found at
                for i, x in enumerate(yml[io + 's']):
                    if x['name'] == yml['is_server'][io]:
                        replaces[io] = x
                        replaces[io + '_index'] = i
                        yml[io + 's'].pop(i)
                        break
                else:
                    raise YAMLSpecificationError(
                        f"Failed to locate an existing {io} channel "
                        f"with the name {yml['is_server'][io]}:\n"
                        f"{pprint.pformat(yml)}.")
            srv['server_replaces'] = replaces
            # The server input takes the position of the replaced input
            yml['inputs'].insert(replaces['input_index'], srv)
        else:
            yml['inputs'].append(srv)
        yml['clients'] = []
        existing['server'].setdefault(srv['name'],
                                      {'clients': [],
                                       'model_name': yml['name']})
        if replaces:
            existing['server'][srv['name']]['replaces'] = replaces
    # Mark timesync clients
    timesync = yml.pop('timesync_client_of', [])
    if timesync:
        yml.setdefault('client_of', [])
        yml['client_of'] += timesync
    # Add client output
    if yml.get('client_of', []):
        for srv in yml['client_of']:
            srv_name = f'{srv}:{srv}'
            # Timesync clients and other clients use different channel names
            if srv in timesync:
                cli_name = '%s:%s' % (yml['name'], srv)
            else:
                cli_name = '%s:%s_%s' % (yml['name'], srv, yml['name'])
            cli = {'name': cli_name,
                   'working_dir': yml['working_dir']}
            yml['outputs'].append(cli)
            # The server entry is created here if the server model has not
            # been parsed yet
            existing['server'].setdefault(srv_name,
                                          {'clients': [],
                                           'model_name': srv})
            existing['server'][srv_name]['clients'].append(cli_name)
    # Model index and I/O channels
    yml['model_index'] = len(existing['model'])
    prefix = yml['name'] + ':'
    for io in ['inputs', 'outputs']:
        for x in yml[io]:
            # Channel names are prefixed by the model name; the unprefixed
            # name (and, for default channels, the model name) is recorded
            # as an alias of the prefixed name
            if not x['name'].startswith(prefix):
                new_name = prefix + x['name']
                existing['aliases'][io][x['name']] = new_name
                if x.get('is_default', False):
                    existing['aliases'][io][yml['name']] = new_name
                x['name'] = new_name
            backward_compat_model_io(io[:-1], x, existing, yml)
            if ((yml.get('function', False)
                 and (not x.get('outside_loop', False))
                 and yml.get('is_server', False))):
                x.setdefault('dont_copy', True)
            if yml.get('allow_threading', False) or (
                    (yml.get('copies', 1) > 1)
                    and (not x.get('dont_copy', False))):
                x['allow_multiple_comms'] = True
            # TODO: Replace model_driver with partner_model?
            x['model_driver'] = [yml['name']]
            x['partner_model'] = yml['name']
            if yml.get('copies', 1) > 1:
                x['partner_copies'] = yml['copies']
            x['partner_language'] = language
            # Register the channel as an 'input'/'output' component
            existing = parse_component(x, io[:-1], existing=existing)
    # Non-string environment variable values are JSON encoded
    for k in yml.get('env', {}).keys():
        if not isinstance(yml['env'][k], str):
            yml['env'][k] = json.dumps(yml['env'][k])
    return existing
def parse_connection(yml, existing):
    r"""Parse a yaml entry for a connection between I/O channels.

    The connection entry is updated in place: file entries receive an
    'address', model channel names are resolved through the recorded
    aliases, and 'src_models', 'dst_models', 'inputs', 'outputs', 'driver'
    and 'name' are set.

    Args:
        yml (dict): YAML dictionary for a connection.
        existing (dict): Dictionary of existing components.

    Raises:
        YAMLSpecificationError: If an input file does not exist and
            'wait_for_creation' is not set for it.
        YAMLSpecificationError: If a non-file input does not match a
            registered model output or a non-file output does not match a
            registered model input.
        YAMLSpecificationError: If every input and every output of the
            connection is a file.

    Returns:
        dict: Updated log of all entries.

    """
    backward_compat_connections(yml, existing)
    # File input
    is_file = {'inputs': [], 'outputs': []}
    iname_list = []
    for x in yml['inputs']:
        # Entries with a 'filetype' are treated as files
        is_file['inputs'].append('filetype' in x)
        if is_file['inputs'][-1]:
            if x.get('serializer', {}) == {'seritype': 'default'}:
                x['serializer'] = {'seritype': 'direct'}
            # Relative paths are resolved against the entry's working_dir
            fname = os.path.expanduser(x['name'])
            if not os.path.isabs(fname):
                fname = os.path.join(x['working_dir'], fname)
            fname = os.path.normpath(fname)
            if (((not os.path.isfile(fname))
                 and (not x.get('wait_for_creation', False)))):  # pragma: debug
                raise YAMLSpecificationError(
                    f"Input file does not exist: \"{x['name']}\"")
            x['address'] = fname
        elif 'default_value' in x:
            x['address'] = x['default_value']
        else:
            # 'channel::var' selects a single variable from the channel
            if '::' in x['name']:
                x['name'], var = x['name'].split('::')
                assert 'vars' not in x
                x['vars'] = [{'name': var}]
            if x['name'] in existing['aliases']['outputs']:
                x['name'] = existing['aliases']['outputs'][x['name']]
            if x['name'] not in existing['output']:
                raise YAMLSpecificationError(
                    f"Input '{x['name']}' does not match a corresponding "
                    f"output.")
            # NOTE(review): indentation was reconstructed; this append is
            # placed in the model-channel branch, which matches the handling
            # of an empty name list below -- confirm against the original.
            iname_list.append(x['name'])
    # File output
    oname_list = []
    for x in yml['outputs']:
        is_file['outputs'].append('filetype' in x)
        if is_file['outputs'][-1]:
            if x.get('serializer', {}) == {'seritype': 'default'}:
                x['serializer'] = {'seritype': 'direct'}
            fname = os.path.expanduser(x['name'])
            # Paths flagged 'in_temp' are left unresolved here
            if not x.get('in_temp', False):
                if not os.path.isabs(fname):
                    fname = os.path.join(x['working_dir'], fname)
                fname = os.path.normpath(fname)
            x['address'] = fname
        else:
            if '::' in x['name']:
                x['name'], var = x['name'].split('::')
                assert 'vars' not in x
                x['vars'] = [{'name': var}]
            if x['name'] in existing['aliases']['inputs']:
                x['name'] = existing['aliases']['inputs'][x['name']]
            if x['name'] not in existing['input']:
                raise YAMLSpecificationError(
                    f"Output '{x['name']}' does not match a corresponding "
                    f"input.")
            # NOTE(review): placement reconstructed as for the inputs above
            oname_list.append(x['name'])
    # The default connection name is built from the collected channel names
    iname = ','.join(iname_list)
    oname = ','.join(oname_list)
    if not iname:
        args = oname
    elif not oname:
        args = iname
    else:
        args = '%s_to_%s' % (iname, oname)
    name = args
    if all(is_file['inputs']) and all(is_file['outputs']):  # pragma: debug
        raise YAMLSpecificationError(
            f"Both the input and output fro this connection appear to be "
            f"files:\n{pprint.pformat(yml)}")
    # Connection
    xx = {'src_models': [], 'dst_models': [],
          'inputs': [], 'outputs': []}
    for i, y in enumerate(yml['inputs']):
        if is_file['inputs'][i] or ('default_value' in y):
            xx['inputs'].append(y)
        else:
            # Use the registered model output, filling in any keys that are
            # only set on the connection entry, and count the connection
            new = existing['output'][y['name']]
            for k, v in y.items():
                new.setdefault(k, v)
            xx['inputs'].append(new)
            xx['src_models'] += existing['output'][y['name']]['model_driver']
            new.setdefault('__connection_count', 0)
            new['__connection_count'] += 1
    for i, y in enumerate(yml['outputs']):
        if is_file['outputs'][i]:
            xx['outputs'].append(y)
        else:
            new = existing['input'][y['name']]
            for k, v in y.items():
                new.setdefault(k, v)
            xx['outputs'].append(new)
            xx['dst_models'] += existing['input'][y['name']]['model_driver']
            new.setdefault('__connection_count', 0)
            new['__connection_count'] += 1
    # TODO: Combine inputs/outputs that access variables from the same
    # comm connection
    # TODO: Split comms if models are not co-located and the main
    # process needs access to the message passed
    yml.update(xx)
    yml.setdefault('driver', 'ConnectionDriver')
    yml.setdefault('name', name)
    return existing
# The following functions are added to allow backwards compatibility with older
# yaml schemas
def rwmeth2filetype(rw_meth):
    r"""Translate an old read_meth/write_meth keyword value into the
    equivalent file properties.

    Args:
        rw_meth (str): Read/write method name.

    Returns:
        dict: Property values equivalent to provided read/write method. Any
            method other than 'all', 'line' or 'table_array' is used as the
            filetype unchanged.

    """
    if rw_meth == 'all':
        return {'filetype': 'binary'}
    if rw_meth == 'line':
        return {'filetype': 'ascii'}
    if rw_meth == 'table_array':
        return {'filetype': 'table', 'as_array': True}
    return {'filetype': rw_meth}
def backward_compat_model_io(io, instance, iodict, model):
    r"""Handle the deprecated 'driver'/'args' options on a model channel.

    A channel whose 'args' equals that of a waiting channel of the opposite
    direction is recorded as a pair with it; otherwise the channel is added
    to the list of waiting channels for its own direction. Waiting channels
    with a 'filetype' are split into a bare channel and a file entry that is
    stored with the channels of the opposite direction.

    Args:
        io (str): Direction of the channel ('input' or 'output').
        instance (dict): Channel entry. Modified in place.
        iodict (dict): Utility dictionary tracking processed elements.
        model (dict): Entry for the model that the channel belongs to.

    Returns:
        dict: The channel entry.

    """
    if not iodict['backward']:
        return instance
    # Match deprecated driver options
    if ('driver' not in instance) or ('args' not in instance):
        return instance
    instance['working_dir'] = model['working_dir']
    opposite = {'input': 'output', 'output': 'input'}[io]
    waiting = iodict[f'{opposite}_drivers']
    for idx, (opp_arg, _) in enumerate(waiting):
        if instance['args'] != opp_arg:
            continue
        pairs = iodict['pairs']
        # Pairs are always stored as (output name, input name)
        if io == 'input':
            pairs.append((waiting.pop(idx)[1], instance['name']))
        else:  # pragma: debug
            # Inputs are processed first so this is not expected to run,
            # but it is kept for symmetry
            pairs.append((instance['name'], waiting.pop(idx)[1]))
        instance.pop('args')
        instance.pop('driver')
        return instance
    # No partner is waiting; register this channel as waiting instead
    key = instance['args']
    if 'filetype' in instance:
        file_entry = copy.deepcopy(instance)
        instance.clear()
        instance['name'] = file_entry['name']
        file_entry['name'] = file_entry['args']
        key += f"_{instance['name']}"
        iodict[opposite][key] = file_entry
        file_entry.pop('args')
        file_entry.pop('driver')
    iodict[f'{io}_drivers'].append((key, instance['name']))
    return instance
def backward_compat_models(instance, iodict):
    r"""Set the language of a model from the extension of its first
    argument when the language is missing or 'executable'.

    Args:
        instance (dict): Model entry. Modified in place.
        iodict (dict): Utility dictionary tracking processed elements.

    Returns:
        dict: The model entry.

    """
    if not iodict['backward']:
        return instance
    if instance.get('language', 'executable') != 'executable':
        return instance
    extension = os.path.splitext(instance['args'][0])[-1]
    ext2lang = constants.EXT2LANG
    if extension in ext2lang:
        instance['language'] = ext2lang[extension]
    return instance
def backward_compat_connections(instance, iodict):
    r"""Replace the deprecated 'read_meth'/'write_meth' connection options
    with the equivalent file properties.

    Args:
        instance (dict): Connection entry. Modified in place.
        iodict (dict): Utility dictionary tracking processed elements.

    Raises:
        YAMLSpecificationError: If one of the deprecated options is set on
            a connection that does not include a file.

    Returns:
        dict: The connection entry.

    """
    if not iodict['backward']:
        return instance
    files = [entry
             for direction in ('inputs', 'outputs')
             for entry in instance[direction]
             if 'filetype' in entry]
    # Replace old read/write method with filetype
    for option in ('read_meth', 'write_meth'):
        method = instance.pop(option, None)
        if method is None:
            continue
        properties = rwmeth2filetype(method)
        if not files:
            raise YAMLSpecificationError(
                "Deprecated connection parameters "
                "'write_meth' and 'read_meth' are "
                "only valid for connections to files.")
        # Only files that still have the 'binary' filetype are updated
        for entry in files:
            if entry['filetype'] == 'binary':
                entry.update(properties)
    return instance
def backward_compat(instance, iodict):
    r"""Add the connections implied by deprecated driver options to a
    normalized yggdrasil input document.

    Args:
        instance (dict): JSON document pre-normalized via rapidjson.
        iodict (dict): Utility dictionary tracking processed elements. Its
            'pairs', 'input_drivers' and 'output_drivers' entries are
            emptied once they have been turned into connections.

    Returns:
        dict: Backward compatible instance.

    """
    if not iodict['backward']:
        return instance
    outputs = iodict['output']
    inputs = iodict['input']
    connections = []
    # Create direct connections from output to input
    for out_name, in_name in iodict['pairs']:
        out_entry = outputs[out_name]
        in_entry = inputs[in_name]
        connection = {'inputs': [{'name': out_name}],
                      'outputs': [{'name': in_name}]}
        out_entry.pop('working_dir', None)
        in_entry.pop('working_dir', None)
        connections.append(connection)
    # File input
    for key, channel in iodict['input_drivers']:
        inputs[channel]  # lookup kept so that an unknown channel still fails
        file_entry = outputs.pop(key)
        connections.append({'inputs': [file_entry],
                            'outputs': [{'name': channel}],
                            'working_dir': file_entry['working_dir']})
    # File output
    for key, channel in iodict['output_drivers']:
        outputs[channel]  # lookup kept so that an unknown channel still fails
        file_entry = inputs.pop(key)
        connections.append({'outputs': [file_entry],
                            'inputs': [{'name': channel}],
                            'working_dir': file_entry['working_dir']})
    # Transfer the new connections to the document
    if connections:
        instance['connections'].extend(connections)
    # Empty registry of orphan input/output drivers
    for key in ('input_drivers', 'output_drivers', 'pairs'):
        iodict[key] = []
    return instance