Source code for omfit_classes.omfit_efitviewer

# -*-Python-*-
# Created by eldond at 2019-12-23 08:56

"""
Contains supporting functions to back the efitviewer GUI
"""

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

import omas
from omas import ODS
from omfit_classes.omfit_base import OMFITtmp, OMFITmodule, OMFITtree
from omfit_classes.omfit_json import OMFITjson, OMFITsettings, SettingsName
from omfit_classes.omfit_omas_utils import (
    setup_hardware_overlay_cx,
    add_hardware_to_ods,
    omas_eq1d_pretty_name,
    ensure_consistent_experiment_info,
)
from omfit_classes.omfit_formatter import omfit_formatter
from omfit_classes.utils_fusion import is_device
from omfit_classes.omfit_mds import translate_MDSserver

__all__ = ['efitviewer']

# Provide better plot limits than autoscale, which might cut off some external hardware like magnetic probes
default_plot_limits = {'DIII-D': {'x': [0.75, 2.75], 'y': [-1.75, 1.75]}, 'EAST': {'x': [0.55, 3.4], 'y': [-2.1, 2.1]}}

# List of features and the OMAS version required to use them
omas_features = {
    'contour_quantity': '0.50.0',
    'allow_fallback': '0.50.0',
    'sf': '0.50.0',
    'label_contours': '0.50.0',
    'show_wall': '0.55.2',
    'xkw': '0.52.2',
}


class EfitviewerDataError(doNotReportException, ValueError):
    """There is a problem with the data needed to make efitviewer work"""


# Shortcuts and utilities for defining key information, data structure, and shortcuts -----------------------------
[docs]def efitviewer(): """Shortcut for launching efitviewer""" OMFIT['scratch']['__efitviewer_gui__'].run() return
def pick_output_location(): """ Selects efitviewer output folder for storing ODSs and settings :return (string, string, dict-like, dict-like, dict-like, dict-like) Location in the tree of the output folder Location in the tree of the settings dictionary Reference to output folder in the tree Reference to efitviewer settings Reference to settings in the local parent module or main OMFIT settings Reference to main scratch area """ default_out_loc = "OMFIT['efitviewer']" scratch = OMFIT['scratch'] override_loc = scratch.get('efitviewer_out_loc', None) # Resolve output location if override_loc is None: out_loc = default_out_loc printd('No output_loc supplied; using default') elif not (is_string(evalExpr(override_loc))): out_loc = default_out_loc printd('output_loc is not a string; using default loc instead') else: out_loc = output_loc printd('assuming user-supplied output_loc is valid') # Make sure the output loc exists the_split = out_loc.split('[') locations = ['['.join(the_split[:i]) for i in range(1, len(the_split) + 1)] for i in range(1, len(locations)): key = locations[i].split('[')[-1].split(']')[0] key = key.split(key[0])[1] # Remove quotes, regardless of whether ' or " is used the_parent = eval(locations[i - 1]) if key not in the_parent: if isinstance(the_parent, OMFITtmp): # Items added under OMFITtmp should be OMFITtree key_type = OMFITtree elif isinstance(the_parent, OMFITjson): # Items under OMFITjson should be SortedDict key_type = SortedDict elif isinstance(the_parent, OMFITsettings): # Items under OMFITsettings should be SettingsName key_type = SettingsName elif type(the_parent).__name__ in ['OMFITmaintree']: # OMFITmaintree might not be defined in this context key_type = OMFITtree elif type(the_parent).__name__ in ['OMFITtree', 'dict', 'SettingsName', 'SortedDict']: # Copy class of parent if parent's class is appropriate key_type = type(the_parent) printd('pick_output_location(): copy parent type', topic='efitviewer') else: # Otherwise, default to SortedDict key_type = SortedDict printd('pick_output_location(): we do not like this type; just use SortedDict', topic='efitviewer') printd(f'pick_output_location(): type(parent)={type(the_parent)}&type(key)={key_type}', topic='efitviewer') the_parent[key] = key_type() out = eval(out_loc) # Make sure settings are defined and load defaults if needed if 'SETTINGS' not in out: out['SETTINGS'] = OMFITsettings(os.sep.join([OMFITsrc, 'framework_guis', 'efitviewer_settings.txt'])) settings_loc = out_loc + "['SETTINGS']" settings = eval(settings_loc) # Also get a reference to parent module or main OMFIT settings for loc in locations[::-1]: ppm = eval(loc) if (isinstance(ppm, OMFITmodule) and ('SETTINGS' in ppm)) or ('MainSettings' in ppm): parent_settings = ppm['SETTINGS'] if 'SETTINGS' in ppm else ppm['MainSettings'] break else: parent_settings = MainSettings return out_loc, settings_loc, out, settings, parent_settings, scratch def add_command_box_tab(): """ Adds a tab to the command box. Useful if the user wants output dumped to a new tab. :return: int Index of the new command box tab that has been added """ number = len(OMFITaux['GUI'].command) OMFITaux['GUI'].commandAdd(number) OMFITaux['GUI'].commandNotebook.select(OMFITaux['GUI'].commandNotebook.tabs().index(OMFITaux['GUI'].commandNotebook.select())) return number # Data loading and related helpers -------------------------------------------------------------------------------- def parse_load_cx_ods_kw( gfile=None, afile=None, mfile=None, kfile=None, device=None, shot=None, t=None, efitid=None, index=0, no_empty=None, settings=None, minimal_eq_data=None, ): """ Handles setting defaults for keywords to load_cx_ods() Keeps load_cx_ods() simpler and allows isolated testing. load_cx_ods() is called by GUI buttons, and OMFIT GUIs are difficult to test. So, this helps accomplish some limited GUI testing. :param kw: keywords passed to load_cx_ods :return: tuple containing gfile: OMFITgeqdsk or None afile: OMFITaeqdsk or None mfile: OMFITmeqdsk or None kfile: OMFITkeqdsk or None device: str shot: int efitid: str no_empty: bool minimal_eq_data: bool ods_tag: str """ if settings is None: _, _, _, settings, _, _ = pick_output_location() the_case = settings['cases'][index] def_def_gfile = settings['cases'][0].setdefault('gfile', None) def_def_afile = settings['cases'][0].setdefault('afile', None) def_def_mfile = settings['cases'][0].setdefault('mfile', None) def_def_kfile = settings['cases'][0].setdefault('kfile', None) def_def_device = tokamak(settings['cases'][0].setdefault('device', None)) def_def_shot = evalExpr(settings['cases'][0].setdefault('shot', 0)) if index == 0: ods_tag = 'efitviewer_ods' def_device = def_def_device def_shot = def_def_shot def_gfile = def_def_gfile def_afile = def_def_afile def_mfile = def_def_mfile def_kfile = def_def_kfile else: ods_tag = 'efitviewer_ods_{}'.format(index) def_device = the_case.setdefault('device', def_def_device) def_shot = the_case.setdefault('shot', def_def_shot) def_gfile = the_case.setdefault('gfile', def_def_gfile) def_afile = the_case.setdefault('afile', def_def_afile) def_mfile = the_case.setdefault('mfile', def_def_mfile) def_kfile = the_case.setdefault('kfile', def_def_kfile) if gfile is None: gfile = def_gfile if gfile == 'unused': gfile = None if afile is None: afile = def_afile if afile == 'unused': afile = None if mfile is None: mfile = def_mfile if mfile == 'unused': mfile = None if kfile is None: kfile = def_kfile if kfile == 'unused': kfile = None if device is None: device = def_device if shot is None: shot = def_shot if efitid is None: efitid = the_case.setdefault('efit_tree', None) if no_empty is None: no_empty = settings.setdefault('no_empty', True) if minimal_eq_data is None: minimal_eq_data = settings.setdefault('minimal_eq_data', True) return gfile, afile, mfile, kfile, device, shot, efitid, no_empty, minimal_eq_data, ods_tag def read_gamk_file_strings(device, gfile, afile, mfile, kfile): """ Processes inputs that could be strings containing locations instead of the actual target objects :param device: str :param gfile: str or OMFITgeqdsk :param afile: str or OMFITaeqdsk :param mfile: str or OMFITmeqdsk :param kfile: str or OMFITkeqdsk :return: tuple gfile: OMFITgeqdsk or None gfile_str: str or None afile: OMFITaeqdsk or None afile_str: str or None mfile: OMFITmeqdsk or None mfile_str: str or None kfile: OMFITkeqdsk or None kfile_str: str or None """ result = [] for x in 'gamk': varname = f'{x}file' var = eval(varname) if isinstance(var, str): var_str = var var = eval(var) elif isinstance_str(var, f'OMFIT{x}eqdsk'): var_str = f'<<YOUR {x.upper}-FILE FOR {device}:{var.filename}>>' elif isinstance(var, dict): var_str = f'<<A group of {x}EQDSK files for {device}>>' else: var_str = None result += [var, var_str] return result def load_cx_ods( gfile=None, afile=None, mfile=None, kfile=None, device=None, shot=None, t=None, efitid=None, index=0, no_empty=None, minimal_eq_data=None, ): """ Puts an ODS with CX info in the tree :param gfile,afile,mfile,kfile: OMFIT[g/a/m/k]eqdsk instance [optional if shot, t, and efitid are supplied] Pass in 'unused' to prevent None from being used as an instruction to seek out a default value. :param device: string :param shot: int [ignored if gfile is supplied] :param t: int [ignored if gfile is supplied] Leave it as None to load minimal data for all slices, or specify a single time to load just one complete slice :param efitid: string [ignored if gfile is supplied] :param index: int Index of the ODS, for managing multi-shot overlays :param no_empty: bool Filter out equilibrium time-slices that have 0 current or 0 boundary outline points. (this is passed to multi_efit_to_omas() via setup_hardware_overlay_cx()) :minimal_eq_data: bool If the source is MDSplus, load only the minimal dataset needed to power the equilibrium cross section plot. :return: ODS instance """ printd(f' loading cx ods for index {index}', topic='omfit_efitviewer') _, _, out, settings, _, _ = pick_output_location() gfile, afile, mfile, kfile, device, shot, efitid, no_empty, minimal_eq_data, ods_tag = parse_load_cx_ods_kw( gfile=gfile, afile=afile, mfile=mfile, kfile=kfile, device=device, shot=shot, t=t, efitid=efitid, index=index, no_empty=no_empty, settings=settings, minimal_eq_data=minimal_eq_data, ) gfile, gfile_str, afile, afile_str, mfile, mfile_str, kfile, kfile_str = read_gamk_file_strings(device, gfile, afile, mfile, kfile) if gfile is None: load_cmd = "setup_hardware_overlay_cx({}, {}, {}, efitid={}, default_load=False, no_empty={}, minimal_eq_data={})".format( repr(device), repr(shot), repr(t), repr(efitid), no_empty, minimal_eq_data ) ods = out[ods_tag] = setup_hardware_overlay_cx( device, shot, t, efitid=efitid, default_load=False, quiet=True, no_empty=no_empty, minimal_eq_data=minimal_eq_data ) ods['dataset_description.ids_properties.source'] = 'MDSplus' else: printd(f' Using a gfile or set with type {type(gfile)} to setup cx data.', topic='omfit_efitviewer') ods = out[ods_tag] = setup_hardware_overlay_cx( device, geqdsk=gfile, aeqdsk=afile, meqdsk=mfile, keqdsk=kfile, default_load=False, quiet=True, no_empty=no_empty, minimal_eq_data=minimal_eq_data, ) load_cmd = "setup_hardware_overlay_cx({}, geqdsk={}, default_load=False, no_empty={}, minimal_eq_data={})".format( repr(device), gfile_str, no_empty, minimal_eq_data ) ods['dataset_description.ids_properties.source'] = gfile_str settings['cases'][index]['instructions_for_loading_ods'] = load_cmd if 'equilibrium' not in ods or 'time' not in ods['equilibrium']: error_message = ( f'\nFailed to populate equilibrium data in the ODS.\n' f'This can happen if the data are missing from the\n' f'source database or if there is a connection problem.\n' f'You can try "File" > "Reset SSH tunnels, database connections".\n' f'If that does not work, please verify access to the database\n' f'({translate_MDSserver(device, efitid)})\n' f'and check that the shot ({device}#{shot}) has data for {efitid}.' ) ods['dataset_description.ids_properties.comment'] = error_message raise EfitviewerDataError(error_message) return ods def list_available_hardware_descriptors(device): """ Determines which hardware systems have descriptor functions available. That is, which systems can easily have their geometry loaded into an ODS? :param device: string :return: list of strings List of formal IMAS names of hardware systems that can be easily loaded into an ODS by OMFIT """ hw0 = 'setup_' hw1 = '_hardware_description_' available_hardware_d3d = [ a.split(hw1 + 'd3d')[0].split(hw0)[-1] for a in omfit_classes.omfit_omas_d3d.__all__ if a.startswith(hw0) and a.endswith(hw1 + 'd3d') ] available_hardware_east = [ a.split(hw1 + 'east')[0].split(hw0)[-1] for a in omfit_classes.omfit_omas_east.__all__ if a.startswith(hw0) and a.endswith(hw1 + 'east') ] available_hardware_kstar = [ a.split(hw1 + 'kstar')[0].split(hw0)[-1] for a in omfit_classes.omfit_omas_kstar.__all__ if a.startswith(hw0) and a.endswith(hw1 + 'kstar') ] available_hardware_general = [ a.split(hw1 + 'general')[0].split(hw0)[-1] for a in omfit_classes.omfit_omas.__all__ if a.startswith(hw0) and a.endswith(hw1 + 'general') ] if is_device(device, 'DIII-D'): available_hardware = available_hardware_general + available_hardware_d3d elif is_device(device, ['EAST', 'EAST_US']): available_hardware = available_hardware_general + available_hardware_east elif is_device(device, 'KSTAR'): available_hardware = available_hardware_general + available_hardware_kstar else: available_hardware = available_hardware_general return available_hardware def list_available_hardware_data(ods, systems=None): """ Determines which systems appear to have valid geometry data in ODS and could probably be overlaid in efitviewer. :param ods: ODS instance :param systems: list of strings List of formal IMAS names of hardware systems to consider in the search. Defaults to all keys in top level ods. :return: list of strings List of formal IMAS names of hardware systems that look like they're ready to plot using data already in ods. """ if systems is None: systems = ods.keys() return [system for system in systems if len(ods.search_paths('{}.*(geometry|position|position_control)'.format(system))) > 0] def list_hardware_overlay_methods(): """ Determines which hardware systems can be overlaid natively by OMAS :return: list of strings List of formal IMAS names of hardware systems that have plot overlay methods built into OMAS """ a1 = 'plot_' a2 = '_overlay' return [a[len(a1) : -len(a2)] for a in dir(ODS) if a.startswith(a1) and a.endswith(a2) and a != 'plot_overlay'] def setup_avail_systems(suppress_systems=['pulse_schedule']): """ Makes sure keys in systems correspond to available plot overlays in ODS Operates on the 0th ODS instance, not the different equilibrium overlays :param suppress_systems: list of strings Remove these systems from the list; their methods are redundant """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() systems = settings['systems'] if 'efitviewer_ods' in out: ods = out['efitviewer_ods'] device = out['efitviewer_ods']['dataset_description.data_entry'].get('machine', tokamak(settings['cases'][0]['device'])) else: ods = None device = tokamak(settings['cases'][0]['device']) has_plot_method = list_hardware_overlay_methods() has_loader_method = list_available_hardware_descriptors(device) has_data = list_available_hardware_data(ods) overlays = has_plot_method and (has_loader_method or has_data) overlays = [overlay for overlay in overlays if overlay not in suppress_systems] for k in list(systems.keys()): if k not in overlays: systems.pop(k) for overlay in overlays: systems.setdefault(overlay, False) return # Special overlays ------------------------------------------------------------------------------------------------ def plot_scaled_boundary_overlay( ax=None, device='ITER', mds_tree='', shot=0, time=0, scale=1 / 3.68, x_offset=0, y_offset=-0.88 / 3.68, **kw ): r""" Overlays the boundary of another device, including scaling and offsets :param ax: Axes instance :param device: string :param mds_tree: string :param shot: int :param time: numeric Time in ms :param scale: float :param x_offset: float Horizontal offset after scaling (m) :param y_offset: float Vertical offset after scaling (m) :param \**kw: Keywords to pass to plot() :return: output from plot() """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() kw.pop('t', None) # t is ignored since we're choosing an overlay time from the settings under value kw.pop('ods', None) # We don't need this,but we have to take it to have a consistent call signature if ax is None: ax = scratch.get('efitviewer_cx_ax', None) if ax is None: ax = gca() if is_device(device, 'ITER'): dat = np.genfromtxt(os.sep.join([OMFITsrc, '..', 'samples', 'iter_BL2010_Gribov.txt']), skip_header=3) r = dat[:, 0] * abs(scale) + x_offset z = dat[:, 1] * scale + y_offset label = 'Scaled ITER' else: eq_dat = read_basic_eq_from_mds( device=device, shot=shot, tree=mds_tree, g_file_quantities=['RBBBS', 'ZBBBS'], a_file_quantities=[], measurements=[], other_results=[], derived_quantities=['time'], ) it = closestIndex(eq_dat['time'], time) r = eq_dat['RBBBS'][it, :] * scale + x_offset z = eq_dat['ZBBBS'][it, :] * scale + y_offset btime = eq_dat['time'][it] label = 'Scaled {}#{} @ {}'.format(device, shot, btime) kw.setdefault('label', label) return ax.plot(r, z, **kw) def plot_beam_emission_spectroscopy_overlay(ax=None, ods=None, **kw): r""" Overlays measurement positions for the Beam Emission Spectroscopy (BES) diagnostic for DIII-D :param ax: Axes instance :param ods: ODS instance (for confirming device and getting shot) :param \**kw: Keywords to pass to plot, such as color, marker, etc. :return: output from plot() call """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() if ods is None: ods = out.get('efitviewer_ods', None) if ods is None: return device = tokamak(ods['dataset_description.data_entry'].get('machine', settings['cases'][0]['device'])) if not is_device(device, 'DIII-D'): printe('beam_emission_spectroscopy overlay supports DIII-D only, sorry. Aborting and deactivating.') settings['special_systems']['beam_emission_spectroscopy'] = False return shot = ods['dataset_description.data_entry'].get('.pulse', evalExpr(settings['cases'][0]['shot'])) if ax is None: ax = scratch.get('efitviewer_cx_ax', None) if ax is None: ax = gca() kw.pop('t', None) # t is ignored for BES since the configuration doesn't change in time kw.pop('value', None) # The BES activation flag doesn't carry special information that we need to consider # noinspection PyBroadException try: from omfit_classes.omfit_mds import OMFITmdsValue r = OMFITmdsValue(device, shot=shot, TDI='BES_R').data() / 100.0 z = OMFITmdsValue(device, shot=shot, TDI='BES_Z').data() / 100.0 except Exception: printe('BES data acquisition failed for {}#{}. Aborting and deactivating.'.format(device, shot)) settings['special_systems']['beam_emission_spectroscopy'] = False report = ''.join(traceback.format_exception(*sys.exc_info())) printe(report) return kw.setdefault('marker', 's') kw.setdefault('linestyle', ' ') kw.setdefault('label', 'BES') return ax.plot(r, z, **kw) def plot_custom_script_overlay(ax=None, ods=None, defaultvars_keywords=None, **kw): r""" Calls a script to draw a custom overlay or set of overlays. The script should use defaultVars() to accept the following keywords at minimum: ax ods :param ax: Axes instance [optional] If not provided, the custom script had better have a way of choosing. The default axes when using the efitviewer GUI are referenced in scratch.get('efitviewer_cx_ax', None) :param ods: ODS instance :param defaultvars_keywords: dict Keywords to pass to script's defaultVars() :param \**kw: additional keywords accepted to avoid problems (ignored) """ if ods is None: out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() ods = out.get('efitviewer_ods', None) script_loc = kw.pop('script_loc', None) if script_loc is None: printe('Invalid script location. Cannot call custom script overlay.') printd(' efitviewer lib plot_custom_script_overlay: script_loc = {}'.format(repr(script_loc))) script = eval(script_loc) if defaultvars_keywords is None: defaultvars_keywords = {} script.runNoGUI(ax=ax, ods=ods, **defaultvars_keywords) return def plot_alt_limiter_overlay(ax=None, **kw): """ Plots an alternative limiter overlay :param ax: Axes instance :param kw: Additional keywords :return: output from plot() call """ # Extract kw ignore_kw = ['t', 'value', 'labelevery', 'notesize', 'mask', 'ods'] # Unused ignore_kw += ['retain_wall'] # This should be used elsewhere for ikw in ignore_kw: kw.pop(ikw, None) scale = kw.pop('scale', 1.0) alt_lim_loc = kw.pop('alt_limiter_data_loc', None) alt_lim_data = kw.pop('alt_limiter_data_array', None) # Get data if alt_lim_data is None and alt_lim_loc is None: return None elif alt_lim_data is None: alt_lim_data = eval(alt_lim_loc) lim_name = alt_lim_loc.split("'")[-2] else: lim_name = '' r = alt_lim_data[:, 0] * scale z = alt_lim_data[:, 1] * scale # Set up plot if ax is None: ax = scratch.get('efitviewer_cx_ax', None) if ax is None: ax = gca() kw.setdefault('label', 'Alternative limiter {}'.format(lim_name)) return ax.plot(r, z, **kw) # Plot and plot related utilities --------------------------------------------------------------------------------- def plot_grid(fig=None, ax=None, enable=None, grid_kw=None): """ :param fig: Figure instance :param ax: Axes instance :param enable: bool :param grid_kw: dict Keywords passed to grid """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() # Get default values if fig is None: fig = scratch.get('efitviewer_fig', None) if enable is None: enable = settings.setdefault('grid_enable', False) if grid_kw is None: grid_kw = settings.setdefault('grid_kw', {}) ax = get_axes(ax) # Update grid if enable: ax.grid(enable, which='major', axis='both', **grid_kw) else: ax.grid(False, which='major', axis='both') # Make sure the update is actually drawn if fig is not None: fig.canvas.draw() return def setup_special_contours(case=0, t=None, contour_quantity=None, spacing=None, decimal_places=3): """ Finds contour levels in contour_quantity to give contours that obey settings in the dictionary called spacing :param case: int Which efitviewer case / ODS instance should be selected? :param t: float Time in s, to select a time slice in equilibrium data Defaults to setting for the relevant case in efitviewer settings :param contour_quantity: string Quantity for output contour levels. Defaults to contour quantity in efitviewer settings. :param spacing: dict Spacing instructions; can contain: quantity: string R, psi, psi_n, S amount: float array in data units relevant to quantity reference: string Reference point. For flux measurements, the reference is always the boundary & this setting is ignored. For R, try outer_midplane. For S, try outer_lower_strike. :param decimal_places: int Round to this many decimal places """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() if case == 0: ods = out['efitviewer_ods'] else: ods = out['efitviewer_ods_{}'.format(case)] if t is None: t = settings['cases'][case]['time'] / 1000.0 if contour_quantity is None: contour_quantity = settings['contour_quantity'] if spacing is None: spacing = settings['special_contour_spacing'] it = closestIndex(ods['equilibrium']['time'], t) eq_slice = ods['equilibrium']['time_slice'][it] eq_phi = eq_slice['profiles_1d']['phi'] eq_psi = eq_slice['profiles_1d']['psi'] eq_psi_n = np.linspace(0, 1, len(eq_psi)) cr = eq_slice['profiles_2d'][0]['grid']['dim1'] cz = eq_slice['profiles_2d'][0]['grid']['dim2'] if contour_quantity in ['psi', 'PSI']: cq = eq_slice['profiles_2d'][0]['psi'] cq1d = eq_psi elif contour_quantity.lower() in ['psin', 'psi_n', 'psi_norm']: psi_bdry = eq_slice['global_quantities']['psi_boundary'] psi_axis = eq_slice['global_quantities']['psi_axis'] cq = (eq_slice['profiles_2d'][0]['psi'] - psi_axis) / (psi_bdry - psi_axis) cq1d = eq_psi_n elif contour_quantity in ['phi', 'PHI']: cq = eq_slice['profiles_2d'][0]['phi'] cq1d = eq_phi else: raise ValueError('Contour quantity {} is not yet supported for this operation, sorry.'.format(contour_quantity)) space_q = spacing.get('quantity', 'R') space_a = np.atleast_1d(spacing.get('amount', np.array([0, 0.01]))) space_ref = spacing.get('reference', 'outer_midplane') if space_q in ['R', 'Rmaj', 'R_major', 's', 'S']: if space_ref.lower() in ['outer_midplane', 'omp']: rmaxis = eq_slice['global_quantities.magnetic_axis.r'] zmaxis = eq_slice['global_quantities.magnetic_axis.z'] bdry_r = eq_slice['boundary.outline.r'] bdry_z = eq_slice['boundary.outline.z'] wr = bdry_r > rmaxis r_omp = interp1d(bdry_z[wr], bdry_r[wr])(zmaxis) ref_r = r_omp ref_z = zmaxis elif space_ref.lower() in ['outer_lower_strike']: # omfit_omas.multi_efit_to_omas() loads strike points in a set order: low out, low in, up out, up in try: ref_r = eq_slice['boundary.strike_point.0.r'] ref_z = eq_slice['boundary.strike_point.0.z'] except ValueError: ref_r = ref_z = np.NaN if np.isnan(ref_r) or np.isnan(ref_z): raise ValueError('Outer lower strike point not defined; might be a limited or upper-biased shape or just missing from ODS') elif space_ref.lower() in ['outer_upper_strike']: # omfit_omas.multi_efit_to_omas() loads strike points in a set order: low out, low in, up out, up in ref_r = eq_slice['boundary.strike_point.2.r'] ref_z = eq_slice['boundary.strike_point.2.z'] if np.isnan(ref_r) or np.isnan(ref_z): raise ValueError('Outer upper strike point not defined; might be a limited or lower-biased shape or just missing from ODS') else: raise ValueError('Unrecognized custom contour spacing reference: {}'.format(space_ref)) if space_q in ['R', 'Rmaj', 'R_major']: dr = space_a r = ref_r + dr z = ref_z + dr * 0 elif space_q in ['S', 's']: ds = space_a wallr0 = ods['wall.description_2d.0.limiter.unit.0.outline.r'] wallz0 = ods['wall.description_2d.0.limiter.unit.0.outline.z'] # Upsample the wall so that the nearest point will be on the right surface. Since the wall has detailed # features, the nearest point can be behind the true surface. For example, a strike point on the lower # shelf might find a closest wall point in the pump duct under the shelf. To avoid locking onto such a # point, put in lots of extra points along the wall contour instead. idx = np.linspace(0, 1, len(wallr0)) new_idx = np.linspace(0, 1, 20 * len(wallr0)) wallr = interp1d(idx, wallr0)(new_idx) wallz = interp1d(idx, wallz0)(new_idx) ref_dist = np.sqrt((wallr - ref_r) ** 2 + (wallz - ref_z) ** 2) ic = ref_dist.argmin() # Figure out which way the wall goes neighborhood = np.arange(ic - 3, ic + 3) neighborhood = neighborhood[(neighborhood >= 0) & (neighborhood < len(wallr))] if np.nanmean(np.diff(wallr[neighborhood])) > 0: wall_dir = 1 else: wall_dir = -1 # Get wall coordinate wall_ds = np.append(0, np.sqrt(np.diff(wallr) ** 2 + np.diff(wallz) ** 2)) wall_s = np.cumsum(wall_ds) * wall_dir wall_s -= wall_s[ic] # Get s coordinates of requested contour levels if ref_r > wallr[ic]: ref_s = ref_dist[ic] else: ref_s = -ref_dist[ic] s = ref_s + ds # Get R-Z coordinates of requested contour levels r = interp1d(wall_s, wallr)(s) z = interp1d(wall_s, wallz)(s) else: raise ValueError # Translate R-Z coordinates into flux surface labels (like psi, etc) contour_levels = (interp2d(cr, cz, cq.T, bounds_error=False, fill_value=np.NaN)(r, z))[0] elif space_q.lower() == 'psi': psi_levels = eq_slice['global_quantities']['psi_boundary'] + space_a contour_levels = interp1d(eq_psi, cq1d)(psi_levels) elif space_q.lower() in ['psi_n', 'psin']: psin_levels = 1 + space_a contour_levels = interp1d(eq_psi_n, cq1d)(psin_levels) else: raise ValueError('Unrecognized quantity for spacing contours: {}'.format(space_q)) return np.round(contour_levels, decimal_places) def get_axes(ax=None, which='cx'): """ Choose the best plot axes to use :param ax: Axes instance or 2D array of Axes instances [optional] If not specified, try to use efitviewer GUI's embedded axes. If those are not active, use gca(). :param which: str cx: deal with cross section axes profile: deal with array of profile axes :return: Axes instance or 2D array of Axes instances """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() if which == 'cx' and not settings.get('show_cx', True): printd(f'show_cx is off, so no cx_ax is needed. Returning None...', topic='efitviewer') return None if which == 'profile' and settings.get('profiles', {}).get('num_profiles', 0) < 1: printd(f'num_profiles < 1, so no profile_axes are needed. Returning None...', topic='efitviewer') if ax is None: printd(f'plot_single_cx_ods(): trying to look up efitviewer axes for {which}...', topic='efitviewer') if which == 'profile': ax = scratch.get('efitviewer_profile_axs', None) if ax is not None and len(ax.flatten()): test_ax = ax[0, 0] else: test_ax = None else: ax = scratch.get('efitviewer_cx_ax', None) test_ax = ax else: printd(f'received some axes to check against the needs of {which}...', topic='efitviewer') test_ax = ax if which == 'profile': if len(test_ax.flatten()): test_ax = ax[0, 0] else: test_ax = None if test_ax is not None: printd(f'testing whether {which} axes have a figure associated with them', topic='efitviewer') # Check if ax is active try: # https://stackoverflow.com/a/15311949/6605826 tk_figure_open = test_ax.figure.canvas.get_tk_widget().winfo_exists() except Exception: # I assume that get_tk_widget() will fail if the backend isn't tk tk_figure_open = False # https://stackoverflow.com/a/26485683/6605826 figure_window_open = test_ax.figure in list(map(pyplot.figure, pyplot.get_fignums())) if (not tk_figure_open) and (not figure_window_open): printd('plot_single_cx_ods(): efitviewer figure seems to have been closed', topic='efitviewer') ax = None if ax is None: printd('plot_single_cx_ods(): using populate_efitviewer_figure()', topic='efitviewer') fig = scratch['efitviewer_fig'] = pyplot.figure() settings.setdefault('profiles', SettingsName()) cx_ax, profile_axs = populate_efitviewer_figure( fig, scratch, show_cx=settings.setdefault('show_cx', True), num_profiles=settings['profiles'].setdefault('num_profiles', 0), num_profile_cols=settings['profiles'].setdefault('num_profile_cols', 0), profiles_sharex=settings['profiles'].setdefault('sharex', 'all'), profiles_sharey=settings['profiles'].setdefault('sharey', 'none'), **settings['profiles'].setdefault('gridspec_kw', {}), ) if which == 'profile': ax = profile_axs else: ax = cx_ax return ax def save_subplot_layout(reset=False): """ Either saves the current layout of subplots in the efitviewer figure, or erases prior records of fig layout The saves are particular to each configuration (number of columns, CX on/off, number of profile plots). :param reset: bool Erase the last record instead of saving """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() show_cx = settings.get('show_cx', True) ncol = settings.get('profiles', {}).get('num_profile_cols', 0) if ncol == 0: ncol = 1 tag = f'{"cx" if show_cx else ""}-{ncol}-{settings.get("profiles", {}).get("num_profiles", 0)}' if reset: settings['plot_style_kw'].setdefault('subplot_layout', SettingsName()).pop(tag, None) return layout = settings['plot_style_kw'].setdefault('subplot_layout', SettingsName()).setdefault(tag, SettingsName()) fig = scratch['efitviewer_fig'] for attr in ['wspace', 'hspace', 'left', 'right', 'bottom', 'top']: layout[attr] = getattr(fig.subplotpars, attr, None) def plot_style_enforcement(ax=None, plot_style=None): """ Applies efitviewers plot style settings to some axes :param ax: Axes instance [optional] Axes to modify; looks up efitviewer GUI axes or gca() if not specified :param plot_style: dict-like [optional] Settings to use for modifying axes appearance """ # Unpack settings and axes reference out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() if plot_style is None: plot_style = settings.get('plot_style_kw', {}) ax = get_axes(ax) if ax is None: return fig = ax.figure # Frame frame_on = plot_style.setdefault('frame_on', True) if frame_on is not None: ax.set_frame_on(frame_on) # Aspect ratio aspect = plot_style.setdefault('axes_aspect', 'equal_box') if aspect is not None: if '_' in str(aspect): aspect_ratio, adjustable = aspect.split('_') elif is_numeric(aspect): aspect_ratio = aspect adjustable = None else: aspect_ratio = aspect adjustable = None ax.set_aspect(aspect_ratio, adjustable=adjustable) # Tick spacing if plot_style.setdefault('tick_spacing', 0.25) is not None: ax.xaxis.set_major_locator(matplotlib.ticker.MultipleLocator(plot_style['tick_spacing'])) ax.yaxis.set_major_locator(matplotlib.ticker.MultipleLocator(plot_style['tick_spacing'])) # Tick position ax.xaxis.set_ticks_position(plot_style.setdefault('xtick_loc', 'both')) ax.yaxis.set_ticks_position(plot_style.setdefault('ytick_loc', 'both')) # Grid plot_grid(fig=fig, ax=ax, enable=plot_style.setdefault('grid_enable', False), grid_kw=plot_style.setdefault('grid_kw', {})) # Subplots layout show_cx = settings.get('show_cx', True) ncol = settings.get('profiles', {}).get('num_profile_cols', 0) if ncol == 0: ncol = 1 tag = f'{"cx" if show_cx else ""}-{ncol}-{settings.get("profiles", {}).get("num_profiles", 0)}' layout = ( settings.setdefault('plot_style_kw', SettingsName()).setdefault('subplot_layout', SettingsName()).setdefault(tag, SettingsName()) ) fig.subplots_adjust(**layout) # Annotations if plot_style.setdefault('subplot_label', None) is not None: font_size = settings.get('default_fontsize', None) if font_size is None: font_size = matplotlib.rcParams['xtick.labelsize'] tag_plots_abc( fig=fig, axes=ax, start_at=plot_style.setdefault('subplot_label', 0), corner=plot_style.setdefault('subplot_label_corner', [1, 1]), font_size=font_size, ) return # Main plotting functions ----------------------------------------------------------------------------------------- def populate_efitviewer_figure( fig, scratch, show_cx=True, num_profiles=0, num_profile_cols=0, profiles_sharex='all', profiles_sharey='none', **gridspec_kw ): """ Adds subplots to the efitviewer figure :param fig: Figure instance Can be a standalone matplotlib figure, or an OMFITx.Figure GUI element :param scratch: dict-like Scratch space :param show_cx: bool Show the cross section subplot (otherwise just profiles are shown) :param num_profiles: int Number of panels for profile plots :param num_profile_cols: int Number of columns for profiles :param profiles_sharex: str Share the x axis between profiles. Options: 'none', 'all', 'row', 'col'. :param profiles_sharey: str Share the Y axis between profiles. Options: 'none', 'all', 'row', 'col'. :param gridspec_kw: keywords to pass to gridspec. width_ratios is especially useful; use it to set the relative widths of columns by passing a numeric iterable matching the total number of columns (including CX, if shown) :return: tuple Axes instance: axes for the cross section or None if not used 2D array of Axes instances, padded with None as needed: axes for the profiles Padding with None would be needed for cases like 8 plots in a 3x3 arrangement, where one panel would be blank. """ # Set up the grid if num_profiles > 0: num_profile_cols_ = np.min([np.max([1, num_profile_cols]), num_profiles]) num_rows = int(np.max([np.ceil(num_profiles / float(num_profile_cols_)), 1])) else: num_profile_cols_ = 0 num_rows = 1 try: gs = matplotlib.gridspec.GridSpec(num_rows, num_profile_cols_ + show_cx, **gridspec_kw) except Exception: gs = matplotlib.gridspec.GridSpec(num_rows, num_profile_cols_ + show_cx) report = ''.join(traceback.format_exception(*sys.exc_info())) printe('Gridspec keywords for setting up efitviewer profile plots were invalid and were ignored!') print(report) printe('Gridspec keywords for setting up efitviewer profile plots were invalid and were ignored!') if show_cx: cx_ax = scratch['efitviewer_cx_ax'] = fig.add_subplot(gs[:, 0]) else: cx_ax = scratch['efitviewer_cx_ax'] = None profile_axs = scratch['efitviewer_profile_axs'] = np.empty((num_rows, num_profile_cols_), dtype=object) for i in range(num_rows): for j in range(num_profile_cols_): profile_axs[i, j] = None for i in range(num_profiles): row = i % num_rows col = i // num_rows fig_col = col + show_cx if (i > 0) and profiles_sharex != 'none': if profiles_sharex == 'all': sharex = profile_axs[0, 0] elif profiles_sharex == 'row': sharex = profile_axs[row, 0] elif profiles_sharex == 'col': sharex = profile_axs[0, col] else: sharex = None if (i > 0) and profiles_sharey != 'none': if profiles_sharey == 'all': sharey = profile_axs[0, 0] elif profiles_sharey == 'row': sharey = profile_axs[row, 0] elif profiles_sharey == 'col': sharey = profile_axs[0, col] else: sharey = None profile_axs[row, col] = fig.add_subplot(gs[row, fig_col], sharex=sharex, sharey=sharey) if (row < (num_rows - 1)) and (profiles_sharex in ['col', 'all']): profile_axs[row, col].get_xaxis().set_visible(False) if (col > 0) and profiles_sharey in ['row', 'all']: profile_axs[row, col].get_yaxis().set_visible(False) return cx_ax, profile_axs def plot_cx_ods(ods_idx=None, odss=None, cases=None, **kw): r""" Wrapper for making multiple calls to plot_single_cx_ods() Note that plot_single_cx_ods() calls plot_single_time_profile_set(), so this is the umbrella for all CX and profile plots. :param ods_idx: int or list of ints [optional] You should probably let it do its default behavior, but you can override it if you need to. :param odss: dict [optional] Keys are int matching entries in ods_idx Values are ODS instances. This is for overriding ODSs in the tree to support some very specific operations and is not recommended for most purposes. :param cases: dict-like [optional] Dictionary with entries foe each index in ods_idx specifying time (float or float array) and active (bool) :param \**kw: Additional keywords passed to plot_single_cx_ods() for 0th case. See docstring for plot_single_cx_ods. If you have overridden ods with your own list, the first entry in the list gets all the keywords, and subsequent entries get a subset. """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() printd('started plot_cx_ods...', topic='efitviewer') if cases is None: cases = settings['cases'] if ods_idx is None: # Default automatic selection of ods indices to loop over ods_idx = [] for idx in cases: # Collect indices of active cases to overlay if cases[idx].get('active', True) or idx == 0: ods_idx += [idx] if odss is None: odss = {k: None for k in ods_idx} tkw = kw.pop('t', None) first_only = ['overlays', 'special_overlays', 'customization'] clear_first = kw.pop('clear_first', None) kw2 = copy.copy(kw) for fkw in first_only: kw2.pop(fkw, None) kw2['overlays'] = {} kw2['special_overlays'] = {} for i, idx in enumerate(tolist(ods_idx)): # Special call for the primary ODS printd('plot_single_cx_odx() call for equilibrium slice i={}, idx={}'.format(i, idx)) if len(tolist(ods_idx)) <= 1: use_kw = kw printd(' allowing overlay instructions in this call instead of doing separately at the end') else: use_kw = kw2 printd(' no overlay instructions yet; must repeat at the end') plot_single_cx_ods( ods_idx=idx, ods=odss[idx], multi_cases=len(tolist(ods_idx)) > 1, t=cases[idx].get('time', None) if tkw is None else tkw, eq_active=cases[idx].get('active', True), wall_active=cases[idx].get('wall', True), clear_first=clear_first and (i == 0), eqkw=cases[idx].get('eqkw', {}), xkw=cases[idx].get('xkw', dict(marker='x')), **use_kw, ) if len(tolist(ods_idx)) > 1: # Diagnostic overlays from first ODS printd(' extra overlay call at the end for diagnostics') plot_single_cx_ods( ods_idx=0, ods=odss[0], multi_cases=len(tolist(ods_idx)) > 1, t=cases[0].get('time', None) if tkw is None else tkw, eq_active=False, wall_active=False, **kw, ) return def calc_btor_2d(ods, t_idx): """ Calculates the toroidal magnetic field on the 2D grid :param ods: ODS instance :param t_idx: int """ s = ods['equilibrium.time_slice'][t_idx] t = ods['equilibrium.time'] bcentr = interp1d(t, ods['equilibrium.vacuum_toroidal_field.b0'])(s['time']) rcentr = ods['equilibrium.vacuum_toroidal_field.r0'] rgrid = s['profiles_2d.0.grid.dim1'] zgrid = s['profiles_2d.0.grid.dim2'] btvac = bcentr * rcentr / rgrid btvac2d = btvac[:, np.newaxis] + zgrid[np.newaxis, :] * 0 psi1d = s['profiles_1d.psi'] f1d = s['profiles_1d.f'] psi2d = s['profiles_2d.0.psi'] f2d = interp1d(psi1d, f1d, bounds_error=False, fill_value=np.NaN)(psi2d) btor = f2d / rgrid[:, np.newaxis] btor[np.isnan(btor)] = btvac2d[np.isnan(btor)] s['profiles_2d.0.b_field_tor'] = btor def calculate_j_toroidal(ods, t_idx, check_availability=False): """ Calculates toroidal current density :param ods: ODS instance :param t_idx: int :param check_availability: bool Return a bool flag indicating whether the calculation is possible instead of doing the calculation :return: 1D float array """ from omfit_classes.fluxSurface import fluxSurfaces scratch = pick_output_location()[5] profiles_1d = ods['equilibrium']['time_slice'][t_idx]['profiles_1d'] profiles_2d = ods['equilibrium']['time_slice'][t_idx]['profiles_2d'][0] global_quantities = ods['equilibrium.time_slice'][t_idx]['global_quantities'] required_2d = ['psi'] required_1d = ['psi', 'f'] required_global = ['magnetic_axis.r'] is_available = all( [r in profiles_1d for r in required_1d] + [r in profiles_2d for r in required_2d] + [r in global_quantities for r in required_global] ) if check_availability: return is_available if (profiles_1d.get('r_outboard', None) is None) or (not len(profiles_1d['r_outboard'])) or np.all(np.isnan(profiles_1d['r_outboard'])): define_r_outboard(ods, t_idx) if ( (profiles_2d.get('b_field_tor', None) is None) or (not len(profiles_2d['b_field_tor'])) or np.all(np.isnan(profiles_2d['b_field_tor'])) ): calc_btor_2d(ods, t_idx) fluxtag = f"flux_surfaces_{ods['dataset_description.data_entry.machine']}#{ods['dataset_description.data_entry.pulse']}_{t_idx}" fluxsurf = scratch[fluxtag] = fluxSurfaces( Rin=profiles_2d['grid.dim1'], Zin=profiles_2d['grid.dim2'], PSIin=profiles_2d['psi'].T, Btin=profiles_2d['b_field_tor'].T, Rcenter=global_quantities['magnetic_axis.r'], F=profiles_1d['f'], P=profiles_1d['pressure'], rlim=ods['wall.description_2d.0.limiter.unit.0.outline.r'], zlim=ods['wall.description_2d.0.limiter.unit.0.outline.z'], levels=np.linspace(0, 1, len(profiles_1d['psi'])), gEQDSK=None, cocosin=ods.cocos, ) profiles_1d['j_tor'] = fluxsurf['avg']['Jt'] def define_r_outboard(ods, t_idx=None): """ Given an ODS with an equilibrium, define the 1D profile quantity r_outboard is defined. :param ods: ODS instance :param t_idx: int Index of the time slice to work on """ profiles_1d = ods['equilibrium']['time_slice'][t_idx]['profiles_1d'] profiles_2d = ods['equilibrium']['time_slice'][t_idx]['profiles_2d'][0] global_quantities = ods['equilibrium.time_slice'][t_idx]['global_quantities'] r = profiles_2d['grid.dim1'] z = profiles_2d['grid.dim2'] psi2d = profiles_2d['psi'] rmaxis = global_quantities['magnetic_axis.r'] zmaxis = global_quantities['magnetic_axis.z'] zidx = abs(z - zmaxis).argmin() rsel = r >= rmaxis psi_omp = psi2d[rsel, zidx] r_slice = r[rsel] profiles_1d['r_outboard'] = interp1d(psi_omp, r_slice, bounds_error=False, fill_value='extrapolate')(profiles_1d['psi']) def take_slice_of_2d(quantity, ods, t_idx): """ Takes a 1D slice out of a 2D profile quantity :param quantity: str :param ods: ODS instance :param t_idx: int :return: 1D float array """ profiles_1d = ods['equilibrium']['time_slice'][t_idx]['profiles_1d'] profiles_2d = ods['equilibrium']['time_slice'][t_idx]['profiles_2d'][0] global_quantities = ods['equilibrium.time_slice'][t_idx]['global_quantities'] if (profiles_1d.get('r_outboard', None) is None) or (not len(profiles_1d['r_outboard'])) or np.all(np.isnan(profiles_1d['r_outboard'])): define_r_outboard(ods, t_idx) rtar = profiles_1d['r_outboard'] ztar = global_quantities['magnetic_axis.z'] + 0 * rtar return RectBivariateSpline(profiles_2d['grid.dim1'], profiles_2d['grid.dim2'], profiles_2d[quantity])(rtar, ztar, grid=False) def get_quantity(quantity, ods, t_idx): """ Intercept quantity request to calculate special quantities as needed :param quantity: str :param ods: ODS instance :param t_idx: int :return: 1D float array """ profiles_1d = ods['equilibrium']['time_slice'][t_idx]['profiles_1d'] profiles_2d = ods['equilibrium']['time_slice'][t_idx]['profiles_2d'][0] global_quantities = ods['equilibrium.time_slice'][t_idx]['global_quantities'] if quantity == 'b_field_tor' and quantity not in profiles_2d: calc_btor_2d(ods, t_idx) if quantity == 'j_tor' and quantity not in profiles_1d and calculate_j_toroidal(ods, t_idx, check_availability=True): calculate_j_toroidal(ods, t_idx) if quantity == 'psi_norm': q = (profiles_1d['psi'] - global_quantities['psi_axis']) / (global_quantities['psi_boundary'] - global_quantities['psi_axis']) elif (quantity not in profiles_1d) and (quantity in profiles_2d): q = take_slice_of_2d(quantity, ods, t_idx) else: q = profiles_1d[quantity] return q def constraint_profile_mse_polarisation_angle(ods, t_idx=0, profile_x_quantity='R', ax=None, **plot_kw): """ Plots a profile of the MSE polarization angle constraint Each constraint profile plot function must accept these keywords: [ods, t_idx, profile_x_quantity, ax] Each constraint profile plot function must be named exactly: 'constraint_profile_{name of constraint set in equilibrium}' :param ods: ODS instance :param t_idx: int :param profile_x_quantity: str :param ax: Axes instance :param plot_kw: Addtional keywords passed to plot and errorbar """ if ax is None: # Only relevant for testing; otherwise a specific set of Axes within the # profile axes set should always be provided by the calling function. ax = gca() mse_eq = ods['equilibrium.time_slice'][t_idx]['constraints.mse_polarisation_angle'] nchan = len(mse_eq) r = np.empty(nchan) z = np.empty(nchan) measured = np.empty(nchan) measured_err = np.empty(nchan) reconstructed = np.empty(nchan) for ch in range(nchan): r[ch] = ods[mse_eq[ch]['source']]['active_spatial_resolution.0.centre.r'] z[ch] = ods[mse_eq[ch]['source']]['active_spatial_resolution.0.centre.z'] measured[ch] = mse_eq[ch]['measured'] measured_err[ch] = mse_eq[ch]['measured_error_upper'] reconstructed[ch] = mse_eq[ch]['reconstructed'] if profile_x_quantity.lower() == 'r': x = r ax.set_xlabel('R / m') elif profile_x_quantity.lower() == 'z': x = z ax.set_xlabel('Z / m') else: profiles_1d = ods['equilibrium']['time_slice'][t_idx]['profiles_1d'] profiles_2d = ods['equilibrium']['time_slice'][t_idx]['profiles_2d'][0] psi = RectBivariateSpline(profiles_2d['grid.dim1'], profiles_2d['grid.dim2'], profiles_2d['psi'])(r, z, grid=False) x = interp1d(profiles_1d['psi'], get_quantity(profile_x_quantity, ods, t_idx), bounds_error=False, fill_value=np.NaN)(psi) ax.set_xlabel(omas_eq1d_pretty_name(profile_x_quantity)) ax.set_ylabel('MSE pitch angle / radians') pkw = copy.deepcopy(plot_kw) for naughty in ['linestyle', 'marker', 'ls', 'label']: pkw.pop(naughty, None) a = ax.errorbar(x, measured, measured_err, marker='x', linestyle='', label='Measured', **pkw) # Don't use the next color in the cycle or else these profile colors won't line up with equilibrium colors pkw['color'] = associated_color(a[0].get_color()) ax.plot(x, reconstructed, marker='.', linestyle='', label='Reconstructed', **pkw) ax.legend(loc=0) def plot_single_time_profile_set( ods=None, t_idx=0, profile_axs=None, profile_x_quantity=None, profile_y_quantities=None, gentle=False, **plotkw ): """ Plots profiles for a single time-slice This is normally called by plot_single_cx_ods() :param ods: ODS() instance with the data :param t_idx: int :param profile_axs: 2D array of Axes instances :param profile_x_quantity: str :param profile_y_quantities: list of strings :param gentle: bool Report exceptions with print and ax.text instead of raising :param plotkw: Additional keywords passed to plot and plot-related functions in constraint profiles Could be passed to errorbar(), for example. """ if profile_axs is None or profile_x_quantity is None: return settings = pick_output_location()[3] profiles_1d = ods['equilibrium']['time_slice'][t_idx]['profiles_1d'] profiles_2d = ods['equilibrium']['time_slice'][t_idx]['profiles_2d'][0] constraints = ods['equilibrium']['time_slice'][t_idx]['constraints'] global_quantities = ods['equilibrium.time_slice'][t_idx]['global_quantities'] if (profile_x_quantity == 'r_outboard') and ('r_outboard' not in profiles_1d): define_r_outboard(ods, t_idx) # Sanitize plotkw pkw = copy.deepcopy(plotkw) for naughty in ['linewidths', 'colors', 'linestyles']: pkw.pop(naughty, None) for i, ax in enumerate(profile_axs.flatten()): if profile_y_quantities[i] is None: return # Is it a constraint that needs a special function? if ( (profile_y_quantities[i] not in profiles_1d) and (profile_y_quantities[i] not in profiles_2d) and (profile_y_quantities[i] in constraints) ): constraint_profile_function_name = f'constraint_profile_{profile_y_quantities[i]}' else: constraint_profile_function_name = None if constraint_profile_function_name is None: ax.set_xlabel(omas_eq1d_pretty_name(profile_x_quantity)) ax.set_ylabel(omas_eq1d_pretty_name(profile_y_quantities[i])) try: if constraint_profile_function_name is not None: constraint_profile_function = eval(constraint_profile_function_name) constraint_profile_function(ods=ods, t_idx=t_idx, profile_x_quantity=profile_x_quantity, ax=ax, **pkw) else: ax.plot(get_quantity(profile_x_quantity, ods, t_idx), get_quantity(profile_y_quantities[i], ods, t_idx), **pkw) except (KeyError, ValueError, NameError): if gentle: # Display the error instead of bringing the whole GUI down if the plot fails. print('Error in plot_single_time_profile_set:') report = ''.join(traceback.format_exception(*sys.exc_info())) printe(report) # noinspection PyBroadException try: ax.text(0.05, 0.95, report, ha='left', va='top', color='red') except Exception: printd('plot_single_cx_ods{}: Failed to properly report an exception!', topic='efitviewer') else: raise if settings['profiles'].get('sharex', 'all') in ['all', 'col']: if len(profile_axs[:, 0]) > 1: for ax in profile_axs[:-1, :].flatten(): ax.set_xlabel('') if settings['profiles'].get('sharey', 'none') in ['all', 'row']: if len(profile_axs[0, :]) > 1: for ax in profile_axs[:, 1:].flatten(): ax.set_ylabel('') if len(np.shape(profile_axs)) > 1 and np.shape(profile_axs)[1] > 1 and settings['profiles'].setdefault('last_col_right_label', True): for ax in profile_axs[:, -1].flatten(): ax.yaxis.tick_right() ax.yaxis.set_ticks_position('both') ax.yaxis.set_label_position("right") def plot_single_cx_ods( cx_ax=None, profile_axs=None, ods_idx=0, multi_cases=False, ods=None, t=None, overlays=None, special_overlays=None, clear_first=False, gentle=False, contour_quantity=None, allow_fallback=None, levels=None, contour_resample_factor=None, xlim=None, ylim=None, customization=None, default_fontsize=None, show_legend=None, show_cornernote=None, eq_active=None, wall_active=None, profile_x_quantity=None, profile_y_quantities=None, plot_style_kw=None, eqkw=None, xkw=None, ): """ Plots efitviewer-style cross section with overlays If data for an overlay are missing, they will be added before plotting. This function also calls plot_single_time_profile_set() to add profiles to the figure. :param cx_ax: Axes instance Axes to draw cross section on. :param profile_axs: 2D array containing Axes instances Axes to use for drawing profiles. Profiles disabled if set to None Pad with None if necessary (e.g. 8 plots in a 3x3 arrangement with one missing). :param ods_idx: int For looking up which group of settings to use, & for looking up which ODS to use, if ods is not overridden. :param multi_cases: bool Set up for supporting multi-case overlay. Affects how some annotations are drawn. Does not actually do multiple cases in this call. :param ods: ODS instance This is the source of data. At minimum, it should contain equilibrium data for plotting contours. :param t: numeric scalar or iterable Time slice(s) of interest (ms). Control multi-slice overlays by passing a list or array. :param overlays: dict Dictionary with keys matching standard hardware systems and bool keys to activate overlays of those systems :param special_overlays: dict Dictionary with keys matching special hardware overlays Each function should accept ax, ods, t, and value :param clear_first: bool Clears Axes before plotting. Use for quickly updating the same plot embedded in a GUI. :param gentle: bool Prints exception reports instead of raising exceptions. Use to prevent a whole GUI system from going down on a failed plot. :param contour_quantity: string Options: psi (poloidal mag flux), rho (sqrt of toroidal mag flux), or phi (toroidal mag flux) :param allow_fallback: bool If rho/psi/phi isn't available, allow fallback to something that is :param levels: numeric iterable Contour levels :param contour_resample_factor: int Upsample 2d data to improve contour smoothness :param xlim: two element sorted numeric iterable :param ylim: two element sorted numeric iterable :param customization: dict-like Each key should match the name of an overlay, like 'scaled_boundary' or 'gas_injection'. The values should be dict-like and contain keywords and values to pass to the overlay functions. Some of these keywords have been standardized, but others are specific to individual plot overlay functions. The GUI will interpret function docstrings to provide entries for setting these up properly. :param default_fontsize: float :param show_legend: int 0: prevent legend, even if it's a good idea 1: do legend even if other annotations could probably cover it 2: auto select: do legend for multi-cases only :param show_cornernote: int 0: prevent cornernote, even if it's a good idea 1: do cornernote based on primary case even if it might be not be the best idea (like if it mismatches with other cases that are also shown) 2: auto select: do cornernote for single case only :param eq_active: bool Equilibrium overlay active? :param wall_active: bool Wall overlay active / included in eq overlay? :param profile_x_quantity: str Quantity to use on the X-axes of the profile plots. :param profile_y_quantities: list of strings Quantities to show on the Y-axes of the profile plots. Must be at least as long as profile_axs. :param plot_style_kw: dict-like Settings for enforcing plot axes style :param eqkw: dict-like Customization options for equilibrium overlay contours and profile plots, passed to plot for boundary & contour for flux surfaces and to profiles. :param xkw: dict-like Customization options for equilibrium overlay's x-point, such as marker, color, markersize, mew, ... """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() if ods is None: printd('plot_single_cx_ods(): no ODS passed; must try to look up...', topic='efitviewer') if ods_idx == 0: ods_tag = 'efitviewer_ods' else: ods_tag = 'efitviewer_ods_{}'.format(ods_idx) ods = out.get(ods_tag, None) printd('plot_single_cx_ods(): tried to get tag {} from {}'.format(ods_tag, out_loc), topic='efitviewer') else: printd('plot_single_cx_ods(): ODS was received; no auto-lookup', topic='efitviewer') ods_tag = None if ods is None: printd('plot_single_cx_ods(): ODS is still None after trying auto-lookup; aborting.', topic='efitviewer') return printd('plot_single_cx_ods(): ODS is okay now.', topic='efitviewer') timing_ref = None # time.time() # If this is not None, timing information will be printed if timing_ref is not None: printe('-' * 80) if clear_first and cx_ax is None and profile_axs is None: fig = scratch.setdefault('efitviewer_fig', None) if fig is None: fig = scratch['efitviewer_fig'] = pyplot.gcf() for ax in fig.axes: ax.remove() populate_efitviewer_figure( fig, scratch, show_cx=settings.setdefault('show_cx', True), num_profiles=settings['profiles'].setdefault('num_profiles', 0), num_profile_cols=settings['profiles'].setdefault('num_profile_cols', 0), profiles_sharex=settings['profiles'].setdefault('sharex', 'all'), profiles_sharey=settings['profiles'].setdefault('sharey', 'none'), **settings['profiles'].setdefault('gridspec_kw', {}), ) cx_ax = get_axes(cx_ax) if isinstance(cx_ax, matplotlib.axes.Axes): fig = cx_ax.figure else: fig = None profile_axs = get_axes(profile_axs, which='profile') if fig is None and profile_axs is not None and isinstance(profile_axs[0, 0], matplotlib.axes.Axes): fig = profile_axs[0, 0].figure elif fig is None: fig = scratch.get('efitviewer_fig', None) if clear_first: if cx_ax is not None: cx_ax.cla() if profile_axs is not None: for ax in profile_axs.flatten(): if ax is not None: ax.cla() # noinspection PyBroadException try: if 'comment' in ods['dataset_description.ids_properties']: ids_comment = 'There is a comment in the ODS:\n' + ods['dataset_description.ids_properties.comment'] else: ids_comment = '' if 'equilibrium' not in ods: raise EfitviewerDataError('No equilibrium data. Cannot plot.' + ids_comment) if 'time' not in ods['equilibrium']: raise EfitviewerDataError('No time in equilibrium; nothing to plot.' + ids_comment) # Resolve keyword default values if contour_quantity is None: contour_quantity = settings.setdefault('contour_quantity', 'psi_norm') if allow_fallback is None: allow_fallback = settings.setdefault('allow_fallback', False) if levels is None: levels = settings['{}_levels'.format(contour_quantity)] if contour_resample_factor is None: contour_resample_factor = settings['contour_resample_factor'] if xlim is None: xlim = settings['xlim'] if ylim is None: ylim = settings['ylim'] if overlays is None: overlays = settings['systems'] if special_overlays is None: special_overlays = settings['special_systems'] if t is None: t = evalExpr(settings['cases'][ods_idx]['time']) if customization is None: customization = settings['co_efv'] if default_fontsize is None: default_fontsize = settings.get('default_fontsize', None) if show_legend is None: show_legend = settings.setdefault('show_legend', 2) if show_cornernote is None: show_cornernote = settings.setdefault('show_cornernote', 2) if eq_active is None: eq_active = settings['cases'][ods_idx]['active'] if wall_active is None: wall_active = settings['cases'][ods_idx].get('wall', True) settings.setdefault('profiles', SettingsName()) if profile_x_quantity is None: profile_x_quantity = settings['profiles'].get('xaxis_quantity', None) if profile_y_quantities is None: profile_y_quantities = [settings['profiles'].get(f'yaxis_quantity_{i}', None) for i in range(len(profile_axs.flatten()))] if plot_style_kw is None: plot_style_kw = settings.setdefault('plot_style_kw', {}) if eqkw is None: eqkw = settings['cases'][ods_idx].get('eqkw', {}) if xkw is None: xkw = settings['cases'][ods_idx].get('xkw', {'marker': 'x'}) if timing_ref is not None: print(time.time() - timing_ref, 'omfitlib efitviewer unpack stuff') # Get machine and shot info device = tokamak(ods['dataset_description.data_entry'].get('machine', settings.get('cases', {}).get(0, {}).get('device', None))) shot = ods['dataset_description.data_entry'].get('pulse', evalExpr(settings.get('cases', {}).get(0, {}).get('shot', None))) try: efitid = ods['dataset_description.ids_properties.comment'].split('EFIT tree = ')[-1] except ValueError: efitid = '' if t is None: printd('plot_single_cx_ods{}: t is None; aborting...', topic='efitviewer') return t = np.atleast_1d(t) printd('ods_idx', ods_idx, 'ods_tag', ods_tag, 't', t) # Gather overlay information as needed for overlay, overlay_active in overlays.items(): printd( 'checking overlay {}; active = {}, in ods = {}, in pulse_schedule: {}'.format( overlay, overlay_active, overlay in ods, overlay in ods['pulse_schedule'] ) ) if overlay_active and (overlay not in ods) and (overlay not in ods['pulse_schedule']): printd(' Need to add missing overlay data: {}'.format(overlay)) add_hardware_to_ods(ods, device, shot, overlay) if timing_ref is not None: print(time.time() - timing_ref, 'omfitlib efitviewer add missing data') # Plot equilibrium time-slice(s) if eq_active: printd('plot_single_cx_ods{}: this eq slice is active; trying to plot', topic='efitviewer') it = None for t_ in t: if t_ is None: it = 0 else: it = closestIndex(ods['equilibrium.time'] * 1000, t_) # Set up keywords for equilibrium_CX if multi_cases: eqcx_label = 'eq#{}: {}#{} {} {:0.1f} ms'.format(ods_idx, device, shot, efitid, ods['equilibrium.time'][it] * 1000) else: eqcx_label = 'Equilibrium {:0.1f} ms'.format(ods['equilibrium.time'][it] * 1000) eqcx_label = eqkw.pop('label', eqcx_label) eqcxkw = dict( ax=cx_ax, time_index=it, levels=levels, label=eqcx_label, contour_quantity=contour_quantity, allow_fallback=allow_fallback, sf=contour_resample_factor, xkw=xkw, show_wall=( (not special_overlays.get('alt_limiter', False)) or customization.get('alt_limiter', {}).get('retain_wall', True) ) and wall_active, **eqkw, ) # Remove unsupported keywords based on omas version for kw, ver in omas_features.items(): if compare_version(ver, omas.__version__) > 0: # Version required to support this keyword is newer than the OMAS version in use, so skip. eqcxkw.pop(kw, None) if cx_ax is not None: ods.plot_equilibrium_CX(**eqcxkw) # The call to draw the equilibrium contours if profile_axs is not None: plot_single_time_profile_set( ods=ods, t_idx=it, profile_axs=profile_axs, profile_x_quantity=profile_x_quantity, profile_y_quantities=profile_y_quantities, gentle=gentle, **eqkw, ) elif cx_ax is not None: printd('plot_single_cx_ods{}: this eq slice is INACTIVE; skipping eq plot', topic='efitviewer') it = 0 cx_ax.set_aspect('equal') cx_ax.set_frame_on(False) cx_ax.xaxis.set_ticks_position('bottom') cx_ax.yaxis.set_ticks_position('left') if cx_ax is not None: def overlay_customizations(ovls): ovls = dict(ovls) for ovl in ovls: if ovls[ovl] and ovl in customization: # Replace "True" w/ custom overlay instructions (if present) but first do some processing custom = dict(customization[ovl]) for item in list(custom.keys()): # Remove "None", since we were too busy to look up real defaults & they might not be None if custom[item] is None: if item == 't': custom[item] = t * 1e-3 else: custom.pop(item, None) if item.startswith('pass_in_keywords_'): pikw = custom.pop(item, {}) custom.update(pikw) # Special debugging if ovl in ['position_control', 'pulse_schedule']: custom['timing_ref'] = timing_ref # Custom instructions ready if len(custom): ovls[ovl] = custom else: ovls[ovl] = True return ovls # Plot hardware overlay(s) overlays = overlay_customizations(overlays) scratch['overlays'] = overlays old_font_size = copy.copy(rcParams['font.size']) if default_fontsize is not None: try: rcParams['font.size'] = default_fontsize except ValueError as exc: printe(f"Could not set rcParams['font.size'] = {repr(default_fontsize)} " f"(default_fontsize), because of {exc}") scratch['overlays'] = overlays # Thomson scattering has a different default than others, so we must make sure blank is replaced by False overlays.setdefault('thomson_scattering', False) ods.plot_overlay(ax=cx_ax, **overlays) # The call to do the overlays # Plot special hardware overlay(s) special_overlays = overlay_customizations(special_overlays) scratch['special_overlays'] = special_overlays for so, soa in special_overlays.items(): if soa: skw = soa if isinstance(soa, dict) else {} function_name = 'plot_{}_overlay'.format(so) try: special_function = eval(function_name) except NameError: printe('Unrecognized special overlay: {} has no function (expected {})'.format(so, function_name)) else: special_function(ax=cx_ax, ods=ods, **skw) # Annotations and finishing touches actually_show_legend = (multi_cases and (show_legend > 0)) or (show_legend == 1) actually_show_cornernote = (not multi_cases and (show_cornernote > 0)) or (show_cornernote == 1) if actually_show_legend: cx_ax.legend(loc=0) if actually_show_cornernote: cornernote( ax=cx_ax, device=device, shot='{} {}'.format(shot, efitid), time='{:0.2f}'.format(ods['equilibrium.time'][it] * 1000) if len(t) == 1 else ' ', ) if xlim is not None: cx_ax.set_xlim(xlim) if ylim is not None: cx_ax.set_ylim(ylim) if len(t) > 1: cx_ax.legend(loc=3) plot_style_enforcement(ax=cx_ax, plot_style=plot_style_kw) if default_fontsize is not None: rcParams['font.size'] = old_font_size # Make sure it draws if fig is not None: fig.canvas.draw() printd('plot_single_cx_ods{}: finished plot attempts with no exceptions to catch', topic='efitviewer') except Exception: printd('plot_single_cx_ods{}: exception while trying to plot...', topic='efitviewer') if gentle: # Display the error instead of bringing the whole GUI down if the plot fails. print('Error in plot_cx:') report = ''.join(traceback.format_exception(*sys.exc_info())) printe(report) # noinspection PyBroadException try: if cx_ax is None: cx_ax = gca() if clear_first: cx_ax.cla() cx_ax.text(0.05, 0.95, report, ha='left', va='top', color='red') except Exception: printd('plot_single_cx_ods{}: Failed to properly report an exception!', topic='efitviewer') else: raise return # Export ---------------------------------------------------------------------------------------------------------- def dump_plot_command_content(): """ Writes the contents of a script for reproducing the equilibrium cross section plot based on current settings :return: string Source code for a script that will reproduce the plot """ import getpass import socket out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() ods = out['efitviewer_ods'] device = tokamak(ods['dataset_description.data_entry'].get('machine', settings['cases'][0]['device'])) shot = ods['dataset_description.data_entry'].get('pulse', evalExpr(settings['cases'][0]['shot'])) case_defaults = dict(time=0, active=True, wall=True, eqkw={}, xkw=dict(marker='x')) cases = {k: {kk: v.get(kk, case_defaults.get(kk, None)) for kk in v} for k, v in settings['cases'].items()} overlays = {k: v for k, v in settings['systems'].items() if v} special_overlays = {k: v for k, v in settings['special_systems'].items() if v} all_ovl = copy.copy(overlays) all_ovl.update(special_overlays) default_fontsize = settings.get('default_fontsize', None) customization = {} for k, v in settings['co_efv'].items(): if all_ovl.get(k, False): customization[k] = {kk: vv for kk, vv in v.items() if (vv is not None) or (kk == 't')} for kk in list(customization[k].keys()): # List first because the dict could change during the loop if kk.startswith('pass_in') and len(customization[k][kk]) == 0: # Remove empty pass_in_keywords / pass_in_args since they are nothing but clutter customization[k].pop(kk, None) # Create a string that contains load instructions so it evaluates into a dict containing ODS instances load_instructions_dict = {index: settings['cases'][index].get('instructions_for_loading_ods', 'None') for index in settings['cases']} load_instructions_string = {index: 'replace_{}'.format(index) for index in settings['cases']} load_instructions_string = repr(load_instructions_string) for index in settings['cases']: load_instructions_string = load_instructions_string.replace("'replace_{}'".format(index), load_instructions_dict[index]) _, default_profile_y_quantities = find_profile_y_options() contents = '''def plot_eq_cx(): """ Plots an equilibrium slice with a specific configuration of hardware overlay(s) Automatically generated function output by efitviewer. Please save this file somewhere permanent to avoid losing it. Generated {date:} by {user:} on {host:} OMFIT version: {omfit_version:} """ from omfit_classes.omfit_efitviewer import plot_cx_ods, populate_efitviewer_figure device = {device:} shot = {shot:} cases = {cases:} odss = {load_instructions:} fig = pyplot.figure() gridspec_kw = {gridspec_kw:} cx_ax, profile_axs = populate_efitviewer_figure( fig, scratch, show_cx={show_cx:}, num_profiles={num_profiles:}, num_profile_cols={num_profile_cols:}, profiles_sharex={profiles_sharex:}, profiles_sharey={profiles_sharey:}, **gridspec_kw ) levels = {levels:} customization = {customization:} plot_cx_ods( cx_ax=cx_ax, profile_axs=profile_axs, odss=odss, cases=cases, contour_quantity={contour_quantity:}, allow_fallback={allow_fallback:}, levels=levels, plot_style_kw={plot_style_kw:}, contour_resample_factor={contour_resample_factor:}, xlim={xlim:}, ylim={ylim:}, overlays={overlays:}, special_overlays={special_overlays:}, customization=customization, default_fontsize={default_fontsize:}, show_legend={show_legend:}, profile_x_quantity={profile_x_quantity:}, profile_y_quantities={profile_y_quantities:}, ) return '''.format( device=repr(device), shot=repr(shot), cases=repr(cases), load_instructions=load_instructions_string, contour_quantity=repr(settings['contour_quantity']), allow_fallback=repr(settings['allow_fallback']), levels=repr(settings['{}_levels'.format(settings['contour_quantity'])]), plot_style_kw=settings.setdefault('plot_style_kw', {}), contour_resample_factor=settings['contour_resample_factor'], xlim=repr(settings['xlim']), ylim=repr(settings['ylim']), overlays=repr(overlays), special_overlays=repr(special_overlays), customization=repr(customization), date=datetime.datetime.now(), user=getpass.getuser(), host=socket.gethostname(), omfit_version=repo.active_branch()[-1], default_fontsize=default_fontsize, show_legend=settings.setdefault('show_legend', 2), gridspec_kw=repr(settings['profiles'].get('gridspec_kw', {})), show_cx=settings.get('show_cx', True), num_profiles=settings['profiles'].get('num_profiles', 0), num_profile_cols=settings['profiles'].get('num_profile_cols', 0), profiles_sharex=repr(settings['profiles'].get('sharex', 'all')), profiles_sharey=repr(settings['profiles'].get('sharey', 'none')), profile_x_quantity=repr(settings['profiles'].get('xaxis_quantity', None)), profile_y_quantities=[ settings['profiles'].get(f'yaxis_quantity_{i}', default_profile_y_quantities[i]) for i in range(settings['profiles'].get('num_profiles', 0)) ], ) contents += '\nplot_eq_cx()' contents = omfit_formatter(contents) print('The following commands will generate the last saved equilibrium cross section plot:\n') printi(contents) print('') return contents def dump_plot_command(): """ Writes a script for reproducing the equilibrium cross section plot based on current settings :return: (OMFITpythonTask, string) Reference to the script and its contents """ from omfit_classes.omfit_python import OMFITpythonTask out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() ods = out['efitviewer_ods'] device = tokamak(ods['dataset_description.data_entry'].get('machine', settings['cases'][0]['device'])) shot = ods['dataset_description.data_entry'].get('pulse', evalExpr(settings['cases'][0]['shot'])) filename = settings.setdefault('export_script_filename', "saved_eq_cx_plot_{}_{}".format(device, shot)) ext = os.extsep + 'py' if filename.endswith(ext): filename = filename[: -len(ext)] contents = dump_plot_command_content() script = out[filename] = OMFITpythonTask(filename + ext, fromString=contents) print("Script saved to out['{}']".format(filename)) return script, contents def box_plot_command(number=None, location=None): """ Puts commands for reproducing the plot into the command box :param number: int Index (not visible number) of the command box to edit. Although normally an int, there are some special values: None: get number from location instead 'new', 'New', or -1: create a new command box and use its number :param location: string Location in the tree where the command box number (not index) is stored :return: string String written to command box """ if number is None: number_plus_1 = eval(location) number = number_plus_1 - 1 if is_numeric(number_plus_1) else number_plus_1 if str(number).lower().startswith('new') or (number is None) or (is_numeric(number) and number < 0): number = add_command_box_tab() assert is_numeric(number) assert number >= 0 contents = dump_plot_command_content() prior_cmd_box_contents = OMFITaux['GUI'].command[number].get() printw('Contents of command box index {}, #{} prior to overwriting with plot script:'.format(number, number + 1)) printw('-' * 80) print(prior_cmd_box_contents) printw('-' * 80) printw('End of prior command box contents') OMFITaux['GUI'].command[number].set(contents) return contents # GUI helpers ----------------------------------------------------------------------------------------------------- def add_case(): """Extends the cases available for overlay by adding one case to the end""" out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() new_idx = np.max(list(settings['cases'].keys())) + 1 settings['cases'][int(new_idx)] = SettingsName() # new_idx must be int, not np.int64 return def delete_case(idx): """ Removes a case from the list :param idx: int Index of case to delete :return: new_max: int New maximum case index, in case you wanted to know """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() assert idx > 0, "You're not allowed to delete the primary case. Naughty!" del settings['cases'][idx] new_max = np.max(list(settings['cases'].keys())) return new_max def update_cx_time(location, index=None): """ Updates list of times to use for overlays. In this way, the plot can be updated to add/remove diagnostics without forgetting time-slice overlay setup. :param location: string Location in the OMFITtree of the setting that changed to trigger the overlay update :param index: int Index of the case to update """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() printd(f'update_cx_time: {location}, index={index}', topic='efitviewer') new_time = eval(location) if index is None: try: index = int(location.split("['cases'][")[-1].split("]['time']")[0]) except ValueError: try: index = int(location.split("scratch']['efitviewer_")[-1].split("_")[0]) except ValueError: index = int(location.split("__scratch__']['efitviewer_")[-1].split("_")[0]) printd(f' update_cx_time: location = {location:}, index = {index:}', topic='efitviewer') if settings['overlay']: new_times = np.append(np.atleast_1d(settings['cases'].setdefault(index, SettingsName()).setdefault('time', None)), new_time) else: new_times = np.atleast_1d(new_time) settings['cases'].setdefault(index, SettingsName())['time'] = new_times plot_cx_ods(clear_first=True) return def update_gui_figure(location=None, **kw): r""" Handle updates to the figure embedded in the GUI while setting defaults for appropriate keywords :param location: string Location in the OMFITtree of the setting that changed to trigger the overlay update :param \**kw: Additional keywords passed to plot_cx_ods You can override defaults """ printd('updating figure after changing location: {}'.format(location)) kw.setdefault('clear_first', True) kw.setdefault('gentle', True) plot_cx_ods(**kw) return def update_overlay(location): """ Updates times when overlay changes; used to clear old overlays when exiting overlay mode :param location: string Location in the OMFITtree of the setting that changed to trigger the overlay update """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() overlay = eval(location) if not overlay: for idx in settings['cases']: new_times = np.atleast_1d(settings['cases'][idx]['time'])[-1] settings['cases'][idx]['time'] = new_times update_gui_figure() return def default_xylim(dev=None): """Updates current settings to match device defaults""" out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() if dev is None: try: dev = out['efitviewer_ods']['dataset_description.data_entry.machine'] except (KeyError, ValueError): dev = tokamak(settings['cases'][0]['device']) default_xlim = default_plot_limits.get(dev, {}).get('x', None) default_ylim = default_plot_limits.get(dev, {}).get('y', None) settings['xlim'] = default_xlim settings['ylim'] = default_ylim update_gui_figure() return def grab_xylim(decimals=None): """ Grabs current figure limits and updates settings so updated plots will preserve limits :param decimals: int How many decimal places to retain while rounding :return: xlim, ylim They are loaded into settings, so catching the return is not necessary """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() if decimals is None: decimals = settings.setdefault('xylim_grab_rounding_decimals', 3) def proccess_lim(a): if decimals is None or decimals < 0: return a return tuple([np.round(aa, decimals) if is_numeric(aa) else aa for aa in np.atleast_1d(a)]) ax = scratch.get('efitviewer_cx_ax', None) if ax is not None: xlim = proccess_lim(ax.get_xlim()) ylim = proccess_lim(ax.get_ylim()) settings['xlim'] = xlim settings['ylim'] = ylim else: xlim = ylim = None return xlim, ylim def load_contours(): """ Loads contours specified by custom_contour_spacing setup :return: float array of contour levels They are loaded into SETTINGS so you don't have to catch this, but you can, if you want to. """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() cq = settings['contour_quantity'] cq_in = dict(psi='psin').get(cq, cq) spacing = settings['custom_contour_spacing'] levels = setup_special_contours(contour_quantity=cq_in, spacing=spacing) settings['{}_levels'.format(cq)] = levels return levels def get_default_snap_list(device): """ Guess default snap list based on device :param device: str :return: dict """ guesses = {'EAST': {'EFIT_EAST': 'EFIT_EAST'}, 'KSTAR': {'EFIT01': 'EFIT01', 'EFITRT1': 'EFITRT1'}} guesses['EAST_US'] = guesses['EAST'] for gd in guesses.keys(): if is_device(device, gd): return guesses[gd] return {} def find_cq_options(): """ Determines what should be available as options for the contour quantity :return: tuple list: list of options (as strings) str: name of the recommended default contour quantity """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() cq_opts = [] if 'psi' in out['efitviewer_ods']['equilibrium.time_slice[0].profiles_2d[0]']: cq_opts += ['psi_norm', 'psi'] if 'psi' in out['efitviewer_ods']['equilibrium.time_slice[0].profiles_1d']: # psi is used to interpolate, so 1d and 2d psi are both needed to support the 1d profiles. # Here are some popular ones: popular_from_1d = ['q', 'rho_tor', 'rho_tor_norm'] for pf1 in popular_from_1d: if pf1 in out['efitviewer_ods']['equilibrium.time_slice[0].profiles_1d']: cq_opts += [pf1] if 'phi' in out['efitviewer_ods']['equilibrium.time_slice[0].profiles_2d[0]']: cq_opts += ['phi'] if len(cq_opts): default_contour_quantity = 'psi_norm' if 'psi_norm' in cq_opts else cq_opts[0] else: default_contour_quantity = None return cq_opts, default_contour_quantity def find_profile_y_options(index=0, allow_2d=True, allow_constraints=True): """ Finds a list of quantities that could go on the y axis of the profile plots :param index: int Index of time-slice to check in, in case different times have different availability (hopefully not) :param allow_2d: bool Allow 2D profiles to enter the list. These have to be sliced later to make 1D plots. :param allow_constraints: bool Allow constraint profiles to enter the list. These have to have associated functions for interpreting constraint data properly. :return: tuple list: all options for y axis quantities list: list of recommended defaults for each plot panel """ out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() yopts = [k for k, v in out['efitviewer_ods']['equilibrium']['time_slice'][index]['profiles_1d'].items() if isinstance(v, np.ndarray)] if 'j_tor' not in yopts: if index == 0: ods = out['efitviewer_ods'] else: ods = out[f'efitviewer_ods_{index}'] if calculate_j_toroidal(ods, 0, check_availability=True): yopts += ['j_tor'] if allow_2d: yopts += [ k for k, v in out['efitviewer_ods']['equilibrium']['time_slice'][index]['profiles_2d.0'].items() if isinstance(v, np.ndarray) ] if 'b_field_tor' not in yopts: yopts += ['b_field_tor'] if allow_constraints: for k in out['efitviewer_ods']['equilibrium']['time_slice'][index]['constraints']: function_name = f'constraint_profile_{k}' try: eval(function_name) except NameError: pass else: yopts += [k] priorities = ['pressure', 'q', 'dpressure_dpsi', 'f_df_dpsi', 'phi'] ydefault = [p for p in priorities if p in yopts] ydefault += [y for y in yopts if y not in ydefault] ydefault += yopts ydefault += [None] * 100 return yopts, ydefault def setup_default_profiles(index=0): """Loads a group of settings that will probably make for good-looking & interesting profiles""" out_loc, settings_loc, out, settings, parent_settings, scratch = pick_output_location() # Axis quantities desired_quantities = ['pressure', 'q', 'dpressure_dpsi', 'f_df_dpsi'] quantities = [dq for dq in desired_quantities if dq in out['efitviewer_ods']['equilibrium']['time_slice'][index]['profiles_1d']] p = settings.setdefault('profiles', SettingsName()) p['xaxis_quantity'] = 'rho_tor_norm' for i, q in enumerate(quantities): p[f'yaxis_quantity_{i}'] = q # Subplot counts settings['show_cx'] = True p['num_profiles'] = len(quantities) p['num_profile_cols'] = square_subplots(len(quantities), just_numbers=True)[1] # Figure arrangement: size, spacing, etc. settings['figsize'] = (10, 8) p['gridspec_kw'] = {'width_ratios': [1 + p['num_profile_cols']] + [1] * p['num_profile_cols']} layout_tag = f"cx-{p['num_profiles']}-{p['num_profile_cols']}" layout = ( settings.setdefault('plot_style_kw', SettingsName()) .setdefault('subplot_layout', SettingsName()) .setdefault(layout_tag, SettingsName()) ) layout.update(dict(wspace=0.16, left=0.03, right=0.92))