Source code for omfit_classes.omfit_ufile

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from omfit_classes.omfit_ascii import OMFITascii
from omfit_classes.sortedDict import SortedDict
from omfit_classes.utils_plot import View2d, View3d

import numpy as np

__all__ = ['OMFITuFile']


[docs]class OMFITuFile(SortedDict, OMFITascii): r""" Class used to interface with TRANSP U-files :param filename: filename passed to OMFITobject class :param \**kw: keyword dictionary passed to OMFITobject class """ def __init__(self, filename, **kw): OMFITascii.__init__(self, filename, **kw) SortedDict.__init__(self) self.dynaLoad = True
[docs] @dynaLoad def load(self): """ Method used to load the content of the file specified in the .filename attribute :return: None """ if self.filename is None or not os.stat(self.filename).st_size: self.reset() return with open(self.filename, 'r') as f: fl = f.readlines() # if file is empty, make bare bones structure if not len(fl): self.reset() return self['SHOT'] = int(re.sub(r'\s*([0-9]+)\s*\w+\s+[0-9]\s+[0-9]\s+[0-9]*.*', r'\1', fl[0])) self['DEVICE'] = re.sub(r'\s*[0-9]+\s*(\w+)\s+[0-9]\s+[0-9]\s+[0-9]*.*', r'\1', fl[0]).strip() self['NDIM'] = int(re.sub(r'\s*[0-9]+\s*\w+\s+([0-9])\s+[0-9]\s+[0-9]*.*', r'\1', fl[0])) self['COMPRESSION'] = int(re.sub(r'\s[0-9]+\s*\w+\s+[0-9]\s+([0-9])\s+[0-9]*.*', r'\1', fl[0])) try: self['LSHOT'] = max(6, int(re.sub(r'\s*[0-9]*\w+\s+[0-9]\s+[0-9]\s+([0-9]*).*', r'\1', fl[0]))) except Exception: self['LSHOT'] = '' self['DATE'] = fl[1].split(';')[0].strip() nscal = int(fl[2].split(';')[0].strip()) self['SCALARS'] = SortedDict() n = 2 end_scalars = 3 for k in range(nscal): value = float(fl[3 + 2 * k].split(';')[0].strip()) name, desc = fl[3 + 2 * k + 1].split(';')[0].split(':') self['SCALARS'][k] = SortedDict() self['SCALARS'][k]['data'] = value self['SCALARS'][k]['name'] = name.strip() self['SCALARS'][k]['desc'] = desc.strip() n = 3 + 2 * k + 1 end_scalars = 3 + 2 * k + 2 # check dimensionality ndim = -1 for i, line in enumerate(fl[end_scalars:]): try: int(line.split(';')[0].strip()) break except Exception: ndim += 1 # generic variable line reader def setvar(k, line): tmp = [line[:21].strip(), line[21:31].strip()] self[k] = {'name': tmp[0], 'units': tmp[1]} # label independent then dependent variables dims = [] for i in range(ndim): setvar('X{:}'.format(i), fl[end_scalars + i]) dims.append(int(fl[end_scalars + ndim + 2 + i].split(';')[0])) setvar('F', fl[end_scalars + ndim]) # proc code self['PROCESSING_CODE'] = int(fl[end_scalars + ndim + 1].split(';')[0]) # keep the whole header end_header = end_scalars + ndim * 2 + 2 for i, line in enumerate(fl[:end_header]): self['__header%04d__' % i] = line.rstrip('\n') # find the trailer start_trailer = end_header for i, line in enumerate(fl[end_header:]): if 'END-OF-DATA' in line: start_trailer = end_header + i break # read data as array (TRANSP scruncher has non-critical bug where format leaves no space between negatives) flat = ' '.join(fl[end_header:start_trailer]) flat = flat.replace('E-', 'E~').replace('e-', 'e~').replace('-', ' -').replace('E~', 'E-').replace('e~', 'e-') flat = np.array(flat.split(), dtype=float) for i in range(ndim): start = int(np.sum(dims[:i])) self['X{:}'.format(i)]['data'] = flat[start : start + dims[i]] # data read in fortran conventional order? self['F']['data'] = flat[int(np.sum(dims)) :].reshape(dims[::-1]).T self['COMMENTS'] = [] for i, line in enumerate(fl[start_trailer + 1 :]): if 'Beams used:' in line: self['BEAMS'] = [_f for _f in [k.strip(', \n') for k in (line + fl[start_trailer + 2 + i]).split(' ')[3:]] if _f] if len(line.strip('\n')): self['COMMENTS'].append(line.rstrip('\n'))
[docs] @dynaSave def save(self): """ Save Ufile to the file specified in the .filename attribute :return: None """ # write empty line if an empty ufile if not (self['SHOT'] and self['DEVICE'] and self['NDIM']): with open(self.filename, 'w') as f: f.write('') # adds leading space to all lines return tmp = [] if self['LSHOT']: tmp.append( '{:>6}{:<4} {:} {:} {:} ;-SHOT #- F(X) DATA WRITEUF OMFIT'.format( self['SHOT'], self['DEVICE'], self['NDIM'], self['COMPRESSION'], max(6, self['LSHOT']) ) ) fmt = '12.5e' else: tmp.append( '{:>6}{:<4} {:} {:} ;-SHOT #- F(X) DATA WRITEUF OMFIT'.format( self['SHOT'], self['DEVICE'], self['NDIM'], self['COMPRESSION'] ) ) fmt = '11.4E' tmp.append('{:<11} ;-SHOT DATE- UFILES ASCII FILE SYSTEM'.format(self['DATE'])) tmp.append('{:3} ;-NUMBER OF ASSOCIATED SCALAR QUANTITIES-'.format(len(self['SCALARS']))) for k in list(self['SCALARS'].keys()): tmp.append('{:11.4E} ;-SCALAR, LABEL FOLLOWS:'.format(self['SCALARS'][k]['data'])) tmp.append('{:10}{:20}'.format(self['SCALARS'][k]['name'] + ':', self['SCALARS'][k]['desc'])) for i in range(len(self['F']['data'].shape)): k = 'X{:}'.format(i) tmp.append('{:<20}{:<10};-INDEPENDENT VARIABLE LABEL: {:}-'.format(self[k]['name'], self[k]['units'], k)) tmp.append('{:<20}{:<10};-DEPENDENT VARIABLE LABEL-'.format(self['F']['name'], self['F']['units'])) tmp.append('{:<3} ;-PROC CODE- 0:RAW 1:AVG 2:SM 3:AVG+SM'.format(self['PROCESSING_CODE'])) for i in range(len(self['F']['data'].shape)): k = 'X{:}'.format(i) tmp.append('{:>10} ;-# OF X{:} PTS-'.format(len(self[k]['data']), i)) tmp[-1].replace('PTS-', 'PTS- F(X) DATA FOLLOW:') for i in range(len(self['F']['data'].shape)): data = self['X{:}'.format(i)]['data'] dlen = len(data) for i in range(0, dlen, 6): j = min(6, dlen - i) tmp.append((('{:' + fmt + '} ') * j).format(*data[i : i + j]).rstrip(' ')) data = self['F']['data'].T.flatten() dlen = len(data) for i in range(0, dlen, 6): j = min(6, dlen - i) tmp.append((('{:' + fmt + '} ') * j).format(*data[i : i + j]).rstrip(' ')) tmp.append(';----END-OF-DATA-----------------COMMENTS:-----------;') for line in tolist(self['COMMENTS']): tmp.append(line.strip()) with open(self.filename, 'w') as f: f.write(' ' + '\n '.join(tmp) + '\n') # adds leading space to all lines return
[docs] @dynaLoad def plot(self, axes=None, figure=None, cmap=None, **kw): r""" Plot Ufile content :param axes: Axes object or None :param figure: Figure object or None :param cmap: Color map name used for multi-dimensional plots. :param \**kw: Extra key word arguments passed to matplotlib plot function. :return: Figure """ newf = False if axes is None and figure is None: fig, axes = pyplot.subplots() newf = True elif axes is None and figure is not None: fig = figure if fig.axes: axes = fig.axes[0] else: pyplot.figure(num=fig.number) axes = subplot.subplot(111) elif axes is not None: fig = axes.get_figure() im_opt = {} if cmap is not None: im_opt['cmap'] = cmap xlabels = [] for i in range(len(self['F']['data'].shape)): xlabels.append('{:} [{:}]'.format(self['X{:}'.format(i)]['name'], self['X{:}'.format(i)]['units'])) xlabels[-1] = xlabels[-1].replace('[]', '') name = '{:} [{:}]'.format(self['F']['name'], self['F']['units']).replace(' []', '') # 1D data if len(np.squeeze(self['F']['data']).shape) == 1: kw.setdefault('label', name) kw.setdefault('marker', '.' * (len(self['X0']['data']) < 100)) axes.plot(self['X0']['data'], self['F']['data'], **kw) axes.set_xlabel(xlabels[0]) if self['F']['units'] == self['X0']['units'] and self['X0']['units']: axes.set_aspect('equal') leg = axes.legend() if leg: leg.draggable(True) # 2D data elif len(np.squeeze(self['F']['data']).shape) == 2: view = View2d( self['F']['data'], coords=[self['X0']['data'], self['X1']['data']], dims=xlabels, name=name.split()[-1], axes=axes, im_opt=im_opt, **kw, ) view.colorbar.set_label(name) fig, axes = view.fig, view.axm fig.view = view # 3D data elif len(np.squeeze(self['F']['data']).shape) == 3: view = View3d( self['F']['data'], coords=[self['X0']['data'], self['X1']['data'], self['X2']['data']], dims=xlabels, name=name.split()[-1], axes=axes, im_opt=im_opt, **kw, ) view.colorbar.set_label(name) fig, axes = view.fig, view.axm fig.view = view else: 'Plotting not available above 3D' if 'BEAMS' in self and newf: # make a special beam overview fig2, axes = pyplot.subplots() for i in range(len(self['BEAMS'])): axes.plot(self['X0']['data'], self['F']['data'][:, i] / 1e6, label=self['BEAMS'][i]) axes.plot(self['X0']['data'], np.sum(self['F']['data'][:, : len(self['BEAMS'])], 1) / 1e6, 'k--', label='Total') axes.set_xlabel('Time $[s]$') axes.set_ylabel('Power $[MW]$') axes.set_title('Neutral beams power from ' + os.path.split(self.filename)[1]) leg = axes.legend(loc=0, frameon=False).draggable(True) # Additional info about the voltage, and current fractions fig_beam_quants, ax_beam_quants = pyplot.subplots(nrows=4, sharex=True) beams = OMFIT['TRANSP']['UFILES']['NBI']['BEAMS'] first_plot_lines = [] quants = [('power [MW]', 1e-6), ('voltage [kV]', 1e-3), ('full frac', 1), ('half frac', 1)] for ind, beam in enumerate(self['BEAMS']): for jj, (name, mult) in enumerate(quants): cur_ind = jj * len(self['BEAMS']) + ind ll = ax_beam_quants[jj].plot(self['X0']['data'], self['F']['data'][:, cur_ind] * mult, label=beam) if ind == 0: ax_beam_quants[jj].set_ylabel(name) if jj == 0: first_plot_lines.append(ll) ax_beam_quants[-1].set_xlabel('Time [s]') # ax_beam_quants[0].legend(loc='best',frameon=False) ax_beam_quants[0].set_title('Neutral beams power from ' + os.path.split(self.filename)[1]) fig_beam_quants.legend(first_plot_lines, labels=self['BEAMS'], loc='upper right', frameon=False) fig_beam_quants.subplots_adjust(right=0.8) for line in self['COMMENTS']: if 'tilt' in line: printi(line) return fig
[docs] @dynaLoad def smooth(self, window_x=None, window_len=11, window='hanning', axis=-1): """ This built in function makes use of the OMFIT utils.smooth function to smooth over a single dimension of the data. If the axis in question is irregular, the data is first linearly interpolated onto a regular grid with spacing equal to the minimum step size of the irregular grid. :param window_x: Smoothing window size in axis coordinate units. :param window_len: Smoothing window size in index units. Ignored if window_x present. Enforced odd integer. :param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' flat window will produce a moving average smoothing. :param axis: Dimension over which to smooth. Accepts integer (0), key ('X0'), or name ('TIME'). :return: OMFITuFile with the requested dimension smoothed """ # generalize the axis specification if isinstance(axis, str): try: axis = int(axis.lstrip('X')) except Exception: for i in range(self['NDIM']): if axis == self['X{:}'.format(i)]['name']: axis = i if not isinstance(axis, int): raise ValueError('Unable to recognize axis {:}'.format(axis)) axis = axis % (self['NDIM']) akey = 'X{:}'.format(axis) # copy if needed newu = copy.deepcopy(self) # force regular grid x = self[akey]['data'] dx = np.diff(x) if not np.all(dx == dx[0]): printd('Interpolating to a regular grid', topic='OMFITuFile') newlen = int(np.ceil((x.max() - x.min()) / dx.min())) newx = np.linspace(x.min(), x.max(), newlen) fun = interp1d(x, self['F']['data'], axis=axis, kind='linear', bounds_error=False) newu[akey]['data'] = newx newu['F']['data'] = fun(newx) newu['COMMENTS'].append('Linearly interpolated to regular {:} grid.'.format(self[akey]['name'])) else: newx = x # smooth if window_x: window_len = int(np.round(window_x / (newx[1] - newx[0]), 0)) window_x = (newx[1] - newx[0]) * window_len newu['F']['data'] = utils.map1d(smooth, newu['F']['data'], axis=axis, window_len=window_len, window=window) # mark file as smoothed newu['PROCESSING_CODE'] = 2 newu['COMMENTS'].append( 'Smoothed over a {:} window with length of {:} in the {:} axis.'.format(window, window_x, newu[akey]['name']) ) return newu
[docs] def crop(self, xmin, xmax, axis=-1, endpoint=True): """ Crop ufile data to only include points within the specified range along a given axis. Modifies the ufile object in place. :param xmin: Lower bound. :param xmax: Upper bound. :param axis: Dimension to bound. Accepts integer (0), key ('X0'), or name ('TIME'). :param endpoint: Keeps an extra point on either end, ensuring the bounds are contained within the range of ufile data. """ # generalize the axis specification if isinstance(axis, str): try: axis = int(axis.lstrip('X')) except Exception: for i in range(self['NDIM']): if axis == self['X{:}'.format(i)]['name']: axis = i if not isinstance(axis, int): raise ValueError('Unable to recognize axis {:}'.format(axis)) axis = axis % (self['NDIM']) akey = 'X{:}'.format(axis) x = np.array(self[akey]['data']) # slice axis if endpoint: xslice = np.array([False] + list((x >= xmin) & (x <= xmax)) + [False]) xslice = xslice[2:] | xslice[1:-1] | xslice[0:-2] else: xslice = (x >= xmin) & (x <= xmax) self[akey]['data'] = self[akey]['data'][xslice] # slice data dslice = [slice(stop) for i, stop in enumerate(np.shape(self['F']['data']))] dslice[axis] = xslice self['F']['data'] = self['F']['data'][dslice] # annotate self['COMMENTS'].append('Cropped range ({:},{:}) along {:} axis'.format(xmin, xmax, akey))
[docs] def from_OMFITprofiles(self, OMFITprofiles_fit): """ populate u-file based on OMFITprofiles fit xarray DataArray :params OMFITprofiles_fit: input OMFITprofiles fit xarray DataArray """ self['NDIM'] = len(OMFITprofiles_fit.values.shape) self['SHOT'] = OMFITprofiles_fit.attrs['shot'] self['DEVICE'] = OMFITprofiles_fit.attrs['device'] self['DATE'] = now() self['COMMENTS'] = tolist('generated from OMFITprofiles') self['X0']['data'] = OMFITprofiles_fit['time'].values / 1000.0 self['X0']['name'] = 'time' self['X0']['units'] = 's' self['F']['data'] = nominal_values(OMFITprofiles_fit.values / 1000.0) self['F']['name'] = OMFITprofiles_fit.name try: self['F']['units'] = OMFITprofiles_fit.attrs['units'] except Exception: self['F']['units'] = '' if self['NDIM'] == 2: self['X1'] = dict() self['X1']['data'] = nominal_values(OMFITprofiles_fit[OMFITprofiles_fit.dims[1]].values) self['X1']['name'] = OMFITprofiles_fit.dims[1] try: self['X1']['units'] = OMFITprofiles_fit[OMFITprofiles_fit.dims[1]].attrs['units'] except Exception: self['X1']['units'] = ''
[docs] def reset(self): '''Set up basic bare bones structure''' self['SHOT'] = 0 self['DEVICE'] = '' self['NDIM'] = 0 self['COMPRESSION'] = 0 self['LSHOT'] = 0 self['DATE'] = '' self['SCALARS'] = SortedDict() self['X0'] = {'name': '', 'units': '', 'data': np.array([])} self['F'] = {'name': '', 'units': '', 'data': np.array([])} self['PROCESSING_CODE'] = 0 self['COMMENTS'] = []