Source code for cis_interface.tests.test_schema

import os
import pprint
import tempfile
from cis_interface.tests import assert_raises, assert_equal
from cis_interface import schema


_normalize_objects = [
    ({'models': [{'name': 'modelA',
                  'language': 'c',
                  'args': 'model.c',
                  'outputs': [{'name': 'outputA',
                               'column_names': ['a', 'b'],
                               'column_units': ['cm', 'g']}],
                  'working_dir': os.getcwd()}],
      'connections': [{'inputs': 'outputA',
                       'outputs': 'fileA.txt',
                       'working_dir': os.getcwd()}]},
     {'models': [{'name': 'modelA',
                  'language': 'c',
                  'args': ['model.c'],
                  'inputs': [], 'outputs': [{'name': 'outputA',
                                             'commtype': 'default',
                                             'datatype': {'type': 'bytes'}}],
                  'working_dir': os.getcwd()}],
      'connections': [{'inputs': [{'name': 'outputA',
                                   'datatype': {'type': 'bytes'},
                                   'commtype': 'default'}],
                       'outputs': [{'name': 'fileA.txt',
                                    'filetype': 'binary',
                                    'working_dir': os.getcwd(),
                                    'field_names': ['a', 'b'],
                                    'field_units': ['cm', 'g']}]}]})]
                                    

[docs]def test_SchemaRegistry(): r"""Test schema registry.""" assert_raises(ValueError, schema.SchemaRegistry, {}) x = schema.SchemaRegistry() assert_equal(x == 0, False) fname = os.path.join(tempfile.gettempdir(), 'temp.yml') with open(fname, 'w') as fd: fd.write('') assert_raises(Exception, x.load, fname) os.remove(fname)
[docs]def test_default_schema(): r"""Test getting default schema.""" s = schema.get_schema() assert(s is not None) schema.clear_schema() assert(schema._schema is None) s = schema.get_schema() assert(s is not None) for k in s.keys(): assert(isinstance(s[k].subtypes, list)) assert(isinstance(s[k].classes, list)) for ksub in s[k].classes: s[k].get_subtype_properties(ksub)
[docs]def test_create_schema(): r"""Test creating new schema.""" fname = 'test_schema.yml' if os.path.isfile(fname): # pragma: debug os.remove(fname) # Test saving/loading schema s0 = schema.create_schema() s0.save(fname) assert(s0 is not None) assert(os.path.isfile(fname)) s1 = schema.get_schema(fname) assert_equal(s1.schema, s0.schema) # assert_equal(s1, s0) os.remove(fname) # Test getting schema s2 = schema.load_schema(fname) assert(os.path.isfile(fname)) assert_equal(s2, s0) os.remove(fname)
[docs]def test_cdriver2filetype_error(): r"""Test errors in cdriver2filetype.""" assert_raises(ValueError, schema.cdriver2filetype, 'invalid')
[docs]def test_standardize(): r"""Test standardize.""" vals = [(False, ['inputs', 'outputs'], ['_file'], {'input': 'inputA', 'output_file': 'outputA'}, {'inputs': [{'name': 'inputA'}], 'outputs': [{'name': 'outputA'}]}), (True, ['input', 'output'], ['_file'], {'inputs': 'inputA', 'output_files': 'outputA'}, {'input': [{'name': 'inputA'}], 'output': [{'name': 'outputA'}]})] for is_singular, keys, suffixes, x, y in vals: schema.standardize(x, keys, suffixes=suffixes, is_singular=is_singular) assert_equal(x, y)
[docs]def test_normalize(): r"""Test normalization of legacy formats.""" s = schema.get_schema() for x, y in _normalize_objects: a = s.normalize(x, backwards_compat=True) try: assert_equal(a, y) except BaseException: # pragma: debug pprint.pprint(a) pprint.pprint(y) raise
[docs]def test_cdriver2commtype_error(): r"""Test error when invalid driver supplied.""" assert_raises(ValueError, schema.cdriver2commtype, 'invalid')