import re
import numpy as np
from yggdrasil import tools, units, rapidjson
from yggdrasil.serialize.AsciiMapSerialize import AsciiMapSerialize
class CABOSerialize(AsciiMapSerialize):
    r"""Class for serializing/deserializing CABO parameter files.

    A file is a sequence of ``NAME = value`` entries. A value is either a
    quoted string, a scalar number, or a two-column table whose rows are
    spread over several comma-terminated lines. Text following ``!`` is a
    comment; a trailing ``[...]`` inside the comment carries the units.
    Lines starting with ``**`` and blank lines are ignored when reading.
    """

    _seritype = 'cabo'
    _schema_subtype_description = ('Serialization of mapping between '
                                   'keys and scalar or array values '
                                   'as used in the CABO parameter files.')
    _schema_properties = {
        'delimiter': {'type': 'string',
                      'default': ' = '}}
    default_datatype = {'type': 'object'}
    file_extensions = ['.cab']
    concats_as_str = True
    _delimiter = ' = '
    # Format applied to one (column 1, column 2) row of a table value.
    _array_fmt = '%5.5f, %5.5f'
    # Pattern for a single line. Either the line opens a new entry
    # ("name =") or it is a continuation line that starts with whitespace.
    # Named groups: name, value1, unwrapped_units, value2, description, units.
    # Compiled once here so it is not rebuilt for every deserialized line.
    _line_regex = re.compile(
        r'(?:(?:(?P<name>[^\!]*?)\s*=\s*)|(?:\s+))'
        r'(?P<value1>(?:-?[\d.]+(?:\.[\d+])?)|(?:\'[^\']*?\'))?'
        r'\s*(?:(?P<unwrapped_units>[^\!\,\']*?))?'
        r'(?:\,\s+(?P<value2>[^\!\,]*?)'
        r'(?:\,?))?\s*(?:\!\s*(?P<description>[^\[\]]*?)?\s*'
        r'(?:\[\s*(?P<units>[^\]]*?)\s*\][^\[\]]*)?)?\s*')

    def func_serialize(self, args):
        r"""Serialize a message.

        Args:
            args (dict): Python dictionary to be serialized. Keys must be
                str or bytes. A list value must hold exactly two arrays
                of identical shape and is written as a two-column table.

        Returns:
            bytes, str: Serialized message.

        Raises:
            ValueError: If a key is not a string.

        """
        lines = []
        for k, v in args.items():
            if not isinstance(k, (str, bytes)):  # pragma: debug
                raise ValueError("Serialization of non-string keys not supported.")
            iline = tools.bytes2str(k) + self.delimiter
            if isinstance(v, list):
                # Continuation rows are aligned underneath the first value.
                indent = ' ' * len(iline)
                assert len(v) == 2, "Table values must have two columns."
                assert v[0].shape == v[1].shape, (
                    "Table columns must have the same shape.")
                arr_lines = [self._array_fmt % (v[0][i], v[1][i])
                             for i in range(len(v[0]))]
                v_units = [str(getattr(vv, 'units', '-')) for vv in v]
                # The row separator must come before the '!' comment;
                # anything after '!' belongs to the comment, so a comma
                # placed there would not separate the rows.
                for i in range(len(arr_lines) - 1):
                    arr_lines[i] += ','
                arr_lines[0] += f"\t! [{'; '.join(v_units)}]"
                iline += ('\n' + indent).join(arr_lines)
            elif isinstance(v, str):
                iline += "\'%s\'" % v
            elif hasattr(v, 'units'):
                # Write the bare magnitude and keep the units in a comment.
                iline += rapidjson.dumps(
                    float(v), yggdrasil_mode=rapidjson.YM_READABLE)
                iline += f'\t! [{v.units}]'
            else:
                iline += rapidjson.dumps(
                    v, yggdrasil_mode=rapidjson.YM_READABLE)
            lines.append(iline)
        return tools.str2bytes('\n'.join(lines))

    @classmethod
    def parse_units(cls, x):
        r"""Parse units.

        Args:
            x (str): Unit string.

        Returns:
            str: Properly formatted units. A lone '-' becomes an empty
                string.

        """
        # Applied in insertion order: "cel" must become "degC" before the
        # "degC d" rule can match.
        replacements = {"kg N": "kg",
                        "kg P": "kg",
                        "kg-1 dry biomass": "kg-1",
                        "kg CH2O": "kg",
                        "cel": "degC",
                        "degC d": "ΔdegC d"}
        for k, v in replacements.items():
            x = x.replace(k, v)
        if x == '-':
            x = ""
        return x

    @staticmethod
    def _parse_value(text):
        r"""Convert the text of a first-column value to a Python object.

        Args:
            text (str): Non-empty value text taken from a line.

        Returns:
            object: The unquoted string for quoted text, otherwise the
                result of decoding the text as JSON.

        """
        if text.startswith('\'') and text.endswith('\''):
            return text.strip('\'')
        try:
            return rapidjson.loads(text)
        except rapidjson.JSONDecodeError:
            # Numbers such as "110." are not valid JSON; retry as "110.0".
            if text.endswith('.'):
                return rapidjson.loads(text + '0')
            else:  # pragma: debug
                raise

    def _apply_units(self, out, k, k_units, is_arr):
        r"""Finish the entry for a key once all of its lines were read.

        Table columns are converted to numpy arrays and any collected
        units are attached to the stored value.

        Args:
            out (dict): Result dictionary, modified in place.
            k (str): Key of the entry being finished. May be None before
                the first entry, in which case nothing is done provided
                is_arr is False and k_units is empty.
            k_units (str): Units text collected for the entry.
            is_arr (bool): True if the entry is a two-column table.

        """
        if is_arr:
            out[k][0] = np.array(out[k][0])
            out[k][1] = np.array(out[k][1])
        if not k_units:
            return
        if not is_arr:
            out[k] = units.add_units(out[k], self.parse_units(k_units))
        elif ';' in k_units:
            # "u1; u2" gives units for the first and second column.
            u1, u2 = k_units.split(';')
            u1 = self.parse_units(u1.strip())
            u2 = self.parse_units(u2.strip())
            out[k][0] = units.add_units(out[k][0], u1)
            out[k][1] = units.add_units(out[k][1], u2)
        else:
            # A single unit on a table applies to the second column only.
            out[k][1] = units.add_units(
                out[k][1], self.parse_units(k_units))

    def func_deserialize(self, msg):
        r"""Deserialize a message.

        Args:
            msg (bytes): Message to be deserialized.

        Returns:
            dict: Deserialized Python dictionary. Table entries are
                two-element lists of numpy arrays.

        Raises:
            Exception: If a line does not match the expected layout.

        """
        out = dict()
        lines = tools.bytes2str(msg.split(self.newline), recurse=True)
        k = None
        k_units = ''
        is_arr = False
        for line in lines:
            if (not line.strip()) or line.startswith('**'):
                continue
            match = self._line_regex.fullmatch(line)
            if not match:  # pragma: debug
                raise Exception("Failed to parse line: '%s'" % line)
            match = match.groupdict()
            if match.get('name', None):
                # A new entry starts, so close the previous one first.
                self._apply_units(out, k, k_units, is_arr)
                k = match['name']
                if isinstance(match.get('value1', None), str):
                    match['value1'] = match['value1'].strip()
                if isinstance(match.get('value2', None), str):
                    match['value2'] = match['value2'].strip()
                # A second value on the opening line marks a table.
                is_arr = bool(match.get('value2', None))
                k_units = ''
                if is_arr:
                    out[k] = [[], []]
            if match.get('units', ''):
                k_units += match['units']
            if match['value1']:
                v1 = self._parse_value(match['value1'])
                if is_arr:
                    v2 = rapidjson.loads(match['value2'])
                    out[k][0].append(v1)
                    out[k][1].append(v2)
                else:
                    out[k] = v1
        if out:
            self._apply_units(out, k, k_units, is_arr)
        return out

    @classmethod
    def concatenate(cls, objects, **kwargs):
        r"""Concatenate objects to get object that would be received if
        the concatenated serialization were deserialized.

        Args:
            objects (list): Objects to be concatenated.
            **kwargs: Additional keyword arguments are ignored.

        Returns:
            list: Set of objects that results from concatenating those
                provided. Later objects override earlier ones on
                duplicate keys.

        """
        total = dict()
        for x in objects:
            total.update(x)
        return [total]

    @classmethod
    def get_testing_options(cls, **kwargs):
        r"""Method to return a dictionary of testing options for this class.

        Args:
            **kwargs: Additional keyword arguments are ignored.

        Returns:
            dict: Dictionary of variables to use for testing.

        """
        out = super(CABOSerialize, cls).get_testing_options()
        out['exact_contents'] = False
        out['objects'] = [{'CRPNAM': 'Grain maize CSA practicals',
                           'TBASEM': units.add_units(4.0, 'degC'),
                           'TEFFMX': units.add_units(30.0, 'degC'),
                           'TSUMEM': units.add_units(110.0, 'ΔdegC*d'),
                           'IDSL': 0,
                           'DLO': units.add_units(-99.0, 'hr'),
                           'DLC': units.add_units(-99.0, 'hr'),
                           'TSUM1': units.add_units(900.0, 'ΔdegC*d'),
                           'TSUM2': units.add_units(800.0, 'ΔdegC*d'),
                           'DTSMTB': [
                               units.add_units(
                                   np.array([0.0, 10.0, 30.0, 35.0]), 'degC'),
                               units.add_units(
                                   np.array([0.0, 0.0, 24.0, 24.0]), 'ΔdegC*d')],
                           'DVSI': 0.0,
                           'DVSEND': 2.0,
                           'SSATB': [
                               np.array([0.0, 2.0]),
                               units.add_units(
                                   np.array([0.0, 0.0]), 'ha/kg')]}]
        out['empty'] = dict()
        out['contents'] = (
            b'CRPNAM=\'Grain maize CSA practicals\'\n\n'
            b'** emergence\n'
            b'TBASEM = 4.0 ! lower threshold temp. for emergence [cel]\n'
            b'TEFFMX = 30.0 ! max. eff. temp. for emergence [cel]\n'
            b'TSUMEM = 110. ! temperature sum from sowing to emergence [cel d]\n\n'
            b'** phenology\n'
            b'IDSL = 0 ! indicates whether pre-anthesis development depends\n'
            b' ! on temp. (=0), daylength (=1) , or both (=2)\n'
            b'DLO = -99.0 ! optimum daylength for development [hr]\n'
            b'DLC = -99.0 ! critical daylength (lower threshold) [hr]\n'
            b'TSUM1 = 900. ! temperature sum from emergence to anthesis [cel d]\n'
            b'TSUM2 = 800. ! temperature sum from anthesis to maturity [cel d]\n'
            b'DTSMTB = 0.00, 0.00, ! daily increase in temp. sum\n'
            b' 10.00, 0.00, ! as function of av. temp. '
            b'[cel; cel d]\n'
            b' 30.00, 24.00,\n'
            b' 35.00, 24.00\n'
            b'DVSI = 0. ! initial DVS\n'
            b'DVSEND = 2.00 ! development stage at harvest (= 2.0 at '
            b'maturity [-]))\n'
            b'SSATB = 0.0, 0.0, ! specific stem area [ha kg-1]\n'
            b' 2.0, 0.0 ! as function of DVS\n')
        return out