Source code for omfit_classes.omfit_timcon

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from omfit_classes.omfit_ascii import OMFITascii

from scipy import signal

__all__ = ['OMFITtimcon']


class OMFITtimcon(SortedDict, OMFITascii):
    """
    OMFIT class for parsing/writing the NBI time traces of the DIII-D PCS

    After loading, the object maps each beam (module) name found in the file
    to a ``SortedDict`` holding the header fields of that beam (``enabled``,
    ``total_on_time``, ``mode``, ``mode_txt``, ``beam_status``,
    ``beam_status_txt``, ``sub_beam``, ``sub_priority``, ``__unknown__``)
    followed by one ``PHASE<n>`` entry per phase. Each phase is itself a
    ``SortedDict`` with the keys ``__enabled__``, ``start``, ``on``, ``off``,
    ``duration`` and ``pulses``.

    :param filename: path of the TIMCON file, forwarded to ``OMFITascii``
    """

    def __init__(self, filename):
        OMFITascii.__init__(self, filename)
        SortedDict.__init__(self)
        self.dynaLoad = True

    @dynaLoad
    def load(self):
        """
        Parse the TIMCON file at ``self.filename`` and populate this object.

        Expected layout (the same one that :meth:`save` writes)::

            nummod = <number of beams>
            numphases = <number of phases per beam>
            SRC<i> <name> - <enabled> <total_on_time> <mode> <beam_status> <sub_priority> <unknown>
            PHASE<j>: <enabled> <start> <on> <off> <duration> <pulses>
            ...

        :raises KeyError: if a mode or beam status code is not in the lookup tables
        :raises IndexError, ValueError: if the file does not follow the layout above
        """
        with open(self.filename, 'r') as _f:
            lines = _f.read().split('\n')

        nummod = int(lines[0].split('=')[1])
        numphases = int(lines[1].split('=')[1])

        mode_lookup = {0: 'NoMod', 1: 'PCS', 2: 'TIMCON', 3: 'CALORIMETER', 4: r'PCS/TIMCON'}
        states_lookup = {
            0: 'PRI:AnyGen',
            1: 'PRI:LikeSub',
            2: 'PRI:NoSub',
            3: 'PRI:SpecialA',
            4: 'PRI:SpecialB',
            5: 'PRI:SpecialC',
            6: 'SEC:Gen/Like',
            7: 'SEC:SpecialA',
            8: 'SEC:SpecialB',
            9: 'SEC:SpecialC',
        }
        phase_fields = ['__enabled__', 'start', 'on', 'off', 'duration', 'pulses']

        # index of the next line to consume; the first two lines are the counters
        k = 2
        for mod in range(nummod):
            # Split on the FIRST dash only: it separates "SRC<i> <name>" from the
            # numeric fields. Splitting on every dash would cut the field list
            # at the minus sign of any negative value.
            tmp = lines[k].split('-', 1)
            modname = tmp[0].strip().split()[1]
            tmp = tmp[1].split()

            beam = self[modname] = SortedDict()
            beam['enabled'] = bool(int(tmp[0]))
            beam['total_on_time'] = float(tmp[1])
            beam['mode'] = int(tmp[2])
            beam['mode_txt'] = mode_lookup[beam['mode']]
            beam['beam_status'] = int(tmp[3])
            beam['beam_status_txt'] = states_lookup[beam['beam_status']]
            # a beam counts as a substitute when it is PCS-controlled and has a secondary ("SEC:") status
            beam['sub_beam'] = beam['mode_txt'] == 'PCS' and beam['beam_status_txt'].startswith('SEC')
            beam['sub_priority'] = int(tmp[4])
            beam['__unknown__'] = int(tmp[5])
            k += 1

            for phase in range(1, numphases + 1):
                phase_name = 'PHASE%d' % phase
                # NOTE(review): eval() executes whatever text is in the file; it is kept
                # so that the parsed values (and their int/float types) are unchanged,
                # but it should only ever be pointed at trusted TIMCON files.
                tmp = list(map(eval, lines[k].split(':')[1].split()))
                beam[phase_name] = SortedDict()
                for k1, item in enumerate(phase_fields):
                    beam[phase_name][item] = tmp[k1]
                # note the timcon files generated by timcon always have the enabled phase
                # switch turned on (that's why it is stored under a hidden `__enabled__` key)
                beam[phase_name]['__enabled__'] = bool(beam[phase_name]['__enabled__'])
                k += 1

    @dynaSave
    def save(self):
        """
        Write the content of this object to ``self.filename`` in TIMCON format.

        The ``numphases`` header is the smallest number of ``PHASE*`` entries
        found among the beams (100 if there are no beams), while every
        ``PHASE*`` entry of every beam is written out.
        """
        lines = []
        lines.append('nummod = %d' % len(self))

        numphases = 100
        for modname in self:
            numphases = min([numphases, len([phase for phase in self[modname] if phase.startswith('PHASE')])])
        lines.append('numphases = %d' % numphases)

        for kmod, modname in enumerate(self):
            # %-formatting fills in the source index and name first,
            # then str.format pulls the header fields out of the beam dictionary
            lines.append(
                (
                    'SRC%d %s - {enabled:d} {total_on_time:0.6f} {mode:d} {beam_status:d} {sub_priority:d} {__unknown__:d}'
                    % (kmod + 1, modname)
                ).format(**self[modname])
            )
            for phase in self[modname]:
                if not phase.startswith('PHASE'):
                    continue
                lines.append(
                    '%s: %d %d %d %d %d %0.6f'
                    % (
                        phase,
                        self[modname][phase]['__enabled__'],
                        self[modname][phase]['start'],
                        self[modname][phase]['on'],
                        self[modname][phase]['off'],
                        self[modname][phase]['duration'],
                        self[modname][phase]['pulses'],
                    )
                )

        with open(self.filename, 'w') as f:
            f.write('\n'.join(lines) + '\n')

    @dynaLoad
    def waveform(self, beam, x, y=None, phases=None, duty_cycle_1=False):
        """
        Evaluate the on/off (1/0) waveform of a beam on a time base.

        :param beam: name of the beam (key of this object)

        :param x: numpy array of times, in the same units as the phases
            ``start``/``duration``/``on``/``off`` values

        :param y: array to write the waveform into; it is modified in place.
            If None, a zero array shaped like `x` is created

        :param phases: phase name or list of phase names to evaluate.
            If None, all the ``PHASE*`` entries of the beam are used

        :param duty_cycle_1: if True the beam is considered continuously on
            during each phase that has a non-zero on time

        :return: tuple with `x` and the waveform `y`
        """
        if phases is None:
            phases = [phase for phase in self[beam] if phase.startswith('PHASE')]
        else:
            phases = tolist(phases)
        if y is None:
            y = x * 0

        starts = np.array([self[beam][phase]['start'] for phase in phases])
        durations = np.array([self[beam][phase]['duration'] for phase in phases])
        ons = np.array([self[beam][phase]['on'] for phase in phases])
        # zero the on time of phases that are disabled (or whose beam is disabled) so they get skipped below
        ons *= np.array([self[beam]['enabled'] * self[beam][phase]['__enabled__'] for phase in phases])
        offs = np.array([self[beam][phase]['off'] for phase in phases])

        for x0, dur, on1, off1 in zip(starts, durations, ons, offs):
            if on1 == 0:
                # nothing to draw if there is no on time
                continue

            # end of the phase and modulation parameters
            x1 = x0 + dur
            if duty_cycle_1:
                period = 100
                duty = 1.0
            else:
                period = on1 + off1
                duty = on1 / float(period)

            # square wave (rescaled from [-1, 1] to [0, 1]) restricted to the open interval (x0, x1)
            w = np.where((x > x0) * (x < x1))
            tt = (x[w] - x0) * 2 * np.pi / period
            y[w] = (signal.square(tt, duty=duty) + 1.0) / 2.0

        return x, y

    @dynaLoad
    def plot_waveform(self, beam, ax=None, show_legend=True):
        """
        Plot the waveform of a single beam, shading the time span of each phase.

        :param beam: name of the beam (key of this object)

        :param ax: axes to plot in. If None, the current axes are used

        :param show_legend: whether to add a legend with the beam mode and status

        :return: tuple with the time base (10000 points between 0 and 10000) and the waveform
        """
        x = np.linspace(0, 10000, 10000)
        if ax is None:
            ax = pyplot.gca()
        x, y = self.waveform(beam, x)

        color_blind_color_cycle = [
            '#1f77b4',
            '#ff7f0e',
            '#2ca02c',
            '#d62728',
            '#9467bd',
            '#8c564b',
            '#e377c2',
            '#7f7f7f',
            '#bcbd22',
            '#17becf',
        ]
        label = f"{self[beam]['mode_txt']} {self[beam]['beam_status_txt']}"
        # the line color is picked by the integer mode of the beam
        ax.plot(x, y, color=color_blind_color_cycle[self[beam]['mode']], label=label)
        ax.set_ylim([0, 2])
        ax.set_ylabel(beam)

        for kphase, phase in enumerate(self[beam]):
            if not phase.startswith('PHASE'):
                continue
            # alternate the shading color between consecutive phases
            ax.axvspan(
                self[beam][phase]['start'],
                self[beam][phase]['start'] + self[beam][phase]['duration'],
                color=['lightcoral', 'turquoise'][np.mod(kphase, 2)],
                alpha=0.15,
                linewidth=0,
            )

        ax.set_yticks([0, 1])
        ax.set_xlabel('Time (ms)')
        if show_legend:
            # NOTE(review): `text_same_color` and `hide_markers` are not stock matplotlib legend
            # options; presumably they are provided by the OMFIT plotting layer -- confirm
            ax.legend(loc=0, frameon=False, text_same_color=True, hide_markers=True, fontsize='small')
        return x, y

    @dynaLoad
    def plot(self):
        """
        Plot the waveforms of all the beams in the current figure.

        The top axes show the sum of all the beam waveforms, and each of the
        following axes (sharing the same x axis) shows an individual beam.
        """
        x = np.linspace(0, 10000, 10000)
        y = x * 0
        nrow = len(self) + 1

        # what magic is making this method create a new figure?
        # it is not transparent how I a) pass an existing figure for over-plotting, b) specify the size :(
        fig = pyplot.gcf()
        fig.set_size_inches(8, 9)

        ax = pyplot.subplot(nrow, 1, 1)
        ax.set_xlim(x[0], x[-1])  # have to get the x10^4 label in now so the next axes hide it
        ax.set_ylabel('ALL')
        ax.set_ylim(0, len(self) + 2)

        for i, beam in enumerate(self):
            ai = pyplot.subplot(nrow, 1, i + 2, sharex=ax)
            x, y0 = self.plot_waveform(beam, ax=ai)
            y += y0

        ax.plot(x, y, 'k')
        autofmt_sharex(hspace=0.1)