Source code for cis_interface.metaschema.datatypes

import os
import glob
import jsonschema
import copy
import importlib
from collections import OrderedDict
from cis_interface.metaschema.encoder import decode_json
from cis_interface.metaschema.properties import get_metaschema_property


_jsonschema_ver_maj = int(float(jsonschema.__version__.split('.')[0]))
_type_registry = OrderedDict()
_schema_dir = os.path.join(os.path.dirname(__file__), 'schemas')
_base_validator = jsonschema.validators.validator_for({"$schema": ""})
CIS_MSG_HEAD = b'CIS_MSG_HEAD'


[docs]class MetaschemaTypeError(TypeError): r"""Error that should be raised when a class encounters a type it cannot handle.""" pass
[docs]def register_type(type_class): r"""Register a type class, recording methods for encoding/decoding. Args: type_class (class): Class to be registered. Raises: ValueError: If the type is already registered. ValueError: If the type is a default JSON type. ValueError: If any of the type's properties are not registered. """ global _type_registry type_name = type_class.name if type_name in _type_registry: raise ValueError("Type '%s' already registered." % type_name) if (not type_class._replaces_existing): # pragma: debug if _jsonschema_ver_maj < 3: exist_flag = (type_name in _base_validator.DEFAULT_TYPES) else: exist_flag = (type_name in _base_validator.TYPE_CHECKER._type_checkers) if exist_flag: raise ValueError(("Type '%s' is a JSON default type " "which cannot be replaced.") % type_name) # Check properties for p in type_class.properties: prop_class = get_metaschema_property(p) if prop_class.name != p: raise ValueError("Type '%s' has unregistered property '%s'." % (type_name, p)) # Update property class with this type's info # TODO: Make sure this actually modifies the class # Type strings old = copy.deepcopy(list(prop_class.types)) new = [type_name] prop_class.types = tuple(set(old + new)) # Python types old = copy.deepcopy(list(prop_class.python_types)) try: new = list(type_class.python_types) except TypeError: new = [type_class.python_types] prop_class.python_types = tuple(set(old + new)) # Add to registry type_class._datatype = type_name type_class._schema_type = 'type' # type_class._schema_required = type_class.definition_schema()['required'] # type_class._schema_properties = {} # TODO: Transfer from # TODO: Enable schema tracking once ported to jsonschema # from cis_interface.schema import register_component # register_component(type_class) _type_registry[type_name] = type_class return type_class
[docs]def add_type_from_schema(path_to_schema, **kwargs): r"""Add a type from a schema in a file. Args: path_to_schema (string): Full path to the location of a schema file that can be loaded. target_globals (dict, optional): Globals dictionary for module where the fixed class should be added. If None, the new class is returned. Defaults to local globals. **kwargs: Additional keyword arguments are assumed to be attributes for the new class. """ from cis_interface.metaschema.datatypes.FixedMetaschemaType import ( create_fixed_type_class) if 'target_globals' not in kwargs: kwargs['target_globals'] = globals() if not os.path.isfile(path_to_schema): raise ValueError("The 'path_to_schema' attribute is not a valid path: " + "'%s'" % path_to_schema) with open(path_to_schema, 'r') as fd: out = decode_json(fd) jsonschema.validate(out, {'type': 'object', 'required': ['title', 'description', 'type']}) name = out['title'] if name in _type_registry: return name description = out['description'] base = get_type_class(out['type']) fixed_properties = out return create_fixed_type_class(name, description, base, fixed_properties, **kwargs)
[docs]def register_type_from_file(path_to_schema): r"""Decorator for registering a type by loading the schema describing it from a file. The original base class is discarded and replaced by one determined from the 'type' key in the schema. All attributes/methods for the class are preserved. Args: path_to_schema (str): Full path to the location of a schema file that can be loaded. Returns: function: Decorator that will modify a class according to the information provided in the schema. """ def _wrapped_decorator(type_class): out = add_type_from_schema(path_to_schema, target_globals=None, **type_class.__dict__) return out return _wrapped_decorator
[docs]def get_registered_types(): r"""Return a dictionary of registered types. Returns: dict: Registered type/class pairs. """ return _type_registry
[docs]def import_all_types(): r"""Import all types to ensure they are registered.""" for x in glob.glob(os.path.join(os.path.dirname(__file__), '*.py')): type_mod = os.path.basename(x)[:-3] if not type_mod.startswith('__'): importlib.import_module('cis_interface.metaschema.datatypes.%s' % type_mod) # Load types from schema schema_files = glob.glob(os.path.join(_schema_dir, '*.json')) names = [] for f in schema_files: names.append(add_type_from_schema(f)) # TODO: Need to make sure metaschema updated if it was already loaded from cis_interface.metaschema import _metaschema if _metaschema is not None: reload = False curr = _metaschema new_names = [] for n in names: if n not in curr['definitions']['simpleTypes']['enum']: # pragma: debug reload = True new_names.append(n) if reload: # pragma: debug raise Exception("The metaschema needs to be regenerated to include the " + "following new schemas found in schema files: %s" % new_names)
[docs]def complete_typedef(typedef): r"""Complete the type definition by converting it into the standard format. Args: typedef (str, dict, list): A type name, type definition dictionary, dictionary of subtype definitions, or a list of subtype definitions. Returns: dict: Type definition dictionary. Raises: TypeError: If typedef is not a valid type. """ schema_type = get_type_class('schema') out = schema_type.normalize(typedef) if not isinstance(out, dict) or ('type' not in out): raise TypeError("Cannot parse '%s' (type=%s) as type definition." % ( typedef, type(typedef))) return out
[docs]def get_type_class(type_name): r"""Return a type class given it's name. Args: type_name (str): Name of type class. Returns: class: Type class. """ if type_name not in _type_registry: raise ValueError("Class for type '%s' could not be found." % type_name) return _type_registry[type_name]
[docs]def get_type_from_def(typedef, dont_complete=False): r"""Return the type instance based on the provided type definition. Args: typedef (obj): This can be the name of a type, a dictionary containing a type definition (the 'typename' keyword must be specified), or a complex type (a list or dictionary containing types). dont_complete (bool, optional): If True, the type definition will be used as-is. Otherwise it will be completed using normalization which can be time consuming. Defaults to False. Returns: MetaschemaType: Instance of the appropriate type class. """ if not dont_complete: typedef = complete_typedef(typedef) out = get_type_class(typedef['type'])(**typedef) return out
[docs]def guess_type_from_msg(msg): r"""Guess the type class from a message. Args: msg (str, bytes): Message containing metadata. Raises: ValueError: If a type class cannot be determined. Returns: MetaschemaType: Instance of the appropriate type class. """ try: if CIS_MSG_HEAD in msg: _, metadata, data = msg.split(CIS_MSG_HEAD, 2) metadata = decode_json(metadata) cls = _type_registry[metadata['type']] else: raise Exception return cls except BaseException: raise ValueError("Could not guess type.")
[docs]def guess_type_from_obj(obj): r"""Guess the type class for a given Python object. Args: obj (object): Python object. Returns: MetaschemaType: Instance of the appropriate type class. Raises: ValueError: If a type class cannot be determined. """ type_encoder = get_metaschema_property('type') cls = get_type_class(type_encoder.encode(obj)) return cls
[docs]def encode_type(obj, typedef=None): r"""Encode an object into a JSON schema that can be used to both describe the object and validate others. Args: obj (object): Python object to be encoded. typedef (dict, optional): Type properties that should be used to initialize the encoded type definition in certain cases. Defaults to None and is ignored. Returns: dict: Encoded JSON schema describing the object. """ cls = guess_type_from_obj(obj) return cls.encode_type(obj, typedef=typedef)
[docs]def encode_data(obj, typedef=None): r"""Encode an object into a JSON serializable object. Args: obj (object): Python object to be encoded. typedef (dict, optional): JSON schema describing the object. Defaults to None and class is determined from the object. Returns: object: JSON serializable version of the object. """ if isinstance(typedef, dict) and ('type' in typedef): cls = get_type_class(typedef['type']) else: cls = guess_type_from_obj(obj) if typedef is None: metadata = cls.encode_type(obj) typedef = cls.extract_typedef(metadata) return cls.encode_data(obj, typedef=typedef)
[docs]def encode_data_readable(obj, typedef=None): r"""Encode an object into a JSON serializable object that is human readable but dosn't guarantee identical deserialization. Args: obj (object): Python object to be encoded. typedef (dict, optional): JSON schema describing the object. Defaults to None and class is determined from the object. Returns: object: JSON serializable version of the object. """ if isinstance(typedef, dict) and ('type' in typedef): cls = get_type_class(typedef['type']) else: cls = guess_type_from_obj(obj) if typedef is None: metadata = cls.encode_type(obj) typedef = cls.extract_typedef(metadata) return cls.encode_data_readable(obj, typedef=typedef)
[docs]def encode(obj): r"""Encode an object into a message. Args: obj (object): Python object to be encoded. Returns: bytes: Encoded message. """ cls = guess_type_from_obj(obj) metadata = cls.encode_type(obj) typedef = cls.extract_typedef(metadata) cls_inst = cls(**typedef) msg = cls_inst.serialize(obj) return msg
[docs]def decode(msg): r"""Decode an object from a message. Args: msg (bytes): Bytes encoded message. Returns: object: Decoded Python object. """ cls = guess_type_from_msg(msg) metadata = decode_json(msg.split(CIS_MSG_HEAD, 2)[1]) typedef = cls.extract_typedef(metadata) cls_inst = cls(**typedef) obj = cls_inst.deserialize(msg)[0] return obj
# def resolve_schema_references(schema, resolver=None): # r"""Resolve references within a schema. # # Args: # schema (dict): Schema with references to resolve. # top_level (dict, optional): Reference to the top level schema. # # Returns: # dict: Schema with references replaced with internal references. # # """ # if resolver is None: # if 'definitions' not in schema: # return schema # out = copy.deepcopy(schema) # resolver = jsonschema.RefResolver.from_schema(out) # else: # out = schema # if isinstance(out, dict): # if (len(out) == 1) and ('$ref' in out): # scope, resolved = resolver.resolve(out['$ref']) # out = resolved # else: # for k, v in out.items(): # out[k] = resolve_schema_references(v, resolver=resolver) # elif isinstance(out, (list, tuple)): # for i in range(len(out)): # out[i] = resolve_schema_references(out[i]) # return out
[docs]def compare_schema(schema1, schema2, root1=None, root2=None): r"""Compare two schemas for compatibility. Args: schema1 (dict): First schema. schema2 (dict): Second schema. root1 (dict, optional): Root for first schema. Defaults to None and is set to schema1. root2 (dict, optional): Root for second schema. Defaults to None and is set to schema2. Yields: str: Comparision failure messages. """ try: if root1 is None: root1 = jsonschema.RefResolver.from_schema(schema1) if root2 is None: root2 = jsonschema.RefResolver.from_schema(schema2) if (len(schema2) == 1) and ('$ref' in schema2): with root2.resolving(schema2['$ref']) as resolved_schema2: for e in compare_schema(schema1, resolved_schema2, root1=root1, root2=root2): yield e elif (len(schema1) == 1) and ('$ref' in schema1): with root1.resolving(schema1['$ref']) as resolved_schema1: for e in compare_schema(resolved_schema1, schema2, root1=root1, root2=root2): yield e elif ('type' not in schema2) or ('type' not in schema1): yield "Type required in both schemas for comparison." elif (schema1 != schema2): # Convert fixed types to base types type_cls1 = get_type_class(schema1['type']) if type_cls1.is_fixed: schema1 = type_cls1.typedef_fixed2base(schema1) type_list = schema2['type'] if not isinstance(schema2['type'], list): type_list = [type_list] all_errors = [] for itype in type_list: itype_cls2 = get_type_class(itype) ischema2 = copy.deepcopy(schema2) ischema2['type'] = itype if itype_cls2.is_fixed: ischema2 = itype_cls2.typedef_fixed2base(ischema2) # Compare contents of schema ierrors = [] for k, v in ischema2.items(): prop_cls = get_metaschema_property(k, skip_generic=True) if (prop_cls is None) or (k in ['title', 'default']): continue if k not in schema1: ierrors.append("Missing entry for required key '%s'" % k) continue if (k == 'properties') and ('required' in ischema2): vcp = copy.deepcopy(v) for k2 in list(vcp.keys()): if (((k2 not in schema1[k]) and (k2 not in ischema2['required']))): del vcp[k2] else: vcp = v ierrors += list(prop_cls.compare(schema1[k], vcp, root1=root1, root2=root2)) if len(ierrors) == 0: all_errors = [] break else: all_errors += ierrors for e in all_errors: yield e except BaseException as e: yield e