Source code for omfit_classes.omfit_interface

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

import numpy as np

__all__ = ['OMFITinterface', 'interface']


class OMFITinterfaceError(Exception):
    pass


class OMFITinterfaceAdaptor(SortedDict):

    type = ''

    def __init__(self, data=None):
        SortedDict.__init__(self)
        self['__quantities__'] = SortedDict(sorted=True)
        self['__dimensions__'] = SortedDict(sorted=True)
        baseDimensions = ['', 'i', 'b', 'r', 't']

        s = list(baseDimensions[1:])
        self.allowedDimensions = []
        for k in itertools.chain.from_iterable(itertools.combinations(s, r) for r in range(len(s) + 1)):
            self.allowedDimensions.append(''.join(k))
        self.allowedDimensions.insert(0, '')
        # i and b should not coexist

        self.data = data
        self.list()
        self.dims()

    def definitions(self, key, data):
        pass

    def dims(self):
        if self.data is None:
            return

        if hasattr(self, 'adaptor'):
            self['__dimensions__'].update(self.adaptor['__dimensions__'])

        for item in self['__dimensions__']:
            if not isinstance(self['__dimensions__'][item], str):
                continue
            if item == '':
                self['__dimensions__'][item] = (1,)
            else:
                self['__dimensions__'][item] = self[self['__dimensions__'][item]].shape

        for item in list(self['__dimensions__'].keys()):
            if item != '' and np.iterable(self['__dimensions__'][item]):
                tmp = [1] * len(item)
                for k, s in enumerate(self['__dimensions__'][item]):
                    tmp[k] = s
                self['__dimensions__'][item] = tmp

        for item in list(self['__dimensions__'].keys()):
            if np.iterable(self['__dimensions__'][item]):
                for k, s in enumerate(item):
                    self['__dimensions__'][s] = self['__dimensions__'][item][k]
                del self['__dimensions__'][item]
            self['__dimensions__'][''] = 1

    def register(self, key, quantity, description, units, dimension):

        quantities = np.atleast_1d(quantity)
        for quantity in quantities:

            if quantity not in list(self['__quantities__'].keys()):
                if dimension not in self.allowedDimensions:
                    raise OMFITinterfaceError('Dimension `' + dimension + '` is not allowed, use: ' + repr(self.allowedDimensions))

                self['__quantities__'][quantity] = SortedDict({'description': description, 'units': units, 'raw_dimension': dimension})
                self['__dimensions__'][dimension] = quantity

            if key is not None and key == quantity:
                return True

        return False

    def __getitem__(self, key):
        if key in list(self.keys()):
            return super().__getitem__(key)
        else:
            if hasattr(self, 'adaptor'):
                data = self.adaptor.data
            else:
                data = self.data
            try:
                value = self.definitions(key, self.data)
            except Exception as _excp:
                printe('Error in the definition of ' + key)
                raise
            value = np.atleast_1d(value)
            super().__setitem__(key, value)
            return value

    def list(self):
        if not len(self['__quantities__']):

            if hasattr(self, 'adaptor'):
                self.adaptor.list()
                self['__quantities__'].update(self.adaptor['__quantities__'])

            self.definitions(None, self.data)

        return list(self['__quantities__'].keys())

    def get_all(self):
        for item in self['__quantities__']:
            self[item]


[docs]class OMFITinterface(OMFITinterfaceAdaptor): """ :param data: input data to be read by the adaptor :param adaptor: adaptor class to be used :param adaptorsFile: adaptor class to be used (defaults to 'OMFIT-source/omfit/adaptors.py') :param t: t at which to return data :param r: r at which to return data :param squeeze: string with dimensions ('r','t','rt') to squeeze if length is one :param checkAdaptors: check adaptors for inconsistencies (only if adaptor keyword is not passed) """ def __init__(self, data=None, adaptor=None, adaptorsFile=None, t=None, r=None, squeeze='', interp='linear', checkAdaptors=True): if adaptor is None: namespace = self.getAdaptors(adaptorsFile) excp_s = '' for k in namespace: try: if namespace[k].identify(data): adaptor = namespace[k] break except Exception as _excp: excp_s += '\nadaptor ' + k + ': ' + repr(_excp) if adaptor is None: printe('OMFITinterface could not associate the data to an adaptor' + excp_s) adaptor = OMFITinterfaceAdaptor if checkAdaptors: self.checkInterfaceAdaptors(namespace, quiet_if_ok=True) elif not adaptor.identify(data): printe('data is not valid for adaptor ' + adaptor.type) adaptor = OMFITinterfaceAdaptor self.adaptor = adaptor(data) OMFITinterfaceAdaptor.__init__(self) self.t = t self.r = r self.squeeze = squeeze self.interp = interp
[docs] @staticmethod def getAdaptors(adaptorsFile=None): namespace = {} if adaptorsFile is None: adaptorsFile = OMFITsrc + os.sep + 'omfit_classes' + os.sep + 'adaptors.py' elif hasattr(adaptorsFile, 'filename'): adaptorsFile = adaptorsFile.filename with open(adaptorsFile, 'r') as f: exec(f.read(), globals(), namespace) return namespace
@property def type(self): if self.adaptor is None: return '' return self.adaptor.type
[docs] def definitions(self, key, data): if key in self.adaptor.list(): return self.adaptor[key] if self.register(key, 'me', 'Electron mass', 'C', ''): return 9.10938291e-31 if self.register(key, 'mH', 'Hydrogen mass', 'kg', ''): return 1.6726e-27 if self.register(key, 'mD', 'Deuteron mass', 'kg', ''): return 2 * 1.6726e-27 if self.register(key, 'mT', 'Tritium mass', 'kg', ''): return 3 * 1.6726e-27 if self.register(key, 'mHe', 'Helium mass', 'kg', ''): return 4 * 1.6726e-27 if self.register(key, 'mC', 'Carbon mass', 'kg', ''): return 12 * 1.6726e-27 if self.register(key, 'e', 'Electron charge (positive)', 'C', ''): return 1.60217646e-19 if self.register(key, 'eV', 'eV', 'J', ''): return 1.60217646e-19 if self.register(key, 'keV -> ', 'keV', 'J', ''): return 1.60217646e-16 if self.register(key, 'c', 'Light speed', 'm/s', ''): return 299792458.0
[docs] def get(self, item, dim, t=None, r=None, interp=None, squeeze=None): if t is None: t = self.t if r is None: r = self.r if squeeze is None: squeeze = self.squeeze if interp is None: interp = self.interp items = np.atleast_1d(item) dims = np.atleast_1d(dim) tmp = [] for item in items: for dim in dims: tmp.append(self._get(item, dim, t, r, interp, squeeze)) if len(items) == 1 and len(dims) == 1: return tmp[0] else: return np.squeeze(np.reshape(tmp, (len(items), len(dims))))
def _get(self, item, dim, t, r, interp, squeeze=None): if item not in self['__quantities__']: raise OMFITinterfaceError('\'' + item + '\' is not available, did you mean ' + repr(bestChoice(self.list(), item)[0]) + ' ?') if dim in ['desc', 'description']: return self['__quantities__'][item]['description'] elif dim in ['unit', 'units']: return self['__quantities__'][item]['units'] elif dim in ['dim', 'raw_dimension']: return self['__quantities__'][item]['units'] if dim not in self.allowedDimensions: raise OMFITinterfaceError(item + ': Dimension \'' + dim + '\' is not allowed, use: ' + repr(self.allowedDimensions)) raw_dim = self['__quantities__'][item]['raw_dimension'] # stop if dimension requested is smaller than raw data if len(raw_dim.strip(dim)): raise OMFITinterfaceError('You are asking for `' + item + '` in ' + dim + ', but it is defined in ' + raw_dim) tmp = self[item] # do not attempt anything if it's a string if isinstance(tmp[0], str): return tmp # tiling missing = re.sub('[ ' + raw_dim + ']', '', dim) # print('was:'+raw_dim,'asked:'+dim,'missing:'+missing) tmp_dim = raw_dim tl = [] for k in missing[::-1]: tmp_dim = k + tmp_dim for k in missing: if k in self['__dimensions__']: tl.append(self['__dimensions__'][k]) else: tl.append(1) tl.extend([1] * len(tmp.shape)) tmp = np.tile(tmp, tl) # reordering tmp_dim = list(tmp_dim) for step in range(len(tmp_dim)): order = {} for k, s in enumerate(tmp_dim): if dim[k] != s: order[s] = (k, dim.index(s)) if not len(order): break # print(order) swap = order[list(order.keys())[0]] tmp = np.swapaxes(tmp, swap[0], swap[1]) tt = tmp_dim[swap[0]] tmp_dim[swap[0]] = tmp_dim[swap[1]] tmp_dim[swap[1]] = tt # ============== # after this point the object has the dimensionality which was requested by the user # ============== for k in self['__dimensions__']: if k not in dim and eval(k) is not None: raise OMFITinterfaceError( item + ': Cannot return data at specific `' + k + '`, with dimension `' + dim + '` wich is not `' + k + '` dependent' ) # interpolation by user request # make sure to keep shape even when interpolating at one point mp = {'t': 'time', 'r': 'rho'} for k, d in enumerate(dim): if d in mp and eval(d) is not None: sh = list(tmp.shape) if tmp.shape[k] > 1: sh[k] = len(np.atleast_1d(eval(d))) tmp = interp1e(self[mp[d]], tmp, axis=k, kind=interp)(eval(d)) tmp = np.reshape(tmp, sh) # squeeze dimensions sh = [] for k, d in enumerate(dim): if (squeeze is True or d in squeeze) and tmp.shape[k] == 1: continue sh.append(tmp.shape[k]) if len(sh): tmp = np.reshape(tmp, sh) else: tmp = np.reshape(tmp, (1,))[0] return tmp
[docs] @staticmethod def checkInterfaceAdaptors(namespace=None, checkList=['units', 'description'], quiet_if_ok=True): if namespace is None: namespace = OMFITinterface.getAdaptors() adaptorInstances = {} q_master = {} for k1 in list(namespace.keys()): k = namespace[k1].type adaptorInstances[k] = namespace[k1]() quantities = adaptorInstances[k]['__quantities__'] for q in quantities: q_master.setdefault(q, []).append((k, quantities[q])) anyIssue = False for q in q_master: adaptors = [] prop = {} for adpt in q_master[q]: adaptors.append(adpt[0]) for p in checkList: prop.setdefault(p, []).append(adpt[1][p]) for p in checkList: if len(set(prop[p])) > 1: anyIssue += 1 printe('\n\n' + q + ' ' + ':') for k, a in enumerate(adaptors): printe(' * ' + a + '\t\t\t-->\t' + prop[p][k]) if anyIssue: printe('\n' + str(anyIssue) + ' adaptors inconsistencies were detected!') elif not quiet_if_ok: printi('All adaptors definitions are consistent: ' + str(list(adaptorInstances.keys())))
[docs]def interface(data, *args, **kw): if data is None or (np.iterable(data) and not len(data)): return None tmp = OMFITinterface(data, *args, **kw) if len(tmp.type): return tmp