'''ODS class and save/load from pickle routines
-------
'''
from .omas_utils import *
from .omas_utils import __version__, _extra_structures
# fmt: off
__all__ = [
'ODS', 'ODC', 'ODX',
'CodeParameters', 'codeparams_xml_save', 'codeparams_xml_load',
'ods_sample', 'different_ods', 'omas_structure',
'save_omas_pkl', 'load_omas_pkl', 'through_omas_pkl',
'save_omas_json', 'load_omas_json', 'through_omas_json',
'save_omas_mongo', 'load_omas_mongo', 'through_omas_mongo',
'save_omas_hdc', 'load_omas_hdc', 'through_omas_hdc',
'save_omas_nc', 'load_omas_nc', 'through_omas_nc',
'save_omas_h5', 'load_omas_h5', 'through_omas_h5',
'save_omas_ascii', 'load_omas_ascii', 'through_omas_ascii',
'save_omas_ds', 'load_omas_ds', 'through_omas_ds',
'load_omas_dx', 'save_omas_dx', 'through_omas_dx', 'ods_2_odx', 'odx_2_ods',
'save_omas_imas', 'load_omas_imas', 'through_omas_imas', 'load_omas_iter_scenario', 'browse_imas',
'save_omas_s3', 'load_omas_s3', 'through_omas_s3', 'list_omas_s3', 'del_omas_s3',
'machine_expression_types', 'machines', 'machine_mappings', 'load_omas_machine',
'machine_mapping_function', 'test_machine_mapping_functions', 'mdstree', 'mdsvalue',
'omas_dir', 'imas_versions', 'latest_imas_version', 'omas_info', 'omas_info_node', 'get_actor_io_ids',
'omas_rcparams', 'rcparams_environment', 'omas_testdir', '__version__',
'latexit', 'OmasDynamicException'
]
# fmt: on
# List of functions that can be added by third-party Python
# packages to process input data before it goes into an ODS.
# This is necessary because ODSs should only contain integers, floats, and strings (and arrays thereof).
# It is used, for example, by OMFIT to process OMFITexpressions.
input_data_process_functions = []
def force_imas_type(value):
"""
IMAS supports (arrays of) integers, floats and strings
:param value: input value
:return: input value converted to be IMAS compatible
"""
# lists are saved as numpy arrays, and 0D numpy arrays as scalars
for function in input_data_process_functions:
value = function(value)
if isinstance(value, list):
value = numpy.array(value)
if 'DataArray' in value.__class__.__name__:
import xarray
if isinstance(value, xarray.DataArray):
value = value.values
if isinstance(value, numpy.ndarray) and not len(value.shape):
value = value.item()
if isinstance(value, (numpy.bytes_, numpy.str_)):
value = value.item()
elif isinstance(value, (float, numpy.floating)):
value = float(value)
elif isinstance(value, (int, numpy.integer)):
value = int(value)
if isinstance(value, str):
pass
elif isinstance(value, bytes):
value = value.decode('utf-8', errors='ignore')
return value
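# Illustrative behavior of force_imas_type (a sketch; exact dtypes depend on the numpy version in use):
#   >>> force_imas_type([1, 2, 3])          # lists become numpy arrays
#   array([1, 2, 3])
#   >>> force_imas_type(numpy.array(1.0))   # 0D arrays become Python scalars
#   1.0
#   >>> force_imas_type(b'text')            # bytes become str
#   'text'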
_consistency_warnings = {}
def consistency_checker(location, value, info, consistency_check, imas_version):
"""
Print warnings or raise errors if object does not satisfy IMAS data dictionary
Converts numeric data to INT/FLOAT depending on IMAS specifications
:param value: value to check consistency of
:param info: output of omas_info_node
:param consistency_check: True, False, 'warn'
:param imas_version: IMAS version
:return: value
"""
# force type consistent with data dictionary
txt = ''
if is_uncertain(value) or 'data_type' not in info:
pass
elif isinstance(value, numpy.ndarray):
if 'STRUCT_ARRAY' in info['data_type'] and not len(value):
value = ODS()
value.omas_data = []
elif 'FLT' in info['data_type']:
value = value.astype(float)
elif 'INT' in info['data_type']:
value = value.astype(int)
elif 'STR' in info['data_type']:
value = value.astype(str)
elif isinstance(value, (int, float, numpy.integer, numpy.floating)):
if 'FLT' in info['data_type']:
value = float(value)
elif 'INT' in info['data_type']:
value = int(value)
elif 'STR' in info['data_type']:
value = str(value)
elif isinstance(value, bytes):
if 'STR' in info['data_type']:
value = b2s(value)
# check that STRUCTURE/STRUCT_ARRAY entries are ODS objects
if 'data_type' in info and info['data_type'] in ['STRUCTURE', 'STRUCT_ARRAY'] and not isinstance(value, ODS):
txt = f'{location} is of type {type(value)} but this should be an ODS'
# check type
elif not (
isinstance(value, (int, float, str, numpy.ndarray, uncertainties.core.Variable))
or value is None
or isinstance(value, (CodeParameters, ODS))
):
txt = f'{location} is of type {type(value)} but supported types are: string, float, int, array'
# check consistency for scalar entries
elif 'data_type' in info and '_0D' in info['data_type'] and isinstance(value, numpy.ndarray):
txt = f'{location} is of type {type(value)} but must be a scalar of type {info["data_type"]}'
# check consistency for number of dimensions
elif (
'coordinates' in info
and len(info['coordinates'])
and (not isinstance(value, numpy.ndarray) or len(value.shape) != len(info['coordinates']))
):
txt = f'{location} shape {numpy.asarray(value).shape} is inconsistent with coordinates: {info["coordinates"]}'
if len(txt) and consistency_check is True:
raise ValueError(txt)
elif 'lifecycle_status' in info and info['lifecycle_status'] in ['obsolescent']:
txt = f'{o2u(location)} is in {info["lifecycle_status"].upper()} state for IMAS {imas_version}'
if consistency_check and (imas_version not in _consistency_warnings or txt not in _consistency_warnings[imas_version]):
_consistency_warnings.setdefault(imas_version, []).append(txt)
else:
txt = ''
return value, txt
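# Sketch of how consistency_checker is typically used internally (the location and value here
# are hypothetical; `info` comes from omas_info_node on the universal location):
#   info = omas_info_node('equilibrium.time')
#   value, txt = consistency_checker('equilibrium.time', numpy.array([0.0, 1.0]), info, 'warn', latest_imas_version)
#   # with consistency_check=True a non-empty `txt` raises ValueError; with 'warn' it is only reported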
def _handle_extension(*args, **kw):
if args[0] == 'ascii':
ext = 'ascii'
args = list(args)
args[0] = None
args = tuple(args)
elif '/' not in args[0] and '.' not in os.path.split(args[0])[1]:
ext = args[0]
args = args[1:]
else:
ext = os.path.splitext(args[0])[-1].strip('.')
if not ext:
ext = 'pkl'
# use `ids` extension for IMAS ASCII format
if ext == 'ids':
ext = 'ascii'
# understand machine names as `machine` extension
if ext in machines(None):
args = tuple([ext] + list(args))
ext = 'machine'
return ext, args
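# Sketch of how _handle_extension resolves the save/load format from the first argument
# (filenames here are hypothetical):
#   _handle_extension('ods_save.json')     # -> ('json', ('ods_save.json',))
#   _handle_extension('ods_save.ids')      # -> ('ascii', ('ods_save.ids',))   IMAS ASCII format
#   _handle_extension('./ods_save')        # -> ('pkl', ('./ods_save',))       no extension defaults to pickle
#   _handle_extension('json', 'ods_save')  # -> ('json', ('ods_save',))        bare format name is consumed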
omas_ods_attrs = [
'_consistency_check',
'_imas_version',
'_cocos',
'_cocosio',
'_coordsio',
'_unitsio',
'_uncertainio',
'_dynamic',
'_parent',
]
class OmasDynamicException(RuntimeError):
"""
Exception raised when dynamic data fetching fails
"""
pass
class ODS(MutableMapping):
"""
OMAS Data Structure class
"""
def __init__(
self,
imas_version=omas_rcparams['default_imas_version'],
consistency_check=omas_rcparams['consistency_check'],
cocos=omas_rcparams['cocos'],
cocosio=None,
coordsio=None,
unitsio=None,
uncertainio=None,
dynamic=None,
):
"""
:param imas_version: IMAS version to use as a constraint for the node names
:param consistency_check: whether to enforce consistency with IMAS schema
:param cocos: internal COCOS representation (this can only be set when the object is created)
:param cocosio: COCOS representation of the data that is read/written from/to the ODS
:param coordsio: ODS with coordinates to use for the data that is read/written from/to the ODS
:param unitsio: ODS will return data with units if True
:param uncertainio: ODS will return data with uncertainties if True
:param dynamic: internal keyword used for dynamic data loading
"""
self.omas_data = None
self._consistency_check = consistency_check
if consistency_check and imas_version not in imas_versions:
raise ValueError("Unrecognized IMAS version `%s`. Possible options are:\n%s" % (imas_version, imas_versions.keys()))
self._parent = None
self.imas_version = imas_version
self.cocos = cocos
self.cocosio = cocosio
self.coordsio = coordsio
self.unitsio = unitsio
self.uncertainio = uncertainio
self.dynamic = dynamic
def homogeneous_time(self, key='', default=True):
"""
Dynamically evaluate whether time is homogeneous or not
NOTE: this method does not read ods['ids_properties.homogeneous_time']; instead it uses the time info to figure it out
:param key: ods location
:param default: what to return in case no time basis is defined
:return: True/False or default value (True) if no time basis is defined
"""
if not len(self.location) and not len(key):
raise ValueError('homogeneous_time() can not be called on a top-level ODS')
extra_info = {}
self.time(key=key, extra_info=extra_info)
homogeneous_time = extra_info['homogeneous_time']
if homogeneous_time is None:
return default
else:
return homogeneous_time
def time(self, key='', extra_info=None):
"""
Return the time information for a given ODS location
:param key: ods location
:param extra_info: dictionary that will be filled in place with extra information about time
:return: time information for a given ODS location (scalar or array)
"""
if extra_info is None:
extra_info = {}
location = self.location
# subselect on requested key
subtree = p2l(location + '.' + key)
# get time nodes from data structure definitions
loc = p2l(subtree)
if not loc:
raise LookupError('Must specify a location in the ODS to get the time of')
utimes_ds = [i2o(k) for k in omas_times(self.imas_version) if k.startswith(loc[0] + '.')]
# get time nodes with actual numbers for indexes of arrays of structures and identify time index
times_ds = list(map(lambda item: u2o(item, l2o(subtree)), utimes_ds))
try:
time_array_index = int(re.sub(r'.*\.([0-9]+)\.time.*', r'\1', ' '.join(times_ds)))
except Exception:
time_array_index = None
try:
try:
# traverse ODS upstream until time information is found
time = {}
for sub in [subtree[:k] for k in range(len(subtree), 0, -1)]:
times_sub_ds = [k for k in utimes_ds if k.startswith(l2u(sub))]
this_subtree = l2o(sub)
# get time data from ods
times = {}
n = len(location)
for item in times_sub_ds:
otem = u2o(item, this_subtree)[n:]
if otem.replace(':', '0') not in self:
continue
try:
time = self.__getitem__(otem, None) # traverse ODS
if isinstance(time, numpy.ndarray):
if time.size == 0:
continue
# if time is still multidimensional
# (eg. because we are querying the time across different diagnostic channels)
# squash the multidimensional time arrays if they are all the same
if len(time.shape) > 1:
time = numpy.reshape(time, (-1, time.shape[-1]))
if all(numpy.allclose(time[0], t) for t in time[1:]):
time = time[0]
times[item] = time
except ValueError as _excp:
if 'has no data' in repr(_excp):
pass
else:
# return False if time is not homogeneous
extra_info['homogeneous_time'] = False
return None
times_values = list(times.values())
extra_info['location'] = times.keys()
# no time data defined
if not len(times_values):
extra_info['homogeneous_time'] = None
return None
# We crossed [:] or something and picked up a 2D time array
elif any(len(numpy.asarray(time).shape) > 1 for time in times_values):
# Make a 1D reference time0 that can be compared against other time arrays
time0 = list(times.values())[0]
# Collapse extra dimensions, assuming time is the last one. If it isn't, this will fail.
while len(time0.shape) > 1:
time0 = numpy.take(time0, 0, axis=0)
if all(time.size == time0.size for time in times.values()):
for time in times.values():
# Make sure all time arrays are close to the time0 we identified
assert abs(time - time0).max() < 1e-7
extra_info['homogeneous_time'] = True
if isinstance(time0, (float, int)):
return time0
elif time_array_index is not None:
return time0[time_array_index]
return time0
else: # Similar to ValueError exception caught above
extra_info['homogeneous_time'] = False
return None
# if all time entries are one-dimensional
elif all(len(numpy.asarray(time).shape) == 1 for time in times_values):
# if the time entries are all consistent with one another
if all(
[
numpy.array_equiv(times_values[0], time) and numpy.allclose(times_values[0], time)
for time in times_values[1:]
]
):
time = times_values[0]
extra_info['homogeneous_time'] = True
if isinstance(time, (float, int)):
return time
elif time_array_index is not None:
return time[time_array_index]
return time
# if the time entries are not consistent with one another
else:
extra_info['homogeneous_time'] = False
return None
# We should never end up here
else:
raise ValueError(
f'Error handling time in OMAS for `{l2o(subtree)}`:\n' + '\n'.join([f'{k}:{v}' for k, v in times.items()])
)
except Exception:
raise
except Exception as _excp:
raise _excp.__class__(f'Error handling time for `{l2o(subtree)}`')
# We should never end up here
raise ValueError(f'Error handling time in OMAS for `{l2o(subtree)}`')
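# Sketch of typical usage of the time-related methods (assumes an ODS filled with equilibrium data):
#   ods['equilibrium'].time()                # time array of the equilibrium IDS
#   ods['equilibrium'].homogeneous_time()    # True if all time nodes are consistent with one another
#   ods['equilibrium.time_slice.0'].time()   # scalar time of an individual time slice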
def slice_at_time(self, time=None, time_index=None):
"""
method for selecting a time slice from a time-dependent ODS (NOTE: this method operates in place)
:param time: time value to select
:param time_index: time index to select (NOTE: time_index has precedence over time)
:return: modified ODS
"""
# set time_index for parent and children
if 'time' in self and isinstance(self['time'], numpy.ndarray):
if time_index is None:
time_index = numpy.argmin(abs(self['time'] - time))
if (time - self['time'][time_index]) != 0.0:
printe('%s sliced at %s instead of requested time %s' % (self.location, self['time'][time_index], time))
time = self['time'][time_index]
if time is None:
time = self['time'][time_index]
# loop over items
for item in self.keys():
# time (if present) is treated last
if item == 'time':
continue
# identify time-dependent data
info = omas_info_node(o2u(self.ulocation + '.' + str(item)))
if 'coordinates' in info and any(k.endswith('.time') for k in info['coordinates']):
# time-dependent arrays
if not isinstance(self.getraw(item), ODS):
self[item] = numpy.atleast_1d(self[item][time_index])
# time-dependent list of ODSs
elif isinstance(self[item].omas_data, list) and len(self[item]) and 'time' in self[item][0]:
if time_index is None:
raise ValueError('`time` array is not set for `%s` ODS' % self.ulocation)
tmp = self[item][time_index]
self.getraw(item).clear()
self.getraw(item)[0] = tmp
# go deeper inside ODSs that do not have time info
elif isinstance(self.getraw(item), ODS):
self.getraw(item).slice_at_time(time=time, time_index=time_index)
# treat time
if 'time' in self:
self['time'] = numpy.atleast_1d(self['time'][time_index])
return self
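# Sketch of slicing an ODS in place (times here are hypothetical):
#   ods['equilibrium'].slice_at_time(time=1.5)       # keep the slice closest to t=1.5 s
#   ods['equilibrium'].slice_at_time(time_index=0)   # keep the first time slice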
def time_index(self, time, key=''):
"""
Return the index of the closest time-slice for a given ODS location
:param time: time in seconds
:param key: ods location
:return: index (integer) of the closest time-slice
"""
time_array = self.time(key=key)
if not isinstance(time_array, numpy.ndarray):
raise ValueError(l2i(p2l(self.location + '.' + key)) + " does not have an array of times")
return numpy.argmin(abs(time - numpy.atleast_1d(time_array)))
@property
def parent(self):
try:
return self._parent()
except TypeError:
return None
except AttributeError:
self._parent = None
return None
@parent.setter
def parent(self, value):
if value is None:
self._parent = None
else:
self._parent = weakref.ref(value)
@property
def location(self):
"""
Property which returns the location of this ODS within the top-level ODS
"""
parent = self.parent
if isinstance(parent, ODC):
return ''
elif parent is None:
return ''
else:
parent_location = parent.location
loc = None
if isinstance(parent.omas_data, list):
for k, value in enumerate(parent.omas_data):
if value is self:
loc = k
break
elif isinstance(parent.omas_data, dict):
for key in parent.omas_data:
if parent.omas_data[key] is self:
loc = key
break
if loc is None:
return ''
if parent_location:
return parent_location + '.' + str(loc)
else:
return str(loc)
@property
def top(self):
"""
Property which returns instance of top level ODS
"""
top = self
parent = self.parent
while parent is not None:
top = parent
parent = parent.parent
return top
@property
def structure(self):
"""
Property which returns structure of current ODS
"""
return imas_structure(self.imas_version, self.location)
@property
def imas_version(self):
"""
Property that returns the imas_version of this ods
:return: string with imas_version
"""
if not hasattr(self, '_imas_version'):
self._imas_version = omas_rcparams['default_imas_version']
top = self.top
if top._imas_version is None:
top._imas_version = omas_rcparams['default_imas_version']
self._imas_version = top._imas_version
return self._imas_version
@imas_version.setter
def imas_version(self, imas_version_value):
if imas_version_value is None:
imas_version_value = omas_rcparams['default_imas_version']
self.top._imas_version = imas_version_value
@property
def consistency_check(self):
"""
property that returns whether consistency with IMAS schema is enabled or not
:return: True/False/'warn'/'drop'/'strict' or a combination of those strings
"""
if not hasattr(self, '_consistency_check'):
self._consistency_check = omas_rcparams['consistency_check']
return self._consistency_check
@consistency_check.setter
def consistency_check(self, consistency_value):
"""
property that sets whether consistency with IMAS schema is enabled or not
:param consistency_value: True/False/'warn'/'drop'/'strict' or a combination of those strings
"""
if not consistency_value and not self._consistency_check:
return
old_consistency_check = self._consistency_check
try:
# set ._consistency_check for this ODS
self._consistency_check = consistency_value
# set .consistency_check
for item in list(self.keys(dynamic=0)):
if isinstance(self.getraw(item), ODS) and 'code.parameters' in self.getraw(item).location:
# consistency_check=True makes sure that code.parameters is of type CodeParameters
if consistency_value:
tmp = CodeParameters()
tmp.update(self.getraw(item))
self.setraw(item, tmp)
else:
continue
else:
consistency_value_propagate = consistency_value
if consistency_value:
location = self.location
structure = imas_structure(self.imas_version, location)
if location.endswith('.ids_properties') and item == 'occurrence':
continue
else:
structure_key = item if not isinstance(item, int) else ':'
strict_fail = False
if (
isinstance(consistency_value, str)
and 'strict' in consistency_value
and structure_key in structure
and p2l(location + '.%s' % item)[0] in _extra_structures
and o2i(o2u(location + '.%s' % item)) in _extra_structures[p2l(location + '.%s' % item)[0]]
):
strict_fail = True
if not strict_fail and structure_key in structure:
structure[structure_key]
else:
options = list(structure.keys())
if len(options) == 1 and options[0] == ':':
options = 'A numerical index is needed with n>=0'
else:
if len(options) > 5:
options = {
option: difflib.SequenceMatcher(None, structure_key, option).ratio() for option in options
}
index = numpy.argsort(list(options.values())).astype(int)
options = list(numpy.array(list(options.keys()))[index[-5:]][::-1]) + ['...']
options = 'Did you mean: ' + ', '.join(options)
txt = 'IMAS %s location: %s' % (self.imas_version, location + '.' + structure_key)
if isinstance(consistency_value, str) and ('warn' in consistency_value or 'drop' in consistency_value):
if 'warn' in consistency_value:
if 'drop' in consistency_value:
printe(f'Dropping invalid {txt}')
else:
printe(f'Invalid {txt}')
consistency_value_propagate = False
else:
raise LookupError(underline_last(f'Invalid {txt}', len('LookupError: ')) + '\n' + options)
if isinstance(consistency_value, str) and 'drop' in consistency_value:
del self[item]
continue
# check that value is consistent
if not isinstance(self.getraw(item), ODS):
location = l2o([location] + [item])
info = omas_info_node(o2u(location), imas_version=self.imas_version)
value, txt = consistency_checker(location, self.getraw(item), info, consistency_value, self.imas_version)
if not len(txt):
pass
elif isinstance(consistency_value, str) and ('warn' in consistency_value or 'drop' in consistency_value):
if 'warn' in consistency_value:
if 'drop' in consistency_value:
printe(f'Dropping invalid {txt}')
else:
printe(f'Invalid {txt}')
if isinstance(consistency_value, str) and 'drop' in consistency_value:
del self[item]
continue
if value is not self.getraw(item):
self.setraw(item, value)
# propagate consistency check
if isinstance(self.getraw(item), ODS):
self.getraw(item).consistency_check = consistency_value_propagate
except Exception as _excp:
# restore existing consistency_check value in case of error
if old_consistency_check != consistency_value:
for item in self.keys(dynamic=0):
if isinstance(self.getraw(item), ODS):
self.getraw(item).consistency_check = old_consistency_check
self.consistency_check = old_consistency_check
raise # (LookupError('Consistency check failed: %s' % repr(_excp)))
@property
def cocos(self):
"""
property that tells in what COCOS format the data is stored internally in the ODS
"""
if not hasattr(self, '_cocos'):
self._cocos = omas_rcparams['cocos']
top = self.top
if top._cocos is None:
top._cocos = omas_rcparams['cocos']
self._cocos = top._cocos
return self._cocos
@cocos.setter
def cocos(self, cocos_value):
if cocos_value is None:
cocos_value = omas_rcparams['cocos']
self.top._cocos = cocos_value
@property
def cocosio(self):
"""
property that tells in what COCOS format the data will be input/output
"""
if not hasattr(self, '_cocosio'):
self._cocosio = omas_rcparams['cocos']
top = self.top
if top._cocosio is None:
top._cocosio = omas_rcparams['cocos']
self._cocosio = top._cocosio
return self._cocosio
@cocosio.setter
def cocosio(self, cocosio_value):
if cocosio_value is None:
cocosio_value = omas_rcparams['cocos']
self.top._cocosio = cocosio_value
@property
def unitsio(self):
"""
property that tells whether data should be returned with units or not
"""
if not hasattr(self, '_unitsio'):
self._unitsio = {}
top = self.top
if top._unitsio is None:
top._unitsio = {}
self._unitsio = top._unitsio
return self._unitsio
@unitsio.setter
def unitsio(self, unitsio_value):
if unitsio_value is None:
unitsio_value = {}
self.top._unitsio = unitsio_value
@property
def uncertainio(self):
"""
property that tells whether data should be returned with uncertainties or not
"""
if not hasattr(self, '_uncertainio'):
self._uncertainio = {}
top = self.top
if top._uncertainio is None:
top._uncertainio = {}
self._uncertainio = top._uncertainio
return self._uncertainio
@uncertainio.setter
def uncertainio(self, uncertainio_value):
if uncertainio_value is None:
uncertainio_value = {}
self.top._uncertainio = uncertainio_value
@property
def coordsio(self):
"""
property that holds the ODS with coordinates used for the data that is read/written from/to the ODS
"""
if not hasattr(self, '_coordsio'):
self._coordsio = {}
top = self.top
if top._coordsio is None:
top._coordsio = {}
self._coordsio = top._coordsio
return self._coordsio
@coordsio.setter
def coordsio(self, coordsio_value):
if coordsio_value is None:
coordsio_value = {}
self.top._coordsio = coordsio_value
@property
def dynamic(self):
"""
property that points to the dynamic_ODS object
"""
if not hasattr(self, '_dynamic'):
self._dynamic = None
top = self.top
self._dynamic = top._dynamic
return self._dynamic
@property
def active_dynamic(self):
"""
property that points to the dynamic_ODS object if dynamic loading is active, None otherwise
"""
dynamic = self.dynamic
if dynamic and dynamic.active:
return dynamic
@dynamic.setter
def dynamic(self, dynamic_value):
self.top._dynamic = dynamic_value
@property
def ulocation(self):
"""
:return: string with location of this object in universal ODS path format
"""
return o2u(self.location)
def _validate(self, value, structure, go_deep=False):
"""
Validate that the value is consistent with the provided structure field
:param value: sub-tree to be checked
:param structure: reference structure
:param go_deep: check value up to its leaves
"""
for key in value.keys(dynamic=0):
structure_key = o2u(key)
if go_deep and isinstance(value[key], ODS) and value[key].consistency_check:
value._validate(value[key], structure[structure_key])
else:
structure[structure_key]
def __setitem__(self, key, value):
# handle individual keys as well as full paths
key = p2l(key)
if not len(key):
return self
if isinstance(key[0], int):
# negative numbers are used to address arrays of structures from the end
if key[0] < 0:
if self.omas_data is None:
key[0] = 0
elif isinstance(self.omas_data, list):
if not len(self.omas_data):
key[0] = 0
else:
key[0] = len(self.omas_data) + key[0]
else:
# '+' is used to append new entry in array structure
if key[0] == '+':
if self.omas_data is None:
key[0] = 0
elif isinstance(self.omas_data, list):
key[0] = len(self.omas_data)
# ':' is used to slice
elif ':' in key[0]:
if not self.keys():
if key[0] == ':':
key[0] = [0]
else:
key[0] = list(range(slice(*map(lambda x: int(x.strip()) if x.strip() else None, key[0].split(':'))).stop))
else:
key[0] = slice(*map(lambda x: int(x.strip()) if x.strip() else None, key[0].split(':')))
# handle data slicing on write
if isinstance(key[0], list):
for k in key[0]:
self.__setitem__([k] + key[1:], value)
return
elif isinstance(key[0], slice):
for k in self.keys()[key[0]]:
self.__setitem__([k] + key[1:], value)
return
# handle dynamic path creation for .code.parameters leaf
if (
len(key) == 1
and key[0] == 'parameters'
and (self.location.endswith('.code') or not self.location)
and not isinstance(value, str)
):
pass_on_value = value
value = CodeParameters()
value.update(pass_on_value)
# if the user has entered path rather than a single key
elif len(key) > 1:
pass_on_value = value
if key[0] == 'parameters' and (self.location.endswith('.code') or not self.location) and not isinstance(value, str):
value = CodeParameters()
value[key[1:]] = pass_on_value
else:
value = self.same_init_ods(cls=ODS)
# full path where we want to place the data
location = l2o([self.location, key[0]])
# perform consistency check with IMAS structure
if self.consistency_check and '.code.parameters.' not in location:
structure_key = key[0] if not isinstance(key[0], int) else ':'
try:
structure = imas_structure(self.imas_version, location)
if isinstance(value, ODS):
if value.omas_data is None and not len(structure) and '.code.parameters' not in location:
raise ValueError('`%s` has no data' % location)
self._validate(value, structure)
if value.omas_data is None:
if ':' in structure:
value.omas_data = []
elif len(structure):
value.omas_data = {}
except (LookupError, TypeError):
txt = 'Not a valid IMAS %s location: %s' % (self.imas_version, location)
if isinstance(self.consistency_check, str) and 'warn' in self.consistency_check:
printe(txt)
if isinstance(value, ODS):
value.consistency_check = False
elif self.consistency_check:
try:
options = list(imas_structure(self.imas_version, self.location).keys())
except KeyError:
raise LookupError(txt)
if len(options) == 1 and options[0] == ':':
options = 'A numerical index is needed with n>=0'
else:
if len(options) > 5:
options = {option: difflib.SequenceMatcher(None, structure_key, option).ratio() for option in options}
index = numpy.argsort(list(options.values())).astype(int)
options = list(numpy.array(list(options.keys()))[index[-5:]][::-1]) + ['...']
options = 'Did you mean: ' + ', '.join(options)
raise LookupError(underline_last(txt, len('LookupError: ')) + '\n' + options)
# check what container type is required and if necessary switch it
if isinstance(self, ODC):
pass
elif not self.omas_data or not len(self.omas_data):
if isinstance(key[0], int):
if not isinstance(self.omas_data, list):
self.omas_data = []
else:
if not isinstance(self.omas_data, dict):
self.omas_data = {}
elif isinstance(key[0], int) and not isinstance(self.omas_data, list):
raise TypeError('Cannot convert from dict to list once ODS has data')
elif isinstance(key[0], str) and not isinstance(self.omas_data, dict):
raise TypeError('Cannot convert from list to dict once ODS has data')
# if the value is not an ODS structure
if not isinstance(value, ODS):
# convert simple dict of code.parameters to CodeParameters instances
if '.code.parameters' in location and not isinstance(value, CodeParameters) and isinstance(value, (dict, ODS)):
tmp = value
value = CodeParameters()
value.update(tmp)
# now that all checks are completed we can assign the structure information
if self.consistency_check and '.code.parameters.' not in location:
ulocation = o2u(location)
blocation = re.sub(r'_error_(upper|lower)$', '', ulocation)
# handle cocos transformations coming in
if (
self.cocosio
and self.cocosio != self.cocos
and '.' in location
and blocation in omas_physics.cocos_signals
and not isinstance(value, ODS)
):
transform = omas_physics.cocos_signals[blocation]
if isinstance(transform, list):
norm = numpy.ones(len(transform))
for itf, tf in enumerate(transform):
norm[itf] = omas_physics.cocos_transform(self.cocosio, self.cocos)[tf]
elif transform == '?':
if isinstance(self.consistency_check, str) and 'warn' in self.consistency_check:
printe('COCOS translation has not been setup: %s' % blocation)
norm = 1.0
else:
raise ValueError('COCOS translation has not been setup: %s' % blocation)
else:
norm = omas_physics.cocos_transform(self.cocosio, self.cocos)[transform]
norm = norm if blocation == ulocation else abs(norm)
value = value * norm
# get node information
info = omas_info_node(ulocation, imas_version=self.imas_version)
# handle units (Python pint package)
if str(value.__class__).startswith("<class 'pint."):
import pint
if (
'units' in info
and isinstance(value, pint.Quantity)
or (
isinstance(value, numpy.ndarray)
and value.size
and isinstance(numpy.atleast_1d(value).flat[0], pint.Quantity)
)
):
value = value.to(info['units']).magnitude
# coordinates interpolation
ods_coordinates = self.top
input_coordinates = self.coordsio
if input_coordinates:
all_coordinates = []
coordinates = []
if len(input_coordinates) and 'coordinates' in info:
all_coordinates = list(map(lambda x: u2o(x, self.location), info['coordinates']))
coordinates = list(filter(lambda coord: not coord.startswith('1...'), all_coordinates))
if len(coordinates):
# add any missing coordinates that were input
for coordinate in coordinates:
if coordinate not in ods_coordinates and coordinate in input_coordinates:
printd('Adding %s coordinate to ods' % (coordinate), topic='coordsio')
ods_coordinates[coordinate] = input_coordinates.__getitem__(coordinate, False)
# if all coordinates information is present
if all(coord in input_coordinates and coord in ods_coordinates for coord in coordinates):
# if there is any coordinate that does not match
if any(
[
len(input_coordinates.__getitem__(coord, None)) != len(ods_coordinates.__getitem__(coord, None))
or (
not numpy.allclose(
input_coordinates.__getitem__(coord, False), ods_coordinates.__getitem__(coord, False)
)
)
for coord in coordinates
]
):
# for the time being omas interpolates only 1D quantities
if len(info['coordinates']) > 1:
raise Exception('coordsio does not support multi-dimensional interpolation just yet')
# if the (first) coordinate is in input_coordinates
coordinate = coordinates[0]
if len(input_coordinates.__getitem__(coordinate, None)) != len(value):
raise Exception(
'coordsio %s.shape=%s does not match %s.shape=%s'
% (coordinate, input_coordinates.__getitem__(coordinate, False).shape, location, value.shape)
)
printd('Adding %s interpolated to input %s coordinate' % (self.location, coordinate), topic='coordsio')
value = omas_interp1d(
ods_coordinates.__getitem__(coordinate, None), input_coordinates.__getitem__(coordinate, None), value
)
else:
printd('%s ods and coordsio match' % (coordinates), topic='coordsio')
else:
printd('Adding `%s` without knowing coordinates `%s`' % (self.location, all_coordinates), topic='coordsio')
elif not self.active_dynamic and ulocation in omas_coordinates(self.imas_version) and location in ods_coordinates:
value = ods_coordinates.__getitem__(location, None)
# lists are saved as numpy arrays, and 0D numpy arrays as scalars
value = force_imas_type(value)
# check that dimensions and data types are consistent with IMAS specifications
if self.consistency_check and '.code.parameters.' not in location:
value, txt = consistency_checker(location, value, info, self.consistency_check, self.imas_version)
if not len(txt):
pass
elif isinstance(self.consistency_check, str) and ('warn' in self.consistency_check or 'drop' in self.consistency_check):
if 'warn' in self.consistency_check:
if 'drop' in self.consistency_check:
printe(f'Dropping invalid {txt}')
else:
printe(f'Invalid {txt}')
if 'drop' in self.consistency_check:
return
# check if the branch/node was dynamically created
dynamically_created = False
if key[0] not in self.keys(dynamic=0) and len(key) > 1:
dynamically_created = True
# assign values to this ODS
if key[0] not in self.keys(dynamic=0) or len(key) == 1:
self.setraw(key[0], value)
# pass the value one level deeper
# and cleanup dynamically created branches if necessary (eg. if consistency check fails)
if len(key) > 1:
try:
self.getraw(key[0])[key[1:]] = pass_on_value
except LookupError:
if dynamically_created:
del self[key[0]]
raise
# if the value is an ODS structure
if isinstance(value, ODS) and value.omas_data is not None and len(value.keys(dynamic=0)):
# we purposely do not force value.consistency_check = self.consistency_check
# because sub-ODSs could be shared among ODSs that have different settings of consistency_check
if False and value.consistency_check != self.consistency_check:
value.consistency_check = self.consistency_check
def getraw(self, key):
"""
Method to access data stored in ODS with no processing of the key, and it is thus faster than the ODS.__getitem__(key)
Effectively behaves like a pure Python dictionary/list __getitem__.
This method is mostly meant to be used in the inner workings of the ODS class.
NOTE: ODS.__getitem__(key, False) can be used to access items in the ODS with disabled cocos and coordinates processing but with support for different syntaxes to access data
:param key: string or integer
:return: ODS value
"""
return self.omas_data[key]
def same_init_ods(self, cls=None):
"""
Initializes a new ODS with the same attributes as this one
:return: new ODS
"""
if cls is None:
cls = self.__class__
return cls(
imas_version=self.imas_version,
consistency_check=self._consistency_check,
cocos=self._cocos,
cocosio=self._cocosio,
coordsio=self._coordsio,
dynamic=self.dynamic,
)
def setraw(self, key, value):
"""
Method to assign data to an ODS with no processing of the key, and it is thus faster than the ODS.__setitem__(key, value)
Effectively behaves like a pure Python dictionary/list __setitem__.
This method is mostly meant to be used in the inner workings of the ODS class.
:param key: string, integer or a list of these
:param value: value to assign
:return: value
"""
# accept path as list of keys
if isinstance(key, list):
if len(key) > 1:
if key[0] not in self.keys(dynamic=0):
self.setraw(key[0], self.same_init_ods())
return self.getraw(key[0]).setraw(key[1:], value)
else:
key = key[0]
# set .parent
if isinstance(value, ODC):
pass
elif isinstance(value, ODS):
if value.parent is not None:
value = copy.deepcopy(value)
value.parent = self
# structure
if isinstance(key, str) or isinstance(self, ODC):
if self.omas_data is None:
self.omas_data = {}
# handle uncertainty
if is_uncertain(value):
# scalar
if isinstance(value, uncertainties.core.AffineScalarFunc):
self.omas_data[key] = numpy.asarray(nominal_values(value)).item()
self.omas_data[key + '_error_upper'] = numpy.asarray(std_devs(value)).item()
# array
else:
self.omas_data[key] = nominal_values(value)
self.omas_data[key + '_error_upper'] = std_devs(value)
else:
self.omas_data[key] = value
# arrays of structures
else:
if self.omas_data is None:
self.omas_data = []
# dynamic array structure creation
if key > len(self.omas_data) and omas_rcparams['dynamic_path_creation'] == 'dynamic_array_structures':
for k in range(len(self.omas_data), key):
self.setraw(k, self.same_init_ods())
# index exists
if key < len(self.omas_data):
self.omas_data[key] = value
# next index creation
elif key == len(self.omas_data):
self.omas_data.append(value)
# missing index
else:
if not len(self.omas_data):
raise IndexError('`%s[%d]` but ods has no data' % (self.location, key))
else:
raise IndexError(
'`%s[%d]` but maximum index is %d\nPerhaps you want to use `with omas_environment(ods, dynamic_path_creation=\'dynamic_array_structures\')`'
% (self.location, key, len(self.omas_data) - 1)
)
return value
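# Sketch contrasting raw and processed access (location is hypothetical):
#   ods['equilibrium'].setraw('time', numpy.array([0.0, 1.0]))   # plain dict-like assignment, no checks
#   ods['equilibrium'].getraw('time')                            # plain dict-like access, no processing
#   ods['equilibrium.time'] = numpy.array([0.0, 1.0])            # full __setitem__ machinery (COCOS, coordinates, checks)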
def __getitem__(self, key, cocos_and_coords=True):
"""
ODS getitem method allows support for different syntaxes to access data
:param key: different syntaxes to access data, for example:
* ods['equilibrium']['time_slice'][0]['profiles_2d'][0]['psi'] # standard Python dictionary syntax
* ods['equilibrium.time_slice[0].profiles_2d[0].psi'] # IMAS hierarchical tree syntax
* ods['equilibrium.time_slice.0.profiles_2d.0.psi'] # dot separated string syntax
* ods[['equilibrium','time_slice',0,'profiles_2d',0,'psi']] # list of nodes syntax
NOTE: Python3.6+ f-strings can be very handy when looping over arrays of structures. For example:
for time_index in range(len(ods[f'equilibrium.time_slice'])):
for grid_index in range(len(ods[f'equilibrium.time_slice.{time_index}.profiles_2d'])):
print(ods[f'equilibrium.time_slice.{time_index}.profiles_2d.{grid_index}.psi'])
:param cocos_and_coords: processing of cocos transforms and coordinates interpolations [True/False/None]
* True: enabled COCOS and enabled interpolation
* False: enabled COCOS and disabled interpolation
* None: disabled COCOS and disabled interpolation
:return: ODS value
"""
# handle pattern match
if isinstance(key, str) and key.startswith('@'):
key = self.search_paths(key, 1, '@')[0]
# handle individual keys as well as full paths
key = p2l(key)
if not len(key):
return self
# negative numbers are used to address arrays of structures from the end
if isinstance(key[0], int) and key[0] < 0:
if self.omas_data is None:
key[0] = 0
elif isinstance(self.omas_data, list):
if not len(self.omas_data):
key[0] = 0
else:
key[0] = len(self.omas_data) + key[0]
# '+' is used to append new entry in array structure
if key[0] == '+':
if self.omas_data is None:
key[0] = 0
elif isinstance(self.omas_data, list):
key[0] = len(self.omas_data)
# slice
elif isinstance(key[0], str) and ':' in key[0]:
key[0] = slice(*map(lambda x: int(x.strip()) if x.strip() else None, key[0].split(':')))
dynamically_created = False
# data slicing
# NOTE: OMAS will try to return numpy arrays if the sliced data can be stacked in a uniform array
# otherwise a list will be returned (that's where we do `return data0` below)
if isinstance(key[0], slice):
data0 = []
for k in self.keys(dynamic=1)[key[0]]:
try:
data0.append(self.__getitem__([k] + key[1:], cocos_and_coords))
except ValueError:
data0.append([])
# raise an error if no data is returned
if not len(data0):
raise ValueError('`%s` has no data' % self.location)
# if they are filled but do not have the same number of dimensions
shapes = [numpy.asarray(item).shape for item in data0 if numpy.asarray(item).size]
if not len(shapes):
return numpy.asarray(data0)
if not all(len(shape) == len(shapes[0]) for shape in shapes[1:]):
return data0
# find maximum shape
max_shape = []
for shape in shapes:
for k, s in enumerate(shape):
if len(max_shape) < k + 1:
max_shape.append(s)
else:
max_shape[k] = max(max_shape[k], s)
max_shape = tuple([len(data0)] + max_shape)
# find types
dtypes = [numpy.asarray(item).dtype for item in data0 if numpy.asarray(item).size]
if not len(dtypes):
return numpy.asarray(data0)
if not all(dtype.char == dtypes[0].char for dtype in dtypes[1:]):
return data0
dtype = dtypes[0]
# array of strings
if dtype.char in 'U':
return numpy.asarray(data0)
# define an empty array of shape max_shape
if dtype.char in 'iIl':
data = numpy.full(max_shape, 0)
elif dtype.char in 'df':
data = numpy.full(max_shape, numpy.nan)
elif dtype.char in 'O':
data = numpy.full(max_shape, object())
else:
raise ValueError('Not an IMAS data type %s' % dtype.char)
# place the data in the empty array
if len(max_shape) == 1:
for k, item in enumerate(data0):
if isinstance(item, list): # item is [] if a subtree was missing in one of the slices
return data0
data[k] = numpy.asarray(item).item()
else:
for k, item in enumerate(data0):
if not sum(numpy.squeeze(item).shape):
if len(numpy.atleast_1d(numpy.squeeze(item))):
data[k, 0] = numpy.asarray(numpy.squeeze(item)).item()
else:
data[k, : len(item)] = item
return data
# dynamic path creation
elif key[0] not in self.keys(dynamic=0):
if omas_rcparams['dynamic_path_creation']:
if self.active_dynamic:
location = l2o([self.location, key[0]])
if self.active_dynamic and self.dynamic.__contains__(location):
try:
value = self.dynamic.__getitem__(location)
except Exception as _excp:
raise OmasDynamicException(f'Error dynamic fetching of `{location}` for {self.dynamic.kw}: {repr(_excp)}')
self.__setitem__(key[0], value)
elif self.active_dynamic and o2u(location).endswith(':'):
dynamically_created = True
for k in self.keys(dynamic=1):
if k not in self.keys(dynamic=0):
self.__setitem__(k, self.same_init_ods())
else:
dynamically_created = True
self.__setitem__(key[0], self.same_init_ods())
else:
location = l2o([self.location, key[0]])
raise LookupError('Dynamic path creation is disabled, hence `%s` needs to be manually created' % location)
value = self.omas_data[key[0]]
if len(key) > 1:
# if the user has entered a path rather than a single key
try:
if isinstance(value, ODS):
return value.__getitem__(key[1:], cocos_and_coords)
else:
return value[l2o(key[1:])]
except ValueError: # ValueError is raised when nodes have no data
if dynamically_created:
del self[key[0]]
raise
else:
if cocos_and_coords is not None and self.consistency_check and not isinstance(value, ODS):
location = l2o([self.location, key[0]])
ulocation = o2u(location)
blocation = re.sub(r'_error_(upper|lower)$', '', ulocation)
# handle cocos transformations going out
if self.cocosio and self.cocosio != self.cocos and '.' in location and blocation in omas_physics.cocos_signals:
transform = omas_physics.cocos_signals[blocation]
if isinstance(transform, list):
norm = numpy.ones(len(transform))
for itf, tf in enumerate(transform):
norm[itf] = omas_physics.cocos_transform(self.cocosio, self.cocos)[tf]
elif transform == '?':
if self.consistency_check == 'warn':
printe('COCOS translation has not been setup: %s' % blocation)
norm = 1.0
else:
raise ValueError('COCOS translation has not been setup: %s' % blocation)
else:
norm = omas_physics.cocos_transform(self.cocos, self.cocosio)[transform]
norm = norm if blocation == ulocation else abs(norm)
value = value * norm
# get node information
info = omas_info_node(ulocation, imas_version=self.imas_version)
# coordinates interpolation
ods_coordinates = self.top
output_coordinates = self.coordsio
if cocos_and_coords and output_coordinates:
all_coordinates = []
coordinates = []
if len(output_coordinates) and 'coordinates' in info:
all_coordinates = list(map(lambda x: u2o(x, self.location), info['coordinates']))
coordinates = list(filter(lambda coord: not coord.startswith('1...'), all_coordinates))
if len(coordinates):
# if all coordinates information is present
if all(coord in output_coordinates and coord in ods_coordinates for coord in coordinates):
# if there is any coordinate that does not match
if any(
[
len(output_coordinates.__getitem__(coord, None)) != len(ods_coordinates.__getitem__(coord, None))
or (
not numpy.allclose(
output_coordinates.__getitem__(coord, None), ods_coordinates.__getitem__(coord, None)
)
)
for coord in coordinates
]
):
# for the time being omas interpolates only 1D quantities
if len(info['coordinates']) > 1:
raise Exception('coordsio does not support multi-dimensional interpolation just yet')
# if the (first) coordinate is in output_coordinates
coordinate = coordinates[0]
if len(ods_coordinates.__getitem__(coordinate, None)) != len(value):
raise Exception(
'coordsio %s.shape=%s does not match %s.shape=%s'
% (coordinate, ods_coordinates.__getitem__(coordinate, False).shape, location, value.shape)
)
printd('Returning %s interpolated to output %s coordinate' % (location, coordinate), topic='coordsio')
try:
value = omas_interp1d(
output_coordinates.__getitem__(coordinate, None),
ods_coordinates.__getitem__(coordinate, None),
value,
)
except TypeError:
if is_uncertain(value):
v = omas_interp1d(
output_coordinates.__getitem__(coordinate, None),
ods_coordinates.__getitem__(coordinate, None),
nominal_values(value),
)
s = omas_interp1d(
output_coordinates.__getitem__(coordinate, None),
ods_coordinates.__getitem__(coordinate, None),
std_devs(value),
)
value = unumpy.uarray(v, s)
else:
printd('%s ods and coordsio match' % (coordinates), topic='coordsio')
else:
printd(
'Getting `%s` without knowing some of the coordinates `%s`' % (self.location, all_coordinates),
topic='coordsio',
)
elif ulocation in omas_coordinates(self.imas_version) and location in output_coordinates:
value = output_coordinates.__getitem__(location, False)
# handle units (Python pint package)
if 'units' in info and self.unitsio:
import pint
from .omas_setup import ureg
if ureg[0] is None:
import pint
ureg[0] = pint.UnitRegistry()
value = value * getattr(ureg[0], info['units'])
# return uncertain array if errors are filled
if self.uncertainio and isinstance(key[0], str) and key[0] + '_error_upper' in self:
if key[0] + '_error_lower' in self:
raise TypeError(f"Error for {self.location+'.'+key[0]} is not symmetrical")
error_upper = self.__getitem__(key[0] + '_error_upper', cocos_and_coords)
if isinstance(value, float):
value = ufloat(value, error_upper)
else:
value = uarray(value, error_upper)
return value
def __delitem__(self, key):
# handle individual keys as well as full paths
key = p2l(key)
if len(key) > 1:
# if the user has entered path rather than a single key
del self.getraw(key[0])[key[1:]]
else:
return self.omas_data.__delitem__(key[0])
def paths(self, return_empty_leaves=False, traverse_code_parameters=True, include_structures=False, dynamic=True, verbose=False, **kw):
"""
Traverse the ods and return paths to its leaves
:param return_empty_leaves: if False only return paths to leaves that have data
if True also return paths to empty leaves
:param traverse_code_parameters: traverse code parameters
:param include_structures: include paths leading to the leaves
:param dynamic: traverse paths that are not loaded in a dynamic ODS
:return: list of paths that have data
"""
if dynamic and self.active_dynamic:
get_func = self.__getitem__
else:
get_func = self.getraw
paths = kw.setdefault('paths', [])
path = kw.setdefault('path', [])
for kid in sorted(self.keys(dynamic=dynamic)):
try:
mykid = get_func(kid)
except OmasDynamicException:
continue
if isinstance(mykid, ODS):
if include_structures:
paths.append(path + [kid])
mykid.paths(
return_empty_leaves=return_empty_leaves,
traverse_code_parameters=traverse_code_parameters,
include_structures=include_structures,
dynamic=dynamic,
verbose=verbose,
paths=paths,
path=path + [kid],
)
elif traverse_code_parameters and isinstance(mykid, CodeParameters):
if include_structures:
paths.append(path)
self.getraw(kid).paths(paths=paths, path=path + [kid])
else:
if verbose:
print(l2i(path + [kid]))
paths.append(path + [kid])
if not len(self.keys(dynamic=dynamic)) and return_empty_leaves:
paths.append(path)
return paths
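# Sketch of the different path representations (contents are hypothetical):
#   ods.paths()         # e.g. [['equilibrium', 'time_slice', 0, 'global_quantities', 'ip']]
#   ods.pretty_paths()  # e.g. ['equilibrium.time_slice[0].global_quantities.ip']
#   ods.flat()          # OrderedDict keyed by dot-separated path strings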
def pretty_paths(self, **kw):
r"""
Traverse the ods and return paths that have data formatted nicely
:param \**kw: extra keywords passed to the paths() method
:return: list of paths that have data formatted nicely
"""
return list(map(l2i, self.paths(**kw)))
def full_paths(self, **kw):
r"""
Traverse the ods and return paths from root of ODS that have data
:param \**kw: extra keywords passed to the paths() method
:return: list of paths that have data
"""
location = p2l(self.location)
return [location + path for path in self.paths(**kw)]
def full_pretty_paths(self, **kw):
r"""
Traverse the ods and return paths from root of ODS that have data formatted nicely
:param \**kw: extra keywords passed to the full_paths() method
:return: list of paths that have data formatted nicely
"""
return list(map(l2i, self.full_paths(**kw)))
def flat(self, **kw):
r"""
Flat dictionary representation of the data
:param \**kw: extra keywords passed to the paths() method
:return: OrderedDict with flat representation of the data
"""
tmp = OrderedDict()
for path in self.paths(**kw):
tmp[l2o(path)] = self[path]
return tmp
def __len__(self):
return len(self.keys())
def __iter__(self):
return iter(self.keys())
def __contains__(self, key):
key = p2l(key)
h = self
for c, k in enumerate(key):
# h.omas_data is None when dict/list behaviour is not assigned
if h.omas_data is not None and k in h.keys(dynamic=0):
h = h.__getitem__(k, False)
continue # continue to the next key
else:
# dynamic loading
if self.active_dynamic:
if k in self.dynamic.keys(l2o(p2l(self.location) + key[:c])):
continue # continue to the next key
# handle ':' by looking under each array of structure
# and returning True if any of them have the entry we are looking for
if isinstance(k, str) and ':' in k:
s = slice(*map(lambda x: int(x.strip()) if x.strip() else None, k.split(':')))
for k in range(len(h.omas_data))[s]:
if key[c + 1 :] in h.omas_data[k]:
return True
return False
# return False if checking existence of a leaf and the leaf exists but is unassigned
if not self.active_dynamic and isinstance(h, ODS) and h.omas_data is None:
return False
return True
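# Sketch of membership tests, including the ':' wildcard across arrays of structures:
#   'equilibrium.time_slice.0.global_quantities.ip' in ods
#   'equilibrium.time_slice.:.global_quantities.ip' in ods   # True if any time slice has the entry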
def keys(self, dynamic=True):
"""
Return list of keys
:param dynamic: whether dynamically loaded keys should be shown.
This is `True` by default because this should be the case for calls that are facing the user.
Within the inner workings of OMAS we thus need to be careful and keep track of when this should not be the case.
Throughout the library we use `dynamic=1` or `dynamic=0` for debug purposes, since one can place a conditional
breakpoint in this function checking if `dynamic is True and self.dynamic` to verify that indeed the `dynamic=True`
calls come from the user and not from within the library itself.
:return: list of keys
"""
if dynamic and self.active_dynamic:
if isinstance(self.omas_data, dict):
dynamic_keys = list(self.dynamic.keys(self.location))
return sorted(numpy.unique(list(self.omas_data.keys()) + dynamic_keys).tolist())
elif isinstance(self.omas_data, list):
# the first time dynamic data is loaded, empty ODS structures will populate self.omas_data
if len(self.omas_data):
return list(range(len(self.omas_data)))
else:
dynamic_keys = list(self.dynamic.keys(self.location))
if ':' in dynamic_keys:
raise Exception(
f'{self.dynamic.__class__.__name__}.keys() at `{self.location}` did not return a range with number of structures'
)
return dynamic_keys
elif not self.location:
return self.dynamic.keys(self.location)
else:
return []
else:
if isinstance(self.omas_data, dict):
return list(self.omas_data.keys())
elif isinstance(self.omas_data, list):
return list(range(len(self.omas_data)))
else:
return []
def values(self, dynamic=True):
return [self[item] for item in self.keys(dynamic=dynamic)]
def items(self, dynamic=True):
return [(item, self[item]) for item in self.keys(dynamic=dynamic)]
def __str__(self):
return self.location
def __repr__(self):
return repr(self.omas_data)
def __tree_repr__(self):
"""
OMFIT tree representation
"""
if not self.location:
return self, []
s = '--{%d}--' % len(self)
if 'dataset_description.data_entry' in self:
s += ' ' + ' '.join(['%s:%s' % (k, v) for k, v in self['dataset_description']['data_entry'].items() if v not in ['None', None]])
if 'summary.ids_properties.comment' in self:
s += ' ' + repr(self['summary.ids_properties.comment'])
return s, []
def __tree_keys__(self):
"""
OMFIT tree keys display dynamic
"""
return self.keys(dynamic=1)
def get(self, key, default=None):
r"""
Check if key is present and if not return default value without creating value in omas data structure
:param key: ods location
:param default: default value
:return: return default if key is not found
"""
if key not in self:
return default
else:
return self[key]
def setdefault(self, key, value=None):
"""
Set value if key is not present
:param key: ods location
:param value: value to set
:return: value
"""
if key not in self:
self[key] = value
return self[key]
def __getstate__(self):
state = {}
for item in ['omas_data'] + omas_ods_attrs:
if item in self.__dict__:
# we do not want to carry with us this information
if item in ['_cocosio', '_coordsio', '_unitsio', '_uncertainio', '_parent', '_dynamic']:
state[item] = None
else:
state[item] = self.__dict__[item]
return state
def __setstate__(self, state):
self.__dict__.update(state)
for item in omas_ods_attrs:
self.__dict__.setdefault(item, None)
if isinstance(self.omas_data, list):
for value in self.omas_data:
if isinstance(value, ODS):
value.parent = self
elif isinstance(self.omas_data, dict):
for key in self.omas_data:
if isinstance(self.omas_data[key], ODS):
self.omas_data[key].parent = self
return self
def __deepcopy__(self, memo):
tmp = self.same_init_ods()
memo[id(self)] = tmp
if self.omas_data is None:
return tmp
elif isinstance(self.omas_data, list):
tmp.omas_data = []
for k, value in enumerate(self.omas_data):
tmp.omas_data.append(value.__deepcopy__(memo=memo))
tmp.omas_data[k].parent = tmp
else:
tmp.omas_data = {}
for key in self.omas_data:
if isinstance(self.omas_data[key], ODS):
tmp.omas_data[key] = self[key].__deepcopy__(memo=memo)
tmp.omas_data[key].parent = tmp
else:
tmp.omas_data[key] = copy.deepcopy(self[key], memo=memo)
return tmp
def copy(self):
"""
:return: copy.deepcopy of current ODS object
"""
return copy.deepcopy(self)
def clear(self):
"""
remove data from a branch
:return: current ODS object
"""
if isinstance(self.omas_data, dict):
self.omas_data.clear()
elif isinstance(self.omas_data, list):
self.omas_data[:] = []
return self
def copy_attrs_from(self, ods):
"""
copy omas_ods_attrs attributes from input ods
:param ods: input ods
:return: self
"""
for item in omas_ods_attrs:
if item not in ['_parent', '_dynamic']:
setattr(self, item, getattr(ods, item, None))
return self
def prune(self):
"""
Prune ODS branches that are leafless
:return: number of branches that were pruned
"""
n = 0
for item in self.keys(dynamic=0):
if isinstance(self.getraw(item), ODS):
n += self.getraw(item).prune()
if not len(self.getraw(item).keys()):
n += 1
del self[item]
return n
def set_time_array(self, key, time_index, value):
"""
Convenience function for setting time dependent arrays
:param key: ODS location to edit
:param time_index: time index of the value to set
:param value: value to set
:return: time dependent array
"""
orig_value = []
if key in self:
orig_value = numpy.atleast_1d(self[key]).tolist()
# substitute
if time_index < len(orig_value):
orig_value[time_index] = value
# append
elif time_index == len(orig_value):
orig_value = orig_value + [value]
else:
key = p2l(key)
raise IndexError('%s has length %d and time_index %d is beyond current range' % (l2o(key), len(orig_value), time_index))
self[key] = numpy.atleast_1d(orig_value)
return orig_value
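# Sketch of building a time-dependent scalar one time slice at a time (values are made up):
#   ods.set_time_array('equilibrium.vacuum_toroidal_field.b0', 0, 2.1)
#   ods.set_time_array('equilibrium.vacuum_toroidal_field.b0', 1, 2.2)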
def update(self, ods2):
"""
Adds ods2's key-values pairs to the ods
:param ods2: dictionary or ODS to be added into the ODS
"""
if isinstance(ods2, ODS):
for item in ods2.paths():
self[item] = ods2[item]
else:
with omas_environment(self, dynamic_path_creation='dynamic_array_structures'):
for item in ods2.keys():
self[item] = ods2[item]
return self
def list_coordinates(self, absolute_location=True):
"""
return dictionary with coordinates in a given ODS
:param absolute_location: return keys as absolute or relative locations
:return: dictionary with coordinates
"""
coords = {}
n = len(self.location)
for full_path in self.full_paths():
if l2u(full_path) in omas_coordinates(self.imas_version):
if absolute_location:
coords[l2o(full_path)] = self[l2o(full_path)[n:].strip('.')]
else:
coords[l2o(full_path)[n:].strip('.')] = self[l2o(full_path)[n:].strip('.')]
return coords
def coordinates(self, key=None):
"""
return dictionary with coordinates of a given ODS location
:param key: ODS location to return the coordinates of
Note: both the key location and coordinates must have data
:return: OrderedDict with coordinates of a given ODS location
"""
coords = OrderedDict()
if key is None:
return self.list_coordinates(absolute_location=False)
self.__getitem__(key, False) # raise an error if data is not there
location = self.location
uxlocation = l2u(p2l(location) + p2l(key))
info = omas_info_node(uxlocation)
if 'coordinates' not in info:
raise ValueError('ODS location `%s` has no coordinates information' % uxlocation)
coordinates = list(map(lambda x: u2o(x[len(location) :].strip('.'), key), info['coordinates']))
for coord in coordinates:
coords[coord] = self[coord] # this will raise an error if the coordinates data is not there
return coords
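# Sketch of coordinate lookup (assumes both the quantity and its coordinate are filled):
#   ods['core_profiles.profiles_1d.0'].coordinates('electrons.density_thermal')
#   # -> OrderedDict mapping 'grid.rho_tor_norm' to its data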
def search_paths(self, search_pattern, n=None, regular_expression_startswith=''):
"""
Find ODS locations that match a pattern
:param search_pattern: regular expression ODS location string
:param n: raise an error if a number of occurrences different from n is found
:param regular_expression_startswith: indicates that use of regular expressions
in the search_pattern is preceded by certain characters.
This is used internally by some methods of the ODS to force users
to use '@' to indicate access to a path by regular expression.
:return: list of ODS locations matching search_pattern pattern
"""
if not isinstance(search_pattern, str):
return [search_pattern]
elif regular_expression_startswith:
if not search_pattern.startswith(regular_expression_startswith):
return [search_pattern]
else:
search_pattern = search_pattern[len(regular_expression_startswith) :]
search = re.compile(search_pattern)
matches = []
for path in map(l2o, self.full_paths()):
if re.match(search, path):
matches.append(path)
if n is not None and len(matches) != n:
raise ValueError(
'Found %d matches of `%s` instead of the %d requested\n%s' % (len(matches), search_pattern, n, '\n'.join(matches))
)
return matches
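# Illustrative usage (comment sketch, not part of the library source): locating
# paths by regular expression; the pattern assumes the sample equilibrium data
# contains `global_quantities.ip`. Passing `n` raises if the match count differs.
#
#   from omas import ODS
#   ods = ODS().sample(ntimes=1)
#   matches = ods.search_paths('.*global_quantities.ip')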
def xarray(self, key):
"""
Returns data of an ODS location and corresponding coordinates as an xarray dataset
Note that the Dataset and the DataArrays have their attributes set with the ODS structure info
:param key: ODS location
:return: xarray dataset
"""
key = self.search_paths(key, 1, '@')[0]
import xarray
key = p2l(key)
location = p2l(self.location)
full_key = location + key
info = omas_info_node(l2u(full_key))
coords = self.coordinates(key)
short_coords = OrderedDict()
for coord in coords:
short_coords[p2l(coord)[-1]] = coords[coord]
ds = xarray.Dataset()
ds[key[-1]] = xarray.DataArray(self[key], coords=short_coords, dims=short_coords.keys(), attrs=info)
ds.attrs['y'] = key[-1]
ds.attrs['y_rel'] = l2o(key)
ds.attrs['y_full'] = l2o(full_key)
ds.attrs['x'] = []
ds.attrs['x_rel'] = []
ds.attrs['x_full'] = []
for coord in coords:
coord = p2l(coord)
info = omas_info_node(l2u(coord))
ds[coord[-1]] = xarray.DataArray(coords[l2o(coord)], dims=coord[-1], attrs=info)
ds.attrs['x'].append(coord[-1])
ds.attrs['x_rel'].append(l2o(coord))
ds.attrs['x_full'].append(l2o(location + coord))
return ds
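# Illustrative usage (comment sketch, not part of the library source): extracting
# one location and its coordinates as an xarray.Dataset; the location assumes the
# sample equilibrium data.
#
#   from omas import ODS
#   ods = ODS().sample()
#   ds = ods.xarray('equilibrium.time_slice.0.profiles_1d.psi')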
def dataset(self, homogeneous=[False, 'time', 'full', None][-1]):
"""
Return xarray.Dataset representation of a whole ODS
Forming the N-D labeled arrays (tensors) that are at the base of xarray
requires that the number of elements in the arrays does not change across
the arrays of data structures.
:param homogeneous: * False: flat representation of the ODS
(data is not collected across arrays of structures)
* 'time': collect arrays of structures only along the time dimension
(always valid for homogeneous_time=True)
* 'full': collect arrays of structures along all dimensions
(may be valid in many situations, especially related to
simulation data with homogeneous_time=True and where
for example number of ions, sources, etc. do not vary)
* None: smart setting, uses homogeneous='time' if homogeneous_time=True else False
:return: xarray.Dataset
"""
import xarray
if not self.location:
DS = xarray.Dataset()
for ds in self.keys(dynamic=0):
DS.update(self[ds].dataset(homogeneous=homogeneous))
return DS
def arraystruct_indexnames(key):
"""
return list of strings with a name for each of the arrays of structures indexes
:param key: ods location
:return: list of strings
"""
base = key.split('.')[0]
coordinates = []
counter = 0
for c in [':'.join(key.split(':')[: k + 1]).strip('.') for k, struct in enumerate(key.split(':'))]:
info = omas_info_node(o2u(c))
if 'coordinates' in info:
for infoc in info['coordinates']:
if infoc == '1...N':
infoc = c
coordinates.append(
'__'
+ '_'.join(
[base, infoc.split('.')[-1]]
+ [str(k) for k in p2l(key) if isinstance(k, int) or k == ':']
+ ['index%d__' % counter]
)
)
counter += 1
return coordinates
# Generate paths with ':' for the arrays of structures
# that we want to collect across
paths = self.paths()
if self.location:
fpaths = list(map(lambda key: [self.location] + key, paths))
if homogeneous is None:
homogeneous = 'time' if self.homogeneous_time() else False
if not homogeneous:
fupaths = list(map(l2o, fpaths))
elif homogeneous == 'time':
fupaths = numpy.unique(list(map(l2ut, fpaths)))
elif homogeneous == 'full':
fupaths = numpy.unique(list(map(l2u, fpaths)))
else:
raise ValueError("OMAS dataset homogeneous attribute can only be [False, 'time', 'full', None]")
upaths = fupaths
if self.location:
n = len(self.location)
upaths = list(map(lambda key: key[n + 1 :], fupaths))
# Figure out coordinate indexes
# NOTE: We use coordinates indexes instead of proper coordinates
# since in IMAS these are time dependent quantities
# Eg. 'equilibrium.time_slice[:].profiles_1d.psi'
coordinates = {}
for fukey, ukey in zip(fupaths, upaths):
coordinates[fukey] = arraystruct_indexnames(fukey)
# Generate dataset
DS = xarray.Dataset()
for fukey, ukey in zip(fupaths, upaths):
if not len(omas_info_node(o2u(fukey))):
printe(f'WARNING: {o2i(fukey)} is not part of IMAS')
continue
data = self[ukey] # OMAS data slicing at work
for k, c in enumerate(coordinates[fukey]):
if c not in DS:
DS[c] = xarray.DataArray(numpy.arange(data.shape[k]), dims=c)
try:
DS[fukey] = xarray.DataArray(data, dims=coordinates[fukey])
except Exception as _excp:
raise ValueError(
f'Error collecting `{fukey}` which has coordinates {coordinates[fukey]}. Are coordinates not homogeneous?\n'
+ str(_excp)
)
return DS
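# Illustrative usage (comment sketch, not part of the library source): collapsing
# a whole ODS into an xarray.Dataset, collecting arrays of structures along the
# time dimension (valid when homogeneous_time is True).
#
#   from omas import ODS
#   ods = ODS().sample(ntimes=2, homogeneous_time=True)
#   DS = ods.dataset(homogeneous='time')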
def satisfy_imas_requirements(self, attempt_fix=True, raise_errors=True):
"""
Assign .time and .ids_properties.homogeneous_time info for top-level structures
since these are required for writing an IDS to IMAS
:param attempt_fix: fix dataset_description and wall IDS to have 0 times if none is set
:param raise_errors: raise errors if could not satisfy IMAS requirements
:return: `True` if all is good, `False` if requirements are not satisfied, `None` if fixes were applied
"""
status = self.physics_consistent_times(attempt_fix=attempt_fix, raise_errors=raise_errors)
self.physics_imas_info()
return status
def save(self, *args, **kw):
r"""
Save OMAS data
:param filename: filename.XXX where the extension is used to select save format method (e.g. 'pkl','nc','h5','ds','json','ids')
set to `imas`, `s3`, `hdc`, `mongo` for save methods that do not have a filename with extension
:param \*args: extra arguments passed to save_omas_XXX() method
:param \**kw: extra keywords passed to save_omas_XXX() method
:return: return from save_omas_XXX() method
"""
# figure out format used
ext, args = _handle_extension(*args)
# save
return eval('save_omas_' + ext)(self, *args, **kw)
def load(self, *args, **kw):
r"""
Load OMAS data
:param filename: filename.XXX where the extension is used to select load format method (e.g. 'pkl','nc','h5','ds','json','ids')
set to `imas`, `s3`, `hdc`, `mongo` for load methods that do not have a filename with extension
:param consistency_check: perform consistency check once the data is loaded
:param \*args: extra arguments passed to load_omas_XXX() method
:param \**kw: extra keywords passed to load_omas_XXX() method
:return: ODS with loaded data
"""
# figure out format used
ext, args = _handle_extension(*args)
# manage consistency_check logic
if 'consistency_check' in kw:
consistency_check = kw['consistency_check']
else:
consistency_check = self.consistency_check
if self.location:
kw['consistency_check'] = False
else:
kw['consistency_check'] = consistency_check
# load the data
results = eval('load_omas_' + ext)(*args, **kw)
# mongoDB may return more than one result, or none
if ext in ['mongo']:
if not len(results):
raise RuntimeError(ext + ' query returned no result!')
elif len(results) > 1:
raise RuntimeError(ext + ' query returned more than one result!')
else:
results = list(results.values())[0]
# update the data
self.omas_data = results.omas_data
if isinstance(self.omas_data, list):
for value in self.omas_data:
value.parent = self
elif isinstance(self.omas_data, dict):
for key in self.omas_data:
if isinstance(self.omas_data[key], ODS):
self.omas_data[key].parent = self
# for pickle we can copy attrs over
if ext == 'pkl':
self.copy_attrs_from(results)
# apply consistency checks
if consistency_check != self.consistency_check or consistency_check != results.consistency_check:
self.consistency_check = consistency_check
return self
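# Illustrative usage (comment sketch, not part of the library source): round-trip
# through a file, where the extension selects the save/load backend; the filename
# is an arbitrary example.
#
#   from omas import ODS
#   ods = ODS().sample()
#   ods.save('ods_example.json')
#   ods2 = ODS().load('ods_example.json')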
def open(self, *args, **kw):
r"""
Dynamically load OMAS data for seekable storage formats
:param filename: filename.XXX where the extension is used to select load format method (e.g. 'nc','h5','ds','json','ids')
set to `imas`, `s3`, `hdc`, `mongo` for load methods that do not have a filename with extension
:param consistency_check: perform consistency check once the data is loaded
:param \*args: extra arguments passed to dynamic_omas_XXX() method
:param \**kw: extra keywords passed to dynamic_omas_XXX() method
:return: ODS with loaded data
"""
# manage consistency_check logic
if 'consistency_check' in kw:
consistency_check = kw.pop('consistency_check')
else:
consistency_check = self.consistency_check
if self.location:
consistency_check = False
# without args/kw re-connect to a dynamic ODS
if not len(args) and not len(kw):
if self.dynamic:
return self.dynamic.open()
else:
raise ValueError('ods was not previously open. Must specify .open() arguments.')
# figure out format used
ext, args = _handle_extension(*args)
storage_options = ['nc', 'imas']
if ext in ['nc', 'imas', 'machine']:
# apply consistency checks
if consistency_check != self.consistency_check:
self.consistency_check = consistency_check
if ext == 'nc':
from omas.omas_nc import dynamic_omas_nc
self.dynamic = dynamic_omas_nc(*args, **kw)
elif ext == 'imas':
from omas.omas_imas import dynamic_omas_imas
self.dynamic = dynamic_omas_imas(*args, **kw)
elif ext == 'machine':
from omas.omas_machine import dynamic_omas_machine
self.dynamic = dynamic_omas_machine(*args, **kw)
self.dynamic.open()
return self
else:
options = storage_options + list(machines(None).keys())
options.pop(options.index('sample'))
raise ValueError(ext + ' : dynamic loading not supported. Supported options are: ' + str(options))
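# Illustrative usage (comment sketch, not part of the library source): dynamic
# (lazy) access to a NetCDF file previously written with .save(); data is read
# only when the corresponding location is accessed. Filename and location are
# arbitrary examples.
#
#   from omas import ODS
#   ods = ODS()
#   with ods.open('ods_example.nc'):
#       ip = ods['equilibrium.time_slice.0.global_quantities.ip']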
def close(self):
if self.dynamic:
self.dynamic.close()
def diff(self, ods, ignore_type=False, ignore_empty=False, ignore_keys=[], ignore_default_keys=True, rtol=1.0e-5, atol=1.0e-8):
"""
return differences between this ODS and the one passed
:param ods: ODS to compare against
:param ignore_type: ignore object type differences
:param ignore_empty: ignore empty nodes
:param ignore_keys: ignore the following keys
:param ignore_default_keys: ignores the following keys from the comparison
%s
:param rtol: relative tolerance parameter
:param atol: absolute tolerance parameter
:return: dictionary with differences
"""
return different_ods(
self,
ods,
ignore_type=ignore_type,
ignore_empty=ignore_empty,
ignore_keys=ignore_keys,
ignore_default_keys=ignore_default_keys,
rtol=rtol,
atol=atol,
)
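# Illustrative usage (comment sketch, not part of the library source): comparing
# two ODSs; an empty dictionary means no differences were found within the given
# tolerances.
#
#   from omas import ODS
#   ods1 = ODS().sample()
#   ods2 = ODS().sample()
#   differences = ods1.diff(ods2, rtol=1e-6)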
def diff_attrs(self, ods, attrs=omas_ods_attrs, verbose=False):
"""
Checks if two ODSs have any difference in their attributes
:param ods: ODS to compare against
:param attrs: list of attributes to compare
:param verbose: print differences to stdout
:return: dictionary with list of attributes that have differences, or False otherwise
"""
return different_ods_attrs(self, ods, attrs=attrs, verbose=verbose)
def from_structure(self, structure, depth=0):
"""
Generate an ODS starting from a hierarchical structure made of dictionaries and lists
:param structure: input structure
:return: self
"""
if isinstance(structure, dict):
keys = list(map(str, structure.keys()))
elif isinstance(structure, list):
keys = list(range(len(structure)))
else:
raise ValueError('from_structure must be fed a structure made of dictionaries and lists')
for item in keys:
if isinstance(structure[item], dict):
self[item].from_structure(structure[item], depth=depth + 1)
elif isinstance(structure[item], list):
# identify if this is a leaf
if (len(structure[item]) and not isinstance(structure[item][0], dict)) or not len(structure[item]):
self.setraw(item, numpy.array(structure[item]))
# or a node in the IMAS tree
else:
self[item].from_structure(structure[item], depth=depth + 1)
else:
self.setraw(item, copy.deepcopy(structure[item]))
if depth == 0 and isinstance(self[item], ODS):
self[item].consistency_check = self.consistency_check
return self
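# Illustrative usage (comment sketch, not part of the library source): building an
# ODS from nested dictionaries/lists, e.g. as returned by json.load(); the
# structure below is an arbitrary example.
#
#   from omas import ODS
#   ods = ODS().from_structure({'equilibrium': {'time': [0.0, 0.1]}})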
def codeparams2xml(self):
"""
Convert code.parameters to an XML string
"""
if not self.location:
for item in self.keys(dynamic=0):
self[item].codeparams2xml()
return
elif 'code.parameters' in self and isinstance(self['code.parameters'], CodeParameters):
self['code.parameters'] = self['code.parameters'].to_string()
elif 'parameters' in self and isinstance(self['parameters'], CodeParameters):
self['parameters'] = self['parameters'].to_string()
def codeparams2dict(self):
"""
Convert code.parameters to a CodeParameters dictionary object
"""
import xml
if not self.location:
for item in self.keys(dynamic=0):
self[item].codeparams2dict()
return
try:
if 'code.parameters' in self and isinstance(self['code.parameters'], str):
self['code.parameters'] = CodeParameters().from_string(self['code.parameters'])
elif 'parameters' in self and isinstance(self['parameters'], str):
self['parameters'] = CodeParameters().from_string(self['parameters'])
except xml.parsers.expat.ExpatError:
printe('%s.code.parameters is not formatted as XML' % self.location)
except Exception as _excp:
printe('Issue with %s.code.parameters: %s' % (self.location, repr(_excp)))
def sample(self, ntimes=1, homogeneous_time=None):
"""
Populates the ods with sample data
:param ntimes: number of time slices to generate
:param homogeneous_time: only return samples that have ids_properties.homogeneous_time either True or False
:return: self
"""
for func in omas_sample.__ods__:
printd(f'Adding {func} sample data to ods', topic='sample')
args, kw, _, _ = function_arguments(getattr(self, 'sample_' + func))
if 'time_index' in kw:
for k in range(ntimes):
getattr(self, 'sample_' + func)(time_index=k)
else:
getattr(self, 'sample_' + func)()
# remove IDSs that do not have the same homogeneous_time property as requested
if homogeneous_time is not None:
self.physics_consistent_times() # to add homogeneous_time info
for ds in list(self.keys()):
if self[ds]['ids_properties']['homogeneous_time'] != homogeneous_time:
del self[ds]
return self
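# Illustrative usage (comment sketch, not part of the library source): filling an
# ODS with sample data for two time slices, keeping only IDSs that have
# ids_properties.homogeneous_time set to True.
#
#   from omas import ODS
#   ods = ODS().sample(ntimes=2, homogeneous_time=True)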
def document(self, what=['coordinates', 'data_type', 'documentation', 'units']):
"""
RST documentation of the ODS content
:param what: fields to be included in the documentation
if None, all fields are included
:return: string with RST documentation
"""
pp = {}
for item in self.pretty_paths():
tt = l2u(p2l(item))
pp[tt] = omas_info_node(tt)
txt = []
for item in sorted(pp.keys()):
txt += ['', item]
txt += ['-' * len(item)]
for elms in pp[item]:
if what is not None and elms not in what:
continue
value = str(pp[item][elms])
txt += [f'* {elms}: {value}']
return '\n'.join(txt)
def to_odx(self, homogeneous=None):
"""
Generate an ODX from current ODS
:param homogeneous: * False: flat representation of the ODS
(data is not collected across arrays of structures)
* 'time': collect arrays of structures only along the time dimension
(always valid for homogeneous_time=True)
* 'full': collect arrays of structures along all dimensions
(may be valid in many situations, especially related to
simulation data with homogeneous_time=True and where
for example number of ions, sources, etc. do not vary)
* None: smart setting, uses homogeneous='time' if homogeneous_time=True else False
:return: ODX
"""
return ods_2_odx(self, homogeneous=homogeneous)
def info(self, location):
"""
return node info
:param location: location of the node to return info of
:return: dictionary with info
"""
return omas_info_node((self.location + '.' + location).lstrip('.'))
def relax(self, other, alpha=0.5):
'''
Blend floating point data in this ODS with corresponding floating point in other ODS
:param other: other ODS
:param alpha: relaxation coefficient `this_ods * (1.0 - alpha) + other_ods * alpha`
:return: list of paths that have been blended
'''
self_paths = set(self.pretty_paths())
other_paths = set(other.pretty_paths())
relaxed_paths = []
for item in sorted(self_paths.intersection(other_paths)):
self_data = self[item]
other_data = other[item]
if numpy.asarray(self_data).dtype.kind == 'f' and numpy.asarray(other_data).dtype.kind == 'f':
self[item] = self_data * (1.0 - alpha) + other_data * alpha
relaxed_paths.append(item)
return relaxed_paths
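# Illustrative usage (comment sketch, not part of the library source): under-
# relaxing one ODS towards another, e.g. between iterations of a coupled
# workflow; with alpha=0.25 the result keeps 75% of ods1 and 25% of ods2.
#
#   from omas import ODS
#   ods1 = ODS().sample()
#   ods2 = ODS().sample()
#   blended_paths = ods1.relax(ods2, alpha=0.25)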
def __enter__(self):
if self.dynamic:
return self.dynamic.__enter__()
else:
raise RuntimeError('Missing call to .open() ?')
def __exit__(self, type, value, traceback):
if self.dynamic:
return self.dynamic.__exit__(type, value, traceback)
else:
raise RuntimeError('Missing call to .open() ?')
ODS.diff.__doc__ = ODS.diff.__doc__ % '\n '.join(default_keys_to_ignore)
omas_dictstate = dir(ODS)
omas_dictstate.extend(['omas_data'] + omas_ods_attrs)
omas_dictstate = sorted(list(set(omas_dictstate)))
class ODC(ODS):
"""
OMAS Data Collection class
"""
def __init__(self, *args, **kw):
ODS.__init__(self, *args, **kw)
self.omas_data = {}
@property
def consistency_check(self):
return False
@consistency_check.setter
def consistency_check(self, consistency_value):
for item in self.keys(dynamic=0):
self[item].consistency_check = consistency_value
self._consistency_check = consistency_value
def same_init_ods(self, cls=None):
if cls is None:
cls = ODS
return ODS.same_init_ods(self, cls=cls)
def keys(self, dynamic=True):
keys = list(self.omas_data.keys())
# keys that look like integers are returned as integers
for k, item in enumerate(keys):
try:
keys[k] = int(item)
except Exception:
pass
return keys
def save(self, *args, **kw):
# figure out format that was used
if '/' not in args[0] and '.' not in os.path.split(args[0])[1]:
ext = args[0]
args = args[1:]
else:
ext = os.path.splitext(args[0])[-1].strip('.')
if not ext:
ext = 'pkl'
if ext in ['pkl', 'nc', 'json', 'h5']:
pass
else:
raise ValueError(f'Cannot save ODC to {ext} format')
return ODS.save(self, *args, **kw)
def load(self, *args, **kw):
# figure out format that was used
if '/' not in args[0] and '.' not in os.path.split(args[0])[1]:
ext = args[0]
args = args[1:]
else:
ext = os.path.splitext(args[0])[-1].strip('.')
if not ext:
ext = 'pkl'
if ext == 'pkl':
pass
elif ext in ['h5', 'nc', 'json']:
kw['cls'] = ODC
else:
raise ValueError(f'Cannot load ODC from {ext} format')
# manage consistency_check logic
if 'consistency_check' not in kw:
kw['consistency_check'] = True
return ODS.load(self, *args, **kw)
class dynamic_ODS:
"""
Abstract base class that dynamic_omas_... classes inherit from
"""
kw = {}
active = False
def __init__(self):
raise NotImplementedError('Classes that subclass %s should have an __init__() method' % self.__class__)
def open(self):
raise NotImplementedError('Classes that subclass %s should have an open() method' % self.__class__)
def close(self):
raise NotImplementedError('Classes that subclass %s should have a close() method' % self.__class__)
def __getstate__(self):
return self.kw
def __setstate__(self, kw):
self.kw = kw
self.active = False
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
class CodeParameters(dict):
"""
Class used to interface with IMAS code-parameters XML files
"""
def __init__(self, string=None):
if isinstance(string, str):
if os.path.exists(string):
self.from_file(string)
else:
self.from_string(string)
def from_string(self, code_params_string):
"""
Load data from code.parameters XML string
:param code_params_string: XML string
:return: self
"""
import xmltodict
self.clear()
if not code_params_string.strip().endswith('</parameters>'):
code_params_string = '<parameters>' + code_params_string + '</parameters>'
tmp = xmltodict.parse(code_params_string).get('parameters', '')
if not isinstance(tmp, (list, dict)):
import xml
raise xml.parsers.expat.ExpatError('Not in XML format')
elif tmp:
recursive_interpreter(tmp, dict_cls=CodeParameters)
self.update(tmp)
return self
def from_file(self, code_params_file):
"""
Load data from code.parameters XML file
:param code_params_file: XML file
:return: self
"""
with open(code_params_file, 'r') as f:
return self.from_string(f.read())
def to_string(self):
"""
generate an XML string from this dictionary
:return: XML string
"""
import xmltodict
tmp = {'parameters': CodeParameters()}
tmp['parameters'].update(copy.deepcopy(self))
recursive_encoder(tmp)
return xmltodict.unparse(tmp, pretty=True)
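# Illustrative usage (comment sketch, not part of the library source): round-trip
# between an XML string and a CodeParameters dictionary; the tag names are
# arbitrary examples.
#
#   from omas import CodeParameters
#   cp = CodeParameters('<parameters><solver><tolerance>1e-6</tolerance></solver></parameters>')
#   tol = cp['solver.tolerance']
#   xml_string = cp.to_string()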
def __setitem__(self, key, value):
key = p2l(key)
if not len(key):
return self
# go deeper
if len(key) > 1:
if key[0] not in self or not isinstance(self[key[0]], CodeParameters):
self.setraw(key[0], self.__class__())
self.getraw(key[0])[key[1:]] = value
# assign leaf
else:
# convert ODSs to CodeParameters
if isinstance(value, ODS):
value = CodeParameters().update(value)
self.setraw(key[0], value)
def __getitem__(self, key):
key = p2l(key)
if not len(key):
return self
# go deeper
if len(key) > 1:
return self.getraw(key[0])[key[1:]]
# return leaf
else:
return self.getraw(key[0])
def getraw(self, key):
"""
Method to access data in CodeParameters with no processing of the key.
Effectively behaves like a pure Python dictionary/list __getitem__.
This method is mostly meant to be used in the inner workings of the CodeParameters class.
:param key: string or integer
:return: value
"""
return dict.__getitem__(self, key)
def setraw(self, key, value):
"""
Method to assign data to CodeParameters with no processing of the key.
Effectively behaves like a pure Python dictionary/list __setitem__.
This method is mostly meant to be used in the inner workings of the CodeParameters class.
:param key: string or integer
:param value: value to assign
:return: value
"""
return dict.__setitem__(self, key, value)
def update(self, value):
"""
Update CodeParameters
NOTE: ODSs will be converted to CodeParameters classes
:param value: dictionary structure
:return: self
"""
# convert ODSs to CodeParameters
if isinstance(value, (ODS, CodeParameters)):
for item in value.paths():
self[item] = value[item]
else:
for item in value.keys():
self[item] = value[item]
return self
def paths(self, **kw):
"""
Traverse the code parameters and return paths that have data
:return: list of paths that have data
"""
paths = kw.setdefault('paths', [])
path = kw.setdefault('path', [])
for kid in self.keys():
if isinstance(self[kid], CodeParameters):
self[kid].paths(paths=paths, path=path + [kid])
else:
paths.append(path + [kid])
return paths
def keys(self):
"""
:return: keys as list
"""
return list(dict.keys(self))
def values(self):
"""
:return: values as list
"""
return list(dict.values(self))
def items(self):
"""
:return: key-value pairs as list
"""
return list(dict.items(self))
def flat(self, **kw):
r"""
Flat dictionary representation of the data
:param \**kw: extra keywords passed to the path() method
:return: OrderedDict with flat representation of the data
"""
tmp = OrderedDict()
for path in self.paths(**kw):
tmp[l2o(path)] = self[path]
return tmp
def from_structure(self, structure, depth=0):
"""
Generate CodeParameters starting from a hierarchical structure made of dictionaries and lists
:param structure: input structure
:return: self
"""
if isinstance(structure, dict):
keys = list(map(str, structure.keys()))
elif isinstance(structure, list):
keys = list(range(len(structure)))
else:
raise ValueError('from_structure must be fed a structure made of dictionaries and lists')
for item in keys:
if isinstance(structure[item], dict):
self[item].from_structure(structure[item], depth=depth + 1)
elif isinstance(structure[item], list):
# identify if this is a leaf
if len(structure[item]) and not isinstance(structure[item][0], dict):
self[item] = numpy.array(structure[item])
# or a node in the IMAS tree
else:
self[item].from_structure(structure[item], depth=depth + 1)
else:
self[item] = copy.deepcopy(structure[item])
return self
def codeparams_xml_save(f):
"""
Decorator function to be used around the omas_save_XXX methods to enable
saving of code.parameters as an XML string
"""
@wraps(f)
def wrapper(ods, *args, **kwargs):
with omas_environment(ods, xmlcodeparams=True):
return f(ods, *args, **kwargs)
return wrapper
def codeparams_xml_load(f):
"""
Decorator function to be used around the omas_load_XXX methods to enable
loading of code.parameters from an XML string
"""
@wraps(f)
def wrapper(*args, **kwargs):
ods = f(*args, **kwargs)
ods.codeparams2dict()
return ods
return wrapper
# --------------------------------------------
# import sample functions and add them as ODS methods
# --------------------------------------------
try:
from . import omas_sample
from .omas_sample import ods_sample
__all__.append('omas_sample')
for item in omas_sample.__ods__:
setattr(ODS, 'sample_' + item, getattr(omas_sample, item))
except ImportError as _excp:
printe('OMAS sample functions are not available: ' + repr(_excp))
raise
# --------------------------------------------
# import physics functions and add them as ODS methods
# --------------------------------------------
try:
from . import omas_physics
from .omas_physics import *
__all__.append('omas_physics')
__all__.extend(omas_physics.__all__)
for item in omas_physics.__ods__:
setattr(ODS, 'physics_' + item, getattr(omas_physics, item))
except ImportError as _excp:
printe('OMAS physics functions are not available: ' + repr(_excp))
# --------------------------------------------
# import plotting functions and add them as ODS methods
# --------------------------------------------
try:
from . import omas_plot
__all__.append('omas_plot')
for item in omas_plot.__ods__:
setattr(ODS, 'plot_' + item, getattr(omas_plot, item))
except ImportError as _excp:
printe('OMAS plotting functions are not available: ' + repr(_excp))
# --------------------------------------------
# save and load OMAS with Python pickle
# --------------------------------------------
def save_omas_pkl(ods, filename, **kw):
"""
Save ODS to Python pickle
:param ods: OMAS data set
:param filename: filename to save to
:param kw: keywords passed to pickle.dump function
"""
printd('Saving to %s' % filename, topic='pkl')
kw.setdefault('protocol', omas_rcparams['pickle_protocol'])
with open(filename, 'wb') as f:
pickle.dump(ods, f, **kw)
def load_omas_pkl(filename, consistency_check=None, imas_version=None):
"""
Load ODS or ODC from Python pickle
:param filename: filename to load from
:param consistency_check: verify that data is consistent with IMAS schema (skip if None)
:param imas_version: imas version to use for consistency check (leave original if None)
:returns: ods OMAS data set
"""
printd('Loading from %s' % filename, topic='pkl')
with open(filename, 'rb') as f:
try:
tmp = pickle.load(f)
except UnicodeDecodeError:
# to support ODSs created with Python2
tmp = pickle.load(f, encoding="latin1")
if imas_version is not None:
tmp.imas_version = imas_version
if consistency_check is not None:
tmp.consistency_check = consistency_check
return tmp
def through_omas_pkl(ods):
"""
Test save and load Python pickle
:param ods: ods
:return: ods
"""
filename = omas_testdir(__file__) + '/test.pkl'
ods = copy.deepcopy(ods) # make a copy to make sure save does not alter entering ODS
save_omas_pkl(ods, filename)
ods1 = load_omas_pkl(filename)
return ods1
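# Illustrative usage (comment sketch, not part of the library source): explicit
# save/load round-trip with the pickle backend; the filename is an arbitrary
# example.
#
#   from omas import ODS, save_omas_pkl, load_omas_pkl
#   ods = ODS().sample()
#   save_omas_pkl(ods, 'ods_example.pkl')
#   ods1 = load_omas_pkl('ods_example.pkl')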
# --------------------------------------------
# import other omas tools and methods in this namespace
# --------------------------------------------
from .omas_imas import *
from .omas_s3 import *
from .omas_nc import *
from .omas_json import *
from .omas_hdc import *
from .omas_uda import *
from .omas_h5 import *
from .omas_ds import *
from .omas_ascii import *
from .omas_mongo import *
from .omas_symbols import *
from .omas_machine import *
from .utilities.machine_mapping_decorator import *
from .utilities.omas_mds import *
from . import omas_structure