import os
import copy
import pprint
import yaml
import json
import importlib
from collections import OrderedDict
from yggdrasil import rapidjson
# Path to the cached schema file, stored alongside this module.
_schema_fname = os.path.abspath(os.path.join(
    os.path.dirname(__file__), '.ygg_schema.yml'))
# Global schema singleton; populated lazily by init_schema/get_schema.
_schema = None
# Marker written into constants.py; everything after it is auto-generated
# by yggdrasil.schema.update_constants and must not be edited by hand.
_constants_separator = (
    "\n# ======================================================\n"
    "# Do not edit this file past this point as the following\n"
    "# is generated by yggdrasil.schema.update_constants\n"
    "# ======================================================\n")
class SchemaDict(OrderedDict):
    r"""OrderedDict subclass for ordering schemas on read in Python 2."""

    def __repr__(self):
        # Render as a plain dict so the repr is compact and stable.
        as_plain = dict(self)
        return pprint.pformat(as_plain)
def ordered_load(stream, object_pairs_hook=SchemaDict, **kwargs):
    r"""Load a YAML document from a file using a specified class to
    represent mapping types so that map ordering is preserved.

    Args:
        stream (file): File stream to load the schema YAML from.
        object_pairs_hook (type, optional): Class that should be used to
            represent loaded maps. Defaults to SchemaDict.
        **kwargs: Additional keyword arguments are passed to decode_yaml.

    Returns:
        object: Result of ordered load.

    """
    # Imported here to avoid a circular import at module load time.
    from yggdrasil.serialize import YAMLSerialize
    kwargs['sorted_dict_type'] = object_pairs_hook
    return YAMLSerialize.decode_yaml(stream, **kwargs)
def ordered_dump(data, **kwargs):
    r"""Dump object as a YAML document, representing SchemaDict objects as
    mapping type.

    Args:
        data (object): Python object that should be dumped.
        **kwargs: Additional keyword arguments are passed to encode_yaml.

    Returns:
        str: YAML document representing data.

    """
    # Imported here to avoid a circular import at module load time.
    from yggdrasil.serialize import YAMLSerialize
    kwargs['sorted_dict_type'] = [SchemaDict, OrderedDict]
    return YAMLSerialize.encode_yaml(data, **kwargs)
def clear_schema():
    r"""Clear global schema."""
    global _schema
    # Reset the cache so the next get_schema/init_schema call reloads it.
    _schema = None
def init_schema(fname=None):
    r"""Initialize global schema."""
    global _schema
    # Only load once; subsequent calls are no-ops until clear_schema().
    if _schema is None:
        _schema = load_schema(fname)
def create_schema():
    r"""Create a new schema from the registry."""
    from yggdrasil.components import init_registry, registering
    # The registering context allows components to be registered while
    # the full registry is traversed.
    with registering():
        x = SchemaRegistry(init_registry(recurse=True))
    # Regenerate constants.py from the newly created schema.
    update_constants(x)
    return x
def load_schema(fname=None):
    r"""Return the yggdrasil schema for YAML options.

    Args:
        fname (str, optional): Full path to the file that the schema should be
            loaded from. If the file doesn't exist, it is created. Defaults to
            _schema_fname.

    Returns:
        dict: yggdrasil YAML options.

    """
    if fname is None:
        fname = _schema_fname
    # Create and cache the schema file on first use.
    if not os.path.isfile(fname):
        x = create_schema()
        x.save(fname)
    return SchemaRegistry.from_file(fname)
def get_schema(fname=None):
    r"""Return the yggdrasil schema for YAML options.

    Args:
        fname (str, optional): Full path to the file that the schema should be
            loaded from. If the file doesn't exist, it is created. Defaults to
            _schema_fname.

    Returns:
        dict: yggdrasil YAML options.

    """
    global _schema
    if fname is not None:
        # An explicit file bypasses the cached global schema.
        return load_schema(fname)
    init_schema()
    return _schema
def convert_extended2base(s):
    r"""Convert schema from the extended form to a strictly JSON form.

    Args:
        s (object): Object to update.

    Returns:
        object: Updated JSON object.

    """
    # TODO: Automate this on classes
    # Mapping from yggdrasil extended type names to JSON schema types.
    extended2json = {'int': 'integer', 'uint': 'integer',
                     'float': 'number', 'complex': 'string',
                     'unicode': 'string', 'bytes': 'string',
                     'function': 'string', 'class': 'string',
                     'instance': 'string', '1darray': 'array',
                     'ndarray': 'array', 'obj': 'object',
                     'ply': 'object',
                     'any': ["number", "string", "boolean", "object",
                             "array", "null"]}
    if isinstance(s, (list, tuple)):
        return [convert_extended2base(member) for member in s]
    if not isinstance(s, (dict, OrderedDict)):
        # Scalars (strings, numbers, ...) pass through unchanged.
        return s
    if 'type' in s:
        typ = s['type']
        if isinstance(typ, str):
            if typ == 'schema':
                # 'schema' typed entries become a metaschema reference.
                s = {"$ref": "#/definitions/schema"}
            elif typ in extended2json:
                s['type'] = extended2json[typ]
                s.pop('class', None)
            elif typ == 'scalar':
                # Scalars drop their precision/units and adopt the
                # JSON equivalent of their subtype.
                s.pop("precision", None)
                s.pop("units", None)
                s['type'] = s.pop('subtype')
                s['type'] = extended2json.get(s['type'], s['type'])
        elif isinstance(typ, list):
            assert 'schema' not in typ
            assert 'scalar' not in typ
            mapped = [extended2json.get(t, t) for t in typ]
            # Collapse to a single type when all entries agree.
            s['type'] = mapped[0] if all(
                t == mapped[0] for t in mapped) else mapped
    # Recurse into all values; 'options' must survive untouched.
    preserved_options = copy.deepcopy(s.get('options', None))
    s = {k: convert_extended2base(v) for k, v in s.items()}
    if preserved_options is not None:
        s['options'] = preserved_options
    return s
def get_json_schema(fname_dst=None, indent=None):
    r"""Return the yggdrasil schema as a strictly JSON schema without
    any of the extended datatypes.

    Args:
        fname_dst (str, optional): Full path to file where the JSON
            schema should be saved. Defaults to None and no file is
            created.
        indent (str, optional): Indentation that should be used when saving
            the schema to a file.

    Returns:
        dict: Converted structure.

    """
    s = get_schema()
    out = s.get_schema()
    # Embed the metaschema so 'schema' typed entries converted to
    # '#/definitions/schema' references can be resolved.
    out['definitions']['schema'] = copy.deepcopy(rapidjson.get_metaschema())
    out = convert_extended2base(out)
    if fname_dst is not None:
        with open(fname_dst, 'w') as fd:
            json.dump(out, fd, indent=indent)
    return out
def update_constants(schema=None):
    r"""Update constants.py with info from the schema.

    Args:
        schema (SchemaRegistry, optional): Schema that constants should be
            generated from. Defaults to the global schema from get_schema.

    """
    from yggdrasil.components import import_component
    from yggdrasil.drivers.CompiledModelDriver import (
        get_compilation_tool_registry)
    if schema is None:
        schema = get_schema()

    def as_lines(x, newline='\n', key_order=None):
        # Serialize x as Python source text, one container entry per line,
        # increasing the indent carried in `newline` at each nesting level.
        out = ""
        if isinstance(x, dict):
            if key_order is None:
                key_order = list(sorted(x.keys()))
            out += "{" + newline
            for k in key_order:
                v = x[k]
                out += " %s: %s," % (
                    repr(k), as_lines(v, newline=(newline + ' '))) + newline
            out += "}"
        elif isinstance(x, list):
            out += "[" + newline
            out += " " + ", ".join(
                [as_lines(xx, newline=(newline + ' ')) for xx in x]) + "]"
        else:
            out += repr(x)
        return out

    filename = os.path.join(os.path.dirname(__file__), 'constants.py')
    # Component information
    component_registry = {}
    for k, v in schema.items():
        component_registry[k] = {
            'module': v.module,
            'default': v.default_subtype,
            'base': v.base_subtype_class_name,
            'key': v.subtype_key,
            'subtypes': v.subtype2class,
            'subtype_modules': v.subtype2module}
    # File information
    files = {
        k: import_component('file', v)
        for k, v in component_registry['file']['subtypes'].items()}
    file2ext = {}
    ext2file = {'.txt': 'ascii'}
    for k, f in files.items():
        # First registered extension wins for a file type; first file
        # type wins for an extension.
        file2ext.setdefault(k, f._extensions[0])
        for ext in f._extensions:
            ext2file.setdefault(ext, k)
    # Language driver information
    drivers = {k: import_component('model', v)
               for k, v in component_registry['model']['subtypes'].items()}
    language_cat = ['compiled', 'interpreted', 'build', 'dsl', 'other']
    typemap = {'compiler': 'compiled', 'interpreter': 'interpreted'}
    lang2ext = {'yaml': '.yml', 'executable': '.exe'}
    languages = {k: [] for k in language_cat}
    languages_with_aliases = {k: [] for k in language_cat}
    language_properties = {}
    compiler_env_vars = {}
    compilation_tool_vars = {}
    complete = []
    aliased_languages = {}
    for k, drv in drivers.items():
        # Aliased drivers share a language; record each language once.
        if drv.language in complete:
            continue
        complete.append(drv.language)
        drv_type = typemap.get(drv.executable_type, drv.executable_type)
        if drv.language_ext:
            if k not in lang2ext:
                assert isinstance(drv.language_ext, list)
                lang2ext[k] = drv.language_ext[0]
                for ka in drv.language_aliases:
                    lang2ext[ka] = lang2ext[k]
        languages.setdefault(drv_type, [])
        languages[drv_type].append(drv.language)
        languages_with_aliases.setdefault(drv_type, [])
        languages_with_aliases[drv_type].append(drv.language)
        languages_with_aliases[drv_type] += drv.language_aliases
        if drv.language_aliases:
            aliased_languages[drv.language] = [drv.language] + drv.language_aliases
        language_properties[drv.language] = {
            'executable_type': drv_type,
            'is_typed': drv.is_typed,
            'full_language': drv.full_language}
    languages = {k: sorted(v) for k, v in languages.items()}
    languages_with_aliases = {k: sorted(v) for k, v in
                              languages_with_aliases.items()}
    # Environment variables for compilation tools, keyed by tool name.
    for x in ['compiler', 'linker', 'archiver']:
        reg = get_compilation_tool_registry(x).get('by_language', {})
        for lang, tools in reg.items():
            for v in tools.values():
                k = v.toolname
                if v.is_build_tool or (k in compilation_tool_vars):
                    continue
                compilation_tool_vars[k] = {
                    'exec': v.default_executable_env,
                    'flags': v.default_flags_env}
                # First compiler found for a language becomes its default.
                if (x == 'compiler') and (lang not in compiler_env_vars):
                    compiler_env_vars[lang] = compilation_tool_vars[k].copy()
    language_cat = list(languages.keys())
    # Keep the hand-written portion of constants.py (everything before the
    # separator) and regenerate everything after it.
    with open(filename, 'r') as fd:
        lines = [fd.read().split(_constants_separator)[0],
                 _constants_separator[1:]]
    lines += [
        "", "# Component registry",
        f"COMPONENT_REGISTRY = {as_lines(component_registry)}"]
    lines += [
        "", "# File constants",
        "FILE2EXT = %s" % as_lines(file2ext),
        "EXT2FILE = %s" % as_lines(ext2file)]
    lines += [
        "", "# Language driver constants",
        "LANG2EXT = %s" % as_lines(lang2ext),
        "EXT2LANG = {v: k for k, v in LANG2EXT.items()}",
        "LANGUAGES = %s" % as_lines(languages, key_order=language_cat)]
    lines.append(
        "LANGUAGES['all'] = (\n LANGUAGES[%s]"
        % repr(language_cat[0]))
    lines += [" + LANGUAGES[%s]" % repr(k)
              for k in language_cat[1:]]
    lines[-1] += ")"
    lines += [
        "LANGUAGES_WITH_ALIASES = %s" % as_lines(languages_with_aliases,
                                                 key_order=language_cat)]
    lines.append(
        "LANGUAGES_WITH_ALIASES['all'] = (\n LANGUAGES_WITH_ALIASES[%s]"
        % repr(language_cat[0]))
    lines += [" + LANGUAGES_WITH_ALIASES[%s]" % repr(k)
              for k in language_cat[1:]]
    lines[-1] += ")"
    lines += [
        "ALIASED_LANGUAGES = %s" % as_lines(aliased_languages)]
    lines += [
        "COMPILER_ENV_VARS = %s" % as_lines(compiler_env_vars),
        "COMPILATION_TOOL_VARS = %s" % as_lines(compilation_tool_vars)]
    lines += [
        "LANGUAGE_PROPERTIES = %s" % as_lines(language_properties)]
    with open(filename, 'w') as fd:
        fd.write('\n'.join(lines) + '\n')
[docs]class ComponentSchema(object):
r"""Schema information for one component.
Args:
schema_type (str): The name of the component.
subtype_key (str): The name of the schema property/class attribute
that should be used to differentiate between subtypes of this
component.
schema_registry (SchemaRegistry, optional): Registry of schemas
that this schema is dependent on.
**kwargs: Additional keyword arguments are entries in the component
schema.
Attributes:
schema_type (str): The name of the component.
schema_registry (SchemaRegistry): Registry of schemas.
subtype_key (str): Schema property that is used to differentiate between
subtypes of this component.
schema_subtypes (dict): Mapping between component class names and the
associated values of the subtype_key property for this component.
schema_modules (dict): Mapping between component class names and
the modules that they are in.
"""
    def __init__(self, schema_type, subtype_key, schema_registry=None,
                 module=None, schema_subtypes=None, schema_modules=None):
        # Registered subtype schemas, keyed by class name.
        self._storage = SchemaDict()
        # Lazily computed base-schema state (see get_base_schema).
        self._base_name = None
        self._base_kwargs = None
        self._base_subtype_description = None
        self._base_schema = None
        # Keyword arguments that subtypes should not inherit from the base.
        self._no_inherit_kwargs = {}
        # Properties required at the subtype level rather than in the base.
        self.required_by_subtype = []
        self.schema_type = schema_type
        self.subtype_key = subtype_key
        self.schema_registry = schema_registry
        if schema_subtypes is None:
            schema_subtypes = {}
        self.schema_subtypes = schema_subtypes
        if schema_modules is None:
            schema_modules = {}
        self.schema_modules = schema_modules
        self.default_subtype = None
        self.module = module
        super(ComponentSchema, self).__init__()
    def identify_subtype(self, doc):
        r"""Identify the subtype associated with a document by validating it
        against the schemas for the different subtypes.

        Args:
            doc (dict): JSON object that conforms to one of the component
                subtypes.

        Returns:
            str: Name of the subtype that validates the provided document.

        Raises:
            ValueError: If no subtype schema validates the document.

        """
        # Try each subtype in turn; the first schema that validates wins.
        for subtype in self.subtypes:
            subtype_schema = self.get_subtype_schema(subtype)
            try:
                rapidjson.validate(doc, subtype_schema)
                return subtype
            except rapidjson.ValidationError:
                pass
        raise ValueError("Could not determine subtype "
                         "for document: %s" % doc)  # pragma: debug
    def get_base_schema(self):
        r"""Get a base schema containing properties that are the same
        for all component subtypes.

        Returns:
            dict: Deep copy of the (cached) base schema.

        """
        if self._base_schema is not None:
            return copy.deepcopy(self._base_schema)
        assert self._base_name in self._storage
        # Start from the registered base class schema.
        self._base_schema = dict(
            copy.deepcopy(self._storage[self._base_name]),
            title=f"{self.schema_type}_base",
            description=(f"Base schema for all subtypes of "
                         f"{self.schema_type} components."),
            additionalProperties=True)
        if self._base_kwargs:
            self._base_schema.update(self._base_kwargs)
        # The subtype key is required by subtype schemas, not the base.
        if self.subtype_key in self._base_schema.get('required', []):
            self._base_schema['required'].remove(self.subtype_key)
            if not self._base_schema['required']:
                self._base_schema.pop('required')
        if self.default_subtype is None:
            self._base_schema['properties'][self.subtype_key].pop(
                'default', None)
        else:
            self._base_schema['properties'][
                self.subtype_key]['default'] = self.default_subtype
        if self._base_subtype_description:
            self._base_schema['properties'][self.subtype_key][
                'description'] = self._base_subtype_description
        # Collect legacy driver names declared by any subtype.
        driver_list = []
        for v in self._storage.values():
            for k in v['properties'].get('driver', {}).get('enum', []):
                if k and k not in driver_list:
                    driver_list.append(k)
        if driver_list:
            self.add_legacy_properties(
                self._base_schema, driver_list=driver_list, base=True)
        # Only keep requirements shared by every subtype.
        if 'required' in self._base_schema:
            self._base_schema['required'] = set(
                self._base_schema['required'])
            for v in self._storage.values():
                self._base_schema['required'] &= set(v.get('required', []))
            self._base_schema['required'] = sorted(
                list(self._base_schema['required']))
            if not self._base_schema['required']:  # pragma: no cover
                del self._base_schema['required']
        # Update base schema, checking for compatiblity
        prop_overlap = set(self._base_schema['properties'].keys())
        for v in self._storage.values():
            prop_overlap &= set(v['properties'].keys())
        # Force subtype keys to be included
        prop_overlap.add(self.subtype_key)
        new_base_prop = {k: self._base_schema['properties'][k]
                         for k in prop_overlap}
        for v in self._storage.values():
            for k in prop_overlap:
                old = new_base_prop[k]
                new = v['properties'][k]
                # Don't compare descriptions or properties defining the
                # subtype (like the subtype key or driver)
                if k not in [self.subtype_key, 'driver']:
                    if not self.compare_body(old, new):  # pragma: debug
                        raise ValueError(
                            f"Schema for property '{k}' of class"
                            f" '{v['title']}'"
                            f" is {new}, which differs from the existing"
                            f" base class value ({old}). Check that"
                            f" another class dosn't have a conflicting"
                            f" definition of the same property.")
                    # Shared-but-required properties whose defaults differ
                    # must be required per-subtype instead.
                    if ((k in self._base_schema.get('required', [])
                         and not self.compare_body(old, new,
                                                   only_keys=['default']))):
                        self.required_by_subtype.append(k)
                        old.pop('default', None)
                # Assign original copy that includes description
                if ((k in [self.subtype_key, 'driver']
                     and ('enum' in old or 'enum' in new))):
                    old['enum'] = sorted([
                        x for x in (set(old.get('enum', []))
                                    | set(new.get('enum', []))) if x])
        self._base_schema['properties'] = new_base_prop
        return copy.deepcopy(self._base_schema)
    def get_subtype_schema(self, subtype, unique=False, relaxed=False,
                           allow_instance=False, for_form=False,
                           partnered=False):
        r"""Get the schema for the specified subtype.

        Args:
            subtype (str): Component subtype to return schema for. If 'base',
                the schema for evaluating the component base will be returned.
            unique (bool, optional): If True, the returned schema will only
                contain properties that are specific to the specified subtype.
                If subtype is 'base', these will be properties that are valid
                for all of the registered subtypes. Defaults to False.
            relaxed (bool, optional): If True, the schema will allow additional
                properties. Defaults to False.
            allow_instance (bool, optional): If True, the returned schema will
                validate instances of this component in addition to documents
                describing a component. Defaults to False.
            for_form (bool, optional): If True, the returned schema will be
                formatted for easy parsing by form generation tools. Defaults
                to False. Causes relaxed and allow_instance to be ignored.
            partnered (bool, optional): If True, the schema is being used
                alongside its base/subtype partner schemas so shared
                requirements may be relaxed. Defaults to False.

        Returns:
            dict: Schema for specified subtype.

        """
        if for_form:
            relaxed = False
            allow_instance = False
            partnered = False
        if subtype == 'base':
            out = self.get_base_schema()
            # Add additional properties that apply to specific subtypes
            if not unique:
                if self.default_subtype:
                    out['properties'][self.subtype_key]['default'] = self.default_subtype
                if ((for_form
                     and all(self.subtype_key in x.get('required', [])
                             for x in self._storage.values()))):
                    out.setdefault('required', [])
                    out['required'].append(self.subtype_key)
                out['additionalProperties'] = False
                for x in self._storage.values():
                    for k, v in x['properties'].items():
                        if (k != self.subtype_key):
                            if (k not in out['properties']):
                                out['properties'][k] = copy.deepcopy(v)
                                if for_form:
                                    out['properties'][k]['options'] = {
                                        'dependencies': {self.subtype_key: []}}
                            # Track which subtypes each property applies to.
                            if for_form and ('options' in out['properties'][k]):
                                out['properties'][k]['options']['dependencies'][
                                    self.subtype_key] += (
                                        x['properties'][self.subtype_key]['enum'])
                # When partnered with subtype schemas, requirements that
                # vary by subtype are enforced there instead of here.
                if ((partnered and self.required_by_subtype
                     and out.get('required', []))):
                    for k in self.required_by_subtype:
                        if k in out['required']:
                            out['required'].remove(k)
                    assert out['required']
                    # if not out['required']:
                    #     del out['required']
            else:  # pragma: debug
                assert (relaxed or for_form)
                # elif not (relaxed or for_form):
                #     # Add place holders
                #     for x in self._storage.values():
                #         for k, v in x['properties'].items():
                #             self._add_placeholder(out['properties'], k, v)
        else:
            # Allow lookup by subtype name as well as by class name.
            if subtype not in self._storage:
                s2c = self.subtype2class
                if subtype in s2c:
                    subtype = s2c[subtype]
            out = copy.deepcopy(self._storage[subtype])
            if subtype in self._no_inherit_kwargs:
                out.update(self._no_inherit_kwargs[subtype])
            base = self.get_base_schema()
            # Remove properties that apply to all subtypes
            if unique:
                skip_props = [self.subtype_key, 'driver']
                skip_props += self.required_by_subtype
                out['additionalProperties'] = True
                if 'required' in out:
                    out['required'] = sorted(
                        list(set(out['required'])
                             - (set(base.get('required', []))
                                - set(skip_props))))
                    if not out['required']:
                        del out['required']
                if isinstance(base.get('allowSingular', False), str):
                    skip_props.append(base['allowSingular'])
                for k, v in base['properties'].items():
                    if ((k not in skip_props and k in out['properties']
                         and k not in out.get('required', []))):
                        del out['properties'][k]
                        assert relaxed
                        # if not relaxed:
                        #     self._add_placeholder(out['properties'], k, v)
                if not out['properties']:  # pragma: no cover
                    del out['properties']
            # TODO: Remove pushProperties/pullProperties based on
            # keys
            assert not (partnered and self._base_kwargs)
            # if partnered and self._base_kwargs:
            #     for k in self._base_kwargs:
            #         if k in ['pushProperties', 'pullProperties']:
            #             out.pop(k, None)
            out['additionalProperties'] = relaxed
        if allow_instance:
            if subtype == 'base':
                comp_cls = self.base_subtype_class
            else:
                from yggdrasil.components import import_component
                comp_cls = import_component(
                    self.schema_type, subtype=subtype)
            out = {'oneOf': [out, {'type': 'instance',
                                   'class': comp_cls}]}
        return out
[docs] def get_subtype_order(self):
r"""Get the order that subtypes should be in the schema, with the
base subtype first.
Returns:
list: Subtypes in order.
"""
baseSchemaKey = self.default_subtype
order = sorted(self._storage.keys())
if baseSchemaKey is not None:
if baseSchemaKey not in order:
baseSchemaKey = self.subtype2class.get(baseSchemaKey,
baseSchemaKey)
if baseSchemaKey in order:
order.remove(baseSchemaKey)
order.insert(0, baseSchemaKey)
return order
@classmethod
def _subtype_defkey(cls, schema_type, subtype):
return f"{schema_type}-subtype-{subtype}".replace('+', 'p')
# @classmethod
# def _is_placeholder(cls, x):
# return (x is True)
# @classmethod
# def _add_placeholder(cls, props, k, v):
# if k not in props:
# props[k] = {}
# if 'aliases' in v:
# for alias in v['aliases']:
# props[alias] = {}
[docs] def get_subtype_definition_ref(self, subtype):
r"""Get the address for a subtype's schema definition that can
be used in references.
Args:
subtype (str): Component subtype.
Returns:
str: Definition address.
"""
if subtype in self.class2subtype:
subtype = self.class2subtype[subtype][0]
key = self._subtype_defkey(self.schema_type, subtype)
return f"#/definitions/{key}"
[docs] def get_subtype_definitions(self, unique='both',
relaxed=False, **kwargs):
r"""Get subtype definitions.
Args:
**kwargs: Additional keyword arguments are passed to
get_subtype_schema calls for each subtype.
Returns:
dict: Dictionary of subtype definitions.
"""
order = self.get_subtype_order()
out = {}
unique_base = (unique in ['base', 'both'])
unique_subt = (unique in ['subtype', 'both'])
relaxed_base = True
relaxed_subt = True
partnered_base = (unique in 'subtype')
partnered_subt = (unique in 'base')
if not relaxed:
assert unique in ['subtype', 'base', 'both']
if unique in ['base', 'both']:
relaxed_base = True
relaxed_subt = False
elif unique in ['subtype']:
relaxed_base = False
relaxed_subt = True
else: # pragma: completion
relaxed_base = False
relaxed_subt = False
base = self.get_subtype_schema('base', unique=unique_base,
partnered=partnered_base,
relaxed=relaxed_base, **kwargs)
out[self._subtype_defkey(self.schema_type, "base")] = base
for subC in order:
subT = self.class2subtype[subC][0]
x = self.get_subtype_schema(subC, unique=unique_subt,
partnered=partnered_subt,
relaxed=relaxed_subt, **kwargs)
out[self._subtype_defkey(self.schema_type, subT)] = x
if unique_subt:
# Only remove these if they will be in the base
for k in ['pushProperties', 'pullProperties']:
if k in base:
x.pop(k, None)
else:
# TODO: Unclear what this should be if properties added
# via relaxed
for k in ['pushProperties', 'pullProperties']:
if k in base:
x.setdefault(k, {})
x[k].update(base[k])
return out
[docs] def get_driver_definition(self, allow_driver=False):
r"""Get a definition for a driver schema.
Args:
allow_driver (bool, optional): If not True, the outputs will
be a null schema. Defaults to False.
Returns:
dict: Driver schema.
"""
if allow_driver:
out = {"allOf": [
{"$ref": f"#/definitions/{self.schema_type}"},
{"required": ["driver", "args"],
"properties": {
'driver': {'type': 'string',
'deprecated': True,
'description': (
'[DEPRECATED] Name of driver '
'class that should be used.')},
'args': {'type': 'string',
'deprecated': True,
'description': (
'[DEPRECATED] Arguments that should '
'be provided to the driver.')}}}
]}
if self.schema_type == 'file':
out['allOf'][1]['required'].append('working_dir')
out['allOf'][1]['properties']['working_dir'] = {
'type': 'string'}
else:
out = {'type': 'null'}
return out
    def get_definitions(self, for_form=False, allow_instance=False,
                        allow_driver=False, full=False, relaxed=False,
                        **kwargs):
        r"""Get the set of definitions defining this component type.

        Args:
            for_form (bool, optional): If True, the returned schema will
                be formatted for easy parsing by form generation tools.
                Defaults to False. Causes relaxed, allow_instance, and
                full to be ignored.
            allow_instance (bool, optional): If True, the returned schema
                will validate instances of this component in addition to
                documents describing a component. Defaults to False.
            allow_driver (bool, optional): If True, the returned
                definitions will include the deprecated driver based
                schemas. Defaults to False.
            full (bool, optional): If True, properties are made explicit
                for each component subtype resulting in a larger schema
                that has a simplified logic for parsing. If False, the
                schema will be compressed such that properties occurring
                across all component subtypes will only occur once in
                the base schema. Defaults to False.
            **kwargs: Additional keyword arguments are passed to
                get_subtype_definitions calls.

        Returns:
            dict: Set of component definitions.

        """
        combo = self.get_schema(for_form=for_form,
                                allow_instance=allow_instance)
        out = {self.schema_type: combo}
        if full:
            # Explicit properties in every subtype; the base is reduced
            # to its unique entries.
            out.update(self.get_subtype_definitions(
                unique='base', relaxed=relaxed, **kwargs))
        else:
            # Shared properties live in the base; subtypes are reduced
            # to their unique entries.
            out.update(self.get_subtype_definitions(
                unique='subtype', relaxed=relaxed, **kwargs))
        # Only these component types support the deprecated driver form.
        if ((self.schema_type in ['connection', 'comm', 'file', 'model']
             and not for_form)):
            out[f"{self.schema_type}_driver"] = self.get_driver_definition(
                allow_driver=allow_driver)
        return out
    def get_schema(self, for_form=False, allow_instance=False):
        r"""Get the schema defining this component.

        Args:
            for_form (bool, optional): If True, the returned schema will
                be formatted for easy parsing by form generation tools.
                Defaults to False. Causes relaxed, allow_instance, and
                full to be ignored.
            allow_instance (bool, optional): If True, the returned schema
                will validate instances of this component in addition to
                documents describing a component. Defaults to False.

        Returns:
            dict: Schema for this component.

        """
        combo = {
            'description': f'Schema for {self.schema_type} components.',
            'title': (f"complete-{self.schema_type}-{self.subtype_key}")}
        # f"-{self.default_subtype}")}
        if for_form:
            # Form schemas are a single flat base schema.
            combo.update(self.get_subtype_schema('base', for_form=True))
            return combo
        order = self.get_subtype_order()
        # Documents must satisfy the base schema plus at least one of the
        # subtype schemas.
        combo['allOf'] = [
            {'$ref': self.get_subtype_definition_ref("base")},
            {'anyOf': [{'$ref': self.get_subtype_definition_ref(x)}
                       for x in order]}]
        # This does allow for properties to be removed via push/pull
        # but is more elegant than duplicating properties in the base
        # schema and so is preservered
        # if not relaxed:
        #     all_prop = set()
        #     for x in self._storage.values():
        #         all_prop.update(set(x['properties'].keys()))
        #         for v in x['properties'].values():
        #             all_prop.update(set(v.get('aliases', [])))
        #     combo['allOf'].append(
        #         {'type': 'object',
        #          'additionalProperties': False,
        #          'properties': {k: {} for k in all_prop}})
        if allow_instance:
            combo['oneOf'] = [{'allOf': combo.pop('allOf')},
                              {'type': 'instance',
                               'class': self.base_subtype_class}]
        return combo
# def set_required_by_subtype(self, props):
# r"""Update schema so that specified properties are required at
# the subtype level instead of in the base schema to allow
# subtypes to specify defaults.
# Args:
# props (list): List of properties to require by subtype.
# """
# if not props:
# return
# for x in self._storage.values():
# x.setdefault('required', [])
# x.setdefault('properties', {})
# for k in props:
# if k not in x['required']:
# x['required'].append(k)
# if k not in x['properties']:
# x['properties'][k] = copy.deepcopy(
# self._base_schema['properties'][k])
# if self._base_schema.get('required', []):
# for k in props:
# if k in self._base_schema['required']:
# self._base_schema['required'].remove(k)
# if not self._base_schema['required']:
# del self._base_schema['required']
    @classmethod
    def from_definitions(cls, schema, defs, schema_registry=None):
        r"""Construct a ComponentSchema from a schema.

        Args:
            schema (dict): Schema.
            defs (dict): Definitions that the schema's references resolve to.
            schema_registry (SchemaRegistry, optional): Registry of schemas
                that this schema is dependent on.

        Returns:
            ComponentSchema: Schema with information from schema.

        """
        # The title encodes the component type and subtype key
        # (see get_schema: "complete-<type>-<key>").
        _, schema_type, subtype_key = schema['title'].split('-')
        subt_base = defs[cls._subtype_defkey(schema_type, "base")]
        subt_default = subt_base['properties'][subtype_key].get('default', None)
        subt_description = subt_base['properties'][subtype_key].get('description', '')
        subt_schema = [defs[x['$ref'].split('#/definitions/')[-1]]
                       for x in schema['allOf'][1]['anyOf']]
        # Initialize schema
        out = cls(schema_type, subtype_key,
                  schema_registry=schema_registry)
        out.default_subtype = subt_default
        # out._base_schema = subt_base
        # Base properties not declared by any subtype are unique to the
        # base (plus the keys identifying the subtype).
        prop_base = list(subt_base['properties'].keys())
        for v in subt_schema:
            for k in v['properties'].keys():
                if k in prop_base:
                    prop_base.remove(k)
        prop_base += ['driver', subtype_key]
        base_unique = {k: v for k, v in subt_base['properties'].items()
                       if k in prop_base}
        kwargs_base = {k: subt_base[k] for k in
                       ['pushProperties', 'pullProperties']
                       if k in subt_base}
        for v in subt_schema:
            # Titles are dotted module paths ending in module.class.
            v_module_name, v_class_name = v['title'].split('.')[-2:]
            v_new = copy.deepcopy(v)
            # Subtypes inherit the base requirements and unique properties.
            v_new['required'] = sorted(list(
                set(v.get('required', [])) | set(subt_base.get('required', []))))
            if not v_new['required']:
                del v_new['required']
            v_new['properties'] = copy.deepcopy(base_unique)
            v_new['properties'].update(v['properties'])
            # TODO: Compare with base
            kwargs_no_inherit = {k: v_new.pop(k) for k in
                                 ['pushProperties', 'pullProperties']
                                 if k in v_new and k not in subt_base}
            out.append_schema(v_class_name, v_new,
                              description=subt_description,
                              default=subt_default,
                              kwargs_base=kwargs_base,
                              kwargs_no_inherit=kwargs_no_inherit)
            subtypes = v['properties'][out.subtype_key]['enum']
            out.schema_subtypes[v_class_name] = subtypes
            out.schema_modules[v_class_name] = v_module_name
            v_module = '.'.join(v['title'].split('.')[:-2])
            if out.module is None:
                out.module = v_module
            else:
                # All subtypes of a component share one parent module.
                assert v_module == out.module
        # Remove placeholder properties in base and subtypes
        # for x in list(out._storage.values()):
        #     for k in list(x['properties'].keys()):
        #         if cls._is_placeholder(x['properties'][k]):
        #             del x['properties'][k]
        return out
[docs] @classmethod
def from_registry(cls, schema_type, registry, **kwargs):
r"""Construct a ComponentSchema from a registry entry.
Args:
schema_type (str): Name of component type to build.
registry (dict): Registry information for the component.
**kwargs: Additional keyword arguments are passed to the class
__init__ method.
Returns:
ComponentSchema: Schema with information from classes.
"""
schema_subtypes = {}
for k, v in registry['subtypes'].items():
if v not in schema_subtypes:
schema_subtypes[v] = []
schema_subtypes[v].append(k)
schema_modules = {}
for k, v in registry.get('subtype_modules', {}).items():
if v not in schema_modules:
schema_modules[v] = []
schema_modules[v].append(k)
kwargs.update(module=registry['module'],
schema_subtypes=schema_subtypes,
schema_modules=schema_modules)
out = cls(schema_type, registry['key'], **kwargs)
for x in registry['classes'].values():
out.append_class(x, verify=True)
return out
# @property
# def properties(self):
# r"""list: Valid properties for this component."""
# return sorted(list(self.get_subtype_schema('base')['properties'].keys()))
[docs] def get_subtype_properties(self, subtype):
r"""Get the valid properties for a specific subtype.
Args:
subtype (str): Name of the subtype to get keys for.
Returns:
list: Valid properties for the specified subtype.
"""
return sorted(list(self.get_subtype_schema(subtype)['properties'].keys()))
    @property
    def class2subtype(self):
        r"""dict: Mapping from class to list of subtypes."""
        # schema_subtypes is already stored as class -> [subtypes].
        return self.schema_subtypes
@property
def subtype2class(self):
r"""dict: Mapping from subtype to class."""
out = {}
for k, v in self.schema_subtypes.items():
for iv in v:
out[iv] = k
return out
@property
def subtype2module(self):
r"""dict: Mapping from subtype to module."""
out = {}
for k, v in self.schema_subtypes.items():
for iv in v:
out[iv] = self.schema_modules[k]
return out
    @property
    def base_subtype_class_name(self):
        r"""str: Name of base class for the subtype."""
        if not getattr(self, '_base_subtype_class_name', None):
            # Accessing base_subtype_class caches the name as a side effect.
            self.base_subtype_class
        return self._base_subtype_class_name
    @property
    def base_subtype_class(self):
        r"""ComponentClass: Base class for the subtype."""
        if not getattr(self, '_base_subtype_class', None):
            # Import any registered subtype class and walk its MRO to find
            # the last (most basic) class sharing this component's
            # _schema_type; that is the base class for all subtypes.
            default_class = list(self.schema_subtypes.keys())[0]
            cls = getattr(
                importlib.import_module(f"{self.module}.{default_class}"),
                default_class)
            base_class = cls
            for i, x in enumerate(cls.__mro__):
                if x._schema_type != cls._schema_type:
                    break
                base_class = x
            else:  # pragma: debug
                # Exhausting the MRO without leaving the schema type means
                # no boundary was found.
                raise RuntimeError(
                    f"Could not determine a base class for "
                    f"{self.schema_type} (using class {cls})")
            self._base_subtype_class = base_class
            self._base_subtype_class_name = base_class.__name__
        return self._base_subtype_class
@property
def subtypes(self):
r"""list: All subtypes for this schema type."""
out = []
for v in self.schema_subtypes.values():
out += v
return sorted(list(set(out)))
@property
def classes(self):
r"""list: All available classes for this schema."""
return sorted([k for k in self.schema_subtypes.keys()])
@classmethod
def compare_body(cls, a, b, only_keys=None,
                 ignore_keys=('description', 'default',
                              'maxItems', 'minItems')):
    r"""Compare two schemas, ignoring some keys.

    Args:
        a (dict): First schema for comparison.
        b (dict): Second schema for comparison.
        only_keys (list, optional): Keys to compare. If provided,
            ignore_keys is not used.
        ignore_keys (tuple, optional): Keys to ignore. Defaults to
            ('description', 'default', 'maxItems', 'minItems').

    Returns:
        bool: True if the schemas are equivalent, False otherwise.

    """
    # NOTE: default changed from a list to a tuple so the default is
    # immutable (mutable-default pitfall); callers may still pass lists.
    if only_keys:
        # Compare only the requested keys; missing keys compare as None
        return all(a.get(k, None) == b.get(k, None) for k in only_keys)
    a_cpy = copy.deepcopy(a)
    b_cpy = copy.deepcopy(b)
    for k in ignore_keys:
        a_cpy.pop(k, None)
        b_cpy.pop(k, None)
    return a_cpy == b_cpy
@classmethod
def add_legacy_properties(cls, new_schema, driver_list=None,
                          base=False):
    r"""Add driver/args legacy properties to the schema.

    Args:
        new_schema (dict): Schema to add properties to.
        driver_list (list, optional): Drivers that are valid for
            the schema.
        base (bool, optional): If True, add a dependency requiring
            'args' whenever 'driver' is present. Defaults to False.

    """
    legacy_properties = {
        'driver': {'type': 'string',
                   'deprecated': True,
                   'enum': driver_list,
                   'description': (
                       '[DEPRECATED] Name of driver '
                       'class that should be used.')},
        'args': {'type': 'string',
                 'deprecated': True,
                 'description': (
                     '[DEPRECATED] Arguments that should '
                     'be provided to the driver.')}}
    if not driver_list:
        # Without valid drivers, 'args' is meaningless and 'driver'
        # may only take the empty string
        legacy_properties.pop('args')
        legacy_properties['driver']['enum'] = ['']
    for k, v in legacy_properties.items():
        # Never clobber a property the schema already defines
        if k not in new_schema['properties']:
            new_schema['properties'][k] = v
    if 'dependencies' not in new_schema and base:
        new_schema['dependencies'] = {'driver': ['args']}
def append_schema(self, name, new_schema, verify=False,
                  description=None, default=None, kwargs_base=None,
                  kwargs_no_inherit=None):
    r"""Append component schema to the schema.

    Args:
        name (str): Name to store the schema under.
        new_schema (dict): New schema to store.
        verify (bool, optional): If True, verify the schema after
            adding the schema. Defaults to False.
        description (str, optional): Description of the base
            subtype property.
        default (str, optional): Default subtype for base.
        kwargs_base (dict, optional): Keyword arguments added to the
            base schema and removed from unique subtype schemas.
        kwargs_no_inherit (dict, optional): Keyword arguments not
            inherited by other subtypes or the base schema.

    """
    # Set properties for creating base schema or that would be
    # present in base schema if not in every subschema
    # if kwargs_base:
    #     new_schema.update(kwargs_base)
    if kwargs_no_inherit:  # TODO: Remove these from base?
        self._no_inherit_kwargs[name] = kwargs_no_inherit
        # new_schema.update(kwargs_no_inherit)
    # Deep copy so later mutation of new_schema by the caller cannot
    # alter the stored schema
    self._storage[name] = copy.deepcopy(new_schema)
    # The first schema appended defines the base-subtype metadata
    if self._base_name is None:
        self._base_name = name
        self._base_kwargs = kwargs_base
        self._base_subtype_description = description
        self.default_subtype = default
    # Verify that the schema is valid
    if verify:
        rapidjson.Normalizer.check_schema(self.get_schema())
    self._base_schema = None  # Reset so that it will be regen
def append_class(self, comp_cls, verify=False):
    r"""Append component class to the schema.

    Args:
        comp_cls (class): Component class that should be added.
        verify (bool, optional): If True, verify the schema after
            adding the component class. Defaults to False.

    """
    # The class must belong to this schema's component type/subtype key
    assert comp_cls._schema_type == self.schema_type
    assert comp_cls._schema_subtype_key == self.subtype_key
    name = comp_cls.__name__
    fullname = f'{comp_cls.__module__}.{comp_cls.__name__}'
    subtype_module = '.'.join(comp_cls.__module__.split('.')[:-1])
    # Append subtype
    subtype_list = copy.deepcopy(
        getattr(comp_cls, f'_{self.subtype_key}', None))
    if not isinstance(subtype_list, list):
        # A scalar (or missing -> None) subtype is wrapped in a list
        subtype_list = [subtype_list]
    subtype_list += getattr(comp_cls, '_%s_aliases' % self.subtype_key, [])
    driver_list = []
    driver_list += getattr(comp_cls, '_deprecated_drivers', [])
    if name.endswith(('Driver', 'Model')):
        driver_list.append(name)
    self.schema_subtypes[name] = subtype_list
    self.schema_modules[name] = comp_cls.__module__.split('.')[-1]
    assert subtype_module == self.module
    # Create new schema for subtype
    new_schema = {'title': fullname,
                  'description': ('Schema for %s component %s subtype.'
                                  % (self.schema_type, subtype_list)),
                  'type': 'object',
                  'required': copy.deepcopy(comp_cls._schema_required),
                  'properties': copy.deepcopy(comp_cls._schema_properties),
                  'additionalProperties': False}
    new_schema.update(comp_cls._schema_additional_kwargs)
    if not new_schema['required']:
        del new_schema['required']
    new_schema['properties'].setdefault(self.subtype_key, {})
    new_schema['properties'][self.subtype_key]['enum'] = subtype_list
    new_schema['properties'][self.subtype_key].setdefault('type', 'string')
    # Add legacy properties
    prev_drivers = (
        self._base_name
        and 'driver' in self._storage[self._base_name]['properties'])
    if driver_list or prev_drivers:
        self.add_legacy_properties(new_schema, driver_list)
    if driver_list and not prev_drivers:
        # First driver encountered: retrofit legacy properties onto
        # all previously stored subtype schemas
        for v in self._storage.values():
            self.add_legacy_properties(v)
    # Add description/default for subtype to subtype property after
    # recording base to prevent overwriting of the property rather
    # than the property value.
    base_description = new_schema['properties'][self.subtype_key].get(
        'description', '')
    base_default = new_schema['properties'][self.subtype_key].get(
        'default', None)
    if comp_cls._schema_subtype_description is not None:
        new_schema['properties'][self.subtype_key]['description'] = (
            comp_cls._schema_subtype_description)
    if comp_cls._schema_no_default_subtype:
        new_schema['properties'][self.subtype_key].pop('default', None)
    elif subtype_list:
        # First listed subtype is the default
        new_schema['properties'][self.subtype_key]['default'] = subtype_list[0]
    kws = dict(
        kwargs_base=comp_cls._schema_additional_kwargs_base,
        kwargs_no_inherit=comp_cls._schema_additional_kwargs_no_inherit,
        description=base_description, default=base_default)
    self.append_schema(name, new_schema, verify=verify, **kws)
class SchemaRegistry(object):
    r"""Registry of schema's for different integration components.

    Args:
        registry (dict, optional): Dictionary of registered components.
            Defaults to None and the registry will be empty.
        required (list, optional): Components that are required. Defaults to
            ['comm', 'file', 'model', 'connection']. Ignored if registry is
            None.

    Raises:
        ValueError: If registry is provided and one of the required
            components is missing.

    """

    # Registered normalizer callables (not modified in this section;
    # presumably populated elsewhere in the module)
    _normalizers = {}
    # Component types required when a registry dict is supplied
    _default_required_components = ['comm', 'file', 'model', 'connection']
def __init__(self, registry=None, required=None):
    r"""Initialize the registry, optionally from a component dict."""
    super(SchemaRegistry, self).__init__()
    self._cache = {}
    self._storage = SchemaDict()
    if required is None:
        required = self._default_required_components
    self.required_components = required
    if registry is None:
        return
    # Fail fast if any required component type is absent
    for component in required:
        if component not in registry:
            raise ValueError("Component %s required." % component)
    # Create schemas for each component
    for component, entries in registry.items():
        comp_schema = ComponentSchema.from_registry(
            component, entries, schema_registry=self)
        self.add(component, comp_schema, verify=True)
def add(self, k, v, verify=False):
    r"""Add a new component schema to the registry."""
    self._storage[k] = v
    # Composite-schema caches are stale once any component changes
    self._cache = {}
    if verify:
        rapidjson.Normalizer.check_schema(self.get_schema())
def get(self, k, *args, **kwargs):
    r"""Return a component schema from the registry.

    Args:
        k (str): Name of the component to look up.
        *args: Additional arguments are passed to the storage dict's
            get method (e.g. a default value).
        **kwargs: Additional keyword arguments are passed to the
            storage dict's get method.

    Returns:
        object: The registered component schema, or the supplied
            default if k is not registered.

    """
    return self._storage.get(k, *args, **kwargs)
def get_definitions(self, relaxed=False, allow_instance=False,
                    for_form=False, dont_copy=False, full=False,
                    allow_driver=False):
    r"""Get schema definitions for the registered components.

    Args:
        relaxed (bool, optional): If True, the returned schema (and any
            definitions it includes) are relaxed to allow objects with
            additional properties to pass validation. Defaults to False.
        allow_instance (bool, optional): If True, the returned definitions
            will validate instances of the components in addition to
            documents describing components. Defaults to False.
        for_form (bool, optional): If True, the returned schema will be
            formatted for easy parsing by form generation tools. Defaults
            to False. Causes relaxed, allow_instance, and full to be
            ignored.
        dont_copy (bool, optional): If True, the cached definitions
            are returned without copying. Defaults to False.
        full (bool, optional): If True, properties are made explicit for
            each component subtype resulting in a larger schema that has
            a simplified logic for parsing. If False, the schema will be
            compressed such that properties occurring across all component
            subtypes will only occur once in the base schema. Defaults to
            False.
        allow_driver (bool, optional): If True, the returned definitions
            will include the deprecated driver based schemas. Defaults to
            False.

    Returns:
        dict: Schema definitions for each of the registered components.

    """
    # The cache key encodes every flag combination
    cache_key = 'definitions'
    if for_form:
        cache_key += '_form'
        # for_form overrides all other flags
        full = False
        relaxed = False
        allow_instance = False
        allow_driver = False
    if relaxed:
        cache_key += '_relaxed'
    if allow_instance:
        cache_key += '_instance'
    if full:
        cache_key += '_full'
    if allow_driver:
        cache_key += '_driver'
    if cache_key not in self._cache:
        out = {}
        # Only the stored component schemas are needed, not their keys
        for comp in self._storage.values():
            out.update(comp.get_definitions(for_form=for_form,
                                            relaxed=relaxed,
                                            allow_instance=allow_instance,
                                            allow_driver=allow_driver,
                                            full=full))
        # Placeholder definitions for required components not registered
        for k in self.required_components:
            out.setdefault(k, {'type': 'string'})
        self._cache[cache_key] = out
    out = self._cache[cache_key]
    if not dont_copy:
        # Copy so callers cannot mutate the cached definitions
        out = copy.deepcopy(out)
    return out
def get_schema(self, relaxed=False, allow_instance=False, for_form=False,
               partial=False, full=False, allow_driver=True):
    r"""Get the schema defining this component.

    Args:
        relaxed (bool, optional): If True, the returned schema (and any
            definitions it includes) are relaxed to allow objects with
            additional properties to pass validation. Defaults to False.
        allow_instance (bool, optional): If True, the returned schema will
            validate instances of this component in addition to documents
            describing a component. Defaults to False.
        for_form (bool, optional): If True, the returned schema will be
            formatted for easy parsing by form generation tools. Defaults
            to False. Causes relaxed, allow_instance, and full to be
            ignored.
        partial (bool, optional): If True, the schema for individual
            yaml files will be used which allows for partial integration
            specification. Defaults to False.
        full (bool, optional): If True, properties are made explicit for
            each component subtype resulting in a larger schema that has
            a simplified logic for parsing. If False, the schema will be
            compressed such that properties occurring across all component
            subtypes will only occur once in the base schema. Defaults to
            False.
        allow_driver (bool, optional): If True, the returned schema
            will include the deprecated driver based schemas. Defaults to
            True.

    Returns:
        dict: Schema for this component.

    """
    # The cache key encodes every flag combination
    cache_key = 'schema'
    if for_form:
        cache_key += '_form'
        # for_form overrides all other flags except partial
        full = False
        relaxed = False
        allow_instance = False
        allow_driver = False
    if relaxed:
        cache_key += '_relaxed'
    if allow_instance:
        cache_key += '_instance'
    if partial:
        cache_key += '_partial'
    if full:
        cache_key += '_full'
    if allow_driver:
        cache_key += '_driver'
    if cache_key not in self._cache:
        if allow_driver:
            # Accept either the modern form or the deprecated
            # driver-based form for models and connections
            model_ref = {'anyOf': [{'$ref': '#/definitions/model'},
                                   {'$ref': '#/definitions/model_driver'}]}
            conn_ref = {'anyOf': [{'$ref': '#/definitions/connection'},
                                  {'$ref': '#/definitions/connection_driver'}]}
        else:
            model_ref = {'$ref': '#/definitions/model'}
            conn_ref = {'$ref': '#/definitions/connection'}
        out = {'title': 'YAML Schema',
               'description': 'Schema for yggdrasil YAML input files.',
               'type': 'object',
               'definitions': self.get_definitions(
                   relaxed=relaxed, allow_instance=allow_instance,
                   for_form=for_form, dont_copy=True, full=full,
                   allow_driver=allow_driver),
               'required': ['models', 'connections'],
               'additionalProperties': False,
               'properties': SchemaDict(
                   [('models', {'type': 'array',
                                'items': model_ref,
                                'minItems': 1,
                                'aliases': ['model']}),
                    ('connections',
                     {'type': 'array',
                      'items': conn_ref,
                      'aliases': ['connection'],
                      'default': []})])}
        if not for_form:
            out['properties']['working_dir'] = {'type': 'string'}
            # Push working_dir down into each model/connection item
            out['pushProperties'] = {
                '$properties/connections/items': ['working_dir'],
                '$properties/models/items': ['working_dir']}
            out['properties']['models']['allowSingular'] = True
            out['properties']['connections']['allowSingular'] = True
        if partial:
            # Partial YAML files may omit models and contain extra keys
            out.pop('additionalProperties')
            out['properties']['models'].pop('minItems')
            out['properties']['models']['default'] = []
        self._cache[cache_key] = out
    # Copy so callers cannot mutate the cached schema
    return copy.deepcopy(self._cache[cache_key])
@property
def definitions(self):
    r"""dict: Schema definitions for different components.

    Shorthand for get_definitions() with default options.
    """
    return self.get_definitions()
@property
def schema(self):
    r"""dict: Schema for evaluating YAML input file.

    Shorthand for get_schema() with default options.
    """
    return self.get_schema()
@property
def form_schema(self):
    r"""dict: Schema for generating a YAML form."""
    out = self.get_schema(for_form=True)
    # Include the full metaschema so datatype entries can be described
    out['definitions']['schema'] = copy.deepcopy(rapidjson.get_metaschema())
    # convert_extended2base is defined elsewhere in this module;
    # presumably it rewrites yggdrasil-extended schema constructs into
    # base JSON schema for standard form tools — TODO confirm
    out = convert_extended2base(out)
    return out
@property
def model_form_schema_props(self):
    r"""dict: Information about how properties should be modified for
    the model form schema (consumed by model_form_schema)."""
    # NOTE: the duplicate 'copies' entry in remove['model'] was removed.
    prop = {
        # 'add': {},
        # Property schemas replaced outright (original description kept)
        'replace': {
            'comm': {
                'transform': {
                    "type": "array",
                    "items": {"$ref": "#/definitions/transform"}},
                'default_file': {
                    '$ref': '#/definitions/file'}},
            'file': {
                'serializer': {
                    '$ref': '#/definitions/serializer'}}},
        # Properties that become required in the form
        'required': {
            'model': ['args', 'inputs', 'outputs', 'description',
                      'repository_url', 'repository_commit']},
        # Properties hidden from the form
        'remove': {
            'comm': ['is_default', 'length_map', 'serializer',
                     'address', 'dont_copy', 'for_service',
                     'client_id', 'cookies', 'host', 'params',
                     'port', 'commtype'],
            'ocomm': ['default_value'],
            'file': ['is_default', 'length_map',
                     'wait_for_creation', 'working_dir',
                     'read_meth', 'in_temp',
                     'serializer', 'datatype',
                     'address', 'dont_copy', 'for_service',
                     'client_id', 'cookies', 'host', 'params',
                     'port'],
            'model': ['client_of', 'is_server', 'preserve_cache',
                      'products', 'source_products', 'working_dir',
                      'overwrite', 'skip_interpreter', 'copies',
                      'timesync', 'with_strace', 'with_valgrind',
                      'valgrind_flags', 'with_debugger',
                      'logging_level', 'additional_variables',
                      'aggregation', 'interpolation', 'synonyms',
                      'driver']},
        # Display order; the index becomes 'propertyOrder'
        'order': {
            'model': ['name', 'repository_url', 'repository_commit',
                      'contact_email', 'language', 'description',
                      'args', 'inputs', 'outputs'],
            'comm': ['name', 'datatype']},
        # Keywords merged into existing property schemas
        'update': {
            'model': {
                'inputs': {
                    'description': ('Zero or more channels carrying '
                                    'input to the model'),
                    'items': {'$ref': '#/definitions/icomm'}},
                'outputs': {
                    'description': ('Zero or more channels carrying '
                                    'output from the model'),
                    'items': {'$ref': '#/definitions/ocomm'}},
                'repository_commit': {
                    'description': ('Commit that should be checked out '
                                    'from the model repository.')},
                'args': {'minItems': 1}},
            'file': {
                'name': {
                    'description': ('Path to a file in the model '
                                    'repository')}}},
    }
    return prop
@property
def model_form_schema(self):
    r"""dict: Schema for generating a model YAML form."""
    from yggdrasil import constants
    out = self.get_schema(for_form=True)
    scalar_types = list(constants.VALID_TYPES.keys())
    meta = copy.deepcopy(rapidjson.get_metaschema())
    # Metaschema properties to expose in the form, each restricted
    # (via 'options.dependencies') to the datatypes it applies to
    meta_prop = {
        'subtype': ['1darray', 'ndarray'],
        'units': ['1darray', 'ndarray'] + scalar_types,
        'precision': ['1darray', 'ndarray'] + scalar_types,
        'length': ['1darray'],
        'shape': ['ndarray']}
    out['definitions']['simpleTypes'] = meta['definitions']['simpleTypes']
    out['definitions']['simpleTypes'].update(type='string',
                                             default='bytes')
    out['definitions']['simpleTypes']['enum'].remove('scalar')
    out['definitions']['schema'] = {'type': 'object',
                                    'required': ['type'],
                                    'properties': {}}
    out['definitions']['schema']['properties']['type'] = {
        '$ref': '#/definitions/simpleTypes'}
    for k, types in meta_prop.items():
        out['definitions']['schema']['properties'][k] = meta['properties'][k]
        if types:
            out['definitions']['schema']['properties'][k]['options'] = {
                'dependencies': {'type': types}}
    for k in ['comm', 'file', 'model']:
        out['definitions'][k].pop('description', '')
    for k in out['definitions'].keys():
        if k in ['schema', 'simpleTypes']:
            continue
        out['definitions'][k].pop('title', None)
        if ((('required' in out['definitions'][k])
             and ('working_dir' in out['definitions'][k]['required']))):
            out['definitions'][k]['required'].remove('working_dir')
        # Deprecated properties are dropped from the form entirely
        for p, v in list(out['definitions'][k]['properties'].items()):
            if v.get('description', '').startswith('[DEPRECATED]'):
                out['definitions'][k]['properties'].pop(p)
    # Process based on model_form_schema_props
    prop = self.model_form_schema_props

    def adjust_definitions(k):
        # Apply the remove/replace/required/update/order rules from
        # model_form_schema_props to definition k (mutates out)
        # Remove
        for p in prop['remove'].get(k, []):
            out['definitions'][k]['properties'].pop(p, None)
            if p in out['definitions'][k].get('required', []):
                out['definitions'][k]['required'].remove(p)
        # Replace
        for r, v in prop['replace'].get(k, {}).items():
            if 'description' in out['definitions'][k]['properties'].get(r, {}):
                v['description'] = (
                    out['definitions'][k]['properties'][r]['description'])
            out['definitions'][k]['properties'][r] = v
        # Required
        out['definitions'][k].setdefault('required', [])
        for p in prop['required'].get(k, []):
            if p not in out['definitions'][k]['required']:
                out['definitions'][k]['required'].append(p)
        # Update
        for p, new in prop['update'].get(k, {}).items():
            out['definitions'][k]['properties'][p].update(new)
        # Add
        # out['definitions'][k]['properties'].update(
        #     prop['add'].get(k, {}))
        # Order
        for i, p in enumerate(prop['order'].get(k, [])):
            out['definitions'][k]['properties'][p]['propertyOrder'] = i
        out['definitions'][k].pop('allowSingular', None)
        out['definitions'][k].pop('pushProperties', None)
        out['definitions'][k].pop('pullProperties', None)
    # Update definitions
    for k in ['model', 'comm', 'file']:
        adjust_definitions(k)
    for k in ['icomm', 'ocomm']:
        # Input/output comms start as copies of the generic comm
        out['definitions'][k] = copy.deepcopy(out['definitions']['comm'])
        adjust_definitions(k)
    # An input comm must provide a default file or a default value,
    # but not both
    out['definitions']['icomm']['oneOf'] = [
        {'title': 'default file',
         'required': ['default_file'],
         'not': {'required': ['default_value']}},
        {'title': 'default value',
         'required': ['default_value'],
         'not': {'required': ['default_file']}}]
    # Adjust formatting
    for x in [out] + list(out['definitions'].values()):
        for p, v in x.get('properties', {}).items():
            if v.get("type", None) == "boolean":
                v.setdefault("format", "checkbox")
    # Isolate model: hoist the model definition to the top level
    out.update(out['definitions'].pop('model'))
    out['definitions'].pop('connection')
    out.update(
        title='Model YAML Schema',
        description='Schema for yggdrasil model YAML input files.')
    out = convert_extended2base(out)
    return out
def __getitem__(self, k):
    r"""Return a component schema via indexing (delegates to get)."""
    return self.get(k)
def keys(self):
    r"""Return a view of the registered component names."""
    return self._storage.keys()
def items(self):
    r"""Return a view of (name, component schema) pairs."""
    return self._storage.items()
def __eq__(self, other):
    r"""Registries compare equal when their composite schemas match."""
    if hasattr(other, 'get_schema'):
        return self.get_schema() == other.get_schema()
    # Anything without a get_schema method cannot be equivalent
    return False
@classmethod
def from_file(cls, fname):
    r"""Create a SchemaRegistry from a file.

    Args:
        fname (str): Full path to the file the schema should be loaded
            from.

    Returns:
        SchemaRegistry: New registry populated from the file.

    """
    out = cls()
    out.load(fname)
    return out
def load(self, fname):
    r"""Load schema from a file.

    Args:
        fname (str): Full path to the file the schema should be loaded
            from.

    Raises:
        RuntimeError: If a schema cannot be loaded from the file.

    """
    with open(fname, 'r') as f:
        contents = f.read()
    schema = ordered_load(contents, Loader=yaml.SafeLoader)
    if schema is None:
        # RuntimeError instead of bare Exception; still caught by
        # any caller handling Exception
        raise RuntimeError("Failed to load schema from %s" % fname)
    # Create components; only definitions titled 'complete-...'
    # correspond to full component schemas
    for k, v in schema.get('definitions', {}).items():
        if not v.get('title', '').startswith('complete-'):
            continue
        icomp = ComponentSchema.from_definitions(
            v, schema['definitions'], schema_registry=self)
        self.add(k, icomp)
def save(self, fname, schema=None):
    r"""Save the schema to a file.

    Args:
        fname (str): Full path to the file the schema should be saved to.
        schema (dict, optional): yggdrasil YAML options. Defaults to the
            registry's full schema.

    """
    out = self.get_schema() if schema is None else schema
    with open(fname, 'w') as f:
        ordered_dump(out, stream=f, Dumper=yaml.SafeDumper)
# def validate(self, obj, normalize=False, **kwargs):
# r"""Validate an object against this schema.
# Args:
# obj (object): Object to valdiate.
# **kwargs: Additional keyword arguments are passed to get_schema.
# """
# if normalize:
# return self.normalize(obj, **kwargs)
# # TODO: Check schema?
# return rapidjson.validate(obj, self.get_schema(**kwargs))
def validate_model_submission(self, obj):
    r"""Validate an object against the schema for models submitted to
    the yggdrasil model repository.

    Args:
        obj (object): Object to validate.

    Returns:
        object: The validated object, unchanged.

    Raises:
        rapidjson.ValidationError: If the object is not valid.

    """
    rapidjson.validate(obj, self.model_form_schema)
    return obj
def validate_component(self, comp_name, obj, **kwargs):
    r"""Validate an object against a specific component.

    Args:
        comp_name (str): Name of the component to validate against.
        obj (object): Object to validate.
        **kwargs: Additional keyword arguments are passed to
            get_component_schema.

    Returns:
        object: Result of rapidjson.validate.

    Raises:
        rapidjson.ValidationError: If the object is not valid.

    """
    comp_schema = self.get_component_schema(comp_name, **kwargs)
    return rapidjson.validate(obj, comp_schema)
def normalize(self, obj, norm_kws=None, **kwargs):
    r"""Normalize an object against this schema.

    Args:
        obj (object): Object to normalize.
        norm_kws (dict, optional): Keyword arguments that should be
            passed to rapidjson.normalize. Defaults to {}.
        **kwargs: Additional keyword arguments are passed to get_schema.

    Returns:
        object: Normalized object.

    """
    # TODO: Check schema?
    schema = self.get_schema(**kwargs)
    return rapidjson.normalize(obj, schema, **(norm_kws or {}))
# def is_valid(self, obj):
# r"""Determine if an object is valid under this schema.
# Args:
# obj (object): Object to valdiate.
# Returns:
# bool: True if the object is valid, False otherwise.
# """
# try:
# self.validate(obj)
# except rapidjson.ValidationError:
# return False
# return True
# def is_valid_component(self, comp_name, obj):
# r"""Determine if an object is a valid represenation of a component.
# Args:
# comp_name (str): Name of the component to validate against.
# obj (object): Object to validate.
# Returns:
# bool: True if the object is valid, False otherwise.
# """
# try:
# self.validate_component(comp_name, obj)
# except rapidjson.ValidationError:
# return False
# return True
def get_component_schema(self, comp_name, subtype=None, relaxed=False,
                         allow_instance=False, allow_instance_definitions=False,
                         for_form=False):
    r"""Get the schema for a certain component.

    Args:
        comp_name (str): Name of the component to get the schema for.
        subtype (str, optional): Component subtype to get schema for.
            Defaults to None and the schema for evaluating any subtype of
            the specified component is returned.
        relaxed (bool, optional): If True, the returned schema (and any
            definitions it includes) are relaxed to allow objects with
            additional properties to pass validation. Defaults to False.
        allow_instance (bool, optional): If True, the returned schema will
            validate instances of this component in addition to documents
            describing a component. Defaults to False.
        allow_instance_definitions (bool, optional): If True, the
            definitions in the returned schema will allow for instances of
            the components. Defaults to False.
        for_form (bool, optional): If True, the returned schema will be
            formatted for easy parsing by form generation tools. Defaults
            to False. Causes relaxed and allow_instance to be ignored.

    Returns:
        dict: Schema for the specified component.

    Raises:
        ValueError: If comp_name is not a registered component.

    """
    if comp_name not in self._storage:  # pragma: debug
        raise ValueError("Unrecognized component: %s" % comp_name)
    if subtype is None:
        # Schema accepting any subtype of the component
        out = self._storage[comp_name].get_schema(
            allow_instance=allow_instance, for_form=for_form)
    else:
        out = self._storage[comp_name].get_subtype_schema(
            subtype, relaxed=relaxed, allow_instance=allow_instance,
            for_form=for_form)
    # Attach shared definitions so $ref entries resolve
    out['definitions'] = self.get_definitions(
        relaxed=relaxed, allow_instance=allow_instance_definitions,
        for_form=for_form)
    return out
# def get_component_keys(self, comp_name):
# r"""Get the properties associated with a certain component.
# Args:
# comp_name (str): Name of the component to return keys for.
# Returns:
# list: All of the valid properties for the specified component.
# """
# return self._storage[comp_name].properties