Source code for omas.omas_utils

'''naming convention translation and misc utilities

-------
'''

from .omas_setup import *
from .omas_setup import __version__
import sys

# --------------------------------------------
# ODS utilities
# --------------------------------------------
default_keys_to_ignore = [
    'dataset_description.data_entry.user',
    'dataset_description.data_entry.run',
    'dataset_description.data_entry.machine',
    'dataset_description.ids_properties',
    'dataset_description.imas_version',
    'dataset_description.time',
    'ids_properties.homogeneous_time',
    'ids_properties.occurrence',
    'ids_properties.version_put.data_dictionary',
    'ids_properties.version_put.access_layer',
    'ids_properties.version_put.access_layer_language',
]


[docs]def different_ods(ods1, ods2, ignore_type=False, ignore_empty=False, ignore_keys=[], ignore_default_keys=True, rtol=1.0e-5, atol=1.0e-8): """ Checks if two ODSs have any difference and returns the string with the cause of the different :param ods1: first ods to check :param ods2: second ods to check :param ignore_type: ignore object type differences :param ignore_empty: ignore emptry nodes :param ignore_keys: ignore the following keys :param ignore_default_keys: ignores the following keys from the comparison %s rtol : The relative tolerance parameter atol : The absolute tolerance parameter :return: string with reason for difference, or False otherwise """ from omas import ODS, CodeParameters ods1 = ods1.flat(return_empty_leaves=True, traverse_code_parameters=True) ods2 = ods2.flat(return_empty_leaves=True, traverse_code_parameters=True) keys_to_ignore = [] keys_to_ignore.extend(ignore_keys) if ignore_default_keys: keys_to_ignore.extend(default_keys_to_ignore) def is_ignored(k): return any(o2u(k).endswith(end) for end in keys_to_ignore) k1 = set(ods1.keys()) k2 = set(ods2.keys()) differences = [] for k in k1.difference(k2): if not k.startswith('info.') and not (ignore_empty and isinstance(ods1[k], ODS) and not len(ods1[k])) and not is_ignored(k): differences.append(f'DIFF: key `{k}` missing in 2nd ods') for k in k2.difference(k1): if not k.startswith('info.') and not (ignore_empty and isinstance(ods2[k], ODS) and not len(ods2[k])) and not is_ignored(k): differences.append(f'DIFF: key `{k}` missing in 1st ods') for k in sorted(k1.intersection(k2)): try: if is_ignored(k): pass elif ods1[k] is None and ods2[k] is None: pass elif isinstance(ods1[k], str) and isinstance(ods2[k], str): if ods1[k] != ods2[k]: differences.append(f'DIFF: `{k}` differ in value') elif not ignore_type and type(ods1[k]) != type(ods2[k]): differences.append(f'DIFF: `{k}` differ in type: {type(ods1[k])} vs type(ods2[k])') elif is_uncertain(ods1[k]) or is_uncertain(ods2[k]): v1 = nominal_values(ods1[k]) v2 = nominal_values(ods2[k]) d1 = std_devs(ods1[k]) d2 = std_devs(ods1[k]) s1 = v1.shape s2 = v2.shape if s1 != s2: differences.append(f'DIFF: `{k}` differ in shape: {s1} vs {s2}') elif not numpy.allclose(v1, v2, equal_nan=True, atol=atol, rtol=rtol) or not numpy.allclose( d1, d2, equal_nan=True, atol=atol, rtol=rtol ): differences.append(f'DIFF: `{k}` differ in value') else: v1 = nominal_values(ods1[k]) v2 = nominal_values(ods2[k]) s1 = v1.shape s2 = v2.shape if v1.shape != v2.shape: differences.append(f'DIFF: `{k}` differ in shape: {s1} vs {s2}') elif not numpy.allclose(ods1[k], ods2[k], equal_nan=True, atol=atol, rtol=rtol): differences.append(f'DIFF: `{k}` differ in value') except Exception as _excp: raise Exception(f'Error comparing {k}: ' + repr(_excp)) if len(differences): return differences else: return False
different_ods.__doc__ = different_ods.__doc__ % '\n '.join(default_keys_to_ignore) def different_ods_attrs(ods1, ods2, attrs=None, verbose=False): """ Checks if two ODSs have any difference in their attributes :param ods1: first ods to check :param ods2: second ods to check :param attrs: list of attributes to compare :param verbose: print differences to stdout :return: dictionary with list of attriibutes that have differences, or False otherwise """ if isinstance(attrs, str): attrs = [attrs] elif attrs is None: from .omas_core import omas_ods_attrs attrs = omas_ods_attrs if '_parent' in attrs: attrs.pop(attrs.index('_parent')) n = max(list(map(lambda x: len(x), attrs))) l1 = set(list(map(lambda x: l2i(x[:-1]), ods1.paths(return_empty_leaves=True, traverse_code_parameters=False)))) l2 = set(list(map(lambda x: l2i(x[:-1]), ods2.paths(return_empty_leaves=True, traverse_code_parameters=False)))) paths = sorted(list(l1.intersection(l2))) differences = {} for item in paths: first = True try: for k in attrs: a1 = getattr(ods1[item], k) a2 = getattr(ods2[item], k) if a1 != a2: if first: if verbose: print('-' * 20) print(item) print('-' * 20) differences[item] = [] first = False differences[item].append(k) if verbose: print(k.ljust(n) + ': * %s' % a1) print('`%s * %s' % (' '.ljust(n), a2)) except: raise if len(differences): return differences else: return False # -------------------------- # general utility functions # -------------------------- _streams = {'DEBUG': sys.stderr, 'STDERR': sys.stderr} def printd(*objects, **kw): """ debug print Use environmental variable $OMAS_DEBUG_TOPIC to set the topic to be printed """ topic = kw.pop('topic', '') if isinstance(topic, str): topic = [topic] topic = list(map(lambda x: x.lower(), topic)) if len(topic): objects = [f'DEBUG ({",".join(topic)}):'] + list(objects) else: objects = ['DEBUG:'] + list(objects) topic_selected = os.environ.get('OMAS_DEBUG_TOPIC', '') dump = False if topic_selected.endswith('_dump'): dump = True topic_selected = re.sub('_dump$', '', topic_selected) if topic_selected and (topic_selected == '*' or topic_selected in topic or '*' in topic): if eval(os.environ.get('OMAS_DEBUG_STDOUT', '0')): kw.setdefault('file', sys.stdout) else: kw.setdefault('file', _streams['DEBUG']) print(*objects, **kw) if dump: fb = StringIO() print(*objects[1:], file=fb) with open('omas_dump.txt', 'a') as f: f.write(fb.getvalue()) fb.close() def printe(*objects, **kw): """ print to stderr """ kw['file'] = _streams['STDERR'] print(*objects, **kw) def print_stack(): return traceback.print_stack(file=sys.__stderr__) def is_uncertain(var): """ :param var: Variable or array to test :return: True if input variable or array is uncertain """ def _uncertain_check(x): return isinstance(x, uncertainties.core.AffineScalarFunc) if isinstance(var, str): return False elif numpy.iterable(var) or isinstance(var, numpy.ndarray): # isinstance needed for 0D arrays from squeeze tmp = numpy.array(var) if tmp.dtype not in ['O', 'object']: return False else: # the argument of any is a generator object (using a list slows things down) return any(_uncertain_check(x) for x in tmp.flat) else: return _uncertain_check(var) def is_numeric(value): """ Convenience function check if value is numeric :param value: value to check :return: True/False """ try: 0 + value return True except TypeError: return False def omas_interp1d(x, xp, yp, left=None, right=None, period=None, extrapolate=True): """ If xp is not increasing, the results are numpy.interp1d nonsense. This function wraps numpy.interp1d but makes sure that the x-coordinate sequence xp is increasing. :param extrapolate: linear extrapolation beyond bounds """ if not numpy.all(numpy.diff(xp) > 0): index = numpy.argsort(xp) else: index = numpy.arange(len(xp)).astype(int) y = numpy.interp(x, xp[index], yp[index], left=left, right=right, period=period) if extrapolate: if not period and not left: y = numpy.where( x < xp[index[0]], yp[index[0]] + (x - xp[index[0]]) * (yp[index[0]] - yp[index[1]]) / (xp[index[0]] - xp[index[1]]), y ) if not period and not right: y = numpy.where( x > xp[index[-1]], yp[index[-1]] + (x - xp[index[-1]]) * (yp[index[-1]] - yp[index[-2]]) / (xp[index[-1]] - xp[index[-2]]), y, ) return y omas_interp1d.__doc__ += numpy.interp.__doc__ def json_dumper(obj, objects_encode=True): """ Dump objects to json format :param obj: input ojbect :param objects_encode: how to handle non-standard JSON objects * True: encode numpy arrays, complex, and uncertain * None: numpy arrays as lists, encode complex, and uncertain * False: numpy arrays as lists, fail on complex, and uncertain :return: json-compatible object """ from omas import ODS if isinstance(obj, ODS): return obj.omas_data if objects_encode is False: if isinstance(obj, numpy.ndarray): return obj.tolist() elif isinstance(obj, (range, map)): return list(obj) elif isinstance(obj, numpy.generic): return obj.item() else: return obj.toJSON() else: if is_uncertain(obj): if not len(numpy.array(obj).shape): return dict(__ufloat__=nominal_values(obj), __ufloat_std__=std_devs(obj)) else: nomv = nominal_values(obj) return dict( __udarray_tolist_avg__=nomv.tolist(), __udarray_tolist_std__=std_devs(obj).tolist(), dtype=str(nomv.dtype), shape=obj.shape, ) elif isinstance(obj, numpy.ndarray): if 'complex' in str(obj.dtype).lower(): return dict( __ndarray_tolist_real__=obj.real.tolist(), __ndarray_tolist_imag__=obj.imag.tolist(), dtype=str(obj.dtype), shape=obj.shape, ) else: if objects_encode is None: return obj.tolist() else: return dict(__ndarray_tolist__=obj.tolist(), dtype=str(obj.dtype), shape=obj.shape) elif isinstance(obj, range): return list(obj) elif isinstance(obj, numpy.generic): return obj.item() elif isinstance(obj, complex): return dict(__complex__=True, real=obj.real, imag=obj.imag) elif isinstance(obj, bytes): return obj.decode('utf-8') else: return obj.toJSON() def convert_int(value): """ Try to convert value to integer and do nothing on error :param value: value to try to convert :return: value, possibly converted to int """ try: return int(value) except ValueError: return value def json_loader(object_pairs, cls=dict, null_to=None): """ Load json-objects generated by the json_dumper function :param object_pairs: json-compatible [dict/list] object :param cls: dicitonary class to use :param null_to: convert null to user defined value (None by default) :return: ojbect """ from omas import ODS object_pairs = list(map(lambda o: (convert_int(o[0]), o[1]), object_pairs)) dct = cls() # for ODSs we can use the setraw() method which does # not peform any sort of check, nor tries to parse # special OMAS syntaxes and is thus much faster if isinstance(dct, ODS): for x, y in object_pairs: if null_to is not None and y is None: y = null_to if isinstance(y, list): if len(y) and isinstance(y[0], ODS): dct.setraw(x, cls()) for k in range(len(y)): dct[x].setraw(k, y[k]) else: if null_to is not None: for k in range(len(y)): if y[k] is None: y[k] = null_to y = numpy.array(y) # to handle objects_encode=None as used in OMAS dct.setraw(x, y) else: dct.setraw(x, y) else: for x, y in object_pairs: if null_to is not None and y is None: y = null_to if isinstance(y, list): if len(y) and isinstance(y[0], ODS): dct[x] = cls() for k in range(len(y)): dct[x][k] = y[k] else: if null_to is not None: for k in range(len(y)): if y[k] is None: y[k] = null_to dct[x] = y else: dct[x] = y if "dtype" in dct: # python2/3 compatibility dct["dtype"] = dct["dtype"].replace('S', 'U') if '__ndarray_tolist__' in dct: return numpy.array(dct['__ndarray_tolist__'], dtype=dct['dtype']).reshape(dct['shape']) elif '__ndarray_tolist_real__' in dct and '__ndarray_tolist_imag__' in dct: return ( numpy.array(dct['__ndarray_tolist_real__'], dtype=dct['dtype']).reshape(dct['shape']) + numpy.array(dct['__ndarray_tolist_imag__'], dtype=dct['dtype']).reshape(dct['shape']) * 1j ) elif '__udarray_tolist_avg__' in dct and '__udarray_tolist_std__' in dct: return uarray( numpy.array(dct['__udarray_tolist_avg__'], dtype=dct['dtype']).reshape(dct['shape']), numpy.array(dct['__udarray_tolist_std__'], dtype=dct['dtype']).reshape(dct['shape']), ) elif '__ufloat__' in dct and '__ufloat_std__' in dct: return ufloat(dct['__ufloat__'], dct['__ufloat_std__']) elif '__ndarray__' in dct: import base64 data = base64.b64decode(dct['__ndarray__']) return numpy.frombuffer(data, dct['dtype']).reshape(dct['shape']) elif '__complex__' in dct: return complex(dct['real'], dct['imag']) return dct def recursive_glob(pattern='*', rootdir='.'): """ Search recursively for files matching a specified pattern within a rootdir :param pattern: glob pattern to match :param rootdir: top level directory to search under """ import fnmatch matches = [] for root, dirnames, filenames in os.walk(rootdir): for filename in fnmatch.filter(filenames, pattern): matches.append(os.path.join(root, filename)) return matches def remove_parentheses(inv, replace_with=''): """ function used to remove/replace top-level matching parenthesis from a string :param inv: input string :param replace_with: string to replace matching parenthesis with :return: input string without first set of matching parentheses """ k = 0 lp = '' out = '' for c in inv: # go one level deep if c == '(': k += 1 lp = c # go one level up elif c == ')': k -= 1 lp += c if k == 0: out += replace_with # zero depth: add character to output string elif k == 0: out += c return out def closest_index(my_list, my_number=0): """ Given a SORTED iterable (a numeric array or list of numbers) and a numeric scalar my_number, find the index of the number in the list that is closest to my_number :param my_list: Sorted iterable (list or array) to search for number closest to my_number :param my_number: Number to get close to in my_list :return: Index of my_list element closest to my_number :note: If two numbers are equally close, returns the index of the smallest number. """ if not hasattr(my_list, '__iter__'): raise TypeError("closestIndex() requires an iterable as the first argument. Got instead: {:}".format(my_list)) if not is_numeric(my_number): raise TypeError("closestIndex() requires a numeric scalar as the second argument. Got instead: {:}".format(my_number)) import bisect pos = bisect.bisect_left(my_list, my_number) if pos == 0: return 0 if pos == len(my_list): return pos - 1 before = pos - 1 after = pos if my_list[after] - my_number < my_number - my_list[before]: return pos else: return pos - 1 def sanitize_version_number(version): """Removes common non-numerical characters from version numbers obtained from git tags, such as '_rc', etc.""" if version.startswith('.'): version = '-1' + version version = version.replace('_rc', '.') return version def compare_version(version1, version2): """Returns 1 if version1 > version2, -1 if version1 < version2, or 0 if version1 == version2.""" version1 = sanitize_version_number(version1) version2 = sanitize_version_number(version2) def normalize(v): if 'r' in v: v = v.split('r')[0] return [int(x) for x in re.sub(r'(\.0+)*$', '', v).split(".")] return (normalize(version1) > normalize(version2)) - (normalize(version1) < normalize(version2)) def underline_last(text, offset=0): """ Utility function to underline the last part of a path :param text: text to underline :param offset: add offset to underling :return: original text with underline on a new line """ index = [i for i, x in enumerate(text) if x in ['.', ' ']][-1] if text[index] == '.': index += 1 underline = ' ' * (index + offset) + '^' * (len(text) - index) return text + '\n' + underline def function_arguments(f, discard=None, asString=False): """ Returns the arguments that a function takes :param f: function to inspect :param discard: list of function arguments to discard :param asString: concatenate arguments to a string :return: tuple of four elements * list of compulsory function arguments * dictionary of function arguments that have defaults * True/False if the function allows variable arguments * True/False if the function allows keywords """ import inspect the_argspec = inspect.getfullargspec(f) the_keywords = the_argspec.varkw args = [] kws = OrderedDict() string = '' for k, arg in enumerate(the_argspec.args): if (discard is not None) and (arg in tolist(discard)): continue d = '' if the_argspec.defaults is not None: if (-len(the_argspec.args) + k) >= -len(the_argspec.defaults): d = the_argspec.defaults[-len(the_argspec.args) + k] kws[arg] = d string += arg + '=' + repr(d) + ',\n' else: args.append(arg) string += arg + ',\n' else: args.append(arg) string += arg + ',\n' if the_argspec.varargs: string += '*[],\n' if the_keywords: string += '**{},\n' string = string.strip() if asString: return string else: return args, kws, the_argspec.varargs is not None, the_keywords is not None def args_as_kw(f, args, kw): """ Move positional arguments to kw arguments :param f: function :param args: positional arguments :param kw: keywords arguments :return: tuple with positional arguments moved to keyword arguments """ a, k, astar, kstar = function_arguments(f) if len(a) and a[0] == 'self': a = a[1:] a = a + list(k.keys()) n = 0 for name, value in zip(a + list(k.keys()), args): if name not in kw: kw[name] = value n += 1 return args[n:], kw # ---------------------------------------------- # handling of OMAS json structures # ---------------------------------------------- # IMAS structure info organized as flat entries # * IMAS syntax with `:` for list of structures # * each entry contains leafs attributes _structures = {} # IMAS structure info organized in hierarchical dictionaries # * list of structures as `:` # * the leafs are empty dictionaries _structures_dict = {} # cache structures filenames _structures_filenames = {} # cache for structure() _ods_structure_cache = {} # similar to `_structures_dict` but for use in omas_info _info_structures = {} # dictionary that contains all the coordinates defined within the data dictionary _coordinates = {} # dictionary that contains all the times defined within the data dictionary _times = {} # dictionary that contains all the _global_quantities defined within the data dictionary _global_quantities = {} # extra structures that python modules using omas can define # by setting omas.omas_utils._extra_structures equal to a # dictionary with the definitions of the quantities that are # not (yet) available in IMAS. For example: # # omas.omas_utils._extra_structures = { # 'equilibrium': { # 'equilibrium.time_slice.:.profiles_1d.centroid.r_max': { # "full_path": "equilibrium/time_slices(itime)/profiles_1d/centroid.r_max(:)", # "coordinates": ['equilibrium.time_slice[:].profiles_1d.psi'], # "data_type": "FLT_1D", # "description": "centroid r max", # "units": 'm', # "cocos_signal": '?' # optional # } # } # } _extra_structures = {} def list_structures(imas_version): """ list names of structures in imas version :param imas_version: imas version :return: list with names of structures in imas version """ json_filenames = glob.glob(imas_json_dir + os.sep + imas_versions.get(imas_version, imas_version) + os.sep + '*' + '.json') json_filenames = filter(lambda x: os.path.basename(x)[0] != '_', json_filenames) structures = sorted(list(map(lambda x: os.path.splitext(os.path.split(x)[1])[0], json_filenames))) if not len(structures): raise ValueError("Unrecognized IMAS version `%s`. Possible options are:\n%s" % (imas_version, imas_versions.keys())) return structures def structures_filenames(imas_version): """ Maps structure names to json filenames :param imas_version: imas version :return: dictionary maps structure names to json filenames """ if imas_version not in _structures_filenames: paths = glob.glob(imas_json_dir + os.sep + imas_versions.get(imas_version, imas_version) + os.sep + '*' + '.json') if not len(paths): raise ValueError("Unrecognized IMAS version `%s`. Possible options are:\n%s" % (imas_version, imas_versions.keys())) structures = dict(zip(list(map(lambda x: os.path.splitext(os.path.split(x)[1])[0], paths)), paths)) _structures_filenames[imas_version] = { structure: structures[structure] for structure in structures if not structure.startswith('_') } return _structures_filenames[imas_version] def load_structure(filename, imas_version): """ load omas structure from given json filename or IDS name :param filename: full path to json file or IDS name :param imas_version: imas version to load the data schema of (optional if filename is a full path) :return: tuple with structure, hashing mapper, and ods """ from .omas_physics import cocos_signals # translate DS to filename if os.sep not in filename: filename = structures_filenames(imas_version)[filename] # check if _structures and _structures_dict already have this in cache id = (filename, imas_version) if id in _structures and id in _structures_dict: return _structures[id], _structures_dict[id] else: with open(filename, 'r') as f: dump_string = f.read() # load flat definitions from json file _structures[id] = json.loads(dump_string) # add _extra_structures definitions structure_name = os.path.splitext(os.path.split(filename)[1])[0] if structure_name in _extra_structures: for item in _extra_structures[structure_name]: if item not in _structures[id]: cs = _extra_structures[structure_name][item].pop('cocos_signal', None) _structures[id][item] = _extra_structures[structure_name][item] if cs is not None: cocos_signals[i2o(item)] = cs # generate hierarchical structure _structures_dict[id] = {} for item in _structures[id]: h = _structures_dict[id] for step in i2o(item).split('.'): if step not in h: h[step] = {} h = h[step] return _structures[id], _structures_dict[id] def imas_structure(imas_version, location): """ Returns a dictionary with the IMAS structure given a location :param imas_version: imas version :param location: path in OMAS format :return: dictionary as loaded by load_structure() at location """ if imas_version not in _ods_structure_cache: _ods_structure_cache[imas_version] = {} ulocation = o2u(location) if ulocation not in _ods_structure_cache[imas_version]: if not ulocation: structure = {k: k for k in list_structures(imas_version=imas_version)} else: path = p2l(ulocation) structure = load_structure(path[0], imas_version=imas_version)[1][path[0]] for key in path[1:]: structure = structure[key] _ods_structure_cache[imas_version][ulocation] = structure return _ods_structure_cache[imas_version][ulocation] def omas_coordinates(imas_version=omas_rcparams['default_imas_version']): """ return list of coordinates :param imas_version: IMAS version to look up :return: list of strings with IMAS coordinates """ # caching if imas_version not in _coordinates: filename = imas_json_dir + os.sep + imas_versions.get(imas_version, imas_version) + os.sep + '_coordinates.json' if os.path.exists(filename): with open(filename, 'r') as f: _coordinates[imas_version] = json.load(f) else: from .omas_structure import extract_coordinates _coordinates[imas_version] = extract_coordinates(imas_version) return _coordinates[imas_version] def omas_times(imas_version=omas_rcparams['default_imas_version']): """ return list of times :param imas_version: IMAS version to look up :return: list of strings with IMAS times """ # caching if imas_version not in _times: filename = imas_json_dir + os.sep + imas_versions.get(imas_version, imas_version) + os.sep + '_times.json' if os.path.exists(filename): with open(filename, 'r') as f: _times[imas_version] = json.load(f) else: from .omas_structure import extract_times _times[imas_version] = extract_times(imas_version) return _times[imas_version] def omas_global_quantities(imas_version=omas_rcparams['default_imas_version']): """ return list of times :param imas_version: IMAS version to look up :return: list of strings with IMAS times """ # caching if imas_version not in _global_quantities: filename = imas_json_dir + os.sep + imas_versions.get(imas_version, imas_version) + os.sep + '_global_quantities.json' if os.path.exists(filename): with open(filename, 'r') as f: _global_quantities[imas_version] = json.load(f) else: from .omas_structure import extract_global_quantities _global_quantities[imas_version] = extract_global_quantities(imas_version) return _global_quantities[imas_version] # only attempt cython if effective user owns this copy of omas # disabled for Windows: need to add check for file ownership under Windows if os.name == 'nt' or os.geteuid() != os.stat(__file__).st_uid: with open(os.path.split(__file__)[0] + os.sep + 'omas_cython.pyx', 'r') as f: exec(f.read(), globals()) else: try: import pyximport pyximport.install(language_level=3) from .omas_cython import * except Exception as _excp: warnings.warn('omas cython failed: ' + str(_excp)) with open(os.path.split(__file__)[0] + os.sep + 'omas_cython.pyx', 'r') as f: exec(f.read(), globals()) def l2ut(path): """ Formats IMAS time lists ['bla',0,'time_slice',5,'quantity'] with universal ODS path 'bla.0.time_slice.:.quantity' :param path: list of strings and integers :return: ODS path format with time lists in universal format """ lpath = p2l(path) opath = l2o(lpath) for k, key in enumerate(lpath): if not isinstance(key, int): continue key = lpath[:k] info = omas_info_node(l2u(key)) if 'coordinates' in info: for infoc in info['coordinates']: if infoc.endswith('.time'): lpath[k] = ':' return l2o(lpath)
[docs]def omas_info(structures=None, hide_obsolescent=True, cumulative_queries=False, imas_version=omas_rcparams['default_imas_version']): """ This function returns an ods with the leaf nodes filled with their property informations :param hide_obsolescent: hide obsolescent entries :param structures: list with ids names or string with ids name of which to retrieve the info if None, then all structures are returned :param cumulative_queries: return all IDSs that have been queried :param imas_version: IMAS version to look up :return: ods showcasing IDS structure """ from omas import ODS if not structures: structures = sorted(list(structures_filenames(imas_version).keys())) elif isinstance(structures, str): structures = [structures] # caching based on imas version and obsolescence if (imas_version, hide_obsolescent) not in _info_structures: _info_structures[imas_version, hide_obsolescent] = ODS(imas_version=imas_version, consistency_check=False) ods = _info_structures[imas_version, hide_obsolescent] ods_out = ODS(imas_version=imas_version, consistency_check=False) # generate ODS with info for structure in structures: if structure not in ods: tmp = load_structure(structure, imas_version)[0] lst = sorted(tmp.keys()) for k, item in enumerate(lst): if re.match('.*_error_(index|lower|upper)$', item.split('.')[-1]): continue parent = False for item1 in lst[k + 1 :]: if l2u(item1.split('.')[:-1]).rstrip('[:]') == item: parent = True break if parent: continue if hide_obsolescent and omas_info_node(item).get('lifecycle_status', '') == 'obsolescent': continue ods[item.replace(':', '0')] = tmp[item] ods_out[structure] = ods[structure] # cumulative queries if cumulative_queries: for structure in ods: if structure not in ods_out: ods_out[structure] = ods[structure] return ods_out
[docs]def omas_info_node(key, imas_version=omas_rcparams['default_imas_version']): """ return information about a given node :param key: IMAS path :param imas_version: IMAS version to look up :return: dictionary with IMAS information (or an empty dictionary if the node is not found) """ try: return copy.copy(load_structure(key.split('.')[0], imas_version)[0][o2i(key)]) except KeyError: return {}
def recursive_interpreter(me, interpret_method=ast.literal_eval, dict_cls=OrderedDict): """ Traverse dictionaries and list to convert strings to int/float when appropriate :param me: root of the dictionary to traverse :param interpret_method: method used for conversion (ast.literal_eval by default) :param dict_cls: dictionary class to use :return: root of the dictionary """ if isinstance(me, list): keys = range(len(me)) elif isinstance(me, dict): keys = me.keys() for kid in keys: if me[kid] is None: continue elif isinstance(me[kid], (list, dict)): if not isinstance(me[kid], dict_cls): tmp = me[kid] me[kid] = dict_cls() me[kid].update(tmp) recursive_interpreter(me[kid], interpret_method=interpret_method, dict_cls=dict_cls) if isinstance(kid, str) and kid.startswith('__integer_'): me[int(re.sub('__integer_([0-9]+)__', r'\1', kid))] = me[kid] del me[kid] else: try: me[kid] = interpret_method(me[kid]) except (ValueError, SyntaxError) as _excp: pass if isinstance(me[kid], str) and ' ' in me[kid]: try: values = [] for item in re.split(r'[ |\t]+', me[kid].strip()): values.append(float(item)) me[kid] = numpy.array(values) except ValueError: pass return me def recursive_encoder(me): """ Traverse dictionaries and list to convert entries as appropriate :param me: root of the dictionary to traverse :return: root of the dictionary """ if isinstance(me, list): keys = range(len(me)) elif isinstance(me, dict): keys = me.keys() for kid in keys: if me[kid] is None: continue elif isinstance(me[kid], (list, dict)): recursive_encoder(me[kid]) else: if isinstance(me[kid], numpy.ndarray): me[kid] = ' '.join([repr(x) for x in me[kid]]) else: me[kid] = str(me[kid]) # omas encoding of integer keys if isinstance(kid, int): me['__integer_%d__' % kid] = me[kid] del me[kid] return me
[docs]def get_actor_io_ids(filename): """ Parse IMAS Python actor script and return actor input and output IDSs :param filename: filename of the IMAS Python actor :return: tuple with list of input IDSs and output IDSs """ import ast with open(filename, 'r') as f: module = ast.parse(f.read()) actor = os.path.splitext(os.path.split(filename)[-1])[0] function_definitions = [node for node in module.body if isinstance(node, ast.FunctionDef)] docstring = ast.get_docstring([f for f in function_definitions if f.name == actor][0]) ids_in = [] ids_out = [] for line in docstring.split('\n'): if 'codeparams' in line: pass elif line.strip().startswith(':param result:'): ids_out = list(map(lambda x: x.strip()[:-1], line.split(':')[2].strip(', ').split(','))) break elif line.strip().startswith(':param '): ids_in.append(line.split(':')[2].strip()) return ids_in, ids_out
class UnittestCaseOmas(unittest.TestCase): """ Base class for unittest.TestCase within OMAS """ def setUp(self): name = self.__class__.__name__ + '.' + self._testMethodName print('') print('~' * len(name)) print(name) print('~' * len(name))