Source code for yggdrasil.schema

import os
import copy
import pprint
import yaml
import json
import importlib
from collections import OrderedDict
from yggdrasil import rapidjson


_schema_fname = os.path.abspath(os.path.join(
    os.path.dirname(__file__), '.ygg_schema.yml'))
_schema = None
_constants_separator = (
    "\n# ======================================================\n"
    "# Do not edit this file past this point as the following\n"
    "# is generated by yggdrasil.schema.update_constants\n"
    "# ======================================================\n")


[docs]class SchemaDict(OrderedDict): r"""OrderedDict subclass for ordering schemas on read in Python 2.""" def __repr__(self): return pprint.pformat(dict(self))
[docs]def ordered_load(stream, object_pairs_hook=SchemaDict, **kwargs): r"""Load YAML document from a file using a specified class to represent mapping types that allows for ordering. Args: stream (file): File stream to load the schema YAML from. object_pairs_hook (type, optional): Class that should be used to represent loaded maps. Defaults to SchemaDict. **kwargs: Additional keyword arguments are passed to decode_yaml. Returns: object: Result of ordered load. """ kwargs['sorted_dict_type'] = object_pairs_hook from yggdrasil.serialize import YAMLSerialize return YAMLSerialize.decode_yaml(stream, **kwargs)
[docs]def ordered_dump(data, **kwargs): r"""Dump object as a YAML document, representing SchemaDict objects as mapping type. Args: data (object): Python object that should be dumped. **kwargs: Additional keyword arguments are passed to encode_yaml. Returns: str: YAML document representating data. """ from yggdrasil.serialize import YAMLSerialize kwargs['sorted_dict_type'] = [SchemaDict, OrderedDict] return YAMLSerialize.encode_yaml(data, **kwargs)
[docs]def clear_schema(): r"""Clear global schema.""" global _schema _schema = None
[docs]def init_schema(fname=None): r"""Initialize global schema.""" global _schema if _schema is None: _schema = load_schema(fname)
[docs]def create_schema(): r"""Create a new schema from the registry.""" from yggdrasil.components import init_registry, registering with registering(): x = SchemaRegistry(init_registry(recurse=True)) update_constants(x) return x
[docs]def load_schema(fname=None): r"""Return the yggdrasil schema for YAML options. Args: fname (str, optional): Full path to the file that the schema should be loaded from. If the file dosn't exist, it is created. Defaults to _schema_fname. Returns: dict: yggdrasil YAML options. """ if fname is None: fname = _schema_fname if not os.path.isfile(fname): x = create_schema() x.save(fname) return SchemaRegistry.from_file(fname)
[docs]def get_schema(fname=None): r"""Return the yggdrasil schema for YAML options. Args: fname (str, optional): Full path to the file that the schema should be loaded from. If the file dosn't exist, it is created. Defaults to _schema_fname. Returns: dict: yggdrasil YAML options. """ global _schema if fname is None: init_schema() out = _schema else: out = load_schema(fname) return out
[docs]def convert_extended2base(s): r"""Covert schema from the extended form to a strictly JSON form. Args: s (object): Object to updated. Returns: object: Updated JSON object. """ # TODO: Automate this on classes type_map = {'int': 'integer', 'uint': 'integer', 'float': 'number', 'complex': 'string', 'unicode': 'string', 'bytes': 'string', 'function': 'string', 'class': 'string', 'instance': 'string', '1darray': 'array', 'ndarray': 'array', 'obj': 'object', 'ply': 'object', 'any': ["number", "string", "boolean", "object", "array", "null"]} if isinstance(s, (list, tuple)): s = [convert_extended2base(x) for x in s] elif isinstance(s, (dict, OrderedDict)): if 'type' in s: if isinstance(s['type'], str): if s['type'] in ['schema']: s = {"$ref": "#/definitions/schema"} elif s['type'] in type_map: s['type'] = type_map[s['type']] s.pop('class', None) elif s['type'] in ['scalar']: s.pop("precision", None) s.pop("units", None) s['type'] = s.pop('subtype') s['type'] = type_map.get(s['type'], s['type']) elif isinstance(s['type'], list): assert 'schema' not in s['type'] assert 'scalar' not in s['type'] s['type'] = [type_map.get(t, t) for t in s['type']] if all([t == s['type'][0] for t in s['type']]): s['type'] = s['type'][0] opt = copy.deepcopy(s.get('options', None)) s = {k: convert_extended2base(v) for k, v in s.items()} if opt is not None: s['options'] = opt return s
[docs]def get_json_schema(fname_dst=None, indent=None): r"""Return the yggdrasil schema as a strictly JSON schema without any of the extended datatypes. Args: fname_dst (str, optional): Full path to file where the JSON schema should be saved. Defaults to None and no file is created. indent (str, optional): Indentation that should be used when saving the schema to a file. Returns: dict: Converted structure. """ s = get_schema() out = s.get_schema() out['definitions']['schema'] = copy.deepcopy(rapidjson.get_metaschema()) out = convert_extended2base(out) if fname_dst is not None: with open(fname_dst, 'w') as fd: json.dump(out, fd, indent=indent) return out
[docs]def get_model_form_schema(fname_dst=None, **kwargs): r"""Return the yggdrasil schema that can be used to generate a form for creating a model specification file. Args: fname_dst (str, optional): Full path to file where the JSON schema should be saved. Defaults to None and no file is created. **kwargs: Additional keyword arguments are passed to the json.dump call if fname_dst is provided and ignored otherwise. Returns: dict: Schema structure. """ s = get_schema() out = s.model_form_schema if fname_dst is not None: with open(fname_dst, 'w') as fd: json.dump(out, fd, **kwargs) return out
[docs]def update_constants(schema=None): r"""Update constants.py with info from the schema.""" from yggdrasil.components import import_component from yggdrasil.drivers.CompiledModelDriver import ( get_compilation_tool_registry) if schema is None: schema = get_schema() def as_lines(x, newline='\n', key_order=None): out = "" if isinstance(x, dict): if key_order is None: key_order = list(sorted(x.keys())) out += "{" + newline for k in key_order: v = x[k] out += " %s: %s," % ( repr(k), as_lines(v, newline=(newline + ' '))) + newline out += "}" elif isinstance(x, list): out += "[" + newline out += " " + ", ".join( [as_lines(xx, newline=(newline + ' ')) for xx in x]) + "]" else: out += repr(x) return out filename = os.path.join(os.path.dirname(__file__), 'constants.py') # Component information component_registry = {} for k, v in schema.items(): component_registry[k] = { 'module': v.module, 'default': v.default_subtype, 'base': v.base_subtype_class_name, 'key': v.subtype_key, 'subtypes': v.subtype2class, 'subtype_modules': v.subtype2module} # File information files = { k: import_component('file', v) for k, v in component_registry['file']['subtypes'].items()} file2ext = {} ext2file = {'.txt': 'ascii'} for k, f in files.items(): file2ext.setdefault(k, f._extensions[0]) for ext in f._extensions: ext2file.setdefault(ext, k) # Language driver information drivers = {k: import_component('model', v) for k, v in component_registry['model']['subtypes'].items()} language_cat = ['compiled', 'interpreted', 'build', 'dsl', 'other'] typemap = {'compiler': 'compiled', 'interpreter': 'interpreted'} lang2ext = {'yaml': '.yml', 'executable': '.exe'} languages = {k: [] for k in language_cat} languages_with_aliases = {k: [] for k in language_cat} language_properties = {} compiler_env_vars = {} compilation_tool_vars = {} complete = [] aliased_languages = {} for k, drv in drivers.items(): if drv.language in complete: continue complete.append(drv.language) drv_type = typemap.get(drv.executable_type, drv.executable_type) if drv.language_ext: if k not in lang2ext: assert isinstance(drv.language_ext, list) lang2ext[k] = drv.language_ext[0] for ka in drv.language_aliases: lang2ext[ka] = lang2ext[k] languages.setdefault(drv_type, []) languages[drv_type].append(drv.language) languages_with_aliases.setdefault(drv_type, []) languages_with_aliases[drv_type].append(drv.language) languages_with_aliases[drv_type] += drv.language_aliases if drv.language_aliases: aliased_languages[drv.language] = [drv.language] + drv.language_aliases language_properties[drv.language] = { 'executable_type': drv_type, 'is_typed': drv.is_typed, 'full_language': drv.full_language} languages = {k: sorted(v) for k, v in languages.items()} languages_with_aliases = {k: sorted(v) for k, v in languages_with_aliases.items()} for x in ['compiler', 'linker', 'archiver']: reg = get_compilation_tool_registry(x).get('by_language', {}) for lang, tools in reg.items(): for v in tools.values(): k = v.toolname if v.is_build_tool or (k in compilation_tool_vars): continue compilation_tool_vars[k] = { 'exec': v.default_executable_env, 'flags': v.default_flags_env} if (x == 'compiler') and (lang not in compiler_env_vars): compiler_env_vars[lang] = compilation_tool_vars[k].copy() language_cat = list(languages.keys()) with open(filename, 'r') as fd: lines = [fd.read().split(_constants_separator)[0], _constants_separator[1:]] lines += [ "", "# Component registry", f"COMPONENT_REGISTRY = {as_lines(component_registry)}"] lines += [ "", "# File constants", "FILE2EXT = %s" % as_lines(file2ext), "EXT2FILE = %s" % as_lines(ext2file)] lines += [ "", "# Language driver constants", "LANG2EXT = %s" % as_lines(lang2ext), "EXT2LANG = {v: k for k, v in LANG2EXT.items()}", "LANGUAGES = %s" % as_lines(languages, key_order=language_cat)] lines.append( "LANGUAGES['all'] = (\n LANGUAGES[%s]" % repr(language_cat[0])) lines += [" + LANGUAGES[%s]" % repr(k) for k in language_cat[1:]] lines[-1] += ")" lines += [ "LANGUAGES_WITH_ALIASES = %s" % as_lines(languages_with_aliases, key_order=language_cat)] lines.append( "LANGUAGES_WITH_ALIASES['all'] = (\n LANGUAGES_WITH_ALIASES[%s]" % repr(language_cat[0])) lines += [" + LANGUAGES_WITH_ALIASES[%s]" % repr(k) for k in language_cat[1:]] lines[-1] += ")" lines += [ "ALIASED_LANGUAGES = %s" % as_lines(aliased_languages)] lines += [ "COMPILER_ENV_VARS = %s" % as_lines(compiler_env_vars), "COMPILATION_TOOL_VARS = %s" % as_lines(compilation_tool_vars)] lines += [ "LANGUAGE_PROPERTIES = %s" % as_lines(language_properties)] with open(filename, 'w') as fd: fd.write('\n'.join(lines) + '\n')
[docs]class ComponentSchema(object): r"""Schema information for one component. Args: schema_type (str): The name of the component. subtype_key (str): The name of the schema property/class attribute that should be used to differentiate between subtypes of this component. schema_registry (SchemaRegistry, optional): Registry of schemas that this schema is dependent on. **kwargs: Additional keyword arguments are entries in the component schema. Args: schema_type (str): The name of the component. schema_registry (SchemaRegistry): Registry of schemas. subtype_key (str): Schema property that is used to differentiate between subtypes of this component. schema_subtypes (dict): Mapping between component class names and the associated values of the subtype_key property for this component. schema_modules (dict): Mapping between component class names and the modules that they are in. """ def __init__(self, schema_type, subtype_key, schema_registry=None, module=None, schema_subtypes=None, schema_modules=None): self._storage = SchemaDict() self._base_name = None self._base_kwargs = None self._base_subtype_description = None self._base_schema = None self._no_inherit_kwargs = {} self.required_by_subtype = [] self.schema_type = schema_type self.subtype_key = subtype_key self.schema_registry = schema_registry if schema_subtypes is None: schema_subtypes = {} self.schema_subtypes = schema_subtypes if schema_modules is None: schema_modules = {} self.schema_modules = schema_modules self.default_subtype = None self.module = module super(ComponentSchema, self).__init__()
[docs] def identify_subtype(self, doc): r"""Identify the subtype associated with a document by validating it against the schemas for the different subtypes. Args: doc (dict): JSON object that conforms to one of the component subtypes. Returns: str: Name of the subtype that valdiates the provided document. """ for subtype in self.subtypes: subtype_schema = self.get_subtype_schema(subtype) try: rapidjson.validate(doc, subtype_schema) return subtype except rapidjson.ValidationError: pass raise ValueError("Could not determine subtype " "for document: %s" % doc) # pragma: debug
[docs] def get_base_schema(self): r"""Get a base schema containing properties that are the same for all component subtypes.""" if self._base_schema is not None: return copy.deepcopy(self._base_schema) assert self._base_name in self._storage self._base_schema = dict( copy.deepcopy(self._storage[self._base_name]), title=f"{self.schema_type}_base", description=(f"Base schema for all subtypes of " f"{self.schema_type} components."), additionalProperties=True) if self._base_kwargs: self._base_schema.update(self._base_kwargs) if self.subtype_key in self._base_schema.get('required', []): self._base_schema['required'].remove(self.subtype_key) if not self._base_schema['required']: self._base_schema.pop('required') if self.default_subtype is None: self._base_schema['properties'][self.subtype_key].pop( 'default', None) else: self._base_schema['properties'][ self.subtype_key]['default'] = self.default_subtype if self._base_subtype_description: self._base_schema['properties'][self.subtype_key][ 'description'] = self._base_subtype_description driver_list = [] for v in self._storage.values(): for k in v['properties'].get('driver', {}).get('enum', []): if k and k not in driver_list: driver_list.append(k) if driver_list: self.add_legacy_properties( self._base_schema, driver_list=driver_list, base=True) if 'required' in self._base_schema: self._base_schema['required'] = set( self._base_schema['required']) for v in self._storage.values(): self._base_schema['required'] &= set(v.get('required', [])) self._base_schema['required'] = sorted( list(self._base_schema['required'])) if not self._base_schema['required']: # pragma: no cover del self._base_schema['required'] # Update base schema, checking for compatiblity prop_overlap = set(self._base_schema['properties'].keys()) for v in self._storage.values(): prop_overlap &= set(v['properties'].keys()) # Force subtype keys to be included prop_overlap.add(self.subtype_key) new_base_prop = {k: self._base_schema['properties'][k] for k in prop_overlap} for v in self._storage.values(): for k in prop_overlap: old = new_base_prop[k] new = v['properties'][k] # Don't compare descriptions or properties defining the # subtype (like the subtype key or driver) if k not in [self.subtype_key, 'driver']: if not self.compare_body(old, new): # pragma: debug raise ValueError( f"Schema for property '{k}' of class" f" '{v['title']}'" f" is {new}, which differs from the existing" f" base class value ({old}). Check that" f" another class dosn't have a conflicting" f" definition of the same property.") if ((k in self._base_schema.get('required', []) and not self.compare_body(old, new, only_keys=['default']))): self.required_by_subtype.append(k) old.pop('default', None) # Assign original copy that includes description if ((k in [self.subtype_key, 'driver'] and ('enum' in old or 'enum' in new))): old['enum'] = sorted([ x for x in (set(old.get('enum', [])) | set(new.get('enum', []))) if x]) self._base_schema['properties'] = new_base_prop return copy.deepcopy(self._base_schema)
[docs] def get_subtype_schema(self, subtype, unique=False, relaxed=False, allow_instance=False, for_form=False, partnered=False): r"""Get the schema for the specified subtype. Args: subtype (str): Component subtype to return schema for. If 'base', the schema for evaluating the component base will be returned. unique (bool, optional): If True, the returned schema will only contain properties that are specific to the specified subtype. If subtype is 'base', these will be properties that are valid for all of the registerd subtypes. Defaults to False. relaxed (bool, optional): If True, the schema will allow additional properties. Defaults to False. allow_instance (bool, optional): If True, the returned schema will validate instances of this component in addition to documents describing a component. Defaults to False. for_form (bool, optional): If True, the returned schema will be formatted for easy parsing by form generation tools. Defaults to False. Causes relaxed and allow_instance to be ignored. Returns: dict: Schema for specified subtype. """ if for_form: relaxed = False allow_instance = False partnered = False if subtype == 'base': out = self.get_base_schema() # Add additional properties that apply to specific subtypes if not unique: if self.default_subtype: out['properties'][self.subtype_key]['default'] = self.default_subtype if ((for_form and all(self.subtype_key in x.get('required', []) for x in self._storage.values()))): out.setdefault('required', []) out['required'].append(self.subtype_key) out['additionalProperties'] = False for x in self._storage.values(): for k, v in x['properties'].items(): if (k != self.subtype_key): if (k not in out['properties']): out['properties'][k] = copy.deepcopy(v) if for_form: out['properties'][k]['options'] = { 'dependencies': {self.subtype_key: []}} if for_form and ('options' in out['properties'][k]): out['properties'][k]['options']['dependencies'][ self.subtype_key] += ( x['properties'][self.subtype_key]['enum']) if ((partnered and self.required_by_subtype and out.get('required', []))): for k in self.required_by_subtype: if k in out['required']: out['required'].remove(k) assert out['required'] # if not out['required']: # del out['required'] else: # pragma: debug assert (relaxed or for_form) # elif not (relaxed or for_form): # # Add place holders # for x in self._storage.values(): # for k, v in x['properties'].items(): # self._add_placeholder(out['properties'], k, v) else: if subtype not in self._storage: s2c = self.subtype2class if subtype in s2c: subtype = s2c[subtype] out = copy.deepcopy(self._storage[subtype]) if subtype in self._no_inherit_kwargs: out.update(self._no_inherit_kwargs[subtype]) base = self.get_base_schema() # Remove properties that apply to all subtypes if unique: skip_props = [self.subtype_key, 'driver'] skip_props += self.required_by_subtype out['additionalProperties'] = True if 'required' in out: out['required'] = sorted( list(set(out['required']) - (set(base.get('required', [])) - set(skip_props)))) if not out['required']: del out['required'] if isinstance(base.get('allowSingular', False), str): skip_props.append(base['allowSingular']) for k, v in base['properties'].items(): if ((k not in skip_props and k in out['properties'] and k not in out.get('required', []))): del out['properties'][k] assert relaxed # if not relaxed: # self._add_placeholder(out['properties'], k, v) if not out['properties']: # pragma: no cover del out['properties'] # TODO: Remove pushProperties/pullProperties based on # keys assert not (partnered and self._base_kwargs) # if partnered and self._base_kwargs: # for k in self._base_kwargs: # if k in ['pushProperties', 'pullProperties']: # out.pop(k, None) out['additionalProperties'] = relaxed if allow_instance: if subtype == 'base': comp_cls = self.base_subtype_class else: from yggdrasil.components import import_component comp_cls = import_component( self.schema_type, subtype=subtype) out = {'oneOf': [out, {'type': 'instance', 'class': comp_cls}]} return out
[docs] def get_subtype_order(self): r"""Get the order that subtypes should be in the schema, with the base subtype first. Returns: list: Subtypes in order. """ baseSchemaKey = self.default_subtype order = sorted(self._storage.keys()) if baseSchemaKey is not None: if baseSchemaKey not in order: baseSchemaKey = self.subtype2class.get(baseSchemaKey, baseSchemaKey) if baseSchemaKey in order: order.remove(baseSchemaKey) order.insert(0, baseSchemaKey) return order
@classmethod def _subtype_defkey(cls, schema_type, subtype): return f"{schema_type}-subtype-{subtype}".replace('+', 'p') # @classmethod # def _is_placeholder(cls, x): # return (x is True) # @classmethod # def _add_placeholder(cls, props, k, v): # if k not in props: # props[k] = {} # if 'aliases' in v: # for alias in v['aliases']: # props[alias] = {}
[docs] def get_subtype_definition_ref(self, subtype): r"""Get the address for a subtype's schema definition that can be used in references. Args: subtype (str): Component subtype. Returns: str: Definition address. """ if subtype in self.class2subtype: subtype = self.class2subtype[subtype][0] key = self._subtype_defkey(self.schema_type, subtype) return f"#/definitions/{key}"
[docs] def get_subtype_definitions(self, unique='both', relaxed=False, **kwargs): r"""Get subtype definitions. Args: **kwargs: Additional keyword arguments are passed to get_subtype_schema calls for each subtype. Returns: dict: Dictionary of subtype definitions. """ order = self.get_subtype_order() out = {} unique_base = (unique in ['base', 'both']) unique_subt = (unique in ['subtype', 'both']) relaxed_base = True relaxed_subt = True partnered_base = (unique in 'subtype') partnered_subt = (unique in 'base') if not relaxed: assert unique in ['subtype', 'base', 'both'] if unique in ['base', 'both']: relaxed_base = True relaxed_subt = False elif unique in ['subtype']: relaxed_base = False relaxed_subt = True else: # pragma: completion relaxed_base = False relaxed_subt = False base = self.get_subtype_schema('base', unique=unique_base, partnered=partnered_base, relaxed=relaxed_base, **kwargs) out[self._subtype_defkey(self.schema_type, "base")] = base for subC in order: subT = self.class2subtype[subC][0] x = self.get_subtype_schema(subC, unique=unique_subt, partnered=partnered_subt, relaxed=relaxed_subt, **kwargs) out[self._subtype_defkey(self.schema_type, subT)] = x if unique_subt: # Only remove these if they will be in the base for k in ['pushProperties', 'pullProperties']: if k in base: x.pop(k, None) else: # TODO: Unclear what this should be if properties added # via relaxed for k in ['pushProperties', 'pullProperties']: if k in base: x.setdefault(k, {}) x[k].update(base[k]) return out
[docs] def get_driver_definition(self, allow_driver=False): r"""Get a definition for a driver schema. Args: allow_driver (bool, optional): If not True, the outputs will be a null schema. Defaults to False. Returns: dict: Driver schema. """ if allow_driver: out = {"allOf": [ {"$ref": f"#/definitions/{self.schema_type}"}, {"required": ["driver", "args"], "properties": { 'driver': {'type': 'string', 'deprecated': True, 'description': ( '[DEPRECATED] Name of driver ' 'class that should be used.')}, 'args': {'type': 'string', 'deprecated': True, 'description': ( '[DEPRECATED] Arguments that should ' 'be provided to the driver.')}}} ]} if self.schema_type == 'file': out['allOf'][1]['required'].append('working_dir') out['allOf'][1]['properties']['working_dir'] = { 'type': 'string'} else: out = {'type': 'null'} return out
[docs] def get_definitions(self, for_form=False, allow_instance=False, allow_driver=False, full=False, relaxed=False, **kwargs): r"""Get the set of definitions defining this component type. Args: for_form (bool, optional): If True, the returned schema will be formatted for easy parsing by form generation tools. Defaults to False. Causes relaxed, allow_instance, and full to be ignored. allow_instance (bool, optional): If True, the returned schema will validate instances of this component in addition to documents describing a component. Defaults to False. allow_driver (bool, optional): If True, the returned definitions will include the deprecated driver based schemas. Defaults to False. full (bool, optional): If True, properties are made explicit for each component subtype resulting in a larger schema that has a simplified logic for parsing. If False, the schema will be compressed such that properties occuring across all component subtypes will only occur once in the base schema. Defaults to False. **kwargs: Additional keyword arguments are passed to get_subtype_definitions calls. Returns: dict: Set of component definitions. """ combo = self.get_schema(for_form=for_form, allow_instance=allow_instance) out = {self.schema_type: combo} if full: out.update(self.get_subtype_definitions( unique='base', relaxed=relaxed, **kwargs)) else: out.update(self.get_subtype_definitions( unique='subtype', relaxed=relaxed, **kwargs)) if ((self.schema_type in ['connection', 'comm', 'file', 'model'] and not for_form)): out[f"{self.schema_type}_driver"] = self.get_driver_definition( allow_driver=allow_driver) return out
[docs] def get_schema(self, for_form=False, allow_instance=False): r"""Get the schema defining this component. Args: for_form (bool, optional): If True, the returned schema will be formatted for easy parsing by form generation tools. Defaults to False. Causes relaxed, allow_instance, and full to be ignored. allow_instance (bool, optional): If True, the returned schema will validate instances of this component in addition to documents describing a component. Defaults to False. Returns: dict: Schema for this component. """ combo = { 'description': f'Schema for {self.schema_type} components.', 'title': (f"complete-{self.schema_type}-{self.subtype_key}")} # f"-{self.default_subtype}")} if for_form: combo.update(self.get_subtype_schema('base', for_form=True)) return combo order = self.get_subtype_order() combo['allOf'] = [ {'$ref': self.get_subtype_definition_ref("base")}, {'anyOf': [{'$ref': self.get_subtype_definition_ref(x)} for x in order]}] # This does allow for properties to be removed via push/pull # but is more elegant than duplicating properties in the base # schema and so is preservered # if not relaxed: # all_prop = set() # for x in self._storage.values(): # all_prop.update(set(x['properties'].keys())) # for v in x['properties'].values(): # all_prop.update(set(v.get('aliases', []))) # combo['allOf'].append( # {'type': 'object', # 'additionalProperties': False, # 'properties': {k: {} for k in all_prop}}) if allow_instance: combo['oneOf'] = [{'allOf': combo.pop('allOf')}, {'type': 'instance', 'class': self.base_subtype_class}] return combo
# def set_required_by_subtype(self, props): # r"""Update schema so that specified properties are required at # the subtype level instead of in the base schema to allow # subtypes to specify defaults. # Args: # props (list): List of properties to require by subtype. # """ # if not props: # return # for x in self._storage.values(): # x.setdefault('required', []) # x.setdefault('properties', {}) # for k in props: # if k not in x['required']: # x['required'].append(k) # if k not in x['properties']: # x['properties'][k] = copy.deepcopy( # self._base_schema['properties'][k]) # if self._base_schema.get('required', []): # for k in props: # if k in self._base_schema['required']: # self._base_schema['required'].remove(k) # if not self._base_schema['required']: # del self._base_schema['required']
[docs] @classmethod def from_definitions(cls, schema, defs, schema_registry=None): r"""Construct a ComponentSchema from a schema. Args: schema (dict): Schema. Returns: ComponentSchema: Schema with information from schema. """ _, schema_type, subtype_key = schema['title'].split('-') subt_base = defs[cls._subtype_defkey(schema_type, "base")] subt_default = subt_base['properties'][subtype_key].get('default', None) subt_description = subt_base['properties'][subtype_key].get('description', '') subt_schema = [defs[x['$ref'].split('#/definitions/')[-1]] for x in schema['allOf'][1]['anyOf']] # Initialize schema out = cls(schema_type, subtype_key, schema_registry=schema_registry) out.default_subtype = subt_default # out._base_schema = subt_base prop_base = list(subt_base['properties'].keys()) for v in subt_schema: for k in v['properties'].keys(): if k in prop_base: prop_base.remove(k) prop_base += ['driver', subtype_key] base_unique = {k: v for k, v in subt_base['properties'].items() if k in prop_base} kwargs_base = {k: subt_base[k] for k in ['pushProperties', 'pullProperties'] if k in subt_base} for v in subt_schema: v_module_name, v_class_name = v['title'].split('.')[-2:] v_new = copy.deepcopy(v) v_new['required'] = sorted(list( set(v.get('required', [])) | set(subt_base.get('required', [])))) if not v_new['required']: del v_new['required'] v_new['properties'] = copy.deepcopy(base_unique) v_new['properties'].update(v['properties']) # TODO: Compare with base kwargs_no_inherit = {k: v_new.pop(k) for k in ['pushProperties', 'pullProperties'] if k in v_new and k not in subt_base} out.append_schema(v_class_name, v_new, description=subt_description, default=subt_default, kwargs_base=kwargs_base, kwargs_no_inherit=kwargs_no_inherit) subtypes = v['properties'][out.subtype_key]['enum'] out.schema_subtypes[v_class_name] = subtypes out.schema_modules[v_class_name] = v_module_name v_module = '.'.join(v['title'].split('.')[:-2]) if out.module is None: out.module = v_module else: assert v_module == out.module # Remove placeholder properties in base and subtypes # for x in list(out._storage.values()): # for k in list(x['properties'].keys()): # if cls._is_placeholder(x['properties'][k]): # del x['properties'][k] return out
[docs] @classmethod def from_registry(cls, schema_type, registry, **kwargs): r"""Construct a ComponentSchema from a registry entry. Args: schema_type (str): Name of component type to build. registry (dict): Registry information for the component. **kwargs: Additional keyword arguments are passed to the class __init__ method. Returns: ComponentSchema: Schema with information from classes. """ schema_subtypes = {} for k, v in registry['subtypes'].items(): if v not in schema_subtypes: schema_subtypes[v] = [] schema_subtypes[v].append(k) schema_modules = {} for k, v in registry.get('subtype_modules', {}).items(): if v not in schema_modules: schema_modules[v] = [] schema_modules[v].append(k) kwargs.update(module=registry['module'], schema_subtypes=schema_subtypes, schema_modules=schema_modules) out = cls(schema_type, registry['key'], **kwargs) for x in registry['classes'].values(): out.append_class(x, verify=True) return out
# @property # def properties(self): # r"""list: Valid properties for this component.""" # return sorted(list(self.get_subtype_schema('base')['properties'].keys()))
[docs] def get_subtype_properties(self, subtype): r"""Get the valid properties for a specific subtype. Args: subtype (str): Name of the subtype to get keys for. Returns: list: Valid properties for the specified subtype. """ return sorted(list(self.get_subtype_schema(subtype)['properties'].keys()))
@property def class2subtype(self): r"""dict: Mapping from class to list of subtypes.""" return self.schema_subtypes @property def subtype2class(self): r"""dict: Mapping from subtype to class.""" out = {} for k, v in self.schema_subtypes.items(): for iv in v: out[iv] = k return out @property def subtype2module(self): r"""dict: Mapping from subtype to module.""" out = {} for k, v in self.schema_subtypes.items(): for iv in v: out[iv] = self.schema_modules[k] return out @property def base_subtype_class_name(self): r"""str: Name of base class for the subtype.""" if not getattr(self, '_base_subtype_class_name', None): self.base_subtype_class return self._base_subtype_class_name @property def base_subtype_class(self): r"""ComponentClass: Base class for the subtype.""" if not getattr(self, '_base_subtype_class', None): default_class = list(self.schema_subtypes.keys())[0] cls = getattr( importlib.import_module(f"{self.module}.{default_class}"), default_class) base_class = cls for i, x in enumerate(cls.__mro__): if x._schema_type != cls._schema_type: break base_class = x else: # pragma: debug raise RuntimeError( f"Could not determine a base class for " f"{self.schema_type} (using class {cls})") self._base_subtype_class = base_class self._base_subtype_class_name = base_class.__name__ return self._base_subtype_class @property def subtypes(self): r"""list: All subtypes for this schema type.""" out = [] for v in self.schema_subtypes.values(): out += v return sorted(list(set(out))) @property def classes(self): r"""list: All available classes for this schema.""" return sorted([k for k in self.schema_subtypes.keys()])
[docs] @classmethod def compare_body(cls, a, b, only_keys=None, ignore_keys=['description', 'default', 'maxItems', 'minItems']): r"""Compare two schemas, ignoring some keys. Args: a (dict): First schema for comparison. b (dict): Second schema for comparison. only_keys (list, optional): Keys to compare. ignore_keys (list, optional): Keys to ignore. Returns: bool: True if the schemas are equivalent, False otherwise. """ if only_keys: for k in only_keys: if a.get(k, None) != b.get(k, None): return False return True a_cpy = copy.deepcopy(a) b_cpy = copy.deepcopy(b) for k in ignore_keys: a_cpy.pop(k, None) b_cpy.pop(k, None) return a_cpy == b_cpy
[docs] @classmethod def add_legacy_properties(cls, new_schema, driver_list=None, base=False): r"""Add driver/args legacy properties to the schema. Args: new_schema (dict): Schema to add properties to. driver_list (list, optional): Drivers that are valid for the schema. """ legacy_properties = { 'driver': {'type': 'string', 'deprecated': True, 'enum': driver_list, 'description': ( '[DEPRECATED] Name of driver ' 'class that should be used.')}, 'args': {'type': 'string', 'deprecated': True, 'description': ( '[DEPRECATED] Arguments that should ' 'be provided to the driver.')}} if not driver_list: legacy_properties.pop('args') legacy_properties['driver']['enum'] = [''] for k, v in legacy_properties.items(): if k not in new_schema['properties']: new_schema['properties'][k] = v if 'dependencies' not in new_schema and base: new_schema['dependencies'] = {'driver': ['args']}
[docs] def append_schema(self, name, new_schema, verify=False, description=None, default=None, kwargs_base=None, kwargs_no_inherit=None): r"""Append component schema to the schema. Args: name (str): Name to store the schema under. new_schema (dict): New schema to store. verify (bool, optional): If True, verify the schema after adding the schema. Defaults to False. description (str, optional): Description of the base subtype property. default (str, optional): Default subtype for base. kwargs_base (dict, optional): Keyword arguments added to the base schema and removed from unique subtype schemas. kwargs_no_inherit (dict, optional): Keyword arguments not inherited by other subtypes or the base schema. """ # Set properties for creating base schema or that would be # present in base schema if not in every subschema # if kwargs_base: # new_schema.update(kwargs_base) if kwargs_no_inherit: # TODO: Remove these from base? self._no_inherit_kwargs[name] = kwargs_no_inherit # new_schema.update(kwargs_no_inherit) self._storage[name] = copy.deepcopy(new_schema) if self._base_name is None: self._base_name = name self._base_kwargs = kwargs_base self._base_subtype_description = description self.default_subtype = default # Verify that the schema is valid if verify: rapidjson.Normalizer.check_schema(self.get_schema()) self._base_schema = None # Reset so that it will be regen
[docs] def append_class(self, comp_cls, verify=False): r"""Append component class to the schema. Args: comp_cls (class): Component class that should be added. verify (bool, optional): If True, verify the schema after adding the component class. Defaults to False. """ assert comp_cls._schema_type == self.schema_type assert comp_cls._schema_subtype_key == self.subtype_key name = comp_cls.__name__ fullname = f'{comp_cls.__module__}.{comp_cls.__name__}' subtype_module = '.'.join(comp_cls.__module__.split('.')[:-1]) # Append subtype subtype_list = copy.deepcopy( getattr(comp_cls, f'_{self.subtype_key}', None)) if not isinstance(subtype_list, list): subtype_list = [subtype_list] subtype_list += getattr(comp_cls, '_%s_aliases' % self.subtype_key, []) driver_list = [] driver_list += getattr(comp_cls, '_deprecated_drivers', []) if name.endswith(('Driver', 'Model')): driver_list.append(name) self.schema_subtypes[name] = subtype_list self.schema_modules[name] = comp_cls.__module__.split('.')[-1] assert subtype_module == self.module # Create new schema for subtype new_schema = {'title': fullname, 'description': ('Schema for %s component %s subtype.' % (self.schema_type, subtype_list)), 'type': 'object', 'required': copy.deepcopy(comp_cls._schema_required), 'properties': copy.deepcopy(comp_cls._schema_properties), 'additionalProperties': False} new_schema.update(comp_cls._schema_additional_kwargs) if not new_schema['required']: del new_schema['required'] new_schema['properties'].setdefault(self.subtype_key, {}) new_schema['properties'][self.subtype_key]['enum'] = subtype_list new_schema['properties'][self.subtype_key].setdefault('type', 'string') # Add legacy properties prev_drivers = ( self._base_name and 'driver' in self._storage[self._base_name]['properties']) if driver_list or prev_drivers: self.add_legacy_properties(new_schema, driver_list) if driver_list and not prev_drivers: for v in self._storage.values(): self.add_legacy_properties(v) # Add description/default for subtype to subtype property after # recording base to prevent overwriting of the property rather # than the property value. base_description = new_schema['properties'][self.subtype_key].get( 'description', '') base_default = new_schema['properties'][self.subtype_key].get( 'default', None) if comp_cls._schema_subtype_description is not None: new_schema['properties'][self.subtype_key]['description'] = ( comp_cls._schema_subtype_description) if comp_cls._schema_no_default_subtype: new_schema['properties'][self.subtype_key].pop('default', None) elif subtype_list: new_schema['properties'][self.subtype_key]['default'] = subtype_list[0] kws = dict( kwargs_base=comp_cls._schema_additional_kwargs_base, kwargs_no_inherit=comp_cls._schema_additional_kwargs_no_inherit, description=base_description, default=base_default) self.append_schema(name, new_schema, verify=verify, **kws)
[docs]class SchemaRegistry(object): r"""Registry of schema's for different integration components. Args: registry (dict, optional): Dictionary of registered components. Defaults to None and the registry will be empty. required (list, optional): Components that are required. Defaults to ['comm', 'file', 'model', 'connection']. Ignored if registry is None. Raises: ValueError: If registry is provided and one of the required components is missing. """ _normalizers = {} _default_required_components = ['comm', 'file', 'model', 'connection'] def __init__(self, registry=None, required=None): super(SchemaRegistry, self).__init__() self._cache = {} self._storage = SchemaDict() if required is None: required = self._default_required_components self.required_components = required if registry is not None: for k in required: if k not in registry: raise ValueError("Component %s required." % k) # Create schemas for each component for k, v in registry.items(): icomp = ComponentSchema.from_registry(k, v, schema_registry=self) self.add(k, icomp, verify=True)
[docs] def add(self, k, v, verify=False): r"""Add a new component schema to the registry.""" self._cache = {} self._storage[k] = v if verify: rapidjson.Normalizer.check_schema(self.get_schema())
[docs] def get(self, k, *args, **kwargs): r"""Return a component schema from the registry.""" return self._storage.get(k, *args, **kwargs)
[docs] def get_definitions(self, relaxed=False, allow_instance=False, for_form=False, dont_copy=False, full=False, allow_driver=False): r"""Get schema definitions for the registered components. Args: relaxed (bool, optional): If True, the returned schema (and any definitions it includes) are relaxed to allow for objects with objects with additional properties to pass validation. Defaults to False. allow_instance (bool, optional): If True, the returned definitions will validate instances of the components in addition to documents describing components. Defaults to False. for_form (bool, optional): If True, the returned schema will be formatted for easy parsing by form generation tools. Defaults to False. Causes relaxed, allow_instance, and full to be ignored. dont_copy (bool, optional): If True, a the cached definitions are returned without copying. Defaults to False. full (bool, optional): If True, properties are made explicit for each component subtype resulting in a larger schema that has a simplified logic for parsing. If False, the schema will be compressed such that properties occuring across all component subtypes will only occur once in the base schema. Defaults to False. allow_driver (bool, optional): If True, the returned definitions will include the deprecated driver based schemas. Defaults to False. Returns: dict: Schema defintiions for each of the registered components. """ cache_key = 'definitions' if for_form: cache_key += '_form' full = False relaxed = False allow_instance = False allow_driver = False if relaxed: cache_key += '_relaxed' if allow_instance: cache_key += '_instance' if full: cache_key += '_full' if allow_driver: cache_key += '_driver' if cache_key not in self._cache: out = {} for k, v in self._storage.items(): out.update(v.get_definitions(for_form=for_form, relaxed=relaxed, allow_instance=allow_instance, allow_driver=allow_driver, full=full)) for k in self.required_components: out.setdefault(k, {'type': 'string'}) self._cache[cache_key] = out out = self._cache[cache_key] if not dont_copy: out = copy.deepcopy(out) return out
[docs] def get_schema(self, relaxed=False, allow_instance=False, for_form=False, partial=False, full=False, allow_driver=True): r"""Get the schema defining this component. Args: relaxed (bool, optional): If True, the returned schema (and any definitions it includes) are relaxed to allow for objects with objects with additional properties to pass validation. Defaults to False. allow_instance (bool, optional): If True, the returned schema will validate instances of this component in addition to documents describing a component. Defaults to False. for_form (bool, optional): If True, the returned schema will be formatted for easy parsing by form generation tools. Defaults to False. Causes relaxed, allow_instance, and full to be ignored. partial (bool, optional): If True, the schema for individual yaml files will be used which allows for partial integration specification. Defaults to False. full (bool, optional): If True, properties are made explicit for each component subtype resulting in a larger schema that has a simplified logic for parsing. If False, the schema will be compressed such that properties occuring across all component subtypes will only occur once in the base schema. Defaults to False. allow_driver (bool, optional): If True, the returned schema will include the deprecated driver based schemas. Defaults to True. Returns: dict: Schema for this component. """ cache_key = 'schema' if for_form: cache_key += '_form' full = False relaxed = False allow_instance = False allow_driver = False if relaxed: cache_key += '_relaxed' if allow_instance: cache_key += '_instance' if partial: cache_key += '_partial' if full: cache_key += '_full' if allow_driver: cache_key += '_driver' if cache_key not in self._cache: if allow_driver: model_ref = {'anyOf': [{'$ref': '#/definitions/model'}, {'$ref': '#/definitions/model_driver'}]} conn_ref = {'anyOf': [{'$ref': '#/definitions/connection'}, {'$ref': '#/definitions/connection_driver'}]} else: model_ref = {'$ref': '#/definitions/model'} conn_ref = {'$ref': '#/definitions/connection'} out = {'title': 'YAML Schema', 'description': 'Schema for yggdrasil YAML input files.', 'type': 'object', 'definitions': self.get_definitions( relaxed=relaxed, allow_instance=allow_instance, for_form=for_form, dont_copy=True, full=full, allow_driver=allow_driver), 'required': ['models', 'connections'], 'additionalProperties': False, 'properties': SchemaDict( [('models', {'type': 'array', 'items': model_ref, 'minItems': 1, 'aliases': ['model']}), ('connections', {'type': 'array', 'items': conn_ref, 'aliases': ['connection'], 'default': []})])} if not for_form: out['properties']['working_dir'] = {'type': 'string'} out['pushProperties'] = { '$properties/connections/items': ['working_dir'], '$properties/models/items': ['working_dir']} out['properties']['models']['allowSingular'] = True out['properties']['connections']['allowSingular'] = True if partial: out.pop('additionalProperties') out['properties']['models'].pop('minItems') out['properties']['models']['default'] = [] self._cache[cache_key] = out return copy.deepcopy(self._cache[cache_key])
@property def definitions(self): r"""dict: Schema definitions for different components.""" return self.get_definitions() @property def schema(self): r"""dict: Schema for evaluating YAML input file.""" return self.get_schema() @property def form_schema(self): r"""dict: Schema for generating a YAML form.""" out = self.get_schema(for_form=True) out['definitions']['schema'] = copy.deepcopy(rapidjson.get_metaschema()) out = convert_extended2base(out) return out @property def model_form_schema_props(self): r"""dict: Information about how properties should be modified for the model form schema.""" prop = { # 'add': {}, 'replace': { 'comm': { 'transform': { "type": "array", "items": {"$ref": "#/definitions/transform"}}, 'default_file': { '$ref': '#/definitions/file'}}, 'file': { 'serializer': { '$ref': '#/definitions/serializer'}}}, 'required': { 'model': ['args', 'inputs', 'outputs', 'description', 'repository_url', 'repository_commit']}, 'remove': { 'comm': ['is_default', 'length_map', 'serializer', 'address', 'dont_copy', 'for_service', 'client_id', 'cookies', 'host', 'params', 'port', 'commtype'], 'ocomm': ['default_value'], 'file': ['is_default', 'length_map', 'wait_for_creation', 'working_dir', 'read_meth', 'in_temp', 'serializer', 'datatype', 'address', 'dont_copy', 'for_service', 'client_id', 'cookies', 'host', 'params', 'port'], 'model': ['client_of', 'is_server', 'preserve_cache', 'products', 'source_products', 'working_dir', 'overwrite', 'skip_interpreter', 'copies', 'timesync', 'with_strace', 'with_valgrind', 'valgrind_flags', 'with_debugger', 'copies', 'logging_level', 'additional_variables', 'aggregation', 'interpolation', 'synonyms', 'driver']}, 'order': { 'model': ['name', 'repository_url', 'repository_commit', 'contact_email', 'language', 'description', 'args', 'inputs', 'outputs'], 'comm': ['name', 'datatype']}, 'update': { 'model': { 'inputs': { 'description': ('Zero or more channels carrying ' 'input to the model'), 'items': {'$ref': '#/definitions/icomm'}}, 'outputs': { 'description': ('Zero or more channels carrying ' 'output from the model'), 'items': {'$ref': '#/definitions/ocomm'}}, 'repository_commit': { 'description': ('Commit that should be checked out ' 'from the model repository.')}, 'args': {'minItems': 1}}, 'file': { 'name': { 'description': ('Path to a file in the model ' 'repository')}}}, } return prop @property def model_form_schema(self): r"""dict: Schema for generating a model YAML form.""" from yggdrasil import constants out = self.get_schema(for_form=True) scalar_types = list(constants.VALID_TYPES.keys()) meta = copy.deepcopy(rapidjson.get_metaschema()) meta_prop = { 'subtype': ['1darray', 'ndarray'], 'units': ['1darray', 'ndarray'] + scalar_types, 'precision': ['1darray', 'ndarray'] + scalar_types, 'length': ['1darray'], 'shape': ['ndarray']} out['definitions']['simpleTypes'] = meta['definitions']['simpleTypes'] out['definitions']['simpleTypes'].update(type='string', default='bytes') out['definitions']['simpleTypes']['enum'].remove('scalar') out['definitions']['schema'] = {'type': 'object', 'required': ['type'], 'properties': {}} out['definitions']['schema']['properties']['type'] = { '$ref': '#/definitions/simpleTypes'} for k, types in meta_prop.items(): out['definitions']['schema']['properties'][k] = meta['properties'][k] if types: out['definitions']['schema']['properties'][k]['options'] = { 'dependencies': {'type': types}} for k in ['comm', 'file', 'model']: out['definitions'][k].pop('description', '') for k in out['definitions'].keys(): if k in ['schema', 'simpleTypes']: continue out['definitions'][k].pop('title', None) if ((('required' in out['definitions'][k]) and ('working_dir' in out['definitions'][k]['required']))): out['definitions'][k]['required'].remove('working_dir') for p, v in list(out['definitions'][k]['properties'].items()): if v.get('description', '').startswith('[DEPRECATED]'): out['definitions'][k]['properties'].pop(p) # Process based on model_form_schema_props prop = self.model_form_schema_props def adjust_definitions(k): # Remove for p in prop['remove'].get(k, []): out['definitions'][k]['properties'].pop(p, None) if p in out['definitions'][k].get('required', []): out['definitions'][k]['required'].remove(p) # Replace for r, v in prop['replace'].get(k, {}).items(): if 'description' in out['definitions'][k]['properties'].get(r, {}): v['description'] = ( out['definitions'][k]['properties'][r]['description']) out['definitions'][k]['properties'][r] = v # Required out['definitions'][k].setdefault('required', []) for p in prop['required'].get(k, []): if p not in out['definitions'][k]['required']: out['definitions'][k]['required'].append(p) # Update for p, new in prop['update'].get(k, {}).items(): out['definitions'][k]['properties'][p].update(new) # Add # out['definitions'][k]['properties'].update( # prop['add'].get(k, {})) # Order for i, p in enumerate(prop['order'].get(k, [])): out['definitions'][k]['properties'][p]['propertyOrder'] = i out['definitions'][k].pop('allowSingular', None) out['definitions'][k].pop('pushProperties', None) out['definitions'][k].pop('pullProperties', None) # Update definitions for k in ['model', 'comm', 'file']: adjust_definitions(k) for k in ['icomm', 'ocomm']: out['definitions'][k] = copy.deepcopy(out['definitions']['comm']) adjust_definitions(k) out['definitions']['icomm']['oneOf'] = [ {'title': 'default file', 'required': ['default_file'], 'not': {'required': ['default_value']}}, {'title': 'default value', 'required': ['default_value'], 'not': {'required': ['default_file']}}] # Adjust formating for x in [out] + list(out['definitions'].values()): for p, v in x.get('properties', {}).items(): if v.get("type", None) == "boolean": v.setdefault("format", "checkbox") # Isolate model out.update(out['definitions'].pop('model')) out['definitions'].pop('connection') out.update( title='Model YAML Schema', description='Schema for yggdrasil model YAML input files.') out = convert_extended2base(out) return out def __getitem__(self, k): return self.get(k)
[docs] def keys(self): return self._storage.keys()
[docs] def items(self): return self._storage.items()
def __eq__(self, other): if not hasattr(other, 'get_schema'): return False return (self.get_schema() == other.get_schema())
[docs] @classmethod def from_file(cls, fname): r"""Create a SchemaRegistry from a file. Args: fname (str): Full path to the file the schema should be loaded from. """ out = cls() out.load(fname) return out
[docs] def load(self, fname): r"""Load schema from a file. Args: fname (str): Full path to the file the schema should be loaded from. """ with open(fname, 'r') as f: contents = f.read() schema = ordered_load(contents, Loader=yaml.SafeLoader) if schema is None: raise Exception("Failed to load schema from %s" % fname) # Create components for k, v in schema.get('definitions', {}).items(): # if k.endswith('_driver'): # continue if not v.get('title', '').startswith('complete-'): continue icomp = ComponentSchema.from_definitions( v, schema['definitions'], schema_registry=self) self.add(k, icomp)
[docs] def save(self, fname, schema=None): r"""Save the schema to a file. Args: fname (str): Full path to the file the schema should be saved to. schema (dict, optional): yggdrasil YAML options. """ if schema is None: schema = self.get_schema() with open(fname, 'w') as f: ordered_dump(schema, stream=f, Dumper=yaml.SafeDumper)
# def validate(self, obj, normalize=False, **kwargs): # r"""Validate an object against this schema. # Args: # obj (object): Object to valdiate. # **kwargs: Additional keyword arguments are passed to get_schema. # """ # if normalize: # return self.normalize(obj, **kwargs) # # TODO: Check schema? # return rapidjson.validate(obj, self.get_schema(**kwargs))
[docs] def validate_model_submission(self, obj): r"""Validate an object against the schema for models submitted to the yggdrasil model repository. Args: obj (object): Object to validate. """ rapidjson.validate(obj, self.model_form_schema) return obj
[docs] def validate_component(self, comp_name, obj, **kwargs): r"""Validate an object against a specific component. Args: comp_name (str): Name of the component to validate against. obj (object): Object to validate. **kwargs: Additional keyword arguments are passed to get_component_schema. """ comp_schema = self.get_component_schema(comp_name, **kwargs) return rapidjson.validate(obj, comp_schema)
[docs] def normalize(self, obj, norm_kws=None, **kwargs): r"""Normalize an object against this schema. Args: obj (object): Object to normalize. norm_kws (dict, optional): Keyword arguments that should be passed to rapidjson.normalize. Defaults to {}. **kwargs: Additional keyword arguments are passed to get_schema. Returns: object: Normalized object. """ # TODO: Check schema? if norm_kws is None: norm_kws = {} return rapidjson.normalize(obj, self.get_schema(**kwargs), **norm_kws)
# def is_valid(self, obj): # r"""Determine if an object is valid under this schema. # Args: # obj (object): Object to valdiate. # Returns: # bool: True if the object is valid, False otherwise. # """ # try: # self.validate(obj) # except rapidjson.ValidationError: # return False # return True # def is_valid_component(self, comp_name, obj): # r"""Determine if an object is a valid represenation of a component. # Args: # comp_name (str): Name of the component to validate against. # obj (object): Object to validate. # Returns: # bool: True if the object is valid, False otherwise. # """ # try: # self.validate_component(comp_name, obj) # except rapidjson.ValidationError: # return False # return True
[docs] def get_component_schema(self, comp_name, subtype=None, relaxed=False, allow_instance=False, allow_instance_definitions=False, for_form=False): r"""Get the schema for a certain component. Args: comp_name (str): Name of the component to get the schema for. subtype (str, optional): Component subtype to get schema for. Defaults to None and the schema for evaluating any subtype of the specified component is returned. relaxed (bool, optional): If True, the returned schema (and any definitions it includes) are relaxed to allow for objects with objects with additional properties to pass validation. Defaults to False. allow_instance (bool, optional): If True, the returned schema will validate instances of this component in addition to documents describing a component. Defaults to False. allow_instance_definitions (bool, optional): If True, the definitions in the returned schema will allow for instances of the components. Defaults to False. for_form (bool, optional): If True, the returned schema will be formatted for easy parsing by form generation tools. Defaults to False. Causes relaxed and allow_instance to be ignored. **kwargs: Additonal keyword arguments are paseed to get_schema or get_subtype_schema for the selected component type. Returns: dict: Schema for the specified component. """ if comp_name not in self._storage: # pragma: debug raise ValueError("Unrecognized component: %s" % comp_name) if subtype is None: out = self._storage[comp_name].get_schema( allow_instance=allow_instance, for_form=for_form) else: out = self._storage[comp_name].get_subtype_schema( subtype, relaxed=relaxed, allow_instance=allow_instance, for_form=for_form) out['definitions'] = self.get_definitions( relaxed=relaxed, allow_instance=allow_instance_definitions, for_form=for_form) return out
# def get_component_keys(self, comp_name): # r"""Get the properties associated with a certain component. # Args: # comp_name (str): Name of the component to return keys for. # Returns: # list: All of the valid properties for the specified component. # """ # return self._storage[comp_name].properties