import os
import copy
import pprint
import yaml
import json
import importlib
from collections import OrderedDict
from jsonschema.exceptions import ValidationError
from yggdrasil import metaschema
# Absolute path to the schema file kept in the same directory as this module;
# used by load_schema when no file name is given.
_schema_fname = os.path.abspath(os.path.join(
os.path.dirname(__file__), '.ygg_schema.yml'))
# Cached global schema: set by init_schema and reset to None by clear_schema.
_schema = None
# Marker separating the part of constants.py that is preserved from the part
# that update_constants regenerates.
_constants_separator = (
"\n# ======================================================\n"
"# Do not edit this file past this point as the following\n"
"# is generated by yggdrasil.schema.update_constants\n"
"# ======================================================\n")
class SchemaDict(OrderedDict):
    r"""OrderedDict subclass used to keep schema entries in order.

    The representation is that of an equivalent plain dict, formatted
    by pprint.
    """

    def __repr__(self):
        # Convert to a plain dict first so that pprint formats it as a
        # dict literal rather than as an OrderedDict.
        plain = dict(self)
        return pprint.pformat(plain)
def ordered_load(stream, object_pairs_hook=SchemaDict, **kwargs):
    r"""Load a YAML document, representing maps with an ordered class.

    Args:
        stream (file): File stream to load the schema YAML from.
        object_pairs_hook (type, optional): Class that should be used to
            represent loaded maps. Defaults to SchemaDict.
        **kwargs: Additional keyword arguments are passed to decode_yaml.

    Returns:
        object: Result of ordered load.

    """
    # The mapping class is always forwarded as 'sorted_dict_type',
    # replacing any value the caller passed under that name.
    kwargs.update(sorted_dict_type=object_pairs_hook)
    return metaschema.encoder.decode_yaml(stream, **kwargs)
def ordered_dump(data, **kwargs):
    r"""Dump an object as a YAML document, treating SchemaDict and
    OrderedDict objects as mapping types.

    Args:
        data (object): Python object that should be dumped.
        **kwargs: Additional keyword arguments are passed to encode_yaml.

    Returns:
        str: YAML document representing data.

    """
    # Always overrides any caller-supplied 'sorted_dict_type'.
    mapping_types = [SchemaDict, OrderedDict]
    kwargs.update(sorted_dict_type=mapping_types)
    return metaschema.encoder.encode_yaml(data, **kwargs)
def clear_schema():
    r"""Discard the cached global schema.

    The module-level cache is reset to None, so the next call to
    init_schema will load the schema again.
    """
    global _schema
    _schema = None
def init_schema(fname=None):
    r"""Initialize the global schema if it has not been loaded yet.

    Args:
        fname (str, optional): File name passed to load_schema when the
            global schema is loaded. Ignored if the global schema is
            already set. Defaults to None.

    """
    global _schema
    if _schema is not None:
        # Already initialized; keep the cached schema.
        return
    _schema = load_schema(fname)
[docs]def create_schema():
r"""Create a new schema from the registry.

Returns:
SchemaRegistry: Registry built from the result of
init_registry(recurse=True).

"""
# Imported inside the function rather than at module level
# (presumably to avoid a circular import -- TODO confirm).
from yggdrasil.components import init_registry, registering
with registering():
x = SchemaRegistry(init_registry(recurse=True))
# Regenerate constants.py from the newly created schema.
update_constants(x)
return x
def load_schema(fname=None):
    r"""Load the yggdrasil schema for YAML options from a file.

    Args:
        fname (str, optional): Full path to the file that the schema
            should be loaded from. If the file doesn't exist, a new schema
            is created and saved there first. Defaults to _schema_fname.

    Returns:
        object: Result of SchemaRegistry.from_file for the file
            (expected to be a SchemaRegistry).

    """
    path = _schema_fname if fname is None else fname
    if not os.path.isfile(path):
        # No schema file yet: build one from the registry and save it so
        # that it can be read back below.
        create_schema().save(path)
    return SchemaRegistry.from_file(path)
def get_schema(fname=None):
    r"""Return the yggdrasil schema for YAML options.

    Args:
        fname (str, optional): Full path to the file that the schema
            should be loaded from. If provided, the schema is loaded from
            that file with load_schema and the global schema is not used.
            Defaults to None, in which case the global schema is
            initialized (if necessary) and returned.

    Returns:
        object: The global schema, or the schema loaded from fname.

    """
    global _schema
    if fname is not None:
        return load_schema(fname)
    init_schema()
    return _schema
def convert_extended2base(s):
    r"""Convert a schema from the extended form to a strictly JSON form.

    Lists/tuples and dictionaries are converted recursively; any other
    object is returned unchanged. The input is never modified.

    Args:
        s (object): Object to convert.

    Returns:
        object: Converted JSON object. Lists and tuples are returned as
            new lists and mappings are returned as new plain dicts.

    Raises:
        AssertionError: If a 'type' entry names the 'scalar' type, or if
            a list of types includes 'schema' or 'scalar'.

    """
    # TODO: Automate this on classes
    type_map = {'int': 'integer', 'uint': 'integer',
                'float': 'number', 'complex': 'string',
                'unicode': 'string', 'bytes': 'string',
                'function': 'string', 'class': 'string',
                'instance': 'string', '1darray': 'array',
                'ndarray': 'array', 'obj': 'object',
                'ply': 'object',
                'any': ["number", "string", "boolean", "object", "array",
                        "null"]}
    if isinstance(s, (list, tuple)):
        return [convert_extended2base(x) for x in s]
    if not isinstance(s, dict):
        return s
    # Work on a shallow copy so that the caller's mapping is left intact
    # (the 'type' and 'class' entries are rewritten/removed below).
    s = dict(s)
    stype = s.get('type', None)
    if isinstance(stype, str):
        if stype == 'schema':
            s = {"$ref": "#/definitions/schema"}
        elif stype in type_map:
            s['type'] = type_map[stype]
            s.pop('class', None)
        else:
            # Scalars not currently included in the schema
            assert stype not in ['scalar']
    elif isinstance(stype, list):
        assert 'schema' not in stype
        assert 'scalar' not in stype
        # Map every entry to its JSON type(s). 'any' maps to a list, so
        # the result is flattened, and duplicates are dropped (keeping
        # first-seen order) because a JSON schema 'type' array must
        # contain unique strings.
        base_types = []
        for t in stype:
            mapped = type_map.get(t, t)
            if not isinstance(mapped, list):
                mapped = [mapped]
            for m in mapped:
                if m not in base_types:
                    base_types.append(m)
        # A single remaining type is expressed as a plain string
        s['type'] = base_types[0] if len(base_types) == 1 else base_types
    # 'options' entries are carried over verbatim rather than converted
    opt = copy.deepcopy(s.get('options', None))
    out = {k: convert_extended2base(v) for k, v in s.items()}
    if opt is not None:
        out['options'] = opt
    return out
def get_json_schema(fname_dst=None, indent=None):
    r"""Return the yggdrasil schema as a strictly JSON schema without
    any of the extended datatypes.

    Args:
        fname_dst (str, optional): Full path to file where the JSON
            schema should be saved. Defaults to None and no file is
            created.
        indent (str, optional): Indentation that should be used when
            saving the schema to a file.

    Returns:
        dict: Converted structure.

    """
    json_schema = copy.deepcopy(get_schema().schema)
    # Include the metaschema as the 'schema' definition, which is what
    # converted 'schema' types refer to via "#/definitions/schema".
    json_schema['definitions']['schema'] = copy.deepcopy(
        metaschema._metaschema)
    json_schema = convert_extended2base(json_schema)
    if fname_dst is None:
        return json_schema
    with open(fname_dst, 'w') as fd:
        json.dump(json_schema, fd, indent=indent)
    return json_schema
[docs]def update_constants(schema=None):
r"""Update constants.py with info from the schema.

The part of constants.py that precedes _constants_separator is kept and
everything after it is regenerated from the component information in the
schema, the model drivers and the compilation tool registry.

Args:
schema (SchemaRegistry, optional): Schema that the constants should be
generated from. Defaults to None, in which case the result of
get_schema() is used.

"""
from yggdrasil.components import import_component
from yggdrasil.drivers.CompiledModelDriver import (
get_compilation_tool_registry)
if schema is None:
schema = get_schema()
# Helper that formats an object as Python source text. Dictionaries and
# lists are expanded recursively, with ``newline`` growing at each level
# so that nested entries are indented; other objects are added via repr.
# ``key_order`` fixes the order of dictionary keys (sorted by default).
def as_lines(x, newline='\n', key_order=None):
out = ""
if isinstance(x, dict):
if key_order is None:
key_order = list(sorted(x.keys()))
out += "{" + newline
for k in key_order:
v = x[k]
out += " %s: %s," % (
repr(k), as_lines(v, newline=(newline + ' '))) + newline
out += "}"
elif isinstance(x, list):
out += "[" + newline
out += " " + ", ".join(
[as_lines(xx, newline=(newline + ' ')) for xx in x]) + "]"
else:
out += repr(x)
return out
# The constants file lives in the same directory as this module.
filename = os.path.join(os.path.dirname(__file__), 'constants.py')
# Component information
component_registry = {}
for k, v in schema.items():
component_registry[k] = {
'module': v.module,
'default': v.default_subtype,
'base': v.base_subtype_class_name,
'key': v.subtype_key,
'subtypes': v.subtype2class}
# Language driver information
# Import the driver class for every registered 'model' subtype.
drivers = {k: import_component('model', v)
for k, v in component_registry['model']['subtypes'].items()}
language_cat = ['compiled', 'interpreted', 'build', 'dsl', 'other']
# Map driver executable types onto the language category names.
typemap = {'compiler': 'compiled', 'interpreter': 'interpreted'}
# Extensions known in advance; drivers add theirs below.
lang2ext = {'yaml': '.yml', 'executable': '.exe'}
languages = {k: [] for k in language_cat}
languages_with_aliases = {k: [] for k in language_cat}
language_properties = {}
compiler_env_vars = {}
compilation_tool_vars = {}
# Languages that have already been processed; a language is only
# handled the first time one of its drivers is encountered.
complete = []
aliased_languages = {}
for k, drv in drivers.items():
if drv.language in complete:
continue
complete.append(drv.language)
drv_type = typemap.get(drv.executable_type, drv.executable_type)
if drv.language_ext:
if k not in lang2ext:
assert isinstance(drv.language_ext, list)
# The first extension listed by the driver is the one recorded.
lang2ext[k] = drv.language_ext[0]
for ka in drv.language_aliases:
lang2ext[ka] = lang2ext[k]
languages.setdefault(drv_type, [])
languages[drv_type].append(drv.language)
languages_with_aliases.setdefault(drv_type, [])
languages_with_aliases[drv_type].append(drv.language)
languages_with_aliases[drv_type] += drv.language_aliases
if drv.language_aliases:
aliased_languages[drv.language] = [drv.language] + drv.language_aliases
language_properties[drv.language] = {
'executable_type': drv_type,
'is_typed': drv.is_typed,
'full_language': drv.full_language}
languages = {k: sorted(v) for k, v in languages.items()}
languages_with_aliases = {k: sorted(v) for k, v in
languages_with_aliases.items()}
# Environment variable information for compilation tools
for x in ['compiler', 'linker', 'archiver']:
reg = get_compilation_tool_registry(x).get('by_language', {})
for lang, tools in reg.items():
for v in tools.values():
k = v.toolname
# Skip build tools and tools that were already recorded.
if v.is_build_tool or (k in compilation_tool_vars):
continue
compilation_tool_vars[k] = {
'exec': v.default_executable_env,
'flags': v.default_flags_env}
# The first compiler recorded for a language provides that
# language's compiler environment variables.
if (x == 'compiler') and (lang not in compiler_env_vars):
compiler_env_vars[lang] = compilation_tool_vars[k].copy()
# Categories may have been added by the drivers above.
language_cat = list(languages.keys())
# Keep the existing file contents that precede the separator and re-add
# the separator without its leading newline (the lines are joined with
# newlines when written).
with open(filename, 'r') as fd:
lines = [fd.read().split(_constants_separator)[0],
_constants_separator[1:]]
lines += [
"", "# Component registry",
f"COMPONENT_REGISTRY = {as_lines(component_registry)}"]
lines += [
"", "# Language driver constants",
"LANG2EXT = %s" % as_lines(lang2ext),
"EXT2LANG = {v: k for k, v in LANG2EXT.items()}",
"LANGUAGES = %s" % as_lines(languages, key_order=language_cat)]
# LANGUAGES['all'] is written as the sum of the per-category lists.
lines.append(
"LANGUAGES['all'] = (\n LANGUAGES[%s]"
% repr(language_cat[0]))
lines += [" + LANGUAGES[%s]" % repr(k)
for k in language_cat[1:]]
lines[-1] += ")"
lines += [
"LANGUAGES_WITH_ALIASES = %s" % as_lines(languages_with_aliases,
key_order=language_cat)]
lines.append(
"LANGUAGES_WITH_ALIASES['all'] = (\n LANGUAGES_WITH_ALIASES[%s]"
% repr(language_cat[0]))
lines += [" + LANGUAGES_WITH_ALIASES[%s]" % repr(k)
for k in language_cat[1:]]
lines[-1] += ")"
lines += [
"ALIASED_LANGUAGES = %s" % as_lines(aliased_languages)]
lines += [
"COMPILER_ENV_VARS = %s" % as_lines(compiler_env_vars),
"COMPILATION_TOOL_VARS = %s" % as_lines(compilation_tool_vars)]
lines += [
"LANGUAGE_PROPERTIES = %s" % as_lines(language_properties)]
# Overwrite constants.py with the preserved header and generated lines.
with open(filename, 'w') as fd:
fd.write('\n'.join(lines) + '\n')
[docs]class ComponentSchema(object):
r"""Schema information for one component.
Args:
schema_type (str): The name of the component.
subtype_key (str): The name of the schema property/class attribute
that should be used to differentiate between subtypes of this
component.
schema_registry (SchemaRegistry, optional): Registry of schemas
that this schema is dependent on.
module (str, optional): Name of the module that contains the component
subtype classes. Defaults to None.
schema_subtypes (dict, optional): Mapping between component class names
and the associated values of the subtype_key property. Defaults to
an empty dictionary.
Attributes:
schema_type (str): The name of the component.
schema_registry (SchemaRegistry): Registry of schemas.
subtype_key (str): Schema property that is used to differentiate between
subtypes of this component.
schema_subtypes (dict): Mapping between component class names and the
associated values of the subtype_key property for this component.
module (str): Name of the module that contains the component subtype
classes.
"""
def __init__(self, schema_type, subtype_key, schema_registry=None,
             module=None, schema_subtypes=None):
    r"""Set up an empty component schema; subtype schemas are added
    afterwards (e.g. with append)."""
    # Schemas for individual subtypes, keyed by class name
    self._storage = SchemaDict()
    # Schema shared by all subtypes; set when the first class is added
    self._base_schema = None
    self.schema_type = schema_type
    self.subtype_key = subtype_key
    self.schema_registry = schema_registry
    self.schema_subtypes = {} if schema_subtypes is None else schema_subtypes
    self.module = module
    super(ComponentSchema, self).__init__()
[docs] def identify_subtype(self, doc):
r"""Identify the subtype associated with a document by validating it
against the schemas for the different subtypes.
Args:
doc (dict): JSON object that conforms to one of the component subtypes.
Returns:
str: Name of the subtype that valdiates the provided document.
"""
for subtype in self.subtypes:
subtype_schema = self.get_subtype_schema(subtype)
try:
metaschema.validate_instance(doc, subtype_schema)
return subtype
except ValidationError:
pass
raise ValueError("Could not determine subtype "
"for document: %s" % doc) # pragma: debug
[docs] def get_subtype_schema(self, subtype, unique=False, relaxed=False,
allow_instance=False, for_form=False):
r"""Get the schema for the specified subtype.
Args:
subtype (str): Component subtype to return schema for. If 'base',
the schema for evaluating the component base will be returned.
Otherwise this may be the name of a stored class schema or a
subtype value that is mapped to a class via subtype2class.
unique (bool, optional): If True, the returned schema will only
contain properties that are specific to the specified subtype.
If subtype is 'base', these will be properties that are valid
for all of the registerd subtypes. Defaults to False.
relaxed (bool, optional): If True, the schema will allow additional
properties. Defaults to False.
allow_instance (bool, optional): If True, the returned schema will
validate instances of this component in addition to documents
describing a component. Defaults to False.
for_form (bool, optional): If True, the returned schema will be
formatted for easy parsing by form generation tools. Defaults
to False. Causes relaxed and allow_instance to be ignored.
Returns:
dict: Schema for specified subtype. This is a deep copy of the stored
schema, so modifying it does not affect this object.
"""
# Form schemas are never relaxed and never accept instances.
if for_form:
relaxed = False
allow_instance = False
if subtype == 'base':
out = copy.deepcopy(self._base_schema)
# Add additional properties that apply to specific subtypes
if not unique:
out['additionalProperties'] = False
for x in self._storage.values():
for k, v in x['properties'].items():
if (k != self.subtype_key):
if (k not in out['properties']):
out['properties'][k] = copy.deepcopy(v)
# For forms, start a list of the subtype values that the
# added property depends on.
if for_form:
out['properties'][k]['options'] = {
'dependencies': {self.subtype_key: []}}
# Record the subtype values of this class for the property.
if for_form and ('options' in out['properties'][k]):
out['properties'][k]['options']['dependencies'][
self.subtype_key] += (
x['properties'][self.subtype_key]['enum'])
else:
# Translate a subtype value into the name of the class whose
# schema is stored.
if subtype not in self._storage:
s2c = self.subtype2class
if subtype in s2c:
subtype = s2c[subtype]
out = copy.deepcopy(self._storage[subtype])
# Remove properties that apply to all subtypes
if unique:
out['additionalProperties'] = True
if 'required' in out:
# Only keep requirements that are not already in the base
out['required'] = sorted(
list(set(out['required'])
- set(self._base_schema.get('required', []))))
if not out['required']:
del out['required']
for k in self._base_schema['properties'].keys():
if (k != self.subtype_key) and (k in out['properties']):
del out['properties'][k]
if not out['properties']: # pragma: no cover
del out['properties']
if relaxed:
out['additionalProperties'] = True
if allow_instance:
# Class whose instances should also be accepted
if subtype == 'base':
comp_cls = self.base_subtype_class
else:
from yggdrasil.components import import_component
comp_cls = import_component(
self.schema_type, subtype=subtype)
out = {'oneOf': [out, {'type': 'instance',
'class': comp_cls}]}
return out
[docs] def get_schema(self, relaxed=False, allow_instance=False, for_form=False):
r"""Get the schema defining this component.
Args:
relaxed (bool, optional): If True, the returned schema (and any
definitions it includes) are relaxed to allow for objects with
objects with additional properties to pass validation. Defaults
to False.
allow_instance (bool, optional): If True, the returned schema will
validate instances of this component in addition to documents
describing a component. Defaults to False.
for_form (bool, optional): If True, the returned schema will be
formatted for easy parsing by form generation tools. Defaults
to False. Causes relaxed and allow_instance to be ignored.
Returns:
dict: Schema for this component.
"""
out = {'description': 'Schema for %s components.' % self.schema_type,
'title': self.schema_type}
if for_form:
out.update(self.get_subtype_schema('base', for_form=for_form))
allow_instance = False
else:
out['allOf'] = [self.get_subtype_schema('base', relaxed=relaxed),
{'anyOf': [self.get_subtype_schema(x, unique=True)
for x in sorted(self._storage.keys())]}]
if allow_instance:
out['oneOf'] = [{'allOf': out.pop('allOf')},
{'type': 'instance',
'class': self.base_subtype_class}]
return out
@property
def schema(self):
r"""dict: Schema for this component."""
return self.get_schema()
@property
def full_schema(self):
r"""dict: Schema for evaluating YAML input file that fully specifies
the properties for each component."""
# TODO: Could be simplified to just 'anyOf' for subtypes, but need
# to reconcile that with schema normalization which uses the
# position in the schema
out = {'description': 'Schema for %s components.' % self.schema_type,
'title': self.schema_type,
'allOf': [self.get_subtype_schema('base', unique=True),
{'anyOf': [self.get_subtype_schema(x)
for x in sorted(self._storage.keys())]}]}
return out
@classmethod
def from_schema(cls, schema, schema_registry=None):
    r"""Construct a ComponentSchema from a schema.

    Args:
        schema (dict): Schema with the component name as its 'title' and
            an 'allOf' list containing the base schema followed by an
            'anyOf' list of subtype schemas. The base and subtype schemas
            are stored without copying and are modified in place.
        schema_registry (SchemaRegistry, optional): Registry passed on to
            the new object. Defaults to None.

    Returns:
        ComponentSchema: Schema with information from schema.

    """
    schema_type = schema['title']
    subt_schema = schema['allOf'][1]['anyOf']
    # Determine subtype key: it must be the only property that every
    # subtype schema has in common.
    shared_props = set(subt_schema[0]['properties'])
    all_props = set(shared_props)
    for v in subt_schema[1:]:
        names = set(v['properties'])
        all_props.update(names)
        shared_props.intersection_update(names)
    assert len(shared_props) == 1
    subtype_key = list(shared_props)[0]
    assert subtype_key in schema['allOf'][0]['properties']
    # Initialize schema
    out = cls(schema_type, subtype_key, schema_registry=schema_registry)
    out._base_schema = schema['allOf'][0]
    for v in subt_schema:
        title_parts = v['title'].split('.')
        v_class_name = title_parts[-1]
        out._storage[v_class_name] = v
        out.schema_subtypes[v_class_name] = (
            v['properties'][out.subtype_key]['enum'])
        # All subtypes are expected to come from the same module
        v_module = '.'.join(title_parts[:-2])
        if out.module is None:
            out.module = v_module
        else:
            assert v_module == out.module
    # Remove subtype specific properties
    base_props = out._base_schema['properties']
    for k in all_props:
        if k != out.subtype_key:
            del base_props[k]
    out._base_schema['additionalProperties'] = True
    # Update subtype properties with general properties
    for x in out._storage.values():
        for k, v in base_props.items():
            if k != out.subtype_key:
                x['properties'][k] = copy.deepcopy(v)
        x['additionalProperties'] = False
    return out
@classmethod
def from_registry(cls, schema_type, registry, **kwargs):
    r"""Construct a ComponentSchema from a registry entry.

    Args:
        schema_type (str): Name of component type to build.
        registry (dict): Registry information for the component. The
            'subtypes', 'module', 'key' and 'classes' entries are used.
        **kwargs: Additional keyword arguments are passed to the class
            __init__ method. Any 'module' or 'schema_subtypes' entries
            are replaced by values from the registry.

    Returns:
        ComponentSchema: Schema with information from classes.

    """
    # Invert the subtype -> class mapping into class -> list of subtypes
    schema_subtypes = {}
    for subtype, class_name in registry['subtypes'].items():
        schema_subtypes.setdefault(class_name, []).append(subtype)
    kwargs['module'] = registry['module']
    kwargs['schema_subtypes'] = schema_subtypes
    out = cls(schema_type, registry['key'], **kwargs)
    for comp_cls in registry['classes'].values():
        out.append(comp_cls, verify=True)
    return out
@property
def properties(self):
r"""list: Valid properties for this component."""
return sorted(list(self.get_subtype_schema('base')['properties'].keys()))
[docs] def get_subtype_properties(self, subtype):
r"""Get the valid properties for a specific subtype.
Args:
subtype (str): Name of the subtype to get keys for.
Returns:
list: Valid properties for the specified subtype.
"""
return sorted(list(self.get_subtype_schema(subtype)['properties'].keys()))
@property
def class2subtype(self):
r"""dict: Mapping from class to list of subtypes."""
return self.schema_subtypes
@property
def subtype2class(self):
r"""dict: Mapping from subtype to class."""
out = {}
for k, v in self.schema_subtypes.items():
for iv in v:
out[iv] = k
return out
@property
def base_subtype_class_name(self):
r"""str: Name of base class for the subtype."""
if not getattr(self, '_base_subtype_class_name', None):
self.base_subtype_class
return self._base_subtype_class_name
@property
def base_subtype_class(self):
r"""ComponentClass: Base class for the subtype."""
if not getattr(self, '_base_subtype_class', None):
default_class = list(self.schema_subtypes.keys())[0]
cls = getattr(
importlib.import_module(f"{self.module}.{default_class}"),
default_class)
base_class = cls
for i, x in enumerate(cls.__mro__):
if x._schema_type != cls._schema_type:
break
base_class = x
else: # pragma: debug
raise RuntimeError(
f"Could not determine a base class for "
f"{self.schema_type} (using class {cls})")
self._base_subtype_class = base_class
self._base_subtype_class_name = base_class.__name__
return self._base_subtype_class
@property
def default_subtype(self):
r"""str: Default subtype."""
return self._base_schema['properties'][self.subtype_key].get(
'default', None)
@property
def subtypes(self):
r"""list: All subtypes for this schema type."""
out = []
for v in self.schema_subtypes.values():
out += v
return sorted(list(set(out)))
@property
def classes(self):
r"""list: All available classes for this schema."""
return sorted([k for k in self.schema_subtypes.keys()])
[docs] def append(self, comp_cls, verify=False):
r"""Append component class to the schema.
Args:
comp_cls (class): Component class that should be added.
verify (bool, optional): If True, verify the schema after
adding the component class. Defaults to False.
"""
assert comp_cls._schema_type == self.schema_type
assert comp_cls._schema_subtype_key == self.subtype_key
name = comp_cls.__name__
fullname = f'{comp_cls.__module__}.{comp_cls.__name__}'
subtype_module = '.'.join(comp_cls.__module__.split('.')[:-1])
# Append subtype
subtype_list = getattr(comp_cls, '_%s' % self.subtype_key, None)
if not isinstance(subtype_list, list):
subtype_list = [subtype_list]
subtype_list += getattr(comp_cls, '_%s_aliases' % self.subtype_key, [])
self.schema_subtypes[name] = subtype_list
assert subtype_module == self.module
# Create new schema for subtype
new_schema = {'title': fullname,
'description': ('Schema for %s component %s subtype.'
% (self.schema_type, subtype_list)),
'type': 'object',
'required': copy.deepcopy(comp_cls._schema_required),
'properties': copy.deepcopy(comp_cls._schema_properties),
'additionalProperties': False}
if not new_schema['required']:
del new_schema['required']
new_schema['properties'].setdefault(self.subtype_key, {})
new_schema['properties'][self.subtype_key]['enum'] = subtype_list
# Add legacy properties
if self.schema_type in ['connection', 'comm', 'file', 'model']:
legacy_properties = {'driver': {'type': 'string',
'description': (
'[DEPRECATED] Name of driver '
'class that should be used.')},
'args': {'type': 'string',
'description': (
'[DEPRECATED] Arguments that should '
'be provided to the driver.')}}
for k, v in legacy_properties.items():
if k not in new_schema['properties']:
new_schema['properties'][k] = v
# Create base schema
is_base = False
if self._base_schema is None:
is_base = True
self._base_schema = dict(
copy.deepcopy(new_schema),
title='%s_base' % self.schema_type,
description=('Base schema for all subtypes of %s components.'
% self.schema_type),
dependencies={'driver': ['args']},
additionalProperties=True)
# Add description of subtype to subtype property after base to
# prevent overwriting description of the property rather than the
# property value.
if comp_cls._schema_subtype_description is not None:
new_schema['properties'][self.subtype_key]['description'] = (
comp_cls._schema_subtype_description)
# Update base schema, checking for compatiblity
if not is_base:
if 'required' in self._base_schema:
self._base_schema['required'] = sorted(list(
set(self._base_schema['required'])
& set(new_schema.get('required', []))))
if not self._base_schema['required']: # pragma: no cover
del self._base_schema['required']
prop_overlap = list(
set([self.subtype_key]) # Force subtype keys to be included
| (set(self._base_schema['properties'].keys())
& set(new_schema['properties'].keys())))
new_base_prop = {}
for k in prop_overlap:
old = copy.deepcopy(self._base_schema['properties'][k])
new = copy.deepcopy(new_schema['properties'][k])
# Don't compare descriptions or properties defining subtype
if k != self.subtype_key:
old.pop('description', None)
new.pop('description', None)
old.pop('default', None)
new.pop('default', None)
if old != new: # pragma: debug
raise ValueError(
("Schema for property '%s' of class '%s' "
"is %s, which differs from the existing "
"base class value (%s). Check that "
"another class dosn't have a conflicting "
"definition of the same property.")
% (k, comp_cls, new, old))
# Assign original copy that includes description
new_base_prop[k] = self._base_schema['properties'][k]
if k == self.subtype_key:
new_base_prop[k]['enum'] = sorted(list(
set(new_base_prop[k]['enum']) | set(new['enum'])))
self._base_schema['properties'] = new_base_prop
self._storage[name] = copy.deepcopy(new_schema)
# Verify that the schema is valid
if verify:
metaschema.validate_schema(self.schema)
[docs]class SchemaRegistry(object):
r"""Registry of schemas for different integration components.
Args:
registry (dict, optional): Dictionary of registered components.
Defaults to None and the registry will be empty.
required (list, optional): Components that are required. Defaults to
['comm', 'file', 'model', 'connection']. Ignored if registry is None.
Raises:
ValueError: If registry is provided and one of the required components
is missing.
"""
# NOTE(review): not used in the part of the class visible here; purpose
# should be confirmed against the rest of the file.
_normalizers = {}
# Components that are required when no explicit list is provided.
_default_required_components = ['comm', 'file', 'model', 'connection']
def __init__(self, registry=None, required=None):
    r"""Create the registry and, if component information is provided,
    a verified schema for each component."""
    super(SchemaRegistry, self).__init__()
    # Cache of generated schemas/definitions; reset whenever a
    # component is added
    self._cache = {}
    self._storage = SchemaDict()
    if required is None:
        required = self._default_required_components
    self.required_components = required
    if registry is None:
        return
    for k in required:
        if k not in registry:
            raise ValueError("Component %s required." % k)
    # Create schemas for each component
    for k, v in registry.items():
        self.add(k,
                 ComponentSchema.from_registry(k, v, schema_registry=self),
                 verify=True)
[docs] def add(self, k, v, verify=False):
r"""Add a new component schema to the registry."""
self._cache = {}
self._storage[k] = v
if verify:
metaschema.validate_schema(self.schema)
[docs] def get(self, k, *args, **kwargs):
r"""Return a component schema from the registry."""
return self._storage.get(k, *args, **kwargs)
[docs] def get_definitions(self, relaxed=False, allow_instance=False,
for_form=False, dont_copy=False):
r"""Get schema definitions for the registered components.
Args:
relaxed (bool, optional): If True, the returned schema (and any
definitions it includes) are relaxed to allow for objects with
objects with additional properties to pass validation. Defaults
to False.
allow_instance (bool, optional): If True, the returned definitions will
validate instances of the components in addition to documents
describing components. Defaults to False.
for_form (bool, optional): If True, the returned schema will be
formatted for easy parsing by form generation tools. Defaults
to False. Causes relaxed and allow_instance to be ignored.
dont_copy (bool, optional): If True, a the cached definitions
are returned without copying. Defaults to False.
Returns:
dict: Schema defintiions for each of the registered components.
"""
cache_key = 'definitions'
if for_form:
cache_key += '_form'
relaxed = False
allow_instance = False
if relaxed:
cache_key += '_relaxed'
if allow_instance:
cache_key += '_instance'
if cache_key not in self._cache:
out = {k: v.get_schema(relaxed=relaxed, allow_instance=allow_instance,
for_form=for_form)
for k, v in self._storage.items()}
for k in self.required_components:
out.setdefault(k, {'type': 'string'})
self._cache[cache_key] = out
out = self._cache[cache_key]
if not dont_copy:
out = copy.deepcopy(out)
return out
[docs] def get_schema(self, relaxed=False, allow_instance=False, for_form=False):
r"""Get the schema defining this component.
Args:
relaxed (bool, optional): If True, the returned schema (and any
definitions it includes) are relaxed to allow for objects with
objects with additional properties to pass validation. Defaults
to False.
allow_instance (bool, optional): If True, the returned schema will
validate instances of this component in addition to documents
describing a component. Defaults to False.
for_form (bool, optional): If True, the returned schema will be
formatted for easy parsing by form generation tools. Defaults
to False. Causes relaxed and allow_instance to be ignored.
Returns:
dict: Schema for this component.
"""
cache_key = 'schema'
if for_form:
cache_key += '_form'
relaxed = False
allow_instance = False
if relaxed:
cache_key += '_relaxed'
if allow_instance:
cache_key += '_instance'
if cache_key not in self._cache:
out = {'title': 'YAML Schema',
'description': 'Schema for yggdrasil YAML input files.',
'type': 'object',
'definitions': self.get_definitions(
relaxed=relaxed, allow_instance=allow_instance,
for_form=for_form, dont_copy=True),
'required': ['models'],
'additionalProperties': False,
'properties': SchemaDict(
[('models', {'type': 'array',
'items': {'$ref': '#/definitions/model'},
'minItems': 1}),
('connections',
{'type': 'array',
'items': {'$ref': '#/definitions/connection'}})])}
self._cache[cache_key] = out
return copy.deepcopy(self._cache[cache_key])
@property
def definitions(self):
r"""dict: Schema definitions for different components."""
return self.get_definitions()
@property
def schema(self):
r"""dict: Schema for evaluating YAML input file."""
return self.get_schema()
@property
def form_schema(self):
    r"""dict: Schema for generating a YAML form."""
    form = self.get_schema(for_form=True)
    # Include the metaschema as the 'schema' definition before
    # converting the extended types to strictly JSON types.
    form['definitions']['schema'] = copy.deepcopy(metaschema._metaschema)
    return convert_extended2base(form)
@property
def model_form_schema_props(self):
r"""dict: Information about how properties should be modified for the
model form schema."""
prop = {
# 'add': {},
'replace': {
'comm': {
'transform': {
"type": "array",
"items": {"$ref": "#/definitions/transform"}},
'default_file': {
'$ref': '#/definitions/file'}},
'file': {
'serializer': {
'$ref': '#/definitions/serializer'}}},
'required': {
'model': ['args', 'inputs', 'outputs', 'description',
'repository_url', 'repository_commit']},
'remove': {
'comm': ['is_default', 'length_map', 'serializer',
'address', 'dont_copy', 'for_service',
'send_converter', 'recv_converter', 'client_id',
'cookies', 'host', 'params', 'port', 'commtype'],
'ocomm': ['default_value'],
'file': ['is_default', 'length_map',
'wait_for_creation', 'working_dir',
'read_meth', 'in_temp',
'serializer', 'datatype',
'address', 'dont_copy', 'for_service',
'send_converter', 'recv_converter', 'client_id',
'cookies', 'host', 'params', 'port'],
'model': ['client_of', 'is_server', 'preserve_cache',
'products', 'source_products', 'working_dir',
'overwrite', 'skip_interpreter', 'copies',
'timesync', 'with_strace', 'with_valgrind',
'valgrind_flags', 'additional_variables',
'aggregation', 'interpolation', 'synonyms',
'driver']},
'order': {
'model': ['name', 'repository_url', 'repository_commit',
'contact_email', 'language', 'description',
'args', 'inputs', 'outputs'],
'comm': ['name', 'datatype']},
'update': {
'model': {
'inputs': {
'description': ('Zero or more channels carrying '
'input to the model'),
'items': {'$ref': '#/definitions/icomm'}},
'outputs': {
'description': ('Zero or more channels carrying '
'output from the model'),
'items': {'$ref': '#/definitions/ocomm'}},
'repository_commit': {
'description': ('Commit that should be checked out '
'from the model repository.')},
'args': {'minItems': 1}},
'file': {
'name': {
'description': ('Path to a file in the model '
'repository')}}},
}
return prop
@property
def model_form_schema(self):
r"""dict: Schema for generating a model YAML form. This is built from
the form version of the schema by simplifying the datatype schema,
adjusting the component definitions according to
model_form_schema_props, promoting the model definition to the top
level and converting extended types to strictly JSON types."""
from yggdrasil import constants
out = self.get_schema(for_form=True)
scalar_types = list(constants.VALID_TYPES.keys())
meta = copy.deepcopy(metaschema._metaschema)
# Datatype properties that are kept in the simplified datatype schema,
# mapped to the values of 'type' that each property depends on.
meta_prop = {
'subtype': ['1darray', 'ndarray'],
'units': ['1darray', 'ndarray'] + scalar_types,
'precision': ['1darray', 'ndarray'] + scalar_types,
'length': ['1darray'],
'shape': ['ndarray']}
# The type is selected from the metaschema's simple types (minus
# 'scalar') as a string that defaults to 'bytes'.
out['definitions']['simpleTypes'] = meta['definitions']['simpleTypes']
out['definitions']['simpleTypes'].update(type='string',
default='bytes')
out['definitions']['simpleTypes']['enum'].remove('scalar')
out['definitions']['schema'] = {'type': 'object',
'required': ['type'],
'properties': {}}
out['definitions']['schema']['properties']['type'] = {
'$ref': '#/definitions/simpleTypes'}
for k, types in meta_prop.items():
out['definitions']['schema']['properties'][k] = meta['properties'][k]
if types:
out['definitions']['schema']['properties'][k]['options'] = {
'dependencies': {'type': types}}
for k in ['comm', 'file', 'model']:
out['definitions'][k].pop('description', '')
# Strip titles, the working_dir requirement and deprecated properties
# from the component definitions.
for k in out['definitions'].keys():
if k in ['schema', 'simpleTypes']:
continue
out['definitions'][k].pop('title', None)
if ((('required' in out['definitions'][k])
and ('working_dir' in out['definitions'][k]['required']))):
out['definitions'][k]['required'].remove('working_dir')
# Iterate over a copy of the items as properties are removed
for p, v in list(out['definitions'][k]['properties'].items()):
if v.get('description', '').startswith('[DEPRECATED]'):
out['definitions'][k]['properties'].pop(p)
# Process based on model_form_schema_props
prop = self.model_form_schema_props
# Apply the entries in prop for definition k to out['definitions'][k]
# in place.
def adjust_definitions(k):
# Remove
for p in prop['remove'].get(k, []):
out['definitions'][k]['properties'].pop(p, None)
if p in out['definitions'][k].get('required', []):
out['definitions'][k]['required'].remove(p)
# Replace
for r, v in prop['replace'].get(k, {}).items():
# Keep the description of the property being replaced
if 'description' in out['definitions'][k]['properties'].get(r, {}):
v['description'] = (
out['definitions'][k]['properties'][r]['description'])
out['definitions'][k]['properties'][r] = v
# Required
out['definitions'][k].setdefault('required', [])
for p in prop['required'].get(k, []):
if p not in out['definitions'][k]['required']:
out['definitions'][k]['required'].append(p)
# Update
for p, new in prop['update'].get(k, {}).items():
out['definitions'][k]['properties'][p].update(new)
# Add
# out['definitions'][k]['properties'].update(
# prop['add'].get(k, {}))
# Order
for i, p in enumerate(prop['order'].get(k, [])):
out['definitions'][k]['properties'][p]['propertyOrder'] = i
# Update definitions
for k in ['model', 'comm', 'file']:
adjust_definitions(k)
# Input/output comms start as copies of the adjusted comm definition
for k in ['icomm', 'ocomm']:
out['definitions'][k] = copy.deepcopy(out['definitions']['comm'])
adjust_definitions(k)
# An input comm must provide exactly one of default_file/default_value
out['definitions']['icomm']['oneOf'] = [
{'title': 'default file',
'required': ['default_file'],
'not': {'required': ['default_value']}},
{'title': 'default value',
'required': ['default_value'],
'not': {'required': ['default_file']}}]
# Adjust formating
for x in [out] + list(out['definitions'].values()):
for p, v in x.get('properties', {}).items():
if v.get("type", None) == "boolean":
v.setdefault("format", "checkbox")
# Isolate model
# The model definition is merged into the top level of the schema and
# the connection definition is dropped.
out.update(out['definitions'].pop('model'))
out['definitions'].pop('connection')
out.update(
title='Model YAML Schema',
description='Schema for yggdrasil model YAML input files.')
out = convert_extended2base(out)
return out
@property
def full_schema(self):
    r"""dict: Schema for evaluating YAML input file that fully specifies
    the properties for each component. The assembled schema is stored in
    the cache and every access returns a deep copy of it."""
    if 'full_schema' in self._cache:
        return copy.deepcopy(self._cache['full_schema'])
    assembled = self.schema
    for name, component in self._storage.items():
        assembled['definitions'][name] = component.full_schema
    self._cache['full_schema'] = assembled
    return copy.deepcopy(self._cache['full_schema'])
def __getitem__(self, k):
    r"""Support ``registry[k]`` by delegating the lookup to ``get``."""
    entry = self.get(k)
    return entry
def keys(self):
    r"""Return the keys of the underlying component storage."""
    storage = self._storage
    return storage.keys()
def items(self):
    r"""Return the (key, value) pairs of the underlying component storage."""
    storage = self._storage
    return storage.items()
def __eq__(self, other):
    r"""Two registries are equal if the other object has a ``schema``
    attribute and both schemas compare equal."""
    return hasattr(other, 'schema') and (self.schema == other.schema)
@classmethod
def from_file(cls, fname):
    r"""Alternate constructor that builds a registry from a schema file.

    Args:
        fname (str): Full path to the file the schema should be loaded from.

    Returns:
        object: New instance of the class with the file loaded into it.

    """
    registry = cls()
    registry.load(fname)
    return registry
def load(self, fname):
    r"""Read a schema file and register a component for each definition.

    Args:
        fname (str): Full path to the file the schema should be loaded from.

    Raises:
        Exception: If nothing could be loaded from the file.

    """
    with open(fname, 'r') as fd:
        raw = fd.read()
    loaded = ordered_load(raw, Loader=yaml.SafeLoader)
    if loaded is None:
        raise Exception("Failed to load schema from %s" % fname)
    # Create components
    definitions = loaded.get('definitions', {})
    for name, comp_schema in definitions.items():
        self.add(name, ComponentSchema.from_schema(comp_schema,
                                                   schema_registry=self))
def save(self, fname):
    r"""Write the registry's schema to a file as YAML.

    Args:
        fname (str): Full path to the file the schema should be saved to.

    """
    # The schema is built before the file is opened for writing
    contents = self.schema
    with open(fname, 'w') as stream:
        ordered_dump(contents, stream=stream, Dumper=yaml.SafeDumper)
def validate(self, obj, **kwargs):
    r"""Validate an object against this schema.

    Args:
        obj (object): Object to validate.
        **kwargs: Additional keyword arguments are passed to
            validate_instance. If 'normalize' is true, 'normalizers'
            and 'schema_registry' default to those of this registry.

    Returns:
        object: Result of metaschema.validate_instance.

    """
    if kwargs.get('normalize', False):
        # 'no_defaults' is deliberately left unset here
        for key, default in (('normalizers', self._normalizers),
                             ('schema_registry', self)):
            kwargs.setdefault(key, default)
    return metaschema.validate_instance(obj, self.schema, **kwargs)
def validate_model_submission(self, obj, **kwargs):
    r"""Validate an object against the schema for models submitted to
    the yggdrasil model repository.

    Args:
        obj (object): Object to validate.
        **kwargs: Additional keyword arguments are ignored.

    Returns:
        object: The validated object, unchanged.

    """
    from jsonschema import validate as validate_json
    form_schema = self.model_form_schema
    validate_json(obj, form_schema)
    return obj
def validate_component(self, comp_name, obj, **kwargs):
    r"""Validate an object against the schema of a single component.

    Args:
        comp_name (str): Name of the component to validate against.
        obj (object): Object to validate.
        **kwargs: Additional keyword arguments are passed to
            get_component_schema.

    Returns:
        object: Result of metaschema.validate_instance.

    """
    return metaschema.validate_instance(
        obj, self.get_component_schema(comp_name, **kwargs))
def normalize(self, obj, backwards_compat=False, **kwargs):
    r"""Normalize an object against the full schema.

    Args:
        obj (object): Object to normalize.
        backwards_compat (bool, optional): Not used by this method.
            Defaults to False.
        **kwargs: Additional keyword arguments are passed to
            normalize_instance. 'normalizers', 'required_defaults',
            'no_defaults' and 'schema_registry' are filled in if missing.

    Returns:
        object: Normalized object.

    """
    defaults = (('normalizers', self._normalizers),
                ('required_defaults', True),
                ('no_defaults', True),
                ('schema_registry', self))
    for key, default in defaults:
        kwargs.setdefault(key, default)
    return metaschema.normalize_instance(obj, self.full_schema, **kwargs)
# def is_valid(self, obj):
# r"""Determine if an object is valid under this schema.
# Args:
# obj (object): Object to validate.
# Returns:
# bool: True if the object is valid, False otherwise.
# """
# try:
# self.validate(obj)
# except ValidationError:
# return False
# return True
def is_valid_component(self, comp_name, obj):
    r"""Determine if an object is a valid representation of a component.

    Args:
        comp_name (str): Name of the component to validate against.
        obj (object): Object to validate.

    Returns:
        bool: True if the object is valid, False otherwise.

    """
    valid = True
    try:
        self.validate_component(comp_name, obj)
    except ValidationError:
        # Only validation failures count as invalid; other errors propagate
        valid = False
    return valid
def get_component_schema(self, comp_name, subtype=None, relaxed=False,
                         allow_instance=False, allow_instance_definitions=False,
                         for_form=False):
    r"""Get the schema for a certain component.

    Args:
        comp_name (str): Name of the component to get the schema for.
        subtype (str, optional): Component subtype to get schema for.
            Defaults to None and the schema for evaluating any subtype of
            the specified component is returned.
        relaxed (bool, optional): If True, the returned schema (and any
            definitions it includes) are relaxed to allow for objects
            with additional properties to pass validation. Defaults
            to False.
        allow_instance (bool, optional): If True, the returned schema will
            validate instances of this component in addition to documents
            describing a component. Defaults to False.
        allow_instance_definitions (bool, optional): If True, the definitions
            in the returned schema will allow for instances of the components.
            Defaults to False.
        for_form (bool, optional): If True, the returned schema will be
            formatted for easy parsing by form generation tools. Defaults
            to False. Causes relaxed and allow_instance to be ignored.

    Returns:
        dict: Schema for the specified component.

    Raises:
        ValueError: If comp_name is not a registered component.

    """
    if comp_name not in self._storage:  # pragma: debug
        raise ValueError("Unrecognized component: %s" % comp_name)
    component = self._storage[comp_name]
    schema_kws = dict(relaxed=relaxed, allow_instance=allow_instance,
                      for_form=for_form)
    if subtype is not None:
        out = component.get_subtype_schema(subtype, **schema_kws)
    else:
        out = component.get_schema(**schema_kws)
    # Definitions have their own switch for allowing instances
    out['definitions'] = self.get_definitions(
        relaxed=relaxed, allow_instance=allow_instance_definitions,
        for_form=for_form)
    return out
def get_component_keys(self, comp_name):
    r"""Get the properties associated with a certain component.

    Args:
        comp_name (str): Name of the component to return keys for.

    Returns:
        list: All of the valid properties for the specified component.

    """
    component = self._storage[comp_name]
    return component.properties
@classmethod
def register_normalizer(cls, path):
    r"""Register a normalizer that will be applied to elements in the
    instance at the specified path(s).

    Args:
        path (tuple, list): Location in schema where normalizer will be
            applied, or a list of such locations.

    Returns:
        function: Decorator for registering the normalizer function.

    """
    path_list = path if isinstance(path, list) else [path]

    def _register_normalizer(func):
        # Normalizers for one path are kept in registration order
        for location in path_list:
            cls._normalizers.setdefault(location, []).append(func)
        return func

    return _register_normalizer
# The following functions were added to allow backwards compatibility with
# older yaml schemas
def rwmeth2filetype(rw_meth):
    r"""Translate an old read_meth/write_meth value into the equivalent
    file properties.

    Args:
        rw_meth (str): Read/write method name.

    Returns:
        dict: Property values equivalent to provided read/write method.
            Unrecognized methods are used as the filetype directly.

    """
    legacy_methods = (
        ('all', (('filetype', 'binary'),)),
        ('line', (('filetype', 'ascii'),)),
        ('table_array', (('filetype', 'table'), ('as_array', True))))
    for method, props in legacy_methods:
        if rw_meth == method:
            return dict(props)
    return {'filetype': rw_meth}
def cdriver2commtype(driver):
    r"""Convert a connection driver to a comm type.

    Args:
        driver (str): The name of the connection driver.

    Returns:
        str: The corresponding comm type for the driver.

    Raises:
        ValueError: If the driver is not one of the known legacy drivers.

    """
    # Every legacy driver exists in an Input and an Output flavour
    _legacy = {}
    for prefix, commtype in (('', 'default'), ('ZMQ', 'zmq'),
                             ('IPC', 'ipc'), ('RMQ', 'rmq'),
                             ('RMQAsync', 'rmq_async')):
        for direction in ('Input', 'Output'):
            _legacy['%s%sDriver' % (prefix, direction)] = commtype
    if driver not in _legacy:
        raise ValueError("Unknown driver: '%s'" % driver)
    return _legacy[driver]
def cdriver2filetype(driver):
    r"""Convert a connection driver to a file type.

    Args:
        driver (str): The name of the connection driver.

    Returns:
        str: The corresponding file type for the driver.

    Raises:
        ValueError: If the driver is not one of the known legacy file
            drivers.

    """
    # Every legacy driver exists in an Input and an Output flavour
    _legacy = {}
    for prefix, filetype in (('File', 'binary'), ('AsciiMap', 'map'),
                             ('AsciiFile', 'ascii'), ('AsciiTable', 'table'),
                             ('PandasFile', 'pandas'),
                             ('PickleFile', 'pickle'), ('PlyFile', 'ply'),
                             ('ObjFile', 'obj'), ('Mat', 'mat')):
        for direction in ('Input', 'Output'):
            _legacy['%s%sDriver' % (prefix, direction)] = filetype
    if driver not in _legacy:
        raise ValueError("%s is not a registered connection driver." % driver)
    return _legacy[driver]
def migrate_keys(from_dict, to_dict, exclude_key_list=None, include_key_list=None):
    r"""Move keys out of one component dictionary and into one or more
    others. Keys already present in a destination are not overwritten.

    Args:
        from_dict (dict): Component dictionary to migrate keys from.
        to_dict (list): List of component dictionaries to migrate keys to. If
            this is an empty list, keys will not be migrated.
        exclude_key_list (list, optional): List of keys in from_dict that
            should not be migrated to to_dict. All keys in include_key_list
            that are not in this list are moved. Defaults to None and no keys
            are excluded.
        include_key_list (list, optional): List of keys that should be migrated
            from from_dict to to_dict dictionaries. If not provided, all keys
            in from_dict that are not in exclude_key_list are moved. Defaults
            to None and all keys in from_dict are included.

    """
    assert isinstance(to_dict, list)
    if len(to_dict) == 0:
        return
    excluded = [] if exclude_key_list is None else exclude_key_list
    if include_key_list is None:
        candidates = list(from_dict.keys())
    else:
        candidates = include_key_list
    for key in candidates:
        if (key in from_dict) and (key not in excluded):
            # The key is removed from the source even if every
            # destination already has a value for it
            moved = from_dict.pop(key)
            for target in to_dict:
                target.setdefault(key, moved)
def standardize(instance, keys, is_singular=False, suffixes=None, altkeys=None):
    r"""Standardize a component such that each key contains a list of dictionaries.

    Entries found under alternate spellings of the keys (singular/plural
    forms, suffixed forms and explicitly provided alternates) are moved
    under the standard key and bare strings are expanded to
    ``{'name': <string>}``. The instance is modified in place.

    Args:
        instance (dict): Component to standardize.
        keys (list): Keys to standardize in the instance.
        is_singular (bool, optional): If False, the keys are assumed to be plural
            and singular alternatives are also checked. If True, the keys are
            assumed to be singular and plural alternatives are also checked.
            Defaults to False.
        suffixes (list, optional): Suffixes to add to the keys to get a set of
            alternate keys that should also be checked. Defaults to None and is
            ignored.
        altkeys (list, optional): List of lists of alternate keys. Each inner
            list is matched position by position against keys. The list
            passed in is not modified. Defaults to None.

    """
    for k in keys:
        if k not in instance:
            instance[k] = []
        if not isinstance(instance[k], list):
            instance[k] = [instance[k]]
    # Get list of alternate keys from suffixes and plurality. A copy is
    # used so that the alternates generated below are not appended to
    # the list owned by the caller.
    altkeys = [] if altkeys is None else list(altkeys)
    if suffixes is not None:
        for s in suffixes:
            altkeys.append(['%s%s' % (k, s) for k in keys])
            if is_singular:
                altkeys.append(['%ss%s' % (k, s) for k in keys])
                altkeys.append(['%s%ss' % (k, s) for k in keys])
            else:
                # Plural keys: drop the trailing character to get the
                # singular form
                altkeys.append(['%s%s' % (k[:-1], s) for k in keys])
                altkeys.append(['%s%ss' % (k[:-1], s) for k in keys])
    if is_singular:
        altkeys.append(['%ss' % k for k in keys])
    else:
        altkeys.append([k[:-1] for k in keys])
    # Add components listed under alternate keys
    for ialtkeys in altkeys:
        for k, kalt in zip(keys, ialtkeys):
            if kalt in instance:
                if isinstance(instance[kalt], list):
                    instance[k] += instance.pop(kalt)
                else:
                    instance[k].append(instance.pop(kalt))
    # Handle strings
    for k in keys:
        for i, entry in enumerate(instance[k]):
            if isinstance(entry, str):
                instance[k][i] = {'name': entry}
@SchemaRegistry.register_normalizer(tuple())
def _normalize_root(normalizer, value, instance, schema):
    r"""Prepare the normalizer and the root of the document: attach an
    empty iodict registry to the normalizer if it does not have one and
    standardize the 'models' and 'connections' entries."""
    # if getattr(normalizer, 'schema_registry', None) is None:
    #     normalizer.schema_registry = get_schema()
    if getattr(normalizer, 'iodict', None) is None:
        registry = dict(
            inputs={}, outputs={}, connections=[],
            input_drivers=[], output_drivers=[], pairs=[],
            inputs_extra={}, outputs_extra={},
            models={},
            aliases=dict(inputs={}, outputs={}))
        normalizer.iodict = registry
    standardize(instance, ['models', 'connections'])
    return instance
@SchemaRegistry.register_normalizer(('models', 0))
def _normalize_modelio_first(normalizer, value, instance, schema):
r"""Normalizes set of model inputs/outputs before each input/output is normalized.
Args:
normalizer (object): Normalizer performing the pass. Its 'iodict'
attribute, if set, receives the channel name aliases.
value (object): Not used by this function.
instance (object): Model entry; only modified if it is a dictionary.
schema (dict): Not used by this function.
Returns:
object: The instance that was passed in.
"""
# The alias registry is optional; aliases are only recorded if it is present
iodict = getattr(normalizer, 'iodict', None)
if isinstance(instance, dict):
# Make 'inputs' and 'outputs' lists of dictionaries
standardize(instance, ['inputs', 'outputs'])
# timesync models get an empty 'args' list unless one was provided
if instance.get('language', None) == 'timesync':
instance.setdefault('args', [])
# Channel names are prefixed with '<model name>:'
prefix = '%s:' % instance['name']
for io in ['inputs', 'outputs']:
# A model without channels gets one default channel named 'input'/'output'
if len(instance[io]) == 0:
instance[io] = [{'name': io[:-1], 'is_default': True}]
for x in instance[io]:
if not x['name'].startswith(prefix):
new_name = prefix + x['name']
if iodict is not None:
# Map the un-prefixed name to the prefixed one
iodict['aliases'][io][x['name']] = new_name
# A default channel is also aliased by the model name itself
if x.get('is_default', False):
iodict['aliases'][io][instance['name']] = new_name
x['name'] = new_name
# Channels not flagged as default fall back on the model's working_dir
if not x.get('is_default', False):
x.setdefault('working_dir', instance['working_dir'])
if 'default_file' in x:
# A string default_file is expanded to {'name': <string>}
if isinstance(x['default_file'], str):
x['default_file'] = {'name': x['default_file']}
x['default_file'].setdefault('working_dir',
instance['working_dir'])
return instance
@SchemaRegistry.register_normalizer([('models', 0, 'inputs', 0),
('models', 0, 'outputs', 0)])
def _normalize_modelio_elements(normalizer, value, instance, schema):
r"""Register a model input/output in the normalizer's iodict, set aside
its keys that are neither comm nor datatype properties and record
channels that use the older 'driver'/'args' keys.
Args:
normalizer (object): Normalizer performing the pass. Its 'iodict' and
'schema_registry' attributes are used if they are set.
value (object): Not used by this function.
instance (object): Model input/output entry. Only processed if it is
a dictionary with a 'name'.
schema (dict): Not used by this function.
Returns:
object: The instance that was passed in.
"""
# Third element of the schema path; 'inputs' or 'outputs' for the
# paths this normalizer is registered under
io = normalizer.current_schema_path[2]
# Both registries are optional attributes of the normalizer
iodict = getattr(normalizer, 'iodict', None)
s = getattr(normalizer, 'schema_registry', None)
if (iodict is not None) and isinstance(instance, dict) and ('name' in instance):
# Register io if dict set
if instance['name'] not in iodict[io]:
iodict[io][instance['name']] = instance
# Move non-comm keywords to a buffer
if (s is not None):
comm_keys = s.get_component_keys('comm')
type_keys = list(metaschema.get_metaschema()['properties'].keys())
extra_keys = {}
# comm and datatype keys are excluded from the migration and stay in
# the instance; everything else is stored under '<io>_extra'
migrate_keys(instance, [extra_keys], comm_keys + type_keys)
iodict['%s_extra' % io][instance['name']] = extra_keys
# type_dict = {}
# migrate_keys(instance, [type_dict], comm_keys)
# instance.setdefault('datatype', {})
# instance['datatype'].update(type_dict)
# Add driver to list
if ('driver' in instance) and ('args' in instance):
opp_map = {'inputs': 'output', 'outputs': 'input'}
# Look for a queued driver of the opposite direction with the same
# 'args'; a match is removed from the queue and stored in 'pairs'
# as (output name, input name)
for i, (opp_arg, opp_name) in enumerate(iodict['%s_drivers' % opp_map[io]]):
if instance['args'] == opp_arg:
if io == 'inputs':
iodict['pairs'].append(
(iodict['%s_drivers' % opp_map[io]].pop(i)[1],
instance['name']))
else: # pragma: debug
# This won't be called because inputs are processed first
# but this code is here for symmetries sake
iodict['pairs'].append(
(instance['name'],
iodict['%s_drivers' % opp_map[io]].pop(i)[1]))
break
# NOTE(review): presumably the else of the for loop (no match found),
# in which case the channel is queued as (args, name) - confirm
else:
iodict['%s_drivers' % io[:-1]].append(
(instance['args'], instance['name']))
return instance
@SchemaRegistry.register_normalizer(('connections',))
def _normalize_connio_base(normalizer, value, instance, schema):
r"""Normalizes list of connections, adding those represented by
multiple drivers.
Connections are built from the driver pairs and the unpaired
input/output drivers recorded in the normalizer's iodict, appended to
the instance and the driver records are then cleared.
Args:
normalizer (object): Normalizer performing the pass. Its 'iodict'
attribute is used if it is set.
value (object): Not used by this function.
instance (list): List of connections that new entries are appended to.
schema (dict): Not used by this function.
Returns:
list: The instance that was passed in.
"""
# Nothing is done unless the instance path matches the schema path
if normalizer.current_path != normalizer.current_schema_path:
return instance
# Build connections from input/output drivers
iodict = getattr(normalizer, 'iodict', None)
if (iodict is not None):
# Entries are ([channel dicts to clean up], connection)
new_connections = []
# Create direct connections from output to input
for (oname, iname) in iodict['pairs']:
oyml = iodict['outputs'][oname]
iyml = iodict['inputs'][iname]
# The connection's input is the model output and vice versa
conn = dict(input=oname, output=iname)
new_connections.append(([oyml, iyml], conn))
oyml['commtype'] = cdriver2commtype(oyml['driver'])
iyml['commtype'] = cdriver2commtype(iyml['driver'])
# working_dir is dropped from directly connected channels
oyml.pop('working_dir', None)
iyml.pop('working_dir', None)
# File input
# k is used as the file name and v is the name of the model input
for k, v in iodict['input_drivers']:
iyml = iodict['inputs'][v]
fyml = dict(name=k, filetype=cdriver2filetype(iyml['driver']))
if iyml.get('as_array', False):
# TODO: This should not be an exception
fyml['as_array'] = True
conn = dict(input=fyml, output=v)
new_connections.append(([iyml], conn))
# File output
# k is used as the file name and v is the name of the model output
for k, v in iodict['output_drivers']:
oyml = iodict['outputs'][v]
fyml = dict(name=k, filetype=cdriver2filetype(oyml['driver']))
if oyml.get('as_array', False):
# TODO: This should not be an exception
fyml['as_array'] = True
conn = dict(output=fyml, input=v)
new_connections.append(([oyml], conn))
# Transfer keyword arguments from input/output to connection
for ymls, conn in new_connections:
for y in ymls:
# The driver keys are removed from the channel entries
del y['driver'], y['args']
iodict['connections'].append(conn)
instance.append(conn)
# Empty registry of orphan input/output drivers
for k in ['input_drivers', 'output_drivers', 'pairs']:
iodict[k] = []
return instance
@SchemaRegistry.register_normalizer(('connections', 0))
def _normalize_connio_first(normalizer, value, instance, schema):
r"""Normalizes set of connection before each connection is normalized.
The inputs/outputs are standardized and de-duplicated by name, aliases
are replaced by the names they stand for and keys are moved between
the connection and its inputs/outputs.
Args:
normalizer (object): Normalizer performing the pass. Its 'iodict' and
'schema_registry' attributes are used if both are set.
value (object): Not used by this function.
instance (object): Connection entry; only modified if it is a dictionary.
schema (dict): Not used by this function.
Returns:
object: The instance that was passed in.
"""
if isinstance(instance, dict):
# Besides the singular forms, '_file'/'_files' suffixed keys and
# 'from'/'to' are accepted as alternates for 'inputs'/'outputs'
standardize(instance, ['inputs', 'outputs'], suffixes=['_file', '_files'],
altkeys=[['from', 'to']])
# Handle indexed inputs/outputs
for io in ['inputs', 'outputs']:
pruned = []
pruned_names = []
for x in instance[io]:
# For names that are not absolute paths, only the part before
# the first '::' is kept
if ('::' in x['name']) and (not os.path.isabs(x['name'])):
name = x['name'].split('::')[0]
x['name'] = name
# Only the first entry with a given name is kept
if x['name'] not in pruned_names:
pruned_names.append(x['name'])
pruned.append(x)
instance[io] = pruned
# Move non-comm properties from model inputs/outputs
s = getattr(normalizer, 'schema_registry', None)
iodict = getattr(normalizer, 'iodict', None)
if (s is not None) and (iodict is not None):
# Connection inputs are looked up among the model outputs and vice versa
opp_map = {'inputs': 'outputs', 'outputs': 'inputs'}
comm_keys = s.get_component_keys('comm')
conn_keys = s.get_component_keys('connection')
target_files = []
for io in ['inputs', 'outputs']:
for x in instance[io]:
# Replace an alias by the name registered for it
if x['name'] in iodict['aliases'][opp_map[io]]:
x['name'] = iodict['aliases'][opp_map[io]][x['name']]
y = iodict['%s_extra' % opp_map[io]].get(x['name'], None)
# Entries without recorded extra keys are collected in target_files
# (presumably these are the files - confirm)
if y is None:
target_files.append(x)
continue
# The recorded extra keys are emptied into the connection
y_keys = list(y.keys())
for k in y_keys:
val = y.pop(k)
# Translators are accumulated in a list; any other key only
# fills in a value that the connection does not already have
if k == 'translator':
instance.setdefault(k, [])
if not isinstance(val, (list, tuple)):
val = [val]
instance[k] += val
else:
instance.setdefault(k, val)
# Move everything but comm keywords down to files, then move
# comm keywords down to connection inputs and outputs.
migrate_keys(instance, target_files, conn_keys + comm_keys)
# working_dir is dropped rather than passed on to the inputs/outputs
instance.pop('working_dir', None)
migrate_keys(instance, instance['inputs'] + instance['outputs'], conn_keys)
return instance
@SchemaRegistry.register_normalizer([('connections', 0, 'inputs', 0, 0),
                                     ('connections', 0, 'outputs', 0, 0)])
def _normalize_connio_elements_comm(normalizer, value, instance, schema):
    r"""Normalize connection inputs/outputs as comms by filling in comm
    properties from the model channel of the same name, if one was
    registered on the opposite side."""
    io = normalizer.current_schema_path[2]
    if not isinstance(instance, dict):
        return instance
    iodict = getattr(normalizer, 'iodict', None)
    if iodict is None:
        return instance
    # Connection inputs correspond to model outputs and vice versa
    opposite = {'inputs': 'outputs', 'outputs': 'inputs'}[io]
    if instance['name'] not in iodict[opposite]:
        return instance
    opp_comm = iodict[opposite][instance['name']]
    s = getattr(normalizer, 'schema_registry', None)
    if s is not None:
        # Values already set on the connection side take precedence
        for key in s.get_component_keys('comm'):
            if key in opp_comm:
                instance.setdefault(key, opp_comm[key])
    return instance
@SchemaRegistry.register_normalizer([('connections', 0, 'inputs', 1, 0),
                                     ('connections', 0, 'outputs', 1, 0)])
def _normalize_connio_elements_file(normalizer, value, instance, schema):
    r"""Normalize connection inputs/outputs as files by assigning the
    schema's default filetype to entries that do not name a registered
    model channel and have no filetype."""
    io = normalizer.current_schema_path[2]
    if not isinstance(instance, dict):
        return instance
    iodict = getattr(normalizer, 'iodict', None)
    if iodict is None:
        return instance
    # Connection inputs correspond to model outputs and vice versa
    opposite = {'inputs': 'outputs', 'outputs': 'inputs'}[io]
    is_model_channel = instance['name'] in iodict[opposite]
    if not (is_model_channel or ('filetype' in instance)):
        instance['filetype'] = schema['properties']['filetype']['default']
    return instance
@SchemaRegistry.register_normalizer(('connections', 1))
def _normalize_connio_last(normalizer, value, instance, schema):
    r"""Normalize set of connections after they have been normalized.

    Args:
        normalizer (object): Normalizer performing the pass. Its
            'schema_registry' attribute is used if it is set.
        value (object): Not used by this function.
        instance (object): Connection entry; only processed if it is a
            dictionary.
        schema (dict): Not used by this function.

    Returns:
        object: The instance that was passed in.

    Raises:
        RuntimeError: If every input and every output of the connection
            is a valid file component.

    """
    if not isinstance(instance, dict):
        return instance
    s = getattr(normalizer, 'schema_registry', None)
    if s is None:
        return instance
    # Check that files properly specified. A side only counts as files
    # if every one of its entries is a valid file component; a list is
    # built so that every entry is checked.
    is_file = {}
    for io in ['inputs', 'outputs']:
        is_file[io] = all(
            [s.is_valid_component('file', x) for x in instance[io]])
    if is_file['inputs'] and is_file['outputs']:
        raise RuntimeError(("Both the input and output for this connection "
                            + "appear to be files:\n%s"
                            % pprint.pformat(instance)))
    # Copy file keys from partner comm(s) to the file comm(s)
    comm_keys = s.get_component_keys('comm')
    opp_map = {'inputs': 'outputs', 'outputs': 'inputs'}
    for io in ['inputs', 'outputs']:
        if is_file[io]:
            for x in instance[opp_map[io]]:
                # comm keys are excluded, so only non-comm keys are moved
                migrate_keys(x, instance[io], comm_keys)
    return instance
@SchemaRegistry.register_normalizer(('models', 0))
def _normalize_model_driver(normalizer, value, instance, schema):
    r"""Normalizes older style of specifying driver rather than language
    by replacing the 'driver' entry with the matching 'language'."""
    if not isinstance(instance, dict):
        return instance
    s = getattr(normalizer, 'schema_registry', None)
    if (s is None) or ('language' in instance) or ('driver' not in instance):
        return instance
    if instance['driver'] == 'GCCModelDriver':
        # TODO: Fix this properly, checking the extension to
        # distinguish between C and C++
        if isinstance(instance['args'], list):
            source = instance['args'][0]
        else:
            source = instance['args']
        args_ext = os.path.splitext(source)[-1]
        from yggdrasil.drivers.CPPModelDriver import CPPModelDriver
        is_cpp = args_ext in CPPModelDriver.language_ext
        instance['driver'] = 'CPPModelDriver' if is_cpp else 'CModelDriver'
    class2language = s['model'].class2subtype
    # The first subtype listed for the driver class is used as the language
    instance['language'] = class2language[instance.pop('driver')][0]
    return instance
@SchemaRegistry.register_normalizer([('connections', 0, 'inputs', 1, 0),
                                     ('connections', 0, 'outputs', 1, 0)])
def _normalize_rwmeth(normalizer, value, instance, schema):
    r"""Normalize older style of specifying 'read_meth' or 'write_meth' instead
    of filetype."""
    if not isinstance(instance, dict):
        return instance
    # Replace old read/write method with filetype
    for legacy_key in ['read_meth', 'write_meth']:
        # The old key is always removed, even if it ends up unused
        method = instance.pop(legacy_key, None)
        if method is None:
            continue
        # An explicit filetype other than 'binary' is left alone
        if instance.get('filetype', None) in [None, 'binary']:
            instance.update(rwmeth2filetype(method))
    return instance
@SchemaRegistry.register_normalizer([('connections', 0, 'inputs', 1, 0),
                                     ('connections', 0, 'outputs', 1, 0)])
def _normalize_ascii_table(normalizer, value, instance, schema):
    r"""Normalize the older style arguments for ascii table connections
    by renaming them to their current equivalents."""
    if not isinstance(instance, dict):
        return instance
    renamed = (('column_names', 'field_names'),
               ('column_units', 'field_units'),
               ('column', 'delimiter'))
    for old, new in renamed:
        if old not in instance:
            continue
        # The old key is removed; a value under the new key wins
        legacy_value = instance.pop(old)
        instance.setdefault(new, legacy_value)
    return instance
@SchemaRegistry.register_normalizer([('connections', 0, 'inputs', 1, 0),
                                     ('connections', 0, 'outputs', 1, 0)])
def _normalize_serializer(normalizer, value, instance, schema):
    r"""Normalize the serializer if the information is in the file by
    gathering the serializer properties into a 'serializer' entry."""
    if not isinstance(instance, dict):
        return instance
    if 'serializer' in instance:
        return instance
    if instance.get('filetype', None) not in [None, 'binary']:
        return instance
    s = getattr(normalizer, 'schema_registry', None)
    if s is None:
        return instance
    comm_keys = s.get_component_keys('comm')
    seri_keys = s.get_component_keys('serializer')
    # Serializer keys that are also comm keys stay where they are
    gathered = {}
    migrate_keys(instance, [gathered], include_key_list=seri_keys,
                 exclude_key_list=comm_keys)
    if gathered:
        instance['serializer'] = gathered
    return instance
@SchemaRegistry.register_normalizer([('models', 0, 'inputs', 0),
                                     ('models', 0, 'outputs', 0),
                                     ('connections', 0, 'inputs', 0, 0),
                                     ('connections', 0, 'outputs', 0, 0)])
def _normalize_datatype(normalizer, value, instance, schema):
    r"""Normalize the datatype if the type information is in the comm by
    gathering the metaschema properties into a 'datatype' entry."""
    if isinstance(instance, dict) and ('datatype' not in instance):
        type_keys = list(metaschema.get_metaschema()['properties'].keys())
        # Don't include args in type_keys if driver in the instance
        if ('driver' in instance) and ('args' in type_keys):
            type_keys.remove('args')
        gathered = {}
        migrate_keys(instance, [gathered], include_key_list=type_keys)
        if gathered:
            instance['datatype'] = gathered
    return instance