# Source code for cis_interface.serialize.PandasSerialize

import pandas
import copy
import numpy as np
import warnings
from cis_interface import backwards, platform
from cis_interface.metaschema.datatypes.ArrayMetaschemaType import (
    OneDArrayMetaschemaType)
from cis_interface.serialize import (
    register_serializer, pandas2numpy, list2pandas)
from cis_interface.serialize.AsciiTableSerialize import AsciiTableSerialize


@register_serializer
class PandasSerialize(AsciiTableSerialize):
    r"""Class for serializing/deserializing Pandas data frames.

    Frames are written with ``DataFrame.to_csv`` and parsed with
    ``pandas.read_csv`` using the serializer's delimiter.

    Args:
        delimiter (str, optional): Delimiter that should be used to
            serialize pandas data frames to/from csv style files.
            Defaults to \t.
        write_header (bool, optional): If True, headers will be added to
            serialized tables. Defaults to True.

    """

    # Identifier under which this serializer is registered.
    _seritype = 'pandas'
    # Parent schema properties extended with the write_header option.
    _schema_properties = dict(
        AsciiTableSerialize._schema_properties,
        write_header={'type': 'bool', 'default': True})

    @property
    def empty_msg(self):
        r"""obj: Object indicating empty message."""
        return pandas.DataFrame()

    def apply_field_names(self, frame):
        r"""Apply field names as columns to a frame, first checking for a
        mapping.

        If every field name is already a column, the columns are reordered
        to match the order of the field names. If none of the field names
        is a column, a one-to-one positional mapping is assumed and a
        warning is issued; in that case the columns of the frame that was
        passed in are renamed in place. If only some of the field names are
        columns, an error is raised. If no field names are set, the frame is
        returned untouched.

        Args:
            frame (pandas.DataFrame): Frame to apply field names to as
                columns.

        Returns:
            pandas.DataFrame: Frame with updated field names.

        Raises:
            RuntimeError: If the number of field names differs from the
                number of columns in the frame.
            RuntimeError: If there is a partial overlap between the field
                names and columns.

        """
        field_names = self.get_field_names()
        if field_names is None:
            return frame
        cols = frame.columns.tolist()
        if len(field_names) != len(cols):
            raise RuntimeError(("Number of field names (%d) doesn't match "
                                + "number of columns in data frame (%d).")
                               % (len(field_names), len(cols)))
        # Field names that do not appear among the frame's columns
        fmiss = [f for f in field_names if f not in cols]
        if not fmiss:
            # Complete overlap: only the column order needs to change
            return frame[field_names]
        if len(fmiss) != len(field_names):
            # Partial overlap
            raise RuntimeError("%d fields missing from frame: %s"
                               % (len(fmiss), str(fmiss)))
        # No overlap at all: rename columns by position
        warnings.warn("Assuming direct mapping of field names to columns. "
                      + "This may not be correct.")
        frame.columns = field_names
        return frame

    def func_serialize(self, args):
        r"""Serialize a message.

        Args:
            args (obj): Python object to be serialized.

        Returns:
            bytes, str: Serialized message.

        Raises:
            TypeError: If args is not a pandas.DataFrame.

        """
        if not isinstance(args, pandas.DataFrame):
            raise TypeError(("Pandas DataFrame required. Invalid type "
                             + "of '%s' provided.") % type(args))
        fd = backwards.StringIO()
        try:
            if backwards.PY2:  # pragma: Python 2
                args_ = args
            else:  # pragma: Python 3
                # For Python 3 and higher, bytes need to be decoded before
                # they are written as text. Work on a copy so that the
                # caller's frame keeps its bytes.
                args_ = copy.deepcopy(args)
                for c in args.columns:
                    column = args_[c]
                    # Inspect the first element by position; a label based
                    # lookup of 0 fails for empty frames and for frames
                    # whose index does not contain the label 0.
                    if len(column) and isinstance(column.iloc[0],
                                                  backwards.bytes_type):
                        args_[c] = column.apply(lambda s: s.decode('utf-8'))
            args_ = self.apply_field_names(args_)
            args_.to_csv(fd, index=False,
                         # Not in pandas <0.24
                         # line_terminator=backwards.as_str(self.newline),
                         sep=backwards.as_str(self.delimiter),
                         mode='wb', encoding='utf8',
                         header=self.write_header)
            out = fd.getvalue()
        finally:
            fd.close()
        # Required to change out \r\n for \n on windows
        out = out.replace(
            backwards.match_stype(out, platform._newline),
            backwards.match_stype(out, self.newline))
        return backwards.as_bytes(out)

    def func_deserialize(self, msg):
        r"""Deserialize a message.

        If the serializer has no field names yet, they are taken from the
        columns of the parsed frame. If the serializer is not yet
        initialized, it is updated with an array type definition derived
        from the parsed columns.

        Args:
            msg (str, bytes): Message to be deserialized.

        Returns:
            obj: Deserialized Python object.

        """
        fd = backwards.BytesIO(msg)
        try:
            out = pandas.read_csv(fd, sep=backwards.as_str(self.delimiter),
                                  encoding='utf8')
        finally:
            fd.close()
        if not backwards.PY2:
            # For Python 3 and higher, make sure strings are bytes
            # NOTE(review): entries that are not strings (e.g. NaN that
            # read_csv may substitute for missing values) have no encode
            # method and would raise here - confirm that inputs never
            # contain empty fields in string columns.
            for c, d in zip(out.columns, out.dtypes):
                if d == object:
                    out[c] = out[c].apply(lambda s: s.encode('utf-8'))
        # On windows, long != longlong and longlong requires special cformat
        # For now, long will be used to preserve the use of %ld to match long
        if platform._is_win:  # pragma: windows
            longlong = np.dtype('longlong')
            if longlong.itemsize == 8:
                new_dtypes = {c: (np.int32 if d == longlong else d)
                              for c, d in zip(out.columns, out.dtypes)}
                out = out.astype(new_dtypes, copy=False)
        # Reorder if necessary
        out = self.apply_field_names(out)
        if self.field_names is None:
            self.field_names = out.columns.tolist()
        if not self._initialized:
            # Build one 1D array type definition per field
            np_out = pandas2numpy(out)
            typedef = {
                'type': 'array',
                'items': [
                    OneDArrayMetaschemaType.encode_type(np_out[n], title=n)
                    for n in self.get_field_names()]}
            self.update_serializer(extract=True, **typedef)
        return out

    @classmethod
    def get_testing_options(cls, no_names=False, **kwargs):
        r"""Method to return a dictionary of testing options for this class.

        Args:
            no_names (bool, optional): If True, an example is returned where
                the names are not provided to the deserializer. Defaults to
                False.
            **kwargs: Accepted for compatibility; not used by this method.

        Returns:
            dict: Dictionary of variables to use for testing.

        """
        out = super(PandasSerialize, cls).get_testing_options(as_array=True)
        del out['kwargs']['as_array']
        out['extra_kwargs'] = {}
        out['empty'] = pandas.DataFrame()
        # The expected file contents hold the same three rows twice
        rows = (b'one\t1\t1.0\n'
                + b'two\t2\t2.0\n'
                + b'three\t3\t3.0\n')
        if no_names:
            del out['kwargs']['field_names']
            out['objects'] = [list2pandas(x) for x in out['objects']]
            header = b'f0\tf1\tf2\n'
        else:
            field_names = [backwards.as_str(x) for
                           x in out['kwargs']['field_names']]
            out['objects'] = [list2pandas(x, names=field_names)
                              for x in out['objects']]
            header = b'name\tcount\tsize\n'
        out['contents'] = header + rows + rows
        out['kwargs'].update(out['typedef'])
        return out