Source code for omfit_classes.omfit_profiles

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from omfit_classes.utils_math import *
from omfit_classes.omfit_base import OMFITtree
from omfit_classes.omfit_osborne import OMFITpFile
from omfit_classes.omfit_eqdsk import OMFITgeqdsk
from omfit_classes.omfit_data import OMFITncDataset, OMFITncDynamicDataset, importDataset
from omas import ODS, omas_environment, cocos_transform, define_cocos
from omfit_classes.omfit_omas_utils import add_generic_OMFIT_info_to_ods
from omfit_classes.omfit_rdb import OMFITrdb

import inspect
import numpy as np
from uncertainties import unumpy, ufloat
from uncertainties.unumpy import std_devs, nominal_values
from scipy import constants

np.seterr(invalid='ignore', divide='ignore')

__all__ = ['OMFITprofiles', 'OMFITprofilesDynamic', 'available_profiles']

model_tree_species = ['e', '2H1', '4He2', '6Li3', '10B5', '12C6', '14N7', '20Ne10']

# fmt: off
model_tree_quantities = ['angular_momentum', 'angular_momentum_density', 'angular_momentum_density_{species}', 'angular_momentum_{species}',
'dn_{species}_dpsi', 'dT_{species}_dpsi', 'ELM_phase', 'ELM_since_last', 'ELM_until_next', 'epot_{species}', 'Er_{species}',
'Er_{species}_gradP', 'Er_{species}_gradP_Vtor', 'Er_{species}_Vpol', 'Er_{species}_Vtor', 'Er_{species}_VxB', 'f_Z',
'fpol', 'gamma_ExB_{species}', 'gamma_ExB_{species}_gradP', 'gamma_ExB_{species}_Vpol', 'gamma_ExB_{species}_Vtor', 'I_total', 'index',
'J_BS', 'J_efit_norm', 'J_ohm', 'J_tot', 'jboot_sauter', 'lnLambda', 'mass_density', 'n_fast_{species}', 'n_{species}',
'nclass_sigma', 'nu_star_{species}', 'nu_{species}', 'omega_E_{species}', 'omega_gyro_{species}_midplane',
'omega_LX_{species}_midplane', 'omega_N_{species}', 'omega_NTV0_{species}', 'omega_P_{species}', 'omega_plasma_{species}',
'omega_RX_{species}_midplane', 'omega_T_{species}', 'omega_tor_{species}', 'omega_tor_{species}_KDG', 'P_brem', 'p_fast_{species}', 'P_rad',
'P_rad_cNi', 'P_rad_cW', 'P_rad_int', 'P_rad_nNi', 'P_rad_nW', 'P_rad_ZnNi', 'P_rad_ZnW', 'p_thermal', 'p_tot', 'p_total', 'p_{species}',
'pres', 'psi', 'psi_n', 'psi_n_2d', 'psin_n_{species}', 'psin_T_{species}',
'psin_V_tor_{species}', 'q', 'R_midplane', 'raw_n_{species}', 'raw_T_{species}', 'raw_V_tor_{species}', 'resistivity', 'rho',
'SAWTOOTH_phase', 'SAWTOOTH_since_last', 'SAWTOOTH_until_next', 'sigma_nc', 'T_fast_{species}', 'T_i', 'T_i_T_e_ratio',
'T_{species}', 'time', 'Total_Zeff', 'V_pol_{species}_KDG', 'V_tor_{species}',
'Zavg_Ni', 'Zavg_W', 'Zeff']

# fmt: on


def ugrad1(a2d):
    """
    Gradient along second axis with uncertainty propagation.
    :param a2d: 2D array or uarray
    :return:
    """
    if isinstance(a2d, DataArray):
        a2d = a2d.values
    if is_uncertain(a2d):
        dy = np.gradient(nominal_values(a2d), axis=1)
        ye = std_devs(a2d)
        sigma = np.zeros_like(ye)
        sigma[:, 1:-1] = 0.5 * np.sqrt(ye[:, :-2] ** 2 + ye[:, 2:] ** 2)
        sigma[:, 0] = 0.5 * np.sqrt(ye[:, 0] ** 2 + ye[:, 1] ** 2)
        sigma[:, -1] = 0.5 * np.sqrt(ye[:, -2] ** 2 + ye[:, -1] ** 2)
        result = unumpy.uarray(dy, sigma)
    else:
        result = np.gradient(a2d, axis=1)
    return result


def mZ(species):
    """
    Parse subscript strings and return ion mass and charge

    :param species: subscript strings such as `e`, `12C6`, `2H1`, 'fast_2H1`, ...

    :return: m and Z
    """
    species = str(species).replace('fast_', '')
    if species == 'e':
        Z = -1
        m = constants.m_e
    else:
        m = int(re.sub('([0-9]+)([a-zA-Z]+)([0-9]+)', r'\1', species))
        name = re.sub('([0-9]+)([a-zA-Z]+)([0-9]+)', r'\2', species)
        Z = int(re.sub('([0-9]+)([a-zA-Z]+)([0-9]+)', r'\3', species))
        m *= constants.m_u
    return m, Z


def get_species(derived):
    """
    Identify species and ions that have density information
    """
    species = []
    for key in list(derived.data_vars.keys()):
        if not re.match('^[nT](_fast)?_([0-9]+[a-zA-Z]+[0-9]{1,2}|e)$', key):
            continue
        s = key.split('_')[-1]
        if '_fast_' in key:
            s = 'fast_' + s
        species.append(s)
    species = tolist(np.unique(species))
    ions = [s for s in species if s not in ['e']]
    ions_with_dens = [i for i in ions if 'n_' + i in derived]
    ions_with_fast = [i.replace('fast_', '') for i in ions if 'fast_' in i]
    return species, ions, ions_with_dens, ions_with_fast


[docs]def available_profiles(server, shot, device='DIII-D', verbose=True): out = {} db = OMFITrdb(db='code_rundb', server='d3drdb', by_column=True) runs = db.select(f"SELECT * FROM plasmas WHERE code_name='OMFITprofiles' AND experiment='{device}' AND shot={shot}") if len(runs) == 0: print("No run_id found for this shot.") return out else: for i, runid in enumerate(runs['run_id']): out[ runid ] = f"runid={runid} by {runs['run_by'][i]} from {runs['start_time'][i]} to {runs['stop_time'][i]} with comment: {runs['run_comment'][i]}" return out
[docs]class OMFITprofiles(OMFITncDataset): """ Data class used by OMFITprofiles, CAKE and other OMFIT modules for storing experimental profiles data """ def __init__(self, filename, data_vars=None, coords=None, attrs=None, comment=''): """ :param filename: filename of the NetCDF file where data will be saved :param data_vars: see xarray.Dataset :param coords: see xarray.Dataset :param attrs: see xarray.Dataset :param comment: String that if set will show in the OMFIT tree GUI """ self.dynaLoad = False super().__init__(filename, data_vars=data_vars, coords=coords, attrs=attrs) self.OMFITproperties['comment'] = comment @property def comment(self): return self.OMFITproperties['comment'] @comment.setter def comment(self, comment): self.OMFITproperties['comment'] = comment def __tree_repr__(self): if self.comment: return self.__class__.__name__ + ': ' + self.comment, [] else: return super().__tree_repr__()
[docs] def to_omas(self, ods=None, times=None): """ :param ods: ODS to which data will be appended :return: ods """ if ods is None: ods = ODS() eq_times = None else: # Determine if equilibrium is avaliable and for what times. eq_times = ods.time('equilibrium') * 1e3 # ODS times are in s, and omfit_profiles are in ms. if 'device' in self.attrs: ods['dataset_description.data_entry.machine'] = self.attrs['device'] if 'shot' in self.attrs: ods['dataset_description.data_entry.pulse'] = self.attrs['shot'] # identify fitting coordinate for fit_coordinate in ['rho', 'psi_n', None]: if fit_coordinate in self.dims: break if fit_coordinate is None: raise ValueError("Fit coordinate should be 'rho' or 'psi_n'") # general info species, ions, ions_with_dens, ions_with_fast = get_species(self) nion = len(ions) if times is None: times = self['time'].values else: times = np.atleast_1d(times) # figure out time match between eq (if available) and profiles if eq_times is None: printw("No equilibrium data is avaliable to to_omas(). Some info will not be stored.") elif np.all([time in eq_times for time in times]): # aka is all times have avaliable eq printd("Matching equilibrium times found") elif np.any([time in eq_times for time in times]): printw("Some time slices don't have corresponding equilibria!") printw("These time slices will be missing some information.") else: printw("No equilibrium data is avaliable to to_omas(). Some info will not be stored.") # assign both core_profies and edge_profiles but with data from different spatial ranges idx_core = self[fit_coordinate].values <= 1.0 idx_edge = self[fit_coordinate].values >= 0.8 core_prof = ods['core_profiles'] edge_prof = ods['edge_profiles'] for prof in [core_prof, edge_prof]: prop = prof['ids_properties'] prop['comment'] = 'Data from OMFITprofiles.to_omas()' prop['homogeneous_time'] = True prof['time'] = times / 1e3 for ti, time in enumerate(times): for prof, idx_use, core in zip([core_prof, edge_prof], [idx_core, idx_edge], [True, False]): prof_1d = prof[f'profiles_1d.{ti}'] prof_1d['time'] = time / 1e3 # get corresponding eq and extract needed info geq = None R = None Bt = None Bp = None # Make sure we don't accidentally use values from last timeslice if eq_times is not None and len(eq_times) > 0 and time in eq_times: i_eq = np.where(time == eq_times)[0][0] # just the first index slice = ods[f'equilibrium.time_slice[{i_eq}]'] # get psin from profile if fit_coordinate == 'psi_n': psin = self['psi_n'].values else: psin = self['psi_n'].sel(time=time).values # Now we need the psin basis of the equilibrium psi_0 = slice['global_quantities.psi_axis'] psi_b = slice['global_quantities.psi_boundary'] psi_eq = slice['profiles_1d.psi'] psin_eq = (psi_eq - psi_0) / (psi_b - psi_0) # Re-interpolate R_interp = interp1e(psin_eq, slice['profiles_1d.r_outboard']) R = R_interp(psin) Z0 = R * 0 + slice[f'profiles_1d.geometric_axis.z'][0] # need to make sure dimensions match # Check the 2D profiles if slice[f'profiles_2d[0].grid_type.index'] != 1: printw(f"Unknow grid type in ODS's equilibrium. Aborting time slice idx = {ti}") continue RR = slice[f'profiles_2d[0].grid.dim1'] ZZ = slice[f'profiles_2d[0].grid.dim2'] Bt2D = slice[f'profiles_2d[0].b_field_tor'] Br2D = slice[f'profiles_2d[0].b_field_r'] Bz2D = slice[f'profiles_2d[0].b_field_z'] Bt = RectBivariateSplineNaN(RR, ZZ, Bt2D).ev(R, Z0) Br = RectBivariateSplineNaN(RR, ZZ, Br2D).ev(R, Z0) Bz = RectBivariateSplineNaN(RR, ZZ, Bz2D).ev(R, Z0) Bp = np.sqrt(Br**2 + Bz**2) # there is no Bp2D in standard ODSs for some reason, here we assume atleast cocos are already in order for q in self.variables: fit = self[q] if q in ['time']: continue if q == 'rho': if fit_coordinate == 'rho': prof_1d['grid.rho_tor_norm'] = self['rho'].values[idx_use] else: prof_1d['grid.rho_tor_norm'] = fit.sel(time=time).values[idx_use] elif q == 'psi_n': if fit_coordinate == 'psi_n': prof_1d['grid.rho_pol_norm'] = np.sqrt(self['psi_n'].values[idx_use]) else: prof_1d['grid.rho_pol_norm'] = np.sqrt(fit.sel(time=time).values[idx_use]) elif q == 'n_e': prof_1d['electrons.density_thermal'] = fit.sel(time=time).values[idx_use] elif q == 'T_e': prof_1d['electrons.temperature'] = fit.sel(time=time).values[idx_use] elif q == 'omega_P_e' and core: # this location is not valid OMAS location for edge profiles prof_1d['electrons.rotation.diamagnetic'] = fit.sel(time=time).values[idx_use] elif '_' in q and q.split('_', 1)[1] in ions: continue # thermal ions ni = 0 for ion in ions[::-1]: if ion == 'b' or ion.startswith('fast_'): continue profi = prof_1d[f'ion.{ni}'] profi['density_thermal'] = self[f'n_{ion}'].sel(time=time).values[idx_use] if f'T_{ion}' in self: profi['temperature'] = self[f'T_{ion}'].sel(time=time).values[idx_use] ion_details = list(atomic_element(symbol=ion).values())[0] profi['label'] = ion_details['symbol'] profi['z_ion'] = float(ion_details['Z_ion']) profi['multiple_states_flag'] = 0 profi['element[0].atoms_n'] = 1 profi['element[0].z_n'] = float(ion_details['Z']) profi['element[0].a'] = float(ion_details['A']) profi['multiple_states_flag'] = 0 if f'V_tor_{ion}' in self and not (f'omega_tor_{ion}' in self and 'R_midplane' in self): profi['velocity.toroidal'] = self[f'V_tor_{ion}'].sel(time=time).values[idx_use] elif f'omega_tor_{ion}' in self and 'R_midplane' in self: profi['velocity.toroidal'] = (self[f'omega_tor_{ion}'].sel(time=time) * self['R_midplane'].sel(time=time)).values[ idx_use ] if f'V_pol_{ion}' in self: profi['velocity.poloidal'] = self[f'V_pol_{ion}'].sel(time=time).values[idx_use] if core: # extra rotation info for the core profiles. (Not valid nodes for edge) if f'omega_P_{ion}' in self: profi['rotation.diamagnetic'] = self[f'omega_P_{ion}'].sel(time=time).values[idx_use] if f'omega_tor_{ion}' in self: profi['rotation_frequency_tor'] = self[f'omega_tor_{ion}'].sel(time=time).values[idx_use] if f'V_pol_{ion}' in self and Bp is not None: # Save to parallel streaming function, this will allow omegp to be calculated from ods profi['rotation.parallel_stream_function'] = self[f'V_pol_{ion}'].sel(time=time).values[idx_use] / Bp[idx_use] # Advance ni; its important that if it is fast ion and the loop-iteration is skipped, then ni do not advance ni += 1 # fast ions for ion in ions: if ion != 'b' and not ion.startswith('fast_'): continue # Get the 'base' ion for the fast population if ion.startswith('fast_'): base_ion = ion.replace('fast_', '') elif ion == 'b': base_ion == '2H1' # back compat for '_b' notations ion_details = list(atomic_element(symbol=base_ion).values())[0] # Determin the corresponding ion index ni = len(prof_1d['ion']) for nii in prof_1d['ion']: profi = prof_1d[f'ion.{nii}'] if profi['label'] == ion_details['symbol']: ni = nii break profi = prof_1d[f'ion.{ni}'] # Add fast_ion data. profi['density_fast'] = self[f'n_{ion}'].sel(time=time).values[idx_use] if f'p_{ion}' in self: pfast = self[f'p_{ion}'].sel(time=time)[idx_use] #'ion' here would have the form 'fast_2H1' for example else: pfast = ( self[f'T_{ion}'].sel(time=time).values[idx_use] * constants.e * self[f'n_{ion}'].sel(time=time).values[idx_use] ) profi['pressure_fast_perpendicular'] = ( 1.0 / 3.0 * pfast ) # Assume isotropic fast ions. Also OMAS treats p_xxx_perp as pressure in one of the perp directions, I think. profi['pressure_fast_parallel'] = 1.0 / 3.0 * pfast # Attach atomic data (from base_ion) profi['label'] = ion_details['symbol'] profi['z_ion'] = float(ion_details['Z_ion']) profi['multiple_states_flag'] = 0 profi['element[0].atoms_n'] = 1 profi['element[0].z_n'] = float(ion_details['Z']) profi['element[0].a'] = float(ion_details['A']) profi['multiple_states_flag'] = 0 if 'Total_Zeff' in self: prof_1d['zeff'] = self['Total_Zeff'].sel(time=time).values[idx_use] # Populate total pressure nodes under 'profiles_1d` ods.physics_core_profiles_pressures() # ods.physics_edge_profiles_pressures() # This function does not exist, but really should. return ods
[docs] def model_tree_quantities(self, warn=True, no_update=False, details=False): """ Returns list of MDS+ model_tree_quantities for all species. :param warn: [bool] If True, the function will warn if some of the `model_tree_quantities` are missing in OMFIT-source/omfit/omfit_classes/omfit_profiles.py and the model tree should be updated :param no_update: [bool] If True, the function will return only items that is in the object AND on the model tree, and ignore items that is not in model_tree_quantities. :return: list of strings """ new_model_tree_quantities = set(model_tree_quantities) if not no_update: for item in self.variables: match = False dont_replace = ['T_i_T_e_ratio', 'J_efit_norm'] # Don't make this into T_i_T_{species} ratio please for s in model_tree_species: if item not in dont_replace: tmp = item.replace(f'_{s}', '_{species}') if tmp != item: match = True break if match: new_model_tree_quantities.add(tmp) elif not no_update: new_model_tree_quantities.add(item) quant_set = set(model_tree_quantities) # using set compare... if len(new_model_tree_quantities - quant_set) > 0 and warn: import textwrap if details: printe('WARNING!: Update model_tree_quantities in OMFIT-source/omfit/omfit_classes/omfit_profiles.py') printe('WARNING!: and update the OMFIT_PROFS MDS+ model tree') printe(f'The missing quantities are {new_model_tree_quantities - quant_set}.') printe('-' * 140) printe('# fmt: off') printe(textwrap.fill(f'model_tree_quantities = {repr(new_model_tree_quantities)}', width=140)) printe('# fmt: on') printe('-' * 140) else: printe("WARNING!: Profile vars mismatch with model tree!") printe("WARNING!: Consider using the 'relaxed' option with to_mds()") printe("WARNING!: Or use .model_tree_quantities(details=True) for instructions to update model tree.") printe(f"WARNING!: Here are the problem vars -> {new_model_tree_quantities - quant_set}") quantities = [] for item in new_model_tree_quantities: if '{' in item: for s in model_tree_species: quantities.append(item.format(species=s)) else: quantities.append(item) return quantities
[docs] def create_model_tree(self, server, treename='OMFIT_PROFS'): """ Generate MDS+ model tree :param server: MDS+ server :param treename: MDS+ treename """ from omfit_classes.omfit_mds import OMFITmdsConnection conn = OMFITmdsConnection(server) quantities = {self.mds_translator(k): None for k in self.model_tree_quantities()} quantities['__content__'] = '' quantities['__x_coord__'] = '' quantities['__coords__'] = '' quantities['__dsp_name__'] = '' quantities['__attrs__'] = '' quantities['__comment__'] = '' conn.create_model_tree(treename, '', quantities, clear_subtree=True)
[docs] def check_attrs(self, quiet=False): """ Checks that basic/standard attrs are present. If not, they will be fill with standby values (usually 'unknown') Also checks that ints are ints and not int64, which would prevent json from working properly. :param quiet: If set to True, the function will not print warnings. By default set to False. """ basic_atts = ['shot', 'produced_by_module', 'produced_by_user']
[docs] def to_mds( self, server, shot, times=None, treename='OMFIT_PROFS', skip_vars=[], comment=None, tag=None, relaxed=False, commit=True, iri_upload_metadata=None, ): """ This script writes the OMFITproflies datase to DIII-D MDS+ and updates d3drdb accordingly :param server: MDS+ server :param shot: shot to store the data to :param treename: MDS+ treename :param skip_vars: variables to skip uploading. Array-like :param relaxed: if set to True, then the function will only try to upload vars in the model_tree_quantities list as recorded at the beginging of this file. If False, then this funct will attempt to upload all variables stored in self, and fail if a profile variable cannot be uploaded (usually due there not being a corresponding node on the MDS+ tree). :param commit (bool): If set to False, the SQL query will not commit the data to the coderunrdb. This is required to be false for a jenkins test or else if it tries to write data to SQL database twice it will throw an error. :param iri_upload_metadata: optionally, a dictionary with metadata for upload to iri_upload_log table in the code run RDB. Certain metadata are determined dynamically. If None, then it will not be logged to iri_upload_metadata. :return: runid, treename """ if times is None: times = self['time'].values # Parse comments if comment is None: comment = self.comment else: self.comment = comment # Update object comment to be consistent from omfit_classes.omfit_mds import OMFITmdsConnection, translate_MDSserver import json from omfit_classes.omfit_json import dumper conn = OMFITmdsConnection(server) if relaxed: quantities = self.model_tree_quantities(warn=False, no_update=True) else: quantities = self.model_tree_quantities() quantities = [x for x in quantities if x not in skip_vars] # Determine radial coord x_coord = None if 'psi_n' in self.coords.keys() and 'rho' in self.coords.keys(): x_coord = 'unclear' # raise Exception("Confusion exist in radial coordinate used. Make sure dataset have a single radial coordinate.") elif 'psi_n' in self.coords.keys(): x_coord = 'psi_n' elif 'rho' in self.coords.keys(): x_coord = 'rho' # find next available runid in d3drdb for this shot from omfit_classes.omfit_rdb import OMFITrdb rdb = OMFITrdb(db='code_rundb', server='d3drdb', by_column=True) # add data to d3drdb (before MDS+ so that we can get a RUNID, only if it has not been allocated yet) data = { 'shot': shot, 'experiment': 'DIII-D', 'run_type': 'user', 'tree': treename, 'start_time': np.min(times), 'stop_time': np.max(times), 'mdsserver': translate_MDSserver(server, ''), 'run_comment': comment, #'runtag':tag } #'x_coord': x_coord, command = "SpGetNextOmfitProfsID" output = rdb.custom_procedure(command, commit=commit, **data)[-1] runid = output.run_id if runid == -1: print("Error fetching available runid from SQL database") return runid, treename print(f'Writing OMFITprofiles to MDS+ {runid}') # write to MDS+ subset = self.sel(time=times) quantities = conn.write_dataset( treename=treename, shot=runid, subtree='', xarray_dataset=subset, quantities=quantities, translator=lambda x: self.mds_translator(x), ) # Store meta data # ==== conn.write(treename=treename, shot=runid, node='__content__', data=';'.join(quantities)) conn.write(treename=treename, shot=runid, node='__x_coord__', data=x_coord) # Upload meta to IRI table if iri_upload_metadata is not None and 'iri_id' in iri_upload_metadata: iri_upload_metadata['upload_server'] = translate_MDSserver(server, '') iri_upload_metadata['upload_tree'] = treename iri_upload_metadata['upload_id'] = runid rdb = OMFITrdb(db='code_rundb', server='d3drdb', by_column=True) output = rdb.custom_procedure('spNextIRIUploadID', **iri_upload_metadata) if output[-1].id == -1: printw('WARNING: Profile upload failed to update IRI upload log.') coords_string = '' disp_name_string = '' for quant in quantities: coords_string = ';'.join([coords_string, ','.join(self[quant].dims[::-1])]) # Upload reverses coord order, and above line accounts for it. But if upload behavior changes, this should # also change. try: disp_name = self[quant].attrs['display_name'] except KeyError: disp_name = '' disp_name_string = ';'.join([disp_name_string, disp_name]) # Trim the initial ':' that result from the way this is built if len(coords_string) > 1: coords_string = coords_string[1:] if len(disp_name_string) > 1: disp_name_string = disp_name_string[1:] conn.write(treename=treename, shot=runid, node='__coords__', data=coords_string) conn.write(treename=treename, shot=runid, node='__dsp_name__', data=disp_name_string) conn.write(treename=treename, shot=runid, node='__comment__', data=comment) attrs_str = json.dumps(self.attrs, default=dumper) conn.write(treename=treename, shot=runid, node='__attrs__', data=attrs_str) pprint(data) return runid, treename
[docs] def mds_translator(self, inv=None): """ Converts strings OMFITprofiles dataset keys to MDS+ nodes less than 12 chars long :param inv: string to which to apply the transformation if `None` the transformation is applied to all of the OMFITprofiles.model_tree_quantities for sanity check :param reverse: reverse the translation. Used to tranlate mds+ node names back to OMFITprofile names :return: transformed sting or if inv is None the `mapped_model_2_mds` and `mapped_mds_2_model` dictionaries """ mapper = SortedDict() mapper['_gradP_Vtor'] = 'gp_Vt' # special case for 'Er_He_gradP_Vtor' mapper['SAWTOOTH_'] = 'ST_' mapper['angular_momentum_density'] = 'mom_dens' mapper['angular_momentum'] = 'mom' mapper['midplane'] = 'mid' mapper['omega_gyro_'] = 'gyrof_' mapper['omega_'] = 'w_' mapper['2H1'] = 'D' mapper['4He2'] = 'He' mapper['6Li3'] = 'Li' mapper['10B5'] = 'B' mapper['12C6'] = 'C' mapper['14N7'] = 'N' mapper['20Ne10'] = 'Ne' mapper['since_last'] = '_last' mapper['until_next'] = '_next' mapper['_total'] = '_tot' mapper['T_i_T_e'] = 'TiTe' mapper['gradP'] = 'gp' mapper['gamma'] = 'gm' mapper['_ExB'] = 'eb' mapper['psin_'] = 'ps_' mapper['raw_'] = 'rw_' mapper['axis_value'] = 'r0' if inv is not None: for match, sub in mapper.items(): inv = inv.replace(match, sub) if len(inv) > 12: raise Exception( f'MDS+ OMFITprofiles quantity is longer than 12 chars: {inv}\nUpdate the mds_translator function accordingly' ) return inv else: model_tree_quantities = self.model_tree_quantities() mapped_model_2_mds = SortedDict() mapped_mds_2_model = SortedDict() for item0 in model_tree_quantities: item = item0 for match, sub in mapper.items(): item = item.replace(match, sub) if len(item) > 12: raise Exception(f'MDS+ string is longer than 12 chars: {item}') if item0 != item and item in model_tree_quantities: raise Exception(f'MDS+ string shadows something else: {item}') if item in mapped_mds_2_model: raise Exception(f'Multiple items map to the same quantity: {item0} {mapped_mds_2_model[item]}') mapped_model_2_mds[item0] = item mapped_mds_2_model[item] = item0 return mapped_model_2_mds, mapped_mds_2_model
[docs] def from_mds(self, server, runid): from omfit_classes.omfit_mds import OMFITmds import json tree = OMFITmds(server=server, treename='OMFIT_profs', shot=runid) contents = tree['__CONTENT__'].data()[0].split(";") x_coord = tree['__x_coord__'].data()[0] attrs_str = tree['__attrs__'].data()[0] coords = tree['__coords__'].data()[0].split(";") disp_names = tree['__dsp_name__'].data()[0].split(";") comment = tree['__comment__'].data()[0] if x_coord not in ['rho', 'psi_n']: raise Exception(f"x_coord was recorded as {x_coord}. It is not a recognized radial coordinate.") # Get the coords n_coord = {} for var in ['time', x_coord]: if var not in contents: # Tranlate exception to something that makes sense to user. raise Exception(f"Coordinate {var} missing from MDS+ data!") # Coord nodes by convention do not need translation, but might in the future dat = tree[var].xarray() dat = dat.rename(var) dat = dat.rename({'dim_0': var}) self[var] = dat n_coord[var] = len(dat.values) for i, var in enumerate(contents): if var in ['time', x_coord]: # Skip, but process coord label and attrs self[var].attrs['display_name'] = disp_names[i] else: node = self.mds_translator(inv=var) dat = tree[node].xarray() dat = dat.rename(var) # Parse dims, and construct coord translator subset ndim_data = len(dat.dims) var_coords = coords[i].split(',') ndim_coords = len(var_coords) if ndim_data != ndim_coords: printw(f"Dimension count does not match record for {var}.") else: rename_dict = {} for ii in np.arange(ndim_data): rename_dict[f'dim_{ii}'] = var_coords[ii] dat = dat.rename(rename_dict) self[var] = dat self.attrs = json.loads(attrs_str) self.comment = comment self.save() return self
[docs] def to_pFiles(self, eq, times=None, shot=None): """ :param eq: ODS() or dict. (OMFITtree() is a dict) Needs to contain equilibria information, either in the form of the ODS with needed eq already loaded, or as OMFITgeqdsk() objects in the Dict with the time[ms] as keys. Times for the eq need to be strict matches to profiles times coord. :param times: array like. time for which you would like p files to be generated. :param shot: int. shot number, only relevant in generating p file names. :return: OMFITtree() containing a series of OMFITpfile objs. """ # Generate times if needed if times is None: try: times = self['time'].values except KeyError: # Just here to add a helpful hint printw("Looking like your OMFITprofiles obj is missing 'time'. Is it properly initialized?") raise # Get shot if it can be found if shot is None: if 'shot' in self.attrs: shot = self.attrs['shot'] else: shot = 0 # Dummy number for p file names. # get good_times good_times = [] if isinstance(eq, ODS): # Check times good_times = [t for t in times if t in eq.time('equilibrium')] # ODS().time('...') will throw error if time is inconsistent elif isinstance(eq, dict): for i, time in enumerate(times): d3_time = int(time) if d3_time in eq: good_times.append(time) else: printw(f"Missing eq data for {time}, it will be skipped!") else: printw("Input arg 'eq' is in unrecognized format. This will fail!") good_times = [t for t in good_times if t in self['time'].values] good_times = array(good_times) if len(good_times) == 0: printw("No valid time found! Each timesilce needs profiles and equilibrium.") return else: printd(f"The following time was found to be good: {good_times}") printd("pFiles will be produced for these times only.") ods = ODS() # Now inject eq info, but only for good_times if isinstance(eq, ODS): prof_ods.equilibrium.time = np.array(good_times) for i, time in enumerate(good_times): j = np.where(eq.times('equilibrium') == time) ods[f'equilibrium.time_slice.{i}'] = eq[f'equilibrium.time_slice.{j}'] prof_ods.physics_consistent_times() # Generate missing time array elif isinstance(eq, dict): for i, time in enumerate(good_times): d3_time = int(time) ods = eq[d3_time].to_omas(ods=ods, time_index=i) # This is not very efficient, but does make to_omas() more standardized. Open to change as needs change. ods = self.to_omas(times=good_times, ods=ods) out_tree = OMFITtree() for i, time in enumerate(good_times): d3_time = int(time) out_tree[d3_time] = OMFITpFile(f'p{shot:06}.{d3_time:05}').from_omas(ods, time_index=i) return out_tree
[docs] def get_xkey(self): """ Get the key of the x-coord associated with this data array. :returns: str. key of the x-coord associated with the profiles, like 'psi_n', 'psi' or 'rho'. """ dims = list(self.dims.keys()) options = ['rho', 'psi_n', 'psi'] xaxis = [option for option in options if option in dims][0] return xaxis
[docs] def Zeff(self, gas='2H1', ni_to_ne=1, xfit='rho', device='DIII-D', update=True, verbose=False): """ Effective charge of plasma. Formula: Z_{eff} = \sum{n_s Z_s^2} / \sum{n_s Z_s} :return: DataArray. """ result = xarray.Dataset() meas_species, meas_ions, meas_ions_with_dens, meas_ions_with_fast = get_species(self) # required data strt = datetime.datetime.now() # For the list of ions with a density measurement ions_with_dens = [i for i in meas_ions if 'n_' + i in self] chk = self.check_keys(['n_e'] + ['n_{}'.format(i) for i in ions_with_dens], name='Zeff') # determine n_i from n_e if there is nothing else available if len(ions_with_dens) == 0 and 'n_e' in self: if ni_to_ne > 0: self['n_' + gas] = ni_to_ne * self['n_e'] ions_with_dens = [gas] printw( f' WARNING: No ions with a density measurement. Assuming a single ion species:{gas}, setting density n_{gas}={ni_to_ne}*ne' ) else: printw(' WARNING: Could not form Zeff. Missing ion densities') chk = False if not chk: return result # determine main ion density from quasi-neutrality if 'n_' + gas not in self: mg, zg = mZ(gas) nz = self['n_e'] - np.sum([self['n_' + i].values * mZ(i)[1] for i in ions_with_dens], axis=0) # Check if any isotope fraction measurements are available H_isotopes = ['1H1', '2H1', '3H1'] frac_meas = [iso for iso in H_isotopes if f'frac_{iso}' in self] if len(frac_meas) == 0: # Assume the only hydrogenic species is 'gas', this should be the typical case print(f" Assuming a single main-ion species:{gas}, setting density based on (ne - nz)") self[f'n_{gas}'] = nz / zg self[f'n_{gas}'].attrs['long_name'] = f'$n_{{gas}}$' self[f'n_{gas}'].attrs['units'] = 'm^-3' main_ions = [gas] keys = [f'dn_{s}_d{xfit}' for s in ['e'] + ions_with_dens] if self.check_keys(keys, name=f'dn_{gas}_d{xfit}'): self[f'dn_{gas}_d{xfit}'] = self[f'dn_e_d{xfit}'] - np.sum( [self[f'dn_{i}_d{xfit}'].values * mZ(i)[1] for i in ions_with_dens], axis=0 ) elif len(frac_meas) == 1: ion1 = frac_meas[0] print(f" Hydrogenic isotope fraction measurement available: frac_{ion1}") if ion1 == '3H1': raise Exception(" Tritium hydrogen fraction provided, not setup for this case") # Assume the other hydrogenic ion is either 1H1 or 2H1 ion2 = '2H1' if ion1 == '1H1' else '1H1' # Clip frac so densities dont become 0 printi(f" Allocating non impurity electrons between 2 hydrogenic ions: {ion1}, {ion2}") frac = np.clip(self[f'frac_{ion1}'].values, 1e-3, 0.999) if np.sum(np.invert(np.isfinite(frac))): raise Exception(" Fraction measurement has non finite values in it, fix before proceeding") self[f'n_{ion1}'] = nz * frac self[f'n_{ion1}'].attrs['long_name'] = f'$n_{{ion1}}$' self[f'n_{ion1}'].attrs['units'] = 'm^-3' self[f'n_{ion2}'] = nz * (1 - frac) self[f'n_{ion2}'].attrs['long_name'] = f'$n_{{ion2}}$' self[f'n_{ion2}'].attrs['units'] = 'm^-3' main_ions = [ion1, ion2] keys = [f'dn_{s}_d{xfit}' for s in ['e'] + ions_with_dens] if self.check_keys(keys, name=f'dn_{ion1}_d{xfit} & dn_{ion2}_d{xfit}'): dnz = self[f'dn_e_d{xfit}'] - np.sum([self[f'dn_{i}_d{xfit}'].values * mZ(i)[1] for i in ions_with_dens], axis=0) self[f'dn_{ion1}_d{xfit}'] = dnz * fac self[f'dn_{ion2}_d{xfit}'] = dnz * (1 - fac) else: raise Exception("Multiple hydrogenic ion fractions specified") ions_with_temp = [k for k in meas_ions if not k.startswith('fast') and 'T_' + k in self] for cur_mi in main_ions: if len(np.where(self['n_' + cur_mi].values <= 0)[0]): printe(' Had to force main ion density to be always positive!') printe(' This will likely present a problem when running transport codes!') self['n_' + cur_mi].values[np.where(self['n_' + cur_mi].values <= 0)] = np.max(self['n_' + cur_mi].values) * 0 + 1 # If a temperature isnt available for this ion use whatever ion temperature is available if (cur_mi not in ions_with_temp) and len(ions_with_temp): # Always prefer T_12C6 if available on DIII-D if is_device(device, 'DIII-D') and ('12C6' in ions_with_temp): T_ref_ion = '12C6' else: T_ref_ion = ions_with_temp[-1] print('- Setting T_{:} equal to T_{:}'.format(cur_mi, T_ref_ion)) T_name = f'T_{cur_mi}' self[T_name] = self['T_' + T_ref_ion] * 1 long_name = self[f'T_{T_ref_ion}'].attrs.get('long_name', T_name) self[T_name].attrs['long_name'] = long_name.replace(T_ref_ion, cur_mi) if f'dT_{T_ref_ion}_d{xfit}' in self: self[f'dT_{cur_mi}_d{xfit}'] = self[f'dT_{T_ref_ion}_d{xfit}'] * 1 self[f'dT_{cur_mi}_d{xfit}'].attrs['units'] = ( self[f'dT_{T_ref_ion}_d{xfit}'].attrs.get('units', '?').replace(T_ref_ion, cur_mi) ) ions_with_dens.append(cur_mi) else: ions_with_dens.append(gas) if verbose: print(' > Finding gas density took {:}'.format(datetime.datetime.now() - strt)) # calculate Zeff (not assuming quasi-neutrality) strt = datetime.datetime.now() nz_sum = np.sum([self['n_' + i].values * mZ(i)[1] for i in ions_with_dens], axis=0) nz2_sum = np.sum([self['n_' + i].values * mZ(i)[1] ** 2 for i in ions_with_dens], axis=0) z_eff = nz2_sum / nz_sum + 0 * self['n_e'].rename('Zeff') z_eff.attrs['long_name'] = r'$Z_{eff}$' if verbose: print( ' > Finding zeff took {:}'.format(datetime.datetime.now() - strt) ) # note without using .values in the sums this takes 10 seconds result.update({'Zeff': z_eff, 'Total_Zeff': z_eff * 1}) if update: self.update(result) return result
[docs] def pressure(self, species, xfit='rho', name=None, update=True, debug=False): """ Species pressure. Formula: P = \sum{n_s T_s} :param species: list. Species included in calculation sum. :param name: str. subscript for name of result DataArray (i.e. it wil be called p_name) :return: DataArray. Pressure (Pa) """ result = xarray.Dataset() # default name if name is None: name = '_'.join(species) # standard checks species = copy.deepcopy(species) good_species = [] keys = [] for s in species: chk = self.check_keys([f'n_{s}', f'T_{s}'], name=f'pressure of {name}') if chk: good_species.append(s) keys.append(f'n_{s}') keys.append(f'T_{s}') species = good_species if not len(species): return result # add sum total pressure da = 0 for s in species: da = (self['n_' + s] * self['T_' + s] * scipy.constants.e) + da da.attrs['long_name'] = r'$p_{' + ','.join(species) + '}$' da.attrs['units'] = 'Pa' if any(not is_uncertain(self[k].values) for k in keys): if debug: printw( f' > could not propagate error in pressure due to no std_dev in {[k for k in keys if not is_uncertain(self[k].values)]}' ) da.values = nominal_values(da.values) # remove error propagation if any fit doesn't include std_dev result.update({f'p_{name}': da}) # add derivative with proper error propagation if available from the fits dkeys = [f'd{k}_d{xfit}' for k in keys] chk = self.check_keys(dkeys, name=f'pressure derivative for {name}', print_error=debug) if chk: dda = 0 for s in species: dda = (self[f'dn_{s}_d{xfit}'] * self[f'dT_{s}_d{xfit}'] * scipy.constants.e) + dda if any(not is_uncertain(self[k].values) for k in dkeys): if debug or True: printw( f' > could not propagate error in pressure due to no std_dev in {[k for k in dkeys if not is_uncertain(self[k].values)]}' ) dda.values = nominal_values(dda.values) # remove error propagation if any fit doesn't include std_dev dda.attrs['long_name'] = rf'd p_{",".join(species)} / d$\{xfit}$' dda.attrs['units'] = f'Pa per {xfit}' result.update({f'dp_{name}_d{xfit}': dda}) if update: self.update(result) return result
[docs] def inverse_scale_length(self, key, update=True): """ Inverse scale length :param key: str. The variable for which the inverse scale length is computed :return: Dataset """ result = xarray.Dataset() if not self.check_keys([key, 'd' + key + '_drho'], name='Inverse scale length'): return result val = self[key].values * 1.0 val[nominal_values(val) == 0] *= np.nan isl = -self['d' + key + '_drho'] / val isl.attrs['units'] = '' isl.attrs['long_name'] = r'$\frac{d' + key + r'/d\rho}{' + key + r'}$' result[key + '_isl'] = isl if update: self.update(result) return result
[docs] def log_lambda(self, update=True): """ The Coulomb logarithm: the ratio of the maximum impact parameter to the classical distance of closest approach in Coulomb scattering. Lambda, the argument, is known as the plasma parameter. Formula: \ln \Lambda = 17.3 - \frac{1}{2}\ln(n_e/10^20)+\frac{3}{2}\ln(T_e/eV) :return: """ result = xarray.Dataset() # approx chk = self.check_keys(keys=['n_e', 'T_e'], name='ln(Lambda)') if not chk: return result # assumes n in m^-3, T in eV da = 0 * self['n_e'].rename('lnLambda') ne = self['n_e'].values te = self['T_e'].values da.values = 17.3 - 0.5 * unumpy.log(ne / 1.0e20) + 1.5 * unumpy.log(te * 1e-3) da.attrs['long_name'] = r'$\ln \Lambda$' result.update({'lnLambda': da}) if update: self.update(result) return result
[docs] def collisionality(self, s, s2='e', eq=None, update=True): """ Collisionality from J.D. Huba, "NRL FORMULARY", 2011. :param s: string. Species. :param s2: string. Colliding species (default elecctrons). **Currently not used** :return: Dataset """ result = xarray.Dataset() if not eq: # this is required as input in 8_postfit.py printe('WARNING!: Collisionality calculation requires an input equilibrium!') if self.check_keys(keys=['psi_n', 'n_' + s, 'T_' + s, 'lnLambda'], name='nu_' + s): m, Z = mZ(s) if s == 'e': fac = 2.91e-6 lbl = '$\\nu_{' + s + '} = 2.91e-12\\/ n_{' + s + '} \\ln \\Lambda} T_{' + s + '}^{-3/2}$' else: fac = 4.80e-8 / np.sqrt(m / constants.m_u) lbl = '$\\nu_{' + s + '} = 4.80e-14\\/ \\mu^{-1/2} Z^4 n_{' + s + '} \\ln \\Lambda} T_{' + s + '}^{-3/2}$' nu = fac * (Z**4) * (self['n_' + s] / 1e6) * self.log_lambda()['lnLambda'] * (self['T_' + s]) ** -1.5 nu.attrs['long_name'] = lbl result.update({'nu_' + s: nu}) # normalize vth = usqrt(2 * self['T_' + s] * constants.eV / m) dR = self['R_midplane'] - eq['R_maxis'] dR[np.where(dR < 1.0e-3)] = 1.0e-3 epsr = dR / eq['R_maxis'] epsr[np.where(epsr == 0)[0]] = np.nan nustar = nu * self['q'] * self['R_midplane'] / (vth * epsr**1.5) nustar.attrs['long_name'] = '$\\nu_{*,' + s + '} = \\nu_{' + s + '} (qR_{midplane} / v_{th,' + s + '} \\epsilon_r^{3/2})$' result.update({'nu_star_' + s: nustar}) if update: self.update(result) return result
[docs] def spitzer(self, update=True): # transverse Spitzer resistivity # source: NRL plasma formulary, J. D. Huba, 2009 result = xarray.Dataset() if self.check_keys(keys=['T_e', 'Zeff', 'lnLambda'], name='resistivity'): numerical_factor = 1.03e-4 # gives result in Ohm.m te = self['T_e'] zeff = self['Zeff'] lnlambda = self.log_lambda()['lnLambda'] resist_mks = numerical_factor * zeff * lnlambda * te**-1.5 ##Just in case someone needs this someday, here's the cgs version, units are seconds: # resist_cgs = 1.15e-14 * zeff * lnlambda * te**-1.5 resist_mks.attrs['long_name'] = r'$\eta$' resist_mks.attrs['units'] = 'Ohm*m' result.update({'resistivity': resist_mks}) if update: self.update(result) return result
[docs] def plasma_frequency(self, s, relativistic=False, update=True, debug=False): """ Calculate plasma frequency. :param s: string. Species. :param relativistic: bool. Make a relativistic correction for m_e (not actually the relativistic mass). :return : Dataset. """ result = xarray.Dataset() if self.check_keys(keys=['n_' + s] + relativistic * ['T_' + s], name='omega_plasma_' + s): m, Z = mZ(s) if relativistic: if s == 'e': m *= usqrt(1 + 5 * self['T_' + s] / 511e3) else: printe(' WARNING: No relativistic correction applied for ions') if debug: Ze2 = (Z * constants.e) ** 2 n_rename_to_omega_plasma = self['n_' + s].rename('omega_plasma_' + s) sqrt_of_n_rename = usqrt(n_rename_to_omega_plasma) meps = m * constants.epsilon_0 print('Ze2 = ', Ze2) print('n_rename_to_omega_plasma = ', n_rename_to_omega_plasma) print('sqrt_of_n_rename = ', sqrt_of_n_rename) print('meps = ', meps) print( 'shapes of Ze2, n_rename_to_omega_plasma, sqrt_of_n_rename, meps = ', np.shape(Ze2), np.shape(n_rename_to_omega_plasma), np.shape(sqrt_of_n_rename), np.shape(meps), ) omega_plasma = sqrt_of_n_rename * Ze2 * meps print('omega_plasma = ', omega_plasma) print('shape(omega_plasma) = ', np.shape(omega_plasma)) omega_plasma = self['n_' + s].rename('omega_plasma_' + s) * 0 + omega_plasma print('omega_plasma (after adding attributes back) = ', omega_plasma) else: omega_plasma = usqrt((self['n_' + s].rename('omega_plasma_' + s) * (Z * constants.e) ** 2) / (m * constants.epsilon_0)) omega_plasma = ( omega_plasma + self['n_' + s].rename('omega_plasma_' + s) * 0 ) # this last bit is to reattach the dataArray attributes as they will not survive unumpy.sqrt() result.update({"omega_plasma_" + s: omega_plasma}) result.update({"omega_plasma_" + s: omega_plasma}) if update: self.update(result) return result
[docs] def gyrofrequency(self, s, mag_field=None, relativistic=False, update=True): """ Calculate gyrofrequency at the LFS midplane. :param s: string. Species. :param mag_field: external structure generated from OMFITlib_general.mag_field_components :param relativistic: bool. Make a relativistic correction for m_e (not actually the relativistic mass). :return : Dataset. """ result = xarray.Dataset() xkey = self.get_xkey() if not mag_field: # this is required as input in 8_postfit.py printe('WARNING!: Gyrofrequency calculation requires a mag_field input! See 8_postfit.py for format.') if self.check_keys(keys=['R_midplane'] + relativistic * ['T_' + s], name='omega_gyro_' + s): m, Z = mZ(s) if relativistic: if s == 'e': m *= usqrt(1 + 5 * self['T_' + s] / 511e3) else: printe(' WARNING: No relativistic correction applied for ions') # magnetic field on the midplane index = np.argmin(np.abs(mag_field['Z_grid'][0, :])) eq_R0 = mag_field['R_grid'][:, index] eq_B0 = mag_field['Bmod_arr'][:, index, :] fit_B0 = np.zeros_like(self['R_midplane'].transpose('time', xkey)) for i, t in enumerate(self['time'].values): fit_B0[i, :] = np.interp(self['R_midplane'].isel(time=i), eq_R0, eq_B0[:, i]) name = "omega_gyro_" + s + '_midplane' da = xarray.DataArray(np.abs(Z) * constants.e * fit_B0 / m, coords=self['R_midplane'].coords) result.update({name: da}) if update: self.update(result) return result
[docs] def xo_cutoffs(self, s, mag_field=None, relativistic=True, update=True): """ Calculate X-mode R and L cutoffs. Note, O-mode cutoff is already stored as omega_plasma_e. :param s: string. Species. :param relativistic: bool. Make a relativistic correction for m_e (not actually the relativistic mass). :return : Dataset. """ result = xarray.Dataset() if self.check_keys(keys=['R_midplane', 'n_' + s] + relativistic * ['T_' + s], name=s + ' cutoffs'): m, Z = mZ(s) omega_c = self.gyrofrequency(s, mag_field=mag_field, relativistic=relativistic)['omega_gyro_' + s + '_midplane'] omega_p = self.plasma_frequency(s, relativistic=relativistic)['omega_plasma_' + s] omega_RX = +0.5 * omega_c + 0.5 * usqrt(omega_c**2 + 4 * omega_p**2) omega_LX = -0.5 * omega_c + 0.5 * usqrt(omega_c**2 + 4 * omega_p**2) result.update( { 'omega_RX_' + s + '_midplane': omega_RX.rename('omega_RX_' + s + '_midplane'), 'omega_LX_' + s + '_midplane': omega_LX.rename('omega_LX_' + s + '_midplane'), } ) if update: self.update(result) return result
[docs] def diamagnetic_frequencies(self, spc, update=True): """ Calculate the diamagnetic frequency, and its density / temperature components. Formula: \omega_P = -\frac{T}{nZe}\frac{dn}{d\psi} - \frac{1}{Ze}\frac{dT}{d\psi} :param spc: Species for which temperature is fit :param update: bool. Set to true to update self, if False, only returns and does not update self. Gradients, if missing, will always update though. """ freqs = xarray.Dataset() m, Z = mZ(spc) # calculate gradients if not avaliable if f'dn_{spc}_dpsi' not in self and f'n_{spc}' in self: dn_dpsi = self.xderiv('n_' + spc, coord='psi') if f'dT_{spc}_dpsi' not in self and f'T_{spc}' in self: dT_dpsi = self.xderiv('T_' + spc, coord='psi') # density part if self.check_keys(keys=[f'n_{spc}', f'T_{spc}'] + [f'dn_{spc}_dpsi'], name=f'omega_N_{spc}'): omega_N = -self[f'dn_{spc}_dpsi'] * self[f'T_{spc}'] * constants.eV / (self[f'n_{spc}'] * Z * constants.e) omega_N.attrs['long_name'] = r"$\omega_N = -\frac{T}{nZe}\frac{dn}{d\psi}$" freqs.update({f'omega_N_{spc}': omega_N}) # temperature part if self.check_keys(keys=[f'T_{spc}'] + [f'dT_{spc}_dpsi'], name=f'omega_T_{spc}'): omega_T = -self[f'dT_{spc}_dpsi'] * constants.eV / (Z * constants.e) omega_T.attrs['long_name'] = r"$\omega_T = -\frac{1}{Ze}\frac{dT}{d\psi}$" freqs.update({f'omega_T_{spc}': omega_T}) # total if len(freqs.data_vars) == 2: omega_P = omega_N + omega_T omega_P.attrs['long_name'] = r"$\omega_{p," + spc + r"} = -\frac{T}{nZe}\frac{dn}{d\psi} -\frac{1}{Ze}\frac{dT}{d\psi}$" freqs.update({f'omega_P_{spc}': omega_P}) if update: self.update(freqs) return freqs
[docs] def offset_frequency(self, s, ms=None, propagate_uncertainties=False, update=True): """ Calculate the NTV offset frequency. Formula: \omega_NTV0 = - (3.5+/-1.5) * \frac{1}{Ze}\frac{dT}{d\psi} :param s: Species for which temperature is fit :param ms: Species to use for m,Z (default to fit species) """ freqs = xarray.Dataset() if not ms: ms = s m, Z = mZ(ms) if self.check_keys(keys=['T_' + s] + ['dT_' + s + '_dpsi'], name='omega_NTV0_' + s): if propagate_uncertainties: fac = uncertainties.ufloat(3.5, 1.5) else: fac = 2.5 omega_NTV0 = fac * self['dT_' + s + '_dpsi'] * constants.eV / (Z * constants.e) omega_NTV0.attrs['long_name'] = r"$\omega_{NTV0} = 2.5 * \frac{1}{Ze}\frac{dT}{d\psi}$" freqs.update({'omega_NTV0_' + ms: omega_NTV0}) # include name os species used for profiles if ms != s: for key in list(freqs.data_vars.keys()): freqs = freqs.rename({key: key + '_' + s}) if update: self.update(freqs) return freqs
[docs] def radial_field(self, s, Er_vpol_zero=False, mag_field=None, eq=None, plot_debug_Er_plot=False, xfit='rho', update=True): """ Radial electric field. Formula: :param s: Species which will be used to calculate Er :param mag_field: external structure generated from OMFITlib_general.mag_field_components """ import scipy.interpolate as interp if not eq: # this is required as input in 8_postfit.py printe('WARNING!: Er calculation requires an input equilibrium!') if not mag_field: # this is required as input in 8_postfit.py printe('WARNING!: Er calculation requires a mag_field input! See 8_postfit.py for format.') xkey = self.get_xkey() species, ions, ions_with_dens, ions_with_fast = get_species(self) # Need to double check the logic of this, should the data be overwritten by zeros each time? # May be an issue with merging otherwise # req_data = ['V_pol_'+s,'T_'+s, 'n_'+s,'omega_tor_'+s] gradP_term = True if self.check_keys(keys=['T_' + s, 'n_' + s], name='Er_{}_gradP'.format(s), print_error=False) else False Vtor_term = True if self.check_keys(keys=['omega_tor_' + s], name='Er_{}_Vtor'.format(s), print_error=False) else False Vpol_term = True if self.check_keys(keys=['V_pol_' + s], name='Er_{}_Vpol'.format(s), print_error=False) else False if Er_vpol_zero: Vpol_term = True # Assume Vpol = 0 if it does not exist Er_term = True if (gradP_term and Vtor_term and Vpol_term) else False # Extract mass, element and Z from the name of the impurity, then change to my naming convention m, Z = mZ(s) mass, Z_imp, impurity = m, Z, s print(' * Performing Er calculation for {}, mass:{:g}, Z:{}'.format(impurity, mass, Z_imp)) Er_other_species = False for s_tmp in species: if 'Er_{}'.format(s_tmp) in self: Er_other = self['Er_{}'.format(s_tmp)].values Er_other_species = True print(' ' * 4 + 'Er already available from {}'.format(s_tmp)) if (not Er_term) and (not Vpol_term) and (not Vtor_term) and (not gradP_term): return {} # P [N/m2] = (n/V) [m-3] T [J=Nm], for P in N/m2 need to make sure ni is in m-3 and ti is in Joules # for grad P we need to know the radius of each rho point (varies with time.. need to loop through time) # from eq we know R(rho) # We have the rho values of the profiles i.e ti_rho, we want to know what R they correspond to # Problem here is that np.interp is expecting a monotonic rho. # Need to split into cases on the HFS and LFS rho = self['rho'].values ntimes = self['time'].shape[0] nrho = self[xkey].shape[0] profile_Rmid_derived = np.zeros((ntimes, nrho), dtype=float) for i in range(ntimes): profile_Rmid_derived[i, :] = self['R_midplane'].isel(time=i).values # r_mid = R_midplane-R_midplane[0] r_mid = self['R_midplane'] * 0 for i in range(ntimes): r_mid[i] = self['R_midplane'][i] - self['R_midplane'][i][0] # print(r_mid) if gradP_term: ti_values = self['T_{}'.format(impurity)].values # eV ni_values = self['n_{}'.format(impurity)].values # m-3 pi_values = ni_values * (ti_values * constants.e) gradP_arr = pi_values * 0 coords = self['T_{}'.format(impurity)].coords if Vpol_term: if Er_vpol_zero: pol_rot_data = np.zeros((ntimes, nrho), dtype=float) coords = self[next(iter(self.data_vars.keys()))].coords else: V_pol_DA = self['V_pol_{}'.format(impurity)] pol_rot_data = +V_pol_DA.values coords = V_pol_DA.coords # This is required if rho is 1D which is the case when the fits are done on rho (as opposed to psiN) if len(rho.shape) == 1: rho = np.tile(rho, (ntimes, 1)) # Computation saving : can get away with just interpolating on the midplane midplane_ind = int(np.argmin(np.abs(mag_field['Z_grid'][0, :]))) # Find the R values for the profiles vs time R_mag_axis_fun = interp1e(eq['time'].values, eq['profile_R_midplane'].values[0]) iz0 = np.argmin(np.abs(eq['Z'].values)) x = eq[xkey].values[:, iz0, :] r = (eq['R'].values[:, np.newaxis] * np.ones(x.shape)).T.ravel() t = (eq['time'].values[np.newaxis, :] * np.ones(x.shape)).T.ravel() x = x.T.ravel() if len(eq['time'].values) == 1: # need finite spread for linear interp lt = len(t) x = np.tile(x, 2) t = np.tile(t, 2) r = np.tile(r, 2) t[lt:] += 1 t[:lt] -= 1 R_mag_axis = eq['profile_R_midplane'].values[0] else: R_mag_axis = R_mag_axis_fun(t) valid = (r > R_mag_axis) & isfinite(x) & isfinite(t) & isfinite(r) R_fun = LinearNDInterpolator(list(zip(x[valid], t[valid])), r[valid]) R_near = NearestNDInterpolator(list(zip(x[valid], t[valid])), r[valid]) points = np.reshape(np.meshgrid(self[xkey].values, self['time'].values), (2, -1)).T profile_R = R_fun(points) fill = isnan(profile_R) profile_R[fill] = R_near(points[fill]) profile_R = profile_R.reshape((ntimes, nrho)) if Vtor_term: omega_tor_DA = self['omega_tor_{}'.format(impurity)] omega_tor = omega_tor_DA.values tor_data = omega_tor * profile_R coords = omega_tor_DA.coords if gradP_term: # What to do about the ones that are on the HFS? dR = np.gradient(profile_R, axis=-1) # Modify the innermost pi_values so that dP will always be 0 on axis dP = ugrad1(pi_values) # Set onaxis dP to zero dP[:, 0] = 0 # Need to make sure that dR is non zero, replace values of where dR=0 with min(positive dR) if not np.any(dR > 0): printe('gradP term in Er calculation had errors. Bad equilibrium?') eps = 1e-9 dR[dR <= eps] = np.min(dR[dR > eps]) gradP_arr = dP / dR # Figure out the Bt, Bp values for each of the radii x time lt = len(eq['time'].values) in_R = eq['R'].values in_t = eq['time'].values in_Bt = +mag_field['Bt_arr'][:, midplane_ind, :] in_Bp = +mag_field['Bp_arr'][:, midplane_ind, :] # need finite spread in time for RegularGridInterpolator otherwise sometimes it returns nan with a single timeslice # Create a dummy spread in time with the same field data for both times if lt == 1: nR = in_Bt.shape[0] in_t = np.tile(in_t, 2) # Create a spread in time in_t[lt:] += 1 in_t[:lt] -= 1 cor_size = np.ones((nR, 2), dtype=float) in_Bt = (in_Bt[:, 0])[:, np.newaxis] * cor_size in_Bp = (in_Bp[:, 0])[:, np.newaxis] * cor_size Bt_midplane_fun = RegularGridInterpolator((in_R, in_t), in_Bt, bounds_error=False, fill_value=None) Bp_midplane_fun = RegularGridInterpolator((in_R, in_t), in_Bp, bounds_error=False, fill_value=None) # Assemble a set of profile_R, time values for interpolating Bt and Bp rt_points = list(zip(profile_R.flatten(), np.repeat(self['time'].values, profile_R.shape[1]))) Bt_vals_midplane = Bt_midplane_fun(rt_points).reshape((ntimes, nrho)) Bp_vals_midplane = Bp_midplane_fun(rt_points).reshape((ntimes, nrho)) RBp_vals_midplane = profile_R * Bp_vals_midplane outputs = {} outputs_attrs = {} if gradP_term: outputs['Er_' + s + '_gradP'] = gradP_arr / (Z_imp * constants.e * ni_values) outputs_attrs['Er_' + s + '_gradP'] = {'units': 'V/m'} if Vtor_term: outputs['Er_' + s + '_Vtor'] = tor_data * Bp_vals_midplane outputs_attrs['Er_' + s + '_Vtor'] = {'units': 'V/m'} if Vpol_term: outputs['Er_' + s + '_Vpol'] = -pol_rot_data * Bt_vals_midplane outputs_attrs['Er_' + s + '_Vpol'] = {'units': 'V/m'} if Vtor_term and Vpol_term: outputs['Er_' + s + '_VxB'] = -pol_rot_data * Bt_vals_midplane + tor_data * Bp_vals_midplane outputs_attrs['Er_' + s + '_VxB'] = {'units': 'V/m'} if Er_term: outputs['Er_' + s] = outputs['Er_' + s + '_gradP'] + outputs['Er_' + s + '_Vtor'] + outputs['Er_' + s + '_Vpol'] outputs_attrs['Er_' + s] = {'units': 'V/m'} if Er_other_species and gradP_term and Vtor_term: outputs['Er_' + s + '_Vpol'] = Er_other - outputs['Er_' + s + '_Vtor'] - outputs['Er_' + s + '_gradP'] outputs_attrs['Er_' + s + '_Vpol'] = {'units': 'V/m'} outputs['V_pol_' + s] = -1.0 * outputs['Er_' + s + '_Vpol'] / Bt_vals_midplane outputs_attrs['V_pol_' + s] = {'units': 'm/s'} if gradP_term and Vtor_term: outputs['Er_' + s + '_gradP_Vtor'] = outputs['Er_' + s + '_gradP'] + outputs['Er_' + s + '_Vtor'] outputs_attrs['Er_' + s + '_gradP_Vtor'] = {'units': 'V/m'} if plot_debug_Er_plot: fig, ax = plt.subplots(nrows=7, sharex=True) for i in range(ntimes): clr = plt.cm.viridis(float(i) / ntimes) x_axis_R = False if x_axis_R: x = profile_R[i, :] ax[-1].set_xlabel('R') else: x = rho[i, :] ax[-1].set_xlabel('rho') for cur_ax in ax: cur_ax.axvline(1) cnt = 0 ax[cnt].plot(x, nominal_values(ni_values[i, :]), color=clr) ax[cnt].set_ylabel('n') cnt += 1 ax[cnt].plot(x, nominal_values(pi_values[i, :]), color=clr) ax[cnt].set_ylabel('P') cnt += 1 ax[cnt].plot(x, np.gradient(nominal_values(pi_values[i, :])) / np.gradient(rho[i, :]), color=clr) ax[cnt].set_ylabel('dP/drho') cnt += 1 ax[cnt].plot(x, np.gradient(rho[i, :]) / np.gradient(profile_R[i, :]), color=clr) ax[cnt].set_ylabel('drho/dR') ax[cnt].plot(x, np.gradient(rho[i, :]) / np.gradient(profile_Rmid_derived[i, :]), color=clr, ls='--') ax[cnt].set_ylabel('drho/dR') cnt += 1 ax[cnt].plot(x, nominal_values(gradP_arr[i, :]), color=clr) ax[cnt].set_ylabel('dP/dR') cnt += 1 ax[cnt].plot(x, nominal_values(outputs['Er_' + s + '_gradP'][i, :]), color=clr) ax[cnt].set_ylabel('Er_{}_gradP'.format(s)) cnt += 1 ax[0].set_title("{}".format(s)) fig.canvas.draw() output_DS = xarray.Dataset() # should really only output true flux functions! # need to include Omega and K # add omega_E_gradP, omega_E_Vtor and omega_E_Vpol to see the different contributions # calculate ExB shearing rate using Waltz-Miller derivation # gamma_ExB=(r/q)*omega_E_drho*(drho/dr) # calculation is made for the midplane if gradP_term: omega_E_gradP = xarray.DataArray(outputs['Er_' + s + '_gradP'] / RBp_vals_midplane, coords=coords) gamma_ExB_gradP = -1 * (r_mid / self['q']) * ugrad1(omega_E_gradP) / ugrad1(r_mid) gamma_ExB_gradP.attrs['long_name'] = r'$\gamma_{E,' + s + r'} = \frac{r}{q} \frac{dE_r}{d\rho} \frac{d\rho}{dr}$' gamma_ExB_gradP.attrs['units'] = '1/s' output_DS.update({'gamma_ExB_' + s + '_gradP': gamma_ExB_gradP}) if Vtor_term: omega_E_Vtor = xarray.DataArray(outputs['Er_' + s + '_Vtor'] / RBp_vals_midplane, coords=coords) gamma_ExB_Vtor = -1 * (r_mid / self['q']) * ugrad1(omega_E_Vtor) / ugrad1(r_mid) gamma_ExB_Vtor.attrs['long_name'] = r'$\gamma_{E,' + s + r'} = \frac{r}{q} \frac{dE_r}{d\rho} \frac{d\rho}{dr}$' gamma_ExB_Vtor.attrs['units'] = '1/s' output_DS.update({'gamma_ExB_' + s + '_Vtor': gamma_ExB_Vtor}) if Vpol_term: omega_E_Vpol = xarray.DataArray(outputs['Er_' + s + '_Vpol'] / RBp_vals_midplane, coords=coords) gamma_ExB_Vpol = -1 * (r_mid / self['q']) * ugrad1(omega_E_Vpol) / ugrad1(r_mid) gamma_ExB_Vpol.attrs['long_name'] = r'$\gamma_{E,' + s + r'} = \frac{r}{q} \frac{dE_r}{d\rho} \frac{d\rho}{dr}$' gamma_ExB_Vpol.attrs['units'] = '1/s' output_DS.update({'gamma_ExB_' + s + '_Vpol': gamma_ExB_Vpol}) if Er_term: omega_E = xarray.DataArray(outputs['Er_' + s] / RBp_vals_midplane, coords=omega_tor_DA.coords) omega_E.attrs['long_name'] = r'$\omega_{E,' + s + r'} = E_r/RB_p$' omega_E.attrs['units'] = 'rad/s' output_DS.update({'omega_E_' + s: omega_E}) gamma_ExB = -1 * (r_mid / self['q']) * ugrad1(omega_E) / ugrad1(r_mid) gamma_ExB.attrs['long_name'] = r'$\gamma_{E,' + s + r'} = \frac{r}{q} \frac{dE_r}{d\rho} \frac{d\rho}{dr}$' gamma_ExB.attrs['units'] = '1/s' output_DS.update({'gamma_ExB_' + s + '': gamma_ExB}) # Electrostatic potential -dPhi/dpsi = Er/RBp # cumtrapz is the only method numerically accurate enough to recover omega_E via. deriv() # Perform the integral epot_s = -1.0 * cumtrapz(omega_E.values, x=self['psi'].values, axis=1) if xfit == 'rho': epotx = self['rho'].values elif xfit == 'psi_n': epotx = self['psi_n'].values if ntimes > 1: print(' * Electrostatic Potential (in time)') # Get boundary condition to remove making Phi(1.0) == 0.0 epotBC = URegularGridInterpolator((self['time'].values, epotx), epot_s)((self['time'].values, 1.0)) # Remove boundary condition epot_s -= epotBC[:, np.newaxis] else: print(' * Electrostatic Potential (snapshot)') epot_s -= uinterp1d(epotx, epot_s[0, :])(1.0) epot = xarray.DataArray(epot_s, coords=omega_tor_DA.coords) epot.attrs['long_name'] = 'Electrostatic Potential ' + s epot.attrs['units'] = 'V' output_DS.update({'epot_' + s: epot}) for key, val in outputs.items(): DA = xarray.DataArray(val, coords=coords) if key in outputs_attrs: for attr_name, attr_val in outputs_attrs[key].items(): DA.attrs[attr_name] = attr_val output_DS.update({key: DA}) if update: self.update(output_DS) return output_DS
[docs] def omega_perp(self, s, eq=None, update=True): """ Perpendicular rotation frequency. Formula: \omega_{perp} = \omega_{E} + \sigma*\omega_P :param s: str. Species. :return: Dataset. """ result = xarray.Dataset() if not eq: # this is required as input in 8_postfit.py printe('WARNING!: Er calculation requires an input equilibrium!') species, ions, ions_with_dens, ions_with_fast = get_species(self) key_e = 'omega_E' for ie in species: if key_e + '_' + ie in self: key_e += '_' + ie if self.check_keys(keys=['psi_n'] + [key_e, 'omega_P_' + s], name='omega_perp_' + s): print(' > Using ' + key_e) helicity = np.sign(eq['plasma_current']) # Correct on all machines? omega_perp = self[key_e] + self['omega_P_' + s] omega_perp.attrs['long_name'] = r'$\omega_{\perp,' + s + r'} = \omega_{E} + \omega_P$' result.update({'omega_perp_' + s: omega_perp}) # J. Callen EPS 2016 - "true perpendicular rotation" if s == 'e' and self.check_keys(keys=['Zeff'], name='omega_alpha_' + s): # alpha coefficient from Eq. (25) in J.D. Callen, C.C. Hegna, and A.J. Cole, Nuclear Fusion 53, 113015 (2013). Zeff = self['Zeff'] alphahat_flutt = 1 + (5 / 9.0) * ( ((45 * np.sqrt(2) / 16.0) * Zeff + (33.0 / 16.0) * Zeff**2) / (1 + (151 * np.sqrt(2) / 72.0) * Zeff + (217 / 288.0) * Zeff**2) ) omega_alpha = omega_perp + (alphahat_flutt - 1) * self['omega_T_e'] omega_alpha.attrs['long_name'] = r'$\omega_{\alpha,e} = \omega_\perp + (\hat{\alpha}^{flutt}_\nu-1) \omega_{T,e}$' result.update({'omega_alpha_e': omega_alpha}) if update: self.update(result) return result
[docs] def angular_momentum(self, s, gas='2H1', xfit='rho', update=True): """ Angular momentum. Formula: L_{\phi} = m * n * \omega_{tor} * <R^2> :param s: str. Species. :return: Dataset. """ result = xarray.Dataset() wkey = 'omega_tor_' + s ions_with_rot = [key.replace('omega_tor_', '') for key in self.variables if key.startswith('omega_tor') and key != 'omega_tor_e'] # common to measure an impurity rotation if s == gas: if wkey + '_KDG' in self: wkey = wkey + '_KDG' printi(' > Using KDG rotation for {:}'.format(s)) elif wkey not in self and len(ions_with_rot) == 1: # don't know which to choose if there are more than 1 wkey = 'omega_tor_' + ions_with_rot[0] if self.check_keys(keys=['avg_R**2', 'avg_vp', 'geo_psi', 'n_' + s, wkey], name='angular_momentum_' + s): m, Z = mZ(s) # angular momentum density dlphi = m * self['n_' + s] * self[wkey] * self['avg_R**2'] dlphi.attrs['long_name'] = r'$L_{\phi,' + s + r'} = m n \omega_{tor} <R^2>$' dlphi.attrs['units'] = 'Nms / m^3' # volume intagral (copied from omfit_classes.fluxSurface) lphi = integrate.cumtrapz(self['avg_vp'] * dlphi, self['geo_psi'], initial=0) vlphi = 1 * dlphi # a DataArray like the others vlphi.values = lphi vlphi.attrs['long_name'] = r'$L_{\phi,' + s + r'} = \int_0^' + xfit + r' dV m n \omega_{tor} <R^2>$' vlphi.attrs['units'] = 'Nms' result.update({'angular_momentum_density_' + s: dlphi, 'angular_momentum_' + s: vlphi}) # collect the total if 'angular_momentum_density' in self: self['angular_momentum_density'] += dlphi else: self['angular_momentum_density'] = 1.0 * dlphi self['angular_momentum_density'].attrs['long_name'] = r'$L_{\phi} = \sum_s m_s n_s \omega_{tor,s} <R^2>$' self['angular_momentum_density'].attrs['units'] = 'Nms / m^3' if 'angular_momentum' in self: self['angular_momentum'] += vlphi else: self['angular_momentum'] = 1.0 * vlphi self['angular_momentum'].attrs['long_name'] = r'$L_{\phi} = \int_0^' + xfit + r' dV \sum_s m_s n_s \omega_{tor,s} <R^2>$' self['angular_momentum'].attrs['units'] = 'Nms' # time derivative, with separate density and velocity terms if 'dn_{:}_dt'.format(s) in self: dlphidtn = m * self['dn_{:}_dt'.format(s)] * self[wkey] * self['avg_R**2'] dlphidtn.attrs['long_name'] = r'$L_{\phi,' + s + r',t} = m dn/dt \omega_{tor} <R^2>$' dlphidtn.attrs['units'] = 'Nm / m^3' lphidtn = integrate.cumtrapz(self['avg_vp'] * dlphidtn, self['geo_psi'], initial=0) vlphidtn = 1 * dlphi vlphidtn.values = lphidtn vlphidtn.attrs['long_name'] = r'$L_{\phi,' + s + r',t} = \int_0^' + xfit + r' dV m dn/dt \omega_{tor} <R^2>$' vlphidtn.attrs['units'] = 'Nm' dltotdtn = self.get('dangular_momentum_density_dt_n', 0) + dlphidtn dltotdtn.attrs['long_name'] = r'$L_{\phi,t} = m dn/dt \omega_{tor} <R^2>$' dltotdtn.attrs['units'] = 'Nm / m^3' vltotdtn = self.get('dangular_momentum_dt_n', 0) + vlphidtn vltotdtn.attrs['long_name'] = r'$L_{\phi,t} = \int_0^' + xfit + r' dV m dn/dt \omega_{tor} <R^2>$' vltotdtn.attrs['units'] = 'Nm' result.update( { 'dangular_momentum_density_{:}_dt_n'.format(s): dlphidtn, 'dangular_momentum_{:}_dt_n'.format(s): dlphidtn, 'dangular_momentum_density_dt_n': dltotdtn, 'dangular_momentum_dt_n': vltotdtn, } ) if 'd{:}_dt'.format(wkey) in self: dlphidtw = m * self['n_' + s] * self['d{:}_dt'.format(wkey)] * self['avg_R**2'] dlphidtw.attrs['long_name'] = r'$L_{\phi,' + s + r',t} = m n d\omega_{tor}/dt <R^2>$' dlphidtw.attrs['units'] = 'Nm / m^3' lphidtw = integrate.cumtrapz(self['avg_vp'] * dlphidtw, self['geo_psi'], initial=0) vlphidtw = 1 * dlphi vlphidtw.values = lphidtw vlphidtw.attrs['long_name'] = r'$L_{\phi,' + s + r',t} = \int_0^' + xfit + r' dV m n d\omega_{tor}/dt <R^2>$' vlphidtw.attrs['units'] = 'Nm' dltotdtw = self.get('dangular_momentum_density_dt_v', 0) + dlphidtw dltotdtw.attrs['long_name'] = r'$L_{\phi,t} = m n d\omega_{tor}/dt <R^2>$' dltotdtw.attrs['units'] = 'Nm / m^3' vltotdtw = self.get('dangular_momentum_dt_v', 0) + vlphidtw vltotdtw.attrs['long_name'] = r'$L_{\phi,t} = \int_0^' + xfit + r' dV m n d\omega_{tor}/dt <R^2>$' vltotdtw.attrs['units'] = 'Nm' result.update( { 'dangular_momentum_density_{:}_dt_v'.format(s): dlphidtw, 'dangular_momentum_{:}_dt_v'.format(s): dlphidtw, 'dangular_momentum_density_dt_v': dltotdtw, 'dangular_momentum_dt_v': vltotdtw, } ) if 'dn_{:}_dt'.format(s) in self: result.update( { 'dangular_momentum_density_{:}_dt_nv'.format(s): dlphidtn + dlphidtw, 'dangular_momentum_{:}_dt_nv'.format(s): dlphidtn + dlphidtw, 'dangular_momentum_density_dt_nv': dltotdtn + dltotdtw, 'dangular_momentum_dt_nv': vltotdtn + vltotdtw, } ) if update: self.update(result) return result
[docs] def xderiv(self, key, coord='psi', update=True): """ Returns the derivative of the value corresponding to key on the spatial coordinate coord. :param key: str. The variable :param coord: str. The radial coordinate with respect to which the derivative is taken :param update: bool. Set to true to update self, if False, only returns and does not update self. :return: Dataset """ result = xarray.Dataset() xkey = self.get_xkey() if not self.check_keys([key, coord], name=f'd{key}_d{coord}'): return result dc = np.gradient(np.atleast_2d(self[coord].values), axis=-1) dkey = 'd' + key + '_d' + xkey if dkey in self and any(isfinite(v) for v in nominal_values(self[dkey].values).flat) and coord != xkey: # if we have derivative UQ from the fit method, keep it and just swap coordinates dx = np.gradient(np.atleast_2d(self[xkey].values), axis=-1) dkdc = DataArray(self[dkey] * (dx / dc), coords=self[key].coords) else: # otherwise we have to calculate the derivative numerically - huge uncertainties when propagated try: dkdc = DataArray(ugrad1(self[key].values) / dc, coords=self[key].coords) except ZeroDivisionError: # This happens if dc array have 0s and self[key].values is a uarray/has uncertainty. i_bad = dc == 0 dc[i_bad] = 1 dkdc_arr = ugrad1(self[key].values) / dc dkdc_arr[i_bad] = ufloat(np.nan, np.nan) dkdc = DataArray(ugrad1(self[key].values) / dc, coords=self[key].coords) if coord in ['rho', 'psi_n']: dkdc.attrs['units'] = self[key].attrs.get('units', '') dkdc.attrs['long_name'] = r'd{:} / d$\{:}$'.format(key, coord) result['d' + key + '_d' + coord] = dkdc if update: self.update(result) return result
[docs] def find_pedestal(self, data_array, der=True, update=True): """ Find the pedestal extent and values. :param data_array: DataArray. 2D fit/self profiles. :param der: bool. Find extent of high-derivative region (default is to find edge "bump" feature. :return: Dataset. Edge feature inner/center/outer radial coordinates and corresponding values. """ result = xarray.Dataset() xkey = self.get_xkey() # get x,y values da = data_array.transpose('time', xkey) x = da[xkey].values ys = nominal_values(da.values) if der: ys = np.gradient(ys, axis=1) / np.gradient(x) # find bump feature (start, middle, stop) indexes indxs = np.apply_along_axis(find_feature, 1, ys, x=x, M=0.01, k=5) # get the corresponding x,y points xpts = np.array([x[i] for i in indxs]).T ypts = np.array([y[i] for y, i in zip(da.values, indxs)]).T # check for cases in which no feature was found (indexes are 0 or maximum) nx = x.shape[0] ok = [] for i, j, k in indxs: if i in [0, nx] or k in [0, nx]: ok.append(np.nan) else: ok.append(1.0) # collect named DataArrays in a Dataset for i, key in enumerate(['inner', 'center', 'outer']): xname = '{:}_pedestal_{:}_{:}'.format(da.name, key, xkey) yname = '{:}_pedestal_{:}_{:}'.format(da.name, key, 'value') xda = xarray.DataArray(xpts[i] * ok, dims=['time'], coords={'time': da['time'].values}, name=xname) yda = xarray.DataArray(ypts[i] * ok, dims=['time'], coords={'time': da['time'].values}, name=yname) result.update({xname: xda, yname: yda}) if update: self.update(result) return result
[docs] def pedestal_mwidth(key): """ Calculate the width of the pedestal in meters at the magnetic axis "midplane". :param key: str. Quantity that the pedestal extent has been calculated for. :return: Dataset. Time evolution of pedestal width. """ xkey = self.get_xkey() # standard check if not self.check_keys( [xkey, 'R_midplane', key + '_pedestal_inner_' + xkey, key + '_pedestal_outer_' + xkey], name=key + ' pedestal width' ): return {} # calculate R_midplane width at each time width = [] for t in self['time']: rin = self['R_midplane'].sel(**{'time': t, xkey: self[key + '_pedestal_inner_' + xkey].sel(time=t)}) rout = self['R_midplane'].sel(**{'time': t, xkey: self[key + '_pedestal_outer_' + xkey].sel(time=t)}) width.append(rout - rin) # form DataArray result = xarray.DataArray(width, dims=['time'], coords={'time': self['time'].values}, name=key + '_pedestal_width_R_midplane') result.attrs['long_name'] = xkey + ' pedestal width in meters at the midplane' result.attrs['units'] = 'm' if update: self.update(result) return result
[docs] def calc_intrinsic_rotation(self, eq=None, update=True): """ Evaluation of the omega_intrinsic function :return: omega_intrinsic """ result = xarray.Dataset() if not eq: # this is required as input in 8_postfit.py printe('WARNING!: Er calculation requires an input equilibrium!') chk = self.check_keys( keys=['geo_R', 'geo_a', 'rho', 'T_e', 'T_2H1', 'avg_q', 'avg_fc', 'avg_Btot', 'R_midplane', 'T_e_pedestal_inner_rho'], name='TSD & AA intrinsic rotation model', ) if not chk: return result da = 0.0 * self['time'].rename('omega_ped_TSD') for itime, time in enumerate(self['time']): derivedt = self.isel(time=itime) eqt = eq.isel(time=itime) rho = derivedt['rho'].values derivedt = derivedt.isel(rho=np.where(rho <= 1)[0]) geo_R = derivedt['geo_R'].values[-1] geo_a = derivedt['geo_a'].values[-1] geo_Rx = eqt['R_xpoint'].values rho = derivedt['rho'].values I_p_sgn = np.sign(eqt['plasma_current'].values) Te = nominal_values(derivedt['T_e'].values) Ti = nominal_values(derivedt['T_2H1'].values) q = nominal_values(derivedt['avg_q'].values) fc = nominal_values(derivedt['avg_fc'].values) B0 = nominal_values(derivedt['avg_Btot'].values[0]) R_mp = nominal_values(derivedt['R_midplane'].values) rhoPed = derivedt['T_e_pedestal_inner_rho'].values rhoSep = 1.0 # derivedt['T_e_pedestal_outer_rho'].values # Call function with inputs da.values[itime] = utils_fusion.intrinsic_rotation(geo_a, geo_R, geo_Rx, R_mp, rho, I_p_sgn, Te, Ti, q, fc, B0, rhoPed, rhoSep) da.attrs['long_name'] = 'Tim Stoltzfus-Dueck & Arash Ashourvan intrinsic rotation model' da.attrs['units'] = 'rad/s' result.update({'omega_ped_TSD': da}) if update: self.update(result) return result
[docs] def P_rad_int(self, update=True): """ Volume integral of the total radiated power Formula: P_{rad,int} = \int{P_{rad}} :return: """ result = xarray.Dataset() chk = self.check_keys(keys=['P_rad', 'avg_vp', 'geo_psi'], name='Integrated radiated power') if not chk: return result da = 0 * self['P_rad'].rename('P_rad_int') pr = self['P_rad'].values # assumed in W/cm^3 # volume integral copied from omfit_classes.fluxSurface da.values = integrate.cumtrapz(self['avg_vp'] * pr, self['geo_psi'], initial=0) da.attrs['long_name'] = r'$\int{dV P_{rad}}$' da.attrs['units'] = 'MW' result.update({'P_rad_int': da}) if update: self.update(result) return result
[docs] def KDG_neoclassical(self, mag_field=None, xfit='rho', update=True): """ Poloidal velocity from Kim, Diamond Groebner, Phys. Fluids B (1991) with poloidal in-out correction based on Ashourvan (2017) :return: Places poloidal velocity for main-ion and impurity on outboard midplane into DERIVED. Places neoclassical approximation for main-ion toroidal flow based on measured impurity flow in self. """ # Default to only work for D, C as primary and impurity ion if 'n_12C6' not in self: printe(' Cannot run KDG without n_12C6') return if 'n_2H1' not in self: printe('Cannot run KDG without n_2H1') return if not mag_field: # this is required as input in 8_postfit.py printe('WARNING!: Er calculation requires a mag_field input! See 8_postfit.py for format.') # OMFITprofiles convenience nx = len(self[xfit]) ntimes = len(self['time']) #### # Geometric Quantities #### # Midplane major radius R R_midplane = self['R_midplane'].values # Major radius of magnetic axis R0 = R_midplane[:, 0] # Midplane minor radius r = R-R0 r_midplane = R_midplane - R0[:, np.newaxis] # Local inverse aspect ratio eps = r/R0 epsilon = r_midplane / R0[:, np.newaxis] # Circulating (Eq. C18) and trapped fractions fc_KDG = 1.0 - 1.46 * usqrt(epsilon) + 0.46 * epsilon * usqrt(epsilon) ft_KDG = 1.0 - fc_KDG # g function (Eq. C16) g_KDG = ft_KDG / fc_KDG #### # Magnetic field quantities #### mag_field = OMFITlib_general.mag_field_components(eq, make_plots=False) Bt_midplane = np.zeros((ntimes, nx), dtype=float) Bp_midplane = np.zeros((ntimes, nx), dtype=float) for i in range(ntimes): newr = self['R_midplane'].isel(time=i) newz = eq['Z_maxis'].isel(time=i).values * np.ones_like(newr) newrz = np.vstack((newr, newz)).T Bt_midplane[i, :] = RegularGridInterpolator( (mag_field['R_grid'][:, 0], mag_field['Z_grid'][0, :]), mag_field['Bt_arr'][:, :, i], bounds_error=False, fill_value=None )(newrz) Bp_midplane[i, :] = RegularGridInterpolator( (mag_field['R_grid'][:, 0], mag_field['Z_grid'][0, :]), mag_field['Bp_arr'][:, :, i], bounds_error=False, fill_value=None )(newrz) # Total magnetic field on midplane Btot_midplane = usqrt(Bt_midplane**2 + Bp_midplane**2) # Flux-surface averaged <B**2> B2_fsa = self['avg_Btot**2'].values #### # Species dependent quantities #### # Main ion mass and charge mi, Zi = OMFITlib_general.mZ('2H1') # Impurity mass and charge mI, ZI = OMFITlib_general.mZ('12C6') # Main ion density ni = self['n_2H1'].values # Impurity density nI = self['n_12C6'].values # Main ion temperature Ti = self['T_2H1'].values # Impurity density TI = self['T_12C6'].values # Main ion pressure Pi = self['p_2H1'].values # Impurity pressure PI = self['p_12C6'].values # Impurity strength parameter (after Eq. 9) alpha = (nI * ZI**2) / (ni * Zi**2) # Main ion thermal speed (m/s) sqrt(2T/m) (after Eq. 34) vti = usqrt(2.0 * Ti * constants.e / mi) # Impurity ion thermal speed vtI = usqrt(2.0 * TI * constants.e / mI) # Larmor radius (m) rhoi = (mi * vti) / (Zi * constants.e * Btot_midplane) rhoI = (mI * vtI) / (ZI * constants.e * Btot_midplane) # Poloidal Larmor radius (m) rhoip = (mi * vti) / (Zi * constants.e * Bp_midplane) # Scale-lengths (1/m) and carries minus sign for decreasing profile, # i.e. LTi is negative (after Eq. 34) dR = ugrad1(R_midplane) dTi = ugrad1(Ti) dPi = ugrad1(Pi) dPI = ugrad1(PI) # 1/LTi LTi_inv = (1.0 / Ti) * (dTi / dR) # 1/LPi LPi_inv = (1.0 / Pi) * (dPi / dR) # 1/LPI LPI_inv = (1.0 / PI) * (dPI / dR) # Viscocities # main ion (Eq. C15) mu_00_i = g_KDG * (alpha + np.sqrt(2.0) - np.log(1.0 + np.sqrt(2.0))) mu_01_i = g_KDG * ((3.0 / 2.0) * alpha + 4.0 / np.sqrt(2.0) - (5.0 / 2.0) * np.log(1.0 + np.sqrt(2.0))) mu_11_i = g_KDG * ((13.0 / 4.0) * alpha + (39.0 / 4.0) / np.sqrt(2.0) - (25.0 / 4.0) * np.log(1.0 + np.sqrt(2.0))) # impurity (Eq. C15) alpha_inv = 1.0 / alpha mu_00_I = g_KDG * (alpha_inv + np.sqrt(2.0) - np.log(1.0 + np.sqrt(2.0))) mu_01_I = g_KDG * ((3.0 / 2.0) * alpha_inv + 4.0 / np.sqrt(2.0) - (5.0 / 2.0) * np.log(1.0 + np.sqrt(2.0))) mu_11_I = g_KDG * ((13.0 / 4.0) * alpha_inv + (39.0 / 4.0) / np.sqrt(2.0) - (25.0 / 4.0) * np.log(1.0 + np.sqrt(2.0))) # Eq. 32 beta = (27.0 / 4.0) ** 2 * (mi / mI) ** 2 * (15.0 / 2.0 + usqrt(2.0 * alpha) * (vti / vtI)) ** (-1.0) # Eq. 31 bigD = mu_00_i * (mu_11_i + np.sqrt(2.0) + alpha - alpha * beta) - mu_01_i**2 bigD[bigD == 0] *= np.nan # can happen if midplane mag_field interpolation was out of bounds # Eq. 29 K1 = (1.0 / bigD) * mu_01_i * (np.sqrt(2.0) + alpha - alpha * beta) # Eq. 30 K2 = (1.0 / bigD) * (mu_00_i * mu_11_i - mu_01_i**2) #### # Poloidal velocities #### # Eq. 33 V_pol_i = (1.0 / 2.0) * vti * rhoi * (K1 * LTi_inv) * Btot_midplane * Bt_midplane / B2_fsa # Eq. 34 V_pol_I = ( (1.0 / 2.0) * vti * rhoi * ((K1 + (3.0 / 2.0) * K2) * LTi_inv - LPi_inv + (Zi / ZI) * (TI / Ti) * LPI_inv) * Btot_midplane * Bt_midplane / B2_fsa ) #### # Toroidal rotation #### # Eq. 40 # Sign w.r.t. Ip comes through Bpol in rhoip. omega_I_minus_omega_i = (3.0 / 4.0) * K2 * (vti / R_midplane) * rhoip * LTi_inv # Assume outer midplane omega_tor_12C6 is a flux surface averaged quantity (even if not accurate especially at the edge) omega_I = self['omega_tor_12C6'].values # Calculate KDG 2H1 omega toroidal # By algebra omega_i = omega_I - (omega_I - omega_i) omega_i = omega_I - omega_I_minus_omega_i # Add to self V_pol_i_ds = xarray.Dataset() V_pol_i_da = 0.0 * self['n_2H1'].rename('V_pol_2H1_KDG') V_pol_i_da.values = V_pol_i V_pol_i_da.attrs['long_name'] = 'Kim-Diamond-Groebner 2H1 V_pol' V_pol_i_da.attrs['units'] = 'm/s' V_pol_i_ds.update({'V_pol_2H1_KDG': V_pol_i_da}) if update: self.update(V_pol_i_ds) V_pol_I_ds = xarray.Dataset() V_pol_I_da = 0.0 * self['n_12C6'].rename('V_pol_12C6_KDG') V_pol_I_da.values = V_pol_I V_pol_I_da.attrs['long_name'] = 'Kim-Diamond-Groebner 12C6 V_pol' V_pol_I_da.attrs['units'] = 'm/s' V_pol_I_ds.update({'V_pol_12C6_KDG': V_pol_I_da}) if update: self.update(V_pol_I_ds) omega_i_ds = xarray.Dataset() omega_i_da = 0.0 * self['omega_tor_12C6'].rename('omega_tor_2H1_KDG') omega_i_da.values = omega_i omega_i_da.attrs['long_name'] = 'Kim-Diamond-Groebner 2H1 omega_tor' omega_i_da.attrs['units'] = 'rad/s' omega_i_ds.update({'omega_tor_2H1_KDG': omega_i_da}) if update: self.update(omega_i_ds) if 'V_pol_12C6' in self: # Measured omega_tor_12C6 is not a flux surface averaged quantity. # Calculate what the 12C6 flux surface averaged omega toroidal should be based on Eq (7) in Ashourvan 2017 (unpublished work) # This correction includes the poloidal in-out assymetry which is not negligible in the edge region avg_omega_I = ( self['omega_tor_12C6'].values + R_midplane * Bt_midplane * self['V_pol_12C6'] * (self['avg_1/R**2'] - (1 / (R_midplane**2))) / Bp_midplane ) print(' * Poloidal in-out assymtery correction applied') # Calculate KDG flux surface averaged 2H1 omega toroidal avg_omega_i = avg_omega_I - omega_I_minus_omega_i # Add to DERIVED avg_omega_I_ds = xarray.Dataset() avg_omega_I_da = 0.0 * self['omega_tor_12C6'].rename('omega_tor_avg_12C6') avg_omega_I_da.values = avg_omega_I avg_omega_I_da.attrs['long_name'] = 'Flux surface averaged 12C6 omega_tor' avg_omega_I_da.attrs['units'] = 'rad/s' avg_omega_I_ds.update({'omega_tor_avg_12C6': avg_omega_I_da}) if update: self.update(avg_omega_I_ds) avg_omega_i_ds = xarray.Dataset() avg_omega_i_da = 0.0 * self['omega_tor_12C6'].rename('omega_tor_avg_2H1_KDG') avg_omega_i_da.values = avg_omega_i avg_omega_i_da.attrs['long_name'] = 'Kim-Diamond-Groebner flux surface averaged 2H1 omega_tor' avg_omega_i_da.attrs['units'] = 'rad/s' avg_omega_i_ds.update({'omega_tor_avg_2H1_KDG': avg_omega_i_da}) if update: self.update(avg_omega_i_ds)
[docs] def get_nclass_conductivity_and_bootstrap(self, gas='2H1', xfit='rho', device=None, eq=None, debug=False, update=True): """ Call the neoclassical conductivity and bootstrap calculations from utils_fusion :return: Dataset. COntaining conductivity and bootstrap DataArrays """ result = Dataset() if not eq: # this is required as input in 8_postfit.py printe('WARNING!: Er calculation requires an input equilibrium!') # standard checks species, ions, ions_with_dens, ions_with_fast = get_species(self) needed_keys = ['q', 'geo_eps', 'avg_fc', 'geo_R', 'n_e', 'T_e', 'psi_n', f'T_{gas}', f'dT_{gas}_dpsi'] needed_keys += [f'n_{s}' for s in ions_with_dens if 'fast' not in s] needed_keys += [f'dn_{s}_dpsi' for s in ions_with_dens if 'fast' not in s] chk = self.check_keys(needed_keys, name='Conductivity') if chk: nis = [] dnis_dpsi = [] Zis = [] for species in ions_with_dens: if 'fast' not in species: s2 = ["".join(x) for _, x in itertools.groupby(species, key=lambda x: str(x).isdigit())] charge_state = int(s2[-1]) Zis.append(charge_state) nis.append(self['n_' + species].values) dnis_dpsi.append(self['dn_' + species + '_dpsi'].values) # note, we assume all the thermal ions have equilibrated dTi_dpsi = self[f'dT_{gas}_dpsi'].values Ti = self[f'T_{gas}'].values sigma_nc = utils_fusion.nclass_conductivity( psi_N=self['psi_n'].values, # this is only needed if you turn the plot on ne=self['n_e'].values, # ne in m^-3 Te=self['T_e'].values, # Te in eV Ti=Ti, # Ti in eV nis=nis, # density of ion species in m^-3 Zis=Zis, # charge states of ion species q=self['q'].values, # safety factor eps=self['geo_eps'].values, # inverse aspect ratio fT=1 - self['avg_fc'].values, # trapped particle fraction R=self['geo_R'].values, # major radius in m version='neo_2021', # should be the same with utils_fusion.sauter_bootstrap; ) signc_da = DataArray( sigma_nc, coords=[('time', self['time'].values), (xfit, self[xfit].values)], name='nclass_sigma', attrs=dict(units='Ohm^-1 m^-1', long_name='Neoclassical conductivity from Sauter 1999'), ) result.update({'nclass_sigma': signc_da}) needed_keys += ['avg_F', 'psi', 'p_thermal', 'dn_e_dpsi', 'dT_e_dpsi'] chk = self.check_keys(needed_keys, name='Bootstrap') if chk: jb_kwargs = dict( psi_N=self['psi_n'].values, # this is only needed if you turn the plot on ne=self['n_e'].values, # ne in m^-3 Te=self['T_e'].values, # Te in eV Ti=Ti, # Ti in eV nis=nis, # density of ion species in m^-3 Zis=Zis, # charge states of ion species I_psi=abs(self['avg_F'].values), psiraw=self['psi'].values, device=device, p=self['p_thermal'].values, # pressure in Pa q=self['q'].values, # safety factor eps=self['geo_eps'].values, # inverse aspect ratio fT=1 - self['avg_fc'].values, # trapped particle fraction R=self['geo_R'].values, # major radius in m charge_number_to_use_in_ion_collisionality='Koh', charge_number_to_use_in_ion_lnLambda='Zavg', return_units=False, dT_e_dpsi=self['dT_e_dpsi'].values, # derivative of Te dT_i_dpsi=dTi_dpsi, # derivative of Ti dn_e_dpsi=self['dn_e_dpsi'].values, # derivative of ne dnis_dpsi=dnis_dpsi, # derivative of ion densities ) # Published Sauter bootstrap with units # neo_2021 a new set of analytical coefficients from A.Redl, et al # jboot1 'Bootstrap from Sauter 1999' jboot = DataArray( utils_fusion.sauter_bootstrap(version='neo_2021', **jb_kwargs), # A/m^2 coords=[('time', self['time'].values), (xfit, self[xfit].values)], attrs={'units': 'A m^-2', 'long_name': 'Bootstrap from A.Redl, et al 2021'}, ) result.update({'jboot_sauter': jboot}) # J parallel normalized with EFIT standards jboot_efit = utils_fusion.sauter_bootstrap(version='osborne', **jb_kwargs) # A/m^2 # EFIT sign convention. It should match the sign of itot, no matter what itot = eq['plasma_current'].values jboot_efit *= np.sign(np.nanmean(nominal_values(jboot_efit))) * np.sign(itot)[:, np.newaxis] # EFIT normalization R0 = utils_fusion.device_specs(device)['R0'] if 'R0' is None: R0 = np.nanmean(eq['R_center'].values) # (m) inv_r = self['avg_1/R'].values cross_sec = self['geo_cxArea'].sel(**{xfit: 1.0, 'method': 'nearest'}).values if debug: print(' >> Normalizing to EFIT convention using') print(f' >> R0 = {R0}') print(f" >> 1/R = {self['avg_1/R'].sel(**{xfit: 1.0, 'method': 'nearest'}).values}") print(f' >> Cross sectional area = {cross_sec}') print(f' >> Total current = {itot}') jboot_efit = utils_fusion.current_to_efit_form(R0, inv_r.T, cross_sec, itot, jboot_efit.T).T jboot_efit_da = DataArray( jboot_efit, coords=[('time', self['time'].values), (xfit, self[xfit].values)], attrs=dict(units='Unitless', long_name='Normalized Boostrap Current for EFIT'), ) result.update({'jboot_efit': jboot_efit_da}) if debug: # see the larger error bars when errors are propagated through finite difference derivatives db_kwargs = dict([(k, v) for k, v in jb_kwargs.items() if 'dpsi' not in k]) jboot_efit = utils_fusion.sauter_bootstrap(version='osborne', **db_kwargs) # A/m^2 # EFIT sign convention. It should match the sign of itot, no matter what jboot_efit *= np.sign(np.nanmean(nominal_values(jboot_efit))) * np.sign(itot)[:, np.newaxis] # EFIT normalization jboot_efit = utils_fusion.current_to_efit_form(R0, inv_r.T, cross_sec, itot, jboot_efit.T).T jboot_efit = DataArray( jboot_efit, coords=[('time', self['time'].values), (xfit, self[xfit].values)], attrs=dict( units='Unitless', long_name='Normalized Boostrap Current for EFIT without propagating fit derivative error bars' ), ) result.update({'jboot_efit_nodpsi': jboot_efit}) if update: self.update(result) return result
[docs] def check_keys(self, keys=[], name='', print_error=True): """ Check to make sure required data is available """ missing = [] for k in keys: if not k in self: missing.append(k) if missing: if not name: name = 'value' if print_error: printw(' WARNING: Could not form {:}. Missing {:}'.format(name, ', '.join(['`%s`' % x for x in missing]))) return False return True
[docs] @dynaLoad def reset_coords(self, names=None, drop=False): """ Pass through implementation of Dataset.reset_coords(). Given names of coordinates, convert them to variables. Unlike Dataset.reset_corrds(), however, this function modifies in place! param names: Names of coords to reset. Cannot be index coords. Default to all non-index coords. param drop: If True, drop coords instead of converting. Default False. """ self._dataset = self._dataset.reset_coords(names=names, drop=drop) return self
[docs] def combine(self, other, combine_attrs='drop_conflicts'): """ Pass through implementation of xarray.combine_by_coords. Given another OMFITprofile, it seeks to combine the none conflicting data in the two objects. param other: Another instance of OMFITprofiles. param combine_attr: Keyword controlled behavior regarding conflicting attrs (as opposed to vars). Default to 'drop_conflicts' where conflicting attrs are dropped from the result. (see xarray.combine_by_coords) """ from xarray import combine_by_coords self.load() other.load() self._dataset = combine_by_coords([self._dataset, other._dataset], combine_attrs=combine_attrs) return self
[docs]class OMFITprofilesDynamic(OMFITncDynamicDataset): """ Class for dynamic calculation of derived quantities :Examples: Initialize the class with a filename and FIT Dataset. >> tmp=OMFITprofiles('test.nc', fits=root['OUTPUTS']['FIT'], equilibrium=root['OUTPUTS']['SLICE']['EQ'], root['SETTINGS']['EXPERIMENT']['gas']) Accessing a quantity will dynamically calculate it. >> print tmp['Zeff'] Quantities are then stored (they are not calculated twice). >> tmp=OMFITprofiles('test.nc', fits=root['OUTPUTS']['FIT'], equilibrium=root['OUTPUTS']['SLICE']['EQ'], main_ion='2H1') >> uband(tmp['rho'],tmp['n_2H1']) """ def __init__(self, filename, fits=None, equilibrium=None, main_ion='2H1', **kw): OMFITncDataset.__init__(self, filename, **kw) if fits: self.update(fits) # profile fit dimension dims = list(self.dims.keys()) xkey = dims[dims.index('time') - 1] # collect some meta data about which particle species have what info available self.attrs['main_ion'] = str(main_ion) species, ions, ions_with_dens, ions_with_fast = get_species(self) self['species'] = DataArray(species, dims=['species']) ions += [self.attrs['main_ion']] self['ions'] = DataArray(ions, dims=['ions']) ions_with_dens += [self.attrs['main_ion']] self['ions_with_dens'] = DataArray(ions_with_dens, dims=['ions_with_dens']) ions_with_rot = [key.replace('omega_tor_', '') for key in self if key.startswith('omega_tor') and key != 'omega_tor_e'] self['ions_with_rot'] = DataArray(ions_with_rot, dims=['ions_with_rot']) # interpolate other radial coordinates from equilibrium printd('- Interpolating equilibrium quantities') needed = { 'avg_R', 'avg_R**2', 'avg_1/R', 'avg_1/R**2', 'avg_Btot', 'avg_Btot**2', 'avg_vp', 'avg_q', 'avg_fc', 'avg_F', 'avg_P', 'geo_psi', 'geo_R', 'geo_Z', 'geo_a', 'geo_eps', 'geo_vol', } eqcoords = needed.intersection(list(equilibrium.keys())) for meas in eqcoords: if 'profile_' + xkey != meas and 'profile_psi_n' in equilibrium[meas]: yy = [] for t in self['time'].values: eq_t = equilibrium.sel(time=t, method='nearest') x = np.squeeze(eq_t['profile_' + xkey]) y = np.squeeze(nominal_values(eq_t[meas].values)) yy.append(interp1e(x, y)(self[xkey].values)) # Ensure that 'q' is not extrapolated outside the separatrix if meas == 'profile_q': mask = self[xkey].values > 1.0 yy[-1][mask] = np.nan yy = np.array(yy) key = meas.replace('profile_', '') self[key] = DataArray( yy, coords=[fits['time'], fits[xkey]], dims=['time', xkey], attrs=copy.deepcopy(equilibrium[meas].attrs) ) self.set_coords(eqcoords) # reassign global attrs clobbered when assigning eq DataArrays with attrs self.attrs['main_ion'] = str(main_ion) self.save() self.update_dynamic_keys(self.__class__) def __getitem__(self, key): # map specific quantities to class functions mapper = {} mapper['n_' + self.attrs['main_ion']] = 'calc_n_main_ion' mapper['T_' + self.attrs['main_ion']] = 'calc_T_main_ion' # resolve mappings if key not in self: if key in mapper: getattr(self, mapper[key])() if mapper[key] in self._dynamic_keys: self._dynamic_keys.pop(self._dynamic_keys.index(mapper[key])) # return value return OMFITncDynamicDataset.__getitem__(self, key)
[docs] def calc_n_main_ion(self): """ Density of the main ion species. Assumes quasi-neutrality. :return: None. Updates the instance's Dataset in place. """ main_ion = str(self.attrs['main_ion']) mg, zg = mZ(main_ion) nz = self['n_e'] for key in self['ions_with_dens'].values: if key != main_ion: nz -= self['n_' + key].values * mZ(key)[1] self['n_' + main_ion] = nz / zg invalid = np.where(self['n_' + main_ion].values <= 0)[0] if len(invalid) > 0: printe(' Had to force main ion density to be always positive!') printe(' This will likely present a problem when running transport codes!') valid = np.where(self['n_' + main_ion].values > 0)[0] self['n_' + main_ion].values[invalid] = np.nanmin(self['n_' + main_ion].values[valid])
[docs] def calc_T_main_ion(self): """ Temperature of the main ion species. Assumes it is equal to the measured ion species temperature. If there are multiple impurity temperatures measured, it uses the first one. :return: None. Updates the instance's Dataset in place. """ main_ion = str(self.attrs['main_ion']) impurities_with_temp = [k for k in self['ions'].values if k != 'b' and 'T_' + k in list(self.keys())] nwith = len(impurities_with_temp) if nwith == 0: raise OMFITexception("No main or impurity ion temperatures measured") if nwith > 1: printw( "WARNING: Multiple impurities temperatures measured, setting main ion temperature based on {:}".format( impurities_with_temp[0] ) ) for ion in impurities_with_temp: self['T_' + main_ion] = self[f'T_{ion}'] * 1 break
[docs] def calc_Zeff(self): r""" Effective charge of plasma. Formula: Z_{eff} = \sum{n_s Z_s^2} / \sum{n_s Z_s} :return: None. Updates the instance's Dataset in place. """ # calculate Zeff (not assuming quasi-neutrality) nz_sum = np.sum([self['n_' + i].values * mZ(i)[1] for i in self['ions_with_dens'].values], axis=0) nz2_sum = np.sum([self['n_' + i].values * mZ(i)[1] ** 2 for i in self['ions_with_dens'].values], axis=0) z_eff = nz2_sum / nz_sum + 0 * self['n_e'].rename('Zeff') z_eff.attrs['long_name'] = r'$Z_{eff}$' self['Zeff'] = z_eff
[docs] def calc_Total_Zeff(self): r""" Effective charge of plasma. Formula: Z_{eff} = \sum{n_s Z_s^2} / \sum{n_s Z_s} :return: None. Updates the instance's Dataset in place. """ main_ion = str(self.attrs['main_ion']) mg, zg = mZ(main_ion) nz = self['n_e'] for key in self['ions_with_dens'].values: if key != main_ion: nz -= self['n_' + key].values * mZ(key)[1] self['n_' + main_ion] = nz / zg invalid = np.where(self['n_' + main_ion].values <= 0)[0] if len(invalid) > 0: printe(' Had to force main ion density to be always positive!') printe(' This will likely present a problem when running transport codes!') valid = np.where(self['n_' + main_ion].values > 0)[0] self['n_' + main_ion].values[invalid] = np.nanmin(self['n_' + main_ion].values[valid])
if __name__ == '__main__': # ensure that all specified model_tree_quantities can be translated to have <=12 chars for s in model_tree_species: for q in model_tree_quantities: item0 = q.format(species=s) item = OMFITprofiles.mds_translator(None, item0)