Source code for omfit_classes.omfit_osborne

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from omfit_classes.omfit_mds import OMFITmds
from omfit_classes.omfit_ascii import OMFITascii
from omfit_classes.omfit_nc import OMFITncData
from omfit_classes.utils_base import printi
from omfit_classes.exceptions_omfit import OMFITexception
from omfit_classes.utils_math import interp1e
from omfit_classes.utils_fusion import is_device

import omas
import numpy as np
from scipy.interpolate import interp1d


__all__ = ['NoFitException', 'OMFITosborneProfile', 'OMFITpFile']

os.environ.setdefault('VPN_ACTIVE', '')


[docs]class NoFitException(OMFITexception): pass
[docs]class OMFITosborneProfile(OMFITmds): """ Class accesses Osborne fits stored in MDS+, and provides convenient methods for accessing the data or fitting functions used in those fits, including handling the covariance of the fitting parameters :param server: The device (really MDS+ archive) :param treename: The tree where the Osborne-tool profiles are stored :param shot: The shot number :param time: The timeid of the profile :param runid: The runid of the profile Note that this class assumes that the profiles are stored as [`tree`]['PROFDB_PED']['P<time>_<runid>'] """ def __init__(self, server='DIII-D', treename='auto', shot=None, time=None, runid=None): if treename == 'auto': if is_device(server, 'DIII-D'): treename = 'PEDESTAL' elif is_device(server, ('NSTX', 'NSTX-U')): treename = 'PROFDB_PED' self.OMFITproperties = {} OMFITmds.__init__(self, server, treename, shot) self.OMFITproperties['treename'] = treename self.OMFITproperties['time'] = time self.OMFITproperties['runid'] = runid self.OMFITproperties['shot'] = shot @property def treename(self): return self.OMFITproperties['treename'] @treename.setter def treename(self, value): self.OMFITproperties['treename'] = value @property def time(self): return self.OMFITproperties['time'] @time.setter def time(self, value): self.OMFITproperties['time'] = value @time.deleter def time(self): self.OMFITproperties['time'] = None @property def runid(self): return self.OMFITproperties['runid'] @runid.setter def runid(self, value): self.OMFITproperties['runid'] = value @runid.deleter def runid(self): self.OMFITproperties['runid'] = None @property def shot(self): return self.OMFITproperties['shot'] @shot.setter def shot(self, value): self.OMFITproperties['shot'] = value @shot.deleter def shot(self): self.OMFITproperties['shot'] = None
[docs] @dynaLoad def load(self): OMFITmds.load(self) profdb_ped = self if self.treename == 'PEDESTAL': profdb_ped = self['PROFDB_PED'] if 'P%s_%s' % (self.time, self.runid.upper()) not in profdb_ped: raise RuntimeError('Time: %s, Runid: %s does not exist for shot %s' % (self.time, self.runid, self.shot)) tmp = profdb_ped['P%s_%s' % (self.time, self.runid.upper())] self.clear() self.update(tmp)
def __repr__(self): return self.__class__.__name__ + '(' + ','.join(map(repr, [self.server, self.treename, self.shot, self.time, self.runid])) + ')'
[docs] def get_raw_data(self, x_var='rhob', quant='ne'): """ Get the raw data :param x_var: The mapping of the data (rhob,rhov,psi,R) :param quant: The quantity for which to return the data of the fit :return: x,y tuple * x - mapped radius of data * y - the data (an array of numbers with uncertainties) """ from uncertainties.unumpy import uarray node = self['%sDAT%s' % (quant.upper(), x_var.upper())] return node.dim_of(0), uarray(node.data(), node.error())
[docs] def plot_raw_data(self, x_var='rhob', quant='ne', **kw): r""" Plot the raw data :param x_var: The mapping of the data (rhob,rhov,psi,R) :param quant: The quantity for which to plot the data of the fit :param \**kw: Keyword dictionary passed to uerrorbar :return: The collection instance returned by uerrorbar """ from omfit_classes.utils_plot import uerrorbar from matplotlib import rcParams if quant == 'omeg': x, y = self.get_raw_data(quant='vt1', x_var=x_var) xR, yR = self.get_raw_data(quant='vt1', x_var='R') y = y / xR else: x, y = self.get_raw_data(x_var=x_var, quant=quant) kw.setdefault('color', rcParams['axes.prop_cycle'].by_key()['color'][0]) kw.setdefault('ms', 0.5) kw.setdefault('capsize', 0) kw.setdefault('marker', '.') return uerrorbar(x, y, **kw)
[docs] def calc_spline_fit(self, x_var='rhob', quant='ne', knots=5): """ Calculate a spline fit :param x_var: The mapping of the data (rhob,rhov,psi,R) :param quant: The quantity for which to calculate a spline fit :param knots: An integer (autoknotted) or a list of knot locations """ x, y = self.get_raw_data(x_var=x_var, quant=quant) fit = 'spl' fit_node = '%s%s%s' % (quant.upper(), fit.upper(), x_var.upper()) print(fit_node in self) if fit_node in self: node = self[fit_node] xfit = node.dim_of(0) if xfit is not None: print(max(xfit), x < max(xfit)) ind = x < max(xfit) x = x[ind] y = y[ind] if 'SPLINE_T' in node and node['SPLINE_T'].data() is not None: print('Using {fit_node} knots'.format(fit_node=fit_node)) knots = np.unique(node['SPLINE_T'].data()) print(knots) return fitSpline(x, nominal_values(y), std_devs(y), knots=knots, fit_SOL=True)
[docs] def get_fit(self, fit, x_var='rhob', quant='ne', corr=True, x_val=None): """ Get the fit, including uncertainties taking into account covariance of parameters :param fit: Which type of fit to retrieve :param x_var: The mapping of the fit (rhob,rhov,psi,R) :param quant: The quantity that was fit :param corr: (bool) Use the covariance of the parameters :param x_val: If the fit can be evaluated, if ``x_val`` is not ``None``, evaluate the fit at these locations :return: x,y tuple * x - radius of fit as stored in MDS+ * y - the fit (an array of numbers with uncertainties) """ node = self['%s%s%s' % (quant.upper(), fit.upper(), x_var.upper())] if node.data() is None: raise NoFitException('No fit for %s' % fit) def get_mdsplus_fit(): if node.data().shape == node.error().shape: return node.dim_of(0), uarray(node.data(), node.error()) else: return node.dim_of(0), uarray(node.data(), 0) import uncertainties import numpy as np from uncertainties.unumpy import uarray try: import data_fit_functions except Exception: warnings.warn('You must have Osborne tools installed to get the fit with covarianced errors. \n' 'Using the stored errors') return get_mdsplus_fit() else: orig_np = np try: data_fit_functions.numpy = uncertainties.unumpy data_fit_functions.numpy.where = np.where try: fit_fun = eval('data_fit_functions.%s' % (node['FITNAME'].data()[0])) except Exception: warnings.warn("Can't handle fitname %s" % (node['FITNAME'].data()[0])) return get_mdsplus_fit() if 'FIT_COEFCOV' not in node or not corr: printi('Using Osborne tool profile fit parameters without correlated uncertainties for fit:%s' % fit) params = uarray(node['FIT_COEF'].data(), node['FIT_COEFERR'].data()) else: params = uncertainties.correlated_values(node['FIT_COEF'].data(), node['FIT_COEFCOV'].data()) try: x = node.dim_of(0) if x_val is not None: x = x_val if node['FITNAME'].data()[0] == 'tanh_multi' and x_var.upper() == 'RHOB': return x, fit_fun(params, x, 0) else: return x, fit_fun(params, x) except Exception as _excp: printe(_excp) printe('Using MDS+ stored fit, without errorbars') return get_mdsplus_fit() finally: data_fit_functions.numpy = orig_np
[docs] def get_fit_deriv(self, nder, fit, **kw): r""" Apply deriv(x,y) `nder` times :param nder: Number of derivatives :param fit: Which type of fit to apply derivatives to :param \**kw: Keyword arguments passed to get_fit :return: x, d^{nder}y / d x^{nder} tuple * x - radius of fit as stored in MDS+ * d^n y / d x^n - The nder'th derivative of the fit """ x, y = self.get_fit(fit, **kw) for i in range(nder): y = deriv(x, y) return x, y
[docs] def get_fit_params(self, fit, x_var='rhob', quant='ne', doc=False, covariance=True): """ Get the parameters and their uncertainties :param fit: Which type of fit to retrieve (tnh0 is the correct tanhfit for scalings) :param x_var: The mapping of the fit (rhob,rhov,psi,R) :param quant: The quantity that was fit :param doc: (bool) if True print the fit documentation to understand which parameter is which :param covariance: (bool) return the covariance matrix instead of just the errors :return: params, cov -- tuple of parameters and their covariance matrix. Errors are the sqrt of diagonals: np.sqrt(np.diag(cov)) """ node = self['%s%s%s' % (quant.upper(), fit.upper(), x_var.upper())] if node.data() is None: raise NoFitException('No fit for %s' % fit) if doc: print(f"Fit documentation for {quant.upper()}{fit.upper()}{x_var.upper()}") print(node['FITDOC'].data()[0].format()) if covariance: if 'FIT_COEFCOV' not in node: # if the covariance matrix doesnt exist errors need to be in matrix printi('Covariance matrix not available using listed errors for fit:%s' % fit) params = node['FIT_COEF'].data() cov = np.zeros((len(params), len(params))) np.fill_diagnoal(cov, node['FIT_COEFERR'].data() ** 2) else: params, cov = node['FIT_COEF'].data(), node['FIT_COEFCOV'].data() else: # if covariance is False just return the errors params = node['FIT_COEF'].data() cov = node['FIT_COEFERR'].data() return params, cov
[docs] def plot_fit(self, fit, x_var='rhob', quant='ne', corr=True, nder=0, **kw): """ Plot the fit for the given quant, with the given x-coordinate. :param fit: Which type of fit to plot :param x_var: The mapping of the fit (rhob,rhov,psi,R) :param quant: The quantity that was fit :param corr: Plot with correlated uncertainty parameters :param nder: The number of derivatives to compute before plotting :return: The result of uband """ x, y = self.get_fit_deriv(nder, fit, x_var=x_var, quant=quant, corr=corr) from omfit_classes.utils_plot import uband return uband(x, y, **kw)
[docs] def plot_all_fits(self, x_var='rhob', quant='ne', corr=True, nder=0): """ Plot all fits for the given quantity `quant` and mapping `x_var` :param x_var: The mapping of the fit (rhob,rhov,psi,R) :param quant: The quantity that was fit :param corr: Plot with correlated uncertainty parameters :param nder: Plot the nder'th derivative of the fit (if nder=0, plot data also) :return: None """ if nder == 0: self.plot_raw_data(x_var=x_var, quant=quant) ki = 1 for k in list(self.keys()): if not (k.startswith(quant.upper()) and k.endswith(x_var.upper())): continue fit_type = k[len(quant) : -len(x_var)] if fit_type == 'DAT': continue try: from matplotlib import rcParams self.plot_fit( fit_type, x_var=x_var, quant=quant, color=rcParams['axes.prop_cycle'].by_key()['color'][ki % len(rcParams['axes.prop_cycle'].by_key()['color'])], corr=corr, label=fit_type, nder=nder, ) ki += 1 except (NoFitException, TypeError) as _excp: continue from pylab import draw draw()
def __save_kw__(self): """ :return: generate the kw dictionary used to save the attributes to be passed when reloading from OMFITsave.txt """ return self.OMFITproperties
[docs]class OMFITpFile(SortedDict, OMFITascii): r""" OMFIT class used to interface with Osborne pfiles :param filename: filename passed to OMFITobject class :param \**kw: keyword dictionary passed to OMFITobject class """ def __init__(self, filename, **kw): OMFITobject.__init__(self, filename, **kw) SortedDict.__init__(self) self.legacy = False self.is_remapped = False # define whether this is the original pFile or its psin grid has been remapped self.descriptions = SortedDict() self.descriptions['ne'] = 'Electron density' self.descriptions['te'] = 'Electron temperature' self.descriptions['ni'] = 'Ion density' self.descriptions['ti'] = 'Ion temperature' self.descriptions['nb'] = 'Fast ion density' self.descriptions['pb'] = 'Fast ion pressure' self.descriptions['ptot'] = 'Total pressure' self.descriptions['omeg'] = 'Toroidal rotation: VTOR/R' self.descriptions['omegp'] = 'Poloidal rotation: Bt * VPOL / (RBp)' self.descriptions['omgvb'] = 'VxB rotation term in the ExB rotation frequency: OMEG + OMEGP' self.descriptions['omgpp'] = 'Diamagnetic term in the ExB rotation frequency: (P_Carbon)/dpsi / (6*n_Carbon)' self.descriptions['omgeb'] = 'ExB rotation frequency: OMGPP + OMGVB = Er/(RBp)' self.descriptions['er'] = 'Radial electric field from force balance: OMGEB * RBp' self.descriptions['ommvb'] = 'Main ion VXB term of Er/RBp, considered a flux function' self.descriptions['ommpp'] = 'Main ion pressure term of Er/RBp, considered a flux function' self.descriptions['omevb'] = 'Electron VXB term of Er/RBp, considered a flux function' self.descriptions['omepp'] = 'Electron pressure term of Er/RBp, considered a flux function' self.descriptions['kpol'] = 'KPOL=VPOL/Bp : V_vector = KPOL*B_vector + OMGEB * PHI_Vector' self.descriptions['N Z A'] = 'N Z A of ION SPECIES' self.descriptions['omghb'] = 'Hahm-Burrell form for the ExB velocity shearing rate: OMGHB = (RBp)**2/Bt * d (Er/RBp)/dpsi' self.descriptions['nz1'] = 'Density of the 1st impurity species' self.descriptions['vtor1'] = 'Toroidal velocity of the 1st impurity species' self.descriptions['vpol1'] = 'Poloidal velocity of the 1st impurity species' self.descriptions['nz2'] = 'Density of the 2nd impurity species' self.descriptions['vtor2'] = 'Toroidal velocity of the 2nd impurity species' self.descriptions['vpol2'] = 'Poloidal velocity of the 2nd impurity species' # There may be more impurity species, but let's stop here for now. self.units = SortedDict() self.units['ne'] = '10^20/m^3' self.units['te'] = 'KeV' self.units['ni'] = '10^20/m^3' self.units['ti'] = 'KeV' self.units['nb'] = '10^20/m^3' self.units['pb'] = 'KPa' self.units['ptot'] = 'KPa' self.units['omeg'] = 'kRad/s' self.units['omegp'] = 'kRad/s' self.units['omgvb'] = 'kRad/s' self.units['omgpp'] = 'kRad/s' self.units['omgeb'] = 'kRad/s' self.units['ommvb'] = '' self.units['ommpp'] = '' self.units['omevb'] = '' self.units['omepp'] = '' self.units['er'] = 'kV/m' self.units['kpol'] = 'km/s/T' self.units['N Z A'] = '' self.units['omghb'] = '' self.units['nz1'] = '10^20/m^3' self.units['vtor1'] = 'km/s' self.units['vpol1'] = 'km/s' self.units['nz2'] = '10^20/m^3' self.units['vtor2'] = 'km/s' self.units['vpol2'] = 'km/s' if os.stat(self.filename).st_size: self.dynaLoad = True else: for key in list(self.descriptions.keys()): if key in ('N Z A',): continue self[key] = OMFITncData() self[key]['data'] = np.array([0]) self[key]['description'] = self.descriptions[key] self[key]['psinorm'] = np.array([0]) self[key]['units'] = self.units[key] self[key]['derivative'] = np.array([0])
[docs] @dynaLoad def load(self): """ Method used to load the content of the file specified in the .filename attribute :return: None """ self.clear() # read the file f = os.path.getsize(self.filename) if f == 0: return fn = self.filename with open(fn, 'r') as f: fl = f.read().strip().splitlines(True) ind = 0 while True: try: line = fl[ind] cur = line.split() curData = fl[ind + 1].split() except IndexError: break num = int(re.sub(r'([0-9]*)\s.*', r'\1', line)) xkey = re.sub(r'([0-9]*)\s(.*)\s(.*)\((.*)\)\s(.*)\n', r'\2', line) key = re.sub(r'([0-9]*)\s(.*)\s(.*)\((.*)\)\s(.*)\n', r'\3', line) units = re.sub(r'([0-9]*)\s(.*)\s(.*)\((.*)\)\s(.*)\n', r'\4', line) der = re.sub(r'([0-9]*)\s(.*)\s(.*)\((.*)\)\s(.*)\n', r'\5', line) quants = [xkey, key + '(' + units + ')', der] q = [] for i in range(ind + 1, ind + 1 + num): q.append(list(map(float, fl[i].split()))) q = list(zip(*q)) if key in self: if np.sum(np.array(self[key]['data']) != np.array(q[1])): raise ValueError('%s already defined, but trying to change its value' % quants[1]) elif 'N Z A of ION SPECIES' in line: self['N Z A'] = OMFITncData() self['N Z A']['description'] = 'N Z A of ION SPECIES' self['N Z A']['N'] = np.array(q[0]) self['N Z A']['Z'] = np.array(q[1]) self['N Z A']['A'] = np.array(q[2]) else: self[key] = OMFITncData() if key in self.descriptions: self[key]['description'] = self.descriptions[key] else: self[key]['description'] = key self[key]['units'] = units for qi, quant in enumerate(quants): if qi == 1: self[key]['data'] = np.array(q[qi]) elif quant == 'd%s/dpsiN' % key or quant == 'd%s/dpsi' % key: self[key]['derivative'] = np.array(q[qi]) else: self[key][quant] = np.array(q[qi]) ind = ind + 1 + num
[docs] @dynaSave def save(self): """ Method used to save the content of the object to the file specified in the .filename attribute :return: None """ lines = [] for key in self.keys(filter=hide_ptrn): if key == 'N Z A': if self.legacy: break lines.append('%i N Z A of ION SPECIES\n' % (len(self[key]['A']),)) for i in range(len(self[key]['A'])): lines.append(" %f %f %f\n" % (self[key]['N'][i], self[key]['Z'][i], self[key]['A'][i])) else: if len(self[key]['data']) == 1: continue lines.append("%i psinorm %s(%s) d%s/dpsiN\n" % (len(self[key]['data']), key, self[key]['units'], key)) for i in range(len(self[key]['data'])): lines.append(" %f %f %f\n" % (self[key]['psinorm'][i], self[key]['data'][i], self[key]['derivative'][i])) with open(self.filename, 'w') as f: f.writelines(lines)
[docs] def plot(self): """ Method used to plot all profiles, one in a different subplots :return: None """ nplot = int(len(self)) cplot = int(np.floor(np.sqrt(nplot) / 2.0) * 2) rplot = int(np.ceil(nplot * 1.0 / cplot)) pyplot.subplots_adjust(wspace=0.35, hspace=0.0) pyplot.ioff() try: for k, item in enumerate(self): r = int(np.floor(k * 1.0 / cplot)) c = int(k - r * cplot) if k == 0: ax1 = pyplot.subplot(rplot, cplot, r * (cplot) + c + 1) ax = ax1 else: ax = pyplot.subplot(rplot, cplot, r * (cplot) + c + 1, sharex=ax1) ax.ticklabel_format(style='sci', scilimits=(-3, 3)) if 'psinorm' not in self[item]: printi('Can\'t plot %s because no psinorm attribute' % item) continue x = self[item]['psinorm'] pyplot.plot(x, self[item]['data'], '.-') pyplot.text(0.5, 0.9, item, horizontalalignment='center', verticalalignment='top', size='medium', transform=ax.transAxes) if k < len(self) - cplot: pyplot.setp(ax.get_xticklabels(), visible=False) else: pyplot.xlabel('$\\psi$') pyplot.xlim(min(x), max(x)) finally: pyplot.draw() pyplot.ion()
[docs] def remap(self, points='ne', **kw): """ Remap the disparate psinorm grids for each vaiable onto the same psinorm grid :param npoints: number of points to remap the originally 256 grid - If points is int: make an evenly spaced array. - If points is array: use this as the grid - If points is a string: use the 'psinorm' from that item in the pfile (by default `ne['psinorm']`) :param \**kw: addidional keywords are passed to scipy.interpolate.interp1d return: The entire object remapped to the same psinorm grid """ mapped = copy.deepcopy(self) keys = mapped.keys() if isinstance(points, str): remap_psin = self[points]['psinorm'] elif type(points) is int: # Evenly spaced grid rebase remap_psin = np.linspace(0, 1, points) elif type(points) in [list, tuple, np.ndarray]: # user provided grid rebase assert np.all(np.abs(points) <= 1), 'All values must be on the interval [0, 1]' assert np.all(np.diff(points) > 0), 'All values must be monotonically increasing' remap_psin = points mapped['psinorm'] = remap_psin # save the psin grid # iterate through all keys for key in keys: if key in ('N Z A',): # this is not mapped to psi so skip it continue f_key = interp1d(self[key]['psinorm'], self[key]['data']) f_key_derivative = interp1d(self[key]['psinorm'], self[key]['derivative'], **kw) mapped[key]['psinorm'] = remap_psin mapped[key]['data'] = f_key(remap_psin) mapped[key]['derivative'] = f_key_derivative(remap_psin) return mapped
[docs] def to_omas(self, ods=None, time_index=0, gEQDSK=None): """ translate OMFITpFile class to OMAS data structure :param ods: input ods to which data is added :param time_index: time index to which data is added :param gEQDSK: corresponding gEQDSK (if ods does not have equilibrium already :return: ods """ # Lyons: I've used my own rotation variable definitions here, until IMAS properly defines them # Note that negative signs and sign_Ip are needed to make # omega_perp, omega_dia, and omega_ExB have the same sign convention from omfit_classes.omfit_eqdsk import OMFITgeqdsk from omas import ODS, omas_environment from omfit_classes.utils_math import atomic_element if ods is None: ods = ODS() if gEQDSK is not None: gEQDSK.to_omas(ods=ods, time_index=time_index) else: filename = 'g' + os.path.split(self.filename)[1][1:] gEQDSK = OMFITgeqdsk(filename).from_omas(ods=ods, time_index=time_index) cocosio = 1 # pFile's have CCW phi and CW theta assert gEQDSK.cocos == cocosio, "gEQDSK must be in COCOS %d" % cocosio # get required quantities sign_Ip = np.sign(gEQDSK['CURRENT']) # get gEQDSK quantities psin_eq = np.linspace(0.0, 1.0, len(gEQDSK['PRES'])) rhotn_eq = gEQDSK['RHOVN'] R_eq = gEQDSK['fluxSurfaces']['midplane']['R'] Bt_eq = gEQDSK['fluxSurfaces']['midplane']['Bt'] Bp_eq = gEQDSK['fluxSurfaces']['midplane']['Bp'] quants = [ ('rotation_frequency_tor_sonic', 'omgeb', sign_Ip * 1e3), # sign_Ip accounts for abs(Bp) in pFile definition ('pressure_perpendicular', 'ptot', 1e3 / 3.0), ('pressure_parallel', 'ptot', 1e3 / 3.0), ('e_field.radial', 'er', 1e3), ('electrons.density_thermal', 'ne', 1e20), ('electrons.temperature', 'te', 1e3), ('electrons.rotation.perpendicular', 'omevb', 1e3), ('electrons.rotation.diamagnetic', 'omepp', sign_Ip * 1e3), ] # sign_Ip accounts for abs(dpsi) in pFile definition def get_ion(N, Z, A): label = list(atomic_element(Z=N, A=int(A)).values())[0]['symbol'] k = -1 ion = None prof1d = ods['core_profiles']['profiles_1d'][time_index] if 'ion' in prof1d: for k in prof1d['ion']: if ( prof1d['ion'][k]['label'] == label and int(prof1d['ion'][k]['element'][0]['a']) == int(A) and prof1d['ion'][k]['element'][0]['z_n'] == Z ): # found the ion ion = prof1d['ion'][k] err = "OMAS only works for single ion states" assert ion['multiple_states_flag'] == 0, err if ion is None: k += 1 # need to add new ion ion = prof1d['ion'][k] ion['multiple_states_flag'] = 0 ion['label'] = label ion['element'][0]['a'] = A ion['element'][0]['z_n'] = Z return k, ion if 'N Z A' in self: # Assume multiple ion species are defined mk, main_ion = get_ion(N=self['N Z A']['N'][-2], Z=self['N Z A']['Z'][-2], A=self['N Z A']['A'][-2]) bk, beam_ion = get_ion(N=self['N Z A']['N'][-1], Z=self['N Z A']['Z'][-1], A=self['N Z A']['A'][-1]) else: printw("'N Z A' missing from OMFITpFile; assuming deuterium for main and beam ions") mk, main_ion = get_ion(N=1, Z=1, A=2.0) bk, beam_ion = get_ion(N=1, Z=1, A=2.0) quants += [ (f'ion.{mk}.density_thermal', 'ni', 1e20), (f'ion.{mk}.temperature', 'ti', 1e3), (f'ion.{mk}.rotation.perpendicular', 'ommvb', 1e3), (f'ion.{mk}.rotation.diamagnetic', 'ommpp', sign_Ip * 1e3), # sign_Ip accounts for abs(dpsi) in pFile definition (f'ion.{bk}.density_fast', 'nb', 1e20), (f'ion.{bk}.pressure_fast_perpendicular', 'pb', 1e3 / 3.0), (f'ion.{bk}.pressure_fast_parallel', 'pb', 1e3 / 3.0), (f'ion.{mk}.velocity.toroidal', 'vtor1', 1e3), (f'ion.{mk}.velocity.poloidal', 'vpol1', 1e3), ] if 'N Z A' in self: for i in range(len(self['N Z A']['N']) - 2): ik, imp_ion = get_ion(N=self['N Z A']['N'][i], Z=self['N Z A']['Z'][i], A=self['N Z A']['A'][i]) quants += [ (f'ion.{ik}.density_thermal', 'nz%d' % (i + 1), 1e20), (f'ion.{ik}.temperature', 'ti', 1e3), ] # Lyons: I think it's assumed all ions have same temperature if imp_ion['label'] == 'C': # working with Carbon quants += [ (f'ion.{ik}.rotation.perpendicular', 'omgvb', 1e3), (f'ion.{ik}.rotation.diamagnetic', 'omgpp', sign_Ip * 1e3), # sign_Ip accounts for abs(dpsi) in pFile definition (f'ion.{ik}.rotation.parallel_stream_function', 'kpol', 1e3), ] else: ik, imp_ion = get_ion(N=6, Z=6, A=12.0) if 'nz1' in self: printw(" carbon impurity") quants += [(f'ion.{ik}.density_thermal', 'nz1', 1e20)] else: printw(" carbon impurity with trace density") quants += [(f'ion.{ik}.density_thermal', 'ni', 1e-6)] quants += [ (f'ion.{ik}.temperature', 'ti', 1e3), # Lyons: I think it's assumed all ions have same temperature (f'ion.{ik}.rotation.perpendicular', 'omgvb', 1e3), (f'ion.{ik}.rotation.diamagnetic', 'omgpp', sign_Ip * 1e3), # sign_Ip accounts for abs(dpsi) in pFile definition (f'ion.{ik}.rotation.parallel_stream_function', 'kpol', 1e3), ] for q_ods, q_p, fac in quants: if q_p in self: # pFiles don't enforce that every variable has the same psinorm grid rho_tor_norm = np.interp(self[q_p]['psinorm'], psin_eq, rhotn_eq) coordsio = {f'core_profiles.profiles_1d.{time_index}.grid.rho_tor_norm': rho_tor_norm} with omas_environment(ods, cocosio=cocosio, coordsio=coordsio): prof1d = ods['core_profiles']['profiles_1d'][time_index] prof1d[q_ods] = fac * self[q_p]['data'] else: printw("pFile does not contain " + q_p + ". Skipping") if 'N Z A' in self: for i in range(len(self['N Z A']['N']) - 2): ik, imp_ion = get_ion(N=self['N Z A']['N'][i], Z=self['N Z A']['Z'][i], A=self['N Z A']['A'][i]) if imp_ion['label'] != 'C': try: # save rotations for non-carbon psin = self[f'vpol{(i + 1)}']['psinorm'] rho_tor_norm = np.interp(psin, psin_eq, rhotn_eq) coordsio = {f'core_profiles.profiles_1d.{time_index}.grid.rho_tor_norm': rho_tor_norm} R = np.interp(psin, psin_eq, R_eq) Bp = np.interp(psin, psin_eq, Bp_eq) Bt = np.interp(psin, psin_eq, Bt_eq) with omas_environment(ods, cocosio=cocosio, coordsio=coordsio): prof1d = ods['core_profiles']['profiles_1d'][time_index] # parallel stream function from vpol and Bp vpol = sign_Ip * self[f'vpol{(i + 1)}']['psinorm']['data'] # vpol in pFile has a sign_Ip prof1d[f'ion.{ik}.velocity.poloidal'] = 1e3 * vpol prof1d[f'ion.{ik}.rotation.parallel_stream_function'] = 1e3 * vpol / Bp # perpendicular rotation from vtor and vpol vtor = np.interp(psin, self[f'vtor{(i + 1)}']['psinorm'], self[f'vtor{(i + 1)}']['data']) prof1d[f'ion.{ik}.velocity.toroidal'] = 1e3 * vtor omeg = vtor / R omegp = -vpol * Bt / (R * Bp) prof1d[f'ion.{ik}.rotation.perpendicular'] = 1e3 * (omeg + omegp) # diamagentic rotation from vtor, vpol, and omgeb omgeb = sign_Ip * np.interp( psin, self['omgeb']['psinorm'], self['omgeb']['data'] ) # sign_Ip accounts for abs(Bp) in pFile definition prof1d[f'ion.{ik}.rotation.diamagnetic'] = 1e3 * (omgeb - omeg - omegp) except KeyError: printw(f'Some velocity info missing for pFile ion {(i + 1)}, so possible missing info for ODS ion {ik}') continue return ods
[docs] def from_omas(self, ods=None, time_index=0): """ translate OMAS data structure to OMFITpFile :param ods: input ods to take data from :param time_index: time index to which data is added :return: ods """ # Lyons: I've used my own rotation variable definitions here, until IMAS properly defines them # Note that negative signs and sign_Ip are needed to make # omega_perp, omega_dia, and omega_ExB have the same sign convention from omfit_classes.omfit_eqdsk import OMFITgeqdsk from omas import ODS, omas_environment from omfit_classes.utils_math import atomic_element cocosio = 1 # pFile's have CCW phi and CW theta # get required quantities with omas_environment(ods, cocosio=cocosio): sign_Ip = np.sign(ods['equilibrium']['time_slice'][time_index]['global_quantities']['ip']) prof1d = ods['core_profiles']['profiles_1d'][time_index] # determine main ion by highest mean thermal density den_mk = 0.0 ions = list(prof1d['ion'].keys()) ck = None for k in ions: den_k = np.mean(prof1d['ion'][k]['density_thermal']) if den_k > den_mk: # define main ion den_mk = den_k mk = k if prof1d['ion'][k]['label'] == 'C': # define carbon ion (if found) ck = k # move main ion to end ions.remove(mk) ions += [mk] # move carbon ion to the beginning if ck is not None: ions.remove(ck) ions = [ck] + ions Nions = len(ions) + 1 self['N Z A'] = OMFITncData() self['N Z A']['description'] = 'N Z A of ION SPECIES' self['N Z A']['N'] = np.zeros(Nions) self['N Z A']['Z'] = np.zeros(Nions) self['N Z A']['A'] = np.zeros(Nions) quants = [ ('ne', 'electrons.density_thermal', 1e-20), ('te', 'electrons.temperature', 1e-3), ('ni', f'ion.{mk}.density_thermal', 1e-20), ('ti', f'ion.{mk}.temperature', 1e-3), ('nb', f'ion.{mk}.density_fast', 1e-20), ('pb', [f'ion.{mk}.pressure_fast_perpendicular', f'ion.{mk}.pressure_fast_parallel'], [2e-3, 1e-3]), ('ptot', ['pressure_perpendicular', 'pressure_parallel'], [2e-3, 1e-3]), ('omgeb', 'omega0', sign_Ip * 1e-3), # sign_Ip accounts for abs(Bp) in pFile definition ('omevb', 'electrons.rotation.perpendicular', 1e-3), ('omepp', 'electrons.rotation.diamagnetic', sign_Ip * 1e-3), # sign_Ip accounts for abs(dpsi) in pFile definition ('ommvb', f'ion.{mk}.rotation.perpendicular', 1e-3), ('ommpp', f'ion.{mk}.rotation.diamagnetic', sign_Ip * 1e-3), ] # sign_Ip accounts for abs(dpsi) in pFile definition for i, k in enumerate(ions): # currently we don't store charge state information, so Z = N N = prof1d['ion'][k]['element'][0]['z_n'] A = prof1d['ion'][k]['element'][0]['a'] if k == mk: # main and beam ions self['N Z A']['N'][-1] = self['N Z A']['N'][-2] = N self['N Z A']['Z'][-1] = self['N Z A']['Z'][-2] = N self['N Z A']['A'][-1] = self['N Z A']['A'][-2] = A else: # impurity ions self['N Z A']['N'][i] = N self['N Z A']['Z'][i] = N self['N Z A']['A'][i] = A pk = i + 1 # iterate and use for pfile indexing if f'nz{pk}' not in self: for key, desc, units in [ (f'nz{pk}', 'Density', '10^20/m^3'), (f'vtor{pk}', 'Toroidal velocity', 'km/s'), (f'vpol{pk}', 'Toroidal velocity', 'km/s'), ]: self[key] = OMFITncData() self[key]['data'] = np.array([0]) self[key]['description'] = desc + f' of the #{pk} impurity species' self[key]['psinorm'] = np.array([0]) self[key]['units'] = units self[key]['derivative'] = np.array([0]) quants += [(f'nz{pk}', f'ion.{k}.density_thermal', 1e-20)] # load gEQDSK from ODS # Need to identify correct time. gEQDSK = OMFITgeqdsk('g0.0').from_omas(ods, time_index=time_index) gtime = ods[f'equilibrium.time_slice.{time_index}.time'] if gtime != prof1d['time']: printw("Warning: The time corresponding to the equilibrium and the profile is mismatched!") printw(f" Please check your input ods!") printw( f" For time_index = {time_index}, the equilibrium has time = {gtime}, and the core_profile has time = {prof1d['time']}." ) # load data from ODS with omas_environment(ods, cocosio=cocosio): psi1D = interp1e( ods[f'equilibrium.time_slice.{time_index}.profiles_1d.rho_tor_norm'], ods[f'equilibrium.time_slice.{time_index}.profiles_1d.psi'], )(ods[f'core_profiles.profiles_1d.{time_index}.grid.rho_tor_norm']) coordsio = {f'equilibrium.time_slice.{time_index}.profiles_1d.psi': psi1D} with omas_environment(ods, cocosio=cocosio, coordsio=coordsio): prof1d = ods['core_profiles']['profiles_1d'][time_index] # pFiles are on psinorm grid but core_profiles data on rho_tor_norm gird rhotn = ods['equilibrium']['time_slice'][time_index]['profiles_1d']['rho_tor_norm'] psi = ods['equilibrium']['time_slice'][time_index]['profiles_1d']['psi'] psib = np.interp(1.0, rhotn, psi) # in case grid extends beyond rhotn = 1. psin = (psi - psi[0]) / (psib - psi[0]) for q_p, q_ods, fac in quants: self[q_p]['psinorm'] = psin try: if isinstance(q_ods, list): self[q_p]['data'] = 0.0 * psin for i, qo in enumerate(q_ods): self[q_p]['data'] += fac[i] * nominal_values(prof1d[qo]) else: self[q_p]['data'] = fac * nominal_values(prof1d[q_ods]) except ValueError: self[q_p]['data'] = 0.0 * psin # calc auxiliary quantities from gEQDSK['fluxSurfaces'] psin_eq = np.linspace(0.0, 1.0, len(gEQDSK['PRES'])) R = np.interp(psin, psin_eq, gEQDSK['fluxSurfaces']['midplane']['R']) Bt = np.interp(psin, psin_eq, gEQDSK['fluxSurfaces']['midplane']['Bt']) Bp = np.interp(psin, psin_eq, gEQDSK['fluxSurfaces']['midplane']['Bp']) self['er']['psinorm'] = psin self['er']['data'] = R * abs(Bp) * self['omgeb']['data'] # pFile Er same sign as omgeb zero_array = 0.0 * psin for ik, k in enumerate(ions): ik += 1 if k != mk: # these are all as they're defined in pFile kpol = None omegp = None vpol = None omgvb = None omeg = None vtor = None omgpp = None missing = [] if f'ion.{k}.rotation.parallel_stream_function' in prof1d: kpol = 1e-3 * nominal_values(prof1d[f'ion.{k}.rotation.parallel_stream_function']) omegp = -Bt * kpol / R vpol = sign_Ip * kpol * Bp # sign_Ip accounts for pFile definition if f'ion.{k}.rotation_frequency_tor' in prof1d: omeg = 1e-3 * nominal_values(prof1d[f'ion.{k}.rotation_frequency_tor']) if f'ion.{k}.rotation.perpendicular' in prof1d: omgvb = 1e-3 * nominal_values(prof1d[f'ion.{k}.rotation.perpendicular']) if f'ion.{k}.rotation.diamagnetic' in prof1d: # sign_Ip accounts for abs(dpsi) in pFile definition omgpp = -1e-3 * sign_Ip * nominal_values(prof1d[f'ion.{k}.rotation.diamagnetic']) omgeb = self['omgeb']['data'] if (omgvb is None) and (omeg is not None) and (omegp is not None): omgvb = omeg + omegp if (not np.any(omgeb)) and (omgpp is not None) and (omgvb is not None): omgeb = omgpp + sign_Ip * omgvb if omeg is None and omgvb is not None and omegp is not None: omeg = omgvb - omegp if omeg is not None: vtor = R * omeg if omgpp is None: # sign_Ip accounts pFile definition omgpp = omgeb - sign_Ip * omgvb if vpol is None and f'ion.{k}.velocity.poloidal' in prof1d: vpol = 1e-3 * nominal_values(prof1d[f'ion.{k}.velocity.poloidal']) if vtor is None and f'ion.{k}.velocity.toroidal' in prof1d: vtor = 1e-3 * nominal_values(prof1d[f'ion.{k}.velocity.toroidal']) # Calculate fallback omevb and ommvb omevb = self['omevb']['data'] # These are in params above and will be present, but may be all zero omepp = self['omepp']['data'] ommvb = self['ommvb']['data'] ommpp = self['ommpp']['data'] if (not np.any(omevb)) and (omepp is not None) and np.any(omgeb): omevb = sign_Ip * (omgeb - omepp) if (not np.any(ommvb)) and (ommpp is not None) and np.any(omgeb): ommvb = sign_Ip * (omgeb - ommpp) params = [(f'vpol{ik}', vpol), (f'vtor{ik}', vtor)] if ik == 1: params += [ ('kpol', kpol), ('omegp', omegp), ('omgvb', omgvb), ('omeg', omeg), ('omgpp', omgpp), ('omevb', omevb), ('ommvb', ommvb), ('omgeb', omgeb), ] for name, val in params: self[name]['psinorm'] = psin if val is not None: self[name]['data'] = val else: self[name]['data'] = 0.0 * psin missing += [name] for name in missing: printw(f"pFile.from_omas: {name} could not be defined") for key in list(self.keys()): if key != 'N Z A': if len(self[key]['data']) == 1: del self[key] else: self[key]['derivative'] = deriv(self[key]['psinorm'], self[key]['data']) # Hahm-Burrell shearing rate # self['omghb'] = OMFITncData() # self['omghb']['data'] = 1e-3*(R*Bp)**2*self['omgeb']['derivative'] # self['omghb']['data'] /= abs(gEQDSK['SIBRY']-gEQDSK['SIMAG'])*np.sqrt(Bt**2 + Bp**2) # self['omghb']['description'] = self.descriptions['omghb'] # self['omghb']['psinorm'] = psin # self['omghb']['units'] = self.units['omghb'] # self['omghb']['derivative'] = deriv(self['omghb']['psinorm'], self['omghb']['data']) # printw('Hahm-Burrell shearing rate not written to pFile') return self
# OMAS extra_structures # omega0 is kept for backward compatibility _extra_structures = { 'core_profiles': { "core_profiles.profiles_1d[:].omega0": { "coordinates": ["core_profiles.profiles_1d[:].grid.rho_tor_norm"], "data_type": "FLT_1D", "documentation": "ExB plasma rotation", "full_path": "core_profiles/profiles_1d(itime)/omega0(:)", "data_type": "FLT_1D", "units": "rad/s", "cocos_signal": "TOR", }, "core_profiles.profiles_1d[:].electrons.rotation.perpendicular": { "coordinates": ["core_profiles.profiles_1d[:].grid.rho_tor_norm"], "data_type": "FLT_1D", "documentation": "electron perpendicular VxB rotation", "full_path": "core_profiles/profiles_1d(itime)/electrons/rotation/perpendicular(:)", "data_type": "FLT_1D", "units": "rad/s", "cocos_signal": "TOR", }, "core_profiles.profiles_1d[:].electrons.rotation.diamagnetic": { "coordinates": ["core_profiles.profiles_1d[:].grid.rho_tor_norm"], "data_type": "FLT_1D", "documentation": "electron diamagnetic rotation", "full_path": "core_profiles/profiles_1d(itime)/electrons/rotation/diamagnetic(:)", "data_type": "FLT_1D", "units": "rad/s", "cocos_signal": "TOR", }, "core_profiles.profiles_1d[:].ion[:].rotation.perpendicular": { "coordinates": ["core_profiles.profiles_1d[:].grid.rho_tor_norm"], "data_type": "FLT_1D", "documentation": "ion perpendicular VxB rotation", "full_path": "core_profiles/profiles_1d(itime)/ion(i1)/rotation/perpendicular(:)", "data_type": "FLT_1D", "units": "rad/s", "cocos_signal": "TOR", }, "core_profiles.profiles_1d[:].ion[:].rotation.diamagnetic": { "coordinates": ["core_profiles.profiles_1d[:].grid.rho_tor_norm"], "data_type": "FLT_1D", "documentation": "ion diamagnetic rotation", "full_path": "core_profiles/profiles_1d(itime)/ion(i1)/rotation/diamagnetic(:)", "data_type": "FLT_1D", "units": "rad/s", "cocos_signal": "TOR", }, "core_profiles.profiles_1d[:].ion[:].rotation.parallel_stream_function": { "coordinates": ["core_profiles.profiles_1d[:].grid.rho_tor_norm"], "data_type": "FLT_1D", "documentation": "ion parallel stream function", "full_path": "core_profiles/profiles_1d(itime)/ion(i1)/rotation/parallel_stream_function(:)", "data_type": "FLT_1D", "units": "rad/s" # no COCOS transformation }, } } omas.omas_utils._structures = {} omas.omas_utils._structures_dict = {} if not hasattr(omas.omas_utils, '_extra_structures'): omas.omas_utils._extra_structures = {} for _ids in _extra_structures: for _item in _extra_structures[_ids]: _extra_structures[_ids][_item]['lifecycle_status'] = 'omfit' omas.omas_utils._extra_structures.setdefault(_ids, {}).update(_extra_structures[_ids])