Source code for omfit_classes.omfit_hdf5

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from matplotlib import pyplot
import numpy as np

try:
    import h5py._hl.dataset

    def h5py_dataset_tree_repr(self):
        return self[()], []

    h5py._hl.dataset.Dataset.__tree_repr__ = h5py_dataset_tree_repr
    import h5py
except ImportError as _excp:
    h5py = None
    warnings.warn('No `hdf5` support: ' + repr(_excp))

__all__ = ['OMFIThdf5raw', 'OMFIThdf5', 'dict2hdf5']


[docs]def dict2hdf5(filename, dictin, groupname='', recursive=True, lists_as_dicts=False, compression=None): """ Save hierarchy of dictionaries containing np-compatible objects to hdf5 file :param filename: hdf5 file to save to :param dictin: input dictionary :param groupname: group to save the data in :param recursive: traverse the dictionary :param lists_as_dicts: convert lists to dictionaries with integer strings :param compression: gzip compression level """ if isinstance(filename, str): with h5py.File(filename, 'w') as g: dict2hdf5(g, dictin, recursive=recursive, lists_as_dicts=lists_as_dicts, compression=compression) return else: parent = filename if groupname: g = parent.create_group(groupname) else: g = parent for key, item in list(dictin.items()): if isinstance(item, dict): if recursive: dict2hdf5(g, item, key, recursive=recursive, lists_as_dicts=lists_as_dicts, compression=compression) elif lists_as_dicts and isinstance(item, (list, tuple)) and not isinstance(item, np.ndarray): item = {'%d' % k: v for k, v in enumerate(item)} dict2hdf5(g, item, key, recursive=recursive, lists_as_dicts=lists_as_dicts, compression=compression) else: if item is None: item = '_None' tmp = np.array(item) printd('tmp.dtype=', tmp.dtype, topic='hdf5') printd('tmp.dtype.name=', tmp.dtype.name, topic='hdf5') if str(tmp.dtype).lower().lstrip('<>|').startswith('u'): tmp = tmp.astype('S') elif tmp.dtype.name.lower().startswith('o'): printw('Could not save `%s` to h5 file' % key) continue if tmp.shape == (): g.create_dataset(key, tmp.shape, dtype=tmp.dtype)[...] = tmp else: g.create_dataset(key, tmp.shape, dtype=tmp.dtype, compression=compression)[...] = tmp return g
[docs]class OMFIThdf5raw(SortedDict, OMFITobject): """ OMFIT class that exposes directly the h5py.File class """ def __init__(self, filename, **kw): OMFITobject.__init__(self, filename, **kw) SortedDict.__init__(self, sorted=True) self.dynaLoad = True
[docs] @dynaLoad def load(self): self.clear() self.update(h5py.File(self.filename, 'r'))
class OMFIThdf5dataset(SortedDict): """ Replacement for h5py.Dataset that provides some useful methods like plot, etc. """ def __init__(self, dataset, oh5=None): """ :param dataset: h5py.Dataset instance :param oh5: parent OMFIThdf5 instance Required for handling lookup of dimensions """ self.update(dataset.attrs) self['data'] = dataset[()] self.oh5 = oh5 h5file = self.get_h5file() if 'DIMENSION_LIST' in self: self['dims'] = [b2s(h5py.h5r.get_name(self['DIMENSION_LIST'][0][i], h5file.id)) for i in range(len(self['DIMENSION_LIST'][0]))] return def get_h5file(self): """Gets a reference to the h5py.File instance associated with the parent OMFIThdf5""" if self.oh5 is None: return None else: h5file = getattr(self.oh5, '_data') if h5file is None: h5file = h5py.File(self.filename, 'r') return h5file def plot(self, ax=None, **plot_kw): """ Plots data. If enough information is available, the correct coordinates will be used. Otherwise, plots vs. array index. :param ax: Axes instance :param plot_kw: parameters passed to plot() call :return: output from plot call; probably an array of line objects or something """ ndim = len(np.shape(self['data'])) if ndim == 1: return self._plot_1d(ax=ax, **plot_kw) else: printe('Method for plotting ndim = {} is not yet supported, sorry.'.format(ndim)) return None def get_dim(self, idx): """ Gets information about a coordinate dimension :param idx: int Which dimension? :return: tuple containing array, string, string x label units """ xunits = '' xlabel = '' if self.oh5 is None or 'dims' not in self or len(self['dims']) == 0: x = None else: d0 = self['dims'][idx] if d0 in self.oh5: dim = self.oh5[d0] elif d0[1:] in self.oh5: dim = self.oh5[d0[1:]] else: dim = None if dim is None or dim.get('data', None) is None: x = None else: x = dim['data'] xunits = b2s(dim.get('units', '')) xlabel = b2s(dim.get('title', '')) return x, xunits, xlabel def _plot_1d(self, ax=None, **plot_kw): """ Plots 1D data :param ax: Axes instance :param plot_kw: keywords passed to plot :return: output from plot call; probably an array of line objects or something """ if ax is None: ax = pyplot.gca() y = self['data'] yunits = b2s(self.get('units', '')) ylabel = b2s(self.get('title', '')) x, xunits, xlabel = self.get_dim(0) if x is None: x = np.arange(len(y)) xlabel = 'index' xunits = '' if yunits: ylabel += ' ({})'.format(yunits) if xunits: xlabel += ' ({})'.format(xunits) ax.set_ylabel(ylabel) ax.set_xlabel(xlabel) return ax.plot(x, y, **plot_kw)
[docs]class OMFIThdf5(SortedDict, OMFITobject): """ OMFIT class that translates HDF5 file to python dictionary At this point this class is read only. Changes made to the its content will not be reflected to the HDF5 file. """ def __init__(self, filename, **kw): OMFITobject.__init__(self, filename, **kw) SortedDict.__init__(self, sorted=True) self.dynaLoad = True
[docs] @dynaLoad def load(self): self.clear() try: self._data = h5py.File(self.filename, 'r') self._convertDataset() finally: if hasattr(self, '_data'): self._data.close() delattr(self, '_data')
[docs] def update(self, dictin, groupname='', recursive=True, lists_as_dicts=True, compression=None): with h5py.File(self.filename, 'w') as g: dict2hdf5(g, dictin, recursive=recursive, compression=compression) return self
def _convertDataset(self, path=''): data_location = eval('self._data' + path) self_location = eval('self' + path) for item in data_location: if isinstance(data_location[item], h5py.Dataset): self_location[item] = OMFIThdf5dataset(data_location[item], self) elif isinstance(data_location[item], h5py.Group): self_location[item] = SortedDict(sorted=True) self._convertDataset(path=path + "[%s]" % repr(item)) return
############################################ if __name__ == '__main__': test_classes_main_header() tmp = OMFIThdf5raw(OMFITsrc + '/../samples/def_equilibrium.h5') tmp.load()