import numpy as np
from cis_interface import serialize, backwards, platform
from cis_interface.tests import assert_raises, assert_equal
from cis_interface.serialize.DefaultSerialize import DefaultSerialize
# Numpy type names listed as unsupported; the code consuming this list is
# outside this section.
unsupported_nptype = ['bool_']

# Pairs of (numpy type name, or list of names, C format code).
map_nptype2cformat = [
    (['float_', 'float16', 'float32', 'float64'], '%g'),
    (['complex_', 'complex64', 'complex128'], '%g%+gj'),
    # (['int8', 'short', 'intc', 'int_', 'longlong'], '%d'),
    # (['uint8', 'ushort', 'uintc', 'uint64', 'ulonglong'], '%u'),
    ('int8', '%hhd'), ('short', '%hd'), ('intc', '%d'),
    ('uint8', '%hhu'), ('ushort', '%hu'), ('uintc', '%u'),
    ('S', '%s'), ('S5', '%5s'), ('U', '%s'), ('U5', '%20s')]

# The 64-bit integer length modifier depends on the platform.
if platform._is_win:  # pragma: windows
    map_nptype2cformat.append(('int64', '%l64d'))
    map_nptype2cformat.append(('uint64', '%l64u'))
else:
    map_nptype2cformat.append(('int64', '%ld'))
    map_nptype2cformat.append(('uint64', '%lu'))

# The format for the default integer ('int_') depends on whether it is the
# same type as the C int ('intc') on this platform.
if np.dtype('int_') != np.dtype('intc'):
    map_nptype2cformat.append(('int_', '%ld'))
else:
    map_nptype2cformat.append(('int_', '%d'))  # pragma: windows

# 'longlong' only gets its own entries when it differs from the default int.
if np.dtype('int_') != np.dtype('longlong'):
    if platform._is_win:  # pragma: windows
        map_nptype2cformat.append(('longlong', '%l64d'))
        map_nptype2cformat.append(('ulonglong', '%l64u'))
    else:  # pragma: debug
        map_nptype2cformat.append(('longlong', '%lld'))
        map_nptype2cformat.append(('ulonglong', '%llu'))

# Pairs of (C format code, or list of codes, Python scanf format).
map_cformat2pyscanf = [
    (['%5s', '%s'], '%s'),
    ('%s', '%s'),
    # (['%hhd', '%hd', '%d', '%ld', '%lld', '%l64d'], '%d'),
    # (['%hhu', '%hu', '%u', '%lu', '%llu', '%l64u'], '%u'),
    ('%g%+gj', '%g%+gj')]

# C format conversion specifiers listed as unsupported.
unsupported_cfmt = ['a', 'A', 'p', 'n', '']

# Pairs of (C format specifier, or list of specifiers, numpy type).
map_cformat2nptype = [
    (['f', 'F', 'e', 'E', 'g', 'G'], 'float64'),
    # (['f', 'F', 'e', 'E', 'g', 'G'], 'float32'),
    # (['lf', 'lF', 'le', 'lE', 'lg', 'lG'], 'float64'),
    (['hhd', 'hhi'], 'int8'),
    (['hd', 'hi'], 'short'),
    (['d', 'i'], 'intc'),
    (['ld', 'li'], 'int_'),
    (['lld', 'lli', 'l64d'], 'longlong'),
    (['hhu', 'hho', 'hhx', 'hhX'], 'uint8'),
    (['hu', 'ho', 'hx', 'hX'], 'ushort'),
    (['u', 'o', 'x', 'X'], 'uintc'),
    (['lu', 'lo', 'lx', 'lX'], 'uint64'),
    (['llu', 'llo', 'llx', 'llX', 'l64u'], 'ulonglong'),
    (['c', 's'], backwards.np_dtype_str),
    ('s', backwards.np_dtype_str)]
# if np.dtype('int_') != np.dtype('intc'):
#     map_cformat2nptype.append((['ld', 'li'], 'int_'))

# Complex values are written as a real part followed by a signed imaginary
# part; unlike the entries above, these keys include the '%' characters.
map_cformat2nptype.append(
    (['%{}%+{}j'.format(_, _) for _ in ['f', 'F', 'e', 'E', 'g', 'G']],
     'complex128'))
def test_get_registered_serializers():
    r"""Test get_registered_serializers.

    The returned registry must map DefaultSerialize._seritype to the
    DefaultSerialize class.
    """
    registry = serialize.get_registered_serializers()
    assert_equal(registry[DefaultSerialize._seritype], DefaultSerialize)
def test_register_serializer_errors():
    r"""Test errors in register_serializer for duplicate.

    Passing DefaultSerialize to register_serializer must raise ValueError.
    """
    assert_raises(ValueError, serialize.register_serializer, DefaultSerialize)
def test_get_serializer_class_errors():
    r"""Test errors in get_serializer_class for invalid serializer.

    Requesting the serializer named 'invalid' must raise ValueError.
    """
    assert_raises(ValueError, serialize.get_serializer_class, 'invalid')
def test_combine_flds():
    r"""Test combine_flds.

    Per-field arrays of zeros are combined and compared against a structured
    array of zeros, both with the default field names and with an explicit
    dtype. ValueError is expected when the number of arrays does not match
    the dtype and when the arrays have differing lengths.
    """
    names0 = ['f0', 'f1', 'f2', 'f3']
    names1 = ["name", "number", "value", "complex"]
    dtypes = ['S5', 'i8', 'f8', 'c16']
    dtype0 = np.dtype(list(zip(names0, dtypes)))
    dtype1 = np.dtype(list(zip(names1, dtypes)))
    nele = 5
    # One unstructured array per field, each with that field's dtype
    arrs = [np.zeros(nele, dtype=dtype0[name]) for name in names0]
    np.testing.assert_array_equal(serialize.combine_flds(arrs),
                                  np.zeros(nele, dtype0))
    np.testing.assert_array_equal(serialize.combine_flds(arrs, dtype=dtype1),
                                  np.zeros(nele, dtype1))
    # Error when there are fewer arrays than fields in the dtype
    assert_raises(ValueError, serialize.combine_flds, arrs[:-1], dtype=dtype0)
    # Error when one field array is shorter than the others
    arrs[0] = np.zeros(nele - 1, dtype=dtype0[names0[0]])
    assert_raises(ValueError, serialize.combine_flds, arrs)
def test_combine_eles():
    r"""Test combine_eles.

    Elements are supplied as single-element structured arrays, tuples,
    numpy void records and a mixture of tuples and records, with and without
    an explicit dtype. ValueError is expected for elements with the wrong
    number of fields and TypeError for a None element.
    """
    names0 = ['f0', 'f1', 'f2', 'f3']
    names1 = ["name", "number", "value", "complex"]
    dtypes = ['S5', 'i8', 'f8', 'c16']
    dtype0 = np.dtype(list(zip(names0, dtypes)))
    dtype1 = np.dtype(list(zip(names1, dtypes)))
    dtype_short = np.dtype(list(zip(names1[:-1], dtypes[:-1])))
    nele = 5
    # Expected results
    res0 = np.zeros(nele, dtype0)
    res1 = np.zeros(nele, dtype1)
    res0['f0'][0] = 'hello'
    res1['name'][0] = 'hello'
    # Inputs in different element forms
    arrs = [np.zeros(1, dtype=dtype0) for _ in range(nele)]
    arrs[0]['f0'] = b'hello'
    arrs_list = [a.tolist()[0] for a in arrs]
    arrs_void = list(res1)
    arrs_mixd = [a.tolist() for a in res1]
    arrs_mixd[-1] = arrs_void[-1]
    # Results built from plain Python values are expected to have a 32-bit
    # integer field on Windows with Python 3 (presumably because the default
    # integer inferred there is 32-bit - TODO confirm).
    if platform._is_win:  # pragma: windows
        if backwards.PY2:  # pragma: Python 2
            res0_list = res0
            res1_list = res1
        else:  # pragma: Python 3
            formats_w = ['S5', 'i4', 'f8', 'c16']
            dtype0_w = np.dtype({'names': names0, 'formats': formats_w})
            dtype1_w = np.dtype({'names': names1, 'formats': formats_w})
            res0_list = res0.astype(dtype0_w)
            res1_list = res1.astype(dtype1_w)
    else:
        res0_list = res0
        res1_list = res1
    # Single element given as a list of field values
    np.testing.assert_array_equal(
        serialize.combine_eles([res1[n][0] for n in names1]), res0[0])
    # Dtype not provided
    np.testing.assert_array_equal(serialize.combine_eles(arrs), res0)
    np.testing.assert_array_equal(serialize.combine_eles(arrs_list),
                                  res0_list)
    np.testing.assert_array_equal(serialize.combine_eles(arrs_void), res1)
    np.testing.assert_array_equal(serialize.combine_eles(arrs_mixd),
                                  res1_list)
    # Dtype provided
    np.testing.assert_array_equal(
        serialize.combine_eles(arrs, dtype=dtype1), res1)
    np.testing.assert_array_equal(
        serialize.combine_eles(arrs_list, dtype=dtype1), res1)
    # Error when the first element is missing a field
    arrs_list[0] = arrs_list[0][:-1]
    assert_raises(ValueError, serialize.combine_eles, arrs_list, dtype=dtype1)
    assert_raises(ValueError, serialize.combine_eles, arrs_list)
    # Error when an element is not a supported type
    arrs_list[0] = None
    assert_raises(TypeError, serialize.combine_eles, arrs_list)
    # Error when the dtype has fewer fields than the elements
    assert_raises(ValueError, serialize.combine_eles, arrs_void,
                  dtype=dtype_short)
def test_consolidate_array():
    r"""Test consolidation of array information in different forms.

    Three parallel lists are built: the expected arrays (x0list), the inputs
    handed to serialize.consolidate_array (x1list) and the dtypes passed
    with them (dlist). Each triple is checked with and without the dtype
    supplied, followed by checks of the error conditions.
    """
    names0 = ['f0', 'f1', 'f2', 'f3']
    dtypes = ['S5', 'i8', 'f8', 'c16']
    dtype0 = np.dtype(list(zip(names0, dtypes)))
    # Dtype expected when the input is built from plain Python values; it
    # differs from dtype0 only on Windows with Python 3.
    if platform._is_win:  # pragma: windows
        if backwards.PY2:  # pragma: Python 2
            dtype0_list = dtype0
        else:  # pragma: Python 3
            dtype0_list = np.dtype({'names': names0,
                                    'formats': ['S5', 'i4', 'f8', 'c16']})
    else:
        dtype0_list = dtype0
    dtype2 = np.dtype([('f0', 'i8'), ('f1', 'f8')])
    dtype3 = np.dtype([('f0', 'i4'), ('f1', 'f4')])
    shape = (5, 5)
    # Create list of input variables
    dlist = []
    x0list = []
    x1list = []
    for dtype in [np.dtype('float'), dtype0]:
        x = np.zeros(shape, dtype=dtype)
        if dtype == dtype0:
            x['f0'][:] = 'hello'
        x_flat = x.flatten()
        dlist.append(dtype)
        x0list.append(x)
        x1list.append(x)
        # len(dtype) is the number of fields, so this only applies to
        # structured dtypes
        if len(dtype) > 0:
            dlist += [dtype, dtype]
            if dtype == dtype0:
                dtype_list = dtype0_list
            else:  # pragma: debug
                dtype_list = dtype
            dlist.append(dtype_list)
            x0list += [x, x_flat, x_flat.astype(dtype_list)]
            # Tests with lists of arrays rows or columns
            x1list.append([x[n] for n in dtype.names])
            x1list.append(list(x_flat))
            x1list.append([row.tolist() for row in x_flat])
    # Tests with single array mapped onto structured array
    nrow = 4
    struct_zeros = np.zeros(nrow, dtype2)
    plain_zeros = np.zeros((nrow, len(dtype2)))
    dlist += [None]
    x0list += [struct_zeros]
    x1list.append(struct_zeros)
    x2 = serialize.consolidate_array(plain_zeros, dtype=dtype2)
    np.testing.assert_array_equal(struct_zeros, x2)
    np.testing.assert_array_equal(struct_zeros, serialize.consolidate_array(
        np.zeros(nrow, dtype3), dtype=dtype2))
    # Test with list of arrays
    arr_void = np.ones(5, dtype0)
    x0list.append(arr_void)
    x1list.append(list(arr_void))
    dlist.append(dtype0)
    # Loop over different test inputs
    for x0, x1, dtype in zip(x0list, x1list, dlist):
        # Dtype provided
        x2 = serialize.consolidate_array(x1, dtype=dtype)
        np.testing.assert_array_equal(x0, x2)
        # Dtype not provided
        x2 = serialize.consolidate_array(x1)
        np.testing.assert_array_equal(x0, x2)
    # Error on incorrect data format
    assert_raises(ValueError, serialize.consolidate_array,
                  np.zeros((4, 1)), dtype=dtype2)
    # Error on incorrect type
    assert_raises(TypeError, serialize.consolidate_array, None)
    # Error on dtypes with differing numbers of fields
    assert_raises(ValueError, serialize.consolidate_array,
                  np.zeros(3, dtype0), dtype=dtype3)
def test_array_to_table():
    r"""Test conversion of arrays to ASCII table and back.

    For each format string, an array of ones with the dtype derived from
    that format is written with array_to_table and read back with
    table_to_array, with use_astropy both False and True.
    """
    flist = ['# %5s\t%ld\t%lf\t%g%+gj\n']
    for use_astropy in [False, True]:
        for f in flist:
            dtype = serialize.cformat2nptype(f)
            arr0 = np.ones(5, dtype)
            arr0['f0'][0] = b'hello'
            tab = serialize.array_to_table(arr0, f, use_astropy=use_astropy)
            arr1 = serialize.table_to_array(tab, f, use_astropy=use_astropy)
            np.testing.assert_array_equal(arr1, arr0)
def test_array_to_bytes():
    r"""Test conversion of arrays to bytes and back.

    Round trips are checked for a float array and a structured array in both
    'C' and 'F' order, with and without the dtype passed to array_to_bytes.
    For each order, a 2D float array serialized with a structured dtype must
    give the same bytes as the equivalent structured array, and truncated
    bytes must raise RuntimeError.
    """
    names0 = ['f0', 'f1', 'f2', 'f3']
    dtypes = ['S5', 'i8', 'f8', 'c16']
    dtype0 = np.dtype(list(zip(names0, dtypes)))
    shape = (5, 5)
    for order in ['C', 'F']:
        for dtype in [np.dtype('float'), dtype0]:
            x0 = np.zeros(shape, dtype=dtype)
            if dtype == dtype0:
                x0['f0'][:] = 'hello'
            # An array of the same type, dtype provided
            b = serialize.array_to_bytes(x0, dtype=dtype, order=order)
            x1 = serialize.bytes_to_array(b, dtype, order=order, shape=shape)
            np.testing.assert_array_equal(x1, x0)
            # An array of the same type, dtype not provided
            b = serialize.array_to_bytes(x0, order=order)
            x1 = serialize.bytes_to_array(b, dtype, order=order, shape=shape)
            np.testing.assert_array_equal(x1, x0)
        # Tests with single array mapped onto structured array
        dtype2 = np.dtype([('f0', 'i8'), ('f1', 'f8')])
        nrow = 4
        ncol = len(dtype2)
        b0 = serialize.array_to_bytes(np.zeros(nrow, dtype2), order=order)
        b1 = serialize.array_to_bytes(np.zeros((nrow, ncol)), dtype=dtype2,
                                      order=order)
        assert_equal(b1, b0)
        # Error on incomplete serial array
        assert_raises(RuntimeError, serialize.bytes_to_array,
                      b0[:-1], dtype2, order=order)
def test_dict2list():
    r"""Test errors in conversion between dictionaries and lists.

    Only the error paths are exercised here: both dict2list and list2dict
    must raise TypeError when given None.
    """
    assert_raises(TypeError, serialize.dict2list, None)
    assert_raises(TypeError, serialize.list2dict, None)
def test_numpy2pandas():
    r"""Test conversion of a numpy array to a pandas data frame and back.

    Both converters must raise TypeError for None. Round trips are checked
    for a structured array, a float array, a single bytes field, an object
    array and an empty array.
    """
    assert_raises(TypeError, serialize.numpy2pandas, None)
    assert_raises(TypeError, serialize.pandas2numpy, None)
    nele = 5
    names = ["name", "number", "value", "complex"]
    dtypes = ['S5', 'i8', 'f8', 'c16']
    dtype = np.dtype(list(zip(names, dtypes)))
    arr_mix = np.zeros(nele, dtype)
    arr_mix['name'][0] = 'hello'
    arr_obj = np.array([list(), 'hello', 5], dtype='O')
    test_arrs = [arr_mix,
                 np.zeros(nele, 'float'),
                 arr_mix['name'],
                 arr_obj,
                 np.array([])]
    for ans in test_arrs:
        frame = serialize.numpy2pandas(ans)
        res = serialize.pandas2numpy(frame)
        np.testing.assert_array_equal(ans, res)
def test_numpy2list():
    r"""Test conversion of a numpy array to a list and back.

    Both converters must raise TypeError for None. A structured array is
    converted with numpy2list and rebuilt with list2numpy using the original
    field names.
    """
    assert_raises(TypeError, serialize.numpy2list, None)
    assert_raises(TypeError, serialize.list2numpy, None)
    nele = 5
    names = ["complex", "name", "number", "value"]
    dtypes = ['c16', 'S5', 'i8', 'f8']
    dtype = np.dtype(list(zip(names, dtypes)))
    arr_mix = np.zeros(nele, dtype)
    arr_mix['name'][0] = 'hello'
    test_arrs = [arr_mix]
    for ans in test_arrs:
        d = serialize.numpy2list(ans)
        # Name provided
        res = serialize.list2numpy(d, names=names)
        np.testing.assert_array_equal(ans, res)
def test_numpy2dict():
    r"""Test conversion of a numpy array to a dictionary and back.

    Both converters must raise TypeError for None. A structured array is
    converted with numpy2dict and rebuilt with dict2numpy, both without an
    order argument and with the field names passed as the order. The field
    names used here are already in alphabetical order.
    """
    assert_raises(TypeError, serialize.numpy2dict, None)
    assert_raises(TypeError, serialize.dict2numpy, None)
    nele = 5
    names = ["complex", "name", "number", "value"]
    dtypes = ['c16', 'S5', 'i8', 'f8']
    dtype = np.dtype(list(zip(names, dtypes)))
    arr_mix = np.zeros(nele, dtype)
    arr_mix['name'][0] = 'hello'
    test_arrs = [arr_mix]
    for ans in test_arrs:
        d = serialize.numpy2dict(ans)
        # Sorted
        res = serialize.dict2numpy(d)
        np.testing.assert_array_equal(ans, res)
        # Provided
        res = serialize.dict2numpy(d, order=names)
        np.testing.assert_array_equal(ans, res)
def test_pandas2dict():
    r"""Test conversion of a Pandas data frame to a dictionary and back.

    Both converters must raise TypeError for None. Frames built from several
    kinds of numpy array are converted with pandas2dict and rebuilt with
    dict2pandas, both without an order argument and with the source array's
    dtype.names passed as the order (None for unstructured arrays).
    """
    assert_raises(TypeError, serialize.dict2pandas, None)
    assert_raises(TypeError, serialize.pandas2dict, None)
    nele = 5
    names = ["complex", "name", "number", "value"]
    dtypes = ['c16', 'S5', 'i8', 'f8']
    dtype = np.dtype(list(zip(names, dtypes)))
    arr_mix = np.zeros(nele, dtype)
    arr_mix['name'][0] = 'hello'
    arr_obj = np.array([list(), 'hello', 5], dtype='O')
    test_arrs = [arr_mix,
                 np.zeros(nele, 'float'),
                 arr_mix['name'],
                 arr_obj,
                 np.array([])]
    for ans in test_arrs:
        frame = serialize.numpy2pandas(ans)
        # Sorted
        d = serialize.pandas2dict(frame)
        res = serialize.dict2pandas(d)
        np.testing.assert_array_equal(res, frame)
        # Provided
        d = serialize.pandas2dict(frame)
        res = serialize.dict2pandas(d, order=ans.dtype.names)
        np.testing.assert_array_equal(res, frame)