Source code for omfit_classes.omfit_thomson

try:
    # framework is running
    from .startup_choice import *
except (ImportError, OSError) as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from omfit_classes.omfit_error import OMFITerror
from omfit_classes.utils_base import printd, tolist
from omfit_classes.utils_fusion import is_device, tokamak, lambda_debye
from omfit_classes.utils_math import closestIndex
from omfit_classes.omfit_mds import OMFITmdsValue
from omfit_classes.omfit_rdb import OMFITrdb
from omfit_classes.omfit_eqdsk import read_basic_eq_from_mds

import numpy as np
from numpy.core import shape
from scipy.interpolate import interp1d
from scipy import constants
import unittest
from uncertainties.unumpy import uarray, std_devs, nominal_values

try:
    import MDSplus
except Exception:  # MDSplus raises Exception, so this can't be more narrow
    print('Error importing MDSplus in omfit_elm.py.')
    mds_available = False
else:
    mds_available = True

__all__ = []


def _available_to_user(f):
    __all__.append(f.__name__)
    return f


[docs]@_available_to_user class OMFITthomson(SortedDict): """ Helps with fetching data from the Thomson Scattering diagnostic. It also has some other handy features to support analysis based on Thomson data: - Filter data by fractional error (e.g. throw out anything with > 10% uncertainty) - Filter data by reduced chi squared (e.g. throw out anything with redchisq > 8) - Filter data using ELM timing (e.g. only accept data if they are sampled between 50 and 99% of their local inter-ELM period) """ def __init__( self, device='DIII-D', shot=0, efitid='EFIT01', revision_num=-1, subsystems='auto_select', override_default_measurements=None, quality_filters='default', elm_filter=None, efit_data=None, allow_backup_efitid=False, debug=False, verbose=False, ): """ :param device: Device name, like 'DIII-D' :param shot: Shot number to analyze. :param efitid: String describing the EFIT to use for mapping, such as 'EFIT01'. For DIII-D, 'EFIT03' and 'EFIT04' are recommended because they are calculated on the same time base as TS.S :param revision_num: A string specifying a revision like 'REVISION00' or just the number like 0. -1 Selects the "blessed" or "best" revision automatically. -2 selects the best revision the same as in -1, but also creates a folder with raw data from all revisions. :param subsystems: List of Thomson systems to do handle. For DIII-D, this can be any subset of ['core', 'divertor', 'tangential']. For other devices, this setting does nothing and the systems list will be forced to ['core']. Set this to 'auto_select' to pick a setting that's a good idea for your device. (Currently, all non-DIII-D devices are forced to ['core'].) :param override_default_measurements: list of strings [optional] Use this to do lightweight gathering of only a few quantities. More advanced uses, like filtering, require all of the default quantities. :param quality_filters: Set to 'default' or a dictionary structure specifying settings for quality filters. Missing settings will be set to default values (so an empty dictionary {} is a valid input here). Top level settings in quality_filters: - remove_bad_slices: Any time-slice which is all bad measurements or any chord which is all bad will be identified. These can be removed from the final dataset, which saves the user from carting around bad data. - set_bad_points_to_zero: Multiply data by the okay flag, which will set all points marked as bad to 0+/-0 - ip_thresh_frac_of_max: Set a threshold on Ip so that slices with low Ip (such as at the start of the shot or during rampdown) will not pass the filter. In addition, there are filters specified on a subsystem-by-subsystem basis. In addition to the real subsystems, there is a 'global_override' subsystem, which takes precedence if its settings aren't None. - bad_chords: array or list of bad chord indices for this subsystem. Set to empty list if no bad channels. (Real subsystems only, no global override) - redchisq_limit: A number specifying the maximum acceptable reduced chi squared value. This refers to the fit to Thomson's raw pulsed and DC data signals to determine Te and ne. - frac_temp_err_hot_max: Upper limit on acceptable fractional uncertainty in Te when Te is above the hot_cold_boundary threshold. - frac_temp_err_cold_max: Upper limit on acceptable fractional uncertainty in Te when Te is below the hot_cold_boundary threshold. - hot_cold_boundary: Te boundary between "hot" and "cold" temperatures, which have different fractional uncertainty limits. - frac_dens_err_max: Maximum fractional uncertainty in ne measurements. :param elm_filter: Provide an instance of an ELM filtering class like OMFITelm or set to None to have OMFITthomson set this up automatically. :param efit_data: This is usually None, which instructs OMFITthomson to gather its own EFIT data to use in mapping. However, you can pass in a dictionary with contents matching the format returned by self.gather_efit_data() and then self.gather_efit_data() will be skipped. :param allow_backup_efitid: T/F: Allow self.gather_efit_data() to choose self.efitid if it fails to find data for the requested EFIT. :param debug: bool Debug mode saves some intermediate results in a special dictionary. :param verbose: bool Always print debugging statements. May be useful when using this class outside the framework. """ # super().__init__() SortedDict.__init__(self) self.status = {'setup': False, 'gathered': False, 'mapped': False, 'filtered': False, 'calcs': False} self.verbose = verbose # Handle basic settings self.device = tokamak(device) self.supported_devices = ['DIII-D', 'NSTX', 'NSTXU'] if shot is None: shot = 0 if shot <= 0: shot += self.lookup_latest_shot() self.shot = shot self.efitid = efitid self.allow_backup_efitid = allow_backup_efitid self.revision_num = revision_num if subsystems == 'auto_select': self.subsystems = ['core', 'tangential'] if is_device(device, 'DIII-D') else ['core'] else: self.subsystems = tolist(subsystems) if is_device(self.device, 'DIII-D') else ['core'] if override_default_measurements is None: self.measurements = ['r', 'z', 'temp', 'temp_e', 'density', 'density_e', 'redchisq'] else: printw( 'WARNING! OMFITthomson needs all of the default measurements to optimally perform key functions ' 'like filtering. You should not change the measurements list unless you are doing lightweight raw ' 'data gathering only.' ) self.measurements = override_default_measurements # Figure out the revision number. It should be a string. If the user supplied just a number, build a string # from that. If the number is -1, that indicates that the blessed revision should be used (-1 is not a valid # revision # in MDS) if isinstance(revision_num, str): # The user supplied a string self.revision = revision_num self.revision_num = re.findall(r'\d+', revision_num)[0] elif revision_num in [-1, -2]: # Special case: -1 means that the blessed/best revision should be used. For DIII-D, "BLESSED" is a # pointer to one of the numbered revisions that is controlled by the Thomson scattering group. The TS # group should've pointed blessed at the best revision. if is_device(self.device, 'DIII-D'): self.revision = 'blessed' elif is_device(self.device, ('NSTX', 'NSTX-U')): self.revision = 'best' else: raise IOError( 'Unrecognized server/device option: {}. Please choose an option from {}.'.format(self.device, self.supported_devices) ) else: # The user supplied a number, so convert it into a proper string if is_device(self.device, 'DIII-D'): self.revision = 'REVISIONS.REVISION{:02d}'.format(revision_num) elif is_device(self.device, ('NSTX', 'NSTX-U')): self.revision = 'TS{:0d}'.format(revision_num) else: raise IOError( 'Unrecognized server/device option: {}. Please choose an option from {}.'.format(self.dvecice, self.supported_devices) ) # Handle quality filters self.default_quality_filters = { 'ip_thresh_frac_of_max': 0.2, 'set_bad_points_to_zero': True, 'remove_bad_slices': True, 'global_override': { 'redchisq_limit': None, 'frac_temp_err_hot_max': None, 'frac_temp_err_cold_max': None, 'hot_cold_boundary': None, 'frac_dens_err_max': None, }, 'core': { 'redchisq_limit': 8.0, 'bad_chords': np.array([40]), 'frac_temp_err_hot_max': 0.5, 'frac_temp_err_cold_max': 0.95, 'hot_cold_boundary': 50.0, 'frac_dens_err_max': 0.5, }, 'tangential': { 'redchisq_limit': 8.0, 'bad_chords': None, 'frac_temp_err_hot_max': 0.5, 'frac_temp_err_cold_max': 0.95, 'hot_cold_boundary': 100.0, 'frac_dens_err_max': 0.5, }, 'divertor': { 'redchisq_limit': 8.0, 'bad_chords': None, 'frac_temp_err_hot_max': 0.5, 'frac_temp_err_cold_max': 0.95, 'hot_cold_boundary': 15.0, 'frac_dens_err_max': 0.5, }, } self.quality_filters = self.default_quality_filters if quality_filters == 'default': self.printdq(' Using default quality filters') elif isinstance(quality_filters, dict): self.printdq(' Got dictionary of quality filters') sub_keys = ['core', 'tangential', 'divertor', 'global_override'] non_sub_keys = [k for k in list(self.default_quality_filters.keys()) if k not in sub_keys] for k in non_sub_keys: qf = quality_filters.get(k, None) if qf is None: self.printdq(' {} not found in quality filter spec. Using default value for {}'.format(k, k)) qf = self.default_quality_filters[k] else: self.printdq(' {} found in quality filters specified in input!'.format(k)) self.quality_filters[k] = qf for sk in sub_keys: if sk in quality_filters: self.printdq(' Input quality filter spec has data for {} subsystem, updating values...'.format(sk)) keys = list(qualit_filters[sk].keys()) for k in keys: qf = quality_filters[sk].get(k, None) if qf is None: self.printdq( ' {} not found in quality filter spec for {} subsystem. ' 'Using default value for {}/{}'.format(k, sk, sk, k) ) qf = self.default_quality_filters[sk][k] else: self.printdq(' {}/{} found in quality filters specified in input!'.format(sk, k)) self.quality_filters[sk][k] = qf else: printw(' Did not understand what to do with quality filter specification. Using defaults instead.') # Handle ELM filter self.elm_filter = False # This would be an instance of the ELM filtering module if it were in use. # Put here so not always imported - requires framework from omfit_classes.omfit_elm import OMFITelm if elm_filter is None: self.elm_filter = OMFITelm(shot=shot, device=device) elif elm_filter in ['disabled', 'off', 'no']: print(' ELM filter disabled') elif isinstance(elm_filter, OMFITelm): self.elm_filter = elm_filter else: print(' Unhandled ELM filter setup.') printw(' ELM filter disabled due to unhandled setup!') # Make labels global_redchisq = self.quality_filters['global_override']['redchisq_limit'] if global_redchisq is None: global_redchisq = 0.0 try: elm_phase = elm_filter['elm_phase_range'] elm_reject = elm_filter['elm_since_reject_below'] elm_since = elm_filter['elm_since_accept_above'] elm_smoother = elm_filter['smoother'] except (KeyError, NameError, TypeError): # KeyError if one of the tags is missing, NameError if a dependency isn't defined, # and TypeError if one of the intermediate tags can't be indexed by a string. elm_phase = [0, 0] elm_reject = 0 elm_since = 0 elm_smoother = 'x' self.short_label = ''.join(['{device:}_{shot:06d}_rev{revision:}']).format( device=device, shot=shot, revision=int(self.revision_num) ) self.label = ''.join( [ '{shot:06d}_rev{revision:}_rchsq-g{global_redchisq:05.1f}' 'c{core_redchisq:05.1f}d{div_redchisq:05.1f}t{tan_redchisq:05.1f}_{efitid:}_', 'elm-ph{elm_phase_min:04.2f}:{elm_phase_max:04.2f}', 'r{elm_reject:04.1f}s{elm_since:05.1f}{elm_smoother:1s}', ] ).format( shot=shot, global_redchisq=global_redchisq, core_redchisq=self.quality_filters['core']['redchisq_limit'], div_redchisq=self.quality_filters['divertor']['redchisq_limit'], tan_redchisq=self.quality_filters['tangential']['redchisq_limit'], efitid=efitid, elm_phase_min=elm_phase[0], elm_phase_max=elm_phase[1], elm_reject=elm_reject, elm_since=elm_since, elm_smoother=elm_smoother[0], revision=int(self.revision_num), ) # Handle efit_data if efit_data is not None: self['efit_data'] = efit_data self.efit_data_provided = True else: self.efit_data_provided = False self.status['setup'] = True self.debug = debug if self.debug: self['debugging'] = SortedDict() # Setup details to support convenient plotting self.default_plot = 'contour_plot' self['plots'] = SortedDict() if not mds_available: printe('Unable to import MDSplus! OMFITthomson will not be able to gather any data!') def __call__(self, report=False): """ Performs all primary operations: gather, map, filter, calculate derived quantities, and package dataset. After running, data will be loaded into the class. :param report: bool Print a status report when done """ self.gather() self.gather_efit_info() self.map() self.filter() self.calculate_derived() if report: self.report_status()
[docs] def printdq(self, *args): """Wrapper for controlling verbosity locally""" if self.verbose: tag_print(*args, tag='DEBUG') else: printd(*args, topic='omfit_ts')
def __tree_repr__(self): """Provides the label that OMFIT should use while representing class instances in the tree.""" if not mds_available: return 'Disabled: no MDSplus support', [] subs = ''.join([subsys[0] for subsys in tolist(self.subsystems)]) state = ( 'done (step 4/4)' if self.status['calcs'] else 'filtered (step 3/4)' if self.status['filtered'] else 'mapped (step 2/4)' if self.status['mapped'] else 'gathered (step 1/4)' if self.status['gathered'] else 'blank (step 0/4)' ) label = '{device:}#{shot:}, [{status:}], map={efitid:}, subs={subs:}, rev={rev:}'.format( device=self.device, shot=self.shot, efitid=self.efitid, subs=subs, rev=self.revision, status=state ) return label, [] # Utilities
[docs] def lookup_latest_shot(self): """Looks up the last shot. Works for DIII-D. Used automatically if shot <= 0, in which case the shot number is treated as relative to the last shot.""" import psycopg2 if is_device(self.device, 'DIII-D'): print(' Looking up latest DIII-D shot...') try: last_shot = OMFITrdb( "SELECT shot FROM summaries WHERE shot = (SELECT MAX(shot) FROM summaries)", db='d3drdb', server='d3drdb', by_column=True, )['shot'][0] except (psycopg2.ProgrammingError, KeyError): raise OMFITexception( 'OMFITthomson is Unable to connect to d3drdb for shot 0 lookup! ' 'Please try again with a real shot number!' ) else: raise NotImplementedError( 'OMFITthomson: Shot 0 lookup functionality is not implemented for device={}. ' 'Please choose a real shot number.'.format(self.device) ) print(' last_shot = {}'.format(last_shot)) return last_shot
[docs] def report_status(self): """ Prints out a report that can be read easily from the command line """ print('OMFITthomson instance: status and contents:') print(' Settings for this instance:') print(' device = {}, shot = {}'.format(self.device, self.shot)) print(' revision_num = {}, revision = {}'.format(self.revision_num, self.revision)) print(' efitid = {}, subsystems = {}'.format(self.efitid, self.subsystems)) print(' measurements = {}'.format(self.measurements)) print(' Status = {}'.format(self.status)) print(' Contents:') print(' self.keys() = {}'.format(list(self.keys()))) for group in ['raw', 'processed', 'filtered']: if group in self: print(" self['{}'].keys() = {}".format(group, list(self[group].keys()))) sub_keys_present = [k for k in list(self[group].keys()) if k in ['core', 'tangential', 'divertor']] k0 = sub_keys_present[0] if len(sub_keys_present) else list(self[group].keys())[0] if len(self[group]) else None if k0 is not None: print(" self['{}']['{}'].keys() = {}".format(group, k0, list(self[group][k0].keys())))
[docs] def find_all_revisions(self, verbose=True): """Looks up all extant revisions of DIII-D Thomson data for the current shot.""" if verbose: print('\n-----') print('Looking up which Thomson scattering analysis revisions exist for {:}#{:}...'.format(self.device, self.shot)) if not is_device(self.device, 'DIII-D'): raise NotImplementedError('OMFITthomson can only search all revisions for DIII-D so far.') rev_tree = OMFITmdsValue(self.device, treename='electrons', shot=self.shot, TDI=r'getnci("ts.revisions\***","FULLPATH")').data() self['all_revisions'] = [] for r in rev_tree: tmp = r.split('TOP.TS.')[1].split('.') revision = '.'.join(tmp[0:2]).strip() if len(tmp) < 3: tmp = revision.split(':') if len(tmp) < 2: self['all_revisions'] += [revision] if verbose: print('Found revisions:') for r in self['all_revisions']: print(' {:}'.format(r)) print("Saved revision list as self['all_revisions']")
[docs] def check_filtered_data(self): """ Check that results are available and that they match shot and filter settings """ if 'filtered' not in list(self.keys()): # This definitely isn't done if the key isn't even in self! self.status['filtered'] = False return False # Settings to check for consistency check_elm = ['elm_phase_range', 'elm_since_reject_below', 'elm_since_accept_above'] check_ts = ['redchisq_limit', 'bad_chords', 'frac_temp_err_hot_max', 'frac_temp_err_cold_max', 'hot_cold_boundary'] filter_settings_okay = True # Everything is okay unless it isn't elm_settings = {} # Not hooked up yet, sorry. print(' OMFITthomson: Checking filtered data...') # Check saved settings from the TS group vs. the settings namelist; they should match for i in check_elm: try: self.printdq(' elm check: {}'.format(i)) self.printdq(' saved: {}'.format(self['filtered']['settings'][i])) self.printdq(' settings: {}'.format(elm_settings['ELM_filter'][i])) if np.any(np.atleast_1d(self['filtered']['settings'][i] != elm_settings['ELM_filter'][i])): filter_settings_okay = False print( 'ELM filter settings do not match: saved {:} = {:} vs. tree settings {:} = {:}. Need to ' 're-filter...'.format(i, self['filtered']['settings'][i] + 0, i, elm_settings['ELM_filter'][i] + 0) ) except KeyError: filter_settings_okay = False # Check saved settings from the ELM group vs. the settings namelist; they should match for subsys in self.subsystems: self.printdq(' OMFITthomson: Checking TS sys: {}'.format(subsys)) if subsys in self['filtered']: for i in check_ts: try: self.printdq('TS check: {}'.format(i)) if i in self.quality_filters['global_override']: set_tmp = self.quality_filters['global_override'][i] else: set_tmp = None self.printdq(' {} = {}'.format(i, set_tmp)) if set_tmp is None: set_tmp = self.quality_filters[subsys][i] self.printdq( ' > {} = {} (None as a global setting is code for use the subsys ' 'specific setting)'.format(i, set_tmp) ) self.printdq(' {}'.format(self['filtered'][subsys]['settings'][i])) if self['filtered'][subsys]['settings'][i] != set_tmp: filter_settings_okay = False print( 'TS quality filter settings do not match: saved {:} = {:} vs current tree setting {:}' ' = {:}. Need to refilter...'.format(i, self['filtered'][subsys]['settings'][i], i, set_tmp) ) except KeyError: filter_settings_okay = False print( 'Failed when checking {} filter settings for consistency in subsys {}. ' 'Need to re-filter...'.format(i, subsys) ) else: self.printdq(' subsys {} not in filtered output') self.status['filtered'] *= filter_settings_okay return filter_settings_okay
[docs] def check_efit_info(self): """ Checks for consistency of any currently loaded EFIT data :return: Flag indicating whether new EFIT data are needed (T) or not (F). """ need_new_efit_data = False check_if_match = {'shot': [self.shot], 'efitID': [self.efitid, 'CAKE']} for check_name, check_values in list(check_if_match.items()): try: match = self['efit_data'][check_name] in check_values except KeyError: self.printdq(' KeyError when checking {}: need to gather new EFIT data'.format(check_name)) need_new_efit_data = True else: need_new_efit_data *= not match self.printdq(' {} matches = {}; need_new_efit = {}'.format(check_name, match, need_new_efit_data)) return need_new_efit_data
[docs] def to_dataset(self): """ Packs data into a list of xarray Dataset instances, one per subsystem :return: dictionary with one Dataset instance for each subsystem """ from xarray import DataArray, Dataset data = {} for sub in self.subsystems: coords = dict(time=self['filtered'][sub]['time'], channel=self['filtered'][sub]['chord_index']) dims = ['channel', 'time'] psi = DataArray(self['filtered'][sub]['psin_TS'], coords=coords, dims=dims, name='psi_N') r = DataArray(self['filtered'][sub]['r'], coords=dict(channel=self['filtered'][sub]['chord_index']), dims=['channel'], name='R') z = DataArray(self['filtered'][sub]['z'], coords=dict(channel=self['filtered'][sub]['chord_index']), dims=['channel'], name='Z') meas = [] if 'temp' in self.measurements: meas += ['T_e'] if 'density' in self.measurements: meas += ['n_e'] d = Dataset(dict(psi=psi, r=r, z=z), coords=coords, attrs=dict(measurements=meas)) params = [par for par in self.measurements if par in ['temp', 'density', 'redchisq']] try: _ = self['filtered'][sub]['press'] except KeyError: pass else: params += ['press'] for param in params: y = self['filtered'][sub][param] if param + '_e' in self['filtered'][sub]: y = uarray(y, self['filtered'][sub][param + '_e']) d[param] = DataArray(y, coords=coords, dims=dims, name=param) data[sub] = d return data
# Primary class methods
[docs] def gather(self, verbose=True): """ Gathers Thomson scattering data from MDSplus """ print(' OMFITthomson: gathering data...') self['raw'] = SortedDict() treename = {'DIII-D': 'ELECTRONS', 'NSTX': 'ACTIVESPEC', 'NSTXU': 'ACTIVESPEC'}.get(tokamak(self.device), None) # MDS tree name if verbose: print('Fetching TS data') measurement_translations = { 'NSTX': {'temp': 'fit_te', 'temp_e': 'fit_te_err', 'density': 'fit_ne', 'density_e': 'fit_ne_err', 'r': 'fit_radii'} } unit_conversion_factors = { # Accessed w/ .get(server, {}).get(meas, 1), so don't define things w/ ucf = 1 'NSTX': { 'temp': 1000.0, # keV to eV 'temp_e': 1000.0, # keV to eV 'density': 1e6, # cm^-3 to m^-3 'density_e': 1e6, # cm^-3 to m^-3 'r': 0.01, # cm to m 'time': 1000.0, # s to ms } } measurement_translations['NSTXU'] = measurement_translations['NSTX'] unit_conversion_factors['NSTXU'] = unit_conversion_factors['NSTX'] if verbose: print('Retrieving Thomson data for {} shot {:}'.format(self.device, self.shot)) print('Revision: {:}'.format(self.revision)) if self.revision_num == -2 and is_device(self.device, 'DIII-D'): # Handle the case where the user wants all the revisions self.find_all_revisions(verbose=False) revisions = [self.revision] + self['all_revisions'] elif self.revision_num == -2 and is_device(self.device, ('NSTX', 'NSTX-U')): printw('All-revision gathering capability not implemented for NSTX/NSTX-U yet') revisions = [self.revision] else: revisions = [self.revision] # Loop through revisions for r, revision in enumerate(revisions): # Direct output so that the blessed revision goes to the main raw_TS tree and the other revs go to sub-trees if r == 0: out = self['raw'] else: if is_device(self.device, 'DIII-D'): rev = revision.split('.')[1] # Take REVISIONS. out of REVISIONS.REVISION00 so it's just REVISION00. elif is_device(self.device, ('NSTX', 'NSTX-U')): rev = revision else: raise IOError('Unrecognized server/device option: {}. ' 'How did you even get this far?'.format(server)) out = self['raw'][rev] = SortedDict() # Make a shortcut to revision subtree if is_device(self.device, 'DIII-D'): # Handle DIII-D version, which has subsystems. # Loop through TS subsystems for subsys in self.subsystems: # Build the base of the call so we can add just the measurement to the end of it later basecall = '.ts.{:}.{:}.'.format(revision, subsys) self.printdq(' sys = {}, basecall = {}'.format(subsys, basecall)) out[subsys] = SortedDict() # Make sure the output location exists # Loop through quantities to gather for meas in self.measurements: call = basecall + meas data = OMFITmdsValue(server=self.device, treename=treename, shot=self.shot, TDI=call) out[subsys][meas] = data.data() if (meas in ['temp', 'temp_e', 'density', 'density_e', 'redchisq']) and ('time' not in out[subsys]): # These signals better have the same time dimension, or else something is very wrong. # We don't even have to bother covering for a case where the time dimensions within a TS # subsystem don't match because the case is probably un-salvageable and horribly corrupted. out[subsys]['time'] = data.dim_of(0) elif is_device(self.device, ('NSTX', 'NSTX-U')): # Handle NSTX/U version, which does not have subsystems. basecall = '.mpts.output_data.{:}.'.format(revision) self.printdq(' basecall = {}'.format(basecall)) subsys = tolist(self.subsystems)[0] print( 'NSTX/NSTX-U do not have sub-systems like at DIII-D, so we will pretend that all data are from ' 'the first subsystem in the list, which is: {}'.format(subsys) ) out[subsys] = SortedDict() # Make sure the output location exists data = None self.printdq(' Starting loop through measurements: {}'.format(self.measurements)) for meas_ in self.measurements: meas = measurement_translations.get(self.device, {}).get(meas_, None) ucf = unit_conversion_factors.get(self.device, {}).get(meas_, 1) if meas is not None: call = basecall + meas self.printdq(' call = {}, shot = {}, server = {}, treename = {}'.format(call, self.shot, self.device, treename)) data = OMFITmdsValue(server=self.device, treename=treename, shot=self.shot, TDI=call) out[subsys][meas_] = data.data() * ucf # These signals better have the same time dimension, or else something is very wrong. We don't # even have to bother covering for a case where the time dimensions don't match because the case # is probably un-salvageable and horribly corrupted. ucf = unit_conversion_factors.get(self.device, {}).get('time', 1) out[subsys]['time'] = data.dim_of(0) * ucf # Just use the last `data` that's left over from the loop measurements_l = [meas_.lower() for meas_ in self.measurements] out_k_l = [k.lower() for k in list(out[subsys].keys())] if 'r' in measurements_l and 'z' in measurements_l and 'z' not in out_k_l: out[subsys]['z'] = out[subsys]['r'] * 0 if 'redchisq' in measurements_l and 'redchisq' not in out_k_l: if 'temp' in out[subsys]: out[subsys]['redchisq'] = out[subsys]['temp'] * 0 elif 'density' in out[subsys]: out[subsys]['redchisq'] = out[subsys]['density'] * 0 else: printw('WARNING: Unable to properly assign redchisq or even redchisq placeholder!') out[subsys]['redchisq'] = None else: raise IOError( 'Unrecognized server/device option: {}. Honestly, how have you not been caught before ' 'this? You should not have gotten this far (you are a bad person).'.format(self.device) ) if is_device(self.device, 'DIII-D'): self['raw']['blessed_id'] = OMFITmdsValue(server=self.device, treename=treename, shot=self.shot, TDI='ts.blessed_id').data()[0] # Save shot & revision so data can be checked against input settings in the future self['raw']['device'] = copy.copy(self.device) self['raw']['shot'] = copy.copy(self.shot) # Copy so that they don't update when settings in the tree change self['raw']['revision_num'] = copy.copy(self.revision_num) self['raw']['revision'] = copy.copy(self.revision) self.status['gathered'] = True print('Done gathering raw Thomson data.')
[docs] def gather_efit_info(self): """ Gathers basic EFIT data for use in mapping. """ self.setdefault('efit_data', SortedDict()) if self.check_efit_info(): print(' Gathering EFIT {:}...'.format(self.efitid)) g_file_quantities = ['rhovn', 'pres', 'bcentr', 'rmaxis', 'zmaxis', 'r', 'z', 'lim', 'cpasma'] a_file_quantities = [] derived_quantities = ['psin', 'psin1d', 'time'] backup_plans = {'EFIT04': 'EFIT03', 'EFIT03': 'EFIT01', 'EFIT02': 'EFIT01'} efit_data = read_basic_eq_from_mds( device=self.device, server=None, shot=self.shot, tree=self.efitid, g_file_quantities=g_file_quantities, a_file_quantities=a_file_quantities, derived_quantities=derived_quantities, ) backup_plan = backup_plans.get(self.efitid, None) if not self.allow_backup_efitid: backup_plan = None while backup_plan and not efit_data: printw('Could not find {}, so falling back to {}...'.format(self.efitid, backup_plan)) efit_data = read_basic_eq_from_mds( device=self.device, server=None, shot=self.shot, tree=self.efitid, g_file_quantities=g_file_quantities, a_file_quantities=a_file_quantities, derived_quantities=derived_quantities, ) if not efit_data: backup_plan = backup_plans.get(backup_plan, None) else: print('It worked! Got equilibrium from backup source: {}'.format(backup_plan)) self['efit_data']['original_request_efitid'] = copy.copy(self.efitid) self.efitid = backup_plan if not efit_data: raise OMFITerror( 'Could not read equilibrium data from {} and no backup plan found! ' 'Unable to map TS data!'.format(self.efitid) ) self['efit_data'] = SortedDict() # Clobber any existing efit_data to avoid mixing with old results. if efit_data: # Record which shot we're doing this for so we can compare later to ensure that data are current efit_data['shot'] = copy.copy(self.shot) efit_data['efitID'] = self.efitid efit_data['gathered_by'] = 'OMFITthomson' for a in list(efit_data.keys()): self['efit_data'][a] = efit_data[a] else: print('OMFITthomson already has appropriate EFIT data for shot {}. ' 'Skipping EFIT gathering.'.format(self.shot))
[docs] def map(self, note='', remove_efits_with_large_changes=False): """ Maps Thomson to the EFIT. Because most of the TS data are at the same radial location, the interpolation of psi(r,z) onto R,Z for Thomson is sped up by first interpolating to the R for most Thomson, then interpolating along the resulting 1D psi(z). If there is more than one R value (such as if tangential TS is included), the program loops over each unique R value. This could be done with one 2D interpolation, but I think it would be slower. :param note: Prints a note when starting mapping. :param remove_efits_with_large_changes: Filter out EFIT time slices where the axis or boundary value of un-normalized psi changes too fast. It's supposed to trim questionable EFITs from the end, but it doesn't seem to keep them all out with reasonable thresholds. This feature was a nice idea and I think it's coded properly, but it isn't performing at the expected level, so either leave it off or tune it up better. """ # Three segments of TS data: pulses before the first EFIT slice (may be empty) # pulses after the last EFIT slice (may be empty) # pulses between first & last EFIT slice # Just use the nearest slice. # If using EFIT03 or EFIT04 (as recommended), the EFIT goes on the TS timebase, anyway. print('Mapping TS to EFIT...' + note) if is_device(self.device, ['NSTX', 'NSTXU']) and self.efitid.upper().startswith('EFIT'): printw( 'WARNING: Mapping of NSTX/U data to basic EFITs typically produces inaccurate results! ' 'Watch out for double valued profiles in the pedestal. Properly tuning psi_N mapping of NSTX ' 'Thomson data is complicated.' ) self.printdq('map_TS systems = {}'.format(self.subsystems)) # Make sure the tree is set to accept processed data (psin for TS) self['processed'] = SortedDict() # Gather data need_to_gather = False # For Thomson try: # The recorded shot for gathered data should match the shot in the settings tree. Otherwise, gather. if self['raw']['shot'] != self.shot: need_to_gather = True for subsys in self.subsystems: if subsys not in self['raw']: need_to_gather = True except KeyError: need_to_gather = True if need_to_gather: self.gather() need_to_gather_efit = False # Do it again for EFIT try: # The recorded shot for gathered data should match the shot in the settings tree. Otherwise, gather. if self['efit_data']['shot'] != self.shot: need_to_gather_efit = True if self['efit_data']['efitid'] != self.efitid: need_to_gather_efit = True except KeyError: need_to_gather_efit = True if need_to_gather_efit: self.printdq(' map_TS: need to gather EFIT info again') self.gather_efit_info() self.printdq(' map_TS: done dealing with gathering stuff') # At this point, data should be available. # Get some info out of EFIT efit_r = self['efit_data']['r'] efit_z = self['efit_data']['z'] if len(np.atleast_1d(np.shape(efit_r))) == 2: # Oh great, someone decided to save R as being time dependent, even though it probably isn't. # If it is, this script will be WRONG and bad. Note that the IMAS standard uses a time-dependent R-grid, # so this will have to be supported properly in the future. This script will have to be updated or replaced. printw( 'WARNING: EFIT R grid was saved as 2D. If it is actually time dependent (probably is not), ' 'then mapping in fastTS will be wrong!' ) nt = len(self['efit_data']['time']) wta = np.where(np.array(np.shape(efit_r)) == nt)[0][0] # Where's the time axis? ii = 0 if wta == 0: while efit_r[ii, :].max() == 0: ii += 1 # Find a slice that's not blank. Some might be, but non-blanks should be the same. efit_r = efit_r[ii, :] efit_z = efit_z[ii, :] elif wta == 1: while efit_r[:, ii].max() == 0: ii += 1 efit_r = efit_r[:, ii] efit_z = efit_z[:, ii] else: raise ValueError('Problem finding which axis to deal with when collapsing EFIT R grid.') elif len(np.atleast_1d(np.shape(efit_r))) == 1: self.printdq(' EFIT R grid is 1D, which is how we like it. Good.') else: raise ValueError('EFIT R grid has too many dimensions! np.shape(efit_r) = {}'.format(np.shape(efit_r))) # We have to make sure we get the best EFIT slice for each Thomson slice. We will compare timings. efit_time = self['efit_data']['time'] # Filter EFIT slices to remove slices that are full of all zeros psin = self['efit_data']['psin'] # This psin is a function of efit_time, efit_r, and efit_z maxpsin = np.max(psin, axis=(1, 2)) # Maximum value at any point in space for each time slice self.printdq(' map_TS: max value in space at each time slice maxpsin: np.shape(maxpsin) = {}'.format(np.shape(maxpsin))) if remove_efits_with_large_changes: # Fine idea, but it doesn't seem to reliably get rid of EFITs where the innermost TS point jumps really far, # so the plot still looks stupid. Maybe we just need a better plot than tricontourf. sm = self['efit_data']['ssimag'] # un-normalized pSI values at the MAGnetic axis and at the BoundaRY bm = self['efit_data']['ssibry'] # Sorry, I don't know what's up with the first s or why it's ssi instead of psi. dsm = deriv(efit_time, sm) # Time derivative to check for changes dbm = deriv(efit_time, bm) sdsm = smooth(dsm, 5) sdbm = smooth(dbm, 5) dsmthresh = 0.025 dbmthresh = 5e-4 if debug: print('removing EFITs with large changes') f, ax = pyplot.subplots(2, sharex=True) ax[0].plot(dsm, label='time deriv of un-normalized psi value on axis') ax[0].plot(sdsm) ax[0].axhline(dsmthresh, color='k', linestyle='--') ax[0].axhline(-dsmthresh, color='k', linestyle='--') ax[1].plot(dbm, label='time deriv of un-normalized psi value on boundary') ax[1].plot(sdbm) ax[1].axhline(dbmthresh, color='k', linestyle='--') ax[1].axhline(-dbmthresh, color='k', linestyle='--') wefg = np.where((maxpsin != 0) * (abs(sdsm) < dsmthresh) * (abs(sdbm) < dbmthresh))[0] else: wefg = np.where(maxpsin != 0)[0] # Find slices where max value is non-zero psin = psin[wefg, :, :] efit_time = efit_time[wefg] # Filtered EFIT doesn't have blank (all zero) slices. # Loop through Thomson subsystems. 'core' is the most popular, # but you could also do ['core', 'divertor', 'tangential'] or any subset thereof. for subsys in self.subsystems: # Make sure the output has a place to go self['processed'].setdefault(subsys, SortedDict()) try: ts_time = self['raw'][subsys]['time'] # Thomson timing to compare to EFIT except KeyError: ts_time = None if ts_time is None: printw('WARNING! System {:} appears to be missing data! Mapping aborted!'.format(sys)) self['processed'][subsys]['psin_TS'] = None # Leaving this here signals that mapping was attempted continue slice_to_use = np.zeros(len(ts_time)) # Record best EFIT slice for each TS slice and put it in this array # If thomson starts before EFIT (often does), then use the first valid EFIT slice for early Thomson data. # Bad, but nothing better. User should be able to notice this happening from a contour. w = np.where(ts_time < min(efit_time))[0] if len(w) > 0: slice_to_use[w] = 0 first_ts = max(w) + 1 else: first_ts = 0 self.printdq( ' OMFITthomson.map: first_ts = {:} (first TS time-slice that is within EFIT time range; ' 'earlier TS slices use the first available EFIT although it ' 'happens later than the early TS data)'.format(first_ts) ) # If Thomson ends after EFIT (also often happens), then use the last valid EFIt slice for late Thomson data. # Thomson after EFIT probably happened after the flattop and you don't care, anyway. It should be apparent # that the same EFIT is being used if you look at a contour. w = np.where(ts_time > max(efit_time))[0] if len(w) > 0: slice_to_use[w] = len(efit_time) - 1 last_ts = min(w) - 1 else: last_ts = len(ts_time) - 1 self.printdq( ' map_TS: last_ts = {:} (last TS time-slice that is within EFIT time range; ' 'any slice later than this will use the last EFIT available)'.format(last_ts) ) # Now go slice by slice and find the closest EFIT slice to each TS slice. ts_slices = np.arange(first_ts, last_ts + 1) # This is a list of indices of TS slices that are within the # EFIT time range and we should bother to find them the # closest EFIT. self.printdq(' ts_slices = {} (indices of TS slices that are within EFIT time range)'.format(ts_slices)) self.printdq(' len(ts_slices) = {}'.format(len(ts_slices))) self.printdq(' ts_time[ts_slices] = {}'.format(ts_time[ts_slices])) for i in ts_slices: # For each TS time slice, find index of the EFIT slice to use slice_to_use[i] = closestIndex(efit_time, ts_time[i]) slice_to_use = slice_to_use.astype(int) if self.debug: self['debugging']['slice_to_use'] = slice_to_use self.printdq(' np.shape(slice_to_use) = {}'.format(np.shape(slice_to_use))) self.printdq(' np.shape(efit_time) = {} (this is after the wefg filter)'.format(np.shape(efit_time))) self.printdq( ' np.shape(psin) = {} (this is the psin from the EFIT after filtering out bad slices)'.format(np.shape(psin)) ) # #############--------------------------------- # Okay, great. We should know which EFIT slice is best for each TS slice. You could interpolate the EFIT in # time to be fancier, but this probably doesn't matter. It's fine. Let's go! # We're going to do a fast spatial interpolation. # This is optimized using specific information about the Thomson system: that almost all of the chords # are in the same radial position. That means that we can calculate the coefficients for the R # interpolation once for each unique R value (there is only 1 for the core, and only 1 for divertor. # Tangential is different). We do the R interpolation once, get psi_N vs Z at the correct R, and then # do a simple 1D interpolation. Fast! For the tangential system, it will loop through the 6 unique R # values instead of doing just 1 unique R value for core & div. Oh well. ts_r = self['raw'][subsys]['r'] ts_z = self['raw'][subsys]['z'] # Set of unique R and Z ur = set(ts_r) uz = set(ts_z) # zcheck = np.zeros(np.shape(ts_z)) # Set up debugging array # Make an array to catch psin at the TS locations vs EFIT time base. This guy needs to be converted to the # TS time base later. The mapping for this conversion was calculated earlier. psin_ts_almost = np.zeros([len(ts_r), len(efit_time)]) for i, r in enumerate(ur): # Do the R interpolation for each Unique R value (ur) dr = r - efit_r right = np.searchsorted(efit_r, r, side="left") # Find closest EFIT R on the left & right of TS R value if efit_r[right] == r: # Shortcut in case of exact match psin_slice = psin[:, :, right] else: left = right - 1 # Find closest EFIT R values on the left and right of TS R value dlr = efit_r[right] - efit_r[left] # Find the spacing between the EFIT R values on L and R left_weight = -dr[right] / dlr # Large diff between TS & right EFIT ==> large weight on left value right_weight = dr[left] / dlr # Large diff between TS & left EFIT ==> large weight on right value # Weighted sum of psin from left and right sides: psin_slice = psin[:, :, left] * left_weight + psin[:, :, right] * right_weight # This has reduced psin from psin(R,Z) to psin(Z) for j, z in enumerate(uz): # Do the Z interpolation for each Unique Z value (uz) # Same idea as before, except it reduces a line to a point instead of a grid to a line. dz = z - efit_z right = np.searchsorted(efit_z, z, side='left') if efit_z[right] == z: psin_pt = psin_slice[:, right] else: left = right - 1 dlr = efit_z[right] - efit_z[left] left_weight = -dz[right] / dlr right_weight = dz[left] / dlr psin_pt = psin_slice[:, left] * left_weight + psin_slice[:, right] * right_weight # Now we map this back into the shape of TS data. Remember, this interpolation operated on unique # values within the TS coordinates, not on each point. This result here does not necessarily have # compatible dimensions with the original TS, so we have to map it in. wc = np.where((ts_r == r) * (ts_z == z))[0] psin_ts_almost[wc, :] = psin_pt[np.newaxis, :] # Now we have psi_N at each Thomson spatial location vs the EFIT time base. # It's simple to translate to the TS time base since we already figured out which EFIT slice was best to use self['processed'][subsys]['psin_TS'] = psin_ts_almost[:, slice_to_use] # Tag the output with shot and EFIT ID so it can be checked vs. user settings later (make sure data are current) self['processed']['psin_TS_shot'] = copy.copy(self.shot) self['processed']['psin_TS_efitID'] = copy.copy(self.efitid) self.status['mapped'] = True
[docs] def filter(self): """ Data quality and ELM filter ELM phase and timing data are used to select slices. Individual data are flagged to indicate whether they passed the filter. If any chords are completely bad, then they are just removed from the output. """ print(' OMFITthomson: filtering data...') # Set up ELM filter apply_elm_filter = self.elm_filter is not False elm_phase_range = self.elm_filter['settings']['filtering']['elm_phase_range'] if apply_elm_filter else 0 elm_since_reject_below = self.elm_filter['settings']['filtering']['elm_since_reject_below'] if apply_elm_filter else 0 elm_since_accept_above = self.elm_filter['settings']['filtering']['elm_since_accept_above'] if apply_elm_filter else 0 # Make sure things are ready if not self.status['mapped']: self.map() # Get plasma current for later if is_device(self.device, 'DIII-D'): ip = OMFITmdsValue(self.device, treename=None, shot=self.shot, TDI='ip') ipi = interp1d(ip.dim_of(0), ip.data(), bounds_error=False, fill_value=0) elif is_device(self.device, ('NSTX', 'NSTX-U')): ip = OMFITmdsValue(self.device, treename='engineering', shot=self.shot, TDI='analysis.ip1') ipi = interp1d(ip.dim_of(0) * 1000, ip.data(), bounds_error=False, fill_value=0) else: raise IOError( 'No instructions for looking up plasma current for this device: {}. ' 'Unable to apply Thomson data filters. Please try again with a supported device from this ' 'list: {}'.format(self.device, self.supported_devices) ) # Prepare tree to catch results fsetg = self.setdefault('filtered', SortedDict()).setdefault('settings', SortedDict()) for subsys in self.subsystems: ts_time = self['raw'][subsys]['time'] if ts_time is None: printw('WARNING! System {:} appears to be missing data. ' 'Aborting filtering for {:}.'.format(subsys, subsys)) fsys = self['filtered'].setdefault(subsys, SortedDict()) for a in ['time', 'r', 'z', 'chord_index', 'psin_TS', 'temp', 'density']: # Throw None into a few key parameter names to make it clear that nothing good happened here. fsys[a] = None fsys.setdefault('filters', SortedDict())['okay'] = None fsys['filters'].setdefault('raw', SortedDict())['raw_okay'] = None continue temp = copy.copy(self['raw'][subsys].get('temp', None)) tempe = copy.copy(self['raw'][subsys].get('temp_e', None)) density = copy.copy(self['raw'][subsys].get('density', None)) density_e = copy.copy(self['raw'][subsys].get('density_e', None)) # Figure out filters fs = self.quality_filters[subsys] # Shortcut to system specific filter settings fg = self.quality_filters['global_override'] # Shortcut to global filter settings # Global filter settings take precedence if not ==None selected_filters = {} # Things where the global should be able to take over for thing in ['redchisq_limit', 'frac_temp_err_hot_max', 'frac_temp_err_cold_max', 'frac_dens_err_max', 'hot_cold_boundary']: if fg[thing] is not None: self.printdq( ' Using GLOBAL OVERRIDE setting for {:} filter setting {:}, value = {:} ' '<<<< OVERRIDE'.format(subsys, thing, fg[thing]) ) selected_filters[thing] = fg[thing] else: self.printdq('Using system specific setting for {:} filter setting {:}, value = {:}'.format(subsys, thing, fs[thing])) selected_filters[thing] = fs[thing] # Things which are system-specific only for thing in ['bad_chords']: selected_filters[thing] = fs[thing] # Unpack (not all of them need to be unpacked because I put in selected_filters[''] around them redchisq_limit = selected_filters['redchisq_limit'] bad_chords = selected_filters['bad_chords'] frac_temp_err_hot_max = selected_filters['frac_temp_err_hot_max'] frac_temp_err_cold_max = selected_filters['frac_temp_err_cold_max'] hot_cold_boundary = selected_filters['hot_cold_boundary'] # Time flags #--------------------------------------------------------------------------- if int(apply_elm_filter) > 0: ep_devices = self.elm_filter.supported_devices if self.device in ep_devices: # ELM filter elm_okay = self.elm_filter.filter(times_to_filter=ts_time, cer_mode=False, stimes=0.0) else: printw('WARNING: Current device ({}) is not supported by ELM_processing. Skipping ELM filter...'.format(device)) print(' ELM_processing supported devices: {}'.format(ep_devices)) elm_okay = np.ones(len(ts_time), bool) apply_elm_filter = False else: elm_okay = np.ones(len(ts_time), bool) # Filter to cut negative time inshot_okay = ts_time > 0 # Filter to select where ip is okay ipts = ipi(ts_time) if self.debug: self['debugging']['ipts'] = ipts ipokay = abs(ipts) > self.quality_filters['ip_thresh_frac_of_max'] * max(abs(ipts)) # Combine all time-slice filters timeslice_okay = elm_okay * ipokay * inshot_okay # Position flags #----------------------------------------------------------------------- # Manual chord rejection manual_chord_okay = np.ones(len(self['raw'][subsys]['r'])) if len(tolist(bad_chords, [None])): w = len(manual_chord_okay) > bad_chords > 0 if np.sum(np.atleast_1d(w)): manual_chord_okay[np.atleast_1d(bad_chords)[w]] = 0 # Check to see that the chord got at least one slice nonzero_error = tempe.max(1) > 0 # Combine all chord filters chord_okay = (manual_chord_okay * nonzero_error).astype(bool) # 2D flags #--------------------------------------------------------------------------- all_okay_2d = np.ones((len(self['raw'][subsys]['r']), len(self['raw'][subsys]['time'])), bool) # Reduced chi squared below threshold if self['raw'][subsys].get('redchisq', None) is not None: redchisq_okay = self['raw'][subsys]['redchisq'] < redchisq_limit else: redchisq_okay = all_okay_2d # Check for high fractional error --- # Check fractional temp err if temp is not None: temp_nonzero = copy.copy(temp) temp_nonzero[temp_nonzero == 0] = 1.0 # Avoid divide by zero error frac_temp_err = tempe / temp_nonzero frac_err_okay = (temp > hot_cold_boundary) * (frac_temp_err < frac_temp_err_hot_max) + (temp <= hot_cold_boundary) * ( frac_temp_err < frac_temp_err_cold_max ) else: frac_err_okay = all_okay_2d # Check fractional dens err if density is not None: dens_nonzero = copy.copy(density) dens_nonzero[dens_nonzero == 0] = 1.0 # Avoid divide by zero error frac_dens_err = density_e / dens_nonzero frac_dens_err_okay = frac_dens_err < selected_filters['frac_dens_err_max'] # Overall fractional error combo frac_err_okay &= frac_dens_err_okay # Check for non-zero measurement (TS can't measure 0 and 0+/-0 means missing data). We can go from just # temperature. If Temperature exists, so will density. They are parameters output from the same fit to raw # signal. if tempe is not None: measurement_exist_okay = tempe > 0 elif density_e is not None: measurement_exist_okay = density_e > 0 else: measurement_exist_okay = all_okay_2d # Combine all 2D filters two_d_okay = redchisq_okay * frac_err_okay * measurement_exist_okay # This is ONLY the two D filters. The 1D filters are things like the ELM filter that are time only or the # bad chord selector which is position only. THESE ARE NOT INCLUDED HERE (because we can delete any row or # column that's all bad). if two_d_okay.max() > 1: two_d_okay[np.where(two_d_okay > 1)[0]] = 1 ########################### chord_index = np.atleast_1d(list(range(len(self['raw'][subsys]['r'])))) wt = timeslice_okay wc = chord_okay okay = np.atleast_2d(two_d_okay[:, wt][wc, :]) okay_complete = copy.copy(two_d_okay) okay_complete[:, ~wt] = 0 okay_complete[~wc, :] = 0 # okay_complete has the same shape as two_d_okay, but it has the 1D filters applied to it. # USE THIS if you aren't removing bad slices entirely # Set up tree for results (make sure the trees exist and make shortcuts) fsys = self['filtered'].setdefault(subsys, SortedDict()) filt = fsys.setdefault('filters', SortedDict()) fset = fsys.setdefault('settings', SortedDict()) fraw = filt.setdefault('raw', SortedDict()) # Save filter results if self.quality_filters['remove_bad_slices']: filt['okay'] = okay # This is two_d_okay with the bad time slices and chords REMOVED from the array fsys['chord_index'] = chord_index[wc] filt['elm_okay'] = elm_okay[wt] else: filt['okay'] = okay_complete # This is two_d_okay with the bad time slices and chords set to ZERO # (the array stays the same size) fsys['chord_index'] = chord_index filt['elm_okay'] = elm_okay fraw['raw_okay'] = okay_complete fraw['redchisq_okay'] = redchisq_okay fraw['frac_err_okay'] = frac_err_okay fraw['measurement_exist_okay'] = measurement_exist_okay fraw['ipokay'] = ipokay fraw['inshot_okay'] = inshot_okay fraw['elm_okay'] = elm_okay fraw['manual_bad_chords'] = copy.copy(bad_chords) filt['accepted_time_slices'] = wt filt['accepted_chords'] = wc filt['two_d_okay'] = two_d_okay # Save filtered data if self.quality_filters['remove_bad_slices']: for tag in ['temp', 'temp_e', 'density', 'density_e', 'redchisq']: # These get bad slices & chords removed. Remaining bad points are set to 0 by multiplying by `okay`. if self['raw'][subsys].get(tag, None) is None: self.printdq(' {} was None in system {}! Copying `None` across without filtering.'.format(tag, sys)) fsys[tag] = None else: fsys[tag] = self['raw'][subsys][tag][wc, :][:, wt] for tag in ['psin_TS']: # This is like the above entry for temp, etc., # but separate because it's in a different place than raw data. fsys[tag] = self['processed'][subsys][tag][wc, :][:, wt] for tag in ['time']: # These get bad slices removed; separate b/c different indexing than the 2D data fsys[tag] = self['raw'][subsys][tag][wt] for tag in ['r', 'z']: # These get bad chords removed; separate b/c different indexing than the 2D data fsys[tag] = self['raw'][subsys][tag][wc] else: for tag in ['temp', 'temp_e', 'density', 'density_e', 'redchisq', 'r', 'z', 'time']: # Unlike the other side of this if, the 1D & 2D arrays don't need different processing this time b/c # we aren't removing the bad slices fsys[tag] = self['raw'][subsys][tag] for tag in ['psin_TS']: # This is separate because it's in a different place than the raw data fsys[tag] = self['processed'][subsys][tag] if self.quality_filters['set_bad_points_to_zero']: # We can make an extra barrier against accidentally including bad data if we multiply the values at # every bad point by zero. This is an option because there may be cases where one would like to examine # which data have been rejected, and this is hard to do if they have been zeroed out. for tag in ['temp', 'temp_e', 'density', 'density_e', 'psin_TS']: if fsys[tag] is not None: fsys[tag] = fsys[tag] * okay.astype(float) # Save filter settings - sys specific # Without copy.copy(), the "copies" of the settings would update with the main one, and we don't want that. fset['redchisq_limit'] = copy.copy(redchisq_limit) fset['bad_chords'] = copy.copy(bad_chords) fset['frac_temp_err_hot_max'] = copy.copy(frac_temp_err_hot_max) fset['frac_temp_err_cold_max'] = copy.copy(frac_temp_err_cold_max) fset['frac_dens_err_max'] = copy.copy(selected_filters['frac_dens_err_max']) fset['hot_cold_boundary'] = copy.copy(hot_cold_boundary) fset['remove_bad_slices'] = copy.copy(self.quality_filters['remove_bad_slices']) fset['set_bad_points_to_zero'] = copy.copy(self.quality_filters['set_bad_points_to_zero']) # Save ELM detection & filter settings - all systems fsetg['elm_phase_range'] = copy.copy(elm_phase_range) fsetg['elm_since_reject_below'] = copy.copy(elm_since_reject_below) fsetg['elm_since_accept_above'] = copy.copy(elm_since_accept_above) if apply_elm_filter: elm_det_set = self.elm_filter['settings']['detection'] fsetg['elm_detection_smooth_funct'] = copy.copy(elm_det_set['smoother']) fsetg['elm_detection_filterscopes'] = copy.copy(elm_det_set['default_filterscope_for_elms']) fsetg['elm_detection_auto_fscopes'] = 'filterscope_chosen_automatically_for_shot' in elm_det_set fsetg['elm_detection_tuning'] = copy.deepcopy(elm_det_set['{:}_tuning'.format(elm_det_set['smoother'])]) # Save shot for records self['filtered']['device'] = copy.copy(self.device) self['filtered']['shot'] = copy.copy(self.shot) self.status['filtered'] = True print('Done filtering TS data.') # Add shortcuts to plot methods for convenience try: self.setdefault('plots', SortedDict())['profile_plot'] = function_to_tree(self.profile_plot, self) self['plots']['contour_plot'] = function_to_tree(self.contour_plot, self) except ImportError: type_, val_, traceback_ = sys.exc_info() self.printdq(' Failed to load plot script to tree with function_to_tree.', topic='omfit_elm') if self.debug: self.printdq( ' Exception type = {type:}\n' ' Exception value = {value:}\n' ' Exception traceback = \n{trace:}'.format(type=type_, value=val_, trace=traceback_), topic='omfit_elm', )
# This won't work outside of OMFIT and we don't want it to
[docs] def select_time_window( self, t0, dt=25.0, systems='all', parameters=['temp', 'density'], psi_n_range=[0, 1.5], strict=None, use_shifted_psi=True, alt_x_path=None, comment=None, perturbation=None, realtime=False, ): """ Grabs Thomson data for the time window [t0-dt, d0+dt] for the sub-systems and parameters specified. :param t0: Center of the time window in ms :param dt: Half-width of the time window in ms :param systems: Thomson sub-systems to gather (like ['core', 'divertor', 'tangential']). If None: detect which systems are available. :param parameters: Parameters to gather (like ['temp', 'density', 'press']) :param psi_n_range: Range of psi_N values to accept :param strict: Ignored (function accepts this keyword so it can be called generically with same keywords as its counterpart in the quickCER module) :param use_shifted_psi: T/F attempt to look up corrected psi_N (for profile alignment) in alt_x_path. :param alt_x_path: An alternative path for gathering psi_N. This can be an OMFIT tree or a string which will give an OMFITtree when operated on with eval(). Use this to provide corrected psi_N values after doing profile alignment. Input should be an OMFIT tree containing trees for all the sub systems being considered in this call to select_time_window ('core', 'tangential', etc.). Each subsystem tree should contain an array of corrected psi_N values named 'psin_corrected'. :param comment: Optional: you can provide a string and your comment will be announced at the start of execution. :param perturbation: None or False for no perturbation or a dictionary with instructions for perturbing data for doing uncertainty quantification studies such as Monte Carlo trials. The dictionary can have these keys: - random: T/F (default T): to scale perturbations by normally distributed random numbers - sigma: float (default 1): specifies scale of noise in standard deviations. Technically, I think you could get away with passing in an array of the correct length instead of a scalar. - step_size: float: specifies absolute value of perturbation in data units (overrides sigma if present) - channel_mask: specifies which channels get noise (dictionary with a key for each parameter to mask containing a list of channels or list of T/F matching len of channels_used for that parameter) - time_mask: specifies which time slices get noise (dictionary with a key for each parameter to mask containing a list of times or list of T/F matching len of time_slices_used for that parameter) - data_mask: specifies which points get noise (overrides channel_mask and time_mask if present) (dictionary with a key for each parameter to mask containing a list of T/F matching len of data for that parameter. Note: this is harder to define than the channel and time lists.) Shortcut: supply True instead of a dictionary to add 1 sigma random noise to all data. :param realtime: T/F: Gather realtime data instead of post-shot analysis results. :return: A dictionary containing all the parameters requested. Each parameter is given in a dictionary containing x, y, e, and other information. x, y, and e are sorted by psi_N. """ from operator import itemgetter if systems == 'all': systems = self.subsystems self.printdq('OMFITthomson: select_time_window...') if comment is not None: self.printdq('OMFITthomson: {}'.format(comment)) if realtime: ts_src = 'filtered_RTTS' else: ts_src = 'filtered' # Minor notifications (tell user not to try to put real values in the unused keywords (uk) for ukn, uk in list({'strict': strict}.items()): if uk is not None: self.printdq(' {} keyword is ignored by fastTS version of select_time_window(), but its value was not None!'.format(ukn)) results = self if ts_src not in results: if ts_src == 'filtered_RTTS': self.printdq("fastTS: filtered realtime Thomson data missing in INPUTS! Executing RT gather+filter...") raise NotImplementedError("Realtime data handling not ready yet") else: self.printdq("fastTS: filtered Thomson data missing in INPUTS! Executing gather+filter...") self.__call__() if systems is None: systems = [k for k in list(results[ts_src].keys()) if isinstance(results[ts_src][k], dict) and k != 'settings'] self.printdq('Auto-assigned systems to based on filtered data availability: {:}'.format(systems)) # Sanitize inputs parameters = tolist(parameters) systems = tolist(systems) psi_n_range = np.atleast_1d(psi_n_range) t0 = np.atleast_1d(t0)[0] result = {} for par in parameters: self.printdq('Loop through parameters: {:}'.format(par)) time_slices_used = np.array([]) systems_used = [] system_okay = np.zeros(len(systems), bool) x1 = np.array([]) y1 = np.array([]) e1 = np.array([]) t1 = np.array([]) c1 = np.array([]) sys1 = np.array([]) chords_used = [] for i_sys, sys in enumerate(systems): self.printdq('Loop through systems: {:}'.format(sys)) # Make sure this system & parameter were gathered if sys not in results[ts_src]: self.printdq(' No data for subsystem {:}'.format(sys)) continue if par not in results[ts_src][sys] or results[ts_src][sys][par] is None: printw(' fastTS select_time_window(): No data for this parameter: {:}'.format(par)) continue if use_shifted_psi: try: if isinstance(alt_x_path, str): alt_x_path = eval(alt_x_path) else: alt_x_path = alt_x_path x = alt_x_path[sys]['psin_corrected'] except (KeyError, TypeError): x = results[ts_src][sys]['psin_TS'] self.printdq(' Got psi_N values from filtered_TS (not using shifted profiles)') else: self.printdq( ' Got psi_N values from alt_x_path (probably using shifted profiles) (path = {})'.format( treeLocation(alt_x_path)[-1] ) ) self.printdq(' psi_N values: min = {}, max = {}, average = {}'.format(x.min(), x.max(), x.mean())) else: x = results[ts_src][sys]['psin_TS'] self.printdq(' Got psi_N values from filtered_TS (not using shifted profiles)') y = results[ts_src][sys][par] try: e = results[ts_src][sys]['{:}_e'.format(par)] except KeyError: self.printdq(' Could not find uncertainties for this parameter: {:}'.format(par)) # User is allowed to request r and z as parameters, but these do not have uncertainties. e = np.zeros(len(np.atleast_1d(x)), type(np.atleast_1d(x)[0])) okay = results[ts_src][sys]['filters']['okay'] t = results[ts_src][sys]['time'] ch = results[ts_src][sys]['chord_index'] ch2 = ch[:, np.newaxis] + np.zeros(np.shape(x)) # 2D version of chord index for tracking chords used sys_num = {'core': 0, 'divertor': 1000, 'tangential': 2000}[sys] ch2 += sys_num # Time filter t2 = t[np.newaxis, :] + np.zeros(np.shape(x)) # 2D version of time for making the filter we'll use on data wt = (t2 >= (t0 - dt)) & (t2 <= (t0 + dt)) # Position filter wx = (x >= min(psi_n_range)) & (x <= max(psi_n_range)) # Overall filter w = wt * wx * okay # Make sure the filter doesn't delete everything if w.max() < 1: self.printdq(' No data within this time window for this subsystem: {:}+/-{:}, {:}'.format(t0, dt, sys)) continue x1 = np.append(x1, x[w]) y1 = np.append(y1, y[w]) e1 = np.append(e1, e[w]) t1 = np.append(t1, t2[w]) c1 = np.append(c1, ch2[w]) # Keep track of slices and chords used time_slices_used0 = np.sort(tolist(set(t2[w].flatten()))) # tolist() removes the set type time_slices_used = np.append(time_slices_used, time_slices_used0) chords_used0 = np.sort(tolist(set(ch2[w].flatten()))) # chords_used0 = ['{}{:02d}'.format(sys[0], int(ch_)) for ch_ in chords_used0] chords_used = np.append(chords_used, chords_used0) sys1 = np.append(sys1, [sys] * len(x[w])) systems_used += [sys] system_okay[i_sys] = True # Any error will abort the loop, so we won't get to here unless things are okay. self.printdq(' {:} {:} looks okay'.format(par, sys)) # Done with system loop # Finish working on this parameter if system_okay.max() < 1: self.printdq(' Failed to gather parameter {:} from any subsystem'.format(par)) continue # Sort by x seq = list(zip(x1, y1, e1, t1, c1, sys1)) seq2 = sorted(seq, key=itemgetter(0)) x2, y2, e2, t22, c22, sys2 = list(zip(*seq2)) x2 = np.array(x2) y2 = np.array(y2) e2 = np.array(e2) t22 = np.array(t22) c22 = np.array(c22) sys2 = np.array(sys2) result[par] = { 'x': x2, 'y': y2, 'e': e2, 't': t22, 'ch': c22, 'channel_type': sys2, 'shot': copy.copy(results[ts_src]['shot']), 'filter_settings': copy.deepcopy(results[ts_src]['settings']), 'time_slices_used': time_slices_used, 'channels_used': chords_used, 'systems_used': systems_used, 'mapping_equilibrium': copy.copy(results['efit_data']['efitID']), } things = ['typical_scale', 'typical_pedestal_scale', 'typical_sol_scale'] for thing in things: if '{}_{}'.format(par, thing) in results[ts_src]: result[par][thing] = results[ts_src]['{}_{}'.format(par, thing)] def perturb_data(data, pert=None): """ Applies a perturbation to data. :param data: result dictionary containing data to be perturbed. :param pert: Instructions for applying perturbations. :return: result dictionary with perturbations added. """ self.printdq('Adding perturbations to Thomson data returned by select_time_window()...') try: self.printdq(' pert.keys() = {}'.format(list(pert.keys()))) except AttributeError: pass else: if 'data_mask' in list(pert.keys()): try: self.printdq(" pert['data_mask'].keys() = {}".format(list(pert['data_mask'].keys()))) except AttributeError: self.printdq(" pert['data_mask'] = {}".format(pert['data_mask'])) # Figure out perturbation type if isinstance(pert, dict): # Normally, the user should pass in a dictionary for pert with pert['type'] = the type they want. randomize = pert.get('random', True) sigma = pert.get('sigma', 1) step_size = pert.get('step_size', None) channel_mask = pert.get('channel_mask', None) time_mask = pert.get('time_mask', None) data_mask = pert.get('data_mask', None) elif type(pert) in [bool, bool_]: # If pert is True instead of a dictionary, do random noise for all channels if pert: randomize = True sigma = 1 step_size = None channel_mask = None time_mask = None data_mask = None else: # Perturbations turned off return data elif pert is None: self.printdq('pert is None: perturbations disabled') return data else: # Could not figure it out printe('Could not figure out perturbation type. Aborting perturb_data() and returning unperturbed data.') print(' pert = {}'.format(pert)) return data if data_mask: self.printdq(' Data mask detected; disabling channel_mask and time_mask.') time_mask = None channel_mask = None for par_ in list(data.keys()): self.printdq(' Perturbing {}...'.format(par_)) p = data[par_] def setup_mask(mask_, xx, is_data=False, label=''): self.printdq(' Mask setup... ({})'.format(label)) n = len(xx) if not (len(np.atleast_1d(mask_)) > 5 or isinstance(mask_, dict)): self.printdq(' Input mask = {}'.format(mask_)) elif isinstance(mask_, dict): self.printdq(' Input mask is a dictionary with mask_.keys() = {}'.format(list(mask_.keys()))) for key in list(mask_.keys()): self.printdq(" len(mask_['{}']) = {}".format(key, len(mask_[key]))) elif len(np.atleast_1d(mask_)) > 5: self.printdq(' Input mask is type {} with length {}'.format(type(mask_), len(np.atleast_1d(mask_)))) else: self.printdq(' Input mask is type {}'.format(type(mask_))) mask_g = copy.deepcopy(mask_) if mask_g is not None: # Try to get out the mask mask = mask_g.get(par_, None) else: mask = None if mask is None: # Make up the default mask, which passes everything self.printdq(' Input mask was replaced with all True because it was None.') mask = np.ones(n, bool) # Try to detect a mask which has been input as a list of channels or slices to pass. mask_okay = (type(tolist(mask)[0]) in [bool, bool_]) and (len(mask) == n) if not mask_okay: if is_data: # The normal mask transformations for channel & time don't work for data, so fail everything printw( ' Improperly formed data mask detected for {}; len(mask) = {} vs. len(xx) = {}. ' 'Cancelling perturbations...'.format(par_, len(mask), len(xx)) ) mask = (xx * 0).astype(bool) else: # The mask is a list of bools of the proper length, but maybe it can be transformed int one. if type(tolist(mask)[0]) in [bool, bool_]: # The mask looks like a list/array of bools, but the length doesn't match. printw( ' Improperly formed {} MASK detected for {}; len(mask) = {} vs. ' 'len(xx) = {}. Cancelling perturbations...'.format(label, par_, len(mask), len(xx)) ) mask = (xx * 0).astype(bool) else: # Assume that mask is a list of time slices or channels to use. Loop through each # channel or time in the list and flag every datum which matches. self.printdq(' mask needs additional processing...') if not ( (isinstance(mask[0], type(xx[0]))) or (isinstance(mask[0], str) and isinstance(xx[0]), str) or (is_numeric(mask[0]) and is_numeric(xx[0])) ): raise ValueError( 'Incompatible types between mask and data or coordinate to be masked ' '({}). Mask type: {}, data type: {}. Types may only disagree if mask ' 'is boolean.'.format(par_, type(mask[0]), type(xx[0])) ) mask0 = copy.copy(mask) mask = np.zeros(n, bool) for m in mask0: if len(np.atleast_1d(xx == m)) != len(mask): raise ValueError( 'An error occurred while masking data for perturbations: ' 'length of mask did not match length of match adjustment.' ) mask[xx == m] = True else: self.printdq(' Mask is a list of bools of the correct length ({}); No more processing!'.format(len(mask))) self.printdq( ' Done processing mask. len(mask) = {}, len(xx) = {}, label = {}, is_data = {}.'.format( len(mask), len(xx), label, is_data ) ) return mask # Resolve masks into lists of T/F flags matching the lengths of time_slices_used, channels_used, and x tm_ = setup_mask(time_mask, p['time_slices_used'], label='time') cm_ = setup_mask(channel_mask, p['channels_used'], label='channel') dm_ = setup_mask(data_mask, p['x'], is_data=True, label='data') # Resolve masks into list of T/F matching length of x valid_ch = p['channels_used'][cm_] valid_t = p['time_slices_used'][tm_] cm_ = [ch_ in valid_ch for ch_ in p['ch']] tm_ = [t_ in valid_t for t_ in p['t']] # Combine masks into base perturbation final_mask = dm_ & cm_ & tm_ perturbation_ = copy.copy(final_mask).astype(float) # Scale the perturbation by sigma*e or by step_size if step_size: perturbation_ *= step_size self.printdq(' Scaled perturbations by absolute step_size {}'.format(step_size)) else: perturbation_ *= sigma * p['e'] self.printdq(' Scaled perturbations by some number of standard deviations ({})'.format(sigma)) if randomize: # Scale the perturbation by normally distributed random numbers perturbation_ *= np.random.normal(0, 1, len(perturbation_)) self.printdq(' Randomized perturbations') p['perturbation'] = perturbation_ p['perturbation_added'] = np.any(perturbation_) p['y'] += perturbation_ p['final_perturbation_mask'] = final_mask p['perturbation_masks'] = {'time': tm_, 'channel': cm_, 'data': dm_, 'valid_lists': {'t': valid_t, 'ch': valid_ch}} return data # Handle perturbations if perturbation: result = perturb_data(result, perturbation) result['_perturbation_added'] = False for par in parameters: if par in result: if result[par].setdefault('perturbation_added', False): result['_perturbation_added'] = True else: self.printdq('No perturbations added to Thomson data') for par in parameters: if par in result: result[par]['perturbation'] = None result[par]['perturbation_added'] = False result['_perturbation_added'] = False # Record instructions for perturbations result['_perturbation'] = perturbation # Note data source result['_realtime'] = realtime result['__ts_src__'] = ts_src return result
# Calculate derived quantities
[docs] def calculate_derived(self, zeff=2.0, ti_over_te=1.0, zi=1.0, zb=6.0, mi=2.0, mb=12.0, use_uncert=False): """ Calculate simple derived quantities. This is limited to quantities which are insensitive to ion measurements and can be reasonably estimated from electron data only with limited assumptions about ions. Assumptions: :param zeff: float Assumed effective charge state of ions in the plasma: Z_eff=(n_i * Z_i^2 + n_b * Z_b^2) / (n_i * Z_i + n_b * Z_b) :param ti_over_te: float or numeric array matching shape of te Assumed ratio of ion to electron temperature :param zi: float or numeric array matching shape of ne Charge state of main ions. Hydrogen/deuterium ions = 1.0 :param mi: float or numeric array matching shape of te Mass of main ions in amu. Deuterium = 2.0 :param zb: float or numeric array matching shape of ne Charge state of dominant impurity. Fully stripped carbon = 6.0 :param mb: float or numeric array matching shape of ne Mass of dominat impurity ions in amu. Carbon = 12.0 :param use_uncert: bool Propagate uncertainties (takes longer) """ # Set up assumptions assume = self['assumptions'] = SortedDict() assume['note'] = ( 'Assumptions used as needed for derived quantities. Calculations should not be very ' 'sensitive to these assumptions such that high accuracy is not required.' ) assume['Z_eff'] = zeff assume['Ti_over_Te'] = ti_over_te assume['Z_i'] = zi assume['Z_b'] = zb assume['m_i'] = mi assume['m_b'] = mb # Start calculating derived quantities out = self['analysis'] = SortedDict() for sub in self.subsystems: out[sub] = SortedDict() assume[sub] = SortedDict() okay = self['filtered'][sub]['filters']['okay'] te = ne = None if self['filtered'][sub]['temp'] is not None: te = uarray(self['filtered'][sub]['temp'], self['filtered'][sub]['temp_e']) if self['filtered'][sub]['density'] is not None: ne = uarray(self['filtered'][sub]['density'], self['filtered'][sub]['density_e']) if (te is not None) and (ne is not None): pe = te * ne * 1.6021766e-19 # Convert eV * m^-3 to Pa self['filtered'][sub]['press'] = nominal_values(pe) self['filtered'][sub]['press_e'] = std_devs(pe) else: self['filtered'][sub]['press'] = self['filtered'][sub]['press_e'] = None if not use_uncert: te, ne = nominal_values(te), nominal_values(ne) te_, ne_, bad = self.data_filter(te, ne, okay=okay) nb = (ne / zb) * (zeff - zi) / (zb - zi) # Get impurity density from Zeff ni = (ne - nb * zb) / zi # Get main ion density from electron and impurity densities ti = te * ti_over_te # Get ion temperature from assumed relationship to electron temperature assume[sub]['n_b'] = nb assume[sub]['n_i'] = ni assume[sub]['T_i'] = ti out[sub]['debye'] = lambda_debye(te=te_, ne=ne_) # m debye_, _ = self.data_filter(out[sub]['debye']) out[sub]['lnLambda'], out[sub]['lnLambda_e'] = self.lnlambda(te_, ne_, debye_) out[sub]['resistivity'] = self.resistivity(te_, zeff, out[sub]['lnLambda']) # Ohm*m out[sub]['slowing_down_time'] = self.taue(te_, ne_, out[sub]['lnLambda_e'], mb, zb) for item in ['debye', 'lnLambda', 'lnLambda_e', 'resistivity', 'slowing_down_time']: out[sub][item][bad] = np.NaN self.status['calcs'] = True
[docs] @staticmethod def lnlambda(te, ne, debye): """ Calculate the Coulomb logarithm for electrons ln(Lambda) :param te: array Electron temperature in eV :param ne: array matching dimensions of te Electron density in m^-3 :param debye: array Debye length in m :return: lnLambda, lnLambda_e """ # General? lnLambda; Source: Callen 2006 book: Fundamentals of plasma physics chapter 2 ee = 4.8032e-10 # Elementary charge in statcoulomb te_erg = te * constants.eV * 1e7 r_c = ee**2 / te_erg # cm r_c /= 100.0 # (m) Classical distance of closest approach r_qm = 1.1e-10 / te**0.5 # m Quantum Mechanical distance of closest approach r_min = np.maximum(r_c, r_qm) ln_lambda = np.log(nominal_values(debye / r_min)) # lnLambda for electron-electron collisions; Source: NRL Plasma Formulary 2009 ln_lambda_e = ( 23.5 - np.log((nominal_values(ne) / 1e6) ** 0.5 * nominal_values(te) ** (-5.0 / 4.0)) - (1e-5 + (np.log(nominal_values(te)) - 2) ** 2 / 16.0) ** 0.5 ) return ln_lambda, ln_lambda_e
[docs] @staticmethod def resistivity(te, zeff, ln_lambda): """ Calculate Spitzer transverse resistivity using NRL Plasma Formulary 2009 Page 29 :param te: float or array Electron temperature in eV :param zeff: float or array matching dimensions of te Effective ion charge state for collisions with electrons :param ln_lambda: float or array matching dimensions of te Coulomb logarithm :return: array matching dimensions of te Resistivity in Ohm*m """ return 1.03e-4 * zeff * ln_lambda * te ** (-1.5)
[docs] @staticmethod def taue(te, ne, ln_lambda_e, m_b=2.0, z_b=1.0): """ Calculates the spitzer slowing down time using equation 5 of W. Heidbrink's 1991 DIII-D physics memo :param te: An array of T_e values in eV :param ne: Array of n_e values in m^-3 :param ln_lambda_e: Coulomb logarithm for electron-electron collisions :param m_b: Atomic mass of beam species (normally deuterium) in atomic mass units :param z_b: Atomic number of beam species (normally deuterium) in elementary charges :return: Fast ion slowing down time in ms. """ self.printdq('fastTS: OMFITlib_slowdown_time...') c = 6.3e8 * m_b / z_b**2 # Collect constants ne2 = ne * 1e-6 # Convert from m^-3 to cm^-3 bottom = ne2 * ln_lambda_e recip = bottom * 0 recip[bottom > 0] = 1.0 / bottom[bottom > 0] # Avoid divide by zero tau_se = c * te**1.5 * recip # Seconds return tau_se * 1000 # ms
[docs] @staticmethod def data_filter(*args, **kwargs): """ Removes bad values from arrays to avoid math errors, for use when calculating Thomson derived quantities :param args: list of items to sanitize :param kwargs: Keywords - okay: array matching dimensions of items in args: Flag indicating whether each element in args is okay - bad_fill_value: float: Value to use to replace bad elements :return: list of sanitized items from args, followed by bad """ bad = np.zeros(np.shape(args[0]), bool) okay = kwargs.pop('okay', None) bad_fill_value = kwargs.pop('bad_fill_value', 1.0) if okay is not None: bad = bad | ~okay for arg in args: sd = std_devs(arg) if (np.atleast_1d(sd) == 0).all(): sd += 1.0 bad = bad | (nominal_values(arg) <= 0) | (sd <= 0) new_args = copy.deepcopy(args) for i in range(len(new_args)): if (new_args[i] is not None) and (np.shape(new_args[i]) == np.shape(bad)): new_args[i][bad] = bad_fill_value return tolist(new_args) + [bad]
# Plot methods
[docs] def plot(self, fig=None, axs=None): """ Launches default plot, which is normally self.elm_detection_plot(); can be changed by setting self.default_plot. :return: (Figure instance, array of Axes instances) Tuple containing references to figure and axes used by the plot """ if fig is None: fig = pyplot.gcf() return getattr(self, self.default_plot, self.contour_plot)(fig=fig, axs=axs)
[docs] @staticmethod def setup_axes(fig, nrow=1, ncol=1, squeeze=True, sharex='none', sharey='none'): """ Utility: add grid of axes to existing figure :param fig: Figure instance :param nrow: int Number of rows :param ncol: int Number of columns :param squeeze: bool Squeeze output to reduce dimensions. Otherwise, output will be a 2D array :param sharex: string Describe how X axis ranges should be shared/linked. Pick from: 'row', 'col', 'all', or 'none' :param sharey: string Describe how Y axis ranges should be shared/linked. Pick from: 'row', 'col', 'all', or 'none' :return: Axes instance or array of axes instances """ import matplotlib.axes pyplot.figure(num=fig.number) axs = np.empty((nrow, ncol), matplotlib.axes.Axes) axs[0, 0] = pyplot.subplot(nrow, ncol, 1) sharex = 'all' if sharex is True else sharex sharey = 'all' if sharey is True else sharey for ix in range(nrow): for iy in range(ncol): i = iy + ncol * ix if (ix > 0) and sharex in ['col', 'column']: sx = axs[0, iy] elif (iy > 0) and sharex == 'row': sx = axs[ix, 0] elif (i > 0) and sharex == 'all': sx = axs[0, 0] else: sx = None if (ix > 0) and sharey in ['col', 'column']: sy = axs[0, iy] elif (iy > 0) and sharey == 'row': sy = axs[ix, 0] elif (i > 0) and sharey == 'all': sy = axs[0, 0] else: sy = None if i > 0: axs[ix, iy] = pyplot.subplot(nrow, ncol, i + 1, sharex=sx, sharey=sy) if squeeze: axs = np.squeeze(axs) return axs
[docs] def profile_plot( self, t=None, dt=25.0, position_type='psi', params=['temp', 'density'], systems='all', unit_convert=True, fig=None, axs=None ): """ Plots profiles of physics quantities vs. spatial position for a selected time window :param t: float Center of time window in ms :param dt: float Half-width of time window in ms. All data between t-dt and t+dt will be plotted. :param position_type: string Name of X coordinate. Valid options: 'R', 'Z', 'PSI' :param params: list of strings List physics quantities to plot. Valid options are temp, density, and press :param systems: list of strings or 'all' List subsystems to include in the plot. Choose all to use self.subsystems. :param unit_convert: bool Convert units from eV to keV, etc. so most quantities will be closer to order 1 in the core. :param fig: Figure instance Plot will be drawn using existing figure instance if one is provided. Otherwise, a new figure will be made. :param axs: 1D array of Axes instances matching length of params. Plots will be drawn using existing Axes instances if they are provided. Otherwise, new axes will be added to fig. :return: Figure instance, 1D array of Axes instances """ from matplotlib import pyplot if systems == 'all': systems = self.subsystems names = { 'press': '$p_e$ (kPa)' if unit_convert else '$p_e$ (Pa)', 'density': '$n_e$ (10$^{19}$/m$^{3}$)' if unit_convert else '$n_e$ (m$^{-3}$)', 'temp': '$T_e$ (keV)' if unit_convert else '$T_e$ (eV)', } multipliers = {'press': 1e-3 if unit_convert else 1, 'density': 1e-19 if unit_convert else 1, 'temp': 1e-3 if unit_convert else 1} if fig is None: fig = pyplot.figure() if axs is None: axs = self.setup_axes(fig, len(tolist(params)), sharex='all') axs = np.atleast_1d(axs) axs[0].set_title('Thomson scattering') for ax in axs[:-1]: ax.tick_params(labelbottom=False) if t is None: self.printdq('No profile time provided to OMFITthomson profile_plot. Picking one arbitrarily...', topic='OMFITthomson') # Pick a time automatically based on where data are available okay1 = self['filtered'][systems[0]]['filters']['okay'] # Sum okay flag along channels okay2 = np.sum(okay1, axis=0) # Find times when most channels are available (avail. ch. count >= 90th percentile) okay3 = okay2 >= np.percentile(okay2, 90) # Take the mean of times when most channels are available and use that to plot t = np.mean(self['filtered'][systems[0]]['time'][okay3]) # Is this a good idea? Well, if you don't like it, try passing in a time! printw( 'No time provided to OMFITthomson.profile_plot. t = {} was chosen; it is the average of times with ' 'high channel availability on the first listed subsystem ({}).'.format(t, systems[0]) ) for sub in systems: wt = (self['filtered'][sub]['time'] >= (t - dt)) & (self['filtered'][sub]['time'] <= (t + dt)) okay = self['filtered'][sub]['filters']['okay'][:, wt] if position_type in ['z', 'Z']: x = self['filtered'][sub]['z'][:, np.newaxis] + 0 * wt axs[-1].set_xlabel('$Z$ (m)') elif position_type in ['r', 'R']: x = self['filtered'][sub]['r'][:, np.newaxis] + 0 * wt axs[-1].set_xlabel('$R$ (m)') else: x = self['filtered'][sub]['psin_TS'][:, wt] axs[-1].set_xlabel(r'$\psi_N$') for i, param in enumerate(tolist(params)): axs[i].set_ylabel(names[param]) y = self['filtered'][sub][param][:, wt] * multipliers[param] try: ye = self['filtered'][sub][param + '_e'][:, wt] * multipliers[param] except KeyError: ye = y * 0 axs[i].errorbar(x[okay], y[okay], ye[okay], label=sub, linestyle='', marker='_') for ax in axs: ax.legend(loc=0) try: # Getting at OMFIT plot utilities when doing stand-alone command-line stuff can be a burden cornernote(shot=self.shot, device=self.device, time=r'{}$\pm${}'.format(t, dt)) except NameError: fig.text( 0.99, 0.01, r'{}#{} {}$\pm${} ms'.format(self.device, self.shot, t, dt), fontsize=10, ha='right', transform=pyplot.gcf().transFigure, ) return fig, axs
[docs] def contour_plot( self, position_type='psi', params=['temp', 'density'], unit_convert=True, combine_data_before_contouring=False, num_color_levels=41, fig=None, axs=None, ): """ Plots contours of a physics quantity vs. time and space :param position_type: string Select position from 'R', 'Z', or 'PSI' :param params: list of strings Select parameters from 'temp', 'density', 'press', or 'redchisq' :param unit_convert: bool Convert units from e.g. eV to keV to try to make most quantities closer to order 1 :param combine_data_before_contouring: bool Combine data into a single array before calling tricontourf. This may look smoother, but it can hide the way arrays from different subsystems are stitched together :param num_color_levels: int Number of contour levels :param fig: Figure instance Provide a Matplotlib Figure instance and an appropriately dimensioned array of Axes instances to overlay :param axs: array of Axes instances Provide a Matplotlib Figure instance and an appropriately dimensioned array of Axes instances to overlay :return: Figure instance, array of Axes instances Returns references to figure and axes used in plot """ from matplotlib import pyplot if fig is None: fig = pyplot.figure() if axs is None: axs = self.setup_axes(fig, len(tolist(params)), sharex='all') axs = np.atleast_1d(axs) axs[-1].set_xlabel('Time (ms)') axs[0].set_title('Thomson scattering') for ax in axs[:-1]: ax.tick_params(labelbottom=False) tt = np.array([]) xx = np.array([]) yy = np.array([]) names = { 'press': '$p_e$ (kPa)' if unit_convert else '$p_e$ (Pa)', 'density': '$n_e$ (10$^{19}$/m$^{3}$)' if unit_convert else '$n_e$ (m$^{-3}$)', 'temp': '$T_e$ (keV)' if unit_convert else '$T_e$ (eV)', } multipliers = {'press': 1e-3 if unit_convert else 1, 'density': 1e-19 if unit_convert else 1, 'temp': 1e-3 if unit_convert else 1} for ax, param in zip(axs, params): for sub in self.subsystems: okay = self['filtered'][sub]['filters']['okay'] t = self['filtered'][sub]['time'][np.newaxis, :] + 0 * okay if position_type in ['z', 'Z']: x = self['filtered'][sub]['z'][:, np.newaxis] + 0 * okay ax.set_ylabel('$Z$ (m)') elif position_type in ['r', 'R']: x = self['filtered'][sub]['r'][:, np.newaxis] + 0 * okay ax.set_ylabel('$R$ (m)') else: x = self['filtered'][sub]['psin_TS'] ax.set_ylabel(r'$\psi_N$') y = nominal_values(self['filtered'][sub][param]) * multipliers.get(param, 1) xf = x.flatten() tf = t.flatten() yf = y.flatten() w = okay.flatten().astype(bool) tf = tf[w] xf = xf[w] yf = yf[w] tt = np.append(tt, tf) xx = np.append(xx, xf) yy = np.append(yy, yf) if not combine_data_before_contouring: im = ax.tricontourf(tf, xf, yf, num_color_levels) if combine_data_before_contouring: im = ax.tricontourf(tt, xx, yy, num_color_levels) cb = pyplot.colorbar(im, ax=ax) cb.set_label(names.get(param, param)) ax.axhline(1.0, color='k', linestyle='--') try: # Getting at OMFIT plot utilities when doing stand-alone command-line stuff can be a burden cornernote(shot=self.shot, device=self.device, time='') except NameError: fig.text(0.99, 0.01, '{}#{}'.format(self.device, self.shot), fontsize=10, ha='right', transform=pyplot.gcf().transFigure) return fig, axs
class TestOMFITthomson(unittest.TestCase): """ Tests for OMFITthomson class and supporting functions. Does not require OMFIT framework/GUI to be running. """ from matplotlib import pyplot # Set up test case device = 'DIII-D' shot = 154749 efitid = 'EFIT03' time0 = 2000.0 # Default time to use when needed # Choose test options show_debug_plots = False # Status flags plots_open = pyplot.get_fignums() # Test helpers def setUp(self): # Announce start of test with debug print (occasionally helps to sort out weird problems) test_id = self.id() test_name = '.'.join(test_id.split('.')[-2:]) printd('\n{}...'.format(test_name)) self.t0 = time.time() self.plots_open = self.pyplot.get_fignums() def tearDown(self): # Announce end of test with debug print self.t1 = time.time() dt = self.t1 - self.t0 test_name = '.'.join(self.id().split('.')[-2:]) printd(' {} done.'.format(test_name)) printd(' {} took {:0.6f} s\n'.format(test_name, dt)) # Automatically set window titles for new plots new_plots = [plot for plot in self.pyplot.get_fignums() if plot not in self.plots_open] for i, plot in enumerate(new_plots): self.pyplot.figure(plot).canvas.set_window_title('{} {}'.format('.'.join(self.id().split('.')[1:]), i)) @classmethod def setUpClass(cls): # Set up reusable class instances cls.basic_tf = OMFITthomson(device=cls.device, shot=cls.shot, efitid=cls.efitid) cls.basic_tf() @classmethod def tearDownClass(cls): if cls.show_debug_plots: cls.pyplot.show() # Tests for OMFITthomson class methods def test_tf_init(self): tf = OMFITthomson(device=self.device, shot=self.shot, efitid=self.efitid) assert tf.shot == self.shot self.assertIsInstance(getattr(tf, 'device', None), str) self.assertIsInstance(getattr(tf, 'shot', None), int) def test_tf_shot_lookup(self): tf = OMFITthomson(device=self.device, shot=0, efitid=self.efitid) self.assertGreater(getattr(tf, 'shot', None), 0) def test_tf_main(self): tf = OMFITthomson(device=self.device, shot=self.shot, efitid=self.efitid) # Needs to work on a fresh instance tf() self.assertIn('raw', tf) assert self.shot == tf['filtered']['shot'] self.assertIn('temp', tf['filtered'][tf.subsystems[0]]) @unittest.skipIf(not os.environ.get('DISPLAY', ''), "Can't test plots without a display") def test_tf_plots(self): import matplotlib import matplotlib.figure import matplotlib.axes fig, axs = self.basic_tf.profile_plot(2000) assert isinstance(fig, matplotlib.figure.Figure) assert isinstance(axs[0], matplotlib.axes.Axes) figc, ax = self.basic_tf.contour_plot() assert isinstance(figc, matplotlib.figure.Figure) def test_to_dataset(self): from xarray import DataArray, Dataset data = self.basic_tf.to_dataset() d0 = data[list(data.keys())[0]] assert isinstance(d0, Dataset) def test_save_load(self): tf = copy.deepcopy(self.basic_tf) # This class doesn't have dedicated save and load methods, so we just have to make sure it can be pickled. pickle.dumps(tf) def test_select_time_window(self): params = ['temp', 'density'] tw = self.basic_tf.select_time_window(self.time0, parameters=params) for param in params: assert param in list(tw.keys()) for x in 'txye': assert x in list(tw[param].keys()) ############################################ if __name__ == '__main__': test_classes_main_header() if len(sys.argv) > 1: try: test_flag = int(sys.argv[1]) except ValueError: test_flag = int(sys.argv[1] == 'test') else: test_flag = 0 if test_flag > 0: sys.argv.pop(1) unittest.main(failfast=False) else: OMFITthomson(shot=154754) # Test init, but don't try to use shot 0 as it will try to connect to d3drdb