Source code for omfit_classes.OMFITx

print('Loading OMFIT APIs...')

try:
    # framework is running
    from .startup_choice import *

    # Don't do this if building the documentation
    if not any(('sphinx' in k and not 'sphinxcontrib' in k) for k in sys.modules):
        # here we do a `from pylab import *` to allow GUIs to evaluate user input
        from pylab import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from omfit_classes import utils_base
from omfit_classes.utils_base import _streams
from omfit_classes.omfit_base import *
from omfit_classes.omfit_data import *
from omfit_classes import omfit_mds
from omfit_classes.omfit_mds import *
from omfit_classes.omfit_ascii import OMFITascii
from omfit_classes.omfit_python import _OMFITpython
from omfit_classes.omfit_environment import OMFITenv
from omfit_classes.omfit_weblink import openInBrowser

import tkinter as tk
from tkinter import ttk
from glob import glob as _glob
import numpy as np
import re
import copy
import subprocess
from collections.abc import Callable as CollectionsCallable

# Decorators @_available_to_user... are used to define which functions should appear in the OMFIT documentation
def _available_to_userTASK(f):
    OMFITaux.setdefault('OMFITxTASK_functions', [])
    OMFITaux['OMFITxTASK_functions'].append(f)
    OMFITaux['OMFITxTASK_functions'].sort(key=lambda x: str(x).lower())
    return f


def _available_to_userGUI(f):
    OMFITaux.setdefault('OMFITxGUI_functions', [])
    OMFITaux['OMFITxGUI_functions'].append(f)
    OMFITaux['OMFITxGUI_functions'].sort(key=lambda x: str(x).lower())
    return f


# This dictionary is used to keep track of the open GUIs
_GUIs = {}

# This dictionary is used to keep track of the inner workings of the active GUI
_aux = {}
_aux['topGUI'] = None
_aux['open_tabs'] = {}
_aux['parentGUI'] = None
_aux['is_compoundGUI'] = False
_aux['notebook'] = None
_aux['compoundGUIid'] = None
_aux['compoundGUIcounter'] = None
_aux['tab_list'] = {}
_aux['tab_name'] = ''
_aux['configure_size'] = []
_aux['harvest'] = {}
_aux['same_row'] = None
_aux['packing'] = 'top'  # tk.TOP # we use 'top' instead of 'tk.top' to avoid importing tk environment when only OMFIT classes are loaded

# ------------------------------------
# Nice representation of floats
# ------------------------------------
_originalPrintOptions = np.get_printoptions()


[docs]def repr(value): ''' repr modified to work for GUI functions ''' if xarray is not None and isinstance(evalExpr(value), DataArray): value = value.values if isinstance(value, np.ndarray) and len(value.shape) == 0: value = np.atleast_1d(value)[0] np.set_printoptions(formatter={'float': repr}) try: if isinstance(evalExpr(value), float): if np.isnan(evalExpr(value)): value = 'nan' elif np.isinf(evalExpr(value)): if np.sign(evalExpr(value)) > 0: value = 'inf' else: value = '-inf' else: value = '{:.15g}'.format(value) if '.' not in value and 'e' not in value: tmp = re.sub('[1-9]', 'x', value[::-1]) if 'x' in tmp: index = int((tmp.index('x')) // 3 * 3) if index != 0: value = value[:-index] + 'e{:d}'.format(index) else: value = value + '.0' else: value = value + '.0' elif isinstance(value, np.ndarray) and len(value.shape) == 1: value = '[' + ', '.join(map(repr, np.atleast_1d(value))) + ']' elif isinstance(value, uncertainties.UFloat): value = f'ufloat({value.n},{value.s})' else: value = builtins.repr(value) finally: np.set_printoptions(**_originalPrintOptions) return value
[docs]def repr_eval(location, preentry=None, collect=False): ''' evaluate location and provide representation ''' value = _eval(location) if preentry is None: def preentry(val): return val if not isinstance(location, str) and collect: if np.all(np.atleast_1d(value) == value[0]): value = value[0] return repr(preentry(value))
# ------------------------------------ # GUI management # ------------------------------------
[docs]class GUI(object): """ This class creates a new GUI. It is used internally by OMFIT when a OMFITpythonGUI object is executed :param pythonFile: OMFITpythonGUI object to be executed :return: None """ def __init__(self, pythonFile, relativeLocations, **kw): if OMFITaux['rootGUI'] is None: raise Exception('OMFIT GUI elements can only run within full OMFIT graphical framework') top = tk.Toplevel(OMFITaux['rootGUI']) top.withdraw() top.transient(OMFITaux['rootGUI']) top.protocol("WM_DELETE_WINDOW", lambda top=top: _clearClosedGUI(top)) top.wm_title(treeLocation(pythonFile)[-1]) # register GUI _GUIs[str(top)] = self self.top = top self.pythonFile = pythonFile self.notebooks = {} self.kw = kw self.locked = [] self.relativeLocations = relativeLocations try: thereWasError = True self.update() top.update_idletasks() tk_center(top, parent=OMFITaux['rootGUI']) if len(self.parentGUI.winfo_children()): top.deiconify() top.lift() thereWasError = False except Exception: raise finally: if thereWasError: _clearClosedGUI(top)
[docs] def update(self): try: _aux['topGUI'] = top = self.top _aux['compoundGUIid'] = str(id(self.pythonFile)) _aux['compoundGUIcounter'] = 0 # if there was a notebook, store the info about what tabs were selected _aux['open_tabs'] = {} for cg, nb in self.notebooks.items(): index = nb.tabs().index(nb.select()) _aux['open_tabs'][cg] = index if hasattr(self, 'parentGUI'): # this is executed for GUI redraws # delete everything inside the existing scrollable canvas _clearKids(self.parentGUI) _clearKids(self.prefFrame) canvas = self.canvas taskGUIframeInterior = self.taskGUIframeInterior interior_id = self.interior_id yscrollbar = self.yscrollbar prefFrame = self.prefFrame else: # this is executed at the first GUI creation # create an empty scrollable canvas top.grid_rowconfigure(0, weight=1) top.grid_columnconfigure(0, weight=1) self.yscrollbar = yscrollbar = ttk.Scrollbar(top) yscrollbar.grid(row=0, column=1, sticky=tk.N + tk.S + tk.W) self.canvas = canvas = tk.Canvas(top, bd=0, yscrollcommand=yscrollbar.set) canvas.grid(row=0, column=0, sticky=tk.N + tk.S + tk.E + tk.W) yscrollbar.config(command=canvas.yview) self.taskGUIframeInterior = taskGUIframeInterior = ttk.Frame(canvas) taskGUIframeInterior.pack(side=tk.LEFT, expand=tk.YES, fill=tk.BOTH) self.interior_id = interior_id = canvas.create_window(0, 0, anchor=tk.NW, window=taskGUIframeInterior) self.parentGUI = taskGUIframeInterior # preference window self.prefFrame = prefFrame = ttk.Frame(top) prefFrame.grid(row=0, column=2, sticky=tk.N + tk.S) # mousewheel scroll on users GUI def mouse_wheel(event): # respond to Linux or Windows wheel event if event.num == 5 or event.delta == -120: canvas.yview('scroll', 1, 'units') if event.num == 4 or event.delta == 120: canvas.yview('scroll', -1, 'units') return 'break' top.bind("<MouseWheel>", mouse_wheel) top.bind("<Button-4>", mouse_wheel) top.bind("<Button-5>", mouse_wheel) taskGUIframeInterior.pack_propagate(0) # #add setupModule buttons # if self.relativeLocations['root'] is not OMFIT: # try: # _setupModule(prefFrame) # except Exception as _excp: # printe('Error in setupModule GUI: '+repr(_excp)) # initialize things for compound GUIs, tabs, notebooks and configuration of sizes _aux['parentGUI'] = self.parentGUI _aux['is_compoundGUI'] = False _aux['notebook'] = None _aux['tab_name'] = '' _aux['tab_list'] = {} _aux['tab_list'][''] = _aux['parentGUI'] _aux['configure_size'] = [] old_same_row = _aux['same_row'] _aux['same_row'] = None old_packing = _aux['packing'] _aux['packing'] = tk.TOP _GUIs[str(_aux['topGUI'])].notebooks = {} _GUIs[str(_aux['topGUI'])].locked = [] # header GUI headerGUI = self.kw.pop('headerGUI', '') if not isinstance(headerGUI, str): headerGUI = headerGUI.read() if headerGUI: self.kw['prerun'] = self.kw.get('prerun', '') + '\n' + headerGUI # footer GUI footerGUI = self.kw.pop('footerGUI', '') if not isinstance(footerGUI, str): footerGUI = footerGUI.read() if footerGUI: self.kw['postrun'] = self.kw.get('postrun', '') + '\n' + footerGUI # execute the main pythonGUI self.kw['compoundGUI'] = False self.pythonFile.__run__(**self.kw) _aux['same_row'] = old_same_row _aux['packing'] = old_packing # handle resizing def configure_size(): canvas.unbind('<Configure>') canvas.update_idletasks() canvas.configure(scrollregion=(0, 0, top.winfo_width(), taskGUIframeInterior.winfo_reqheight())) GUIheight = min([taskGUIframeInterior.winfo_reqheight(), int(OMFITaux['rootGUI'].winfo_height() * 0.9)]) GUIwidth = max( [taskGUIframeInterior.winfo_reqwidth(), top.winfo_width() - yscrollbar.winfo_reqwidth() - prefFrame.winfo_reqwidth()] ) canvas.itemconfigure(interior_id, width=GUIwidth, height=taskGUIframeInterior.winfo_reqheight()) canvas.configure(width=GUIwidth, height=GUIheight) top.configure(width=GUIwidth + yscrollbar.winfo_reqwidth() + prefFrame.winfo_reqwidth(), height=GUIheight) for item, action in _aux['configure_size']: try: item.bind('<Configure>', lambda event: action()) except tk.TclError: pass _aux['configure_size'] = [] canvas.update_idletasks() canvas.bind('<Configure>', lambda event: configure_size()) taskGUIframeInterior.pack_propagate(1) configure_size() OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") except Exception: # if anything goes wrong close the GUI and raise the error _clearClosedGUI(self.top) raise
def _clearClosedGUI(top): """ This method is used internally by OMFIT to clear the data of GUIs which have been closed by users :param top: TkInter ID of the closed GUI :return: None """ if str(top) in _GUIs: del _GUIs[str(top)] try: _clearKids(top) top.destroy() except Exception: pass def _clearKids(top): """ Recursive cleanup of all elements under TkInter ID :param top: TkInter ID of the parent whose kids must be cleared :return: None """ try: top.winfo_children() except Exception: return for kid in top.winfo_children(): _clearKids(kid) for kid in top.winfo_children(): try: kid.pack_forget() except Exception: pass kid.destroy() def _absLocation(location): """ This method is used internally by OMFIT to translate relative locations used in a OMFITpythonGUI script to absolute locations in the OMFIT tree. :param location: string with relative/absolute location in the OMFIT tree :return: absolute location string in the OMFIT tree """ return absLocation(location, relativeLocations(_GUIs[str(_aux['topGUI'])].pythonFile), base_is_relativeLocations_output=True)
[docs]@_available_to_userGUI def inputbox(prompt='Input box'): ''' Open a Dialog box to prompt for user input. Note: this is a blocking call. Return input string if user chooses submit and None otherwise ''' result = Dialog(entries={'Input': ''}, message=prompt, answers=['Submit', 'Cancel'], icon="question", title='Waiting for user input') if result[0] == 'Submit': return result[1]['Input']
[docs]@_available_to_userGUI def UpdateGUI(top=None): """ Function used to update users GUIs :param top: TopLevel tk GUI to be updated """ if top is None: topList = [_GUIs[k].top for k in list(_GUIs.keys())] else: topList = [top] for _aux['topGUI'] in topList: _GUIs[str(_aux['topGUI'])].update()
[docs]@_available_to_userGUI def Refresh(): ''' Force a refresh of the OMFIT GUI by issuing a TkInter .update() ''' if OMFITaux['GUI'] is not None and not len(OMFITaux['prun_process']): OMFITaux['console'].flush()
[docs]@_available_to_userGUI def CloseAllGUIs(): """ Function for closing all users GUIs """ for k in list(_GUIs.keys()): top = _GUIs[k].top _clearClosedGUI(top)
def _topGUI(item=None): """ Function to reach the TopLevel tk from a GUI element :param item: tk GUI element :return: TopLevel tk GUI """ if item is None: item = _aux['parentGUI'] return TKtopGUI(item) def _tk_ttk_process_kw(type, kw): ''' converts tk configure attributes to ttk styles :param type: one of the ttk_styles :param kw: tk configure instructions :return: custom ttk style ''' if not len(kw) or 'style' in kw: if 'style' in kw and type not in kw['style']: kw['style'] = kw['style'] + '.' + type return kw if 'fg' in kw: kw['foreground'] = kw.pop('fg') if 'bg' in kw: kw['background'] = kw.pop('bg') style_name = custom_ttk_style(type, **kw) kw.pop('foreground', None) kw.pop('background', None) kw['style'] = style_name return kw def _ttk_tk_process_kw(kw): if 'foreground' in kw: kw['fg'] = kw.pop('foreground') return kw # --------------------------- # GUI elements # --------------------------- def _harvest_experiment_info(extra_info={}): ''' harvest experiment info ''' from omfit_classes.omfit_harvest import harvest_send project = OMFIT.prj_options for item in list(_aux['harvest'].keys()): if OMFIT.filename is not None and len(OMFIT.filename): if 'device' in _aux['harvest'][item]: _aux['harvest'][item]['_tag'] = tokamak(_aux['harvest'][item]['device']) for k in ['color', 'type', 'notes']: if k in project: _aux['harvest'][item]['_' + k] = project[k] _aux['harvest'][item]['project'] = OMFIT.filename _aux['harvest'][item]['user'] = os.environ['USER'] _aux['harvest'][item].update(extra_info) for k in _aux['harvest'][item]: try: _aux['harvest'][item][k] = evalExpr(_aux['harvest'][item][k]) except Exception: printd('Error harvesting %s' % k, topic='harvest') host, port = evalExpr(SERVER['gadb-harvest']['HARVEST_server']).split(':') harvest_send(_aux['harvest'][item], 'omfit_experiment', host=host, protocol='UDP') del _aux['harvest'][item] def _for_each_collection(location): loc = parseLocation(location) where = [loc.pop(0)] loc0 = buildLocation(where) for k in range(1, len(loc)): where += [loc.pop(0)] loc0 = buildLocation(where) if isinstance(eval(loc0), OMFITcollection) and loc[0] not in eval(loc0).KEYS(): break if isinstance(eval(loc0), OMFITcollection): locs = [] for k in eval(loc0).KEYS(): locs.append(buildLocation(where + [k] + loc)) return locs, True return location, False def _setDefault(location, default=special1): """ This method is used internally by OMFIT to set the default values of GUI elements :param location: location in the OMFIT tree (can be multiple locations) :param default: Default value (multiple defaults if multiple locations) :return: None """ default_at_import = False if default is special1 and isinstance(location, str): try: default = eval(location.replace('SETTINGS', '__SETTINGS_AT_IMPORT__')) default_at_import = True except KeyError: pass if default is not special1 and not ( not isinstance(location, str) and np.iterable(location) and np.iterable(default) and len(default) and next(default.__iter__()) is special1 ): updated = False if isinstance(location, str): try: eval(location) except Exception: OMFIT.addBranchPath(location) eval(buildLocation(parseLocation(location)[:-1]))[parseLocation(location)[-1]] = default updated = True elif np.iterable(location): if len(location) != len(default): raise Exception('GUI error: Length of defaults must equal length of locations') for k, loc in enumerate(location): try: eval(loc) except Exception: OMFIT.addBranchPath(loc) eval(buildLocation(parseLocation(loc)[:-1]))[parseLocation(loc)[-1]] = default[k] updated = True return default, default_at_import def _reveal(location=None, lbl=None, help=None): ''' Reveals path/value of OMFIT GUI elements :param location: location (already pre-processed by _absLocation) :param lbl: human readeable description :param help: help ''' if lbl is None: lbl = '' lbl = tolist(lbl) if isinstance(location, _OMFITpython): location = relativeLocations(location)['thisName'] elif callable(location): location_str = str(location) if '<lambda' in location_str: location_str = 'Calls lambda function defined as:\n' + inspect.getsource(location) if '<function' in location_str: location_str = 'Calls function defined as:\n' + inspect.getsource(location) location = location_str locations = tolist(location) lbls = lbl * len(locations) for lbl, location in zip(lbls, locations): loc = str(location) if OMFITcwd in loc: loc = os.path.split(loc)[1] value = '' if location.startswith('OMFIT['): try: if eval(location) is None or isinstance(eval(location), (int, float, str)): value = ' = ' + repr(eval(location)) except Exception: pass printi('* %s --> %s%s' % (lbl, loc, value)) if help is not None and len(help): printi('== HELP ==') printi(' ' + '\n '.join(help.split('\n'))) def _eval(location): ''' Same as eval() but works also when location is a list of locations ''' multiple = not isinstance(location, str) locations = np.atleast_1d(location).tolist() for k, location in enumerate(locations): locations[k] = eval(location) if multiple: return locations else: return locations[0] def _Entry( parent, location, lbl=None, updateGUI=False, help='', preentry=None, postcommand=None, check=special1, collect=False, reveal_location=None, **kw, ): r""" Private method used to add entry GUI elements The background of the GUI gets colored green/red depending on whether the input by the user is a valid Python entry :param parent: parent TkInter object :param location: location in the OMFIT tree to be updated (it's a string) :param lbl: text on the left of the entry box :param updateGUI: whether the whole GUI should be redrawn when the value is updated :param help: string to be printed when the user right-clicks on the element :param preentry: function to pre-process the data at the OMFIT location to be displayed in the entry GUI element :param postcommand: function to post-process what the user has entered in the entry GUI element :param check: function that returns whether what the user has entered in the entry GUI element is a valid entry This will make the background colored yellow, and users will not be able to set the value. :param \**kw: keywords passed to the ttk.Entry object :return: associated ttk.Entry object """ from utils_widgets import OMFITfont kw = _tk_ttk_process_kw('TEntry', kw) value = tk.StringVar() oldValueRepr = dict(val=repr_eval(location, preentry, collect=collect)) e = ttk.Entry(parent, textvariable=value, takefocus=False, **kw) e.configure(font=OMFITfont()) e.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) value.set(repr_eval(location, preentry, collect=collect)) if Lock(location, checkLock=True): e.config(state=tk.DISABLED) else: def set_location(location, tmp): if isinstance(eval(tmp), list) and isinstance(eval(location), np.ndarray) and len(eval(location).shape) == 1: tmp = 'np.atleast_1d(' + tmp + ')' exec(location + "=" + tmp, globals(), locals()) def update_tree_values(location, updateGUI): tmp = checkKeyPressed() if tmp[0] is False: printe('Invalid Python entry: ' + value.get()) return elif tmp[0] is None and tmp[1] is None: printe('Tree entry ' + location + ' should not be set as: ' + value.get()) return elif tmp[0] is None: printe('Tree entry ' + location + ' should be ' + tmp[1]) return # valid entry - lets use it tmp = value.get() e.state(['!invalid', '!alternate', '!active']) # return style to default (remove color etc.) e.config(style='TEntry') if not isinstance(location, str): if len(tolist(eval(tmp))) > 1: for k, loc in enumerate(location): set_location(loc, tmp + "[%d]" % k) else: for loc in location: set_location(loc, tmp) else: set_location(location, tmp) oldValueRepr['val'] = repr_eval(location, preentry, collect=collect) if postcommand is not None: manage_user_errors(lambda: postcommand(location=location), reportUsersErrorByEmail=True) try: if isinstance(location, str): eval(location) except KeyError: # this is done to handle `delete_if_default` option or any post-command which deletes the entry pass else: oldValueRepr['val'] = repr_eval(location, preentry, collect=collect) OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") def checkKeyPressed(): try: try: tmp = repr_eval(value.get(), collect=collect) if not tmp == oldValueRepr['val']: # new value if check is not special1: if not check(_eval(tmp)): e.state(['invalid', '!alternate', '!active']) e.config(style='check.TEntry') else: e.state(['!invalid', '!alternate', 'active']) e.config(style='valid.TEntry') else: e.state(['!invalid', '!alternate', 'active']) e.config(style='valid.TEntry') else: # no change in value e.state(['!invalid', '!alternate', '!active']) e.config(style='TEntry') except Exception as _excp: e.state(['alternate', '!invalid', '!active']) e.config(style='error.TEntry') # printe('Error: ' + repr(_excp) + '\nIs this a valid Python entry?\nDid you forget to add string quotations?') # too much printing if e.instate(['alternate']): return False, None elif e.instate(['invalid']): tmp = None if 'is_' == check.__name__[:3]: tmp = check.__name__[3:] return None, tmp else: return True, None except Exception: pass def escape(): value.set(oldValueRepr['val']) checkKeyPressed() if check is not special1: if not check(_eval(value.get())): e.config(background='goldenrod1') e.bind("<Return>", lambda event: update_tree_values(location, updateGUI)) e.bind("<KP_Enter>", lambda event: update_tree_values(location, updateGUI)) e.bind("<Key>", lambda event: e.after(1, checkKeyPressed)) e.bind("<<virtualKey>>", lambda event: checkKeyPressed()) e.bind("<Escape>", lambda event: e.after(1, escape)) if reveal_location is None: reveal_location = location e.bind(f"<{rightClick}>", lambda event: _reveal(reveal_location, lbl, help)) return e, value def _Label(parent, lbl, **kw): """ Private method used to add labels and comments :param parent: parent TkInter object :param lbl: text in the label :return: associated ttk.Label object """ kw = _tk_ttk_process_kw('TLabel', kw) lbl = ttk.Label(parent, text=lbl, **kw) lbl.config(justify=tk.LEFT) return lbl def _Text(parent, location, lbl=None, updateGUI=False, help='', preentry=None, postcommand=None, reveal_location=None, **kw): kw = _ttk_tk_process_kw(kw) top = tk.Toplevel(parent) top.transient(parent) top.wm_title(lbl) e = tk.ScrolledText(top, wrap=tk.NONE, undo=tk.TRUE, maxundo=-1, relief=tk.GROOVE, border=1, height=12, takefocus=False, **kw) e.pack(side=tk.TOP, expand=tk.YES, fill=tk.BOTH, padx=5, pady=5) if isinstance(eval(location), str): if preentry is None: e.set(eval(location)) else: e.set(preentry(eval(location))) elif isinstance(eval(location), np.ndarray): if preentry is None: e.set(repr(eval(location))) else: e.set(repr(preentry(eval(location)))) # allow line splitting of multi item lists in multiline edit option of Entry elif isinstance(eval(location), list): e.set(repr_eval(location, preentry).replace('],', '],\n')) else: e.set(repr_eval(location, preentry)) if Lock(location, checkLock=True): e.config(state=tk.DISABLED) else: def update_tree_values(location, updateGUI): tmp = e.get() if isString.get(): tmp = repr(tmp) else: if isinstance(eval(tmp), list) and isinstance(eval(location), np.ndarray) and len(eval(location).shape) == 1: tmp = 'np.atleast_1d(' + tmp + ')' exec(location + "=" + tmp, globals(), locals()) if postcommand is not None: manage_user_errors(lambda: postcommand(location=location), reportUsersErrorByEmail=True) OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") top.destroy() frm = ttk.Frame(top) frm.pack(side=tk.TOP, expand=tk.YES, fill=tk.BOTH) isString = tk.BooleanVar() if isinstance(eval(location), str): isString.set(True) ttk.Checkbutton(frm, text="is string", variable=isString, takefocus=False).pack(side=tk.LEFT, expand=tk.NO, padx=5) update_button = ttk.Button(frm, text="Update", command=lambda: update_tree_values(location, updateGUI), takefocus=False) update_button.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X, padx=5) top.bind("<Escape>", lambda event: top.destroy()) if reveal_location is None: reveal_location = location e.bind(f"<{rightClick}>", lambda event: _reveal(reveal_location, lbl, help)) return e def _helpButton(master, help): if help: ttk.Button( master, text='?', command=lambda help=help: helpTip.showtip(help, move=False, strip=True), style='flat.TButton', takefocus=False ).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0) def _urlButton(master, url): if url: if '://' not in url: url = 'http://' + url ttk.Button(master, text='w', command=lambda url=url: openInBrowser(url), takefocus=False, style='flat.TButton').pack( side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0 )
[docs]@_available_to_userGUI def CompoundGUI(pythonFile, title=None, **kw): """ This method allows the creation of nested GUI. :param pythonFile: is meant to be an OMFITpythonGUI object in the OMFIT tree :param title: title to appear in the compound GUI frame. If None, the location of the `pythonFile` object in the OMFIT tree will be shown. If an empty string, the compound GUI title is suppressed. :return: None """ from omfit_classes.omfit_python import OMFITpythonGUI from utils_widgets import OMFITfont if isinstance(pythonFile, str): pythonFile = eval(pythonFile) if not isinstance(pythonFile, OMFITpythonGUI): raise Exception('GUI scripts must be declared as OMFITpythonGUI objects') tmp = _GUIs[str(_aux['topGUI'])].pythonFile tmp_parentGUI = ( _aux['parentGUI'], _aux['notebook'], _aux['tab_name'], _aux['tab_list'], _aux['same_row'], _aux['packing'], _aux['compoundGUIid'], ) try: _GUIs[str(_aux['topGUI'])].pythonFile = pythonFile bw = 2 if title is not None and not len(title): bw = 0 _aux['parentGUI'] = ttk.Frame(_aux['parentGUI'], borderwidth=bw, relief=tk.GROOVE) _aux['parentGUI'].pack(side=_aux['packing'], expand=tk.NO, fill=tk.BOTH, padx=bw, pady=bw) _aux['compoundGUIcounter'] += 1 _aux['compoundGUIid'] += '(%d)@' % _aux['compoundGUIcounter'] + _aux['tab_name'] _aux['same_row'] = None _aux['packing'] = tk.TOP _aux['notebook'] = None _aux['tab_name'] = '' _aux['tab_list'] = {} _aux['tab_list'][''] = _aux['parentGUI'] if title is None: Label(treeLocation(_GUIs[str(_aux['topGUI'])].pythonFile)[-1], font=OMFITfont('bold', -2)) elif len(title.strip()): Label(title, font=OMFITfont('bold', -2)) tmp_is_compoundGUI = _aux['is_compoundGUI'] _aux['is_compoundGUI'] = True kw.setdefault('compoundGUI', True) out_namespace = pythonFile.__run__(**kw) _aux['is_compoundGUI'] = tmp_is_compoundGUI return out_namespace finally: ( _aux['parentGUI'], _aux['notebook'], _aux['tab_name'], _aux['tab_list'], _aux['same_row'], _aux['packing'], _aux['compoundGUIid'], ) = tmp_parentGUI _GUIs[str(_aux['topGUI'])].pythonFile = tmp
[docs]@_available_to_userGUI def Tab(name=''): """ This method creates a Tab under which the successive GUI elements will be placed :param name: Name to assign to the TAB :return: None """ _aux['tab_name'] = name # if there was no notebook create one if _aux['notebook'] is None and len(name): _aux['notebook'] = ttk.Notebook(_aux['parentGUI'], takefocus=False) _aux['notebook'].pack(side=tk.TOP, expand=tk.YES, fill=tk.X) _GUIs[str(_aux['topGUI'])].notebooks[_aux['compoundGUIid']] = _aux['notebook'] # if the name is not in the tab, then add it if name not in _aux['tab_list']: _aux['tab_list'][name] = ttk.Frame(_aux['notebook']) _aux['notebook'].add(_aux['tab_list'][name], text=name) # if there was a notebook, try to re-select the tabs which were selected if len(_aux['open_tabs']): try: _aux['notebook'].select(_aux['open_tabs'][_aux['compoundGUIid']]) # printi(name,_aux['compoundGUIid'], _aux['open_tabs'][_aux['compoundGUIid']]) except Exception as _excp: pass # printe(name,_aux['compoundGUIid'], _aux['open_tabs'][_aux['compoundGUIid']], repr(_excp)) # activate the tab _aux['parentGUI'] = _aux['tab_list'][name]
[docs]@_available_to_userGUI def Entry( location, lbl=None, comment=None, updateGUI=False, help='', preentry=None, postcommand=None, check=special1, multiline=False, norm=None, default=special1, delete_if_default=False, url='', kwlabel={}, **kw, ): """ This method creates a GUI element of the entry type The background of the GUI gets colored green/red depending on whether the input by the user is a valid Python entry :param location: location in the OMFIT tree (notice that this is a string) :param lbl: Label which is put on the left of the entry :param comment: A comment which appears on top of the entry :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed :param help: help provided when user right-clicks on GUI element (adds GUI button) :param preentry: function to pre-process the data at the OMFIT location to be displayed in the entry GUI element :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input :param check: function that returns whether what the user has entered in the entry GUI element is a valid entry. This will make the background colored yellow, and users will not be able to set the value. :param default: Set the default value if the tree location does not exist (adds GUI button) :param delete_if_default: Delete tree entry if the value is the default value :param multiline: Force display of button for multiple-line text entry :param norm: normalizes numeric variables (overrides `preentry` or `postcommand`) :param url: open url in web-browser (adds GUI button) :param kwlabel: keywords passed to ttk.Label :return: associated ttk.Entry object """ kwlabel = _tk_ttk_process_kw('TLabel', kwlabel) location = _absLocation(location) if isinstance(location, str): location, collection = _for_each_collection(location) multiple = not isinstance(location, str) if multiple: default = [default] * len(location) default, default_at_import = _setDefault(location, default) frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1) if comment is not None: frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) _Label(frm, comment).pack(side=tk.LEFT) frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) if lbl is None: lbl = location ttk.Label(frm, text=str(lbl) + " = " * np.sign(len(str(lbl))), **kwlabel).pack(side=tk.LEFT) if norm is not None: def post(location, norm): exec(location + '*=' + str(norm), globals(), locals()) def pre(value, norm): return value / norm preentry = lambda value, norm=norm: pre(value, norm) postcommand = lambda location, norm=norm: post(location, norm) if delete_if_default: postcommand = lambda location, postcommand=postcommand: delete_default(location, postcommand) e, value = _Entry(frm, location, lbl, updateGUI, help, preentry, postcommand, check=check, collect=multiple, **kw) # multiple entries if multiple: ttk.Label(frm, text='[%d]' % len(location), **kwlabel).pack(side=tk.LEFT) # multiline button elif multiline or isinstance(eval(location), str) and '\n' in eval(location): def showText(): _Text(frm, location, lbl, True, help, preentry, postcommand, **kw) ttk.Button(frm, text="...", command=showText, takefocus=False, width=3).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=5) if not Lock(location, checkLock=True): # default button if default is not special1: def writeToEntry(): e.focus_set() e.grab_set() value.set(repr(default)) _aux['parentGUI'].update_idletasks() e.event_generate("<<virtualKey>>") e.grab_release() ttk.Button( frm, text=['d', 'D'][not default_at_import], command=writeToEntry, style='flat.TButton', takefocus=False, state=kw.get('state', 'enabled'), ).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0) # help button _helpButton(frm, help) # web button _urlButton(frm, url) def delete_default(location, postcommand=None): if postcommand: postcommand() if eval(location) == default: exec(('del ' + location), globals(), locals()) if delete_if_default: delete_default(location) return e
[docs]@_available_to_userGUI def ComboBox( location, options, lbl=None, comment=None, updateGUI=False, state='readonly', help='', postcommand=None, check=special1, default=special1, url='', kwlabel={}, **kw, ): """ This method creates a GUI element of the combobox type. The background of the GUI gets colored green/red depending on whether the input by the user is a valid Python entry Notice that this method can be used to set multiple entries at once: `ComboBox(["root['asd']","root['dsa']","root['aaa']",],{'':[0,0,0],'a':[1,1,0],'b':[1,0,'***']},'Test multi',default=[0,0,0])` which comes very handy when complex/exclusive switch combinations need to be set in a namelist file, for example. Use the string `***` to leave parameters unchanged. :param location: location in the OMFIT tree (notice that this is either a string or a list of strings) :param options: possible options the user can choose from. This can be a list or a dictionary. :param lbl: Label which is put on the left of the entry :param comment: A comment which appears on top of the entry :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed :param state: * 'readonly' (default) the user can not type in whatever he wants * 'normal' allow user to type in * 'search' allow searching for entries :param help: help provided when user right-clicks on GUI element (adds GUI button) :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input :param check: function that returns whether what the user has entered in the entry GUI element is a valid entry. This will make the background colored yellow, and users will not be able to set the value. :param default: Set the default value if the tree location does not exist (adds GUI button) :param url: open url in web-browser (adds GUI button) :param kwlabel: keywords passed to ttk.Label :return: associated TkInter combobox object """ from utils_widgets import OMFITfont kwlabel = _tk_ttk_process_kw('TLabel', kwlabel) kw = _tk_ttk_process_kw('TEntry', kw) multiple = not isinstance(location, str) location = _absLocation(location) default, default_at_import = _setDefault(location, default) # escapeDescription will escape string options whose description matches the value def escapeDescription(description, value=None): description = str(description) if state == 'normal' and isinstance(value, str) and description == value and value != '': description = repr(description) return '***' + description + '***' # create key-text pair tmp = OrderedDict() if isinstance(options, (SortedDict, OrderedDict)): for key in list(options.keys()): tmp[escapeDescription(key, options[key])] = options[key] elif isinstance(options, dict): for key in sorted(list(options.keys()), key=str): tmp[escapeDescription(key, options[key])] = options[key] elif isinstance(options, (list, tuple)): for item in options: tmp[escapeDescription(item, item)] = item else: raise OMFITexception('ComboBox options can only be dictionaries, lists or tuples') options = tmp # handle wildcards # do not add wildcard entries if they have the same effective # value of an entry that does not have wildcards if multiple: for opt in list(options.keys()): modified = False tmp = copy.deepcopy(options[opt]) for k, loc in enumerate(location): if tmp[k] == '***': tmp[k] = eval(loc) modified = True if modified: alreadyThere = False for opt1 in options: if repr(options[opt1]) == repr(tmp): alreadyThere = True if alreadyThere: del options[opt] else: options[opt] = tmp # create inverse lookup dictionary values = SortedDict() for description in options: values[repr(options[description])] = description # add also the current entry if repr(_eval(location)) not in values: options[escapeDescription(repr_eval(location), _eval(location))] = copy.deepcopy(_eval(location)) values[repr_eval(location)] = escapeDescription(repr_eval(location), _eval(location)) if lbl is None: lbl = location frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1) if comment is not None: frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) _Label(frm, comment).pack(side=tk.LEFT) frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) ttk.Label(frm, text=lbl + " = " * np.sign(len(str(lbl))), **kwlabel).pack(side=tk.LEFT) comboBox = Combobox( frm, state=state if state != 'search' else 'normal', values=tuple([k.strip('*') for k in list(options.keys())]), takefocus=False, **kw, ) comboBox.configure(font=OMFITfont()) comboBox.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) comboBox.set(values[repr_eval(location)][3:-3]) if Lock(location, checkLock=True): comboBox.config(state=tk.DISABLED) else: def update_tree_values(location, updateGUI): tmp = checkKeyPressed() if state == 'search': if escapeDescription(comboBox.get()) not in options: escape() return if tmp[0] is False: printe('Invalid Python entry: ' + comboBox.get()) return elif tmp[0] is None and tmp[1] is None: printe('Tree entry ' + location + ' should not be set as: ' + comboBox.get()) return elif tmp[0] is None: printe('Tree entry ' + location + ' should be ' + tmp[1]) return if escapeDescription(comboBox.get()) in options: tmp = options[escapeDescription(comboBox.get())] else: tmp = _eval(comboBox.get()) if isinstance(tmp, list) and isinstance(_eval(location), np.ndarray) and len(_eval(location).shape) == 1: tmp = np.atleast_1d(tmp) if isinstance(location, str): exec(location + '=' + repr(tmp), globals(), locals()) else: for k, loc in enumerate(location): exec(loc + '=' + repr(tmp) + '[k]', globals(), locals()) if postcommand is not None: manage_user_errors(lambda: postcommand(location=location), reportUsersErrorByEmail=True) OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") else: checkKeyPressed() def checkKeyPressed(): # reorder entries based on matching search if state == 'search': try: coptions = [x for x in comboBox.configure('values')[4] if x != '-----'] except tk.TclError: pass else: tmp_options = [] for item in coptions: if comboBox.get().lower() in item.lower(): tmp_options.append(item) tmp_options.append('-----') for item in coptions: if comboBox.get().lower() not in item.lower(): tmp_options.append(item) comboBox.configure(values=tuple(tmp_options)) try: try: # if the value is one of the options if escapeDescription(comboBox.get()) in options: if repr(options[escapeDescription(comboBox.get())]) == repr_eval(location): comboBox.state(['active', '!invalid', '!alternate']) comboBox.config(style='TCombobox') else: comboBox.state(['active', '!invalid', '!alternate']) comboBox.config(style='valid.TCombobox') # if in search mode elif state == 'search': comboBox.state(['active', '!invalid', '!alternate']) comboBox.config(style='exist.TCombobox') return True, 'search' # if the value is not a valid option else: tmp = repr_eval(comboBox.get()) # if the value equals what's in the tree location (--> white) if tmp == repr_eval(location): comboBox.state(['!active', '!invalid', '!alternate']) comboBox.config(style='TCombobox') # if the value equals what's in the tree location (--> green) else: comboBox.state(['active', '!invalid', '!alternate']) comboBox.config(style='valid.TCombobox') # if there is a check and does not pass (--> orange) if check is not special1 and not check(_eval(tmp)): comboBox.state(['!active', 'invalid', '!alternate']) comboBox.config(style='check.TCombobox') # if anything fails (--> red) except Exception as _excp: comboBox.state(['!active', '!invalid', 'alternate']) comboBox.config(style='error.TCombobox') if comboBox.instate(['alternate']): return False, None elif comboBox.instate(['invalid']): tmp = None if 'is_' == check.__name__[:3]: tmp = check.__name__[3:] return None, tmp else: return True, None except Exception: pass def escape(): try: comboBox.set(values[repr_eval(location)][3:-3]) except Exception: comboBox.set(repr_eval(location)) checkKeyPressed() comboBox.bind('<<ComboboxSelected>>', lambda event: update_tree_values(location, updateGUI)) comboBox.bind('<Return>', lambda event: update_tree_values(location, updateGUI)) comboBox.bind('<KP_Enter>', lambda event: update_tree_values(location, updateGUI)) comboBox.bind("<Key>", lambda event: comboBox.after(1, checkKeyPressed)) comboBox.bind("<Escape>", lambda event: comboBox.after(1, escape)) checkKeyPressed() # default button if default is not special1: def writeToEntry(): if repr(default) in values: comboBox.set(values[repr(default)][3:-3]) elif state == 'normal': comboBox.set(repr(default)) else: raise OMFITexception('Combobox %s: Default value %s is not a valid option' % (location, default)) update_tree_values(location, updateGUI) checkKeyPressed() ttk.Button( frm, text=['d', 'D'][not default_at_import], command=writeToEntry, style='flat.TButton', takefocus=False, state=['enabled', 'disabled'][state == 'disabled'], ).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0) # help button _helpButton(frm, help) # web button _urlButton(frm, url) comboBox.bind(f"<{rightClick}>", lambda event: _reveal(location, lbl, help)) # break mousewheel scroll comboBox.bind("<MouseWheel>", 'break') comboBox.bind("<Button-4>", 'break') comboBox.bind("<Button-5>", 'break') return comboBox
[docs]@_available_to_userGUI class same_row(object): ''' Environment to place GUI elements on the same row For example to place two buttons side by side: >> with OMFITx.same_row(): >> OMFITx.Button('Run', lambda: None) >> OMFITx.Button('Plot', lambda: None) ''' def __enter__(self): self.frm_top = ttk.Frame(_aux['parentGUI']) self.old_aux = {} self.old_aux.update(_aux) _aux['same_row'] = _aux['parentGUI'] _aux['parentGUI'] = self.frm_top _aux['packing'] = tk.LEFT return self def __exit__(self, type, value, traceback): _aux['packing'] = self.old_aux['packing'] _aux['parentGUI'] = self.old_aux['parentGUI'] _aux['same_row'] = self.old_aux['same_row'] self.frm_top.pack(side=tk.TOP, expand=tk.NO, fill=tk.X, padx=0, pady=0)
[docs]@_available_to_userGUI class same_tab(object): ''' Environment to place GUI elements within the same tab For example to place two buttons in the same tab named 'test' >> with OMFITx.same_tab('test'): >> OMFITx.Button('Run', lambda: None) >> OMFITx.Button('Plot', lambda: None) ''' def __init__(self, tab_name): self.tab_name = tab_name def __enter__(self): self.old_tab_name = _aux['tab_name'] Tab(self.tab_name) return self def __exit__(self, type, value, traceback): Tab(self.old_tab_name)
[docs]@_available_to_userGUI def Button(lbl, command, help='', url='', closeGUI=False, updateGUI=False, **kw): r""" This method creates a GUI element of the button type :param lbl: the text to be written on the button :param command: the command to be executed :param help: help provided when user right-clicks on GUI element (adds GUI button) :param url: open url in web-browser (adds GUI button) :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed :param closeGUI: Close current GUI after executing the command :param \**kw: extra keywords are passed to the ttk.Button :return: associated ttk.Button object """ kw = _tk_ttk_process_kw('TButton', kw) reveal = command if isinstance(command, str): tmp = os.path.splitext(command) reveal = _absLocation(tmp[0]) + tmp[1] command = eval(reveal) if closeGUI: def command_and_close(current_gui, user_command): user_command() _clearClosedGUI(current_gui) command = lambda current_gui=_aux['topGUI'], user_command=command: command_and_close(current_gui, user_command) frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1) def buttonClick(btt, command, updateGUI): btt.config(state=tk.DISABLED) try: command() finally: def back2normal(btt): try: btt.config(state=tk.NORMAL) except Exception: pass OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") else: btt.after(250, lambda: back2normal(btt)) btt = ttk.Button( frm_top, text=lbl, command=lambda: buttonClick(btt, lambda: manage_user_errors(command, reportUsersErrorByEmail=True), updateGUI), takefocus=False, **kw, ) btt.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) # help button _helpButton(frm_top, help) # web button _urlButton(frm_top, url) btt.bind(f"<{rightClick}>", lambda event: _reveal(reveal, lbl, help)) return btt
[docs]@_available_to_userGUI def Label(lbl, align='center', **kw): """ This method creates a GUI element of the label type :param lbl: the text to be written in the label :param align: alignment of the text in the frame :return: associated ttk.Label object """ frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], expand=tk.NO, fill=tk.X, padx=2, pady=2) image = kw.pop('image', None) if image is not None: try: im = tk.PhotoImage(master=frm_top, file=image) except tk.TclError: import PIL.ImageTk im = PIL.ImageTk.PhotoImage(master=frm_top, file=image) label = _Label(frm_top, lbl, **kw) if image is not None: label.configure(image=im) label._ntimage = im alignment = {'left': tk.LEFT, 'center': tk.TOP, 'right': tk.RIGHT} label.pack(side=alignment[align], expand=tk.NO, fill=tk.X) def robust_configure_size(event=None, label=label, frm_top=frm_top): try: label.config(wraplength=frm_top.winfo_width()) except tk.TclError: pass _aux['configure_size'].append( (frm_top, lambda event=None, label=label, frm_top=frm_top: robust_configure_size(event=None, label=label, frm_top=frm_top)) ) return label
[docs]@_available_to_userGUI def Separator(lbl=None, kwlabel={}, **kw): r""" This method creates a TkInter separator object :param lbl: text to be written between separator lines :param kwlabel: keywords passed to ttk.Label :param \**kw: keywords passed to ttk.Label :return: associated ttk.Label object """ from utils_widgets import OMFITfont kwlabel.update(kw) if 'font' not in kwlabel: kwlabel['font'] = OMFITfont('bold') kwlabel = _tk_ttk_process_kw('TLabel', kwlabel) if not lbl: sep = ttk.Separator(_aux['parentGUI']) sep.pack(side=_aux['packing'], expand=tk.NO, fill=tk.X, padx=5, pady=2) else: frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=0, pady=0) ttk.Separator(frm_top).pack(side=tk.LEFT, expand=tk.YES, fill=tk.X, padx=5, pady=2) tmp = ttk.Label(frm_top, text=str(lbl), **kwlabel) tmp.pack(expand=tk.NO, fill=tk.NONE, side=tk.LEFT) ttk.Separator(frm_top).pack(side=tk.LEFT, expand=tk.YES, fill=tk.X, padx=5, pady=2) return tmp
[docs]@_available_to_userGUI def FilePicker( location, lbl=None, comment=None, updateGUI=False, help='', postcommand=None, localRemote=True, transferRemoteFile=True, directory=False, action='open', tree=True, default=special1, url='', kwlabel={}, init_directory_location=None, init_pattern_location=None, favorite_list_location=None, pattern_list_location=None, reveal_location=None, **kw, ): r""" This method creates a GUI element of the filePicker type, which allows to pick a file/directory :param location: location in the OMFIT tree (notice that this is a string) :param lbl: label to be shown near the button :param help: help provided when user right-clicks on GUI element (adds GUI button) :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed :param localRemote: True: both, 'local': only local, 'remote': only remote :param transferRemoteFile: controls what goes into location * string with local filename (if transferRemoteFile==True) * string with the filename (if transferRemoteFile==False) * tuple with the filename,server,tunnel (if transferRemoteFile==None) if transferRemoteFile=True, then the file is transferred to a temporary folder if transferRemoteFile is a string, then it will be interpreted as the directory where to move the file :param directory: whether it's a directory or a file :param action: 'open' or 'save' :param tree: load from OMFIT tree location :param url: open url in web-browser (adds GUI button) :param kwlabel: keywords passed to ttk.Label :param init_directory_location: The contents of this location are used to set the initial directory for file searches. If a file name is specified the directory will be determined from the file name and this input ignored. Otherwise, if set this will be used to set the initial directory. :param init_pattern_location: The default pattern is '*'. If this is specified then the contents of the tree location will replace the default intial pattern. :param favorite_list_location: OMFIT tree location which contains a possibly empty list of favorite file directories. To keep with the general omfit approach this should be a string. :param pattern_list_location: OMFIT tree location which contains a possibly empty list of favorite search patterns. To keep with the general omfit approach this should be a string. :param reveal_location: location used for creation of the help (this is used internally by OMFIT, should not be used by users) :param \**kw: keywords passed to Entry object :return: associated ttk.Entry object """ kwlabel = _tk_ttk_process_kw('TLabel', kwlabel) location = _absLocation(location) default, default_at_import = _setDefault(location, default) if reveal_location is None: reveal_location = location if action != 'open': transferRemoteFile = False localRemote = 'local' # todo: allow save to remote files def processFilename(tmp): exec(location + '=' + repr(tmp), globals(), locals()) if postcommand is not None: manage_user_errors(lambda: postcommand(location=location), reportUsersErrorByEmail=True) value.set(repr(tmp)) try: e.icursor(tk.END) e.xview(tk.END) except Exception: pass OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") def askTree(): top = tk.Toplevel(_topGUI(_aux['parentGUI'])) top.withdraw() top.transient(_aux['parentGUI']) top.wm_title('Pick file tree location') _Label(top, 'Tree location: ').pack(side=tk.LEFT) var = tk.OneLineText(top, width=50, percolator=True) var.set(OMFITaux['GUI'].focusRoot) var.pack(side=tk.LEFT, padx=2, pady=5, fill=tk.X, expand=tk.YES) def onReturn(var=None): if hasattr(eval(var.get()), 'filename') and eval(var.get()).filename: if hasattr(eval(var.get()), 'deploy'): filename = tempfile._get_default_tempdir() + '/' + os.path.basename(eval(var.get()).filename) eval(var.get()).deploy(filename) processFilename(filename) if os.path.isdir(filename): shutil.rmtree(filename, ignore_errors=True) else: os.remove(filename) elif hasattr(eval(var.get()), 'save'): eval(var.get()).save() processFilename(eval(var.get()).filename) else: printw(var.get() + " has no .deploy() or .save() methods! The chosen object may not be in sync.") processFilename(eval(var.get()).filename) else: processFilename(var.get()) top.destroy() def onEscape(): top.destroy() var.focus_set() top.bind('<Return>', lambda event: onReturn(var=var)) top.bind('<KP_Enter>', lambda event: onReturn(var=var)) top.bind('<Escape>', lambda event: onEscape()) top.protocol("WM_DELETE_WINDOW", top.destroy) top.update_idletasks() top.deiconify() top.wait_window(top) def askRemote(): remoteFilename = None server = 'localhost' tunnel = '' try: tmp = eval(location) except Exception: pass else: if isinstance(tmp, str) and len(tmp): remoteFilename = tmp elif isinstance(tmp, (tuple, list, np.ndarray)) and len(tmp[0]): if len(tmp): remoteFilename = tmp[0] if len(tmp) > 1: server = tmp[1] if len(tmp) > 2: tunnel = tmp[2] tmp = remoteFile( _aux['parentGUI'], transferRemoteFile, remoteFilename=remoteFilename, server=server, tunnel=tunnel, init_directory_location=init_directory_location, init_pattern_location=init_pattern_location, favorite_list_location=favorite_list_location, pattern_list_location=pattern_list_location, is_dir=directory, ) if (isinstance(tmp, str) and len(tmp)) or (isinstance(tmp, (tuple, list, np.ndarray)) and len(tmp[0])): processFilename(tmp) frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1) if comment is not None: frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) _Label(frm, comment).pack(side=tk.LEFT) frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.YES, fill=tk.X, pady=1) if lbl is None: if not directory: lbl = 'Pick File' else: lbl = 'Pick Directory' ttk.Label(frm, text=str(lbl) + " = " * np.sign(len(str(lbl))), **kwlabel).pack(side=tk.LEFT) e, value = _Entry( frm, location=location, lbl=lbl, updateGUI=updateGUI, help=help, postcommand=postcommand, reveal_location=reveal_location, **kw ) e.icursor(tk.END) e.xview(tk.END) e.config(width=30) bttTree = ttk.Button(frm, text='Tree', takefocus=False) if tree and ( not localRemote or localRemote is True or localRemote == 'local' or (localRemote == 'remote' and transferRemoteFile is not False) ): bttTree.pack(side=tk.LEFT, expand=tk.NO, fill=tk.X) bttTree.bind(f"<{rightClick}>", lambda event: _reveal(reveal_location, lbl, help)) bttRemote = ttk.Button(frm, text=['File', 'Directory'][directory], takefocus=False) if localRemote: bttRemote.pack(side=tk.LEFT, expand=tk.NO, fill=tk.X) bttRemote.bind(f"<{rightClick}>", lambda event: _reveal(reveal_location, lbl, help)) if Lock(location, checkLock=True): bttTree.config(state=tk.DISABLED) bttRemote.config(state=tk.DISABLED) else: bttTree.config(command=askTree) bttRemote.config(command=askRemote) # default button if default is not special1: def writeToEntry(): e.focus_set() e.grab_set() value.set(repr(default)) _aux['parentGUI'].update_idletasks() e.event_generate("<<virtualKey>>") e.grab_release() ttk.Button(frm, text=['d', 'D'][not default_at_import], command=writeToEntry, style='flat.TButton', takefocus=False).pack( side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0 ) # help button _helpButton(frm, help) # web button _urlButton(frm, url) return e
[docs]@_available_to_userGUI def ObjectPicker( location, lbl=None, objectType=None, objectKW={}, postcommand=None, unset_postcommand=None, kwlabel={}, init_directory_location=None, init_pattern_location=None, favorite_list_location=None, pattern_list_location=None, **kw, ): r""" This helper method creates a GUI element of the objectPicker type, which allows to load objects in the tree. If an object is already present at the location, then a button allows picking of a different object. Notice that this GUI element will always call an updateGUI :param location: location in the OMFIT tree (notice that this is a string) :param lbl: label to be shown near the button/object picker :param objectType: class of the object that one wants to load (e.g. OMFITnamelist, OMFITgeqdsk, ...) if `objectType is None` then the object selected with `Tree` is deepcopied :param objectKW: keywords passed to the object :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input. :param unset_postcommand: command to be executed after the value in the tree is deleted. This command will receive the OMFIT location string as an input. :param kwlabel: keywords passed to ttk.Label :param init_directory_location: The contents of this location are used to set the initial directory for file searches. If a file name is specified the directory will be determined from the file name and this input ignored. Otherwise, if set this will be used to set the initial directory. :param init_pattern_location: The default pattern is '*'. If this is specified then the contents of the tree location will replace the default intial pattern. :param favorite_list_location: OMFIT tree location which contains a possibly empty list of favorite file directories. To keep with the general omfit approach this should be a string. :param pattern_list_location: OMFIT tree location which contains a possibly empty list of favorite search patterns. To keep with the general omfit approach this should be a string. :param \**kw: extra keywords are pased to the FilePicker object :return: associated ttk.Entry object """ if isinstance(objectType, str): raise ValueError('ObjectPicker: object_type is not supposed to be a string') loc_orig = location location = _absLocation(location) locationObject = location locationScratch = _absLocation("scratch[" + repr('objectPicker_' + omfit_hash(location, 10)) + "]") tmp = parseLocation(location) where = eval(buildLocation(tmp[:-1])) what = tmp[-1] if objectType is None: kw['tree'] = True kw['localRemote'] = False def set(location=None): if objectType is None: where[what] = copy.deepcopy(eval(eval(locationScratch))) else: where[what] = objectType(eval(locationScratch), **objectKW) if postcommand is not None: manage_user_errors(lambda: postcommand(location=locationObject), reportUsersErrorByEmail=True) def unset(): del where[what] if unset_postcommand is not None: manage_user_errors(lambda: unset_postcommand(location=locationObject), reportUsersErrorByEmail=True) if lbl is None: lbl = loc_orig if what in where: return Button('Pick a different ' + lbl, unset, updateGUI=True) else: kw.setdefault('default', '') kw.pop('updateGUI', None) kw.setdefault('transferRemoteFile', [True, None][isinstance(objectType, OMFITobject)]) return FilePicker( locationScratch, "Pick " + lbl, postcommand=set, updateGUI=True, kwlabel=kwlabel, init_directory_location=init_directory_location, init_pattern_location=init_pattern_location, favorite_list_location=favorite_list_location, pattern_list_location=pattern_list_location, reveal_location=location, **kw, )
[docs]@_available_to_userGUI def ModulePicker(location, modules=None, lbl=None, *args, **kw): r""" This method creates a GUI element of the combobox type for the selection of modules within the OMFIT project. :param location: location in the OMFIT tree (notice that this is either a string or a list of strings) :param modules: string or list of strings with IDs of the allowed modules. If modules is None all modules in OMFIT are listed :param lbl: label to be shown near the combobox :param load: list of two elements lists with module name and location where modules can be loaded eg. [['OMFITprofiles',"root['OMFITprofiles']"],['EFIT',"OMFITmodules[-2]['EFIT']"],] Setting `load=True` will set loading of the modules as submodules :param \*args: arguments passed to OMFITx.ComboBox :param \**kw: keywords passed to OMFITx.ComboBox :return: returns from OMFITx.ComboBox """ if 'default' in kw: default, default_at_import = _setDefault(_absLocation(location), kw.pop('default')) try: existing_location = _absLocation(eval(_absLocation(location))) except Exception: existing_location = None options = {} modulesList = OMFIT.moduleDict() for module in modulesList: if modules is None or modulesList[module]['ID'] in tolist(modules): if 'OMFIT' + module == existing_location: options["[%s] -- %s" % (modulesList[module]['ID'], existing_location)] = eval(_absLocation(location)) else: options["[%s] -- OMFIT%s" % (modulesList[module]['ID'], module)] = 'OMFIT' + module if kw.get('load', []) is True: kw['load'] = [] for module in tolist(modules): kw['load'].append([module, "root['%s']" % module]) if not kw.get('load', []): kw['load'] = [] for m, l0cation in kw.pop('load', []): l0 = _absLocation(l0cation) try: eval(l0) except KeyError: options["load %s in %s" % (m, l0)] = '%s/%s' % (m, l0cation) if eval(_absLocation(location)) not in list(options.values()): options['--'] = eval(_absLocation(location)) old_postcommand = kw.get('postcommand', None) def postcommand(location): if '/' in eval(_absLocation(location)): m, l = eval(_absLocation(location)).split('/') OMFIT.loadModule(m, _absLocation(l)) exec('%s="%s"' % (location, l), globals(), locals()) UpdateGUI() if old_postcommand is not None: old_postcommand(location) kw['postcommand'] = postcommand kw.setdefault('width', int(max([len(k) for k in list(options.keys())]))) return ComboBox(location, options, lbl, *args, **kw)
[docs]@_available_to_userGUI def TreeLocationPicker( location, lbl=None, comment=None, kwlabel={}, default=special1, help='', url='', updateGUI=False, postcommand=None, check=None, base=None, **kw, ): r""" This method creates a GUI element used to select a tree location The label of the GUI turns green/red if the input by the user is a valid OMFIT tree entry (non existing tree entries are allowed) The label of the GUI turns green/red if the input by the user does or doesn't satisfy the `check` (non valid tree entries are NOT allowed) :param location: location in the OMFIT tree (notice that this is a string) :param lbl: Label which is put on the left of the entry :param comment: A comment which appears on top of the entry :param kwlabel: keywords passed to ttk.Label :param default: Set the default value if the tree location does not exist (adds GUI button) :param help: help provided when user right-clicks on GUI element (adds GUI button) :param url: open url in web-browser (adds GUI button) :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input :param check: function that returns whether what the user has entered in the entry GUI element is a valid entry This will make the label colored yellow, and users will not be able to set the value. :param base: object in location with respect to which relative locations are evaluated :param \**kw: keywords passed to OneLineText object :return: associated ttk.Entry object """ location = _absLocation(location) default, default_at_import = _setDefault(location, default) frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1) if comment is not None: frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) _Label(frm, comment).pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.YES, fill=tk.X) if lbl is None: lbl = location label = ttk.Label(frm, text=str(lbl), **kwlabel) label.pack(side=tk.LEFT, expand=tk.NO, fill=tk.X) ttk.Label(frm, text=" = " * np.sign(len(str(lbl))), **kwlabel).pack(side=tk.LEFT) kw.setdefault('percolator', True) kw.setdefault('width', 15) e = tk.OneLineText(frm, **kw) e.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) e.set(eval(location)) if Lock(location, checkLock=True): e.config(state=tk.DISABLED) else: def update_tree_values(): tmp = checkKeyPressed() if check is None and tmp is False: printw(f'Location {e.get()} does not exist') elif tmp is None: printe(f'Location {e.get()} does not satisfy checks. Value not set. Press <ESC> to restore previous value.') return else: label.config(foreground='forestgreen') exec(location + "=" + repr(e.get()), globals(), locals()) if postcommand is not None: manage_user_errors(lambda: postcommand(location=location), reportUsersErrorByEmail=True) OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") def checkKeyPressed(): if check is not None: try: if not check(e.get()): label.config(foreground='red2') return None except Exception: label.config(foreground='red2') return None label.config(foreground='forestgreen') return True else: try: if base is None: tmp = _absLocation(e.get()) else: tmp = absLocation(e.get(), base, False) eval(tmp) # check that the tree location exists label.config(foreground='forestgreen') return True except Exception as _excp: label.config(foreground='red2') return False def escape(): e.set(eval(location)) checkKeyPressed() def pickTree(): e.set(OMFITaux['GUI'].focusRoot) update_tree_values() ttk.Button(frm, text='Tree', command=pickTree).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE) # default button if default is not special1: def writeToEntry(): e.focus_set() e.grab_set() e.set(default) _aux['parentGUI'].update_idletasks() e.event_generate("<<virtualKey>>") e.grab_release() d = ttk.Button(frm, text=['d', 'D'][not default_at_import], command=writeToEntry, style='flat.TButton', takefocus=False) d.pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0) e.bind("<Return>", lambda event: update_tree_values()) e.bind("<KP_Enter>", lambda event: update_tree_values()) e.bind("<Key>", lambda event: e.after(1, checkKeyPressed)) e.bind("<<virtualKey>>", lambda event: checkKeyPressed()) e.bind("<Escape>", lambda event: e.after(1, escape)) checkKeyPressed() e.bind(f"<{rightClick}>", func=lambda event: _reveal(location, lbl, help)) # help button _helpButton(frm, help) # web button _urlButton(frm, url)
[docs]@_available_to_userGUI def CheckBox( location, lbl=None, comment=None, useInt=False, mapFalseTrue=[], updateGUI=False, help='', postcommand=None, default=special1, url='', kwlabel={}, **kw, ): r""" This method creates a GUI element of the checkbutton type This method accepts a list of locations, labels and defaults :param location: location in the OMFIT tree (notice that this is a string) :param lbl: Label which is put on the left of the entry :param comment: A comment which appears on top of the entry :param useInt: Use integers (1 or 0) instead of boolean (True or False) :param mapFalseTrue: a 2 elements list, the first one for the unchecked, the second one for the checked button :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed :param help: help provided when user right-clicks on GUI element (adds GUI button) :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input :param default: Set the default value if the tree location does not exist (adds GUI button) :param url: open url in web-browser (adds GUI button) :param kwlabel: keywords passed to ttk.Label :param \**kw: extra keywords are pased to the Checkbutton object :return: associated TkInter checkbutton object >>> OMFITx.CheckBox(["OMFIT['ck']","OMFIT['ck1']"],['hello','asd'],default=[False,True]) >>> OMFITx.CheckBox("OMFIT['ck']",'hello',default=False) """ kw.update(kwlabel) kw = _tk_ttk_process_kw('TCheckbutton', kw) location0 = location location = _absLocation(location) default, default_at_import = _setDefault(location, default) location = tolist(location) # define direct and inverse mappings if not len(mapFalseTrue): mapFalseTrue = [False, True] if useInt: mapFalseTrue = [0, 1] invMapFalseTrue = {repr(mapFalseTrue[0]): False, repr(mapFalseTrue[1]): True} frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1) if comment is not None: frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) _Label(frm, comment).pack(side=tk.LEFT) frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) if lbl is None: lbl = location lbl = tolist(lbl) # set the ticks e = [] for k, loc in enumerate(location): e.append(ttk.Checkbutton(frm, text=str(lbl[k]), takefocus=False, **kw)) e[k].pack(side=tk.LEFT, expand=tk.NO, fill=tk.X) e[k].state(['!alternate']) if repr(eval(loc)) not in invMapFalseTrue: # color box bg indicating user that value is invalid with FalseTrue possibilities e[k].state(['alternate']) elif invMapFalseTrue[repr(eval(loc))]: e[k].state(['selected']) else: e[k].state(['!selected']) ttk.Frame(frm).pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) if Lock(location, checkLock=True) or kw.get('state', False) == 'disabled': for k in range(len(location)): e[k].config(state=tk.DISABLED) else: def update_tree_values(location, updateGUI, k): loc = location[k] if repr(eval(loc)) not in invMapFalseTrue: eloc = mapFalseTrue[1] else: eloc = mapFalseTrue[int(not invMapFalseTrue[repr(eval(loc))])] exec(loc + "=" + repr(eloc), globals(), locals()) if postcommand is not None: # pass the location of the item to postcommand manage_user_errors(lambda: postcommand(location=loc), reportUsersErrorByEmail=True) OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") for k, loc in enumerate(location): e[k].bind('<ButtonRelease-1>', lambda event, k=k: update_tree_values(location, updateGUI, k)) # default button if default is not special1: def writeToEntry(): for k, loc in enumerate(location): exec(loc + "=" + repr(tolist(default)[k]), globals(), locals()) if postcommand is not None: # one single post-command when hitting `default` loc = location if len(loc) == 1: loc = loc[0] manage_user_errors(lambda: postcommand(location=loc), reportUsersErrorByEmail=True) for k, loc in enumerate(location): if invMapFalseTrue.get(repr(eval(loc)), eval(loc)): e[k].state(['selected']) else: e[k].state(['!selected']) OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") ttk.Button( frm, text=['d', 'D'][not default_at_import], command=writeToEntry, style='flat.TButton', takefocus=False, state=kw.get('state', 'enabled'), ).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0) # help button _helpButton(frm, help) # web button _urlButton(frm, url) for k, loc in enumerate(location): e[k].bind(f"<{rightClick}>", func=lambda event: _reveal(location, lbl, help)) return e
[docs]@_available_to_userGUI def ListEditor( location, options, lbl=None, default=None, unique=True, ordered=True, updateGUI=False, postcommand=None, only_valid_options=False, help='', url='', show_delete_button=False, max=None, ): ''' GUI element to add or remove objects to a list Note: multiple items selection possible with the Shift and Ctrl keys :param location: location in the OMFIT tree (notice that this is a string). :param options: possible options the user can choose from. This can be a tree location, a list, or a dictionary. If a dictinoary, then keys are shown in the GUI and values are set in the list. In order to use "show_delete_button", this must be a string giving the location of a list in the tree. :param lbl: Label which is put on the left of the entry :param default: Set the default value if the tree location does not exist :param unique: Do not allow repetitions in the list :param ordered: Keep the same order as in the list of options If false, then buttons to move elements up/down are shown :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed :param postcommand: function to be called after a button is pushed. It is called as postcommand(location=location,button=button) where button is in ['add','add_all','remove','remove_all'] :param only_valid_options: list can only contain valid options :param help: help provided when user right-clicks on GUI element (adds GUI button) :param url: open url in web-browser (adds GUI button) :param show_delete_button: bool: Show an additional button for deleting items from the left hand list :param max: allow at most MAX choices ''' if default is None: default = [] location = _absLocation(location) default, default_at_import = _setDefault(location, default) if isinstance(options, str): options = _absLocation(options) if unique: eval(options)[:] = unsorted_unique(eval(options)) opts = eval(options) elif isinstance(options, dict): opts = list(options.keys()) reversed_options = flip_values_and_keys(options) else: if unique: options = unsorted_unique(options) opts = options if unique: eval(location)[:] = unsorted_unique(eval(location)) if lbl is None: lbl = location if only_valid_options: for item in set(eval(location)).difference(set(opts)): eval(location).remove(item) selects = eval(location) if not isinstance(selects, list): raise Exception('%s: OMFITx.ListEditor works only with lists' % location) def post(button): if postcommand is not None: manage_user_errors(lambda: postcommand(location=location, button=button), reportUsersErrorByEmail=True) def add(): # the GUI ixs = olx.curselection() if not ixs: return items = [opts[int(item)] for item in ixs] # the tree elements for item in items: if isinstance(options, dict): item = options[item] if not unique or item not in selects: selects.append(item) if max is not None and len(selects) > max: for n in range(len(selects) - max): selects.pop(0) if ordered: if isinstance(options, dict): selects.sort(key=lambda x: list(options.values()).index(x)) else: selects.sort(key=lambda x: opts.index(x)) # the GUI slx.delete(0, tk.END) for item in selects: if isinstance(options, dict): item = reversed_options.get(item, item) slx.insert(tk.END, item) OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") post('add') def add_all(): # the tree elements if isinstance(options, dict): selects[:] = list(options.values()) else: selects[:] = opts # the GUI slx.delete(0, tk.END) for item in selects: if isinstance(options, dict): item = reversed_options.get(item, item) slx.insert(tk.END, item) OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") post('add_all') def remove(): # the GUI ixs = slx.curselection() if not ixs: return ixs = list(map(int, ixs)) for k in sorted(ixs)[::-1]: slx.delete(k) selects.pop(k) # the tree elements OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") post('remove') def remove_all(): # the GUI slx.delete(0, tk.END) selects[:] = [] OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") post('remove_all') def delete_item(): # Update list editor GUI ixs = olx.curselection() if not ixs: # Nothing is selected if slx.curselection(): print('Delete removes items from the left hand list; please select from the left list.') return ixs = list(map(int, ixs)) for k in sorted(ixs)[::-1]: slx.delete(k) j = np.where(np.array(selects) == opts[k])[0] if len(j): olx.delete(j[0]) selects.pop(j[0]) opts.pop(k) # Update the tree elements OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") post('delete_item') def ud(direction): ixs = slx.curselection() if not ixs: # Nothing is selected if slx.curselection(): print('Delete removes items from the left hand list; please select from the left list.') return ixs = ixs[0] if direction == 'u' and (ixs - 1) >= 0: selects.insert(ixs - 1, selects.pop(ixs)) elif direction == 'd' and (ixs + 1) < len(selects): selects.insert(ixs + 1, selects.pop(ixs)) else: return slx.delete(0, tk.END) for item in selects: if isinstance(options, dict): item = reversed_options.get(item, item) slx.insert(tk.END, item) # the tree elements OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") if direction == 'u': slx.select_set(ixs - 1) slx.see(ixs - 1) else: slx.select_set(ixs + 1) slx.see(ixs + 1) top = ttk.Frame(_aux['parentGUI']) top.pack(side=tk.TOP, expand=tk.NO, fill=tk.X, padx=5, pady=1) frm = ttk.Frame(top) frm.pack(side=tk.LEFT, fill=tk.BOTH, expand=tk.YES) scrollbaro = ttk.Scrollbar(frm) scrollbaro.pack(side=tk.RIGHT, fill=tk.Y) olx = tk.Listbox(frm, selectmode=tk.EXTENDED) olx.config(yscrollcommand=scrollbaro.set) scrollbaro.config(command=olx.yview) olx.pack(side=tk.LEFT, fill=tk.BOTH, expand=tk.YES) for item in opts: olx.insert(tk.END, item) frm = ttk.Frame(top) frm.pack(side=tk.LEFT) bt = ttk.Label(frm, text=lbl) bt.pack(side=tk.TOP, fill=tk.X) frm1 = ttk.Frame(frm) frm1.pack(side=tk.TOP) # help button _helpButton(frm1, help) # web button _urlButton(frm1, url) bt = ttk.Label(frm, text='') bt.pack(side=tk.TOP, fill=tk.X) bt = ttk.Button(frm, text='Add >', command=add, takefocus=False) bt.pack(side=tk.TOP, fill=tk.X) if max is None: bt = ttk.Button(frm, text='Add all >>', command=add_all, takefocus=False) bt.pack(side=tk.TOP, fill=tk.X) frmm = ttk.Frame(frm) frmm.pack(side=tk.TOP, fill=tk.X) btxu = ttk.Button(frmm, text='up', command=lambda: ud('u'), takefocus=False, width=5) if not ordered: btxu.pack(side=tk.LEFT, expand=tk.NO, padx=0) ttk.Label(frmm, text='').pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) btxd = ttk.Button(frmm, text='down', command=lambda: ud('d'), takefocus=False, width=5) if not ordered: btxd.pack(side=tk.LEFT, expand=tk.NO, padx=0) bt.pack(side=tk.TOP, fill=tk.X) bt = ttk.Button(frm, text='Remove <', command=remove, takefocus=False) bt.pack(side=tk.TOP, fill=tk.X) if max is None or max > 1: bt = ttk.Button(frm, text='Remove all <<', command=remove_all, takefocus=False) bt.pack(side=tk.TOP, fill=tk.X) if show_delete_button and isinstance(options, str): bt = ttk.Label(frm, text='') bt.pack(side=tk.TOP, fill=tk.X) bt = ttk.Button(frm, text='Delete', command=delete_item, takefocus=False) bt.pack(side=tk.TOP, fill=tk.X) frm = ttk.Frame(top) frm.pack(side=tk.LEFT, fill=tk.BOTH, expand=tk.YES) scrollbarx = ttk.Scrollbar(frm) scrollbarx.pack(side=tk.LEFT, fill=tk.Y) slx = tk.Listbox(frm, selectmode=tk.EXTENDED) slx.config(yscrollcommand=scrollbarx.set) scrollbarx.config(command=slx.yview) slx.pack(side=tk.LEFT, fill=tk.BOTH, expand=tk.YES) for item in selects: if isinstance(options, dict): item = reversed_options.get(item, item) slx.insert(tk.END, item) def mouse_wheel(event, what): # respond to Linux or Windows wheel event if event.num == 5 or event.delta == -120: what.yview('scroll', 1, 'units') elif event.num == 4 or event.delta == 120: what.yview('scroll', -1, 'units') return 'break' for k in ["<Button-4>", "<Button-5>", "<MouseWheel>"]: olx.bind(k, lambda event, what=olx: mouse_wheel(event, what)) slx.bind(k, lambda event, what=slx: mouse_wheel(event, what)) olx.bind(f"<{rightClick}>", lambda event: _reveal(location, lbl, help)) slx.bind(f"<{rightClick}>", lambda event: _reveal(location, lbl, help)) return olx, slx
[docs]@_available_to_userGUI def Slider( location, start_stop_step, lbl=None, comment=None, digits=None, updateGUI=False, help='', preentry=None, postcommand=None, norm=None, default=special1, url='', kwlabel={}, refresh_every=100, **kw, ): """ This method creates a GUI element of the slider type :param location: location in the OMFIT tree (notice that this is a string) :param start_stop_step: list of tree elements with start/stop/step of the slider :param lbl: Label which is put on the left of the entry :param comment: A comment which appears on top of the entry :param digits: How many digits to use (if None uses 3 digits if start_stop_step has floats or else 0 digits if these are integers) :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed :param help: help provided when user right-clicks on GUI element (adds GUI button) :param preentry: function to pre-process the data at the OMFIT location to be displayed in the entry GUI element :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input :param default: Set the default value if the tree location does not exist (adds GUI button) :param norm: normalizes numeric variables (overrides `preentry` or `postcommand`) :param url: open url in web-browser (adds GUI button) :param refresh_every: how often to call postcommand function (in ms) :param kwlabel: keywords passed to ttk.Label :return: associated TtkScale object """ collect = False kwlabel = _tk_ttk_process_kw('TLabel', kwlabel) location = _absLocation(location) if isinstance(location, str): location, collection = _for_each_collection(location) default, default_at_import = _setDefault(location, default) frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1) if comment is not None: frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) _Label(frm, comment).pack(side=tk.LEFT) frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) if lbl is None: lbl = location ttk.Label(frm, text=str(lbl) + " = " * np.sign(len(str(lbl))), **kwlabel).pack(side=tk.LEFT) if norm is not None: def post(location, norm): exec(location + '*=' + str(norm), globals(), locals()) def pre(value, norm): return value / norm preentry = lambda value, norm=norm: pre(value, norm) postcommand = lambda location, norm=norm: post(location, norm) kw = _tk_ttk_process_kw('Horizontal.TScale', kw) def set_location(location, val): if isinstance(eval(val), list) and isinstance(eval(location), np.ndarray) and len(eval(location).shape) == 1: val = 'np.atleast_1d(' + val + ')' exec(location + "=" + val, globals(), locals()) def update_tree_values(location, val, updateGUI): set_location(location, val) if postcommand is not None: manage_user_errors(lambda: postcommand(location=location), reportUsersErrorByEmail=True) OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") if updateGUI: OMFITaux['rootGUI'].event_generate("<<update_GUI>>") alarm = [None] def command(val): if alarm[0] is not None: OMFITaux['rootGUI'].after_cancel(alarm[0]) alarm[0] = None alarm[0] = OMFITaux['rootGUI'].after(int(refresh_every), lambda: update_tree_values(location, val, updateGUI)) e = TtkScale( frm, from_=start_stop_step[0], to=start_stop_step[1], tickinterval=start_stop_step[2], value=eval(location), digits=digits, command=command, **kw, ) # e.after(500, command) e.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) if Lock(location, checkLock=True): e.scale.configure(state=tk.DISABLED) e.bind(f"<{rightClick}>", lambda event: _reveal(location, lbl, help)) # e.bind("<ButtonRelease-1>", command) if not Lock(location, checkLock=True): # default button if default is not special1: def writeToEntry(): e.scale.set(default) e.on_configure(None) _aux['parentGUI'].update_idletasks() ttk.Button( frm, text=['d', 'D'][not default_at_import], command=writeToEntry, style='flat.TButton', takefocus=False, state=kw.get('state', 'enabled'), ).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0) # help button _helpButton(frm, help) # web button _urlButton(frm, url) return e
[docs]@_available_to_userGUI def Lock(location, value=special1, checkLock=False, clear=False): """ The lock method prevents users from using a GUI element which would affect a specific location in the OMFIT tree :param location: location in the OMFIT tree (notice that this is a string or a list of strings) If location is None, then all locks are cleared. :param checkLock: False=set the lock | True=return the lock value :param value: lock location at this value :param clear: clear or set the lock :return: None if checkLock=False, otherwise True/False depending on value of the lock """ multiple = not isinstance(location, str) locations = np.atleast_1d(location).tolist() for k, location in enumerate(locations): if checkLock: location = _absLocation(location) locations[k] = location in _GUIs[str(_aux['topGUI'])].locked elif location is None: _GUIs[str(_aux['topGUI'])].locked = [] else: location = _absLocation(location) if not clear: _GUIs[str(_aux['topGUI'])].locked.append(location) if value is not special1: tmp = parseLocation(location) if multiple: eval(buildLocation(tmp[:-1]))[tmp[-1]] = value[k] else: eval(buildLocation(tmp[:-1]))[tmp[-1]] = value elif location in _GUIs[str(_aux['topGUI'])].locked: _GUIs[str(_aux['topGUI'])].locked.remove(location) if checkLock: return np.all(locations)
[docs]@_available_to_userGUI def TitleGUI(title=None): ''' Sets the title to the user gui window (if it's not a compound GUI) :param title: string containing the title :return: None ''' if not _aux['is_compoundGUI']: _aux['topGUI'].wm_title(title)
[docs]@_available_to_userGUI def ShotTimeDevice( postcommand=None, showDevice=True, showShot=True, showTime=True, showRunID=False, multiShots=False, multiTimes=False, showSingleTime=False, checkDevice=None, checkShot=None, checkTime=None, checkRunID=None, subMillisecondTime=False, stopIfNotSet=True, updateGUI=True, ): ''' This high level GUI allows setting of DEVICE/SHOT/TIME of each module (sets up OMFIT MainSettings if root['SETTINGS']['EXPERIMENT']['XXX'] is an expression) :param postcommand: command to be executed every time device,shot,time are changed (location is passed to postcommand) :param showDevice: True/False show device section or list of suggested devices :param showShot: True/False show shot section or list with list of suggested shots :param showTime: True/False show time section or list with list of suggested times :param showRunID: True/False show runID Entry :param multiShots: True/False show single/multi shots :param multiTimes: True/False show single/multi times :param showSingleTime: True/False if multiTimes, still show single time :param checkDevice: check if device user input satisfies condition :param checkShot: check if shot user input satisfies condition :param checkTime: check if time user input satisfies condition :param checkRunID: check if runID user input satisfies condition :param subMillisecondTime: Allow floats as times :param stopIfNotSet: Stop GUI visualization if shot/time/device are not set :return: None ''' if not np.any( [ True if np.iterable(showDevice) else showDevice, True if np.iterable(showShot) else showShot, True if np.iterable(showTime) else showTime, ] ): return tmp = relativeLocations(_GUIs[str(_aux['topGUI'])].pythonFile) root = module = tmp['root'] rootName = moduleName = tmp['rootName'] tmp = parseLocation(rootName) OMFITlocationName = [buildLocation(tmp[: k + 1]) for k, item in enumerate(tmp)] OMFITmodulesName = [] for tmpName in OMFITlocationName: if eval(tmpName).__class__ is OMFITmodule and tmpName != 'OMFIT': OMFITmodulesName.append(tmpName) OMFITmodules = list(map(eval, OMFITmodulesName)) MainSettings = OMFIT['MainSettings'] # point to root['SETTINGS']['EXPERIMENT'][xx] if root['SETTINGS']['EXPERIMENT'][xx] is not an expression # else point to MainSettings['EXPERIMENT'][xx] linked = 'linked' for item in ['device', 'shot', 'time', 'runid', 'shots', 'times']: if item in root['SETTINGS']['EXPERIMENT']: if not isinstance(root['SETTINGS']['EXPERIMENT'][item], OMFITexpression): linked = 'unlinked' break # point to root['SETTINGS']['EXPERIMENT'][xx] # if root['SETTINGS']['EXPERIMENT'][xx] is not an expression # else point to MainSettings['EXPERIMENT'][xx] exp = {} for item in ['device', 'shot', 'time', 'runid', 'shots', 'times']: if item in root['SETTINGS']['EXPERIMENT']: exp[item] = root['SETTINGS']['EXPERIMENT'][item] exp['%s_location' % item] = f"{treeLocation(root)[-1]}['SETTINGS']['EXPERIMENT']" for moduleName in reversed(OMFITmodulesName): module = eval(moduleName) if not isinstance(module['SETTINGS']['EXPERIMENT'][item], OMFITexpression): exp[item] = module['SETTINGS']['EXPERIMENT'][item] exp['%s_location' % item] = moduleName + "['SETTINGS']['EXPERIMENT']" break if not is_none(MainSettings['EXPERIMENT']['shots']) and not np.iterable(MainSettings['EXPERIMENT']['shots']): MainSettings['EXPERIMENT']['shots'] = np.atleast_1d(MainSettings['EXPERIMENT']['shots']) if not is_none(eval(exp['shots_location'])['shots']) and not isinstance(eval(exp['shots_location'])['shots'], np.ndarray): exec(exp['shots_location'] + "['shots']=np.atleast_1d(" + exp['shots_location'] + "['shots'])", globals(), locals()) if not is_none(MainSettings['EXPERIMENT']['times']) and not np.iterable(MainSettings['EXPERIMENT']['times']): MainSettings['EXPERIMENT']['times'] = np.atleast_1d(MainSettings['EXPERIMENT']['times']) if not is_none(eval(exp['shots_location'])['times']) and not isinstance(eval(exp['times_location'])['times'], np.ndarray): exec(exp['times_location'] + "['times']=np.atleast_1d(" + exp['times_location'] + "['times'])", globals(), locals()) def postcommandArray(location=None): exec(location + "=np.unique(np.atleast_1d(" + location + "))", globals(), locals()) return postcommand_mainsettings(location) def postcommand_mainsettings(location): MainSettings['EXPERIMENT'][parseLocation(location)[-1]] = evalExpr(eval(location)) if postcommand is not None: return postcommand(location) oldParent = _aux['parentGUI'] frm = ttk.Frame(oldParent) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) _aux['parentGUI'] = ttk.Frame(frm) _aux['parentGUI'].pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) set_values = {} if np.iterable(showDevice) or showDevice: def checkDeviceFunction(inv): if checkDevice is None: return isinstance(inv, str) else: return isinstance(inv, str) and checkDevice(inv) if np.iterable(showDevice): deviceList = tolist(showDevice) else: deviceList = list(OMFIT['shotBookmarks'].keys()) deviceList = list(np.unique(deviceList)) ComboBox( exp['device_location'] + "['device']", deviceList, 'Device', state=tk.NORMAL, updateGUI=updateGUI, postcommand=postcommand_mainsettings, check=checkDeviceFunction, ) set_values['device'] = eval(exp['device_location'] + "['device']") if np.iterable(showShot) or showShot: if not multiShots: if np.iterable(showShot): shotList = list(np.unique(tolist(showShot))) else: try: shotList = list(map(int, sorted(OMFIT['shotBookmarks'][device].keys()))) except Exception: shotList = [] def checkShotFunction(inv): if checkShot is None: def checkShot_fun(inv): try: shot = int(inv) return True except ValueError: print(f'{inv} is not int') return False else: checkShot_fun = checkShot return is_int(inv) and checkShot_fun(inv) ComboBox( exp['shot_location'] + "['shot']", shotList, 'Shot', state=tk.NORMAL, updateGUI=updateGUI, postcommand=postcommand_mainsettings, check=checkShotFunction, ) set_values['shot'] = eval(exp['shot_location'] + "['shot']") else: def checkShotsFunction(inv): if checkShot is None: return is_int_array(inv) else: return is_int_array(inv) and checkShot(inv) Entry( exp['shots_location'] + "['shots']", 'Shots', updateGUI=updateGUI, postcommand=postcommandArray, check=checkShotsFunction, multiline=True, ) set_values['shots'] = eval(exp['shots_location'] + "['shots']") if np.iterable(showTime) or showTime: if multiTimes: def checkTimesFunction(inv): if subMillisecondTime: if checkTime is None: valid = is_array(inv) else: valid = is_array(inv) and checkTime(inv) else: if checkTime is None: valid = is_int_array(inv) else: valid = is_int_array(inv) and checkTime(inv) if valid: return np.all(np.diff(inv) > 0) Entry( exp['times_location'] + "['times']", 'Times' + ' [ms]', updateGUI=updateGUI, postcommand=postcommandArray, check=checkTimesFunction, multiline=True, help='Enter list of monotonically increasing times for analysis.\nAccepts python statements.\n' 'Example: Enter\n arange(start,stop,step)\nfor a uniform time base.\n\nIt may be useful to start with a smaller ' 'subset of times to get a feel for what is needed, then change to the full set of desired times.', ) set_values['times'] = eval(exp['times_location'] + "['times']") if (multiTimes and showSingleTime) or not multiTimes: if np.iterable(showTime): timeList = list(np.unique(tolist(showTime))) else: try: timeList = list(map(int, sorted(OMFIT['shotBookmarks'][device][str(shot)].keys()))) except Exception: timeList = [] def checkTimeFunction(inv): if subMillisecondTime: if checkTime is None: return is_numeric(inv) else: return is_numeric(inv) and checkTime(inv) else: if checkTime is None: return is_int(inv) else: return is_int(inv) and checkTime(inv) ComboBox( exp['time_location'] + "['time']", timeList, 'Time' + ' [ms]', state=tk.NORMAL, updateGUI=updateGUI, postcommand=postcommand_mainsettings, check=checkTimeFunction, ) set_values['time'] = eval(exp['time_location'] + "['time']") if showRunID: Entry( exp['runid_location'] + "['runid']", 'run-ID', updateGUI=updateGUI, postcommand=postcommand_mainsettings, check=is_alphanumeric ) def addBookmarks(device, shot, time, description=''): if device != None: OMFIT['shotBookmarks'].setdefault(device, namelist.NamelistName()) if device != None and shot != None: OMFIT['shotBookmarks'][device].setdefault(str(shot), namelist.NamelistName()) if device != None and shot != None and time != None: if not description and str(time) in OMFIT['shotBookmarks'][device][str(shot)]: del OMFIT['shotBookmarks'][device][str(shot)][str(time)] printi('Deleted bookmark for %s #%s @ %s' % (device, str(shot), str(time))) elif description: OMFIT['shotBookmarks'][device][str(shot)][str(time)] = description else: printi('Bookmarks: no time') else: printi('Bookmarks: no shot') else: printi('Bookmarks: no device') OMFIT['shotBookmarks'].save() def addBookmarksWithDescription(device, shot, time): def onEscape(location=None): top.destroy() top = tk.Toplevel(_topGUI(_aux['parentGUI'])) top.withdraw() top.transient(_aux['parentGUI']) top.wm_title('Store shot bookmark with description') oldParent = _aux['parentGUI'] _aux['parentGUI'] = top OMFIT['shotBookmarks'].load() try: OMFIT['scratch']['bookmarksDescription'] = OMFIT['shotBookmarks'][device][str(shot)][str(time)] except Exception: description = '' e = Entry("OMFIT['scratch']['bookmarksDescription']", "Shot description", postcommand=onEscape, default='') e.configure(width=30) _aux['parentGUI'] = oldParent top.bind('<Escape>', lambda event: onEscape()) top.deiconify() top.wait_window(top) addBookmarks(device, shot, time, description=OMFIT['scratch']['bookmarksDescription']) del OMFIT['scratch']['bookmarksDescription'] OMFITaux['rootGUI'].event_generate("<<update_GUI>>") def linkUnlinkSettings(linked): if linked == 'linked': printi(rootName + "['SETTINGS']['EXPERIMENT'] is now set only for this module") for item in ['device', 'shot', 'time', 'shots', 'times', 'runid']: root['SETTINGS']['EXPERIMENT'][item] = evalExpr(root['SETTINGS']['EXPERIMENT'][item]) else: printi(rootName + "['SETTINGS']['EXPERIMENT'] inherits from the parent module or MainSettings") for item in ['device', 'shot', 'time', 'shots', 'times', 'runid']: root['SETTINGS']['EXPERIMENT'][item] = OMFITexpression( """try: return_variable=OMFITmodules[-2]['SETTINGS']['EXPERIMENT']['%s'] except Exception: return_variable=MainSettings['EXPERIMENT']['%s'] """ % (item, item) ) OMFITaux['rootGUI'].event_generate("<<update_GUI>>") OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") frm1 = ttk.Frame(frm) frm1.pack(side=tk.LEFT, expand=tk.NO, fill=tk.BOTH) im = tk.PhotoImage(master=frm1, file=os.path.join(OMFITsrc, 'extras', 'graphics', linked + '.ppm')) b = ttk.Button(master=frm1, text=linked, image=im, command=lambda linked=linked: linkUnlinkSettings(linked), takefocus=False) b._ntimage = im b.pack( side=[tk.LEFT, tk.TOP][ int( np.sum( [np.iterable(showShot) or showShot, np.iterable(showTime) or showTime, np.iterable(showDevice) or showDevice, showRunID] ) > 2 ) ], expand=tk.NO, fill=tk.NONE, ) im = tk.PhotoImage(master=frm1, file=os.path.join(OMFITsrc, 'extras', 'graphics', 'bookmark.ppm')) b = ttk.Button( master=frm1, text='bookmark', image=im, command=lambda device=exp['device'], shot=exp['shot'], time=exp['time']: addBookmarksWithDescription( exp['device'], exp['shot'], exp['time'] ), takefocus=False, ) b._ntimage = im b.pack( side=[tk.LEFT, tk.TOP][ int( np.sum( [np.iterable(showShot) or showShot, np.iterable(showTime) or showTime, np.iterable(showDevice) or showDevice, showRunID] ) > 2 ) ], expand=tk.NO, fill=tk.NONE, ) _aux['parentGUI'] = oldParent if stopIfNotSet: if np.any([not len(list([_f for _f in tolist(k) if evalExpr(_f) is not None])) for k in list(set_values.values())]): Label("Device, shot(s) and time(s) entries can not be None!", foreground='red') End() inv = list(set_values.values())[0] if isinstance(inv, str) and is_device(inv, 'NSTX') and eval(exp['shot_location'] + "['shot']") >= 200000: Label('NSTX shot must be < 200000', foreground='red') End() if isinstance(inv, str) and is_device(inv, 'NSTX-U') and eval(exp['shot_location'] + "['shot']") < 200000: Label('NSTX-U shot must be >= 200000', foreground='red') End() # set data to be harvested set_values['module'] = module['SETTINGS']['MODULE']['ID'] _aux['harvest'].setdefault(moduleName, {}) _aux['harvest'][moduleName] = set_values
[docs]@_available_to_userGUI def CloseGUI(): """ Function for closing the active user GUI """ _clearClosedGUI(_aux['topGUI']) raise EndOMFITpython()
[docs]@_available_to_userTASK def End(what='single'): ''' End execution of OMFITpython script :param what: * 'single' terminates the running script * 'all' terminates the whole workflow ''' if _aux['topGUI'] is not None and not len(OMFITaux['prun_process']): _aux['topGUI'].update_idletasks() if str(what).lower() == 'all': raise EndAllOMFITpython() else: raise EndOMFITpython()
[docs]@_available_to_userGUI def Open(object): ''' Open OMFITascii object in editor or OMFITweb in browser File extension behaviour can be specified in OMFIT['MainSettings']['SETUP']['EXTENSIONS'] :param object: OMFIT object or filename to be opened in external editor ''' OMFITaux['GUI'].openFile(thisObject=object)
[docs]@_available_to_userGUI def Figure(toolbar=True, returnFigure=False, fillX=False, **kw): r""" Embed a matplotlib figure in an OMFIT GUI :param toolbar: [True] show/hide the figure toolbar :param returnFigure: [False] function returns figure `f` or axis `f.add_subplot(111)` :param fillX: [False] fill X dimension of screen :param figsize: (5*2./3., 4*2./3.) figure size :param \**kw: keyword arguments passed to pyplot.Figure """ if fillX: fillX = X else: fillX = tk.NONE frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], fill=tk.X, expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP]) kw.setdefault('figsize', (5 * 2.0 / 3.0, 4 * 2.0 / 3.0)) f = pyplot.Figure(**kw) from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.backends._backend_tk import NavigationToolbar2Tk as NavigationToolbar2 canvas = FigureCanvasTkAgg(f, master=frm_top) canvas.get_tk_widget().pack(side=tk.TOP, fill=fillX, expand=tk.YES) if toolbar: toolbar = NavigationToolbar2(canvas, frm_top) toolbar.update_idletasks() canvas._tkcanvas.pack(side=tk.TOP, fill=fillX, expand=tk.YES) OMFITfigure(f) # Add enhancements like superzoom if returnFigure: return f else: return f.add_subplot(111) # OKadd
[docs]@_available_to_userGUI def Dialog(*args, **kw): """ Display a dialog box and wait for user input :param message: the text to be written in the label :param answers: list of possible answers :param icon: "question", "info", "warning", "error" :param title: title of the frame :param options: dictionary of True/False options that are displayed as checkbuttons in the dialog :param entries: dictionary of string options that are displayed as entries in the dialog :return: return the answer chosen by the user (a dictionary if options keyword was passed) """ kw.setdefault('parent', _aux['parentGUI']) return dialog(*args, **kw)
def _setupModule(prefGUI, extraSettings=None): from utils_widgets import OMFITfont root = relativeLocations(_GUIs[str(_aux['topGUI'])].pythonFile)['root'] if not _aux['is_compoundGUI']: root['__scratch__'].setdefault('showSettings', False) root['__scratch__'].setdefault('showStorage', False) def showTools(): try: bckp_parent = _aux['parentGUI'] _aux['parentGUI'] = prefGUI _clearKids(_aux['parentGUI']) frm = ttk.Frame(_aux['parentGUI']) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) orient = tk.TOP if root['__scratch__']['showStorage'] or root['__scratch__']['showSettings']: orient = tk.LEFT def showSettings(): root['__scratch__']['showStorage'] = False root['__scratch__']['showSettings'] = not root['__scratch__']['showSettings'] showTools() frm1 = ttk.Frame(frm) frm1.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) im = tk.PhotoImage(master=frm1, file=os.path.join(OMFITsrc, 'extras', 'graphics', 'settings.ppm')) b = ttk.Button(master=frm1, text='settings', image=im, command=showSettings, takefocus=False) b._ntimage = im b.pack(side=orient, expand=tk.NO, fill=tk.NONE) def showStorage(): root['__scratch__']['showSettings'] = False root['__scratch__']['showStorage'] = not root['__scratch__']['showStorage'] showTools() im = tk.PhotoImage(master=frm1, file=os.path.join(OMFITsrc, 'extras', 'graphics', 'storage.ppm')) b = ttk.Button(master=frm1, text='storage', image=im, command=showStorage, takefocus=False) b._ntimage = im b.pack(side=orient, expand=tk.NO, fill=tk.NONE) if root['__scratch__']['showStorage'] or root['__scratch__']['showSettings']: ttk.Separator(frm).pack(side=tk.TOP, expand=tk.NO, fill=tk.X, padx=5, pady=2) if root['__scratch__']['showSettings']: Label('SETTINGS: ' + treeLocation(root)[-1], font=OMFITfont('bold', -2)) ComboBox( "root['SETTINGS']['REMOTE_SETUP']['serverPicker']", list(SERVER.listServers().keys()), 'server', updateGUI=True ) try: location = "OMFIT['MainSettings']['SERVER']['%s']" % SERVER(root['SETTINGS']['REMOTE_SETUP']['serverPicker']) except Exception: Label('Invalid server %s' % str(root['SETTINGS']['REMOTE_SETUP']['serverPicker'])) for item in list(eval(location).keys()): Entry(location + "[" + repr(item) + "]", item[0].upper() + item[1:], updateGUI=True) if extraSettings: Separator() extraSettings() if root['__scratch__']['showStorage']: Label('STORAGE: ' + treeLocation(root)[-1], font=OMFITfont('bold', -2)) def next_runid(): # find the run_id with the maximum number index0 = 0 for k in root['__STORAGE__'].keys(filter=hide_ptrn): index = int(re.findall('[0-9]+$', k)[-1]) if index > index0: root['SETTINGS']['EXPERIMENT']['runid'] = k # go to the next available runid while root['SETTINGS']['EXPERIMENT']['runid'] in root['__STORAGE__'].keys(filter=hide_ptrn): index = re.findall('[0-9]+$', root['SETTINGS']['EXPERIMENT']['runid']) if len(index): root['SETTINGS']['EXPERIMENT']['runid'] = re.sub( index[-1] + '$', str(int(index[-1]) + 1), root['SETTINGS']['EXPERIMENT']['runid'] ) else: root['SETTINGS']['EXPERIMENT']['runid'] += '1' root['SETTINGS']['EXPERIMENT']['comment'] = '' def reload_comment(location=None): if root['SETTINGS']['EXPERIMENT']['runid'] in root['__STORAGE__'].keys(filter=hide_ptrn): root['SETTINGS']['EXPERIMENT']['comment'] = root['__STORAGE__'][root['SETTINGS']['EXPERIMENT']['runid']][ 'comment' ] # handle runids if 'runid' not in root['SETTINGS']['EXPERIMENT'] or root['SETTINGS']['EXPERIMENT']['runid'] == None: root['SETTINGS']['EXPERIMENT']['runid'] = 'sim1' if '__STORAGE__' in root and root['SETTINGS']['EXPERIMENT']['runid'] in root['__STORAGE__'].keys(filter=hide_ptrn): Button('New run-ID', next_runid, updateGUI=True) runs = SortedDict() if '__STORAGE__' in root: for k in root['__STORAGE__'].keys(filter=hide_ptrn): if 'comment' in root['__STORAGE__'][k]: runs[k + ': ' + root['__STORAGE__'][k]['comment']] = k ComboBox( "root['SETTINGS']['EXPERIMENT']['runid']", runs, 'Run-ID', state='readonly', check=is_string, default='', updateGUI=True, postcommand=reload_comment, ) Separator() Separator() # handle operations state = 'normal' if 'comment' not in root['SETTINGS']['EXPERIMENT'] or not root['SETTINGS']['EXPERIMENT']['comment']: state = 'disabled' Entry("root['SETTINGS']['EXPERIMENT']['comment']", 'Comment', check=is_string, default='', updateGUI=True) if '__STORAGE__' not in root or root['SETTINGS']['EXPERIMENT']['runid'] not in root['__STORAGE__'].keys( filter=hide_ptrn ): Button('Store %s (new)' % root['SETTINGS']['EXPERIMENT']['runid'], root.store, state=state, updateGUI=True) else: Button('Store %s (update)' % root['SETTINGS']['EXPERIMENT']['runid'], root.store, state=state, updateGUI=True) Separator() CheckButton("root['__STORAGE__']['__restoreScripts__']", 'Restore scripts', default=False) Button('Restore %s' % root['SETTINGS']['EXPERIMENT']['runid'], root.restore, updateGUI=True) Separator() Button('Delete %s' % root['SETTINGS']['EXPERIMENT']['runid'], root.destore, updateGUI=True) except Exception: raise finally: _aux['parentGUI'] = bckp_parent OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") showTools()
[docs]def clc(tag=None): ''' clear console (possible tags are) INFO : forest green HIST : dark slate gray WARNING : DarkOrange2 HELP : PaleGreen4 STDERR : red3 STDOUT : black DEBUG : gold4 PROGRAM_OUT : blue PROGRAM_ERR : purple :param tag: specific tag to clear ''' if OMFITaux['console'] is None: return from omfit_classes.utils_base import _Streams if not tag: OMFITaux['console'].clear() elif tag.upper() in _Streams.tags: OMFITaux['console'].clear(tag) if tag == 'STDERR': for k in range(10): OMFITaux['console'].clear('STDERR' + str(k)) else: printe('clc: console tag `%s` is not recognized. Valid options are:' % tag) for tag in _Streams.tags: printe(' %s: %s' % (tag.ljust(12), _Streams.tags[tag]))
[docs]@_available_to_userGUI def EditASCIIobject(location, lbl=None, comment=None, updateGUI=False, help='', postcommand=None, url='', **kw): """ This method creates a GUI element that edits ASCII files in the OMFIT tree Sample usage:: OMFITx.EditASCIIobject("root['INPUTS']['TRANSP']", 'edit namelist', postcommand=lambda location:eval(location).load()) :param location: location of the ASCII OMFITobject in the OMFIT tree (notice that this is a string) :param lbl: Label which is put on the left of the entry :param comment: A comment which appears on top of the entry :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed :param help: help provided when user right-clicks on GUI element (adds GUI button) :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input :param url: open url in web-browser (adds GUI button) :return: associated ttk.Button object """ location = _absLocation(location) postcommand_location = reveal_location = location scratchLocation = _absLocation("scratch['%s']" % ('editASCII_' + omfit_hash(location, 10))) try: filename = eval(postcommand_location).filename except Exception: filename = '' frm_top = ttk.Frame(_aux['parentGUI']) frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1) if comment is not None: frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) _Label(frm, comment).pack(side=tk.LEFT) frm = ttk.Frame(frm_top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) if lbl is None: lbl = 'Edit ' + location def onDone(location): with open(filename, 'w') as f: f.write(eval(scratchLocation)) del eval(buildLocation(parseLocation(scratchLocation)[:-1]))[parseLocation(scratchLocation)[-1]] if postcommand is not None: postcommand(postcommand_location) def showText(): with open(filename, 'r') as f: eval(buildLocation(parseLocation(scratchLocation)[:-1]))[parseLocation(scratchLocation)[-1]] = f.read() _Text( parent=frm, location=scratchLocation, lbl=lbl, updateGUI=updateGUI, help=help, preentry=None, postcommand=onDone, percolator=filename.endswith('.py'), reveal_location=reveal_location, **kw, ) bttn = ttk.Button(frm, text=lbl, command=showText, takefocus=False).pack(side=tk.LEFT, expand=tk.NO, fill=tk.X, padx=5, pady=1) if len(filename) == 0: bttn.state(['disabled']) return bttn
# --------------------------- # file browsing # ---------------------------
[docs]class FileDialog(object): """ Standard remote file selection dialog -- no checks on selected file. :param directory: directory where to start browsing :param serverPicker: serverPicker wins over server/tunnel settings serverpicker=None will reuse latest server/tunnel that the user browsed to :param server: server :param tunnel: tunnel :param pattern: glob regular expression for files selection :param default: default filename selection :param master: Tkinter master GUI :param lockServer: allow users to change server settings :param focus: what to focus in the GUI ('filterDirs','filterFiles') :param favorite_list_location: OMFIT tree location which contains a possibly empty list of favorite file directories. To keep with the general omfit approach this should be a string. :param pattern_list_location: OMFIT tree location which contains a possibly empty list of favorite search patterns. To keep with the general omfit approach this should be a string. :param is_dir: (bool) Whether the requested file is a directory """ def __init__( self, directory=None, serverPicker='', server='localhost', tunnel='', pattern='*', default='', master=None, lockServer=False, focus='filterDirs', favorite_list_location=None, pattern_list_location=None, is_dir=False, title='File Browser', ): if master is None: master = OMFITaux['rootGUI'] self.master = master self.is_dir = is_dir if serverPicker is None: printd(server, tunnel, topic='FilePicker') if server == 'localhost': server = OMFITaux['lastBrowsed'].setdefault('__lastServer__', server) if tunnel == '': tunnel = OMFITaux['lastBrowsed'].setdefault('__lastTunnel__', tunnel) if serverPicker: server = SERVER[serverPicker]['server'] tunnel = SERVER[serverPicker]['tunnel'] printd(server, tunnel, topic='FilePicker') if not server: serverPicker = 'localhost' server = 'localhost' tunnel = '' self.favorite_list = None if favorite_list_location is not None: # This should work correctly - the self.favorite_list is # set to the variable in the tree and as a result any updates # to this are reflected in the tree entry since lists are # modified in place. # print("favorite_list_location:",favorite_list_location) favorite_list_location = _absLocation(favorite_list_location) try: self.favorite_list = eval(favorite_list_location) except Exception as _excp: printe(_excp) # if there is a problem with evaluation turn off the option print("favorite_list_Location could not be evaluated: %s" % favorite_list_location) favorite_list_location = None else: # if a list is not passed in then turn this off if not isinstance(self.favorite_list, list): self.favorite_list = None favorite_list_location = None self.pattern_list = None if pattern_list_location is not None: # This should work correctly - the self.favorite_list is # set to the variable in the tree and as a result any updates # to this are reflected in the tree entry since lists are # modified in place. # print("pattern_list_location:",pattern_list_location) pattern_list_location = _absLocation(pattern_list_location) try: self.pattern_list = eval(pattern_list_location) except Exception as _excp: printe(_excp) # if there is a problem with evaluation turn off the option print("pattern_list_Location could not be evaluated: %s" % pattern_list_location) pattern_list_location = None else: # if a list is not passed in then turn this off if not isinstance(self.pattern_list, list): self.pattern_list = None pattern_list_location = None self.server0, self.tunnel0 = server, tunnel printd(self.server0, self.tunnel0, topic='FilePicker') self.pattern = pattern self.default = default # set the top level window self.top = tk.Toplevel(master) self.top.withdraw() self.top.transient(master) self.title = title def tab_complete(): def complete(what, options): options = list(map(os.path.abspath, options)) what = os.path.abspath(what) possible_options = [item + os.sep for item in options if item.startswith(what)] return os.path.commonprefix(possible_options), possible_options filter_get = self.get_filter()[0] selection_get = self.selection.get() if os.path.abspath(os.path.split(filter_get)[0]) != os.path.abspath(selection_get) and os.path.abspath( filter_get ) != os.path.abspath(selection_get): self.filter_command(dir=os.path.abspath(os.path.split(filter_get)[0])) completed = complete(filter_get, [os.path.split(filter_get)[0] + os.sep + item for item in self.dirs.get(0, tk.END)]) if completed[0]: self.filterDirs.delete(0, tk.END) self.filterDirs.insert(tk.END, re.sub('//', '/', completed[0])) if len(completed[1]) == 1 and completed[0].endswith('/'): self.filter_command() self.filterDirs.xview(tk.END) return 'break' def fill_server_tunnel(): tmp = self.comboBox.get().split(' -- ')[0] serverTK.set(str(SERVER[tmp]['server'])) tunnelTK.set(str(SERVER[tmp].get('tunnel', ''))) set_server_tunnel() def set_server_tunnel(): serverTK.set(serverTK.get().strip()) try: serverTK.set(SERVER[serverTK.get().strip()]['server']) tunnelTK.set(SERVER[serverTK.get().strip()]['tunnel']) except Exception: tunnelTK.set(tunnelTK.get().strip()) self.server0 = serverTK.get() self.tunnel0 = tunnelTK.get() self.go(directory=None) # Set server string serverTK = tk.StringVar() serverTK.set(self.server0) # Set tunnel string tunnelTK = tk.StringVar() tunnelTK.set(self.tunnel0) # set frame from top level window frm = ttk.Frame(self.top) frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X) # add another frame for the server and tunnel information frm1 = ttk.Frame(frm) ttk.Label(frm1, text='Fill server/tunnel from: ').pack(side=tk.LEFT) tmp = list(SERVER.listServers().values()) # convert everything to lower case - could be a problem if folks use case to distinguish servers tmp.sort(key=lambda x: x.lower()) tmp = [tmp.pop(tmp.index(SERVER.listServers()['localhost']))] + tmp # setup a comboBox for choosing the server based on the list in mainSettings - SERVER is a global self.comboBox = ttk.Combobox(frm1, state='readonly', values=tmp) self.comboBox.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) self.comboBox.bind('<<ComboboxSelected>>', lambda event: fill_server_tunnel()) frm1.pack(side=tk.TOP, expand=tk.YES, fill=tk.X) # add aonther frame for the server label and entry frm1 = ttk.Frame(frm) ttk.Label(frm1, text='On server: ').pack(side=tk.LEFT) serverTKGUI = ttk.Entry(frm1, textvariable=serverTK) serverTKGUI.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) serverTKGUI.bind('<Return>', lambda event: set_server_tunnel()) frm1.pack(side=tk.TOP, expand=tk.YES, fill=tk.X) # add another frame for the tunnel label and entry frm1 = ttk.Frame(frm) ttk.Label(frm1, text='Via tunnel: ').pack(side=tk.LEFT) tunnelTKGUI = ttk.Entry(frm1, textvariable=tunnelTK) tunnelTKGUI.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) tunnelTKGUI.bind('<Return>', lambda event: set_server_tunnel()) frm1.pack(side=tk.TOP, expand=tk.YES, fill=tk.X) # if server is locked then turn off selection functionality if lockServer: self.comboBox.configure(state='disabled') serverTKGUI.configure(state='disabled') tunnelTKGUI.configure(state='disabled') # add a separator to the main frame ttk.Separator(frm).pack(side=tk.TOP, expand=tk.NO, fill=tk.X, padx=5, pady=2) # add the bottom frame showing the file selection - why is this here? self.botframe = ttk.Frame(self.top) self.botframe.pack(side=tk.BOTTOM, fill=tk.X) # Add another frame to the main window frm = ttk.Frame(self.top) ttk.Separator(frm).pack(side=tk.TOP, expand=tk.NO, fill=tk.X, padx=5, pady=2) frm.pack(side=tk.BOTTOM, fill=tk.X) # add label and entry to show file selection at the bottom ttk.Label(frm, text='Selection: ').pack(side=tk.LEFT) self.selection = ttk.Entry(frm) self.selection.pack(side=tk.LEFT, fill=tk.X, expand=tk.YES) self.selection.bind('<Return>', lambda event: self.ok_command()) # create a middle frame self.midframe = ttk.Frame(self.top) self.midframe.pack(expand=tk.YES, fill=tk.BOTH) # create a scroll bar in the middle frame to scroll file names self.filesbar = ttk.Scrollbar(self.midframe) self.filesbar.pack(side=tk.RIGHT, fill=tk.Y) # create another frame as part of midframe to hold file filter and files frm = ttk.Frame(self.midframe) frm.pack(side=tk.RIGHT, fill=tk.BOTH, expand=tk.YES) # branch on whether pattern_list favorites are asked for if pattern_list_location is None: # add a file filter self.filterFiles = ttk.Entry(frm) self.filterFiles.pack(side=tk.TOP, fill=tk.X) self.filterFiles.bind('<Return>', lambda event: self.filter_command()) # add the files list box self.files = tk.Listbox(frm, exportselection=0, yscrollcommand=(self.filesbar, 'set')) self.files.pack(side=tk.RIGHT, expand=tk.YES, fill=tk.BOTH) # not sure what the first lines are doing - the rest bind event handlers btags = self.files.bindtags() self.files.bindtags(btags[1:] + btags[:1]) self.files.bind('<ButtonRelease-1>', lambda event: self.files_select_event()) self.files.bind('<Double-ButtonRelease-1>', lambda event: self.ok_command()) self.files.bind('<Return>', lambda event: self.ok_command()) # connect the files scrollbar self.filesbar.config(command=(self.files, 'yview')) else: # add extra frames to manage pattern list buttons # These are together because both frma and frmb are in frm # create frame for file filter and filter list buttons # expand is off for frma so it does not expand to file half the space frma = ttk.Frame(frm) frma.pack(side=tk.TOP, fill=tk.X) # create frame for file list frmb = ttk.Frame(frm) frmb.pack(side=tk.BOTTOM, fill=tk.BOTH, expand=tk.YES) # setup a comboBox for choosing the file filter pattern from favorites - add to frame a self.filterFiles = ttk.Combobox(frma, state='normal', values=self.pattern_list) self.filterFiles.pack(side=tk.LEFT, expand=tk.YES, fill=tk.BOTH) self.filterFiles.bind('<<ComboboxSelected>>', lambda event: self.filter_command()) self.filterFiles.bind('<Return>', lambda event: self.filter_command()) # Add add_to_favorite_patterns button self.add_to_favorite_patterns = ttk.Button( frma, text="+", command=lambda: self.manage_list(self.pattern_list, self.filterFiles, '+') ) self.add_to_favorite_patterns.pack(side=tk.LEFT) # Add remove_from_favorite_patterns button self.remove_from_favorite_patterns = ttk.Button( frma, text="-", command=lambda: self.manage_list(self.pattern_list, self.filterFiles, '-') ) self.remove_from_favorite_patterns.pack(side=tk.LEFT) # add the files list box in frame b self.files = tk.Listbox(frmb, exportselection=0, yscrollcommand=(self.filesbar, 'set')) self.files.pack(side=tk.RIGHT, expand=tk.YES, fill=tk.BOTH) # not sure what the first lines are doing - the rest bind event handlers btags = self.files.bindtags() self.files.bindtags(btags[1:] + btags[:1]) self.files.bind('<ButtonRelease-1>', lambda event: self.files_select_event()) self.files.bind('<Double-ButtonRelease-1>', lambda event: self.ok_command()) self.files.bind('<Return>', lambda event: self.ok_command()) # connect the files scrollbar self.filesbar.config(command=(self.files, 'yview')) # add directory list scroll bar self.dirsbar = ttk.Scrollbar(self.midframe) self.dirsbar.pack(side=tk.LEFT, fill=tk.Y) # add another frame for the directory information frm = ttk.Frame(self.midframe) frm.pack(side=tk.RIGHT, fill=tk.BOTH, expand=tk.YES) # branch on whether favorite list is requested for directories if favorite_list_location is None: # add the directory filter self.filterDirs = ttk.Entry(frm) self.filterDirs.pack(side=tk.TOP, fill=tk.X) self.filterDirs.bind('<Return>', lambda event: self.filter_command()) self.filterDirs.bind('<Tab>', lambda event: tab_complete()) self.filterDirs.bind('<Shift-Tab>', lambda event: self.dirs_back_event()) try: self.filterDirs.bind('<ISO_Left_Tab>', lambda event: self.dirs_back_event()) except tk.TclError: pass # add the directory list self.dirs = tk.Listbox(frm, exportselection=0, yscrollcommand=(self.dirsbar, 'set')) self.dirs.pack(side=tk.BOTTOM, expand=tk.YES, fill=tk.BOTH) # bind events for directory selection btags = self.dirs.bindtags() self.dirs.bindtags(btags[1:] + btags[:1]) self.dirs.bind('<ButtonRelease-1>', lambda event: None) self.dirs.bind('<Double-ButtonRelease-1>', lambda event: self.dirs_double_event()) self.dirs.bind('<Return>', lambda event: self.dirs_double_event()) self.dirs.bind('<BackSpace>', lambda event: self.dirs_back_event()) # connect the directory scrollbar self.dirsbar.config(command=(self.dirs, 'yview')) else: # if there is a favorite list add some frames # create frame for directory filter and directory filter buttons frma = ttk.Frame(frm) frma.pack(side=tk.TOP, fill=tk.X) # create frame for list of directories frmb = ttk.Frame(frm) frmb.pack(side=tk.BOTTOM, fill=tk.BOTH, expand=tk.YES) # setup a comboBox for choosing the directory filter pattern from favorites - add to frame a self.filterDirs = ttk.Combobox(frma, state='normal', values=self.favorite_list) self.filterDirs.pack(side=tk.LEFT, expand=tk.YES, fill=tk.BOTH) self.filterDirs.bind('<<ComboboxSelected>>', lambda event: self.filter_command()) self.filterDirs.bind('<Return>', lambda event: self.filter_command()) self.filterDirs.bind('<Tab>', lambda event: tab_complete()) self.filterDirs.bind('<Shift-Tab>', lambda event: self.dirs_back_event()) try: self.filterDirs.bind('<ISO_Left_Tab>', lambda event: self.dirs_back_event()) except tk.TclError: pass # Add add_to_favorite_dirs button self.add_to_favorite_dirs = ttk.Button( frma, text="+", command=lambda: self.manage_list(self.favorite_list, self.filterDirs, '+') ) self.add_to_favorite_dirs.pack(side=tk.LEFT) # Add remove_from_favorite_dirs button self.remove_from_favorite_dirs = ttk.Button( frma, text="-", command=lambda: self.manage_list(self.favorite_list, self.filterDirs, '-') ) self.remove_from_favorite_dirs.pack(side=tk.LEFT) # add the directory list self.dirs = tk.Listbox(frmb, exportselection=0, yscrollcommand=(self.dirsbar, 'set')) self.dirs.pack(side=tk.BOTTOM, expand=tk.YES, fill=tk.BOTH) # bind events for directory selection btags = self.dirs.bindtags() self.dirs.bindtags(btags[1:] + btags[:1]) self.dirs.bind('<ButtonRelease-1>', lambda event: None) self.dirs.bind('<Double-ButtonRelease-1>', lambda event: self.dirs_double_event()) self.dirs.bind('<Return>', lambda event: self.dirs_double_event()) self.dirs.bind('<BackSpace>', lambda event: self.dirs_back_event()) # connect the directory scrollbar self.dirsbar.config(command=(self.dirs, 'yview')) # Add ok button to the bottom of the frame self.ok_button = ttk.Button(self.botframe, text="Ok", command=self.ok_command) self.ok_button.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X) # Add the cancel button to the bottom of the frame self.cancel_button = ttk.Button(self.botframe, text="Cancel", command=self.quit) self.cancel_button.pack(side=tk.RIGHT, expand=tk.YES, fill=tk.X) self.top.protocol('WM_DELETE_WINDOW', self.quit) self.top.bind('<Escape>', lambda event: self.quit()) tk_center(self.top, self.master, 800, 600) self.top.deiconify() self.top.update_idletasks() self.go(directory) if focus == 'filterFiles': self.filterFiles.focus_set() self.filterFiles.icursor(0) elif focus == 'filterDirs': self.filterDirs.focus_set() self.top.wait_window(self.top) self.top.destroy()
[docs] def go(self, directory=None): if directory is None: if self.server0 + '-' + self.tunnel0 in OMFITaux['lastBrowsed']: directory = OMFITaux['lastBrowsed'][self.server0 + '-' + self.tunnel0] else: directory = '' try: serverPicker = SERVER(SERVER[self.server0]['server']) self.comboBox.set(serverPicker + ' -- ' + self.server0) except KeyError: pass self.curdir = directory self.directory = directory self.set_filter(self.directory, self.pattern) self.set_selection(self.default) self.how = None self.filter_command() self.filterDirs.xview(tk.END)
[docs] def quit(self, how=None): if how is None: self.how = None else: self.how = how, self.server0, self.tunnel0 self.top.destroy()
[docs] def dirs_double_event(self): dir, pat = self.get_filter() subdir = self.dirs.get('active') dir = os.path.normpath(os.path.join(dir, subdir)) self.set_filter(dir, pat) self.filter_command()
[docs] def dirs_back_event(self): dir, pat = self.get_filter() dir = os.path.normpath(os.path.join(dir, '..')) self.set_filter(dir, pat) self.filter_command() return 'break'
[docs] def files_select_event(self): file = self.files.get('active') self.set_selection(file)
[docs] def ok_command(self): self.quit(self.get_selection())
[docs] def remote_command(self, command): self.top.title('%s: %s' % (self.title, parse_server(self.server0)[2])) self.username, self.server, self.port = setup_ssh_tunnel(self.server0, self.tunnel0) def ssh_cd(inv): if is_localhost(self.server0): return inv return ( sshOptions() + controlmaster(self.username, self.server, self.port, self.server0) + " -Y -q -p " + self.port + " " + self.username + "@" + self.server + " '" + inv + "'" ) child = subprocess.Popen(ssh_cd(command), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = map(b2s, child.communicate()) if child.poll() == 0: pass elif child.poll() and len(out.strip()): printw('Warning in file browser:\n\n%s' % err) elif child.poll(): raise Exception('Error in file browser:\n\n%s' % err) return out, err, child.poll()
[docs] def filter_command(self, dir=None): dir0, pat = self.get_filter() if dir is None: dir = dir0 strdir = "\"" + dir + "\"" if dir else '' try: out, err, retcode = self.remote_command("cd {dir} && \\pwd && \\ls -1 -p -L {dir}".format(dir=strdir)) self.filterDirs.configure(foreground='black') except Exception as _excp: printe(_excp) self.filterDirs.configure(foreground='red') return names = list([x.strip() for x in out.strip().split('\n')]) dir = names.pop(0) + os.sep self.directory = dir self.set_filter(dir, pat) OMFITaux['lastBrowsed'][self.server0 + '-' + self.tunnel0] = dir OMFITaux['lastBrowsed']['__lastServer__'] = self.server0 OMFITaux['lastBrowsed']['__lastTunnel__'] = self.tunnel0 names.sort(key=lambda x: x.lower()) if self.directory != '/': subdirs = [os.pardir] else: subdirs = [] if self.is_dir: matchingfiles = ['.'] else: matchingfiles = [] for name in names: fullname = os.path.join(dir, name) if fullname.endswith('/') or os.path.isdir(fullname): subdirs.append(name) elif fnmatch.fnmatch(name, pat): matchingfiles.append(name) self.dirs.delete(0, tk.END) for name in subdirs: self.dirs.insert(tk.END, name) self.files.delete(0, tk.END) for name in matchingfiles: self.files.insert(tk.END, name) head, tail = os.path.split(self.get_selection()) if tail == self.curdir: tail = '' self.set_selection(tail)
[docs] def get_filter(self): return re.sub('//', '/', self.filterDirs.get()), self.filterFiles.get()
[docs] def get_selection(self): selection = self.selection.get().replace('//', '/') if self.is_dir: if not selection.endswith('/'): selection = os.path.dirname(selection) + '/' return selection
[docs] def set_filter(self, dir, pat): pat = pat.strip() if not pat: pat = self.get_filter()[-1] if not pat: pat = "*" self.filterDirs.delete(0, tk.END) self.filterDirs.insert(tk.END, re.sub('//', '/', dir)) self.filterFiles.delete(0, tk.END) self.filterFiles.insert(tk.END, re.sub('//', '/', pat))
[docs] def set_selection(self, file): self.selection.delete(0, tk.END) self.selection.insert(tk.END, re.sub('//', '/', os.path.join(self.directory, file))) self.selection.xview(tk.END)
[docs] def manage_list(self, fav_list, obj, op): # obj is expected to be a combobox object from the FileDialog class (for either filterFiles or filterDirs) # get value from combobox object value = obj.get() if op == '+': printd('FileDialog:manage_list:add:', value) if value not in fav_list: fav_list.append(value) elif op == '-': printd('FileDialog:manage_list:remove:', value) if value in fav_list: fav_list.remove(value) # update combobox selections obj['values'] = fav_list
[docs]class LoadFileDialog(FileDialog): """File selection dialog which checks that the file exists.""" def __init__(self, *args, **kw): self.transferRemoteFile = kw.pop('transferRemoteFile', False) super().__init__(*args, **kw)
[docs] def ok_command(self): file = self.get_selection() self.remote_command('\\ls "{}"'.format(file)) # transfer remote file locally if requested if self.transferRemoteFile and not is_localhost(self.server0): if isinstance(self.transferRemoteFile, str): directory = self.transferRemoteFile else: directory = OMFITcwd + os.sep + 'dir_' + utils_base.now("%Y-%m-%d__%H_%M_%S__%f") while os.path.exists(directory): directory += '_' if not os.path.exists(directory): os.makedirs(directory) remote_downsync(self.server0, file, directory + os.sep, self.tunnel0) self.server0 = 'localhost' self.tunnel0 = '' self.quit(directory + os.sep + os.path.split(file)[1]) else: self.quit(file)
[docs]class SaveFileDialog(FileDialog): """File selection dialog which checks that the file exists before saving."""
[docs] def ok_command(self): file = self.get_selection() try: self.remote_command('\\ls "{}"'.format(file)) answer = Dialog('Overwrite %s ?' % file, answers=['Yes', 'No'], icon='question', title='File exists', parent=self.top) except Exception: answer = 'Yes' if answer == 'Yes': self.quit(file) else: pass
[docs]def remoteFile( parent=None, transferRemoteFile=True, remoteFilename=None, server=None, tunnel=None, init_directory_location=None, init_pattern_location=None, favorite_list_location=None, pattern_list_location=None, is_dir=False, ): ''' Opens up a dialogue asking filename, server/tunnel for remote file transfer This function is mostly used within the framework; for use in OMFIT GUI scripts please consider using the OMFITx.FilePicker and OMFITx.ObjectPicker functions instead. :param parent: Tkinter parent GUI :param transferRemoteFile: [True,False,None] if True the remote file is transferred to the OMFITcwd directory :param remoteFilename: initial string for remote filename :param server: initial string for server :param tunnel: initial string for tunnel :param init_directory_location: The contents of this location are used to set the initial directory for file searches. If a file name is specified the directory will be determined from the file name and this input ignored. Otherwise, if set this will be used to set the initial directory. :param init_pattern_location: The default pattern is '*'. If this is specified then the contents of the tree location will replace the default intial pattern. :param favorite_list_location: OMFIT tree location which contains a possibly empty list of favorite file directories. To keep with the general omfit approach this should be a string. :param pattern_list_location: OMFIT tree location which contains a possibly empty list of favorite search patterns. To keep with the general omfit approach this should be a string. :return: is controlled with transferRemoteFile parameter * string with local filename (if transferRemoteFile==True) * string with the filename (if transferRemoteFile==False) * tuple with the filename,server,tunnel (if transferRemoteFile==None) ''' # if server is specified do not reuse last server serverPicker = None if server is not None: serverPicker = '' default = '' directory = None if remoteFilename is not None: if server is not None: directory, default = os.path.split(remoteFilename) else: default = os.path.split(remoteFilename)[1] directory = None # allow specification of initial directory if directory is None and init_directory_location is not None: # if the initial directory is a non-empty string then assign the # value to the starting directory init_directory_loc = _absLocation(init_directory_location) try: init_directory = eval(init_directory_loc) if init_directory != '' and isinstance(init_directory, str): directory = init_directory except Exception: pass # allow specification of initial pattern if init_pattern_location is not None: # if there is any issue with the evaluation set the search pattern to the default init_pattern_loc = _absLocation(init_pattern_location) try: pattern = eval(init_pattern_loc) except Exception: print("RemoteFile: Problem evaluating the contents of %s" % init_pattern_loc) pattern = '*' else: pattern = '*' if transferRemoteFile: fd = LoadFileDialog( directory=directory, serverPicker=serverPicker, server=server, tunnel=tunnel, pattern=pattern, master=parent, default=default, transferRemoteFile=transferRemoteFile, favorite_list_location=favorite_list_location, pattern_list_location=pattern_list_location, is_dir=is_dir, ) else: fd = FileDialog( directory=directory, serverPicker=None, server=server, tunnel=tunnel, pattern=pattern, master=parent, default=default, favorite_list_location=favorite_list_location, pattern_list_location=pattern_list_location, is_dir=is_dir, ) if fd.how is None: return # return either the filename, or a tuple with filename,sever,tunnel if transferRemoteFile is None if transferRemoteFile is None: return fd.how[0], fd.how[1], fd.how[2] else: return fd.how[0]
# --------------------------- # tool functions # ---------------------------
[docs]@_available_to_userTASK def remote_sysinfo(server, tunnel='', quiet=False): r''' This function retrieves information from a remote server (like the shell which is running there):: {'ARG_MAX': 4611686018427387903, 'QSTAT': '', 'SQUEUE': '/opt/slurm/default/bin/squeue', 'environment': OMFITenv([]), 'id': 6216321643098941518, 'login': ['.cshrc', '.login'], 'logout': ['.logout'], 'shell': 'csh', 'shell_path': '/bin/csh', 'sysinfo': 'csh\nARG_MAX=4611686018427387903\nQSTAT=\nSQUEUE=/opt/slurm/default/bin/squeue\necho: No match.' } Information from the remote server is stored in a dictionary :param server: remote server :param tunnel: via tunnel :param quiet: suppress output or not :return: dictionary with info from the server ''' server0 = server tunnel0 = tunnel username, server, port = setup_ssh_tunnel(server0, tunnel0) def ssh_cd(inv): if is_localhost(server0): return inv return ( sshOptions() + controlmaster(username, server, port, server0) + " -t -t -Y -q -p " + port + " " + username + "@" + server + " '" + inv + "'" ) id_ = omfit_hash(str(username + server + port)) if id_ in OMFITaux['sysinfo'] and len( set(['id', 'sysinfo', 'shell', 'login', 'logout']).difference(set(OMFITaux['sysinfo'][id_].keys())) ): del OMFITaux['sysinfo'][id_] if id_ not in OMFITaux['sysinfo']: command = '' if 'iter.org' in server0: command = ''' touch ~/.nomotdnx touch ~/.nomotd '''.lstrip() command += ''' echo '-----------OMFIT-----------' echo SHELL=$0 echo SHELL_PATH=`which $0` echo ARG_MAX=`getconf ARG_MAX` echo QSTAT=`which qstat | grep -v "not found"` echo SQUEUE=`which squeue | grep -v "not found"` echo '------------ENV------------' env echo '------------ENV------------' '''.strip() if not quiet: printi(command) command = re.sub('\n', ' ; ', command) std_out = [] if not quiet: printi('Collecting remote system info') _system(ssh_cd(command), message='Collecting remote system info', ignoreReturnCode=True, std_out=std_out, quiet=quiet) std_out = [_f for _f in std_out if _f] if not len(std_out): raise Exception( ( 'Could not reach {username}@{server}:{port}\n\n' ' Possible causes:\n' '1. Do you have an account on `{server}` ?\n' ' -> request an account\n' '2. Is `{username}` your username on `{server}`?\n' " -> edit your username under OMFIT['MainSettings']['SERVER']\n" '3. Do you have your SSH keys setup on `{server}`?\n' ' -> run `omfit -s` to setup password-less access\n' ).format(username=username, server=server, port=port) ) OMFITaux['sysinfo'][id_] = {} OMFITaux['sysinfo'][id_]['id'] = id_ OMFITaux['sysinfo'][id_]['sysinfo'] = '\n'.join(std_out) env = False ENV = [] for k in range(len(std_out)): if env is None: if std_out[k].startswith('SHELL='): OMFITaux['sysinfo'][id_]['shell'] = std_out[k].split('=')[1].strip().split('/')[-1] elif std_out[k].startswith('SHELL_PATH='): OMFITaux['sysinfo'][id_]['shell_path'] = std_out[k].split('=')[1].strip() elif std_out[k].startswith('ARG_MAX='): OMFITaux['sysinfo'][id_]['ARG_MAX'] = int(std_out[k].split('=')[1].strip()) elif std_out[k].startswith('QSTAT='): OMFITaux['sysinfo'][id_]['QSTAT'] = std_out[k].split('=')[1].strip() elif std_out[k].startswith('SQUEUE='): OMFITaux['sysinfo'][id_]['SQUEUE'] = std_out[k].split('=')[1].strip() if '-----------OMFIT-----------' in std_out[k]: env = None elif '------------ENV------------' in std_out[k]: env = not env elif env is True: ENV.append(std_out[k]) if not quiet: printi(OMFITaux['sysinfo'][id_]['shell'].upper() + ' shell detected: ' + OMFITaux['sysinfo'][id_]['shell_path']) if 'shell' not in OMFITaux['sysinfo'][id_]: OMFITaux['sysinfo'][id_]['shell'] = '' # overview on http://kb.iu.edu/data/abdy.html if OMFITaux['sysinfo'][id_]['shell'] == 'tcsh': # from http://www.bo.infn.it/alice/alice-doc/mll-doc/impgde/node23.html login = ['/etc/csh.cshrc', '/etc/csh.login', '$HOME/.cshrc', '$HOME/.tcshrc', '$HOME/.login'] logout = ['/etc/csh.logout', '$HOME/.logout'] elif OMFITaux['sysinfo'][id_]['shell'] == 'csh': login = ['.cshrc', '.login'] logout = ['.logout'] elif OMFITaux['sysinfo'][id_]['shell'] in ['sh', 'ksh']: # https://blog.flowblok.id.au/2013-02/shell-startup-scripts.html login = ['/etc/profile', '/etc/ksh.kshrc', '$HOME/.profile'] logout = [] elif OMFITaux['sysinfo'][id_]['shell'] == 'bash': # https://blog.flowblok.id.au/2013-02/shell-startup-scripts.html # from http://shreevatsa.wordpress.com/2008/03/30/zshbash-startup-files-loading-order-bashrc-zshrc-etc/ # echo exit | strace bash -li |& less | grep '^open' login = [ '/etc/profile', '/etc/bash.bashrc', '/etc/bashrc', '$HOME/.bashrc', '$HOME/.bash_profile', '$HOME/.bash_login', '$HOME/.profile', ] logout = ['$HOME/.bash_logout', '/etc/bash.bash_logout'] elif OMFITaux['sysinfo'][id_]['shell'] == 'zsh': # https://blog.flowblok.id.au/2013-02/shell-startup-scripts.html # from http://shreevatsa.wordpress.com/2008/03/30/zshbash-startup-files-loading-order-bashrc-zshrc-etc/ login = [ '/etc/zshenv', '$HOME/.zshenv', '/etc/zprofile', '$HOME/.zprofile', '/etc/zshrc', '$HOME/.zshrc', '/etc/zlogin', '$HOME/.zlogin', ] logout = ['$HOME/.zlogout', '/etc/zlogout'] elif OMFITaux['sysinfo'][id_]['shell'] == 'rc': login = ['.rcrc'] logout = [] else: login_ok = login = [] logout = [] if len(login): # find what login files are there std_out = [] std_err = [] _system( ssh_cd('\\ls -1 ' + ' '.join(login)), message='Collecting remote system info', ignoreReturnCode=True, std_out=std_out, std_err=std_err, quiet=quiet, ) login_ok = [] for line in std_out + std_err: line = line.strip() if ( len(line) and 'cannot' not in line.lower() and 'no such file or directory' not in line.lower() and ': ' not in line.lower() ): login_ok.append(line) if not quiet: printi('Source files at login: ' + ' '.join(login_ok)) OMFITaux['sysinfo'][id_]['login'] = login_ok OMFITaux['sysinfo'][id_]['logout'] = logout OMFITaux['sysinfo'][id_]['environment'] = OMFITenv(string='\n'.join(ENV)) # expand environmental variables defined in the environment on the server side try: serverPicker = SERVER(server0) except (TypeError, NameError): # If running outside the framework pass else: if ( serverPicker in list(SERVER.keys()) and 'workDir' in SERVER[serverPicker] and isinstance(SERVER[serverPicker]['workDir'], str) and '$' in SERVER[serverPicker]['workDir'] ): tmp = evalExpr(SERVER[serverPicker]['workDir']) for item in re.findall(r'\$\w+', tmp): tmp = tmp.replace(item, OMFITaux['sysinfo'][id_]['environment'][item.strip('$')]) OMFIT['MainSettings']['SERVER'][serverPicker]['workDir'] = tmp OMFIT.addMainSettings(updateUserSettings=True) raise OMFITexception('User configuration for `%s` server has been initialized. Try again.' % serverPicker) return OMFITaux['sysinfo'][id_]
[docs]def manage_user_errors(command, reportUsersErrorByEmail=False, **kw): """ This method wraps around users calls of _OMFITpython scripts and manages printout of errors :param command: command to be executed :return: whatever the command returned """ if not callable(command): raise ValueError('manage_user_errors `command` should be a callable function') try: tmp = command(**kw) return tmp, False except (EndOMFITpython, EndAllOMFITpython): return None, False except Exception as handled_exception: etype, value, tb = sys.exc_info() excpStack = traceback.format_exception(etype, value, tb) # record the full exception stack OMFITaux['lastUserError'] = excpStack # find out what is the last user error kuser = None showExcp = False inconsole = False for k in range(len(excpStack)): if 'GlobLoc' in excpStack[k]: showExcp = True elif showExcp: if OMFITcwd in excpStack[k] or re.match(r' File "OMFIT.*", line [0-9]*.*', excpStack[k]): kuser = k if '____console____.py' in excpStack[k]: inconsole = True # if kuser is not found report the whole error if kuser is None: raise # print and highlight the latest user error moduleExceptionText = '' if inconsole: printe("\nException in command box script:") else: try: if OMFITaux['lastRunModule'] != 'OMFIT': eval(OMFITaux['lastRunModule']) printe("\nException in script of module %s:" % str(OMFITaux['lastRunModule'])) moduleExceptionText = ' [' + str(eval(OMFITaux['lastRunModule']).ID) + ']' except Exception: printe("\nException in script:") showExcp = False # simplify exception by hiding the top part of the stack simpleExcp = '' for k, line in enumerate(excpStack): if 'GlobLoc' in line: showExcp = True elif showExcp: if re.match(r' File ".*____console____.py", line [0-9]*.*', line): linenumber = re.findall(r'.*line ([0-9]+)*', line)[0] line = 'Error in command box' + '\n' + line.split('\n')[1] elif os.path.abspath(OMFITcwd) in line: filename = re.findall(r' File "(.*)"', line) if len(filename): filename = os.path.split(filename[0])[1] else: filename = line linenumber = re.findall(r'.*line ([0-9]+)*', line) if len(linenumber): linenumber = '" at line ' + linenumber[0] + '\n' else: linenumber = '' line = 'Error in "' + filename + linenumber + line.split('\n')[1] elif re.match(r' File "', line): continue simpleExcp += line + '\n' if isinstance(handled_exception, OMFITexception): simpleExcp = repr(handled_exception) simpleExcp += '\nPress <F6> to see full error report...\n' # list of developers developers = tolist(OMFIT['MainSettings']['SETUP']['report_to'], empty_lists=[None, '']) try: if OMFITaux['lastRunModule'] != 'OMFIT': developers += eval(OMFITaux['lastRunModule']).contact except Exception as _excp1: printe(_excp1) developers = list(set(developers)) # check if this error is the same as the last error that was reported lastSeen = False if '\n'.join(OMFITaux.setdefault('lastReportedUserError', [''])) == '\n'.join(excpStack): lastSeen = True # files attachments (relevant python scripts, MainSettings, and module settings) files = [] lines = re.findall(r' File ".*", line [0-9]+.*', '\n'.join(excpStack)) for line in lines: filename = re.sub(r' File "(.*)", line [0-9]+.*', r'\1', line) if OMFITcwd in filename: files.append(filename) files = unsorted_unique(files[::-1])[::-1] creator = [''] if files: # see who created the python file by parsing the header '# Created by ...' with open(files[-1], 'r') as f: creator = re.findall(r'\#\s+\w+\s+by\s+\w+\s*.*', '/n'.join(f.readlines()[:5])) creator = [re.sub(r'\#\s+.*\s+by\s+(\w+)\s*.*', r'\1', x) for x in creator] # Notify OMFIT and modules developers with an email # if error occurred when user pushed on a GUI button, # if it is not a SyntaxError (likely that a user has edited a script), # if the user is not part of the developers (of OMFIT or of the module), # if the user has not disabled the report error and # if it is not the same error as the last error that occured # if it is not the user that created the file if ( not isinstance(handled_exception, (doNotReportException, SyntaxError)) and reportUsersErrorByEmail and OMFIT['MainSettings']['SETUP']['error_report'] and OMFIT['MainSettings']['SETUP']['email'] not in developers and not lastSeen and os.environ['USER'] not in creator ): try: # Add MainSettings and module settings to the list of files OMFIT['MainSettings'].save() files.append(OMFIT['MainSettings'].filename) try: if OMFITaux['lastRunModule'] != 'OMFIT': eval(OMFITaux['lastRunModule'])['SETTINGS'].save() files.append(eval(OMFITaux['lastRunModule'])['SETTINGS'].filename) except Exception: pass message = ( 'OMFIT project: {project}\n' 'OMFIT directory: {directory}\n' 'Git commit: {commit}\n' 'Python executable: {executable}\n' 'Hostname: {hostname}\n' 'Error in module location: {module}\n\n' '====================================\n' 'Console output\n' '====================================\n' '{console}\n\n' '===================================\n' 'Exception stack\n' '====================================\n' '{exception}\n\n' ''.format( project=str(OMFIT.filename), directory=OMFITsrc, commit=repo_str, executable=sys.executable, hostname=' | '.join(platform.uname()), module=str('' if OMFITaux['lastRunModule'] == 'OMFIT' else OMFITaux['lastRunModule']), # Note that .split('+------------------------------+')[-1] is done to avoid receiving: # +------------------------------+ +------------------------------+ # | BELOW IS WHAT YOU WERE DOING | and | ABOVE IS WHAT YOU WERE DOING | # +------------------------------+ +------------------------------+ console=OMFITaux['console'].get()[-10000:].split('+------------------------------+')[-1], exception='\n'.join(excpStack), ) ) # send email send_email( to=developers, fromm=OMFIT['MainSettings']['SETUP']['email'], subject='OMFIT user error report' + moduleExceptionText, message=message, attachments=files, ) printt('OMFIT user error report sent to < ' + ', '.join(developers) + ' >') OMFITaux['lastReportedUserError'] = excpStack except Exception as _excp1: printt('Problem sending error report: ' + repr(_excp1)) # print simplified exception to console printe(simpleExcp) OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>") return None, True
def _system( command_line, message='Processing...', fixedWidthDetails='', ignoreReturnCode=False, std_out=None, std_err=None, executable=None, quiet=False, progressFunction=None, extraButtons=None, noGUI=False, ): """ :param command_line: command to be executed :param message: message to appear in the GUI :param fixedWidthDetails: further details to be formatted with a fixed width font, e.g., a script :param ignoreReturnCode: ignore return code of the command :param std_out: if a list is passed (e.g. []), the stdout of the program will be put there line by line :param std_err: if a list is passed (e.g. []), the stderr of the program will be put there line by line :param executable: shell executable, defaults to os.environ['SHELL'] and fallsback on default of subprocess.Popen() :param quiet: switch to turn ON/OFF print to console :param progressFunction: user function to which the std-out of the process is passed and returns values from 0 to 100 to indicate progress towards completion :param extraButtons: dictionary with key/function that is used to add extra buttons to the GUI. The function receives a dictionary with the process std_out and pid :param noGUI: do not show execution in GUI :return: return code of the spawned process """ if os.name == 'nt' or int(os.environ.get('OMFIT_WINDOWS_SYSTEM', '0')): return _system_windows( command_line=command_line, message=message, fixedWidthDetails=fixedWidthDetails, ignoreReturnCode=ignoreReturnCode, std_out=std_out, std_err=std_err, executable=executable, quiet=quiet, progressFunction=progressFunction, extraButtons=extraButtons, noGUI=noGUI, ) else: return _system_unix( command_line=command_line, message=message, fixedWidthDetails=fixedWidthDetails, ignoreReturnCode=ignoreReturnCode, std_out=std_out, std_err=std_err, executable=executable, quiet=quiet, progressFunction=progressFunction, extraButtons=extraButtons, noGUI=noGUI, ) def _system_unix( command_line, message, fixedWidthDetails, ignoreReturnCode, std_out, std_err, executable, quiet, progressFunction, extraButtons, noGUI ): import fcntl import select if executable is None: executable = '/bin/bash' if executable == 'OMFITbash': bashlink = OMFITbinsDir + os.sep + 'OMFITbash' if not os.path.exists(bashlink): with open(bashlink, 'w') as f: f.write('''#!/bin/bash\n/bin/bash -l "$@"\n''') os.chmod(bashlink, 0o700) executable = bashlink if extraButtons is None: extraButtons = {} try: # use original environmental variables before OMFIT messed with them backup_environ = {} for k in ['PATH', 'LD_LIBRARY_PATH', 'DYLD_LIBRARY_PATH']: if k in os.environ: backup_environ[k] = os.environ[k] os.environ[k] = os.environ.get(f'ORIGINAL_{k}', os.environ.get(k, '')) child = subprocess.Popen( command_line, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, executable=executable, preexec_fn=os.setsid, ) # non blocking reading of buffers fcntl.fcntl(child.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) fcntl.fcntl(child.stderr.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) gbl = {} gbl['std_out'] = '' gbl['pid'] = None if len(OMFITaux['prun_process']) or noGUI or (len(OMFITaux['pythonRunWindows']) and OMFITaux['pythonRunWindows'][-1] is None): gbl['std_out'], std_err_ = map(b2s, child.communicate()) if std_out is not None: std_out.extend(gbl['std_out'].split('\n')) if std_err is not None: std_err.extend(std_err_.split('\n')) else: max_buffer_len = 10000000 show = (OMFITaux['rootGUI'] is None or OMFITaux['console'].show) and not quiet printd(command_line, topic='framework') if OMFITaux['rootGUI'] is not None: tkMessage = tk.StringVar() if isinstance(message, str): tkMessage.set(message) elif isinstance(message, CollectionsCallable): tkMessage.set(message()) tkDetails = tk.StringVar() if fixedWidthDetails: tkDetails.set(fixedWidthDetails) def onKill(): killed.set(True) top.update_idletasks() finished.set(True) top.update_idletasks() def onAbort(): top.update_idletasks() aborted.set(True) finished.set(True) top.update_idletasks() def checkChild(): if gbl['pid'] is None: gbl['pid'] = child.pid elif child.poll() is not None: try: p['value'] = 100 except tk.TclError: pass finished.set(True) if OMFITaux['rootGUI'] is not None: top.destroy() return elif select.select([child.stdout], [], [], 0)[0]: try: tmp = b2s(child.stdout.read()) # return output gbl['std_out'] += tmp gbl['std_out'] = gbl['std_out'][:max_buffer_len] if progressFunction: p['value'] = progressFunction(gbl['std_out']) # output to console if show: tag_print(tmp, tag='PROGRAM_OUT', end='') except IOError: tmp = None if OMFITaux['rootGUI'] is not None and not finished.get(): if isinstance(message, CollectionsCallable): tkMessage.set(message()) if OMFITaux['console'].show: OMFITaux['rootGUI'].after(1, checkChild) else: OMFITaux['rootGUI'].after(100, checkChild) p = {'value': 0, 'maximum': 100} if OMFITaux['rootGUI'] is not None: try: if OMFITaux['pythonRunWindows'][-1] is None: raise RuntimeError('--') top = ttk.Frame(OMFITaux['pythonRunWindows'][-1], borderwidth=2, relief=tk.GROOVE) top.pack(side=tk.TOP, expand=tk.NO, fill=tk.BOTH, padx=5, pady=5) except Exception: top = tk.Toplevel(_topGUI(OMFITaux['rootGUI'])) top.transient(OMFITaux['rootGUI']) top.protocol("WM_DELETE_WINDOW", 'break') if isinstance(message, str): top.wm_title(message) elif isinstance(message, CollectionsCallable): top.wm_title(message()) top.update_idletasks() ttk.Label(top, textvar=tkMessage).pack(side=tk.TOP, expand=tk.NO, fill=tk.X) if fixedWidthDetails: from utils_widgets import OMFITfont fixedWidthDetailsText = tk.Text( top, background=ttk.Style().lookup('TFrame', 'background'), wrap=tk.NONE, font=OMFITfont(size=-2, family='Courier') ) fixedWidthDetailsText.pack(padx=5) fixedWidthDetailsText.insert(1.0, tkDetails.get()) fixedWidthDetailsText.configure( state='disabled', height=min([10, len(tkDetails.get().split('\n'))]), width=80, relief=tk.FLAT ) if progressFunction: p = ttk.Progressbar(top, orient=tk.HORIZONTAL, mode='determinate') p['value'] = 0 p['maximum'] = 100 else: p = ttk.Progressbar(top, orient=tk.HORIZONTAL, mode='indeterminate') p.start() p.pack(padx=5, pady=5, expand=tk.NO, fill=tk.X) frm = ttk.Frame(top) ttk.Button(frm, text="Kill local", command=onKill, takefocus=False).pack(side=tk.LEFT, expand=tk.NO, fill=tk.X) for btt in list(extraButtons.keys()): ttk.Button(frm, text=btt, command=lambda btt=btt: extraButtons[btt](gbl), takefocus=False).pack( side=tk.LEFT, expand=tk.NO, fill=tk.X ) ttk.Button(frm, text="Abort", command=onAbort, takefocus=False).pack(side=tk.LEFT, expand=tk.NO, fill=tk.X) frm.pack(side=tk.TOP) top.update_idletasks() finished = tk.BooleanVar() finished.set(False) aborted = tk.BooleanVar() aborted.set(False) killed = tk.BooleanVar() killed.set(False) if OMFITaux['rootGUI'] is not None: checkChild() top.wait_variable(finished) top.destroy() else: while not finished.get(): checkChild() sleep(0.1) if child.poll() is None and (aborted.get() or killed.get()): try: os.killpg(gbl['pid'], signal.SIGTERM) for k in range(20): if child.poll() is not None: break if k == 10: os.killpg(gbl['pid'], signal.SIGKILL) sleep(0.1) except OSError: pass # show the remaining standard output if std_out is not None or show: if select.select([child.stdout], [], [], 0)[0]: try: tmp = b2s(child.stdout.read()) gbl['std_out'] += tmp gbl['std_out'] = gbl['std_out'][:max_buffer_len] if std_out is not None: std_out.extend(gbl['std_out'].split('\n')) if show: tag_print(tmp, tag='PROGRAM_OUT', end='') except IOError: tmp = None # show the standard error (all at the end) if std_err is not None or show: if select.select([child.stderr], [], [], 0)[0]: try: tmp = b2s(child.stderr.read()) if std_err is not None: std_err_ = tmp[:max_buffer_len] std_err.extend(std_err_.split('\n')) if show: tag_print(tmp, tag='PROGRAM_ERR', end='') except IOError: tmp = None if OMFITaux['rootGUI'] is not None: OMFITaux['rootGUI'].update_idletasks() OMFITaux['rootGUI'].focus_set() if killed.get(): pass elif aborted.get(): raise EndAllOMFITpython('\n\n---> Aborted by user <---\n\n') elif not ignoreReturnCode and child.poll() != 0: raise ReturnCodeException( '\n\nReturn code was ' + str(child.poll()) + ' for command:\n\n' + command_line + '\n\nYou can ignore the return code by setting the keyword `ignoreReturnCode=True`' ) return_val = child.poll() child.stderr.close() child.stdout.close() child.stdin.close() finally: # restore OMFIT-modified version of the environmental variables for k in ['PATH', 'LD_LIBRARY_PATH', 'DYLD_LIBRARY_PATH']: if k in backup_environ: os.environ[k] = backup_environ[k] elif k in os.environ: del os.environ[k] return return_val def _system_windows( command_line, message, fixedWidthDetails, ignoreReturnCode, std_out, std_err, executable, quiet, progressFunction, extraButtons, noGUI ): if 'DYLD_LIBRARY_PATH' in os.environ: print('Removed DYLD_LIBRARY_PATH from os.environ') del os.environ['DYLD_LIBRARY_PATH'] if executable is None: executable = os.environ.get('SHELL', None) if executable == 'OMFITbash': bashlink = OMFITbinsDir + os.sep + 'OMFITbash' if not os.path.exists(bashlink): with open(bashlink, 'w') as f: f.write('''#!/bin/bash\n/bin/bash -l "$@"\n''') os.chmod(bashlink, 0o700) executable = bashlink if extraButtons is None: extraButtons = {} kwarg = dict(stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, executable=executable) if os.name == 'nt': # only for windows kwarg['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP else: kwarg['preexec_fn'] = os.setsid # only for unix gbl = {} gbl['std_out'] = '' gbl['pid'] = None from threading import Thread from queue import Queue, Empty def enqueue_output(out, queue): while True: try: line = out.readline() except ValueError: break if not line: break queue.put(b2s(line)) child = subprocess.Popen(command_line, shell=True, **kwarg) # non blocking reading of buffers # fcntl.fcntl(child.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) # fcntl.fcntl(child.stderr.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) # non blocking reading of buffers q_out = Queue() t_out = Thread(target=enqueue_output, args=(child.stdout, q_out)) q_err = Queue() t_err = Thread(target=enqueue_output, args=(child.stderr, q_err)) t_out.daemon = True # thread dies with the program t_out.start() t_err.daemon = True # thread dies with the program t_err.start() if len(OMFITaux['prun_process']) or noGUI or (len(OMFITaux['pythonRunWindows']) and OMFITaux['pythonRunWindows'][-1] is None and False): gbl['std_out'], std_err_ = map(b2s, child.communicate()) if std_out is not None: std_out.extend(gbl['std_out'].split('\n')) if std_err is not None: std_err.extend(std_err_.split('\n')) else: max_buffer_len = 10000000 show = (OMFITaux['rootGUI'] is None or OMFITaux['console'].show) and not quiet printd(command_line, topic='framework') if OMFITaux['rootGUI'] is not None: tkMessage = tk.StringVar() if isinstance(message, str): tkMessage.set(message) elif isinstance(message, CollectionsCallable): tkMessage.set(message()) tkDetails = tk.StringVar() if fixedWidthDetails: tkDetails.set(fixedWidthDetails) def onKill(): killed.set(True) top.update_idletasks() finished.set(True) top.update_idletasks() def onAbort(): top.update_idletasks() aborted.set(True) finished.set(True) top.update_idletasks() def checkChild(): if gbl['pid'] is None: gbl['pid'] = child.pid elif child.poll() is not None: try: p['value'] = 100 except tk.TclError: pass finished.set(True) if OMFITaux['rootGUI'] is not None: top.destroy() return else: try: tmp = q_out.get_nowait() # or q.get(timeout=.1) except Empty: tmp = '' gbl['std_out'] += tmp gbl['std_out'] = gbl['std_out'][:max_buffer_len] if progressFunction: p['value'] = progressFunction(gbl['std_out']) # output to console if show and tmp != '': tag_print(tmp, tag='PROGRAM_OUT', end='') if OMFITaux['rootGUI'] is not None and not finished.get(): if isinstance(message, CollectionsCallable): tkMessage.set(message()) if OMFITaux['console'].show: OMFITaux['rootGUI'].after(1, checkChild) else: OMFITaux['rootGUI'].after(100, checkChild) p = {'value': 0, 'maximum': 100} if OMFITaux['rootGUI'] is not None: try: if OMFITaux['pythonRunWindows'][-1] is None: raise RuntimeError('--') top = ttk.Frame(OMFITaux['pythonRunWindows'][-1], borderwidth=2, relief=tk.GROOVE) top.pack(side=tk.TOP, expand=tk.NO, fill=tk.BOTH, padx=5, pady=5) except Exception: top = tk.Toplevel(_topGUI(OMFITaux['rootGUI'])) top.transient(OMFITaux['rootGUI']) top.protocol("WM_DELETE_WINDOW", 'break') if isinstance(message, str): top.wm_title(message) elif isinstance(message, CollectionsCallable): top.wm_title(message()) top.update_idletasks() ttk.Label(top, textvar=tkMessage).pack(side=tk.TOP, expand=tk.NO, fill=tk.X) if fixedWidthDetails: from utils_widgets import OMFITfont fixedWidthDetailsText = tk.Text( top, background=ttk.Style().lookup('TFrame', 'background'), wrap=tk.NONE, font=OMFITfont(size=-2, family='Courier') ) fixedWidthDetailsText.pack(padx=5) fixedWidthDetailsText.insert(1.0, tkDetails.get()) fixedWidthDetailsText.configure( state='disabled', height=min([10, len(tkDetails.get().split('\n'))]), width=80, relief=tk.FLAT ) if progressFunction: p = ttk.Progressbar(top, orient=tk.HORIZONTAL, mode='determinate') p['value'] = 0 p['maximum'] = 100 else: p = ttk.Progressbar(top, orient=tk.HORIZONTAL, mode='indeterminate') p.start() p.pack(padx=5, pady=5, expand=tk.NO, fill=tk.X) frm = ttk.Frame(top) ttk.Button(frm, text="Kill local", command=onKill, takefocus=False).pack(side=tk.LEFT, expand=tk.NO, fill=tk.X) for btt in list(extraButtons.keys()): ttk.Button(frm, text=btt, command=lambda btt=btt: extraButtons[btt](gbl), takefocus=False).pack( side=tk.LEFT, expand=tk.NO, fill=tk.X ) ttk.Button(frm, text="Abort", command=onAbort, takefocus=False).pack(side=tk.LEFT, expand=tk.NO, fill=tk.X) frm.pack(side=tk.TOP) top.update_idletasks() finished = tk.BooleanVar() finished.set(False) aborted = tk.BooleanVar() aborted.set(False) killed = tk.BooleanVar() killed.set(False) if OMFITaux['rootGUI'] is not None: checkChild() top.wait_variable(finished) top.destroy() else: while not finished.get(): checkChild() sleep(0.1) if child.poll() is None and (aborted.get() or killed.get()): try: if os.name == 'nt': child.send_signal(signal.SIGTERM) else: os.killpg(gbl['pid'], signal.SIGTERM) for k in range(20): if child.poll() is not None: break if k == 10: if os.name == 'nt': child.send_signal(signal.SIGKILL) else: os.killpg(gbl['pid'], signal.SIGKILL) sleep(0.1) except OSError: pass # show the remaining standard output if std_out is not None or show: try: tmp = q_out.get_nowait() # or q.get(timeout=.1) except Empty: tmp = '' gbl['std_out'] += tmp gbl['std_out'] = gbl['std_out'][:max_buffer_len] if std_out is not None: std_out.extend(gbl['std_out'].split('\n')) if show and tmp != '': tag_print(tmp, tag='PROGRAM_OUT', end='') # show the standard error (all at the end) if std_err is not None or show: try: tmp = q_err.get_nowait() # or q.get(timeout=.1) except Empty: tmp = '' if std_err is not None: std_err_ = tmp[:max_buffer_len] std_err.extend(std_err_.split('\n')) if show and tmp != '': tag_print(tmp, tag='PROGRAM_ERR', end='') if OMFITaux['rootGUI'] is not None: OMFITaux['rootGUI'].update_idletasks() OMFITaux['rootGUI'].focus_set() if killed.get(): pass elif aborted.get(): raise EndAllOMFITpython('\n\n---> Aborted by user <---\n\n') elif not ignoreReturnCode and child.poll() != 0: raise ReturnCodeException( '\n\nReturn code was ' + str(child.poll()) + ' for command:\n\n' + command_line + '\n\nYou can ignore the return code by setting the keyword `ignoreReturnCode=True`' ) return_val = child.poll() child.stderr.close() child.stdout.close() child.stdin.close() if t_out and t_out.is_alive(): # wait for thread to finish t_out.join(timeout=1) if t_err and t_err.is_alive(): # wait for thread to finish t_err.join(timeout=1) return return_val def _xargs(command, data): data_string = '' commands = [] data = copy.deepcopy(data) while len(data): data_string += data.pop() + ' ' if not len(data) or (len(command) + len(data_string) + len(data[-1]) + len('\n echo Done ') + 9) > ARG_MAX: commands.append(re.sub(r'\{\}', data_string.strip(), command)) data_string = '' if len(commands) > 1: printd('\n'.join(commands), topic='framework') commands = [item + '\necho Done ' + str(k + 1) + '/' + str(len(commands)) for k, item in enumerate(commands)] return commands def _remote_upsync(server, local, remote, tunnel=None, ignoreReturnCode=False, quiet=False): if not len(local): return server0 = server tunnel0 = tunnel username, server, port = setup_ssh_tunnel(server0, tunnel0) if not isinstance(local, list): local = [local] remote = str(remote) local = [re.sub(' ', r'\ ', x) for x in local] local_str = ' '.join(local) if len(local_str) > 128: local_str = local_str[:64] + '...' + local_str[-64:] if not len(server): ret_code = 0 if not quiet: printi('Local copy of: ' + ' '.join(local)) if not os.path.exists(os.path.split(remote)[0]): os.makedirs(os.path.split(remote)[0]) upsync = system_executable('cp') + ' -Rfp {} ' + remote upsync = _xargs(upsync, local) while len(upsync): command = upsync.pop(0) ret_code = _system(command, 'Local copy of:', fixedWidthDetails=local_str, ignoreReturnCode=True, noGUI=True) if ret_code != 0 and not ignoreReturnCode: break else: msg = 'Up-sync to ' + server0 if tunnel0 is not None and tunnel0 != '' and is_localhost(server) and not is_localhost(server0): msg += ' (via ' + tunnel0 + ')' msg += ' of: ' + ' '.join(local) if not quiet: printi(msg) mkdir = ( sshOptions() + controlmaster(username, server, port, server0) + " -Y -q -p " + port + " " + username + "@" + server + " 'mkdir -p " + remote + "'" ) _system(mkdir, 'Building remote directory structure...', ignoreReturnCode=True) upsync = ( "{path} {progress} --copy-links --update --inplace --compress --recursive -e '".format(**system_executable('rsync', True)) + sshOptions() + controlmaster(username, server, port, server0) + " -q -p " + port + "' {} " + username + "@" + server + ":" + remote ) upsync = _xargs(upsync, local) def progressFunction(out): n = float(len(upsync) + 1) try: return int(100 * segment / n + float(re.findall(' [0-9]+% ', out)[-1].strip(' %')) / n) except IndexError: return 0 std_out = [] std_err = [] segment = 0 while len(upsync): command = upsync.pop(0) ret_code = _system( command, 'Up-Sync of:', fixedWidthDetails=local_str, ignoreReturnCode=True, progressFunction=progressFunction, quiet=True, std_out=std_out, std_err=std_err, ) if ret_code != 0 and not ignoreReturnCode: break segment += 1 if ret_code != 0: if not ignoreReturnCode: if ret_code == 255 and (server is not None and len(server)): raise ReturnCodeException( '\n\n' + command + '\n\n---> SSH connection to `%s` via `%s` was terminated <---\n\n' % (server0, tunnel0) ) else: _streams['PROGRAM_ERR'].write(''.join(std_err) + '\n') raise ReturnCodeException( '\n\nReturn code was ' + str(ret_code) + ' for UPSYNC command:\n\n' + command + '\n\nYou can ignore the return code by setting the keyword `ignoreReturnCode=True`' ) return ret_code def _remote_downsync(server, remote, local, tunnel=None, ignoreReturnCode=False, quiet=False): if not len(remote): return server0 = server tunnel0 = tunnel username, server, port = setup_ssh_tunnel(server0, tunnel0) if not isinstance(remote, list): remote = [remote] local = str(local) remote = [re.sub(' ', r'\ ', x) for x in remote] remote_str = ' '.join(remote) if len(remote_str) > 128: remote_str = remote_str[:64] + '...' + remote_str[-64:] if not len(server): ret_code = 0 if not quiet: printi('Local copy of: ' + ' '.join(remote)) if not os.path.exists(os.path.split(local)[0]): os.makedirs(os.path.split(local)[0]) downsync = system_executable('cp') + ' -Rfp {} ' + local downsync = _xargs(downsync, remote) while len(downsync): command = downsync.pop(0) ret_code = _system(command, 'Local copy of:', fixedWidthDetails=remote_str, ignoreReturnCode=True, noGUI=True) if ret_code != 0 and not ignoreReturnCode: break else: msg = 'Down-sync from ' + server0 if tunnel0 is not None and tunnel0 != '' and is_localhost(server) and not is_localhost(server0): msg += ' (via ' + tunnel0 + ')' msg += ' of: ' + ' '.join(remote) if not quiet: printi(msg) if not os.path.exists(os.path.split(local)[0]): os.makedirs(os.path.split(local)[0]) # newer versions of Rsync require remote files to start by colon (that's not supported by OSX) remote_files_with_colon = False remote_placeholder = ":'{}' " if platform.system() != 'Darwin': remote_files_with_colon = True remote_placeholder = "{} " downsync = ( "{path} {progress} --copy-links --update --inplace --compress --recursive -e '".format(**system_executable('rsync', True)) + sshOptions() + controlmaster(username, server, port, server0) + " -q -p " + port + "' " + username + "@" + server + remote_placeholder + local ) if remote_files_with_colon: downsync = _xargs(downsync, [':' + x for x in remote]) else: downsync = _xargs(downsync, remote) def progressFunction(out): n = float(len(downsync) + 1) try: return int(100 * segment / n + float(re.findall(' [0-9]+% ', out)[-1].strip(' %')) / n) except IndexError: return 0 std_out = [] std_err = [] segment = 0 while len(downsync): command = downsync.pop(0) ret_code = _system( command, 'Down-Sync of:', fixedWidthDetails=remote_str, ignoreReturnCode=True, progressFunction=progressFunction, quiet=True, std_out=std_out, std_err=std_err, ) if ret_code != 0 and not ignoreReturnCode: break segment += 1 if ret_code != 0: if not ignoreReturnCode: if ret_code == 255 and (server is not None and len(server)): raise ReturnCodeException( '\n\n' + command + '\n\n---> SSH connection to `%s` via `%s` was terminated <---\n\n' % (server0, tunnel0) ) else: _streams['PROGRAM_ERR'].write(''.join(std_err) + '\n') raise ReturnCodeException( '\n\nReturn code was ' + str(ret_code) + ' for DOWNSYNC command:\n\n' + command + '\n\nYou can ignore the return code by setting the keyword `ignoreReturnCode=True`' ) return ret_code def _shell_escape(command): return re.sub(r'([\$\\\`])', r'\\\1', command) def _bang_commander(command, use_bang_command='OMFIT_run_command.sh', shell='bash', xterm=False): if use_bang_command is True: use_bang_command = 'OMFIT_run_command.sh' bang_command = '\n\\rm -f %s\n' % use_bang_command command = command.strip() if command.startswith('#!'): shell = re.sub('^#!', '', command.split('\n')[0]) bang_command += ( "\n\\cat << __b__MATCHING_EOF__b__ > %s \n" % use_bang_command + _shell_escape(command) + "\n__b__MATCHING_EOF__b__\n" ) else: if shell == 'bash': shell = '/bin/bash -l' elif not shell.startswith('/'): shell = '/bin/' + shell bang_command += ( "\n\\cat << __b__MATCHING_EOF__b__ > %s \n" % use_bang_command + _shell_escape('#!%s\n' % shell + command) + "\n__b__MATCHING_EOF__b__\n" ) if xterm: bang_command += "chmod +x %s && xterm -e $PWD/%s" % (use_bang_command, use_bang_command) else: bang_command += "chmod +x %s && $PWD/%s" % (use_bang_command, use_bang_command) return bang_command, shell
[docs]def execute( command_line, interactive_input=None, ignoreReturnCode=False, std_out=None, std_err=None, quiet=False, arguments='', script=None, use_bang_command='OMFIT_run_command.sh', progressFunction=None, extraButtons=None, ): """ This function allows execution of commands on the local workstation. :param command_line: string to be executed locally :param interactive_input: interactive input to be passed to the command :param ignoreReturnCode: ignore return code of the command :param std_out: if a list is passed (e.g. []), the stdout of the program will be put there line by line :param std_err: if a list is passed (e.g. []), the stderr of the program will be put there line by line :param quiet: print command to screen or not :param arguments: arguments that are passed to the `command_line` :param script: string with script to be executed. `script` option substitutes `%s` with the automatically generated name of the script if `script` is a list or a tuple, then the first item should be the script itself and the second should be the script name :param use_bang_command: Execute commands via `OMFIT_run_command.sh` script (useful to execute scripts within a given shell: #!/bin/...) If `use_bang_command` is a string, then the run script will take that filename. Notice that setting `use_bang_command=True` is not safe for multiple processes running in the same directory. :param progressFunction: user function to which the std-out of the process is passed and returns values from 0 to 100 to indicate progress towards completion :param extraButtons: dictionary with key/function that is used to add extra buttons to the GUI. The function receives a dictionary with the process std_out and pid :return: return code of the command """ if extraButtons is None: extraButtons = {} # add open-terminal button to GUI def remote_terminal(kw, terminal_command): subprocess.Popen(terminal_command, shell=True) terminal_command = "xterm -e \"%s\"" % ('cd ' + os.getcwd() + ';exec ' + os.environ['SHELL']) extraButtons.setdefault('Open terminal', lambda kw={}, terminal_command=terminal_command: remote_terminal(kw, terminal_command)) scriptName = '' if isinstance(script, (list, tuple)): scriptName = script[1] script = script[0] elif isinstance(script, OMFITascii): scriptName = os.path.split(script.filename)[1] with open(script.filename, 'r') as f: script = f.read() if isinstance(script, str): if not len(scriptName) and '%s' not in command_line: raise RuntimeError( '`script` option requires that you place `%s` in the command line location where you want the script filename to appear' ) scriptFile = OMFITascii(scriptName) with open(scriptFile.filename, 'w') as f: f.write(script) os.chmod(scriptFile.filename, os.stat(scriptFile.filename).st_mode | stat.S_IEXEC) # make the script executable by user if '%s' in command_line: command_line = command_line % (os.path.split(scriptFile.filename)[1]) scriptFile.deploy() if isinstance(arguments, str): command_line = command_line + ' ' + arguments else: command_line = command_line + ' '.join(arguments) if interactive_input: command = ( str(command_line.rstrip()) + """ << __c__MATCHING_EOF__c__\n""" + _shell_escape(interactive_input) + """\n__c__MATCHING_EOF__c__\n""" ) else: command = command_line bang_command = command shell = os.path.split(os.environ['SHELL'])[1] if use_bang_command: bang_command, shell = _bang_commander(command, use_bang_command, shell=shell) bang_command = "\\uname -n\n\\pwd\n\\echo local_PID=$$\n\\echo\n" + bang_command msg = 'Local execute' msg += ' of:\n' + command_line.strip() msg += ' in:\n' + os.getcwd() if not quiet: printi(msg) printd(bang_command, topic='execution') ret_code = _system( bang_command, message='Running locally:', fixedWidthDetails=command_line, ignoreReturnCode=ignoreReturnCode, std_out=std_out, std_err=std_err, executable='OMFITbash', quiet=quiet, progressFunction=progressFunction, extraButtons=extraButtons, ) # use /bin/bash because of syntax used by interactive_input return ret_code
[docs]@_available_to_userTASK def remote_execute( server, command_line, remotedir, tunnel=None, interactive_input=None, ignoreReturnCode=False, std_out=None, std_err=None, quiet=False, arguments='', script=None, forceRemote=False, use_bang_command='OMFIT_run_command.sh', progressFunction=None, extraButtons=None, xterm=False, ): """ This function allows execution of commands on remote workstations. It has the logic to check if the remote workstation is the local workstation and in that case executes locally. :param server: server to connect and execute the command :param command_line: string to be executed remotely (NOTE that if server='', the command is executed locally in the local directory) :param remotedir: remote working directory, if remote directory does not exist it will be created :param tunnel: tunnel to go through to connect to the server :param interactive_input: interactive input to be passed to the command :param ignoreReturnCode: ignore return code of the command :param std_out: if a list is passed (e.g. []), the stdout of the program will be put there line by line :param std_err: if a list is passed (e.g. []), the stderr of the program will be put there line by line :param quiet: print command to screen or not :param arguments: arguments that are passed to the `command_line` :param script: string with script to be executed. `script` option substitutes `%s` with the automatically generated name of the script if `script` is a list or a tuple, then the first item should be the script itself and the second should be the script name :param forceRemote: force remote connection even if server is localhost :param use_bang_command: execute commands via `OMFIT_run_command.sh` script (useful to execute scripts within a given shell: #!/bin/...) If `use_bang_command` is a string, then the run script will take that filename. Notice that setting `use_bang_command=True` is not safe for multiple processes running in the same directory. :param progressFunction: user function to which the std-out of the process is passed and returns values from 0 to 100 to indicate progress towards completion :param extraButtons: dictionary with key/function that is used to add extra buttons to the GUI. The function receives a dictionary with the process std_out and pid :param xterm: if True, launch command in its own xterm :return: return code of the command """ if extraButtons is None: extraButtons = {} server0 = server tunnel0 = tunnel username, server, port = setup_ssh_tunnel(server0, tunnel0, forceRemote=forceRemote) remotedir = evalExpr(remotedir) if not server: oldDir = os.getcwd() if not os.path.exists(remotedir): os.makedirs(remotedir) os.chdir(remotedir) try: return execute( command_line, interactive_input=interactive_input, arguments=arguments, ignoreReturnCode=ignoreReturnCode, std_out=std_out, std_err=std_err, quiet=quiet, script=script, use_bang_command=use_bang_command, progressFunction=progressFunction, extraButtons=extraButtons, ) finally: os.chdir(oldDir) scriptName = '' if isinstance(script, (list, tuple)): scriptName = script[1] script = script[0] elif isinstance(script, OMFITascii): scriptName = os.path.split(script.filename)[1] with open(script.filename, 'r') as f: script = f.read() if isinstance(script, str): if not len(scriptName) and '%s' not in command_line: raise RuntimeError( '`script` option requires that you place `%s` in the command line location where you want the script filename to appear' ) scriptFile = OMFITascii(scriptName) with open(scriptFile.filename, 'w') as f: f.write(script) if '%s' in command_line: command_line = command_line % (os.path.split(scriptFile.filename)[1]) if isinstance(arguments, str): command_line = command_line + ' ' + arguments else: command_line = command_line + ' '.join(arguments) if interactive_input: command = str(command_line) + ' << __a__MATCHING_EOF__a__\n' + _shell_escape(interactive_input) + '\n__a__MATCHING_EOF__a__\n' else: command = command_line sysinfo = remote_sysinfo(server0, tunnel0, quiet=quiet) bang_command = command shell = sysinfo['shell'] if use_bang_command or xterm: bang_command, shell = _bang_commander(command, use_bang_command, shell=shell, xterm=xterm) header = "\\uname -n;\\pwd;\\echo remote_PID=$$;\\echo;" tmp_files = [] def ssh_cd(inv, tty): if tty: tty = ' -t -t ' else: tty = '' inv = "\nmkdir -p " + remotedir + "\ncd " + remotedir + "\n\n" + inv + "\n\n" tmp_files.append(tempfile._get_default_tempdir() + os.sep + next(tempfile._get_candidate_names())) with open(tmp_files[-1], 'w') as f: f.write(inv) printd(inv, topic='execution') ssh_inv = ( 'cat %s | ' % tmp_files[-1] + sshOptions() + controlmaster(username, server, port, server0) + tty + "-Y -q -p " + port + " " + username + "@" + server + " '%s%s -s'" % (header, shell) ) printd(ssh_inv, topic='execution') return ssh_inv msg = 'Remote execute on ' + server0 if tunnel0 is not None and tunnel0 != '' and is_localhost(server) and not is_localhost(server0): msg += ' (via ' + tunnel0 + ')' msg += ' of:\n' + command_line.strip() if not quiet: printi(msg) # add kill remote button to GUI def kill_remote(kw, kill_command): if kw['std_out'] is None or not len(kw['std_out']): printw('Remote process has not started yet. Nothing to kill.') else: found = re.search('remote_PID=[0-9]+', kw['std_out']).group() if not found: printw('Remote process has not started yet. Nothing to kill.') return kill_command = kill_command.format(PID=found.split('=')[1]) subprocess.Popen(kill_command, shell=True) printi('Issued kill remote PID ' + found.split('=')[1]) # send INT to the process group, allow 2 seconds, send KILL to the process group kill_command = ( sshOptions() + controlmaster(username, server, port, server0) + " -q -p " + port + " " + username + "@" + server + " 'kill -INT -{PID}; sleep 2; kill -KILL -{PID}'" ) extraButtons.setdefault('Kill remote', lambda kw={}, kill_command=kill_command: kill_remote(kw, kill_command)) # add open-terminal button to GUI def remote_terminal(kw, terminal_command): subprocess.Popen(terminal_command, shell=True) terminal_command = "xterm -e \"%s\"" % (ssh_cd('exec ' + sysinfo['shell'], True)) extraButtons.setdefault('Open terminal', lambda kw={}, terminal_command=terminal_command: remote_terminal(kw, terminal_command)) # logic for setting TTY tty = not use_bang_command and interactive_input if isinstance(script, str): _remote_upsync(server0, scriptFile.filename, remotedir, tunnel=tunnel0, ignoreReturnCode=ignoreReturnCode) ret_code = _system( ssh_cd(bang_command, tty), message='Running remotely:', fixedWidthDetails=command_line, ignoreReturnCode=True, std_out=std_out, std_err=std_err, executable='OMFITbash', quiet=quiet, progressFunction=progressFunction, extraButtons=extraButtons, ) # use /bin/bash because of syntax used by interactive_input for file in tmp_files: if os.path.exists(file): os.remove(file) if ret_code != 0: if not ignoreReturnCode: if ret_code == 255: raise ReturnCodeException( '\n\n' + command + '\n\n---> SSH connection to `%s` via `%s` was terminated <---\n\n' % (server0, tunnel0) ) else: raise ReturnCodeException( '\n\nReturn code was ' + str(ret_code) + ' for REMOTE command:\n\n' + ssh_cd(command, tty) + '\n\nYou can ignore the return code by setting the keyword `ignoreReturnCode=True`' ) return ret_code
[docs]@_available_to_userTASK def remote_upsync(server, local, remote, tunnel=None, ignoreReturnCode=False, keepRelativeDirectoryStructure='', quiet=False): """ Function to upload files/directories to remote server (possibly via tunnel connection) NOTE: this function relies on rsync. There is no way to arbitrarily rename files with rsync. All rsync can do is move files to a different directory. :param server: server to connect and execute the command :param local: local file(s) (string or list strings) to upsync :param remote: remote directory or file to save files to :param tunnel: tunnel to go through to connect to the server :param ignoreReturnCode: whether to ignore return code of the rsync command :param keepRelativeDirectoryStructure: string with common based directory of the locals files to be removed (usually equals `local_dir`) :param quiet: print command to screen or not :return: return code of the rsync command (or True if keepRelativeDirectoryStructure and ignoreReturnCode and some rsync fail) """ local = tolist(local) if keepRelativeDirectoryStructure: bases = [str(x).replace(str(keepRelativeDirectoryStructure), '').lstrip(os.sep) for x in local] base_dirs = set([os.path.split(x)[0] for x in bases]) ret_codes = [] for bd in base_dirs: individual = [str(keepRelativeDirectoryStructure) + os.sep + f for f in bases if os.path.split(f)[0] == bd] ret_codes.append( _remote_upsync( server, individual, remote + os.sep + bd + os.sep, tunnel=tunnel, ignoreReturnCode=ignoreReturnCode, quiet=quiet ) ) return 0 if not np.any(ret_codes