"""
Contains classes and utility/support functions for parsing DIII-D patch panel files
"""
try:
# framework is running
from .startup_choice import *
except ImportError as _excp:
# class is imported by itself
if (
'attempted relative import with no known parent package' in str(_excp)
or 'No module named \'omfit_classes\'' in str(_excp)
or "No module named '__main__.startup_choice'" in str(_excp)
):
from startup_choice import *
else:
raise
import struct
import numpy as np
from omfit_classes.omfit_ascii import OMFITascii
__all__ = ['OMFITpatch', 'OMFITpatchObject', 'OMFITpatchList']
def lrl(item):
"""Shortcut to avoid writing list(range(len(...))) so many times. Does special format for long lists."""
if len(item) > 5:
return '0:{}'.format(len(item) - 1)
return '{}'.format(list(range(len(item))))
[docs]class OMFITpatchList(list):
"""
Handles list-like data within patch panels.
__setitem__ notifies parent object of new assignment so values index/name pairs can be kept self consistent.
Without this intervention, a list's element can be changed without changing the list itself and so the parent
OMFITpatchObject instance gets no notification, and lists of chopper indices and chopper names can get out of sync.
"""
def __init__(self, *args, **kw):
self.OMFITproperties = {}
parent = kw.pop('parent', None)
assert isinstance(parent, (OMFITpatchObject, OMFITpatch)), 'parent object should be OMFITpatch or OMFITpatchObject'
list.__init__(self, *args)
self.parent = parent
@property
def parent(self):
return self.OMFITproperties.get('parent', None)
@parent.setter
def parent(self, value):
self.OMFITproperties['parent'] = value
def __setitem__(self, idx, value):
pkey = [k for k in self.parent if self.parent[k] is self][0]
new_list = [v for v in self]
new_list[idx] = value
self.parent.__setitem__(pkey, new_list)
def __copy__(self):
"""
Instead of a true copy, return a mundane list.
Special properties exist in relation to parent only, and that relationship can't be guaranteed during copy.
"""
return [v for v in self]
def __deepcopy__(self, memo):
"""
Instead of a true copy, return a mundane list.
Special properties exist in relation to parent only, and that relationship can't be guaranteed during copy.
"""
return [copy.deepcopy(v) for v in self]
[docs]class OMFITpatchObject(SortedDict, OMFITobject):
"""
Handles objects within a patch panel (such as a single F coil) and keeps indices & names consistent with each other
"""
rail_map = ['E supply', 'VFI', '?', ''] # Maps rail_name to rail_index
power_supply_map = { # Maps power_supply_name to power_supply_index. Values < 1 are special & affect chopper, too.
-3: '', # RV2 or RV1
0: '', # SHORT
1: 'D',
2: 'V',
3: 'T1',
4: 'T2',
5: 'HV1',
6: 'HV2',
8: 'D2',
}
# If the first chopper is 0, then it's named 'SHORT'. Otherwise, it's '' for unused.
# If dealing with key 'cl' instead of an F-coil, it's 'OPEN'.
chopper_map = (
['?']
+ ['X{}'.format(i + 1) for i in range(20)]
+ ['HX{}'.format(i + 1) for i in range(16)]
+ ['RV1', 'RV2']
+ ['?39', '?40', '?41', 'FSUP']
)
direction_map = ['', 'PUSH', 'PULL']
allowed_choppers = {
'1A': list(range(0, 21)) + [37, 38],
'1B': list(range(0, 21)) + [37, 38], # All X, RV1, RV2
'2A': list(range(0, 21)) + [37, 38],
'2B': list(range(0, 21)) + [37, 38], # All X, RV1, RV2
'3A': list(range(0, 21)) + [37, 38],
'3B': list(range(0, 21)) + [37, 38], # All X, RV1, RV2
'4A': list(range(0, 21)) + [37, 38],
'4B': list(range(0, 21)) + [37, 38], # All X, RV1, RV2
'5A': list(range(0, 21)) + [37, 38],
'5B': list(range(0, 21)) + [37, 38], # All X, RV1, RV2
'6A': [0] + list(range(5, 13)) + list(range(21, 29)) + list(range(31, 37)),
'6B': [0] + list(range(5, 13)) + list(range(21, 31)) + list(range(33, 37)),
'7A': [0] + list(range(5, 13)) + list(range(21, 29)) + list(range(31, 37)),
'7B': [0] + list(range(5, 13)) + list(range(21, 31)) + list(range(33, 37)),
'8A': [0] + list(range(5, 13)) + [19, 20] + list(range(21, 27)) + list(range(31, 35)) + [37, 38],
'8B': [0] + list(range(5, 13)) + [19, 20] + list(range(21, 27)) + list(range(33, 37)) + [37, 38],
'9A': [0] + list(range(5, 13)) + [19, 20] + list(range(21, 27)) + list(range(31, 35)) + [37, 38],
'9B': [0] + list(range(5, 13)) + [19, 20] + list(range(21, 27)) + list(range(33, 37)) + [37, 38],
'cl': [0, 42], # Leave it open or use FSUP to connect D-supply
'HV1': list(range(21, 37)),
'HV2': list(range(21, 37)), # All HX choppers
'D': list(range(1, 30)) + [42], # All X, HX1-9 and FSUP to C-coils
'V': list(range(1, 30)), # All X, HX1-9
'T1': list(range(1, 21)), # All X choppers
'T2': list(range(1, 21)) + list(range(21, 28)), # All X, HX1-7
'D2': list(range(1, 30)), # All X choppers, HX1-9
}
cmc = {'{}{}'.format(i, a): 3 for i in [1, 2, 3, 4, 5, 8] for a in 'AB'}
cmc['9A'] = cmc['9B'] = 4
cmc['6A'] = cmc['6B'] = cmc['7A'] = cmc['7B'] = 6
cmc['cl'] = 1
chopper_max_counts = cmc
lonely_choppers = [37, 38, 42] # These chopper-like items aren't allowed to be in parallel with anything
def __init__(self, filename=None, **kw):
self.OMFITproperties = {}
self.locked = False
OMFITobject.__init__(self, filename, **kw)
SortedDict.__init__(self)
self.filename = filename # This doesn't actually have a filename; it just can't be missing.
return
@property
def locked(self):
return self.OMFITproperties.get('locked', None)
@locked.setter
def locked(self, value):
self.OMFITproperties['locked'] = value
return
def __copy__(self): # https://stackoverflow.com/a/48339837/6605826
"""
Special copy method that unlocks, does copy, then restores locked status.
Without the unlock first, there can be an infinite loop as the special __setitem__ fights itself.
"""
result = OMFITpatchObject()
result.locked = False
for k, v in self.items():
result[k] = self[k]
result.__dict__.update(self.__dict__)
return result
def __deepcopy__(self, memo): # https://stackoverflow.com/a/15774013/6605826
"""
Special copy method that unlocks, does copy, then restores locked status.
Without the unlock first, there can be an infinite loop as the special __setitem__ fights itself.
"""
result = OMFITpatchObject()
memo[id(self)] = result
result.locked = False
for k, v in self.items():
result[copy.deepcopy(k)] = copy.deepcopy(self[k])
if isinstance(self[k], OMFITpatchList):
result[k] = OMFITpatchList(result[k], parent=result)
for k, v in self.__dict__.items():
if k in ['parent', '_OMFITparent']:
setattr(result, k, None)
else:
setattr(result, k, copy.deepcopy(v, memo))
return result
def __setitem__(self, key, value):
"""
The point of this special setitem method is to keep names and indices in sync.
For example, changing 'rail_name' will also change 'rail_index'.
If you want to mess with the file without synchronized updates taking place,
unlock by setting self.locked = False and then this method will pass through
to the super class's setitem method.
:param key:
Key for looking up stuff in self
:param value:
Value to assign to self[key]
:return: None
"""
if not getattr(self, 'locked', False):
# This instance is unlocked, so don't do synchronized updates.
SortedDict.__setitem__(self, key, value)
return
# Handle power supply rails
if key == 'rail_name':
other_key = 'rail_index'
if value not in self.rail_map:
raise ValueError('Invalid power rail name: {}. Valid = {}'.format(value, repr(self.rail_map)))
if value == '':
other_value = 3 # This is for D supply on C coils, which is a special case.
# There may be other weird setups
else:
other_value = self.rail_map.index(value)
elif key == 'rail_index':
value = int(value)
other_key = 'rail_name'
if (value >= len(self.rail_map)) or (value < 0):
raise ValueError('Invalid power rail index: {}. Valid = {}'.format(value, lrl(self.rail_map)))
other_value = self.rail_map[value]
# Handle current directions
elif key == 'direction_name':
other_key = 'direction_index'
if value not in self.direction_map:
raise ValueError('Invalid current direction name: {}. Valid = {}'.format(repr(value), repr(self.direction_map)))
other_value = self.direction_map.index(value)
elif key == 'direction_index':
value = int(value)
other_key = 'direction_name'
if (value >= len(self.direction_map)) or (value < 0):
raise ValueError('Invalid current dir index: {}. Valid = {}'.format(value, lrl(self.direction_map)))
other_value = self.direction_map[value]
# Handle choppers
elif key == 'chopper_name':
other_key = 'chopper_index'
for val in value:
if val not in (self.chopper_map + ['', 'SHORT', 'OPEN', 'SHOR']):
raise ValueError('Invalid chopper name: {}. Valid = {}'.format(repr(val), repr(self.chopper_map)))
other_value = [self.chopper_map.index(val) if val in self.chopper_map else 0 for val in value]
new_count = np.sum([ov != 0 for ov in other_value])
SortedDict.__setitem__(self, 'chopper_count', new_count)
if ('power_supply_name' in self) and (self['power_supply_name'] == ''):
new_power_supply_index = 0 if other_value[0] == 0 else -3
SortedDict.__setitem__(self, 'power_supply_index', new_power_supply_index)
elif key == 'chopper_index':
value = [int(val) for val in value]
other_key = 'chopper_name'
for val in value:
if (val >= len(self.chopper_map)) or (val < 0):
raise ValueError('Invalid chopper index: {}. Valid = {}'.format(val, lrl(self.chopper_map)))
other_value = [self.chopper_map[val] for val in value]
# Resolve stuff stored in the 0 index
if other_value[0] == '?':
if self['coil_index'] == 19:
other_value[0] = 'OPEN'
else:
other_value[0] = 'SHORT'
for i in range(1, len(other_value)):
if other_value[i] == '?':
other_value[i] = ''
new_count = np.sum([ov != '' for ov in other_value])
SortedDict.__setitem__(self, 'chopper_count', new_count)
if self['power_supply_name'] == '':
new_power_supply_index = 0 if value[0] == 0 else -3
SortedDict.__setitem__(self, 'power_supply_index', new_power_supply_index)
elif key == 'chopper_count':
raise ValueError('Do not change chopper count manually')
# Handle power supplies
elif key == 'power_supply_name':
other_key = 'power_supply_index'
if value not in list(self.power_supply_map.values()):
raise ValueError(
'Invalid power supply name: {}. Valid = {}'.format(repr(value), repr(list(self.power_supply_map.values())))
)
other_value = [k for k, v in self.power_supply_map.items() if v == value]
# Resolve possible duplicates that have '' as the value
if (self['chopper_index'][0] > 36) and (0 in other_value):
other_value = [ov for ov in other_value if ov < 0]
elif (self['chopper_index'][0] == 0) and (min(other_value) < 0):
other_value = [ov for ov in other_value if ov >= 0]
if len(other_value) == 1:
other_value = other_value[0]
else:
raise ValueError('Values in power supply map should be unique after handling values of ""')
elif key == 'power_supply_index':
other_key = 'power_supply_name'
if value not in self.power_supply_map:
raise ValueError('Invalid power supply index: {}. Valid = {}'.format(value, repr(list(self.power_supply_map.keys()))))
other_value = self.power_supply_map[value]
# Contingency
else:
other_key = other_value = None
# The OMFITpatchList class notifies its parent when an element is updated, so replace lists with that.
if isinstance(value, list):
value = OMFITpatchList(value, parent=self)
if isinstance(other_value, list):
other_value = OMFITpatchList(other_value, parent=self)
# Write the new values to self
SortedDict.__setitem__(self, key, value)
if other_key is not None and other_value is not None:
SortedDict.__setitem__(self, other_key, other_value)
# Update status
if (getattr(self, '_OMFITparent', None) is not None) and self.locked:
self._OMFITparent.check()
return None
def __tree_repr__(self):
"""Provides custom tree representation for this object"""
try:
patch_type = self._OMFITparent.patch_type
except AttributeError:
patch_type = '?'
if patch_type in ['F', 'P']:
chopper_note = self['chopper_name'][0] if self['chopper_count'] == 1 else '{} choppers'.format(self['chopper_count'])
tree_repr = '{rail:10s} {ps:8s} {dir:8s} {chopper_note:}'.format(
rail=self['rail_name'], ps=self['power_supply_name'], dir=self['direction_name'], chopper_note=chopper_note
)
elif patch_type == 'I':
tree_repr = []
if 'c_coil_parity' in self:
tree_repr += ['C parity: {}'.format(self['c_coil_parity'])]
if 'i_coil_file' in self:
tree_repr += ['I-coils: {}'.format(self['i_coil_file'])]
if np.all([a in self for a in ['slot', 'sign', 'spa', 'c_idx', 'coil']]):
tree_repr += ['{slot:2s} {sign:3s} {spa:4s} {c_idx:1d} {coil:}'.format(**self)]
if np.all(['AA{}'.format(i) in self for i in range(1, 13)]):
aa = []
for i in range(1, 13):
if self['AA{}'.format(i)]:
aa += ['AA{}:{}'.format(i, self['AA{}'.format(i)])]
tree_repr += aa if aa else ['No AA used']
tree_repr = ', '.join(tree_repr)
else:
tree_repr = None
return tree_repr, []
# List attributes which need to be saved with projects.
OMFITpatch_save_attrs = [
'debug_topic',
'patch_type',
'auto_clean',
'status',
'contents',
'original_contents',
'patch_name',
'original_patch_name',
'original_patch_type',
'fpconvert',
'shot',
'_load_complete',
'server',
'tunnel',
'remote_dir',
'work_dir',
# Deliberately not listed:
# 'problems',
]
[docs]class OMFITpatch(SortedDict, OMFITascii):
"""
Parses DIII-D PCS patch panel files.
Several types of files are recognized:
- Type-F: F-coil patch panel in ASCII archival format.
- Type-P: F-coil patch panel in binary format. Can be converted to type-F by
changing .patch_type attribute and executing .add_c_coil() method. May be obsolete.
- Type-I: I&C coil patch panel in ASCII format.
"""
# Define the marker for P-files with binary data. Also split them here.
bin_mark = r'\x00'
def __getattr__(self, attr_name):
"""
Custom attribute access method.
There are too many funky attributes and I don't want to do a @property
declaration for each one, so we're doing this instead.
- https://docs.python.org/3/reference/datamodel.html#customizing-attribute-access
- https://stackoverflow.com/a/52729406/6605826
:param attr_name: string
Name of the attribute
:return: Attribute value
"""
if attr_name in OMFITpatch_save_attrs:
return self.OMFITproperties.get(attr_name, None)
return SortedDict.__getattr__(self, attr_name)
def __setattr__(self, attr_name, value):
"""
Allows handling of a large number of attributes without @property declarations
:param attr_name: string
:param value:
:return: None
"""
if attr_name in OMFITpatch_save_attrs:
self.OMFITproperties[attr_name] = value
if attr_name.startswith('custom_'):
# Setting a custom attribute should update the attribute it overrides, as it does in init
setattr(self, attr_name[len('custom_') :], value)
return
SortedDict.__setattr__(self, attr_name, value)
return
def __delattr__(self, attr_name):
"""
Allows handling of a large number of attributes without @property declarations
:param attr_name: string
:return: None
"""
if attr_name in OMFITpatch_save_attrs:
self.OMFITproperties[attr_name] = None
if attr_name.startswith('custom_'):
# Deleting a custom attribute should revert its counterpart to default
setattr(self, attr_name[len('custom_') :], getattr(self, 'default_' + attr_name[len('custom_') :], None))
return
SortedDict.__delattr__(self, attr_name)
return
def __init__(
self,
filename=None,
shot=None,
patch_type=None,
debug_topic='OMFITpatch',
auto_clean=True,
fpconvert=True,
server=None,
tunnel=None,
work_dir=None,
remote_dir=None,
default_patch_type='F',
**kw,
):
"""
:param filename: string [optional if shot is provided]
Filename of original source file, including path. This will be preserved as
self.source_file, even if the class updates to a temporary file copy.
If shot is provided, filename controls output only; no source file is read.
In this case, filename need not include path.
:param shot: int [optional if filename is provided]
Shot number to use to look up patch data. Must provide patch_type when using shot.
If a filename is provided as well, shot will be used for lookup and filename will control output only.
:param patch_type: None or string
None lets the class auto-assign, which should be fine.
You can force it explicitly if you really want to.
:param debug_topic: string
Topic keyword to pass to printd. Allows all printd from the class to be consistent.
:param auto_clean: bool
Run cleanup method after parsing. This will remove some problems,
but prevent exact recovery of problematic original contents.
:param fpconvert: bool
Automatically convert type P files into type F so they can be saved.
:param server: string [optional if running in OMFIT framework]
Complete access instruction for server that runs viwhed, like "eldond@iris.gat.com:22"
:param tunnel: string [optional if running in OMFIT framework]
Complete access instruction for tunnel used to reach server, like "eldond@cybele.gat.com:2039".
Use empty string '' if no tunnel is needed.
:param work_dir: string [optional if running in OMFIT framework]
Local working directory (temporary/scratch) for temporary files related to remote executable calls
:param remote_dir: string [optional if running in OMFIT framework]
Remote working directory (temporary/scratch) for temporary files related to remote executable calls
:param default_patch_type: string
Patch panel type to assign if auto detection fails, such as if you're initializing a blank patch panel file.
If you are reading a valid patch panel file, auto detection will probably work because it is very good.
Please choose from 'F' or 'I'. 'P' is a valid patch_type, but it's read-only so it's a very bad choice for
initializing a blank file to fill in yourself.
:param kw: Other keywords passed to OMFITascii
"""
self.OMFITproperties = {}
self.source_file = copy.copy(filename) if shot is None else shot
SortedDict.__init__(self)
if filename is None:
fn = funny_random_name_generator(patch_type == 'I')
elif filename.endswith(os.sep):
fn = filename + funny_random_name_generator(patch_type == 'I')
else:
fn = filename
if os.sep in fn:
from pathlib import Path
Path(fn).touch() # OMFITobjects don't like getting a filename with a path unless the file exists
OMFITascii.__init__(self, fn, **kw)
self._load_complete = kw.pop('_load_complete', False)
self.shot = shot
self.server = server
self.tunnel = tunnel
self.remote_dir = remote_dir
self.work_dir = work_dir
self.debug_topic = debug_topic
self.status = 'init'
self.auto_clean = auto_clean
self.fpconvert = fpconvert
self.patch_type = patch_type
if ((shot is None) or self._load_complete) and (filename is not None):
try:
with open(self.filename, 'r') as f:
self.contents = f.read()
except UnicodeDecodeError:
with open(self.filename, 'rb') as f:
self.contents = str(f.read())
elif shot is not None:
self.contents = self._read_shot(default_patch_type)
else:
self.contents = ''
self.original_contents = copy.copy(self.contents)
self.patch_type = self._guess_patch_type(default=default_patch_type) if patch_type is None else patch_type
self.original_patch_type = copy.copy(self.patch_type)
self.patch_name = getattr(self, 'patch_name', None) # Allows for possibility that it was set by OMFITproperties
if self.patch_type == 'F':
if (self.shot is None) or self._load_complete:
# The type-F method is suitable from a type-F FILE or from contents from a shot after parsing
self._read_type_f()
else:
# The type-P method is suitable for contents read directly from a shot
self._read_type_p_inner(self.contents)
if self.auto_clean:
self.printq('Running automatic cleanup of F-patch file to remove junk chopper data')
self._clean_type_f()
elif self.patch_type == 'P':
self._read_type_p()
elif self.patch_type == 'I':
if (self.shot is None) or self._load_complete:
self._read_type_i()
else:
self._read_type_j()
else:
raise ValueError('Invalid or unhandled patch panel file.')
self.check()
self.original_patch_name = copy.copy(self.patch_name)
self.problems = []
if self.filename is None and self.patch_name is not None:
# This must've been read in from a shot numbers instead of a file. Let's make a real filename.
# Generate a temporary directory name
subprocess_dir = '_'.join(map(str, OMFITaux['prun_process']))
if len(subprocess_dir):
subprocess_dir = '__p' + subprocess_dir
directory = (
OMFITcwd + os.sep + 'objects' + os.sep + 'file_' + utils_base.now("%Y-%m-%d__%H_%M" + subprocess_dir + os.sep + "%S__%f")
)
while os.path.exists(directory):
directory += "_"
# Assign the new filename
self.filename = directory + os.sep + self.patch_name
if self.patch_type in ['I', 'F']:
if not os.path.exists(self.filename):
os.makedirs(os.path.split(self.filename)[0])
self.save()
self.printq('Auto-assigned filename {}'.format(self.filename))
# Make sure all saved attributes are restored by allowing them to be passed in through **kw
for attr in OMFITpatch_save_attrs:
kw_attr = kw.pop(attr, None)
if kw_attr is not None:
setattr(self, attr, kw_attr)
if ((filename is None) or filename.endswith(os.sep)) and self.patch_name is not None:
# Replace "working title" filename with good patch name if we were able to find one
self.filename = self.patch_name if filename is None else filename + self.patch_name
self._load_complete = True # Prevents instance from saved project from re-doing read from shot or whatever
return
def __tree_repr__(self):
"""Forms strings to display in OMFIT tree GUI to represent instances of this class"""
file_rep = OMFITascii.__tree_repr__(self)[0]
tree_rep = file_rep + ', STATUS: {}, PATCH TYPE: {}, NAME: {}'.format(self.status, self.patch_type, self.patch_name)
return tree_rep, []
def _read_shot(self, default_patch_type):
"""Reads patch data from a shot number"""
if (self.patch_type is None) and (default_patch_type in ['F', 'I', 'P']):
self.patch_type = default_patch_type
self.original_patch_type = default_patch_type
self.printq('Assigned patch_type to default ("{}") while reading from shot.'.format(default_patch_type))
if self.patch_type in ['F', 'P']:
patch_code = 'PATCH'
elif self.patch_type == 'I':
patch_code = 'SPATCH'
else:
raise ValueError(
'Bad patch_type={}. Please specify patch_type="I" or patch_type="F". True automatic patch_type '
'determination is not possible when reading from a shot number.'.format(repr(self.patch_type))
)
executable = 'module load defaults\nviwhed'
interactive = ['sh {}'.format(self.shot), 'na {}'.format(patch_code), 'df', 'ex']
stdout_catch = []
stderr_catch = []
self._set_default_server_info()
import omfit_classes.OMFITx as OMFITx
OMFITx.executable(
None,
[],
[],
server=self.server,
tunnel=self.tunnel,
remotedir=self.remote_dir,
workdir=self.work_dir,
std_out=stdout_catch,
std_err=stderr_catch,
executable=executable,
interactive_input='\n'.join(interactive),
)
i = 0
props = 'server = {server:}, tunnel = {tunnel:}, remote_dir = {remote_dir:}, work_dir = {work_dir:}'.format(**self.OMFITproperties)
while (i < len(stdout_catch)) and (' ASCII -' not in stdout_catch[i]):
i += 1
if i == len(stdout_catch):
printe('ERROR in OMFITpatch._read_shot: Reached end of stdout without finding expected ASCII marker!')
print('-' * 79)
print('\n'.join(stdout_catch))
print('-' * 79)
print(props)
raise ValueError('Reached end of stdout without finding expected ASCII marker!')
j = i + 1
while (j < len(stdout_catch)) and (' INT16 - ' not in stdout_catch[j]):
j += 1
if i == len(stdout_catch):
printe('ERROR in OMFITpatch._read_shot: Reached end of stdout without finding expected INT16 marker!')
print('-' * 79)
print('\n'.join(stdout_catch))
print('-' * 79)
print(props)
raise ValueError('Reached end of stdout without finding expected INT16 marker!')
return ''.join([line.split('"')[1] for line in stdout_catch[i + 1 : j]])
def _set_default_server_info(self):
"""If any of server, tunnel, work_dir, or remote_dir are None, update them with default values"""
import getpass
server_name = 'iris' # The server options are VERY limited. It's okay to leave this at just iris.
if None in [self.server, self.tunnel, self.remote_dir, self.work_dir]:
# Need to load defaults
dir_suffix = os.sep + str(datetime.datetime.now()).replace(' ', '_').replace(':', '_')
# noinspection PyBroadException
try:
default_server = evalExpr(OMFIT['MainSettings']['SERVER'][server_name]['server'])
default_remote_dir = evalExpr(OMFIT['MainSettings']['SERVER'][server_name]['workDir']) + dir_suffix
localhost = evalExpr(OMFIT['MainSettings']['SERVER']['localhost'])
if isinstance(localhost, str):
localhost = evalExpr(OMFIT['MainSettings']['SERVER'][localhost])
default_work_dir = evalExpr(localhost['workDir']) + dir_suffix
default_tunnel = evalExpr(OMFIT['MainSettings']['SERVER'][server_name]['server'])
except Exception:
# This plan used to be relevant, but maybe not anymore?
# It might trigger on KeyError for 'MainSettings' not being in OMFIT, if that can ever happen
self.printq(
'Failed to reference OMFIT MainSettings. Guessing default settings for remote server access. '
'If these do not work, try specifying server, tunnel, remote_dir, and work_dir next time.'
)
username = getpass.getuser()
default_server = '{}@iris.gat.com:22'.format(username)
default_tunnel = '{}@cybele.gat.com:2039'.format(username)
default_work_dir = tempfile.mkdtemp('_OMFIT_patch_read_shot') + os.sep
default_remote_dir = os.sep.join(['cluster-scratch', username, 'OMFIT', dir_suffix])
self.server = default_server if self.server is None else self.server
self.tunnel = default_tunnel if self.tunnel is None else self.tunnel
self.remote_dir = default_remote_dir if self.remote_dir is None else self.remote_dir
self.work_dir = default_work_dir if self.work_dir is None else self.work_dir
else:
self.printq('server, work_dir, and remote_dir settings are already good; no need to load defaults')
[docs] def check(self):
"""Checks for problems and returns a status string that is either 'OKAY' or a list of problems."""
if self.patch_type in ['F', 'P']:
status = self._check_type_f()
else:
status = '?'
self.status = status
return status
def _check_type_f(self):
"""
Specific checking method for type-F or P patch panel files.
:return: string
"OKAY" if no problems are found. Otherwise, a terse description of problems.
"""
assert self.patch_type in ['F', 'P'], 'This method only applies to patch_type F or P, not {}'.format(self.patch_type)
problems = []
coils = [k for k in self if isinstance(self[k], OMFITpatchObject)]
# Check for duplicate choppers
all_chopper_name = []
for k in coils:
all_chopper_name += self[k]['chopper_name']
ignores = ['', 'SHORT', 'OPEN']
for ignore in ignores:
while ignore in all_chopper_name:
all_chopper_name.remove(ignore)
duplicate_choppers = []
noted = []
for chop in all_chopper_name:
chop_count = all_chopper_name.count(chop)
if chop_count > 1:
if chop not in noted:
c_on_coils = [k for k in coils if chop in self[k]['chopper_name']]
duplicate_choppers += ['{}({})'.format(chop, ','.join(c_on_coils))]
noted += [chop]
if len(duplicate_choppers):
problems += ['DUPL.CHOP: [' + ', '.join(duplicate_choppers) + ']']
# Check for mis-matched current directions for the same power supply
ps_map = OMFITpatchObject.power_supply_map
pwr_dirs = {k: None for k in ps_map if k > 0}
coils_used = {k: [] for k in ps_map if k > 0}
for k in coils:
ps = self[k]['power_supply_index']
if ps in pwr_dirs:
coils_used[ps] += [k]
if pwr_dirs[ps] is None:
pwr_dirs[ps] = self[k]['direction_index']
else:
if pwr_dirs[ps] != self[k]['direction_index']:
pwr_dirs[ps] = -1
dir_problems = []
for k in pwr_dirs:
if pwr_dirs[k] == -1:
dir_problems += ['{}({})'.format(ps_map[k], ','.join(coils_used[k]))]
if len(dir_problems):
problems += ['DIR.CONFL.: [' + ', '.join(dir_problems) + ']']
# Check for missing C-coil
if 'cl' not in self:
problems += ['MISSING.C.CL']
if len(self) > 0:
# Can't have these problems if there are no coils defined
# Check for too many choppers
cmc = self[coils[0]].chopper_max_counts
too_many_choppers = []
for coil in coils:
if coil not in cmc:
too_many_choppers += ['{}({}>?)'.format(coil, self[coil]['chopper_count'])]
elif self[coil]['chopper_count'] > cmc[coil]:
too_many_choppers += ['{}({}>{})'.format(coil, self[coil]['chopper_count'], cmc[coil])]
if too_many_choppers:
problems += ['EXCESS.CHOP.: [' + ', '.join(too_many_choppers) + ']']
# Check for illegal chopper vs. coil and power supply pairings
ac = self[coils[0]].allowed_choppers
illegal_choppers = []
for coil in coils:
illp = []
illc = []
psn = self[coil].get('power_supply_name', '')
if coil not in ac:
self.printq('Coil {} is not recognized in dict of allowed choppers; cannot check properly'.format(coil))
for ci in self[coil]['chopper_index']:
if (coil in ac) and (ci not in ac[coil]):
illc += [self[coil].chopper_map[ci]]
if (psn in ac) and (ci != 0) and (ci not in ac[psn]):
illp += ['{}:{}'.format(self[coil].chopper_map[ci], coil)]
if illc:
illegal_choppers += ['{}({})'.format(coil, ','.join(illc))]
if illp:
illegal_choppers += ['{}({})'.format(psn, ','.join(illp))]
if illegal_choppers:
problems += ['ILLEG.CHOP.: [' + ', '.join(illegal_choppers) + ']']
bad_parallel_choppers = []
for coil in coils:
# Check for parallel chopper-like objects that aren't allowed to be in parallel
for lc in self[coils[0]].lonely_choppers:
if (lc in self[coil]['chopper_index']) and (self[coil]['chopper_count'] > 1):
names = [cn for cn in self[coil]['chopper_name'] if cn]
bad_parallel_choppers += ['{}({})'.format(coil, ', '.join(names))]
if bad_parallel_choppers:
problems += ['BAD.PARALLEL.: [' + ', '.join(list(set(bad_parallel_choppers))) + ']']
else:
problems += ['NO.COILS.']
# Summarize problems
if problems:
status = ', '.join(problems)
else:
status = 'OKAY'
self.problems = problems
return status
[docs] def printq(self, *args):
"""Print wrapper for keeping topic consistent within the instance"""
printd(*args, topic=self.debug_topic)
def _guess_patch_type(self, default='?'):
"""
Differentiates between various file types and returns a string or None.
:return:
'P': PCS format, like ptpatch.dat. Similar to 'F' type, but includes strange binary data
'F': F-coil patch panel, ASCII
'I': I&C-coil patch panel, ASCII
None: Failed to determine type
"""
if self.bin_mark in self.contents:
return 'P'
elif 'C-coil parity:' in self.contents:
return 'I'
elif 'I-coil file:' in self.contents:
return 'I'
elif (('PUSH' in self.contents) or ('PULL' in self.contents)) and ('1A' in self.contents):
return 'F'
else:
return default
def _read_type_f(self):
"""Parses F-coil patch panel files into dict-like structure"""
assert self.patch_type == 'F', 'This method only works for type "F". Type is instead: %s' % self.patch_type
field_widths = [3, 4, 3, 6, 3, 4, 2, 5, 2, 2, 5, 5, 5, 5, 5, 3, 3, 3, 3, 3]
for i, line in enumerate(self.contents.split('\n')):
if len(line) != np.sum(field_widths):
self.printq('Line {} does not match expected length ({}); contents = {}'.format(i, np.sum(field_widths), line))
continue
fields = [''] * len(field_widths)
start = 0
for j in range(len(field_widths)):
fields[j] = line[start : start + field_widths[j]]
start += field_widths[j]
out = self[fields[1].strip()] = OMFITpatchObject()
out['coil_index'] = int(fields[0])
out['chopper_count'] = int(fields[9])
out['chopper_index'] = [int(fields[2])]
out['power_supply_index'] = int(fields[4])
out['direction_index'] = int(fields[6])
out['rail_index'] = int(fields[8])
out['chopper_name'] = [fields[3].strip()]
out['power_supply_name'] = fields[5].strip()
out['direction_name'] = fields[7].strip()
if out['rail_index'] == 0:
out['rail_name'] = 'E supply'
elif out['rail_index'] == 1:
out['rail_name'] = 'VFI'
else:
out['rail_name'] = ''
out['chopper_name'] = [out['chopper_name'][0]] + [cn.strip() for cn in fields[10:15]]
out['chopper_index'] = [out['chopper_index'][0]] + [int(ci) for ci in fields[15:20]]
out['chopper_name'] = OMFITpatchList(out['chopper_name'], parent=out)
out['chopper_index'] = OMFITpatchList(out['chopper_index'], parent=out)
out.locked = True
self.get_patch_name()
if self.auto_clean:
self.printq('Running automatic cleanup of F-patch file (archive format/type F) to remove junk chopper data')
self._clean_type_f()
def _read_type_p(self, include_c_coil=False):
"""Parses type-P patch panel files, which are F-patch files in binary format"""
self._read_type_p_inner(self.contents.split(self.bin_mark)[45])
if include_c_coil:
self.add_c_coil()
# This might be needed to make converted type-P to F files consistent with natural type-F files
if self.auto_clean:
self.printq('Running automatic cleanup of F-patch file (binary format/type P) to remove junk chopper data')
self._clean_type_f()
if self.fpconvert:
printd('Automatically converting from type-P (binary) to type-F (ASCII) to enable saving.')
self.patch_type = 'F'
if 'cl' not in self:
printw('WARNING: please use .add_c_coil(d_supply=True/False) to add C-coil to converted P-type patch!')
def _read_type_p_inner(self, item_names):
"""Splits apart a string that's formatted like the relevant section of a type-P file"""
fcoil_len = 36 # Number of characters for each F-coil
title_len = 12 # Number of characters for title
self.patch_name = item_names[:title_len].strip()
rest = item_names[title_len:]
coil_index = 1
while len(rest) >= fcoil_len:
out = self[rest[:4].strip()] = OMFITpatchObject()
out['coil_index'] = coil_index
# Load placeholders so the order will be consistent with F-type
out['chopper_count'] = 0
out['chopper_index'] = [0, 0, 0, 0, 0, 0]
out['power_supply_index'] = 0
out['direction_index'] = 0
out['rail_index'] = 0
# Done with placeholders
out.locked = True
out['chopper_name'] = [rest[16 + i * 4 : 20 + i * 4].strip() for i in range(5)] + [''] # P-type only holds 5 choppers
ps = rest[4:8].strip()
out['power_supply_name'] = '' if ps == 'NONE' else ps
out['direction_name'] = rest[12:16].strip()
out['rail_name'] = 'VFI' if rest[8:11] == 'YES' else 'E supply'
rest = rest[fcoil_len:]
coil_index = coil_index + 1
assert len(rest) == 0, 'Inner P contents after read FC should be empty, but instead: len{} remainder={}'.format(
len(rest), repr(rest)
)
return
def _read_type_i(self):
"""Parses I & C coil files"""
assert self.patch_type == 'I', 'This method only works for type "I". Type is instead: %s' % self.patch_type
if self.contents:
lines = self.contents.split('\n')
self._read_type_i_inner(lines)
self.get_patch_name()
def _read_type_i_inner(self, lines, xs=0):
self['config'] = OMFITpatchObject()
self['config']['c_coil_parity'] = lines[0][15:].strip()
self['config']['i_coil_file'] = lines[11][13:].strip()
for i in range(10):
line = lines[1 + i]
out = self['c{}'.format(i)] = OMFITpatchObject()
out['slot'] = line[2:4]
out['sign'] = line[5:8]
out['spa'] = line[9:13].strip()
out['c_idx'] = int(line[14 + xs : 15 + xs])
out['coil'] = line[16:].strip()
self['AA'] = OMFITpatchObject()
for i in range(12):
line = lines[12 + i]
self['AA'][line[:5].strip()] = line[5:].strip()
self['SS'] = OMFITpatchObject()
if len(lines) > 25:
# Has SS entries
for i in range(12):
line = lines[24 + i]
self['SS'][line[:7].strip()] = line[7:].strip()
else:
# Old file with no SS entries, set them all disconnected
for i in [1, 2]:
for a in 'ABCDEF':
self['SS']['SS{}{}'.format(i, a)] = ''
def _read_type_j(self):
"""Parses I & C patch information obtained from a shot number"""
line_len1 = 36
line_len2 = 10
line_lens = [line_len1] * 10 + [32, 4] + [line_len2] * 12 + [line_len1] * 12
rest = copy.copy(self.contents)
lines = []
for line_len in line_lens:
lines += [rest[:line_len]]
rest = rest[line_len:]
self.patch_name = rest.strip()
lines_out = ['C-coil parity: ' + lines[11]] + lines[0:10] + ['I-Coil file: ' + lines[10]] + lines[12:]
lines_out = [line.replace(' pos', 'pos').replace(' neg', 'neg') for line in lines_out]
self._read_type_i_inner(lines_out, xs=1)
[docs] def get_patch_name(self):
"""Tries to determine patch panel name"""
if (getattr(self, '_load_complete', None) is not None) and (getattr(self, 'patch_name', None) is not None):
# Already have patch_name determined by previous load
return
elif (self.source_file is not None) and os.path.split(self.source_file)[-1].lower().endswith('patnow.dat'):
name_file = self.source_file.replace('patnow.dat', 'patnow.nam')
if os.path.exists(name_file):
with open(name_file, 'r') as f:
patch_dat = f.read()
self.patch_name = patch_dat.split('\n')[0]
return
elif getattr(self, 'filename', None) is not None:
self.patch_name = os.path.split(self.filename)[-1]
return
self.patch_name = funny_random_name_generator(self.patch_type == 'I')
return
[docs] def add_c_coil(self, d_supply=False):
"""
Adds an entry for C-coils to type-F patch panels, which is useful if converted from type-P.
Type-P files don't have a C-coil entry.
:param d_supply: bool
True: Add the default D supply on C-coil setup.
False: Add default C-coil setup with no F-coil supplies on the C-coils (C-coil entry in F is blank)
"""
if 'cl' in self:
self.printq('This patch panel already has C-coils. Skipping.')
return
out = self['cl'] = OMFITpatchObject()
out['coil_index'] = max([a.get('coil_index', 0) for a in self.values() if isinstance(a, OMFITpatchObject)]) + 1
if d_supply:
out['chopper_count'] = 1
out['chopper_index'] = [42, 0, 0, 0, 0, 0]
out['power_supply_index'] = 1
out['direction_index'] = 1
out['rail_index'] = 3
out['chopper_name'] = ['FSUP', '', '', '', '', '']
out['power_supply_name'] = 'D'
out['direction_name'] = 'PUSH'
out['rail_name'] = ''
else:
out['chopper_count'] = 1
out['chopper_index'] = [0, 0, 0, 0, 0, 0]
out['power_supply_index'] = 0
out['direction_index'] = 0
out['rail_index'] = 1
out['chopper_name'] = ['OPEN', '', '', '', '', '']
out['power_supply_name'] = ''
out['direction_name'] = ''
out['rail_name'] = 'VFI'
out.locked = True
def _clean_type_f(self):
"""Removes unused chopper indices and other problems"""
assert self.patch_type in ['F', 'P'], 'This method should only be used for type F or P patch panels'
coils = [k for k in self if isinstance(self[k], OMFITpatchObject)]
for coil in coils:
a = self[coil]
# If the first chopper isn't a real chopper, any other indices specified are junk
if (a['chopper_index'][0] < 1) or (a['chopper_name'][0] in ['RV1', 'RV2', 'OPEN', 'SHORT', '']):
for i in range(1, 6):
a['chopper_index'][i] = 0
[docs] def cleanup(self):
"""Cleans up junk data and blatant inconsistencies. May prevent output from matching buggy input."""
if self.patch_type in ['F', 'P']:
self._clean_type_f()
else:
print('No cleanup method for patch panel type {}'.format(self.patch_type))
def _write_type_f(self):
"""Method for forming output string (file contents) for type-F patch panel files."""
out_format = (
'{coil_index:3d} {coil:>3s}{c0i:3d} {c0n:5s}{power_supply_index:3d} {power_supply_name:3s}'
'{direction_index:2d} {direction_name:4s}{rail_index:2d}{chopper_count:2d} {c1n:24s}{c1i:15s}'
)
output = []
for i, coil in enumerate([k for k in self if 'coil_index' in self[k]]):
d = dict(**self[coil])
d['c0i'] = d['chopper_index'][0]
d['c0n'] = d['chopper_name'][0]
d['c1n'] = ' '.join(['{:4s}'.format(cn) for cn in d['chopper_name'][1:]])
d['c1i'] = ''.join(['{:3d}'.format(ci) for ci in d['chopper_index'][1:]])
d['coil'] = coil
output += [out_format.format(**d)]
if not self.filename.startswith('patnow'):
output += ['Configuration Description']
output = '\n'.join(output)
self.contents = output
return output
def _write_type_i(self):
"""Method for forming output string for type-I patch panel files"""
output = []
output += ['C-coil parity: ' + self.get('config', {}).get('c_coil_parity', '')]
blank_c = dict(slot='', sign='', spa='', coil='', c_idx=0)
for i in range(10):
output += [
str(1 + int(np.floor(i / 2)))
+ ' {slot:2s} {sign:3s} {spa:>4s} {c_idx:1d} {coil:58s}'.format(**self.get('c{}'.format(i), blank_c))
]
output += ['I-Coil file: ' + self.get('config', {}).get('i_coil_file', '')]
for a in range(1, 13):
output += ['AA{:<2d} {:69s}'.format(a, self.get('AA', {}).get('AA{}'.format(a), ''))]
for i in [1, 2]:
for a in 'ABCDEF':
ss = 'SS{}{}'.format(i, a)
output += ['{:6s} {:67s}'.format(ss, self.get('SS', {}).get(ss, ''))]
output = '\n'.join(output)
self.contents = output
return output
[docs] @dynaSave
def save(self, no_write=False):
"""Saves file to disk"""
if self.patch_type == 'F':
output = self._write_type_f()
elif self.patch_type == 'P':
raise TypeError(
'Cannot save type P files; set self.patch_type="F" first to allow saving. Type P is F-coil patch in '
'some old format, which is binary and hard to use. Type F is F-coil patch in archival format, which is '
'ASCII and easy to deal with. If a type-P file is converted, it is recommended that a C-coil entry be '
'added using self.add_c_coil(). Specify d_supply=True if the D power supply should power C-coils.'
)
elif self.patch_type == 'I':
output = self._write_type_i()
else:
raise NotImplementedError('Patch type {} does not have a write method yet; sorry'.format(self.patch_type))
if not no_write:
with open(self.filename, 'w') as f:
self.printq('Writing patch panel main file {}...'.format(self.filename))
f.write(output)
if os.path.split(self.filename)[1].endswith('patnow.dat') and (self.patch_type in 'FI'):
name_file = self.filename.replace('patnow.dat', 'patnow.nam')
name_output = '\n'.join([self.patch_name, 'Configuration Description'])
self.printq('Writing patch panel name file {}...'.format(name_file))
with open(name_file, 'w') as f:
f.write(name_output)
return output