try:
# framework is running
from .startup_choice import *
except ImportError as _excp:
# class is imported by itself
if (
'attempted relative import with no known parent package' in str(_excp)
or 'No module named \'omfit_classes\'' in str(_excp)
or "No module named '__main__.startup_choice'" in str(_excp)
):
from startup_choice import *
else:
raise
from omfit_classes.omfit_ascii import OMFITascii
from omfit_classes.omfit_matrix import OMFITmatrix
from omfit_classes.omfit_eqdsk import OMFITgeqdsk
from traceback import extract_stack
from warnings import filterwarnings
import numpy as np
import xarray as xr
import pandas as pd
__all__ = ['OMFITtrip3Dcoil', 'OMFITtrip3Dlines', 'OMFITtrip3Dhits', 'OMFITtrip3Dstart', 'OMFITtrip3Dprobeg']
def foot_on_line(xp, yp, x1, x2, y1, y2):
r"""
Given a point (P) and a segment (1), (2), compute the foot (F) on the line and the point-to-line distance d.
(P) \ (P) \
: | : |
: | d : | d
: | : |
: / : /
(1)==(F)===========(2) (F)------(1)=======(2)
An array of points P is allowed, but there can only be one line segment.
:param xp: numeric
X-coordinate of point P or array of X-coordinates of several points
:param yp: numeric
Y-coordinate of point P or array of Y-coordinates of several points (must be scalar or match the shape of xp)
:param x1: float
X-coordinate of one end of the line segment (point 1)
:param x2: float
X-coordinate of the other end of the line segment (point 2)
:param y1: float
Y-coordinate of one end of the line segment (point 1)
:param y2: float
Y-coordinate of the other end of the line segment (point 2)
:return: tuple
X-coordinate(s) of the foot point as a float or array (matching the shape of xp or yp)
Y-coordinate(s) of the foot point as a float or array (matching the shape of xp or yp)
Distance between the point(s) P and the line as a float or array (matching the shape of xp or yp)
"""
if x1 == x2 and y1 == y2:
raise ValueError('(x1,y1) == (x2,y2)')
elif x1 == x2: # vertical
xf = x1 * xp**0
yf = yp * xp**0
elif y1 == y2: # horizontal
xf = xp * xp**0
yf = y1 * xp**0
else: # diagonal
# (1) and (2)
mm = (y2 - y1) / (x2 - x1)
qq = y1 - mm * x1
# (p) and (f)
nn = -1.0 / mm
pp = yp - nn * xp
# foot
xf = (pp - qq) / (mm - nn)
yf = nn * (xf - xp) + yp
# distance
dd = np.hypot(xf - xp, yf - yp)
return xf, yf, dd
def sshape(d):
return 'x'.join([str(s) for s in np.shape(d)])
# Created by wuwen at 2015/01/22 15:50
[docs]class OMFITtrip3Dcoil(SortedDict, OMFITascii):
def __init__(self, filename, **kw):
SortedDict.__init__(self)
OMFITascii.__init__(self, filename, **kw)
self.dynaLoad = True
def __tree_repr__(self):
myrepr = 'FILE: %s (%s' % (os.path.split(self.filename)[1], sizeof_fmt(self.filename, ' '))
if not self.dynaLoad:
myrepr += ', {} loops'.format(len(self) - 2)
myrepr += ')'
return myrepr, []
[docs] @dynaLoad
def load(self):
with open(self.filename, 'r') as f:
lines = f.readlines()
self['device'] = lines[0].strip()
self['coil'] = lines[1].strip()
nloops = int(lines[2].split()[0])
k = 3
for l in range(nloops):
nitems = int(lines[k].split()[0])
titem = lines[k].split()[1]
k += 1
loop = SortedDict()
for s in range(nitems):
loop[s] = [float(i) for i in lines[k].split()]
k += 1
self[l, titem] = loop
[docs] @dynaSave
def save(self):
with open(self.filename, 'w') as f:
f.write(self['device'].strip() + '\n')
f.write(self['coil'].strip() + '\n')
f.write('%3d loops\n' % (len(self) - 2))
for k in list(self.keys())[2:]:
f.write('%3d %s\n' % (len(self[k]), k[1]))
for c in list(self[k].keys()):
f.write((' % f' * len(self[k][c]) + '\n') % tuple(self[k][c]))
# Created by Trevisan at 05 Dec 2016 15:49
[docs]class OMFITtrip3Dlines(OMFITmatrix):
"""
The OMFITtrip3Dlines class parses and handles the TRIP3D 'lines.out' output file.
A self-described xarray object is stored under self['data'].
"""
# file descriptors
columns = {
2: ['psimin', 'psimax'],
# 8: ['phi','rho','ith','the','bmag','brot','psin','mlen'],
9: ['phi', 'rho', 'ith', 'the', 'brho', 'bthe', 'bphi', 'mlen', 'psin'],
}
for c in [9]:
columns[c + 2] = list(columns[c])
columns[c + 2].extend(['rr', 'zz'])
columns[c + 4] = list(columns[c + 2])
columns[c + 4].extend(['psimin', 'psimax'])
def __getitem__(self, key):
try:
return SortedDict.__getitem__(self, key)
except Exception:
if key == 'data':
raise
try:
return self['data'].loc[:, :, key].values
except Exception:
return self['data'].attrs[key]
def __mul__(self, other, quiet=False):
# sanity checks
if not isinstance(other, self.__class__):
raise TypeError('class mismatch: {} vs {}.'.format(other.__class__.__name__, self.__class__.__name__))
if self['data'].shape[0] != other['data'].shape[0]:
raise ValueError(
'shape mismatch: {} vs {} (full dims: {}, {}).'.format(
self['data'].shape[0], other['data'].shape[0], self['data'].shape, other['data'].shape
)
)
# psiN arrays
self['data'].loc[:, :, 'psimin'] = (self['psin'].transpose() ** 0 * np.nanmin(other['data'][:, 0, :], axis=-1)).transpose()
self['data'].loc[:, :, 'psimax'] = (self['psin'].transpose() ** 0 * np.nanmax(other['data'][:, 0, :], axis=-1)).transpose()
if not quiet:
_print(self, 'updated psimin and psimax.')
return self
[docs] @dynaLoad
def load(self, bin=None, zip=None, geqdsk=None, verbose=False, quiet=False):
p = False
t = tstart = time.time()
# read file
bin, zip = OMFITmatrix.load(
self, bin=bin, zip=zip, header=None, comment='%', delim_whitespace=True, index_col=False, na_filter=False, engine='c'
)
if not quiet and verbose:
p = _print(self, '%.2f s, %s, read from file.' % (time.time() - tstart, sshape(self.get('data', None))), p=p)
if not bin:
# reshape to 3D
tstart = time.time()
self.to3d()
if not quiet and verbose:
p = _print(self, '%.2f s, %s, reshaped to 3D.' % (time.time() - tstart, sshape(self.get('data', None))), p=p)
if np.shape(self['data'])[-1] in [8, 9]:
# add attributes
tstart = time.time()
stat = self.addattrs(geqdsk=geqdsk)
if not quiet and verbose:
if stat is None:
p = _print(self, '%.2f s, did not read gEQDSK data.' % (time.time() - tstart), tag='w', p=p)
else:
p = _print(self, '%.2f s, read gEQDSK data.' % (time.time() - tstart), p=p)
# add columns
tstart = time.time()
self.addcols()
if not quiet and verbose:
p = _print(self, '%.2f s, %s, added columns.' % (time.time() - tstart, sshape(self['data'])), p=p)
# summary
if not quiet:
if zip:
z = 'zlibbed' if bin else 'gzipped'
else:
z = 'plain'
p = _print(
self, '%.2f s, %s, loaded %s %s.' % (time.time() - t, sizeof_fmt(self.filename, ' '), z, 'NetCDF' if bin else 'ASCII'), p=p
)
[docs] def to3d(self):
old = self['data'].values
s = np.shape(old)
if s[-1] == 2:
lidx = np.array(list(range(s[0]))).astype(int)
llen = lidx**0
elif s[-1] in [8, 9]:
lidx = np.where(old[:, -2] == 0)[0].astype(int)
llen = np.diff(np.append(lidx, s[-2])).astype(int)
else:
raise ValueError('unexpected shape: ' + str(s))
new = np.nan * np.zeros((np.size(lidx), np.max(np.append(llen, 1)), s[-1]))
for idx, (at, len) in enumerate(zip(lidx, llen)):
new[idx, :len, :] = old[at : at + len, :]
self.toxr(new)
[docs] def addattrs(self, geqdsk=None):
if geqdsk is None:
try:
geqdsk = eval(treeLocation(self)[-3])['INPUTS']['gEQDSK']
except Exception:
pass
try:
rmax = geqdsk['RMAXIS']
zmax = geqdsk['ZMAXIS']
rlim = geqdsk['RLIM']
zlim = geqdsk['ZLIM']
stat = True
except Exception:
rmax = np.nan
zmax = np.nan
rlim = np.nan
zlim = np.nan
stat = None
finally:
self['data'].attrs['rmax'] = rmax
self['data'].attrs['zmax'] = zmax
self['data'].attrs['rlim'] = rlim
self['data'].attrs['zlim'] = zlim
return stat
[docs] def addcols(self):
filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
old = self['data'].values
s = np.shape(old)
try:
rmax = self['rmax']
zmax = self['zmax']
if np.isnan(rmax) or np.isnan(zmax):
raise ValueError
except Exception:
rmax = zmax = 0.0
new = np.nan * np.zeros((s[0], s[1], s[2] + 4))
new[:, :, :-4] = old[:, :, :]
new[:, :, s[2] + 0] = rmax + self['rho'] * np.cos(np.deg2rad(self['the']))
new[:, :, s[2] + 1] = zmax + self['rho'] * np.sin(np.deg2rad(self['the']))
new[:, :, s[2] + 2] = (self['psin'].transpose() ** 0 * np.nanmin(self['psin'], axis=-1)).transpose()
new[:, :, s[2] + 3] = (self['psin'].transpose() ** 0 * np.nanmax(self['psin'], axis=-1)).transpose()
self.toxr(new)
[docs] def toxr(self, data, attrs=None):
s = np.shape(data)
l = list(range(s[0]))
p = list(range(s[1]))
c = self.columns[s[2]]
if attrs is None:
attrs = self['data'].attrs
self['data'] = xr.DataArray(data, name='lines', dims=['L', 'P', 'C'], coords=[l, p, c], attrs=attrs)
[docs] @dynaSave
def save(self, zip=True, quiet=False):
t = time.time()
bin, zip = OMFITmatrix.save(self, bin=True, zip=zip)
if not quiet:
if zip:
z = 'zlibbed' if bin else 'gzipped'
else:
z = 'plain'
_print(self, '%.2f s, %s, saved %s %s.' % (time.time() - t, sizeof_fmt(self.filename, ' '), z, 'NetCDF' if bin else 'ASCII'))
[docs] def prep(self, type, prefix=True, quiet=False):
# disable method for non-line instances
if np.shape(self['data'])[-1] == 2:
return
# prep type
if type not in ['poincare', 'contour', 'control']:
raise TypeError('unexpected value for prep type')
# load before starting the stopwatch
if self.dynaLoad:
self.load(prefix=prefix, quiet=quiet)
# stopwatch and summary
tstart = time.time()
# check data
if '__%s__' % type in self:
return
# prepare data
data = self['data']
if type == 'poincare':
la = r"poloidal angle, $\vartheta$ [deg]"
aa = self['the'].copy()
numsi, numsj = np.where(np.isnan(aa) == False)
high = np.where(aa[numsi, numsj] > 180)[0]
aa[numsi[high], numsj[high]] -= 360
lb = r"normalized poloidal flux, $\Psi_N$"
bb = self['psin'].copy()
elif type == 'contour':
la = r"$R - R_{ax}$ [m]"
aa = self['rr'].copy()
lb = r"$Z - Z_{ax}$ [m]"
bb = self['zz'].copy()
elif type == 'control':
la = r"normalized toroidal angle, $\phi \% 360$ [deg]"
aa = self['phi'] % 360
lb = r"toroidal angle, $\phi$ [deg]"
bb = self['phi'].copy()
# initial psin
lc = r"initial $\Psi_N$"
cc = np.transpose(np.transpose(self['psin']) ** 0 * self['psin'][:, 0])
# minimum psin
ld = r"min $\Psi_N$"
dd = self['psimin'].copy()
# store data
self['__%s__' % type] = np.transpose([np.transpose(aa), np.transpose(bb)])
self['__colors__'] = np.transpose([np.transpose(cc), np.transpose(dd)])
labels = self.setdefault('__labels__', {})
labels[type] = [la, lb]
labels['colors'] = [lc, ld]
# stopwatch and summary
if not quiet:
_print(self, '%s: prepared data in %.3f s.' % (type.ljust(8), time.time() - tstart), prefix=prefix)
[docs] def doplot(
self,
type,
geqdsk=None,
lines=(None, None, None),
points=(None, None, None),
limx=(None, None),
limy=(None, None),
col='min',
cbar=True,
prefix=True,
quiet=False,
**kw,
):
# disable method for non-line instances
if np.shape(self['data'])[-1] == 2:
return
# printed
p = False
# plot type
if type not in ['poincare', 'contour', 'control']:
raise TypeError('unexpected value for plot type: %s' % type)
# color type
if col not in ['min', 'ini']:
raise TypeError('unexpected value for color: %s' % col)
# prepare data
self.prep(type=type, prefix=prefix, quiet=quiet)
# retrieve data
aa = self['__%s__' % type][:, :, 0]
bb = self['__%s__' % type][:, :, 1]
[la, lb] = self['__labels__'][type][:2]
if col == 'ini':
c = 0
elif col == 'min':
c = 1
cc = self['__colors__'][:, :, c]
lc = self['__labels__']['colors'][c]
# stopwatch and summary
tstart = time.time()
# line slicing
if isinstance(lines, tuple):
lsli = slice(*lines)
if not quiet and lines != (None, None, None):
p = _print(self, '%s: slicing lines as %s.' % (type.ljust(8), repr(lines)), prefix=prefix, p=p)
elif isinstance(lines, list) and np.size(lines):
lsli = lines
if not quiet:
p = _print(self, '%s: selecting %d lines.' % (type.ljust(8), np.size(lines)), prefix=prefix, p=p)
else:
raise TypeError('unexpected value for lines')
# point slicing
if isinstance(points, tuple):
psli = slice(*points)
if not quiet and points != (None, None, None):
p = _print(self, '%s: slicing points as %s.' % (type.ljust(8), repr(points)), prefix=prefix, p=p)
elif isinstance(points, list) and np.size(points):
psli = points
if not quiet:
p = _print(self, '%s: selecting %d points.' % (type.ljust(8), np.size(points)), prefix=prefix, p=p)
else:
raise TypeError('unexpected value for points')
# default boundaries
kw.setdefault('vmin', np.nanmin(cc[lsli, psli].reshape(-1)))
kw.setdefault('vmax', np.nanmax(cc[lsli, psli].reshape(-1)))
# default colormap
if 'cmap' not in kw:
intmid = int(1e4)
inthig = int(intmid * kw['vmin'])
intlow = int(intmid * kw['vmax'])
higbar = intmid > inthig
lowbar = intlow > intmid
if higbar:
hi = pyplot.cm.autumn(np.linspace(0, 1, intmid - inthig))
if lowbar:
lo = pyplot.cm.winter_r(np.linspace(0, 1, intlow - intmid))
if higbar and lowbar:
mi = int((intlow - inthig) // 100) * [[0, 0, 0, 1]]
bar = np.vstack((hi, mi, lo))
else:
bar = hi if higbar else lo
kw['cmap'] = matplotlib.colors.LinearSegmentedColormap.from_list('cmapall', bar)
kw['vmin'] = 1.0 * inthig / intmid
kw['vmax'] = 1.0 * intlow / intmid
# contour plot
refa = 0
refb = 0
if type == 'contour':
pyplot.gca().set_aspect('equal')
# try to locate the gEQDSK in INPUTS
if geqdsk is None:
try:
geqdsk = eval(treeLocation(self)[-3])['INPUTS']['gEQDSK']
if not isinstance(geqdsk, OMFITgeqdsk):
raise TypeError
except Exception:
geqdsk = None
try:
refa = self['rmax']
refb = self['zmax']
la = r'$R$ [m]'
lb = r'$Z$ [m]'
pyplot.plot(self['rlim'], self['zlim'], 'k-')
except Exception:
if not quiet:
p = _print(self, '%s: R/ZLIM will not be plotted.' % type.ljust(8), tag='w', prefix=prefix, p=p)
pyplot.plot(refa, refb, 'k+')
# plot
kw.setdefault('s', 1)
kw.setdefault('edgecolor', '')
kw.setdefault('rasterized', True)
scatter(aa[lsli, psli], bb[lsli, psli], c=cc[lsli, psli], **kw)
# set limits
if limx != (None, None):
pyplot.xlim(limx)
elif type == 'poincare':
pyplot.xlim(-180, 180)
if limy != (None, None):
pyplot.ylim(limy)
# set labels
pyplot.xlabel(la)
pyplot.ylabel(lb)
if cbar:
cb = colorbar()
cb.ax.set_title(lc)
if kw['vmin'] < 1.0 and 1.0 < kw['vmax']:
try:
ticks = np.around(np.linspace(kw['vmin'], kw['vmax'], 7), decimals=3)
(tisep,) = np.where(abs(ticks - 1) == min(abs(ticks - 1)))
ticks += 1 - ticks[tisep]
cb.set_ticks(ticks)
except Exception:
if not quiet:
p = _print(self, '%s: could not set colorbar ticks.' % type.ljust(8), tag='w', prefix=prefix, p=p)
# stopwatch and summary
tstop = time.time()
num = np.size(aa[lsli, psli]) - np.sum(np.isnan(aa[lsli, psli]))
if not quiet:
p = _print(self, '{}: plotted {:,d} points in {:.3f} s.'.format(type.ljust(8), num, tstop - tstart), prefix=prefix, p=p)
[docs] def plot(self, type='summary', **kw):
# disable method for non-line instances
if np.shape(self['data'])[-1] == 2:
return
# prefix
me = self.__class__.__name__ + '.plot'
blankme = ' ' * len(me)
# quiet keyword
quiet = kw.setdefault('quiet', False)
# plot type
if type in ['poincare', 'contour', 'control']:
self.doplot(type=type, **kw)
return
elif type != 'summary':
raise TypeError('unexpected value for plot type: %s' % type)
# prepare data
p = False
for t in ['poincare', 'contour', 'control']:
if '__%s__' % t in self:
continue
self.prep(type=t, prefix=blankme if p else True, quiet=quiet)
p = True
# default slicing
p = False
if 'lines' not in kw and 'points' not in kw:
# line slicing: toroidal planes
kw['lines'] = (None, None, None)
(planes,) = np.where(self['data'][:, 0, 0] > self['data'][0, 0, 0])
if len(planes):
stop = planes[0]
kw['lines'] = (None, stop, None)
if not quiet:
p = _print(
self,
'%s: detected %d starting planes with step of %.1f deg.'
% (type.ljust(8), np.shape(self['data'])[0] / stop, self['data'][stop, 0, 0]),
p=p,
)
else:
if not quiet:
p = _print(self, '%s: detected 1 starting plane at %.1f deg.' % (type.ljust(8), self['data'][0, 0, 0]), p=p)
# point slicing: poincare planes
kw['points'] = (None, None, None)
step = 0
for idx in [0, int(np.shape(self['data'])[1] // 2)]:
try:
diff = abs(self['data'][0, idx + 1, 0] - self['data'][0, idx, 0])
step = int(round(360.0 / diff))
break
except Exception:
pass
if step == 0:
if not quiet:
p = _print(self, '%s: could not detect toroidal step.' % type.ljust(8), tag='w', p=p)
elif step == 1:
if not quiet:
p = _print(self, '%s: detected 1 poincare plane at %.1f deg.' % (type.ljust(8), self['data'][0, idx, 0]), p=p)
else:
kw['points'] = (None, None, step)
if not quiet:
p = _print(self, '%s: detected %d poincare planes with step of %.1f deg.' % (type.ljust(8), step, round(diff)), p=p)
else:
kw.setdefault('points', (None, None, None))
kw.setdefault('lines', (None, None, None))
if not quiet:
p = _print(self, '%s: overriding default slicing.' % type.ljust(8), p=p)
# colormaps
gg = matplotlib.colors.LinearSegmentedColormap.from_list('g', 2 * [[0.7, 0.7, 0.7, 1]])
kk = matplotlib.colors.LinearSegmentedColormap.from_list('k', 2 * [[0, 0, 0, 1]])
# composite plot
ioff()
suptitle(treeLocation(self)[-1])
# control plots
pyplot.subplot(2, 2, 1)
if 'points' in kw and kw['points'] != (None, None, None):
kwa = kw.copy()
kwa['points'] = (None, None, None)
kwa['cmap'] = gg
kwa.pop('quiet')
kwa['cbar'] = False
# downsample lines for performance
if isinstance(kw['lines'], tuple):
lsli = slice(*kw['lines'])
elif isinstance(kw['lines'], list) and np.size(kw['lines']):
lsli = kw['lines']
lidx = list(range(np.shape(self['data'])[0]))[lsli]
got = np.count_nonzero(~np.isnan(self['data'][lidx, :, 0]))
want = 123456
if got > want:
step = got / want + int(got % want > 0)
kwa['lines'] = lidx[::step]
if not quiet:
p = _print(self, '{:s}: downsampled control for performance.'.format(type.ljust(8)), p=p)
self.doplot(type='control', quiet=True, **kwa)
kwb = kw.copy()
kwb['s'] = 6
kwb['cmap'] = kk
kwb['cbar'] = False
self.doplot(type='control', prefix=blankme, **kwb)
# poincare plot
pyplot.subplot(2, 2, 3)
kwc = kw.copy()
kwc['cbar'] = False
self.doplot(type='poincare', prefix=blankme, **kwc)
# contour plot
pyplot.subplot(1, 2, 2)
self.doplot(type='contour', prefix=blankme, **kw)
# interactive plotting
ion()
draw()
show()
# Created by Trevisan at 11 Aug 2017 16:03
[docs]class OMFITtrip3Dhits(OMFITmatrix):
"""
The OMFITtrip3Dhits class parses and handles the TRIP3D 'hit.out' output file.
A self-described xarray object is stored under self['data'].
"""
def __getitem__(self, key):
try:
return SortedDict.__getitem__(self, key)
except Exception:
if key == 'data':
raise
try:
return self['data'].loc[:, :, key].values
except Exception:
return self['data'].attrs[key]
@dynaLoad
def __invert__(self):
s = copy.deepcopy(self)
s['data'].attrs = self['data'].attrs.copy()
for att in 'abc':
s['data'].attrs.pop(att, None)
s['data'].loc[:, 0, :] = self['data'].loc[:, 2, :].copy()
s['data'].loc[:, 2, :] = self['data'].loc[:, 0, :].copy()
s['data'].loc[:, :, 'mlen'] = self['data'].loc[:, :, 'mlen'].copy()
s['data'][:, 1, :].data *= np.nan
s.filename = s.link = OMFITascii('hit.inv').filename
if self.readOnly:
s.readOnly = False
s.save()
s.readOnly = True
return s
[docs] @dynaLoad
def load(self, bin=None, zip=None, inverse=True, geqdsk=None, verbose=False, quiet=False):
p = False
t = tstart = time.time()
# read file
bin, zip = OMFITmatrix.load(self, bin=bin, zip=zip)
if not quiet and verbose:
p = _print(self, '%.2f s, %s, read from file.' % (time.time() - tstart, sshape(self['data'])), p=p)
if not bin:
# reshape to 3D
tstart = time.time()
self.to3d()
if not quiet and verbose:
p = _print(self, '%.2f s, %s, reshaped to 3D.' % (time.time() - tstart, sshape(self['data'])), p=p)
# add attributes
tstart = time.time()
stat = self.addattrs(inverse=inverse, geqdsk=geqdsk)
if not quiet and verbose:
if stat is None:
p = _print(self, '%.2f s, did not read gEQDSK data.' % (time.time() - tstart), tag='w', p=p)
else:
p = _print(self, '%.2f s, read %s gEQDSK data.' % (time.time() - tstart, 'inverted' if stat else 'normal'), p=p)
# add columns
tstart = time.time()
self.addcols()
if not quiet and verbose:
p = _print(self, '%.2f s, %s, added columns.' % (time.time() - tstart, sshape(self['data'])), p=p)
# summary
if not quiet:
if zip:
z = 'zlibbed' if bin else 'gzipped'
else:
z = 'plain'
p = _print(
self, '%.2f s, %s, loaded %s %s.' % (time.time() - t, sizeof_fmt(self.filename, ' '), z, 'NetCDF' if bin else 'ASCII'), p=p
)
[docs] def to3d(self):
old = self['data'].values
s = np.shape(old)
idx = old[:, 0].astype(int)
cols = int((s[-1] - 1) // 3)
new = np.nan * np.zeros((s[0], 3, cols))
for p in range(3):
new[:, p, :cols] = old[:, 1 + p * cols : 1 + (p + 1) * cols]
self.toxr(new, attrs={'index': idx})
[docs] def addattrs(self, inverse=True, geqdsk=None):
if geqdsk is None:
try:
geqdsk = eval(treeLocation(self)[-3])['INPUTS']['gEQDSK']
except Exception:
pass
try:
rmax = geqdsk['RMAXIS']
zmax = geqdsk['ZMAXIS']
rlim = geqdsk['RLIM'][-1 if inverse else None :: -1 if inverse else None]
zlim = geqdsk['ZLIM'][-1 if inverse else None :: -1 if inverse else None]
slim = np.insert(np.cumsum(np.hypot(np.diff(rlim[:]), np.diff(zlim[:]))), 0, 0)
stat = inverse
except Exception:
rmax = np.nan
zmax = np.nan
rlim = np.nan
zlim = np.nan
slim = np.nan
stat = None
finally:
self['data'].attrs['rmax'] = rmax
self['data'].attrs['zmax'] = zmax
self['data'].attrs['rlim'] = rlim
self['data'].attrs['zlim'] = zlim
self['data'].attrs['slim'] = slim
return stat
[docs] def addcols(self):
filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
old = self['data'].values
s = np.shape(old)
try:
rmax = self['rmax']
zmax = self['zmax']
if np.isnan(rmax) or np.isnan(zmax):
raise ValueError
except Exception:
rmax = zmax = 0.0
new = np.nan * np.zeros((s[0], s[1], s[2] + 2))
new[:, :, :-2] = old[:, :, :]
new[:, :, s[2] + 0] = rmax + self['rho'] * np.cos(np.deg2rad(self['the']))
new[:, :, s[2] + 1] = zmax + self['rho'] * np.sin(np.deg2rad(self['the']))
self.toxr(new)
[docs] def toxr(self, data, attrs=None):
s = np.shape(data)
l = list(range(s[0]))
p = list(range(s[1]))
c = OMFITtrip3Dlines.columns[s[2]]
if attrs is None:
attrs = self['data'].attrs
self['data'] = xr.DataArray(data, name='hits', dims=['L', 'P', 'C'], coords=[l, p, c], attrs=attrs)
[docs] @dynaSave
def save(self, zip=False, quiet=False):
t = time.time()
bin, zip = OMFITmatrix.save(self, bin=True, zip=zip)
if not quiet:
if zip:
z = 'zlibbed' if bin else 'gzipped'
else:
z = 'plain'
_print(self, '%.2f s, %s, saved %s %s.' % (time.time() - t, sizeof_fmt(self.filename, ' '), z, 'NetCDF' if bin else 'ASCII'))
[docs] def prep(self, project=False, force=False, save=None, prefix=True, quiet=False):
if not force and np.all([w in self['data'].attrs for w in 'abc']):
return
if self.dynaLoad:
self.load()
# ignore all-nan warnings
filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
if np.all(np.isnan(self['data'][:, 1, :].data)):
project = True
if np.any(np.isnan([self['rmax'], self['zmax']])) or np.any(np.isnan([self['rlim'], self['zlim'], self['slim']])):
raise ValueError('missing gEQDSK data.')
# shortcuts
data = self['data'].data
rlim = self['rlim']
zlim = self['zlim']
ss = self['slim']
c = {v: k for k, v in enumerate(self['data'].coords['C'].data)}
# total number of points
tot = np.shape(data)[0]
myformat = '{:' + str(len('{:,d}'.format(tot))) + ',d}'
tstart = time.time()
# hit points
iih = np.zeros(tot).astype('int')
rrh = np.nan * np.zeros(tot)
zzh = np.nan * np.zeros(tot)
phh = np.nan * np.zeros(tot)
mah = np.nan * np.zeros(tot)
# detect phi step
dphi = np.abs(np.diff(data[:, 1:, c['phi']], axis=1)).squeeze()
dphi[np.where(dphi[:] == 0.0)] = np.nan
dpinit = np.nanmin(dphi)
if not quiet:
p = _print(self, (myformat + ' total points, step is {:.2f} deg.').format(tot, dpinit), prefix=prefix)
# index lists
ko = []
try1 = []
try2 = []
# 1st TRY: USE SECOND-TO-LAST AND LAST POINT
for t in range(tot):
xx = data[t, 1:, c['rr']]
yy = data[t, 1:, c['zz']]
ph = data[t, 1:, c['phi']]
ma = data[t, 1:, c['mlen']]
try:
# this is NOT the second-to-last point
if np.isnan(dphi[t]) or dphi[t] > dpinit * 1.5:
raise ValueError
# discard kicks greater than 50 cm
if np.hypot(np.diff(xx), np.diff(yy)) > 0.5:
raise ValueError
# compute intersection
result, index = line_intersect(np.vstack((rlim, zlim)).T, np.vstack((xx, yy)).T, return_indices=True)
rrh[t], zzh[t] = result[0]
iih[t] = index[0][0]
except (ValueError, IndexError):
# discard second-to-last point
ko.append(t)
continue
# compute normalized distance
ddh = np.hypot(rrh[t] - xx[1], zzh[t] - yy[1])
ddh /= np.hypot(xx[0] - xx[1], yy[0] - yy[1])
# inteporlate phi, mag
phh[t] = ph[1] + ddh * (ph[0] - ph[1])
mah[t] = ma[1] + ddh * (ma[0] - ma[1])
try1.append(t)
# 2nd TRY: USE ONLY LAST POINT
if project and np.size(ko):
kos = np.array(ko).astype('int')
ko = []
# limiter-to-last point distance
limd = np.nan * np.zeros((len(rlim), np.size(kos), 3))
# for each divertor segment
for l in range(len(rlim) - 1):
if rlim[l] == rlim[l + 1] and zlim[l] == zlim[l + 1]:
continue
# distance from segment center
xc = np.mean([rlim[l], rlim[l + 1]])
yc = np.mean([zlim[l], zlim[l + 1]])
rc = np.hypot(xc - rlim[l], yc - zlim[l])
(cpu,) = np.where(np.hypot(data[kos[:], 2, c['rr']] - xc, data[kos[:], 2, c['zz']] - yc) <= rc * 1.5)
if not np.size(cpu):
continue
# compute foot (xf,yf) and distance dd
xf, yf, dd = foot_on_line(
xp=data[kos[cpu], 2, c['rr']], x1=rlim[l], x2=rlim[l + 1], yp=data[kos[cpu], 2, c['zz']], y1=zlim[l], y2=zlim[l + 1]
)
limd[l, cpu[:], 0] = xf[:]
limd[l, cpu[:], 1] = yf[:]
limd[l, cpu[:], 2] = dd[:]
# for each point
for k in range(len(kos)):
(result,) = np.where(limd[:, k, 2] == np.nanmin(limd[:, k, 2]))
try:
iih[kos[k]] = result[0]
try2.append(kos[k])
except IndexError:
ko.append(kos[k])
rrh[try2] = limd[iih[try2], list(range(len(try2))), 0]
zzh[try2] = limd[iih[try2], list(range(len(try2))), 1]
phh[try2] = data[try2, 2, c['phi']].copy()
mah[try2] = data[try2, 2, c['mlen']].copy()
# s: based on hit data
ssh = ss[iih[:]] + np.hypot(rrh[:] - rlim[iih[:]], +zzh[:] - zlim[iih[:]])
# store data
self['data'].attrs['a'] = phh[:] * -1 % 360
self['data'].attrs['b'] = ssh[:]
self['data'].attrs['c'] = mah[:]
# summary
if np.size(try1):
p = _print(self, (myformat + ' intersections.').format(np.size(try1)), tag='i', prefix=prefix, p=p)
if np.size(try2):
p = _print(self, (myformat + ' projections.').format(np.size(try2)), tag='w', prefix=prefix, p=p)
if np.size(ko):
p = _print(self, (myformat + ' errors.').format(np.size(ko)), tag='e', prefix=prefix, p=p)
p = _print(self, 'computed hit data in {:.3f} s.'.format(time.time() - tstart), prefix=prefix, p=p)
# save data
if save is True or (save is None and self.OMFITproperties['bin'] is True):
ro = self.readOnly
self.readOnly = False
self.save()
self.readOnly = ro
[docs] def plot(self, mlen=100, sol=False, cbar=True, inverse=False, prefix=True, quiet=False, **kw):
r"""
The OMFITtrip3Dhits.plot plots the footprint based on the data prepared by OMFITtrip3Dhits.prep.
:param mlen: if None, select all the data points,
if MIN, select points with MIN <= mlen,
if (MIN,MAX), MIN <= mlen <= MAX.
:param cbar: if True, the colorbar will be plotted,
if False, hidden.
:param inverse: if False, y-axis is normal,
if True, inverted.
:param quiet: if False, output to console is normal,
if True, suppressed.
:param \**kw: keyword dictionary to be passed to scatter().
:return: None
"""
filterwarnings('ignore', r'invalid value encountered in (less|greater)_equal')
if not np.all([w in self['data'].attrs for w in 'abc']):
self.prep(prefix=prefix, quiet=quiet)
cc = self['data'].attrs['c']
if sol and np.nanmin(cc) < mlen and mlen < np.nanmax(cc):
i = isinteractive()
if i:
ioff()
gs = GridSpec(2, 16)
gs.update(wspace=1, hspace=0)
ct = pyplot.subplot(gs[0, -1])
cb = pyplot.subplot(gs[1, -1])
ax = pyplot.subplot(gs[:, :-1])
vmin = kw.pop('vmin', 0)
vmax = kw.pop('vmax', None)
cmap = kw.pop('cmap', None)
if cmap is None or not np.iterable(cmap):
cmapb = 'gist_earth_r'
cmapt = 'plasma'
else:
cmapb = cmap[0]
cmapt = cmap[1]
self.plot(mlen=mlen, sol=False, cmap=cmapt, cbar=False, vmin=mlen, vmax=vmax, inverse=inverse, prefix=prefix, quiet=quiet, **kw)
ct.set_title('len [m]')
colorbar(cax=ct, format='%d', ticks=pyplot.MaxNLocator(nbins=5, prune='both'))
self.plot(
mlen=(vmin, mlen),
sol=False,
cmap=cmapb,
cbar=False,
vmin=vmin,
vmax=mlen,
inverse=inverse,
prefix=prefix,
quiet=quiet,
**kw,
)
colorbar(cax=cb, format='%d', ticks=pyplot.MaxNLocator(nbins=5))
if i:
ion()
draw()
show()
return ax, (cb, ct)
# shortcuts
p = False
aa = self['data'].attrs['a']
bb = self['data'].attrs['b']
cc = self['data'].attrs['c']
ss = self['slim']
rlim = self['rlim']
zlim = self['zlim']
# total number of points
tot = np.size(self['data'][:, 0])
myformat = '{:' + str(len('{:,d}'.format(tot))) + ',d}'
tstart = time.time()
# magnetic length threshold
if mlen is None:
select = list(range(len(cc)))
elif np.size(mlen) == 1:
(select,) = np.where(cc >= mlen)
elif np.iterable(mlen) and np.size(mlen) == 2:
ave = np.mean(mlen)
(dif,) = np.diff(mlen) / 2.0
(select,) = np.where(abs(cc[:] - ave) <= dif)
else:
raise ValueError('unsupported magnetic length format: {:s} x {:d}.'.format(mlen.__class__.__name__, np.size(mlen)))
if not np.size(select):
if np.size(mlen) == 1:
if not quiet:
p = _print(self, 'no points with length of {:.0f} m.'.format(mlen), tag='w', prefix=prefix, p=p)
select = list(range(len(cc)))
else:
if not quiet:
p = _print(self, 'no points with length of [{:.0f}, {:.0f}] m.'.format(*mlen), tag='e', prefix=prefix, p=p)
return None
if not quiet:
p = _print(
self, 'magnetic length in [{:.0f}, {:.0f}] m.'.format(np.nanmin(cc[select]), np.nanmax(cc[select])), prefix=prefix, p=p
)
# limits
pyplot.xlim(0, 360)
pyplot.ylim(np.nanmin(bb[select]), np.nanmax(bb[select]))
# default keywords
kw.setdefault('cmap', 'plasma')
kw.setdefault('s', 1)
kw.setdefault('edgecolor', '')
kw.setdefault('alpha', 0.75)
kw.setdefault('rasterized', True)
# plot
scatter(aa[select], bb[select], c=cc[select], **kw)
ax = pyplot.gca()
# axes
if inverse:
pyplot.gca().invert_yaxis()
pyplot.xlabel(r'toroidal angle, $\phi$ [deg]')
pyplot.ylabel('hit parameter, $s$ [m]')
ticklabel_format(useOffset=False)
if cbar:
cb = colorbar(format='%d', ticks=pyplot.MaxNLocator(prune='upper'))
cb.ax.set_title('len [m]')
# summary
if not quiet:
p = _print(
self, 'plotted {:,d} points in {:.3f} s.'.format(np.sum(~np.isnan(bb[select])), time.time() - tstart), prefix=prefix, p=p
)
return ax, [cb.ax] if cbar else []
# Created by Trevisan at 01 May 2018 15:43
[docs]class OMFITtrip3Dstart(OMFITmatrix):
"""
The OMFITtrip3Dstart class parses and handles the TRIP3D start points input file.
A self-described xarray object is stored under self['data'].
"""
[docs] @dynaLoad
def load(self):
t = time.time()
OMFITmatrix.load(self, bin=False, zip=False, skiprows=1, comment='!')
if self['data'] is not None:
self.toxr()
_print(self, '%.2f s, %s, load from file.' % (time.time() - t, sizeof_fmt(self.filename, ' ')))
[docs] @dynaSave
def save(self):
t = time.time()
na = np.shape(self['data'])[0]
nb = np.max([1, np.size(np.where(self['data'][:, -1] == self['data'][0, -1]))])
nc = na / nb
with open(self.filename, 'w') as f:
f.write(3 * '%d ' % (na, nb, nc) + '\n')
OMFITmatrix.save(self, bin=False, zip=False, mode='a', na_rep='nan')
_print(self, '%.2f s, %s, saved to file.' % (time.time() - t, sizeof_fmt(self.filename, ' ')))
[docs] def toxr(self, data=None):
if data is None:
try:
data = self['data'].values
except Exception:
data = self['data']
self['data'] = xr.DataArray(data, name='startfile', dims=['P', 'C'], coords=[list(range(np.shape(data)[0])), ['rad', 'the', 'phi']])
[docs] def plot(self, radvar=None, geqdsk=None, pol=True, tor=True, lim=True, surf=True, **kw):
if radvar is None:
radvar = eval(treeLocation(self)[-2])['trip3d.in']['input']['iRadVar']
if geqdsk is None:
try:
geqdsk = eval(treeLocation(self)[-2])['gEQDSK']
except Exception:
pass
try:
rmax = geqdsk['RMAXIS']
zmax = geqdsk['ZMAXIS']
except Exception:
rmax = np.nan
zmax = np.nan
try:
rlim = geqdsk['RLIM']
zlim = geqdsk['ZLIM']
except Exception:
rlim = np.nan
zlim = np.nan
suptitle(treeLocation(self)[-1])
kw = {'rasterized': True}
if pol:
if tor:
pyplot.subplot(122)
pyplot.title('poloidal view')
if radvar == 1:
# rho
rr = rmax + self['data'].loc[:, 'rad'] * np.cos(np.deg2rad(self['data'].loc[:, 'the']))
zz = zmax + self['data'].loc[:, 'rad'] * np.sin(np.deg2rad(self['data'].loc[:, 'the']))
if surf and geqdsk['NBBBS']:
geqdsk.plot(only2D=True)
pyplot.gca().autoscale(tight=True)
kw['color'] = pyplot.gca().lines[-2].get_color()
if lim:
pyplot.plot(rlim, zlim, 'k-')
pyplot.gca().set_aspect('equal')
pyplot.xlabel('R [m]')
pyplot.ylabel('Z [m')
pyplot.plot(rr, zz, '.', **kw)
elif radvar == 3:
# psi normal
pyplot.plot(self['data'].loc[:, 'the'], self['data'].loc[:, 'rad'], '.', **kw)
pyplot.xlabel('theta [deg]')
pyplot.ylabel('psin')
if tor:
if pol:
pyplot.subplot(121)
pyplot.title('toroidal view')
if radvar == 1:
# rho
rr = rmax + self['data'].loc[:, 'rad'] * np.cos(np.deg2rad(self['data'].loc[:, 'the']))
pyplot.plot(
rr * np.cos(np.deg2rad(self['data'].loc[:, 'phi'])), rr * np.sin(np.deg2rad(self['data'].loc[:, 'phi'])), '.', **kw
)
if lim:
t = np.arange(361) * np.pi / 180.0
rmin = np.min(rlim)
rmax = np.max(rlim)
pyplot.plot(rmin * np.cos(t), rmin * np.sin(t), 'k-')
pyplot.plot(rmax * np.cos(t), rmax * np.sin(t), 'k-')
pyplot.gca().set_aspect('equal')
elif radvar == 3:
# psi normal
pyplot.plot(self['data'].loc[:, 'phi'], self['data'].loc[:, 'rad'], '.', **kw)
pyplot.xlabel('phi [deg]')
pyplot.ylabel('psin')
# Created by Trevisan at 01 May 2018 16:20
[docs]class OMFITtrip3Dprobeg(OMFITmatrix):
"""
The OMFITtrip3Dprobeg class parses and handles the TRIP3D 'probe_gb.out' output file.
A self-described xarray object is stored under self['data'].
"""
[docs] @dynaLoad
def load(self):
t = time.time()
OMFITmatrix.load(self, bin=False, zip=False, comment='%')
if self['data'] is not None:
self.toxr()
_print(self, '%.2f s, %s, load from file.' % (time.time() - t, sizeof_fmt(self.filename, ' ')))
[docs] @dynaSave
def save(self):
t = time.time()
with open(self.filename, 'w') as f:
f.write('%' + ' '.join(self['data'].coords['C'].values) + '\n')
OMFITmatrix.save(self, bin=False, zip=False, mode='a', float_format='%.11e', na_rep='nan')
_print(self, '%.2f s, %s, saved to file.' % (time.time() - t, sizeof_fmt(self.filename, ' ')))
[docs] def toxr(self, data=None):
if data is None:
try:
data = self['data'].values
except Exception:
data = self['data']
self['data'] = xr.DataArray(
data,
name='probeg',
dims=['P', 'C'],
coords=[list(range(np.shape(data)[0])), ['phi', 'rr', 'zz', 'Bphi', 'Brr', 'Bzz', 'Bpol', 'Bmag']],
)
[docs] def plot(self, cols=['Bpol', 'Bmag'], phi=None, geqdsk=None, stats=True, **kw):
if geqdsk is None:
try:
geqdsk = eval(treeLocation(self)[-3])['INPUTS']['gEQDSK']
except Exception:
pass
try:
rlim = geqdsk['RLIM']
zlim = geqdsk['ZLIM']
except Exception:
rlim = np.nan
zlim = np.nan
if phi is None:
phi = self['data'].loc[0, 'phi']
(idx,) = np.where(self['data'].loc[:, 'phi'] == phi)
if len(idx):
printi('{:,d} points at phi = {:.1f} deg'.format(np.size(idx), phi.values))
suptitle(treeLocation(self)[-1])
kw.setdefault('cmap', 'plasma')
kw.setdefault('s', 10)
kw.setdefault('edgecolor', '')
kw.setdefault('rasterized', True)
i = 0
for c in cols:
if np.size(cols) > 1:
i += 1
pyplot.subplot(1, np.size(cols), i)
if geqdsk:
pyplot.plot(rlim, zlim, 'k-')
scatter(self['data'].loc[idx, 'rr'], self['data'].loc[idx, 'zz'], c=self['data'].loc[idx, c], **kw)
pyplot.gca().set_aspect('equal')
pyplot.gca().autoscale(tight=True)
pyplot.title(c)
colorbar()
if stats:
printi(
c
+ ' min = %g %s'
% (np.nanmin(self['data'].loc[idx, c]), '(NaNs excluded)' if np.isnan(self['data'].loc[idx, c]).any() else '')
)
printi(
' ' * len(c)
+ ' max = %g %s'
% (np.nanmax(self['data'].loc[idx, c]), '(NaNs excluded)' if np.isnan(self['data'].loc[idx, c]).any() else '')
)
def _print(who, message, prefix=True, tag='i', p=False):
# short versions of tag_print's tags
tags = {
'o': 'STDOUT',
'e': 'STDERR',
'd': 'DEBUG',
'u': 'PROGRAM_OUT',
'r': 'PROGRAM_ERR',
'i': 'INFO',
't': 'HIST',
'h': 'HELP',
'w': 'WARNING',
}
if prefix:
# set the prefix
if prefix == True:
name = who.__class__.__name__
method = extract_stack()[-2][2]
if method == 'doplot':
method = 'plot'
if '__' in method:
method = method[1:5]
pref = '%s.%s' % (name, method)
else:
pref = str(prefix)
message = '%s: %s' % (pref, message)
# printed mask
if p:
msgs = message.split(': ')
message = ' ' * len(': '.join(msgs[:-1])) + ': ' + msgs[-1]
# print the message
tag_print(message, tag=tags[tag])
return True