Contains classes and functions to perform ELM detection and ELM filtering:
- OMFITelm class; main workhorse for ELM detection
- Some smoothing functions used by OMFITelm
The regression test for OMFITelm is in regression/test_OMFITelm.py
# framework is running
from .startup_choice import *
framework = True
except ImportError as _excp:
# class is imported by itself
framework = False
if (
'attempted relative import with no known parent package' in str(_excp)
or 'No module named \'omfit_classes\'' in str(_excp)
or "No module named '__main__.startup_choice'" in str(_excp)
from startup_choice import *
import numpy as np
from numpy.core import shape, mean
import scipy.signal
from scipy.interpolate import interp1d
import unittest
import warnings
from matplotlib import pyplot
from scipy import constants
# noinspection PyBroadException
import MDSplus
except Exception: # MDSplus raises Exception, so this can't be more narrow
print('Error importing MDSplus in omfit_elm.py.')
mds_available = False
mds_available = True
from omfit_classes.utils_base import printd, tolist, printw, printe
from omfit_classes.utils_fusion import is_device, tokamak
from omfit_classes.utils_math import smooth, smooth_by_convolution, pcs_rc_smooth, function_to_tree, dimens, butter_smooth, deriv
from omfit_classes.omfit_mds import OMFITmdsValue
from omfit_classes.omfit_rdb import OMFITrdb
from omfit_classes.exceptions_omfit import OMFITexception
from omfit_classes.omfit_testing import OMFITtest, manage_tests
from omfit_classes.omfit_eqdsk import read_basic_eq_from_mds
from omfit_classes.exceptions_omfit import doNotReportException as DoNotReportException
# This should work if launched from the command line in the right path
from omfit.omfit_plot import cornernote
from omfit.utils_plot import contrasting_color
from omfit.utils_math import array_info
except (ImportError, OSError):
__all__ = []
scratch = OMFIT['scratch'].setdefault('OMFITelm', SortedDict())
except (KeyError, NameError):
scratch = {}
def _available_to_user(f):
return f
class OMFITelmIncompatibleData(ValueError, DoNotReportException):
"""Input data (such as filterscope signals) can't be used with current settings"""
class OMFITelmNotSupported(ValueError, DoNotReportException):
"""One of the chosen settings is not supported or not yet implemented"""
def asym_gauss_smooth(x, y, s, lag, leading_side_width_factor):
This is a smoothing function with a Gaussian kernel that does not require evenly spaced data and allows the
Gaussian center to be shifted.
:param x: array
Dependent variable
:param y: array
Independent variable
:param s: float
Sigma of tailing side (same units as x)
:param lag: float
Positive values shift the Gaussian kernel back in time to increase weight in the past:
makes the result laggier. (same units as x)
:param leading_side_width_factor: float
The leading side sigma will be this much bigger than the tailing side. Values > 1 increase the weight on
data from the past, making the signal laggier. (unitless)
:return: array
Smoothed version of y
rescale = max(y)
y2 = y / rescale
ys = y2 * 0
n = len(y2)
s2 = s # Sigma for trailing side (future)
s1 = s * leading_side_width_factor # Sigma for leading side (past)
mdx = np.mean(np.diff(x)) # Average delta x
# We will make the smoothing kernel smaller than x or y to make the calculation faster.
# That is, we will truncate the weighting function when it gets small.
ahead = int(np.ceil(2 * s2 / mdx)) # How many indices ahead should the kernel go? (future)
behind = int(np.ceil(2 * s1 / mdx)) # How many indices behind should the kernel go? (past)
shift = int(round(lag / mdx)) # How big is in terms of number of elements?
printd('sigma past = {}'.format(s1), topic='omfit_elm')
printd('sigma future = {}'.format(s2), topic='omfit_elm')
'ahead = {ahead:} samples, {ahead_ms:} ms; behind = {behind:} samples, {behind_ms:} ms; '
'shift = {shift:} samples, {shift_ms:} ms'.format(
ahead=ahead, ahead_ms=ahead * np.mean(np.diff(x)), behind=behind, behind_ms=behind * mdx, shift=shift, shift_ms=shift * mdx
for i in range(n):
a = i - behind - shift # a is the first element to be considered in this time window (past)
if a < 0:
a = 0
b = i + ahead + 1 - shift # b is the last element to be considered in this time window (future)
ish = i - shift # Index of the center of the Gaussian
if b >= n:
# If b goes off the end of the array, then y[a:b] will return fewer than b-a samples. Can't have that!
b = n - 1
if b <= a:
# Lag setting may make a few slices impossible
ys[i] = y2[i]
if a >= b:
ys[i] = y2[i]
if ish <= 0 or ish >= (n - 1):
ys[i] = y2[i]
center = x[i] - lag
dx = x - center
kernel = np.zeros(b - a)
ia = ish - a
ba = b - a
kernel[0:ia] = np.exp(-((dx[a:ish] / s1) ** 2) / 2.0)
kernel[ia:ba] = np.exp(-((dx[ish:b] / s2) ** 2) / 2.0)
kernel = kernel / np.sum(kernel)
ys[i] = np.sum(y2[a:b] * kernel)
ys *= rescale
return ys
def fft_smooth(xx, yy, s):
Smooth by removing part of the spectrum in the frequency domain
1. FFT
2. Cut out part of the spectrum above some frequency
3. Inverse FFT to get back to time domain. The result will be missing some of the high frequency variation.
:param xx: 1D array
Independent variable, such as time
:param yy: 1D array matching length of xx
Dependent variable
:param s: float
Smoothing timescale in units matching xx. The cutoff frequency is cut = 1/(s*pi)
:return: 1D array matching length of xx
Smoothed (lowpass) version of yy
# Assumes that xx data are evenly spaced!
warnings.warn('WARNING: FFT smoother is fast but does not work well. Do not use unless experimenting!')
# This might be salvageable with special tuning. I am not happy with it. It technically works and is FAST.
if len(xx) % 2:
# RFFT wants an even number of elements
cut_last = True
xx = np.append(xx, xx[-1] * 2 - xx[-2])
yy = np.append(yy, yy[-1])
cut_last = False
rft = np.fft.rfft(yy)
freq = np.fft.rfftfreq(len(yy), d=xx[1] - xx[0])
cut = 1.0 / s / np.pi
# Have amplitude decay exponentially above the cutoff frequency. This seems to work better than just setting to 0.
rft[abs(freq) > cut] *= np.exp(-(abs(freq[abs(freq) > cut]) - cut) / cut)
ys = np.fft.irfft(rft)
if cut_last:
ys = ys[:-1]
return ys
class OMFITelm(SortedDict):
Quickly detect ELMs and run a filter that will tell you which time-slices are okay and which should be rejected
based on user specifications for ELM phase, etc.
def __init__(
:param device: string
Name of tokamak or MDS server
:param shot: int
Shot number
:param detection_settings: dict or string.
If this is 'default' or {}, then default settings will be used. To change some of the settings from their
machine-specific defaults, set to a dictionary that can contain:
- default_filterscope_for_elms: string or list of strings giving filterscope(s) to use. Will be overridden
by `use_filterscopes` keyword, if provided.
- smoother: string giving name of smoothing function. Pick from:
- gauss_smooth: Gaussian smoother
- rc_smooth: RC lowpass filter, specifically implemented to mimic approximations used in the DIII-D PCS.
- asym_gauss_smooth: Asymmetric Gaussian smoother with different sigma for past and future.
- combo: Combination of rc_smooth and gauss_smooth
- butter_smooth: Butterworth smoothing lowpass filter
- fft_smooth: lowpass via FFT, cut out part of spectrum, inverse FFT
- time_window: two element list of numbers giving ends of the time range to consider in ms
- time_factor: float: factor to multiply into MDSplus times to get timebase in ms
- jump_hold_max_dt: float Sets maximum dt between input data samples for the Jump&Hold method.
Used to exclude slow data from old shots, which don't work.
- allow_fallback_when_dt_large: bool. If a basic test of data compatibility with the chosen method
fails, a new method may be chosen if this flag is set.
- hold_elm_flag_until_low_dalpha: flag for turning on extra logic to hold the during-ELM state until
D_alpha drops to close to pre-ELM levels.
- hold_elm_low_dalpha_threshold: float: Threshold as a fraction between pre-ELM min & during-ELM max
- hold_elm_min_finding_time_window: float: Interval before ELM start to search for pre-ELM minimum (ms)
- hold_elm_timeout': float: Maximum time the ELM flag hold can persist (ms). The ELM can still be longer
than this as determined by other rules, but the hold flag cannot extend the ELM past this total duration.
- detection_method: int: 0 = classic edge detection style. 1 = new jump and hold strategy.
- find_jump_time_window: float: Length of time window used to find nearby maximum in method 1 (ms)
- find_jump_threshold: float: Threshold for normalized jump size in method 1 (unitless)
- \****_tuning: dict where \**** is the name of a smoothing function. Within this dict can be:
- mild_smooth: (ms) Smoothing timescale for mild smooth
- heavy_smooth_factor: Heavy will be this much stronger than mild
- mild_smooth_derivative_factor: The deriv is of already smoothed data, but it may need a bit more.
- heavy_smooth_derivative_factor: The derivative is smoothed again harder so there can be a diff.
- threshold_on_difference_of_smooths: When mild-heavy is greater than this threshold, it must be
during ELM (relative to max diff)
- threshold_on_difference_of_smoothed_derivatives_plus: When mild(der)-heavy(der) is greater than
this threshold, it must be on one of the edges of an ELM.
- threshold_on_difference_of_smoothed_derivatives_minus: Same as previous, but heavy(der)-mild(der)
- d_thresh_enhance_factor: When the diff of derivs is positive but the less smoothed derivative is
still negative, we're in danger of getting a false positive, so make the threshold higher.
- neg_d_thresh_enhance_factor: Same as before but for when mild(der)-heavy(der) is negative instead.
- debounce: Number of samples in debouncing window
- leading_side_width_factor: [asym_gauss_smooth ONLY]: how big is the asymmetry?
- gauss_center_lag: [asym_gauss_smooth ONLY]: Shift center back in time, making the thing laggier.
Negative value shifts forward in time, putting more weight on the future.
:param filter_settings: dict or string
If this is 'default' or {}, then default settings will be used. To change some of the settings from their
machine-specific defaults, set to a dictionary that can contain:
- elm_phase_range: A two element list or array containing the minimum and maximum acceptable values of
ELM phase. ELM phase starts at 0 when an ELM ends and goes to 1 just before the next ELM begins,
then flips to -1 and is negative during ELMs. ELM phase increases from -1 to 0 during an ELM before
becoming positive again in the next inter-ELM period.
- elm_since_reject_below: Data are rejected if the time since the last ELM is below this threshold.
Useful for packs of closely spaced ELMs; a double spike in D-alpha could register as two ELMs with a
very short inter-ELM period between them when phase would increase from 0 to 1 and a slice could be
accepted, even if there hadn't really been any time for ELM recovery.
This setting is ignored if its value is <= -10 or if either end of the elm_phase_range is < 0.
- elm_since_accept_above: Data are accepted if the time since the last ELM is above this threshold,
regardless of ELM phase. Useful for analyzing shots that have a mixture of ELMing and non-ELMing
phases. An ELM free period will normally be counted as one long inter-ELM period and it will take a
long time for ELM phase to increase, which could lead to rejection of too many data. This setting
overrides the ELM phase test to allow ELM-free periods to be included.
- CER_entire_window_must_pass_ELM_filter: Relevant for CER where stime>0. If this is True, then the
entire averaging window must be in the "good" part of the ELM phase. If this is False, only the
middle has to be in the good part and also no ELMs are allowed to start during the time window in
either case.
:param use_filterscopes: None, False, or input satisfying detection_settings -> default_filterscope_for_elms
Easier-to-access override for default_filterscope_for_elms in detection_settings
:param attempt_sum_tdi_filterscopoes: bool
Try to ask the server to sum D_alpha signals so that they don't have to be interpolated and summed client
side, and so that there will only be one signal transferred. Works sometimes, but not yet entirely reliable.
It also doesn't have an obvious speed benefit.
:param debug: bool
Turn on spammier debug print or printd statements and also save intermediate data
:param debug_plots: bool
Turn on debugging plots
:param on_failure: string
What action should be done for failures?
'exception': raise an exception
'pass': return all ones from filter(); passing all data through the ELM filter (same as no ELM filtering)
:param mode: string
'elm': ELM detection (what this class was always meant to do)
'sawtooth': Sawtooth detection
:param quiet: bool
Suppress some warnings and error messages which would often be useful.
self.debug = debug
self.debug_plots = debug_plots
self.on_failure = on_failure
self.quiet = quiet
printd(' OMFITelm: setup started', topic='omfit_elm')
# super().__init__() # Breaks attempts to use reload_python(omfit_elm)
self.status = {'setup': False, 'gathered': False, 'detected': False, 'calcs': False}
# Handle basic settings
self.device = tokamak(device)
self.supported_devices = ['DIII-D', 'KSTAR', 'NSTX', 'NSTXU']
if not is_device(self.device, self.supported_devices):
' OMFITelm WARNING: Current device {} is not supported and you may encounter errors. Please '
'try again with one of these supported devices: {}'.format(self.device, self.supported_devices)
if shot is None:
shot = 0
if shot <= 0:
shot += self.lookup_latest_shot()
self.shot = shot
self.mode = mode
# Handle machine specific setup details
self.machine_data = {
'DIII-D': {
'mds_tree': None,
'rvsout_ptname': 'rvsout',
'rvsin_ptname': 'rvsin',
'zvsout_ptname': 'zvsout',
'ip_ptname': 'ip',
'filterscope_data': {
'names': [
'locs': np.array(
[1.015, -1.113],
[1.049, -1.256],
[1.114, -1.321],
[1.143, -1.352],
[1.225, -1.362],
[1.307, -1.362],
[1.347, -1.362],
[1.491, -1.252],
[1.602, -1.252],
[1.015, 0.616],
[1.015, 0.861],
[1.015, 1.144],
[1.152, 1.176],
[1.262, 1.293],
[1.413, 1.261],
[1.545, 1.151],
[1.696, 1.079],
[1.016, -0.140],
'NSTX': {
'mds_tree': 'NSTX',
'filterscope_data': {
'names': [
'NSTXU': {
'mds_tree': 'NSTX',
'filterscope_data': {
'names': [
'KSTAR': {
'mds_tree': 'KSTAR',
'filterscope_data': {'names': [r'\pol_ha02']},
'ip_ptname': r'\rc02',
# ELM detection settings
self.default_settings_all = {
'generic': {
'detection': {
'default_filterscope_for_elms': 'auto_select',
'time_factor': 1.0,
'smoother': 'butter_smooth',
'gauss_smooth_tuning': {
'mild_smooth': 0.5,
'heavy_smooth_factor': 5.0,
'heavy_smooth_derivative_factor': 4.0,
'threshold_on_difference_of_smooths': 0.03,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.14,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.28,
'debounce': 7,
'mild_smooth_derivative_factor': 0.0,
'd_thresh_enhance_factor': 1.0,
'neg_d_thresh_enhance_factor': 1.0,
'rc_smooth_tuning': {
'mild_smooth': 0.65,
'heavy_smooth_factor': 3.5,
'heavy_smooth_derivative_factor': 3.0,
'threshold_on_difference_of_smooths': 0.028,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.09,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.18,
'debounce': 7,
'mild_smooth_derivative_factor': 0.5,
'd_thresh_enhance_factor': 3.5,
'neg_d_thresh_enhance_factor': 1.0,
'asym_gauss_smooth_tuning': {
'mild_smooth': 0.5,
'heavy_smooth_factor': 5.0,
'heavy_smooth_derivative_factor': 4.0,
'leading_side_width_factor': 2.0,
'center_lag': 0.0,
'threshold_on_difference_of_smooths': 0.03,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.14,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.28,
'debounce': 7,
'mild_smooth_derivative_factor': 0.0,
'd_thresh_enhance_factor': 1.0,
'neg_d_thresh_enhance_factor': 1.0,
'combo_tuning': {
'mild_smooth': 0.5,
'heavy_smooth_factor': 5.0,
'heavy_smooth_derivative_factor': 3.75,
'threshold_on_difference_of_smooths': 0.035,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.12,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.21,
'debounce': 7,
'mild_smooth_derivative_factor': 0.0,
'd_thresh_enhance_factor': 1.0,
'neg_d_thresh_enhance_factor': 1.0,
'butter_smooth_tuning': {
'mild_smooth': 0.5,
'heavy_smooth_factor': 5.0,
'mild_smooth_derivative_factor': 0.0,
'heavy_smooth_derivative_factor': 4.0,
'threshold_on_difference_of_smooths': 0.05,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.14,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.14,
'd_thresh_enhance_factor': 1.0,
'debounce': 7,
'neg_d_thresh_enhance_factor': 1.0,
'fft_smooth_tuning': {
'mild_smooth': 0.5,
'heavy_smooth_factor': 5.0,
'mild_smooth_derivative_factor': 0.0,
'heavy_smooth_derivative_factor': 3.5,
'threshold_on_difference_of_smooths': 0.028,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.08,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.18,
'd_thresh_enhance_factor': 3.0,
'debounce': 7,
'neg_d_thresh_enhance_factor': 1.0,
'smoother_info': {
'gauss_smooth': {'notes': 'SLOW', 'name': 'Gaussian smooth', 'preferred': False},
'rc_smooth': {'notes': 'good, RT relevant', 'name': 'RC lowpass smoothing filter', 'preferred': False},
'butter_smooth': {'notes': 'fast, RT relevant', 'name': 'Butterworth', 'preferred': True},
'asym_gauss_smooth': {'notes': 'SLOW', 'name': 'Asymmetric Gaussian smooth', 'preferred': False},
'combo': {'notes': 'SLOW', 'name': 'Combination: average RC & Gaussian', 'preferred': False},
'fft_smooth': {'notes': 'testing only', 'name': 'FFT smooth', 'preferred': False},
'time_window': [0, 15000],
'hold_elm_flag_until_low_dalpha': True,
'hold_elm_low_dalpha_threshold': 0.1, # Fraction between pre-ELM min & during-ELM max
'hold_elm_min_finding_time_window': 0.5, # ms
'hold_elm_timeout': 5.0, # ms
'detection_method': 1,
'jump_hold_max_dt': 0.1, # ms
'allow_fallback_when_dt_large': False,
'find_jump_time_window': 0.25, # ms
'find_jump_threshold': 0.12,
'find_jump_presmo': 0.1, # ms
'find_jump_debounce': 0, # samples
'filtering': {
'elm_phase_range': np.array([0, 1]),
'elm_since_reject_below': 0.1,
'elm_since_accept_above': 25.0,
'CER_entire_window_must_pass_ELM_filter': False,
'misc': {'elm_freq_method': 3},
'DIII-D': {},
'EAST': {'detection': {'time_factor': 1e3}},
'KSTAR': {'detection': {'time_factor': 1e3, 'find_jump_threshold': 0.05}},
'NSTX': {
'detection': {
'time_factor': 1e3,
'jump_hold_max_dt': 0.21, # ms # TODO: check this setting
'gauss_smooth_tuning': {
'mild_smooth': 1.5,
'threshold_on_difference_of_smooths': 0.16,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.22,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.33,
'rc_smooth_tuning': {
'mild_smooth': 2.0,
'threshold_on_difference_of_smooths': 0.18,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.25,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.25,
'asym_gauss_smooth_tuning': {},
'combo_tuning': {},
'butter_smooth_tuning': {
'mild_smooth': 2.0,
'heavy_smooth_factor': 4.0,
'threshold_on_difference_of_smooths': 0.2,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.3,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.4,
'filtering': {},
'misc': {},
'NSTXU': {
'detection': {
'jump_hold_max_dt': 0.21, # ms # TODO: check this setting
'gauss_smooth_tuning': {},
'rc_smooth_tuning': {},
'asym_gauss_smooth_tuning': {},
'combo_tuning': {},
'butter_smooth_tuning': {
'threshold_on_difference_of_smooths': 0.04,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.08,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.25,
'd_thresh_enhance_factor': 1.0,
'neg_d_thresh_enhance_factor': 0.5,
'filtering': {},
'misc': {},
self.default_settings_elm = {
'generic': {
'detection': {
'gauss_smooth_tuning': {},
'rc_smooth_tuning': {},
'asym_gauss_smooth_tuning': {},
'combo_tuning': {},
'butter_smooth_tuning': {},
'filtering': {},
'misc': {},
'DIII-D': {
'detection': {
'gauss_smooth_tuning': {},
'rc_smooth_tuning': {},
'asym_gauss_smooth_tuning': {},
'combo_tuning': {},
'butter_smooth_tuning': {},
'filtering': {},
'misc': {},
'NSTX': {
'detection': {
'gauss_smooth_tuning': {},
'rc_smooth_tuning': {},
'asym_gauss_smooth_tuning': {},
'combo_tuning': {},
'butter_smooth_tuning': {},
'filtering': {},
'misc': {},
'NSTXU': {
'detection': {
'gauss_smooth_tuning': {},
'rc_smooth_tuning': {},
'asym_gauss_smooth_tuning': {},
'combo_tuning': {},
'butter_smooth_tuning': {},
'filtering': {},
'misc': {},
self.default_settings_saw = {
'generic': {
'detection': {
'gauss_smooth_tuning': {},
'rc_smooth_tuning': {},
'asym_gauss_smooth_tuning': {},
'combo_tuning': {},
'butter_smooth_tuning': {
'mild_smooth': 0.5,
'heavy_smooth_factor': 5.0,
'heavy_smooth_derivative_factor': 4.0,
'threshold_on_difference_of_smooths': 0.06,
'threshold_on_difference_of_smoothed_derivatives_plus': 0.12,
'threshold_on_difference_of_smoothed_derivatives_minus': 0.12,
'debounce': 3,
'mild_smooth_derivative_factor': 0.0,
'd_thresh_enhance_factor': 1.0,
'neg_d_thresh_enhance_factor': 1.0,
'hold_elm_flag_until_low_dalpha': False,
'hold_elm_low_dalpha_threshold': 0.1, # Fraction between pre-ELM min & during-ELM max
'hold_elm_min_finding_time_window': 0.5, # ms
'hold_elm_timeout': 5.0, # ms
'detection_method': 0,
'jump_hold_max_dt': 0.5, # ms
'find_jump_time_window': 0.25, # ms
'find_jump_threshold': 0.12,
'find_jump_presmo': 0.1, # ms
'find_jump_debounce': 0, # samples
'filtering': {},
'misc': {},
'DIII-D': {
'detection': {
'gauss_smooth_tuning': {},
'rc_smooth_tuning': {},
'asym_gauss_smooth_tuning': {},
'combo_tuning': {},
'butter_smooth_tuning': {},
'filtering': {},
'misc': {},
'NSTX': {
'detection': {
'gauss_smooth_tuning': {},
'rc_smooth_tuning': {},
'asym_gauss_smooth_tuning': {},
'combo_tuning': {},
'butter_smooth_tuning': {},
'filtering': {},
'misc': {},
'NSTXU': {
'detection': {
'gauss_smooth_tuning': {},
'rc_smooth_tuning': {},
'asym_gauss_smooth_tuning': {},
'combo_tuning': {},
'butter_smooth_tuning': {},
'filtering': {},
'misc': {},
if self.mode == 'sawtooth':
recursiveUpdate(self.default_settings_all, self.default_settings_saw)
recursiveUpdate(self.default_settings_all, self.default_settings_elm)
self.default_settings = copy.deepcopy(self.default_settings_all['generic'])
recursiveUpdate(self.default_settings, self.default_settings_all.get(tokamak(device), {}))
self['settings'] = copy.deepcopy(self.default_settings)
'detection': detection_settings if isinstance(detection_settings, dict) else {},
'filtering': filter_settings if isinstance(filter_settings, dict) else {},
if not use_filterscopes:
self['settings']['detection']['filterscopes'] = tolist(self['settings']['detection']['default_filterscope_for_elms'])
self['settings']['detection']['filterscopes'] = tolist(use_filterscopes)
self.attempt_sum_tdi_filterscopoes = attempt_sum_tdi_filterscopoes
self.device_groups = {}
self.device_group = self.device_groups.get(self.device, self.device)
self.detection_settings_used = None
self['plots'] = SortedDict()
self.default_plot = 'elm_detection_plot'
scratch['shot'] = self.shot
scratch['device'] = self.device
scratch['settings'] = self['settings']
self.status['setup'] = True
if not mds_available:
printe('Unable to import MDSplus! OMFITelm will not be able to gather any data!')
def __call__(self, times_to_filter=None, **kwargs):
Runs detection as needed and optionally filters times (if provided)
:param times: [Optional] 1D array or list of such arrays as would be accepted by self.filter()
If times to filter are provided, __call__ will pass times_to_filter and kwargs to self.filter()
(after making sure detection is complete)
:param kwargs: keyword arguments to pass to .filter(), if applicable
:return: None or output from self.filter(), which is a 1D array ro list of 1D arrays.
if not self.get('success', False):
if times_to_filter is None:
return None
return self.filter(times_to_filter, **kwargs)
def __tree_repr__(self):
"""Provides the label that OMFIT should use while representing class instances in the tree."""
if not mds_available:
return 'Disabled: no MDSplus support', []
if self.status['calcs']:
state = 'ready (step 3/3)'
elif self.status['detected']:
state = 'detected (step 2/3)'
elif self.status['gathered']:
state = 'gathered (step 1/3)'
state = 'blank (step 0/3)'
label = '{mode:}{device:}#{shot:}, [{state:}], {smo:}, ph={phase_range:}, sa={since:}, sr={reject:}, ' 'fs={fs:}'.format(
mode='SAWTOOTH! ' if self.mode == 'sawtooth' else '',
return label, []
[docs] def lookup_latest_shot(self):
"""Looks up the last shot. Works for DIII-D. Used automatically if shot <= 0, in which case the shot number is
treated as relative to the last shot."""
import psycopg2
if is_device(self.device, 'DIII-D'):
last_shot = OMFITrdb(
"SELECT shot FROM summaries WHERE shot = (SELECT MAX(shot) FROM summaries)",
except (psycopg2.ProgrammingError, KeyError):
raise OMFITexception('Shot 0 lookup failed. Please confirm access to d3drdb.')
raise NotImplementedError(
'OMFITelm: Shot 0 lookup functionality is not implemented for device={}. '
'Please choose a real shot number.'.format(self.device)
return last_shot
[docs] def guess_time_factor(self, mdsvalue=None, mds_time_units=None, device=None):
Tries to guess the time factor needed to convert to ms
:param mdsvalue: OMFITmdsValue instance [optional]
Used to obtain mds_time_units, if provided
:param mds_time_units: string [optional]
This string will be compared to common ways of representing various
time units. If this is not provided and cannot be obtained, the guess
will be based on device only.
This setting is ignored if:
- mdsvalue is provided
- The device is one which is known to frequently have incorrect
units logged in MDSplus.
:param device: string [optinal]
Defaults to self.device. This shouldn't have to be supplied
except for testing or exotic applications.
:return: float
Factor needed to convert from time units to ms.
Returns None if it cannot be determined.
f' Attempting to guess time unit conversion factor for OMFITmdsValue instance {mdsvalue}, '
f'mds_time_units={mds_time_units}, device={device}',
ways_to_write_seconds = ['s', 'sec', 'seconds']
ways_to_write_ms = ['ms', 'milliseconds', 'milli-seconds', 'millisec', 'msec']
devices_that_use_seconds_by_default = ['EAST', 'EAST_US', 'KSTAR', 'NSTX', 'NSTXU']
devices_that_use_ms_by_default = ['DIII-D']
devices_that_might_have_incorrect_units_listing = ['EAST', 'EAST_US', 'KSTAR']
if device is None:
device = self.device
if mdsvalue is None:
time_units = mds_time_units
time_units = mdsvalue.units_dim_of(0)
if time_units is None:
time_units = mds_time_units
time_units = time_units.lower()
f' Updated values in time unit conversion factor guessing: mds_time_units={mds_time_units}, device={device}', topic='omfit_elm'
if time_units in ways_to_write_seconds:
tf = 1e3
elif (time_units in ways_to_write_ms) and (device not in devices_that_might_have_incorrect_units_listing):
tf = 1.0
elif device in devices_that_use_seconds_by_default:
tf = 1e3
elif device in devices_that_use_ms_by_default:
tf = 1.0
tf = None
if tf is None:
printd(' Could not determine time factor', topic='omfit_elm')
elif tf == 1e3:
printd(' Set time unit conversion factor to 1e3 to convert sec to ms', topic='omfit_elm')
elif tf == 1:
printd(' Set time unit conversion factor to 1 because time already appears to be in ms', topic='omfit_elm')
return tf
[docs] def get_ip(self, t):
Gets plasma current on a specified timebase and stores it with other arrays
:param t: 1D float array
Time in ms
ip = OMFITmdsValue( # Plasma current
tf = self['settings']['detection'].get('time_factor', None)
if tf is None:
iptf = self.guess_time_factor(mdsvalue=ip)
if iptf is None:
printw('Warning! Failed to guess time unit factor for Ip. Using 1.0.')
iptf = 1.0
iptf = tf
return interp1d(ip.dim_of(0) * iptf, ip.data(), bounds_error=False, fill_value='extrapolate')(t)
[docs] def select_filterscope(self):
Automatically pick which filterscope(s) or D_alpha measurement(s) to use as input to the ELM detection algorithm
# Original script created as part of OMFIT module ELM_processing
# Created by eggertw at 2016 Jul 06 10:34
# Overhauled by roelofsm at 2016 Dec 13 17:46
# Edited more by eldond
if self.mode == 'sawtooth':
det_set = self['settings']['detection']
tree = self.machine_data[self.device]['mds_tree']
printd(' OMFITelm: Picking the best filterscope chord(s) to use for ELM detection...', topic='omfit_elm')
if is_device(self.device, ['NSTX', 'NSTXU']):
# NSTX has filterscopes with very wide views; only need one per divertor.
# Might as well just add them together and always use the sum of both, right?
acceptlist_clean = ['.'.join(a.split('.')[-2:]) for a in acceptlist]
printd(' > Accepted filterscopes: for {:}#{:}: {:}'.format(self.device, self.shot, acceptlist_clean), topic='omfit_elm')
det_set['filterscopes'] = acceptlist
det_set['filterscope_chosen_automatically_for_shot'] = copy.copy(self.shot)
elif is_device(self.device, 'KSTAR'):
acceptlist = [r'\pol_ha02']
acceptlist_clean = ['pol_ha02']
det_set['filterscopes'] = acceptlist
det_set['filterscope_chosen_automatically_for_shot'] = copy.copy(self.shot)
elif is_device(self.device, 'DIII-D'):
if not mds_available:
printe('No MDSplus module available; cannot run filterscope selector.')
fsnames_available = []
acceptlist = []
# Filterscope (r,z) coordinates, organized so the first column is the radii, second column is the height
fslocs = self.machine_data[self.device]['filterscope_data']['locs']
# Filterscope names, in the same order as the (r,z) pairs above
fsnames = tolist(self.machine_data[self.device]['filterscope_data']['names'])
det_set.setdefault('bad_filterscope_channels', [])
if det_set['bad_filterscope_channels'] is not None:
for badone in det_set['bad_filterscope_channels']:
if badone in fsnames:
printd(' > Filterscope {} is listed as bad for this shot, removing...'.format(badone), topic='omfit_elm')
# Check that all filterscopes are working (or turned on), and discard the ones that aren't.
# Instead of setting a signal level floor, code now check for all finite or all zero.
# There is the current assumption that some points are allowed to be non-finite, but it might be the case
# that a chord should be removed if any value is nonfinite.
if (
'bad_filterscope_channels_listed_for_shot' not in det_set
or det_set['bad_filterscope_channels_listed_for_shot'] != self.shot
# Reset the bad fs channels if they weren't listed for this shot
det_set['bad_filterscope_channels_listed_for_shot'] = self.shot
det_set['bad_filterscope_channels'] = []
validloc = np.array([])
for i, name in enumerate(fsnames):
tmpval = OMFITmdsValue(server=self.device, treename=tree, shot=self.shot, TDI=name)
except MDSplus.MdsException:
printd('Filterscope {} ran into an mdsException, removing...'.format(name), topic='omfit_elm')
bad = True
if tmpval.dim_of(0) is None or tolist(tmpval.dim_of(0)) == [0]:
printd('Filterscope {} is missing x values, removing...'.format(name), topic='omfit_elm')
bad = True
elif tmpval.data() is None or tolist(tmpval.data()) == [0]:
printd(' > Filterscope {} is missing y values, removing...'.format(name), topic='omfit_elm')
bad = True
elif all((~np.isfinite(tmpval.data())) | (tmpval.data() == 0)): # If all data is non-finite or 0
printd(' > Filterscope {}is either off or not working, removing...'.format(name), topic='omfit_elm')
# Add it to the bad list
bad = True
bad = False
if bad:
det_set['bad_filterscope_channels'] = tolist(det_set['bad_filterscope_channels']) + [name]
validloc = np.append(validloc, [i])
validloc = validloc.astype(int)
# Filter to the good ones.
good_fslocs = fslocs[validloc, :]
good_fsnames = np.array(fsnames)[validloc]
# Gather the data we need (distances in meters)
rvsout = OMFITmdsValue( # Major radius of outer strike point of primary separatrix
server=self.device, treename=tree, shot=self.shot, TDI=self.machine_data[self.device]['rvsout_ptname']
rvsin = OMFITmdsValue( # Major radius of inner strike point of primary separatrix
server=self.device, treename=tree, shot=self.shot, TDI=self.machine_data[self.device]['rvsin_ptname']
zvsout = OMFITmdsValue( # Height of outer strike point of primary separatrix
server=self.device, treename=tree, shot=self.shot, TDI=self.machine_data[self.device]['zvsout_ptname']
if rvsout.dim_of(0) is not None:
t = rvsout.dim_of(0) * self.guess_time_factor(rvsout)
printd(' > Used rvsout.dim_of(0) as time coordinate', topic='omfit_elm')
elif rvsin.dim_of(0) is not None:
t = rvsin.dim_of(0) * self.guess_time_factor(rvsin)
printd(' > Used rvsin.dim_of(0) as time coordinate', topic='omfit_elm')
elif zvsout.dim_of(0) is not None:
t = zvsout.dim_of(0) * self.guess_time_factor(zvsout)
printd(' > Used zvsout.dim_of(0) as time coordinate', topic='omfit_elm')
t = None
printe('FAILURE: The time coordinate of all the strike points was None!')
# Extract the values
rvs_valsout = rvsout.data()
rvs_valsin = rvsin.data()
zvs_vals = zvsout.data()
ip_vals = self.get_ip(t)
# Produces a filter accepting times when plasma current is >= 80% of the 95% percentile (outlier rejection)
# and where R > 0 (when EFIT failed to find an X-point), if there are any points with R > 0
w = abs(ip_vals) >= 0.8 * np.nanpercentile(ip_vals, 0.95)
if np.any(rvs_valsin > 0):
w &= rvs_valsin > 0
f'WARNING: no data for strike point coordinates. EFIT may have failed for {self.shot}. '
f'Filterscope selection will be suboptimal because of this.'
# Average strike point for the shot
if sum(w):
r_avgout = np.nanmean(rvs_valsout[w])
r_avgin = np.nanmean(rvs_valsin[w])
z_avg = np.nanmean(zvs_vals[w])
r_avgout = r_avgin = z_avg = np.NaN
if abs(z_avg) < 0.8: # Check to see if this is a double null where the x point could change sides often
printd(' > Filterscope picker: this may be a double null. DN contingency activated.', topic='omfit_elm')
if z_avg >= 0: # Mostly upper biased
' >> Looks like mostly upper biased: look for a FS near the upper outer strike point '
'(filter time for zvsout >= 0)',
u2 = zvs_vals >= 0
else: # Mostly lower biased
' >> Looks like mostly lower biased: look for a FS near the lower outer strike point '
'(filter time for zvsout < 0)',
u2 = zvs_vals < 0
if self.debug:
printd('u2 = {:}'.format(u2), topic='omfit_elm')
printd(' >> np.shape(u2) = {:}'.format(np.shape(np.atleast_1d(u2))), topic='omfit_elm')
printd(' > Re-filtering signals with special filter for DN cases and recomputing averages...', topic='omfit_elm')
w *= u2 # Filter to be only upper or only lower
r_avgout = np.mean(rvs_valsout[w])
r_avgin = np.mean(rvs_valsin[w])
z_avg = np.mean(zvs_vals[w])
# Determine upper and lower chords.
if z_avg > 0: # Filter upper vs lower null
i_good = good_fslocs[:, 1] > 0.5 # Also filter out middle views.
i_good = good_fslocs[:, 1] < -0.5
# Apply up/down filter.
good_fslocs = good_fslocs[i_good, :]
good_fsnames = good_fsnames[i_good]
validloc = validloc[i_good]
# This block ensures the chord is not looking at the private flux region.
if r_avgin <= 1.015:
r_avgin = 1.016 # If the inner strike point is on (or inside) the center post, pretend
# it is 1mm out from it.
for i in range(len(good_fsnames)):
if (r_avgout < good_fslocs[i, 0]) or (r_avgin > good_fslocs[i, 0]):
if self.debug: # Avoid spam by requiring self.debug and flag for printd to be set.
printd(' r_avgout = {}'.format(r_avgout), topic='omfit_elm')
printd(' r_avgin = {}'.format(r_avgin), topic='omfit_elm')
printd(' z_avg = {}'.format(z_avg), topic='omfit_elm')
printd(' good_fslocs = {}'.format(good_fslocs), topic='omfit_elm')
printd(' np.shape(fslocs) = {}'.format(np.shape(fslocs)), topic='omfit_elm')
printd(' > Accepted filterscopes for {}#{}: {:}'.format(self.device, self.shot, acceptlist), topic='omfit_elm')
# Chord selection can be improved in the future with a goodness hierachy instead of a strict filtering.
# But currently we have not encountered robutness problems without fallback schemes.
if len(acceptlist) <= 0:
raise OMFITexception("Filterscope failed! No good chord found!")
det_set['filterscopes'] = acceptlist
det_set['filterscope_chosen_automatically_for_shot'] = copy.copy(self.shot)
else: # Unrecognized device
raise OMFITelmNotSupported(
f'Automatic filterscope selection is not supported for {device}. Please specify filterscopes manually.'
[docs] def get_omega_ce(self):
Gathers equilibrium data and calculates local electron cyclotron emission frequency for use in mapping.
This is relevant for sawtooth mode only.
:return: time (ms), rho (unitless), psi_N (unitless), omega_ce (radians/sec)
time is 1D with length nt
All others are 2D vs time and space with shape (nt, nx)
eq = read_basic_eq_from_mds(
g_file_quantities=['r', 'z', 'rhovn', 'fpol', 'psirz'],
derived_quantities=['psin1d', 'time', 'psin'],
imid = abs(eq['z']).argmin()
pm = eq['psin'][:, imid, :]
fpol = [np.array([None])] * len(eq['time'])
rho = [np.array([None])] * len(eq['time'])
for i in range(len(eq['time'])):
fpol[i] = interp1d(eq['psin1d'], eq['fpol'][i, :], bounds_error=False, fill_value='extrapolate')(pm[i, :])
rho[i] = interp1d(eq['psin1d'], eq['rhovn'][i, :], bounds_error=False, fill_value=1.05)(pm[i, :])
fpol = np.array(fpol)
rho = np.array(rho)
bt = fpol / eq['r']
dpdt, dpdr, dpdz = np.gradient(eq['psirz'], eq['time'], eq['r'][1] - eq['r'][0], eq['z'][1] - eq['z'][0])
br = dpdz[:, imid, :] / eq['r']
bz = -dpdr[:, imid, :] / eq['r']
btot = np.sqrt(br**2 + bz**2 + bt**2)
if self.debug:
self['ece_data']['bt'] = bt
self['ece_data']['br'] = br
self['ece_data']['bz'] = bz
self['ece_data']['btot'] = btot
self['ece_data']['r_midplane'] = eq['r']
self['ece_data']['psin_midplane'] = pm
tbt = eq['time']
wce = constants.e * btot / constants.m_e # radians/s, no relativistic correction
return tbt, rho, pm, wce # time (ms), space (unitless), space (unitless), angular frequency (rad/s)
[docs] def select_sawtooth_signal(self, rho_close_to_axis=0.1, rho_far_from_axis=0.4):
Automatically chooses signals to use for sawtooth detection.
First: try to get within rho < rho_close_to_axis.
Second: avoid cut-off.
If there are no good channels (no cut-off) within rho <= rho_close_to_axis,
get the closest channel in that's not cut-off, but don't look at rho >= rho_far_from_axis for it.
For cut-off, use average density to estimate local density, based on the assumption that anything farther
out than the top of the pedestal can be ignored anyway, and that density has a small gradient in the core. This
might be okay.
There is no relativistic correction for frequency, which could lead to a small error in position which is
deemed to be irrelevant for this application.
:param rho_close_to_axis: float between 0 and 1
The first try is to find non-cut-off channels with rho <= rho_close_to_axis
:param rho_far_from_axis: float between 0 and 1
If no channels with rho <= rho_close_to_axis, find the closest non-cut-off channel, but only use it if
its rho < rho_far_from_axis
:return: list of strings
Pointnames for good ECE channels to use in sawtooth detection
det_set = self['settings']['detection'] # Shortcut
def get_ece():
"""Gathers ECE data"""
auto_sawtooth_signal_supported_devices = ['DIII-D']
if not is_device(self.device, auto_sawtooth_signal_supported_devices):
raise OMFITelmNotSupported(
f"Automatic ECE / sawtooth signal selection is not supported for device {self.device}. "
f"Supported devices are {auto_sawtooth_signal_supported_devices}"
tf = self['settings']['detection'].get('time_factor', 1)
self['ece_data'] = SortedDict()
valid = OMFITmdsValue(server=self.device, shot=self.shot, treename='ELECTRONS', TDI=r'\ELECTRONS::TOP.ECE.CAL.VALID').data()
tedat = OMFITmdsValue(server=self.device, shot=self.shot, treename='ELECTRONS', TDI=r'\ELECTRONS::TOP.ECE.PROFS.ECEPROF') # eV
freq = OMFITmdsValue(server=self.device, shot=self.shot, treename='ELECTRONS', TDI=r'\ELECTRONS::TOP.ECE.SETUP.FREQ') # GHz
nc = len(valid)
tte = tedat.dim_of(0) * tf # Time in ms
cte = tedat.dim_of(1) # Channel numbers, counting from 1
if self.debug:
self['ece_data']['tedat'] = tedat
self['ece_data']['valid'] = valid
ite = cte - 1 # Indices
te = tedat.data()
nt = np.shape(te)[1]
new_te = np.zeros((nc, nt))
for j, i in enumerate(ite):
new_te[i] = te[j]
new_te[~valid.astype(bool), :] = 0
self['ece_data']['time'] = tte # ms
self['ece_data']['te'] = new_te # eV
self['ece_data']['freq'] = freq.data()[:nc] # GHz
def get_ece_rho(harmonic=2):
"""Gets rho values for ECE channels"""
twce, rhowce, psiwce, wce = self.get_omega_ce()
fce = interp1d(twce, wce, bounds_error=False, fill_value=0, axis=0)(self['ece_data']['time']) / (2 * np.pi)
fce /= 1e9 # Hz to GHz
fce *= harmonic
rhowce_ = interp1d(twce, rhowce, bounds_error=False, fill_value=1.05, axis=0)(self['ece_data']['time'])
if self.debug:
self['ece_data']['fce'] = fce
self['ece_data']['rhowce'] = rhowce
self['ece_data']['rhowce_'] = rhowce_
rho = np.array(
interp1d(fce[i, :], rhowce_[i, :], bounds_error=False, fill_value=1.05)(self['ece_data']['freq'])
for i in range(len(self['ece_data']['time']))
self['ece_data']['rho'] = rho
def get_cutoff():
"""Get information used for determining cut-off of ECE channels"""
density_ = OMFITmdsValue(server=self.device, shot=self.shot, TDI='density')
self['ece_data']['avg_density'] = interp1d(density_.dim_of(0), density_.data() / 1e13, bounds_error=False, fill_value=0)(
cc = constants.e**2 / (constants.m_e * constants.epsilon_0) # Cool Constants
wce = self['ece_data']['freq'] * 2 * np.pi * 1e9 # radians/sec
self['ece_data']['cutoff_density'] = wce**2 / cc / 1e19 # 1e19/m^3
self['ece_data']['cutoff_density'] /= 2.0 # Based on ECE_mapping in OMFITprofiles; maybe b/c 2nd harmonic?
# Only consider which channels to use during the Ip flattop.
ip = self['ece_data']['ip'] = self.get_ip(self['ece_data']['time'])
ip_thresh = np.percentile(abs(ip), 80) * 0.95
ip_okay = abs(ip) > ip_thresh
# Flag channels which are in danger of cutoff
typical_density = np.percentile(self['ece_data']['avg_density'][ip_okay], 99)
cut_flag = typical_density > self['ece_data']['cutoff_density']
if self.debug:
self['ece_data']['typical_density'] = typical_density
self['ece_data']['cutoff_flag'] = cut_flag
# Flag channels near the axis
rho_max = self['ece_data']['rho_max'] = np.percentile(self['ece_data']['rho'][ip_okay, :], 95, axis=0)
# Use 95th percentile instead of max to exclude some outliers
close = rho_max <= rho_close_to_axis
if not np.any(close & ~cut_flag):
rho_max2 = copy.copy(rho_max)
rho_max2[cut_flag] = 100 # Set cut-off channels to be really far out (basically ignore them)
if np.min(rho_max2) < rho_far_from_axis:
# Backup plan if no chans are really close to the axis: just use the closest one unless it's really far
# Ignore cutoff channels while selecting closest one
close[rho_max2.argmin()] = True
if self.debug:
self['ece_data']['close_flag'] = close
# Finalize channel selection
channels = np.arange(len(close))[close] + 1
ece_signals = ['tece{:02d}'.format(ch) for ch in channels]
det_set['filterscopes'] = ece_signals
det_set['filterscope_chosen_automatically_for_shot'] = copy.copy(self.shot)
return ece_signals
[docs] def set_filterscope(self, fs):
Change which filterscope(s) are used and delete any data which would become inconsistent with the new choice.
:param fs: string or list of strings giving pointnames for filterscopes
Should follow the rules for use_filterscopes keyword in __init__
det_set = self['settings']['detection']
new_filterscopes = tolist(fs)
if ('filterscopes' in det_set) and (tolist(det_set['filterscopes']) == new_filterscopes):
printd('Filterscope setting was already {}. No action needed.'.format(det_set['filterscopes']), topic='omfit_elm')
elif ('auto_select' in new_filterscopes or 'auto' in new_filterscopes or fs == 'auto') and (
'auto_select' in det_set['default_filterscope_for_elms']
or 'auto' in det_set['default_filterscope_for_elms']
or det_set['default_filterscope_for_elms'] == 'auto'
and ('filterscopes' in det_set)
'New filterscope setting instructs auto selection, but current settings are already auto select. ' 'No action needed.',
printd('New filterscope selection does not match existing settings. Purging all affected data.', topic='omfit_elm')
del_from_self = []
del_from_set = []
# Erase effects of automatic filterscope selection, if any
del_from_set += ['bad_filterscope_channels', 'filterscope_chosen_automatically_for_shot']
# Erase effects of get_dalpha, if any
del_from_self += ['dalpha_shot', 'dalpha_filters', 'dalpha_time', 'dalpha_y', 'get_dalpha_debugging']
# Erase effects of detection, if any
del_from_self += [
# Erase effects of ELM frequency calculation, if any
del_from_self += ['elm_freq_time', 'elm_freq', 'elm_freq_method']
# Clear out-of-date data
for remove_it in del_from_self:
if remove_it in self:
del self[remove_it]
for remove_it in del_from_set:
if remove_it in det_set:
del det_set[remove_it]
# Update settings
det_set['filterscopes'] = new_filterscopes
det_set['default_filterscope_for_elms'] = new_filterscopes
if 'auto_select' in det_set['filterscopes'] or 'auto' in det_set['filterscopes'] or fs == 'auto':
printd('Filterscope settings updated.', topic='omfit_elm')
[docs] def set_detection_params(self, detection_settings):
Updates ELM detection settings and clears out-of-date data, if any
:param detection_settings: dictionary consistent with the detection_settings keyword in __init__
if detection_settings != self['settings']['detection']:
printd('New detection settings do not match existing settings. Purging affected data.', topic='omfit_elm')
del_from_self = []
# Erase effects of detection, if any
del_from_self += [
# Erase effects of ELM frequency calculation, if any
del_from_self += ['elm_freq_time', 'elm_freq', 'elm_freq_method']
# Clear out-of-date data
for remove_it in del_from_self:
if remove_it in self:
del self[remove_it]
self['settings']['detection'] = copy.deepcopy(detection_settings)
printd('ELM detection settings updated.', topic='omfit_elm')
printd('New detection settings match existing settings. No action needed.', topic='omfit_elm')
[docs] def set_filter_params(self, filter_settings):
Updates ELM filter settings so that subsequent calls to .filter() may give different results.
:param filter_settings: dictionary consistent with the filter_settings keyword in __init___
self['settings']['filtering'] = filter_settings
[docs] def get_dalpha(self):
Gathers D_alpha for use in ELM_detection
if not mds_available:
printe('No MDSplus module available; cannot get D_alpha.')
t0 = time.time()
if self.debug:
self['get_dalpha_debugging'] = {}
det_set = self['settings']['detection']
fss = self['settings']['detection']['filterscopes']
if fss is None or tolist(fss)[0] is None or tolist(fss)[0] == 'None':
printd(' OMFITelm: invalid filterscope specification; resetting to auto_select')
fss = self['settings']['detection']['filterscopes'] = ['auto_select']
if 'auto_select' in fss or 'auto' in fss or fss == 'auto':
' OMFITelm: Calling auto filterscope selector because settings/detection/filterscopes = {}'.format(
' OMFITelm: SKIPPED auto filterscope selector because settings/detection/filterscopes = {}'.format(
' OMFITelm: Gathering D_alpha measurements for shot {}, filterscope(s) {}'.format(
self.shot, self['settings']['detection']['filterscopes']
fs = det_set['filterscopes']
treename = self.machine_data[self.device]['mds_tree']
tf = self['settings']['detection'].get('time_factor', None)
ip_ = OMFITmdsValue(self.device, shot=self.shot, treename=treename, TDI=self.machine_data[self.device]['ip_ptname'])
if tf is None:
iptf = self.guess_time_factor(mdsvalue=ip_)
if iptf is None:
printw('Warning! Failed to guess time unit factor for Ip. Using 1.0.')
iptf = 1.0
iptf = tf
def get_mask(x_):
if is_device(self.device, 'DIII-D'):
mask_ = np.ones(len(x_), bool) # No masking needed for DIII-D
elif is_device(self.device, ['NSTX', 'NSTXU']):
ip = interp1d(ip_.dim_of(0) * iptf, ip_.data(), fill_value=0, bounds_error=False)(x_)
mask_ = abs(ip) > abs(ip).max() * 0.1
# Smooth the mask then round the result up to extend it outward past the ends of the shot & debounce it.
smo = 500.0
dt = np.mean(np.diff(x_))
ns = np.ceil(smo / dt).astype(int)
mask_ = np.ceil(smooth(mask_.astype(float), window_len=ns)).astype(bool)
if self.debug:
self['get_dalpha_debugging']['mask_x_'] = x_
self['get_dalpha_debugging']['get_dalpha_ip_'] = ip_
self['get_dalpha_debugging']['ip_ptname'] = self.machine_data[self.device]['ip_ptname']
self['get_dalpha_debugging']['ns'] = ns
self['get_dalpha_debugging']['dt'] = dt
self['get_dalpha_debugging']['smo'] = smo
self['get_dalpha_debugging']['mask0'] = copy.copy(mask_)
self['get_dalpha_debugging']['ip_interpolated'] = ip
mask_ = np.ones(len(x_), bool) # Not enough information to do anything better
return mask_
def get_dalpha_x_y(dfs):
dalpha = 0
dfs = tolist(dfs) # Make sure this is a list (even if it has one element)
datafail = True
x0 = None
xd = None
if self.attempt_sum_tdi_filterscopoes:
fs_tree = 'SPECTROSCOPY' if treename is None else treename
start = r'\{}::'.format(fs_tree)
mid = '+' + start
sum_tdi = start + mid.join(dfs)
printd(' sum_tdi = {}'.format(sum_tdi), topic='omfit_elm')
scratch['fs_sum'] = OMFITmdsValue(server=self.device, treename=fs_tree, shot=self.shot, TDI=sum_tdi)
if scratch['fs_sum'] is not None and scratch['fs_sum'].data() is not None and len(scratch['fs_sum'].data()) > 1:
xd = scratch['fs_sum'].dim_of(0)
dalpha = scratch['fs_sum'].data()
'Server side sum of filterscopes in TDI call appears to have succeeded! (shot {})'.format(self.shot),
datafail = False
sum_failed_skipped = False
'Server side sum of filterscopes failed. Perhaps one of them is bad? (shot {})'.format(self.shot), topic='omfit_elm'
printd('Gathering individual filterscope signals and summing client side...', topic='omfit_elm')
sum_failed_skipped = True
sum_failed_skipped = True
if sum_failed_skipped:
for fscope in dfs:
# Gather filterscope data
scratch[fscope] = OMFITmdsValue(server=self.device, treename=treename, shot=self.shot, TDI=fscope)
if scratch[fscope] is not None and fscope is not None:
if (not np.array_equal(scratch[fscope].data(), np.array([0]))) and scratch[fscope].check():
datafail = False
yd = scratch[fscope].data() # Named it yd to avoid confusion with y that occurs later
xd = scratch[fscope].dim_of(0) # times in ms to go with dalpha
if tf is None:
fstf = self.guess_time_factor(mdsvalue=scratch[fscope])
if fstf is None:
printw(f'Warning! Could not guess time unit factor for Filterscope {fscope}! Using 1.0.')
fstf = 1.0
fstf = tf
xd *= fstf
valid = (
(xd >= self['settings']['detection']['time_window'][0])
& (xd <= self['settings']['detection']['time_window'][1])
& (np.isfinite(xd))
& (np.isfinite(yd))
yd = yd[valid]
xd = xd[valid]
if not np.array_equal(xd, x0) and x0 is not None:
# Interpolate in case the filterscopes are set to sample at different rates
yd = interp1d(xd, yd, bounds_error=False, fill_value=0)(x0)
# Fill with zeros outside of interp range: this would be relevant if one filterscope
# sampled at a faster rate for part of the shot and then ran out of samples and
# measured nothing. Within the time range where the fast sampling one is active, it
# can contribute to total signal. Outside of this range, it will just not do
# anything. Hopefully the time when it turns off doesn't trip the ELM detector, but
# one false positive probably isn't that bad. It's rare that sampling rates for
# filterscopes are changed.
# This is guaranteed to go off on the first pass because x0 is initialized as None.
# After that, every new x is compared to the first one and interpolated if it
# doesn't match.
x0 = xd
# yd is now guaranteed to be on the same timebase as the yd from the last pass through
# the loop. Maybe this was trivial or maybe we had to interpolate.
mask_ = get_mask(xd)
yd *= mask_
if abs(yd.min()) > abs(yd.max()) * 5:
# The signal appears to be negative (this does not mean inverted ELMs in detachment!)
yd *= -1
dalpha += yd # dalpha is total light from all of the filterscopes selected by filterpicker
printd(' added {:} to the total dalpha signal'.format(fscope), topic='omfit_elm')
if self.debug:
self['get_dalpha_debugging']['dalpha'] = dalpha
if datafail:
if not self.quiet:
printe('Failed to retrieve D_alpha for {}#{} after trying {}.'.format(self.device, self.shot, dfs))
print('FS** signals can take time to process. ' 'If running on a very recent shot, try pcphd02 or pcphd03.')
print("Filterscope source can be set in root['SETTINGS']['PHYSICS']['filterScopeForELMs']")
print("or the fs keyword in this script.")
print('Must abort due to bad D_alpha.')
settings_backup = self['settings']
plots_backup = self['plots']
self['settings'] = settings_backup
self['plots'] = plots_backup
self['success'] = False
return None, None
data_factor = 1.0 / max(abs(dalpha))
# The data factor rescales D_alpha to avoid floating overflow stuff with large numbers. The factor should be
# chosen to make the result be close to 1. (anywhere between 10^-3 and 10^+3 should be fine)
dalpha *= data_factor
# Sawtooth modification: take -d()/dt of signal
if self.mode == 'sawtooth':
printd('SAWTOOTH MODE! Changing signal to -d(signal)/dt', topic='omfit_elm')
self['raw_sawtooth_input'] = copy.copy(dalpha)
dalpha = -deriv(xd, dalpha)
return xd, dalpha
filters_match = np.all(np.atleast_1d(self['dalpha_filters'] == fs))
except (ValueError, KeyError):
filters_match = False
if (
'dalpha_shot' in self
and self['dalpha_shot'] == self.shot
and 'dalpha_filters' in self
and filters_match
and 'dalpha_time' in self
and 'dalpha_y' in self
# We've already gathered and summed data for this setup. Reuse the result from the tree.
printd(' OMFITelm: Existing result for D_alpha are valid; no need to gather D_alpha again.', topic='omfit_elm')
# If you need the results, please do:
# x = self['dalpha_time']
# y = self['dalpha_y']
# We don't have an appropriate past result to use
printd('Need to gather dalpha x y...')
if 'dalpha_shot' in self:
tmp1 = self['dalpha_shot'] == self.shot
tmp1 = 'N/A'
if 'dalpha_filters' in self:
tmp2 = filters_match
tmp2 = 'N/A'
' OMFITelm: results of tests done that led to need to gather dalpha: '
'(must be all True to skip this part)'
+ repr(['dalpha_shot' in self, tmp1, 'dalpha_filters' in self, tmp2, 'dalpha_time' in self, 'dalpha_y' in self]),
x, y = get_dalpha_x_y(fs)
self['dalpha_time'] = x
self['dalpha_y'] = y
self['dalpha_filters'] = copy.copy(fs)
" OMFITelm: D_alpha data gathered for shot {} and stored in "
"self['dalpha_time'] and self['dalpha_y'].".format(self.shot),
t1 = time.time()
' OMFITelm: get_dalpha script completed in {} s with self.attempt_sum_tdi_filterscopoes = {}'.format(
t1 - t0, self.attempt_sum_tdi_filterscopoes
self.status['gathered'] = True
[docs] def mask_remove_disruptions(self, t):
Finds a mask that should be suitable for removing disruptions
Disruptions can come with a huge D_alpha flash that can ruin the scale of the normal ELMs
:param t: 1D float array
Relevant timebase in ms
:return: 1D bool array
ip = self.get_ip(t)
ip *= np.sign(np.nanmean(ip))
ipnorm = ip / np.percentile(ip, 95) # unitless
tsmo = 3 # ms We don't want a lot of smoothing; just enough to take the edge off the worst noise
ipnorms = butter_smooth(t, ipnorm, tsmo)
ip_rate = deriv(t, ipnorms) # ms^-1 # Large negative values are bad, but are any spikes
threshold = 1.0 / (20) # ms^-1
mask = (abs(ip_rate) < threshold).astype(float)
mask = (np.floor(butter_smooth(t, mask, 2) + 1e-12)).astype(bool) # Debounce and expand rejection
return mask
[docs] def detect(self, **kw):
Chooses which ELM detection method to use and calls the appropriate function
:param kw: keywords to pass to actual worker function
method = self['settings']['detection'].get('detection_method', 0)
# Check for basic data problems
method = self.check_detection_inputs(method)
if method == 0:
elif method == 1:
raise ValueError('Unrecognized ELM detection method: {}\nValid options are: [0, 1]'.format(method))
[docs] def jump_and_hold_detect(self, calc_elm_freq=True):
New detection algorithm. Focuses on finding just the start of each ELM (when D_alpha jumps up) at first,
then forcing the ELM state to be held until D_alpha drops back down again.
:param calc_elm_freq: bool
t0 = time.time()
# Phase one: setup ---------------------------------------------------------------------------------------------
printd('Running jump & hold method of ELM detection for shot {:}...'.format(self.shot), topic='omfit_elm')
if self.debug:
self['detect_debugging'] = {}
if tokamak(self.device) not in self.supported_devices:
'WARNING: Current server/device {} not found in list of supported devices. '
'ELM detection is likely to fail! \nSuggestion: pick a device from the following list:{}'.format(
self.device, '\n '.join([''] + self.supported_devices)
# Tuning for finding the start
t_win = self['settings']['detection']['find_jump_time_window']
threshold = self['settings']['detection']['find_jump_threshold']
presmo = float(self['settings']['detection']['find_jump_presmo'])
debounce_setting = self['settings']['detection']['find_jump_debounce']
rollback_fraction = 0.85 # Allow fewer rollback steps
method = 2 # Sub-method within jump and hold. Method 2 seems to be better.
lag_butter = False # This has no effect because when presmo = 0 and method = 2.
less_rollback = True
# Get D_alpha
t = self['dalpha_time']
y = self['dalpha_y']
if t is None or y is None:
if not self.quiet:
print('Aborting ELM detection due to failure of D_alpha gathering')
return # Aborted during D_alpha gathering
t1 = time.time()
# Phase two: ELM start detection -------------------------------------------------------------------------------
# Now get a flag for the initial phase of each ELM. We'll catch the tail later.
ys = butter_smooth(t, y, timescale=presmo, laggy=lag_butter) if presmo > 0 else y # Smooth for noise reduction
dt = np.median(np.diff(t))
n_win = int(np.ceil(t_win / dt))
# Get the difference between each sample and the maximum in a small neighborhood
y1 = np.concatenate((np.repeat(np.NaN, n_win), ys, np.repeat(np.NaN, n_win)))
y2 = np.vstack(np.array([np.roll(y1, -i) for i in range(n_win)]))
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="All-NaN slice encountered")
localmax = np.nanmax(y2, axis=0)[n_win:-n_win]
dy0 = localmax - ys
# This thing is always going to lead the ELM because of the roll we used. Roll it back to align properly
nrb = int(np.round(t_win * (1 - threshold) * rollback_fraction / dt)) if less_rollback else n_win
dy = np.roll(np.concatenate((np.repeat(0, nrb), dy0, np.repeat(0, nrb))), nrb)[nrb:-nrb]
no_disr_mask = self.mask_remove_disruptions(t)
self['elm_detection_details'] = dict(localmax=localmax, dy=dy, n_win=n_win, nrb=nrb, ys=ys, no_disr_mask=no_disr_mask)
if method == 1:
# Settings for method 1 haven't been linked to proper settings dictionary because method 1 is not used
# unless you hack, anyway. So, you can hack these, too. d4r3 2 b 1337
smo = 0.25 # ms This one can't be zero
postsmo = 0.25 # ms
min_len = 0.35 # ms
n_min_len = int(np.ceil(min_len / dt))
dys = butter_smooth(t, dy, timescale=smo, laggy=lag_butter)
ddys = dy - dys
# Smooth for noise reduction
ddyss = butter_smooth(t, ddys, timescale=postsmo, laggy=lag_butter) if postsmo > 0 else ddys
ddyss[ddyss < 0] = 0 # The negative part is pure noise. This isn't necessary, but it makes me feel better.
# Find maximum in small neighborhood again, this time to hold high samples in order to debounce the flag
ddys1 = np.concatenate((np.repeat(np.NaN, n_min_len), ddyss, np.repeat(np.NaN, n_min_len)))
ddys2 = np.vstack(np.array([np.roll(ddys1, i) for i in range(n_min_len)]))
ddys3 = np.nanmax(ddys2, axis=0)[n_min_len:-n_min_len]
# Here's the initial ELM flag. It should catch the start of each ELM.
elm_flag = ddys3 / max(ddys3[no_disr_mask]) > threshold
self['elm_detection_details'].update(dict(dys=dys, ddys=ddys, ddyss=ddyss, ddys3=ddys3))
else: # method == 2
with warnings.catch_warnings():
# It's okay to underflow during this division. ndy = 0 just means there's no ELM starting: fine.
warnings.filterwarnings("ignore", message="underflow encountered in divide")
warnings.filterwarnings("ignore", message="underflow encountered in true_divide")
ndy = dy / max(dy[no_disr_mask])
elm_flag = ndy > threshold
self['elm_detection_details'].update(dict(ndy=ndy)) # , local_scale=local_scale))
if debounce_setting > 1:
# Do some smoothing to eliminate one or two element periods of ELM flag or not ELM flag
# We don't want ELM flag to look like this: 0000101011111010000
# Instead, we would like it to be like this: 0000001111111100000
# The filterscopes sample so quickly that getting an edge wrong by one sample is inconsequential,
# but getting a bounce (0000101111 instead of 0000011111) will count an extra ELM that's not real.
debounce = int(copy.copy(debounce_setting) // 2) * 2 + 1 # Force it to be odd
pad = int(debounce // 2)
# Pad with zeros so we don't roll in junk
elm_flag_b = np.append(np.append(np.zeros(pad), np.atleast_1d(elm_flag).astype(int)), np.zeros(pad))
elm_flag_b2 = elm_flag_b * 0
# Average the neighborhood of (debounce) points
for i in range(-pad, pad + 1):
elm_flag_b2 += np.roll(elm_flag_b, i) / (2.0 * pad + 1)
elm_flag = np.round(elm_flag_b2[pad:-pad]).astype(bool) # Strip off padding, round off, & convert to bool
printd(' Debounced', topic='omfit_elm')
# Sanitize the flag
elm_flag2 = self.sanitize_elm_flag(np.atleast_1d(elm_flag))
t2 = time.time()
# Phase tree: force the ELM state to hold until D_alpha drops back down again ----------------------------------
self.elm_hold_correction(t, y, elm_flag2)
t3 = time.time()
# Phase four: process th ELM flag to get phase, period, etc. ---------------------------------------------------
self.calculate_elm_detection_quantities(t, elm_flag2)
# Save details
self['elm_detection_details'].update(dict(elm_flag=elm_flag2, timestamp=datetime.datetime.now()))
self.detection_settings_used = copy.deepcopy(self['settings']['detection'])
self.setdefault('plots', SortedDict())['detection'] = function_to_tree(self.elm_detection_plot, self)
# self['plots']['detection2'] = OMFITpythonTask( # Doesn't give access to defaultVars GUI
# 'detection.py', fromString="parent._OMFITparent.elm_detection_plot()")
except ImportError:
type_, val_, traceback_ = sys.exc_info()
printd(' Failed to load plot script to tree with function_to_tree.', topic='omfit_elm')
if self.debug:
' Exception type = {type:}\n'
' Exception value = {value:}\n'
' Exception traceback = \n{trace:}'.format(type=type_, value=val_, trace=traceback_),
# This won't work outside of OMFIT and we don't want it to
printd('ELM detection finished for shot {:}'.format(self.shot), topic='omfit_elm')
if calc_elm_freq:
printd('Finished with extra ELM-related calculations.', topic='omfit_elm')
self.status['detected'] = True
t4 = time.time()
'New detection method took (s): total: {total:0.3f} setup: {setup:0.3f}, find start: {start:0.3f}, '
'find end: {end:0.3f}, final processing: {process:0.3f}'.format(
total=t4 - t0, setup=t1 - t0, start=t2 - t1, end=t3 - t2, process=t4 - t3
[docs] def classic_edge_detect(self, report_profiling=False, calc_elm_freq=True):
Uses a Difference of Smooths (generalized from difference of gaussians edge detection) scheme to find the edges
of D_alpha bursts. It also runs a DoS scheme on the derivative of D_alpha w.r.t. time.
This allows time points (in the D_alpha series) to be flagged as soon as the ELM starts: it doesn't just get
the tip, but instead it takes the whole thing.
:param calc_elm_freq: bool
Run ELM frequency calculation
:param report_profiling: bool
Reports time taken to reach / time taken between checkpoints in the code.
Used to identify bottlenecks and support optimization.
t0 = time.time()
printd('Running ELM detection for shot {:}...'.format(self.shot), topic='omfit_elm')
if self.debug:
self['detect_debugging'] = {}
if tokamak(self.device) not in self.supported_devices:
'WARNING: Current server/device {} not found in list of supported devices. '
'ELM detection is likely to fail! \nSuggestion: pick a device from the following list:{}'.format(
self.device, '\n '.join([''] + self.supported_devices)
# Shortcut
elmdet1 = self['settings']['detection']
elmdet = elmdet1['{:}_tuning'.format(self['settings']['detection']['smoother'])]
# There are two smooths in the difference of gaussian scheme; one is typically 5-6x stronger than the other
printd(' elm det: mild_smooth', elmdet['mild_smooth'], topic='omfit_elm')
' elm det: threshold_on_difference_of_smoothed_derivatives_plus',
' elm det: threshold_on_difference_of_smoothed_derivatives_minus',
t1 = time.time()
def smoother(sx, sy, s): # this function selects which actual smoothing function to use and calls it
t1_ = time.time()
if self['settings']['detection']['smoother'] == 'gauss_smooth':
ys = smooth_by_convolution(sy, xi=sx, window_size=s * 4, window_function='gaussian')
elif self['settings']['detection']['smoother'] == 'asym_gauss_smooth':
ys = asym_gauss_smooth(sx, sy, s, elmdet['gauss_center_lag'], elmdet['leading_side_width_factor'])
elif self['settings']['detection']['smoother'] == 'rc_smooth':
ys = pcs_rc_smooth(sx, sy, s)
elif self['settings']['detection']['smoother'] == 'fft_smooth':
ys = fft_smooth(sx, sy, s)
elif self['settings']['detection']['smoother'] == 'combo':
ys1 = pcs_rc_smooth(sx, sy, s)
ys2 = smooth_by_convolution(sy, xi=sx, window_size=s * 4, window_function='gaussian')
ys = (ys1 + ys2) / 2.0
elif self['settings']['detection']['smoother'] == 'butter_smooth':
ys = butter_smooth(sx, sy, cutoff=1.0 / (s * np.pi)) # s * pi gives s the same feel as in other functions
raise ValueError(
"['ELM_detection']['smoother'] to one of the following: Valid options for smoothing function: "
"'gauss_smooth', 'rc_smooth', 'asym_gauss_smooth', 'combo', fft_smooth, butter_smooth".format(self.device_group)
t2_ = time.time()
printd(' time taken = {} s'.format(t2_ - t1_), topic='omfit_elm')
return ys
t2 = time.time()
# Get D_alpha
x = self['dalpha_time']
y = self['dalpha_y']
if x is None or y is None:
if not self.quiet:
print('Aborting ELM detection due to failure of D_alpha gathering')
return # Aborted during D_alpha gathering
t3 = time.time()
# Smooth D_alpha with two different timescales and take the difference
s1 = smoother(x, y, elmdet['mild_smooth']) # smooth 1 (mild smooth is in ms)
s2 = smoother(x, y, elmdet['mild_smooth'] * elmdet['heavy_smooth_factor']) # smooth 2
ds = s1 - s2 # This is the difference of Gaussians which is central to the edge detection scheme
printd(' s1, s2, and ds calculated for shot {:}'.format(self.shot), topic='omfit_elm')
t4 = time.time()
# When mild-heavy is greater than this threshold, it must be during ELM (relative to max diff)
# Turn the relative threshold into an absolute threshold to which data can be compared
no_disr_mask = self.mask_remove_disruptions(x)
thresh1 = elmdet['threshold_on_difference_of_smooths'] * max(ds[no_disr_mask])
dyt = ds - thresh1 # Difference between ds and the threshold
with warnings.catch_warnings():
# It's okay to underflow during this division.
warnings.filterwarnings("ignore", message="underflow encountered in divide")
warnings.filterwarnings("ignore", message="underflow encountered in true_divide")
d1 = np.gradient(s1, x) # Calculate time derivative of the smoothed D_alpha trace.
printd(' gradient', topic='omfit_elm')
if elmdet['mild_smooth_derivative_factor'] > 0:
# Although the derivative is of smoothed data, it may still have too much noise & require more smoothing.
d1 = smoother(x, d1, elmdet['mild_smooth'] * elmdet['mild_smooth_derivative_factor'])
# Smooth the time derivative again. Now there are two versions of the smooth so we can take the difference.
d2 = smoother(x, d1, elmdet['mild_smooth'] * elmdet['heavy_smooth_derivative_factor'])
# Here's the difference of the smoothed derivatives.
# We can now extend the difference of Gaussians technique to the derivative.
dd = d1 - d2
printd(' d1, d2, and dd calculated for shot {:}'.format(self.shot), topic='omfit_elm')
t5 = time.time()
# When mild(der)-heavy(der) is greater than a threshold, it must be on one of the edges of an ELM
# Translate relative thresholds into absolute thresholds
thresh2 = elmdet['threshold_on_difference_of_smoothed_derivatives_plus'] * max(dd[no_disr_mask])
thresh3 = elmdet['threshold_on_difference_of_smoothed_derivatives_minus'] * min(dd[no_disr_mask])
# When the diff of derivs is positive but the less smoothed derivative is still negative, we're in danger of
# getting a false positive, so make the threshold higher
if elmdet['d_thresh_enhance_factor'] != 1.0 or elmdet['neg_d_thresh_enhance_factor'] != 1.0:
thresh2b = thresh2 * elmdet['d_thresh_enhance_factor'] # Enhanced +threshold on dd when ds is low
thresh3b = thresh3 * elmdet['neg_d_thresh_enhance_factor'] # Enhanced -threshold on dd when ds is high
thresh2 += d1 * 0
thresh3 += d1 * 0
thresh2[ds < (-thresh1)] = thresh2b
thresh3[ds > thresh1] = thresh3b
dd2 = dd - thresh2 # Compare difference of smoothed derivatives to threshold
dd3 = thresh3 - dd
# Make a flag to mark where the ELMs are - this is full length so saving it will be slow
elm_flag = (dyt > 0) | (dd2 > 0) | (dd3 > 0)
printd(' elm_flag calculated for shot {:}'.format(self.shot), topic='omfit_elm')
t6 = time.time()
if elmdet['debounce'] > 1:
# Do some smoothing to eliminate one or two element periods of ELM flag or not ELM flag
# We don't want ELM flag to look like this: 0000101011111010000
# Instead, we would like it to be like this: 0000001111111100000
# The filterscopes sample so quickly that getting an edge wrong by one sample is inconsequential,
# but getting a bounce (0000101111 instead of 0000011111) will count an extra ELM that's not real.
debounce = int(copy.copy(elmdet['debounce'])) / 2 * 2 + 1 # Force it to be odd
pad = int(debounce // 2)
elm_flag_b = np.append(np.append(np.zeros(pad), elm_flag), np.zeros(pad)) # Pad with zeros so we don't roll in junk
elm_flag_b2 = elm_flag_b * 0
# Average the neighborhood of (debounce) points
for i in range(-pad, pad + 1):
elm_flag_b2 += np.roll(elm_flag_b, i) / (2 * pad + 1)
elm_flag = np.round(elm_flag_b2[pad:-pad]).astype(bool) # Strip off padding, round off, & convert to bool
printd(' Debounced', topic='omfit_elm')
# Calculate useful quantities related to ELMs
elm_flag2 = self.sanitize_elm_flag(elm_flag)
t7 = time.time()
if elmdet1['hold_elm_flag_until_low_dalpha']:
self.elm_hold_correction(x, y, elm_flag2)
t8 = time.time()
self.calculate_elm_detection_quantities(x, elm_flag2)
t9 = time.time()
# Save details
self['elm_detection_details'] = dict(
self.detection_settings_used = copy.deepcopy(self['settings']['detection'])
self.setdefault('plots', SortedDict())['detection'] = function_to_tree(self.elm_detection_plot, self)
# self['plots']['detection2'] = OMFITpythonTask( # Doesn't give access to defaultVars GUI
# 'detection.py', fromString="parent._OMFITparent.elm_detection_plot()")
except ImportError:
type_, val_, traceback_ = sys.exc_info()
printd(' Failed to load plot script to tree with function_to_tree.', topic='omfit_elm')
if self.debug:
' Exception type = {type:}\n'
' Exception value = {value:}\n'
' Exception traceback = \n{trace:}'.format(type=type_, value=val_, trace=traceback_),
# This won't work outside of OMFIT and we don't want it to
printd('ELM detection finished for shot {:}'.format(self.shot), topic='omfit_elm')
if calc_elm_freq:
printd('Finished with extra ELM-related calculations.', topic='omfit_elm')
t10 = time.time()
self.status['detected'] = True
if report_profiling:
'Time required to reach checkpoints in ELM_detection script with smooth_funct = {}: (shot {})'.format(
self['settings']['detection']['smoother'], self.shot
times = [t0, t1, t2, t3, t4, t5, t6, t7, t8, t9, t10]
names = [
'function def',
'initial smoothing',
'derivatives & smo',
'define flag',
for i, t in enumerate(times):
print('t{}-t0 = {} s'.format(i, t - t0))
for i in range(1, len(times)):
print('t{}-t{} = {} ms {}'.format(i, i - 1, (times[i] - times[i - 1]) * 1000, names[i]))
print('total time = {} ms'.format((t10 - t0) * 1000))
'Old detection method took (s): total: {total:0.3f} setup: {setup:0.3f}, find start: {start:0.3f}, '
'find end: {end:0.3f}, final processing: {process:0.3f}'.format(
total=t10 - t0, setup=t3 - t0, start=t7 - t3, end=t8 - t7, process=t10 - t8
[docs] @staticmethod
def sanitize_elm_flag(elm_flag):
Force ELM flag to start and stop in non-ELM state
:param elm_flag: array (bool-like 1s & 0s or Trues & Falses)
:return: Sanitized ELM flag that follows the rules
printd(' OMFITelm.sanitize_elm_flag() starting...', topic='omfit_elm')
elm_flag2 = elm_flag.astype(int) # Copy and a convert bool to int
if elm_flag2[0] == 1:
# Assuming the shot starts in a non-ELMing state and force the data to comply with this assumption.
ww = tolist(np.where(elm_flag2 == 0)[0])[0] # First element that's 0
printd(' Need to fix problem w/ starting during ELM. Index of first non ELM slice: ww = {}'.format(ww), topic='omfit_elm')
elm_flag2[0:ww] = 0 # Make any starting ELM state go away. If D_alpha measurement starts
# during an ELM, we'll miss the first ELM. This is not a physically
# realistic scenario, so it's okay
# If we don't do this cleanup, then we'll have more ELM ends than ELM starts. Uneven numbers here will ruin
# everything. This cannot be allowed.
if elm_flag2[-1] == 1:
# Also assume that we end in a non-ELMing state. This assumption will be enforced so that we have the same
# number of ELM starts as ELM ends.
ww = tolist(np.where(elm_flag2 == 0)[0])[-1] # Last element that's 0
printd(' Need to fix problem w/ ending during ELM. Index of last non ELM slice: ww = {:} .'.format(ww), topic='omfit_elm')
elm_flag2[ww + 1 :] = 0
# Need matching numbers of ELM starts and ELM ends, as noted above.
return elm_flag2
[docs] def elm_hold_correction(self, x, y, elm_flag2):
Forces ELM flag to stay in ELMy state (set) until D_alpha level returns to low (close to pre-ELM) level
:param x: 1D float array
Time base for data
:param y: float array matching shape of x
D_alpha measurements or similar signal
:param elm_flag2: int array matching shape of x
Flag for during-ELM state (1) vs. no-ELM state (0)
:return: Adjusted ELM flag
ta0 = time.time()
printd(' OMFITelm.elm_hold_correction() starting...', topic='omfit_elm')
elmdet1 = self['settings']['detection']
# Get the local minimum within a small window before each sample
no_disr_mask = self.mask_remove_disruptions(x)
dx = np.median(np.diff(x))
nwin = int(np.ceil(elmdet1['hold_elm_min_finding_time_window'] / dx)) # Time window size in samples
n_hold_timeout = int(np.ceil(elmdet1['hold_elm_timeout'] / dx))
y1 = np.concatenate((np.repeat(y[no_disr_mask].max(), nwin), y, np.repeat(y[no_disr_mask].max(), nwin))) # Pad so can roll nicely
y2 = np.vstack(np.array([np.roll(y1, i) for i in range(nwin)])) # Make a stack of rolls to different positions
localmins = y2.min(axis=0)[nwin:-nwin] # Each element is min from window from t-adv_t_win to t
# Find start and end of each ELM
starts = np.where(np.append(0, np.diff(elm_flag2) == 1))[0]
ends = np.where(np.append(0, np.diff(elm_flag2) == -1))[0]
if self.debug:
self['detect_debugging']['starts'] = starts
self['detect_debugging']['ends'] = ends
self['detect_debugging']['elm_det_localmins'] = localmins
self['detect_debugging']['elm_det_localmaxs'] = np.array([np.NaN] * len(starts))
self['detect_debugging']['elm_det_threshs'] = np.array([np.NaN] * len(starts))
self['detect_debugging']['elm_det_adv_starts'] = starts
self['detect_debugging']['elm_det_adv_ends_pre'] = ends
self['detect_debugging']['elm_det_adv_hold_until'] = np.zeros(len(starts), int)
self['detect_debugging']['elm_det_changed'] = np.array([np.NaN] * len(starts))
# Revise ELM flag
unhandled = []
for i in range(len(starts)):
imax = y[starts[i] : ends[i]].argmax() + starts[i]
localmax = y[imax]
localmin = localmins[starts[i]]
thresh = (localmax - localmin) * elmdet1['hold_elm_low_dalpha_threshold'] + localmin
hold0 = np.where(y[imax:] < thresh)[0]
if len(hold0):
hold_until = hold0[0] + imax
hold_limit = starts[i] + n_hold_timeout
hold_until = min([hold_until, hold_limit])
saveit = elm_flag2[hold_until - 1]
elm_flag2[starts[i] : hold_until] = 1
unhandled += [(i, starts[i], x[starts[i]])]
saveit = hold_until = 0
if self.debug:
self['detect_debugging']['elm_det_localmaxs'][i] = localmax
self['detect_debugging']['elm_det_threshs'][i] = thresh
self['detect_debugging']['elm_det_adv_hold_until'][i] = hold_until
self['detect_debugging']['elm_det_changed'][i] = elm_flag2[hold_until - 1] - saveit
self['detect_debugging']['elm_det_hold_failed'] = unhandled
ta1 = time.time()
' Time required for correction to initial ELM detection (hold flag until low D_alpha): {} s'.format(ta1 - ta0),
return elm_flag2
[docs] def calculate_elm_detection_quantities(self, x, elm_flag2, no_save=False, shot=None):
Finish detection by compressing the ELM flag to 4 points per ELM and calculating quantities like ELM phase
Separate from .detect() to make room for different detection algorithms which will all finish the same way.
Could also be used for convenience with an externally calculated ELM flag by passing in your own ELM detection
results (and using no_save=True to avoid disturbing results calculated by the class, if you like).
:param x: float array
Time base in ms
:param elm_flag2: int array matching length of x
Flag indicating when ELMs are happening (1) or not (0)
:param no_save: bool
Disable saving of results; return them in a dictionary instead
:param shot: [optional]
Only used in announcements. If None, self.shot will be used. If you are passing in some other stuff and
don't want inconsistent printd statements, you can pass in a different thing for shot.
It would traditionally be an int, but it could be a string or something since it only gets printed here.
:return: None or dict
Returns results in a dict if no_save is set. Otherwise, results are just saved into the class and nothing is
printd(' OMFITelm.calculate_elm_detection_quantities() starting...', topic='omfit_elm')
shot = self.shot if shot is None else shot
big_time = 2e6 # (ms) A time so big that it will always be past the end of the time array.
# Difference in ELM flag detects when the flag changes between ELM and no-ELM and marks with 1 and -1.
# Leading zero inserted to make dimensions match.
def2 = np.insert(np.diff(elm_flag2), 0, 0)
w = np.where(def2 == 1)[0] # Entered ELM state
w2 = np.where(def2 == -1)[0] # Exited ELM state
spacer = np.mean(np.diff(x)) / 50.0 # This should be a small number, much smaller than other timescales of
# interest, but it can't get so small that it underflows because then
# there will be more than one point with the same time value, and that
# will confuse the interpolation routine.
if self.debug:
self['detect_debugging']['elm_det_w'] = w
self['detect_debugging']['elm_det_w2'] = w2
# The times in newtime are when the ELM starts (x[w]), an
# infinitesimal time later (x[w]+spacer), when the ELM ends (x[w2]),
# and another infinitesimal time after that (x[w2]+spacer). This
# lets us draw the nearly vertical line at the time when we go from
# being in an ELM to not. Instead of storing all this ELM information
# on the filterscope time base, we only need to store when the ELMs
# start and stop, and we can interpolate everything else from there
# (all the quantities we're dealing with here are linear, such as
# time since last ELM, etc.)
newtime = [x[w], x[w] + spacer, x[w2], x[w2] + spacer]
# New ELM flag for storing ELM on/off without hogging all
# the space used by elm_flag. Whereas elm_flag has as
# many elements as the original d_alpha, new_elm_flag only
# has four elements per ELM.
new_elm_flag = [w * 0, w * 0 + 1, w2 * 0 + 1, w2 * 0]
# ELM phase is ramps from 0 to 1 between ELMs, then jumps from 1 to
# -1 when an ELM starts. So at x[w], ELM phase has reached 1. At
# x[w]+spacer, it has changed to -1. At x[w2], the ELM is just ending
# and ELM phase has recovered to 0. At x[w2]+spacer, it is still 0.
elmphase = [w * 0 + 1, w * 0 - 1, w2 * 0, w2 * 0]
printd(' Calculated elmphase for {:}'.format(shot), topic='omfit_elm')
# Now we can figure out the the local ELM period and related quantities, which are useful in some filters.
prev_elm_end = np.roll(x[w2], 1) # The roll is so that this corresponds to the PREVIOUS ELM
prev_elm_end[0] = 0
prev_elm_start = np.roll(x[w], 1)
prev_elm_start[0] = 0
next_elm_start = np.roll(x[w], -1)
next_elm_start[-1] = big_time
since_last_elm = [x[w] - prev_elm_end, w * 0, w * 0, w * 0]
since_last_elm_neg = [x[w] - prev_elm_end, x[w] - x[w2], w * 0, w * 0]
# until_next_elm = [w*0, w*0, w*0, next_elm_start-x[w2]]
until_next_elm = [w * 0, w * 0, x[w] - x[w2], next_elm_start - x[w2]]
# Time from the end of one ELM to the start of the next one
local_inter_elm_period = [x[w] - prev_elm_end, x[w2] - x[w], x[w2] - x[w], next_elm_start - x[w2]]
# Time from the start of one ELM to the start of the next one
local_total_elm_period = [x[w] - prev_elm_start, next_elm_start - x[w], next_elm_start - x[w], next_elm_start - x[w]]
printd(' Calculated elm prev/next/since/period {:}'.format(shot), topic='omfit_elm')
# Format the arrays properly. They come out as a list of four arrays,
# with an array of start times, start times+epsilon, end times, and
# end times + epsilon. This has to be converted into a 1D array vs.
# time.
newtime = np.reshape(np.transpose(newtime), -1)
elmphase = np.reshape(np.transpose(elmphase), -1)
new_elm_flag = np.reshape(np.transpose(new_elm_flag), -1)
since_last_elm = np.reshape(np.transpose(since_last_elm), -1)
since_last_elm_neg = np.reshape(np.transpose(since_last_elm_neg), -1)
until_next_elm = np.reshape(np.transpose(until_next_elm), -1)
local_inter_elm_period = np.reshape(np.transpose(local_inter_elm_period), -1)
local_total_elm_period = np.reshape(np.transpose(local_total_elm_period), -1)
# Now pad all quantities so that ELM phase starts at 0
start_time = 0 # ms
first_elm_start = newtime[0]
newtime = np.append(start_time, newtime)
elmphase = np.append(0, elmphase)
new_elm_flag = np.append(0, new_elm_flag)
since_last_elm = np.append(0, since_last_elm)
since_last_elm_neg = np.append(0, since_last_elm_neg)
until_next_elm = np.append(newtime[1] - start_time, until_next_elm)
local_inter_elm_period = np.append(first_elm_start, local_inter_elm_period)
local_total_elm_period = np.append(first_elm_start, local_total_elm_period)
if no_save:
return dict(
# Store the results
self['elm_phase'] = elmphase
self['elm_flag'] = new_elm_flag
self['inter_elm_period'] = local_inter_elm_period
self['total_elm_period'] = local_total_elm_period
self['since_last_elm'] = since_last_elm
self['since_last_elm_neg'] = since_last_elm_neg
self['until_next_elm'] = until_next_elm
self['elm_time'] = newtime
self['success'] = True
[docs] def calc_frequency(self, method=None, plot_result=False):
Calculates ELM frequency
:param method: None or int
None: use value in settings
int: manual override: ID number of method you want to use
- 0: very simple: 1 / local period
- 1: simple w/ smooth
- 2: process period then invert
- 3: method 0 then interpolate and smooth
:param plot_result: bool
Make a plot at the end
t0 = time.time()
printd('Starting ELM_frequency calculation...', topic='omfit_elm')
method = self['settings']['misc']['elm_freq_method'] if method is None else method
if 'total_elm_period' not in self:
' ELM_detection needs to be done prior to ELM frequency calculation. Results not ready: '
'"total_elm_period" in self = {}.'.format('total_elm_period' in self),
t = self['elm_time']
period = self['total_elm_period']
# Define smoothing timescale in ms
smo = np.median(period) * 3 # Smooth over about 3 ELM periods
if method == 1:
printd('ELM frequency method 1: simple w/ smooth', topic='omfit_elm')
# This method doesn't give enough weight to ELM free periods
fr = 1.0 / period
fr = smooth(fr, 4 * 5) # There are 4 points per ELM. 20 point smoothing smooths over 5 ELMs.
tf = t
elif method == 2:
printd('ELM frequency method 2: process period then invert', topic='omfit_elm')
# This method is very similar to what the PCS does in real time, the only differences being that this one
# has access to offline ELM detection, which is potentially better, and it can use a forward-backward
# smoothing filter.
# Get the previous period
prev_period = np.roll(period, 4) # There are 4 points per ELM
prev_period[0] = 1e3
# Calculate time since the START of the last ELM (the value in the tree is since the END of the last ELM).
it_start0 = np.arange(len(t))
it_start = ((it_start0 - 1) // 4).astype(np.int64) * 4 + 1
it_start[0] = 0
t_start = t[it_start]
since_start = t - t_start
# Interpolate onto a nicer time base
cut = 4 + 1 # Cut off large period at the start of the shot (it doesn't mean anything anyway)
t2 = self['dalpha_time']
pp = interp1d(t[cut:], prev_period[cut:], bounds_error=False, fill_value=0)(t2)
s = interp1d(t[cut:], since_start[cut:], bounds_error=False, fill_value=0)(t2)
# Don't let time since ELM be less than the previous ELM period. This prevents division by small numbers and
# reports of high ELM frequency right after ELMs.
s2 = copy.copy(s)
s2[s < pp] = pp[s < pp]
# Cut off the extra large since value at the start of the shot to prevent long periods from smoothing into
# the start of ELMing.
s2[s2 > 1000] = 1000.0
# Smooth before taking reciprocal to get frequency.
s3 = butter_smooth(t2, s2, cutoff=1.0 / (smo * np.pi))
# Calculate frequency
fr = 1.0 / s3
tf = t2 # Record the time relevant for this calculation of ELM frequency
elif method == 3:
printd('ELM frequency method 3: take very simple method 0 then interpolate and smooth', topic='omfit_elm')
fr0 = 1.0 / period
t2 = self['dalpha_time']
fr = interp1d(t, fr0, bounds_error=False, fill_value=0)(t2)
fr_norm = np.nanmean(fr)
fr_norm = 1 if fr_norm == 0 or np.isnan(fr_norm) else fr_norm
with warnings.catch_warnings():
# It's okay to underflow during this division.
warnings.filterwarnings("ignore", message="underflow encountered in multiply")
fr = butter_smooth(t2, fr / fr_norm, cutoff=1.0 / (smo * np.pi)) * fr_norm
tf = t2
else: # method == 0
printd('ELM frequency method 0: very simple', topic='omfit_elm')
# This method could be noisy because of no smoothing
fr = 1.0 / period
tf = t
fr *= 1000.0 # Convert kHz to Hz
self['elm_freq_time'] = tf
self['elm_freq'] = fr
self['elm_freq_method'] = copy.copy(method)
if plot_result:
t1 = time.time()
self.setdefault('plots', SortedDict())['frequency'] = function_to_tree(self.elm_frequency_plot, self)
except ImportError:
printd(' Failed to load plot script to tree with function_to_tree', topic='omfit_elm')
# This won't work outside of OMFIT and we don't want it to
printd('ELM frequency calculation took {} ms'.format((t1 - t0) * 1000), topic='omfit_elm')
self.status['calcs'] = True
[docs] def filter(self, times_to_filter, cer_mode=False, stimes=0.0, apply_elm_filter=True, debug=None, on_failure=None):
Use ELM detector & ELM filtering settings to determine whether each element in an array of times is good or bad.
:param times_to_filter: numeric 1d array or a list of such arrays
For most signals (like Thomson scattering data): a single 1D array of times in ms. For CER: a list of
1D arrays, one for each CER channel.
:param cer_mode: bool
Assume times_to_filter is a list of lists/arrays, one entry per channel (loop through elements of
times, each one of which had better itself have several elements)
:param stimes: float -OR- array or list of arrays matching dimensions of times_to_filter
Averaging time window for CER. This typically set to 2.5 ms, but it can vary (read it from your data if
:param apply_elm_filter: bool
Debugging option: if this is set False, the script will go through its pre-checks and setup without actually
doing any ELM-filtering.
:param debug: bool [optional]
Override class debug flag for just the filtering run. Leave at None to use self.debug instead of overriding.
:param on_failure: str [optional]
Override class on_failure flag. Sets behavior when filtering is not possible.
'pass': Pass all data (elm_okay = 1 for all)
'exception': raise an exception. (default for unrecognized)
:return: array of bools or list of arrays matching shape of times_to_filter
Flag indicating whether each sample in times_to_filter is okay according to ELM filtering rules
debug = self.debug if debug is None else debug
if debug:
self.setdefault('debug_elm_filter', dict())
# Sanitize
times_to_filter = np.atleast_1d(times_to_filter)
# Find number of timing channels being used. CER has potentially different timing for each channel, so we treat
# each channel on its own timing. Thomson has the same timing for every spatial chord on the same system, so
# it's done as if it has only one timing channel.
dtf = dimens(times_to_filter)
if dtf == 1:
# This is Thomson-like. ELM filter will be put inside a loop over Thomson systems (probably only one or two
# are used at a time; maybe all three)
nchan = 1
# This is CER-like. Times to filter is a list of timing arrays. The length of the top level list is the
# number of channels.
nchan = len(times_to_filter)
printd('nchan = {}'.format(nchan), topic='omfit_elm')
# Warn the user if they made a weird setup (hopefully it works, anyway)
if nchan == 1 and cer_mode:
"WARNING: ELM filter on shot {:} detected only one channel, but this is CER mode. More than one "
"channel expected. Do you know what you're doing? Let's see if this will work.".format(self.shot)
if nchan != 1 and not cer_mode:
"WARNING: ELM filter on shot {:} detected more than one channel, but CER mode was not enabled. "
"This could be a problem.".format(self.shot)
# Process stimes
if np.all(np.atleast_1d(stimes == 0)):
# Turn off strict ELM filtering for stime=0. There is no averaging window so we don't have to worry about
# how we treat the averaging window.
strict = False
strict = self['settings']['filtering']['CER_entire_window_must_pass_ELM_filter']
dst = dimens(stimes)
printd('dimens(stimes) = {}'.format(dst), topic='omfit_elm')
if dst == 0:
printd('stimes input as scalar or single element vec: {}'.format(stimes), topic='omfit_elm')
# A scalar was provided. Make it be shaped like times_to_filter so they can zip together
if nchan > 1: # CER-like: times_to_filter is a list of timing arrays
st0 = []
for t in times_to_filter:
st = stimes + t * 0
st0 += [st]
stimes = st0
printd('stimes transformed from scalar into list of arrays vs time (CER-like)', topic='omfit_elm')
else: # Thomson-like: times_to_filter is a single timing array
stimes += times_to_filter * 0
printd('stimes transformed from scalar into array vs time (Thomson-like)', topic='omfit_elm')
elif dst == 1:
printd('stimes input as vector', topic='omfit_elm')
if nchan == 1:
# Everything is fine. stimes and times_to_filter are both Thomson-like. No action required. If they have
# different lengths, the setup is wrong somewhere else and cannot be salvaged here.
printd('times_to_filter and stimes both were 1D. No transformation performed on stimes', topic='omfit_elm')
# This is a problem. stimes and times_to_filter do not match. This is probably going to be fatal,
# but there is ONE thing we can try.
# The only way this works is if stime is constant with time and varies by channel.
# Assume that the user gave us stime vs. channel with no time dependence.
printw('WARNING: times_to_filter is CER-like but stimes has only one dimension.')
if len(tolist(stimes)) != len(times_to_filter):
# This should just fail if the lengths aren't the same. If the lengths aren't the same, then our
# assumption is wrong and the setup flaw cannot be solved here, so it's okay to just fail and let
# debugging commence elsewhere.
raise OMFITexception('FAIL: Lengths do not match: stimes & times_to_filter.')
'Assuming that stimes was specified as one scalar per CER channel. This is not a great way to '
'set this up! Do it some other way, please!'
st0 = []
for s, t in zip(tolist(stimes), times_to_filter):
st = s + t * 0 # s should be a scalar and t should be a vector
st0 += [st]
stimes = st0 # stimes should be a list of vectors
'times_to_filter was CER-like list of arrays but stimes was just a vector.\nTransformed each '
'scalar element of stimes into an array vs. time, giving a list of arrays.',
else: # stimes should be 2D
if nchan > 1:
# Everything is fine. stimes and times_to_filter are both CER-like
printd('times_to_filter and stimes were both lists of arrays (CER-like). No transformation performed.', topic='omfit_elm')
# Problem! stimes is CER-like and times_to_filter is Thomson-like. I don't know how to deal with this
# so just fail.
raise OMFITexception("ERROR: stimes has more dimensions than times_to_filter. I can't deal with this")
if (not self.get('success', False)) or (self.detection_settings_used != self['settings']['detection']):
# Run ELM detection if needed
if not self.get('success', False):
fail_msg = 'Aborting ELM filter due to failure in ELM detection ({}#{}).'.format(self.device, self.shot)
if (on_failure or self.on_failure) == 'pass':
return np.ones(len(times_to_filter), bool)
raise OMFITexception(
fail_msg + ' Set on_failure="pass" during OMFITelm init or change self.on_failure="pass" to '
'disable ELM filtering instead of raising an exception for this type of failure.'
# Try to form an ELM filter
if apply_elm_filter:
elm_time = copy.copy(self['elm_time'])
elm_phase = copy.copy(self['elm_phase'])
elm_since = copy.copy(self['since_last_elm'])
# Need to add a point to elm since so that the time after the last detected ELM can be processed correctly
# with regards to elm_since_accept_above.
elm_phase = np.append(elm_phase, max(elm_phase[-1], 0.00)) # If there is no more ELMs, there is no more phase.
elm_since = np.append(elm_since, max(self['settings']['detection']['time_window']) + 1 - max(elm_time))
elm_time = np.append(elm_time, max(self['settings']['detection']['time_window']) + 1)
# Pad max time by 1ms to gurantee that elm_time does not contain repeat time points.
# Inject poinits to allow the correct processing of the time before the first ELM.
elm_time = np.insert(elm_time, 0, [min(elm_time) - 0.2, min(elm_time) - 0.1])
elm_phase = np.insert(elm_phase, 0, [0, 0])
elm_since = np.insert(elm_since, 0, [1000, 1000])
# filterCER had assume_sorted=True here. Faster? How much faster?
elm_since_i = interp1d(elm_time, elm_since, bounds_error=False, fill_value=0.0)
elm_phase_i = interp1d(elm_time, elm_phase, bounds_error=False, fill_value=0.0)
if debug:
OMFIT['scratch']['elm_phase_i'] = copy.copy(elm_phase_i)
except (NameError, KeyError):
self['debug_elm_filter']['elm_phase_i'] = copy.copy(elm_phase_i)
self['debug_elm_filter']['elm_phase_i'] = "see OMFIT['scratch']"
' OMFITelm.filter(): Minimum timestep between ELM timing data: min(diff(elm_time)) = {}. '
printd(' OMFITelm.filter(): elm_time = {}'.format(array_info(elm_time)))
if self.debug_plots:
import matplotlib as mpl
from matplotlib import pyplot
fig, ax = pyplot.subplots(1)
fig.canvas.set_window_title('OMFITelm.filter() debugging')
ax.plot(elm_time[:-1], np.diff(elm_time))
ax.set_xlabel('elm_time[:-1] (ms)')
ax.set_ylabel('diff(elm_time) (ms)')
elm_okay = []
if not cer_mode:
times_to_filter = [times_to_filter] # Make this a 1 element list containing one time array
# If it is cer mode, then times_to_filter should be a many element list containing a time array for
# each channel
if debug:
# Make sure these are defined in case the loop variable is empty
self['debug_elm_filter']['elm_time_in'] = []
self['debug_elm_filter']['elm_phase_ts'] = []
for times_to_f, stime in zip(times_to_filter, stimes): # Loop through CER channels.
# If not doing CER, then there will only be one pass through this loop
if times_to_f is None: # Check for bad channels
printw('Invalid list of times for this channel. Skipping...')
elm_okay_i = True # This should be just one element I think because the None is one element
elm_okay += [elm_okay_i]
continue # Stop processing this channel
# Test 1: does an ELM start within the averaging window? The discontinuity in ELM phase could cause a
# problem with checking the endpoints of the window. This is CER relevant.
elm_start_in_window = (times_to_f * 0).astype(bool) # Setup array
if np.any(np.atleast_1d(stime > 0)): # We can leave this as all False if stime == 0.
twin_end = times_to_f + stime # End of time window
# twin_mid = times_to_f+stime/2.0
for i, t in enumerate(times_to_f): # Loop through CER time slices
# Find all the ELMs that started in this window (if any)
w = np.where((elm_time > times_to_f[i]) * (elm_time < twin_end[i]))[0]
if len(w) > 0: # If at least one ELM started in this window, it's an ELMy window.
elm_start_in_window[i] = True
# Evaluate the interpolation at the middle of the time window
# elm_since_mid = elm_since_i(times_to_f + stime / 2.0)
elm_phase_mid = elm_phase_i(times_to_f + stime / 2.0)
if debug:
self['debug_elm_filter']['elm_time_in'] = copy.copy(times_to_f)
self['debug_elm_filter']['elm_phase_ts'] = copy.copy(elm_phase_mid)
# Elm filter
if strict:
# Evaluate the interpolation at the start/end of the time window and make sure the whole time
# window passes the ELM filter
# Set the elm_since value to use to be the one from the end of the time window
elm_since_use = elm_since_i(times_to_f)
# Find elm phase at the start and end end of the time window
elm_phase_st = elm_phase_i(times_to_f)
elm_phase_end = elm_phase_i(times_to_f + stime)
# Are the endpoints of the time window within the acceptable ELM phase range?
elm_okay_i = (
(elm_phase_st >= self['settings']['filtering']['elm_phase_range'][0])
* (elm_phase_st <= self['settings']['filtering']['elm_phase_range'][1])
* (elm_phase_end >= self['settings']['filtering']['elm_phase_range'][0])
* (elm_phase_end <= self['settings']['filtering']['elm_phase_range'][1])
# Not strict: just test the middle of the window for phase
# Evaluate the interpolation at the middle of the time window
# Set the elm_since value to use to be the one from the middle of the time window
elm_since_use = elm_since_i(times_to_f + stime / 2.0)
# Find ELM phase at the middle of the time window
elm_phase_mid = elm_phase_i(times_to_f + stime / 2.0)
# Is the slice within the acceptable ELM phase range?
elm_okay_i = (elm_phase_mid >= self['settings']['filtering']['elm_phase_range'][0]) * (
elm_phase_mid <= self['settings']['filtering']['elm_phase_range'][1]
if self['settings']['filtering']['elm_since_accept_above'] > 0:
# Has so much time passed since the ELM that we don't care what the ELM phase is anymore?
# (i.e. it's gone ELM-free)
elm_okay_i += elm_since_use > self['settings']['filtering']['elm_since_accept_above']
if (self['settings']['filtering']['elm_since_reject_below'] > -10) and (
min(self['settings']['filtering']['elm_phase_range']) >= 0
# Make sure that this filter does not affect time after detection_window (~last point in elm_time)
w = np.where((times_to_f > min(elm_time)) * (times_to_f < max(elm_time)))
# elm_okay_i[w]: Has so little time passed since the last ELM that we don't want this slice,
# regardless of the phase?
# (i.e. two ELMs in rapid succession made ELM phase ramp really fast and it's misleading).
# Be careful to apply this only to the time range where ELM data exist, because ELM since is set to
# zero outside of this range even though it's not really the same meaning of zero there.
elm_okay_i[w] *= elm_since_use[w] > self['settings']['filtering']['elm_since_reject_below']
printd('elm_since_reject_below =', self['settings']['filtering']['elm_since_reject_below'], topic='omfit_elm')
printd('elm_since_reject_below was <= -10, so this check is disabled', topic='omfit_elm')
# Flag any time where an ELM starts in the averaging window. Relevant for CER where stime > 0.
# Does nothing if stime == 0 because elm_start_in_window should be all False.
elm_okay_i = elm_okay_i * (~elm_start_in_window)
elm_okay += [elm_okay_i]
if not cer_mode and len(elm_okay):
elm_okay = elm_okay[0] # Strip away the 1 element list if not CER mode
# If the ELM filter is not being applied, then mark all slices as okay with respect to ELMs
if cer_mode:
elm_okay = [(t * False + True).astype(bool) for t in times_to_filter]
elm_okay = (times_to_filter * 0 + 1).astype(bool)
printd('ELM filter complete for shot {:}'.format(self.shot), topic='omfit_elm')
return elm_okay
# Plots
[docs] def plot(self, fig=None, axs=None):
Launches default plot, which is normally self.elm_detection_plot(). Can be changed by setting self.default_plot.
:return: (Figure instance, array of Axes instances)
Tuple containing references to figure and axes used by the plot
from matplotlib import pyplot
if fig is None:
fig = pyplot.gcf()
return getattr(self, self.default_plot, self.elm_detection_plot)(fig=fig, axs=axs)
[docs] def elm_frequency_plot(self, time_range=None, time_range_for_mean=None, overlay=False, fig=None, axs=None, quiet=False, **plotkw):
Plots calculated ELM frequency
:param time_range: two element iterable with numbers
The plot will initially zoom to this time range (ms).
:param time_range_for_mean: two element numeric, True, or False/None
- True: time_range will be used to define the interval, then the mean ELM frequency will be calculated
- False or None: no mean ELM frequency calculation
- Two element numeric: mean ELM frequency will be calculated in this interval; can differ from time_range.
:param overlay: bool
Overlay on existing figure instead of making a new one
:param fig: matplotlib Figure instance
Figure to use when overlaying.
:param axs: List of at least two matplotlib Axes instances
Axes to use when overlaying
:param quiet: bool
Convert all print to printd
:param plotkw: Additional keywords are passed to pyplot.plot()
:return: (Figure instance, array of Axes instances)
import matplotlib as mpl
from matplotlib import pyplot
def printq(*args):
if quiet:
printd(*args, topic='OMFITelm')
# Make sure results are ready
if 'elm_freq' not in self:
# Figure setup
if overlay or fig:
if fig is None:
fig = pyplot.gcf()
if axs is None:
ax0 = pyplot.subplot(211)
ax1 = pyplot.subplot(212, sharex=ax0)
axs = [ax0, ax1]
fig, axs = pyplot.subplots(2, sharex='col')
axs[0].set_ylabel(r'$D_{\alpha}$ (AU)')
axs[1].set_xlabel('Time (ms)')
axs[1].set_ylabel('ELM frequency (Hz)')
axs[1].set_title('Frequency calculation method {}'.format(self['elm_freq_method']))
# Data setup
which_fs = self['dalpha_filters']
x = self['dalpha_time']
y = self['dalpha_y']
xf = self['elm_freq_time']
f = self['elm_freq']
if time_range is None:
# Pick time range
tw = self['settings']['detection']['time_window']
time_range = [max([tw[0], x.min()]), min([tw[1], x.max()])]
# Do plot
use_mf = time_range_for_mean is not None and time_range_for_mean is not False
axs[0].plot(x, y, **plotkw)
fllw = plotkw.pop('lw', mpl.rcParams['lines.linewidth'] * (0.5 if use_mf else 1))
fllw = plotkw.pop('linewidth', fllw)
freq_line = axs[1].plot(xf, f, zorder=0, lw=fllw, **plotkw)
if use_mf:
if time_range_for_mean is True:
time_range_for_mean = time_range
w = (x >= time_range_for_mean[0]) & (x <= time_range_for_mean[1])
f1 = interp1d(xf, f, bounds_error=False, fill_value=np.NaN)(x)[w]
mean_freq = np.nanmean(f1)
std_freq = np.nanstd(f1)
printq('Mean ELM frequency during interval {} ms = {:0.3f}+/-{:0.3f} Hz'.format(time_range_for_mean, mean_freq, std_freq))
# Importing from omfit_classes.utils_plot has side effects that may be undesirable in command-line stand-alone testing
contrast_color = contrasting_color(freq_line)
except NameError:
contrast_color = 'r'
ls=' ',
lw=mpl.rcParams['lines.linewidth'] * 1.3 + 1.3,
label=r'$\langle f_{{ELM}} \rangle$ = {:0.2f} Hz'.format(mean_freq),
axs[1].legend(loc=0, numpoints=1)
# Getting at OMFIT plot utilities when doing stand-alone command-line stuff can be a burden
cornernote(shot=self.shot, device=self.device, time='')
except NameError:
fig.text(0.99, 0.01, '{}#{}'.format(self.device, self.shot), fontsize=10, ha='right', transform=pyplot.gcf().transFigure)
return fig, axs
[docs] def elm_detection_plot(self, **kw):
Plots important variables related to ELM detection
:return: (Figure instance, array of Axes instances)
method = self['settings']['detection'].get('detection_method', 0)
method = self.check_detection_inputs(method)
if method == 0:
f, ax = self.classic_elm_detection_plot(**kw)
elif method == 1:
f, ax = self.jump_and_hold_elm_detection_plot(**kw)
raise ValueError('Unrecognized ELM detection method: {}\nValid options are: [0, 1]'.format(method))
return f, ax
[docs] def get_quantity_names(self):
Gives names of quantities like D_alpha or d/dt(T_e) and ELM vs sawtooth
:return: tuple of strings containing:
- Name of the raw Y variable (like D_alpha)
- Name of the event being detected (like ELM or sawtooth) for use in middle of sentence
- Name of event being detected (like ELM or Sawtooth) for use at start of sentence (capitalized 1st letter)
if self.mode == 'sawtooth':
tn = r'\frac{d}{dt}T_e' # thing name
en = 'sawtooth' # event name
cen = 'Sawtooth'
tn = r'D_\alpha'
en = cen = 'ELM'
return tn, en, cen
def _generate_filterscope_label(self):
Set up label for list of "filterscopes", which are now generalized to input signals
:return: string
Label to use for "filterscopes" or other raw input signals
filterscope = self['settings']['detection']['filterscopes']
fsl = tolist(filterscope)
if len(fsl) > 1:
filterscope_label = []
k = 0
while (len(', '.join(filterscope_label)) < 24) and k < (len(fsl)):
filterscope_label += ['{:}'.format(fsl[k])]
k += 1
if k < len(fsl):
filterscope_label += ['...']
filterscope_label = ', '.join(filterscope_label)
filterscope_label = fsl[0]
return filterscope_label
def _get_time_mask(self, crop_data_to_zoom_range=True, time_zoom=None):
Prepares mask for plotting only part of the time history
:param crop_data_to_zoom_range: bool
:param time_zoom: two element numeric iterable
:return: tuple containing:
time_range: a two element list containing the start and stop times in ms
wt: bool array for masking things shaped like dalpha_time.
time_zoom = (
{'DIII-D': [0, 8000], 'NSTX': [0, 6000], 'NSTXU': [0, 6000]}.get(tokamak(self.device), [0, 8000])
if time_zoom is None
else time_zoom
timemin = max([time_zoom[0], min(self['dalpha_time'])])
timemax = min([time_zoom[1], max(self['dalpha_time'])])
time_range = [timemin, timemax]
if crop_data_to_zoom_range:
wt = (self['dalpha_time'] >= timemin) & (self['dalpha_time'] <= timemax)
wt = self['dalpha_time'] > -1e20 # Should be all True
return time_range, wt
[docs] def plot_phase(self, ax=None, standalone=True):
"""Plots ELM phase"""
ax = pyplot.gca() if ax is None else ax
tn, en, cen = self.get_quantity_names()
ax.set_title('{} phase is < 0 during events, > 0 between events'.format(cen))
ax.plot(self['elm_time'], self['elm_phase'], label='{} phase'.format(cen))
ax.axhline(0, color='k', linestyle='-.')
# Add in ELM phase as it is evaluated on TS time base
try: # Might not work: is fastTS even hooked up?
ts_sys0 = fastTS['SETTINGS']['PHYSICS']['TS_systems'][0]
ts_time = fastTS['INPUTS']['current']['filtered_TS'][ts_sys0]['time']
except (NameError, KeyError, TypeError):
elm_phase_ts = interp1d(self['elm_time'], self['elm_phase'], fill_value=0, bounds_error=False)(ts_time)
ax.plot(ts_time, elm_phase_ts, '.', label='{} phase on TS time'.format(cen))
if standalone:
def _generic_plot_annotations(self, f=None, ax=None):
Convenient annotation package to be used for most plots
:param f: Figure instance
:param ax: Axes instance
f = (pyplot.gcf() if ax is None else ax.get_figure()) if f is None else f
ax = f.gca() if ax is None else ax
ax.set_xlabel('Time (ms)')
# Getting at OMFIT plot utilities when doing stand-alone command-line stuff can be a burden
cornernote(shot=self.shot, device=self.device, time='', ax=ax)
except NameError:
f.text(0.99, 0.01, '{}#{}'.format(self.device, self.shot), fontsize=10, ha='right', transform=f.transFigure)
[docs] def plot_signal_with_event_id(self, ax=None, wt=None, shade_elms=True, standalone=True):
Plots the signal used for detection and shades or recolors intervals where events are detected
:param ax: Axes instance
:param wt: array for masking
:param shade_elms: bool
:param standalone: bool
ax = pyplot.gca() if ax is None else ax
tn, en, cen = self.get_quantity_names()
if wt is None:
time_range, wt = self._get_time_mask()
ax.set_title('${}$ with final {} identification in red'.format(tn, en))
ax.plot(self['dalpha_time'][wt], self['dalpha_y'][wt], label='${}$'.format(tn))
yy = copy.copy(self['dalpha_y'])
not_elm = ~self['elm_detection_details']['elm_flag'].astype(bool)
# elm = dets['elm_flag'] # If you needed a flag for when an ELM is happening
yy[not_elm] = np.NaN
xx = copy.copy(self['dalpha_time'])
xx[not_elm] = np.NaN
if shade_elms:
ax.plot(xx[wt], yy[wt], 'r')
ax.fill_between(xx[wt], yy[wt] * 0, yy[wt], color='r', alpha=0.25, label=cen)
ax.plot(xx[wt], yy[wt], 'r', label=cen)
if standalone:
[docs] def plot_more_event_timing_info(self, ax, crop_elm_since=True, standalone=True):
Plots details related to ELM/sawtooth/whatever timing, like time since the last event
:param ax: Axes instance
:param crop_elm_since: bool
:param standalone: bool
ax = pyplot.gca() if ax is None else ax
tn, en, cen = self.get_quantity_names()
ax.set_title('More {} timing info'.format(en))
ax.plot(self['elm_time'], self['since_last_elm'], label='Time since last {} (ms)'.format(en))
ax.plot(self['elm_time'], self['until_next_elm'], label='Time until next {} (ms)'.format(en))
ax.plot(self['elm_time'], self['total_elm_period'], label='Local {} period duration start-start (ms)'.format(en))
ax.plot(self['elm_time'], self['inter_elm_period'], label='Local inter/intra {} period start-end or end-start (ms)'.format(en))
if crop_elm_since > 0:
if crop_elm_since is True:
crop_elm_since = np.mean(self['total_elm_period'])
ax.set_ylim([0, min([crop_elm_since, ax.get_ylim()[1]])])
if standalone:
[docs] def plot_raw_sawtooth_info(self, ax=None, wt=None, standalone=True, shade_elms=True):
Plots raw sawtooth signal (not -dTe/dt). Only makes sense in sawtooth mode.
:param ax: Axes instance
:param wt: array of bools
Mask for time range to plot
:param standalone: bool
:param shade_elms: bool
dets = self['elm_detection_details'] # Shortcut
ax = pyplot.gca() if ax is None else ax
if wt is None:
time_range, wt = self._get_time_mask()
ax.set_title('Raw signal')
ax.plot(self['dalpha_time'][wt], self['raw_sawtooth_input'][wt], label='$T_e$ from {}'.format(self._generate_filterscope_label()))
yy = copy.copy(self['raw_sawtooth_input'])
not_elm = ~dets['elm_flag'].astype(bool)
# elm = dets['elm_flag'] # If you needed a flag for when an ELM is happening
yy[not_elm] = np.NaN
xx = copy.copy(self['dalpha_time'])
xx[not_elm] = np.NaN
if shade_elms:
ax.plot(xx[wt], yy[wt], 'r')
ax.fill_between(xx[wt], yy[wt] * 0, yy[wt], color='r', alpha=0.25, label='Sawteeth')
ax.plot(xx[wt], yy[wt], 'r', label='Sawteeth')
if standalone:
[docs] def plot_hold_correction(self, ax=None, wt=None, time_range=None, standalone=True, shade_elms=True):
Plot explanation of the hold correction to the ELM flag
Only works if detection was done with debug = True
:param ax: Axes instance
:param wt: array of bools
time mask
:param time_range: two element numeric iterable
:param standalone: bool
:param shade_elms: bool
ax = pyplot.gca() if ax is None else ax
if (wt is None) or (time_range is None):
time_range, wt = self._get_time_mask()
if standalone:
t = self['dalpha_time']
y = self['dalpha_y']
min_window = self['settings']['detection']['hold_elm_min_finding_time_window']
starts = self['detect_debugging']['starts']
ends = self['detect_debugging']['ends']
localmins = self['detect_debugging']['elm_det_localmins']
localmax = self['detect_debugging']['elm_det_localmaxs']
thresh = self['detect_debugging']['elm_det_threshs']
new_ends = self['detect_debugging']['elm_det_adv_hold_until']
ax.plot(t[wt], y[wt], label='$y$')
needlab = True
for i in range(len(starts)):
# Mark up individual ELMs
x = [t[starts[i]], t[new_ends[i]]]
if ((np.max(x) >= time_range[0]) and (np.max(x) <= time_range[1])) or (
(np.min(x) >= time_range[0]) and (np.min(x) <= time_range[1])
# Only mark ELMs that are within the time window
xx = t[ends[i] : new_ends[i] + 1]
yy = y[ends[i] : new_ends[i] + 1]
label = 'Hold' if needlab else ''
if shade_elms:
ax.plot(xx, yy, 'r')
ax.fill_between(xx, yy * 0, yy, color='r', alpha=0.25, label=label)
ax.plot(xx, yy, 'r', label=label)
ax.axvline(t[starts[i]], color='r', linestyle=':', label='Start' if needlab else '')
ax.axvline(t[ends[i]], color='r', linestyle='-.', label='Original end' if needlab else '')
ax.axvline(t[new_ends[i]], color='r', linestyle='--', label='New end' if needlab else '')
xmn = [t[starts[i]] - min_window, t[new_ends[i]]]
ax.plot(x, [localmax[i]] * 2, color='k', label='$max_{local}(y)$ level' if needlab else '', linestyle='--')
ax.plot(xmn, [localmins[starts[i]]] * 2, color='g', linestyle='-.', label='$min_{local}(y)$ level' if needlab else '')
ax.plot(x, [thresh[i]] * 2, color='m', label='Local hold-release threshold' if needlab else '')
needlab = False
def _prepare_fig_axs(fig=None, axs=None, nplot=1, figsize=None, legend_outside=False):
Creates figure and axes as needed to accommodate a given number of plots
:param fig: [optional] figure instance
:param axs: [optional] axes instance
:param nplot: int
:param figsize: two element numeric iterable giving the figure size
:param legend_outside: bool
:return: fig, axes to use for plotting, all axes (including blanks to reserve space for a legend)
if fig is not None and axs is not None:
f, ax0 = fig, axs
elif fig is not None and axs is None:
f = fig
nax = nplot * (1 + int(legend_outside))
ax0 = [None] * nax
ax0[0] = pyplot.subplot(nplot, 1 + int(legend_outside), 1)
for i in range(1, nax):
ax0[i] = pyplot.subplot(nplot, 1 + int(legend_outside), i + 1, sharex=ax0[0])
ax0 = np.squeeze(np.atleast_1d(ax0).reshape((nplot, 1 + int(legend_outside))))
f, ax0 = pyplot.subplots(nplot, 1 + int(legend_outside), sharex='all', figsize=figsize)
ax0 = np.atleast_1d(ax0)
if legend_outside:
ax = ax0[:, 0]
for i in range(nplot):
ax0[i, 1].axis('off')
ax = ax0
return f, ax, ax0
[docs] def jump_and_hold_elm_detection_plot(
Plots important variables related to jump & hold ELM detection
This both demonstrates how the ELM detector works and serves as a diagnostic plot that can help with tuning.
:param time_zoom: two element iterable containing numbers
Zoom in to this time range in ms
If None, auto selects the default for the current device
:param crop_data_to_zoom_range: bool
Crop data to range given by time_zoom before calling plot. This can prevent resampling, which will make the
plot better.
:param crop_elm_since: float or bool
float = plot max in ms. True = auto-scale sensibly. False = auto-scale stupidly.
:param show_phase: bool
Plot ELM phase in an additional subplot. Phase is 0 at the end of an ELM, then increases during the
inter-ELM period until it reaches +1 at the start of the next ELM.
Then it jumps to -1 and increases back to 0 during the ELM.
:param show_more_timing: bool
Include an additional subplot showing more timing details like time since last ELM & local ELM period length
:param show_details: bool
Shows details of how ELM detection works (individual terms like DS, DD, etc.) in a set of add'l subplots.
:param hide_y: bool
Turns off numbers on the y-axis and in some legend entries. Useful if you want to consider D_alpha to be in
arbitrary units and don't want to be distracted by numbers.
:param hide_numbers: bool
Hides numerical values of smoothing time scales and other settings used in ELM detection.
Useful if you want shorter legend entries or a simpler looking plot.
:param legend_outside: bool
Place the legends outside of the plots; useful if you have the plots positioned so there is empty space to
the right of them.
:param notitles: bool
Suppress titles on the subplots.
:param shade_elms: bool
On the ELM ID plot, shade between 0 and D_alpha
:param figsize: Two element iterable containing numbers
(X, Y) Figure size in cm
:param fig: Figure instance
Used for overlay
:param axs: List of Axes instances
Used for overlay. Must be at least long enough to accommodate the number of plots ordered.
:return: (Figure instance, array of Axes instances)
from matplotlib import pyplot
printd('Running J&H ELM detection demo plot (shot {:})...'.format(self.shot), topic='omfit_elm')
if 'elm_detection_details' not in self:
elmdet = self['settings']['detection']['{:}_tuning'.format(self['settings']['detection']['smoother'])]
printd('mild_smooth = {}'.format(elmdet['mild_smooth']), topic='omfit_elm')
show_hold = self.debug * self['settings']['detection']['hold_elm_flag_until_low_dalpha']
nplot = (3 + show_hold) * show_details + 1 + show_phase + show_more_timing + int(self.mode == 'sawtooth')
f, ax, ax0 = self._prepare_fig_axs(fig=fig, axs=axs, nplot=nplot, figsize=figsize, legend_outside=legend_outside == 1)
time_range, wt = self._get_time_mask(crop_data_to_zoom_range=crop_data_to_zoom_range, time_zoom=time_zoom)
def draw_legend(axx, ii):
if legend_outside == 2:
axx[ii].legend(loc='center left', bbox_to_anchor=(1, 0.5)).set_draggable(True)
elif legend_outside:
axx[ii, 0].legend(loc='center left', bbox_to_anchor=(1, 0.5)).set_draggable(True)
# List of plots that draw their own legends so they shouldn't get legend calls at the end when all the normal
# legends would be drawn
skip_legends = []
filterscope_label = self._generate_filterscope_label()
tn, en, cen = self.get_quantity_names()
plotnum = 0
# Possible zeroth plot: sawtooth raw input
if self.mode == 'sawtooth':
self.plot_raw_sawtooth_info(ax=ax[plotnum], wt=wt, standalone=False, shade_elms=shade_elms)
plotnum += 1
if show_details:
# First plot: D_alpha, smoothed D_alpha, and localmax
nwin = self['elm_detection_details']['n_win']
def rollit(a, n):
return np.roll(np.concatenate((np.repeat(0, n), a, np.repeat(0, n))), n)[n:-n]
t = self['dalpha_time'][wt]
y = self['dalpha_y'][wt]
ax[plotnum].plot(t, y, label='$y = {}$ (from {:})'.format(tn, filterscope_label))
ys = rollit(self['elm_detection_details']['ys'], nwin)[wt]
localmax = rollit(self['elm_detection_details']['localmax'], nwin)[wt]
ys_label = '$y_s = delay(smooth(y))$'
threshold_label = 'Threshold'
threshold = self['settings']['detection']['find_jump_threshold']
if not hide_numbers:
ys_label += r', $\tau = {:0.2f}, L = {:0.2f}$ ms'.format(
float(self['settings']['detection']['find_jump_presmo']), self['settings']['detection']['find_jump_time_window']
threshold_label += ' = {:0.2f}'.format(threshold)
ax[plotnum].plot(t, ys, label=ys_label)
ax[plotnum].plot(t, localmax, label='$max_{local}(smooth(y))$')
plotnum += 1
dy = self['elm_detection_details']['dy'][wt]
ax[plotnum].plot(t, dy, label='$dy = max_{local}(y_s) - y_s$')
plotnum += 1
ndy = self['elm_detection_details']['ndy'][wt]
ax[plotnum].plot(t, ndy, label='Normalized $dy$')
ax[plotnum].axhline(threshold, color='m', label=threshold_label)
yy = copy.copy(ndy)
yy[ndy < threshold] = np.NaN
olabel = 'Jump detected'
if shade_elms:
ax[plotnum].plot(t, yy, 'r')
ax[plotnum].fill_between(t, yy * 0, yy, color='r', alpha=0.25, label=olabel)
ax[plotnum].plot(t, yy, 'r', label=olabel)
plotnum += 1
if self.debug:
# Only works if debug data are saved
self.plot_hold_correction(ax=ax[plotnum], wt=wt, time_range=time_range, standalone=False, shade_elms=shade_elms)
plotnum += 1
# Next...
# D_alpha with ELM identification (or -d/dt(T_e) with sawtooth ID)
self.plot_signal_with_event_id(ax=ax[plotnum], wt=wt, shade_elms=shade_elms, standalone=False)
if show_phase:
# ELM phase
plotnum += 1
self.plot_phase(ax=ax[plotnum], standalone=False)
if show_more_timing:
# Misc. ELM timing info
plotnum += 1
self.plot_more_event_timing_info(ax=ax[plotnum], crop_elm_since=crop_elm_since, standalone=False)
for i in range(nplot):
if i not in skip_legends:
draw_legend(ax0, i)
if notitles:
if hide_y:
self._generic_plot_annotations(f, ax[-1])
printd('Done with elm det demo {:}'.format(self.shot), topic='omfit_elm')
return f, ax0
[docs] def classic_elm_detection_plot(
Plots important variables related to classic ELM detection
This both demonstrates how the ELM detector works and serves as a diagnostic plot that can help with tuning.
:param time_zoom: two element iterable containing numbers
Zoom in to this time range in ms
If None, auto selects the default for the current device
:param crop_data_to_zoom_range: bool
Crop data to range given by time_zoom before calling plot. This can prevent resampling, which will make the
plot better.
:param crop_elm_since: float or bool
float = plot max in ms. True = auto-scale sensibly. False = auto-scale stupidly.
:param show_phase: bool
Plot ELM phase in an additional subplot. Phase is 0 at the end of an ELM, then increases during the
inter-ELM period until it reaches +1 at the start of the next ELM.
Then it jumps to -1 and increases back to 0 during the ELM.
:param show_more_timing: bool
Include an additional subplot showing more timing details like time since last ELM & local ELM period length
:param show_details: bool
Shows details of how ELM detection works (individual terms like DS, DD, etc.) in a set of add'l subplots.
:param hide_y: bool
Turns off numbers on the y-axis and in some legend entries. Useful if you want to consider D_alpha to be in
arbitrary units and don't want to be distracted by numbers.
:param hide_numbers: bool
Hides numerical values of smoothing time scales and other settings used in ELM detection.
Useful if you want shorter legend entries or a simpler looking plot.
:param legend_outside: bool
Place the legends outside of the plots; useful if you have the plots positioned so there is empty space to
the right of them.
:param notitles: bool
Suppress titles on the subplots.
:param shade_elms: bool
On the ELM ID plot, shade between 0 and D_alpha
:param figsize: Two element iterable containing numbers
(X, Y) Figure size in cm
:param fig: Figure instance
Used for overlay
:param axs: List of Axes instances
Used for overlay. Must be at least long enough to accommodate the number of plots ordered.
:return: (Figure instance, array of Axes instances)
from matplotlib import pyplot
printd('Running classic ELM detection demo plot (shot {:})...'.format(self.shot), topic='omfit_elm')
if 'elm_detection_details' not in self:
elmdet = self['settings']['detection']['{:}_tuning'.format(self['settings']['detection']['smoother'])]
dets = self['elm_detection_details']
printd('mild_smooth = {}'.format(elmdet['mild_smooth']), topic='omfit_elm')
if self['settings']['detection']['detection_method'] != 0 and show_details:
print('"show_details" is currently incompatible with selected ELM detection method and has been disabled')
show_details = False
show_hold = self.debug * self['settings']['detection']['hold_elm_flag_until_low_dalpha']
nplot = (4 + show_hold) * show_details + 1 + show_phase + show_more_timing + int(self.mode == 'sawtooth')
f, ax, ax0 = self._prepare_fig_axs(fig=fig, axs=axs, nplot=nplot, figsize=figsize, legend_outside=legend_outside == 1)
time_range, wt = self._get_time_mask(crop_data_to_zoom_range=crop_data_to_zoom_range, time_zoom=time_zoom)
def draw_legend(axx, ii):
if legend_outside == 2:
axx[ii].legend(loc='center left', bbox_to_anchor=(1, 0.5)).set_draggable(True)
elif legend_outside:
axx[ii, 0].legend(loc='center left', bbox_to_anchor=(1, 0.5)).set_draggable(True)
# List of plots that draw their own legends so they shouldn't get legend calls at the end when all the normal
# legends would be drawn
skip_legends = []
filterscope_label = self._generate_filterscope_label()
tn, en, cen = self.get_quantity_names()
plotnum = 0
# Possible zeroth plot: sawtooth raw input
if self.mode == 'sawtooth':
self.plot_raw_sawtooth_info(ax=ax[plotnum], wt=wt, standalone=False)
plotnum += 1
if show_details:
# First plot: D_alpha and smoothed D_alpha
ax[plotnum].set_title('Signal and smoothed signal')
ax[plotnum].plot(self['dalpha_time'][wt], self['dalpha_y'][wt], label='$y = {}$ (from {:})'.format(tn, filterscope_label))
if hide_numbers:
tmplabel1 = '$S1 = $MildSmooth(y)'
tmplabel2 = '$S2 = $AggressiveSmooth(y)'
tmplabel1 = r'$S1 = $MildSmooth(y), $\tau={:}$ ms'.format(elmdet['mild_smooth'])
tmplabel2 = r'$S2 = $AggressiveSmooth(y), $\tau={:}$ ms'.format(elmdet['mild_smooth'] * elmdet['heavy_smooth_factor'])
ax[plotnum].plot(self['dalpha_time'][wt], dets['s1'][wt], label=tmplabel1)
ax[plotnum].plot(self['dalpha_time'][wt], dets['s2'][wt], label=tmplabel2)
plotnum += 1
# Difference of smoothed D_alpha vs. threshold
ax[plotnum].set_title('Difference of smooths & threshold; red = identified as {} by this test'.format(en))
ax[plotnum].plot(self['dalpha_time'][wt], dets['ds'][wt], label='$DS = S1-S2$')
if hide_y:
tmplabel = r'Threshold$ = {:0.2f} \times $max(DS)'.format(elmdet['threshold_on_difference_of_smooths'])
tmplabel = r'Threshold$ = {:+0.4f} = {:+0.4f} =\times $max(DS)'.format(
dets['thresh1'], elmdet['threshold_on_difference_of_smooths']
if hide_numbers:
tmplabel = 'Threshold'
ax[plotnum].axhline(dets['thresh1'], color='k', linestyle='--', label=tmplabel)
dyt = dets['dyt']
y1 = dets['ds'] + 0
x1 = self['dalpha_time'] + 0
y1[np.where(dyt <= 0)] = None
x1[np.where(dyt <= 0)] = None
ax[plotnum].plot(x1[wt], y1[wt], 'r', label=cen)
ax[plotnum].axhline(0, color='k', linestyle=':', label='Zero')
plotnum += 1
# Derivative of D_alpha and smoothed derivative
ax[plotnum].set_title('Derivative of smooth & more smoothed derivative')
if elmdet['mild_smooth_derivative_factor'] > 0:
if hide_numbers:
tmplabel1 = '$D1 = $MildSmooth$($Deriv$(S1))$'
tmplabel1 = r'$D1 = $MildSmooth$($Deriv$(S1))$, $\tau={:}$ ms'.format(
elmdet['mild_smooth'] * elmdet['mild_smooth_derivative_factor']
tmplabel1 = '$D1 = $Deriv$(S1)$'
if hide_numbers:
tmplabel2 = '$D2 = $AggressiveSmooth$(D1)$'
tmplabel2 = r'$D2 = $AggressiveSmooth$(D1)$, $\tau={:}$ ms'.format(
elmdet['mild_smooth'] * elmdet['heavy_smooth_derivative_factor']
ax[plotnum].plot(self['dalpha_time'][wt], dets['d1'][wt], label=tmplabel1)
ax[plotnum].plot(self['dalpha_time'][wt], dets['d2'][wt], label=tmplabel2)
ax[plotnum].axhline(0, color='k', linestyle=':')
plotnum += 1
# Difference of smoothed derivatives and threshold
ax[plotnum].set_title('Difference of derivatives & thresholds; red = identified as {} by this test'.format(en))
ax[plotnum].plot(self['dalpha_time'][wt], dets['dd'][wt], label='$DD = D1-D2$')
if hide_y:
if elmdet['d_thresh_enhance_factor'] == 1:
tmplabel1 = r'Threshold$ = {:0.2f} \times $max(DD)'.format(
tmplabel1 = r'Threshold$ = {:0.2f}$ or ${:0.2f} \times $max(DD)'.format(
elmdet['threshold_on_difference_of_smoothed_derivatives_plus'] * elmdet['d_thresh_enhance_factor'],
tmplabel2 = r'Threshold$ = {:0.2f} \times $min(DD)'.format(elmdet['threshold_on_difference_of_smoothed_derivatives_minus'])
if elmdet['d_thresh_enhance_factor'] == 1:
tmplabel1 = r'Threshold$ = {:+0.4f} = {:+0.4f} \times $max(DD)'.format(
np.mean(dets['thresh2']), elmdet['threshold_on_difference_of_smoothed_derivatives_plus']
tmplabel1 = r'Threshold$ = {:+0.4f}$ or ${:+0.4f} = {:+0.4f}$ or ${:+0.4f} \times $max(DD)'.format(
elmdet['threshold_on_difference_of_smoothed_derivatives_plus'] * elmdet['d_thresh_enhance_factor'],
if elmdet['neg_d_thresh_enhance_factor'] == 1:
tmplabel2 = r'Threshold$ = {:+0.4f} = {:+0.4f} \times $min(DD)'.format(
np.mean(dets['thresh3']), elmdet['threshold_on_difference_of_smoothed_derivatives_minus']
tmplabel2 = r'Threshold$ = {:+0.4f}$ or ${:+0.4f} = {:+0.4f}$ or ${:+0.4f} \times $min(DD)'.format(
elmdet['threshold_on_difference_of_smoothed_derivatives_minus'] * elmdet['neg_d_thresh_enhance_factor'],
if hide_numbers:
tmplabel1 = 'Threshold'
tmplabel2 = None
if elmdet['d_thresh_enhance_factor'] == 1:
ax[plotnum].axhline(np.mean(dets['thresh2']), color='k', linestyle='--', label=tmplabel1)
ax[plotnum].plot(self['dalpha_time'][wt], dets['thresh2'][wt], color='k', linestyle='--', label=tmplabel1)
if elmdet['neg_d_thresh_enhance_factor'] == 1:
ax[plotnum].axhline(np.mean(dets['thresh3']), color='k', linestyle='--', label=tmplabel2)
ax[plotnum].plot(self['dalpha_time'][wt], dets['thresh3'][wt], color='k', linestyle='--', label=tmplabel2)
ax[plotnum].axhline(0, color='k', linestyle=':')
dd2 = dets['dd2']
dd3 = dets['dd3']
x2 = self['dalpha_time'] + 0
y2 = dets['dd'] + 0
x2[(dd2 <= 0) * (dd3 <= 0)] = None
y2[(dd2 <= 0) * (dd3 <= 0)] = None
ax[plotnum].plot(x2[wt], y2[wt], 'r', label=cen)
plotnum += 1
if show_hold:
self.plot_hold_correction(ax=ax[plotnum], wt=wt, time_range=time_range, standalone=False, shade_elms=shade_elms)
plotnum += 1
# Next...
# D_alpha with ELM identification (or -d/dt(T_e) with sawtooth ID)
self.plot_signal_with_event_id(ax=ax[plotnum], wt=wt, shade_elms=shade_elms, standalone=False)
if show_phase:
# ELM phase
plotnum += 1
self.plot_phase(ax=ax[plotnum], standalone=False)
if show_more_timing:
# Misc. ELM timing info
plotnum += 1
self.plot_more_event_timing_info(ax=ax[plotnum], crop_elm_since=crop_elm_since, standalone=False)
for i in range(nplot):
if i not in skip_legends:
draw_legend(ax0, i)
if notitles:
if hide_y:
self._generic_plot_annotations(f, ax[-1])
printd('Done with elm det demo {:}'.format(self.shot), topic='omfit_elm')
return f, ax0
if __name__ == '__main__':
# Default functionality: Just test OMFITelm.__init__()
# To run the unit tests, provide a number > 0 or the word test as a command line argument while calling
if len(sys.argv) > 1:
test_flag = int(sys.argv[1])
except ValueError:
test_flag = int(sys.argv[1] == 'test')
test_flag = 0
if test_flag > 0:
manage_tests(TestOMFITelm, failfast=False)
OMFITelm(shot=154754) # Test init, but don't try to use shot 0 as it will try to connect to d3drdb