# Source code for yggdrasil.communication.transforms.TransformBase

import copy
import collections
from yggdrasil import rapidjson
from yggdrasil.components import ComponentBase


class TransformBase(ComponentBase):
    r"""Base class for message transforms.

    Subclasses supply the actual transformation by overriding
    :meth:`evaluate_transform`. Calling an instance applies the transform
    and, unless ``no_init`` is True, fills in the original and transformed
    datatypes from the message when they have not been set yet.

    Args:
        original_datatype (dict, optional): Datatype associated with
            expected messages. Defaults to None.

    """

    _transformtype = None
    _schema_type = 'transform'
    _schema_subtype_key = 'transformtype'
    _schema_properties = {'original_datatype': {'type': 'schema'}}
    _schema_additional_kwargs = {'allowSingular': 'transformtype'}

    def __init__(self, *args, **kwargs):
        # _state is created before the parent initializer runs.
        self._state = {}
        super(TransformBase, self).__init__(*args, **kwargs)
        self._transformed_datatype = None
        # NOTE(review): original_datatype is presumably populated by
        # ComponentBase from the schema properties -- confirm. When it is
        # set, route it through the setter so that it gets validated.
        if self.original_datatype:
            self.set_original_datatype(self.original_datatype)

    @property
    def transformed_datatype(self):
        r"""dict: The transformed datatype.

        The explicitly stored value is returned when there is one.
        Otherwise the value is derived from the original datatype via
        :meth:`transform_datatype`, or None if no original datatype is set.
        """
        if self._transformed_datatype is not None:
            return self._transformed_datatype
        if self.original_datatype:
            return self.transform_datatype(self.original_datatype)
        return None

    def set_original_datatype(self, datatype):
        r"""Validate and store the datatype of incoming messages.

        Args:
            datatype (dict): Datatype.

        """
        self.validate_datatype(datatype)
        self.original_datatype = datatype

    def set_original_datatype_from_data(self, data):
        r"""Set the original datatype to the minimal schema encoded from
        a data object.

        Args:
            data (object): Data object.

        """
        schema = rapidjson.encode_schema(data, minimal=True)
        self.set_original_datatype(schema)

    def set_transformed_datatype(self, datatype):
        r"""Store the datatype of transformed messages.

        Args:
            datatype (dict): Datatype.

        """
        self._transformed_datatype = datatype

    def set_transformed_datatype_from_data(self, data):
        r"""Set the transformed datatype from a transformed data object.

        For an iterator, the datatype describes the individual items: the
        schema shared by every item, ``{"type": "any"}`` as soon as two
        items disagree, or None if the iterator yields nothing.

        Args:
            data (object): Data object.

        """
        if not isinstance(data, collections.abc.Iterator):
            self.set_transformed_datatype(
                rapidjson.encode_schema(data, minimal=True))
            return None
        item_type = None
        # A deep copy is consumed here, so ``data`` itself is not advanced.
        # NOTE(review): copy.deepcopy cannot copy generator objects, so
        # this branch appears to rely on copyable iterators such as the
        # list iterator produced by __call__ -- confirm.
        for item in copy.deepcopy(data):
            current = rapidjson.encode_schema(item, minimal=True)
            if item_type is None:
                item_type = current
            elif item_type != current:
                # Mixed item schemas: fall back to the generic type and
                # stop scanning.
                item_type = {"type": "any"}
                break
        return self.set_transformed_datatype(item_type)

    def validate_datatype(self, datatype):
        r"""Assert that the provided datatype is valid for this
        transformation.

        This base implementation performs no checks.

        Args:
            datatype (dict): Datatype to validate.

        Raises:
            AssertionError: If the datatype is not valid.

        """
        pass

    def transform_datatype(self, datatype):
        r"""Determine the datatype that will result from applying the
        transform to the supplied datatype.

        Sample data is generated from ``datatype``, passed through the
        transform and encoded back into a schema. The transform is invoked
        through ``__call__`` with its default ``no_init``, so datatypes
        that are not yet set may be initialized from the generated sample.

        Args:
            datatype (dict): Datatype to transform.

        Returns:
            dict: Transformed datatype. The input datatype is returned
                unchanged if NotImplementedError is raised while
                transforming the sample.

        """
        try:
            out = rapidjson.encode_schema(
                self(rapidjson.generate_data(datatype)), minimal=True)
        except NotImplementedError:  # pragma: debug
            return datatype
        else:
            same_length_item_lists = (
                (out['type'] == 'array')
                and (datatype['type'] == 'array')
                and isinstance(out['items'], list)
                and isinstance(datatype['items'], list)
                and (len(out['items']) == len(datatype['items'])))
            if same_length_item_lists:
                # Carry element titles over to the matching output
                # elements unless the output already has a title.
                for new_item, old_item in zip(out['items'],
                                              datatype['items']):
                    if 'title' in old_item:
                        new_item.setdefault('title', old_item['title'])
            return out

    def evaluate_transform(self, x, no_copy=False):
        r"""Call transform on the provided message.

        Must be overridden by subclasses.

        Args:
            x (object): Message object to transform.
            no_copy (bool, optional): If True, the transformation occurs
                in place. Otherwise a copy is created and transformed.
                Defaults to False.

        Returns:
            object: The transformed message.

        Raises:
            NotImplementedError: Always, in this base implementation.

        """
        raise NotImplementedError  # pragma: debug

    def call_transform(self, x, no_init=False, **kwargs):
        r"""Call transform, setting datatypes during the process.

        Args:
            x (object): Message object to transform.
            no_init (bool, optional): If True, the datatype is not
                initialized if it is not already set. Defaults to False.
            **kwargs: Additional keyword arguments are passed to
                evaluate_transform.

        Returns:
            object: The transformed message.

        """
        if (not self.original_datatype) and (not no_init):
            self.set_original_datatype_from_data(x)
        out = self.evaluate_transform(x, **kwargs)
        if (not self._transformed_datatype) and (not no_init):
            self.set_transformed_datatype_from_data(out)
        return out

    def __call__(self, x, no_init=False, **kwargs):
        r"""Call transform on the provided message.

        Args:
            x (object): Message object to transform. An iterator is
                consumed completely and each item is transformed
                separately.
            no_init (bool, optional): If True, the datatype is not
                initialized if it is not already set. Defaults to False.
            **kwargs: Additional keyword arguments are passed to
                call_transform.

        Returns:
            object: The transformed message. For an iterator input, an
                iterator over the transformed items.

        """
        # Empty bytes are returned untransformed when initialization is
        # disabled.
        if isinstance(x, bytes) and (len(x) == 0) and no_init:
            return b''
        if isinstance(x, collections.abc.Iterator):
            # The input is fully materialized before any item is
            # transformed.
            xlist = list(x)
            return iter([
                self.call_transform(xx, no_init=no_init, **kwargs)
                for xx in xlist
            ])
        return self.call_transform(x, no_init=no_init, **kwargs)