import re
import numpy as np
import pandas as pd
import unyt
from collections import OrderedDict
from yggdrasil import tools, constants
# Short aliases for the unyt classes that carry units.
_unit_quantity = unyt.array.unyt_quantity
_unit_array = unyt.array.unyt_array
# Registry singleton; populated by get_ureg() on first use.
_ureg_unyt = None
# Same mapping as constants.PYTHON_SCALARS, but with the unyt scalar class
# appended to every tuple of accepted types.
PYTHON_SCALARS_WITH_UNITS = OrderedDict(
    (key, tuple(types) + (_unit_quantity, ))
    for key, types in constants.PYTHON_SCALARS.items())
# Flat tuples of accepted array/scalar classes, extended with the unyt ones.
ALL_PYTHON_ARRAYS_WITH_UNITS = (
    tuple(constants.ALL_PYTHON_ARRAYS) + (_unit_array, ))
ALL_PYTHON_SCALARS_WITH_UNITS = (
    tuple(constants.ALL_PYTHON_SCALARS) + (_unit_quantity, ))
def get_ureg():
    r"""Get the unit registry, building it the first time it is requested.

    Returns:
        unyt.UnitRegistry: Module-wide registry extended with the 'ac',
            'a' and 'j' units.

    """
    global _ureg_unyt
    if _ureg_unyt is not None:
        return _ureg_unyt
    # NOTE(review): confirm which parameter of UnitRegistry the positional
    # 'mks' argument binds to.
    _ureg_unyt = unyt.UnitRegistry('mks')
    # (symbol, base value, dimensions, tex representation, prefixable)
    custom_units = [
        ("ac", 4046.86, unyt.dimensions.area, r"\rm{ac}", False),
        ("a", 100.0, unyt.dimensions.area, r"\rm{a}", True),
        ("j", 1.0, unyt.dimensions.energy, r"\rm{J}", True),
    ]
    for symbol, base_value, dims, tex, prefixable in custom_units:
        _ureg_unyt.add(symbol, base_value, dimensions=dims,
                       tex_repr=tex, offset=0.0, prefixable=prefixable)
    # _ureg_unyt.add("cel", 1.0, dimensions=unyt.dimensions.temperature,
    #                tex_repr=r"^\circ\rm{C}", offset=-273.15, prefixable=True)
    # _ureg_unyt.add("j", 1.0, dimensions=unyt.dimensions.specific_flux,
    #                tex_repr=r"\rm{Jy}", prefixable=True)
    # _ureg_unyt.add("CH2O", 1.0, dimensions=unyt.dimensions.dimensionless,
    #                tex_repr=r"\rm{CH2O}", offset=0.0, prefixable=False)
    # Long-form names that should resolve to the short unit symbols.
    long_names = {"acre": "ac", "are": "a", "hectare": "ha", "days": "day"}
    for long_name, symbol in long_names.items():
        unyt._unit_lookup_table.inv_name_alternatives[long_name] = symbol
    return _ureg_unyt
def convert_to_pandas_timedelta(x):
    r"""Convert variable with time units to a pandas.Timedelta instance.

    Args:
        x (object): Scalar/array with units to convert to a pandas.Timedelta
            instance.

    Returns:
        pandas.Timedelta: Equivalent Timedelta variable.

    Raises:
        AssertionError: If x does not have units.
        KeyError: If the units of x are not one of the time units in the
            lookup table below.

    """
    assert has_units(x)
    t_data = get_data(x)
    t_unit = get_units(x)
    # Microseconds may be spelled with either the micro sign or the Greek
    # letter mu; both byte strings hold ONLY the prefix so that appending
    # 's' yields the unit name (the previous table produced a key ending
    # in 'ss' for the Greek spelling, which could never match).
    micro_sign = tools.bytes2str(b'\xc2\xb5')
    greek_mu = tools.bytes2str(b'\xce\xbc')
    unit_map = {'ns': 'ns',
                'us': 'us',
                (micro_sign + 's'): 'us',
                (greek_mu + 's'): 'us',
                'ms': 'ms',
                's': 's',
                'min': 'm',
                'hr': 'h',
                'day': 'D'}
    return pd.Timedelta(t_data, unit=unit_map[t_unit])
def convert_from_pandas_timedelta(x):
    r"""Convert a pandas.Timedelta instance to a scalar/array with time
    units.

    Args:
        x (pandas.Timedelta): Timedelta variable to convert.

    Returns:
        object: Result of attaching units of seconds to the total number
            of seconds in x.

    """
    seconds = x.total_seconds()
    return add_units(seconds, 's')
def convert_matlab_unit_string(m_str):  # pragma: matlab
    r"""Convert Matlab unit string to string that the Python package
    can understand.

    Args:
        m_str (str): Matlab units string to convert.

    Returns:
        str: Converted string. Every unit name that has an entry in the
            replacement table (currently only 'h' -> 'hr') is substituted;
            everything else is copied through unchanged.

    """
    replacements = {'h': 'hr'}
    # A unit name is a run of ASCII letters that may also contain the micro
    # sign (U+00B5) or the Greek letter mu (U+03BC).
    regex = r'(?P<name>[A-Za-z%s]+)' % '\u00b5\u03bc'

    def _swap(match):
        # Replace the matched name if it is in the table, else keep it.
        name = match.group('name')
        return replacements.get(name, name)

    # re.sub assembles the result from the original string, so a replacement
    # that changes the length of a name (e.g. 'h' -> 'hr') cannot shift the
    # position of the names that follow it.
    return re.sub(regex, _swap, m_str)
def convert_R_unit_string(r_str):
    r"""Convert R unit string to string that the Python package can
    understand.

    This applies convert_unit_string with its default replacements.

    Args:
        r_str (str): R units string to convert.

    Returns:
        str: Converted string.

    """
    converted = convert_unit_string(r_str)
    return converted
def convert_unit_string(orig_str, replacements=None):
    r"""Convert unit string to string that the Python package can
    understand.

    Unit names separated by ' ', '*' or '/' are parsed one at a time; names
    found in the replacement table are swapped, exponents written as
    'x^n', 'x**n' or 'xn' are rewritten as '(x**n)' and whitespace between
    units is turned into '*'.

    Args:
        orig_str (str): Original units string to convert.
        replacements (dict, optional): Mapping from unit to another.
            Defaults to {'h': 'hr', 'hrs': 'hr', 'days': 'day',
            '100%': 'percent'}.

    Returns:
        str: Converted string. Empty if orig_str is empty or whitespace.

    Raises:
        Exception: If the string cannot be parsed as a sequence of units.

    """
    stripped = orig_str.strip()
    if not stripped:
        return ''
    if replacements is None:
        replacements = {'h': 'hr',
                        'hrs': 'hr',
                        'days': 'day',
                        '100%': 'percent'}
    # Extra characters allowed in a unit name: micro sign, Greek mu and the
    # degree sign. NOTE: the whole string is spliced into a character
    # class, so the trailing '(?:100\%)' contributes the individual
    # characters '(', '?', ':', '1', '0', '%' and ')' rather than the
    # literal '100%'. Inputs such as '100%' match through those characters,
    # so the class is deliberately left as is.
    extra_chars = '\u00b5' + '\u03bc' + '\u00b0' + r'(?:100\%)'
    regex = (r'(?P<paren>\()?(?P<name>[A-Za-z%s]+)'
             r'(?:(?:(?:\^)|(?:\*\*))?(?P<exp_paren>\()?(?P<exp>-?[0-9]+)'
             r'(?(exp_paren)\)))?'
             r'(?(paren)\)|)(?P<op> |(?:\*)|(?:\/))?' % extra_chars)
    if not re.fullmatch(r'(?:%s)+' % regex, stripped):  # pragma: debug
        # Dump what could be parsed to help diagnose the failure.
        print(repr(orig_str), type(orig_str))
        m = re.search(r'(?:%s)+' % regex, stripped)
        if m:
            print(repr(m.group(0)), m.groupdict())
        else:
            print('no match')
        for m in re.finditer(regex, stripped):
            print(m.group(0), m.groupdict())
        raise Exception("Could not standardize units: %s" % repr(orig_str))
    pieces = []
    for x in re.finditer(regex, stripped):
        xdict = x.groupdict()
        if xdict['name'] in replacements:
            xdict['name'] = replacements[xdict['name']]
        if xdict['exp']:
            pieces.append('({name}**{exp})'.format(**xdict))
        else:
            pieces.append(xdict['name'])
        if xdict['op']:
            # Whitespace between units means multiplication.
            pieces.append('*' if xdict['op'].isspace() else xdict['op'])
    return ''.join(pieces)
def has_units(obj, check_dimensionless=False):
    r"""Determine if a Python object has associated units.

    Args:
        obj (object): Object to be tested for units.
        check_dimensionless (bool, optional): If True, an object with
            dimensionless units will return True. Defaults to False.

    Returns:
        bool: True if the object is a unyt quantity/array (and, unless
            check_dimensionless is set, its units are not dimensionless),
            False otherwise.

    """
    if not isinstance(obj, (_unit_quantity, _unit_array)):
        return False
    is_dimensionless = (obj.units == as_unit('dimensionless'))
    if is_dimensionless and (not check_dimensionless):
        return False
    return True
def get_units(obj):
    r"""Get the string representation of the units.

    Args:
        obj (object): Object to get units for.

    Returns:
        str: Units, empty if has_units reports that the object has none.

    """
    return str(obj.units) if has_units(obj) else ''
def get_data(obj):
    r"""Get the array/scalar associated with the object.

    Args:
        obj (object): Object to get data for.

    Returns:
        object: The underlying data with units removed. Zero-dimensional
            results are returned as a single element; objects that do not
            carry units (dimensionless included) are returned unchanged.

    """
    if not has_units(obj, check_dimensionless=True):
        return obj
    data = obj.to_ndarray()
    if data.ndim == 0:
        # Unwrap the 0-d array into its single element.
        data = data.reshape((1, ))[0]
    return data
def add_units(arr, unit_str, dtype=None):
    r"""Add units to an array or scalar.

    Args:
        arr (np.ndarray, float, int): Scalar or array of data to add units
            to.
        unit_str (str): Unit string.
        dtype (np.dtype, optional): Numpy data type that should be
            maintained for array/quantity with units. If not provided,
            this is determined from the array.

    Returns:
        unyt.unyt_array: Array with units. If unit_str is a null unit, arr
            is returned unchanged; if arr already has units, it is
            converted to unit_str instead.

    Raises:
        ValueError: If the array/quantity cannot be created with the
            provided units.

    """
    ureg = get_ureg()
    unit_str = tools.bytes2str(unit_str)
    if is_null_unit(unit_str):
        return arr
    unit_str = convert_unit_string(unit_str)
    if has_units(arr):
        return convert_to(arr, unit_str)
    if dtype is None:
        if isinstance(arr, np.ndarray):
            dtype = arr.dtype
        else:
            # Let numpy pick the type it would use for the scalar.
            dtype = np.array([arr]).dtype
    try:
        if isinstance(arr, np.ndarray) and (arr.ndim > 0):
            out = unyt.unyt_array(arr, unit_str, dtype=dtype,
                                  registry=ureg)
        else:
            out = unyt.unyt_quantity(arr, unit_str, dtype=dtype,
                                     registry=ureg)
    except Exception as e:
        # Exception rather than BaseException so that KeyboardInterrupt
        # and SystemExit are not disguised as unit parsing errors.
        raise ValueError("Error parsing unit: %s, type(%s)."
                         % (repr(unit_str), type(unit_str))) from e
    return out
def are_compatible(units1, units2):
    r"""Check if two units are compatible.

    Args:
        units1 (str): First units string.
        units2 (str): Second units string.

    Returns:
        bool: True if either unit is null or a quantity in units1 can be
            converted to units2, False otherwise.

    """
    # A null unit is compatible with anything.
    if is_null_unit(units1) or is_null_unit(units2):
        return True
    if not (is_unit(units1) and is_unit(units2)):
        return False
    probe = add_units(1, units1)
    try:
        convert_to(probe, units2)
    except ValueError:
        return False
    else:
        return True
def is_null_unit(ustr):
    r"""Determine if a string is a null unit.

    Args:
        ustr (str): String to test.

    Returns:
        bool: True if the string is '' or 'n/a', False otherwise.

    """
    return bool((len(ustr) == 0) or (ustr == 'n/a'))
def as_unit(ustr):
    r"""Get unit object for the string.

    Args:
        ustr (str): Unit string.

    Returns:
        unyt.Unit: Unit object created against the registry returned by
            get_ureg.

    Raises:
        ValueError: If the string is not a recognized unit.

    """
    try:
        return unyt.Unit(ustr, registry=get_ureg())
    except unyt.exceptions.UnitParseError as err:
        # Re-raise as ValueError, the error type callers here catch.
        raise ValueError(str(err))
def is_unit(ustr):
    r"""Determine if a string is a valid unit.

    Args:
        ustr (str): String representation to test.

    Returns:
        bool: True if the string is a null unit or can be turned into a
            unit object by as_unit. False otherwise.

    """
    unit_name = tools.bytes2str(ustr)
    if is_null_unit(unit_name):
        return True
    try:
        as_unit(unit_name)
    except ValueError:
        return False
    else:
        return True
def convert_to(arr, new_units):
    r"""Convert quantity with units to new units. Objects without units
    will be returned with the new units.

    Args:
        arr (np.ndarray, float, int, unyt.unyt_array): Quantity with or
            without units.
        new_units (str): New units that should be applied.

    Returns:
        unyt.unyt_array: Array with new units. If new_units is a null unit,
            arr is returned unchanged.

    Raises:
        ValueError: If the units of arr cannot be converted to new_units.

    """
    if is_null_unit(new_units):
        return arr
    if not has_units(arr):
        return add_units(arr, new_units)
    target_units = convert_unit_string(new_units)
    try:
        original_dtype = get_data(arr).dtype
        converted = arr.to(target_units)
        converted_data = get_data(converted)
        if not (converted_data.dtype == original_dtype):
            # Rebuild the result so the input's data type is maintained.
            converted = add_units(converted_data, target_units,
                                  dtype=original_dtype)
    except unyt.exceptions.UnitConversionError as err:
        raise ValueError(str(err))
    return converted
def get_conversion_function(old_units, new_units):
    r"""Get a function that will convert a scalar/array from one unit
    to another.

    Args:
        old_units (str): Units to convert from.
        new_units (str): Units to convert to.

    Returns:
        function: Conversion function that takes scalar/array as input
            and returns the converted scalar/array with units removed.

    """
    def fconvert(x):
        # Attach the source units, convert, then strip the units again.
        return get_data(convert_to(add_units(x, old_units), new_units))

    return fconvert