try:
# framework is running
from .startup_choice import *
except ImportError as _excp:
# class is imported by itself
if (
'attempted relative import with no known parent package' in str(_excp)
or 'No module named \'omfit_classes\'' in str(_excp)
or "No module named '__main__.startup_choice'" in str(_excp)
):
from startup_choice import *
else:
raise
from omfit_classes.omfit_base import OMFITcollection
from omfit_classes.omfit_ascii import OMFITascii
from omfit_classes.utils_math import integz
from omfit_classes.omfit_gapy import OMFITgacode, OMFITinputprofiles, OMFITinputgacode
from omfit_classes.omfit_json import SettingsName
# Explicit imports
import numpy as np
from scipy import interpolate
try:
import pygacode.tgyro.data
import pygacode.gyro.data_plot
import pygacode.cgyro.data_plot
import pygacode.neo.data
except ImportError:
__all__ = []
else:
__all__ = ['OMFITgacode', 'OMFITtgyro', 'OMFITgyro', 'OMFITcgyro', 'OMFITneo', 'copy_gacode_module_settings']
def om_fig(func):
"""This is a decorator to pass a clean figure to the gacode plot routines"""
def new_func(*args, **kw):
if 'fig' not in kw or kw['fig'] is None:
kw['fig'] = pyplot.figure()
return func(*args, **kw)
return new_func
[docs] class OMFITtgyro(SortedDict, OMFITobject, pygacode.tgyro.data.tgyrodata):
"""
Class used to interface with TGYRO results directory
:param filename: directory where the TGYRO result files are stored.
The data in this directory will be loaded upon creation of the object.
"""
def __init__(self, filename=None, **kw):
kw['file_type'] = 'dir'
OMFITobject.__init__(self, filename, **kw)
self.OMFITproperties.pop('file_type', 'dir')
SortedDict.__init__(self, sorted=True)
self.dynaLoad = True
[docs] @dynaLoad
def load(self):
filename = self.filename
pygacode.tgyro.data.tgyrodata.__init__(self, filename, verbose=False)
# The tgyrodata class operates on self.data, this will be handled by __getattr__
self.update(self.data)
delattr(self, 'data')
# Add (derived) quantities
# minor radius
self['a'] = self['rmin'][0][1] / self['r/a'][0][1]
# Derivative on sparse TGYRO grid: a*d(w0)/dr (Note conversion of c_s, below, from m/s to cm/s)
self['a/dw0dr'] = -self['a*gamma_p/cs'] * (100 * self['c_s']) / self['a'] / self['rmaj/a']
# Rotation on sparse TGYRO grid: w0 (Note conversion of c_s, below, from m/s to cm/s)
self['w0'] = self['M=wR/cs'] * (100 * self['c_s']) / (self['a'] * self['rmaj/a'])
# Convergence
if 'convergence' not in self:
convergence_items = []
for item in list(self.keys()):
if re.match('E([emp]flux_.*)', item):
convergence_items.append(item)
# new TGYRO format with multiple ions particle evolution
if len(convergence_items):
for k, item in enumerate(convergence_items):
if k == 0:
self['convergence'] = self[item].sum(axis=1)
else:
self['convergence'] += self[item].sum(axis=1)
# old TGYRO format, with electron particle evolution
else:
if 'residual' not in self:
self.get_residual()
if 'convergence' not in self:
self['convergence'] = self['residual'][1::2, :, :].sum(axis=0).sum(axis=1)
# input.gacode is not used by TGYROData, so it may not be in the directory
if os.path.exists(filename + os.sep + 'input.gacode'):
tmp = OMFITinputgacode(filename + os.sep + 'input.gacode')
tmp.load()
self.dataProfiles = dict()
self.dataProfiles.update(tmp)
else:
self.dataProfiles = None
# if multiple input.gacode.XXX are found, read them in
# the 0th profile is always input.gacode (if it is there, and it should!)
# the last profiles is always input.gacode.new (if it is there)
input_gacode_evo = self['profiles_evolution'] = OMFITcollection()
if os.path.exists(self.filename + os.sep + 'input.gacode'):
input_gacode_evo[0] = OMFITinputgacode(self.filename + os.sep + 'input.gacode')
tmp = glob.glob(self.filename + os.sep + 'input.gacode.[0-9]*')
tmp = sorted(tmp, key=lambda x: int(x.split('.')[-1]))
for k, item in enumerate(tmp):
input_gacode_evo[k + 1] = OMFITinputgacode(item)
if os.path.exists(self.filename + os.sep + 'input.gacode.new'):
input_gacode_evo[len(input_gacode_evo)] = OMFITinputgacode(self.filename + os.sep + 'input.gacode.new')
# parsing of 'out.tgyro.ped' if it exists and is filled with something
if (
'betan' not in self
and os.path.exists(self.filename + os.sep + 'out.tgyro.ped')
and os.stat(self.filename + os.sep + 'out.tgyro.ped').st_size
):
with open(self.filename + os.sep + 'out.tgyro.ped', 'r') as f:
tmp = f.readlines()
data = np.array([list(map(float, x.split())) for x in tmp[2::3]])
for k, name in enumerate(tmp[0].split()):
if name in ['r_top/a', 'betan', 'psi_top', 'p_top', 't_top', 'n_top']:
self[name] = data[:, k]
# parsing out.tgyro.run
if os.path.exists(self.filename + os.sep + 'out.tgyro.run'):
with open(self.filename + os.sep + 'out.tgyro.run', 'r') as f:
self['run_log'] = f.read()
[docs] def plot_profiles_evolution(self, quantity=None, x='rho', ax=None, colors=None):
"""
plot evolution in input.gacode.XXX of the quantity
:param quantity: quantity to be plotted (plots Te, Ti, ne, omega0 if `quantity is None`)
:param x: x axis, typically `rho` or `rmin`
:param ax: axis to plot in (only if `quantity is not None`)
:param colors:
"""
if 'profiles_evolution' not in self:
printw('input.gacode.{0...} files not in TGYRO output directory')
return
what = [['Te', 'Ti_1'], [], ['ne'] + ['ni_%d' % k for k in range(1, 11)], ['omega0']]
if not quantity:
fig = figure()
for k, items in enumerate(what):
colors0 = colors
if colors is None:
colors0 = list(default_colorblind_line_cycle)
if not quantity:
ax = pyplot.subplot(2, 2, k + 1)
elif ax is None:
ax = pyplot.gca()
i = 0
for k1, item in enumerate(items):
if quantity and item != quantity:
continue
itemName = item
if re.match('.*_[0-9]+$', item):
if int(item.split('_')[1]) in self['profiles_evolution'][0]['IONS']:
ion = self['profiles_evolution'][0]['IONS'][int(item.split('_')[1])]
itemName = item.split('_')[0][:-1] + ion[0] + ' ' + ion[3]
else:
continue
color = colors0[i]['color']
ax.plot(nan, nan, color=color, label=itemName)
ax.plot(nan, nan, color=color, ls='--', label=itemName + ' experiment')
for line in list(self['profiles_evolution'].keys()):
xx = self['profiles_evolution'][line][x]
yy = self['profiles_evolution'][line][item].copy()
ax.plot(xx, yy, color=color, alpha=(line + 1.0) / len(self['profiles_evolution']))
ax.plot(self['profiles_evolution'][0][x], self['profiles_evolution'][item][:, 0], color=color, ls='--')
i += 1
try:
ax.legend(loc=0).draggable(True)
except Exception:
pass
if not quantity:
pyplot.subplot(2, 2, 2)
pyplot.semilogy(self['convergence'], color='k')
@dynaLoad
def __getattr__(self, attr):
if attr == 'data':
return self # The TGYROdata class operates on self.data
elif attr == 'n_iterations':
return self.data['rho'].shape[0] - 1
else:
raise AttributeError('bad attribute `%s`' % attr)
[docs] def get_residual(self):
def get_gacode_residual():
fn = 'out.tgyro.residual'
with open(self.dir + '/' + fn, 'r') as f:
data = f.readlines()
if len(data) == 0:
if self.n_iterations == 0:
print("WARNING: out.tgyro.residual is blank")
return 0
raise OSError("out.tgyro.residual is blank")
# Data dimensions
nr = self.n_r
nb = self.n_iterations + 1
nc = 1 + 2 * self.n_evolve
numdata = np.zeros((nc, nb, nr - 1), dtype=float)
cdata = np.zeros((nb), dtype=float)
new_format = os.path.exists(self.dir + '/out.tgyro.profile_i1')
nh = [1, 2][new_format]
if not new_format:
nr = nr - 1
self.data['convergence'] = []
for ib in range(nb):
try:
if new_format:
tags = data[ib * (nr + nh)].split() # Contains overall residual
cdata[ib] = float(data[ib * (nr + nh) + nh - 1].split(':')[1].split()[1])
except Exception:
raise ValueError("out.tgyro.residual shorter than expected")
for ir in range(nr - 1):
row = data[ib * (nr + nh) + ir + nh].split()
for ic in range(nc):
numdata[ic, ib, ir] = row[ic]
self.data['convergence'] = cdata
self.data['residual'] = numdata
return self.data
valid = False
data = get_gacode_residual()
if data is not 0:
valid = True
if not valid:
# Data dimensions
nr = self.n_r
nb = self.n_iterations + 1
# 9 = 1+2*n_evolve, where n_evolve=4 (ti,te,ne,er)
nc = 1 + 2 * self.n_evolve
numdata = np.zeros((nc, nb, nr - 1), dtype=float)
self.data['residual'] = numdata * np.nan
[docs] def sprofile(self, what, nf0=201, x='r/a', verbose=False):
"""
This function returns smooth profiles on a uniform ['r/a', 'rho', 'rmin', 'rmaj/a'] grid
:param what: what profile to return ['r/a', 'rho', 'rmin', 'rmaj/a', 'te', 'a/LTe', 'ne', 'a/Lne' , 'ti', 'a/LTi', 'ni', 'a/Lni', 'M=wR/cs']
:param nf0: number of points
:param x: return profiles equally spaced in 'r/a', 'rho', 'rmin', 'rmaj/a'
:param verbose: plotting of the `what` quantity
:return: `what` quantity (nf0 x niterations) at the `x` locations
Note: all TGYRO quantities are naturally defined over 'r/a'
**example**
>> tmp=OMFITtgyro('...')
>> x='rho'
>> y='te'
>> pyplot.plot(tmp[x].T,tmp[y].T,'or')
>> X=tmp.sprofile(x, nf0=201, x=x, verbose=False)
>> Y=tmp.sprofile(y, nf0=201, x=x, verbose=False)
>> pyplot.plot(X,Y,'b',alpha=0.25)
"""
n = self.n_iterations
# rf0 contains the value of `x` uniformly spaced
rf0 = np.linspace(self.data[x][0][0], self.data[x][0][-1], int(nf0 * self.data[x][0][-1]))
if what == x:
return rf0
# rf0 contains the value of r/a uniformly spaced in 'x'
if x != 'r/a':
rf0 = interpolate.interp1d(self.data[x][0], self.data['r/a'][0])(rf0)
if what in ['r/a', 'rho', 'rmin', 'rmaj/a']:
return interpolate.interp1d(self.data['r/a'][0], self.data[what][0])(rf0)
quantity = {'ne': 'a/Lne', 'te': 'a/Lte', 'w0': 'a/dw0dr'}
quantityScaleLen = {'a/Lne': 'ne', 'a/Lte': 'te', 'w0': 'a/dw0dr'}
for spec in range(1, 10):
for quant in ['t', 'n']:
quantity['%si%d' % (quant, spec)] = 'a/L%si%d' % (quant, spec)
quantityScaleLen['a/L%si%d' % (quant, spec)] = '%si%d' % (quant, spec)
pf0 = np.zeros((len(rf0), n))
for l in range(n):
if what == 'w0':
pf0[:, l] = interpolate.interp1d(
self.data['r/a'][0], integrate.cumtrapz(self.data[quantity[what]][l], self.data['r/a'][l], initial=0), kind=1
)(rf0)
pf0[:, l] = pf0[:, l] - pf0[-1, l] + self.data[what][l][-1]
elif what in quantity:
pf0[:, l] = integz(
self.data['r/a'][l], self.data[quantity[what]][l], self.data['r/a'][l][-1], self.data[what][l][-1], rf0
)
elif what in quantityScaleLen:
pf0[:, l] = interpolate.interp1d(self.data['r/a'][0], self.data[what][l])(rf0)
else:
raise ValueError("Quantity '" + what + "' is not in %s" % repr(list(quantity.keys()) + list(quantity.values())))
if verbose:
figure()
plotc(rf0, pf0)
pyplot.ylabel(what)
pyplot.xlabel(r'$' + x + '$')
return pf0
[docs] def jacobian(self, return_matrix=False):
"""
:param return_matrix: return jacobian as dictionary or matrix
:return: dictionary with the jacobians of the last iteration as calculated by TGYRO
"""
data = {}
with open(self.filename + os.sep + 'out.tgyro.jacobian', 'r') as f:
lines = f.readlines()
for line in lines:
if line.startswith('r/a'):
pass
else:
data.setdefault(line.split()[0], []).append(float(line.split()[1]))
for k in data:
data[k] = np.array(data[k])
if not return_matrix:
return data
else:
neq = 0
X = []
Y = []
for ky, y in enumerate(['Qe', 'Qi', 'Ge', 'Pi']):
for kx, x in enumerate(['zTe', 'zTi', 'zne', 'w0']):
if 'd%s/d%s' % (y, x) in data:
X.append(x)
Y.append(y)
neq += 1
neq = int(np.sqrt(neq))
X = unsorted_unique(X)
Y = unsorted_unique(Y)
d = np.zeros((neq, neq, len(self['rho'][0, :]) - 1))
for rr in range(len(self['rho'][0, :]) - 1):
for ky, y in enumerate(Y):
for kx, x in enumerate(X):
d[ky, kx, rr] = data['d%s/d%s' % (y, x)][rr]
for ky, y in enumerate(Y):
n = abs(d[ky, ky, rr])
d[ky, :, rr] = d[ky, :, rr] / n
d[:, ky, rr] = d[:, ky, rr] / n
d[ky, ky, rr] = 1.0
return d
[docs] @dynaLoad
def plot(self, **kw):
"""
This function plots ne, ni, te, ti for every iteration run by TGYRO
:return: None
"""
pyplot.clf()
pyplot.gcf().canvas.set_window_title('TGYRO results')
# whatTgyroDict:
# keys will be the titles of each subplot
# ['name'][0] -> input.gacode variable name (output.dataProfiles)
# ['name'][1] -> out.tgyro.* variable name (output.sprofile and in output.keys())
whatTgyroDict = SortedDict()
whatTgyroDict['$n_e$'] = {'normalization': [1, 1e13], 'unit': '$10^{19}/m^3$', 'name': ['ne', 'ne']}
whatTgyroDict['$n_{i1}$'] = {'normalization': [1, 1e13], 'unit': '$10^{19}/m^3$', 'name': ['ni_1', 'ni1']}
whatTgyroDict['convergence'] = {}
whatTgyroDict['$T_e$'] = {'normalization': [1, 1], 'unit': '$keV$', 'name': ['Te', 'te']}
whatTgyroDict['$T_i$'] = {'normalization': [1, 1], 'unit': '$keV$', 'name': ['Ti_1', 'ti1']}
whatTgyroDict[r'$\omega_0$'] = {'normalization': [1e3, 1e3], 'unit': '$krad/s$', 'name': ['omega0', 'w0']}
for k, whatTgyro in enumerate(whatTgyroDict.keys()):
pyplot.gcf()
pyplot.subplot(2, 3, k + 1)
if whatTgyro == 'convergence':
pyplot.semilogy(self['convergence'], color='k')
pyplot.xlim([0, len(self['convergence']) - 1])
pyplot.title('Convergence')
continue
if self.dataProfiles is not None:
pyplot.plot(
self.dataProfiles['rho'],
self.dataProfiles[whatTgyroDict[whatTgyro]['name'][0]] / whatTgyroDict[whatTgyro]['normalization'][0],
'k--',
linewidth=3,
)
plotc(
self.sprofile('rho', nf0=201, x='rho'),
self.sprofile(whatTgyroDict[whatTgyro]['name'][1], nf0=201, x='rho') / whatTgyroDict[whatTgyro]['normalization'][1],
)
pyplot.plot(self['rho'].T, self[whatTgyroDict[whatTgyro]['name'][1]].T / whatTgyroDict[whatTgyro]['normalization'][1], '.r')
if k in [3, 4, 5]:
pyplot.xlabel('$\\rho$')
pyplot.title('%s [%s]' % (whatTgyro, whatTgyroDict[whatTgyro]['unit']))
[docs] def calcQ(self, it_index=-1):
"""
Calculate and return the fusion energy gain factor, Q,
assuming D+T->alpha+neutron is the main reaction
:param it_index: Indicates for which iteration to calculate Q
"""
# Factor of 5 in following is because total fusion power = 5*alpha power, correct for DT
return (
(self['p_i_fus'][it_index, -1] + self['p_e_fus'][it_index, -1])
* 5
/ (self['p_i_aux'][it_index, -1] + self['p_e_aux'][it_index, -1])
)
[docs] class OMFITgyro(SortedDict, OMFITobject, pygacode.gyro.data_plot.gyrodata_plot):
"""
Class used to interface with GYRO results directory
This class extends the OMFITgyro class with the save/load methods of the OMFITpath class
so that the save/load carries all the original files from the GYRO output
:param filename: directory where the GYRO result files are stored.
The data in this directory will be loaded upon creation of the object.
:param extra_files: Any extra files that should be downloaded from the remote location
:param test_mode: Don't raise an exception if out.gyro.t is not present (and abort loading at that point)
"""
def __init__(self, filename=None, extra_files=[], test_mode=False, **kw):
if isinstance(filename, (list, tuple)):
kw['tunnel'] = filename[2]
kw['server'] = filename[1]
filename = filename[0]
if ',' not in filename:
# output in alphabetical order
# This is copied from an ls for reg01 with three fields for phi, A|| and Aperp as well as nl01
outputs = [
'bin.gyro.field_rms',
'bin.gyro.gbflux_i',
'bin.gyro.gbflux_n',
'bin.gyro.kxkyspec',
'bin.gyro.moment_u',
'bin.gyro.moment_zero',
'gyrotest_flag',
'halt',
'input.gyro',
'input.gyro.gen',
'out.gyro.balloon_a',
'out.gyro.balloon_aperp',
'out.gyro.balloon_epar',
'out.gyro.balloon_phi',
'out.gyro.efficiency',
'out.gyro.error',
'out.gyro.freq',
'out.gyro.geometry_arrays',
'out.gyro.localdump',
'out.gyro.memory',
'out.gyro.phase_space',
'out.gyro.prec',
'out.gyro.profile',
'out.gyro.radial_op',
'out.gyro.run',
'out.gyro.star',
'out.gyro.t',
'out.gyro.timing',
'out.gyro.units',
'out.gyro.version',
'fieldeigen.out',
]
# input.gacode
outputs += ['input.gacode']
# Batch files
outputs += ['batch.src', 'batch.out', 'batch.err']
outputs += extra_files
filename = ','.join([filename + os.sep + x for x in outputs])
kw['file_type'] = 'dir'
OMFITobject.__init__(self, filename, **kw)
self.OMFITproperties.pop('file_type', 'dir')
SortedDict.__init__(self, sorted=True)
self.test_mode = test_mode
self.dynaLoad = True
[docs] @dynaLoad
def load(self):
filename = self.filename
if os.path.exists(filename + '/input.gyro.gen'):
self['input.gyro.gen'] = OMFITgacode(filename + '/input.gyro.gen')
if os.path.exists(filename + '/out.gyro.run'):
self['out.gyro.run'] = OMFITascii(filename + '/out.gyro.run')
if os.path.exists(filename + '/out.gyro.version'):
self['out.gyro.version'] = OMFITascii(filename + '/out.gyro.version')
if os.path.exists(filename + '/out.gyro.efficiency'):
self['out.gyro.efficiency'] = OMFITascii(filename + '/out.gyro.efficiency')
if os.path.exists(filename + '/batch.src'):
self['batch.src'] = OMFITascii(filename + '/batch.src')
if os.path.exists(filename + '/batch.out'):
self['batch.out'] = OMFITascii(filename + '/batch.out')
if os.path.exists(filename + '/batch.err'):
self['batch.err'] = OMFITascii(filename + '/batch.err')
if not os.path.exists(filename + '/out.gyro.t') and self.test_mode:
self.dynaLoad = False
printw('No out.gyro.t file: aborting the load')
return
pygacode.gyro.data.GYROData.__init__(self, filename)
# create keys from self attributes
# add input.gacode as object to gyro class here if needed
# common variables
self['originalFilename'] = self.originalFilename
self['profile'] = self.profile
self['geometry'] = self.geometry
self['n_r'] = self.profile['n_x']
self['n_bnd'] = self.profile['n_explicit_damp']
n_n = self.profile['n_n']
tor_n = self.profile['n0'] + self.profile['d_n'] * np.linspace(0, n_n - 1, n_n)
self['tor_n'] = tor_n.astype(int)
self['kyrhos'] = self.profile['kt_rho']
self['n_n'] = n_n
self['n_theta_plot'] = self.profile['n_theta_plot']
self['r'] = self.profile['r']
self['t'] = self.t['(c_s/a)t']
self['n_time'] = self.t['n_time']
# units
self['units'] = dict()
self['units']['m_ref'] = self.units[0] # kg
self['units']['b_unit'] = self.units[1] # T
self['units']['a'] = self.units[2] # m
self['units']['csD/a'] = self.units[3] # 1/s
self['units']['csD'] = self.units[4] # m/s
self['units']['Te'] = self.units[5] # keV
self['units']['ne'] = self.units[6] # 10^19/m^3
self['units']['rho_sD'] = self.units[7] # m
self['units']['chi_gBD'] = self.units[8] # m^2/s
self['units']['Gamma_gBD'] = self.units[9] # 0.6244e22/m^2/s = MW/keV/m^2
self['units']['Q_gBD'] = self.units[10] # MW/m^2
self['units']['Pi_gBD'] = self.units[11] # Nm/m^2
self['units']['S_gBD'] = self.units[12] # MW/m^3
# tags
self['tagfield'] = self.tagfield
self['tagfieldtext'] = self.tagfieldtext[0 : self.profile['n_field']]
self['tagmom'] = self.tagmom
self['tagspec'] = self.tagspec
# create a Boolean flag indicating whether run is linear or nonlinear
if self.profile['nonlinear_flag'] == 0:
self['nonlinear_run'] = False
else:
self['nonlinear_run'] = True
if os.path.exists(self.filename + '/fieldeigen.out'):
self['eigensolver'] = True
else:
self['eigensolver'] = False
# fluctuations
self['flucs'] = dict()
# load other results based on whether run is nonlinear or linear (either initial value or eigen solver)
if self['nonlinear_run']:
# load n=0 and n>0 potential
self['field_rms'] = dict()
if hasattr(self, 'field_rms'):
tmp = xarray.DataArray(
self.field_rms / np.average(self['profile']['rho_s']),
coords={'potential': ['n=0', 'n>0'], 't': self['t']},
dims=['potential', 't'],
attrs={'comment': 'NL zonal and n>0 potential'},
)
self['field_rms']['n=0'] = tmp.isel(potential=0)
self['field_rms']['n>0'] = tmp.isel(potential=1)
# load fluxes vs. ky and time
self['flux_t'] = dict()
self['flux_ky'] = dict()
self['flux_r'] = dict()
self.read_gbflux_n()
if hasattr(self, 'gbflux_n'):
tmp = xarray.DataArray(
self.gbflux_n,
coords={
'species': self['tagspec'],
'field': self['tagfieldtext'],
'moment': ['n', 'e', 'v', 's'],
'ky': self['kyrhos'],
't': self['t'],
},
dims=['species', 'field', 'moment', 'ky', 't'],
attrs={'comment': 'NL gyroBohm-normalized fluxes vs (ky,t)'},
)
self['flux_ky']['particle'] = tmp.isel(moment=0)
self['flux_ky']['energy'] = tmp.isel(moment=1)
self['flux_ky']['momentum'] = tmp.isel(moment=2)
self['flux_t']['particle'] = self['flux_ky']['particle'].sum(dim='ky')
self['flux_t']['energy'] = self['flux_ky']['energy'].sum(dim='ky')
self['flux_t']['momentum'] = self['flux_ky']['momentum'].sum(dim='ky')
self.read_gbflux_i()
if hasattr(self, 'gbflux_i'):
tmp = xarray.DataArray(
self.gbflux_i,
coords={
'species': self['tagspec'],
'field': self['tagfieldtext'],
'moment': ['n', 'e', 'v', 's'],
'r': self['r'],
't': self['t'],
},
dims=['species', 'field', 'moment', 'r', 't'],
attrs={'comment': 'NL gyroBohm-normalized fluxes vs (r,t)'},
)
self['flux_r']['particle'] = tmp.isel(moment=0)
self['flux_r']['energy'] = tmp.isel(moment=1)
self['flux_r']['momentum'] = tmp.isel(moment=2)
else:
# load frequency
self['freq'] = dict()
if hasattr(self, 'freq'):
self['freq']['omega'] = xarray.DataArray(
self.freq['(a/c_s)w'], coords={'ky': self['kyrhos'], 't': self['t']}, dims=['ky', 't']
)
self['freq']['gamma'] = xarray.DataArray(
self.freq['(a/c_s)gamma'], coords={'ky': self['kyrhos'], 't': self['t']}, dims=['ky', 't']
)
self['freq']['omega_err'] = xarray.DataArray(
self.freq['err(a/c_s)w'], coords={'ky': self['kyrhos'], 't': self['t']}, dims=['ky', 't']
)
self['freq']['gamma_err'] = xarray.DataArray(
self.freq['err(a/c_s)gamma'], coords={'ky': self['kyrhos'], 't': self['t']}, dims=['ky', 't']
)
# eigenvalue solver
if self['eigensolver']:
data = np.loadtxt(self.filename + '/fieldeigen.out')
self['t'] = np.arange(data.shape[0])
self['n_time'] = data.shape[0]
self['freq']['gamma'] = xarray.DataArray(
np.atleast_2d(data[:, 0]),
coords={'ky': self['kyrhos'], 't': self['t']},
dims=['ky', 't'],
attrs={'comment': 'Eigenvalue growth rate'},
)
self['freq']['omega'] = xarray.DataArray(
np.atleast_2d(data[:, 1]),
coords={'ky': self['kyrhos'], 't': self['t']},
dims=['ky', 't'],
attrs={'comment': 'Eigenvalue frequency'},
)
self['freq']['eigensolver_err'] = xarray.DataArray(
np.atleast_2d(data[:, 3]),
coords={'ky': self['kyrhos'], 't': self['t']},
dims=['ky', 't'],
attrs={'comment': 'Eigensolver error'},
)
del self['freq']['gamma_err']
del self['freq']['omega_err']
# load ballooning modes
if self['n_n'] == 1:
try:
self.read_balloon()
n_p = self.profile['n_x'] / self.profile['box_multiplier']
n_ang = self.profile['n_theta_plot'] * n_p
x_ball = -(1.0 + n_p) + 2.0 * n_p * np.arange(n_ang) / float(n_ang) # theta_balloon/pi
self['balloon'] = {'theta_b_over_pi': x_ball}
except Exception:
self.balloon = dict()
print('no ballooning mode files')
if self['eigensolver']:
balloon_field = np.zeros((int(n_ang), self['n_time']), dtype=complex)
if 'balloon_phi' in self.balloon:
balloon_field[:, -1] = self.balloon['balloon_phi'][:, 0, -1]
self['balloon']['balloon_phi'] = xarray.DataArray(
balloon_field,
coords={'theta_b_over_pi': self['balloon']['theta_b_over_pi'], 't': self['t']},
dims=['theta_b_over_pi', 't'],
)
if 'balloon_a' in self.balloon:
balloon_field[:, -1] = self.balloon['balloon_a'][:, 0, -1]
self['balloon']['balloon_apar'] = xarray.DataArray(
balloon_field,
coords={'theta_b_over_pi': self['balloon']['theta_b_over_pi'], 't': self['t']},
dims=['theta_b_over_pi', 't'],
)
if 'balloon_aperp' in self.balloon:
balloon_field[:, -1] = self.balloon['balloon_aperp'][:, 0, -1]
self['balloon']['balloon_bpar'] = xarray.DataArray(
balloon_field,
coords={'theta_b_over_pi': self['balloon']['theta_b_over_pi'], 't': self['t']},
dims=['theta_b_over_pi', 't'],
)
if 'balloon_epar' in self.balloon:
balloon_field[:, -1] = self.balloon['balloon_epar'][:, 0, -1]
self['balloon']['balloon_epar'] = xarray.DataArray(
balloon_field,
coords={'theta_b_over_pi': self['balloon']['theta_b_over_pi'], 't': self['t']},
dims=['theta_b_over_pi', 't'],
)
else:
if 'balloon_phi' in self.balloon:
self['balloon']['balloon_phi'] = xarray.DataArray(
self.balloon['balloon_phi'][:, 0, :],
coords={'theta_b_over_pi': self['balloon']['theta_b_over_pi'], 't': self['t']},
dims=['theta_b_over_pi', 't'],
)
if 'balloon_a' in self.balloon:
self['balloon']['balloon_apar'] = xarray.DataArray(
self.balloon['balloon_a'][:, 0, :],
coords={'theta_b_over_pi': self['balloon']['theta_b_over_pi'], 't': self['t']},
dims=['theta_b_over_pi', 't'],
)
if 'balloon_aperp' in self.balloon:
self['balloon']['balloon_bpar'] = xarray.DataArray(
self.balloon['balloon_aperp'][:, 0, :],
coords={'theta_b_over_pi': self['balloon']['theta_b_over_pi'], 't': self['t']},
dims=['theta_b_over_pi', 't'],
)
if 'balloon_epar' in self.balloon:
self['balloon']['balloon_epar'] = xarray.DataArray(
self.balloon['balloon_epar'][:, 0, :],
coords={'theta_b_over_pi': self['balloon']['theta_b_over_pi'], 't': self['t']},
dims=['theta_b_over_pi', 't'],
)
# load quasilinear flux weights
self['qlflux_ky'] = dict()
self.read_gbflux_i()
if hasattr(self, 'gbflux'):
gbflux = np.zeros((len(self['tagspec']), len(self['tagfieldtext']), 4, self['n_time']))
gbflux[:, :, :, -1] = self.gbflux[:, :, :, -1]
if self['eigensolver']:
tmp = xarray.DataArray(
gbflux,
coords={
'species': self['tagspec'],
'field': self['tagfieldtext'],
'moment': ['n', 'e', 'v', 's'],
't': self['t'],
},
dims=['species', 'field', 'moment', 't'],
attrs={'comment': 'QL gyroBohm-normalized fluxes vs (t)'},
)
else:
tmp = xarray.DataArray(
self.gbflux,
coords={
'species': self['tagspec'],
'field': self['tagfieldtext'],
'moment': ['n', 'e', 'v', 's'],
't': self['t'],
},
dims=['species', 'field', 'moment', 't'],
attrs={'comment': 'QL gyroBohm-normalized fluxes vs (t)'},
)
self['qlflux_ky']['particle'] = tmp.isel(moment=0)
self['qlflux_ky']['energy'] = tmp.isel(moment=1)
self['qlflux_ky']['momentum'] = tmp.isel(moment=2)
self['qlflux_ky']['exchange'] = tmp.isel(moment=3)
for item in dir(pygacode.gyro.data_plot.gyrodata_plot):
func = getattr(pygacode.gyro.data_plot.gyrodata_plot, item)
if item.startswith('plot') and 'fig' in inspect.getfullargspec(func).args:
wrappedfunc = om_fig(func)
setattr(OMFITgyro, item, wrappedfunc)
[docs] class OMFITcgyro(SortedDict, OMFITobject, pygacode.cgyro.data_plot.cgyrodata_plot):
"""
Class used to interface with CGYRO results directory
:param filename: directory where the CGYRO result files are stored.
The data in this directory will be loaded upon creation of the object.
:param extra_files: Any extra files that should be downloaded from the remote location
:param test_mode: Don't raise an exception if out.gyro.t is not present (and abort loading at that point)
"""
def __init__(self, filename=None, extra_files=[], test_mode=False, **kw):
if isinstance(filename, (list, tuple)):
kw['tunnel'] = filename[2]
kw['server'] = filename[1]
filename = filename[0]
if not is_localhost(kw.get('server', 'localhost')):
outputs = [
'input.cgyro',
'input.cgyro.gen',
'out.cgyro.aparb',
'out.cgyro.bparb',
'out.cgyro.egrid',
'out.cgyro.equilibrium',
'out.cgyro.freq',
'out.cgyro.geo',
'out.cgyro.grids',
'out.cgyro.hosts',
'out.cgyro.info',
'out.cgyro.ky_flux',
'out.cgyro.lky_flux_e',
'out.cgyro.lky_flux_n',
'out.cgyro.lky_flux_v',
'out.cgyro.memory',
'out.cgyro.mpi',
'out.cgyro.phib',
'out.cgyro.prec',
'out.cgyro.res_ver',
'out.cgyro.tag',
'out.cgyro.time',
'out.cgyro.timing',
'out.cgyro.version',
]
# binary files
outputs += [
'bin.cgyro.aparb',
'bin.cgyro.bparb',
'bin.cgyro.freq',
'bin.cgyro.geo',
'bin.cgyro.kxky_apar',
'bin.cgyro.kxky_bpar',
'bin.cgyro.kxky_e',
'bin.cgyro.kxky_flux_e',
'bin.cgyro.kxky_n',
'bin.cgyro.kxky_phi',
'bin.cgyro.ky_flux',
'bin.cgyro.ky_cflux',
'bin.cgyro.ky_flux_e',
'bin.cgyro.ky_flux_n',
'bin.cgyro.ky_flux_v',
'bin.cgyro.phib',
]
# input.gacode
outputs += ['input.gacode']
# batch files
outputs += ['batch.src', 'batch.out', 'batch.err']
outputs += extra_files
filename = ','.join([filename + os.sep + x for x in outputs])
kw['file_type'] = 'dir'
OMFITobject.__init__(self, filename, **kw)
self.OMFITproperties.pop('file_type', 'dir')
SortedDict.__init__(self, sorted=True)
self.test_mode = test_mode
self.dynaLoad = True
[docs] @dynaLoad
def load(self):
filename = self.filename
if os.path.exists(filename + '/input.cgyro.gen'):
self['input.cgyro.gen'] = OMFITgacode(filename + '/input.cgyro.gen')
if os.path.exists(filename + '/out.cgyro.info'):
self['out.cgyro.info'] = OMFITascii(filename + '/out.cgyro.info')
if os.path.exists(filename + '/out.cgyro.mpi'):
self['out.cgyro.mpi'] = OMFITascii(filename + '/out.cgyro.mpi')
if os.path.exists(filename + '/out.cgyro.version'):
self['out.cgyro.version'] = OMFITascii(filename + '/out.cgyro.version')
if os.path.exists(filename + '/batch.src'):
self['batch.src'] = OMFITascii(filename + '/batch.src')
if os.path.exists(filename + '/batch.out'):
self['batch.out'] = OMFITascii(filename + '/batch.out')
if os.path.exists(filename + '/batch.err'):
self['batch.err'] = OMFITascii(filename + '/batch.err')
if not os.path.exists(filename + '/out.cgyro.time') and self.test_mode:
self.dynaLoad = False
printw('No out.cgyro.time file: aborting the load')
return
pygacode.cgyro.data.cgyrodata.__init__(self, filename + '/')
# common variables
self['originalFilename'] = self.originalFilename
self['n_r'] = self.n_radial
self['n_n'] = self.n_n
self['n_species'] = self.n_species
self['n_field'] = self.n_field
self['n_theta'] = self.n_theta
self['n_energy'] = self.n_energy
self['n_xi'] = self.n_xi
self['m_box'] = self.m_box
self['length'] = self.length
self['n_global'] = self.n_global
self['theta_plot'] = self.theta_plot
self['kyrhos'] = self.ky
self['kxrhos'] = self.kx
self['t'] = self.t
self['n_time'] = self.n_time
self['field_tags'] = ['Phi', 'Apar', 'Bpar'][0 : self.n_field]
self['species_tags'] = list(map(str, np.arange(self.n_species) + 1))
# units- need to test if out.cgyro.equilibrium loaded
# most equilibrium parameters in self['input.cgyro.gen']
# what to do about gyroBohm units
self['units'] = dict()
if hasattr(self, 'b_unit'):
self['units']['b_unit'] = self.b_unit # T
self['units']['a'] = self.a_meters # m
self['units']['vth_norm'] = self.vth_norm # m/s
self['units']['Te_norm'] = self.temp_norm # keV
self['units']['ne_norm'] = self.dens_norm # 10^19/m^3
self['units']['mass_norm'] = self.mass_norm # 10^-27 kg^-1?
self['units']['rho_star'] = self.rho_star_norm
self['units']['gamma_gb_norm'] = self.gamma_gb_norm # 10^19 m^-2 s^-1
self['units']['q_gb_norm'] = self.q_gb_norm # MW / m^2
self['units']['pi_gb_norm'] = self.pi_gb_norm # N / m
self['units']['Z'] = self.z
self['units']['mass'] = self.mass # relative to species 1
self['units']['dens'] = self.dens
self['units']['temp'] = self.temp
self['units']['dlnndr'] = self.dlnndr
self['units']['dlntdr'] = self.dlntdr
self['units']['nu'] = self.nu
# create a Boolean flag indicating whether run is linear or nonlinear
if self['input.cgyro.gen']['NONLINEAR_FLAG'] == 0:
self['nonlinear_run'] = False
else:
self['nonlinear_run'] = True
# fluctuations
self['flucs'] = dict()
# CGYRO always saves frequencies, even for NL runs
self['freq'] = dict()
if hasattr(self, 'freq'):
# gamma and omega are arrays of size [n_n, n_time]
tmp = xarray.DataArray(
self.freq, coords={'type': ['omega', 'gamma'], 'ky': self['kyrhos'], 't': self['t']}, dims=['type', 'ky', 't']
)
self['freq']['omega'] = tmp.isel(type=0)
self['freq']['gamma'] = tmp.isel(type=1)
# load other results based on whether run is linear or nonlinear
self.getflux()
if self['nonlinear_run']:
self['flux_t'] = dict()
self['flux_ky'] = dict()
if hasattr(self, 'ky_flux'):
tmp = xarray.DataArray(
self.ky_flux,
coords={
'species': self['species_tags'],
'moment': ['n', 'e', 'v'],
'field': self['field_tags'],
'ky': self['kyrhos'],
't': self['t'],
},
dims=['species', 'moment', 'field', 'ky', 't'],
attrs={'comment': 'NL gyroBohm-normalized fluxes'},
)
self['flux_ky']['particle'] = tmp.isel(moment=0)
self['flux_ky']['energy'] = tmp.isel(moment=1)
self['flux_ky']['momentum'] = tmp.isel(moment=2)
self['flux_t']['particle'] = self['flux_ky']['particle'].sum(dim='ky')
self['flux_t']['energy'] = self['flux_ky']['energy'].sum(dim='ky')
self['flux_t']['momentum'] = self['flux_ky']['momentum'].sum(dim='ky')
else:
# load ballooning modes. IFF n_radial == 1, ZF test, otherwise standard finite-ky
if self.n_radial == 1:
ball_x = self.theta / np.pi
label_x = 'theta_over_pi'
else:
ball_x = self.thetab / np.pi
label_x = 'theta_b_over_pi'
self['balloon'] = {label_x: ball_x}
if hasattr(self, 'phib'):
tmp = self.phib[0, :, :] + 1j * self.phib[1, :, :]
self['balloon']['balloon_phi'] = xarray.DataArray(
tmp, coords={label_x: self['balloon'][label_x], 't': self['t']}, dims=[label_x, 't']
)
if hasattr(self, 'aparb'):
tmp = self.aparb[0, :, :] + 1j * self.aparb[1, :, :]
self['balloon']['balloon_apar'] = xarray.DataArray(
tmp, coords={label_x: self['balloon'][label_x], 't': self['t']}, dims=[label_x, 't']
)
if hasattr(self, 'bparb'):
tmp = self.bparb[0, :, :] + 1j * self.bparb[1, :, :]
self['balloon']['balloon_bpar'] = xarray.DataArray(
tmp, coords={label_x: self['balloon'][label_x], 't': self['t']}, dims=[label_x, 't']
)
# load quasilinear flux weights
self['qlflux_ky'] = dict()
if hasattr(self, 'ky_flux'):
tmp = xarray.DataArray(
self.ky_flux,
coords={
'species': self['species_tags'],
'moment': np.arange(3),
'field': self['field_tags'],
'ky': self['kyrhos'],
't': self['t'],
},
dims=['species', 'moment', 'field', 'ky', 't'],
attrs={'comment': 'quasilinear weights'},
)
# to be compatible with GYRO convention remove the dim=ky for the single ky
tmp = tmp.sum(dim='ky')
self['qlflux_ky']['particle'] = tmp.isel(moment=0)
self['qlflux_ky']['energy'] = tmp.isel(moment=1)
self['qlflux_ky']['momentum'] = tmp.isel(moment=2)
if len(tmp['moment']) == 4:
self['qlflux_ky']['exchange'] = tmp.isel(moment=3)
for item in dir(pygacode.cgyro.data_plot.cgyrodata_plot):
func = getattr(pygacode.cgyro.data_plot.cgyrodata_plot, item)
if item.startswith('plot') and 'fig' in inspect.getfullargspec(func).args:
wrappedfunc = om_fig(func)
setattr(OMFITcgyro, item, wrappedfunc)
[docs] class OMFITneo(SortedDict, OMFITobject, pygacode.neo.data.NEOData):
"""
Class used to interface with NEO results directory
:param filename: directory where the TGYRO result files are stored.
The data in this directory will be loaded upon creation of the object.
"""
def __init__(self, filename, **kw):
kw['file_type'] = 'dir'
OMFITobject.__init__(self, filename, **kw)
self.OMFITproperties.pop('file_type', 'dir')
SortedDict.__init__(self, sorted=True)
self.dynaLoad = True
[docs] @dynaLoad
def load(self):
pygacode.neo.data.NEOData.__init__(self, self.filename)
self['neo_results'] = SortedDict()
for k in [
'equil',
'expnorm',
'grid',
'phi',
'rotation',
'theory',
'theory_nclass',
'transport',
'transport_gv',
'transport_exp',
]:
for key, v in list(getattr(self, k, {}).items()):
if key in self['neo_results'] and np.all(np.atleast_1d(self['neo_results'][key] != v)):
print('%s already added to neo_results' % key)
self['neo_results'][key] = v
self['neo_files'] = SortedDict()
for k in list(self.keys()):
prefixes = ('out.neo', 'input', 'out.expro', 'OMFIT_run_')
if k.startswith(prefixes):
self['neo_files'][k] = self[k]
del self[k]
[docs] def copy_gacode_module_settings(root=None):
"""
Utility function to sync all GACODE modules to use the same working environment
:param root: GACODE module to use as template (if None only sets module['SETTINGS']['SETUP']['branch'])
"""
from omfit_classes.omfit_base import OMFITexpression
default_GACODE_branch = OMFITexpression(
'''
server = root['SETTINGS']['REMOTE_SETUP']['serverPicker']
if is_server(server, ['iris', 'saturn']):
return_variable = '/2022.17'
elif is_server(server, ['omega']):
return_variable = '/2022.17'
elif is_server(server, 'portal'):
return_variable = 'gacode'
elif is_server(server, 'cori'):
return_variable = ''
else:
raise ValueError(f'GACODE installation on {server} has not yet been updated to post_gacode')
'''.strip()
)
for module in ['TGYRO_GACODE', 'TGLF_GACODE', 'GYRO_GACODE', 'PROFILES_GEN_GACODE', 'NEO_GACODE']:
if module not in OMFIT:
OMFIT.loadModule(module, "OMFIT['%s']" % module, withSubmodules=False, quiet=True)
OMFIT[module]['SETTINGS']['SETUP']['branch'] = copy.deepcopy(default_GACODE_branch)
if root is None:
continue
print('Copying executable and branch settings from %s to %s' % (root.ID[:-7], module[:-7]))
OMFIT[module]['SETTINGS']['SETUP']['branch'] = copy.deepcopy(root['SETTINGS']['SETUP']['branch'])
for item in root['SETTINGS']['REMOTE_SETUP']:
if not isinstance(root['SETTINGS']['REMOTE_SETUP'][item], dict):
continue
if item not in OMFIT[module]['SETTINGS']['REMOTE_SETUP']:
OMFIT[module]['SETTINGS']['REMOTE_SETUP'][item] = SettingsName()
for k in root['SETTINGS']['REMOTE_SETUP'][item]:
if k in OMFIT[module]['SETTINGS']['REMOTE_SETUP']['iris']:
OMFIT[module]['SETTINGS']['REMOTE_SETUP'][item][k] = copy.deepcopy(root['SETTINGS']['REMOTE_SETUP'][item][k])
else:
OMFIT[module]['SETTINGS']['REMOTE_SETUP'][item]['environment'] = copy.deepcopy(
root['SETTINGS']['REMOTE_SETUP'][item]['environment']
)
# OMFIT[module]['SETTINGS']['REMOTE_SETUP']['saturn']['executable'] = OMFIT[module]['SETTINGS']['REMOTE_SETUP']['iris'][
# 'executable'
# ]
############################################
if '__main__' == __name__:
test_classes_main_header()
sample_files = glob.glob(OMFITsrc + '/../samples/input*')
eligble_samples = [fn for fn in sample_files if not (fn.endswith('jbs') or '_omfit' in fn)]
fn = eligble_samples[0] # Pick the first one
f0 = OMFITgacode(fn)
f0.load()