Source code for yggdrasil.metaschema.normalizer

# Normalizer adapated from the jsonschema validator
import pprint
import copy
import contextlib
import jsonschema
import logging
from collections import OrderedDict


logger = logging.getLogger(__name__)


[docs]class UndefinedProperty(object): r"""Class to be used as a flag for undefined properties.""" pass
[docs]class UninitializedNormalized(object): r"""Class to be used as a flag for uninitialized normalized value.""" pass
[docs]def create(*args, **kwargs): r"""Dynamically create a validation/normalization class that subclasses the jsonschema validation class. Args: normalizers (dict, optional): Keys are tuples representing paths that exist within the schema at which the normalization functions stored in lists as their value counterparts should be executed. Defaults to empty dictionary. no_defaults (bool, optional): If True, defaults will not be set during normalization. Defaults to False. required_defaults (bool, optional): If True, defaults will be set for required properties, even if no_defaults is True. Defaults to False. *args: Additional arguments are passed to jsonschema.validators.create. **kwargs: Additional keyword arguments are passed to jsonschema.validators.create. """ normalizers = kwargs.pop('normalizers', ()) no_defaults = kwargs.pop('no_defaults', ()) required_defaults = kwargs.pop('required_defaults', ()) validator_class = jsonschema.validators.create(*args, **kwargs) class Normalizer(validator_class): r"""Class that can be used to normalize (or validate) objects against JSON schemas. Args: *args: Additional arguments are passed to the base validator class. **kwargs: Additional keyword arguments are passed to the base validator class. Attributes: NORMALIZERS (dict): Keys are tuples representing paths that exist within the schema at which the normalization functions stored in lists as their value counterparts should be executed. NO_DEFAULTS (bool): If True, defaults will not be set during normalization. REQUIRED_DEFAULTS (bool): If True, defaults will be set for required properties, even if NO_DEFAULTS is True. """ NORMALIZER_VALIDATORS = OrderedDict([('default', None), ('type', None)]) NORMALIZERS = dict(normalizers) NO_DEFAULTS = no_defaults REQUIRED_DEFAULTS = required_defaults VERBOSE = False def __init__(self, *args, **kwargs): super(Normalizer, self).__init__(*args, **kwargs) self._normalized = UninitializedNormalized() self._normalizing = False self._old_settings = {} self._path_stack = [] self._schema_path_stack = [] self._normalized_stack = [] self._working_dir_stack = [] @classmethod def normalize_schema(cls, schema, **kwargs): r"""Normalize a schema against the metaschema. Args: schema (dict): Schema that should be normalized. **kwargs: Additional keyword arguments are passed to the normalize method. Returns: dict: Normalized schema. """ return cls(cls.META_SCHEMA).normalize(schema, **kwargs) @property def current_path(self): r"""tuple: Current path from the top of the instance to the current instance being validated/normalized.""" return tuple(self._path_stack) @property def current_schema_path(self): r"""tuple: Current path from the top of the schema to the current schema being used for validation/normalization.""" return tuple(self._schema_path_stack) @contextlib.contextmanager def normalizing(self, **kwargs): r"""Context for normalization that records normalizers before context is initialized so that they can be restored once the context exist. Args: **kwargs: Keyword arguments are treated as attributes that should be added to the class in the context. If the class already has an attribute of the same name, it is stored for restoration after the context exits. Yields: ValidationError: Errors encountered during validation. """ kwargs.update(iter_errors=self.iter_errors_normalize, descend=self.descend_normalize, _normalizing=True) for k, v in kwargs.items(): if k == 'normalizers': if self.NORMALIZERS: # pragma: debug raise Exception("Uncomment lines below to allow " + "addition of default normalizers.") # for ik, iv in self.NORMALIZERS.items(): # v.setdefault(ik, iv) elif k == 'validators': for ik, iv in self.VALIDATORS.items(): v.setdefault(ik, iv) elif k == 'normalizer_validators': for ik, iv in self.NORMALIZER_VALIDATORS.items(): v.setdefault(ik, iv) if hasattr(self, k.upper()): ksub = k.upper() else: ksub = k self._old_settings[ksub] = getattr(self, ksub, None) setattr(self, ksub, v) # Separate out validators that need to be run in a specific order # during normalization _migrated_validators = [] for k in self.NORMALIZER_VALIDATORS.keys(): if self.NORMALIZER_VALIDATORS[k] is None: _migrated_validators.append(k) self.NORMALIZER_VALIDATORS[k] = self.VALIDATORS.get(k, None) self.VALIDATORS[k] = None # Perform context and then cleanup try: yield finally: # Restore validators with special order for k in _migrated_validators: self.VALIDATORS[k] = self.NORMALIZER_VALIDATORS[k] self.NORMALIZER_VALIDATORS[k] = None # Restore old attributes for k, v in self._old_settings.items(): if v is None: delattr(self, k) else: setattr(self, k, v) self._old_settings = {} def iter_errors_normalize(self, instance, _schema=None): r"""Iterate through all of the errors encountered during validation of an instance at the current level or lower against properties in a schema. Args: instance (object): Instance that will be validated. _schema (dict, optional): Schema that the instance will be validated against. Defaults to the schema used to initialize the class. Yields: ValidationError: Errors encountered during validation of the instance. """ if _schema is None: _schema = self.schema if isinstance(self._normalized, UninitializedNormalized): self._normalized = copy.deepcopy(instance) if isinstance(_schema, dict) and (u"$ref" not in _schema): # Path based normalization try: # logger.info("schema_path=%s, type=%s, instance=%s, schema=%s" # % (self.current_schema_path, # type(self._normalized), self._normalized, # _schema)) if self.current_schema_path in self.NORMALIZERS: normalizers = self.NORMALIZERS[self.current_schema_path] for n in normalizers: self._normalized = n(self, None, self._normalized, _schema) except BaseException as e: error = jsonschema.ValidationError(str(e)) # set details if not already set by the called fn error._set( validator=n, validator_value=None, instance=self._normalized, schema=_schema) # if self.VERBOSE: # pragma: debug # logger.info('Error in normalization: %s' % e) yield error # Do defaults for required fields if (((((not self.NO_DEFAULTS) or self.REQUIRED_DEFAULTS) and isinstance(_schema.get('required', None), list) and isinstance(_schema.get('properties', None), dict) and self.is_type(self._normalized, "object")))): for k in _schema['required']: if (((k not in _schema['properties']) or (k in self._normalized))): continue default = _schema['properties'][k].get('default', None) self._normalized[k] = copy.deepcopy(default) # Perform normalization for properties that will change the # outcome of validation for k, validator in self.NORMALIZER_VALIDATORS.items(): if (((k != 'default') and isinstance(self._normalized, UndefinedProperty))): return if (validator is None) or (k not in _schema): continue v = _schema[k] errors = validator(self, v, self._normalized, _schema) or () for error in errors: # set details if not already set by the called fn error._set( validator=k, validator_value=v, instance=self._normalized, schema=_schema, ) if k != u"$ref": error.schema_path.appendleft(k) # if self.VERBOSE: # pragma: debug # logger.info('Error in early %s validation: %s' # % (k, error)) yield error for e in self._old_settings['iter_errors'](self._normalized, _schema=_schema): # if self.VERBOSE: # pragma: debug # logger.info('Error in base iter_errors: %s' % e) yield e def descend_normalize(self, instance, schema, path=None, schema_path=None): r"""Descend along a path in the schema/instance, recording information about the normalization state so that it can be replaced with the original value if there is a validation error along the descent path. Args: instance (object): Current instance being validated against the schema. schema (dict): Current schema that the instance is being validated against. path (str, int, optional): Path that resulted in the current instance. Defaults to None. schema_path (str, int, optional): Path that resulted in the current schema. Defaults to None. Yields: ValidationError: Errors raised during validation of the instance. """ old_normalized = self._normalized working_dir = None if isinstance(instance, dict) and ('working_dir' in instance): working_dir = instance['working_dir'] if path is not None: # self._normalized_stack.append(self._normalized) self._normalized = UninitializedNormalized() else: # self._normalized_stack.append(self._normalized) self._normalized = copy.deepcopy(self._normalized) if path is not None: self._path_stack.append(path) if schema_path is not None: self._schema_path_stack.append(schema_path) if working_dir is not None: self._working_dir_stack.append(working_dir) failed = False try: for error in self._old_settings['descend'](instance, schema, path=path, schema_path=schema_path): failed = True # if self.VERBOSE: # logger.info("Error in descent (path=%s, schema_path=%s): %s" # % (path, schema_path, error)) yield error finally: # old_normalized = self._normalized_stack.pop() if not (failed or isinstance(self._normalized, UndefinedProperty)): if path is not None: old_normalized[path] = self._normalized else: old_normalized = self._normalized self._normalized = old_normalized if path is not None: self._path_stack.pop() if schema_path is not None: self._schema_path_stack.pop() if working_dir is not None: self._working_dir_stack.pop() def validate(self, instance, _schema=None, normalize=False, **kwargs): r"""Validate an instance against a schema. Args: instance (object): Object to be validated. _schema (dict, optional): Schema by which the instance should be validated. Defaults to None and will be set to the schema used to create the class. normalize (bool, optional): If True, the instance will also be normalized as it is validated. Defaults to False. **kwargs: Additional keyword arguments are passed to the 'normalizing' context if normalize is True, otherwise they are ignored. Returns: object: Normalized instance if normalize == True. """ if normalize: with self.normalizing(**kwargs): super(Normalizer, self).validate(instance, _schema=_schema) out = self._normalized return out else: super(Normalizer, self).validate(instance, _schema=_schema) def normalize(self, instance, _schema=None, show_errors=False, **kwargs): r"""Normalize an instance during validation, allowing for aliases, defaults, or simple type conversions. Args: instance (object): Object to be normalized and validated. _schema (dict, optional): Schema by which the instance should be normalized and validated. Defaults to None and will be set to the schema used to create the class. show_errors (bool, optional): If True, any errors during the normalization are displayed. Defaults to False. **kwargs: Additional keyword arguments are passed to the 'normalizing' context. Returns: object: Normalized instance. """ if show_errors: self.VERBOSE = True with self.normalizing(**kwargs): errors = list(self.iter_errors(instance, _schema=_schema)) if show_errors: # pragma: debug for e in errors[::-1]: if e: logger.info(80 * '-') logger.info(e) if errors: logger.info(80 * '-') logger.info('Normalized:\n' + pprint.pformat(self._normalized)) if errors: return instance else: return self._normalized return Normalizer