Source code for omfit_classes.omfit_omas_utils

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from omfit_classes.utils_fusion import tokamak, is_device, sauter_bootstrap
from omfit_classes.utils_math import line_intersect
from omfit_classes.utils_base import compare_version
from omfit_classes.omfit_mds import OMFITmdsValue
from omfit_classes.omfit_eqdsk import gEQDSK_COCOS_identify, OMFITgeqdsk
from omfit_classes.omfit_efund import get_mhdindat
from omfit_classes.omfit_omas import OMFITods
from omfit_classes.exceptions_omfit import doNotReportException as DoNotReportException

import os
import numpy as np
import glob
import omas
from scipy.interpolate import interp1d
from omas import *
from matplotlib import pyplot as plt

__all__ = [
    'add_generic_OMFIT_info_to_ods',
    'add_experiment_info_to_ods',
    'ensure_consistent_experiment_info',
    'verify_ods_eq_code_params_ts',
    'setup_hardware_overlay_cx',
    'toray_to_omas',
    'nubeam_to_omas',
    'add_hardware_to_ods',
    'multi_efit_to_omas',
    'pf_coils_to_ods',
    'make_sample_ods',
    'transp_ic_to_omas',
    'setup_magnetics_hardware_description_general',
    'get_shape_control_points',
    'setup_position_control_hardware_description_general',
    'setup_pulse_schedule_hardware_description_general',
    'classify_ods_eq_topology',
    'get_strike_point',
    'OmasUtilsBadInput',
    'orthogonal_distance',
    'load_sample_eq_specifications',
    'GradeEquilibrium',
]


[docs]class OmasUtilsBadInput(ValueError, DoNotReportException): """Bad inputs were used to a method in omfit_omas_utils.py"""
# OMAS extra structures # Based on https://github.com/gafusion/OMFIT-source/blob/unstable/omfit/omfit_classes/fluxSurface.py#L3122-L3167 _extra_structures = { 'pulse_schedule': { 'pulse_schedule.position_control.flux_gates': { "data_type": "STRUCT_ARRAY", "documentation": "Each so-called gate is a pair of points in (R,Z) between which " "a specified flux surface (by default, the separatrix) must pass. For example, " "the separatrix might need to pass through the entrance to a narrow structure in the " "divertor, or a flux surface a set distance into the SOL might need to also enter " "such a structure.", "full_path": "pulse_schedule/position_control/flux_gates", }, 'pulse_schedule.position_control.flux_gates[:].name': { "data_type": "STR_0D", "documentation": "Name or label for this gate", "full_path": "pulse_schedule/position_control/flux_gates(igate)/name", }, 'pulse_schedule.position_control.flux_gates[:].r1': { "data_type": "FLT_0D", "documentation": "R coordinate of first endpoint of the gate.", "full_path": "pulse_schedule/position_control/flux_gates(igate)/r1", "units": "m", }, 'pulse_schedule.position_control.flux_gates[:].r2': { "data_type": "FLT_0D", "documentation": "R coordinate of second endpoint of the gate.", "full_path": "pulse_schedule/position_control/flux_gates(igate)/r2", "units": "m", }, 'pulse_schedule.position_control.flux_gates[:].z1': { "data_type": "FLT_0D", "documentation": "Z coordinate of first endpoint of the gate.", "full_path": "pulse_schedule/position_control/flux_gates(igate)/z1", "units": "m", }, 'pulse_schedule.position_control.flux_gates[:].z2': { "data_type": "FLT_0D", "documentation": "Z coordinate of second endpoint of the gate.", "full_path": "pulse_schedule/position_control/flux_gates(igate)/z2", "units": "m", }, 'pulse_schedule.position_control.flux_gates[:].applies_to_psin': { "data_type": "FLT_1D", "documentation": "normalized psi values of flux surfaces that must pass through this gate." "If no flux surfaces are listed under any system, it is assumed that the psi_n = 1 (LCFS) " "surface must clear the gate.", "full_path": "pulse_schedule/position_control/flux_gates(igate)/applies_to_psin(:)", }, 'pulse_schedule.position_control.flux_gates[:].applies_to_dr_mid': { "data_type": "FLT_1D", "documentation": "Specify flux surfaces that must pass through this gate by giving the " "R value of their intersection with the outboard midplane, relative to the LCFS " "(the LCFS is 0, and 0.01 is 1 cm out into the SOL. " "If no flux surfaces are listed under any system, it is assumed that the psi_n = 1 (LCFS) " "surface must clear the gate.", "full_path": "pulse_schedule/position_control/flux_gates(igate)/applies_to_dr_mid(:)", "units": "m", }, 'pulse_schedule.position_control.flux_gates[:].time_range': { "data_type": "FLT_1D", "documentation": "Specify the time range when this flux gate should be considered " "by providing the start and end of the range as a two element sequence. The gate applies " "to all times if this settings is missing or if the end time isn't after the start time.", "full_path": "pulse_schedule/position_control/flux_gates(igate)/time_range(:)", "units": "s", }, } } if not hasattr(omas.omas_utils, '_extra_structures'): omas.omas_utils._extra_structures = {} for _ids in _extra_structures: for _item in _extra_structures[_ids]: _extra_structures[_ids][_item]['lifecycle_status'] = 'omfit' omas.omas_utils._extra_structures.setdefault(_ids, {}).update(_extra_structures[_ids])
[docs]def add_generic_OMFIT_info_to_ods(ods, root=None): """ This function will fill in information in the ``ids_properties`` and ``code`` section of the input ``ods`` and return the updated ``ods`` :param ods: An omas ods instance :param root: An OMFITmodule, from which to extract the commit :returns: The updated ods """ if not len(ods.location): for ds in ods: if ds != 'dataset_description': add_generic_OMFIT_info_to_ods(ods[ds], root=root) return ods prop = ods['ids_properties'] prop['source'] = 'OMFIT project: ' + OMFIT.filename prop['provider'] = OMFIT['MainSettings']['SETUP']['email'] prop['creation_date'] = utils_base.now() code = ods['code'] code['name'] = 'OMFIT' if root is not None: code['commit'] = root['SETTINGS']['MODULE']['commit'] code['name'] = 'OMFIT %s module' % (root['SETTINGS']['MODULE']['ID']) else: code['commit'] = repo("log -1 --pretty='%H'").strip() code['version'] = repo('describe') code['repository'] = ':'.join([socket.gethostname(), OMFITsrc]) # this might be time dependent # code['output_flag'] = -1 # This means to not use this data; change to 0 to start using return ods
[docs]def add_experiment_info_to_ods(ods, root=None): """ This function will fill in information in the `info` about the machine/pulse :param ods: An omas ods instance :param root: An OMFITmodule, from which to extract the machine/pulse info :returns: The updated ods """ # add machine/pulse info if root is not None: experiment = root['SETTINGS']['EXPERIMENT'] else: experiment = OMFIT['MainSettings']['EXPERIMENT'] ods['dataset_description.data_entry.machine'] = experiment['device'] ods['dataset_description.data_entry.pulse'] = experiment['shot'] return ods
[docs]def ensure_consistent_experiment_info(ods, device, shot): """ Ensures that the ODS, device, and shot are consistent If machine or pulse are not set, they are set to the provided device / shot. If they are set but inconsistent, AssertionError is raised. :param ods: ODS instance :param device: str :param shot: int """ machine = ods['dataset_description.data_entry'].setdefault('machine', tokamak(device)) pulse = ods['dataset_description.data_entry'].setdefault('pulse', shot) assert is_device( ods['dataset_description.data_entry.machine'], device ), f"This ODS is for a different device ({machine}). We shouldn't load {device} data here." assert ( ods['dataset_description.data_entry.pulse'] == shot ), f"This ODS is for a different shot ({pulse}). We shouldn't load {shot} data here."
[docs]def verify_ods_eq_code_params_ts(ods): """ Ensures that the ODS contains code.params.equilibrium.time_slice If any are missing an AssertionError is raised. :param ods: ODS instance """ assert isinstance(ods, (ODS, OMFITods)), "Input is not ods type." assert 'equilibrium' in ods, "ODS must contain equilibrium." assert 'code' in ods['equilibrium'], "Equilibrium ods must contain code." assert 'parameters' in ods['equilibrium']['code'], "Code ods must contain parameters." assert 'time_slice' in ods['equilibrium']['code']['parameters'], "Parameters ods must contain time_slice."
[docs]def add_hardware_to_ods(ods, device, pulse, hw_sys, overwrite=False): """ Adds a single hardware system's info to an ODS (operates in place) :param ods: ODS instance :param device: string :param pulse: int :param hw_sys: string :param overwrite: bool :return: updated ods """ device = tokamak(device, output_style='GPEC').lower() printd(f'Adding hardware to ODS; device = {device}, pulse = {pulse}, hw_sys = {hw_sys}', topic='omfit_omas') if not check_okay_to_write(ods, hw_sys, overwrite=overwrite): printw('Hardware location data for {} already in ODS. Skipping.'.format(hw_sys)) return if hw_sys in ['magnetics', 'pulse_schedule', 'position_control']: setup_function_name = f'setup_{hw_sys}_hardware_description_general' setup_func_kw = {'device': tokamak(device)} else: setup_function_name = f"setup_{hw_sys}_hardware_description_{device}" exec(f'from omfit_classes.omfit_omas_{device} import {setup_function_name}', globals(), locals()) setup_func_kw = {} setup_function = eval(setup_function_name, globals(), locals()) printd('Setting up hardware description for {}...'.format(hw_sys)) epilogue = setup_function(ods, pulse, **setup_func_kw) for postcommand in epilogue.get('postcommands', []): eval(postcommand) return ods
def interpret_strike_point(eqi, lower_bias=None): """ Handles differences in recording strike point positions as RVSOUT, etc. vs RVSOD, etc. Determines whether the equilibrium is upper or lower biased to set which of the (upper/lower) strike point sets is connected to the primary separatrix. Then finds whichever definition exists and uses it to define as much as possible of the missing defintion. That is, if RVSOUT, ZVSOUT is provided and ZXPT1 < 0, then RVSOD = RVSOUT, ZVSOD = ZVSOUT, etc. We'll call these the OD definition (RVSOD) and the OI definition (RVSOUT/RVSIN). :param eqi: dict Dictionary of equilibrium information. Must contain one of the strike point definitions as well as ZXPT1. :param lower_bias: bool [optional] Backup plan in case you don't have ZXPT1 but you want the translation to be done for an upper/lower biased plasma, anyway. :return: dict The same dictionary that is input. Items are filled in in-place, so catching the return is not necessary. """ if eqi is None: printd('Unable to interpret stirke point data with no equilibrium information.') return eqi eqi_upper = {k.upper(): v for k, v in eqi.items()} t = eqi_upper['ATIME'] blank = np.empty(len(t)) blank[:] = np.NaN # Determine which definition(s) are defined ou_req_keys = [f'RVSOU', f'ZVSOU', f'RVSIU', f'ZVSIU'] od_req_keys = [f'RVSOD', f'ZVSOD', f'RVSID', f'ZVSID'] oi_req_keys = ['RVSOUT', 'ZVSOUT', 'RVSIN', 'ZVSIN'] ou_defined = all([k.upper() in eqi_upper and eqi_upper[k.upper()] is not None for k in ou_req_keys]) od_defined = all([k.upper() in eqi_upper and eqi_upper[k.upper()] is not None for k in od_req_keys]) oi_defined = all([k.upper() in eqi_upper and eqi_upper[k.upper()] is not None for k in oi_req_keys]) # Prep output areas or make excuses and return early if (od_defined or ou_defined) and oi_defined: printd( 'Both strike point definitions (RVSOUT,... and RVSOD,...) are already populated. ' 'There is no need to translate from one style to another.' ) return eqi elif (od_defined or ou_defined) and not oi_defined: printd( 'Using the RVSOD style strike point names to define RVSOUT style strike point names. ' 'A complete set of pointnames should be possible.' ) eqi['RVSOUT'] = copy.copy(blank) eqi['ZVSOUT'] = copy.copy(blank) eqi['RVSIN'] = copy.copy(blank) eqi['ZVSIN'] = copy.copy(blank) elif not (od_defined or ou_defined) and oi_defined: printd( 'Using the RVSOUT style strike point names to define RVSOD style strike point names. ' 'There is not enough information to define the secondary strike points this way.' ) if eqi.get('RVSOD', None) is None: eqi['RVSOD'] = copy.copy(blank) eqi['ZVSOD'] = copy.copy(blank) eqi['RVSID'] = copy.copy(blank) eqi['ZVSID'] = copy.copy(blank) if eqi.get('RVSOU', None) is None: eqi['RVSOU'] = copy.copy(blank) eqi['ZVSOU'] = copy.copy(blank) eqi['RVSIU'] = copy.copy(blank) eqi['ZVSIU'] = copy.copy(blank) else: # elif not od_defined and not oi_defined: printe( 'Neither strike point defintion style (RVSOUT,... and RVSOD,...) is populated. ' 'There is no way to define either style of strike point position pointname.' ) return eqi # Prepare information for picking upper vs lower bias zxpt1 = eqi_upper.get('ZXPT1', None) if zxpt1 is None: if lower_bias is None: raise ValueError( 'Equilibrium information dictionary did not have enough information to ' 'translate between RVSOUT and RVSOD type strike point position pointnames.' ) elif lower_bias: zxpt1 = -1 + t * 0 else: zxpt1 = 1 + t * 0 for i in range(len(t)): # Pick upper vs lower bias for this slice if zxpt1[i] <= 0: ud = 'D' else: ud = 'U' # Do the translation for this slice if (od_defined or ou_defined) and not oi_defined: for rz in 'RZ': for io, inout in zip(['I', 'O'], ['IN', 'OUT']): eqi[f'{rz}VS{inout}'][i] = eqi_upper[f'{rz}VS{io}{ud}'][i] elif not od_defined and oi_defined: for rz in 'RZ': for io, inout in zip(['I', 'O'], ['IN', 'OUT']): eqi[f'{rz}VS{io}{ud}'][i] = eqi_upper[f'{rz}VS{inout}'][i] # Replace R = 0 with NaN, because R = 0 is physically unreasonable and means failure to find the strike pt. for io, inout in zip(['I', 'O'], ['IN', 'OUT']): if not (eqi[f'RVS{inout}'][i] > 0): eqi[f'RVS{inout}'][i] = np.NaN eqi[f'ZVS{inout}'][i] = np.NaN for ud_ in 'UD': if not (eqi[f'RVS{io}{ud_}'][i] > 0): eqi[f'RVS{io}{ud_}'][i] = np.NaN eqi[f'ZVS{io}{ud_}'][i] = np.NaN return eqi def filter_equilibrium_info_time_slices(eqi): """ Given a dictionary of equilibrium information, perform filters to remove bad time-slices Bad is defined by 0 plasma current or 0 points in the boundary outline. The keys are case insensitive. :param eqi: dict Dictionary of equilibrium information. Must include 1D arrays: cpasma (plasma current in A) nbbbs (number of boundary points) :return: dict The updated equilibrium information dictionary The dictionary is edited in-place, so you don't have to catch the return """ # Track how to handle quantities vs. time. Don't do AEQDSK because it has a different timebase. time_varying_1d = ['time', 'cpasma', 'bcentr', 'ssimag', 'ssibry', 'rmaxis', 'zmaxis', 'rzero', 'nbbbs'] time_varying_2d = ['rhovn', 'rbbbs', 'zbbbs', 'qpsi', 'pres', 'ffprim', 'pprime', 'fpol'] time_varying_3d = ['psirz'] time_invariant = ['r', 'z', 'lim', 'rlim', 'zlim'] # Prepare a dict with all lowercase keys eqi_lower = {} original_keys = {} for k, v in eqi.items(): eqi_lower[k.lower()] = v original_keys[k.lower()] = k # Do the filtering wt = np.ones(len(eqi_lower['cpasma']), bool) # Time-slice filter bad_slice = abs(eqi_lower['cpasma']) < 0.1 # A bad_slice |= eqi_lower['nbbbs'] == 0 wt = ~bad_slice for quantity in time_varying_1d + time_varying_2d + time_varying_3d: if quantity in eqi_lower: if eqi_lower[quantity] is not None: eqi[original_keys[quantity]] = eqi_lower[quantity][wt] return eqi
[docs]def multi_efit_to_omas(device, pulse, efitid, ods=None, minimal=False, aeqdsk_time_diff_tol=0.1, no_empty=False, **kw): r""" Writes equilibrium data from MDSplus to ODS :param device: string :param pulse: int :param efitid: string :param ods: ODS instance A New ODS will be created if this is None :param minimal: bool Only gather and add enough data to run a cross-section plot :param aeqdsk_time_diff_tol: float Time difference in ms to allow between GEQDSK and AEQDSK time bases, in case they don't match exactly. GEQDSK slices where the closest AEQDSK time are farther away than this will not have AEQDSK data. :param no_empty: bool Remove empty GEQDSK slices from the result. Sometimes EFITs are loaded with a few invalid/empty slices (0 current, no boundary, all psi points = 0, etc.). This option will discard those slices before loading results into an ODS. :param \**kw: Additional keywords to read_basic_eq_from_mds() But not the quantities to gather as those are set explicitly in this function. :return: ODS instance The edit is done in-place, so you don't have to catch the output if you supply the input ODS On fail: empty ODS is returned """ from omfit_classes.omfit_eqdsk import read_basic_eq_from_mds # Setup and gather -------------------------------------------------------------------------------------------- if ods is None: ods = ODS() # Prepare for gathering gfq_essential = [ 'r', # Needed for interpreting 2D psi grid 'z', # Needed for interpreting 2D psi grid 'psirz', # Primary measurement: 2D psi(r,z) (actually 3D as funct of t, r, z) 'lim', # Primary measurement: limiting surface 'rlim', # Alternative to lim for some devices 'zlim', # Goes with Rlim; together they are an alternative for lim 'rmaxis', # Primary measurement: Magnetic axis coordinates 'zmaxis', # Primary measurement: Magnetic axis coordinates 'rbbbs', # Primary measurement: core plasma boundary outline 'zbbbs', # Primary measurement: core plasma boundary outline 'nbbbs', # Needed for interpreting core plasma boundary outline 'ssimag', # Needed for normalizing psirz 'ssibry', # Needed for normalizing psirz 'rhovn', # Primary measurement: square root of toroidal flux rho_N 'qpsi', # Primary measurement: safety factor q 'bcentr', # Needed for determining COCOS 'cpasma', # Needed for determining COCOS ] gfq_important = ['pres', 'ffprim', 'pprime', 'rzero', 'fpol'] afq_x = ['RXPT1', 'ZXPT1', 'RXPT2', 'ZXPT2'] afq_strike = ['{rz:}VS{io:}{ud:}'.format(rz=rz, io=io, ud=ud) for rz in 'RZ' for io in 'IO' for ud in 'UD'] afq_strike += ['RVSOUT', 'RVSIN', 'ZVSOUT', 'ZVSIN'] # Some devices use these instead of RVSOD, etc. mfq_essential = [] mfq_important = ['rrgam', 'zzgam', 'tangam', 'siggam', 'chigam', 'cmgam', 'fwtgam'] derived_essential = ['time'] derived_important = ['Br', 'Bz'] if minimal: use_gfq = gfq_essential use_mfq = mfq_essential use_derived = derived_essential else: use_gfq = gfq_essential + gfq_important use_mfq = mfq_essential + mfq_important use_derived = derived_essential + derived_important # Get basic data eqi = read_basic_eq_from_mds( device=device, shot=pulse, tree=efitid, g_file_quantities=use_gfq, a_file_quantities=afq_x + afq_strike + ['atime'], measurements=use_mfq, derived_quantities=use_derived, **kw, ) if eqi is None: printe('Try resetting SSH tunnels and gathering again.') return ods # Preliminary calculations ------------------------------------------------------------------------------------ # Do some post-processing and filtering eqi = interpret_strike_point(eqi) if no_empty: eqi = filter_equilibrium_info_time_slices(eqi) # Use rho(psi_N) and psi_N(R, Z) to get rho(R, Z) nr = np.shape(eqi['r'])[-1] if eqi['rhovn'] is None: rho2d = None else: rho2d = np.zeros(np.shape(eqi['psirz'])) for i in range(len(eqi['time'])): if eqi['ssimag'][i] == eqi['ssibry'][i]: # Dummy psi1d to prevent interpolation from failing completely. It'll be a junk answer either way. psi1d = np.linspace(eqi['ssimag'][i] - 0.1, eqi['ssimag'][i] + 0.1, nr) else: psi1d = np.linspace(eqi['ssimag'][i], eqi['ssibry'][i], nr) rho2d[i, :, :] = interp1d(psi1d, eqi['rhovn'][i, :], bounds_error=False, fill_value='extrapolate')(eqi['psirz'][i, :, :]) # Deal with X-points blank = np.empty(len(eqi['time'])) blank[:] = np.NaN rx1 = eqi['RXPT1'] if eqi['RXPT1'] is not None else blank zx1 = eqi['ZXPT1'] if eqi['ZXPT1'] is not None else blank rx2 = eqi['RXPT2'] if eqi['RXPT2'] is not None else blank zx2 = eqi['ZXPT2'] if eqi['ZXPT2'] is not None else blank badx1 = rx1 < 0 badx2 = rx2 < 0 rx1[badx1] = np.NaN zx1[badx1] = np.NaN rx2[badx2] = np.NaN zx2[badx2] = np.NaN # Load data to ODS -------------------------------------------------------------------------------------------- # Load basic and global information ods['equilibrium.time'] = eqi['time'] / 1000 # ms to seconds ods['dataset_description.data_entry.pulse'] = pulse ods['dataset_description.ids_properties.comment'] = 'Equilibrium data loaded from {}#{}, EFIT tree = {}'.format(device, pulse, efitid) if (eqi['lim'] is None) and (eqi['rlim'] is not None) and (eqi['zlim'] is not None): # Some devices populate rlim and zlim instead of lim. We have to converge on a consistent convention. rlim = eqi['rlim'] zlim = eqi['zlim'] if len(np.shape(rlim)) == 2: # Some devices have stored the limiter at each time point, making it 2D vs time and space. # But we hope the limiter doesn't actually change vs time, so we'll ignore the time varaition. rlim = rlim[0, :] zlim = zlim[0, :] eqi['lim'] = np.array([rlim, zlim]).T if ('wall' not in ods) and (eqi['lim'] is not None): s = ods['wall.description_2d.0.limiter'] s['type.description'] = 'first wall' s['type.name'] = 'first_wall' s['type.index'] = 0 s['unit.0.outline.r'] = eqi['lim'][:, 0] s['unit.0.outline.z'] = eqi['lim'][:, 1] # Determine COCOS of gEQDSK data saved in MDSplus native_cocos = gEQDSK_COCOS_identify(np.nanmean(eqi['bcentr']), np.nanmean(eqi['cpasma'])) # Load data for each time slice with omas_environment(ods, cocosio=native_cocos): for i, t in enumerate(eqi['time']): if len(np.shape(eqi['r'])) == 2: rgrid = eqi['r'][i] zgrid = eqi['z'][i] else: rgrid = eqi['r'] zgrid = eqi['z'] s = ods['equilibrium.time_slice.{}'.format(i)] s['time'] = t / 1000.0 # ms to sec s['boundary.outline.r'] = eqi['rbbbs'][i, : int(eqi['nbbbs'][i])] s['boundary.outline.z'] = eqi['zbbbs'][i, : int(eqi['nbbbs'][i])] # aEQDSK data can be saved with a different time base than GEQDSK (thanks for that), so find a separate index. if 'atime' in eqi: dtga = abs(t - eqi['atime']) ia = dtga.argmin() if dtga[ia] > aeqdsk_time_diff_tol: ia = None else: assert len(zx1) == len( eqi['time'] ), 'If no "atime" is available in equilibrium data, length of GEQDSK histories must match length of AEQDSK histories.' ia = i if ia is not None: # This GEQDSK slice has an AEQDSK slice that's within range, so put in the A data # X-points if zx1[ia] > 0 or zx2[ia] < 0: # X-points are swapped: 1 is upper, 2 is lower. Load them in reverse. s['boundary.x_point.0.r'] = rx2[ia] s['boundary.x_point.0.z'] = zx2[ia] s['boundary.x_point.1.r'] = rx1[ia] s['boundary.x_point.1.z'] = zx1[ia] else: # X-points are already in the preferred order: 1 is lower, 2 is upper. s['boundary.x_point.0.r'] = rx1[ia] s['boundary.x_point.0.z'] = zx1[ia] s['boundary.x_point.1.r'] = rx2[ia] s['boundary.x_point.1.z'] = zx2[ia] # Strike points for j, strike in enumerate(['VSOD', 'VSID', 'VSOU', 'VSIU']): if eqi['R' + strike] is not None: rval = eqi['R' + strike][ia] zval = eqi['Z' + strike][ia] else: rval = zval = np.NaN if rval > 0: s['boundary.strike_point'][j]['r'] = rval s['boundary.strike_point'][j]['z'] = zval else: s['boundary.strike_point'][j]['r'] = s['boundary.strike_point'][j]['z'] = np.NaN # gEQDSK data # ============0D s['global_quantities.magnetic_axis.r'] = eqi['rmaxis'][i] s['global_quantities.magnetic_axis.z'] = eqi['zmaxis'][i] s['global_quantities.psi_axis'] = eqi['ssimag'][i] s['global_quantities.psi_boundary'] = eqi['ssibry'][i] s['global_quantities.ip'] = eqi['cpasma'][i] # ============0D time dependent vacuum_toroidal_field if not minimal: ods['equilibrium.vacuum_toroidal_field.r0'] = eqi['rzero'][i] ods.set_time_array('equilibrium.vacuum_toroidal_field.b0', i, eqi['bcentr'][i]) # ============1D s['profiles_1d.psi'] = np.linspace(eqi['ssimag'][i], eqi['ssibry'][i], len(rgrid)) if not minimal: s['profiles_1d.f'] = eqi['fpol'][i, :] if rho2d is not None: s['profiles_1d.phi'] = eqi['rhovn'][i, :] ** 2 s['profiles_1d.rho_tor_norm'] = s['profiles_1d.rho_tor'] = eqi['rhovn'][i, :] if not minimal: s['profiles_1d.pressure'] = eqi['pres'][i, :] s['profiles_1d.f_df_dpsi'] = eqi['ffprim'][i, :] s['profiles_1d.dpressure_dpsi'] = eqi['pprime'][i, :] s['profiles_1d.q'] = eqi['qpsi'][i, :] # ============2D s['profiles_2d.0.psi'] = eqi['psirz'][i, :, :].T if rho2d is not None: s['profiles_2d.0.phi'] = (rho2d[i, :, :] ** 2).T s['profiles_2d.0.grid_type.index'] = 1 s['profiles_2d.0.grid.dim1'] = rgrid s['profiles_2d.0.grid.dim2'] = zgrid if not minimal: if 'Br' in eqi: s['profiles_2d.0.b_field_r'] = eqi['Br'][i, :, :].T if 'Bz' in eqi: s['profiles_2d.0.b_field_z'] = eqi['Bz'][i, :, :].T # mEQDSK data # ============ if not minimal and 'tangam' in eqi and 'rrgam' in eqi: # The array of channels can be padded with a bunch of empties at the end. valid_channels = np.where(abs(eqi['tangam'][i, :]) > 0)[0] if len(valid_channels): lastchan_num = valid_channels[-1] + 1 for j in range(lastchan_num): measured = unumpy.arctan(ufloat(eqi['tangam'][i, j], abs(eqi['siggam'][i, j]))) # unumpy.arctan was returning an object of type = uncertainties.core.AffineScalarFunc measured = ufloat(nominal_values(measured), std_devs(measured)) s[f'constraints.mse_polarisation_angle.{j}.measured'] = measured s[f'constraints.mse_polarisation_angle.{j}.reconstructed'] = np.arctan(eqi['cmgam'][i, j]) s[f'constraints.mse_polarisation_angle.{j}.time_measurement'] = t * 1e-3 s[f'constraints.mse_polarisation_angle.{j}.chi_squared'] = eqi['chigam'][i, j] s[f'constraints.mse_polarisation_angle.{j}.weight'] = eqi['fwtgam'][i, j] s[f'constraints.mse_polarisation_angle.{j}.source'] = f'mse.channel.{j}' # mEQDSK data # ============ if not minimal and 'tangam' in eqi and 'rrgam' in eqi: # The array of channels can be padded with a bunch of empties at the end. valid_channels = np.where(np.sum(abs(eqi['tangam']), axis=0) > 0)[0] if len(valid_channels): lastchan_num = valid_channels[-1] + 1 for j in range(lastchan_num): ods[f'mse.channel.{j}.polarisation_angle.time'] = eqi['time'] * 1e-3 ods[f'mse.channel.{j}.polarisation_angle.data'] = unumpy.arctan(uarray(eqi['tangam'][:, j], abs(eqi['siggam'][:, j]))) ods[f'mse.channel.{j}.active_spatial_resolution.0.centre.r'] = eqi['rrgam'][0, j] ods[f'mse.channel.{j}.active_spatial_resolution.0.centre.z'] = eqi['zzgam'][0, j] return ods
def check_shot_time(ref_shot, ref_time, gamk_file, raise_on_bad=True): """ Checks for a match in shot and time(s) between a reference shot and time and data from G,A,M, or K file(s) :param ref_shot: int :param ref_time: float or float array :param gamk_file: OMFITgeqdsk, OMFITaeqdsk, OMFITmeqdsk, OMFITkeqdsk or dict containing one or more of those If providing a dict, times should be in order and only one type of file should be provided at a time :param raise_on_bad: bool Raises an exception if the input isn't even the right type of file instead of just returning False :return: bool True if shot and time are consistent """ from omfit_classes.omfit_eqdsk import OMFITgeqdsk, OMFITaeqdsk, OMFITmeqdsk, OMFITkeqdsk printd(f'Checking ref shot {ref_shot} & time {ref_time} vs. G,A,M, or K files.', topic='omfit_omas_utils') if isinstance(gamk_file, OMFITgeqdsk): try: shot = int(gamk_file['CASE'][3].split('#')[-1]) t = float(gamk_file['CASE'][4].split('ms')[0]) except KeyError: printw('Unable to read g-file shot/time while trying to verify shot/time match.') shot = t = None elif isinstance(gamk_file, OMFITaeqdsk): shot = gamk_file.get('shot', None) t = gamk_file.get('time', None) elif isinstance(gamk_file, OMFITmeqdsk): shot = gamk_file.get('shot', {}).get('data', [None])[0] t = gamk_file.get('time', {}).get('data', [None])[0] elif isinstance(gamk_file, OMFITkeqdsk): shot = gamk_file.get('IN1', {}).get('ISHOT', None) t = gamk_file.get('IN1', {}).get('ITIME', None) elif isinstance(gamk_file, dict): vf = [v for v in gamk_file.values() if isinstance(v, (OMFITgeqdsk, OMFITaeqdsk, OMFITmeqdsk, OMFITkeqdsk))] if not len(vf): if raise_on_bad: raise OmasUtilsBadInput("dict did not contain any usable files") else: return False if ref_time is None: ref_time = [None] * len(vf) if not hasattr(ref_time, 'len'): ref_time = tolist(ref_time) if len(vf) != len(ref_time): return False return all([check_shot_time(ref_shot, ref_time[i], v) for i, v in enumerate(vf)]) else: if raise_on_bad: raise OmasUtilsBadInput("Input file was not recognized as g/a/m/k-eqdsk or dict containing those.") else: return False match = (shot == ref_shot) & (t == ref_time) printd(f'ref shot {ref_shot} vs {shot} & ref time {ref_time} vs {t}: match={match}', topic='omfit_omas_utils') return match # The master setup / data loading function
[docs]def setup_hardware_overlay_cx( device, pulse=None, time=None, efitid='EFIT01', geqdsk=None, aeqdsk=None, meqdsk=None, keqdsk=None, overwrite=False, default_load=True, minimal_eq_data=True, no_empty=False, **kw, ): r""" Sets up an OMAS ODS so that it can power a cross section view with diagnostic/hardware overlays. This involves looking up and writing locations of various hardware to the ODS. :param device: string Which tokamak? :param pulse: int Which pulse number? Used for gathering equilibrium and for looking up hardware configuration as it could vary with time. :param time: int or float array [optional] Time (ms) within the pulse, used for looking up equilibrium only If None and no gEQDSKs, try to get all times from MDSplus :param efitid: string EFIT SNAP file or ID tag in MDS plus: used for looking up equilibrium only :param geqdsk: OMFITgeqdsk instance or dict-like containing OMFITgeqdsk instance(s) (optional) Provides EFIT instead of lookup using device, pulse, time, efitid. efitid will be ignored completely. device and pulse will still be used to look up hardware configuration. time might be used. Providing inconsistent data may produce confusing plots. :param aeqdsk: OMFITaeqdsk instance or dict-like containing OMFITaeqdsk instance(s) (optional) Provides an option to load aeqdsk data to OMAS. Requirements: - geqdsk(s) are being used as the source for basic data - aeqdsk shot and all aeqdsk times match geqdsk shot and times exactly - OMFITaeqdsk has a to_omas() method (not implemented yet as of 2021-11-12) :param meqdsk: OMFITmeqdsk instance or dict-like containing OMFITmeqdsk instance(s) (optional) Provides an option to load meqdsk data to OMAS. Requirements: - geqdsk(s) are being used as the source for basic data - meqdsk shot and all meqdsk times match geqdsk shot and times exactly :param keqdsk: OMFITkeqdsk instance or dict-like containing OMFITkeqdsk instance(s) (optional) Provides an option to load meqdsk data to OMAS. Requirements: - geqdsk(s) are being used as the source for basic data - keqdsk shot and all keqdsk times match geqdsk shot and times exactly :param overwrite: bool Flag indicating whether it is okay to overwrite locations if they already exist in ODS :param default_load: bool Default action to take for loading a system. For example, \**kw lets you explicitly set gas_injection=False to prevent calling setup_gas_injection_hardware_description. But for systems which aren't specified, the default action (True/False to load/not load) is controlled by this parameter. :param minimal_eq_data: bool Skip loading all the equilibrium data needed to recreate GEQDSK files and only get what's needed for plots. :param no_empty: bool Filter out equilibrium time-slices that have 0 current or 0 boundary outline points. (this is passed to multi_efit_to_omas()) :param \**kw: keywords dictionary Disable gathering/setup of data for individual hardware systems by setting them to False using their names within IMAS. For example: gas_injection=False will prevent the call to setup_gas_injection_hardware_description(). :return: OMAS ODS instance containing the data you need to draw a cross section w/ diagnostic/hardware overlays. """ from omfit_classes.omfit_eqdsk import OMFITgeqdsk, OMFITaeqdsk, OMFITmeqdsk, OMFITkeqdsk quiet = kw.pop('quiet', False) def case_info_to_str(case_info): device_info = f" {case_info['device']}" if case_info['device'] is not None else '' shot_info = f"#{case_info['shot']}" if case_info['shot'] is not None else '' efitid_info = f" {case_info['efitid']}" if case_info['efitid'] is not None else '' return f"Equilibrium data from GEQDSK {device_info}{shot_info}{efitid_info}" # Equilibrium if (geqdsk is None) and (tolist(time)[0] is not None) and len(tolist(time)) == 1: # Single time assert pulse is not None, 'Must supply either valid gEQDSK or valid device, pulse, time, efitid (bad pulse)' assert time is not None, 'Must supply either valid gEQDSK or valid device, pulse, time, efitid (bad time)' assert efitid is not None, 'Must supply either valid gEQDSK or valid device, pulse, timeF, efitid (bad efitid)' geqdsk = OMFITgeqdsk('g{:06d}.{:05d}'.format(int(pulse), int(time))).from_mdsplus( device=device, shot=pulse, time=time, SNAPfile=efitid ) ods = geqdsk.to_omas() ods['dataset_description.ids_properties.comment'] = 'Equilibrium data loaded from {}#{}, EFIT tree = {}'.format( device, pulse, efitid ) elif geqdsk is None: # Load all times so we can switch quickly later ods = multi_efit_to_omas(device, pulse, efitid, minimal=minimal_eq_data, no_empty=no_empty) elif isinstance(geqdsk, OMFITgeqdsk): # Single time from single OMFITgeqdsk try: g_pulse = int(geqdsk['CASE'][3].split('#')[-1]) g_time = float(geqdsk['CASE'][4].split('ms')[0]) except KeyError: printw('Unable to confirm that g-file shot/time match other settings.') g_pulse = g_time = None else: pulse, time = g_pulse, g_time print('pulse & time have been reassigned from geqdsk contents: pulse = {}, time = {}'.format(pulse, time)) ods = geqdsk.to_omas() ods['dataset_description.ids_properties.comment'] = case_info_to_str(geqdsk.case_info()) for amk, amk_file in zip(['a', 'm', 'k'], [aeqdsk, meqdsk, keqdsk]): if isinstance(amk_file, eval(f'OMFIT{amk}eqdsk')): if not check_shot_time(g_pulse, g_time, amk_file): printe(f'{amk}eqsk file was provided, but its shot and time did not match the geqdsk.') else: if hasattr(amk_file, 'to_omas'): amk_file.to_omas(ods) else: printw(f'{amk}eqdsk file does not have a `to_omas()` method; skipping load.') else: printd(f'{amk}eqdsk is not instance of OMFIT{amk}eqdsk; skipping....') elif isinstance(geqdsk, dict): # Container that should have some OMFITgeqdsk instances g_keys = [k for k, v in geqdsk.items() if isinstance(v, OMFITgeqdsk)] if len(g_keys): ods = None for i, k in enumerate(g_keys): if ods is None: ods = geqdsk[k].to_omas(time_index=i) else: geqdsk[k].to_omas(ods=ods, time_index=i) ods['dataset_description.ids_properties.comment'] = case_info_to_str(geqdsk[g_keys[0]].case_info()) case = geqdsk[g_keys[0]]['CASE'] try: g_pulse = int(geqdsk[g_keys[0]]['CASE'][3].split('#')[-1]) g_times = np.array([float(geqdsk[k]['CASE'][4].split('ms')[0]) for k in g_keys]) except KeyError: printw('Unable to confirm that g-file shot/time match other settings.') g_pulse = g_times = None else: pulse, time = g_pulse, g_times for amk, amk_files in zip(['a', 'm', 'k'], [aeqdsk, meqdsk, keqdsk]): if amk_files is not None and isinstance(amk_files.values()[0], eval(f'OMFIT{amk}eqdsk')): if not check_shot_time(g_pulse, g_times, amk_files): printe(f'{amk}eqsk file was provided, but its shot and times did not match the geqdsk.') else: if hasattr(amk_files.values()[0], 'to_omas'): for i, amk_f in enumerate(amk_files.values()): amk_f.to_omas(ods, time_index=i) else: printw(f'{amk}eqdsk file does not have a `to_omas()` method; skipping load.') else: printd( f'First {amk}eqdsk is not instance of OMFIT{amk}eqdsk ' f'(type is {type(amk_files.values()[0]) if amk_files is not None else None}); skipping....' ) else: raise OMFITexception( 'No OMFITgeqdsk instances were found in the container! If you want to provide a dict-like object ' 'as geqdsk, it should contain some geqdsks!' ) else: raise OMFITexception( f'Unrecognized type for geqdsk: ({type(geqdsk)}) . ' f'Should be None to draw from device/shot in MDSplus, OMFITgeqdsk to use one equilibrium slice, ' f'or dict-like containing OMFITgeqdsk instances to draw from a set of OMFITgeqdsks.' ) ods['dataset_description.data_entry.machine'] = tokamak(device) ods['dataset_description.data_entry.pulse'] = pulse # Hardware # List subsets whose parent groups are going to be loaded; do not load these as it would be redundant subs = {'position_control': 'pulse_schedule'} dont_load = [] for sub in subs.keys(): if kw.get(subs[sub], default_load): dont_load += [sub] # Load hardware description data for hw_sys in omas.omas_utils.list_structures(ods.imas_version) + ['position_control']: if kw.pop(hw_sys, default_load) and hw_sys not in dont_load: add_hardware_to_ods(ods, device, pulse, hw_sys, overwrite=overwrite) if not quiet: print('Gathered data for hardware system: {}'.format(hw_sys)) else: if not quiet: print('Skipped data gathering for hardware system: {}'.format(hw_sys)) return ods
def check_okay_to_write(ods, hardware_name, overwrite=False): """ Check the status of the ODS before you overwrite existing data you care about! Used while setting up hardware information for the cross section overlay (setup_hardware_overlay_cx) :param ods: OMAS ODS instance :param hardware_name: string Name of the sub ODS for the hardware system of interest, like 'thomson_scattering' :param overwrite: bool Is overwriting allowed if the ODS contains locations only? Actual data will not be overwritten, regardless. :return: bool Flag indicating that it's okay to write locations, either because there are none for this category yet, or because you indicated that overwriting locations only (not data) would be okay. """ # Shortcuts to common position strings moving_pt = 'channel.0.position.r.data' # Measures at point which moves over time los_chan = 'channel.0.line_of_sight.first_point.r' # View-chord with some line-of-slight and 1st pt, etc. data_name = { # Data quantity to check to determine whether or not the ODS has data for this hardware system 'barometry': 'gauge.0.pressure.data', 'bolometer': 'channel.0.power.data', 'charge_exchange': 'channel.0.temperature.data', 'ec_antennas': 'antenna.0.power.data', 'ece': 'channel.0.t_e.data', 'equilibrium': 'time_slice.0.profiles_2d.0.psi', 'gas_injection': 'pipe.0.exit_position.flow_rate.data', 'ic_antennas': 'antenna.0.power.data', 'interferometer': 'channel.0.n_e_line.data', 'magnetics': 'bpol_probe.0.field.data', 'pf_active': 'coil.0.current.data', 'thomson_scattering': 'channel.0.t_e.data', 'langmuir_probes': 'embedded.0.ion_saturation_current.data', }.get(hardware_name, None) position = { # Position quantity to check to determine whether or not the ODS has hardware geometry information 'barometry': 'gauge.0.position.r', 'bolometer': los_chan, 'charge_exchange': moving_pt, 'ec_antennas': 'antenna.0.launching_position.r.data', 'ece': moving_pt, 'equilibrium': 'time_slice.0.profiles_2d.0.grid.dim1', 'gas_injection': 'pipe.0.exit_position.r', 'ic_antennas': 'antennas.0.strap.0.geometry.geometry_type.r', # "geometry_type" gets replaced 'interferometer': los_chan, 'magnetics': 'bpol_probe.0.position.r', 'pf_active': 'coil.0.element.0.geometry.geometry_type.r', # "geometry_type" gets replaced 'thomson_scattering': 'channel.0.position.r', 'langmuir_probes': 'embedded.0.position.r', }.get(hardware_name, None) geometry_type = { # The string "geometry_type" in position will be replaced by something like "rectangle" 'ic_antennas': 'antennas.0.strap.0.geometry.geometry_type', 'pf_active': 'coil.0.element.0.geometry.geometry_type', }.get(hardware_name, None) # Avoid overwriting existing data try: has_data = ods[hardware_name][data_name] is not None except (ValueError, LookupError, TypeError): has_data = False if has_data: # Writing new locations could produce inconsistency between T_e and R,Z data, so don't do it. print('This ODS already has {hw:} data loaded under {dat:}; skipping {hw:} location setup.'.format(hw=hardware_name, dat=data_name)) return False printd("Checked ods['{hw:}']['{dat:}'] and didn't find anything to overwrite.".format(hw=hardware_name, dat=data_name)) # Avoid overwriting existing hardware locations try: geo_type = ods[hardware_name][geometry_type] except (ValueError, LookupError, TypeError): geo_type = 'rectangle' # As good a default as any. # The alternative to geo_type lookup would be checking through all possibilities: "rectangle", "oblique", # etc., but if geometry_type isn't filled out, the position info isn't complete, anyway, so why bother? try: has_locs = ods[hardware_name][position.replace('geometry_type', geo_type)] is not None except (ValueError, LookupError, TypeError, AttributeError): has_locs = False if has_locs and not overwrite: print( 'This ODS already has {hw:} locations loaded under {pos:}; skipping {hw:} location setup ' '(call again with overwrite=True to reload locations).'.format(hw=hardware_name, pos=position) ) return False printd( "Checked ods['{hw:}']['{pos:}'] and didn't find anything to overwrite or overwrite was allowed.".format( hw=hardware_name, pos=position ) ) printd('Okay to write {} locations into ODS.'.format(hardware_name)) return True def get_first_wall(ods): """ Search for limiters with type description indicating a first wall. This is a utility used in the course of setup_hardware_overlay_cx. :param ods: OMAS ODS instance :return: array r and z arrays for wall outline """ desc = [ods['wall']['description_2d'][i]['limiter']['type']['description'] for i in ods['wall']['description_2d']] is_wall = [d in ['first wall', 'first_wall'] for d in desc] iwall = np.where(is_wall)[0][0] lim_outline = ods['wall']['description_2d'][iwall]['limiter']['unit'][0]['outline'] return lim_outline def trim_bolometer_second_points_to_box(ods, pad=None): """ Trims second points in bolometer array so that they are on the edge of a box around the data. That is, assuming the bolometer setup script crudely defined the second points in sightlines using the first point, an angle, and a length that would be long enough to span the whole tokamak interior, we will have second points well outside of the vessel for most chords. That's the way efitviewer sets up its chords at first. This function cleans up the results of such a crude setup. Don't use if you already set up your bolometers carefully. This is a utility used in the course of setup_hardware_overlay_cx. :param ods: OMAS ODS instance :param pad: float Amount of padding around the outside of the extrema of the limiting surface (m). Set to None to auto pick. :return: None (the ODS is edited in-place) """ # Pick a box around lim for the line endpoints lim_outline = get_first_wall(ods) box = np.array([lim_outline['r'].min(), lim_outline['z'].min(), lim_outline['r'].max(), lim_outline['z'].max()]) pad = 0.125 * ((box[2] - box[0]) + (box[3] - box[1])) / 2.0 if pad is None else pad box += np.array([-pad, -pad, pad, pad]) boxpath = np.array([[box[0], box[1]], [box[0], box[3]], [box[2], box[3]], [box[2], box[1]], [box[0], box[1]]]) for i in range(len(ods['bolometer']['channel'])): cls = ods['bolometer']['channel'][i]['line_of_sight'] ch_path = np.array([[cls['first_point.r'], cls['first_point.z']], [cls['second_point.r'], cls['second_point.z']]]) intersections = line_intersect(boxpath, ch_path) dist = [(inter[0] - cls['first_point.r']) ** 2 + (inter[1] - cls['first_point.z']) ** 2 for inter in intersections] cls['second_point.r'], cls['second_point.z'] = intersections[np.array(dist).argmax()] return def build_templates(template_file_search=os.sep.join([OMFITsrc, '..', 'modules', 'EFITtime', 'TEMPLATES', '*probe*'])): """ Creates a namelist of magnetic probe descriptions based on data in TEMPLATES :param template_file_search: string Filename of file or files with wildcards allowed. Will be processed with glob.glob to get a list of files. :return: dict Returns a dict containing OMFITnamelist instances, each with probe data """ from omfit_classes.omfit_namelist import OMFITnamelist d_templates = {} for template_file in glob.glob(template_file_search): file_root = '.'.join(os.path.split(template_file)[1].split('.')[:-1]) d_templates[file_root] = OMFITnamelist(template_file) return d_templates
[docs]def setup_magnetics_hardware_description_general(ods, pulse, device=None): """ Writes magnetic probe locations into ODS. :param ods: an OMAS ODS instance :param device: string Which tokamak? :param pulse: int :return: dict Information or instructions for follow up in central hardware description setup """ dprobe = get_mhdindat(device, pulse) if dprobe is None: printw('Warning: failed to look up magnetic probe configuration for device = {}, pulse = {}.'.format(device, pulse)) return {} if compare_version(ods.imas_version, '3.25.0') >= 0: # TODO: check exact version number where bpol_probe changed to b_field_pol_probe. I know it's after 3.23.2 bpol = 'b_field_pol_probe' # This could change in later versions of the IMAS schema. Please update. else: # IMAS schema 3.23.2 and earlier bpol = 'bpol_probe' nbp = len(dprobe['IN3']['XMP2']) for i in range(nbp): ods['magnetics'][bpol][i]['identifier'] = dprobe['IN3']['MPNAM2'][i] ods['magnetics'][bpol][i]['position.r'] = dprobe['IN3']['XMP2'][i] ods['magnetics'][bpol][i]['position.z'] = dprobe['IN3']['YMP2'][i] try: ods['magnetics'][bpol][i]['position.phi'] = float(dprobe['IN3']['MPNAM2'][i].strip()[-3:]) * np.pi / 180.0 except ValueError: printd('Failed to read angle measurement for b_field_pol_probe {}'.format(dprobe['IN3']['MPNAM2'][i])) nfl = len(dprobe['IN3']['RSI']) for i in range(nfl): ods['magnetics.flux_loop'][i]['identifier'] = dprobe['IN3']['LPNAME'][i] ods['magnetics.flux_loop'][i]['position.0.r'] = dprobe['IN3']['RSI'][i] ods['magnetics.flux_loop'][i]['position.0.z'] = dprobe['IN3']['ZSI'][i] return {}
def get_boundary_plasma_and_limiter(dev=None, sh=None, efi=None, times_=None, gs=None, limiter_only=False, debug=False, debug_out=None): """ Given gEQDSKs or instructions for looking up boundary & limiter information, get the plasma boundary and return it :param dev: string :param sh: int :param efi: string :param times_: float array Requested times. If g-files are provided, this is overruled and ignored. If this is not specified, the time base is read from data. :param gs: OMFITcollection instance containing gEQDSK instances as values and times as keys If this is provided, dev, sh, efi, and times_ are ignored. :param limiter_only: bool Skip gathering boundary; get limiter only. Should save time. :param debug: bool :param debug_out: dict-like [optional] :return: tuple ( t: 1D float array (nt) of times in ms n: 1D int array (nt) of number of valid boundary points at each time r: 2D float array (nt * nb) of boundary R coordinates in m (time first, then index), padded with NaNs z: 2D float array (nt * nb) of boundary Z coordinates in m (time first, then index), padded with NaNs rlim: 1D float array (nlim) of limiter R vertices in m zlim: 1D float array (nlim) of limiter Z vertices in m xr: 2D float array (nt * 2) of X-point R positions in m. Secondary X-point might be NaN if it isn't found. xz: 2D float array (nt * 2) of X-point Z positions in m. X-point order is [bottom, top] ) OR ( rlim: 1D float array (nlim) of limiter R vertices in m zlim: 1D float array (nlim) of limiter Z vertices in m ) """ if debug_out is None and debug: debug_out = {} if gs is not None: # Option 1: provide a collection of G-files # Get limiter from the first G-file. Assume the limiter doesn't change in time. g0 = gs.values()[0] rlim_ = g0['RLIM'] zlim_ = g0['ZLIM'] if limiter_only: return rlim_, zlim_ # Figure out dimensions and time base t_ = np.array(gs.keys()) nt = len(t) n = gs['NBBBS'] nmax = 0 for tt in t_: valid = np.isfinite(gs[tt]['RBBBS']) & (gs[tt]['RBBBS'] > 0) nmax = np.max([nmax, np.sum(valid)]) # Collect boundary r = np.empty((nt, nmax)) r[:] = np.NaN z = copy.copy(r) for i, tt in enumerate(t_): r[i, : n[i]] = gs[tt]['RBBBS'][: n[i]] z[i, : n[i]] = gs[tt]['ZBBBS'][: n[i]] # Get x-point. If you want both X-points, you'll have to use a different method. xr1, xz1 = gs['fluxSurfaces']['info']['xpoint'] xr2 = xz2 = np.array([np.NaN] * len(xr1)) xr_ = np.array([xr1, xr2]).T xz_ = np.array([xz1, xz2]).T else: from omfit_classes.omfit_eqdsk import read_basic_eq_from_mds # Option 2: read a reduced data set from MDSplus assert dev is not None, 'Must provide a device if not providing collection of gEQDSKs' assert sh is not None, 'Must provide a shot number if not providing collection of gEQDSKs' assert efi is not None, 'Must provide an EFIT ID if not providing collection of gEQDSKs' # Gather and extract basic data info = read_basic_eq_from_mds( device=dev, shot=sh, tree=efi, derived_quantities=['time'], g_file_quantities=['RLIM', 'ZLIM', 'LIM'] + ([] if limiter_only else ['RBBBS', 'ZBBBS', 'NBBBS']), a_file_quantities=[] if limiter_only else ['RXPT1', 'ZXPT1', 'RXPT2', 'ZXPT2'], ) if debug: debug_out['eq_info'] = info if info['LIM'] is None: rlim_ = info['RLIM'] zlim_ = info['ZLIM'] else: rlim_ = info['LIM'][:, 0] zlim_ = info['LIM'][:, 1] if limiter_only: return rlim_, zlim_ n_ = info['NBBBS'].astype(int) r_ = info['RBBBS'] z_ = info['ZBBBS'] blank = np.empty(len(info['time'])) blank[:] = np.NaN for thing in ['RXPT1', 'RXPT2', 'ZXPT1', 'ZXPT2']: if thing is None: info[thing] = blank xr_ = np.array([info['RXPT1'], info['RXPT2']]).T xz_ = np.array([info['ZXPT1'], info['ZXPT2']]).T # Handle time selection. Because the number of boundary points and their poloidal angles can change, # don't interpolate in time. Instead, just pick the nearest slice to each requested time. if times_ is None: treq = info['time'] else: treq = times_ nt = len(treq) nmax = n_.max() r = np.empty((nt, nmax)) r[:] = np.NaN z = copy.copy(r) n = np.empty(nt) n[:] = np.NaN t_ = copy.copy(n) for i in range(nt): j = closestIndex(info['time'], treq[i]) t_[i] = info['time'][j] n[i] = n_[j] r[i, : n_[i]] = r_[j, : n_[i]] z[i, : n_[i]] = z_[j, : n_[i]] n = n.astype(int) badx = (xr_ <= 0) | (xz_ <= -8) xr_[badx] = np.NaN xz_[badx] = np.NaN with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning, message='invalid value encountered in .*') if (np.nanmean(xz_[:, 0]) > 0) or (np.nanmean(xz_[:, 1] < 0)): # X-points are in reverse order. Switch them so that bottom is first. xz_ = xz_[:, ::-1] xr_ = xr_[:, ::-1] if debug: debug_out['get_boundary_result'] = (t_, n, r, z, rlim_, zlim_, xr_, xz_) return t_, n, r, z, rlim_, zlim_, xr_, xz_
[docs]def get_shape_control_points(dev=None, sh=None, times_=None, debug=False, debug_out=None): """ Gathers shape control points describing targets for the plasma boundary to intersect and returns them :param dev: string :param sh: int :param times_: float array [optional] Times to use. If provided, interpolate data. If not, auto-assign using first valid segment. :param debug: bool :param debug_out: dict-like [optional] :return: tuple ( t: 1d float array (nt): times in ms (actual times used) r: 2d float array (nt * nseg): R coordinate in m of shape control points. Unused points are filled with NaN. z: 2d float array (nt * nseg): Z coordinate in m of shape control points rx: 2d float array (nt * 2): R coordinate in m of X points (2nd one may be NaN) rz: 2d float array (nt * 2): Z of X-points. Order is [bottom, top] rptnames: list of strings (nseg) giving labels for describing ctrl pts zptnames: list of strings (nseg) giving labels for describing ctrl pts list of 1D bool arrays vs. t giving validity of outer bottom, inner bottom, outer top, inner top strike pts ) """ assert dev is not None, 'Device must be specified in order to look up shape control data' assert sh is not None, 'Shot number must be specified in order to look up shape control data' if debug_out is None and debug: debug_out = {} if is_device(dev, 'DIII-D'): # Use this as a template. Copy and paste to set up other devices by adding elif ... segments = 18 # Maximum possible number of segments. first_segment = 1 # Starting number. Is the first segment 0 or 1? tree = None # MDSplus treename r_format = 'EFSSEG{:02d}RT' # r_format.format(segment_number) should give a valid MDSplus pointname z_format = 'EFSSEG{:02d}ZT' rx_format = 'EFSG{}RXT' # Like r_format, but for the target X-point coordinate zx_format = 'EFSG{}ZXT' label_format = '{:02d}' # List pairs of poitnames for [top, bottom] divertor control method for double null isoflux control algorithms: dn_ctrl_method_pointnames = [['IDIXCTLTOP', 'IDIXCTLBOT']] # List ptnames that might be in use for single null algorithms; which one is defined depends on algorithm in use div_ctrl_method_pointnames = ['ISISPMTHD', 'IUISPMTHD'] valid_ctrl_methods = [0] # Values of strike point method must be in this list to be valid valid_outer_strike_methods = [1, 3] valid_inner_strike_methods = [2, 3] factor_to_get_meters = 1.0 # Data are multiplied by this to put them in m factor_to_get_ms = 1.0 # Independent variables / timebases are multiplied by this to put them in ms else: raise OMFITexception('Control point data are not (yet) provided for device {}.'.format(dev)) if sh == 0: return None mkw = dict(server=dev, shot=sh, treename=tree) def get_dat(j0, j1, rf, zf, tt): """ Shortcut for gathering target data :param j0: int Starting index (included as 1st element) :param j1: int Ending index (not included; last element will be this - 1) :param rf: string Format for determining TDI by filling in a number :param zf: string Format for determining TDI by filling in a number :param tt: float array [optional] Times in ms. If provided, data will be interpolated to this timebase. If None, the timebase will be picked up from the first valid segment. :return: tuple ( t: 1D float array of actual times used in ms. Useful if input tt is None and times are auto-assigned. r: 2D float array (nt * nseg) giving time history of R coordinate in m for each segment; NaN for unused z: like R ) """ rr = [] zz = [] rm = None for j in range(j0, j1): rm = OMFITmdsValue(TDI=rf.format(j), **mkw) if (not rm.check()) or (abs(rm.data()).max() == 0): # Invalid data (fails check) or unused segment (all R=0) rr += [np.NaN] zz += [np.NaN] else: if tt is None: tt = rm.dim_of(0) zm = OMFITmdsValue(TDI=zf.format(j), **mkw) if not zm.check(): # Invalid data (fails check) rr += [np.NaN] zz += [np.NaN] else: r_ = interp1d(rm.dim_of(0) * factor_to_get_ms, rm.data(), bounds_error=False, fill_value=np.NaN)(tt) z_ = interp1d(zm.dim_of(0) * factor_to_get_ms, zm.data(), bounds_error=False, fill_value=np.NaN)(tt) rr += [r_ * factor_to_get_meters] zz += [z_ * factor_to_get_meters] # Replace scalar placeholder NaN entries with arrays of NaNs matching length of tt. empty = np.empty(len(tt)) empty[:] = np.NaN for j in range(len(rr)): if (len(np.atleast_1d(rr[j])) == 1) and np.isnan(np.atleast_1d(rr[j])): rr[j] = copy.copy(empty) zz[j] = copy.copy(empty) if debug: debug_out['rr_sample_get_dat'] = rr debug_out['rm_sample_get_dat'] = rm debug_out['tt_sample_get_dat'] = tt return tt, np.array(rr).T, np.array(zz).T times_, r, z = get_dat(first_segment, first_segment + segments, r_format, z_format, times_) times_, rx_0, zx_0 = get_dat(1, 3, rx_format, zx_format, times_) rptnames = zptnames = [label_format.format(i) for i in range(first_segment, first_segment + segments)] # Put X-points in the correct order rx_ = np.empty(np.shape(rx_0)) rx_[:] = np.NaN zx_ = copy.copy(rx_) with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning, message='invalid value encountered in .*') top_first = (zx_0[:, 0] > 0) | (zx_0[:, 1] < 0) bot_first = (zx_0[:, 1] > 0) | (zx_0[:, 0] < 0) rx_[bot_first, :] = rx_0[bot_first, :] zx_[bot_first, :] = zx_0[bot_first, :] rx_[top_first, :] = rx_0[top_first, ::-1] zx_[top_first, :] = zx_0[top_first, ::-1] # Check X-point validity # Start with arrays of all-valid and then knock out times with invalid control methods. # For times or cases where the method cannot be determined, just ignore and let the X-point through. valid_xpt1 = np.ones(len(times_), bool) valid_xpt2 = np.ones(len(times_), bool) valid_outer_strike1 = copy.copy(valid_xpt1) valid_outer_strike2 = copy.copy(valid_xpt1) valid_inner_strike1 = copy.copy(valid_xpt1) valid_inner_strike2 = copy.copy(valid_xpt1) # First check single-null type control algorithms for dcmp in div_ctrl_method_pointnames: dcm = OMFITmdsValue(TDI=dcmp, **mkw) if not dcm.check(): # Bad signal; move on. The pointname to use can vary depending on which algorithm is used. continue method = interp1d(dcm.dim_of(0), dcm.data(), bounds_error=False, fill_value=np.NaN)(times_) valid = np.nanmax(np.array([method == v for v in valid_ctrl_methods] + [np.isnan(method)]), axis=0) valid_xpt1 *= valid valid_xpt2 *= valid # The second X-point probably doesn't even have a target defined, but let's just be sure. valid_os = np.nanmax(np.array([method == v for v in valid_outer_strike_methods] + [np.isnan(method)]), axis=0) valid_outer_strike1 *= valid_os valid_outer_strike2 *= valid_os valid_is = np.nanmax(np.array([method == v for v in valid_inner_strike_methods] + [np.isnan(method)]), axis=0) valid_inner_strike1 *= valid_is valid_inner_strike2 *= valid_is # Now double-null control algorithms for dcmps in dn_ctrl_method_pointnames: dcm_top = OMFITmdsValue(TDI=dcmps[0], **mkw) dcm_bot = OMFITmdsValue(TDI=dcmps[1], **mkw) if not (dcm_top.check() and dcm_bot.check()): # Unless we have both methods defined, move on. continue # Determine top and bottom X-point validity top_method = interp1d(dcm_top.dim_of(0), dcm_top.data(), bounds_error=False, fill_value=np.NaN)(times_) bot_method = interp1d(dcm_bot.dim_of(0), dcm_bot.data(), bounds_error=False, fill_value=np.NaN)(times_) nantop = np.isnan(top_method) nanbot = np.isnan(bot_method) # The + [nantop] or + [nanbot] is to include NaN in valid methods top_valid = np.nanmax(np.array([top_method == v for v in valid_ctrl_methods] + [nantop]), axis=0) bot_valid = np.nanmax(np.array([bot_method == v for v in valid_ctrl_methods] + [nanbot]), axis=0) top_os_valid = np.nanmax(np.array([top_method == v for v in valid_outer_strike_methods] + [nantop]), axis=0) bot_os_valid = np.nanmax(np.array([bot_method == v for v in valid_outer_strike_methods] + [nanbot]), axis=0) top_is_valid = np.nanmax(np.array([top_method == v for v in valid_inner_strike_methods] + [nantop]), axis=0) bot_is_valid = np.nanmax(np.array([bot_method == v for v in valid_inner_strike_methods] + [nanbot]), axis=0) # Map top X and bottom X to X1 and X2 w1top = zx_[:, 0] > 0 w2top = zx_[:, 1] > 0 valid_xpt1[w1top] *= top_valid[w1top] valid_xpt1[~w1top] *= bot_valid[~w1top] valid_xpt2[w2top] *= top_valid[w2top] valid_xpt2[~w2top] *= bot_valid[~w2top] valid_outer_strike1[w1top] *= top_os_valid[w1top] valid_outer_strike1[~w1top] *= bot_os_valid[~w1top] valid_outer_strike2[w2top] *= top_os_valid[w2top] valid_outer_strike2[~w2top] *= bot_os_valid[~w2top] valid_inner_strike1[w1top] *= top_is_valid[w1top] valid_inner_strike1[~w1top] *= bot_is_valid[~w1top] valid_inner_strike2[w2top] *= top_is_valid[w2top] valid_inner_strike2[~w2top] *= bot_is_valid[~w2top] # Suppress Xpt targets for times with invalid divertor control methods (no direct X-point control) rx_[~valid_xpt1, 0] = np.NaN zx_[~valid_xpt1, 0] = np.NaN rx_[~valid_xpt2, 1] = np.NaN zx_[~valid_xpt2, 1] = np.NaN # Pack flags for valid strike points because we don't know which shape control points are strike points yet valid_strikes = [valid_outer_strike1, valid_inner_strike1, valid_outer_strike2, valid_inner_strike2] if debug: debug_out['valid_xpt1'] = valid_xpt1 debug_out['valid_xpt2'] = valid_xpt2 debug_out['get_control_points_result'] = (times_, r, z, rx_, zx_) return times_, r, z, rx_, zx_, rptnames, zptnames, valid_strikes
def flag_points_on_limiter(rlim_, zlim_, rc, zc, tolerance=0.0116, debug=False, debug_out=None): """ Determines which control points are touching or outside of the limiting surface. These aren't part of the main plasma boundary control, but instead they are part of the divertor control solution. They should be analyzed separately for some purposes. :param rlim_: 1d float array (nlim) R coordinates of limiting surface in m :param zlim_: 1d float array (nlim) Z coordinates of limiting surface in m :param rc: 2d float array (nt * nseg) R coordinates of control points vs. time in m :param zc: 2d float array (nt * nseg) Z coordinates of control points vs. time in m :param tolerance: float Tolerance for distance between point and limiter, in m :param debug: bool Save stuff in a dict :param debug_out: dict-like [optional] :return: 1d bool array (nseg) flagging whether each control point is on the wall or not """ try: import shapely.geometry except ImportError: shapely = None if debug_out is None and debug: debug_out = {} # It may be possible for a main plasma boundary point to briefly touch the wall if the plasma limits transiently, # but we don't expect it to maintain contact long-term. If a control point is on the main boundary, it won't # transition into being a divertor point unless there's a phase change. We don't want to deal with this right now, # so just please pick a time range that stays in the same phase. # Pick sample times nt = len(rc[:, 0]) nseg = len(rc[0, :]) nb = len(rlim_) with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning, message='All-NaN slice encountered') rcm = np.nanmax(rc, axis=1) # Consider control points that are used (not all NaNs) ii = np.arange(nt).astype(np.int64) # Get a general index iavail = ii[~np.isnan(rcm)] # Narrow available indices to times when control points aren't NaN na = len(iavail) isamp = ii[np.array([na // 3, na // 2, 2 * na // 3], np.int64)] # 3 points near the middle of the valid set lim_curve = shapely.geometry.LineString([(rl, zl) for rl, zl in zip(rlim_, zlim_)]) lim_poly = shapely.geometry.Polygon([(rl, zl) for rl, zl in zip(rlim_, zlim_)]) on_limiter_ = np.zeros(nseg, bool) outside_lim_ = np.zeros(nseg, bool) if shapely is not None: # This way is about 7 times faster deviations = np.empty((len(isamp), nseg), float) deviations[:] = np.NaN for j in range(nseg): on_lim = [] out_lim = [] for ii, i in enumerate(isamp): if np.isnan(rc[i, j]) or np.isnan(zc[i, j]): continue point = shapely.geometry.Point(rc[i, j], zc[i, j]) out_lim += [not lim_poly.contains(point)] deviations[ii, j] = point.distance(lim_curve) if abs(deviations[ii, j]) < tolerance: on_lim += [True] break else: on_lim += [False] on_limiter_[j] = np.all(on_lim) # A real limiter / divertor point would be limited on all of these. outside_lim_[j] = np.any(out_lim) else: # This way is clunky, but it doesn't require shapely, which OMFIT doesn't require skip_thresh = tolerance * 0.5 on_limiter_ = np.zeros(nseg, bool) deviations = np.empty((len(isamp), nseg, nb)) deviations[:] = np.NaN for j in range(nseg): on_lim = [] for ii, i in enumerate(isamp): r = rc[i, j] z = zc[i, j] for k in range(nb - 1): r0 = rlim_[k] r1 = rlim_[k + 1] z0 = zlim_[k] z1 = zlim_[k + 1] cross_product = (z - z0) * (r1 - r0) - (r - r0) * (z1 - z0) if abs(cross_product) > skip_thresh: # Cross product is so big it's not worth computing a deviation continue if (z1 == z0) and (r1 == r0): deviation = np.sqrt((z - z1) ** 2 + (r - r1) ** 2) else: db = np.sqrt((z1 - z0) ** 2 + (r1 - r0) ** 2) deviation = cross_product / db deviations[ii, j, k] = deviation if abs(deviation) <= tolerance: on_lim += [True] break else: on_lim += [False] on_limiter_[j] = np.all(on_lim) # A real limiter / divertor point would be limited on all of these. final_on_limiter = np.array(on_limiter_).astype(bool) | np.array(outside_lim_).astype(bool) if debug: debug_out['flag_lim_outside'] = outside_lim_ debug_out['flag_lim_on_lim'] = on_limiter_ debug_out['flag_points_on_limiter_result'] = final_on_limiter debug_out['flag_lim_isamp'] = isamp debug_out['flag_lim_deviations'] = deviations return final_on_limiter
[docs]def classify_ods_eq_topology(ods, dn_tolerance=0.001): """ Figure out whether the shape is USN, LSN, or limited :param ods: ODS instance :param dn_tolerance: float Tolerance (in terms of normalized psi difference) for declaring a double null :return: 1D float array Flag indicating whether each equilibrium slice is LSN (-1), USN (+1), DN (0), or unknown/limited (np.NaN) """ eq_time = ods['equilibrium.time'] xr = ods['equilibrium.time_slice.:.boundary.x_point.:.r'] xz = ods['equilibrium.time_slice.:.boundary.x_point.:.z'] eqr = ods['equilibrium.time_slice'][0]['profiles_2d.0.grid.dim1'] eqz = ods['equilibrium.time_slice'][0]['profiles_2d.0.grid.dim2'] psirz = ods['equilibrium.time_slice.:.profiles_2d.0.psi'] psia = ods['equilibrium.time_slice.:.global_quantities.psi_axis'] psib = ods['equilibrium.time_slice.:.global_quantities.psi_boundary'] dpsi = psib - psia psin = (psirz - psia[:, np.newaxis, np.newaxis]) / dpsi[:, np.newaxis, np.newaxis] topology = np.empty(len(eq_time)) # 0 is ~exact DN, 1 is USN, -1 is LSN, np.NaN is limited topology[:] = np.NaN for it in range(len(eq_time)): if np.any(~np.isnan(xr[it]) & ~np.isnan(xz[it])): xz_slice = xz[it, :] sel = ~np.isnan(xz_slice) xr_slice = xr[it, sel] xz_slice = xz_slice[sel] if len(xz_slice) == 1: topology[it] = np.sign(xz_slice[0]) elif len(xz_slice) > 1: xpsin_ = RectBivariateSpline(eqr, eqz, psin[it, :, :])(xr_slice, xz_slice, grid=False) xidx = abs(xpsin_ - 1).argsort() xpsin = xpsin_[xidx] xz_sorted = xz_slice[xidx] if abs(xpsin[0] - xpsin[1]) < dn_tolerance: topology[it] = 0 else: topology[it] = np.sign(xz_sorted[0] - xz_sorted[1]) return topology
[docs]def get_strike_point(ods, in_out='outer', pri_sec='primary'): """ Identify a specific strike point and get its coordinates It's easy to just pick a strike point, or even find inner-upper or outer-lower, but identifying primary-outer or primary-inner is a little harder to do. It's not clear that there's any consistent ordering of strike points that would trivialize this process, so this function exists to do it for you. :param ods: ODS instance :param in_out: str 'outer': try to get an outer strike point (default) 'inner': try to get an inner strike point :param pri_sec: str 'primary': try to get a strike point connected to the primary X-point (default) 'secondary': try to get a strike point connected with a secondary X-point :return: tuple time: 1D float array (s) r_strike: 1D float array (m) z_strike: 1D float array (m) """ if pri_sec == 'secondary': pri_sec_flip = -1 else: pri_sec_flip = 1 eq_time = ods['equilibrium.time'] r_strike_all = ods['equilibrium.time_slice.:.boundary.strike_point.:.r'] z_strike_all = ods['equilibrium.time_slice.:.boundary.strike_point.:.z'] topology = classify_ods_eq_topology(ods) preferred_strike_idx = np.empty(len(eq_time), int) preferred_strike_idx[:] = -1 for it in range(len(eq_time)): if (topology[it] * pri_sec_flip) > 0: sel = z_strike_all[it, :] > 0 elif (topology[it] * pri_sec_flip) < 0: sel = z_strike_all[it, :] < 0 else: continue if in_out == 'inner': sel &= r_strike_all[it, :] == np.min(r_strike_all[it, sel]) else: sel &= r_strike_all[it, :] == np.max(r_strike_all[it, sel]) if sum(sel) == 1: preferred_strike_idx[it] = np.where(sel)[0][0] r_strike = r_strike_all[range(len(eq_time)), preferred_strike_idx] z_strike = z_strike_all[range(len(eq_time)), preferred_strike_idx] return eq_time, r_strike, z_strike
def select_critical_time_samples(t, x): """ Identify samples that should be kept to allow information to be reconstructed trivially by linear interpolation :param t: 1D float array (nt) :param x: 2D array with first dimension matching t (nt * nx) :return: 1D bool array (nt) """ dxdt = np.gradient(x, axis=0) / np.gradient(t)[:, np.newaxis] dx2 = np.gradient(dxdt, axis=0) keep_sample = ~np.product(dx2 == 0, axis=1).astype(bool) # Make sure the endpoints are always retained keep_sample[0] = True keep_sample[-1] = True printd('select_critical_time_samples identified {} out of {} points as critical'.format(np.sum(keep_sample), len(t))) return keep_sample
[docs]def setup_position_control_hardware_description_general(ods, shot, device=None, limiter_efit_id=None, debug=False, debug_out=None): """ Loads DIII-D shape control data into the ODS :param ods: ODS instance :param shot: int :param device: string Only works for devices that have pointnames and other info specified. DIII-D should work. :param limiter_efit_id: string [optional] :param debug: bool :param debug_out: dict-like Catches debug output :return: dict Information or instructions for follow up in central hardware description setup """ # Handle input keyword defaults if debug and debug_out is None: debug_out = {} # Set up shortcuts pc = ods['pulse_schedule.position_control'] bry = pc['boundary_outline'] xpt = pc['x_point'] stk = pc['strike_point'] # Fetch primary data: boundary outline and X-points t = None t, rctrl, zctrl, rx, zx, rptnames, zptnames, valid_strikes = get_shape_control_points(device, shot, t, debug=debug, debug_out=debug_out) # Suppress invalid values rzero = rctrl == 0 rctrl[rzero] = np.NaN zctrl[rzero] = np.NaN # Get limiter try: rlim = ods['wall']['description_2d'][0]['limiter']['unit'][0]['outline']['r'] zlim = ods['wall']['description_2d'][0]['limiter']['unit'][0]['outline']['z'] except (KeyError, IndexError, ValueError): if limiter_efit_id is None: if is_device(device, 'DIII-D'): limiter_efit_id = 'EFITRT1' elif is_device(device, 'EAST'): limiter_efit_id = 'EFIT_EAST' else: raise ValueError('No rule for auto picking EFITID for device {}. Please specify limiter_efit_id.'.format(device)) rlim, zlim = get_boundary_plasma_and_limiter(device, shot, limiter_efit_id, t, limiter_only=True, debug=True, debug_out=debug_out) if debug: debug_out['rlim'] = rlim debug_out['zlim'] = zlim # Distinguish boundary from strike points on_limiter = flag_points_on_limiter(rlim, zlim, rctrl, zctrl, debug=debug, debug_out=debug_out) # Compress clump = np.concatenate((rctrl, zctrl, rx, zx), axis=1) clump[np.isnan(clump)] = 0 # NaNs will ruin the diffs and != 0 checks keep_sample = select_critical_time_samples(t, clump) if debug: debug_out['t_before'] = copy.copy(t) debug_out['rx_before'] = copy.copy(rx[:, 0]) debug_out['t_filtered'] = t[keep_sample] debug_out['rx_filtered'] = rx[keep_sample, 0] rctrl = rctrl[keep_sample, :] zctrl = zctrl[keep_sample, :] rx = rx[keep_sample, :] zx = zx[keep_sample, :] t = t[keep_sample] # Organize data rstrk = rctrl[:, on_limiter] zstrk = zctrl[:, on_limiter] rbdry = rctrl[:, ~on_limiter] zbdry = zctrl[:, ~on_limiter] rbpt = np.array(rptnames)[~on_limiter] zbpt = np.array(zptnames)[~on_limiter] rspt = np.array(rptnames)[on_limiter] zspt = np.array(zptnames)[on_limiter] # Load to ODS pc['time'] = t * 1e-3 for i in range(len(rbpt)): bry[i]['r.reference_type'] = bry[i]['z.reference_type'] = 1 bry[i]['r.reference_name'] = rbpt[i] bry[i]['z.reference_name'] = zbpt[i] bry[i]['r.reference'] = rbdry[:, i] bry[i]['z.reference'] = zbdry[:, i] for i in range(2): xpt[i]['r.reference_type'] = xpt[i]['z.reference_type'] = 1 xpt[i]['r.reference_name'] = xpt[i]['z.reference_name'] = ['Lower X-point', 'Upper X-point'][i] xpt[i]['r.reference'] = rx[:, i] xpt[i]['z.reference'] = zx[:, i] for i in range(len(rspt)): # Find any other strike points that have the same sign of Z with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning, message='invalid value.*') warnings.filterwarnings('ignore', category=RuntimeWarning, message='Mean of empty slice') buddy = [j for j in range(len(rspt)) if (j != i) and ((np.nanmean(zstrk[:, j]) > 0) == (np.nanmean(zstrk[:, i]) > 0))] istop = np.nanmean(zstrk[:, i]) > 0 if len(buddy) == 1: # One other strike point with same sign of Z: compare to it isouter = np.nanmean(rstrk[:, i]) > np.nanmean(rstrk[:, buddy[0]]) else: # No other strike points or too many other strike points with same sign of Z: compare to X-point instead isouter = np.nanmean(rstrk[:, i]) > np.nanmean(rx[:, int(istop)]) if ~np.isnan(istop) & ~np.isnan(isouter): this_valid = valid_strikes[2 * istop + isouter][keep_sample] rstrk[~this_valid, i] = np.NaN zstrk[~this_valid, i] = np.NaN if debug: debug_out['this_strike_valid_{}'.format(i)] = this_valid stk[i]['r.reference_type'] = stk[i]['z.reference_type'] = 1 stk[i]['r.reference_name'] = rspt[i] stk[i]['z.reference_name'] = zspt[i] stk[i]['r.reference'] = rstrk[:, i] stk[i]['z.reference'] = zstrk[:, i] if debug: debug_out['rstrike'] = rstrk debug_out['zstrike'] = zstrk debug_out['keep_sample'] = keep_sample return {}
[docs]def setup_pulse_schedule_hardware_description_general(ods, pulse, device=None): """ Sets up the pulse_schedule ODS, which holds control information This is a pretty broad category! Not all subsets are supported so far. Sorry. :param ods: ODS instance :param pulse: int :param device: string :return: dict Information or instructions for follow up in central hardware description setup """ def update_instructions(ins, new_ins): if 'postcommands' in new_ins: if 'postcommands' in ins: ins['postcommands'] += new_ins['postcommands'] else: ins['postcommands'] = new_ins['postcommands'] instructions = {} update_instructions(instructions, setup_position_control_hardware_description_general(ods, pulse, device=device)) return instructions
[docs]def pf_coils_to_ods(ods, coil_data): """ Transfers poloidal field coil geometry data from a standard format used by efitviewer to ODS. WARNING: only rudimentary identifies are assigned. You should assign your own identifiers and only rely on this function to assign numerical geometry data. :param ods: ODS instance Data will be added in-place :param coil_data: 2d array coil_data[i] corresponds to coil i. The columns are R (m), Z (m), dR (m), dZ (m), tilt1 (deg), and tilt2 (deg) This should work if you just copy data from iris:/fusion/usc/src/idl/efitview/diagnoses/<device>/*coils*.dat (the filenames for the coils vary) :return: ODS instance """ from omas.omas_plot import geo_type_lookup rect_code = geo_type_lookup('rectangle', 'pf_active', ods.imas_version, reverse=True) outline_code = geo_type_lookup('outline', 'pf_active', ods.imas_version, reverse=True) nc = len(coil_data[:, 0]) for i in range(nc): ods['pf_active.coil'][i]['name'] = ods['pf_active.coil'][i]['identifier'] = 'PF{}'.format(i) if (coil_data[i, 4] == 0) and (coil_data[i, 5] == 0): rect = ods['pf_active.coil'][i]['element.0.geometry.rectangle'] rect['r'] = coil_data[i, 0] rect['z'] = coil_data[i, 1] rect['width'] = coil_data[i, 2] # Or width in R rect['height'] = coil_data[i, 3] # Or height in Z ods['pf_active.coil'][i]['element.0.geometry.geometry_type'] = rect_code else: outline = ods['pf_active.coil'][i]['element.0.geometry.outline'] fdat = coil_data[i] fdat[4] = -coil_data[i, 4] * np.pi / 180.0 fdat[5] = -(coil_data[i, 5] * np.pi / 180.0 if coil_data[i, 5] != 0 else np.pi / 2) outline['r'] = [ fdat[0] - fdat[2] / 2.0 - fdat[3] / 2.0 * np.tan((np.pi / 2.0 + fdat[5])), fdat[0] - fdat[2] / 2.0 + fdat[3] / 2.0 * np.tan((np.pi / 2.0 + fdat[5])), fdat[0] + fdat[2] / 2.0 + fdat[3] / 2.0 * np.tan((np.pi / 2.0 + fdat[5])), fdat[0] + fdat[2] / 2.0 - fdat[3] / 2.0 * np.tan((np.pi / 2.0 + fdat[5])), ] outline['z'] = [ fdat[1] - fdat[3] / 2.0 - fdat[2] / 2.0 * np.tan(-fdat[4]), fdat[1] + fdat[3] / 2.0 - fdat[2] / 2.0 * np.tan(-fdat[4]), fdat[1] + fdat[3] / 2.0 + fdat[2] / 2.0 * np.tan(-fdat[4]), fdat[1] - fdat[3] / 2.0 + fdat[2] / 2.0 * np.tan(-fdat[4]), ] ods['pf_active.coil'][i]['element.0.geometry.geometry_type'] = outline_code return ods
[docs]def toray_to_omas(ech_nml, uf_azi, uf_pol, uf_pow, ods=None, root=None): """ Given 3 ECH UFILEs (azimuthal angle, polar angle, and power), return an ODS with the launcher info filled out :param ech_nml: A TORAY namelist :param uf_azi: An ECH Ufile containing the azimuthal angle :param uf_pol: An ECH Ufile containing the polar angle :param uf_pow: An ECH Ufile containing the power :param ods: An existing ODS, to which to add the launcher info from the Ufiles :param root: An OMFITmodule instance (for experiment and generic OMFIT info) """ if ods is None: ods = ODS() t_in = ech_nml ec = ods['ec_launchers'] if root is not None: add_experiment_info_to_ods(ods, root=root) add_generic_OMFIT_info_to_ods(ods, root=root) ids_prop = ec['ids_properties'] ids_prop['comment'] = 'ECH data from Ufile input in OMFIT' ids_prop['source'] = ids_prop['comment'] ids_prop['homogeneous_time'] = True ids_prop['provider'] = os.environ['USER'] ids_prop['creation_date'] = time.ctime() ec['time'] = uf_pow['X0']['data'] numb = int(t_in['NANTECH']) myphi = float(t_in['PHECECH']) * np.pi / 180.0 # toroidal position of the source in [deg]. Useless for axi-symetric plasma if compare_version(ods.imas_version, "3.37.0") >= 0: beam = "beam" else: beam = "launcher" for i in range(numb): length = len(uf_azi['F']['data'][:, i]) myfreq = t_in['FREQECH'][i] antenna = ec[beam][i] antenna['frequency.data'] = [myfreq] * length antenna['steering_angle_pol.data'] = (uf_pol['F']['data'][:, i] - 90.0) * np.pi / 180.0 # polar angle from ECA UFILE in [deg] antenna['steering_angle_tor.data'] = (uf_azi['F']['data'][:, i] - 180.0) * np.pi / 180.0 # Azimutal angle from ECB UFILE in [deg] antenna['name'] = 'EC%d' % (i + 2) # arbitrary name antenna['identifier'] = 'EC%d' % (i + 2) # arbitrary identifier antenna['power_launched.data'] = uf_pow['F']['data'][:, i] # power en W, from ECP UFILE myr = float(t_in['XECECH'][i]) / 100.0 # R position of the source from TRANSP input in [cm] myz = float(t_in['ZECECH'][i]) / 100.0 # Z position of the source from TRANSP input in [cm] antenna['launching_position.r'] = [myr] * length antenna['launching_position.z'] = [myz] * length antenna['launching_position.phi'] = [myphi] * length return ods
[docs]def nubeam_to_omas(nb_nml, nb_uf=None, ods=None, root=None): """ Given a NUBEAM namelist and UFILE, return an ODS with the beam info filled out :param nb_nml: A NUBEAM namelist :param nb_uf: A NUBEAM Ufile :param ods: An existing ODS, to which to add the beam info from the namelist and Ufile :param root: An OMFITmodule instance (for experiment and generic OMFIT info) """ import pint ureg = pint.UnitRegistry() cm = ureg('cm') deg = ureg('deg') meter = ureg('m') if ods is None: ods = ODS() t_in = nb_nml nbi = ods['nbi'] if root is not None: add_experiment_info_to_ods(ods, root=root) add_generic_OMFIT_info_to_ods(ods, root=root) ids_prop = nbi['ids_properties'] ids_prop['comment'] = 'NBI data produced from NUBEAM namelist and Ufile input in OMFIT' ids_prop['source'] = ids_prop['comment'] ids_prop['homogeneous_time'] = True ids_prop['provider'] = os.environ['USER'] ids_prop['creation_date'] = time.ctime() if nb_uf is not None: nbi['time'] = nb_uf['X0']['data'] if 'BEAMS' in nb_uf: numb = len(nb_uf['BEAMS']) beams = nb_uf['BEAMS'] else: numb = int(nb_uf['SCALARS'][0]['data']) beams = ['beam_%d' % i for i in range(numb)] else: nbi['time'] = [0] numb = len(nb_nml['PINJA']) beams = ['beam_%d' % i for i in range(numb)] def get_arr_var(avar, ind): """ A convenience function to do return t_in[avar][ind] if avar in t_in else t_in[avar[:-1]] """ if not avar.endswith('A'): raise ValueError('The variable requested, %s, does not end with A, and is probably an error' % avar) return t_in[avar][ind] if avar in t_in else t_in[avar[:-1]] for bi, beam in enumerate(beams): unit = nbi['unit.%d' % bi] unit['name'] = beam unit['identifier'] = beam unit['species.a'] = get_arr_var('ABEAMA', bi) unit['species.z_n'] = get_arr_var('XZBEAMA', bi) if nb_uf is not None: cf = unit['beam_current_fraction.data'] = np.array( [ nb_uf['F']['data'][:, numb * 2 + bi], nb_uf['F']['data'][:, numb * 3 + bi], [ 0 if a == 0 and b == 0 else 1 - a - b for a, b in zip(nb_uf['F']['data'][:, numb * 2 + bi], nb_uf['F']['data'][:, numb * 3 + bi]) ], ] ) else: ffulla = get_arr_var('FFULLA', bi) fhalfa = get_arr_var('FHALFA', bi) unit['beam_current_fraction.data'] = cf = [[ffulla], [fhalfa], [1 - ffulla - fhalfa]] # unit['beam_current_fraction.time'] = homogeneous time tmp = cf / (np.array([1.0, 2.0, 3.0])[:, np.newaxis]) with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning, message='invalid value encountered in true_divide') stmp = np.sum(tmp, axis=0) stmp[stmp == 0.0] = 1.0 unit['beam_power_fraction.data'] = tmp / stmp # unit['beam_power_fraction.time'] = homogeneous time if nb_uf is not None: unit['energy.data'] = nb_uf['F']['data'][:, numb + bi] # NUBEAM already in eV # unit['energy.time'] = homogeneous time unit['power_launched.data'] = nb_uf['F']['data'][:, bi] else: unit['power_launched.data'] = [get_arr_var('PINJA', bi)] unit['energy.data'] = [get_arr_var('EINJA', bi)] # unit['power_launched.time'] = homogeneous time bg = unit['beamlets_group.0'] angv = bg['angle'] = -np.arcsin((get_arr_var('XYBSCA', bi) - get_arr_var('XYBAPA', bi)) / get_arr_var('XLBAPA', bi)) bg['position.z'] = get_arr_var('XYBSCA', bi) * cm bg['position.r'] = np.sqrt(get_arr_var('XLBTNA', bi) ** 2 * np.cos(angv) + get_arr_var('RTCENA', bi) ** 2) * cm # bg['position.phi'] - Toroidal angle (oriented counter-clockwise when viewing from above) # XBZETA(IB)= toroidal angle of the beam source in an (R,zeta,Z) # right handed coordinate system (deg). bg['position.phi'] = -get_arr_var('XBZETA', bi) * deg # bg['direction'] - Direction of the beam seen from above the torus: # -1 = clockwise; 1 = counter clockwise # NLCO - .TRUE. if the beam is CO-injecting with the plasma current. # NLJCCW - .TRUE. ! orientation of Ip (.TRUE. for counter-clockwise) # Here "counter-clockwise" is as seen looking down on the tokamak from above. bg['direction'] = (2 * int(t_in['NLJCCW']) - 1) * (2 * int(t_in['NLCO'][bi]) - 1) bg['tangency_radius'] = get_arr_var('RTCENA', bi) * cm bg['focus.focal_length_horizontal'] = get_arr_var('FOCLRA', bi) * cm bg['focus.focal_length_vertical'] = get_arr_var('FOCLZA', bi) * cm dc = bg['divergence_component.0'] dc['particles_fraction'] = 1.0 dc['vertical'] = get_arr_var('DIVZA', bi) dc['horizontal'] = get_arr_var('DIVRA', bi) shape = get_arr_var('NBSHAPA', bi) if shape == 1: bg['width_horizontal'] = get_arr_var('BMWIDRA', bi) * 2 * cm bg['width_vertical'] = get_arr_var('BMWIDZA', bi) * 2 * cm else: raise NotImplementedError('IMAS only has rectangular sources') R = bg['position.r'] * meter # Radius of source PHI = bg['position.phi'] # Angle of source XLBAPA = get_arr_var('XLBAPA', bi) * cm XYBAPA = get_arr_var('XYBAPA', bi) * cm RTCENA = get_arr_var('RTCENA', bi) * cm XLBTNA = get_arr_var('XLBTNA', bi) * cm aperture = unit['aperture.0.centre'] # SPS 19 Dec 2019 # Verified that the following maintained constant radial aperture # (of value less than R, and satifying law of cosines) # while PHI rotated through 2pi # Right triangle with legs XLBTNA and RTCENA, angb opposite RTCENA angb = np.arctan2(RTCENA, XLBTNA) * bg['direction'] # Law of cosines # aperture['r'] = sqrt(R**2 + (XLBAPA * cos(angv))**2 - 2 * R * XLBAPA * cos(angv) * cos(angb)) x_disp_from_source = XLBAPA * np.cos(angv) * np.cos(PHI - np.pi + angb) y_disp_from_source = XLBAPA * np.cos(angv) * np.sin(PHI - np.pi + angb) x_source = R * np.cos(PHI) y_source = R * np.sin(PHI) x_ap = x_source + x_disp_from_source y_ap = y_source + y_disp_from_source aperture['r'] = (np.sqrt(x_ap**2 + y_ap**2)).to(meter).magnitude aperture['phi'] = (np.arctan2(y_ap, x_ap)).to(ureg.radian).magnitude aperture['z'] = XYBAPA.to(meter).magnitude ''' nbi%unit(:)%beamlets_group(:)%beamlets%tangency_radii(:) machine nbi%unit(:)%beamlets_group(:)%beamlets%angles(:) machine nbi%unit(:)%beamlets_group(:)%beamlets%power_fractions(:) machine nbi%unit(:)%beamlets_group(:)%beamlets%positions%r(:) machine nbi%unit(:)%beamlets_group(:)%beamlets%positions%z(:) machine nbi%unit(:)%beamlets_group(:)%beamlets%positions%phi(:) machine ''' return ods
[docs]def transp_ic_to_omas(tr_in, ods=None, uf_p=None, uf_f=None, root=None): """ Convert the TRANSP input namelist, tr_in, to omas :param tr_in: A TRANSP input namelist (TR.DAT) :param ods: An existing ODS object to update with IC antenna/power info :param uf_p: A ufile with power traces :param uf_f: A ufile with frequency traces :param root: An OMFITmodule instance (for experiment and generic OMFIT info) """ import pint ureg = pint.UnitRegistry() cm = ureg('cm') deg = ureg('deg') if ods is None: ods = ODS() if root is not None: add_experiment_info_to_ods(ods, root=root) add_generic_OMFIT_info_to_ods(ods, root=root) ic = ods['ic_antennas'] ids_prop = ic['ids_properties'] ids_prop['comment'] = 'ICRF data produced from TRANSP namelist and Ufile input in OMFIT' ids_prop['source'] = ids_prop['comment'] ids_prop['homogeneous_time'] = False ids_prop['provider'] = os.environ['USER'] ids_prop['creation_date'] = time.ctime() tr_in = tr_in.flatten() na = tr_in['NICHA'] def get_arr_var(avar, ind): """ A convenience function to do return tr_in[avar][ind] if avar in tr_in else tr_in[avar[:-1]] """ if not avar.endswith('A'): raise ValueError('The variable requested, %s, does not end with A, and is probably an error' % avar) return tr_in[avar][ind] if avar in tr_in else tr_in[avar[:-1]] for ai in range(na): ant = ic['antenna.%s' % ai] mod = ant['module.0'] mod['name'] = ant['name'] = str(ai) mod['identifier'] = ant['identifier'] = str(ai) if uf_f is None: ton = get_arr_var('TONICHA', ai) toff = get_arr_var('TOFFICHA', ai) dt = 0.1 * (toff - ton) mod['frequency.time'] = ant['frequency.:.time'] = [ton - dt, ton - dt / 1e3, ton, toff, toff + dt / 1e3, toff + dt] freq = get_arr_var('FRQICHA', ai) mod['frequency.data'] = ant['frequency.:.data'] = [0, 0, freq, freq, 0, 0] else: mod['frequency.time'] = ant['frequency.time'] = uf_f['X0']['data'] mod['frequency.data'] = ant['frequency.data'] = uf_f['F']['data'][:, ai] if uf_p is None: ton = get_arr_var('TONICHA', ai) toff = get_arr_var('TOFFICHA', ai) dt = 0.1 * (toff - ton) power = get_arr_var('PRFICHA', ai) mod['power_launched.time'] = ant['power_launched.time'] = [ton - dt, ton - dt / 1e3, ton, toff, toff + dt / 1e3, toff + dt] mod['power_launched.data'] = ant['power_launched.data'] = [0, 0, power, power, 0, 0] else: mod['power_launched.time'] = ant['power_launched.time'] = uf_p['X0']['data'] mod['power_launched.data'] = ant['power_launched.data'] = uf_p['F']['data'][:, ai] # ant['power_launched.data'] = uf_p # Need real ufile to fill this in strap = mod['strap.0'] rant = tr_in['RGEOANT_A'][:, ai] zant = tr_in['YGEOANT_A'][:, ai] ind = ~np.isnan(rant) & ~np.isnan(zant) r = strap['outline.r'] = rant[ind] * cm strap['outline.z'] = zant[ind] * cm strap['outline.phi'] = [0] * len(r) # Not in the TRANSP input namelist... strap['distance_to_conductor'] = tr_in['RFARTR_A'][ai] * cm if root and is_device(root['SETTINGS']['EXPERIMENT']['device'], 'JET'): phase = 0 nphi = tr_in['NNPHI'][1, ai + 1] if nphi == 27: phase = ([-np.pi / 2.0, np.pi / 2] * 2 + [0])[ai] else: print('tr_in["NNPHI"]=', tr_in['NNPHI']) raise NotImplementedError('Spectrum %s not yet adapted' % nphi) mod['phase_forward.time'] = mod['power_launched.time'] mod['phase_forward.data'] = [phase] * len(mod['phase_forward.time']) ic['time'] = [tr_in['TINIT'], tr_in['FTIME']] # Not used, but filled with total TRANSP time return ods
# Missing # ic_antennas%antenna(:)%module(:)%phase%data(:) # ic_antennas%antenna(:)%module(:)%phase%time(:) # ic_antennas%antenna(:)%module(:)%strap(:)%outline%phi machine # ic_antennas%antenna(:)%module(:)%strap(:)%width_tor machine # ic_antennas%code optional # ic_antennas%reference_point%r optional # ic_antennas%reference_point%z optional
[docs]def make_sample_ods(device='test', shot=123456, efitid='fakesample'): """ Generates an ODS with some minimal test data, including a sample equilibrium and at least one hardware system :param device: string :param shot: int :param efitid: string :return: ODS """ ods = ODS() ods.sample_equilibrium() ods.sample_gas_injection() ods['dataset_description.data_entry']['machine'] = device ods['dataset_description.data_entry']['pulse'] = shot ods['dataset_description.ids_properties.comment'] = 'Test data; EFIT tree = {}'.format(efitid) return ods
[docs]def orthogonal_distance(ods, r0, z0, grid=True, time_range=None, zf=5, maxstep=5e-4, minstep=1e-5, maxsteps=5000, debug=False): """ Calculates the orthogonal distance from some point(s) to the LCFS Works by stepping along the steepest gradient until the LCFS is reached. :param ods: ODS instance A complete, top level ODS, or selection of time slices from equilibrium. That is, `ods['equilibrium']['time_slice']`, if `ods` is a top level ODS. :param r0: 1D float array R coordinates of the point(s) in meters. If grid=False, the arrays must have length matching the number of time slices in equilibrium. :param z0: 1D float array Z coordinates of the point(s) in meters. Length must match r0 :param grid: bool Return coordinates on a time-space grid, assuming each point in R-Z is static and given a separate history. Otherwise (grid=False), assume one point is given & it moves in time, so return array will be 1D vs. time. :param time_range: 2 element numeric iterable [optional] Time range in seconds to use for filtering time_slice in ODS :param zf: float Zoom factor for upsampling the equilibrium first to improve accuracy. Ignored unless > 1. :param maxstep: float Maximum step size in m Restraining step size should prevent flying off the true path :param minstep: float Minimum step size in m Prevent calculation from taking too long by forcing a minimum step size :param maxsteps: int Maximum number of steps allowed in path tracing. Protection against getting stuck in a loop. :param debug: bool Returns a dictionary with internal quantities instead of an array with the final answer. :return: float array Length of a path orthogonal to flux surfaces from (r0, z0) to LCFS, in meters. If grid=True: 2D float array (time by points) If grid=False: a 1D float array vs. time """ # Auto select timing eqt = ods['.equilibrium.time'] if time_range is None: time_slices = range(len(eqt)) else: time_slices = where((eqt >= time_range[0]) & (eqt <= time_range[1]))[0] # Sanitize/check inputs def check_input(condition, message=None): if not condition: raise OmasUtilsBadInput(message) r0 = np.atleast_1d(r0) z0 = np.atleast_1d(z0) check_input(len(r0) == len(z0), 'R (len={len(r0)}) and Z (len={len(z0)}) arrays must have the same length.') if not grid: check_input(len(r0) == len(time_slices), 'R (len={len(r0)}) must have same # of elements as time-slices.') # Prepare to catch output if grid: results = np.empty((len(time_slices), len(r0))) paths = [[None] * len(r0)] * len(time_slices) else: results = np.empty(len(time_slices)) paths = [None] * len(time_slices) results[:] = np.NaN # Process each time slice separately for i, time_slice in enumerate(time_slices): # Extract basic data eq = ods['.equilibrium.time_slice'][time_slice] psib = eq['global_quantities.psi_boundary'] psia = eq['global_quantities.psi_axis'] psi_ = eq['profiles_2d.0.psi'] rgrid_ = eq['profiles_2d.0.grid.dim1'] zgrid_ = eq['profiles_2d.0.grid.dim2'] # Optional upsample to improve accuracy if zf > 1: psi = scipy.ndimage.zoom(psi_, zf) rgrid = scipy.ndimage.zoom(rgrid_, zf) zgrid = scipy.ndimage.zoom(zgrid_, zf) else: psi = psi_ rgrid = rgrid_ zgrid = zgrid_ # Gradients and interpolation psin = (psi - psia) / (psib - psia) dpsi = np.gradient(psin) dpsidr = dpsi[0] / np.gradient(rgrid) dpsidz = dpsi[1] / np.gradient(zgrid) dri = interp2d(rgrid, zgrid, dpsidr.T) dzi = interp2d(rgrid, zgrid, dpsidz.T) psini = interp2d(rgrid, zgrid, psin.T) # Pick which points to consider for this time-slice if grid: eligible_r0 = r0 eligible_z0 = z0 else: eligible_r0 = np.atleast_1d(r0[i]) eligible_z0 = np.atleast_1d(z0[i]) # Process each point individually for j in range(len(eligible_r0)): # Prepare to step along the path of steepest gradient pathr = [eligible_r0[j]] pathz = [eligible_z0[j]] r = eligible_r0[j] z = eligible_z0[j] psin0 = psini(eligible_r0[j], eligible_z0[j]) signdir = -1 if psin0 > 1 else 1 # Step along the path for m in range(maxsteps): # Pick default step and direction rstep = signdir * dri(r, z) zstep = signdir * dzi(r, z) steplen = np.sqrt(rstep**2 + zstep**2) stepdir = np.arctan2(zstep, rstep)[0] # Apply stepsize limits if steplen > maxstep: steplen = maxstep if steplen < minstep: steplen = minstep # Take the step rstep = np.cos(stepdir) * steplen zstep = np.sin(stepdir) * steplen r += rstep z += zstep pathr += [r] pathz += [z] # Check to see if the LCFS has been reached if (psini(r, z) < 1) and (psin0 > 1): break elif (psini(r, z) > 1) and (psin0 < 1): break else: # Ran out of steps before reaching the LCFS printw( f"Didn't finish tracing back to LCFS: time-slice#{i} ({time_slice}), point#{j} " f"({eligible_r0[j], eligible_z0[j]}); stopped after {m} steps." ) if grid: results[i, j] = np.NaN else: results[i] = np.NaN # Measure path length pathdr = np.array(np.diff(pathr)) pathdz = np.array(np.diff(pathz)) pathds = np.sqrt(pathdr**2 + pathdz**2) if grid: results[i, j] = np.sum(pathds) paths[i][j] = (pathr, pathz) else: results[i] = np.sum(pathds) paths[i] = (pathr, pathz) if debug: return dict(results=results, paths=paths) return results
[docs]def load_sample_eq_specifications(ods, hrts_gate='any'): """ Loads a sample of equilibrium shape specifications under pulse_schedule :param ods: ODS instance Data go here :param hrts_gate: str One of the boundary points is for checking whether the boundary passes through the HRTS range. But DIII-D's HRTS can be reconfigured to handle three different ranges! So you can select 'top': Relevant when the HRTS channels are positioned high (this might be the default position) 'mid' 'low' 'any': goes from the bottom of the 'low' range to the top of the 'top' range. """ pc = ods['pulse_schedule.position_control'] ref_type = 1 # 1 is absolute, 0 is relative. Not sure what that means, but 1 seems more likely to be relevant. timing = [0, 10] # Central time range to cover a typical DIII-D shot nt = len(timing) pc['time'] = timing # "Boundary" points that are actually on the divertor leg in the example (no where else to put them yet) pc['boundary_outline'][0]['r.reference_type'] = ref_type pc['boundary_outline'][0]['r.reference_name'] = 'outer div leg angle enforcer' pc['boundary_outline'][0]['r.reference'] = [ufloat(1.4966, 0.001)] * nt pc['boundary_outline'][0]['z'] = copy.deepcopy(pc['boundary_outline'][0]['r']) pc['boundary_outline'][0]['z.reference'] = [ufloat(1.2293, 0.001)] * nt # X points pc['x_point'][0]['r.reference_name'] = 'primary' pc['x_point'][0]['r.reference_type'] = ref_type pc['x_point'][0]['r.reference'] = [ufloat(1.338, 0.02)] * nt pc['x_point'][0]['z'] = copy.deepcopy(pc['x_point'][0]['r']) pc['x_point'][0]['z.reference'] = [ufloat(1.019, 0.02)] * nt # Strike points pc['strike_point'][0]['r.reference_name'] = 'outer primary' pc['strike_point'][0]['r.reference_type'] = ref_type pc['strike_point'][0]['r.reference'] = [ufloat(1.503, 0.001)] * nt pc['strike_point'][0]['z'] = copy.deepcopy(pc['strike_point'][0]['r']) pc['strike_point'][0]['z.reference'] = [ufloat(1.237, 0.001)] * nt pc['strike_point'][1]['r'] = copy.deepcopy(pc['strike_point'][0]['r']) pc['strike_point'][1]['r.reference_name'] = 'inner primary' pc['strike_point'][1]['r.reference'] = [ufloat(1.016, 0.2)] * nt pc['strike_point'][1]['z'] = copy.deepcopy(pc['strike_point'][1]['r']) pc['strike_point'][1]['z.reference'] = [ufloat(1.109, 0.2)] * nt # Gates pc['flux_gates'][0]['name'] = 'slot entrance' pc['flux_gates'][0]['r1'] = 1.486 pc['flux_gates'][0]['r2'] = 1.404 pc['flux_gates'][0]['z1'] = 1.117 pc['flux_gates'][0]['z2'] = 1.157 pc['flux_gates'][0]['applies_to_psin'] = 1.0 pc['flux_gates'][0]['applies_to_dr_mid'] = 0.005 i = 1 hrts_gates = dict(top=[0.65, 0.74], mid=[0.57, 0.73], low=[0.47, 0.67]) hrts_gates['any'] = [hrts_gates['low'][0], hrts_gates['top'][1]] hrts_zrange = hrts_gates[hrts_gate] pc['flux_gates'][i]['name'] = f'HRTS gate {hrts_gate}' pc['flux_gates'][i]['r1'] = 1.94 pc['flux_gates'][i]['r2'] = 1.94 pc['flux_gates'][i]['z1'] = hrts_zrange[0] pc['flux_gates'][i]['z2'] = hrts_zrange[1] pc['flux_gates'][i]['time_range'] = [1.5, 4.9]
def omas_eq1d_pretty_name(quantity, include_units=True): """ Translates ODS tag names into pretty names that are suitable for use in plot axes labels :param quantity: str Key within equilibrium.time_slice.:.profiles_1d :param include_units: bool Include units in the return string :return: str """ if quantity == 'psi_norm': label = r'$\psi_N$' units = '' elif quantity == 'dpressure_dpsi': label = r'$d_{\psi} p$' units = 'Pa Wb$^{-1}$' elif quantity == 'f_df_dpsi': label = r'$F d_{\psi} F$' units = 'T$^2$ m$^2$ Wb$^{-1}$' elif quantity == 'rho_tor_norm': label = r'$\rho_N$' units = '' elif quantity == 'r_outboard': label = '$R_{OMP}$' units = 'm' elif quantity == 'r_inboard': label = '$R_{IMP}$' units = 'm' else: info = omas_info_node(f'equilibrium.time_slice.:.profiles_1d.{quantity}') label = info.get('documentation', quantity).split('(')[0] if 'w.r.t.' in label: label = label.replace('w.r.t.', 'WwRrTt') label = label.split('.')[0] label = label.replace('WwRrTt', 'w.r.t.') else: label = label.split('.')[0] units = info.get('units', '') if units == '-': units = '' if units and include_units: label += f' / {units}' return label def omfit_omas_bootstrap(ods, version='neo_2021', time_index=0): """ This function calculates bootrap current from ods. """ gEQDSK = OMFITgeqdsk('tmp_gfile').from_omas(ods=ods) eqrho = gEQDSK['RHOVN'] coordsio = {'core_profiles.profiles_1d.%d.grid.rho_tor_norm' % time_index: eqrho} with omas_environment(ods, cocosio=gEQDSK.cocos, coordsio=coordsio): prof1d = ods['core_profiles']['profiles_1d'][time_index] psin = copy.deepcopy(prof1d['grid']['psi']) psin = psin - psin[0] psin = psin / psin[-1] Te = prof1d['electrons']['temperature'] Ti = prof1d['ion'][0]['temperature'] ne = prof1d['electrons']['density'] Zeff = prof1d['zeff'] pressure = prof1d['electrons']['pressure'] + prof1d['pressure_ion_total'] ngrid = len(psin) ion_num = len(prof1d['ion']) Te = Te.reshape(1, ngrid) Ti = Ti.reshape(1, ngrid) ne = ne.reshape(1, ngrid) Zeff = Zeff.reshape(1, ngrid) pressure = pressure.reshape(1, ngrid) nis = np.array([ods['core_profiles.profiles_1d[0].ion'][i]['density_thermal'] for i in ods['core_profiles.profiles_1d[0].ion']]) nis = nis.reshape(1, ion_num, ngrid) Zis = np.array([ods['core_profiles.profiles_1d[0].ion'][i]['element[0].z_n'] for i in ods['core_profiles.profiles_1d[0].ion']]) jboot = sauter_bootstrap(psi_N=psin, Te=Te, Ti=Ti, ne=ne, p=pressure, nis=nis, Zis=Zis, gEQDSKs=gEQDSK, version=version)[0, :] bcent = ods['equilibrium.vacuum_toroidal_field.b0'][time_index] prof1d['j_bootstrap'] = jboot / bcent if np.sign(sum(prof1d['j_bootstrap'])) != np.sign(sum(prof1d['j_total'])): prof1d['j_bootstrap'] *= -1 return ods
[docs]class GradeEquilibrium: """ Grade's an equilibrium shape's conformity to specified shape references """ def __init__(self, ods, debug_out=None): """ :param ods: ODS instance :param debug_out: dict-like [optional] Provide a place to store debugging output, or provide None to disable """ self.ods = ods self.results = dict() self.debug_out = debug_out self.zoomed_eq_results = dict() self.hires_contour_results = dict() self.units = 'm' # Units for spatial coordinates TODO read from OMAS instead of assuming self.fatline = rcParams['lines.linewidth'] * 1.25 + 1.25 self.bigmark = 25 self.bad_style = {'fontstyle': 'italic', 'fontweight': 'bold'} self.bad_alpha = 0.2 try: from skimage import measure except Exception: self.have_skimage = False else: self.have_skimage = True self.grade()
[docs] def printdq(self, *args, **kw): kw.setdefault('topic', 'GradeEquilibrium') printd(*args, **kw)
[docs] def zoomed_eq(self, slice_index=0, zoom=7): """ Returns an upsampled equilibrium, as may be needed for some methods :param slice_index: int Index of the time slice of the equilibrium :param zoom: zoom / upscaling factor :return: tuple 1D float array: r values of the grid 1D float array: z values of the grid 2D float array: psi_N at (r, z) """ tag = f'{slice_index}_{zoom:0.3f}' if tag in self.zoomed_eq_results: return self.zoomed_eq_results[tag] eq = self.ods['equilibrium']['time_slice'][slice_index] psia = eq['global_quantities']['psi_axis'] psib = eq['global_quantities']['psi_boundary'] psi = eq['profiles_2d'][0]['psi'] r = eq['profiles_2d'][0]['grid']['dim1'] z = eq['profiles_2d'][0]['grid']['dim2'] psin = (psi - psia) / (psib - psia) rzoom = scipy.ndimage.zoom(r, zoom) zzoom = scipy.ndimage.zoom(z, zoom) psinzoom = scipy.ndimage.zoom(psin, zoom) self.zoomed_eq_results[tag] = (rzoom, zzoom, psinzoom) return rzoom, zzoom, psinzoom
[docs] def hires_contour_sk(self, slice_index=0, psin=1, zoom=10): """ Returns r, z points along a high resolution contour at some psi_N value Requires skimage package, which isn't required by OMFIT and may not be available. THIS IS BETTER IF IT IS AVAILABLE! :param slice_index: int Index of the time slice of the equilibrium :param psin: float psi_N of the desired contour :param zoom: float zoom / upscaling factor :return: list Each element is a section of the contour, which might not be connected (e.g. different curve for PFR) Each element is a 2D array, where [:, 0] is R and [:, 1] is Z """ from skimage import measure r, z, psinmap = self.zoomed_eq(slice_index=slice_index, zoom=zoom) ridx = np.arange(len(r)) zidx = np.arange(len(z)) contours_raw = measure.find_contours(psinmap, psin) contours_rz = [array([interp1d(ridx, r)(cr[:, 0]), interp1d(zidx, z)(cr[:, 1])]).T for cr in contours_raw] return contours_rz
[docs] def hires_contour_omf(self, slice_index=0, psin=1, zoom=10): """ Returns r, z points along a high resolution contour at some psi_N value Uses OMFIT util functions. 10-50% slower than skimage, but doesn't require external dependencies. skimage is better if you have it! :param slice_index: int Index of the time slice of the equilibrium :param psin: float psi_N of the desired contour :param zoom: float zoom / upscaling factor :return: list Each element is a section of the contour, which might not be connected (e.g. different curve for PFR) Each element is a 2D array, where [:, 0] is R and [:, 1] is Z """ r, z, psinmap = self.zoomed_eq(slice_index=slice_index, zoom=zoom) contours_rz = contourPaths(r, z, psinmap.T, levels=np.atleast_1d(psin), smooth_factor=1) contours_rz = [crz.vertices for crz in contours_rz[0]] return contours_rz
[docs] def hires_contour(self, slice_index=0, psin=1, zoom=10): """Wrapper for picking whether to use omfit or skimage version""" tag = f'{slice_index}_{zoom:0.3f}_{psin:0.3f}' if tag in self.hires_contour_results: return self.hires_contour_results[tag] if self.have_skimage: result = self.hires_contour_sk(slice_index=slice_index, zoom=zoom, psin=psin) else: result = self.hires_contour_omf(slice_index=slice_index, zoom=zoom, psin=psin) self.hires_contour_results[tag] = result return result
[docs] def find_midplane(self, slice_index): """ Finds the intersection of the boundary with the outboard midplane :param slice_index: int :return: (float, float) R, Z coordinates of the LCFS at the outboard midplane """ rmaxis = self.ods['equilibrium']['time_slice'][slice_index]['global_quantities']['magnetic_axis']['r'] zmaxis = self.ods['equilibrium']['time_slice'][slice_index]['global_quantities']['magnetic_axis']['z'] contours = self.hires_contour(slice_index=slice_index, psin=1) # Find the contour section that crosses the midplane lowz = np.array([abs(contour[:, 1]).min(0) for contour in contours]) contour = np.array(contours)[np.argmin(lowz)] sel = (contour[:, 0] > rmaxis) & ((abs(contour[:, 1] - zmaxis)) < 0.25) rmidout = interp1d(contour[sel, 1], contour[sel, 0])(zmaxis) return rmidout, zmaxis
[docs] def find_fluxsurface(self, slice_index=0, midplane_dr=0.005): """ Finds contour segments of a flux surface outboard of the midplane by a specific amount :param slice_index: int :param midplane_dr: float :return: list of 2D arrays Sections of the contour for the flux surface The contour may have some disconnected regions in general, but if it is all simply connected, there will be only one element in the list with a single 2D array. """ rmidout, zmaxis = self.find_midplane(slice_index=slice_index) r, z, psin = self.zoomed_eq(slice_index=slice_index) psin_val = interp2d(r, z, psin.T)(rmidout + midplane_dr, zmaxis)[0] contours = self.hires_contour(slice_index=slice_index, psin=psin_val) return contours
[docs] def new_fig(self, slice_index=0): """Prepares a new figure and set of axes with aspect ratio and cornernote set up""" fig, ax = plt.subplots(1) ax.set_aspect('equal') ax.set_adjustable('box') de = self.ods['dataset_description.data_entry'] eq = self.ods['equilibrium.time_slice'][slice_index] cornernote(device=de.get('machine', ''), shot=de.get('pulse', ''), time=eq['time'] * 1000) return fig, ax
[docs] def plot_hires_eq(self, slice_index=0, ax=None, psin=None, r_rsep=None, track=None, **plotkw): """ Plots the high resolution contours generated for the grading process. Can be used with `track` to track whether a flux surface is already shown to avoid redundant overlays of the same thing. :param slice_index: int :param psin: float :param r_rsep: float :param ax: Axes instance :param track: dict-like [optional] Keeps track of what's already plotted to avoid redundant overlays of the same flux surfaces :param plotkw: extra keywords will be passed to plot """ if track is not None and psin is not None and psin in track['p_contours_shown']: self.printdq(f'This contour at psin={psin} was already shown and will be skipped') return None if track is not None and r_rsep is not None and r_rsep in track['r_contours_shown']: self.printdq(f'This contour at R-Rsep={r_rsep} was already shown and will be skipped') return None if psin is not None: contours = self.hires_contour(slice_index=slice_index, psin=psin) track['p_contours_shown'] += [psin] plotkw.setdefault('label', rf'$\psi_N={psin:0.3f}$') elif r_rsep is not None: contours = self.find_fluxsurface(slice_index=slice_index, midplane_dr=r_rsep) track['r_contours_shown'] += [r_rsep] plotkw.setdefault('label', rf'$R-R_{{sep}}={r_rsep:0.3f}$ {self.units}') else: self.printdq('No contours specified; skipping plot') return None if ax is None: fig, ax = self.new_fig(slice_index=slice_index) plotkw.setdefault('color', 'b') for contour in contours: ax.plot(contour[:, 0], contour[:, 1], **plotkw) return ax
[docs] def plot_flux_gates(self, slice_index=0, ax=None, track=None): """ Plots locations of flux gates :param slice_index: int :param ax: Axes instance :param track: dict-like [optional] """ if ax is None: fig, ax = self.new_fig(slice_index=slice_index) for fgi in self.ods['pulse_schedule.position_control.flux_gates']: g = self.ods['pulse_schedule.position_control.flux_gates'][fgi] name = g['name'] rsurf = g.get('applies_to_dr_mid', []) psurf = g.get('applies_to_psin', [] if rsurf else 1) tr = g.get('time_range') eqt = self.ods['equilibrium.time_slice'][slice_index]['time'] if (tr is not None) and (not (tr[0] <= eqt <= tr[1])): self.printdq( f'Flux gate {name} applies to time range {tr} s; slice {slice_index} at {eqt} s is ' f'outside of this range. This gate will be skipped on the plot.' ) continue lsurf1 = [rf'$\psi_N$={psurf}'] if len(np.atleast_1d(psurf)) else [] lsurf2 = [rf'$R-R_{{sep}}$={rsurf} {self.units}'] if len(np.atleast_1d(rsurf)) else [] lsurf = ', '.join(lsurf1 + lsurf2) # Make sure the relevant contours are shown cplot_kw = {'color': 'r', 'linestyle': '--', 'lw': 1} for p in np.atleast_1d(psurf): self.plot_hires_eq(slice_index=slice_index, ax=ax, track=track, psin=p, **cplot_kw) for r in np.atleast_1d(rsurf): self.plot_hires_eq(slice_index=slice_index, ax=ax, track=track, r_rsep=r, **cplot_kw) # Decide how well the gate was respected if 'flux_gates' in self.results: thru_gate = [v for k, v in self.results['flux_gates'][slice_index].items() if name in k] if len(thru_gate): thru_gate = sum(thru_gate) / float(len(thru_gate)) else: thru_gate = np.NaN else: thru_gate = None if thru_gate is None: marker = 's' conclusion = '' elif thru_gate == 1: marker = 'D' conclusion = ' (good)' elif thru_gate == 0: marker = '+' conclusion = ' (missed)' else: marker = 'x' conclusion = f' ({1 - thru_gate}% missed)' # Do the actual plot tmp = ax.plot([g['r1'], g['r2']], [g['z1'], g['z2']], marker=marker, linestyle='-', lw=0.2, alpha=0.7) ax.text( g['r1'], g['z1'], f' {name}{conclusion} (for {lsurf})', ha='left', va='top', color=tmp[0].get_color(), alpha=1 if thru_gate else self.bad_alpha, fontdict=None if thru_gate else self.bad_style, ) return ax
[docs] def plot_boundary_points(self, slice_index=0, ax=None): """ Plots reference or target points along the boundary outline :param slice_index: int :param ax: Axes instance """ if ax is None: fig, ax = self.new_fig(slice_index=slice_index) t = self.ods['equilibrium.time_slice'][slice_index]['time'] theta = np.linspace(0, 2 * np.pi, 61) d = 0.12 pc = self.ods['pulse_schedule.position_control'] # Combine boundary points and strike points bps = [ self.ods['pulse_schedule.position_control.boundary_outline'][bpi] for bpi in self.ods['pulse_schedule.position_control.boundary_outline'] ] sps = [ self.ods['pulse_schedule.position_control.strike_point'][spi] for spi in self.ods['pulse_schedule.position_control.strike_point'] ] is_strike = [False] * len(bps) + [True] * len(sps) bps += sps # Loop over boundary and strike points and deal with them for i, bp in enumerate(bps): bpn = bp['r.reference_name'] + (' (strike point)' if is_strike[i] else '') if 'boundary_points' in self.results: norm_dist = self.results['boundary_points'][slice_index].get(bpn, None) else: norm_dist = None if norm_dist is None: marker = '*' conclusion = '' fontdict = None alpha = 1 elif norm_dist <= 1: marker = '.' conclusion = f' {norm_dist*100:0.1f}%' fontdict = None alpha = 1 else: marker = 'x' conclusion = f' {norm_dist*100:0.1f}%' fontdict = self.bad_style alpha = self.bad_alpha rref = bp['r.reference'] zref = bp['z.reference'] r = interp1d(pc['time'], nominal_values(rref), bounds_error=False, fill_value=np.NaN)(t) z = interp1d(pc['time'], nominal_values(zref), bounds_error=False, fill_value=np.NaN)(t) r_err = interp1d(pc['time'], std_devs(rref), bounds_error=False, fill_value=np.NaN)(t) z_err = interp1d(pc['time'], std_devs(zref), bounds_error=False, fill_value=np.NaN)(t) # Mark the center of the point mark1 = ax.plot(r, z, marker=marker, alpha=0.7) # Ellipse with size indicating tolerance ax.plot(r + r_err * np.cos(theta), z + z_err * np.sin(theta), color=mark1[0].get_color(), alpha=alpha) # Crosshairs, to help find very small ellipses ax.plot(r + np.array([-d, d, np.NaN, 0, 0]), z + np.array([0, 0, np.NaN, -d, d]), color=mark1[0].get_color(), alpha=alpha) ax.text( r, z, f'{bpn}{conclusion}', va='bottom', ha='left' if 'angle_enforcer' in bpn else 'right', color=mark1[0].get_color(), fontdict=fontdict, alpha=alpha, ) return ax
[docs] def plot_x_points(self, slice_index=0, ax=None): """ Plots X-points and reference X-points :param slice_index: int :param ax: Axes instance """ if ax is None: fig, ax = self.new_fig(slice_index=slice_index) theta = np.linspace(0, 2 * np.pi, 61) d = 0.12 # Mark the measured X-points t = self.ods['equilibrium.time_slice'][slice_index]['time'] for eqxi in self.ods['equilibrium']['time_slice'][slice_index]['boundary']['x_point']: eqx = self.ods['equilibrium']['time_slice'][slice_index]['boundary']['x_point'][eqxi] ax.plot(eqx['r'], eqx['z'], marker='x', mew=self.fatline, markersize=self.bigmark, zorder=0, alpha=0.5) # Loop through reference X-points and deal with them pc = self.ods['pulse_schedule.position_control'] for xpi in pc['x_point']: xp = pc['x_point'][xpi] xpn = xp['r.reference_name'] if 'x_points' in self.results: norm_dist = self.results['x_points'][slice_index].get(xpn, None) else: norm_dist = None if norm_dist is None: marker = '*' conclusion = '' fontdict = None alpha = 1 elif norm_dist <= 1: marker = '+' conclusion = f' {norm_dist*100:0.1f}%' fontdict = None alpha = 1 else: marker = 'x' conclusion = f' {norm_dist*100:0.1f}%' fontdict = self.bad_style alpha = self.bad_alpha rref = xp['r.reference'] zref = xp['z.reference'] r = interp1d(pc['time'], nominal_values(rref), bounds_error=False, fill_value=np.NaN)(t) z = interp1d(pc['time'], nominal_values(zref), bounds_error=False, fill_value=np.NaN)(t) r_err = interp1d(pc['time'], std_devs(rref), bounds_error=False, fill_value=np.NaN)(t) z_err = interp1d(pc['time'], std_devs(zref), bounds_error=False, fill_value=np.NaN)(t) mark1 = ax.plot(r, z, marker=marker, alpha=0.7, markersize=self.bigmark) ax.plot(r + r_err * np.cos(theta), z + z_err * np.sin(theta), color=mark1[0].get_color(), alpha=alpha, lw=self.fatline) ax.plot(r + np.array([-d, d, np.NaN, 0, 0]), z + np.array([0, 0, np.NaN, -d, d]), color=mark1[0].get_color(), alpha=alpha) ax.text( r, z, f'{xpn} X-point{conclusion}', va='bottom', ha='left' if 'angle_enforcer' in xpn else 'right', color=mark1[0].get_color(), fontdict=fontdict, alpha=alpha, ) return ax
[docs] def plot(self, slice_index=0, ax=None): """ Plots the equilibrium cross section with shape targets marked Instead of the standard equilibrium cross section plot supplied by OMAS, the high resolution contours generated for the grading process are shown. :param slice_index: int Which slice should be shown? :param ax: Axes instance Plot on these axes, if supplied. Otherwise, create a new figure and set up cornernote and axes scale. """ track = {'p_contours_shown': [], 'r_contours_shown': []} # Set up axes, if needed if ax is None: fig, ax = self.new_fig(slice_index=slice_index) # General overlays and annotations rwall = self.ods['wall']['description_2d'][0]['limiter']['unit'][0]['outline']['r'] zwall = self.ods['wall']['description_2d'][0]['limiter']['unit'][0]['outline']['z'] ax.plot(rwall, zwall, color='k') rmidout, zmaxis = self.find_midplane(slice_index=slice_index) ax.plot(rmidout, zmaxis, marker='h', color='b', label='Outboard midplane') # Plot overlays for specific types of constraints self.plot_hires_eq(slice_index=slice_index, ax=ax, psin=1, track=track) self.plot_flux_gates(slice_index=slice_index, ax=ax, track=track) self.plot_boundary_points(slice_index=slice_index, ax=ax) self.plot_x_points(slice_index=slice_index, ax=ax) ax.legend(loc=0, numpoints=1) return ax
def __call__(self): if not hasattr(self, 'results'): self.grade() return self.results
[docs] def grade(self, simple_x_point_mapping=True): """ Primary method for grading equilibrium shape conformity to reference shape. Results are placed in self.results and also returned :param simple_x_point_mapping: bool True: Use simple mapping of measured X-points to targets: just find closest X-point to each target. False: Try hard to sort X-points into primary (psin=1) and secondary (psin > 1) and only compare primary targets to primary measurements. :return: dict Dictionary of results, with one key per category """ self.results['boundary_points'] = self.grade_boundary_points() self.results['x_points'] = self.grade_x_points(simple_map=simple_x_point_mapping) self.results['flux_gates'] = self.grade_gates() return self.results
[docs] def grade_x_points(self, improve_xpoint_measurement=True, simple_map=True, psin_tolerance_primary=5e-4): """ Grades conformity of X-point position(s) to specifications :param improve_xpoint_measurement: bool :param simple_map: bool For each target, just find the closest measured X-point and compare. Requires at least as many measurements as targets and raises OmasUtilsBadInput if not satisfied. :param psin_tolerance_primary: float Tolerance in psin for declaring an X-point to be on the primary separatrix :return: list of dicts One list element per time-slice. Within each dictionary, keys give labels for X-point targets, and values give normalized distance errors. """ from omfit_classes.omfit_eqdsk import x_point_search if self.debug_out is not None: out = self.debug_out[f'grade_x_points_{"simple" if simple_map else "map"}'] = self.debug_out.__class__() out['zoomed_eq_sample'] = self.zoomed_eq(slice_index=0) else: out = None # Make sure inputs are ready and abort early if not if len(self.ods['pulse_schedule.position_control.x_point']) < 1: self.printdq('No X-points references are defined in specifications. Skip grading X-points.') return [] # Get measured X-points t = self.ods['.equilibrium.time'] xr = copy.deepcopy(self.ods['equilibrium.time_slice.:.boundary.x_point.:.r']) xz = copy.deepcopy(self.ods['equilibrium.time_slice.:.boundary.x_point.:.z']) nxm = len(xr[0, :]) self.printdq(' Processing measured X-point positions...') if improve_xpoint_measurement: rr = copy.copy(xr) zr = copy.copy(xz) for i in range(len(t)): rg, zg, pzoom = self.zoomed_eq(slice_index=i) if out is not None: out['rg'] = rg out['zg'] = zg out[f'pzoom{i}'] = pzoom for j in range(nxm): psin_at_x = interp2d(rg, zg, pzoom.T)(xr[i, j], xz[i, j]) xr_, xz_ = x_point_search(rg, zg, pzoom.T, r_center=rr[i, j], z_center=zr[i, j], zoom=3) if np.isnan(xr_) or np.isnan(xz_): self.printdq( f' Did not find a primary xpoint near {xr[i, j]}, {xz[i, j]}; ' f'setting psi_boundary_weight=0 and trying again.' ) xr_, xz_ = x_point_search(rg, zg, pzoom.T, r_center=rr[i, j], z_center=zr[i, j], zoom=3, psi_boundary_weight=0) if ~np.isnan(xr_) and ~np.isnan(xz_): xr[i, j], xz[i, j] = xr_, xz_ self.printdq( f' xpointsearch {i}, {j} centered at {rr[i, j]}, {zr[i, j]} ' f'(psin = {psin_at_x} here) found xpoint at {xr[i, j]}, {xz[i, j]}' ) # Get target X-points pc = self.ods['pulse_schedule.position_control'] tt = pc['time'] xrtd = nominal_values(pc['x_point.:.r.reference']).T nxt = len(xrtd[0, :]) xrte = std_devs(pc['x_point.:.r.reference']).T ikw0 = dict(bounds_error=False, fill_value=np.NaN, axis=0) xrt = interp1d(tt, xrtd, **ikw0)(t) xrt_err = interp1d(tt, xrte, **ikw0)(t) xztd = nominal_values(pc['x_point.:.z.reference'].T) xzte = std_devs(pc['x_point.:.z.reference'].T) xzt = interp1d(tt, xztd, **ikw0)(t) xzt_err = interp1d(tt, xzte, **ikw0)(t) nt = len(t) if self.debug_out is not None: self.printdq(f'tt = {tt}') out['t'] = t out['nxt'] = nxt out['tt'] = tt out['xrtd'] = xrtd out['xztd'] = xrtd out['xrt'] = xrt out['xzt'] = xzt out['xr'] = xr out['xz'] = xz if simple_map: # Mapping X-points is hard! Let's just pick the closest measurement to each target. if nxt > nxm: raise OmasUtilsBadInput("Simple mapping requires at least as many measured X-points as targets.") dxr = np.nanmin(abs(xr[:, :, np.newaxis] - xrt[:, np.newaxis, :]), axis=1) dxz = np.nanmin(abs(xz[:, :, np.newaxis] - xzt[:, np.newaxis, :]), axis=1) else: # Now we must map which measured X-point goes with which target. They might not all have targets. # Map target X-points. There can be more than one on a surface in case of double null or snowflake. primary_xt = [] secondary_xt = [] unidentified_xt = list(range(nxt)) for i in copy.copy(unidentified_xt): x = pc['x_point'][i] named_primary = 'primary' in repr(x['r.reference_name']).lower() + repr(x['z.reference_name']).lower() if named_primary: primary_xt += [i] unidentified_xt.remove(i) for i in copy.copy(unidentified_xt): x = pc['x_point'][i] named_sec = 'secondary' in repr(x['r.reference_name']).lower() + repr(x['z.reference_name']).lower() if named_sec: secondary_xt += [i] unidentified_xt.remove(i) if len(unidentified_xt) > 0 and nxt == 1: # There is only one target, so it's pretty safe to assume it's for the primary x-point. # If you don't like this assumption, name your single X-point with "secondary" in reference_name. primary_xt += [0] unidentified_xt.remove(0) if len(unidentified_xt) > 0: printw( 'WARNING! Failed to identify all the X-point targets as primary or secondary. ' 'The unidentified ones might be treated as secondary.' ) # Map measured X-points. It's trickier since mapping between primary/secondary & top/bottom can switch. is_primary = np.zeros((nt, nxm), bool) for it in range(nt): # Get normalized psi values interpolated from the upsampled flux map to the X-point locations rgrid, zgrid, psingrid = self.zoomed_eq(slice_index=0) psin_x = np.array([interp2d(rgrid, zgrid, psingrid.T)(xr[it, j], xz[it, j])[0] for j in range(len(xr[it, :]))]) # Could a given X-point be the primary? Must be very close to psin = 1 close_psin = abs(psin_x - 1) < psin_tolerance_primary smallest_delta_this_slice = np.nanmin(abs(psin_x - 1)) # Could there be more than one primary X-point (balanced DN or snowflake)? # To qualify, the two X-points would have to be about the same distance from psin = 1 close_to_minimum_delta = (abs(psin_x - 1) / smallest_delta_this_slice) < 1.2 # What if the errors go in opposite directions? As in, one X-point's psin estimate is slightly < 1? close_to_minimum_psin_x = (psin_x - np.nanmin(psin_x)) < psin_tolerance_primary if self.debug_out is not None: out[f'psin_x_{it}'] = psin_x out[f'close_psin_{it}'] = close_psin out[f'smallest_delta_this_slice_{it}'] = smallest_delta_this_slice out[f'close_to_minimum_delta_{it}'] = close_to_minimum_delta out[f'close_to_minimum_psin_x_{it}'] = close_to_minimum_psin_x is_primary[it, :] = close_psin & close_to_minimum_delta & close_to_minimum_psin_x # Now do the comparison, but only primary targets vs primary X-points, and only secondary vs secondary. target_primary = np.zeros((nt, nxt), bool) for i in primary_xt: target_primary[:, i] = True compare_okay = is_primary[:, :, np.newaxis] == target_primary[:, np.newaxis, :] bigboy = 1e8 dxr = np.nanmin(abs(xr[:, :, np.newaxis] - xrt[:, np.newaxis, :]) + ~compare_okay * bigboy, axis=1) dxz = np.nanmin(abs(xz[:, :, np.newaxis] - xzt[:, np.newaxis, :]) + ~compare_okay * bigboy, axis=1) dxr[dxr >= bigboy] = np.NaN dxz[dxz >= bigboy] = np.NaN if self.debug_out is not None: out['compare_okay'] = compare_okay out['is_primary'] = is_primary out['target_primary'] = target_primary out['dxr'] = dxr out['dxz'] = dxz real_dist = np.sqrt(dxr**2 + dxz**2) no_norm = np.nanmax(abs(xrt_err)) == 0 or np.nanmax(abs(xzt_err == 0)) if no_norm: printw( "All X-point R or Z position uncertainties were 0; skipping normalization to " "uncertainty/tolerance. Errors will be reported in data units instead. Now you can't " "compare errors with different units: remember that you've done this to yourself." ) norm_dist = real_dist else: norm_dist = np.sqrt((dxr / xrt_err) ** 2 + (dxz / xzt_err) ** 2) for it in range(nt): eqt = self.ods['equilibrium.time'][it] print(f' Conformity of X-point positions to specs for equilibrium slice {it} at t={eqt:0.3f} s:') for ix in range(nxt): xpt = self.ods['pulse_schedule.position_control.x_point'][ix] xpn = xpt['r.reference_name'] if no_norm: print(f' The error at X-point {xpn} is {real_dist[it, ix]:0.3f} {self.units}') else: print( f' The error at X-point {xpn} is {norm_dist[it, ix] * 100:0.1f}% of tolerance: ' f'{"okay" if norm_dist[it, ix] < 1 else "BAD!!"}' ) if self.debug_out is not None: out['norm_dist'] = norm_dist out['real_dist'] = real_dist out['xrt_err'] = xrt_err out['xzt_err'] = xzt_err return [ {pc['x_point'][j]['r.reference_name']: norm_dist[i, j] for j in range(len(norm_dist[0, :]))} for i in range(len(norm_dist[:, 0])) ]
[docs] def grade_boundary_points(self): """ Grades proximity of boundary to target boundary points We need a high resolution flux surface contour for each time slice, so I think we have to loop and upsample each one individually. """ results = [] for slice_index in self.ods['equilibrium']['time_slice']: t = self.ods['equilibrium']['time_slice'][slice_index]['time'] print(f' Checking boundary points for equilibrium slice {slice_index} at t={t}') results += [self.grade_boundary_points_slice(slice_index=slice_index)] return results
[docs] def grade_boundary_points_slice(self, slice_index=0): """ Grades proximity of boundary to target boundary points for a specific time-slice of the equilibrium Strike points are included and treated like other boundary points (except they get a note appended to their name when forming labels). There can be differences in the exact wall measurements (such as attempts to account for toroidal variation, etc.) that cause a strike point measurement or specification to not be on the reported wall. Since I can't think of a way to enforce consistent interpretation of where the wall is between the strike point specification and the measured equilibrium, I've settled for making sure the boundary goes through the specified strike point. This could go badly if the contour segment passing through the specified strike point were disconnected from the core plasma, such as if it limited or something. I'm hoping to get away with this for now, though. :param slice_index: int Number of the time-slice to consider for obtaining the LCFS. """ results = {} contours = self.hires_contour(slice_index=slice_index) pc = self.ods['pulse_schedule.position_control'] the_boundary_points = [pc['boundary_outline'][i] for i in pc['boundary_outline']] the_strike_points = [pc['strike_point'][i] for i in pc['strike_point']] the_points = the_boundary_points + the_strike_points is_strike = [False] * len(the_boundary_points) + [True] * len(the_strike_points) for i in range(len(the_points)): bpn = the_points[i]['r.reference_name'] + (' (strike point)' if is_strike[i] else '') self.printdq(f' Now checking boundary point {bpn}...') bpr = the_points[i]['r.reference'] bpz = the_points[i]['z.reference'] # Get normalization constants bpr_err = std_devs(bpr) bpz_err = std_devs(bpz) both_zero = (bpr_err == 0) & (bpz_err == 0) bpr_err[both_zero] = 1 bpz_err[both_zero] = 1 if any(both_zero): printw( f'Warning: uncertainties (used as tolerances) are 0 for {bpn} for {sum(both_zero)} points ' f'out of {len(bpr_err)}. Errors are not being normalized to tolerances for these points.' ) no_norm = True else: no_norm = False bpr_zero = (bpr_err == 0) & (bpz_err != 0) # Make the R tolerance non-zero but much smaller than Z tolerance bpr_err[bpr_zero] = bpz_err[bpr_zero] / 1e16 bpz_zero = (bpz_err == 0) & (bpr_err != 0) bpz_err[bpz_zero] = bpr_err[bpz_zero] / 1e16 norm_dist = [] for c_sec in contours: # Assume the boundary is high enough resolution that we can do distance the lazy way # min_dist = np.sqrt(np.nanmin((c_sec[:, 0] - bp['r']) ** 2 + (c_sec[:, 1] - bp['z']) ** 2)) # I don't trust the lazy way, so do it nicely norm_dist2 = [] for j in range(len(c_sec[:, 0]) - 1): # Do the distance to line in normalized coordinates instead of data units cpr, cpz = point_to_line( nominal_values(bpr) / bpr_err, nominal_values(bpz) / bpz_err, c_sec[j, 0] / bpr_err, c_sec[j, 1] / bpz_err, c_sec[j + 1, 0] / bpr_err, c_sec[j + 1, 1] / bpz_err, return_closest_point=True, ) # Normalized distances for this point to each segment of this section of this contour ndr = nominal_values(bpr) / bpr_err - cpr ndz = nominal_values(bpz) / bpz_err - cpz norm_dist2 += [np.sqrt(ndr**2 + ndz**2)] # Normalized distance for this point to the closest segment of this section of this contour norm_dist += [np.nanmin(norm_dist2)] # Normalized distance for this point to the closest segment of the closest section of this contour # AKA minimum distance from this point to any part of the contour. norm_dist = np.min(norm_dist) if no_norm: data_units = 'm' # TODO: read this from OMAS print(f' The error at point {bpn} is {norm_dist} {data_units}') else: print( f' The error at point {bpn} is {norm_dist * 100:0.1f}% of tolerance: ' f'{"acceptable." if norm_dist < 1 else "BAD!"}' ) results[f'{bpn}'] = norm_dist return results
[docs] def grade_gates(self): """Grades boundary on passing through 'gates': pairs of points""" results = [] for slice_index in self.ods['equilibrium']['time_slice']: t = self.ods['equilibrium']['time_slice'][slice_index]['time'] print(f' Checking boundary gates for equilibrium slice {slice_index} at t={t}') results += [self.grade_gates_slice(slice_index=slice_index)] return results
[docs] def grade_gates_slice(self, slice_index=0): """Grades flux surfaces on passing through 'gates' (pairs of points) at a specific time-slice""" results = {} pc = self.ods['pulse_schedule.position_control'] for i in pc['flux_gates']: bg = pc['flux_gates'][i] bgn = bg['name'] self.printdq(f' Now checking boundary gate {bgn}...') time_range = bg.get('time_range', None) if (time_range is None) or (time_range[0] >= time_range[1]): self.printdq(f' Flux gate {bgn} does not have a specified time range, so it applies to all times.') else: eqt = self.ods['equilibrium.time'][slice_index] slice_in_range = time_range[0] <= eqt <= time_range[1] if not slice_in_range: print(f' Flux gate {bgn} does not apply to time slice {slice_index} at {eqt} s') continue gate_path = np.array([[bg['r1'], bg['r2']], [bg['z1'], bg['z2']]]).T cvals_psin = np.atleast_1d(bg.get('applies_to_psin', [])) cvals_rmid = np.atleast_1d(bg.get('applies_to_dr_mid', [])) contour_count = len(cvals_psin) + len(cvals_rmid) if contour_count == 0: cvals_psin = [1] # There's caching within the methods for obtaining flux surfaces, so they can be reused by a bunch # of gates in this loop without too much of a speed penalty. all_contours = [self.hires_contour(slice_index=slice_index, psin=psin) for psin in cvals_psin] + [ self.find_fluxsurface(slice_index=slice_index, midplane_dr=dr) for dr in cvals_rmid ] contour_labels = [f'psin={psin:0.3f}' for psin in cvals_psin] + [f'dR_mid={dr}' for dr in cvals_rmid] for contours, label in zip(all_contours, contour_labels): intersects = False for c_sec in contours: if len(line_intersect(c_sec, gate_path)): intersects = True if intersects: conclusion = 'passes' else: if bg.get('critical', True): conclusion = 'DOES NOT PASS' else: conclusion = 'does not pass' print(f' Flux surface {label} {conclusion} through gate {bgn}') results[f'{label} surface thru {bgn}'] = intersects return results