Source code for omfit_classes.omfit_base

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from omfit_classes.omfit_namelist import OMFITnamelist
from omfit_classes.omfit_weblink import OMFITwebLink
from omfit_classes.omfit_ascii import OMFITascii
from omfit_classes.omfit_json import OMFITsettings, SettingsName
from omfit_classes.omfit_error import OMFITexpressionError, OMFITerror, OMFITobjectError
from omfit_classes.omfit_harvest import harvest_send
from omfit_classes.utils_base import _streams
from omfit_classes import utils_base

from omfit_classes import namelist
import zipfile
import traceback
import numpy as np
import xarray
import pandas
import errno
import os as _os

__all__ = [
    'OMFITtree',
    'OMFITlist',
    'module_selective_deepcopy',
    'module_noscratch_deepcopy',
    'OMFITcollection',
    'OMFITmcTree',
    'OMFITstorage',
    'OMFITtreeCompressed',
    'OMFITmodule',
    'OMFITtmp',
    'OMFITproject',
    'OMFIThelp',
    '_OMFITnoSave',
    'OMFITexpressionsReturnNone',
    'OMFITexpression',
    'OMFITiterableExpression',
    'OMFITlazyLoad',
    'relativeLocations',
    'absLocation',
    'isinstance',
    'type',
    'hasattr',
    'shotBookmarks',
    'OMFITshotBookmarks',
    'OMFITmainSettings',
    'OMFITtypes',
    'OMFITtypesStr',
    'OMFITdictypes',
    'OMFITdictypesStr',
    'omfit_log',
    'ismodule',
    'diffTreeGUI',
    'exportTreeGUI',
    'diffViewer',
    'askDescription',
    'all_pylab_imports',
    'evalExpr',
]

# Adding `from pylab import *` to OMFIT scripts namespace to simplify users life. This makes OMFIT behave a-la matlab.
# NOTE: `from pylab import *` will override `__builtin__.all`, `__builtin__.any`, `__builtin__.sum`  but not `__builtin__.max` and `__builtin__.min`
# For backward compatibility with the do some subsequent import statements that reproduce how the OMFIT externalImports was done in the past
all_pylab_imports = {}
exec('from pylab import *', all_pylab_imports)
exec('import copy', all_pylab_imports)
exec('from matplotlib import pyplot as plt', all_pylab_imports)

# ---------------------
# OMFIT dynamic expressions
# ---------------------
class OMFITunevaluatedExpression:
    pass


[docs]class OMFITexpression: """This class handles so called OMFIT `dynamic expressions`. If you generate dynamic expressions in a Python script, note that the relative location of the expression (root,parent,DEPENDENCIES,...) is evaluated with respect to where the expression is in the tree, not relative to where the script which generated the expression resides If you are using relative locations in your expression, things may not work if you have the same expression into two locations in the tree. A classic of this happening is if you do a memory copy of something (e.g. namelist) containing an expression to some other location in the tree. If this happens the results are unpredictable. :param expression: string containing python code to be dynamically evaluated every time an attribute of this object is accessed """ def __init__(self, expression): if not isinstance(expression, str): raise OMFITexception('OMFITexpression argument can only be a string') self.expression = expression self.lastEval = OMFITunevaluatedExpression() try: self._value_() except Exception as _excp: self.lastEval = OMFITexpressionError(self.expression) def __getattr__(self, attr): if attr == '__getinitargs__': raise AttributeError('bad attribute `%s`' % attr) elif attr in ['_OMFITcopyOf', '_OMFITparent', '_OMFITkeyName']: raise AttributeError('OMFITexpressions must be contained in SortedDict dictionaries') return getattr(self._value_(), attr) def __coerce__(self, other): # Unfortunately obsolete and no longer supported for Python3.x # Because of this the built-in type methods (eg. __add__, __radd__, ...) # need to be explicitly specified in the OMFITexpression class return self._value_(), other # ================== # We must explicitly implement methods that are always defined for classes in Python3.x # https://docs.python.org/3.7/library/operator.html # Use of explicit operators (eg. +, rather than __add__) is due to the fact that some objects # (like strings) are handled in somewhat special ways # ==================
[docs] def bool(self): if self._value_(): return True return False
def __bool__(self): if self._value_(): return True return False def __nonzero__(self): return bool(self._value_())
[docs] def real(self): return self._value_().real()
[docs] def imag(self): return self._value_().imag()
[docs] def lt(self, b): return self._value_() < b
[docs] def le(self, b): return self._value_() <= b
[docs] def eq(self, b): return self._value_() == b
[docs] def ne(self, b): return self._value_() != b
[docs] def ge(self, b): return self._value_() >= b
[docs] def gt(self, b): return self._value_() > b
def __lt__(self, b): return self._value_() < b def __le__(self, b): return self._value_() <= b def __eq__(self, b): return self._value_() == b def __ne__(self, b): return self._value_() != b def __ge__(self, b): return self._value_() >= b def __gt__(self, b): return self._value_() > b # ==================
[docs] def not_(self): return not self._value_()
def __not__(self): return not self._value_()
[docs] def truth(self): return self._value_().truth()
[docs] def is_(self, b): return self._value_() is b
[docs] def is_not(self, b): return self._value_() is not b
[docs] def abs(self): return abs(self._value_())
def __abs__(self): return abs(self._value_())
[docs] def add(self, b): return self._value_() + b
def __add__(self, b): return self._value_() + b def __radd__(self, b): return b + self._value_()
[docs] def and_(self, b): return self._value_() & b
def __and__(self, b): return self._value_() & b def __rand__(self, b): return b & self._value_()
[docs] def floordiv(self, b): return self._value_() // b
def __floordiv__(self, b): return self._value_() // b def __rfloordiv__(self, b): return self._value_().__rfloordiv__(b)
[docs] def index(self): return self._value_().index()
def __index__(self): return self._value_().__index__()
[docs] def inv(self): return self._value_().inv()
[docs] def invert(self): return self._value_().invert()
def __inv__(self): return self._value_().__inv__() def __invert__(self): return self._value_().__invert__()
[docs] def lshift(self, b): return self._value_().lshift(b)
def __lshift__(self, b): return self._value_().__lshift__(b) def __rlshift__(self, b): return self._value_().__rlshift__(b)
[docs] def mod(self, b): return self._value_() % b
def __mod__(self, b): return self._value_() % b def __rmod__(self, b): return self._value_().__rmod__(b)
[docs] def mul(self, b): return self._value_() * b
def __mul__(self, b): return self._value_() * b def __rmul__(self, b): return b * self._value_()
[docs] def matmul(self, b): return self._value_().matmul(b)
def __matmul__(self, b): return self._value_().__matmul__(b)
[docs] def neg(self): return -self._value_()
def __neg__(self): return -self._value_()
[docs] def or_(self, b): return self._value_() | b
def __or__(self, b): return self._value_() | b def __ror__(self, b): return self._value_() | b
[docs] def pos(self): return +self._value_()
def __pos__(self): return +self._value_()
[docs] def pow(self, b): return self._value_().pow(b)
def __pow__(self, b): return self._value_().__pow__(b) def __rpow__(self, b): return self._value_().__rpow__(b)
[docs] def rshift(self, b): return self._value_().rshift(b)
def __rshift__(self, b): return self._value_().__rshift__(b) def __rrshift__(self, b): return self._value_().__rrshift__(b)
[docs] def sub(self, b): return self._value_() - b
def __sub__(self, b): return self._value_() - b def __rsub__(self, b): return b - self._value_() # return self._value_().__rsub__(b) # Try to revert if https://github.com/gafusion/OMFIT-source/issues/4163 gets fixed
[docs] def truediv(self, b): return self._value_() / b
def __truediv__(self, b): return self._value_() / b def __rtruediv__(self, b): return self._value_().__rtruediv__(b) def __divmod__(self, b): return self._value_().__divmod__(b) def __rdivmod__(self, b): return self._value_().__rdivmod__(b)
[docs] def xor(self, b): return self._value_() ^ b
def __xor__(self, b): return self._value_() ^ b def __rxor__(self, b): return self._value_().__rxor__(b) # ================== @property def __dict__(self): return self._value_().__dict__ def __dir__(self): return dir(self._value_()) def __eq__(self, other): return self._value_() == other def __format__(self, *args, **kw): return self._value_().__format__(*args, **kw) def __ge__(self, other): return self._value_() >= other def __gt__(self, other): return self._value_() > other def __hash__(self, *args, **kw): return self._value_().__hash__(*args, **kw) # def __init_subclass__(self, *args, **kw): # return self._value_().__init_subclass__(*args, **kw) def __le__(self, other): return self._value_() <= other def __lt__(self, other): return self._value_() < other def __ne__(self, other): return self._value_() != other def __repr__(self): return repr(self._value_()) def __sizeof__(self): return sizeof(self._value_()) def __str__(self): return str(self._value_()) def __unicode__(self): return str(self._value_()) def __int__(self): return int(self._value_()) def __float__(self): return float(self._value_()) def __complex__(self): return complex(self._value_()) def __oct__(self): return oct(self._value_()) def __hex__(self): return hex(self._value_()) def __trunc__(self): import math return math.trunc(self._value_()) # ===================== def __deepcopy__(self, memo): return self.__class__(self.expression) def __getstate__(self): return {'expression': self.expression} def __setstate__(self, dict): self.__init__(dict['expression']) def _value_(self, executed_expr_ids=[]): global OMFITexpressionsReturnNone if OMFITexpressionsReturnNone: return None # first try to get an answer without evaluating any relativelocations at all self.lastEval = self._godeep(self.__value__(dependencies=None), executed_expr_ids=executed_expr_ids) # then try to get an answer with the relative locations but no dependencies if isinstance(self.lastEval, OMFITexpressionError): self.lastEval = self._godeep(self.__value__(dependencies=False), executed_expr_ids=executed_expr_ids) # finally try evaluating the module dependencies if isinstance(self.lastEval, OMFITexpressionError): self.lastEval = self._godeep(self.__value__(dependencies=True), executed_expr_ids=executed_expr_ids) if isinstance(self.lastEval, OMFITexpressionError): self.__class__ = OMFITiterableExpression elif isinstance(self.lastEval, OMFITexpression): self.__class__ = self.lastEval.__class__ else: try: iter(self.lastEval) self.__class__ = OMFITiterableExpression except TypeError: self.__class__ = OMFITexpression return self.lastEval def _godeep(self, val, executed_expr_ids=[]): # __godeep__ has to use _value_() to resolve layered expressions if id(val) in executed_expr_ids: errmsg = 'Error! OMFITexpression recursion loop detected!' printe(errmsg) return OMFITerror(errmsg) elif isinstance(val, OMFITexpression): return val._value_(executed_expr_ids=executed_expr_ids + [id(val)]) else: return val def __value__(self, dependencies): locals = {} locals.update(all_pylab_imports) locals.update(relativeLocations(self, dependencies=dependencies)) try: tmp = eval(self.expression, globals(), locals) except Exception: try: locals['testExpr'] = False exec(self.expression, globals(), locals) tmp = locals.get('return_variable', None) except Exception as _excp: printd('OMFITexpressionError: %s' % _excp) return OMFITexpressionError(repr(_excp)) return tmp
[docs]class OMFITiterableExpression(OMFITexpression): """ Subclass of OMFITexpression used for iterable objects The distinction between iterable and not iterable expressions is used in case someone tests for iterability of an object """ # ================== def __len__(self): return self._value_().__len__()
[docs] def concat(self, b): return self._value_().concat(b)
def __concat__(self, b): return self._value_().__concat__(b)
[docs] def contains(self, b): return self._value_().contains(b)
def __contains__(self, b): return self._value_().__contains__(b)
[docs] def countOf(self, b): return self._value_().countOf(b)
[docs] def delitem(self, b): return self._value_().delitem(b)
def __delitem__(self, b): return self._value_().__delitem__(b)
[docs] def getitem(self, b): return self._value_().getitem(b)
def __getitem__(self, b): return self._value_().__getitem__(b)
[docs] def indexOf(self, b): return self._value_().indexOf(b)
[docs] def setitem(self, b, c): return self._value_().setitem(b, c)
def __setitem__(self, b, c): return self._value_().__setitem__(b, c)
[docs] def length_hint(self, default=0): return self._value_().length_hint(default)
[docs] def attrgetter(self, *attrs): return self._value_().attrgetter(*attrs)
[docs] def itemgetter(self, *items): return self._value_().itemgetter(*items)
[docs] def methodcaller(self, name, *args): return self._value_().methodcaller(name, *args)
[docs]def relativeLocations(location, dependencies=True): """ This function provides a dictionary references to some useful quantities with respect to the object specified in `location`. Note that the variables in the returned dictionary are the same ones that are available within the namespace of OMFIT scripts and expressions. :param location: location in the OMFIT tree :return: dictionary containing the following variables: * OMFITlocation : list of references to the tree items that compose the OMFIT-tree path * OMFITlocationName : list of path strings to the tree items that compose the OMFIT-tree path * parent : reference to the parent object * parentName : path string to the parent object * this : reference to the current object * thisName : path string to the current object * OMFITmodules : list of modules to the current module * OMFITmodulesName : list of string paths to the current module * MainSettings : reference to OMFIT['MainSettings'] * MainScratch: reference to OMFIT['scratch'] * scratch : reference to module scratch * scratchName : string path to module scratch * root : reference to this module * rootName : string path to this module * DEPENDENCIES variables defined within the module """ locations = {} # global locations (some will be overwritten if treeLocation(location) is not None) locations['OMFIT'] = OMFIT locations['rootName'] = 'OMFIT' locations['root'] = OMFIT locations['MainSettings'] = OMFIT['MainSettings'] locations['scratchName'] = "OMFIT['scratch']" locations['MainScratch'] = locations['scratch'] = OMFIT['scratch'] locations['this'] = location locations['parent'] = getattr(location, '_OMFITparent', None) # location of this entry in the OMFIT tree OMFITlocationName = treeLocation(location) locations['OMFITlocationName'] = OMFITlocationName if OMFITlocationName: # if an entry is in the OMFIT tree if len(OMFITlocationName[0]): locations['OMFITlocation'] = [] for item in copy.deepcopy(OMFITlocationName): try: locations['OMFITlocation'].append(eval(item)) except Exception: OMFITlocationName.remove(item) # if an entry is not in the OMFIT tree at least we can try to set root # by navigating upstream and seing if there is a module above it else: h = location if builtins.isinstance(h, OMFITmodule): locations['root'] = h locations['rootName'] = '?' if '__scratch__' not in h: h['__scratch__'] = OMFITtmp() locations['scratch'] = h['__scratch__'] locations['scratchName'] = '?' else: for item in parseLocation(OMFITlocationName[-1]): if getattr(h, '_OMFITparent', None): h = h._OMFITparent if builtins.isinstance(h, OMFITmodule): locations['root'] = h locations['rootName'] = '?' if '__scratch__' not in h: h['__scratch__'] = OMFITtmp() locations['scratch'] = h['__scratch__'] locations['scratchName'] = '?' if OMFITlocationName and len(OMFITlocationName[0]): locations['OMFITmodulesName'] = [OMFITlocationName[0]] for tmpName in OMFITlocationName[1:]: if eval(tmpName).__class__ is OMFITmodule or eval(tmpName).__class__ is OMFITproject: locations['OMFITmodulesName'].append(tmpName) rootName = locations['OMFITmodulesName'][-1] locations['OMFITmodules'] = [] for item in locations['OMFITmodulesName']: locations['OMFITmodules'].append(eval(item)) root = locations['OMFITmodules'][-1] locations['rootName'] = rootName locations['root'] = root locations['thisName'] = OMFITlocationName[-1] locations['this'] = eval(locations['thisName']) if len(OMFITlocationName) > 1: locations['parentName'] = OMFITlocationName[-2] locations['parent'] = eval(locations['parentName']) if root is OMFIT: locations['MainScratchName'] = "OMFIT['scratch']" else: if '__scratch__' not in root: root['__scratch__'] = OMFITtmp() locations['scratchName'] = rootName + "['__scratch__']" locations['scratch'] = eval(locations['scratchName']) if not dependencies: return locations if root.__class__ is OMFITmodule and 'SETTINGS' in root and 'DEPENDENCIES' in root['SETTINGS']: for dep in list(root['SETTINGS']['DEPENDENCIES'].keys()): if root['SETTINGS']['DEPENDENCIES'][dep] is None: locations[dep] = None else: try: pathExists = True error = '' if root['SETTINGS']['DEPENDENCIES'][dep].__class__ not in [OMFITexpression, OMFITiterableExpression] and isinstance( root['SETTINGS']['DEPENDENCIES'][dep], str ): locdep = str(root['SETTINGS']['DEPENDENCIES'][dep]) else: tmp = str(root['SETTINGS']['DEPENDENCIES'][dep].expression) retvar = {'return_variable': None} tmpNamespace = {} tmpNamespace.update(globals()) tmpNamespace.update(locations) try: retvar['return_variable'] = eval(tmp, tmpNamespace) except Exception as _excp: try: exec(tmp, tmpNamespace, retvar) except Exception as _excp: printd( 'Broken dynamic expression dependency ' + rootName + "['SETTINGS']['DEPENDENCIES']['" + dep + "'] referred from " + OMFITlocationName[-1] + '\n' + _excp, level=3, topic='dependencies', ) locations[dep] = None continue locdep = retvar['return_variable'] whereFrom = re.sub(r'(^\w*)(\[.*\]$)', r'\1', locdep) path = '' if '[' in locdep: path = re.sub(r'(^\w*)(\[.*\]$)', r'\2', locdep) if whereFrom == 'scratch': locdep = locations['scratchName'] + path elif whereFrom == 'MainSettings': locdep = "OMFIT['MainSettings']" + path elif whereFrom == 'this': locdep = locations['thisName'] + path elif whereFrom == 'parent': locdep = locations['parentName'] + path elif whereFrom == 'root': locdep = locations['rootName'] + path elif whereFrom == 'OMFITlocation': # notice that OMFITlocation is relative to the script/expression calling, not to the location of ['DEPENDENCIES'] of a module locdep = locations['OMFITlocationName'][parseBuildLocation(path)[0]] + parseBuildLocation( parseBuildLocation(path)[1:] ) elif whereFrom == 'OMFITmodules': locdep = locations['OMFITmodulesName'][parseBuildLocation(path)[0]] + parseBuildLocation( parseBuildLocation(path)[1:] ) else: locdep = 'OMFIT' + path tmp = OMFIT for key in parseBuildLocation(locdep): if key in tmp: tmp = tmp[key] else: pathExists = False break if pathExists: # the purppose of this evaluate if just to raise an exception if the evaluation fails eval(locdep) # this is to catch errors which are not associated with path not existing except Exception as _excp: pathExists = False error = ': ' + repr(_excp) if pathExists: locations[dep] = eval(locdep) else: locations[dep] = None printd( 'Broken dependency ' + rootName + "['SETTINGS']['DEPENDENCIES']['" + dep + "'] referred from " + OMFITlocationName[-1] + '\n' + error, level=3, topic='dependencies', ) return locations
[docs]def absLocation(location, base, base_is_relativeLocations_output=False): """ This method translates relative location strings to absolute location strings in the OMFIT tree :param location: string (or list of strings) with relative/absolute location in the OMFIT tree :param base: tree object with respect to which the query is made :param base_is_relativeLocations_output: is the `base` parameter the output of the relativeLocations() function :return: absolute location string (or list of strings) in the OMFIT tree """ multiple = not isinstance(location, str) locations = np.atleast_1d(location).tolist() if not base_is_relativeLocations_output: base = relativeLocations(base) for k, location in enumerate(locations): # skip if it's already an absolute location if re.match(r'^OMFIT\[.*\]$', location): locations[k] = location continue # evaluate location based on relativeLocation of the running topGUI script if location.startswith("OMFITmodules"): tmp = parseLocation(location) whereFrom = buildLocation(tmp[:2]) path = buildLocation([''] + tmp[2:]) else: whereFrom = re.sub(r'(^\w*)(\[.*\]$)', r'\1', location) path = location[len(whereFrom) :] exec("location = treeLocation(" + whereFrom + ")[-1]", globals(), base) location = base['location'] locations[k] = location + path if multiple: return locations else: return locations[0]
# This variable is used to force all expressions to return None # This is used when saving the OMFITtree and is useful since the information in the Namelist, NETcdf, etc... is already # saved as an expression. Expressions entry tend to change depending on the user and result in differences which are # caught by git. By forcing expressions to be evaluated as None when saving, git should not see these differences. global OMFITexpressionsReturnNone OMFITexpressionsReturnNone = False
[docs]def isinstance(a, b): if b is type: return builtins.isinstance(a, builtins.type) elif builtins.isinstance(a, OMFITlazyLoad): for bb in tolist(b): if bb.__name__ == 'OMFITlazyLoad': return True for bb in tolist(b): if a.cls == bb.__name__: return True for bb in tolist(b): if hasattr(eval(a.cls), '__subclasses__'): for cls in eval(a.cls).__subclasses__(): if isinstance_str(cls.__name__, bb.__name__): return True return False if builtins.isinstance(a, OMFITdataset): for bb in tolist(b): if bb is xarray.Dataset: return True if builtins.isinstance(a, OMFITexpression) and b not in [OMFITexpression, OMFITiterableExpression]: a = a._value_() elif ( builtins.isinstance(a, OMFITcollection) and a.selector is not None and not (inspect.isclass(b) and builtins.issubclass(b, OMFITcollection)) ): a = a.GET(a.selector) try: b = tuple(b) # builtins.isinstance specifically wants a tuple, not a list (which it gets sometimes) except TypeError: pass # b is not iterable; don't need to convert list to tuple return builtins.isinstance(a, b)
isinstance.__doc__ = ( builtins.isinstance.__doc__ + '\n\nThis function is modified to account for special behavior of some OMFIT classes, such as OMFITexpression.' )
[docs]def type(a, *args, **kw): if builtins.isinstance(a, OMFITexpression): a = a._value_() elif builtins.isinstance(a, OMFITcollection) and a.selector is not None: a = a.GET(a.selector) return builtins.type(a, *args, **kw)
def issubclass(a, b): if builtins.isinstance(a, OMFITexpression) and b not in [OMFITexpression, OMFITiterableExpression]: a = a._value_() elif builtins.isinstance(a, OMFITcollection) and a.selector is not None and b != OMFITcollection: a = a.GET(a.selector) if not inspect.isclass(a): raise TypeError('issubclass() arg 1 must be a class, not type %s' % (type(a))) return builtins.issubclass(a, b)
[docs]def hasattr(object, attribute): # this should be lastEval because in __getattr__(), getattr is called after _lastEval_() if builtins.isinstance(object, OMFITexpression): return hasattr(object.lastEval, attribute) return builtins.hasattr(object, attribute)
_deepcopy = copy.deepcopy def _OMFITdeepcopy(object, memo=None, _nil=[]): if isinstance(object, OMFITexpression): global OMFITexpressionsReturnNone tmpExp = OMFITexpressionsReturnNone OMFITexpressionsReturnNone = True try: obj = _deepcopy(object, memo=memo, _nil=_nil) try: if hasattr(object, '_OMFITcopyOf'): obj._OMFITcopyOf = object._OMFITcopyOf else: obj._OMFITcopyOf = weakref.ref(object) except Exception: pass finally: OMFITexpressionsReturnNone = tmpExp else: if hasattr(object, 'dynaLoad'): if hasattr(object, 'duplicate') and object.dynaLoad: obj = object.duplicate() elif not object.dynaLoad: obj = _deepcopy(object, memo=memo, _nil=_nil) else: tmp = object.dynaLoad object.dynaLoad = False try: obj = _deepcopy(object, memo=memo, _nil=_nil) obj.dynaLoad = tmp finally: object.dynaLoad = tmp else: obj = _deepcopy(object, memo=memo, _nil=_nil) if framework and isinstance(obj, OMFITobject): from omfit_classes.utils_base import _allOMFITobjects _allOMFITobjects.setdefault(obj.filename, []).append(weakref.ref(obj)) return obj _OMFITdeepcopy.__doc__ = _deepcopy.__doc__ copy.deepcopy = _OMFITdeepcopy # monkey patch validation of path objects in os and unix_os Python modules _fspath = os.fspath def fspath(path): return _fspath(evalExpr(path)) os.fspath = fspath _os.fspath = fspath # --------------------- # OMFIT tree # --------------------- class _OMFITnoSave(object): # objects that are derived from this class will not be saved # Note: not even an entry in OMFITsave.txt will be made pass
[docs]class OMFITtree(SortedDict): """ A branch in the tree is represented in the filesystem as a directory. Note that the OMFIT object itself belongs to the OMFITmainTree class, which is a subclass of the OMFITtree class. """ _save_method = '_save' def __init__( self, filename='', only=None, modifyOriginal=False, readOnly=False, quiet=False, developerMode=False, serverPicker=None, remote='', server='localhost', tunnel='', **kw, ): r""" :param filename: 'directory/bla/OMFITsave.txt' or 'directory/bla.zip' where the OMFITtree will be saved (if '' it will be saved in the same folder of the parent OMFITtree) :param only: list of strings used to load only some of the branches from the tree (eg. ["['MainSettings']","['myModule']['SCRIPTS']"] :param modifyOriginal: by default OMFIT will save a copy and then overwrite previous save only if successful. If `modifyOriginal=True` and filename is not .zip, will write data directly at destination, which will be faster but comes with the risk of deleting a good save if the new save fails for some reason :param readOnly: will place entry in OMFITsave.txt of the parent so that this OMFITtree can be loaded, but will not save the actual content of this subtree. `readOnly=True` is meant to be used only after this subtree is deployed where its fileneme says it will be. Using this feature could result in much faster projects save if the content of this tree is large. :param quiet: Verbosity level :param developerMode: load OMFITpython objects within the tree as modifyOriginal :param serverPicker: take server/tunnel info from MainSettings['SERVER'] :param remote: access the filename in the remote directory :param server: if specified the file will be downsync from the server :param tunnel: access the filename via the tunnel :param \**kw: Extra keywords are passed to the SortedDict class """ if serverPicker or (server and server != 'localhost'): if developerMode: raise OMFITexception('Cannot load remote data in developerMode') if modifyOriginal: raise OMFITexception('Cannot load remote data as modifyOriginal') omfitsave = None if filename.endswith('OMFITsave.txt'): filename, omfitsave = os.path.split(filename) tmp = OMFITobject(filename, serverPicker=serverPicker, remote=remote, server=server, tunnel=tunnel) filename = tmp.filename if os.path.isdir(filename): omfitsave = 'OMFITsave.txt' if omfitsave: filename = filename + os.sep + omfitsave SortedDict.__init__(self, **kw) self.modifyOriginal = False self.readOnly = False self.load(filename, only=only, modifyOriginal=modifyOriginal, readOnly=readOnly, quiet=quiet, developerMode=developerMode)
[docs] def addBranchPath(self, location, leaf=special1, upTo=None): """ Creates a path in the tree, without overwriting branches which already exist :param location: string containing the path to be added :param leaf: Value to set the destination if not present :param upTo: location is traversed up to `upTo` e.g. OMFIT['branch'].addBranchPath("['branch2']['branch3']") """ path = parseBuildLocation(location) if upTo is not None: path = path[:upTo] addedPath = [] loc = self for k, branch in enumerate(path): from omas import ODS if isinstance(loc, ODS): from omas.omas_utils import p2l, l2o ods_branch = list(flatten_iterable(list(map(p2l, path[k:])))) try: loc[ods_branch] addedPath.append((loc, l2o(ods_branch), 0)) except ValueError: # catch ValueError: `...` has no data tmp = ODS() tmp.omas_data = {} loc[ods_branch] = tmp addedPath.append((loc, l2o(ods_branch), 1)) break if not isinstance(loc, dict): raise OMFITexception( 'Tree variable [\'' + path[k - 1] + '\'] is of type ' + str(type(loc).__name__) + ' and can not have sub-branches' ) if branch in loc: addedPath.append((loc, branch, 0)) else: if k + 1 < len(path) or leaf is special1: loc[branch] = OMFITtree() else: loc[branch] = leaf addedPath.append((loc, branch, 1)) loc = loc[branch] return addedPath
[docs] def duplicate(self, filename='', modifyOriginal=False, readOnly=False, quiet=True): """ Similarly to the duplicate method for OMFITobjects, this method makes a copy by files. This means that the returned subtree objects will be pointing to different files from the one of the original object. This is to be contrasted to a deepcopy of an object, which copies the objects in memory, but does not duplicate the objects themselves. :param filename: if filename='' then the duplicated subtree and its files will live in the OMFIT working directory, if filename='directory/OMFITsave.txt' then the duplicated subtree and its files will live in directory specified :param modifyOriginal: only if filename!='' by default OMFIT will save a copy and then overwrite previous save only if successful. If `modifyOriginal=True` and filename is not .zip, will write data directly at destination, which will be faster but comes with the risk of deleting a good save if the new save fails for some reason :param readOnly: only if filename!='' will place entry in OMFITsave.txt of the parent so that this OMFITtree can be loaded, but will not save the actual content of this subtree. `readOnly=True` is meant to be used only after this subtree is deployed where its fileneme says it will be. Using this feature could result in much faster projects save if the content of this tree is large. :param quiet: Verbosity level :return: new subtree, with objects pointing to different files from the one of the original object NOTE: readOnly+modifyOriginal is useful because one can get significant read (modifyOriginal) and write (readOnly) speed-ups, but this feature relies on the users pledging they will not modify the content under this subtree. """ # duplicate internally if not filename: # generate a temporary directory name (safe for parallel runs) file_type = self.__class__.__name__ subprocess_dir = '_'.join(map(str, OMFITaux['prun_process'])) if len(subprocess_dir): subprocess_dir = '__p' + subprocess_dir directory = ( OMFITcwd + os.sep + 'objects' + os.sep + file_type + '_' + utils_base.now("%Y-%m-%d__%H_%M" + subprocess_dir + os.sep + "%S__%f") ) while os.path.exists(directory): directory += "_" filename = directory + os.sep + 'OMFITsave.txt' # deploy self.deploy(filename, quiet=quiet) # Reload and return new object, here we use modifyOriginal just to avoid making # yet another copy of the subtree since we are already in the OMFITcwd directory tmp = self.__class__(filename, modifyOriginal=True, readOnly=False, quiet=quiet) tmp.modifyOriginal = False tmp.filename = '' # duplicate externally else: # Deploy self.deploy(filename, quiet=quiet) # reload and return new object (we do this explicitly here just to provide documentation) tmp = self.__class__(filename, modifyOriginal=modifyOriginal, readOnly=readOnly, quiet=quiet) return tmp
[docs] def duplicateGUI(self, initial=None, modifyOriginal=False, readOnly=False, quiet=True): if initial is None: if OMFITaux['lastBrowsedDirectory']: initial = OMFITaux['lastBrowsedDirectory'] else: initial = self.filename if os.path.isdir(initial): initialdir = initial initialfile = None else: initialdir = os.path.split(initial)[0] initialfile = os.path.split(initial)[1] location = tkFileDialog.asksaveasfilename(initialdir=initialdir, initialfile=initialfile, parent=OMFITaux['rootGUI']) if len(location): OMFITaux['lastBrowsedDirectory'] = os.path.split(location)[0] return self.duplicate(location + os.sep + 'OMFITsave.txt', modifyOriginal=modifyOriginal, readOnly=readOnly, quiet=quiet)
[docs] def deploy( self, filename='', zip=False, quiet=False, updateExistingDir=False, serverPicker=None, server='localhost', tunnel='', s3bucket=None, ignoreReturnCode=False, ): """ Writes the content of the branch on the filesystem :param filename: contains all of the information to reconstruct the tree and can be loaded with the load() function :param zip: whether the deploy should occur as a zip file :param updateExistingDir: if not `zip` and not `onlyOMFITsave` this option does not delete original directory but just updates it :param serverPicker: take server/tunnel info from MainSettings['SERVER'] :param server: server to which to upload the file :param tunnel: tunnel to connect to the server :param s3bucket: name of s3 bucket to upload to :param ignoreReturnCode: ignore return code of rsync command """ if filename == '' and self.filename: filename = os.path.split(self.filename)[1] # from serverPicker to server/tunnel if serverPicker is not None: server = SERVER[serverPicker]['server'] tunnel = SERVER[serverPicker]['tunnel'] # local deploy if is_localhost(server): getattr(self, self._save_method)(filename, zip=zip, quiet=quiet, updateExistingDir=updateExistingDir) return filename, server, tunnel # remote deploy else: if not len(os.path.split(os.path.abspath(filename))[0]): raise ValueError('Must specify full path for remote deployment') tmpDir = OMFITtmpDir + os.sep + 'scratch' + os.sep if not os.path.exists(tmpDir): os.makedirs(tmpDir) filename_save = getattr(self, self._save_method)( tmpDir + os.path.basename(filename), zip=zip, quiet=quiet, updateExistingDir=False ) if not os.path.splitext(filename_save)[1] == '.zip': filename_save = os.path.split(filename_save)[0] filename = os.path.split(filename)[0] + os.sep + os.path.split(filename_save)[1] tmp = OMFITobject(filename_save, modifyOriginal=True, readOnly=True) tmp_return = tmp.deploy(filename, server=server, tunnel=tunnel, s3bucket=s3bucket, ignoreReturnCode=ignoreReturnCode) import shutil del tmp shutil.rmtree(tmpDir) return tmp_return
[docs] def deployGUI(self, filename='', **kw): """ Opens GUI for .deploy() method :param initial: Starting filename used in the GUI. If None, then the last browsed directory is used (OMFITaux['lastBrowsedDirectory']) :return: if deployment was successful """ if filename: if isinstance(filename, str): filename = (filename, 'localhost', '') kw['directory'] = os.path.split(filename[0])[0] kw['default'] = os.path.split(filename[0])[1] kw['server'] = filename[1] kw['tunnel'] = filename[2] from omfit_classes.OMFITx import SaveFileDialog fd = SaveFileDialog() if fd.how: self.deploy(filename=fd.how[0], server=fd.how[1], tunnel=fd.how[2], **kw) if is_localhost(fd.how[1]): printi('Deploy ' + fd.how[0]) else: printi('Deploy ' + str(fd.how)) return fd.how
def _save( self, filename, only=None, zip=False, onlyOMFITsave=False, quiet=None, skipStorage=True, updateExistingDir=False, skip_save_errors=False, ): """ the save() and saveas() method exists only for the OMFITmodule and OMFITmaintree class :param filename: filename to save to :param only: list of strings used to save only some of the branches in the tree (eg. ["['MainSettings']","['myModule']['SCRIPTS']"] :param zip: save as zip file (used to force save as zip, even if filename is 'directory/OMFITsave.txt' :param onlyOMFITsave: only create the OMFITsave.txt file :param quiet: verbose output :param skipStorage: Skip OMFITstorage objects :param updateExistingDir: if not `zip` and not `onlyOMFITsave` this option does not delete original directory but just updates it :param skip_save_errors: skip errors when saving objects """ if not isinstance(filename, str) or not len(filename): raise IOError('The specified OMFITtree filename is not valid: `' + str(filename) + '`') if not onlyOMFITsave and not quiet: types = OMFITtypes + OMFITdictypes for item in [SortedDict, SettingsName, NamelistName, OMFITtmp]: types.remove(item) nodes = OMFIT.traverse(onlyDict=tuple(types), skipDynaLoad=True) if quiet is None: quiet = bool(eval(os.environ.get('OMFIT_PROGRESS_BAR_QUIET', '0'))) if not isinstance(quiet, dict): quiet = {'quiet': quiet} newline = quiet.get('newline', False) clean = quiet.get('clean', False) style = quiet.get('style', ' [{sfill}{svoid}] {perc:3.2f}% {mess}') quiet = quiet.get('quiet', quiet) def _print(text): if not onlyOMFITsave and not quiet: if text.startswith('['): if text in nodes and not text.startswith("['__COMMANDBOX__']"): ascii_progress_bar( nodes.index(text), 0, len(nodes) - 1, text, quiet=quiet, newline=newline, clean=clean, style=style, width=10 ) else: printi(text) print('', end='') def generateOMFITproperties(me, kid): # query item directly for OMFITproperties string tmp = {} if hasattr(me[kid], '__save_kw__'): tmp.update(me[kid].__save_kw__()) if isinstance(me[kid], _OMFITpython): tmp.pop('modifyOriginal', None) if isinstance(me[kid], OMFITascii) and kid == 'help': tmp.pop('modifyOriginal', None) return re.sub(r'\n', r'\\n', repr(tmp)) def f_traverse(me, myLocation, path): # store all paths in OMFITsave.txt the UNIX way mypath = path.replace('\\', '/') if isinstance(me, (SortedDict, OMFITlist)): if isinstance(me, SortedDict): keys = me.keys() else: keys = range(len(me)) same_filename_counter = {} for kid in keys: if isinstance(me, OMFITlist): kidName = "[+]" else: kidName = "[" + repr(kid) + "]" entryID = myLocation + kidName if only is None or any(entryID.startswith(k) for k in tolist(only)): if isinstance(me[kid], OMFITlazyLoad): tp = me[kid].tp elif not isinstance(me[kid], OMFITobjectError): tp = me[kid].__class__.__name__ else: tp = me[kid].className changed_dir = False if isinstance(me[kid], _OMFITnoSave): pass elif isinstance(me[kid], OMFITstorage) and skipStorage: pass elif isinstance(me[kid], OMFITlist): path = mypath + str(kid) + '/' if not onlyOMFITsave: if not os.path.exists(str(kid)): os.makedirs(str(kid)) os.chdir(str(kid)) changed_dir = True f.write(entryID + ' <-:-:-> ' + tp + ' <-:-:-> <-:-:-> ' + generateOMFITproperties(me, kid) + '\n') _print(entryID) elif isinstance(me[kid], OMFITtree): path = mypath + str(kid) + '/' if not onlyOMFITsave: if not (isinstance(me[kid], OMFITcollection) and me[kid].OMFITproperties['no_subdir']): if not os.path.exists(str(kid)): os.makedirs(str(kid)) os.chdir(str(kid)) changed_dir = True else: path = mypath + '/' if me[kid].filename and not me[kid].readOnly and me[kid].modifyOriginal: # if not readOnly and modifyOriginal then we have to update the deploy me[kid].deploy(me[kid].filename) if me[kid].filename and (me[kid].modifyOriginal or me[kid].readOnly): filename = me[kid].filename.replace('\\', '/') f.write( entryID + ' <-:-:-> ' + tp + ' <-:-:-> ' + filename + ' <-:-:-> ' + generateOMFITproperties(me, kid) + '\n' ) else: f.write(entryID + ' <-:-:-> ' + tp + ' <-:-:-> <-:-:-> ' + generateOMFITproperties(me, kid) + '\n') _print(entryID) elif isinstance(me[kid], OMFITexpression): f.write(entryID + ' <-:-:-> OMFITexpression <-:-:-> ' + '_' + repr(me[kid].expression) + ' <-:-:-> {}\n') _print(entryID) else: saved = False if isinstance(me[kid], (OMFITmds, OMFITmdsValue, OMFITtoksearch, OMFITwebLink)): f.write(entryID + ' <-:-:-> ' + tp + ' <-:-:-> <-:-:-> ' + generateOMFITproperties(me, kid) + '\n') saved = True _print(entryID) elif isinstance(me[kid], tuple(OMFITtypes)) and me[kid].filename is not None: _print(entryID) # NOTE: OMFITpython scripts are --always-- saved within a project, even if these are set to be modifyOriginal=True. # This means that scripts that are loaded as modifyOriginal they will be so only within that OMFIT session. # If a project is saved and then re-opened, then all scripts will be local to the project and will have modifyOriginal=False. # This is done to ensure consistency between the data in the project and the scripts that have been used to generate them. try: if ( not hasattr(me[kid], 'modifyOriginal') or not me[kid].modifyOriginal or isinstance(me[kid], _OMFITpython) or (isinstance(me[kid], OMFITascii) and kid == 'help') ): # this prevents overwriting of files with the same filename # directory naming is better if it's deterministic, so that files changes can be tracked with git if os.path.split(me[kid].filename)[1] not in same_filename_counter: same_filename_counter[os.path.split(me[kid].filename)[1]] = 0 same_filename_counter[os.path.split(me[kid].filename)[1]] += 1 count = same_filename_counter[os.path.split(me[kid].filename)[1]] if count > 1: tmpdir = os.path.split(me[kid].filename)[1] + '_' + str(count) # adding an hashing reduces the chances of filename clash to a minimum tmpdir += '_' + omfit_hash(tmpdir, 10) + os.sep else: tmpdir = '' # now for the actual saving... tmpfilename = tmpdir + os.path.split(me[kid].filename)[1] filename = mypath + tmpfilename if not onlyOMFITsave: if len(tmpdir) and not os.path.exists(tmpdir): os.makedirs(tmpdir) me[kid].deploy(tmpfilename) else: filename = os.path.abspath(me[kid].filename) if not onlyOMFITsave: me[kid].save() except Exception as _excp: if isinstance(_excp, OSError) and _excp.errno == errno.ENOSPC: raise elif not skip_save_errors: raise else: printw('Error saving %s: %s' % (entryID, repr(_excp))) filename = filename.replace('\\', '/') f.write( entryID + ' <-:-:-> ' + tp + ' <-:-:-> ' + filename + ' <-:-:-> ' + generateOMFITproperties(me, kid) + '\n' ) saved = True elif isinstance(me[kid], xarray.Dataset): try: filename = 'OMFITxarray_' + omfit_hash(repr(kid), 10) + '.nc' exportDataset(me[kid], filename) f.write(entryID + ' <-:-:-> importDataset <-:-:-> ' + mypath + filename + ' <-:-:-> \n') saved = True _print(entryID) except Exception as _excp: if isinstance(_excp, OSError) and _excp.errno == errno.ENOSPC: raise pass # if the save as NetCDF fails, the data will be saved as pickle elif isinstance(me[kid], pandas.DataFrame): try: filename = 'OMFITpandas_' + omfit_hash(repr(kid), 10) + '.json' me[kid].to_json(filename) f.write(entryID + ' <-:-:-> pandas_read_json <-:-:-> ' + mypath + filename + ' <-:-:-> \n') saved = True _print(entryID) except Exception as _excp: if isinstance(_excp, OSError) and _excp.errno == errno.ENOSPC: raise pass # if the save as NetCDF fails, the data will be saved as pickle elif (isinstance(me[kid], np.ndarray) and me[kid].dtype != 'object') and isinstance(me, (OMFITtree, OMFITlist)): try: filename = 'OMFITndarray_' + omfit_hash(repr(kid), 10) + '.npy' np.save(filename, me[kid], allow_pickle=False) f.write(entryID + ' <-:-:-> importNdarray <-:-:-> ' + mypath + filename + ' <-:-:-> \n') saved = True _print(entryID) except Exception as _excp: if isinstance(_excp, OSError) and _excp.errno == errno.ENOSPC: raise pass # if the save as npy fails, the data will be saved as pickle if not saved and isinstance( me, (OMFITtree, OMFITlist) ): # this is to save quantities which are under a OMFITtree/OMFITlist but are not associated to a OMFITobject etc... # Keep track of entries that are pickled if os.environ.get('USER', '') in [ 'meneghini', 'smithsp', 'eldond', 'ssmith', 'edeshaze', 'gavdeeva', 'todstrci', 'kthome', 'omenegh2', 'shaskey', 'nlogan', 'logannc', 'haskeysr', 'thomek', 'odstrcilt', 'deshazere', 'avdeevag', ]: pickled_objects.append(entryID) try: tmp = pickle.dumps(me[kid], pickle.OMFIT_PROTOCOL) filename = 'OMFITpickled_' + omfit_hash(repr(kid), 10) if not onlyOMFITsave: with open(filename, 'wb') as f1: f1.write(tmp) f.write( entryID + ' <-:-:-> pickle <-:-:-> ' + mypath + filename + ' <-:-:-> ' + generateOMFITproperties(me, kid) + '\n' ) except Exception as _excp1: if isinstance(_excp1, OSError) and _excp1.errno == errno.ENOSPC: raise try: import dill filename = 'OMFITdilled_' + omfit_hash(repr(kid), 10) if not onlyOMFITsave: with open(filename, 'wb') as f1: dill.dump(me[kid], f1) f.write( entryID + ' <-:-:-> dill <-:-:-> ' + mypath + filename + ' <-:-:-> ' + generateOMFITproperties(me, kid) + '\n' ) except Exception as _excp2: if isinstance(_excp2, OSError) and _excp2.errno == errno.ENOSPC: raise printe( entryID + ' is not an OMFITobject and could not be pickled and has been repr(...) instead!\nPICKLE:%s\nDILL:%s\nThis may not load correctly, please try to fix this if you can.' % (repr(_excp1), repr(_excp2)) ) f.write( entryID + ' <-:-:-> ' + tp + ' <-:-:-> ' + re.sub(r'\n', r'\\n', repr(me[kid])) + ' <-:-:-> ' + generateOMFITproperties(me, kid) + '\n' ) # list of all the reasons for which the save should not go deeper into an object if isinstance(me[kid], OMFITexpression): pass elif isinstance(me[kid], OMFITtree) and me[kid].filename and (me[kid].modifyOriginal or me[kid].readOnly): # if readOnly we skip because we should not save # if modifyOriginal we skip because we have deployed separately pass elif isinstance(me[kid], OMFITdir): pass elif isinstance(me[kid], OMFITmds): pass elif isinstance(me[kid], _OMFITnoSave): pass elif hasattr(me[kid], 'dynaLoad') and me[kid].dynaLoad: pass elif isinstance(me[kid], OMFITstorage) and skipStorage: pass elif ( isinstance(me[kid], tuple(OMFITtypes)) and not isinstance(me[kid], OMFITsettings) and not isinstance(me[kid], OMFITmainSettings) and not isinstance(me[kid], OMFITnamelist) ): # do not go inside of OMFITtypes, unless it's OMFITsettings, OMFITmainSettings or OMFITnamelists, since there can be OMFITexpressions that need to be saved there pass else: kidName = "[" + repr(kid) + "]" entryID = myLocation + kidName f_traverse(me[kid], entryID, path) if changed_dir: if not onlyOMFITsave: os.chdir('..') # setup the save filenames and directories oldDir = os.getcwd() if onlyOMFITsave: if not os.path.splitext(filename)[1] == '.txt': filename = filename + os.sep + 'OMFITsave.txt' file = os.path.abspath(filename) else: filename = os.path.abspath(filename) if zipfile.is_zipfile(filename) or os.path.splitext(filename)[1] == '.zip': filename = os.path.splitext(filename)[0] + os.sep + 'OMFITsave.txt' zip = True elif not os.path.splitext(filename)[1] == '.txt': filename = filename + os.sep + 'OMFITsave.txt' file = os.path.split(filename)[1] directoryOrig = os.path.split(filename)[0] # `directory` is the temporary save directory tmpSaveDir = OMFITcwd + os.sep + 'tmpSaveDir_' + utils_base.now("%Y-%m-%d__%H_%M_%S__%f") directory = tmpSaveDir + os.sep + os.path.split(directoryOrig)[1] if not os.path.exists(directory): os.makedirs(directory) os.chdir(directory) try: with open(file, 'w') as f: global OMFITexpressionsReturnNone tmpExp = OMFITexpressionsReturnNone OMFITexpressionsReturnNone = True pickled_objects = [] try: if only is None: f_traverse(self, '', '.' + os.sep) else: f_traverse( eval('self' + buildLocation(parseLocation(only)[:-1])), buildLocation(parseLocation(only)[:-1]), '.' + os.sep ) finally: OMFITexpressionsReturnNone = tmpExp if not onlyOMFITsave: if pickled_objects: # If printe or printw then fails to quick-save _print('The following objects were pickled:') for item in pickled_objects: _print(f"- {item}") if not zip: # if you got here, over-write original directory _print('\nMoving files to save directory: ' + directoryOrig + ' ' + directory) if not updateExistingDir: if os.path.exists(directoryOrig): shutil.rmtree(directoryOrig) try: shutil.move(directory, directoryOrig) except PermissionError as _excp: # on windows it raise an error, but content of folders is moved if os.name == 'nt': printe(_excp) else: raise else: update_dir(directory, directoryOrig) if os.path.exists(directory): shutil.rmtree(directory) return filename else: # compress directory os.chdir('..') filenameZip = os.path.abspath(directoryOrig + '.zip') _print('\nCompressing files to archive: ' + filenameZip) zipfolder(os.path.split(directoryOrig)[1], filenameZip) return filenameZip finally: os.chdir(oldDir) # remove compress directory if still present if not onlyOMFITsave and directoryOrig != directory and os.path.exists(directory): shutil.rmtree(directory) def _load( self, filename, only=None, modifyOriginal=False, readOnly=False, quiet=False, developerMode=False, lazyLoad=bool(safe_eval_environment_variable('OMFIT_LAZYLOAD', False)), ): global OMFITexpressionsReturnNone tmpExp = OMFITexpressionsReturnNone OMFITexpressionsReturnNone = True if not isinstance(quiet, dict): quiet = {'quiet': quiet} newline = quiet.get('newline', False) clean = quiet.get('clean', False) mess = quiet.get('mess', lambda x: x[0]) style = quiet.get('style', ' [{sfill}{svoid}] {perc:3.2f}% {mess}') quiet = quiet.get('quiet', quiet) oldDir = os.getcwd() if isinstance(only, str): only = [only] try: # extract archive if it's a zip file OMFITaux['noCopyToCWD'] = modifyOriginal if zipfile.is_zipfile(filename): if not quiet: printi('Deflating ZIP save file...') print('', end='') filenameZip = filename # use cherry_pick_OMFITsave for partial extraction of data from zip file (faster) deflate_dir = OMFITcwd + os.sep + 'deflate_' + utils_base.now("%Y-%m-%d__%H_%M_%S__%f") filename = cherry_pick_OMFITsave(filename, only, deflate_dir=deflate_dir) OMFITaux['noCopyToCWD'] = True # read save file OMFITsave.txt if not quiet and not os.path.abspath(filename).startswith(os.path.abspath(OMFITsrc + '/../modules/')): printi('Reading ' + os.path.abspath(filename)) os.chdir(os.path.split(filename)[0]) with open(filename, 'r') as f: text = f.read().split('\n') # split each line in the save file split_pttrn = re.compile(' *<-:-:-> *') split_text = [] for line in text: if len(line): split_text.append(re.split(split_pttrn, line)) # load all tree objects for line in ascii_progress_bar(split_text, mess=mess, quiet=quiet, newline=newline, clean=clean, style=style, width=10): only_match = [line[0].startswith(str(k)) for k in tolist(only)] if only is None or np.any(only_match): if np.any(only_match): need_path = only[only_match.index(True)] self.addBranchPath(need_path) # these are for backward compatibility if line[1] == 'OMFITpython': line[1] = 'OMFITpythonTask' elif line[1] == 'OMFITdict': line[1] = 'SortedDict' elif line[1] == 'OMFITfileASCII': line[1] = 'OMFITascii' elif line[1] == 'OMFIT_Ufile': line[1] = 'OMFITuFile' elif line[1] == 'gksoutClass': line[1] = 'OMFITgksout' elif line[1].startswith('OMFIT_tglf'): line[1] = line[1].replace('OMFIT_tglf', 'OMFITtglf') elif line[0] == "['help']" and line[1] == 'OMFITascii' and line[2].endswith('help.rst'): line[1] = 'OMFIThelp' try: issubclass(eval(line[1]), OMFITmds) classicClass = True except Exception: classicClass = False if line[0].strip().endswith('[+]'): line[0] = line[0].strip()[:-3] exec('self%s.append(None)' % line[0]) n = eval('len(self%s)' % line[0]) - 1 line[0] = line[0] + "[%d]" % n try: if line[1] in ['OMFITexpression', 'OMFITiterableExpression']: exec('self' + line[0] + '={}') # these entries will be over-written later during the load elif classicClass and issubclass(eval(line[1]), OMFITlist): exec('self' + line[0] + '=' + line[1] + '()') elif classicClass and issubclass(eval(line[1]), OMFITtree): developerModeString = {False: '', True: ',developerMode=True', 3: ',developerMode=3'}[developerMode] if len(line[3]): exec( 'self' + line[0] + '=' + line[1] + '(' + repr(line[2]) + developerModeString + ',**' + line[3] + ')', globals(), locals(), ) else: exec( 'self' + line[0] + '=' + line[1] + '(' + repr(line[2]) + developerModeString + ')', globals(), locals() ) elif line[1] in OMFITtypesStr or line[1] in ['importDataset', 'OMFITeqdsk', 'pandas_read_json']: kw = {} if len(line) > 3 and len(line[3]): kw = eval(line[3]) if line[1] == 'pandas_read_json': kw['convert_axes'] = False if lazyLoad and line[1] in ['importDataset', 'pandas_read_json']: exec( 'self' + line[0] + '=OMFITlazyLoad(' + repr('OMFIT' + line[0]) + ',' + repr(line[1]) + ',' + repr(line[2]) + ',**' + repr(kw) + ')' ) else: if os.path.abspath(line[2]) == line[2] or (developerMode and line[1].startswith('OMFITpython')): kw['modifyOriginal'] = True exec('self' + line[0] + '=' + line[1] + '(' + repr(line[2]) + ',**' + repr(kw) + ')') elif classicClass and issubclass(eval(line[1]), OMFITmds): if len(line[2]): # backward compatibility line[3] = {} line[3]['treename'], tmp = line[2].split(' #')[:2] line[3]['shot'], line[3]['server'] = tmp.split(' @ ')[:2] line[3]['shot'] = line[3]['shot'].strip('\'"') line[3]['subtree'] = '' line[3] = repr(line[3]) tmp = eval(line[1])(**eval(line[3])) exec('self' + line[0] + "=tmp") elif classicClass and issubclass(eval(line[1]), OMFITmdsValue): if len(line[2]): # backward compatibility line[3] = {} line[3]['treename'], tmp = line[2].split(' #')[:2] line[3]['shot'], tmp = tmp.split(' @ ')[:2] line[3]['shot'] = line[3]['shot'].strip('\'"') line[3]['server'], line[3]['TDI'] = tmp.split('|')[:2] line[3] = repr(line[3]) tmp = eval(line[1])(**eval(line[3])) exec('self' + line[0] + "=tmp") elif line[1] in ['OMFITtoksearch', 'OMFITwebLink']: tmp = eval(line[1])(**eval(line[3])) exec('self' + line[0] + "=tmp") elif line[1] == 'importNdarray': exec(f'self{line[0]}=np.load({repr(line[2])})') elif line[1] in ['pickle', 'OMFITpickle']: if lazyLoad and os.stat(line[2]).st_size > 1024.0: exec( 'self' + line[0] + '=OMFITlazyLoad(' + repr('OMFIT' + line[0]) + ',"OMFITpickle",' + repr(line[2]) + ')' ) else: exec('self' + line[0] + '=OMFITpickle(' + repr(line[2]) + ')') elif line[1] == 'dill': import dill with open(line[2], 'rb') as f1: exec('self' + line[0] + '=dill.load(f1)') else: exec('self' + line[0] + "=" + re.sub(r'\\n', r'\n', line[2])) except Exception as _excp: try: ctb = ", traceback=" + repr(''.join(traceback.format_exception(*sys.exc_info())).strip()) except Exception: ctb = '' try: exec( 'self' + line[0] + '=OMFITobjectError(' + repr(line[2]) + ',' + repr(line[1]) + ',error=' + repr(repr(_excp)) + ctb + ')' ) except Exception: try: exec('self' + line[0] + '=OMFITerror(' + repr(repr(_excp)) + ctb + ')') except Exception: pass printe(f'\n{repr(line)}:{repr(_excp)}') printd( '-' * 20 + '\n' + repr(line) + ': ' + repr(_excp) + '\n' + ''.join(traceback.format_exception(*sys.exc_info())).strip(), topic='project load errors', ) # now load all of the expressions for line in split_text: if only is None or np.any([line[0].startswith(k) for k in tolist(only)]): if line[1] in ['OMFITexpression', 'OMFITiterableExpression']: if line[2][0] == '_': # read new way of saving OMFITexpressions expr = line[2][1:] # backward compatibility to allow opening and running old TRANSP project expr = expr.replace("datetime.date(2000, 01, 01)", "datetime.date(2000, 1, 1)") tmp = OMFITexpression(eval(expr)) else: # read old way of saving OMFITexpressions (backward compatibility) tmp = OMFITexpression(re.sub(r'\\n', r'\n', line[2])) try: exec('self' + line[0] + "=tmp") except Exception as _excp: printe(repr(line) + ': ' + repr(_excp)) # add __scratch__ if this is a OMFITmodule class # (needs to happen before expression evaluation) if isinstance(self, OMFITmodule): self['__scratch__'] = OMFITtmp() # now evaluate all expressions # to determine if these are iterable or not OMFITexpressionsReturnNone = False for line in split_text: if (only is None or np.any([line[0].startswith(k) for k in tolist(only)])) and not line[0].startswith( "['SETTINGS']['DEPENDENCIES']" ): if line[1] in ['OMFITexpression', 'OMFITiterableExpression']: if quiet > 1: printi('eval expression: ' + line[0]) try: exec('self' + line[0] + "._value_()") except Exception: pass except Exception: printe('Not a valid OMFITsave file') raise finally: os.chdir(oldDir) OMFITexpressionsReturnNone = tmpExp
[docs] def load( self, filename=None, only=None, modifyOriginal=False, readOnly=False, quiet=False, developerMode=False, lazyLoad=bool(safe_eval_environment_variable('OMFIT_LAZYLOAD', False)), ): """ method for loading OMFITtree from disk :param filename: 'directory/bla/OMFITsave.txt' or 'directory/bla.zip' where the OMFITtree will be saved (if '' it will be saved in the same folder of the parent OMFITtree) :param only: list of strings used to load only some of the branches from the tree (eg. ["['MainSettings']","['myModule']['SCRIPTS']"] :param modifyOriginal: by default OMFIT will save a copy and then overwrite previous save only if successful. If `modifyOriginal=True` and filename is not .zip, will write data directly at destination, which will be faster but comes with the risk of deleting a good save if the new save fails for some reason :param readOnly: will place entry in OMFITsave.txt of the parent so that this OMFITtree can be loaded, but will not save the actual content of this subtree. `readOnly=True` is meant to be used only after this subtree is deployed where its fileneme says it will be. Using this feature could result in much faster projects save if the content of this tree is large. :param quiet: Verbosity level :param developerMode: load OMFITpython objects within the tree as modifyOriginal :param lazyLoad: enable/disable lazy load of picklefiles and xarrays """ if filename is None: # reload filename = self.filename modifyOriginal = self.modifyOriginal readOnly = self.readOnly else: if os.path.isdir(filename): if os.path.exists(filename + os.sep + 'OMFITsave.txt'): filename = filename + os.sep + 'OMFITsave.txt' else: raise ValueError(filename + ' is a directory without OMFITsave.txt in it') self.filename = filename self.modifyOriginal = modifyOriginal self.readOnly = readOnly if not hasattr(self, 'OMFITproperties'): self.OMFITproperties = {} self.clear() if not filename: return try: TMPnoCopyToCWD = OMFITaux['noCopyToCWD'] OMFITaux['noCopyToCWD'] = False return self._load( filename, only=only, modifyOriginal=modifyOriginal, readOnly=readOnly, quiet=quiet, developerMode=developerMode, lazyLoad=lazyLoad, ) finally: OMFITaux['noCopyToCWD'] = TMPnoCopyToCWD
def _moduleDict(self, onlyModuleID=None, level=-1): """ Returns a dictionary of currently loaded modules :param onlyModuleID: string or list of strings of module ID to search for :param level: how many modules deep to go The dictionary contains the modules settings for each of the modules """ def f_traverse(me, myLocation, modDict, level): if isinstance(me, OMFITtree): for kid in list(me.keys()): kidName = "[" + repr(kid) + "]" entryID = myLocation + kidName if isinstance(me[kid], OMFITtree) and not (hasattr(me[kid], 'dynaLoad') and me[kid].dynaLoad): if isinstance(me[kid], OMFITmodule): if 'SETTINGS' not in me[kid]: continue if 'MODULE' not in me[kid]['SETTINGS']: me[kid]['SETTINGS']['MODULE'] = SortedDict() if onlyModuleID is None or me[kid]['SETTINGS']['MODULE']['ID'] in tolist(onlyModuleID): modDict[entryID] = {} for item in _moduleAttrs: if item not in me[kid]['SETTINGS']['MODULE']: modDict[entryID][item] = None else: modDict[entryID][item] = evalExpr(me[kid]['SETTINGS']['MODULE'][item]) if level != 0: f_traverse(me[kid], entryID, modDict, level=level - 1) else: f_traverse(me[kid], entryID, modDict, level=level) return modDict return f_traverse(self, myLocation='', modDict=OrderedDict(), level=level) def __setitem__(self, key, value): """ Override SortedDict.__setitem__ and ensure save/load compatibility of sub-objects """ if np.iterable(key) and '/' in key and 'OMFITtree' in get_bases(type(value)): raise OMFITexception( 'The key `' + str(key) + ' contains the `/` character, which is invalid for an ' + self.__class__.__name__ + ' type object' ) return super().__setitem__(key, value) def __save_kw__(self): """ :return: kw dictionary used to save the attributes to be passed when reloading from OMFITsave.txt """ if self.modifyOriginal: self.OMFITproperties['modifyOriginal'] = self.modifyOriginal elif 'modifyOriginal' in self.OMFITproperties: del self.OMFITproperties['modifyOriginal'] if self.readOnly: self.OMFITproperties['readOnly'] = self.readOnly elif 'readOnly' in self.OMFITproperties: del self.OMFITproperties['readOnly'] return self.OMFITproperties
[docs] def close(self): """ Recursively calls the .close() method on OMFITobjects and OMFITtree objects """ for item in self: if isinstance(self[item], (OMFITobject, OMFITtree)): self[item].close()
[docs] def populate_from_directory(self, dirname, extensions={}, **kw): """ Load files from a directory maintaning directory structure :param dirname: directory path :param extensions: dictionary mapping filename expression to OMFIT classes, for example: {'*.dat': 'OMFITnamelist', '*.xml': 'OMFITxml'} """ def traverse(obj, dd, path=''): for k in dd: if path: p = path + os.sep + k else: p = dirname + os.sep + k # directory if isinstance(dd[k], dict): obj[k] = OMFITtree() traverse(obj[k], dd[k], p) # leaf else: match = False import fnmatch for ext in extensions: if fnmatch.fnmatch(p, ext): exec('from omfit_classes.omfit_python import ' + extensions[ext], globals(), locals()) obj[k] = eval(extensions[ext])(p, **kw) match = True break if not match: if is_binary_file(p): obj[k] = OMFITpath(p, **kw) else: obj[k] = OMFITascii(p, **kw) return obj dd = dir2dict(dirname)[dirname] return traverse(self, dd, '')
def cherry_pick_OMFITsave(filename, only=None, deflate_dir=None): """ Cherry pick data from a OMFIT save data file :param filename: path to the OMFIT save file (zipped or not) :param only: string or list of strings of the data to be loaded If None, then all of the data is extracted :param deflate_dir: extract data to this folder :return: either data or path to the OMFITsave.txt file that was extracted """ if isinstance(only, str): only = [only] if deflate_dir is None: deflate_dir = OMFITcwd + os.sep + 'projects_preview' + os.sep deflate_dir = os.path.abspath(deflate_dir) + os.sep if not os.path.exists(deflate_dir): os.makedirs(deflate_dir) # zip file extraction if zipfile.is_zipfile(filename): isZip = True myzipfile = zipfile.ZipFile(filename, 'r', allowZip64=True) basepath = os.path.split(myzipfile.namelist()[0])[0] if basepath: basepath = basepath + os.sep omfitsavetxt = deflate_dir + basepath + 'OMFITsave.txt' if only is None: myzipfile.extractall(deflate_dir) # return the omfitsavetxt return omfitsavetxt else: myzipfile.extract(basepath + 'OMFITsave.txt', deflate_dir) # "fake" zipfile extraction if not compressed else: basepath = '' omfitsavetxt = deflate_dir + 'OMFITsave.txt' shutil.copy2(filename, deflate_dir) load_dir = os.path.split(filename)[0] + os.sep isZip = False # OMFITsave with open(omfitsavetxt, 'r') as f: lines = f.read().split('\n') # filter only the data to be loaded mini_OMFITsave = [] for line in lines: if any(line.startswith(location) for location in only): mini_OMFITsave.append(line) # only extract the files that are needed for line in mini_OMFITsave: tmp = line.split(' <-:-:-> ') if tmp[2].startswith('./'): filename = re.sub(r'^\./', '', tmp[2]) if isZip: for fi in myzipfile.namelist(): if fi.startswith(basepath + filename): myzipfile.extract(fi, deflate_dir) else: shutil.copy2(load_dir + filename, deflate_dir) # Update OMFITsave.txt with only the relevant entries with open(omfitsavetxt, 'w') as f: f.write('\n'.join(mini_OMFITsave)) # return the omfitsavetxt return omfitsavetxt
[docs]class OMFITlist(list): """ list of which the individual values are saved to disk as it is done for the key:value pairs in an OMFITtree object """ pass
class specialList(list): def __call__(self, *args, **kw): out = specialList() for k in self: out.append(k.__call__(*args, **kw)) if len(out) and is_numeric(out[0]): out = np.array(out).T return out def __getattr__(self, key): if key in ['__array_struct__', '__array_interface__', '__array__']: raise AttributeError('bad attribute `%s`' % key) out = specialList() for k in self: out.append(getattr(k, key)) if len(out) and is_numeric(out[0]): out = np.array(out).T return out def _collection_decorator(f): def ff(*args, **kw): self = args[0] fname = f.__name__ global OMFITexpressionsReturnNone if OMFITexpressionsReturnNone or self.selector is None: return f(*args, **kw) elif fname in ['_keyCaseInsensitive']: # discard existing class decorators return f(*args, **kw) else: return getattr(super().__getitem__(self.selector), fname)(*args[1:], **kw) return ff def _for_all_inherited_methods(decorator): def decorate(cls): for name, fn in inspect.getmembers(cls, inspect.ismethod): if name not in cls.__dict__: # decorate inherited methods only, not overloaded or new methods setattr(cls, name, decorator(fn)) return cls return decorate
[docs]def module_selective_deepcopy(location, classes): """ Function that returns the equivalent of a module copy.deepcopy but can selectively include entries based on their class :param location: string with OMFIT tree location where the module resides :param classes: list of strings with allwed classes :return: copy of the module containing a deepcopy of the allowed classes """ treeClasses = ['OMFITtree', 'OMFITmodule'] # traverse the tree and avoid going deeper in the classes that are not of interet tmp_tree = traverse(eval(location), onlyDict=tuple(map(eval, classes + treeClasses)), skipDynaLoad=True) tmp_cls = traverse(eval(location), onlyLeaf=tuple(map(eval, classes)), skipDynaLoad=True) # prune empty trees for k, tr in list(enumerate(tmp_tree))[::-1]: keep = False for cl in tmp_cls: if tr in cl: keep = True break if not keep: tmp_tree.pop(k) # build structure and deepcopy module = OMFITmodule() for item in tmp_tree: obj = eval(location + item) if obj.__class__.__name__ in treeClasses: exec('module%s=%s()' % (item, obj.__class__.__name__)) else: exec('module%s=copy.deepcopy(obj)' % (item)) return module
[docs]def module_noscratch_deepcopy(module_root): """ deepcopy of a module (and submodules) but neglecting the scratch '__scratch__' directories :param module_root: instance of the module to be copied :return: module deepcopy """ tmp = traverse(module_root, onlyLeaf=(OMFITtmp,), skipDynaLoad=True) try: bkp = [] for item in tmp: if '__scratch__' in item: bkp.append(eval('module_root' + item)) exec('module_root' + item + '=OMFITtmp()') return copy.deepcopy(module_root) finally: for item in tmp: if '__scratch__' in item: exec('module_root' + item + '=bkp.pop(0)')
class env_from_import(dict): """ This class is used to expose the namespace of an import as a dictionary, which is useful to pass such namespace to eval() or exec() statements """ import_namespace = None def __getitem__(self, key): if key in self: return dict.__getitem__(self, key) elif self.import_namespace is not None: return getattr(self.import_namespace, key) else: raise KeyError(key)
[docs]@_for_all_inherited_methods(_collection_decorator) class OMFITcollection(OMFITtree): """ A class for holding sets of similar objects :param selector: sets selection of a specific realization (`None` for all realizations, `'random'` for a random realization). This can also be an OMFITexpression. :param strategy: sets operation to be performed on all realizations (if `.selector == None`) :param raise_errors: sets how to proceed in case of errors (eg. missing objects, attributes) * `None`: print warning on errors * `True`: raise errors * `False`: ignore errors :param no_subdir: This keyword affects how the OMFITcollection is deployed. * `False` (default) the OMFITcollection is deployed like a normal OMFITtree, such that the files of the collection are deployed under a subdirectory, whose name comes from the entry in the tree * `True` the files are deployed in the same level directory as the parent `.selector`, `.strategy`, `.raise_errors` can be modified after the object is instantiated >> tmp=OMFITcollection(raise_errors=True) >> for k in range(1,10): >> tmp[k]={} >> tmp[k]['hello']=np.linspace(0,1,100)**k >> >> #return a single realization >> tmp.selector=5 >> tmp.strategy=None >> pyplot.plot(tmp['hello'],'--r',lw=2) >> >> #return all realizations >> tmp.selector=None >> tmp.strategy=None >> plotc(tmp['hello']) >> >> #attribute on all realizations >> tmp.selector=None >> tmp.strategy=None >> print(tmp['hello'].mean()) >> >> #perform operation on all realizations >> tmp.selector=None >> tmp.strategy='np.mean(x,axis=1)' >> pyplot.plot(tmp['hello'],'k--',lw=2) >> >> OMFIT['variable']=3 >> tmp.selector=OMFITexpression("OMFIT['variable']") >> print(tmp) >> >> print(tmp.GET(3)['hello']-tmp['hello']) >> >> # to update values, you can use the UPDATE method >> tmp.UPDATE(location="['hello'][0]",values=-1e6) >> tmp.selector=None >> tmp.strategy=None >> print(tmp['hello'].mean()) """ def __init__( self, filename='', modifyOriginal=False, readOnly=False, quiet=False, selector=None, strategy=None, raise_errors=None, warn=False, no_subdir=False, **kw, ): OMFITtree.__init__(self, filename=filename, modifyOriginal=modifyOriginal, readOnly=readOnly, quiet=quiet, **kw) self.OMFITproperties = {'modifyOriginal': modifyOriginal, 'readOnly': readOnly} self.selector = selector self.strategy = strategy self.raise_errors = raise_errors self.warn = warn self._filename = filename self.OMFITproperties['no_subdir'] = no_subdir def _warning(self, k, _excp): if self.warn: printw('Error accessing item `' + str(k) + '` : ' + repr(_excp)) @property def selector(self): global OMFITexpressionsReturnNone if OMFITexpressionsReturnNone: return None if 'OMFITproperties' not in self.__dict__: return None if isinstance(self.OMFITproperties['selector'], str) and self.OMFITproperties['selector'].startswith('_!OMFITexpression!_'): selector_string = self.OMFITproperties['selector'][len('_!OMFITexpression!_') :] locals = {} if self._OMFITparent is not None: locals.update(relativeLocations(self)) try: selector = eval(selector_string, globals(), locals) except Exception: try: exec(selector_string, globals(), locals) if 'return_variable' in locals: selector = locals['return_variable'] else: selector = None except Exception: raise else: selector = self.OMFITproperties['selector'] return selector @selector.setter def selector(self, value): if value == 'random': import random self.OMFITproperties['selector'] = random.choice(self.KEYS()) elif isinstance(value, OMFITexpression): self.OMFITproperties['selector'] = '_!OMFITexpression!_' + value.expression else: self.OMFITproperties['selector'] = value @selector.deleter def selector(self): self.OMFITproperties['selector'] = None @property def filename(self): global OMFITexpressionsReturnNone if OMFITexpressionsReturnNone or self.selector is None: return self._filename else: return self.GET(self.selector).filename @filename.setter def filename(self, value): if self.selector is None: self._filename = value else: self.GET(self.selector).filename = value @property def strategy(self): if self.OMFITproperties['strategy'] is None: return 'x' else: return self.OMFITproperties['strategy'] @strategy.setter def strategy(self, value): if not (isinstance(value, str) or value is None): raise OMFITexception('strategy should be either a string or `None`') if isinstance(value, str) and 'x' not in value: raise OMFITexception('strategy string should cointain `x`') self.OMFITproperties['strategy'] = value @strategy.deleter def strategy(self): self.OMFITproperties['strategy'] = None @property def raise_errors(self): return self.OMFITproperties['raise_errors'] @raise_errors.setter def raise_errors(self, value): self.OMFITproperties['raise_errors'] = value @raise_errors.deleter def raise_errors(self): self.OMFITproperties['raise_errors'] = None
[docs] def pload(self, nprocs, no_mpl_pledge=True, **kw): r""" Parallel loading of all entries in an OMFITcollection. :param: nprocs: the number of processes to use while loading. (Might be decreased to prevent memory overage. :param: no_mpl_pledge: pledge that the load function does not use matplotlib. Its probably not needed anymore. """ # backward compatibility runNoGUI = True # human-readable run IDs and labels customids = True step_name = list(self.keys()) step_label = step_name nsteps = len(self.keys()) # set number of processors to reasonable default if not set if nprocs is None: nprocs = int(os.environ.get('SLURM_TASKS_PER_NODE', os.environ.get('OMP_NUM_THREADS', '4')).split(',')[0].split('(')[0]) # max_nprocs = self.estimate_nprocs_from_available_memory() # if nprocs > max_nprocs and not bool(os.environ.get('OMFIT_ALLOW_VIRTUAL_MEM', '')): # nprocs = max_nprocs if nprocs <= 1: nprocs = 1 myname = treeLocation(self)[-1] if not myname: myname = os.path.split(self.filename)[-1] printi('Starting parallel loading of ' + myname) try: steps = list(range(nsteps)) results = OMFITtree() nprocs = int(nprocs) step = [None] * nprocs p = [None] * nprocs p_alive = [0] * nprocs allDone = [0] # Get all the objs to be loaded in a list objs_to_load = [None] * nsteps for ii in range(nsteps): objs_to_load[ii] = self[step_name[ii]] def runLoad(nprocs, process_id, nsteps, step_id, obj_to_load, kw): whatHappened = '' OMFITaux['pythonRunWindows'] = [None] OMFITaux['prun_process'].append(process_id) OMFITaux['prun_nprocs'].append(nprocs) # Check for basic feasibility if 'load' not in dir(obj_to_load): process_result = None whatHappened = '\n' + f'{step_name[step_id]} failed to load: no load method' + '\n' retcode = -1 try: # force different seed initialization for random processes # here we purposly use Python's native __hash__ function # which returns an integer pylab.seed(omfit_numeric_hash(str(time.time()), 10) * (step_id + 1) % 4294967295) also_load_flux_surfaces = kw.pop('also_load_flux_surfaces', False) tmp = obj_to_load.load(**kw) except Exception as _excp: process_results = _excp etype, value, tb = sys.exc_info() excpStack = traceback.format_exception(etype, value, tb) whatHappened = '\n' + '\n'.join(excpStack) + '\n' retcode = -1 else: if also_load_flux_surfaces: try: if not 'fluxSurfaces' in obj_to_load.keys(): obj_to_load.addFluxSurfaces() obj_to_load['fluxSurfaces'].load() except KeyError: # This error occures when gfiles indicate `vacuum equilbiria` # fluxSurfaces will failed to be added. We just skip it. whatHappened = '\n' + f'\nvacuum equilibrium: fluxSurfaces skipped for t = {step_name[step_id]}' + '\n' process_results = OMFITtree() process_results['loaded_obj'] = obj_to_load retcode = 0 # pickle process results filename = OMFITcwd + os.sep + 'pickle_' + '_'.join(map(str, OMFITaux['prun_process'])) try: with open(filename, 'wb') as f: pickle.dump( [retcode, process_results, sys.stdout.getvalue(), sys.stderr.getvalue() + whatHappened], # actual results f, pickle.HIGHEST_PROTOCOL, ) except Exception as exc: printe('Failed to pickle results:', repr(exc)) with open(filename, 'wb') as f: pickle.dump( [retcode, exc, sys.stdout.getvalue(), sys.stderr.getvalue() + whatHappened], # resulting exception f, pickle.HIGHEST_PROTOCOL, ) def pManager(): for k in range(nprocs): if p[k] is not None: p_alive[k] = int(p[k].is_alive()) else: p_alive[k] = 0 # retrieval while np.any([(p_alive[kk] == 0 and step[kk] is not None) for kk in range(len(p_alive))]): k = [(p_alive[kk] == 0 and step[kk] is not None) for kk in range(len(p_alive))].index(True) # unpickle process results retry = 0 max_retry = 1 excp = '' while retry <= max_retry: retry = retry + 1 try: filename = OMFITcwd + os.sep + 'pickle_' + '_'.join(map(str, OMFITaux['prun_process'] + [k])) with open(filename, 'rb') as f: retcode, results[step_name[step[k]]], stdout, stderr = pickle.load(f) os.remove(filename) break except Exception as _excp: if isinstance(_excp, FileNotFoundError) and retry <= max_retry: # wait and retry, it might be a FS sync issue. sleep(1) else: results[step_name[step[k]]] = _excp retcode = -1 stdout = [] stderr = [] excp = repr(_excp) break _streams['INFO'].write('===============================\n') _streams['INFO'].write('--> STEP {s} OF {n}\n'.format(s=step[k] + 1, n=nsteps)) _streams['INFO'].write('===============================\n') if stdout: _streams['PROGRAM_OUT'].write(''.join(stdout) + '\n') if stderr: _streams['PROGRAM_ERR'].write(''.join(stderr) + '\n') if excp: _streams['STDERR'].write(excp + '\n') step[k] = None p[k] = None # start new while len(steps) and not np.all(p_alive): k = p_alive.index(0) if nsteps <= nprocs: step[k] = steps.pop(0) else: # shuffle run step for load balancing purposes step[k] = steps.pop(pylab.randint(len(steps))) # generate input dictionary kw0 = {} for kk in list(kw.keys()): try: if isinstance(kw[kk], str): raise TypeError() kw0[kk] = kw[kk][step[k]] except Exception: kw0[kk] = kw[kk] with parallel_environment(mpl_backend=not no_mpl_pledge): p[k] = multiprocessing.Process(target=runLoad, args=(nprocs, k, nsteps, step[k], objs_to_load[step[k]], kw0)) p[k].start() p_alive[k] = int(p[k].is_alive()) # check if everything was all done if allDone[0] == 0 and not len(steps) and not np.any([step[kk] is not None for kk in range(len(p_alive))]): allDone[0] = 1 # toggle between completed and active processes in progressBar progressBarToggle = False # while there are still some steps to be run or there are processes running T = time.time() while allDone[0] == 0: sleep(0.01) pManager() finally: pass if allDone[0] > 0: # Get results and placed them in the right place. myname = treeLocation(self)[-1] if not myname: myname = os.path.split(self.filename)[-1] printi('Ended parallel execution of ' + myname) sorted_results = OMFITtree() for key in step_name: # step_name comes from the keys if ( key in results and not isinstance(results[key], Exception) and results[key]['loaded_obj'] is not None and not isinstance(results[key]['loaded_obj'], Exception) ): self[key] = results[key]['loaded_obj'] return results else: raise EndAllOMFITpython('\n\n---> Aborted by user <---\n\n')
def __getstate__(self): global OMFITexpressionsReturnNone tmpExp = OMFITexpressionsReturnNone OMFITexpressionsReturnNone = True try: tmp = super().__getstate__() return tmp finally: OMFITexpressionsReturnNone = tmpExp def __setstate__(self, dict): global OMFITexpressionsReturnNone tmpExp = OMFITexpressionsReturnNone OMFITexpressionsReturnNone = True try: return super().__setstate__(dict) finally: OMFITexpressionsReturnNone = tmpExp def __deepcopy__(self, memo): global OMFITexpressionsReturnNone tmpExp = OMFITexpressionsReturnNone OMFITexpressionsReturnNone = True try: return pickle.loads(pickle.dumps(self, pickle.HIGHEST_PROTOCOL)) finally: OMFITexpressionsReturnNone = tmpExp
[docs] def KEYS(self): """ Returns list of keys within the OMFITcollection, regardless of the value of the `.selector`. This is equivalent to `self.keys()` when `self.selector=None`. """ return self.keyOrder
[docs] def SET(self, key, value): """ Writes the `key` entry in the collection dictionary, regardless of the value of the `.selector`. This is equivalent to `self[key]=value` when `self.selector=None`. :param key: entry of the collection :param value: value to be given """ return super().__setitem__(key, value)
[docs] def GET(self, key): """ Returns the `key` entry from the collection dictionary, regardless of the value of the `.selector`. This is equivalent to `self[key]` when `self.selector=None`. :param key: entry of the collection """ return super().__getitem__(key)
[docs] def CONTAINS(self, key): """ Returns whether `key` is in the collection dictionary, regardless of the value of the `.selector`. This is equivalent to key in self when `self.selector=None`. :param key: entry of the collection """ return super().__contains__(key)
def __contains__(self, key): global OMFITexpressionsReturnNone if OMFITexpressionsReturnNone or key in self.KEYS(): return self.CONTAINS(key) elif self.selector is not None: return key in self.GET(self.selector) else: if not self.CONTAINS(key): if not len(self.KEYS()): return False for k in self.KEYS(): try: self.GET(k)[key] except Exception: return False return True def __getitem__(self, key): global OMFITexpressionsReturnNone if OMFITexpressionsReturnNone or key in self.KEYS(): return self.GET(key) elif self.selector is not None: return self.GET(self.selector).__getitem__(key) else: is_dict = False for k in self.KEYS(): try: tmp = self.GET(k)[key] if isinstance(tmp, (dict, xarray.Dataset)): is_dict = True break except Exception as _excp: if self.raise_errors is True: raise elif self.raise_errors is None: self._warning(k, _excp) else: continue if is_dict: tmp = self.__class__() tmp.selector = self.selector tmp.strategy = self.strategy tmp.raise_errors = self.raise_errors for k in self.KEYS(): try: tmp[k] = self.GET(k)[key] except Exception as _excp: if self.raise_errors is True: raise elif self.raise_errors is None: self._warning(k, _excp) else: continue return tmp else: out = specialList() for k in self.KEYS(): try: out.append(self.GET(k)[key]) except Exception as _excp: if self.raise_errors is True: raise elif self.raise_errors is None: self._warning(k, _excp) else: continue if len(out) and is_numeric(out[0]): out = np.array(out).T if self.strategy is None: return out else: from omfit_classes import omfit_python namespace = env_from_import() namespace['x'] = out namespace.import_namespace = omfit_python return eval(self.strategy, namespace) def __setitem__(self, key, value): global OMFITexpressionsReturnNone if OMFITexpressionsReturnNone or self.selector is None or key in self.KEYS(): return self.SET(key, value) else: return self.GET(self.selector).__setitem__(key, value) def __getattr__(self, key): if key in ['__getnewargs__', '__tree_keys__']: raise AttributeError('bad attribute `%s`' % key) # print key global OMFITexpressionsReturnNone if OMFITexpressionsReturnNone or key in self.__dict__: return super().__getattr__(key) elif self.selector is not None: return self.GET(self.selector).__getattr__(key) else: out = specialList() for k in self.KEYS(): try: out.append(getattr(self.GET(k), key)) except Exception as _excp: if self.raise_errors is True: raise elif self.raise_errors is None: self._warning(k, _excp) else: continue if len(out) and is_numeric(out[0]): out = np.array(out).T return out def __tree_repr__(self): if self.selector is None: strategy = 'STRATEGY:%s' % repr(self.strategy) if self.strategy != 'x' else '' count = '--{\t%d\t}--' % len(self) if self.selector is None else '' modifyOriginal = 'modifyOriginal' if hasattr(self, 'modifyOriginal') and getattr(self, 'modifyOriginal') else '' readOnly = 'readOnly' if hasattr(self, 'readOnly') and getattr(self, 'readOnly') else '' return ' '.join([count, strategy, modifyOriginal, readOnly]), [] else: selector = 'SELECTOR[%s]: ' % repr(self.selector) if self.selector is not None else '' tmp = itemTagsValues(self.GET(self.selector))[1] return selector + tmp[0], tolist(tmp[1])
[docs] def UPDATE(self, location, values, verbose=False): """ Update the location of the ith key of self.KEYS() with the ith value of values :param location: A string indicating the the common part that will be updated :param values: An iterable of the same length as ``self.KEYS()`` or a single value to be broadcast to all keys :returns: ``None`` Example:: coll.UPDATE(location="['SAT_RULE']",values=[1]*len(coll.KEYS())) coll.UPDATE(location="['SAT_RULE']",values=1) """ if np.size(values) == 1: values = [np.atleast_1d(values)[0]] * len(self.KEYS()) if verbose: printi('OMFITcollection.UPDATE of "%s":' % location) printi(' Values (old):', eval("self" + location)) for k, v in zip(self.KEYS(), values): exec("self[k]{location} = v".format(location=location)) if verbose: printi(' Values (new):', eval("self" + location))
[docs]class OMFITmcTree(OMFITcollection): """ A class for holding results from a Monte Carlo set of runs Effectively this is a `OMFITcollection` class with default strategy set to: `uarray(np.mean(x,1),np.std(x,1))` """ def __init__(self, *args, **kw): kw.setdefault('strategy', 'np.atleast_1d(uarray(np.mean(x,-1),np.std(x,-1)))') OMFITcollection.__init__(self, *args, **kw)
[docs] def get_mean_std(self, path): printe('WARNING: get_mean_std() will be deprecated in the future') tmp_list = [] for k in self: try: eval("tmp_list.append(self[k]%s)" % path) except KeyError: continue tmp_array = np.array(tmp_list) return (tmp_array.mean(axis=0), tmp_array.std(axis=0))
[docs]class OMFITstorage(OMFITtree): pass
[docs]class OMFITtreeCompressed(OMFITobject): def __init__(self, input, **kw): quiet = kw.pop('quiet', False) if isinstance(input, str): OMFITobject.__init__(self, input, quiet=quiet, **kw) elif isinstance(input, OMFITtree): filename = input.__class__.__name__ + '__compressed_' + utils_base.now("%Y-%m-%d_%H_%M_%S_%f") OMFITobject.__init__(self, filename + '.zip', quiet=quiet, **kw) location = self.filename[:-4] input.deploy(location + os.sep + 'OMFITsave.txt', zip=True, quiet=quiet)
_moduleAttrs = ['ID', 'edited_by', 'date', 'contact', 'defaultGUI', 'commit', 'comment']
[docs]class OMFITmodule(OMFITtree): """ Class for OMFIT modules :param filename: None: create new module from skeleton, '': create an empty module """ _settings_class = OMFITsettings def __init__(self, filename='', modifyOriginal=False, readOnly=False, quiet=False, developerMode=False, **kw): # new module from skeleton if filename is None: OMFITtree.__init__( self, os.path.split(os.path.abspath(__file__))[0] + os.sep + 'skeleton' + os.sep + 'NEW_MODULE' + os.sep + 'OMFITsave.txt', quiet=quiet, ) date = utils_base.now() self['SETTINGS']['MODULE']['edited_by'] = os.environ['USER'] self['SETTINGS']['MODULE']['date'] = date self['SETTINGS']['MODULE']['contact'] = tolist(self['SETTINGS']['MODULE']['contact'], empty_lists=[None, '']) if is_email(OMFIT['MainSettings']['SETUP']['email']): self['SETTINGS']['MODULE']['contact'] += [OMFIT['MainSettings']['SETUP']['email']] self['SETTINGS']['MODULE']['python3'] = '.'.join(map(str, sys.version_info[:2])) self.filename = '' # load module from file elif len(filename): OMFITtree.__init__( self, filename, modifyOriginal=modifyOriginal, readOnly=readOnly, quiet=quiet, developerMode=developerMode, **kw ) # update old modules so they have all required attributes self._check_module_attributes() # convert documentation converted_legacy_help = False if 'help' in self: if not isinstance(self['help'], OMFIThelp): if os.path.split(self['help'].filename)[1] != 'help.rst': converted_legacy_help = True self['help'] = OMFIThelp('help.rst', fromString=self['help'].read()) else: self['help'] = OMFIThelp('help.rst') if isinstance(self['help'], OMFIThelp) and 'version' in self['SETTINGS']['MODULE']: self['help'].write(self['SETTINGS']['MODULE']['version'] + '\n\n' + self['help'].read()) del self['SETTINGS']['MODULE']['version'] converted_legacy_help = True if converted_legacy_help and self['help'].read().strip(): old_help = self['help'].read() self['help'].write( re.sub(r'\.\. \.(\. |\.)', '.. ', '.. ' + '\n.. '.join(OMFITaux['moduleSkeletonCache']['help'].read().split('\n'))) ) self['help'].append( '\n.. \n.. the following module documentation should be updated to comply with the schema provided above\n\n' ) self['help'].append('.. code-block:: none\n\n ' + '\n '.join(map(encode_ascii_ignore, old_help.split('\n')))) printw(str(self.ID) + "['help'] is out of date -- please consider updating it --") # store settings at import time # NOTE: this is done both here and in OMFIT.loadModule because here is needed when reloading a module and the other will execute OMFITlib_startup self._save_settings_at_import() # empty module else: OMFITtree.__init__( self, filename, modifyOriginal=modifyOriginal, readOnly=readOnly, quiet=quiet, developerMode=developerMode, **kw ) # sort objects by their class self.sort_class([OMFITtmp, OMFITmodule, OMFITtree, OMFITnamelist, OMFITsettings, OMFITascii, OMFITwebLink]) def _save_settings_at_import(self): """ Generate __SETTINGS_AT_IMPORT__ by freezing all OMFITexpressions under self['SETTINGS'] """ self.insert(self.index('SETTINGS') + 1, '__SETTINGS_AT_IMPORT__', copy.deepcopy(self['SETTINGS'])) self['__SETTINGS_AT_IMPORT__'].__class__ = OMFITjson # use json since places may assume that OMFITsettings is only root['SETTINGS'] # does not make sense to store shot/time/device etc... if 'EXPERIMENT' in self['__SETTINGS_AT_IMPORT__']: del self['__SETTINGS_AT_IMPORT__']['EXPERIMENT'] # we remove workdirs since these will always change when reloading a module if 'SETUP' in self['__SETTINGS_AT_IMPORT__'] and 'workDir' in self['__SETTINGS_AT_IMPORT__']['SETUP']: del self['__SETTINGS_AT_IMPORT__']['SETUP']['workDir'] if 'REMOTE_SETUP' in self['__SETTINGS_AT_IMPORT__'] and 'workDir' in self['__SETTINGS_AT_IMPORT__']['REMOTE_SETUP']: del self['__SETTINGS_AT_IMPORT__']['REMOTE_SETUP']['workDir'] # remove dependencies (these sometimes have expressions that are used to dynamically load modules!) if 'DEPENDENCIES' in self['__SETTINGS_AT_IMPORT__']: del self['__SETTINGS_AT_IMPORT__']['DEPENDENCIES'] # remove all expressions freezeExpr(self['__SETTINGS_AT_IMPORT__'], remove_OMFITexpressionError=True) def _get_module_attribute(self, attr): """ This function gets the attribute and sets it from the MODULES skeleton, if the attribute is missing Setting from the skeleton is not done if OMFITexpressionsReturnNone is True, which occurs when exporting/saving modules :param attr: attribute to get :return: value of the attribute """ if 'SETTINGS' not in self: self['SETTINGS'] = OMFITaux['moduleSkeletonCache']['SETTINGS'].duplicate() if 'MODULE' not in self['SETTINGS']: self['SETTINGS']['MODULE'] = SortedDict() if not isinstance(self['SETTINGS'], self._settings_class): self['SETTINGS'].__class__ = self._settings_class if attr not in self['SETTINGS']['MODULE']: return self._set_module_attribute(attr, copy.deepcopy(OMFITaux['moduleSkeletonCache']['SETTINGS']['MODULE'][attr])) return self['SETTINGS']['MODULE'][attr] def _check_module_attributes(self): for item in _moduleAttrs: setattr(self, item, getattr(self, item)) def _set_module_attribute(self, attr, value): """ This function is used to sets the attribute, unless OMFITexpressionsReturnNone is True, which occurs when exporting/saving modules :param attr: attribute to set :param value: value to set the attribute to :return: value of the attribute """ if 'MODULE' not in self['SETTINGS']: self['SETTINGS']['MODULE'] = SortedDict() global OMFITexpressionsReturnNone if not OMFITexpressionsReturnNone: self['SETTINGS']['MODULE'][attr] = value return value @property def settings_class(self): return self._settings_class @settings_class.setter def settings_class(self, value): tmp = self['SETTINGS'] try: self['SETTINGS'] = value('SettingsNamelist.txt') self['SETTINGS'].update(tmp) self['SETTINGS'].save() self._settings_class = value except Exception as _excp: printe(repr(_excp)) self['SETTINGS'] = tmp @property def ID(self): return self._get_module_attribute('ID') @ID.setter def ID(self, value): return self._set_module_attribute('ID', value) @property def edited_by(self): return self._get_module_attribute('edited_by') @edited_by.setter def edited_by(self, value): return self._set_module_attribute('edited_by', value) @property def date(self): return self._get_module_attribute('date') @date.setter def date(self, value): return self._set_module_attribute('date', value) @property def description(self): return self['help'].txt() @property def contact(self): # make sure contacts is a list return tolist(self._get_module_attribute('contact'), empty_lists=[None, '']) @contact.setter def contact(self, value): return tolist(self._set_module_attribute('contact', value), empty_lists=[None, '']) @property def defaultGUI(self): return self._get_module_attribute('defaultGUI') @defaultGUI.setter def defaultGUI(self, value): return self._set_module_attribute('defaultGUI', value) @property def commit(self): return self._get_module_attribute('commit') @commit.setter def commit(self, value): return self._set_module_attribute('commit', value) @property def comment(self): return self._get_module_attribute('comment') @comment.setter def comment(self, value): return self._set_module_attribute('comment', value)
[docs] def moduleDict(self, onlyModuleID=None, level=-1): """ Returns a dictionary of currently loaded modules :param onlyModuleID: string or list of strings of module ID to search for :param level: how many modules deep to go The dictionary contains the modules settings for each of the modules """ return self._moduleDict(onlyModuleID=onlyModuleID, level=level)
def _storeDecorator(f): """ Decorator which calls `storage` methods """ @functools.wraps(f) def decoratedStorage(self, *args, **kw): if not len(args): args = [None] args = list(args) if 'runid' in kw: args = [kw.pop('runid')] runid = args[0] if runid is None: runid = self['SETTINGS']['EXPERIMENT']['runid'] modID = self['SETTINGS']['MODULE']['ID'] if not isinstance(runid, str): raise OMFITexception(f.__name__ + ': Need to specify a runid for module ' + str(modID)) if '__STORAGE__' not in self: self['__STORAGE__'] = OMFITstorage() printi(f.__name__[:-1] + 'ing ' + str(modID) + ' runid ' + runid + ' ...') args[0] = runid try: old_runid = self['SETTINGS']['EXPERIMENT']['runid'] self['SETTINGS']['EXPERIMENT']['runid'] = runid tmp = f(self, *args, **kw) finally: self['SETTINGS']['EXPERIMENT']['runid'] = old_runid return tmp return decoratedStorage
[docs] @_storeDecorator def store(self, runid, metadata={}): """ Method to store a snapshot of the current module status and save it under self['__STORAGE__'][runid] where runid is set under self['SETTINGS']['EXPERIMENT']['runid'] :param runid: runid to be de-stored. If None the runid is taken from self['SETTINGS']['EXPERIMENT']['runid'] """ self['SETTINGS']['EXPERIMENT']['runid'] = runid tmp = OMFITtreeCompressed(self, quiet=True) self['__STORAGE__'][runid] = OMFITtree() self['__STORAGE__'][runid]['comment'] = self['SETTINGS']['EXPERIMENT'].setdefault('comment', '') self['__STORAGE__'][runid].update(metadata) self['__STORAGE__'][runid]['data'] = tmp self['__STORAGE__'].setdefault('__restoreScripts__', False)
[docs] @_storeDecorator def restore(self, runid, restoreScripts=None): """ Method to restore a snapshot of the current module status as it was saved under self['__STORAGE__'][runid] :param runid: runid to be restored. If None the runid is taken from self['SETTINGS']['EXPERIMENT']['runid'] """ tmp = OMFITtree(self['__STORAGE__'][runid]['data'].filename, quiet=True) if restoreScripts is None: restoreScripts = self['__STORAGE__'].setdefault('__restoreScripts__', False) if not restoreScripts: def f_traverse(me, myLocation='', scriptDict={}): if isinstance(me, dict): for kid in list(me.keys()): kidName = "[" + repr(kid) + "]" entryID = myLocation + kidName if isinstance(me[kid], OMFITtree): f_traverse(me[kid], entryID, scriptDict) if isinstance(me[kid], _OMFITpython): scriptDict[entryID] = me[kid] return scriptDict currentScriptDict = f_traverse(self, 'self', {}) restoreScriptDict = f_traverse(tmp, 'self', {}) storage = self['__STORAGE__'] self.clear() self.update(tmp) self['__STORAGE__'] = storage if not restoreScripts: for item in list(restoreScriptDict.keys()): del eval(buildLocation(parseLocation(item)[:-1]))[parseLocation(item)[-1]] for item in list(currentScriptDict.keys()): tmp.addBranchPath(item) eval(buildLocation(parseLocation(item)[:-1]))[parseLocation(item)[-1]] = currentScriptDict[item]
[docs] @_storeDecorator def destore(self, runid): """ Method to de-store a snapshot of the current module status as it was saved under self['__STORAGE__'][runid] :param runid: runid to be de-stored. If None the runid is taken from self['SETTINGS']['EXPERIMENT']['runid'] """ del self['__STORAGE__'][runid]
[docs] @staticmethod def info(filename): """ This (static) method returns a dictionary with the module information, including the content of the ['help'] file :param filename: module filename :return: dictionary with module info """ with open(filename, 'r') as f: lines = f.readlines() settings = {} subs = [] help = None catchSubmodule = False for line in lines: if catchSubmodule: for ll in lines: if catchSubmodule + "['SETTINGS'] <-:-:->" in ll and ( " <-:-:-> OMFITsettings <-:-:-> ./" in line or " <-:-:-> OMFITnamelist <-:-:-> ./" in line ): tmp = os.path.split(filename)[0] + os.sep + ll.split(' <-:-:-> ')[2] try: subs.append(OMFITsettings(tmp, modifyOriginal=True)['MODULE']['ID']) except Exception as _excp: printe('Error loading dependency info (%s): %s' % (tmp, repr(_excp))) break catchSubmodule = False elif " <-:-:-> OMFITmodule <-:-:->" in line: catchSubmodule = line.split(' <-:-:-> ')[0] continue elif line.startswith("['SETTINGS'] <-:-:-> OMFITsettings <-:-:-> ./") or line.startswith( "['SETTINGS'] <-:-:-> OMFITnamelist <-:-:-> ./" ): tmp = os.path.split(filename)[0] + os.sep + line.split(' <-:-:-> ')[2] try: settings.update(OMFITsettings(tmp, modifyOriginal=True, readOnly=True)['MODULE']) except Exception as _excp: printe('Error loading module info (%s): %s' % (tmp, repr(_excp))) elif line.startswith("['help'] <-:-:-> OMFIThelp <-:-:-> ./"): tmp = os.path.split(filename)[0] + os.sep + line.split(' <-:-:-> ')[2] try: help = OMFITascii(tmp, modifyOriginal=True).read() except Exception as _excp: printw('Error loading module help.rst (%s): %s' % (tmp, repr(_excp))) if help is not None and len(settings): break if len(settings): # convert old date format to new one if 'date' in settings and '/' in settings['date']: settings['date'] = utils_base.convertDateFormat(settings['date'], '%d/%m/%Y %H:%M') # set module description in `version` if help is not None: settings['description'] = help else: settings['description'] = '' # add modules submodules info settings['submodules'] = np.unique(subs).tolist() # add status info settings.setdefault('status', '') return settings
[docs] @staticmethod def directories(return_associated_git_branch=False, separator=None, checkIsWriteable=False, directories=None): """ return lists of valid modules paths :param return_associated_git_branch: whether to return just the path of each directory or also the remote/branch info. This requires parsing of the OMFIT modules in a directory and it can be quite slow, however the info is buffered, so later accesses are faster. :param separator: text to use to separate the path and the remote/branch info :param checkIsWriteable: checks if user has write access. Note: if checkIsWriteable='git' will return a directory even if it is not writable, but it is a git repository :param directories: list of directories to check. If None the list of directories is taken from OMFIT['MainSettings']['SETUP']['modulesDir'] :return: list of valid modules directories """ # make sure that OMFITsrc/../modules is always there and is the first option if directories is None: directories = tolist(evalExpr(OMFIT['MainSettings']['SETUP']['modulesDir'])) if not checkIsWriteable or checkIsWriteable == 'git' or not os.path.exists(os.sep.join([OMFITsrc, '..', 'public'])): directories.insert(0, os.sep.join([OMFITsrc, '..', 'modules'])) if 'lastModulesDir' in OMFITaux: directories.insert(0, OMFITaux['lastModulesDir']) else: directories = tolist(directories) directories = list(map(os.path.abspath, directories)) directories = list(map(os.path.realpath, directories)) if not os.path.exists(os.path.abspath(os.environ['HOME'] + os.sep + 'OMFITdata' + os.sep + 'modules') + os.sep): try: os.makedirs(os.path.abspath(os.environ['HOME'] + os.sep + 'OMFITdata' + os.sep + 'modules') + os.sep) except Exception as _excp: printe( 'Error creating directory %s: %s' % (os.path.abspath(os.environ['HOME'] + os.sep + 'OMFITdata' + os.sep + 'modules') + os.sep, repr(_excp)) ) # remove invalid paths and duplicates (while keeping order) validPaths = [] for item in directories: is_git_repo = True try: repo = OMFITgit(item) except Exception: if item.endswith('modules'): try: repo = OMFITgit(item + os.sep + '..') except Exception: is_git_repo = False if item.endswith('modules'): is_public = os.path.exists(os.sep.join([item, '..', 'public'])) else: is_public = os.path.exists(os.sep.join([item, 'public'])) if not os.path.exists(item): printd('Modules directory does not exist: %s' % item) elif not os.access(item, os.R_OK): printd('No read permission to modules directory: %s' % item) elif checkIsWriteable and is_public and not (is_git_repo and checkIsWriteable == 'git'): printd('No write permission to modules directory of public installation: %s' % item) elif checkIsWriteable and not os.access(item, os.W_OK) and not (is_git_repo and checkIsWriteable == 'git'): printd('No write permission to modules directory: %s' % item) elif item not in validPaths: validPaths.append(item) # add branch info if return_associated_git_branch: for k, item in enumerate(validPaths): directory = validPaths[k] cases_errors = [] cases = [] if os.path.split(directory)[1] == 'modules': cases.append([os.path.split(directory)[0], 'modules']) cases.append([directory, '']) for repo_dir, modules_subpath in cases: try: # try to access as a git repository repo = OMFITgit(repo_dir) commit = repo("log -1 --pretty='%H'") branch = repo('rev-parse --abbrev-ref HEAD') remote = repo.get_branches().get(branch, {'remote': ''})['remote'] validPaths[k] = (directory, remote + '/' + branch) if separator: validPaths[k] = validPaths[k][0] + separator + validPaths[k][1] break except Exception as _excp: pass return validPaths
[docs] @staticmethod def submodules(go_deep=True, directory=None): """ This (static) method returns a dictionary with the list of submodules for each of the modules in a directory :param go_deep: include submodules of submodules :param directory: modules directory to use, by default the one of the repository where OMFIT is running :return: dictionary with submodule info for all modules in a directory """ if directory is None: directory = OMFITmodule.directories()[0] availableModulesList = OMFIT.availableModules(quiet=True, directories=tolist(directory)) submodules = {module['ID']: module['submodules'] for filename, module in availableModulesList.items()} while go_deep: go_deep = False for mod in submodules: for submod in submodules[mod]: for subsubmod in submodules[submod]: if subsubmod not in submodules[mod]: submodules[mod].append(subsubmod) go_deep = True return submodules
def __delitem__(self, key): super().__delitem__(key) # cannot delete __scratch__ if key == '__scratch__': self.insert(0, '__scratch__', OMFITtmp()) def __tree_repr__(self): if 'MODULE' not in self['SETTINGS'] or 'COMMENT' not in self['SETTINGS']['MODULE'] or not self.comment: return -1, [] else: return '--- ' + self.comment + ' ' + '-' * 300, []
[docs] def saveUserSettings(self, variant='__default__', locations=["['PHYSICS']"]): """ Save user settings in ~/.OMFIT/modulesSettings/ :param variant: variant name of the user setting to save :param locations: by default only save ['PHYSICS'] """ if self.ID is None: raise OMFITexception('Module ID must be set to save the module settings') filename = os.sep.join([_f for _f in [os.environ['HOME'], '.OMFIT', 'modulesSettings', self.ID] + [variant, 'OMFITsave.txt'] if _f]) tmp = copy.deepcopy(self['SETTINGS']) tmp = prune_mask(tmp, locations) try: orig = self['SETTINGS'] self['SETTINGS'] = tmp self._save(filename=filename, only="['SETTINGS']", quiet=True) finally: self['SETTINGS'] = orig printi('Saved user settings for module %s%s' % (self.ID, [' variant ' + variant, ''][variant == '__default__']))
[docs] def loadUserSettings(self, variant='__default__', diff=False): """ Load user settings in ~/.OMFIT/modulesSettings/ :param variant: variant name of the user setting to load :param diff: open a diff GUI to let users choose what to merge """ if self.ID is None: raise OMFITexception('Module ID must be set to load the module settings') filename = os.sep.join([_f for _f in [os.environ['HOME'], '.OMFIT', 'modulesSettings', self.ID] + [variant, 'OMFITsave.txt'] if _f]) if not os.path.exists(filename): printi('User settings for module %s%s not found' % (self.ID, [' variant ' + variant, ''][variant == '__default__'])) return tmp = OMFITtree(filename, quiet=True) if not diff: self['SETTINGS'].recursiveUpdate(tmp['SETTINGS'], overwrite=True) else: tmp['SETTINGS'].recursiveUpdate(self['SETTINGS'], overwrite=False) diffTreeGUI(self['SETTINGS'], tmp['SETTINGS']) printi('Loaded user settings for module %s%s' % (self.ID, [' variant ' + variant, ''][variant == '__default__']))
[docs] def listUserSettings(self, verbose=False): """ List user settings in ~/.OMFIT/modulesSettings/ """ if self.ID is None: raise OMFITexception('Module ID must be set to load the module settings') filename = os.sep.join([_f for _f in [os.environ['HOME'], '.OMFIT', 'modulesSettings', self.ID] if _f]) variants = [] for item in glob.glob(filename + os.sep + '*' + os.sep + 'OMFITsave.txt'): variants.append(os.path.split(os.path.split(item)[0])[1]) if verbose: printi('Users settings variants: %s' % variants) return variants
[docs] def deleteUserSettings(self, variant='__default__'): """ Delete user settings in ~/.OMFIT/modulesSettings/ :param variant: variant name of the user setting to delete """ if self.ID is None: raise OMFITexception('Module ID must be set to load the module settings') filename = os.sep.join([_f for _f in [os.environ['HOME'], '.OMFIT', 'modulesSettings', self.ID] + [variant] if _f]) if not os.path.exists(filename): printi('User settings for module %s%s not found' % (self.ID, [' variant ' + variant, ''][variant == '__default__'])) else: printi('Deleted user settings for module %s%s' % (self.ID, [' variant ' + variant, ''][variant == '__default__'])) shutil.rmtree(filename)
[docs] def experiment_location(self, *args): r""" Method that resolves the OMFITexpressions that are found in a module root['SETTINGS']['EXPERIMENT'] and returns the location that those expressions points to. :params \*args: list of keys to return the absolute location of :returns: dictionary with the absolute location the expressions point to """ rootName = treeLocation(self)[-1] root = eval(rootName) for item in args: if item not in root['SETTINGS']['EXPERIMENT']: raise AttributeError("%s is not in %s['SETTINGS']['EXPERIMENT']" % (item, rootName)) tmp = parseLocation(rootName) OMFITlocationName = [buildLocation(tmp[: k + 1]) for k, item in enumerate(tmp)] OMFITmodulesName = [] for tmpName in OMFITlocationName: if eval(tmpName).__class__ is OMFITmodule and tmpName != 'OMFIT': OMFITmodulesName.append(tmpName) OMFITmodules = list(map(eval, OMFITmodulesName)) locations = {} for item in args: locations[item] = rootName + "['SETTINGS']['EXPERIMENT']" for moduleName in reversed(OMFITmodulesName): if not isinstance(eval(moduleName)['SETTINGS']['EXPERIMENT'][item], OMFITexpression): locations[item] = moduleName + "['SETTINGS']['EXPERIMENT']" break if isinstance(eval(moduleName)['SETTINGS']['EXPERIMENT'][item], OMFITexpression): locations[item] = "OMFIT['MainSettings']['EXPERIMENT']" return locations
[docs] def experiment(self, *args, **kw): r""" method used to set the value of the entries under root['SETTINGS']['EXPERIMENT'] This method resolves the OMFITexpressions that are found in a module root['SETTINGS']['EXPERIMENT'] and sets the value at the location that those expressions points to. :params \**kw: keyword arguments with the values to be set :returns: root['SETTINGS']['EXPERIMENT'] """ locations = self.experiment_location(*list(kw.keys())) for item in list(locations.keys()): # Don't allow 0-D np arrays if isinstance(kw[item], np.ndarray) and len(kw[item].shape) == 0: kw[item] = np.atleast_1d(kw[item]) eval(locations[item])[item] = evalExpr(kw[item]) return self['SETTINGS']['EXPERIMENT']
[docs] def deploy_module(self, *args, **kw): r""" Method used to deploy a module in its format for being stored as part of a modules repository :param \*args: arguments passed to the deploy method :param \**kw: keywords arguments passed to the deploy method """ try: tmp = self['SETTINGS']['EXPERIMENT'] except Exception: raise else: # Always export EXPERIMENT settings that are in MainSettings as OMFITexpressions for item in list(OMFIT['MainSettings']['EXPERIMENT'].keys()): if item in self['SETTINGS']['EXPERIMENT']: self['SETTINGS']['EXPERIMENT'][item] = OMFITexpression( '''try:\n return_variable=OMFITmodules[-2]['SETTINGS']['EXPERIMENT']['{}']\nexcept Exception:\n return_variable=MainSettings['EXPERIMENT']['{}']\n'''.format( item, item ) ) # do not store these info for item in ['commit', 'edited_by', 'date', 'comment']: if item in self['SETTINGS']['MODULE']: del self['SETTINGS']['MODULE'][item] # make sure contacts are list if 'contact' in self['SETTINGS']['MODULE'] and self['SETTINGS']['MODULE']['contact'] is not None: self['SETTINGS']['MODULE']['contact'] = [str(k).strip('\'"') for k in tolist(self['SETTINGS']['MODULE']['contact'])] else: self['SETTINGS']['MODULE']['contact'] = [] self.deploy(*args, **kw) finally: self['SETTINGS']['EXPERIMENT'] = tmp
[docs] def convert_to_developer_mode( self, processSubmodules=True, modulesDir=os.path.abspath(OMFITsrc + os.sep + '..' + os.sep + 'modules'), operation='DEVEL', loadNewSettings=True, quiet=False, ): """ Convert scripts in this module to be modifyOriginal versions of the scripts under the moduleDir (so called `developer mode`) :param processSubmodules: bool convert scripts in the submodules :param modulesDir: string modules directory to use :param operation: string One of ['DEVEL', 'RELOAD', 'FREEZE'] DEVEL: reload scripts with modifyOriginal = True RELOAD: reload scripts with modifyOriginal = False FREEZE: set modifyOriginal = False :param loadNewSettings: bool Load new entries in the modules settings or not (ignored if operation=='FREEZE') :param quiet: bool Suppress print statements """ if operation == 'FREEZE': loadNewSettings = False # options options = SortedDict() options['Developer mode'] = 'DEVEL' options['Non-developer mode (RELOAD)'] = 'RELOAD' options['Non-developer mode (FREEZE)'] = 'FREEZE' valueOptions = flip_values_and_keys(options) # Identify modules if processSubmodules: modules = traverse(self, onlyDict=(OMFITmodule,), skipDynaLoad=True, noSubmodules=False) if modules != ['']: modules = [''] + modules else: modules = [''] printd('convert_to_developer_mode: {} modules = {}'.format(valueOptions[operation], modules)) # Loop over modules for module in modules: module_location = buildLocation(['self'] + parseLocation(module)[1:]) module_name = eval(module_location)['SETTINGS']['MODULE']['ID'] # Find _OMFITpython scripts as efficiently as possible locations = traverse( eval(module_location), onlyDict=(OMFITtree,), onlyLeaf=(_OMFITpython, OMFITsettings), skipDynaLoad=True, noSubmodules=True ) if not quiet: printi(valueOptions[operation] + ' under ' + self['SETTINGS']['MODULE']['ID'] + module_location[4:]) # Also module help as modifyOriginal if 'help' in eval(module_location): locations.append("['help']") # Parse the relative OMFITsave.txt file for the module try: with open(modulesDir + os.sep + module_name + os.sep + 'OMFITsave.txt', 'r') as f: lines = f.readlines() except IOError as _excp_: printe(modulesDir + os.sep + module_name + os.sep + 'OMFITsave.txt') printe(repr(_excp_)) continue # For each script for item in locations: if isinstance(eval(module_location + item), _OMFITpython) or ( loadNewSettings and isinstance(eval(module_location + item), OMFITsettings) ): # Find corresponding filename filename = None if operation == 'FREEZE': filename = eval(module_location + item).filename else: for line in lines: if line.startswith(item): filename = line.split(' <-:-:-> ')[2] if not filename.startswith(os.sep): filename = os.path.abspath(modulesDir + os.sep + module_name + os.sep + filename) break if filename is None: printi('skipped ' + item) continue # copy used later for updating settings backup = eval(module_location + item) # Reload script and set modifyOriginal accordingly try: eval(buildLocation(parseLocation(module_location + item)[:-1]))[parseLocation(item)[-1]] = eval( module_location + item ).__class__( filename, modifyOriginal=((operation == 'DEVEL') and isinstance(eval(module_location + item), _OMFITpython)) ) if not quiet: printi(item.ljust(max([len(x) for x in locations])), filename) except Exception as _excp_: printe(repr(_excp_)) continue # allow new settings if loadNewSettings and isinstance(eval(module_location + item), OMFITsettings): eval(module_location + item).recursiveUpdate(backup, overwrite=True)
[docs]class OMFIThelp(OMFITascii, SortedDict): """ Class used to parse OMFIT modules help.rst files """ def __init__(self, filename, **kw): OMFITascii.__init__(self, filename, **kw) SortedDict.__init__(self) self.dynaLoad = True
[docs] @dynaLoad def load(self): with open(self.filename, 'r') as f: hlp = f.read() # parse module help as manually written sections = re.sub('\n?(.*)\n-+\n', r'\n>->-> \1 <-<-<', hlp) sections = sections.split('>->->') sections = [_f for _f in [x.strip() for x in sections] if _f] section2 = SortedDict() for section in sections: sec_split = [x.strip() for x in section.split('<-<-<')] if len(sec_split) > 1: section2[sec_split[0]] = sec_split[1] else: section2[''] = sec_split[0] self.update(section2)
[docs] def verify(self): invalid = f"* `{os.path.split(self.filename)[1]}` module is an invalid OMFIT module help format" for section in ['Short Description', 'Keywords']: if section not in self: raise ValueError(invalid + f'\n {section} should always be there') self[section] = self[section].strip().strip('\n').strip('.') if '\n' in self[section]: raise ValueError(invalid + f'\n {section} should be on one line')
[docs] @dynaSave def save(self): try: self.verify() except ValueError as _excp: printe(_excp) printe(' Your changes to this file (if any) have not been saved!') return self._save_by_copy() else: with open(self.filename, 'w') as f: f.write(self.txt())
[docs] def txt(self): return '\n\n'.join(['%s\n%s\n%s' % (section, '-' * len(section), self[section]) for section in self]) + '\n'
[docs]def ismodule(module, module_ids): """ convenience function to check if an OMFITmodule is one of a given set of IDs :param module: module :param module_ids: list of module IDs to check for :return: True if the module ID matches one of the IDs in the module_ids list """ module_ids = tolist(module_ids) for module_id in module_ids: if isinstance(module, OMFITmodule) and module.ID == module_id: return True return False
[docs]class OMFITtmp(SortedDict, _OMFITnoSave): """ Same class of SortedDict, but is not saved when a project is saved This class is used to define the __scratch__ space under each module as well as the global OMFIT['scratch'] """ pass
[docs]class OMFITproject(OMFITtree): """Similar to OMFITtree class""" def __init__(self, filename='', **kw): OMFITtree.__init__(self, filename, **kw) if 'scratch' not in self: self['scratch'] = OMFITtmp()
[docs] def moduleDict(self, onlyModuleID=None, level=-1): """ Returns a dictionary of currently loaded modules :param onlyModuleID: string or list of strings of module ID to search for :param level: how many modules deep to go The dictionary contains the modules settings for each of the modules """ return self._moduleDict(onlyModuleID=onlyModuleID, level=level)
[docs] @staticmethod def info(filename=''): """ Returns dictionary with the saved project information. :param filename: filename of the project to return info about. If filename='' then returns info about the current project. Note that projects information are updated only when the project is saved. :return: dictionary with project infos """ if not filename: filename = OMFIT.filename if not filename: raise Exception('Must specify a filename') return_dict = {} if zipfile.is_zipfile(filename): omfitsavetxt = 'OMFITsave.txt' myzipfile = zipfile.ZipFile(filename, 'r', allowZip64=True) deflate_dir = OMFITcwd + os.sep + 'projects_preview' + os.sep if not os.path.exists(deflate_dir): os.makedirs(deflate_dir) infos = myzipfile.infolist() files = list([x.filename for x in infos]) projectName = os.path.commonprefix(files) myzipfile.extract(projectName + 'OMFITsave.txt', deflate_dir) myzipfile.extract(projectName + 'MainSettingsNamelist.txt', deflate_dir) isZip = True else: deflate_dir, omfitsavetxt = os.path.split(filename) projectName = os.path.split(deflate_dir)[1] + os.sep deflate_dir = os.path.split(deflate_dir)[0] isZip = False # OMFITsave with open(deflate_dir + os.sep + projectName + omfitsavetxt, 'r') as f: lines = f.readlines() # MainSettings return_dict['device'] = [] return_dict['shot'] = [] return_dict['time'] = [] return_dict['MainSettings'] = namelist.NamelistFile(deflate_dir + os.sep + projectName + 'MainSettingsNamelist.txt') for k in ['device', 'shot', 'shots', 'time', 'times']: try: return_dict[k.rstrip('s')].extend(tolist(return_dict['MainSettings']['EXPERIMENT'][k])) except Exception: pass for k in ['device', 'shot', 'time']: return_dict[k] = np.unique([_f for _f in return_dict[k] if _f]).tolist() # get modules module_sep = " <-:-:-> OMFITmodule <-:-:-> " module_location = [] return_dict['modules'] = OrderedDict() for line in lines: if module_sep in line: module_location.append(line.split('<-:-:->')[0].strip()) return_dict['modules'][module_location[-1]] = '' if module_location and f"{module_location[-1]}['SETTINGS'] <-:-:->" in line: filename = line.split('<-:-:->')[2].strip().replace('./', '') if isZip: myzipfile.extract(projectName + filename, deflate_dir) settings = OMFITsettings(deflate_dir + os.sep + projectName + filename) return_dict['modules'][module_location[-1]] = settings module_location.pop() # get notes, etc... (things are a bit clumsy here for backward compatibility) guisav = None for whereSave in ['__GUISAVE__', '__COMMANDBOX__']: guisav_sep = "['%s'] <-:-:-> pickle <-:-:-> " % whereSave for line in lines[::-1]: if line.startswith(guisav_sep): guisav = line.split(' <-:-:-> ')[2] if guisav is not None: break if guisav is not None: if isZip: myzipfile.extract(projectName + guisav[2:], deflate_dir) tmp = OMFITpickle(deflate_dir + os.sep + projectName + guisav[2:]) if isinstance(tmp, str): # -------------------------------------------------v0 return_dict['commands'] = tmp elif isinstance(tmp, dict): # -----------------------------------------------------v3 and final for item in tmp: if item not in return_dict: return_dict[item] = tmp[item] elif isinstance(tmp, tuple) and len(tmp) == 2: # ------------------------------------v1 return_dict['notes'], return_dict['commands'] = tmp elif isinstance(tmp, tuple) and len(tmp) == 3: # ------------------------------------v2 return_dict['notes'], return_dict['commands'], return_dict['console'] = tmp return_dict.setdefault('persistent_projectID', False) for item in OMFIT.prj_options_choices: return_dict.setdefault(item, '') if return_dict['color'] not in list(OMFIT.prj_options_choices['color'].keys()): return_dict['color'] = '' if return_dict['type'] not in OMFIT.prj_options_choices['type']: return_dict['type'] = '' return return_dict
[docs]class shotBookmarks(namelist.NamelistFile, _OMFITnoSave): def __init__(self, filename): if not os.path.exists(os.path.split(filename)[0]): os.makedirs(os.path.split(filename)[0]) if not os.path.exists(filename): with open(filename, 'w') as f: pass self.filename = filename namelist.NamelistFile.__init__(self, filename)
[docs] @dynaLoad def load(self, *args, **kw): namelist.NamelistFile.load(self, self.filename, *args, **kw) updated = False for device in list(self.keys()): for shot in list(self[device].keys()): if 'description' not in self[device][shot]: continue description = self[device][shot]['description'] del self[device][shot]['description'] if 'times' not in self[device][shot]: continue times = list(map(str, tolist(self[device][shot]['times']))) del self[device][shot]['times'] for time in times: self[device][shot][time] = description updated = True if updated: self.save()
OMFITshotBookmarks = shotBookmarks(OMFITsettingsDir + os.sep + 'OMFITshotBookmarks.txt')
[docs]class OMFITmainSettings(OMFITnamelist): """Contains system wide settings""" def __init__(self, filename=None): super().__init__(filename, noCopyToCWD=True)
[docs] def load(self, *args, **kw): # load the namelist super().load() # sort keys self.sort() # make sure 'shots' and 'times' are lists if 'EXPERIMENT' in self: for item in ['shots', 'times']: if item in self['EXPERIMENT']: if self['EXPERIMENT'][item] is None: self['EXPERIMENT'][item] = [] elif isinstance(self['EXPERIMENT'][item], (int, float)): self['EXPERIMENT'][item] = [self['EXPERIMENT'][item]]
[docs] def sort(self): if 'SERVER' in self: self['SERVER'].sort() self['SERVER'].sort_class([dict]) self['SERVER'].sort(reverse=True, key=lambda x: x.endswith('_username'))
def __tree_repr__(self): if id(self) == id(OMFIT['MainSettings']): return -1, ['MainSettings'] else: return super().__tree_repr__()
[docs]class OMFITlazyLoad: """ OMFIT class that imports xarray datasets with dynamic loading """ def __init__(self, location, tp, filename, tree_repr=None, **kw): self.location = location self.tp = tp self.filename = os.path.abspath(filename) self.link = self.filename self.tree_repr = tree_repr self.kw = kw self.__loaded__ = special1 self.dynaLoad = True def __lazy_load__(self, verbose=True): if self.__loaded__ is special1: self.dynaLoad = False if isinstance(verbose, str): printi('LazyLoad [%s]: %s' % (verbose, self.location)) elif verbose: printi('LazyLoad: %s' % self.location) # location = parseLocation(treeLocation(self)[-1]) # need to fix OMFITlist before switching to this on location = parseLocation(self.location) eval(buildLocation(location[:-1]))[location[-1]] = self.__loaded__ = eval(self.tp)(self.filename, **self.kw) return self.__loaded__ def __getattr__(self, attr): if attr in ['modifyOriginal', 'readOnly']: if attr in self.kw: return self.kw[attr] else: raise AttributeError('bad attribute `%s`' % attr) tmp = self.__lazy_load__() return getattr(tmp, attr) def __tree_repr__(self): if self.tree_repr: return [tree_repr, []] else: return [self.cls, []] def __iter__(self): tmp = self.__lazy_load__() return tmp.__iter__() def __getitem__(self, key): tmp = self.__lazy_load__(key) return tmp[key] def __deepcopy__(self, memo={}): tmp = self.__lazy_load__() return copy.deepcopy(tmp)
[docs] def load(self): tmp = self.__lazy_load__() if self.tp != 'OMFITpickle': return tmp.load() else: return tmp
[docs] def save(self): if os.path.abspath(self.link) != os.path.abspath(self.filename): if os.path.isdir(self.link): shutil.copytree(self.link, self.filename) else: shutil.copy2(self.link, self.filename)
[docs] def save_as(self, filename): self.filename = os.path.abspath(filename) directory = os.path.split(os.path.abspath(filename))[0] if os.path.exists(directory) == 0: os.makedirs(directory) self.save()
[docs] def deploy(self, filename): if os.path.abspath(filename) != os.path.abspath(self.filename): directory = os.path.split(os.path.abspath(filename))[0] if os.path.exists(directory) == 0: os.makedirs(directory) if os.path.isdir(self.filename): shutil.copytree(self.filename, filename) else: shutil.copy2(self.filename, filename)
def __save_kw__(self): return self.kw @property def cls(self): mapper = {'importDataset': 'Dataset', 'pandas_read_json': 'DataFrame'} return mapper.get(self.tp, self.tp)
# --------------------- # OMFIT types # --------------------- global OMFITtypes, OMFITtypesStr, OMFITdictypes, OMFITdictypesStr OMFITtypes = [] OMFITtypesStr = [] OMFITdictypes = [] OMFITdictypesStr = [] # this method must be called to update the list of types def _updateTypes(): try: # this try-except is used for loading omfit_base as a standalone OMFIT class (without requiring omfit_mds as a dependency) from omfit_classes.omfit_mds import OMFITmdsObjects extra_objects = [OMFITmdsObjects] except Exception: extra_objects = [] global OMFITtypes, OMFITtypesStr, OMFITdictypes, OMFITdictypesStr # OMFITtypes inherit from OMFITobject OMFITtypes[:] = [OMFITlazyLoad] for _itemName in list(globals().keys()): _item = eval(_itemName) if _itemName.startswith('OMFIT') and inspect.isclass(_item) and _itemName[0] != '_' and issubclass(_item, OMFITobject): OMFITtypes.append(_item) OMFITtypesStr[:] = [re.sub(r"\<.*'(.*)'\>", r'\1', item.strip(')(')).split('.')[-1] for item in str(OMFITtypes).strip('[]').split(',')] # OMFITdictypes inherit from SortedDict or OMFITmdsObjects and are not OMFITtypes OMFITdictypes[:] = [] for _itemName in list(globals().keys()): _item = eval(_itemName) if ( inspect.isclass(_item) and _itemName[0] != '_' and issubclass(_item, tuple([SortedDict] + extra_objects)) and _item not in OMFITtypes ): OMFITdictypes.append(_item) OMFITdictypesStr[:] = [re.sub(r"\<.*'(.*)'\>", r'\1', item.strip(')(')).split('.')[-1] for item in str(OMFITdictypes).split(',')] _updateTypes() def itemTagsValues(meKid, expressionsExpression=False, dictionaryContent=False, showHidden=False, treeview=None, parent_tags=[]): def get_bases(clss, tp): bases = getattr(clss, '__bases__') if not len(bases): return None else: for item in bases: tp.append(item.__name__) get_bases(item, tp) invisible = '' separator = '-' * 300 tags = ['treeItem', 'editableTreeItem'] # this allows expression to be treated as if they were the value itself if isinstance(meKid, OMFITexpression) and not expressionsExpression: tags.append(meKid.__class__.__name__) meKid = meKid._value_() if isinstance(meKid, dict): tags.append('dictTreeItem') # get the data type and its bases if hasattr(meKid, '__class__'): tp = meKid.__class__.__name__ else: tp = 'oldStyleClass' values = (meKid, tp) tags.append(tp) old_dynaLoad_switch = OMFITaux['dynaLoad_switch'] OMFITaux['dynaLoad_switch'] = False try: # refine values and tags if possible if hasattr(meKid, '__class__') and not inspect.isclass(meKid): # add all of the tags on which the data type is based on get_bases(meKid.__class__, tags) if isinstance(meKid, OMFITexpression): # this is used in GUIs when expressionsExpression=True values = (meKid.expression, tp) # setup values and tags depending on the data type elif hasattr(meKid, '__tree_repr__'): values, tag = meKid.__tree_repr__() if values is None: values = invisible elif values is -1: values = separator if isinstance(values, str): values = (values, tp) else: values = (treeText(values, False, -1, True), tp) tags = tag + tags elif isinstance(meKid, dict) and dictionaryContent: values = (treeText(list(meKid.keys()), False, -1, True), tp) else: values = (treeText(meKid, True, 500, True), tp) tags.extend(parent_tags) tags_hash = omfit_hash(str(tuple(tags)), 10) # generate unique tag for each individual set of treeview visualization option # this is to avoid some visualization issues that depend on the underlying version of TK if treeview is not None: if tags_hash not in treeview.single_tags: # handle modifyOriginal, readOnly, modifyOriginal_readOnly tags = unsorted_unique(tags) if len([tag for tag in tags if tag in ['modifyOriginal', 'readOnly', 'modifyOriginal_readOnly']]) >= 2: i = max([tags.index(tag) for tag in tags if tag in ['modifyOriginal', 'readOnly', 'modifyOriginal_readOnly']]) if 'modifyOriginal_readOnly' in tags: tags.remove('modifyOriginal_readOnly') tags.insert(i, 'modifyOriginal_readOnly') if 'modifyOriginal' in tags: tags.remove('modifyOriginal') if 'readOnly' in tags: tags.remove('readOnly') tags_config = {} for k in tags: tmp = treeview.tag_configure(k) for tag in tmp: if (tag not in tags_config and tmp[tag]) or tag in ['this', 'other']: tags_config[tag] = tmp[tag] treeview.tag_configure(tags_hash, **tags_config) treeview.single_tags.append(tags_hash) tags = [tags_hash] + list([tag + '_action' for tag in tags]) return tags, list(values) except Exception as _excp: printe('Error in loading tree representation:\n' + repr(_excp)) return ['OMFITerror'], ('--error loading tree representation--', tp) finally: OMFITaux['dynaLoad_switch'] = old_dynaLoad_switch # --------------------- # Statistics # ---------------------
[docs]def omfit_log(action, details): if not os.path.exists(str(OMFIT['MainSettings']['SETUP']['stats_file'])): try: with open(str(OMFIT['MainSettings']['SETUP']['stats_file']), "w") as f: pass except Exception: pass try: if os.access(str(OMFIT['MainSettings']['SETUP']['stats_file']), os.W_OK): header = ( os.environ['USER'] + ' ' + utils_base.now("%Y/%m/%d %H_%M_%S") + ' ' + repo_active_branch_commit + ' ' + repo_active_branch ) with open(str(OMFIT['MainSettings']['SETUP']['stats_file']), "a") as f: f.write(header + ' ' + action + ' ' + details + '\n') else: raise except Exception: warnings.warn('Unable to write to stats file: ' + str(OMFIT['MainSettings']['SETUP']['stats_file'])) provenanceID = '' if 'EXPERIMENT' in OMFIT['MainSettings'] and 'provenanceID' in OMFIT['MainSettings']['EXPERIMENT']: provenanceID = OMFIT['MainSettings']['EXPERIMENT']['provenanceID'] try: payload = { 'version': repo_active_branch_commit, 'institution': OMFIT['MainSettings']['SETUP']['institution'], 'action': action, 'details': details, 'provenanceID': provenanceID, '_ip_address': '?', '_date': '?', } host, port = evalExpr(SERVER['gadb-harvest']['HARVEST_server']).split(':') harvest_send(payload, '+omfit_stats_2', host=host, port=int(port)) except Exception as _excp: printt('Exception: Failed send_db: ' + repr(_excp))
# ------------------- # diff GUI functions # NOTE: these functions rely on tk and will not be available outside of the framework # -------------------
[docs]def diffTreeGUI( this, other, thisName='Original', otherName='Compared to', resultName='Final result', title=None, diffSubModules=True, precision=0.0, description=False, tellDescription=False, deepcopyOther=True, skipClasses=(), noloadClasses=(), flatten=False, allow_merge=True, always_show_GUI=False, order=True, favor_my_order=False, modify_order=False, ): """ Function used to compare two dictionary objects and manage their merge :param this: reference object (the one where data will be written to in case of merge) :param other: secondary object to compare to :param thisName: ['Original'] representation of reference object in the GUI :param otherName: ['Compared to'] representation of secondary object in the GUI :param resultName: ['Final result'] representation of result :param title: [None] GUI title :param diffSubModules: [True] do diff of sumbmodules or not. If `None` only compare module ID and its DEPENDENCIES. :param precision: [0.0] relative precision to which to compare objects :param description: [False] commit to be input by the user :param tellDescription: [False] show description or not :param deepcopyOther: [True] Whether to perform internally `other=copy.deepcopy(other)` to avoid `diffTreeGUI` to modify the original content :param skipClasses: () tuple containing classes to skip :param noloadClasses: () tuple containing classes to not load :param flatten: whether to flatten the data in nested dictionaries :param allow_merge: whether to allow merging of data :param always_show_GUI: whether to show GUIs even if there are no differences :param order: order of the keys matters :param favor_my_order: favor my order of keys :param modify_order: update order of input dictionaries based on diff :return: (switch,False/True,keys) * switch is a dictionary which lists all of the differences * False/True will be used to keep track of what the user wants to switch between dictionaries * keys is used to keep the keys of the merged dictionary in order """ if flatten: this = this.flatten() other = other.flatten() allow_merge = False elif deepcopyOther: other = copy.deepcopy(other) # for how the merge is done (flipping), I need to make a copy of the `other` object if not diffSubModules: remove_submodules(this, keepModuleAndDependencies=(diffSubModules is None)) remove_submodules(other, keepModuleAndDependencies=(diffSubModules is None)) switch = this.diff( other, skipClasses=skipClasses, noloadClasses=noloadClasses, ignoreComments=True, precision=precision, order=order, favor_my_order=favor_my_order, modify_order=modify_order, ) def set_merge_button_text(count): if count == 0: merge_button_text.set('Nothing to merge: press <spacebar> to select items to merge') else: merge_button_text.set('Merge selected %d items' % count) def switcher(onlyThisNode=False): location = parseBuildLocation(treeGUI.selection()[0]) tree = switch[0] for item in location[:-1]: tree = tree[item][0] def set(a, val): if isinstance(a[0], SortedDict): a[1] = val if not onlyThisNode: for kid in a[0]: set(a[0][kid], val) else: a[1] = val set(tree[location[-1]], not tree[location[-1]][1]) count = f_traverse(switch[0], '') set_merge_button_text(count) treeGUI.update_idletasks() def f_traverse(me, myLocation='', myLocationSwitch=''): count = 0 for kid in me: if isinstance(me[kid][0], SortedDict): state = 'changed_sub' else: state = me[kid][0] kidName = "[" + repr(kid) + "]" entryID = myLocation + kidName entryIDswitch = myLocationSwitch + '[0]' + kidName try: this_tags, this_values = itemTagsValues( eval('this' + entryID), expressionsExpression=True, dictionaryContent=True, showHidden=False, treeview=treeGUI ) except Exception: this_tags = [] this_values = ['', ''] try: other_tags, other_values = itemTagsValues( eval('other' + entryID), expressionsExpression=True, dictionaryContent=True, showHidden=False, treeview=treeGUI ) except Exception: other_tags = [] other_values = ['', ''] if createTree: tags = [] if allow_merge: values = (this_values[0], other_values[0], this_values[0]) else: if not len(set(this_tags).difference(set(other_tags))): tags.extend(this_tags) values = (this_values[0], other_values[0]) treeGUI.insert(myLocation, tk.END, entryID, text=treeText(kid, False, -1, False), values=values, tags=tuple(tags)) values = list(treeGUI.item(entryID, 'values')) if allow_merge: if eval('switch' + entryIDswitch)[1]: values = (values[0], values[1], values[1], '|') # other tags = [state, 'mergeable_action', 'other'] tags.extend(other_tags) count += 1 else: values = (values[0], values[1], values[0], '_') # this tags = [state, 'mergeable_action', 'this'] tags.extend(this_tags) if state in ['added', 'changed', 'removed', 'order']: treeGUI.item(entryID, tags=tuple(tags), values=values) elif state == 'changed_sub': count += f_traverse(me[kid][0], entryID, entryIDswitch) treeGUI.item(entryID, tags=tuple(tags), values=('', '', '', values[3])) else: if state == 'changed_sub': count += f_traverse(me[kid][0], entryID, entryIDswitch) treeGUI.item(entryID, tags=tuple(tags), values=('', '')) else: treeGUI.item(entryID, tags=tuple(tags), values=values) # this is just to prevent the garbage collector to delete these variables this, other, switch return count def merger(this_, other_, switch_, flipCompare=True): this_k = 0 other_k = 0 for kid in switch_[2]: if kid in this_: this_k += 1 if kid in other_: other_k += 1 if kid in switch_[0]: # go deeper in dictionaries if isinstance(switch_[0][kid][0], SortedDict): if kid not in this_: this_[kid] = SortedDict() if kid not in other_: other_[kid] = SortedDict() # go deeper if switch_[0][kid][1] == flipCompare: merger(this_[kid], other_[kid], switch_[0][kid], not flipCompare) else: merger(this_[kid], other_[kid], switch_[0][kid], flipCompare) # merge content if switch_[0][kid][1] == flipCompare: printi("Merging: " + str(kid)) # sorting of the content if len(switch_[0][kid]) > 2: this_[kid].sort(key=lambda x: switch_[0][kid][2].index(x) if x in switch_[0][kid][2] else 1e10) other_[kid].sort(key=lambda x: switch_[0][kid][2].index(x) if x in switch_[0][kid][2] else 1e10) if kid not in other_: other_.insert(other_k, kid, this_[kid]) del this_[kid] elif kid not in this_: this_.insert(this_k, kid, other_[kid]) del other_[kid] else: tmp = this_[kid] this_.insert(this_k, kid, other_[kid]) other_.insert(other_k, kid, tmp) def diffDetail(event): entryID = treeGUI.identify_row(event.y) try: eval('this' + entryID) thisMisses = False except Exception: thisMisses = True try: eval('other' + entryID) otherMisses = False except Exception: otherMisses = True if (thisMisses or isinstance(eval('this' + entryID), str) or eval('this' + entryID).__class__ is OMFITexpression) and ( otherMisses or isinstance(eval('other' + entryID), str) or eval('other' + entryID).__class__ is OMFITexpression ): if thisMisses: thisFile = None elif eval('this' + entryID).__class__ is OMFITexpression: thisFile = eval('this' + entryID).expression elif isinstance(eval('this' + entryID), str): thisFile = eval('this' + entryID) else: return if otherMisses: otherFile = None elif eval('other' + entryID).__class__ is OMFITexpression: otherFile = eval('other' + entryID).expression elif isinstance(eval('other' + entryID), str): otherFile = eval('other' + entryID) else: return diffViewer(top, thisString=thisFile, otherString=otherFile, thisName=thisName, otherName=otherName, title=entryID) elif (thisMisses or isinstance(eval('this' + entryID), OMFITascii)) and ( otherMisses or isinstance(eval('other' + entryID), OMFITascii) ): if thisMisses: thisFile = None else: thisFile = eval('this' + entryID).filename if otherMisses: otherFile = None else: otherFile = eval('other' + entryID).filename diffViewer(top, thisFile, otherFile, thisName=thisName, otherName=otherName, title=entryID) elif (thisMisses or isinstance(eval('this' + entryID), np.ndarray)) and ( otherMisses or isinstance(eval('other' + entryID), np.ndarray) ): if thisMisses: thisArray = np.tile(np.nan, eval('other' + entryID).shape) else: thisArray = eval('this' + entryID) if otherMisses: otherArray = np.tile(np.nan, eval('this' + entryID).shape) else: otherArray = eval('other' + entryID) # cleverly choose whether it is better to compare in 1d or 2d when there is squeezable dimension(s). thisArray = np.squeeze(thisArray) otherArray = np.squeeze(otherArray) s1 = len(thisArray.shape) s2 = len(otherArray.shape) if s1 == 1 and s2 == 1: fig = pyplot.figure() fig.add_subplot(1, 2, 1) pyplot.plot(thisArray, 'b.-', label=thisName) pyplot.plot(otherArray, 'gx-', label=otherName) pyplot.title('Compare arrays') fig.add_subplot(1, 2, 2) pyplot.plot(np.linspace(0, 1, thisArray.size), thisArray, 'b.-', label=thisName) pyplot.plot(np.linspace(0, 1, otherArray.size), otherArray, 'gx-', label=otherName) pyplot.legend(labelspacing=0.2, loc=0).draggable(state=True) pyplot.title('Compare arrays (x from 0 to 1)') elif s1 == 2 and s2 == 2: fig = pyplot.figure() fig.add_subplot(1, 2, 1) if not thisMisses: pyplot.contour(thisArray, 20, colors='b', label=thisName) if not otherMisses: pyplot.contour(otherArray, 20, colors='g', label=thisName) pyplot.title('Compare 2D arrays') fig.add_subplot(1, 2, 2) pyplot.plot(np.nan, 'b.-', label=thisName) if not thisMisses: pyplot.contour( np.linspace(0, 1, thisArray.shape[1]), np.linspace(0, 1, thisArray.shape[0]), thisArray, 20, colors=pyplot.gca().lines[-1].get_color(), ) pyplot.plot(np.nan, 'gx-', label=otherName) if not otherMisses: pyplot.contour( np.linspace(0, 1, otherArray.shape[1]), np.linspace(0, 1, otherArray.shape[0]), otherArray, 20, colors=pyplot.gca().lines[-1].get_color(), ) pyplot.legend(labelspacing=0.2, loc=0).draggable(state=True) pyplot.title('Compare 2D arrays (x and y from 0 to 1)') # these are here to prevent the garbage collector to remove these variables from the namespace this, other if not always_show_GUI and not len(switch[0]): printi('No differences found between `%s` and `%s`' % (thisName, otherName)) if description or isinstance(description, str): return {}, None else: return {} merge_button_text = tk.StringVar() top = tk.Toplevel(OMFITaux['rootGUI']) top.withdraw() top.transient(OMFITaux['rootGUI']) top.geometry(str(int(OMFITaux['rootGUI'].winfo_width() * 8.0 / 9)) + "x" + str(int(OMFITaux['rootGUI'].winfo_height() * 8.0 / 9))) if allow_merge: top.wm_title('Tree diff/merge') else: top.wm_title('Tree diff') if not (title is None): ttk.Label(top, text=title, font=OMFITfont('bold', 2)).pack(side=tk.TOP, expand=tk.NO, fill=tk.X, padx=10) if allow_merge: ttk.Label( top, text='<space> toggle merge, <double-click> diff of strings, ASCII files and arrays, <Esc> abort merging', justify=tk.LEFT, anchor=tk.W, ).pack(side=tk.TOP) treeGUI = tk.Treeview(top) treeGUI.frame.pack(side=tk.TOP, expand=tk.YES, fill=tk.BOTH, padx=5, pady=5) if allow_merge: treeGUI["columns"] = ('this', 'other', 'merge', 'switch') displaycolumns = ['merge', 'this', 'other', 'switch'] else: treeGUI["columns"] = ('this', 'other') displaycolumns = ['this', 'other'] if thisName is None: displaycolumns.remove('this') if otherName is None: displaycolumns.remove('other') if allow_merge and resultName is None: displaycolumns.remove('merge') treeGUI["displaycolumns"] = tuple(displaycolumns) treeGUI.column("#0", minwidth=150, width=180, stretch=False) treeGUI.column("this", minwidth=180, width=180, stretch=True) treeGUI.column("other", minwidth=180, width=180, stretch=True) treeGUI.heading("#0", text="location") treeGUI.heading("this", text=thisName) treeGUI.heading("other", text=otherName) if allow_merge: treeGUI.column("switch", minwidth=20, width=20, stretch=False, anchor=tk.CENTER) treeGUI.heading("switch", text="") treeGUI.column("merge", minwidth=180, width=180, stretch=False) treeGUI.heading("merge", text=resultName) global_event_bindings.add( 'COMPARE/MERGE: toggle entry and sub-entries', treeGUI, '<space>', lambda event=None: treeGUI.after(1, switcher()), tag="mergeable", ) global_event_bindings.add( 'COMPARE/MERGE: toggle single entry', treeGUI, '<Shift-space>', lambda event=None: treeGUI.after(1, switcher(True)), tag="mergeable", ) def onDouble(event): diffDetail(event) def onMerge(event=None): merger(this, other, switch) OMFITaux['GUI'].update_treeGUI_and_GUI() anySwitched.set(True) if description is not False: _description.append(desc.get(1.0, tk.END).strip()) top.destroy() def onEscape(event=None): anySwitched.set(False) top.destroy() _description = [] if tellDescription is not False: desc = askDescription(top, tellDescription, 'Description:', showInsertDate=False, showHistorySeparate=False) desc.config(state=tk.DISABLED) elif description is not False: desc = askDescription(top, description, 'Commit message:', showInsertDate=False, showHistorySeparate=False) if allow_merge: desc.bind(f'<{ctrlCmd()}-Return>', onMerge) global_event_bindings.add('COMPARE/MERGE: show difference details', treeGUI, '<Double-1>', onDouble) if allow_merge: ttk.Button(top, textvar=merge_button_text, command=onMerge).pack(side=tk.TOP, expand=tk.NO, fill=tk.X) top.bind('<Return>', onMerge) top.bind('<KP_Enter>', onMerge) else: top.bind('<Return>', onEscape) top.bind('<KP_Enter>', onEscape) top.bind('<Escape>', onEscape) if 'thome' in OMFIT['MainSettings']['SETUP']['email']: ttk.Button(top, text=['Close', 'Cancel'][allow_merge], command=onEscape).pack(side=tk.TOP, expand=tk.NO, fill=tk.X) anySwitched = tk.BooleanVar() createTree = True count = f_traverse(switch[0], '') set_merge_button_text(count) createTree = False top.protocol("WM_DELETE_WINDOW", top.destroy) top.update_idletasks() top.deiconify() top.wait_window(top) if not anySwitched.get(): if description is not False: return None, None else: return None else: if description is not False: return switch, _description[0] else: return switch
[docs]def exportTreeGUI(this, title=None, description=False): # returns [switch,False/True,keys] # switch is a dictionary which lists all of the differences # False/True will be used to keep track of what the user wants to switch between dictionaries # keys is used to keep the keys of the merged dictionary in order # create an empty structure def s_traverse(me): switch = SortedDict() for key in list(me.keys()): if isinstance(me[key], OMFITmodule): for item in list(me[key].keys()): if item == 'SETTINGS': for sett in list(me[key]['SETTINGS'].keys()): if sett not in ['MODULE', 'DEPENDENCIES']: del me[key]['SETTINGS'][sett] elif sett == 'MODULE': for k in list(me[key]['SETTINGS']['MODULE'].keys()): if k not in ['ID']: del me[key]['SETTINGS']['MODULE'][k] elif not isinstance(me[key][item], OMFITmodule): del me[key][item] # deleting items will prevent them to show up if key == '__scratch__': del me[key] # _OMFITnoSave and OMFITobject items that are not OMFITnamelist will be showed in their entirety elif isinstance(me[key], _OMFITnoSave) or ( isinstance(me[key], OMFITobject) and not isinstance(me[key], OMFITnamelist) and not isinstance(me[key], OMFITsettings) ): switch[key] = ['bla', True] elif isinstance(me[key], SortedDict): switch[key] = s_traverse(me[key]) else: switch[key] = ['bla', True] return [switch, True, list(me.keys())] switch = s_traverse(this) def switcher(onlyThisNode=False): location = parseBuildLocation(treeGUI.selection()[0]) tree = switch[0] for item in location[:-1]: tree = tree[item][0] def set(a, val): if isinstance(a[0], SortedDict): a[1] = val if not onlyThisNode: for kid in a[0]: set(a[0][kid], val) else: a[1] = val set(tree[location[-1]], not tree[location[-1]][1]) # if the last thing that I switched was "other" # then I should set to "other" also the parents if tree[location[-1]][1]: tree = switch[0] for item in location[:-1]: tree[item][1] = True tree = tree[item][0] f_traverse(switch[0], '') treeGUI.update_idletasks() def deleter(this_, switch_, flipCompare=True): for this_k, kid in enumerate(switch_[2]): if kid in switch_[0]: if isinstance(switch_[0][kid][0], SortedDict): if switch_[0][kid][1] != flipCompare: deleter(this_[kid], switch_[0][kid], not flipCompare) else: deleter(this_[kid], switch_[0][kid], flipCompare) if switch_[0][kid][1] != flipCompare: printi("Deleter: " + str(kid)) del this_[kid] def f_traverse(me, myLocation='', myLocationSwitch=''): for kid in me: kidName = "[" + repr(kid) + "]" entryID = myLocation + kidName entryIDswitch = myLocationSwitch + '[0]' + kidName try: this_tags, this_values = itemTagsValues( eval('this' + entryID), expressionsExpression=True, showHidden=False, treeview=treeGUI ) except Exception: this_tags = [] this_values = [''] if createTree: treeGUI.insert(myLocation, tk.END, entryID, text=treeText(kid, False, -1, False), values=(this_values[0],)) if me[kid][1]: tags = ['exportable_action', 'other'] else: tags = ['exportable_action', 'this'] tags.extend(this_tags) treeGUI.item(entryID, tags=tuple(tags)) if isinstance(me[kid][0], SortedDict): f_traverse(me[kid][0], entryID, entryIDswitch) this, switch top = tk.Toplevel(OMFITaux['rootGUI']) top.withdraw() top.transient(OMFITaux['rootGUI']) top.geometry(str(int(OMFITaux['rootGUI'].winfo_width() * 8.0 / 9)) + "x" + str(int(OMFITaux['rootGUI'].winfo_height() * 8.0 / 9))) top.wm_title('Export tree') if not (title is None): ttk.Label(top, text=title, font=OMFITfont('bold', 2)).pack(side=tk.TOP, expand=tk.NO, fill=tk.X, padx=10) ttk.Label( top, text='<space> toggle export, <Shift-space> toggle export single entry, <Esc> to abort', justify=tk.LEFT, anchor=tk.W ).pack(side=tk.TOP) treeGUI = tk.Treeview(top) treeGUI.frame.pack(side=tk.TOP, expand=tk.YES, fill=tk.BOTH, padx=5, pady=5) treeGUI["columns"] = ('value', 'dummy') displaycolumns = ['value'] treeGUI["displaycolumns"] = tuple(displaycolumns) treeGUI.column("#0", minwidth=150, width=180, stretch=False) treeGUI.column("value", minwidth=180, width=180, stretch=True) treeGUI.column("dummy", minwidth=180, width=180, stretch=True) treeGUI.heading("#0", text="location") treeGUI.heading("value", text='value') global_event_bindings.add( 'COMPARE/MERGE: toggle entry and sub-entries', treeGUI, '<space>', lambda event=None: treeGUI.after(1, switcher()), tag="exportable" ) global_event_bindings.add( 'COMPARE/MERGE: toggle single entry', treeGUI, '<Shift-space>', lambda event=None: treeGUI.after(1, switcher(True)), tag="exportable", ) def onExport(event=None): deleter(this, switch) OMFITaux['GUI'].update_treeGUI_and_GUI() anySwitched.set(True) if description is not False: _description.append(desc.get(1.0, tk.END).strip()) top.destroy() def onEscape(event=None): anySwitched.set(False) top.destroy() _description = [] if description is not False: desc = askDescription(top, '', 'Description:', showInsertDate=False) desc.bind(f'<{ctrlCmd()}-Return>', onExport) ttk.Button(top, text='Export highlighted entries', command=onExport).pack(side=tk.TOP, expand=tk.NO, fill=tk.X) if 'thome' in OMFIT['MainSettings']['SETUP']['email']: ttk.Button(top, text='Cancel', command=onEscape).pack(side=tk.TOP, expand=tk.NO, fill=tk.X) top.bind('<Return>', onExport) top.bind('<KP_Enter>', onExport) top.bind('<Escape>', onEscape) anySwitched = tk.BooleanVar() createTree = True f_traverse(switch[0], '') createTree = False top.protocol("WM_DELETE_WINDOW", top.destroy) top.update_idletasks() top.deiconify() top.wait_window(top) if not anySwitched.get(): if description is not False: return None, None else: return None else: if description is not False: return switch, _description[0] else: return switch
[docs]def diffViewer( root, thisFilename=None, otherFilename=None, thisName='Original', otherName='New', title=None, thisString=None, otherString=None ): """ Present a side by side visual comparison of two strings :param root: A tk master GUI """ if thisFilename is None and thisString is None: this = '' elif thisFilename is not None: with open(thisFilename, 'r') as f: this = f.read().splitlines(1) elif thisString is not None: this = str(thisString).splitlines(1) if otherFilename is None and otherString is None: other = '' elif otherFilename is not None: with open(otherFilename, 'r') as f: other = f.read().splitlines(1) elif otherString is not None: other = str(otherString).splitlines(1) inputDiffText = list(difflib.Differ(linejunk=None).compare(this, other)) def yview(*args): textA.yview(*args) textB.yview(*args) def mouse_wheel(event): # respond to Linux or Windows wheel event if event.num == 5 or event.delta == -120: yview('scroll', 1, 'units') elif event.num == 4 or event.delta == 120: yview('scroll', -1, 'units') return 'break' top = tk.Toplevel(root) top.withdraw() top.transient(root) top.geometry(str(int(OMFITaux['rootGUI'].winfo_width() * 8.0 / 9)) + "x" + str(int(OMFITaux['rootGUI'].winfo_height() * 8.0 / 9))) if title is None: top.wm_title('ASCII files diff') else: top.wm_title(title) # GUI items labelA = ttk.Label(top, text=thisName, font=OMFITfont('bold', 2)) labelB = ttk.Label(top, text=otherName, font=OMFITfont('bold', 2)) textA = tk.Text(top, relief=tk.SOLID, borderwidth=1, font=OMFITfont('normal', 0, 'Courier'), wrap=tk.NONE) sb = ttk.Scrollbar(top, command=yview, orient='vertical') textB = tk.Text(top, relief=tk.SOLID, borderwidth=1, font=OMFITfont('normal', 0, 'Courier'), wrap=tk.NONE) sbA = ttk.Scrollbar(top, command=textA.xview, orient='horizontal') sbB = ttk.Scrollbar(top, command=textB.xview, orient='horizontal') # arrange GUI items top.rowconfigure(1, weight=1) top.columnconfigure(0, weight=1) top.columnconfigure(2, weight=1) labelA.grid(column=0, row=0, sticky='ew') labelB.grid(column=2, row=0, sticky='ew') textA.grid(column=0, row=1, sticky='nsew') sb.grid(column=1, row=1, sticky='ns') textB.grid(column=2, row=1, sticky='nsew') sbA.grid(column=0, row=2, sticky='ew') sbB.grid(column=2, row=2, sticky='ew') # synchronized scrolling textA['xscrollcommand'] = sbA.set textB['xscrollcommand'] = sbB.set textA['yscrollcommand'] = sb.set textB['yscrollcommand'] = sb.set for k in ["<Button-4>", "<Button-5>", "<MouseWheel>"]: textA.bind(k, mouse_wheel) textB.bind(k, mouse_wheel) # colors for text in textA, textB: text.tag_configure('+', background='DarkSeaGreen1') text.tag_configure('-', background='RosyBrown2') text.tag_configure('?', background='light blue') text.tag_configure(' ', background='white') text.tag_configure('/', background='gray85') text.tag_configure('#', foreground='gray50') # differences offset = 2 lines = 0 lineA = 1 lineB = 1 lno_offset = len(str(len(inputDiffText))) + 1 # for the colon lno_format = '{:0%sd}:{}' % (lno_offset - 1) # for the colon def format_lno(text_obj, line_no): for ki in range(lno_offset): text_obj.tag_add('#', '%s.%s' % (line_no, ki)) for kk in range(len(inputDiffText)): item = inputDiffText[kk] if item[0] == '+': textA.insert('end', '\n', '/') textB.insert('end', lno_format.format(lineB, item[offset:]), '+') lineB += 1 lastText = textB notLastText = textA lines += 1 format_lno(textB, lines) elif item[0] == '-': textA.insert('end', lno_format.format(lineA, item[offset:]), '-') lineA += 1 textB.insert('end', '\n', '/') lastText = textA notLastText = textB lines += 1 format_lno(textA, lines) elif item[0] == '?': for k, c in enumerate(item[offset:]): if c in ['-', '+', '^']: lastText.tag_add('?', str(lines) + '.' + str(k + lno_offset)) else: textA.insert('end', lno_format.format(lineA, item[offset:])) lineA += 1 textB.insert('end', lno_format.format(lineB, item[offset:])) lineB += 1 lines += 1 format_lno(textA, lines) format_lno(textB, lines) textA.config(state=tk.DISABLED) textB.config(state=tk.DISABLED) top.bind('<Escape>', lambda event=None: top.destroy()) top.protocol("WM_DELETE_WINDOW", top.destroy) top.update_idletasks() top.deiconify() top.wait_window(top)
[docs]def askDescription(parent, txt, label, showInsertDate=False, showHistorySeparate=False, expand=0, scrolledTextKW={}): top = ttk.Frame(parent) top.pack(side=tk.TOP, padx=5, expand=expand, fill=tk.BOTH) lblFrame = ttk.Frame(top) lblFrame.pack(side=tk.TOP, padx=5, expand=tk.NO, fill=tk.X) ttk.Label(lblFrame, text=label, justify=tk.LEFT, anchor=tk.W).pack(side=tk.LEFT, expand=tk.NO, fill=tk.X) def onInsertDate(msglist): desc.insert('insert', ' '.join(msglist)) desc.see('insert') desc.focus_set() def onInsertDateLong(event=None): onInsertDate(['-->>>', utils_base.now(), os.environ['USER'], '<<<--\n']) def onInsertDateShort(event=None): onInsertDate([utils_base.now(), '|| ']) if showInsertDate: ttk.Label(lblFrame, text=' ', justify=tk.LEFT, anchor=tk.W).pack(side=tk.LEFT, expand=tk.NO, fill=tk.X) bsho = ttk.Button(lblFrame, text="Insert user/date/time (long)", command=onInsertDateLong) bsho.pack(side=tk.LEFT, padx=5, pady=5) blon = ttk.Button(lblFrame, text="Insert date/time (short)", command=onInsertDateShort) blon.pack(side=tk.LEFT, padx=5, pady=5) descFrame = ttk.Frame(top) descFrame.pack(side=tk.TOP, padx=5, expand=tk.YES, fill=tk.BOTH) desc = tk.ScrolledText(descFrame, height=8, undo=tk.TRUE, maxundo=-1, **scrolledTextKW) desc.tag_configure('historic', foreground='dark slate gray') if showHistorySeparate: hist = tk.ScrolledText(descFrame, height=8, undo=tk.TRUE, maxundo=-1) hist.tag_configure('historic', foreground='dark slate gray') where = hist desc.grid(column=0, row=0, sticky='nsew') hist.grid(column=1, row=0, sticky='nsew') descFrame.columnconfigure(0, weight=1) descFrame.columnconfigure(1, weight=1) descFrame.rowconfigure(0, weight=1) else: where = desc desc.pack(side=tk.LEFT, expand=tk.YES, fill=tk.BOTH) if isinstance(txt, str): txt = txt.strip() if len(txt): where.insert(1.0, txt.strip() + '\n\n', 'historic') where.see('insert') if showHistorySeparate: hist.configure(state=tk.DISABLED) def insertReturn(event=None): desc.insert('insert', '\n') desc.see('insert') return 'break' desc.bind('<Return>', insertReturn) desc.bind('<KP_Enter>', insertReturn) desc.frame = top return desc
def remove_submodules(module, keepModuleAndDependencies=True): """ This function removes all submodules from the current module (operates in place) :param module: module to cleanup :param keepModuleAndDependencies: whether to keep the MODULE and DEPENDENCIES entries of the submodules SETTINGS """ for key in list(module.keys()): if isinstance(module[key], OMFITmodule) and keepModuleAndDependencies: for item in list(module[key].keys()): if item == 'SETTINGS': for sett in list(module[key]['SETTINGS'].keys()): if sett not in ['MODULE', 'DEPENDENCIES']: del module[key]['SETTINGS'][sett] elif sett == 'MODULE': for k in list(module[key]['SETTINGS']['MODULE'].keys()): if k not in ['ID']: del module[key]['SETTINGS']['MODULE'][k] elif not isinstance(module[key][item], OMFITmodule): del module[key][item] elif isinstance(module[key], OMFITmodule) and not keepModuleAndDependencies: del module[key] continue # deleting items will prevent them to show up elif key == '__scratch__': del module[key] continue # skipping items will prevents pruning further down elif isinstance(module[key], _OMFITnoSave) or (hasattr(module[key], 'dynaLoad') and module[key].dynaLoad): continue if isinstance(module[key], SortedDict): remove_submodules(module[key], keepModuleAndDependencies=keepModuleAndDependencies) ############################################ if '__main__' == __name__: test_classes_main_header() with open(OMFITsrc + '/../regression/test_expressions.py') as f: exec(compile(f.read(), OMFITsrc + '/../regression/test_expressions.py', 'exec'))