# Source code for yggdrasil.serialize.PandasSerialize

import pandas
import copy
import numpy as np
import warnings
import io as sio
from yggdrasil import platform, serialize
from yggdrasil.serialize.AsciiTableSerialize import AsciiTableSerialize
from yggdrasil.communication.transforms.PandasTransform import PandasTransform


class PandasSerialize(AsciiTableSerialize):
    r"""Class for serializing/deserializing Pandas data frames.

    Frames are written as delimited text via ``DataFrame.to_csv`` and read
    back via ``pandas.read_csv``.

    Args:
        no_header (bool, optional): If True, headers will not be read or
            serialized from/to tables. Defaults to False.
        str_as_bytes (bool, optional): If True, strings in columns are
            read as bytes. Defaults to False.

    """

    _seritype = 'pandas'
    _schema_subtype_description = ('Serializes tables using the pandas package.')
    _schema_properties = {'no_header': {'type': 'boolean',
                                        'default': False},
                          'str_as_bytes': {'type': 'boolean',
                                           'default': False}}
    _schema_excluded_from_inherit = ['as_array']
    default_read_meth = 'read'
    as_array = True
    concats_as_str = False
    # has_header = False

    def __init__(self, *args, **kwargs):
        r"""Set up the header-writing flags, then defer to the parent class.

        Args:
            *args: Arguments are passed to the parent class's constructor.
            **kwargs: Keyword arguments are passed to the parent class's
                constructor. 'dont_write_header' is consumed here; if it is
                absent, the value of 'no_header' (default False) is used,
                but 'no_header' itself is left in kwargs for the parent.

        """
        self.write_header_once = False
        self.dont_write_header = kwargs.pop('dont_write_header',
                                            kwargs.get('no_header', False))
        super(PandasSerialize, self).__init__(*args, **kwargs)

    @property
    def empty_msg(self):
        r"""obj: Object indicating empty message."""
        if self.numpy_dtype:
            return pandas.DataFrame(np.zeros(0, self.numpy_dtype))
        return pandas.DataFrame(columns=self.get_field_names())

    def get_field_names(self, *args, **kwargs):
        r"""Get the field names for an array of fields.

        Args:
            *args: Arguments are passed to the parent class's method.
            **kwargs: Keyword arguments are passed to the parent class's
                method.

        Returns:
            list: Names for each field in the data type, or None if the
                serializer was configured with no_header.

        """
        if self.no_header:
            return None
        return super(PandasSerialize, self).get_field_names(*args, **kwargs)

    @classmethod
    def apply_field_names(cls, frame, field_names=None):
        r"""Apply field names as columns to a frame, first checking for a
        mapping. If there is a direct mapping, the columns are reordered to
        match the order of the field names. If there is not an overlap in
        the field names and columns, a one-to-one mapping is assumed, but a
        warning is issued. If there is a partial overlap, an error is raised.

        Args:
            frame (pandas.DataFrame): Frame to apply field names to as
                columns.
            field_names (list, optional): New field names that should be
                applied. If not provided, the original frame will be
                returned unaltered.

        Returns:
            pandas.DataFrame: Frame with updated field names. When no field
                name matches an existing column, the columns of the frame
                that was passed in are renamed in place.

        Raises:
            RuntimeError: If the number of field names differs from the
                number of columns.
            RuntimeError: If there is a partial overlap between the field
                names and columns.

        """
        if field_names is None:
            return frame
        cols = frame.columns.tolist()
        if len(field_names) != len(cols):
            raise RuntimeError(f"Number of field names ({len(field_names)})"
                               f" doesn't match number of columns in data"
                               f" frame ({len(cols)}).")
        # Check for missing fields
        fmiss = [f for f in field_names if f not in cols]
        if not fmiss:
            # Reorder columns
            return frame[field_names]
        if len(fmiss) != len(field_names):
            # Partial overlap
            raise RuntimeError(f"{len(fmiss)} fields ({fmiss})"
                               f" missing from frame: {frame}")
        warnings.warn("Assuming direct mapping of field names to columns. "
                      "This may not be correct.")
        frame.columns = field_names
        return frame

    def cformat2nptype(self, *args, **kwargs):
        r"""Method to convert c format string to numpy data type.

        Args:
            *args: Arguments are passed to serialize.cformat2nptype.
            **kwargs: Keyword arguments are passed to
                serialize.cformat2nptype.

        Returns:
            np.dtype: Corresponding numpy data type. Byte-string types are
                replaced by a unicode type unless str_as_bytes is set.

        """
        out = super(PandasSerialize, self).cformat2nptype(*args, **kwargs)
        if (out.char == 'S') and (not self.str_as_bytes):
            out = np.dtype('U%d' % out.itemsize)
        return out

    def initialize_from_message(self, msg, **kwargs):
        r"""Initialize the serializer based on received message.

        Args:
            msg (object): Message that serializer should be initialized from.
            **kwargs: Additional keyword arguments are treated as metadata
                that may contain additional information for initializing
                the serializer.

        """
        # Adopt the frame's columns as field names when none were configured
        if self.field_names is None and not self.no_header:
            self.field_names = msg.columns.tolist()
        return super(PandasSerialize, self).initialize_from_message(
            msg, **kwargs)

    def normalize(self, args):
        r"""Normalize a message to conform to the expected datatype.

        Args:
            args (object): Message arguments.

        Returns:
            object: Normalized message.

        """
        # Imported at call time, as in the original implementation
        from yggdrasil.serialize import numpy2pandas, list2pandas
        args = super(PandasSerialize, self).normalize(args)
        if isinstance(args, np.ndarray):
            args = numpy2pandas(args)
        elif isinstance(args, list):
            args = list2pandas(args)
        field_names = self.get_field_names()
        ncol = len(args.columns)
        # Frames whose columns are just 0..N-1 get default names f0..fN-1
        # when headers are in use but no names are known
        if ((field_names is None and not self.no_header
             and args.columns.tolist() == list(range(ncol)))):
            field_names = ['f%d' % i for i in range(ncol)]
        return self.apply_field_names(args, field_names)

    def func_serialize(self, args):
        r"""Serialize a message.

        Args:
            args (obj): Python object to be serialized.

        Returns:
            bytes, str: Serialized message.

        Raises:
            TypeError: If args is not a pandas DataFrame.

        """
        if not isinstance(args, pandas.DataFrame):
            raise TypeError(f"Pandas DataFrame required. Invalid type"
                            f" of '{type(args)}' provided.")
        # For Python 3 and higher, bytes need to be encoded
        args_ = self.normalize_bytes2unicode(args)
        # The context manager closes the buffer even if to_csv raises
        with sio.StringIO() as fd:
            args_.to_csv(fd, index=False,
                         # Not in pandas <0.24
                         # line_terminator=self.newline.decode("utf-8"),
                         sep=self.delimiter.decode("utf-8"),
                         mode='w', encoding='utf8',
                         header=(not self.dont_write_header))
            out = fd.getvalue()
        if self.write_header_once:
            # The header has now been emitted (or deliberately skipped)
            self.dont_write_header = True
        # Required to change out \r\n for \n on windows
        out = out.encode("utf-8")
        out = out.replace(platform._newline, self.newline)
        return out

    def func_deserialize(self, msg):
        r"""Deserialize a message.

        Args:
            msg (str, bytes): Message to be deserialized.

        Returns:
            obj: Deserialized Python object.

        """
        dtype = None
        if self.initialized:
            np_dtype = self.numpy_dtype
            # Without a header the columns are addressed by position
            if self.no_header:
                dtype_names = range(len(np_dtype.names))
            else:
                dtype_names = np_dtype.names
            # String columns are read as generic objects
            dtype = {n: (object if np_dtype[n].char in ['U', 'S']
                         else np_dtype[n])
                     for n in dtype_names}
        kws = dict(sep=self.delimiter.decode("utf-8"),
                   names=None,
                   dtype=dtype,
                   encoding='utf8',
                   skipinitialspace=True)
        if self.no_header:
            kws['header'] = None
        # The context manager closes the buffer even if read_csv raises
        with sio.BytesIO(msg) as fd:
            out = pandas.read_csv(fd, **kws)
        out = out.dropna(axis='columns', how='all')
        if self.str_as_bytes:
            out = self.normalize_unicode2bytes(out)
        # On windows, long != longlong and longlong requires special cformat
        # For now, long will be used to preserve the use of %ld to match long
        if platform._is_win:  # pragma: windows
            if np.dtype('longlong').itemsize == 8:
                longlong = np.dtype('longlong')
                new_dtypes = {c: (np.int32 if d == longlong else d)
                              for c, d in zip(out.columns, out.dtypes)}
                out = out.astype(new_dtypes, copy=False)
        # Reorder if necessary
        out = self.apply_field_names(out, self.get_field_names())
        if dtype is not None:
            out = out.astype(dtype, copy=False)
        return out

    @property
    def send_converter(self):
        r"""PandasTransform: Transform built with this serializer's field
        names, when there are any."""
        kws = {}
        field_names = self.get_field_names()
        if field_names is not None:
            kws['field_names'] = field_names
        return PandasTransform(**kws)

    @classmethod
    def normalize_unicode2bytes(cls, args):
        r"""Convert columns that are unicode to bytes.

        The type of a column is judged from its first element only. The
        frame is modified in place and also returned.

        Args:
            args (pd.DataFrame): Pandas dataframe to convert columns for.

        Returns:
            pd.DataFrame: Version of args without unicode in columns.

        """
        for c, d in zip(args.columns, args.dtypes):
            col = args[c]
            # Positional lookup: a label lookup of 0 fails on empty frames
            # and on frames whose index does not contain the label 0
            if (d == object) and len(col) and isinstance(col.iloc[0], str):
                args[c] = col.apply(lambda s: s.encode('utf-8'))
        return args

    @classmethod
    def normalize_bytes2unicode(cls, args):
        r"""Convert columns that are bytes to unicode.

        The type of a column is judged from its first element only. The
        conversion is done on a deep copy; the input frame is not modified.

        Args:
            args (pd.DataFrame): Pandas dataframe to convert columns for.

        Returns:
            pd.DataFrame: Version of args without bytes in columns.

        """
        # For Python 3 and higher, bytes need to be encoded
        args_ = copy.deepcopy(args)
        for c in args.columns:
            col = args_[c]
            # Positional lookup: a label lookup of 0 fails on empty frames
            # and on frames whose index does not contain the label 0
            if len(col) and isinstance(col.iloc[0], bytes):
                args_[c] = col.apply(lambda s: s.decode('utf-8'))
        return args_

    @classmethod
    def dict2object(cls, obj, field_names=None, **kwargs):
        r"""Convert a dictionary to a message object.

        Args:
            obj (dict): Dictionary to convert to serializable object.
            field_names (list, optional): Passed to serialize.dict2pandas
                as its 'order' argument. Defaults to None.
            **kwargs: Additional keyword arguments are ignored.

        Returns:
            object: Serializable object.

        """
        return serialize.dict2pandas(obj, order=field_names)

    @classmethod
    def object2dict(cls, obj, **kwargs):
        r"""Convert a message object into a dictionary.

        Args:
            obj (object): Object that would be serialized by this class and
                should be returned in a dictionary form.
            **kwargs: Additional keyword arguments are passed to the parent
                class's method when obj is not a data frame.

        Returns:
            dict: Dictionary version of the provided object.

        """
        if isinstance(obj, pandas.DataFrame):
            return serialize.pandas2dict(obj)
        return super(PandasSerialize, cls).object2dict(obj, as_array=True,
                                                       **kwargs)

    @classmethod
    def object2array(cls, obj, **kwargs):
        r"""Convert a message object into an array.

        Args:
            obj (object): Object that would be serialized by this class and
                should be returned in an array form.
            **kwargs: Additional keyword arguments are passed to the parent
                class's method when obj is not a data frame.

        Returns:
            np.array: Array version of the provided object.

        """
        if isinstance(obj, pandas.DataFrame):
            return serialize.pandas2numpy(obj)
        return super(PandasSerialize, cls).object2array(obj, as_array=True,
                                                        **kwargs)

    @classmethod
    def concatenate(cls, objects, **kwargs):
        r"""Concatenate objects to get object that would be received if
        the concatenated serialization were deserialized.

        Args:
            objects (list): Objects to be concatenated. The sequence itself
                is not modified.
            **kwargs: Additional keyword arguments are passed to the parent
                class's method when the objects are not data frames.

        Returns:
            list: Set of objects that results from concatenating those
                provided.

        """
        if len(objects) == 0:
            return []
        if isinstance(objects[0], pandas.DataFrame):
            # Align every later frame with the columns of the first one,
            # collecting into a new list instead of overwriting the
            # caller's entries
            field_names = objects[0].columns.tolist()
            aligned = [objects[0]]
            aligned.extend(cls.apply_field_names(x, field_names)
                           for x in objects[1:])
            return [pandas.concat(aligned, ignore_index=True)]
        return super(PandasSerialize, cls).concatenate(objects, as_array=True,
                                                       **kwargs)

    def consolidate_array(self, out):
        r"""Consolidate message into a structure numpy array if possible.

        Args:
            out (list, tuple, np.ndarray): Object to consolidate into a
                structured numpy array.

        Returns:
            np.ndarray: Structured numpy array containing consolidated
                message.

        Raises:
            ValueError: If the array cannot be consolidated.

        """
        if isinstance(out, pandas.DataFrame):
            out = serialize.pandas2numpy(out)
        return super(PandasSerialize, self).consolidate_array(out)

    @classmethod
    def get_testing_options(cls, not_as_frames=False, no_names=False,
                            no_header=False, **kwargs):
        r"""Method to return a dictionary of testing options for this class.

        Args:
            not_as_frames (bool, optional): If True, the returned example
                includes data that is not in a pandas data frame. Defaults
                to False.
            no_names (bool, optional): If True, an example is returned
                where the names are not provided to the deserializer.
                Defaults to False.
            no_header (bool, optional): If True, an example is returned
                where a header is not included. Defaults to False.
            **kwargs: Additional keyword arguments are passed to the parent
                class's method.

        Returns:
            dict: Dictionary of variables to use for testing.

        """
        kwargs.setdefault('table_string_type', 'string')
        kwargs.setdefault('no_names', (no_names or no_header))
        field_names = None
        out = super(PandasSerialize, cls).get_testing_options(
            array_columns=True, **kwargs)
        if kwargs['table_string_type'] == 'bytes':
            out['kwargs']['str_as_bytes'] = True
        for k in ['as_array']:  # , 'format_str']:
            if k in out['kwargs']:
                del out['kwargs'][k]
        out['extra_kwargs'] = {}
        if no_names:
            header_line = b'f0\tf1\tf2\n'
        elif no_header:
            header_line = b''
            out['kwargs']['no_header'] = True
        else:
            if 'field_names' in out['kwargs']:
                field_names = out['kwargs']['field_names']
            header_line = b'name\tcount\tsize\n'
        out['contents'] = (header_line
                           + b'one\t1\t1.0\n'
                           + b'two\t2\t2.0\n'
                           + b'three\t3\t3.0\n'
                           + b'one\t1\t1.0\n'
                           + b'two\t2\t2.0\n'
                           + b'three\t3\t3.0\n')
        out['concatenate'] = [([], [])]
        if not_as_frames:
            pass
        elif no_header:
            out['objects'] = [serialize.list2pandas(x)
                              for x in out['objects']]
            # Rebuild the dtype from the type strings alone, dropping names
            out['dtype'] = np.dtype(','.join(
                [x[1] for x in out['dtype'].descr]))
        else:
            if field_names is None:
                field_names = ['f0', 'f1', 'f2']
            out['objects'] = [serialize.list2pandas(x, names=field_names)
                              for x in out['objects']]
        # NOTE(review): placed at function level (runs for every branch);
        # confirm against upstream, the collapsed source is ambiguous here
        out['empty'] = pandas.DataFrame(np.zeros(0, out['dtype']))
        return out

    def enable_file_header(self):
        r"""Set serializer attributes to enable file headers to be included
        in the serializations."""
        self.dont_write_header = False
        self.write_header_once = True

    def disable_file_header(self):
        r"""Set serializer attributes to disable file headers from being
        included in the serializations."""
        self.dont_write_header = True
        self.write_header_once = True

    def serialize_file_header(self):
        r"""Return the serialized header information that should be
        prepended to files serialized using this class.

        Returns:
            bytes: Header string that should be written to the file. Always
                empty for this class.

        """
        return b''

    def deserialize_file_header(self, fd):
        r"""Deserialize the header information from the file and update the
        serializer. This implementation does nothing.

        Args:
            fd (file): File containing header.

        """
        pass