import re
import numpy as np
import pandas as pd
import deprecation
from collections import OrderedDict
from ._version import get_versions
from yggdrasil import tools, constants
from yggdrasil.rapidjson import units as units_
# Use get_versions to prevent circular import from importing it
# from yggdrasil's __init__
__version__ = get_versions()['version']
# TODO: This import fails saying yggdrasil.rapidjson is not a package so
# we need to find a work around
# from yggdrasil.rapidjson.units import Quantity, QuantityArray, Units, UnitsError
Quantity = units_.Quantity
QuantityArray = units_.QuantityArray
Units = units_.Units
UnitsError = units_.UnitsError
_unit_quantity = Quantity
_unit_array = QuantityArray
PYTHON_SCALARS_WITH_UNITS = OrderedDict([
(k, tuple(list(v) + [_unit_quantity]))
for k, v in constants.PYTHON_SCALARS.items()])
ALL_PYTHON_ARRAYS_WITH_UNITS = tuple(
list(constants.ALL_PYTHON_ARRAYS) + [_unit_array])
ALL_PYTHON_SCALARS_WITH_UNITS = tuple(
list(constants.ALL_PYTHON_SCALARS) + [_unit_quantity])
[docs]def convert_to_pandas_timedelta(x):
r"""Convert variable with time units to a pandas.Timedelta instance.
Args:
x (object): Scalar/array with units to convert to a pandas.Timedelta
instance.
Returns:
pandas.Timedelta: Equivalent Timedelta variable.
"""
assert has_units(x)
t_data = get_data(x)
t_unit = get_units(x)
unit_map = {'ns': 'ns',
(tools.bytes2str(b'\xc2\xb5') + 's'): 'us',
(tools.bytes2str(b'\xce\xbcs') + 's'): 'us',
'ms': 'ms',
's': 's',
'min': 'm',
'hr': 'h',
'day': 'D'}
return pd.Timedelta(t_data, unit=unit_map[t_unit])
[docs]def convert_from_pandas_timedelta(x):
r"""Covert a pandas.Timedelta instance to a scalar/array with
time units.
Args:
x (pandas.Timedelta): Timedelta variable to convert.
Returns:
object: Equivalent scalar/array with units.
"""
return add_units(x.total_seconds(), 's')
[docs]def convert_julia_unit_string(in_str): # pragma: julia
r"""Convert unit string to version that julia Unitful package can
understand.
Args:
in_str (str): String unit to convert.
Returns:
str: Converted string.
"""
replacements = [('**', '^'),
('days', 'd'),
('day', 'd'),
('degC', '°C'),
('degF', '°F')]
out = in_str
for a, b in replacements:
out = out.replace(a, b)
return out
[docs]def convert_matlab_unit_string(m_str): # pragma: matlab
r"""Convert Matlab unit string to string that the Python package
can understand.
Args:
m_str (str): Matlab units string to convert.
Returns:
str: Converted string.
"""
out = m_str
replacements = {'h': 'hr'}
regex_mu = [tools.bytes2str(b'\xc2\xb5'),
tools.bytes2str(b'\xce\xbcs')]
regex = r'(?P<name>[A-Za-z%s]+)' % ''.join(regex_mu)
for x in re.finditer(regex, m_str):
xdict = x.groupdict()
if xdict['name'] in replacements:
xdict['name'] = replacements[xdict['name']]
out = out[:(x.start())] + xdict['name'] + out[(x.end()):]
return out
[docs]@deprecation.deprecated(deprecated_in="2.0", removed_in="3.0",
current_version=__version__,
details=("This method is no longer necessary and "
"units can be parsed directly"))
def convert_R_unit_string(r_str): # pragma: deprecated
r"""Convert R unit string to string that the Python package can
understand.
Args:
r_str (str): R units string to convert.
Returns:
str: Converted string.
"""
return convert_unit_string(r_str)
[docs]@deprecation.deprecated(deprecated_in="2.0", removed_in="3.0",
current_version=__version__,
details=("This method is no longer necessary and "
"units can be parsed directly"))
def convert_unit_string(orig_str, replacements=None): # pragma: deprecated
r"""Convert unit string to string that the Python package can
understand.
Args:
orig_str (str): Original units string to convert.
replacements (dict, optional): Mapping from unit to another.
Defaults to empty dict.
Returns:
str: Converted string.
"""
return orig_str
[docs]def has_units(obj, check_dimensionless=False):
r"""Determine if a Python object has associated units.
Args:
obj (object): Object to be tested for units.
check_dimensionless (bool, optional): If True, an object with
dimensionless units will return True.
Returns:
bool: True if the object has units, False otherwise.
"""
out = (isinstance(obj, (_unit_quantity, _unit_array))
and not (obj.is_dimensionless()
and (not check_dimensionless)))
# out = hasattr(obj, 'units')
return out
[docs]def get_units(obj, for_language=None):
r"""Get the string representation of the units.
Args:
obj (object): Object to get units for.
for_language (str, optional): Language requesting units.
Returns:
str: Units, empty if input object has none.
"""
if has_units(obj):
out = str(obj.units)
else:
out = ''
if for_language == "R": # pragma: extern
# udunits dosn't support Δ
out = out.replace('Δ', '')
return out
[docs]def get_data(obj):
r"""Get the array/scalar assocaited with the object.
Args:
obj (object): Object to get data for.
Returns:
np.ndarray: Numpy array representation of the underlying data.
"""
if has_units(obj, check_dimensionless=True):
out = obj.value
else:
out = obj
return out
[docs]def add_units(arr, unit_str, **kwargs):
r"""Add units to an array or scalar.
Args:
arr (np.ndarray, float, int): Scalar or array of data to add units to.
unit_str (str): Unit string.
**kwargs: Additional keyword arguments are passed to the unit constructor.
Returns:
Quantity ro QuantityArray: Scalar or array with units.
"""
if is_null_unit(unit_str):
return arr
if has_units(arr):
out = convert_to(arr, unit_str)
elif isinstance(arr, np.ndarray) and (arr.ndim > 0):
out = QuantityArray(arr, unit_str, **kwargs)
else:
out = Quantity(arr, unit_str, **kwargs)
return out
[docs]def are_compatible(units1, units2):
r"""Check if two units are compatible.
Args:
units1 (str): First units string.
units2 (str): Second units string.
Returns:
bool: True if the units are compatible, False otherwise.
"""
# Empty units always compatible
try:
u1 = Units(units1)
u2 = Units(units2)
except UnitsError:
return False
if u1.is_dimensionless() or u2.is_dimensionless():
return True
return u1.is_compatible(u2)
[docs]def is_null_unit(ustr):
r"""Determines if a string is a null unit.
Args:
ustr (str): String to test.
Returns:
bool: True if the string is '' or 'n/a', False otherwise.
"""
return Units(ustr).is_dimensionless()
[docs]def as_unit(ustr):
r"""Get unit object for the string.
Args:
ustr (str): Unit string.
Returns:
Units: Unit object.
Raises:
ValueError: If the string is not a recognized unit.
"""
return Units(ustr)
[docs]def is_unit(ustr):
r"""Determine if a string is a valid unit.
Args:
ustr (str): String representation to test.
Returns:
bool: True if the string is a valid unit. False otherwise.
"""
try:
as_unit(ustr)
return True
except UnitsError:
return False
[docs]def convert_to(arr, new_units):
r"""Convert quantity with units to new units. Objects without units
will be returned with the new units.
Args:
arr (np.ndarray, float, int, Quantity, QuantityArray): Quantity with
or without units.
new_units (str): New units that should be applied.
Returns:
Quantity, QuantityArray: Scalar or array with new units.
"""
if not has_units(arr):
return add_units(arr, new_units)
return arr.to(new_units)
[docs]def get_conversion_function(old_units, new_units):
r"""Get a function that will convert a scalar/array from one unit
to another.
Args:
old_units (str): Units to convert from.
new_units (str): Units to convert to.
Returns:
function: Conversion function that takes scalar/array as input
and returns converted scalar/array.
"""
def fconvert(x):
ux = add_units(x, old_units)
return get_data(convert_to(ux, new_units))
return fconvert