# Source code for omas.omas_ascii

'''save/load from ASCII routines

-------
'''

from .omas_utils import *
from .omas_core import ODS, ODC, force_imas_type


def identify_imas_type(value):
    """
    Describe `value` in terms of the metadata written to the ASCII file

    :param value: string, float, integer, numpy array or ODS

    :return: dictionary with some of the following entries:
             `type` (IMAS type label), `dim` (number of dimensions), `size` (extent of each dimension).
             An ODS only gets `dim`, set to its length.

    :raises ValueError: for numpy arrays whose dtype name contains none of
                        `str`, `float`, `int`, and for any other unsupported Python type
    """
    # the order of these checks matters: strings first, floats before integers
    if isinstance(value, (str, numpy.bytes_, numpy.str_)):
        return {'type': '50 (CHAR_DATA)', 'dim': 1, 'size': (len(value),)}

    if isinstance(value, (float, numpy.floating)):
        return {'type': '52 (DOUBLE_DATA)', 'dim': 0}

    if isinstance(value, (int, numpy.integer)):
        return {'type': '51 (INTEGER_DATA)', 'dim': 0}

    if isinstance(value, numpy.ndarray):
        dtype_name = value.dtype.name
        # arrays are classified by a substring of their dtype name, tested in this order
        array_kinds = (
            ('str', '50 (CHAR_DATA)'),
            ('float', '52 (DOUBLE_DATA)'),
            ('int', '51 (INTEGER_DATA)'),
        )
        for marker, imas_type in array_kinds:
            if marker in dtype_name:
                return {'type': imas_type, 'dim': len(value.shape), 'size': value.shape}
        raise ValueError(str(dtype_name) + ' is not a valid IMAS data type')

    if isinstance(value, ODS):
        return {'dim': len(value)}

    raise ValueError(str(type(value)) + ' is not a valid IMAS data type')


imas_nan = -9e40


def imas_fmt(value):
    if isinstance(value, (float, numpy.floating)):
        if numpy.isnan(value):
            return '%5.16e' % imas_nan
        else:
            return '%5.16e' % value
    else:
        return '%s' % value


def imas_eval(value):
    tmp = ast.literal_eval(value)
    if tmp == imas_nan:
        return numpy.nan
    else:
        return tmp


def imas_ascii_key_sorter(keys, ods_has_location):
    """
    Move the `ids_properties` header entries to the front of the list of paths

    :param keys: list of paths; it is reordered in place

    :param ods_has_location: reordering is only done when this evaluates to True

    :return: the same `keys` list object that was passed in
    """
    if not ods_has_location:
        return keys

    leading = (
        'ids_properties.homogeneous_time',
        'ids_properties.version_put.data_dictionary',
        'ids_properties.version_put.access_layer',
        'ids_properties.version_put.access_layer_language',
    )

    # pull out the first occurrence of every header entry that is present...
    found = [entry for entry in leading if entry in keys]
    for entry in found:
        keys.remove(entry)
    # ...and put them back at the beginning, in the order listed above
    keys[:0] = found

    return keys


# ---------------------------
# save and load OMAS to ASCII
# ---------------------------
def save_omas_ascii(ods, filename, machine=None, pulse=None, run=None, dir=None):
    """
    Save an ODS to ASCII (follows IMAS ASCII_BACKEND convention)

    :param ods: OMAS data set

    :param filename: filename or file descriptor to save to
                     use `None` to save IDSs to multiple files based on machine, pulse, run

    :param machine: machine name to build filename for saving IDSs to multiple files

    :param pulse: pulse number to build filename for saving IDSs to multiple files

    :param run: run number to build filename for saving IDSs to multiple files

    :param dir: directory where to save multiple IDSs files

    :raises ValueError: when `filename` and `machine, pulse, run` are not given as
                        alternatives of one another, or when an array has more than 2 dimensions
    """
    mpr_given = [item is not None for item in (machine, pulse, run)]

    # multiple files: one `{machine}_{pulse}_{run}_{ds}.ids` file for each entry of the ODS
    if filename is None and all(mpr_given):
        for ds in ods:
            ids_filename = f'{machine}_{pulse}_{run}_{ds}.ids'
            if dir:
                ids_filename = dir + os.sep + ids_filename
            save_omas_ascii(ods[ds], ids_filename, machine=None, pulse=None, run=None)
        return

    # single file: only `filename` must be given
    if filename is None or any(mpr_given):
        raise ValueError('Either `filename` or `machine, pulse, run` can be specified')

    printd('Saving OMAS data to ASCII: %s' % filename, topic='ascii')

    ods.satisfy_imas_requirements()

    # NOTE(review): the whitespace leading `type:`, `dim:` and `size:` is reproduced as a
    # single space; it must agree with the markers that `load_omas_ascii` looks for
    lines = []
    for path in imas_ascii_key_sorter(ods.pretty_paths(include_structures=True), ods.location):
        value = ods[path]
        # among the structures only the ones backed by a list get an entry in the file
        if isinstance(value, ODS) and not isinstance(value.omas_data, list):
            continue
        value = force_imas_type(value)
        info = identify_imas_type(value)

        # path line
        slashed_path = path.replace('.', '/')
        location = ods.location
        lines.append(location + '.' + slashed_path if location else slashed_path)

        # metadata lines
        if 'type' in info:
            lines.append(' type: ' + info['type'])
        if 'dim' in info:
            lines.append(' dim: %d' % info['dim'])
        if 'size' in info:
            lines.append(' size: %s' % (' '.join(map(str, info['size']))))

        # value lines (structures have none)
        if isinstance(value, ODS):
            continue
        if not isinstance(value, numpy.ndarray):
            lines.append(imas_fmt(value))
            continue
        n_dims = len(value.shape)
        if n_dims == 1:
            lines.append(' '.join(map(imas_fmt, value)))
        elif n_dims == 2:
            # one line per row of the transposed array
            lines.extend(' '.join(map(imas_fmt, row)) for row in value.T)
        else:
            raise ValueError(f'{path} not implemented ASCII support for number of dimensions >2')

    ascii_string = '\n'.join(lines)

    if isinstance(filename, str):
        with open(filename, 'w') as f:
            f.write(ascii_string)
    else:
        # anything that is not a string is used as an already open file descriptor
        filename.write(ascii_string)
def load_omas_ascii(
    filename, machine=None, pulse=None, run=None, dir=None, consistency_check=True, imas_version=omas_rcparams['default_imas_version']
):
    """
    Load an ODS from ASCII (follows IMAS ASCII_BACKEND convention)

    :param filename: filename or file descriptor to load from
                     use `None` to load IDSs from multiple files based on machine, pulse, run

    :param machine: machine name to build filename for loading IDSs from multiple files

    :param pulse: pulse number to build filename for loading IDSs from multiple files

    :param run: run number to build filename for loading IDSs from multiple files

    :param dir: directory from where to load multiple IDSs files

    :param consistency_check: verify that data is consistent with IMAS schema

    :param imas_version: imas version to use for consistency check

    :return: OMAS data set

    :raises ValueError: when `filename` and `machine, pulse, run` are not given as
                        alternatives of one another
    """
    if filename is None and machine is not None and pulse is not None and run is not None:
        # multiple files: load every `{machine}_{pulse}_{run}_*.ids` file into one ODS
        pattern = f'{machine}_{pulse}_{run}_*.ids'
        if dir:
            pattern = dir + os.sep + pattern
        # `consistency_check` and `imas_version` are forwarded both to the container ODS
        # and to the load of each file, so that the caller's choice is honoured
        ods = ODS(consistency_check=consistency_check, imas_version=imas_version)
        for ids_filename in glob.glob(pattern):
            ods.update(load_omas_ascii(ids_filename, consistency_check=consistency_check, imas_version=imas_version))
        return ods
    elif filename is not None and machine is None and pulse is None and run is None:
        pass
    else:
        raise ValueError('Either `filename` or `machine, pulse, run` can be specified')

    printd('Loading OMAS data from ASCII: %s' % filename, topic='ascii')

    if isinstance(filename, str):
        with open(filename, 'r') as f:
            ascii_string = f.read()
    else:
        # anything that is not a string is used as an already open file descriptor
        ascii_string = filename.read()

    # First pass: split the text in one token per entry.
    # An entry is a path line, followed by `type:` / `dim:` / `size:` metadata lines,
    # followed by as many value lines as the metadata calls for.
    tokens = OrderedDict()
    value_lines = 0  # number of value lines still to be read for the current entry
    path = None  # None means that the next line starts a new entry
    token = None
    for line in ascii_string.split('\n'):
        if path is None:
            # a blank line cannot be a path (e.g. newline at the end of the file)
            if not line.strip():
                continue
            path = line.strip().replace('/', '.')
            tokens[path] = token = {'path': path}
            continue
        elif value_lines:
            token.setdefault('value', []).append(line)
            value_lines -= 1
            if value_lines == 0:
                path = None
            continue

        # metadata line: leading whitespace is ignored so that
        # both tab and space indented files are accepted
        field = line.lstrip()
        if field.startswith('type: '):
            token['type'] = line.split('type:')[1].strip()
        elif field.startswith('dim: '):
            token['dim'] = int(line.split('dim:')[1].strip())
        elif field.startswith('size: '):
            token['size'] = tuple(map(int, line.split('size:')[1].strip().split()))

        # once the metadata is complete, figure out how many value lines follow
        has_type = 'type' in token
        has_size = 'size' in token
        dim = token.get('dim')
        # scalar INT or FLOAT
        if has_type and dim == 0:
            value_lines = 1
        # string and 1D arrays
        elif has_type and dim == 1 and has_size:
            value_lines = 1
        # 2D arrays
        elif has_type and dim == 2 and has_size:
            value_lines = token['size'][1]
        # ODS: no value lines, the entry is complete
        elif not has_type and dim is not None:
            value_lines = 0
            path = None

    # Second pass: convert the value lines of each token and assign them to the ODS
    ods = ODS(imas_version=imas_version, consistency_check=consistency_check)
    for token in tokens.values():
        path = token['path']
        # scalar INT or FLOAT
        if 'type' in token and 'dim' in token and token['dim'] == 0:
            value = imas_eval(token['value'][0])
        # string
        elif 'type' in token and token['type'] == '50 (CHAR_DATA)' and 'dim' in token and token['dim'] == 1:
            value = token['value'][0]
        # 1D arrays
        elif 'type' in token and 'dim' in token and token['dim'] == 1 and 'size' in token:
            value = numpy.array(list(map(imas_eval, token['value'][0].split())))
        # 2D arrays: one line per row of the transposed array
        elif 'type' in token and 'dim' in token and token['dim'] == 2 and 'size' in token:
            value = numpy.array([list(map(imas_eval, row.split())) for row in token['value']]).T
        # ODS
        elif 'type' not in token and 'dim' in token:
            continue
        ods[path] = value

    return ods
def through_omas_ascii(ods, method=['function', 'class_method'][1], one_or_many_files=['one', 'many'][1]):
    """
    Test save and load OMAS ASCII

    :param ods: ods

    :param method: `function` goes through `save_omas_ascii` and `load_omas_ascii`,
                   any other value goes through the `.save()` and `.load()` methods of the ODS

    :param one_or_many_files: `one` uses a single `test.ids` file,
                              any other value uses the machine, pulse, run arguments

    :return: ods
    """
    if one_or_many_files == 'one':
        filename = omas_testdir(__file__) + os.sep + 'test.ids'
        machine = pulse = run = dir = None
    else:
        filename = None if method == 'function' else 'ascii'
        machine, pulse, run = 'TEST', 1, 0
        dir = omas_testdir(__file__)

    # make a copy to make sure save does not alter entering ODS
    ods_copy = copy.deepcopy(ods)

    io_args = (filename, machine, pulse, run, dir)
    if method == 'function':
        save_omas_ascii(ods_copy, *io_args)
        return load_omas_ascii(*io_args)

    ods_copy.save(*io_args)
    return ODS().load(*io_args)