Source code for omfit_classes.omfit_timcon
try:
# framework is running
from .startup_choice import *
except ImportError as _excp:
# class is imported by itself
if (
'attempted relative import with no known parent package' in str(_excp)
or 'No module named \'omfit_classes\'' in str(_excp)
or "No module named '__main__.startup_choice'" in str(_excp)
):
from startup_choice import *
else:
raise
from omfit_classes.omfit_ascii import OMFITascii
from scipy import signal
__all__ = ['OMFITtimcon']
[docs]class OMFITtimcon(SortedDict, OMFITascii):
"""
OMFIT class for parsing/writing the NBI time traces of the DIII-D PCS
"""
def __init__(self, filename):
OMFITascii.__init__(self, filename)
SortedDict.__init__(self)
self.dynaLoad = True
[docs] @dynaLoad
def load(self):
with open(self.filename, 'r') as _f:
lines = _f.read().split('\n')
nummod = int(lines[0].split('=')[1])
numphases = int(lines[1].split('=')[1])
k = 2
mode_lookup = {0: 'NoMod', 1: 'PCS', 2: 'TIMCON', 3: 'CALORIMETER', 4: r'PCS/TIMCON'}
states_lookup = {
0: 'PRI:AnyGen',
1: 'PRI:LikeSub',
2: 'PRI:NoSub',
3: 'PRI:SpecialA',
4: 'PRI:SpecialB',
5: 'PRI:SpecialC',
6: 'SEC:Gen/Like',
7: 'SEC:SpecialA',
8: 'SEC:SpecialB',
9: 'SEC:SpecialC',
}
for mod in range(nummod):
tmp = lines[k].split('-')
modname = tmp[0].strip().split()[1]
self[modname] = SortedDict()
tmp = tmp[1].split()
self[modname]['enabled'] = bool(int(tmp[0]))
self[modname]['total_on_time'] = float(tmp[1])
self[modname]['mode'] = int(tmp[2])
self[modname]['mode_txt'] = mode_lookup[self[modname]['mode']]
self[modname]['beam_status'] = int(tmp[3])
self[modname]['beam_status_txt'] = states_lookup[self[modname]['beam_status']]
self[modname]['sub_beam'] = self[modname]['mode_txt'] == 'PCS' and self[modname]['beam_status_txt'].find('SEC') == 0
self[modname]['sub_priority'] = int(tmp[4])
self[modname]['__unknown__'] = int(tmp[5])
k += 1
for phase in range(1, numphases + 1):
tmp = list(map(eval, lines[k].split(':')[1].split()))
self[modname]['PHASE%d' % phase] = SortedDict()
for k1, item in enumerate(['__enabled__', 'start', 'on', 'off', 'duration', 'pulses']):
self[modname]['PHASE%d' % phase][item] = tmp[k1]
# note the timcon files generated by timcon always have the enabled phase switched turned on (that's why we hide it here)
self[modname]['PHASE%d' % phase]['__enabled__'] = bool(self[modname]['PHASE%d' % phase]['__enabled__'])
k += 1
[docs] @dynaSave
def save(self):
lines = []
lines.append('nummod = %d' % len(self))
numphases = 100
for modname in self:
numphases = min([numphases, len([phase for phase in self[modname] if phase.startswith('PHASE')])])
lines.append('numphases = %d' % numphases)
for kmod, modname in enumerate(self):
lines.append(
(
'SRC%d %s - {enabled:d} {total_on_time:0.6f} {mode:d} {beam_status:d} {sub_priority:d} {__unknown__:d}'
% (kmod + 1, modname)
).format(**self[modname])
)
for phase in self[modname]:
if not phase.startswith('PHASE'):
continue
lines.append(
(
'%s: %d %d %d %d %d %0.6f'
% (
phase,
self[modname][phase]['__enabled__'],
self[modname][phase]['start'],
self[modname][phase]['on'],
self[modname][phase]['off'],
self[modname][phase]['duration'],
self[modname][phase]['pulses'],
)
)
)
with open(self.filename, 'w') as f:
f.write('\n'.join(lines) + '\n')
[docs] @dynaLoad
def waveform(self, beam, x, y=None, phases=None, duty_cycle_1=False):
if phases is None:
phases = [phase for phase in self[beam] if phase.startswith('PHASE')]
else:
phases = tolist(phases)
if y is None:
y = x * 0
starts = np.array([self[beam][phase]['start'] for phase in phases])
durations = np.array([self[beam][phase]['duration'] for phase in phases])
ons = np.array([self[beam][phase]['on'] for phase in phases])
ons *= np.array([self[beam]['enabled'] * self[beam][phase]['__enabled__'] for phase in phases])
offs = np.array([self[beam][phase]['off'] for phase in phases])
for k in range(len(phases)):
x0 = starts[k]
dur = durations[k]
on1 = ons[k]
off1 = offs[k]
if on1 == 0:
# abort if no on time
continue
# calculations
x1 = x0 + dur
if duty_cycle_1:
period = 100
duty = 1.0
else:
period = on1 + off1
duty = on1 / float(period)
# form the output
w = np.where((x > x0) * (x < x1))
tt = (x[w] - x0) * 2 * np.pi / period
y[w] = (signal.square(tt, duty=duty) + 1.0) / 2.0
return x, y
[docs] @dynaLoad
def plot_waveform(self, beam, ax=None, show_legend=True):
x = np.linspace(0, 10000, 10000)
if ax is None:
ax = pyplot.gca()
x, y = self.waveform(beam, x)
color_blind_color_cycle = [
'#1f77b4',
'#ff7f0e',
'#2ca02c',
'#d62728',
'#9467bd',
'#8c564b',
'#e377c2',
'#7f7f7f',
'#bcbd22',
'#17becf',
]
label = f"{self[beam]['mode_txt']} {self[beam]['beam_status_txt']}"
ax.plot(x, y, color=color_blind_color_cycle[self[beam]['mode']], label=label)
ax.set_ylim([0, 2])
ax.set_ylabel(beam)
for kphase, phase in enumerate(self[beam]):
if not phase.startswith('PHASE'):
continue
ax.axvspan(
self[beam][phase]['start'],
self[beam][phase]['start'] + self[beam][phase]['duration'],
color=['lightcoral', 'turquoise'][np.mod(kphase, 2)],
alpha=0.15,
linewidth=0,
)
ax.set_yticks([0, 1])
ax.set_xlabel('Time (ms)')
if show_legend:
ax.legend(loc=0, frameon=False, text_same_color=True, hide_markers=True, fontsize='small')
return x, y
[docs] @dynaLoad
def plot(self):
x = np.linspace(0, 10000, 10000)
y = x * 0
nrow = len(self) + 1
# what magic is making this method create a new figure?
# it is not transparent how I a) pass an existing figure for over-plotting, b) specify the size :(
fig = pyplot.gcf()
fig.set_size_inches(8, 9)
ax = pyplot.subplot(nrow, 1, 1)
ax.set_xlim(x[0], x[-1]) # have to get the x10^4 label in now so the next axes hide it
ax.set_ylabel('ALL')
ax.set_ylim(0, len(self) + 2)
for i, beam in enumerate(self):
ai = pyplot.subplot(nrow, 1, i + 2, sharex=ax)
x, y0 = self.plot_waveform(beam, ax=ai)
y += y0
ax.plot(x, y, 'k')
autofmt_sharex(hspace=0.1)