Source code for omfit_classes.omfit_efund

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from omfit_classes.omfit_namelist import OMFITnamelist
from omfit_classes.omfit_ascii import OMFITascii
from omfit_classes.fluxSurface import rz_miller
from omfit_classes.omfit_eqdsk import OMFITgeqdsk
from omfit_classes.namelist import NamelistName
import numpy as np
import fortranformat
from omfit_classes.utils_fusion import tokamak
import omas

__all__ = ['OMFITmhdin', 'OMFITdprobe', 'OMFITnstxMHD', 'get_mhdindat', 'green_to_omas']


[docs]class OMFITmhdin(OMFITnamelist): scaleSizes = 50 invalid = 99
[docs] @dynaLoad def load(self, *args, **kw): r""" Load OMFITmhdin file :param \*args: arguments passed to OMFITnamelist.load() :param \**kw: keyword arguments passed to OMFITnamelist.load() """ # load namelist self.outsideOfNamelistIsComment = True self.noSpaceIsComment = True super().load(*args, **kw) if 'IN3' not in self: self['IN3'] = NamelistName() if 'MACHINEIN' not in self: self['MACHINEIN'] = NamelistName() # out-of-namelist format (DIII-D) if 'RF' not in self['IN3']: comment_items = list(self.keys()) for comment_item in comment_items: if '__comment' in comment_item and not isinstance(self, OMFITdprobe): fformat = {} fformat['FC'] = fortranformat.FortranRecordReader('(6e12.6)') fformat['OH'] = fortranformat.FortranRecordReader('(5e10.4)') fformat['VESSEL'] = fortranformat.FortranRecordReader('(6e12.6)') lines = str(self[comment_item]) tmp = lines.split('comment') if len(tmp) > 1: lines, comment = tmp else: lines = tmp[0] comment = '' del self[comment_item] lines = lines.split('\n') lines = [line.expandtabs(12) for line in lines if len(line.strip()) and line.strip()[0] != '!'] # number of elements per section nfc = len(self['IN3'].get('FCTURN', self['IN3'].get('TURNFC', None))) if self['IN5'].get('IECOIL', 0): nve = len(self['IN3']['RSISVS']) ntf = len(lines) - nfc - nve else: ntf = 0 nve = len(lines) - nfc - ntf # poloidal field coils # R,Z,DR,DZ,skew_angle1,skew_angle2 kk = 0 self['FC'] = {} for k in range(nfc): lines[kk] += '%12d%12d' % (0, 0) data = fformat['FC'].read(lines[kk]) # data[4] = '0.0' if not data[4].strip() or float(data[4]) == 0 else data[4] # data[5] = '90.0' if not data[5].strip() or float(data[5]) == 0 else data[5] data = np.array(list(map(float, data))) self['FC']['%d' % (k + 1)] = data kk += 1 self['FC'] = np.array(self['FC'].values()) # ohmic coils # R,Z,DR,DZ,block if not ntf: self['OH'] = np.array([]) else: self['OH'] = {} for k in range(ntf): data = fformat['OH'].read(lines[kk]) self['OH']['%d' % (k + 1)] = data kk += 1 self['OH'] = np.array(self['OH'].values()) # conducting vessel segments # R,Z,DR,DZ,skew_angle1,skew_angle2 if not nve: self['VESSEL'] = np.array([]) else: self['VESSEL'] = {} for k in range(nve): lines[kk] += '%12d%12d' % (0, 0) data = fformat['VESSEL'].read(lines[kk]) # data[4] = '0.0' if not data[4].strip() or float(data[4]) == 0 else data[4] # data[5] = '90.0' if not data[5].strip() or float(data[5]) == 0 else data[5] data = np.array(list(map(float, data))) data = np.array(data) self['VESSEL']['%d' % (k + 1)] = data kk += 1 self['VESSEL'] = np.array(self['VESSEL'].values()) # create namelist elements if len(self['FC']): self['IN3']['RF'] = self['FC'][:, 0] self['IN3']['ZF'] = self['FC'][:, 1] self['IN3']['WF'] = self['FC'][:, 2] self['IN3']['HF'] = self['FC'][:, 3] self['IN3']['AF'] = self['FC'][:, 4] self['IN3']['AF2'] = self['FC'][:, 5] if len(self['VESSEL']): self['IN3']['RVS'] = self['VESSEL'][:, 0] self['IN3']['ZVS'] = self['VESSEL'][:, 1] self['IN3']['WVS'] = self['VESSEL'][:, 2] self['IN3']['HVS'] = self['VESSEL'][:, 3] self['IN3']['AVS'] = self['VESSEL'][:, 4] self['IN3']['AVS2'] = self['VESSEL'][:, 5] if len(self['OH']): self['IN3']['RE'] = self['OH'][:, 0] self['IN3']['ZE'] = self['OH'][:, 1] self['IN3']['WE'] = self['OH'][:, 2] self['IN3']['HE'] = self['OH'][:, 3] self['IN3']['ECID'] = self['OH'][:, 4] else: # initialize namelist for geom in ['R{element}', 'Z{element}', 'W{element}', 'H{element}', 'A{element}', 'A{element}2']: for element in ['E', 'F', 'VS']: item = geom.format(element=element) if item not in self['IN3'] and item not in ['AE', 'AE2']: self['IN3'][item] = [] if 'FC' in self: del self['FC'] if 'OH' in self: del self['OH'] if 'VESSEL' in self: del self['VESSEL']
[docs] @dynaSave def save(self, *args, **kw): r""" Save OMFITmhdin file :param \*args: arguments passed to OMFITnamelist.save() :param \**kw: keyword arguments passed to OMFITnamelist.save() """ # remove non-namelist components angle2_special = {} for item in ['AF2', 'AVS2']: if item not in self['IN3']: continue self['IN3'][item] = np.atleast_1d(self['IN3'][item]) angle2_special[item] = self['IN3'][item].copy() self['IN3'][item][self['IN3'][item] == 90] = 0.0 # save namelist section # restore angles empty = [] for k in list(self['IN3'].keys()): if isinstance(self['IN3'][k], (list, np.ndarray)) and not len(self['IN3'][k]): empty.append(k) del self['IN3'][k] super().save(*args, **kw) for k in list(empty): self['IN3'][k] = []
[docs] @staticmethod def plot_coil(data, patch_facecolor='lightgray', patch_edgecolor='black', label=None, ax=None): """ plot individual coil :param data: FC, OH, VESSEL data array row :param patch_facecolor: face color :param patch_edgecolor: edge color :param label: [True, False] :param ax: axis :return: matplotlib rectangle patch """ import matplotlib.transforms as mtransforms from matplotlib import patches if ax is None: ax = pyplot.gca() rect = patches.Rectangle((0, 0), data[2], data[3], facecolor=patch_facecolor, edgecolor=patch_edgecolor) if len(data) == 6: angle1, angle2 = 90 - data[5], data[4] if angle1 == 90: angle1 = 0 rect.set_transform( mtransforms.Affine2D().translate(-data[2] / 2.0, -data[3] / 2.0) + mtransforms.Affine2D().skew_deg(angle1, angle2) + mtransforms.Affine2D().translate(data[0], data[1]) + ax.transData ) else: rect.set_transform( mtransforms.Affine2D().translate(-data[2] / 2.0, -data[3] / 2.0) + mtransforms.Affine2D().translate(data[0], data[1]) + ax.transData ) ax.add_patch(rect) if label: ax.text(data[0], data[1], label, color='w', size=8, ha='center', va='center', zorder=1000, weight='bold', clip_on=True) ax.text(data[0], data[1], label, color='m', size=8, ha='center', va='center', zorder=1001, clip_on=True) return rect
[docs] def plot_flux_loops(self, display=None, colors=None, label=False, ax=None): """ plot the flux loops :param display: array used to turn on/off display individual flux loops :param colors: array used to set the color of individual flux loops :param label: [True, False] :param ax: axis """ if not hasattr(self['IN3']['RSI'], '__len__') or 'RSI' not in self['IN3']: return if ax is None: ax = pyplot.gca() x0 = self['IN3']['RSI'] y0 = self['IN3']['ZSI'] if colors is not None: c0 = np.squeeze(colors)[: len(x0)] if display is not None: s0 = np.squeeze((display != 0))[: len(x0)] else: s0 = np.ones(x0.shape) s0 *= self.scaleSizes # trim x0 = x0[: len(s0)] y0 = y0[: len(s0)] # disable plotting of dummy flux loops x0 = x0[np.where(y0 != -self.invalid)] y0 = y0[np.where(y0 != -self.invalid)] # plot if colors is not None: ax.scatter(x0, y0, s=s0, c=c0, vmin=0, vmax=vmax, marker='o', cmap=cm, alpha=0.75, zorder=100) else: ax.scatter(x0, y0, s=s0, color='b', marker='o', alpha=0.75, zorder=100) # labels if label and 'LPNAME' in self['IN3']: for k, name in enumerate(self['IN3']['LPNAME']): ax.text(x0[k], y0[k], '\n ' + name, ha='left', va='top', size=8, color='w', zorder=1000, weight='bold', clip_on=True) ax.text(x0[k], y0[k], '\n ' + name, ha='left', va='top', size=8, color='b', zorder=1001, clip_on=True)
[docs] def plot_magnetic_probes(self, display=None, colors=None, label=False, ax=None): """ plot the magnetic probes :param display: array used to turn on/off the display of individual magnetic probes :param colors: array used to set the color of individual magnetic probes :param label: [True, False] :param ax: axis """ # The magnetic probes are characterized by: # - XMP2 and # - YMP2, cartesian coordinates of the center of the probe, # - SMP2, size/length of the probe in meters (read below!), # - AMP2, angle/orientation of the probe in degrees. # # the usual magnetic probe in EFIT is a partial rogowski coil, # yet beware! the EFIT D3D probe file also models saddle loops, # which extend in the toroidal direction and provide integrated # signals. such loops are characterized by a negative length. # # in order to plot non-rogowski coils correctly, a forced 90 deg # counter-clockwise rotation has to be applied on the probe's angle. # # the probes are plotted with different linestyles: rogowski coils # are plotted with a segment centered around a dot, whereas # saddle loops are plotted with a segment with dots on the endpoints. # # FURTHER REFERENCE as explained by T. Strait on 19-jul-2016 # # - The angle AMP2 always indicates the direction of the magnetic field # component that is being measured. # # - The length SMP2 indicates the length (in the R-Z plane) over which # the magnetic field is averaged by the sensor. # # - SMP2 > 0 indicates that the averaging length is in the direction of AMP2. # SMP2 < 0 indicates that the averaging length is perpendicular to AMP2. # # - In predicting the measurement of the sensor for purposes of fitting, # only the length SMP2 is considered. The width of the sensor in the # direction perpendicular to SMP2 (in the R-Z plane) is small and is # therefore neglected. # Since the EFIT equilibrium is assumed to be axisymmetric, the width # of the sensor in the toroidal direction is not relevant. # if 'XMP2' not in self['IN3'] or 'YMP2' not in self['IN3'] or 'SMP2' not in self['IN3'] or 'AMP2' not in self['IN3']: return if ax is None: ax = pyplot.gca() # first, get the arrays and make sure that their dimensions match x0 = np.squeeze(self['IN3']['XMP2']) y0 = np.squeeze(self['IN3']['YMP2']) l0 = np.squeeze(self['IN3']['SMP2']) a0 = np.squeeze(self['IN3']['AMP2']) if colors is not None: c0 = np.squeeze(colors)[: len(x0)] if display is not None: s0 = np.squeeze((display != 0))[: len(x0)] else: s0 = np.ones(x0.shape) s0 *= self.scaleSizes # trim x0 = x0[: len(s0)] y0 = y0[: len(s0)] l0 = l0[: len(s0)] a0 = a0[: len(s0)] # disable plotting of dummy probes l0 = l0[np.where(y0 != -self.invalid)] a0 = a0[np.where(y0 != -self.invalid)] x0 = x0[np.where(y0 != -self.invalid)] y0 = y0[np.where(y0 != -self.invalid)] def probe_endpoints(x0, y0, a0, l0): boo = (1 - np.sign(l0)) / 2.0 cor = boo * np.pi / 2.0 # then, compute the two-point arrays to build the partial rogowskis # as segments rather than single points, applying the correction px = x0 - l0 / 2.0 * np.cos(a0 * np.pi / 180.0 + cor) py = y0 - l0 / 2.0 * np.sin(a0 * np.pi / 180.0 + cor) qx = x0 + l0 / 2.0 * np.cos(a0 * np.pi / 180.0 + cor) qy = y0 + l0 / 2.0 * np.sin(a0 * np.pi / 180.0 + cor) segx = [] segy = [] for k in range(len(x0)): segx.append([px[k], qx[k]]) segy.append([py[k], qy[k]]) return segx, segy # finally, plot segx, segy = probe_endpoints(x0, y0, a0, l0) for k in range(len(x0)): if colors is None: col = 'r' else: col = cm(c0[k]) if l0[k] > 0: ax.plot(segx[k], segy[k], '-', lw=2, color=col, zorder=100, alpha=0.75) ax.plot(x0[k], y0[k], '.', color=col, zorder=100, alpha=0.75, mec='none') else: ax.plot(segx[k], segy[k], '.-', lw=2, color=col, zorder=100, alpha=0.75, mec='none') # labels if label and 'MPNAM2' in self['IN3']: for k, name in enumerate(self['IN3']['MPNAM2']): ax.text(x0[k], y0[k], '\n ' + name, ha='left', va='top', size=8, color='w', zorder=1000, weight='bold', clip_on=True) ax.text(x0[k], y0[k], '\n ' + name, ha='left', va='top', size=8, color='r', zorder=1001, clip_on=True)
[docs] def plot_poloidal_field_coils(self, edgecolor='none', facecolor='orange', label=False, ax=None): """ Plot poloidal field coils :param label: [True, False] :param ax: axis """ if 'RF' not in self['IN3'] or not hasattr(self['IN3']['RF'], '__len__') or len(self['IN3']['RF']) == 0: return if ax is None: ax = pyplot.gca() return self.plot_system('FC', edgecolor=edgecolor, facecolor=facecolor, label=label, ax=ax)
[docs] def plot_ohmic_coils(self, edgecolor='none', facecolor='none', label=False, ax=None): """ Plot ohmic coils :param label: [True, False] :param ax: axis """ if 'RE' not in self['IN3'] or not hasattr(self['IN3']['RE'], '__len__') or len(self['IN3']['RE']) == 0: return if ax is None: ax = pyplot.gca() return self.plot_system('OH', edgecolor=edgecolor, facecolor=facecolor, label=label, ax=ax)
[docs] def plot_vessel(self, edgecolor='none', facecolor='gray', label=False, ax=None): """ Plot vacuum vessel :param label: [True, False] :param ax: axis """ if 'RVS' not in self['IN3'] or not hasattr(self['IN3']['RVS'], '__len__') or len(self['IN3']['RVS']) == 0: return if ax is None: ax = pyplot.gca() return self.plot_system('VESSEL', edgecolor=edgecolor, facecolor=facecolor, label=label, ax=ax)
[docs] def plot_system(self, system, edgecolor, facecolor, label=False, ax=None): """ Plot coil/vessel system :param system: ['FC', 'OH', 'VESSEL'] :param edgecolor: color of patch edges :param facecolor: color of patch fill :param label: [True, False] :param ax: axis """ if ax is None: ax = pyplot.gca() kw = {'ax': ax} kw['patch_facecolor'] = facecolor kw['patch_edgecolor'] = edgecolor in3 = self['in3'] if system == 'OH': bn = 'E' system_array = np.array(list(zip(in3[f'R{bn}'], in3[f'Z{bn}'], in3[f'W{bn}'], in3[f'H{bn}'], in3[f'{bn}CID']))) if system == 'FC': bn = 'F' system_array = np.array(list(zip(in3[f'R{bn}'], in3[f'Z{bn}'], in3[f'W{bn}'], in3[f'H{bn}'], in3[f'A{bn}'], in3[f'A{bn}2']))) if system == 'VESSEL': bn = 'VS' system_array = np.array(list(zip(in3[f'R{bn}'], in3[f'Z{bn}'], in3[f'W{bn}'], in3[f'H{bn}'], in3[f'A{bn}'], in3[f'A{bn}2']))) if system == 'OH': if len(in3['RE']) == 0: return n = int(max(system_array[:, -1])) for k in range(system_array.shape[0]): # disable plotting of dummy probes/loops if system_array[k, 1] == self.invalid: continue if system == 'OH' and facecolor == 'none': kw['patch_facecolor'] = pyplot.cm.viridis(np.linspace(0.0, 1.0, n))[int(system_array[k, -1]) - 1] if label: kw['label'] = '%d' % k self.plot_coil(system_array[k, :], **kw) ax.set_frame_on(False) ax.set_aspect('equal') ax.autoscale(tight=True) ax.set_xlim(ax.get_xlim() * np.array([0.98, 1.02])) ax.set_ylim(ax.get_ylim() * np.array([1.02, 1.02]))
[docs] def plot_domain(self, ax=None): """ plot EFUND computation domain :param ax: axis """ if ax is None: ax = pyplot.gca() from matplotlib import patches rect = patches.Rectangle( (self['IN5']['RLEFT'], self['IN5']['ZBOTTO']), self['IN5']['RRIGHT'] - self['IN5']['RLEFT'], self['IN5']['ZTOP'] - self['IN5']['ZBOTTO'], facecolor='none', edgecolor='black', ls='--', ) ax.add_patch(rect)
[docs] def plot(self, label=False, plot_coils=True, plot_vessel=True, plot_measurements=True, plot_domain=True, ax=None): """ Composite plot :param label: label coils and measurements :param plot_coils: plot poloidal field and oh coils :param plot_vessel: plot conducting vessel :param plot_measurements: plot flux loops and magnetic probes :param plot_domain: plot EFUND computing domain :param ax: axis """ if ax is None: ax = pyplot.gca() if plot_coils: self.plot_poloidal_field_coils(label=label, ax=ax) self.plot_ohmic_coils(label=label, ax=ax) if plot_vessel: self.plot_vessel(label=label, ax=ax) if plot_measurements: self.plot_flux_loops(label=label, ax=ax) self.plot_magnetic_probes(label=label, ax=ax) if not isinstance(self, OMFITdprobe) and plot_domain: self.plot_domain(ax=ax) ax.autoscale(tight=True)
def __call__(self, *args, **kw): r""" Done to override default OMFIT GUI behaviour for OMFITnamelist """ return self.plotFigure(*args, **kw)
[docs] def aggregate_oh_coils(self, index=None, group=None): """ Aggregate selected OH coils into a single coil :param index of OH coils to aggregate :param group: group of OH coils to aggregate """ if group is not None: index = np.where(self['OH'][:, 4] == group)[0] mx = np.min(self['IN3']['RE'][index, 0] - self['OH'][index, 2] / 2.0) MX = np.max(self['OH'][index, 0] + self['OH'][index, 2] / 2.0) my = np.min(self['OH'][index, 1] - self['OH'][index, 3] / 2.0) MY = np.max(self['OH'][index, 1] + self['OH'][index, 3] / 2.0) group = self['OH'][index, 4][0] aggregated_coil = [(MX + mx) / 2.0, (MY + my) / 2.0, (MX - mx), (MY - my), group] index = np.array(sorted(list(set(list(range(self['OH'].shape[0]))).difference(list(index))))) self['OH'] = self['OH'][index, :] self['OH'] = np.vstack((self['OH'], aggregated_coil))
[docs] def disable_oh_group(self, group): """ remove OH group :param group: group of OH coils to disable """ index = np.where(self['OH'][:, 4] != group)[0] self['OH'] = self['OH'][index, :] groups = np.unique(self['OH'][:, 4]) groups_mapper = dict(zip(groups, range(1, len(groups) + 1))) for k in range(self['OH'].shape[0]): self['OH'][k, 4] = groups_mapper[self['OH'][k, 4]]
[docs] def change_R(self, deltaR=0.0): """ add or subtract a deltaR to coils, flux loops and magnetic probes radial location effectively changing the aspect ratio :param deltaR: radial shift in m """ for item in self['IN3']: if item[0].upper() in ['R', 'X']: self['IN3'][item] += deltaR
[docs] def change_Z(self, deltaZ=0.0): """ add or subtract a deltaZ to coils, flux loops and magnetic probes radial location effectively changing the aspect ratio :param deltaR: radial shift in m """ for item in self['IN3']: if item[0].upper() in ['Z', 'Y']: self['IN3'][item] += deltaZ
[docs] def scale_system(self, scale_factor=0.0): """ scale coils, flux loops and magnetic probes radial location effectively changing the major radius holding aspect ratio fixed :param scale_factor: scaling factor to multiple system by """ for item in self['IN3']: if item[0].upper() in ['R', 'X']: self['IN3'][item] *= scale_factor if item[0].upper() in ['Z', 'Y']: self['IN3'][item] *= scale_factor
[docs] def fill_coils_from(self, mhdin): """ Copy FC, OH, VESSEL from passed object into current object, without changing the number of elements in current object. This requires that the number of elements in the current object is greater or equal than the number of elements in the passed object. The extra elements in the current object will be placed at R=0, Z=0 :param mhdin: other mhdin object """ self['RF'][:, 0] = 0.01 self['ZF'][:, 1] = self.invalid self['WF'][:, 2] = 0.01 self['FC'][:, 3] = 0.01 self['FC'][:, 4] = 0.0 self['FC'][:, 5] = 90.0 self['OH'][:, 0] = 0.01 self['OH'][:, 1] = self.invalid self['OH'][:, 2] = 0.01 self['OH'][:, 3] = 0.01 if len(mhdin['OH']): delta_shape = self['OH'].shape[0] - mhdin['OH'].shape[0] if delta_shape and max(mhdin['OH'][:, 4]) >= max(self['OH'][:, 4]): raise ValueError('OMFITmhdin.fill_coils_from() has no space for an extra `invalid` OH group') self['OH'][mhdin['OH'].shape[0] :, 4] = np.linspace( max(mhdin['OH'][:, 4]) + 1, max(self['OH'][:, 4]) + 0.9999, delta_shape ).astype(int) self['OH'][: mhdin['OH'].shape[0], 4] = 0.0 self['VESSEL'][:, 0] = 0.01 self['VESSEL'][:, 1] = self.invalid self['VESSEL'][:, 2] = 0.01 self['VESSEL'][:, 3] = 0.01 self['VESSEL'][:, 4] = 0.0 self['VESSEL'][:, 5] = 90.0 self['IN3']['VSNAME'] = ['DUMMY_%d' % k for k in range(self['VESSEL'].shape[0])] if len(mhdin['VESSEL']): self['IN3']['VSNAME'][: self['VESSEL'].shape[0]] = mhdin['IN3'].get('VSNAME', ['vessel'] * self['VESSEL'].shape[0]) for system in ['FC', 'OH', 'VESSEL']: if len(mhdin[system]): self[system][: len(mhdin[system])] = mhdin[system]
[docs] def modify_vessel_elements(self, index, action=['keep', 'delete'][0]): """ Utility function to remove vessel elements :param index: index of the vessel elements to either keep or delete :param action: can be either 'keep' or 'delete' """ keep_index = index if action == 'delete': keep_index = [k for k in range(len(self['IN3']['VSNAME'])) if k not in index] self['VESSEL'] = self['VESSEL'][keep_index] self['IN3']['VSNAME'] = np.array(self['IN3']['VSNAME'])[keep_index]
[docs] def fill_probes_loops_from(self, mhdin): """ Copy flux loops and magnetic probes from other object into current object, without changing the number of elements in current object This requires that the number of elements in the current object is greater or equal than the number of elements in the passed object. The extra elements in the current object will be placed at R=0, Z=0 :param mhdin: other mhdin object """ for system in [['XMP2', 'YMP2', 'SMP2', 'AMP2', 'MPNAM2'], ['RSI', 'ZSI', 'LPNAME']]: for item in system: if item in ['MPNAM2', 'LPNAME']: self['IN3'][item] = ['DUMMY_%d' % k for k in range(len(self['IN3'][item]))] self['IN3'][item][: len(mhdin['IN3'][item])] = mhdin['IN3'][item] else: self['IN3'][item] *= 0 if item[0] in ['X', 'R']: self['IN3'][item] += 0.01 elif item[0] in ['Y', 'Z']: self['IN3'][item] += -self.invalid elif item[0] in ['S']: self['IN3'][item] += 0.01 self['IN3'][item][: len(mhdin['IN3'][item])] = mhdin['IN3'][item]
[docs] def fill_scalars_from(self, mhdin): """ copy scalar quantities in IN3 and IN5 namelists without overwriting ['IFCOIL', 'IECOIL', 'IVESEL'] :param mhdin: other mhdin object """ for item in self['IN3']: if item in mhdin['IN3']: if isinstance(mhdin['IN3'][item], (int, float)): self['IN3'][item] = mhdin['IN3'][item] else: self['IN3'][item] = np.array(self['IN3'][item]) for item in self['IN5']: if item in mhdin['IN5']: if isinstance(mhdin['IN5'][item], (int, float)): if item not in ['IFCOIL', 'IECOIL', 'IVESEL']: self['IN5'][item] = mhdin['IN5'][item] else: self['IN5'][item] = np.array(self['IN5'][item])
[docs] def pretty_print(self, default_tilt2=0): if 'RF' in self: print('# =======') print('# F-COILS') print('# =======') print('R_fcoil = ', end='') print(repr(self['IN3']['RF'])) print('Z_fcoil = ', end='') print(repr(self['IN3']['ZF'])) print('W_fcoil = ', end='') print(repr(self['IN3']['WF'])) print('H_fcoil = ', end='') print(repr(self['IN3']['HF'])) if 'RSI' in self: print('# ==========') print('# Flux loops') print('# ==========') print('R_flux_loop = ', end='') print(repr(self['IN3']['RSI'])) print('Z_flux_loop = ', end='') print(repr(self['IN3']['ZSI'])) print('name_flux_loop = ', end='') print(repr(self['IN3']['LPNAME'])) if 'XMP2' in self: print('# ===============') print('# Magnetic probes') print('# ===============') print('R_magnetic = ', end='') print(repr(self['IN3']['XMP2'])) print('Z_magnetic = ', end='') print(repr(self['IN3']['YMP2'])) print('A_magnetic = ', end='') print(repr(self['IN3']['AMP2'])) print('S_magnetic = ', end='') print(repr(self['IN3']['SMP2'])) print('name_magnetic = ', end='') print(repr(self['IN3']['MPNAM2'])) return self
[docs] def efund_to_outline(self, coil_data, outline): """ The routine converts efund data format to ods outline format :param coil_data: 6-index array, r,z,w,h,a1,a2 :param outline: ods outline entry :return: outline """ a1 = coil_data[4] * np.pi / 180.0 a2 = coil_data[5] * np.pi / 180.0 if abs(a1) < 1e-8 and abs(a2) > 1e-8: side = coil_data[3] / np.tan(a2) hw1 = (coil_data[2] + side) / 2.0 hw2 = (coil_data[2] - side) / 2.0 hh = coil_data[3] / 2.0 outline['r'] = [ coil_data[0] - hw1, coil_data[0] - hw2, coil_data[0] + hw1, coil_data[0] + hw2, ] outline['z'] = [ coil_data[1] - hh, coil_data[1] + hh, coil_data[1] + hh, coil_data[1] - hh, ] else: side = coil_data[2] * np.tan(a1) hw = coil_data[2] / 2.0 hh1 = (coil_data[3] + side) / 2.0 hh2 = (coil_data[3] - side) / 2.0 outline['r'] = [ coil_data[0] - hw, coil_data[0] - hw, coil_data[0] + hw, coil_data[0] + hw, ] outline['z'] = [ coil_data[1] - hh1, coil_data[1] + hh2, coil_data[1] + hh1, coil_data[1] - hh2, ] return outline
[docs] def outline_to_efund(self, outline): """ The routine converts ods outline format to efund data format Since efund only supports parallelograms and requires 2 sides to be either vertical or horizontal this will likely not match the outline very well. Instead, the parallelogram will only match the angle of the lower left side, the height of the upper right side, and width of the the left most top side. :param outline: ods outline entry :return: 6-index array, r,z,w,h,a1,a2 """ rcent = np.mean(outline['r']) zcent = np.mean(outline['z']) rrel = outline['r'] - rcent zrel = outline['z'] - zcent angle = np.arctan2(zrel, rrel) rsort = [r for _, r in sorted(zip(angle, rrel))] zsort = [z for _, z in sorted(zip(angle, zrel))] r11, z11 = rsort[0], zsort[0] r01, z01 = rsort[1], zsort[1] r00, z00 = rsort[2], zsort[2] r10, z10 = rsort[3], zsort[3] # handle fringe cases that should be valid n = 0 for i in range(4): if angle[i] <= -np.pi / 2: n += 1 if n == 0: if r11**2 + z11**2 > r10**2 + z10**2: r11, z11 = rsort[3], zsort[3] r01, z01 = rsort[0], zsort[0] r00, z00 = rsort[1], zsort[1] r10, z10 = rsort[2], zsort[2] elif n == 2: if r11**2 + z11**2 < r01**2 + z01**2: r11, z11 = rsort[1], zsort[1] r01, z01 = rsort[2], zsort[2] r00, z00 = rsort[3], zsort[3] r10, z10 = rsort[0], zsort[0] height = z00 - z01 width = r00 - r10 a1 = np.arctan2(z01 - z11, r01 - r11) a2 = 0.0 if abs(a1) < 1.0e-8: a1 = 0.0 a2 = np.arctan2(z00 - z01, r00 - r01) return [rcent, zcent, width, height, scipy.degrees(a1), scipy.degrees(a2)]
[docs] def rectangle_to_efund(self, rectangle): r = rectangle['r'] z = rectangle['z'] w = rectangle['width'] h = rectangle['height'] a1 = 0.0 a2 = 0.0 return [r, z, w, h, a1, a2]
[docs] def annulus_to_efund(self, annulus): """ The routine converts an ods annulus format to efund data format by approximating it as a square :param annulus: ods annulus entry :return: 6-index array, r,z,w,h,a1,a2 """ r = annulus['r'] z = annulus['z'] w = 2 * annulus['radius_outer'] h = w a1 = 0.0 a2 = 0.0 return [r, z, w, h, a1, a2]
[docs] def thick_line_to_efund(self, thick_line): """ The routine converts ods thick_line format to efund data format The only time a thick line is a valid shape in efund is when it is vertical or horizontal. All others will not be a great fit, but some approximation is used. :param thick_line: ods thick_line entry :return: 6-index array, r,z,w,h,a1,a2 """ r1, z1 = thick_line['first_point']['r'], thick_line['first_point']['z'] r2, z2 = thick_line['second_point']['r'], thick_line['second_point']['z'] a = np.pi / 2 - np.arctan2(z2 - z1, r2 - r1) dr = thick_line['thickness'] / 2.0 * np.cos(a) dz = thick_line['thickness'] / 2.0 * np.sin(a) r11, z11 = r1 - dr, z1 + dz r01, z01 = r1 + dr, z1 - dz r00, z00 = r2 - dr, z2 + dz r10, z10 = r2 + dr, z2 - dz outline = {"r": [r11, r10, r00, r01], "z": [z11, z10, z00, z01]} return self.outline_to_efund(outline)
[docs] def annular_to_efund(self, annular): """ The routine converts ods annular format to efund data format The only time annular segments are a valid shape in efund is when they are vertical or horizontal. All others will not be a great fit, but some approximation is used. :param annular: ods annular entry :return: 6-index array, r,z,w,h,a1,a2 in which each is an array over the number of segments """ ns = len(annular['centreline.r']) - 1 nt = ns + annular['centreline.closed'] r, z = np.zeros(nt), np.zeros(nt) w, h = np.zeros(nt), np.zeros(nt) a1, a2 = np.zeros(nt), np.zeros(nt) for i in range(ns): fp = {"r": annular['centreline.r'][i], "z": annular['centreline.z'][i]} sp = {"r": annular['centreline.r'][i + 1], "z": annular['centreline.z'][i + 1]} thick_line = {"first_point": fp, "second_point": sp, "thickness": annular['thickness'][i]} [r[i], z[i], w[i], h[i], a1[i], a2[i]] = self.thick_line_to_efund(thick_line) if annular['centreline.closed']: fp = {"r": annular['centreline.r'][-1], "z": annular['centreline.z'][-1]} sp = {"r": annular['centreline.r'][0], "z": annular['centreline.z'][0]} thick_line = {"first_point": fp, "second_point": sp, "thickness": annular['thickness'][-1]} [r[-1], z[-1], w[-1], h[-1], a1[-1], a2[-1]] = self.thick_line_to_efund(thick_line) return [r, z, w, h, a1, a2]
# Generate initial mhdin
[docs] def init_mhdin(self, device): # mhdin = OMFITnamelist(filename = 'mhdin') self['MACHINEIN'] = NamelistName() self['MACHINEIN']['device'] = device self['MACHINEIN']['nfcoil'] = 1 self['MACHINEIN']['nfsum'] = 1 self['MACHINEIN']['nsilop'] = 1 self['MACHINEIN']['magpri'] = 1 self['MACHINEIN']['necoil'] = 1 self['MACHINEIN']['nesum'] = 1 self['MACHINEIN']['nvesel'] = 1 self['MACHINEIN']['nvsum'] = 1 self['MACHINEIN']['nacoil'] = 1 self['IN3'] = NamelistName() self['IN3']['RF'] = [] self['IN3']['ZF'] = [] self['IN3']['WF'] = [] self['IN3']['HF'] = [] self['IN3']['AF'] = [] self['IN3']['AF2'] = [] self['IN3']['TURNFC'] = [] self['IN3']['FCTURN'] = [] self['IN3']['FCID'] = [] self['IN3']['RE'] = [] self['IN3']['ZE'] = [] self['IN3']['WE'] = [] self['IN3']['HE'] = [] self['IN3']['ECTURN'] = [] self['IN3']['ECID'] = [] self['IN3']['RVS'] = [] self['IN3']['ZVS'] = [] self['IN3']['WVS'] = [] self['IN3']['HVS'] = [] self['IN3']['AVS'] = [] self['IN3']['AVS2'] = [] self['IN3']['RSISVS'] = [] self['IN3']['VSID'] = [] self['IN3']['VSNAME'] = [] self['IN3']['RACOIL'] = [] self['IN3']['ZACOIL'] = [] self['IN3']['WACOIL'] = [] self['IN3']['HACOIl'] = [] self['IN3']['RSI'] = [] self['IN3']['ZSI'] = [] # self['IN3']['WSI'] = [] # self['IN3']['HSI'] = [] # self['IN3']['AS'] = [] # self['IN3']['AS2'] = [] self['IN5'] = NamelistName() self['IN5']['IGRID'] = 1 self['IN5']['RLEFT'] = 0.84 self['IN5']['RRIGHT'] = 2.54 self['IN5']['ZBOTTO'] = -1.6 self['IN5']['ZTOP'] = 1.6 self['IN5']['IFCOIL'] = 0 self['IN5']['ISLPFC'] = 0 self['IN5']['NSMP2'] = 25 self['IN5']['IECOIL'] = 0 self['IN5']['IVESEL'] = 0 self['IN5']['IACOIL'] = 0 self['IN5']['mgaus1'] = 8 self['IN5']['mgaus2'] = 12 return self
[docs] def from_omas(self, ods, passive_map='VS'): if 'dataset_description.data_entry.machine' in ods: device = ods['dataset_description.data_entry.machine'] else: device = 'my_device' self = self.init_mhdin(device) ncoil = {'F': 0, 'E': 0, 'ACOIL': 0, 'VS': 0} nsum = {'F': 0, 'E': 0, 'ACOIL': 0, 'VS': 0} # DIII-D coil_map = {'OH': 'E', 'PF': 'F', 'DIV': 'ACOIL'} # ITER coil_map['CS'] = 'E' # Central solenoid coil_map['VS'] = 'F' # Vertical Stabilzation (in-vessel coils) coil_map['TF'] = 'ACOIL' # TF coil busbars coil_map['VC'] = 'S' # Virtual coils (skip) for coil_type in ['pf_active.coil', 'pf_passive.loop']: if not coil_type in ods: continue for isum in ods[coil_type]: coil = ods[coil_type][isum] efund_name = 'S' if coil_type == 'pf_passive.loop': efund_name = passive_map else: for i in coil_map.keys(): if i in coil['name']: efund_name = coil_map[i] if efund_name == 'S': continue nsum[efund_name] += 1 if 'CS1L' in coil['name']: # ITER nsum[efund_name] -= 1 elif efund_name == 'F': if 'nstx' in device or 'mast' in device: self['IN3']['TURNFC'].append(1.0) else: self['IN3']['TURNFC'].append(coil['element'][0]['turns_with_sign']) for ielement in coil['element']: ncoil[efund_name] += 1 element = coil['element'][ielement] if 'rectangle' in element['geometry']: [r, z, w, h, a1, a2] = self.rectangle_to_efund(element['geometry.rectangle']) elif 'annulus' in element['geometry']: [r, z, w, h, a1, a2] = self.annulus_to_efund(element['geometry.annulus']) elif 'outline' in element['geometry']: [r, z, w, h, a1, a2] = self.outline_to_efund(element['geometry.outline']) elif 'thick_line' in element['geometry']: [r, z, w, h, a1, a2] = self.thick_line_to_efund(element['geometry.thick_line']) else: raise ValueError(f'No conversion defined for geometry of {coil_type}.{isum}') self['IN3'][f'R{efund_name}'].append(r) self['IN3'][f'Z{efund_name}'].append(z) self['IN3'][f'W{efund_name}'].append(w) self['IN3'][f'H{efund_name}'].append(h) if efund_name == 'F' or efund_name == 'VS': self['IN3'][f'A{efund_name}'].append(a1) self['IN3'][f'A{efund_name}2'].append(a2) if efund_name == 'VS': self['IN3'][f'VSID'].append(nsum[efund_name]) self['IN5'][f'IVESEL'] = 1 if 'resistance' in coil: self['IN3']['RSISVS'].append(coil['resistance']) else: self['IN3']['RSISVS'].append(coil['resistivity'] / 2 / np.pi / r) self['IN3']['VSNAME'].append(coil['name']) elif efund_name == 'F': self['IN3']['FCID'].append(nsum[efund_name]) if 'nstx' in device or 'mast' in device: self['IN3']['FCTURN'].append(coil['element'][0]['turns_with_sign']) else: self['IN3']['FCTURN'].append(1.0) self['IN5']['IFCOIL'] = 1 elif efund_name != 'ACOIL': self['IN3'][f'{efund_name}CID'].append(nsum[efund_name]) self['IN3'][f'{efund_name}CTURN'].append(coil['element'][0]['turns_with_sign']) self['IN5'][f'I{efund_name}COIL'] = 1 else: self['IN5'][f'I{efund_name}'] = 1 self['MACHINEIN']['nfsum'] = nsum['F'] self['machinein']['nfcoil'] = ncoil['F'] self['MACHINEIN']['nesum'] = nsum['E'] self['machinein']['necoil'] = ncoil['E'] self['MACHINEIN']['nvsum'] = nsum['VS'] self['machinein']['nvesel'] = ncoil['VS'] self['MACHINEIN']['nacoil'] = ncoil['ACOIL'] self['MACHINEIN']['magpri'] = 0 if 'magnetics' in ods: if 'b_field_pol_probe' in ods['magnetics']: self['MACHINEIN']['magpri'] = nmp2 = len(list(filter(None, ods['magnetics.b_field_pol_probe.:.length']))) self['IN3']['XMP2'] = xmp2 = np.zeros(nmp2) self['IN3']['YMP2'] = ymp2 = np.zeros(nmp2) self['IN3']['SMP2'] = smp2 = np.zeros(nmp2) self['IN3']['AMP2'] = amp2 = np.zeros(nmp2) self['IN3']['MPNAM2'] = mpnam2 = np.chararray(nmp2, itemsize=10, unicode=True) n = 0 for i in ods['magnetics.b_field_pol_probe']: probe = ods['magnetics.b_field_pol_probe'][i] if 'length' in probe: xmp2[n] = probe['position.r'] ymp2[n] = probe['position.z'] smp2[n] = probe['length'] amp2[n] = -180 / np.pi * probe['poloidal_angle'] mpnam2[n] = probe['name'] n += 1 self['MACHINEIN']['nsilop'] = 0 if 'flux_loop' in ods['magnetics']: self['MACHINEIN']['nsilop'] = nsilop = len( np.where(np.array(list(filter(None, ods['magnetics.flux_loop.:.type.index']))) == 1)[0] ) self['IN3']['RSI'] = rsi = np.zeros(nsilop) self['IN3']['ZSI'] = zsi = np.zeros(nsilop) self['IN3']['LPNAME'] = lpname = np.chararray(nsilop, itemsize=10, unicode=True) for i in ods['magnetics.flux_loop']: loop = ods['magnetics.flux_loop'][i] if 'type' in loop and 'index' in loop['type'] and loop['type.index'] == 1: rsi[i] = loop['position.0.r'] zsi[i] = loop['position.0.z'] lpname[i] = loop['name'] # only use vessel described in wall if pf_passive hasn't been already # (these should be equivalent, but the annular wall type is harder to represent) if 'wall.description_2d[0].vessel.unit' in ods and not 'pf_passive.loop' in ods: self['IN5']['IVESEL'] = 1 nvesel = 0 for i in ods['wall.description_2d.0.vessel.unit']: unit = ods['wall.description_2d.0.vessel.unit'][i] if 'element' in unit: nvesel += 1 elif 'annular' in unit: nvesel += len(unit['annular.centreline.r']) - 1 + unit['annular.centreline.closed'] else: raise ValueError(f'No conversion defined for geometry of vessel unit {iunit}') self['MACHINEIN']['nvesel'] = self['MACHINEIN']['nvsum'] = nvesel self['IN3']['RVS'] = rvs = np.zeros(nvesel) self['IN3']['ZVS'] = zvs = np.zeros(nvesel) self['IN3']['WVS'] = wvs = np.zeros(nvesel) self['IN3']['HVS'] = hvs = np.zeros(nvesel) self['IN3']['AVS'] = avs = np.zeros(nvesel) self['IN3']['AVS2'] = avs2 = np.zeros(nvesel) self['IN3']['RSISVS'] = rsisvs = np.zeros(nvesel) self['IN3']['VSID'] = vsid = np.zeros(nvesel, dtype=int) self['IN3']['VSNAME'] = vsname = np.chararray(nvesel, itemsize=10, unicode=True) n = 0 for i in ods['wall.description_2d.0.vessel.unit']: unit = ods['wall.description_2d.0.vessel.unit'][i] if 'element' in unit: rvs[n], zvs[n], wvs[n], hvs[n], avs[n], avs2[n] = self.outline_to_efund(unit['element.0.outline']) if 'resistance' in unit['annular.0']: rsisvs[n] = unit['element.0.resistance'] else: rsisvs[n] = unit['element.0.resistivity'] / 2 / np.pi / rvs[n] vsid[n] = n + 1 vsname[n] = unit['name'] n += 1 elif 'annular' in unit: nl = n + len(unit['annular.centreline.r']) - 1 + unit['annular.centreline.closed'] rvs[n:nl], zvs[n:nl], wvs[n:nl], hvs[n:nl], avs[n:nl], avs2[n:nl] = self.annular_to_efund(unit['annular']) # IMAS data schema is missing annular resistance... # if 'resistance' in unit['annular']: # rsisvs[n:nl] = unit['annular.resistance'] # else: rsisvs[n:nl] = unit['annular.resistivity'] / 2 / np.pi / rvs[n:nl] vsid[n:nl] = np.arange(n + 1, nl + 1) vsname[n:nl] = unit['name'] n = nl # currently unused (not part of mhdin.dat, but could be used to make lim.dat files) # if 'wall.description_2d[0].limiter.unit[0].outline' in ods: # rwall = ods['wall.description_2d[0].limiter.unit[0].outline.r'] # zwall = ods['wall.description_2d[0].limiter.unit[0].outline.z'] return self
[docs] def to_omas(self, ods=None, update=['pf_active', 'flux_loop', 'b_field_pol_probe', 'vessel']): """ Transfers data in EFIT mhdin.dat format to ODS WARNING: only rudimentary identifies are assigned for pf_active You should assign your own identifiers and only rely on this function to assign numerical geometry data. :param ods: ODS instance Data will be added in-place :param update: systems to populate ['oh', 'pf_active', 'flux_loop', 'b_field_pol_probe'] ['magnetics'] will enable both ['flux_loop', 'b_field_pol_probe'] NOTE that in IMAS the OH information goes under `pf_active` :return: ODS instance """ from omas.omas_plot import geo_type_lookup from omas import omas_environment, ODS if ods is None: ods = ODS() # pf_active if 'pf_active' in update and 'RE' in self['IN3']: r = self['IN3']['RE'] z = self['IN3']['ZE'] width = self['IN3']['WE'] height = self['IN3']['HE'] turns = self['IN3']['ECTURN'] elements_id = (np.array(self['IN3']['ECID']) - 1).astype(int) rect_code = geo_type_lookup('rectangle', 'pf_active', ods.imas_version, reverse=True) with omas_environment(ods, dynamic_path_creation='dynamic_array_structures'): for i in range(len(r)): c = elements_id[i] e = sum(elements_id[:i] == elements_id[i]) ods['pf_active.coil'][c]['name'] = f'OH{c}' ods['pf_active.coil'][c]['identifier'] = f'OH{c}' ods['pf_active.coil'][c]['element'][e]['name'] = f'OH{c}_{e}' ods['pf_active.coil'][c]['element'][e]['identifier'] = f'OH{c}_{e}' ods['pf_active.coil'][c]['element'][e]['turns_with_sign'] = turns[i] rect = ods['pf_active.coil'][c]['element'][e]['geometry.rectangle'] rect['r'] = r[i] rect['z'] = z[i] rect['width'] = width[i] rect['height'] = height[i] ods['pf_active.coil'][c]['element'][e]['geometry.geometry_type'] = rect_code if 'pf_active' in update and 'RF' in self['IN3']: r = self['IN3']['RF'] z = self['IN3']['ZF'] width = self['IN3']['WF'] height = self['IN3']['HF'] angle1 = self['IN3']['AF'] angle2 = self['IN3']['AF2'] turns = self['IN3']['FCTURN'] if 'TURNFC' in self['IN3']: turnfc = self['IN3']['TURNFC'] else: turnfc = np.ones(int(max(self['IN3']['FCID']))) elements_id = (np.array(self['IN3']['FCID']) - 1).astype(int) rect_code = geo_type_lookup('rectangle', 'pf_active', ods.imas_version, reverse=True) outline_code = geo_type_lookup('outline', 'pf_active', ods.imas_version, reverse=True) offset = len(ods['pf_active.coil']) for i in range(len(r)): c = elements_id[i] + offset e = sum(elements_id[:i] == elements_id[i]) ods['pf_active.coil'][c]['name'] = f'PF{c}' ods['pf_active.coil'][c]['identifier'] = f'PF{c}' ods['pf_active.coil'][c]['element'][e]['name'] = f'PF{c}_{e}' ods['pf_active.coil'][c]['element'][e]['identifier'] = f'PF{c}_{e}' ods['pf_active.coil'][c]['element'][e]['turns_with_sign'] = turns[i] * turnfc[elements_id[i]] if angle1[i] == 0 and angle2[i] == 0: rect = ods['pf_active.coil'][c]['element'][e]['geometry.rectangle'] rect['r'] = r[i] rect['z'] = z[i] rect['width'] = width[i] rect['height'] = height[i] ods['pf_active.coil'][c]['element'][e]['geometry.geometry_type'] = rect_code else: outline = ods['pf_active.coil'][c]['element'][e]['geometry.outline'] outline = self.efund_to_outline([r[i], z[i], width[i], height[i], angle1[i], angle2[i]], outline) ods['pf_active.coil'][c]['element'][e]['geometry.geometry_type'] = outline_code # flux_loop if ('magnetics' in update or 'flux_loop' in update) and 'RSI' in self['IN3']: R_flux_loop = np.atleast_1d(self['IN3']['RSI']) Z_flux_loop = np.atleast_1d(self['IN3']['ZSI']) name_flux_loop = list(map(lambda x: x.strip(), np.atleast_1d(self['IN3']['LPNAME']))) with omas_environment(ods, cocosio=1): for k, (r, z, name) in enumerate(zip(R_flux_loop, Z_flux_loop, name_flux_loop)): ods[f'magnetics.flux_loop.{k}.name'] = name ods[f'magnetics.flux_loop.{k}.identifier'] = name ods[f'magnetics.flux_loop.{k}.position[0].r'] = r ods[f'magnetics.flux_loop.{k}.position[0].z'] = z ods[f'magnetics.flux_loop.{k}.type.index'] = 1 # b_field_pol_probe if ('magnetics' in update or 'b_field_pol_probe' in update) and 'XMP2' in self['IN3']: R_magnetic = self['IN3']['XMP2'] Z_magnetic = self['IN3']['YMP2'] A_magnetic = self['IN3']['AMP2'] S_magnetic = self['IN3']['SMP2'] name_magnetic = list(map(lambda x: x.strip(), self['IN3']['MPNAM2'])) with omas_environment(ods, cocosio=1): for k, (r, z, a, s, name) in enumerate(zip(R_magnetic, Z_magnetic, A_magnetic, S_magnetic, name_magnetic)): ods[f'magnetics.b_field_pol_probe.{k}.name'] = name ods[f'magnetics.b_field_pol_probe.{k}.identifier'] = name ods[f'magnetics.b_field_pol_probe.{k}.position.r'] = r ods[f'magnetics.b_field_pol_probe.{k}.position.z'] = z ods[f'magnetics.b_field_pol_probe.{k}.length'] = s ods[f'magnetics.b_field_pol_probe.{k}.poloidal_angle'] = -a / 180 * np.pi ods[f'magnetics.b_field_pol_probe.{k}.toroidal_angle'] = 0.0 / 180 * np.pi ods[f'magnetics.b_field_pol_probe.{k}.type.index'] = 1 ods[f'magnetics.b_field_pol_probe.{k}.turns'] = 1 # Vessel if 'vessel' in update and 'RVS' in self['IN3'] and len(self['IN3']['RVS']): r = self['IN3']['RVS'] z = self['IN3']['ZVS'] width = self['IN3']['WVS'] height = self['IN3']['HVS'] angle1 = self['IN3']['AVS'] angle2 = self['IN3']['AVS2'] resistance = self['IN3']['RSISVS'] resistivity = np.array(self['IN3']['RSISVS']) * 2 * np.pi * np.array(r) elements_id = (np.array(self['IN3']['VSID']) - 1).astype(int) for i in range(len(r)): c = elements_id[i] e = sum(elements_id[:i] == elements_id[i]) ods['pf_passive.loop'][c]['name'] = f'VS{c}' # ods['pf_passive.loop'][c]['identifier'] = f'PF{c}' ods['pf_passive.loop'][c]['element'][e]['name'] = f'VS{c}_{e}' ods['pf_passive.loop'][c]['element'][e]['identifier'] = f'VS{c}_{e}' ods['pf_passive.loop'][c]['element'][e]['turns_with_sign'] = 1.0 ods['pf_passive.loop'][c]['resistance'] = resistance[c] ods['pf_passive.loop'][c]['resistivity'] = resistivity[c] if angle1[i] == 0 and angle2[i] == 0: rect = ods['pf_passive.loop'][c]['element'][e]['geometry.rectangle'] rect['r'] = r[i] rect['z'] = z[i] rect['width'] = width[i] rect['height'] = height[i] ods['pf_passive.loop'][c]['element'][e]['geometry.geometry_type'] = 2 else: outline = ods['pf_passive.loop'][c]['element'][e]['geometry.outline'] outline = self.efund_to_outline([r[i], z[i], width[i], height[i], angle1[i], angle2[i]], outline) ods['pf_passive.loop'][c]['element'][e]['geometry.geometry_type'] = 1 return ods
[docs] def from_miller(self, a=1.2, R=3.0, kappa=1.8, delta=0.4, zeta=0.0, zmag=0.0, nf=14, wf=0.05, hf=0.05, turns=100): self = self.init_mhdin('device') Rf, Zf = rz_miller(a=a, R=R, kappa=kappa, delta=delta, zeta=zeta, zmag=zmag, poloidal_resolution=nf + 2) Rf = Rf[1:-1] Zf = Zf[1:-1] # Conservatively set grid to include f-coils (bad idea for tokamak with blanket) self['IN5']['RLEFT'] = np.min(Rf) - 0.5 * wf self['IN5']['RRIGHT'] = np.max(Rf) + 0.5 * wf self['IN5']['ZTOP'] = np.max(Zf) + 0.5 * hf self['IN5']['ZBOTTO'] = np.min(Zf) - 0.5 * hf self['MACHINEIN']['nfsum'] = self['MACHINEIN']['nfcoil'] = nf self['IN5']['IFCOIL'] = 1 self['IN3']['FCID'] = np.arange(1, nf + 1, 1) self['IN3']['RF'] = Rf self['IN3']['ZF'] = Zf self['IN3']['WF'] = np.ones(nf) * wf self['IN3']['HF'] = np.ones(nf) * hf self['IN3']['AF'] = np.zeros(nf) self['IN3']['AF2'] = 90 * np.ones(nf) self['IN3']['FCTURN'] = np.ones(nf) self['IN3']['TURNFC'] = turns * np.ones(nf) self['IN3']['RSI'] = np.min(Rf) - 1 * wf self['IN3']['RE'] = np.min(Rf) - 1 * wf self['IN3']['MPNAM2'] = 'MP_A' self['IN3']['LPNAME'] = 'LP_A' return self
[docs] def fake_geqdsk(self, rbbbs, zbbbs, rlim, zlim, Bt, Ip, nw, nh): """ This function generates a fake geqdsk that can be used for fixed boundary EFIT modeling :param rbbbs: R of last closed flux surface [m] :param zbbbs: Z of last closed flux surface [m] :param rlim: R of limiter [m] :param zlim: Z of limiter [m] :param Bt: Central magnetic field [T] :param Ip: Plasma current [A] """ geqdsk = OMFITgeqdsk('geqdsk') geqdsk['CASE'] = ['1', '2', '3', '#999999', '1000ms', '6'] geqdsk['NW'] = nw geqdsk['NH'] = nh geqdsk['RDIM'] = self['IN5']['RRIGHT'] - self['IN5']['RLEFT'] geqdsk['ZDIM'] = self['IN5']['ZTOP'] - self['IN5']['ZBOTTO'] geqdsk['RLEFT'] = self['IN5']['RLEFT'] geqdsk['RCENTR'] = 0.5 * (self['IN5']['RRIGHT'] + self['IN5']['RLEFT']) geqdsk['ZMID'] = 0.5 * (self['IN5']['ZTOP'] + self['IN5']['ZBOTTO']) geqdsk['RMAXIS'] = 0.0 geqdsk['ZMAXIS'] = 0.0 geqdsk['SIMAG'] = 0.0 geqdsk['SIBRY'] = 0.0 geqdsk['BCENTR'] = Bt geqdsk['CURRENT'] = Ip geqdsk['FPOL'] = np.zeros(nw) geqdsk['PRES'] = np.zeros(nw) geqdsk['FFPRIM'] = -1 * np.ones(nw) geqdsk['PPRIME'] = -1 * np.ones(nw) geqdsk['PSIRZ'] = np.zeros([nw, nh]) geqdsk['QPSI'] = np.zeros(nw) geqdsk['NBBBS'] = len(rbbbs) geqdsk['RBBBS'] = rbbbs geqdsk['ZBBBS'] = zbbbs geqdsk['LIMITR'] = len(rlim) geqdsk['RLIM'] = rlim geqdsk['ZLIM'] = zlim geqdsk['KVTOR'] = 0 geqdsk['RVTOR'] = 1.0 geqdsk['NMASS'] = 0 geqdsk['RHOVN'] = np.zeros(nw) # Rederive AuxQuantities geqdsk.save() geqdsk.load() return geqdsk
[docs]class OMFITdprobe(OMFITmhdin):
[docs] @dynaLoad def load(self, *args, **kw): self.outsideOfNamelistIsComment = True self.noSpaceIsComment = True OMFITnamelist.load(self, *args, **kw) for item in list(self['IN3'].keys()): if item.upper() not in [ 'RSISVS', 'TURNFC', 'VSNAME', 'LPNAME', 'MPNAM2', 'RSI', 'ZSI', 'XMP2', 'YMP2', 'AMP2', 'SMP2', 'PATMP2', ]: del self['IN3'][item]
[docs]class OMFITnstxMHD(SortedDict, OMFITascii): """ OMFIT class to read NSTX MHD device files such as `device01152015.dat`, `diagSpec01152015.dat` and `signals_020916_PF4.dat` """ def __init__(self, filename, use_leading_comma=None, **kw): r""" OMFIT class to parse NSTX MHD device files :param filename: filename :param \**kw: arguments passed to __init__ of OMFITascii """ OMFITascii.__init__(self, filename, **kw) SortedDict.__init__(self) self.dynaLoad = True
[docs] @dynaLoad def load(self): self.clear() with open(self.filename, 'r') as f: lines = f.read().split('\n') lines = [line for line in lines if len(line.strip()) and not line.startswith(';')] self['all'] = {} for line in lines: line, description = (line + ';').split(';', 1) line = line.split() if line[1] not in self: self[line[1]] = {} piece = len(self[line[1]]) # signals.dat if len(line) in [16, 17]: self.type = 'signals' self[line[1]][piece] = dict( zip( 'name type map_var map_index mds_tree read_sig rel_error abs_error sig_thresh use_err scale pri fitwt t_index t_smooth mds_name'.split(), line, ) ) # diagspec.dat elif len(line) == 12: self.type = 'diagspec' self[line[1]][piece] = dict(zip('name type id rc zc wc hc ac ac2 turns div material'.split(), line)) # device.dat elif len(line) == 8: self.type = 'device' self[line[1]][piece] = dict(zip('tag type units rc zc pol_ang tor_ang1 tor_ang2'.split(), line)) else: raise ValueError(f'Format not recognized {len(line)}') self[line[1]][piece]['description'] = description.strip(';') for item in self[line[1]][piece]: try: self[line[1]][piece][item] = ast.literal_eval(self[line[1]][piece][item]) except (ValueError, SyntaxError): pass self['all'][line[0]] = self[line[1]][piece] # postprocess if self.type == 'signals': mds_tree_map = {'o': 'operations', 'e': 'engineering', 'a': 'activespec', 'r': 'radiatiion', 'E2': 'efit02', '-': 'computed'} for group in self: if group not in ['all']: for item in self[group]: self[group][item]['mds_tree'] = mds_tree_map.get(self[group][item]['mds_tree'], self[group][item]['mds_tree']) # sort entries according to their `map` fields self['mappings'] = {} for group in self: for item in self[group]: map_var = self[group][item].get('map_var', 'none') self['mappings'].setdefault(map_var, {}) map_index = self[group][item].get('map_index', len(self['mappings'][map_var])) self['mappings'][map_var][map_index] = value = self[group][item] if 'mds_tree' in value: if value['mds_tree'] == 'computed' and value['mds_name'] in self['all']: value['mds_tree_resolved'] = self['all'][value['mds_name']]['mds_tree'] value['mds_name_resolved'] = self['all'][value['mds_name']]['mds_name'] + '/' + str(value['scale']) else: value['mds_tree_resolved'] = value['mds_tree'] value['mds_name_resolved'] = value['mds_name'] return self
[docs] def pretty_print(self): """ Print data in file as arrays, as it is needed for a fortran namelist """ for group in self: for item in self[group][0]: print(f"{group.lower()}_{item.split('(')[0].lower()} = ", end='') print([self[group][k][item] for k in self[group]]) return self
[docs]def get_mhdindat( device=None, pulse=None, select_from_dict=None, filenames=['dprobe.dat', 'mhdin.dat'], ): """ :param device: name of the device to get the mhdin.dat file of :param pulse: for certain devices the mhdin.dat depends on the shot number :param select_from_dict: select from external dictionary :param filenames: filenames to get, typically 'mhdin.dat' and/or 'dprobe.dat' NOTE: 'dprobe.dat' is a subset of 'mhdin.dat' :return: OMFITmhdin object """ if device is None: selected_device = "*" else: selected_device = tokamak(device, 'OMAS').lower() mhd = dict() if select_from_dict is not None: for item in list(select_from_dict.keys()): if os.path.basename(select_from_dict[item].filename) in filenames: mhd[item] = select_from_dict[item] else: for device_dir in glob.glob(os.sep.join([omas.omas_dir, 'machine_mappings', 'support_files', '*'])): device = tokamak(os.path.basename(device_dir)) for mhdin in filenames: if mhdin == 'mhdin.dat': OMFIT_mhdclass = OMFITmhdin else: OMFIT_mhdclass = OMFITdprobe filename = os.sep.join([device_dir, mhdin]) if os.path.exists(filename): mhd[f'{device}_000000'] = OMFIT_mhdclass(filename) else: for device_dir_subdir in glob.glob(os.sep.join([device_dir, '*'])): filename = os.sep.join([device_dir_subdir, mhdin]) ranges_filename = os.sep.join([device_dir_subdir, 'ranges.dat']) if os.path.exists(filename) and os.path.exists(ranges_filename): with open(os.sep.join([device_dir_subdir, 'ranges.dat']), 'r') as f: start_at = int(f.read().split()[0]) mhd[f'{device}_{start_at:06d}'] = OMFIT_mhdclass(filename) if selected_device != '*': latest = None for item in list(sorted(mhd.keys())): try: device, shot = item.split("_") shot = int(shot) except Exception: continue if tokamak(selected_device) == device: latest = mhd[item] if tokamak(selected_device) == device and pulse is not None and shot >= pulse: return mhd[item] if latest is None: raise ValueError( f"No mhdin.dat for {selected_device}. Valid devices are " + str(np.unique([item.split("_")[0] for item in mhd])) ) return latest return mhd
[docs]def green_to_omas( ods=None, filedir='/fusion/projects/codes/efit/efitai/efit_support_files/DIII-D/green/168191/', nw=129, nh=129, nsilop=44, magpri=76, nesum=6, nfsum=18, nvsum=24, ): """ This function reads EFUND generate Green's function tables and put them into IMAS :param ods: input ods to populate :param filedir: directory which contains EFUND generated binary files :param nw: number of horizontal grid points :param nw: number of vertical grid points :param magpri: number of magnetic probes (will be overwritten if available in directory) :param nsilop: number of flux loops (will be overwritten if available in directory) :param nesum: number of e-coils (will be overwritten if available in directory) :param nfsum: number of f-coils (will be overwritten if available in directory) :param nvsum: number of vessel structures (will be overwritten if available in directory) returns ods """ from omas import omas_environment, ODS try: filename = '/mhdin.dat' file = filedir + filename mhdin = OMFITnamelist(file) if 'machinein' in mhdin: nfsum = mhdin['machinein']['nfsum'] nesum = mhdin['machinein']['nesum'] nvsum = mhdin['machinein']['nvsum'] magpri = mhdin['machinein']['magpri'] nsilop = mhdin['machinein']['nsilop'] except Exception: try: filename = '/dprobe.dat' # pretend dprobe file is mhdin for coil names file = filedir + filename mhdin = OMFITnamelist(file) printw('could not read array sizes from mhdin.dat') except Exception: printw('could not read array sizes from mhdin.dat or dprobe.dat') if ods is None: ods = ODS() green = {} green['nesum'] = nesum green['nsilop'] = nsilop green['magpri'] = magpri green['nfsum'] = nfsum green['nvsum'] = nvsum green['nw'] = nw green['nh'] = nh ods['em_coupling.code.name'] = 'EFUND' ods['em_coupling.ids_properties.homogeneous_time'] = 2 # Missing/TODO: These should be setup as URIs now but not sure how to do that... using names or meaningless strings for now if magpri > 0: ods['em_coupling.b_field_pol_probes'] = mhdin['in3']['mpnam2'] else: ods['em_coupling.b_field_pol_probes'] = np.empty(0) if nsilop > 0: ods['em_coupling.flux_loops'] = mhdin['in3']['lpname'] else: ods['em_coupling.flux_loops'] = np.empty(0) if nesum > 0 or nfsum > 0: ods['em_coupling.active_coils'] = np.chararray(nesum + nfsum, itemsize=1, unicode=True) else: ods['em_coupling.active_coils'] = np.empty(0) if nvsum > 0: ods['em_coupling.passive_loops'] = mhdin['in3']['vsname'] else: ods['em_coupling.passive_loops'] = np.empty(0) ods['em_coupling.plasma_elements'] = np.chararray(nw * nh, itemsize=1, unicode=True) endian = '>' # Response of poloidal field (F) coils on grid, and grid on itself. filename = f'ec{nw}{nh}.ddd' file = filedir + filename f = scipy.io.FortranFile(file, 'r', f'{endian}i4') [mw, mh] = f.read_ints(f'{endian}i4') grid = f.read_reals(f'{endian}f8') green['rgrid'] = grid[:mw] green['zgrid'] = grid[mw:] green['RR'], green['ZZ'] = np.meshgrid(green['rgrid'], green['zgrid']) try: ods['equilibrium.time_slice.0.profiles_2d.0.grid.dim1'] = green['rgrid'] ods['equilibrium.time_slice.0.profiles_2d.0.grid.dim2'] = green['zgrid'] except Exception: printw('could not write grid sizes to equilibium') ggridfc = f.read_reals(f'{endian}f8').reshape([nfsum, mw * mh]).T ods['em_coupling.mutual_plasma_plasma'] = f.read_reals(f'{endian}f8').reshape(mw, mh * mw).T # Response of poloidal field coil on flux loops, magnetic probes, and grid filename = f'rfcoil.ddd' file = filedir + filename f = scipy.io.FortranFile(file, 'r', f'{endian}i4') rsilfc = f.read_reals(f'{endian}f8').reshape([nfsum, nsilop]).T rmp2fc = f.read_reals(f'{endian}f8').reshape([nfsum, magpri]).T # Response of Ohmic heating coil on flux loops, magnetic probes, and plasma try: filename = f're{nw}{nh}.ddd' file = filedir + filename f = scipy.io.FortranFile(file, 'r', f'{endian}i4') rsilec = f.read_reals(f'{endian}f8').reshape([nesum, nsilop]).T rmp2ec = f.read_reals(f'{endian}f8').reshape([nesum, magpri]).T gridec = f.read_reals(f'{endian}f8').reshape([nesum, mw * mh]).T ods['em_coupling.mutual_loops_active'] = np.append(rsilec, rsilfc, axis=1) ods['em_coupling.b_field_pol_probes_active'] = np.append(rmp2ec, rmp2fc, axis=1) ods['em_coupling.mutual_plasma_active'] = np.append(gridec, ggridfc, axis=1) no_ecoil = False except Exception: ods['em_coupling.mutual_loops_active'] = rsilfc ods['em_coupling.b_field_pol_probes_active'] = rmp2fc ods['em_coupling.mutual_plasma_active'] = ggridfc no_ecoil = True printw('Ohmic heating coil tables not available') # Response of vessel on flux loops, probes, and plasma try: filename = f'rv{nw}{nh}.ddd' file = filedir + filename f = scipy.io.FortranFile(file, 'r', f'{endian}i4') ods['em_coupling.mutual_loops_passive'] = f.read_reals(f'{endian}f8').reshape([nvsum, nsilop]).T ods['em_coupling.b_field_pol_probes_passive'] = f.read_reals(f'{endian}f8').reshape([nvsum, magpri]).T ods['em_coupling.mutual_plasma_passive'] = f.read_reals(f'{endian}f8').reshape([nvsum, mw * mh]).T except Exception: printw('Vessel not available') # Response of coils on vessel try: gfcvs = f.read_reals(f'{endian}f8').reshape([nvsum, nfsum]) gecvs = f.read_reals(f'{endian}f8').reshape([nvsum, nesum]) if not no_ecoil: ods['em_coupling.mutual_passive_active'] = np.append(gecvs, gfcvs, axis=1) else: ods['em_coupling.mutual_passive_active'] = gfcvs ods['em_coupling.mutual_passive_passive'] = f.read_reals(f'{endian}f8').reshape([nvsum, nvsum]) except Exception: printw('Vessel-coil mutual inductances not available') # Response of magnetics flux loops and probes on plasma filename = f'ep{nw}{nh}.ddd' file = filedir + filename f = scipy.io.FortranFile(file, 'r', f'{endian}i4') ods['em_coupling.mutual_loops_plasma'] = f.read_reals(f'{endian}f8').reshape([mw * mh, nsilop]).T ods['em_coupling.b_field_pol_probes_plasma'] = f.read_reals(f'{endian}f8').reshape([mw * mh, magpri]).T ods['em_coupling.ids_properties.homogeneous_time'] = 2 ods['em_coupling.code.parameters'] = green return ods
############################################ if '__main__' == __name__: from matplotlib import pyplot test_classes_main_header() for mhdin in get_mhdindat().values(): mhdin.load() mhdin.pretty_print() mhdin.plot(label=True) pyplot.show()