Source code for omfit_classes.omfit_gato

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise


from omfit_classes.omfit_ascii import OMFITascii
from omfit_classes.omfit_json import OMFITjson
from omfit_classes.sortedDict import SortedDict
import fortranformat
import numpy as np
from scipy import integrate, interpolate

__all__ = ['OMFITdskgato', 'OMFITo4gta', 'OMFITgatohelp', 'OMFITgatoDiagnostic']


[docs]class OMFITdskgato(SortedDict, OMFITascii): r""" OMFIT class used to interface to equilibria files generated by GATO (.dskgato files) NOTE: this object is "READ ONLY", meaning that the changes to the entries of this object will not be saved to a file. Method .save() could be written if becomes necessary :param filename: filename passed to OMFITobject class :param \**kw: keyword dictionary passed to OMFITobject class """ def __init__(self, filename='', **kw): OMFITascii.__init__(self, filename, **kw) SortedDict.__init__(self) self.dynaLoad = True
[docs] @dynaLoad def load(self): # Parser based on prgen_reqd_dskgato.f90 in GACODE with open(self.filename, 'r') as f: lines = f.read() fr = fortranformat.FortranRecordReader f10 = fr('1p4e19.12') def read_array(n=None): tmp = [] if n is None: while True: try: tmp.extend(f10.read(next(lines_iter))) except Exception: return np.array(tmp).astype(float) else: for k in range(n // 4 + 1): tmp.extend(f10.read(next(lines_iter))) return np.array(tmp).astype(float)[:n] lines_iter = iter(lines.split('\n')) self.clear() self['__header__'] = '' while True: try: tmp = next(lines_iter) if len(tmp.split()) == 2: self['NSURF'], self['NTHT'] = list(map(int, tmp.split()[:2])) self['SYMMETRIC'] = 1 break elif len(tmp.split()) == 3: self['NSURF'], self['NTHT'], self['SYMMETRIC'] = list(map(int, tmp.split()[:3])) break except ValueError: self['__header__'] += tmp self['NSURF'] = self['NSURF'] - 1 # Accounting for magnetic axis self['NARC'] = 2 * (self['NTHT'] - 1) + 1 # ! Repeat point self['RCENTR'], self['RMAXIS'], self['ZMAXIS'], self['BCENTR'] = f10.read(next(lines_iter)) self['CURRENT'], self['axddxz'], dummy, dummy = f10.read(next(lines_iter)) self['PSI'] = read_array(self['NSURF']) self['FPOL'] = read_array(self['NSURF']) self['FFPRIM'] = read_array(self['NSURF']) self['PRES'] = read_array(self['NSURF']) self['PPRIME'] = read_array(self['NSURF']) self['QPSI'] = read_array(self['NSURF']) self['NE'] = read_array(self['NSURF']) self['SEQDPDR'] = read_array(self['NTHT']) self['SEQDPDZ'] = read_array(self['NTHT']) for item in ['R', 'Z', 'Br', 'Bz']: try: self[item] = np.reshape(read_array(self['NTHT'] * (self['NSURF'] + 1)), (-1, self['NSURF'] + 1)) except StopIteration: break except ValueError: # sometimes Br and Bz may not be written (eg. by TOQ code) self[item] = [] # Parse extra namelist that is sometimes appended at the bottom (eg. by TOQ code) if False and '&' in lines: from omfit_classes.namelist import NamelistFile nml_txt = '&' + '&'.join(lines.split('&')[1:]).strip() self['AuxNamelist'] = NamelistFile(input_string=nml_txt, nospaceIsComment=True, bang_comment_symbol='<', collect_arrays=False) self.add_derived()
[docs] def add_derived(self): """ Compute additional quantities that are not in the DSKGATO file itself """ self['SIMAG'] = self['PSI'][0] self['SIBRY'] = self['PSI'][-1] # Calculate RHOVN from PSI and Q profile phi = integrate.cumtrapz(self['QPSI'], np.linspace(self['SIMAG'], self['SIBRY'], len(self['QPSI'])), initial=0) self['RHOVN'] = np.sqrt(np.abs(2 * np.pi * phi / (np.pi * self['BCENTR']))) self['RHOVN'] = self['RHOVN'] / np.max(self['RHOVN']) # Boundaries if 'R' in self and 'Z' in self: self['RADIAL'] = np.sqrt((self['R'] - self['RMAXIS']) ** 2 + (self['Z'] - self['ZMAXIS']) ** 2) self['THETA'] = np.arctan2((self['Z'] - self['ZMAXIS']), self['R'] - self['RMAXIS']) R = self['R'] Z = self['Z'] if self['SYMMETRIC']: R = np.vstack((R, R[self['NSURF'] - 1 :: -1, :])) Z = np.vstack((Z, -Z[self['NSURF'] - 1 :: -1, :])) self['RBBBS'] = R[:, -1] self['ZBBBS'] = Z[:, -1]
[docs] def plot(self, usePsi=False, only2D=False, **kw): r""" Function used to plot dskgato-files. This plot shows flux surfaces in the vessel, pressure, q profiles, P' and FF' :param usePsi: In the plots, use psi instead of rho :param only2D: only make flux surface plot :param levels: list of sorted numeric values to pass to 2D plot as contour levels :param label: plot item label to apply lines in 1D plots (only the q plot has legend called by the geqdsk class itself) and to the boundary contour in the 2D plot (this plot doesn't call legend by itself) :param ax: Axes instance to plot in when using only2D :param \**kw: Standard plot keywords (e.g. color, linewidth) will be passed to Axes.plot() calls. """ import matplotlib from matplotlib import pyplot label = kw.pop('label', None) # Take this out so the legend doesn't get spammed by repeated labels def plot2D(ax, **kw): kw1 = copy.copy(kw) kw1['linewidth'] = kw['linewidth'] + 1 # boundary ax.plot(self['RBBBS'], self['ZBBBS'], label=label, **kw1) # get the color kw1.setdefault('color', ax.lines[-1].get_color()) if 'R' in self and 'Z' in self: # number of levels m = int(np.ceil(self['NSURF'] / 10.0)) # flux surfaces R = self['R'] Z = self['Z'] if self['SYMMETRIC']: R = np.vstack((R, R[self['NSURF'] - 1 :: -1, :])) Z = np.vstack((Z, -Z[self['NSURF'] - 1 :: -1, :])) if usePsi: psin = (self['PSI'] - self['PSI'][0]) / (self['PSI'][-1] - self['PSI'][0]) R = interpolate.interp1d(psin, R[:, 1:])(np.linspace(0, 1, 10)) Z = interpolate.interp1d(psin, Z[:, 1:])(np.linspace(0, 1, 10)) else: R = interpolate.interp1d(self['RHOVN'], R[:, 1:])(np.linspace(0, 1, 10)) Z = interpolate.interp1d(self['RHOVN'], Z[:, 1:])(np.linspace(0, 1, 10)) R = np.vstack((R, R[0, :] * np.nan)).T Z = np.vstack((Z, Z[0, :] * np.nan)).T ax.plot(R.flatten(), Z.flatten(), **kw) # magnetic axis ax.plot(self['RMAXIS'], self['ZMAXIS'], '+', **kw1) # aspect_ratio ax.set_aspect('equal') ax.set_xlim((np.array(ax.get_xlim()) - self['RMAXIS']) * 1.05 + self['RMAXIS']) ax.set_ylim((np.array(ax.get_ylim()) - self['ZMAXIS']) * 1.05 + self['ZMAXIS']) kw.setdefault('linewidth', 1) if not only2D: fig = pyplot.gcf() kw.pop('ax', None) # This option can't be used in this context, so remove it to avoid trouble. pyplot.subplots_adjust(wspace=0.23) if usePsi: xName = '$\\psi$' x = np.linspace(0, 1, len(self['PRES'])) else: xName = '$\\rho$' if 'RHOVN' in self and np.sum(self['RHOVN']): x = self['RHOVN'] else: x = self['AuxQuantities']['RHO'] ax = pyplot.subplot(232) ax.plot(x, self['PRES'], **kw) kw.setdefault('color', ax.lines[-1].get_color()) ax.set_title(r'$\,$ Pressure') ax.ticklabel_format(style='sci', scilimits=(-1, 2), axis='y') pyplot.setp(ax.get_xticklabels(), visible=False) ax = pyplot.subplot(233, sharex=ax) ax.plot(x, self['QPSI'], **kw) ax.set_title('$q$ Safety factor') ax.ticklabel_format(style='sci', scilimits=(-1, 2), axis='y') if label: try: ax.legend(labelspacing=0.2, loc=0).draggable(state=True) except Exception: pass pyplot.setp(ax.get_xticklabels(), visible=False) ax = pyplot.subplot(235, sharex=ax) ax.plot(x, self['PPRIME'], **kw) ax.set_title(r"$P\,^\prime$ source function") ax.ticklabel_format(style='sci', scilimits=(-1, 2), axis='y') pyplot.xlabel(xName) ax = pyplot.subplot(236, sharex=ax) ax.plot(x, self['FFPRIM'], **kw) ax.set_title(r"$FF\,^\prime$ source function") ax.ticklabel_format(style='sci', scilimits=(-1, 2), axis='y') pyplot.xlabel(xName) ax = pyplot.subplot(131, aspect='equal') ax.set_frame_on(False) ax.xaxis.set_ticks_position('bottom') ax.yaxis.set_ticks_position('left') else: ax = kw.pop('ax', pyplot.gca()) plot2D(ax, **kw)
[docs]class OMFITo4gta(SortedDict, OMFITascii): r""" OMFIT class used to interface with GATO o4gta files :param filename: filename passed to OMFITascii class :param \**kw: keyword dictionary passed to OMFITascii class """ def __init__(self, filename, **kw): SortedDict.__init__(self) OMFITascii.__init__(self, filename, **kw) self.dynaLoad = True
[docs] @dynaLoad def load(self): with open(self.filename, 'r') as f: d = f.readlines() passed_amp = False for i, ival in enumerate(d): if passed_amp: if 'Relative' in ival or 'Imaginary Amplitudes' in ival or 'Fourier' in ival: break if 'psval' in ival: x = ival.split() for mode in x: try: mode = int(mode) except ValueError: pass self[mode] = [] elif ival != '': xx = ival.split() if len(xx): for j, mode in enumerate(x): try: mode = int(mode) except ValueError: pass self[mode].append(float(xx[j])) if 'Real Amplitudes' in ival: passed_amp = True for item in self: if isinstance(self[item], list): self[item] = np.array(self[item])
@property def modes(self): return [item for item in self if isinstance(item, int)]
[docs] def plot(self, asinh_transform=1.0): """ :param asinh_transform: apply `xi_mod = arcsinh(xi * asinh_transform)` transformation """ from matplotlib import pyplot tmp = [] for mode in self.modes: tmp.append(self[mode]) tmp = np.array(tmp) if asinh_transform: tmp = arcsinh(tmp * asinh_transform) M = np.max(abs(tmp.flatten())) C = pyplot.pcolor(self['psval'], self.modes, tmp, vmin=-M, vmax=M, cmap=pyplot.get_cmap('seismic')) pyplot.colorbar(C) pyplot.xlabel(r'$\psi$') pyplot.title(r'$\xi$') pyplot.ylabel('m') pyplot.legend(loc=0)
[docs]def OMFITgatohelp(*args, **kw): r""" generates help dictionary for GATO namelist starting from smaphXXX.f :param \*args: arguments passed to OMFITascii object :param \**kw: keyworkd arguments passed to OMFITascii object :return: OMFITjson with parsed help definitions and default values """ obj = OMFITascii(*args, **kw) with open(obj.filename, 'r') as f: lines = [re.sub('\r', '', x).rstrip('\\') for x in f.read().split('\n')] entries = OMFITjson('GATO_help.json') state = None mapper = {} npxdm = None for line in lines: if line.startswith('c Physical Case:'): state = 'definitions' if line.startswith('c 1.0 Define constants and default parameters'): state = None if line.startswith('c 1.4 Set namelist defaults'): state = 'defaults' if line.startswith('c 2.0 Read namelist input file'): state = None if npxdm is None and line.strip().startswith('parameter (npx='): npxdm = eval(re.sub(r'\s+parameter\s*\(npx=([0-9]+),.*', r'\1', line)) print('npx=%d' % npxdm) if state == 'definitions': if re.match(r'^c\s+[\w\(\)\,]+:\s*\w+.*', line): entry = re.sub(r'^c\s+([\w\(\)\,]+):\s+\w+.*', r'\1', line) entry = entry.split('(')[0] entries[group][entry] = {} entries[group][entry]['description'] = re.sub(r'^c\s+[\w\(\)\,]+:\s+(\w+.*)', r'\1', line).strip() + '\n' entries[group][entry]['default'] = 0 mapper[entry] = group elif re.match(r'^c \w.*:$', line.strip()): group = re.sub(r'^c\s+(\w.*):', r'\1', line) print('* ' + group) entries[group] = {} else: entries[group][entry]['description'] += re.sub(r'^c\s+(.+)', r'\1', line).strip() + '\n' if state == 'defaults': if line.startswith('c') or not len(line.strip()) or re.match(r'^[0-9].*', line.strip()): continue entry, value = line.strip().split('=') entry = entry.strip() value = value.strip() max0 = lambda *x: max(x) min0 = lambda *x: min(x) ncxdm = 2 * npxdm npk = npxdm try: locals()[entry] = eval(value) entries[mapper[entry]][entry]['default'] = eval(value) except KeyError: pass return entries
[docs]class OMFITgatoDiagnostic(SortedDict, OMFITascii): r""" OMFIT class used to interface with GATO diagnostic.dat files :param filename: filename passed to OMFITascii class :param \**kw: keyword dictionary passed to OMFITascii class """ def __init__(self, filename, **kw): SortedDict.__init__(self) OMFITascii.__init__(self, filename, **kw) self.dynaLoad = True
[docs] @dynaLoad def load(self): with open(self.filename, 'r') as f: d = f.readlines() # Read lines and put data into lists # Acting on tmp dictionary seems to be faster than acting on self here tmpDict = {} item = 'beginning' tmpDict[item] = [] for i, line in enumerate(d): if 'i=1' in line or 'j=1' in line: item = ''.join(line.split()) tmpDict[item] = [] else: vals = line.split() for mode in vals: try: tmpDict[item].append(float(mode)) except ValueError: pass # Reshape 1D list into 1D or 2D array and rename for item in tmpDict: if 'j=1' in item and 'i=1' in item: try: jpsi = int(item.split('j=1,')[1].split(',')[0]) itht = int(item.split('i=1,')[1]) tmpDict[item] = np.reshape(np.array(tmpDict[item]), (itht, jpsi)) item_short = item.split(',j=1')[0] self[item_short] = tmpDict[item] except Exception: self[item] = 'Could not be parsed' elif 'i=1' in item: tmpDict[item] = np.array(tmpDict[item]) item_short = item.split(',i=1')[0] self[item_short] = tmpDict[item] elif 'j=1' in item: tmpDict[item] = np.array(tmpDict[item]) item_short = item.split(',j=1')[0] self[item_short] = tmpDict[item] else: self[item] = 'Could not be parsed'
[docs] def plot(self, item='xreal(j,i)'): """ :param item: is plotting item against R, Z grid """ from matplotlib import pyplot fig = pyplot.figure() ax = fig.add_subplot(111) pyplot.plot(self['rwall(i)'], self['zwall(i)'], color='k') C = pyplot.contourf(self['rcc(j,i)'], self['zcc(j,i)'], self[item], cmap=pyplot.get_cmap('seismic')) pyplot.colorbar(C) ax.set_aspect('equal') pyplot.xlabel(r'$R (m)$') pyplot.ylabel(r'$Z (m)$') pyplot.title(item)
[docs] def plot_modes(self, item='xreal(j,i)', tolerance=0.25): """ :param item: plotting item against R, Z grid :param tolerance: size of mode needed to be plotted relative to the largest mode """ psi = self['psimesh(j,i)'][0, :] xfft = np.fft.fft(self[item], axis=0) xfft_ar = abs(np.real(xfft[:, :])) tlen = len(xfft[0, :]) freqs = np.fft.fftfreq(tlen, d=1.0 / tlen) maxval = max(np.sum(xfft, axis=1)) for i in range(tlen): if np.sum(xfft_ar[i, :]) > tolerance * maxval: pyplot.plot(psi, xfft_ar[i, :], label=freqs[i]) pyplot.xlabel(r'$\psi$') pyplot.ylabel(item) pyplot.legend(loc='upper left')
############################################ if '__main__' == __name__: test_classes_main_header() tmp1 = OMFITdskgato(OMFITsrc + '/../samples/29.dskgato') from matplotlib import pyplot pyplot.figure() tmp1.plot(usePsi=False) pyplot.figure() tmp1.plot(usePsi=True) pyplot.show()