# Source code for classes.OMFITx

# file processed by 2to3
from __future__ import print_function, absolute_import

print('Loading OMFIT APIs...')

try:
    # framework is running
    from .startup_choice import *
    # Don't do this if building the documentation
    if not numpy.any([('sphinx' in k and not 'sphinxcontrib' in k) for k in sys.modules]):
        # here we do a `from pylab import *` to allow GUIs to evaluate user input
        from pylab import *
except (ValueError, SystemError):  # catch error in Python2.x
    # class is imported by itself
    from startup_choice import *
except ImportError as _excp:  # catch error in Python3.x
    # class is imported by itself
    if 'attempted relative import with no known parent package' in str(_excp) or 'No module named \'classes\'' in str(_excp) or "No module named '__main__.startup_choice'" in str(_excp):
        from startup_choice import *
    else:
        raise

from utils_widgets import _defaultFont, OMFITfont
from classes import utils_base
from classes.utils_base import _streams
from classes.omfit_base import *
from classes.omfit_data import *
from classes import omfit_mds
from classes.omfit_mds import *
from classes.omfit_ascii import OMFITascii
from classes.omfit_environment import OMFITenv
from utils_tk import tk, ttk

from classes.omfit_weblink import openInBrowser
from glob import glob as _glob
import numpy
import re
import copy
import subprocess
import fcntl
import select
if sys.version_info < (3, 0):
    from collections import Callable as CollectionsCallable
else:
    from collections.abc import Callable as CollectionsCallable

# Decorator @_available_to_user is used to define which functions should appear in the OMFIT documentation
def _available_to_userTASK(f):
    """
    Decorator registering `f` in the list of OMFITx TASK functions

    The function is appended to `OMFITaux['OMFITxTASK_functions']` (the list is
    created the first time it is needed) and the list is then re-sorted on the
    lower-cased string representation of its items

    :param f: function to be registered

    :return: `f` itself, unmodified
    """
    key = 'OMFITxTASK_functions'
    OMFITaux.setdefault(key, [])
    registry = OMFITaux[key]
    registry.append(f)
    registry.sort(key=lambda func: str(func).lower())
    return f

def _available_to_userGUI(f):
    """
    Decorator registering `f` in the list of OMFITx GUI functions

    The function is appended to `OMFITaux['OMFITxGUI_functions']` (the list is
    created the first time it is needed) and the list is then re-sorted on the
    lower-cased string representation of its items

    :param f: function to be registered

    :return: `f` itself, unmodified
    """
    key = 'OMFITxGUI_functions'
    OMFITaux.setdefault(key, [])
    registry = OMFITaux[key]
    registry.append(f)
    registry.sort(key=lambda func: str(func).lower())
    return f

# Registry of the GUIs which are currently open
_GUIs = dict()

# Bookkeeping of the inner workings of the GUI which is currently active
_aux = {
    'topGUI': None,
    'open_tabs': {},
    'parentGUI': None,
    'is_compoundGUI': False,
    'notebook': None,
    'tab_list': {},
    'tab_name': '',
    'configure_size': [],
    'harvest': {},
    'same_row': None,
    # the plain string 'top' is used in place of tk.TOP so that the tk
    # environment is not imported when only the OMFIT classes are loaded
    'packing': 'top',
}

#------------------------------------
# Nice representation of floats
#------------------------------------
# Snapshot of the numpy print options taken when this module is loaded:
# `repr` below temporarily installs its own float formatter and puts this
# snapshot back in place when it is done.
_originalPrintOptions = numpy.get_printoptions()

def repr(value):
    '''
    repr modified to work for GUI functions

    Floats are written with up to 15 significant digits: integer-valued floats
    get a trailing `.0` (e.g. `1200.0`) or an exponent that is a multiple of 3
    when they end with enough zeros (e.g. `1e6`); `nan` and infinities are
    written as `nan`, `inf` and `-inf`. One-dimensional numpy arrays are
    written as a list of their elements, each formatted by this same function.
    Anything else is delegated to the builtin `repr`.

    :param value: object to be represented

    :return: string representation of `value`
    '''
    if xarray is not None and isinstance(evalExpr(value), DataArray):
        value = value.values
    # zero-dimensional arrays are treated like the scalar they wrap
    if isinstance(value, numpy.ndarray) and len(value.shape) == 0:
        value = numpy.atleast_1d(value)[0]
    # floats nested in objects handled by the builtin repr (for example
    # multi-dimensional arrays) are formatted by this function too
    numpy.set_printoptions(formatter={'float': repr})
    try:
        if isinstance(evalExpr(value), float):
            number = evalExpr(value)
            if numpy.isnan(number):
                value = 'nan'
            elif numpy.isinf(number):
                # without this branch the 'g' format yields 'inf'/'-inf', which
                # then received the '.0' suffix meant for integer-valued floats
                value = 'inf' if number > 0 else '-inf'
            else:
                value = '{:.15g}'.format(value)
                if '.' not in value and 'e' not in value:
                    # integer-looking float: locate the last non-zero digit by
                    # scanning the reversed string
                    tmp = re.sub('[1-9]', 'x', value[::-1])
                    if 'x' in tmp:
                        # number of trailing zeros rounded down to a multiple of 3
                        index = int((tmp.index('x')) // 3 * 3)
                        if index != 0:
                            value = value[:-index] + 'e{:d}'.format(index)
                        else:
                            value = value + '.0'
                    else:
                        # the value is zero
                        value = value + '.0'
        elif isinstance(value, numpy.ndarray) and len(value.shape) == 1:
            value = '[' + ', '.join(map(repr, numpy.atleast_1d(value))) + ']'
        else:
            value = builtins.repr(value)
    finally:
        numpy.set_printoptions(**_originalPrintOptions)
    return value
def repr_eval(location, preentry=None, collect=False):
    '''
    evaluate location and provide representation

    :param location: string to be evaluated, or list of such strings

    :param preentry: optional function applied to the evaluated value before it is represented

    :param collect: if True and `location` is not a string, the values are
        replaced by the first one when they are all equal to it

    :return: string representation as returned by `repr`
    '''
    value = _eval(location)
    several = not isinstance(location, basestring)
    if several and collect and numpy.all(numpy.atleast_1d(value) == value[0]):
        value = value[0]
    if preentry is not None:
        value = preentry(value)
    return repr(value)
#------------------------------------
# GUI management
#------------------------------------
class GUI(object):
    """
    This class creates a new GUI. It is used internally by OMFIT when a OMFITpythonGUI object is executed

    :param pythonFile: OMFITpythonGUI object to be executed

    :param kw: keyword arguments passed to `pythonFile.__run__` every time the GUI is drawn

    :return: None
    """

    def __init__(self, pythonFile, **kw):
        top = tk.Toplevel(OMFITaux['rootGUI'])
        top.withdraw()
        top.transient(OMFITaux['rootGUI'])
        top.protocol("WM_DELETE_WINDOW", lambda top=top: _clearClosedGUI(top))
        top.wm_title(treeLocation(pythonFile)[-1])

        # register GUI
        _GUIs[str(top)] = self

        self.top = top
        self.pythonFile = pythonFile
        self.notebook_list = []
        self.kw = kw
        self.locked = []
        self.relativeLocations = relativeLocations(self.pythonFile)

        # a flag checked in `finally` (instead of an `except` clause) tears the
        # half-built window down whatever interrupts the first draw
        thereWasError = True
        try:
            # NOTE(review): only `UpdateGUI` is defined in the visible part of
            # this file -- confirm that the lower-case `updateGUI` exists
            updateGUI(top)
            top.update_idletasks()
            tk_center(top, parent=OMFITaux['rootGUI'])
            top.deiconify()
            top.lift()
            thereWasError = False
        finally:
            if thereWasError:
                _clearClosedGUI(top)

    def update(self):
        """
        (Re)draw the GUI by executing its OMFITpythonGUI script

        The first call builds the scrollable canvas hosting the GUI elements,
        later calls empty it and fill it again. If anything goes wrong the GUI
        is closed and the error is raised.

        :return: None
        """
        try:
            # if there was a notebook, check what tabs were selected
            _aux['open_tabs'] = {}
            if len(self.notebook_list):
                for pf, nb in self.notebook_list:
                    index = nb.tabs().index(nb.select())
                    _aux['open_tabs'][pf] = index

            top = self.top
            _aux['topGUI'] = top

            if hasattr(self, 'parentGUI'):
                # this is executed for GUI redraws
                # delete everything inside the existing scrollable canvas
                _clearKids(self.parentGUI)
                _clearKids(self.prefFrame)
                canvas = self.canvas
                taskGUIframeInterior = self.taskGUIframeInterior
                interior_id = self.interior_id
                yscrollbar = self.yscrollbar
                prefFrame = self.prefFrame

            else:
                # this is executed at the first GUI creation
                # create an empty scrollable canvas
                top.grid_rowconfigure(0, weight=1)
                top.grid_columnconfigure(0, weight=1)

                self.yscrollbar = yscrollbar = ttk.Scrollbar(top)
                yscrollbar.grid(row=0, column=1, sticky=tk.N + tk.S + tk.W)

                self.canvas = canvas = tk.Canvas(top, bd=0, yscrollcommand=yscrollbar.set)
                canvas.grid(row=0, column=0, sticky=tk.N + tk.S + tk.E + tk.W)
                yscrollbar.config(command=canvas.yview)

                self.taskGUIframeInterior = taskGUIframeInterior = ttk.Frame(canvas)
                taskGUIframeInterior.pack(side=tk.LEFT, expand=tk.YES, fill=tk.BOTH)
                self.interior_id = interior_id = canvas.create_window(0, 0, anchor=tk.NW, window=taskGUIframeInterior)
                self.parentGUI = taskGUIframeInterior

                # preference window
                self.prefFrame = prefFrame = ttk.Frame(top)
                prefFrame.grid(row=0, column=2, sticky=tk.N + tk.S)

                # mousewheel scroll on users GUI
                def mouse_wheel(event):
                    # respond to Linux or Windows wheel event
                    if event.num == 5 or event.delta == -120:
                        canvas.yview('scroll', 1, 'units')
                    if event.num == 4 or event.delta == 120:
                        canvas.yview('scroll', -1, 'units')
                    return 'break'

                top.bind("<MouseWheel>", mouse_wheel)
                top.bind("<Button-4>", mouse_wheel)
                top.bind("<Button-5>", mouse_wheel)

            # geometry propagation is switched off while the script adds its
            # elements and switched back on once the script has run
            # NOTE(review): placed after the if/else because the matching
            # pack_propagate(1) below runs on every update -- confirm
            taskGUIframeInterior.pack_propagate(0)

            # #add setupModule buttons
            # if self.relativeLocations['root'] is not OMFIT:
            #     try:
            #         _setupModule(prefFrame)
            #     except Exception as _excp:
            #         printe('Error in setupModule GUI: '+repr(_excp))

            # initialize things for compound GUIs, tabs, notebooks and configuration of sizes
            _aux['parentGUI'] = self.parentGUI
            _aux['is_compoundGUI'] = False
            _aux['notebook'] = None
            _aux['tab_name'] = ''
            _aux['tab_list'] = {}
            _aux['tab_list'][''] = _aux['parentGUI']
            _aux['configure_size'] = []
            old_same_row = _aux['same_row']
            old_packing = _aux['packing']
            try:
                _aux['same_row'] = None
                _aux['packing'] = tk.TOP
                _GUIs[str(_aux['topGUI'])].notebook_list = []
                _GUIs[str(_aux['topGUI'])].locked = []

                # execute the main pythonGUI
                self.kw['compoundGUI'] = False
                self.pythonFile.__run__(**self.kw)
            finally:
                # the packing state which was active before this update is put
                # back also when the user script fails
                _aux['same_row'] = old_same_row
                _aux['packing'] = old_packing

            # handle resizing
            def configure_size(event=None):
                # the binding is removed while the sizes are being changed and
                # is put back at the end of this function
                canvas.unbind('<Configure>')
                canvas.update_idletasks()
                canvas.configure(scrollregion=(0, 0, top.winfo_width(), taskGUIframeInterior.winfo_reqheight()))
                GUIheight = min([taskGUIframeInterior.winfo_reqheight(), int(OMFITaux['rootGUI'].winfo_height() * 0.9)])
                GUIwidth = max(
                    [
                        taskGUIframeInterior.winfo_reqwidth(),
                        top.winfo_width() - yscrollbar.winfo_reqwidth() - prefFrame.winfo_reqwidth(),
                    ]
                )
                canvas.itemconfigure(interior_id, width=GUIwidth, height=taskGUIframeInterior.winfo_reqheight())
                canvas.configure(width=GUIwidth, height=GUIheight)
                top.configure(width=GUIwidth + yscrollbar.winfo_reqwidth() + prefFrame.winfo_reqwidth(), height=GUIheight)
                # attach the '<Configure>' actions queued in _aux['configure_size']
                for item, action in _aux['configure_size']:
                    try:
                        item.bind('<Configure>', action)
                    except tk.TclError:
                        pass
                _aux['configure_size'] = []
                canvas.update_idletasks()
                canvas.bind('<Configure>', configure_size)

            taskGUIframeInterior.pack_propagate(1)
            configure_size()

            OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")

        except Exception:
            # if anything goes wrong close the GUI and raise the error
            _clearClosedGUI(self.top)
            raise
def _clearClosedGUI(top):
    """
    This method is used internally by OMFIT to clear the data of GUIs which have been closed by users

    The GUI is removed from the registry of open GUIs, then its widgets and
    the window itself are destroyed; errors raised while destroying are ignored

    :param top: TkInter ID of the closed GUI

    :return: None
    """
    _GUIs.pop(str(top), None)
    try:
        _clearKids(top)
        top.destroy()
    except Exception:
        pass


def _clearKids(top):
    """
    Recursive cleanup of all elements under TkInter ID

    :param top: TkInter ID of the parent whose kids must be cleared

    :return: None
    """
    # nothing to do if the children cannot be listed
    try:
        kids = top.winfo_children()
    except Exception:
        return

    # first pass: empty every kid of its own descendants
    for kid in kids:
        _clearKids(kid)

    # second pass: unpack (if possible) and destroy the kids themselves
    for kid in kids:
        try:
            kid.pack_forget()
        except Exception:
            pass
        kid.destroy()


def _absLocation(location):
    """
    This method is used internally by OMFIT to translate relative locations used in a OMFITpythonGUI script
    to absolute locations in the OMFIT tree.

    Relative locations are resolved with respect to the script of the GUI
    which is currently active (`_aux['topGUI']`)

    :param location: string with relative/absolute location in the OMFIT tree

    :return: absolute location string in the OMFIT tree
    """
    activeGUI = _GUIs[str(_aux['topGUI'])]
    base = relativeLocations(activeGUI.pythonFile)
    return absLocation(location, base, base_is_relativeLocations_output=True)
@_available_to_userGUI
def UpdateGUI(top=None):
    """
    Function used to update users GUIs

    :param top: TopLevel tk GUI to be updated. If None all of the open GUIs are updated.
    """
    if top is not None:
        topList = [top]
    else:
        topList = [_GUIs[k].top for k in list(_GUIs.keys())]

    for current in topList:
        # the GUI being updated is recorded as the active one
        _aux['topGUI'] = current
        _GUIs[str(_aux['topGUI'])].update()
@_available_to_userGUI
def Refresh():
    '''
    Force a refresh of the OMFIT GUI by flushing the OMFIT console

    Nothing is done when OMFIT runs without GUI (`OMFITaux['GUI']` is None)
    or when `OMFITaux['prun_process']` is not empty
    '''
    if OMFITaux['GUI'] is None:
        return
    if len(OMFITaux['prun_process']):
        return
    OMFITaux['console'].flush()
@_available_to_userGUI
def CloseAllGUIs():
    """
    Function for closing all users GUIs
    """
    # iterate on a snapshot of the keys since closing a GUI removes its entry
    for key in tuple(_GUIs):
        _clearClosedGUI(_GUIs[key].top)
def _topGUI(item=None):
    """
    Function to reach the TopLevel tk from a GUI element

    :param item: tk GUI element (if None, `_aux['parentGUI']` is used)

    :return: TopLevel tk GUI
    """
    if item is None:
        item=_aux['parentGUI']
    return TKtopGUI(item)


def _tk_ttk_process_kw(type, kw):
    '''
    converts tk configure attributes to ttk styles

    NOTE: `kw` is modified in place and returned

    :param type: one of the ttk_styles

    :param kw: tk configure instructions

    :return: custom ttk style
    '''
    # nothing to convert, or a style was explicitly requested: at most make
    # sure that the style name contains the widget type
    if not len(kw) or 'style' in kw:
        if 'style' in kw and type not in kw['style']:
            kw['style'] = kw['style'] + '.' + type
        return kw
    # tk short-hands --> ttk names
    if 'fg' in kw:
        kw['foreground'] = kw.pop('fg')
    if 'bg' in kw:
        kw['background'] = kw.pop('bg')
    style_name = custom_ttk_style(type, **kw)
    # the colors are not passed on as keywords; only the style name is
    kw.pop('foreground',None)
    kw.pop('background',None)
    kw['style'] = style_name
    return kw


def _ttk_tk_process_kw(kw):
    '''
    renames the `foreground` keyword to `fg`

    NOTE: `kw` is modified in place and returned

    :param kw: dictionary of keywords

    :return: the same dictionary
    '''
    if 'foreground' in kw:
        kw['fg'] = kw.pop('foreground')
    return kw

#---------------------------
# GUI elements
#---------------------------
def _harvest_experiment_info(extra_info={}):
    '''
    harvest experiment info

    Every entry of `_aux['harvest']` is sent with `harvest_send` (if the
    project has a filename) and then removed from `_aux['harvest']`

    :param extra_info: dictionary used to update each entry before it is sent
    '''
    from classes.omfit_base import harvest_send
    project=OMFIT.prj_options
    # iterate on a copy of the keys since entries are deleted along the way
    for item in list(_aux['harvest'].keys()):
        if OMFIT.filename is not None and len(OMFIT.filename):
            if 'device' in _aux['harvest'][item]:
                _aux['harvest'][item]['_tag']=tokamak(_aux['harvest'][item]['device'])
            for k in ['color','type','notes']:
                if k in project:
                    _aux['harvest'][item]['_'+k]=project[k]
            _aux['harvest'][item]['project']=OMFIT.filename
            _aux['harvest'][item]['user']=os.environ['USER']
            _aux['harvest'][item].update(extra_info)
            for k in _aux['harvest'][item]:
                try:
                    _aux['harvest'][item][k]=evalExpr(_aux['harvest'][item][k])
                except Exception:
                    printd('Error harvesting %s'%k,topic='harvest')
            host,port=evalExpr(SERVER['gadb-harvest']['HARVEST_server']).split(':')
            harvest_send(_aux['harvest'][item],'omfit_experiment',host=host,protocol='UDP')
        # NOTE(review): the nesting of this `del` was reconstructed from a
        # whitespace-collapsed source -- confirm that entries are meant to be
        # dropped also when the project has no filename
        del _aux['harvest'][item]


def _for_each_collection(location):
    '''
    Expands a location which goes through an OMFITcollection into one
    location for each of the keys of the collection

    :param location: location string in the OMFIT tree

    :return: tuple with the list of expanded locations and True, or with the
             original location and False if no expansion took place
    '''
    loc=parseLocation(location)
    where=[loc.pop(0)]
    loc0=buildLocation(where)
    # walk down the location until an OMFITcollection is found which does not
    # have the next element of the location among its KEYS()
    for k in range(1,len(loc)):
        where+=[loc.pop(0)]
        loc0=buildLocation(where)
        if isinstance(eval(loc0),OMFITcollection) and loc[0] not in eval(loc0).KEYS():
            break
    if isinstance(eval(loc0),OMFITcollection):
        locs=[]
        for k in eval(loc0).KEYS():
            locs.append( buildLocation(where+[k]+loc) )
        return locs,True
    return location,False


def _setDefault(location, default=special1):
    """
    This method is used internally by OMFIT to set the default values of GUI elements

    The default is written in the tree only at the locations which can not be evaluated

    :param location: location in the OMFIT tree (can be multiple locations)

    :param default: Default value (multiple defaults if multiple locations)

    :return: tuple with the default value and a boolean which is True if the
             default was read from `__SETTINGS_AT_IMPORT__`
    """
    default_at_import = False
    # no default given: look for the value in the settings at import
    if default is special1 and isinstance(location, basestring):
        try:
            default = eval(location.replace('SETTINGS','__SETTINGS_AT_IMPORT__'))
            default_at_import = True
        except KeyError:
            pass

    # skip if there is no default, or if there are multiple locations and the
    # first of the multiple defaults is `special1`
    if default is not special1 and not (not isinstance(location, basestring) and numpy.iterable(location) and numpy.iterable(default) and len(default) and next(default.__iter__()) is special1):
        updated = False
        if isinstance(location, basestring):
            try:
                eval(location)
            except Exception:
                OMFIT.addBranchPath(location)
                eval(buildLocation(parseLocation(location)[:-1]))[parseLocation(location)[-1]] = default
                updated = True
        elif numpy.iterable(location):
            if len(location) != len(default):
                raise Exception('GUI error: Length of defaults must equal length of locations')
            for k, loc in enumerate(location):
                try:
                    eval(loc)
                except Exception:
                    OMFIT.addBranchPath(loc)
                    eval(buildLocation(parseLocation(loc)[:-1]))[parseLocation(loc)[-1]] = default[k]
                    updated = True
    return default, default_at_import


def _reveal(location=None, lbl=None, help=None):
    '''
    Reveals path/value of OMFIT GUI elements

    :param location: location (already pre-processed by _absLocation)

    :param lbl: human readable description

    :param help: help
    '''
    if lbl is None:
        lbl = ''
    lbl = tolist(lbl)
    # for callables the source code of the function is shown
    if callable(location):
        location_str = str(location)
        if '<lambda' in location_str:
            location_str = 'Calls lambda function defined as:\n' + inspect.getsource(location)
        if '<function' in location_str:
            location_str = 'Calls function defined as:\n' + inspect.getsource(location)
        location = location_str
    locations = tolist(location)
    lbls = lbl * len(locations)
    for lbl, location in zip(lbls, locations):
        loc = str(location)
        if OMFITcwd in loc:
            loc = os.path.split(loc)[1]
        value = ''
        # the value is printed only for None, numbers and strings
        if location.startswith('OMFIT['):
            try:
                if eval(location) is None or isinstance(eval(location), (int, float, basestring)):
                    value = ' = ' + repr(eval(location))
            except Exception:
                pass
        printi('* %s --> %s%s' % (lbl, loc, value))
    if help is not None and len(help):
        printi('== HELP ==')
        printi(' ' + '\n '.join(help.split('\n')))


def _eval(location):
    '''
    Same as eval() but works also when location is a list of locations

    :param location: string, or list of strings, to be evaluated

    :return: evaluated value, or list of evaluated values
    '''
    multiple = not isinstance(location, basestring)
    locations = numpy.atleast_1d(location).tolist()
    for k, location in enumerate(locations):
        locations[k] = eval(location)
    if multiple:
        return locations
    else:
        return locations[0]


def _Entry(parent, location, lbl=None, updateGUI=False, help='', preentry=None, postcommand=None, check=special1, collect=False, **kw):
    r"""
    Private method used to add entry GUI elements
    The background of the GUI gets colored green/red depending on whether the input by the user is a valid Python entry

    :param parent: parent TkInter object

    :param location: location in the OMFIT tree to be updated (it's a string)

    :param lbl: text on the left of the entry box

    :param updateGUI: whether the whole GUI should be redrawn when the value is updated

    :param help: string to be printed when the user right-clicks on the element

    :param preentry: function to pre-process the data at the OMFIT location to be displayed in the entry GUI element

    :param postcommand: function to post-process what the user has entered in the entry GUI element

    :param check: function that returns whether what the user has entered in the entry GUI element is a valid entry
                  This will make the background colored yellow, and users will not be able to set the value.

    :param collect: passed to `repr_eval` when the value to be displayed is evaluated

    :param \**kw: keywords passed to the ttk.Entry object

    :return: associated ttk.Entry object
    """
    kw = _tk_ttk_process_kw('TEntry', kw)
    value = tk.StringVar()
    # a dictionary is used so that the nested functions can update the value
    oldValueRepr = dict(val=repr_eval(location, preentry, collect=collect))

    e = ttk.Entry(parent, textvariable=value, takefocus=False, **kw)
    e.configure(font=OMFITfont())
    e.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X)
    value.set(repr_eval(location, preentry, collect=collect))

    if Lock(location, checkLock=True):
        e.config(state=tk.DISABLED)
    else:
        def set_location(location, tmp):
            # a list entered for a location which holds a 1D array is converted to an array
            if isinstance(eval(tmp), list) and isinstance(eval(location), numpy.ndarray) and len(eval(location).shape) == 1:
                tmp = 'numpy.atleast_1d(' + tmp + ')'
            exec(location + "=" + tmp, globals(), locals())

        def update_tree_values(location, updateGUI):
            # checkKeyPressed returns (False, None) for invalid Python,
            # (None, <name or None>) for a failed check and (True, None) otherwise
            tmp = checkKeyPressed()
            if tmp[0] is False:
                printe('Invalid Python entry: ' + value.get())
                return
            elif tmp[0] is None and tmp[1] is None:
                printe('Tree entry ' + location + ' should not be set as: ' + value.get())
                return
            elif tmp[0] is None:
                printe('Tree entry ' + location + ' should be ' + tmp[1])
                return

            # valid entry - lets use it
            tmp = value.get()
            e.state(['!invalid', '!alternate', '!active'])  # return style to default (remove color etc.)
            e.config(style='TEntry')
            if not isinstance(location, basestring):
                # multiple locations: one value each, or the same value for all
                if len(tolist(eval(tmp))) > 1:
                    for k, loc in enumerate(location):
                        set_location(loc, tmp + "[%d]" % k)
                else:
                    for loc in location:
                        set_location(loc, tmp)
            else:
                set_location(location, tmp)
            oldValueRepr['val'] = repr_eval(location, preentry, collect=collect)
            # NOTE(review): the nesting of the try/except/else under this `if`
            # was reconstructed from a whitespace-collapsed source -- confirm
            if postcommand is not None:
                manage_user_errors(lambda: postcommand(location=location), reportUsersErrorByEmail=True)
                try:
                    if isinstance(location, basestring):
                        eval(location)
                except KeyError:
                    # this is done to handle `delete_if_default` option or any post-command which deletes the entry
                    pass
                else:
                    oldValueRepr['val'] = repr_eval(location, preentry, collect=collect)
            OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
            if updateGUI:
                OMFITaux['rootGUI'].event_generate("<<update_GUI>>")

        def checkKeyPressed(event=None):
            # the ttk states encode the outcome: `alternate` --> not valid Python,
            # `invalid` --> check failed, `active` --> valid value which differs
            # from the one in the tree
            try:
                try:
                    tmp = repr_eval(value.get(), collect=collect)
                    if not tmp == oldValueRepr['val']:
                        # new value
                        if check is not special1:
                            if not check(_eval(tmp)):
                                e.state(['invalid', '!alternate', '!active'])
                                e.config(style='check.TEntry')
                            else:
                                e.state(['!invalid', '!alternate', 'active'])
                                e.config(style='valid.TEntry')
                        else:
                            e.state(['!invalid', '!alternate', 'active'])
                            e.config(style='valid.TEntry')
                    else:
                        # no change in value
                        e.state(['!invalid', '!alternate', '!active'])
                        e.config(style='TEntry')
                except Exception as _excp:
                    e.state(['alternate', '!invalid', '!active'])
                    e.config(style='error.TEntry')
                    # printe('Error: ' + repr(_excp) + '\nIs this a valid Python entry?\nDid you forget to add string quotations?') # too much printing
                if e.instate(['alternate']):
                    return False, None
                elif e.instate(['invalid']):
                    # for check functions named `is_xxx` the `xxx` is returned
                    tmp = None
                    if 'is_' == check.__name__[:3]:
                        tmp = check.__name__[3:]
                    return None, tmp
                else:
                    return True, None
            except Exception:
                # NOTE(review): in this case None is returned, which
                # update_tree_values can not index -- confirm this is intended
                pass

        def escape(event=None):
            # go back to the value which is in the tree
            value.set(oldValueRepr['val'])
            checkKeyPressed()

        # NOTE(review): the nesting of this block (here rather than inside
        # `escape`) was reconstructed from a whitespace-collapsed source -- confirm
        if check is not special1:
            if not check(_eval(value.get())):
                e.config(background='goldenrod1')

        e.bind(sequence="<Return>", func=lambda event=None: update_tree_values(location, updateGUI))
        e.bind(sequence="<KP_Enter>", func=lambda event=None: update_tree_values(location, updateGUI))
        e.bind(sequence="<Key>", func=lambda event=None: e.after(1, checkKeyPressed))
        e.bind(sequence="<<virtualKey>>", func=checkKeyPressed)
        e.bind(sequence="<Escape>", func=lambda event=None: e.after(1, escape))
    # NOTE(review): bound at function level (i.e. also for locked entries) --
    # nesting reconstructed from a whitespace-collapsed source, confirm
    e.bind(sequence="<" + rightClick + ">", func=lambda event=None: _reveal(location, lbl, help))
    return e, value


def _Label(parent, text, **kw):
    """
    Private method used to add labels and comments

    :param parent: parent TkInter object

    :param text: text in the label

    :return: associated ttk.Label object
    """
    kw=_tk_ttk_process_kw('TLabel',kw)
    lbl=ttk.Label(parent,text=text,**kw)
    lbl.config(justify=tk.LEFT)
    return lbl


def _Text(parent, location, lbl=None, updateGUI=False, help='', preentry=None, postcommand=None, **kw):
    """
    Private method which opens a window for the multi-line editing of a tree entry

    :param parent: parent TkInter object

    :param location: location in the OMFIT tree to be updated (it's a string)

    :param lbl: title of the window

    :param updateGUI: whether the whole GUI should be redrawn when the value is updated

    :param help: string to be printed when the user right-clicks on the element

    :param preentry: function to pre-process the data at the OMFIT location to be displayed

    :param postcommand: function called with the `location` keyword after the tree has been updated

    :param kw: keywords passed to the ScrolledText object

    :return: associated ScrolledText object
    """
    kw=_ttk_tk_process_kw(kw)
    top=tk.Toplevel(parent)
    top.transient(parent)
    top.wm_title(lbl)

    e = tk.ScrolledText(top, wrap=tk.NONE, undo=tk.TRUE, maxundo=-1, relief=tk.GROOVE, border=1, height=12, takefocus=False, **kw)
    e.pack(side=tk.TOP,expand=tk.YES,fill=tk.BOTH,padx=5,pady=5)
    # strings are displayed as they are (without quotes)
    if isinstance(eval(location),basestring):
        if preentry is None:
            e.set(eval(location))
        else:
            e.set(preentry(eval(location)))
    elif isinstance(eval(location),numpy.ndarray):
        if preentry is None:
            e.set(repr(eval(location)))
        else:
            e.set(repr(preentry(eval(location))))
    # allow line splitting of multi item lists in multiline edit option of Entry
    elif isinstance(eval(location),list):
        e.set(repr_eval(location,preentry).replace('],','],\n'))
    else:
        e.set(repr_eval(location,preentry))

    if Lock(location,checkLock=True):
        e.config(state=tk.DISABLED)
    else:
        def update_tree_values(location,updateGUI):
            tmp=e.get()
            if isString.get():
                # the text is the string itself: quote it
                tmp=repr(tmp)
            else:
                # a list entered for a location which holds a 1D array is converted to an array
                if isinstance(eval(tmp),list) and isinstance(eval(location),numpy.ndarray) and len(eval(location).shape)==1:
                    tmp='numpy.atleast_1d('+tmp+')'
            exec(location+"="+tmp, globals(), locals())
            if postcommand is not None:
                manage_user_errors(lambda:postcommand(location=location),reportUsersErrorByEmail=True)
            OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
            if updateGUI:
                OMFITaux['rootGUI'].event_generate("<<update_GUI>>")
            top.destroy()

        frm=ttk.Frame(top)
        frm.pack(side=tk.TOP,expand=tk.YES,fill=tk.BOTH)
        isString=tk.BooleanVar()
        if isinstance(eval(location),basestring):
            isString.set(True)
        ttk.Checkbutton(frm, text="is string", variable=isString,takefocus=False).pack(side=tk.LEFT,expand=tk.NO,padx=5)
        ttk.Button(frm,text="Update",command=lambda event=None:update_tree_values(location,updateGUI),takefocus=False).pack(side=tk.LEFT,expand=tk.YES,fill=tk.X,padx=5)
    # NOTE(review): the two bindings below are placed at function level --
    # nesting reconstructed from a whitespace-collapsed source, confirm
    top.bind(sequence="<Escape>", func=lambda event=None:top.destroy())
    e.bind(sequence="<"+rightClick+">", func=lambda event=None:_reveal(location,lbl,help))
    return e


def _helpButton(master, help):
    """
    Private method which adds a `?` button showing the help as a tip

    :param master: parent TkInter object

    :param help: help string (no button is added if empty)
    """
    if help:
        ttk.Button(master, text='?', command=lambda help=help:helpTip.showtip(help,move=False,strip=True), style='flat.TButton', takefocus=False).pack(side=tk.LEFT,expand=tk.NO,fill=tk.NONE,padx=0,pady=0)


def _urlButton(master, url):
    """
    Private method which adds a `w` button opening a url in the web-browser

    :param master: parent TkInter object

    :param url: url to be opened; `http://` is prepended if the url has no
                scheme (no button is added if empty)
    """
    if url:
        if '://' not in url:
            url='http://'+url
        ttk.Button(master, text='w', command=lambda url=url:openInBrowser(url), takefocus=False, style='flat.TButton').pack(side=tk.LEFT,expand=tk.NO,fill=tk.NONE,padx=0,pady=0)
@_available_to_userGUI
def CompoundGUI(pythonFile, title=None, **kw):
    """
    This method allows the creation of nested GUI.

    :param pythonFile: is meant to be an OMFITpythonGUI object in the OMFIT tree
        (or a string which evaluates to one)

    :param title: title to appear in the compound GUI frame.
                  If None, the location of the `pythonFile` object in the OMFIT tree will be shown.
                  If an empty string, the compound GUI title is suppressed.

    :param kw: keyword arguments passed to `pythonFile.__run__`
        (`compoundGUI=True` is added unless `compoundGUI` is explicitly given)

    :return: whatever `pythonFile.__run__` returns

    :raise Exception: if `pythonFile` is not an OMFITpythonGUI object
    """
    from classes.omfit_python import OMFITpythonGUI

    if isinstance(pythonFile, basestring):
        pythonFile = eval(pythonFile)
    if not isinstance(pythonFile, OMFITpythonGUI):
        raise Exception('GUI scripts must be declared as OMFITpythonGUI objects')

    # state of the enclosing GUI, to be put back in place however the nested script ends
    tmp = _GUIs[str(_aux['topGUI'])].pythonFile
    tmp_parentGUI = _aux['parentGUI'], _aux['notebook'], _aux['tab_name'], _aux['tab_list'], _aux['same_row'], _aux['packing']
    tmp_is_compoundGUI = _aux['is_compoundGUI']

    try:
        # relative locations are resolved with respect to the nested script while it runs
        _GUIs[str(_aux['topGUI'])].pythonFile = pythonFile

        # no border around the frame when the title is suppressed
        bw = 2
        if title is not None and not len(title):
            bw = 0
        _aux['parentGUI'] = ttk.Frame(_aux['parentGUI'], borderwidth=bw, relief=tk.GROOVE)
        _aux['parentGUI'].pack(side=_aux['packing'], expand=tk.NO, fill=tk.BOTH, padx=bw, pady=bw)
        _aux['same_row'] = None
        _aux['packing'] = tk.TOP

        # the nested GUI starts with its own (empty) set of tabs
        _aux['notebook'] = None
        _aux['tab_name'] = ''
        _aux['tab_list'] = {}
        _aux['tab_list'][''] = _aux['parentGUI']

        if title is None:
            Label(treeLocation(_GUIs[str(_aux['topGUI'])].pythonFile)[-1], font=OMFITfont('bold', -2))
        elif len(title.strip()):
            Label(title, font=OMFITfont('bold', -2))

        _aux['is_compoundGUI'] = True
        kw.setdefault('compoundGUI', True)
        return pythonFile.__run__(**kw)

    finally:
        # `is_compoundGUI` is restored here too, so that it does not stay True
        # for the enclosing GUI when the nested script raises
        _aux['is_compoundGUI'] = tmp_is_compoundGUI
        _aux['parentGUI'], _aux['notebook'], _aux['tab_name'], _aux['tab_list'], _aux['same_row'], _aux['packing'] = tmp_parentGUI
        _GUIs[str(_aux['topGUI'])].pythonFile = tmp
@_available_to_userGUI
def Tab(name=''):
    """
    This method creates a Tab under which the successive GUI elements will be placed

    :param name: Name to assign to the TAB

    :return: None
    """
    _aux['tab_name'] = name

    # if there was no notebook create one
    if _aux['notebook'] is None and len(name):
        notebook = ttk.Notebook(_aux['parentGUI'], takefocus=False)
        _aux['notebook'] = notebook
        _aux['notebook'].pack(side=tk.TOP, expand=tk.YES, fill=tk.X)
        gui = _GUIs[str(_aux['topGUI'])]
        gui.notebook_list.append([_GUIs[str(_aux['topGUI'])].pythonFile, _aux['notebook']])

    # if the name is not in the tab, then add it
    if name not in _aux['tab_list']:
        _aux['tab_list'][name] = ttk.Frame(_aux['notebook'])
        _aux['notebook'].add(_aux['tab_list'][name], text=name)
        # if there was a notebook, try to re-select the tabs which were selected
        # NOTE(review): the nesting of this block under the `if` above was
        # reconstructed from a whitespace-collapsed source -- confirm
        if _GUIs[str(_aux['topGUI'])].pythonFile in _aux['open_tabs']:
            try:
                wanted = _aux['open_tabs'][_GUIs[str(_aux['topGUI'])].pythonFile]
                _aux['notebook'].select(wanted)
            except Exception:
                pass

    # activate the tab
    _aux['parentGUI'] = _aux['tab_list'][name]
@_available_to_userGUI
def Entry(location, lbl=None, comment=None, updateGUI=False, help='', preentry=None, postcommand=None, check=special1,
          multiline=False, norm=None, default=special1, delete_if_default=False, url='', kwlabel={}, **kw):
    """
    This method creates a GUI element of the entry type
    The background of the GUI gets colored green/red depending on whether the input by the user is a valid Python entry

    :param location: location in the OMFIT tree (notice that this is a string)

    :param lbl: Label which is put on the left of the entry

    :param comment: A comment which appears on top of the entry

    :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed

    :param help: help provided when user right-clicks on GUI element (adds GUI button)

    :param preentry: function to pre-process the data at the OMFIT location to be displayed in the entry GUI element

    :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input

    :param check: function that returns whether what the user has entered in the entry GUI element is a valid entry.
                  This will make the background colored yellow, and users will not be able to set the value.

    :param default: Set the default value if the tree location does not exist (adds GUI button)

    :param delete_if_default: Delete tree entry if the value is the default value

    :param multiline: Force display of button for multiple-line text entry

    :param norm: normalizes numeric variables (overrides `preentry` or `postcommand`)

    :param url: open url in web-browser (adds GUI button)

    :param kwlabel: keywords passed to ttk.Label

    :param kw: keywords passed to the ttk.Entry object

    :return: associated ttk.Entry object
    """
    kwlabel = _tk_ttk_process_kw('TLabel', kwlabel)

    location = _absLocation(location)
    # a location going through an OMFITcollection becomes a list of locations
    if isinstance(location, basestring):
        location, _ = _for_each_collection(location)
    multiple = not isinstance(location, basestring)
    if multiple:
        default = [default] * len(location)
    default, default_at_import = _setDefault(location, default)

    frm_top = ttk.Frame(_aux['parentGUI'])
    frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1)

    if comment is not None:
        frm = ttk.Frame(frm_top)
        frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)
        _Label(frm, text=comment).pack(side=tk.LEFT)

    frm = ttk.Frame(frm_top)
    frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)
    if lbl is None:
        lbl = location
    # the ` = ` is omitted when the label is empty
    ttk.Label(frm, text=str(lbl) + " = " * numpy.sign(len(str(lbl))), **kwlabel).pack(side=tk.LEFT)

    if norm is not None:
        # the value is shown divided by `norm`, so what the user enters is
        # multiplied by `norm` after it has been written in the tree
        def post(location, norm):
            exec(location + '*=' + str(norm), globals(), locals())

        def pre(value, norm):
            return value / norm

        preentry = lambda value, norm=norm: pre(value, norm)
        postcommand = lambda location, norm=norm: post(location, norm)

    if delete_if_default:
        # `delete_default` is defined further down: it is looked up only when the lambda is called
        postcommand = lambda location, postcommand=postcommand: delete_default(location, postcommand)

    e, value = _Entry(frm, location, lbl, updateGUI, help, preentry, postcommand, check=check, collect=multiple, **kw)

    # multiple entries
    if multiple:
        ttk.Label(frm, text='[%d]' % len(location), **kwlabel).pack(side=tk.LEFT)
    # multiline button
    elif multiline or isinstance(eval(location), basestring) and '\n' in eval(location):

        def showText(event=None):
            _Text(frm, location, lbl, True, help, preentry, postcommand, **kw)

        ttk.Button(frm, text="...", command=showText, takefocus=False, width=3).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=5)

    if not Lock(location, checkLock=True):
        # default button
        if default is not special1:

            def writeToEntry():
                e.focus_set()
                e.grab_set()
                value.set(repr(default))
                _aux['parentGUI'].update_idletasks()
                # let the entry re-validate (and re-color) itself
                e.event_generate("<<virtualKey>>")
                e.grab_release()

            # lower-case `d` if the default comes from the settings at import, upper-case `D` otherwise
            ttk.Button(
                frm,
                text=['d', 'D'][not default_at_import],
                command=writeToEntry,
                style='flat.TButton',
                takefocus=False,
                state=kw.get('state', 'enabled'),
            ).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0)

    # NOTE(review): help and web buttons are added also for locked entries --
    # nesting reconstructed from a whitespace-collapsed source, confirm
    # help button
    _helpButton(frm, help)
    # web button
    _urlButton(frm, url)

    def delete_default(location, postcommand=None):
        if postcommand:
            # post-commands receive the location, as in `_Entry` and as
            # required by the `norm` post-command defined above
            postcommand(location=location)
        if eval(location) == default:
            exec(('del ' + location), globals(), locals())

    if delete_if_default:
        delete_default(location)

    return e
@_available_to_userGUI
def ComboBox(
        location, options, lbl=None, comment=None,
        updateGUI=False, state='readonly', help='', postcommand=None,
        check=special1, default=special1, url='', kwlabel={}, **kw):
    """
    This method creates a GUI element of the combobox type.
    The background of the GUI gets colored green/red depending on whether the input by the user is a valid Python entry

    Notice that this method can be used to set multiple entries at once:
    `ComboBox(["root['asd']","root['dsa']","root['aaa']",],{'':[0,0,0],'a':[1,1,0],'b':[1,0,'***']},'Test multi',default=[0,0,0])`
    which comes very handy when complex/exclusive switch combinations need to be set in a namelist file, for example.
    Use the string `***` to leave parameters unchanged.

    :param location: location in the OMFIT tree (notice that this is either a string or a list of strings)

    :param options: possible options the user can choose from. This can be a list, a tuple or a dictionary.
        For dictionaries the keys are the descriptions shown to the user and the values are what gets
        written in the tree. Any other type raises an OMFITexception.

    :param lbl: Label which is put on the left of the entry (defaults to `location`)

    :param comment: A comment which appears on top of the entry

    :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed

    :param state:
        * 'readonly' (default) the user can not type in whatever he wants
        * 'normal' allow user to type in
        * 'search' allow searching for entries

    :param help: help provided when user right-clicks on GUI element (adds GUI button)

    :param postcommand: command to be executed after the value in the tree is updated.
        This command will receive the OMFIT location string as an input (keyword `location`)

    :param check: function that returns whether what the user has entered in the entry GUI element is a valid entry.
        Entries that fail the check get the `check.TCombobox` style, and users will not be able to set the value.

    :param default: Set the default value if the tree location does not exist (adds GUI button)

    :param url: open url in web-browser (adds GUI button)

    :param kwlabel: keywords passed to ttk.Label

    :param kw: extra keywords passed to the Combobox widget

    :return: associated TkInter combobox object
    """
    kwlabel=_tk_ttk_process_kw('TLabel',kwlabel)
    kw=_tk_ttk_process_kw('TEntry',kw)

    # a non-string location means that several tree entries are set at once
    multiple=not isinstance(location,basestring)

    location=_absLocation(location)
    default,default_at_import=_setDefault(location,default)

    #escapeDescription will escape string options whose description matches the value
    # every description is wrapped in `***...***`; the wrapping is removed again
    # (`strip('*')` / `[3:-3]`) before the text is shown in the widget
    def escapeDescription(description,value=None):
        description=str(description)
        if state=='normal' and isinstance(value,basestring) and description==value and value!='':
            description=repr(description)
        return '***'+description+'***'

    # create key-text pair
    # (escaped description -> value; plain dictionaries are sorted by str(key))
    tmp = OrderedDict()
    if isinstance(options, (SortedDict, OrderedDict)):
        for key in list(options.keys()):
            tmp[escapeDescription(key, options[key])] = options[key]
    elif isinstance(options, dict):
        for key in sorted(list(options.keys()),key=str):
            tmp[escapeDescription(key, options[key])] = options[key]
    elif isinstance(options, (list, tuple)):
        for item in options:
            tmp[escapeDescription(item, item)] = item
    else:
        raise OMFITexception('ComboBox options can only be dictionaries, lists or tuples')
    options = tmp

    #handle wildcards
    # do not add wildcard entries if they have the same effective
    # value of an entry that does not have wildcards
    if multiple:
        for opt in list(options.keys()):
            modified=False
            tmp=copy.deepcopy(options[opt])
            # replace every `***` wildcard with the value currently in the tree
            for k,loc in enumerate(location):
                if tmp[k]=='***':
                    tmp[k]=eval(loc)
                    modified=True
            if modified:
                alreadyThere=False
                for opt1 in options:
                    if repr(options[opt1])==repr(tmp):
                        alreadyThere=True
                if alreadyThere:
                    del options[opt]
                else:
                    options[opt]=tmp

    #create inverse lookup dictionary
    # (repr of the value -> escaped description)
    values=SortedDict()
    for description in options:
        values[repr(options[description])]=description

    #add also the current entry
    if repr(_eval(location)) not in values:
        options[escapeDescription(repr_eval(location),_eval(location))]=copy.deepcopy(_eval(location))
        values[repr_eval(location)]=escapeDescription(repr_eval(location),_eval(location))

    if lbl is None:
        lbl=location

    frm_top=ttk.Frame(_aux['parentGUI'])
    frm_top.pack(side=_aux['packing'],expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP],fill=tk.X,padx=5,pady=1)
    if comment is not None:
        frm=ttk.Frame(frm_top)
        frm.pack(side=tk.TOP,expand=tk.NO,fill=tk.X)
        _Label(frm,text=comment).pack(side=tk.LEFT)
    frm=ttk.Frame(frm_top)
    frm.pack(side=tk.TOP,expand=tk.NO,fill=tk.X)
    # the " = " separator is only added when the label is not empty
    ttk.Label(frm,text=lbl+" = "*numpy.sign(len(str(lbl))),**kwlabel).pack(side=tk.LEFT)

    # 'search' is not a Tk state: the widget is created as 'normal' and the
    # search behaviour is implemented in checkKeyPressed
    comboBox = Combobox(frm, state=state if state!='search' else 'normal', values=tuple([k.strip('*') for k in list(options.keys())]),takefocus=False,**kw)
    comboBox.configure(font=OMFITfont())
    comboBox.pack(side=tk.LEFT,expand=tk.YES,fill=tk.X)
    comboBox.set(values[repr_eval(location)][3:-3])

    if Lock(location,checkLock=True):
        comboBox.config(state=tk.DISABLED)

    else:
        # write the current widget content to the tree (if it is acceptable)
        def update_tree_values(location,updateGUI):
            tmp=checkKeyPressed()
            if state == 'search':
                # in search mode only the listed options can be set
                if escapeDescription(comboBox.get()) not in options:
                    escape()
                    return
            if tmp[0] is False:
                printe('Invalid Python entry: '+comboBox.get())
                return
            elif tmp[0] is None and tmp[1] is None:
                printe('Tree entry '+location+' should not be set as: '+comboBox.get())
                return
            elif tmp[0] is None:
                printe('Tree entry '+location+' should be '+tmp[1])
                return
            if escapeDescription(comboBox.get()) in options:
                tmp=options[escapeDescription(comboBox.get())]
            else:
                tmp=_eval(comboBox.get())
            # keep 1D arrays in the tree as arrays when the user enters a list
            if isinstance(tmp,list) and isinstance(_eval(location),numpy.ndarray) and len(_eval(location).shape)==1:
                tmp=numpy.atleast_1d(tmp)
            if isinstance(location,basestring):
                exec(location+'='+repr(tmp), globals(), locals())
            else:
                # NOTE: the executed statement indexes with the local name `k`
                for k,loc in enumerate(location):
                    exec(loc+'='+repr(tmp)+'[k]', globals(), locals())
            if postcommand is not None:
                manage_user_errors(lambda:postcommand(location=location),reportUsersErrorByEmail=True)
            OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
            if updateGUI:
                OMFITaux['rootGUI'].event_generate("<<update_GUI>>")
            else:
                checkKeyPressed()

        # validate the widget content and set its style accordingly
        # returns (True, None) valid entry, (True, 'search') search mode,
        #         (False, None) entry that failed to evaluate,
        #         (None, name-or-None) entry that did not pass `check`
        # returns None if the final state inspection itself raises
        def checkKeyPressed(event=None):
            # reorder entries based on matching search
            if state == 'search':
                try:
                    coptions=[x for x in comboBox.configure('values')[4] if x!='-----']
                except tk.TclError:
                    pass
                else:
                    # matching entries first, then a `-----` divider, then the rest
                    tmp_options=[]
                    for item in coptions:
                        if comboBox.get().lower() in item.lower():
                            tmp_options.append(item)
                    tmp_options.append('-----')
                    for item in coptions:
                        if comboBox.get().lower() not in item.lower():
                            tmp_options.append(item)
                    comboBox.configure(values=tuple(tmp_options))

            try:
                try:
                    # if the value is one of the options
                    if escapeDescription(comboBox.get()) in options:
                        if repr(options[escapeDescription(comboBox.get())])==repr_eval(location):
                            comboBox.state(['active', '!invalid', '!alternate'])
                            comboBox.config(style='TCombobox')
                        else:
                            comboBox.state(['active', '!invalid', '!alternate'])
                            comboBox.config(style='valid.TCombobox')
                    # if in search mode
                    elif state == 'search':
                        comboBox.state(['active', '!invalid', '!alternate'])
                        comboBox.config(style='exist.TCombobox')
                        return True, 'search'
                    # if the value is not a valid option
                    else:
                        tmp=repr_eval(comboBox.get())
                        # if the value equals what's in the tree location (--> white)
                        if tmp==repr_eval(location):
                            comboBox.state(['!active', '!invalid', '!alternate'])
                            comboBox.config(style='TCombobox')
                        # if the value differs from what's in the tree location (--> green)
                        else:
                            comboBox.state(['active', '!invalid', '!alternate'])
                            comboBox.config(style='valid.TCombobox')
                        # if there is a check and does not pass (--> orange)
                        if check is not special1 and not check(_eval(tmp)):
                            comboBox.state(['!active', 'invalid', '!alternate'])
                            comboBox.config(style='check.TCombobox')
                # if anything fails (--> red)
                except Exception as _excp:
                    comboBox.state(['!active', '!invalid', 'alternate'])
                    comboBox.config(style='error.TCombobox')

                # translate the widget state flags into the return value
                if comboBox.instate(['alternate']):
                    return False, None
                elif comboBox.instate(['invalid']):
                    tmp=None
                    # for check functions named `is_<something>` report `<something>`
                    if 'is_' == check.__name__[:3]:
                        tmp=check.__name__[3:]
                    return None,tmp
                else:
                    return True,None
            except Exception:
                pass

        # restore the widget content from the value stored in the tree
        def escape(event=None):
            try:
                comboBox.set(values[repr_eval(location)][3:-3])
            except Exception:
                comboBox.set(repr_eval(location))
            checkKeyPressed()

        comboBox.bind('<<ComboboxSelected>>', func=lambda event=None:update_tree_values(location,updateGUI))
        comboBox.bind('<Return>', func=lambda event=None:update_tree_values(location,updateGUI))
        comboBox.bind('<KP_Enter>', func=lambda event=None:update_tree_values(location,updateGUI))
        # the 1 ms delay runs the callbacks after the key press has been processed by the widget
        comboBox.bind(sequence="<Key>", func=lambda event=None:comboBox.after(1,checkKeyPressed))
        comboBox.bind(sequence="<Escape>", func=lambda event=None:comboBox.after(1,escape))
        checkKeyPressed()

        #default button
        if default is not special1:
            def writeToEntry():
                if repr(default) in values:
                    comboBox.set(values[repr(default)][3:-3])
                elif state=='normal':
                    comboBox.set(repr(default))
                else:
                    raise OMFITexception('Combobox %s: Default value %s is not a valid option'%(location,default))
                update_tree_values(location,updateGUI)
                checkKeyPressed()
            ttk.Button(
                frm, text=['d','D'][not default_at_import], command=writeToEntry, style='flat.TButton',
                takefocus=False,state=['enabled','disabled'][state=='disabled']
            ).pack(side=tk.LEFT,expand=tk.NO,fill=tk.NONE,padx=0,pady=0)

    # help button
    _helpButton(frm,help)
    # web button
    _urlButton(frm,url)
    comboBox.bind(sequence="<"+rightClick+">", func=lambda event=None:_reveal(location,lbl,help))

    #break mousewheel scroll
    comboBox.bind("<MouseWheel>", 'break')
    comboBox.bind("<Button-4>", 'break')
    comboBox.bind("<Button-5>", 'break')

    return comboBox
@_available_to_userGUI
class same_row(object):
    '''
    Context manager that lays out the GUI elements created inside it on one row

    For example to place two buttons side by side:

    >> with OMFITx.same_row():
    >>     OMFITx.Button('Run', lambda: None)
    >>     OMFITx.Button('Plot', lambda: None)
    '''

    # entries of `_aux` that are put back when the environment is left
    _restored_keys = ('packing', 'parentGUI', 'same_row')

    def __enter__(self):
        '''
        Create the row frame and make it the parent of the following GUI elements

        :return: this same_row instance
        '''
        parent = _aux['parentGUI']
        self.frm_top = ttk.Frame(parent)

        # snapshot of the GUI state, taken before anything is changed
        snapshot = {}
        snapshot.update(_aux)
        self.old_aux = snapshot

        # remember the enclosing parent, then pack children left-to-right in the row frame
        _aux['same_row'] = _aux['parentGUI']
        _aux['parentGUI'] = self.frm_top
        _aux['packing'] = tk.LEFT
        return self

    def __exit__(self, type, value, traceback):
        '''
        Restore the GUI state saved by `__enter__` and pack the row frame
        '''
        for key in self._restored_keys:
            _aux[key] = self.old_aux[key]
        self.frm_top.pack(side=tk.TOP, expand=tk.NO, fill=tk.X, padx=0, pady=0)
@_available_to_userGUI
class same_tab(object):
    '''
    Context manager that places the GUI elements created inside it in a given tab

    For example to place two buttons in the same tab named 'test'

    >> with OMFITx.same_tab('test'):
    >>     OMFITx.Button('Run', lambda: None)
    >>     OMFITx.Button('Plot', lambda: None)
    '''

    def __init__(self, tab_name):
        '''
        :param tab_name: name of the tab that is selected while inside the environment
        '''
        self.tab_name = tab_name

    def __enter__(self):
        '''
        Remember the tab that is currently active and switch to `tab_name`

        :return: this same_tab instance
        '''
        previous_tab = _aux['tab_name']
        self.old_tab_name = previous_tab
        Tab(self.tab_name)
        return self

    def __exit__(self, type, value, traceback):
        '''
        Switch back to the tab that was active when the environment was entered
        '''
        Tab(self.old_tab_name)
@_available_to_userGUI
def Button(text, command, help='', updateGUI=False, url='', **kw):
    r"""
    This method creates a GUI element of the button type

    :param text: the text to be written on the button

    :param command: the command to be executed. This is either a callable, or a string
        with a location in the OMFIT tree (optionally followed by a `.attribute` suffix)
        which is evaluated to obtain the callable

    :param help: help provided when user right-clicks on GUI element (adds GUI button)

    :param updateGUI: if True an `<<update_GUI>>` event is generated once the command is done,
        otherwise the button is re-enabled 250 ms after the command is done

    :param url: open url in web-browser (adds GUI button)

    :param \**kw: extra keywords are passed to the ttk.Button

    :return: associated ttk.Button object
    """
    kw = _tk_ttk_process_kw('TButton', kw)

    # a string command is resolved to the object it points to
    location = None
    if isinstance(command, basestring):
        location = command
        base, suffix = os.path.splitext(location)
        command = eval(_absLocation(base) + suffix)

    # in a TOP-packed layout the frame does not expand, otherwise it does
    container = ttk.Frame(_aux['parentGUI'])
    if _aux['packing'] == tk.TOP:
        stretch = tk.NO
    else:
        stretch = tk.YES
    container.pack(side=_aux['packing'], expand=stretch, fill=tk.X, padx=5, pady=1)

    def reenable(button):
        # the button may no longer exist by the time this is called
        try:
            button.config(state=tk.NORMAL)
        except Exception:
            pass

    def on_click(event=None):
        # the button stays disabled while the command is running
        btt.config(state=tk.DISABLED)
        try:
            manage_user_errors(command, reportUsersErrorByEmail=True)
        finally:
            OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
            if updateGUI:
                OMFITaux['rootGUI'].event_generate("<<update_GUI>>")
            else:
                btt.after(250, lambda: reenable(btt))

    btt = ttk.Button(container, text=text, command=on_click, takefocus=False, **kw)
    btt.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X)

    # help button
    _helpButton(container, help)
    # web button
    _urlButton(container, url)

    def on_right_click(event=None):
        return _reveal(command, text, help)

    btt.bind(sequence="<" + rightClick + ">", func=on_right_click)
    return btt
@_available_to_userGUI
def Label(text, align='center', **kw):
    """
    This method creates a GUI element of the label type

    :param text: the text to be written in the label

    :param align: alignment of the text in the frame: 'left', 'center' or 'right'

    :param kw: extra keywords passed to the label. The optional `image` keyword is the
        name of an image file which is shown in the label

    :return: associated ttk.Label object
    """
    holder = ttk.Frame(_aux['parentGUI'])
    holder.pack(side=_aux['packing'], expand=tk.NO, fill=tk.X, padx=2, pady=2)

    # the image (if any) is loaded before the label is created
    image_file = kw.pop('image', None)
    has_image = image_file is not None
    if has_image:
        try:
            picture = tk.PhotoImage(master=holder, file=image_file)
        except tk.TclError:
            # fall back on PIL when Tk can not load the file
            import PIL.ImageTk
            picture = PIL.ImageTk.PhotoImage(master=holder, file=image_file)

    widget = _Label(holder, text, **kw)
    if has_image:
        widget.configure(image=picture)
        # presumably kept on the widget so that the image is not garbage collected
        widget._ntimage = picture

    sides = {'left': tk.LEFT, 'center': tk.TOP, 'right': tk.RIGHT}
    widget.pack(side=sides[align], expand=tk.NO, fill=tk.X)

    def rewrap(event=None, lbl=widget, frm_top=holder):
        # wrap the text at the current width of the enclosing frame
        return lbl.config(wraplength=frm_top.winfo_width())

    _aux['configure_size'].append((holder, rewrap))
    return widget
@_available_to_userGUI
def Separator(lbl=None, kwlabel={}, **kw):
    r"""
    This method creates a TkInter separator object

    :param lbl: text to be written between separator lines.
        If empty or None a single separator line is drawn

    :param kwlabel: keywords passed to ttk.Label (the dictionary is not modified)

    :param \**kw: keywords passed to ttk.Label (these take precedence over `kwlabel`)

    :return: associated ttk.Label object when `lbl` is given, None otherwise
    """
    # work on a copy so that neither the caller's dictionary nor the shared
    # default argument are modified (which would leak keywords between calls)
    kwlabel = dict(kwlabel)
    kwlabel.update(kw)
    if 'font' not in kwlabel:
        kwlabel['font'] = OMFITfont('bold')
    kwlabel = _tk_ttk_process_kw('TLabel', kwlabel)

    if not lbl:
        # plain separator line
        sep = ttk.Separator(_aux['parentGUI'])
        sep.pack(side=_aux['packing'], expand=tk.NO, fill=tk.X, padx=5, pady=2)
    else:
        # separator line - label - separator line
        frm_top = ttk.Frame(_aux['parentGUI'])
        frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=0, pady=0)
        ttk.Separator(frm_top).pack(side=tk.LEFT, expand=tk.YES, fill=tk.X, padx=5, pady=2)
        tmp = ttk.Label(frm_top, text=str(lbl), **kwlabel)
        tmp.pack(expand=tk.NO, fill=tk.NONE, side=tk.LEFT)
        ttk.Separator(frm_top).pack(side=tk.LEFT, expand=tk.YES, fill=tk.X, padx=5, pady=2)
        return tmp
@_available_to_userGUI
def FilePicker(location, lbl=None, comment=None, updateGUI=False, help='', postcommand=None,
               localRemote=True, transferRemoteFile=True, directory=False, action='open', tree=True,
               default=special1, url='', kwlabel={},
               init_directory_location=None, init_pattern_location=None,
               favorite_list_location=None, pattern_list_location=None, **kw):
    r"""
    This method creates a GUI element of the filePicker type, which allows to pick a file/directory

    :param location: location in the OMFIT tree (notice that this is a string)

    :param lbl: lbl to be shown on the button

    :param comment: A comment which appears on top of the entry

    :param help: help provided when user right-clicks on GUI element (adds GUI button)

    :param postcommand: command to be executed after the value in the tree is updated.
        This command will receive the OMFIT location string as an input

    :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed

    :param localRemote: True: both, 'local': only local, 'remote': only remote

    :param transferRemoteFile: controls what goes into location

        * string with local filename (if transferRemoteFile==True)
        * string with the filename (if transferRemoteFile==False)
        * tuple with the filename,server,tunnel (if transferRemoteFile==None)

        if transferRemoteFile=True, then the file is transferred to a temporary folder

        if transferRemoteFile is a string, then it will be interpreted as the directory where to move the file

    :param directory: whether it's a directory or a file

    :param action: 'open' or 'save'. For anything other than 'open',
        `transferRemoteFile` is forced to False and `localRemote` to 'local'

    :param tree: load from OMFIT tree location

    :param default: default value for the tree location (adds GUI button)

    :param url: open url in web-browser (adds GUI button)

    :param kwlabel: keywords passed to ttk.Label

    :param init_directory_location: The contents of this location are used to set the initial directory
        for file searches. If a file name is specified the directory will be determined from the file name
        and this input ignored. Otherwise, if set this will be used to set the initial directory.

    :param init_pattern_location: The default pattern is '*'. If this is specified then the contents of
        the tree location will replace the default initial pattern.

    :param favorite_list_location: OMFIT tree location which contains a possibly empty list of favorite
        file directories. To keep with the general omfit approach this should be a string.

    :param pattern_list_location: OMFIT tree location which contains a possibly empty list of favorite
        search patterns. To keep with the general omfit approach this should be a string.

    :param \**kw: keywords passed to Entry object

    :return: associated ttk.Entry object
    """
    kwlabel = _tk_ttk_process_kw('TLabel', kwlabel)

    location = _absLocation(location)
    default, default_at_import = _setDefault(location, default)

    if action != 'open':
        transferRemoteFile = False
        localRemote = 'local'  # todo: allow save to remote files

    def processFilename(tmp):
        # store the selection in the tree and show it in the entry
        exec(location + '=' + repr(tmp), globals(), locals())
        if postcommand is not None:
            manage_user_errors(lambda: postcommand(location=location), reportUsersErrorByEmail=True)
        value.set(repr(tmp))
        # move the view to the end of the entry: best effort
        try:
            e.icursor(tk.END)
            e.xview(tk.END)
        except Exception:
            pass
        OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
        if updateGUI:
            OMFITaux['rootGUI'].event_generate("<<update_GUI>>")

    def askTree():
        # dialog asking for the tree location of the object whose file is used
        top = tk.Toplevel(_topGUI(_aux['parentGUI']))
        top.withdraw()
        top.transient(_aux['parentGUI'])
        top.wm_title('Pick file tree location')

        _Label(top, text='Tree location: ').pack(side=tk.LEFT)
        var = tk.OneLineText(top, width=50, percolator=True)
        var.set(OMFITaux['GUI'].focusRoot)
        var.pack(side=tk.LEFT, padx=2, pady=5, fill=tk.X, expand=tk.YES)

        def onReturn(event=None, var=None):
            picked = eval(var.get())
            if hasattr(picked, 'filename'):
                # save the object so that its file reflects what is in the tree
                if hasattr(picked, 'save'):
                    picked.save()
                else:
                    printe(var.get() + " has no .save() method! The chosen object may not be in sync.")
                processFilename(picked.filename)
            else:
                processFilename(var.get())
            top.destroy()

        def onEscape(event=None):
            top.destroy()

        def confirm(event=None):
            return onReturn(event=event, var=var)

        var.focus_set()
        # both Return keys must hand `var` over to onReturn: binding onReturn
        # directly leaves var=None and fails on `var.get()`
        top.bind('<Return>', confirm)
        top.bind('<KP_Enter>', confirm)
        top.bind('<Escape>', onEscape)

        top.protocol("WM_DELETE_WINDOW", top.destroy)
        top.update_idletasks()
        top.deiconify()
        top.wait_window(top)

    def askRemote():
        # start the file dialog from what is currently stored at the tree location
        remoteFilename = None
        server = 'localhost'
        tunnel = ''
        try:
            tmp = eval(location)
        except Exception:
            pass
        else:
            if isinstance(tmp, basestring) and len(tmp):
                remoteFilename = tmp
            # check for emptiness before indexing, so that an empty sequence is ignored
            elif isinstance(tmp, (tuple, list, numpy.ndarray)) and len(tmp) and len(tmp[0]):
                remoteFilename = tmp[0]
                if len(tmp) > 1:
                    server = tmp[1]
                if len(tmp) > 2:
                    tunnel = tmp[2]
        tmp = remoteFile(_aux['parentGUI'], transferRemoteFile, remoteFilename=remoteFilename, server=server, tunnel=tunnel,
                         init_directory_location=init_directory_location, init_pattern_location=init_pattern_location,
                         favorite_list_location=favorite_list_location, pattern_list_location=pattern_list_location)
        # only a non-empty selection is written to the tree
        if (isinstance(tmp, basestring) and len(tmp)) or \
                (isinstance(tmp, (tuple, list, numpy.ndarray)) and len(tmp) and len(tmp[0])):
            processFilename(tmp)

    frm_top = ttk.Frame(_aux['parentGUI'])
    frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1)
    if comment is not None:
        frm = ttk.Frame(frm_top)
        frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)
        _Label(frm, text=comment).pack(side=tk.LEFT)
    frm = ttk.Frame(frm_top)
    frm.pack(side=tk.TOP, expand=tk.YES, fill=tk.X, pady=1)

    if lbl is None:
        if not directory:
            lbl = 'Pick File'
        else:
            lbl = 'Pick Directory'
    # the " = " separator is only added when the label is not empty
    ttk.Label(frm, text=str(lbl) + " = " * numpy.sign(len(str(lbl))), **kwlabel).pack(side=tk.LEFT)

    e, value = _Entry(frm, location=location, lbl=lbl, updateGUI=updateGUI, help=help, postcommand=postcommand, **kw)
    e.icursor(tk.END)
    e.xview(tk.END)
    e.config(width=30)

    # the buttons are always created, but only shown when applicable
    bttTree = ttk.Button(frm, text='Tree', takefocus=False)
    if tree and (not localRemote or localRemote is True or localRemote == 'local' or
                 (localRemote == 'remote' and transferRemoteFile is not False)):
        bttTree.pack(side=tk.LEFT, expand=tk.NO, fill=tk.X)
    bttTree.bind(sequence="<" + rightClick + ">", func=lambda event=None: _reveal(location, lbl, help))

    bttRemote = ttk.Button(frm, text=['File', 'Directory'][directory], takefocus=False)
    if localRemote:
        bttRemote.pack(side=tk.LEFT, expand=tk.NO, fill=tk.X)
    bttRemote.bind(sequence="<" + rightClick + ">", func=lambda event=None: _reveal(location, lbl, help))

    if Lock(location, checkLock=True):
        bttTree.config(state=tk.DISABLED)
        bttRemote.config(state=tk.DISABLED)
    else:
        bttTree.config(command=askTree)
        bttRemote.config(command=askRemote)

    # default button
    if default is not special1:
        def writeToEntry():
            e.focus_set()
            e.grab_set()
            value.set(repr(default))
            _aux['parentGUI'].update_idletasks()
            e.event_generate("<<virtualKey>>")
            e.grab_release()
        ttk.Button(frm, text=['d', 'D'][not default_at_import], command=writeToEntry, style='flat.TButton',
                   takefocus=False).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0)

    # help button
    _helpButton(frm, help)
    # web button
    _urlButton(frm, url)

    return e
@_available_to_userGUI
def ObjectPicker(location, lbl=None, objectType=None, objectKW={}, postcommand=None, unset_postcommand=None,
                 kwlabel={}, init_directory_location=None, init_pattern_location=None,
                 favorite_list_location=None, pattern_list_location=None, **kw):
    r"""
    This helper method creates a GUI element of the objectPicker type, which allows to load objects in the tree.

    If an object is already present at the location, then a button allows picking of a different object.

    Notice that this GUI element will always call an updateGUI

    :param location: location in the OMFIT tree (notice that this is a string)

    :param lbl: lbl to be shown on the button/file picker (defaults to `location` as given)

    :param objectType: class of the object that one wants to load (e.g. OMFITnamelist, OMFITgeqdsk, ...)
        if `objectType is None` then the object selected with `Tree` is deepcopied

    :param objectKW: keywords passed to the object

    :param postcommand: command to be executed after the value in the tree is updated.
        This command will receive the OMFIT location string as an input.

    :param unset_postcommand: command to be executed after the value in the tree is deleted.
        This command will receive the OMFIT location string as an input.

    :param kwlabel: keywords passed to ttk.Label

    :param init_directory_location: The contents of this location are used to set the initial directory
        for file searches. If a file name is specified the directory will be determined from the file name
        and this input ignored. Otherwise, if set this will be used to set the initial directory.

    :param init_pattern_location: The default pattern is '*'. If this is specified then the contents of
        the tree location will replace the default intial pattern.

    :param favorite_list_location: OMFIT tree location which contains a possibly empty list of favorite
        file directories. To keep with the general omfit approach this should be a string.

    :param pattern_list_location: OMFIT tree location which contains a possibly empty list of favorite
        search patterns. To keep with the general omfit approach this should be a string.

    :param \**kw: extra keywords are pased to the FilePicker object

    :return: the Button (if an object is already at the location) or the FilePicker entry
    """
    if isinstance(objectType, basestring):
        raise ValueError('ObjectPicker: object_type is not supposed to be a string')

    user_location = location
    location = _absLocation(location)
    locationObject = location
    # the picked filename/tree location is kept in a scratch entry which is specific to this location
    scratch_key = 'objectPicker_' + omfit_hash(location, 10)
    locationScratch = _absLocation("scratch[" + repr(scratch_key) + "]")

    # container (`where`) and key (`what`) of the entry that holds the object
    path = parseLocation(location)
    where = eval(buildLocation(path[:-1]))
    what = path[-1]

    # without an object type only tree locations can be picked
    if objectType is None:
        kw['tree'] = True
        kw['localRemote'] = False

    def load_object(location=None):
        # `location` is accepted because this is used as a postcommand; it is not used
        if objectType is not None:
            where[what] = objectType(eval(locationScratch), **objectKW)
        else:
            # the scratch entry holds a tree location: copy the object it points to
            where[what] = copy.deepcopy(eval(eval(locationScratch)))
        if postcommand is not None:
            manage_user_errors(lambda: postcommand(location=locationObject), reportUsersErrorByEmail=True)

    def remove_object():
        del where[what]
        if unset_postcommand is not None:
            manage_user_errors(lambda: unset_postcommand(location=locationObject), reportUsersErrorByEmail=True)

    if lbl is None:
        lbl = user_location

    # an object is already there: offer to get rid of it
    if what in where:
        return Button('Pick a different ' + lbl, remove_object, updateGUI=True)

    kw.setdefault('default', '')
    kw.pop('updateGUI', None)
    # NOTE(review): `objectType` is documented as a class, for which this isinstance()
    # test looks like it is always False -- confirm whether issubclass() was intended
    if isinstance(objectType, OMFITobject):
        transfer = None
    else:
        transfer = True
    kw.setdefault('transferRemoteFile', transfer)
    return FilePicker(locationScratch, "Pick " + lbl, postcommand=load_object, updateGUI=True, kwlabel=kwlabel,
                      init_directory_location=init_directory_location,
                      init_pattern_location=init_pattern_location,
                      favorite_list_location=favorite_list_location,
                      pattern_list_location=pattern_list_location,
                      **kw)
@_available_to_userGUI
def ModulePicker(location, modules=None, *args, **kw):
    r"""
    This method creates a GUI element of the combobox type for the selection of modules within the OMFIT project.

    :param location: location in the OMFIT tree (notice that this is either a string or a list of strings)

    :param modules: string or list of strings with IDs of the allowed modules. If modules is None all modules in OMFIT are listed

    :param load: list of two elements lists with module name and location where modules can be loaded
        eg. [['OMFITprofiles',"root['OMFITprofiles']"],['EFIT',"OMFITmodules[-2]['EFIT']"],]
        Setting `load=True` will set loading of the modules as submodules

    :param \*args: arguments passed to OMFITx.ComboBox

    :param \**kw: keywords passed to OMFITx.ComboBox

    :return: returns from OMFITx.ComboBox
    """
    # the default is handled here and not handed over to the ComboBox
    if 'default' in kw:
        default, default_at_import = _setDefault(_absLocation(location), kw.pop('default'))

    # module location which is currently selected (if it can be resolved)
    try:
        existing_location = _absLocation(eval(_absLocation(location)))
    except Exception:
        existing_location = None

    # one option for each of the allowed modules in the project
    options = {}
    modulesList = OMFIT.moduleDict()
    for module in modulesList:
        module_id = modulesList[module]['ID']
        if modules is not None and module_id not in tolist(modules):
            continue
        module_location = 'OMFIT' + module
        if module_location == existing_location:
            # keep the current selection in the form in which it is written in the tree
            options["[%s] -- %s" % (module_id, existing_location)] = eval(_absLocation(location))
        else:
            options["[%s] -- OMFIT%s" % (module_id, module)] = module_location

    # one option for each module that can be loaded at a location which does not exist yet
    load = kw.pop('load', [])
    if load is True:
        load = [[module, "root['%s']" % module] for module in tolist(modules)]
    if not load:
        load = []
    for m, l0cation in load:
        l0 = _absLocation(l0cation)
        try:
            eval(l0)
        except KeyError:
            options["load %s in %s" % (m, l0)] = '%s/%s' % (m, l0cation)

    # make sure that the current value is always one of the options
    if eval(_absLocation(location)) not in list(options.values()):
        options['--'] = eval(_absLocation(location))

    def postcommand(location):
        # `module/location` selections trigger loading of the module
        if '/' in eval(_absLocation(location)):
            m, l = eval(_absLocation(location)).split('/')
            OMFIT.loadModule(m, _absLocation(l))
            exec('%s="%s"' % (location, l), globals(), locals())
            UpdateGUI()

    kw['postcommand'] = postcommand
    # wide enough for the longest description
    kw.setdefault('width', int(max([len(description) for description in list(options.keys())])))
    return ComboBox(location, options, *args, **kw)
@_available_to_userGUI
def TreeLocationPicker(location, lbl=None, comment=None, kwlabel={}, default=special1, help='', url='',
                       updateGUI=False, postcommand=None, check=None, base=None, **kw):
    r"""
    This method creates a GUI element used to select a tree location
    The label of the GUI gets colored green/red depending on whether the input by the user is a valid OMFITtree entry

    :param location: location in the OMFIT tree (notice that this is a string)

    :param lbl: Label which is put on the left of the entry (defaults to `location`)

    :param comment: A comment which appears on top of the entry

    :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed

    :param help: help provided when user right-clicks on GUI element (adds GUI button)

    :param postcommand: command to be executed after the value in the tree is updated.
        This command will receive the OMFIT location string as an input

    :param default: Set the default value if the tree location does not exist (adds GUI button)

    :param url: open url in web-browser (adds GUI button)

    :param kwlabel: keywords passed to ttk.Label

    :param check: optional function which is called with the text of the entry
        when that text points to an existing tree location; its result decides
        whether the entry is reported as a broken dependency

    :param base: object in location with respect to which relative locations are evaluated

    :param \**kw: keywords passed to OneLineText object

    :return: associated OneLineText entry object
    """
    location = _absLocation(location)
    default, default_at_import = _setDefault(location, default)

    frm_top = ttk.Frame(_aux['parentGUI'])
    frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1)
    if comment is not None:
        frm = ttk.Frame(frm_top)
        frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)
        _Label(frm, text=comment).pack(side=tk.LEFT, expand=tk.YES, fill=tk.X)
    frm = ttk.Frame(frm_top)
    frm.pack(side=tk.TOP, expand=tk.YES, fill=tk.X)

    if lbl is None:
        lbl = location
    # this is the label whose color tells if the location is valid
    label = ttk.Label(frm, text=str(lbl), **kwlabel)
    label.pack(side=tk.LEFT, expand=tk.NO, fill=tk.X)
    # the " = " separator is only added when the label is not empty
    ttk.Label(frm, text=" = " * numpy.sign(len(str(lbl))), **kwlabel).pack(side=tk.LEFT)

    kw.setdefault('percolator', True)
    kw.setdefault('width', 15)
    e = tk.OneLineText(frm, **kw)
    e.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X)
    e.set(eval(location))

    if Lock(location, checkLock=True):
        e.config(state=tk.DISABLED)

    else:
        def update_tree_values(event=None):
            # a broken dependency is reported, but the value is written to the tree anyway
            if checkKeyPressed() is False:
                printe('Broken dependency: ' + e.get())
            else:
                label.config(foreground='forestgreen')
            exec(location + "=" + repr(e.get()), globals(), locals())
            if postcommand is not None:
                manage_user_errors(lambda: postcommand(location=location), reportUsersErrorByEmail=True)
            OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
            if updateGUI:
                OMFITaux['rootGUI'].event_generate("<<update_GUI>>")

        def checkKeyPressed(event=None):
            # color the label according to whether the entry points to an existing tree location
            try:
                if base is None:
                    tmp = _absLocation(e.get())
                else:
                    tmp = absLocation(e.get(), base, False)
                eval(tmp)  # check that the tree location exists
                label.config(foreground='forestgreen')
                if check is not None:
                    return check(e.get())
                return True
            except Exception as _excp:
                label.config(foreground='red2')
                return False

        def escape(event=None):
            # restore the entry from the value stored in the tree
            e.set(eval(location))
            checkKeyPressed()

        def pickTree():
            e.set(OMFITaux['GUI'].focusRoot)
            update_tree_values()

        ttk.Button(frm, text='Tree', command=pickTree).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE)

        # default button
        if default is not special1:
            def writeToEntry():
                e.focus_set()
                e.grab_set()
                e.set(default)
                _aux['parentGUI'].update_idletasks()
                e.event_generate("<<virtualKey>>")
                e.grab_release()
            ttk.Button(frm, text=['d', 'D'][not default_at_import], command=writeToEntry, style='flat.TButton',
                       takefocus=False).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0)

        e.bind(sequence="<Return>", func=update_tree_values)
        e.bind(sequence="<KP_Enter>", func=update_tree_values)
        # the 1 ms delay runs the callbacks after the key press has been processed by the widget
        e.bind(sequence="<Key>", func=lambda event=None: e.after(1, checkKeyPressed))
        e.bind(sequence="<<virtualKey>>", func=checkKeyPressed)
        e.bind(sequence="<Escape>", func=lambda event=None: e.after(1, escape))
        checkKeyPressed()

    e.bind(sequence="<" + rightClick + ">", func=lambda event=None: _reveal(location, lbl, help))

    # help button
    _helpButton(frm, help)
    # web button
    _urlButton(frm, url)

    # return the entry widget, as documented and as done by the other GUI elements
    return e
@_available_to_userGUI
def CheckBox(
        location, lbl=None, comment=None, useInt=False, mapFalseTrue=[],
        updateGUI=False, help='', postcommand=None, default=special1,
        url='', kwlabel={}, **kw):
    r"""
    This method creates a GUI element of the checkbutton type

    This method accepts a list of locations, labels and defaults

    :param location: location in the OMFIT tree (notice that this is a string, or a list of strings)

    :param lbl: Label which is put on the left of the entry (or list of labels, one per location)

    :param comment: A comment which appears on top of the entry

    :param useInt: Use integers (1 or 0) instead of boolean (True or False)

    :param mapFalseTrue: a 2 elements list, the first one for the unchecked, the second one for the checked button

    :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed

    :param help: help provided when user right-clicks on GUI element (adds GUI button)

    :param postcommand: command to be executed after the value in the tree is updated.
        It is called as postcommand(location=location) with the location of the box that was clicked.
        When the default button is pushed it is called only once, with the list of all the locations
        (or with the location itself if there is only one)

    :param default: Set the default value if the tree location does not exist (adds GUI button)

    :param url: open url in web-browser (adds GUI button)

    :param kwlabel: keywords passed to ttk.Label

    :param \**kw: extra keywords are pased to the Checkbutton object

    :return: list of the associated TkInter checkbutton objects

    >>> OMFITx.CheckBox(["OMFIT['ck']","OMFIT['ck1']"],['hello','asd'],default=[False,True])
    >>> OMFITx.CheckBox("OMFIT['ck']",'hello',default=False)
    """
    kw.update(kwlabel)
    kw = _tk_ttk_process_kw('TCheckbutton', kw)

    location0 = location
    location = _absLocation(location)
    default, default_at_import = _setDefault(location, default)
    location = tolist(location)

    # a single location holding several elements is handled by the legacy implementation
    if not isinstance(location0, (list, tuple)) and numpy.iterable(eval(location[0])) and len(eval(location[0])) > 1:
        # the USER environment variable is not always defined
        if os.environ.get('USER') in ['meneghini', 'smithsp']:
            printw('%s uses old Checkbox format' % location[0])
        return _old_CheckBox(location0, lbl=lbl, comment=comment, useInt=useInt, mapFalseTrue=mapFalseTrue,
                             updateGUI=updateGUI, help=help, postcommand=postcommand, default=default, **kw)

    #define direct and inverse mappings
    if not len(mapFalseTrue):
        mapFalseTrue = [False, True]
        if useInt:
            mapFalseTrue = [0, 1]
    invMapFalseTrue = {repr(mapFalseTrue[0]): False, repr(mapFalseTrue[1]): True}

    frm_top = ttk.Frame(_aux['parentGUI'])
    frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1)
    if comment is not None:
        frm = ttk.Frame(frm_top)
        frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)
        _Label(frm, text=comment).pack(side=tk.LEFT)
    frm = ttk.Frame(frm_top)
    frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)

    if lbl is None:
        lbl = location
    lbl = tolist(lbl)

    #set the ticks
    e = []
    for k, loc in enumerate(location):
        e.append(ttk.Checkbutton(frm, text=str(lbl[k]), takefocus=False, **kw))
        e[k].pack(side=tk.LEFT, expand=tk.NO, fill=tk.X)
        e[k].state(['!alternate'])  # removes color box bg indicating user has not clicked it, which may confuse people
        # values which are not in the mapping are used for their truth value
        if invMapFalseTrue.get(repr(eval(loc)), eval(loc)):
            e[k].state(['selected'])
        else:
            e[k].state(['!selected'])
    # filler which takes the space that is left on the row
    ttk.Frame(frm).pack(side=tk.LEFT, expand=tk.YES, fill=tk.X)

    if Lock(location, checkLock=True):
        for k in range(len(location)):
            e[k].config(state=tk.DISABLED)

    else:
        def update_tree_values(location, updateGUI, k):
            # toggle the value stored in the tree for the k-th location
            loc = location[k]
            eloc = mapFalseTrue[int(not invMapFalseTrue.get(repr(eval(loc)), eval(loc)))]
            exec(loc + "=" + repr(eloc), globals(), locals())
            if postcommand is not None:
                #pass the location of the item to postcommand
                manage_user_errors(lambda: postcommand(location=loc), reportUsersErrorByEmail=True)
            OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
            if updateGUI:
                OMFITaux['rootGUI'].event_generate("<<update_GUI>>")

        for k, loc in enumerate(location):
            # `k=k` binds the index of each box at definition time
            e[k].bind('<ButtonRelease-1>', func=lambda event=None, k=k: update_tree_values(location, updateGUI, k))

    #default button
    if default is not special1:
        def writeToEntry():
            for k, loc in enumerate(location):
                exec(loc + "=" + repr(tolist(default)[k]), globals(), locals())
            if postcommand is not None:
                #one single post-command when hitting `default`
                loc = location
                if len(loc) == 1:
                    loc = loc[0]
                manage_user_errors(lambda: postcommand(location=loc), reportUsersErrorByEmail=True)
            # refresh the ticks from what is now in the tree
            for k, loc in enumerate(location):
                if invMapFalseTrue.get(repr(eval(loc)), eval(loc)):
                    e[k].state(['selected'])
                else:
                    e[k].state(['!selected'])
            OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
            if updateGUI:
                OMFITaux['rootGUI'].event_generate("<<update_GUI>>")
        ttk.Button(
            frm, text=['d', 'D'][not default_at_import], command=writeToEntry, style='flat.TButton',
            takefocus=False, state=kw.get('state', 'enabled')
        ).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0)

    #help button
    _helpButton(frm, help)
    # web button
    _urlButton(frm, url)

    for k, loc in enumerate(location):
        e[k].bind(sequence="<" + rightClick + ">", func=lambda event=None: _reveal(location, lbl, help))
    return e
def _old_CheckBox(
        location,
        lbl=None,
        comment=None,
        useInt=False,
        mapFalseTrue=[],
        updateGUI=False,
        help='',
        postcommand=None,
        default=special1,
        url='',
        **kw):
    r"""
    This method creates a GUI element of the checkbutton type.
    Notice that this method can be used on boolean, int, and also 1D arrays:
    one checkbutton is created per element found at the tree location.

    NOTE(review): the `_old_` prefix and the absence of the `@_available_to_userGUI`
    decorator suggest this is a superseded implementation -- confirm before relying on it.

    :param location: location in the OMFIT tree (notice that this is a string)

    :param lbl: Label which is put on the right of the checkbutton(s); defaults to `location`

    :param comment: A comment which appears on top of the entry

    :param useInt: Use integers (1 or 0) instead of boolean (True or False).
                   Only considered when `mapFalseTrue` is empty.

    :param mapFalseTrue: a 2 elements list, the first one for the unchecked, the second one for the checked button

    :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed

    :param help: help provided when user right-clicks on GUI element (adds GUI button)

    :param postcommand: function called as `postcommand(location=location)` after a checkbutton
                        changed the value in the tree

    :param default: Set the default value if the tree location does not exist (adds GUI button)

    :param url: open url in web-browser (adds GUI button)

    :param \**kw: extra keywords are passed to the Checkbutton object

    :return: list of the associated ttk checkbutton objects (one per element at `location`)
    """

    location=_absLocation(location)
    default,default_at_import=_setDefault(location,default)

    #define direct and inverse mappings
    # (the inverse mapping is keyed by repr() so that any value type can be looked up)
    if not len(mapFalseTrue):
        mapFalseTrue=[False,True]
        if useInt:
            mapFalseTrue=[0,1]
    invMapFalseTrue={repr(mapFalseTrue[0]):False,repr(mapFalseTrue[1]):True}

    frm_top=ttk.Frame(_aux['parentGUI'])
    frm_top.pack(side=_aux['packing'],expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP],fill=tk.X,padx=5,pady=1)
    if comment is not None:
        frm=ttk.Frame(frm_top)
        frm.pack(side=tk.TOP,expand=tk.NO,fill=tk.X)
        _Label(frm,text=comment).pack(side=tk.LEFT)
    frm=ttk.Frame(frm_top)
    frm.pack(side=tk.TOP,expand=tk.NO,fill=tk.X)

    #set the defaults
    def set_default(val):
        # a value that is neither of the two mapped values is coerced:
        # through bool() when no explicit default was given, otherwise replaced by `default`
        if repr(val) not in invMapFalseTrue:
            if default is special1:
                return mapFalseTrue[int(bool(val))]
            else:
                return default
        return val
    eloc=copy.deepcopy(eval(location))
    if not numpy.iterable(eloc) or isinstance(eloc,basestring):
        # scalar (or string) at the tree location: normalise it and wrap it in a one-element list
        exec(location+"="+repr(set_default(eloc)), globals(), locals())
        eloc=[eval(location)]
    else:
        # iterable at the tree location: normalise it element by element
        # (the exec'd string refers to the local loop variable `k`)
        for k in range(len(eloc)):
            exec(location+"[k]="+repr(set_default(eloc[k])), globals(), locals())
        eloc=copy.deepcopy(eval(location))

    #set the ticks
    e=[]
    for k in range(len(eloc)):
        e.append(ttk.Checkbutton(frm,takefocus=False,**kw))
        e[k].pack(side=tk.LEFT,expand=tk.NO,fill=tk.X)
        e[k].state(['!alternate']) # removes color box bg indicating user has not clicked it, which may confuse people
        if invMapFalseTrue[repr(eloc[k])]:
            e[k].state(['selected'])
        else:
            e[k].state(['!selected'])

    if lbl is None:
        lbl=location
    frm1=ttk.Frame(frm)
    frm1.pack(side=tk.LEFT,expand=tk.YES,fill=tk.X)
    ttk.Label(frm1,text=str(lbl),justify=tk.LEFT).pack(side=tk.LEFT,expand=tk.NO,fill=tk.NONE)

    if Lock(location,checkLock=True):
        # locked location: the checkbuttons are shown but disabled
        for k in range(len(eloc)):
            e[k].config(state=tk.DISABLED)
    else:
        def update_tree_values(location,updateGUI,k):
            # toggle the k-th element at the tree location between the two mapped values
            eloc=copy.deepcopy(eval(location))
            if not numpy.iterable(eloc) or isinstance(eloc,basestring):
                eloc_new=[eloc]
            else:
                eloc_new=eloc
            eloc_new[k]=mapFalseTrue[int (not invMapFalseTrue[repr(eloc_new[k])])]
            # write back a scalar if the location held a scalar, the whole sequence otherwise
            if not numpy.iterable(eloc) or isinstance(eloc,basestring):
                exec(location+"="+repr(eloc_new[0]), globals(), locals())
            else:
                exec(location+"="+repr(eloc_new), globals(), locals())
            if postcommand is not None:
                manage_user_errors(lambda:postcommand(location=location),reportUsersErrorByEmail=True)
            OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
            if updateGUI:
                OMFITaux['rootGUI'].event_generate("<<update_GUI>>")
        for k in range(len(eloc)):
            # `k=k` binds the current index at definition time (avoids late binding in the lambda)
            e[k].bind('<ButtonRelease-1>', func=lambda event=None,k=k:update_tree_values(location,updateGUI,k))

        #default button
        if default is not special1:
            def writeToEntry():
                # NOTE(review): this writes the default to the tree but does not touch the
                # checkbutton states nor call `postcommand` -- confirm this is intended
                exec(location+"="+repr(default), globals(), locals())
                OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
                if updateGUI:
                    OMFITaux['rootGUI'].event_generate("<<update_GUI>>")
            # button text is 'D' when `default_at_import` is false, 'd' otherwise
            ttk.Button(frm, text=['d','D'][not default_at_import], command=writeToEntry, style='flat.TButton', takefocus=False).pack(side=tk.LEFT,expand=tk.NO,fill=tk.NONE,padx=0,pady=0)

    #help button
    _helpButton(frm,help)

    # web button
    _urlButton(frm,url)

    # right-click on any of the checkbuttons reveals the tree location
    for k in range(len(eloc)):
        e[k].bind(sequence="<"+rightClick+">", func=lambda event=None:_reveal(location,lbl,help))
    return e
@_available_to_userGUI
def ListEditor(location, options, lbl=None, default=None, unique=True, ordered=True, updateGUI=False, postcommand=None,
               only_valid_options=False, help='', url='', show_delete_button=False):
    '''
    GUI element to add or remove objects to a list
    Note: multiple items selection possible with the Shift and Ctrl keys

    The left listbox shows the available options, the right listbox shows the
    content of the list stored at `location`.

    :param location: location in the OMFIT tree (notice that this is a string).

    :param options: possible options the user can choose from. This can be a tree location, a list, or a dictionary.
        If a dictionary, then keys are shown in the GUI and values are set in the list.
        In order to use "show_delete_button", this must be a string giving the location of a list in the tree.

    :param lbl: Label which is put between the two lists; defaults to `location`

    :param default: Set the default value if the tree location does not exist

    :param unique: Do not allow repetitions in the list

    :param ordered: Keep the same order as in the list of options
                    If false, then buttons to move elements up/down are shown

    :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed

    :param postcommand: function to be called after a button is pushed. It is called as postcommand(location=location,button=button) where button is in ['add','add_all','remove','remove_all','delete_item']

    :param only_valid_options: list can only contain valid options

    :param help: help provided when user right-clicks on GUI element (adds GUI button)

    :param url: open url in web-browser (adds GUI button)

    :param show_delete_button: bool: Show an additional button for deleting items from the left hand list

    :return: tuple with the options listbox and the selection listbox

    :raises Exception: if the object stored at `location` is not a list
    '''
    if default is None:
        default = []

    location = _absLocation(location)
    default, default_at_import = _setDefault(location, default)

    options_is_dict = isinstance(options, dict)
    if isinstance(options, basestring):
        options = _absLocation(options)
        if unique:
            eval(options)[:] = unsorted_unique(eval(options))
        opts = eval(options)
    elif options_is_dict:
        # materialise the keys: the callbacks index into `opts`, which a Python 3 dict view does not allow
        opts = list(options.keys())
        reversed_options = flip_values_and_keys(options)
    else:
        if unique:
            options = unsorted_unique(options)
        opts = options

    if unique:
        eval(location)[:] = unsorted_unique(eval(location))

    if lbl is None:
        lbl = location

    if only_valid_options:
        # with dictionary options the tree list stores the dictionary *values*,
        # so validity must be checked against the values and not the displayed keys
        valid_items = list(options.values()) if options_is_dict else opts
        for item in set(eval(location)).difference(set(valid_items)):
            eval(location).remove(item)

    selects = eval(location)
    if not isinstance(selects, list):
        raise Exception('%s: OMFITx.ListEditor works only with lists' % location)

    def post(button):
        # forward the name of the pushed button to the user supplied postcommand
        if postcommand is not None:
            manage_user_errors(lambda: postcommand(location=location, button=button), reportUsersErrorByEmail=True)

    def display_name(item):
        # text shown in the selection listbox for an item stored in the tree list
        if options_is_dict:
            return reversed_options.get(item, item)
        return item

    def refresh_selected():
        # rebuild the selection listbox from the tree list
        slx.delete(0, tk.END)
        for item in selects:
            slx.insert(tk.END, display_name(item))

    def notify():
        # let the tree (and optionally the GUI script) know that the list changed
        OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")
        if updateGUI:
            OMFITaux['rootGUI'].event_generate("<<update_GUI>>")

    def add():
        # the GUI
        ixs = olx.curselection()
        if not ixs:
            return
        items = [opts[int(item)] for item in ixs]
        # the tree elements
        for item in items:
            if options_is_dict:
                item = options[item]
            if not unique or item not in selects:
                selects.append(item)
        if ordered:
            if options_is_dict:
                # dict views have no .index() in Python 3
                values = list(options.values())
                selects.sort(key=lambda x: values.index(x))
            else:
                selects.sort(key=lambda x: opts.index(x))
        # the GUI
        refresh_selected()
        notify()
        post('add')

    def add_all():
        # the tree elements
        if options_is_dict:
            selects[:] = list(options.values())
        else:
            selects[:] = opts
        # the GUI
        refresh_selected()
        notify()
        post('add_all')

    def remove():
        # the GUI
        ixs = slx.curselection()
        if not ixs:
            return
        ixs = list(map(int, ixs))
        # go backwards so that the remaining indexes stay valid
        for k in sorted(ixs)[::-1]:
            slx.delete(k)
            selects.pop(k)
        # the tree elements
        notify()
        post('remove')

    def remove_all():
        # the GUI
        slx.delete(0, tk.END)
        selects[:] = []
        notify()
        post('remove_all')

    def delete_item():
        # Update list editor GUI
        ixs = olx.curselection()
        if not ixs:
            # Nothing is selected
            if slx.curselection():
                print('Delete removes items from the left hand list; please select from the left list.')
            return
        ixs = list(map(int, ixs))
        # go backwards so that the remaining indexes stay valid
        for k in sorted(ixs)[::-1]:
            # `k` indexes the options (left) listbox ...
            olx.delete(k)
            # ... while `j` indexes the selection (right) listbox / tree list
            j = numpy.where(numpy.array(selects) == opts[k])[0]
            if len(j):
                slx.delete(int(j[0]))
                selects.pop(int(j[0]))
            opts.pop(k)
        # Update the tree elements
        notify()
        post('delete_item')

    def ud(direction):
        # move the first selected item of the selection listbox up ('u') or down ('d')
        ixs = slx.curselection()
        if not ixs:
            # Nothing is selected
            return
        ixs = int(ixs[0])
        if direction == 'u' and (ixs - 1) >= 0:
            selects.insert(ixs - 1, selects.pop(ixs))
        elif direction == 'd' and (ixs + 1) < len(selects):
            selects.insert(ixs + 1, selects.pop(ixs))
        else:
            return
        refresh_selected()
        # the tree elements
        notify()
        # keep the moved item selected and visible
        if direction == 'u':
            slx.select_set(ixs - 1)
            slx.see(ixs - 1)
        else:
            slx.select_set(ixs + 1)
            slx.see(ixs + 1)

    axu = lambda event=None: ud('u')
    axd = lambda event=None: ud('d')

    top = ttk.Frame(_aux['parentGUI'])
    top.pack(side=tk.TOP, expand=tk.NO, fill=tk.X, padx=5, pady=1)

    # left: listbox with the available options
    frm = ttk.Frame(top)
    frm.pack(side=tk.LEFT, fill=tk.BOTH, expand=tk.YES)
    scrollbaro = ttk.Scrollbar(frm)
    scrollbaro.pack(side=tk.RIGHT, fill=tk.Y)
    olx = tk.Listbox(frm, selectmode=tk.EXTENDED)
    olx.config(yscrollcommand=scrollbaro.set)
    scrollbaro.config(command=olx.yview)
    olx.pack(side=tk.LEFT, fill=tk.BOTH, expand=tk.YES)
    for item in opts:
        olx.insert(tk.END, item)

    # center: label and buttons
    frm = ttk.Frame(top)
    frm.pack(side=tk.LEFT)
    bt = ttk.Label(frm, text=lbl)
    bt.pack(side=tk.TOP, fill=tk.X)

    frm1 = ttk.Frame(frm)
    frm1.pack(side=tk.TOP)
    # help button
    _helpButton(frm1, help)
    # web button
    _urlButton(frm1, url)

    bt = ttk.Label(frm, text='')
    bt.pack(side=tk.TOP, fill=tk.X)
    bt = ttk.Button(frm, text='Add >', command=add, takefocus=False)
    bt.pack(side=tk.TOP, fill=tk.X)
    bt = ttk.Button(frm, text='Add all >>', command=add_all, takefocus=False)
    bt.pack(side=tk.TOP, fill=tk.X)

    # up/down buttons are created in any case but shown only for un-ordered lists
    frmm = ttk.Frame(frm)
    frmm.pack(side=tk.TOP, fill=tk.X)
    btxu = ttk.Button(frmm, text='up', command=axu, takefocus=False, width=5)
    if not ordered:
        btxu.pack(side=tk.LEFT, expand=tk.NO, padx=0)
    ttk.Label(frmm, text='').pack(side=tk.LEFT, expand=tk.YES, fill=tk.X)
    btxd = ttk.Button(frmm, text='down', command=axd, takefocus=False, width=5)
    if not ordered:
        btxd.pack(side=tk.LEFT, expand=tk.NO, padx=0)
    bt.pack(side=tk.TOP, fill=tk.X)

    bt = ttk.Button(frm, text='Remove <', command=remove, takefocus=False)
    bt.pack(side=tk.TOP, fill=tk.X)
    bt = ttk.Button(frm, text='Remove all <<', command=remove_all, takefocus=False)
    bt.pack(side=tk.TOP, fill=tk.X)
    if show_delete_button and is_string(options):
        bt = ttk.Label(frm, text='')
        bt.pack(side=tk.TOP, fill=tk.X)
        bt = ttk.Button(frm, text='Delete', command=delete_item, takefocus=False)
        bt.pack(side=tk.TOP, fill=tk.X)

    # right: listbox with the content of the tree list
    frm = ttk.Frame(top)
    frm.pack(side=tk.LEFT, fill=tk.BOTH, expand=tk.YES)
    scrollbarx = ttk.Scrollbar(frm)
    scrollbarx.pack(side=tk.LEFT, fill=tk.Y)
    slx = tk.Listbox(frm, selectmode=tk.EXTENDED)
    slx.config(yscrollcommand=scrollbarx.set)
    scrollbarx.config(command=slx.yview)
    slx.pack(side=tk.LEFT, fill=tk.BOTH, expand=tk.YES)
    for item in selects:
        slx.insert(tk.END, display_name(item))

    def mouse_wheel(event, what):
        # respond to Linux or Windows wheel event
        if event.num == 5 or event.delta == -120:
            what.yview('scroll', 1, 'units')
        elif event.num == 4 or event.delta == 120:
            what.yview('scroll', -1, 'units')
        return 'break'

    for k in ["<Button-4>", "<Button-5>", "<MouseWheel>"]:
        olx.bind(k, lambda event=None, what=olx: mouse_wheel(event, what))
        slx.bind(k, lambda event=None, what=slx: mouse_wheel(event, what))

    olx.bind(sequence="<" + rightClick + ">", func=lambda event=None: _reveal(location, lbl, help))
    slx.bind(sequence="<" + rightClick + ">", func=lambda event=None: _reveal(location, lbl, help))

    return olx, slx
@_available_to_userGUI
def Slider(location, start_stop_step, lbl=None, comment=None, digits=None, updateGUI=False, help='', preentry=None,
           postcommand=None, norm=None, default=special1, url='', kwlabel={}, **kw):
    """
    This method creates a GUI element of the slider type

    :param location: location in the OMFIT tree (notice that this is a string)

    :param start_stop_step: list of tree elements with start/stop/step of the slider

    :param lbl: Label which is put on the left of the entry

    :param comment: A comment which appears on top of the entry

    :param digits: How many digits to use (if None uses 3 digits if start_stop_step has floats or else 0 digits if these are integers)

    :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed

    :param help: help provided when user right-clicks on GUI element (adds GUI button)

    :param preentry: function to pre-process the data at the OMFIT location to be displayed in the entry GUI element

    :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input

    :param default: Set the default value if the tree location does not exist (adds GUI button)

    :param norm: normalizes numeric variables (overrides `preentry` or `postcommand`)

    :param url: open url in web-browser (adds GUI button)

    :param kwlabel: keywords passed to ttk.Label

    :param \\**kw: extra keywords are processed for the 'Horizontal.TScale' style and passed to TtkScale

    :return: associated TtkScale object
    """
    kwlabel = _tk_ttk_process_kw('TLabel', kwlabel)

    location = _absLocation(location)
    if isinstance(location, basestring):
        # the collection returned alongside the location is not used by this element
        location, collection = _for_each_collection(location)
    default, default_at_import = _setDefault(location, default)

    frm_top = ttk.Frame(_aux['parentGUI'])
    frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1)
    if comment is not None:
        frm = ttk.Frame(frm_top)
        frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)
        _Label(frm, text=comment).pack(side=tk.LEFT)
    frm = ttk.Frame(frm_top)
    frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)

    if lbl is None:
        lbl = location
    # the " = " suffix is dropped when the label is an empty string
    ttk.Label(frm, text=str(lbl) + " = " * numpy.sign(len(str(lbl))), **kwlabel).pack(side=tk.LEFT)

    if norm is not None:
        def post(location, norm):
            exec(location + '*=' + str(norm), globals(), locals())

        def pre(value, norm):
            return value / norm

        # NOTE(review): `preentry` is (re)assigned here but never called in this function -- confirm
        preentry = lambda value, norm=norm: pre(value, norm)
        postcommand = lambda location, norm=norm: post(location, norm)

    kw = _tk_ttk_process_kw('Horizontal.TScale', kw)

    def set_location(location, val):
        # `val` is the string representation of the new value;
        # a list written onto a 1D array location is converted to an array
        if isinstance(eval(val), list) and isinstance(eval(location), numpy.ndarray) and len(eval(location).shape) == 1:
            val = 'numpy.atleast_1d(' + val + ')'
        exec(location + "=" + val, globals(), locals())

    def update_tree_values(location, val, updateGUI):
        set_location(location, val)
        if postcommand is not None:
            manage_user_errors(lambda: postcommand(location=location), reportUsersErrorByEmail=True)
        # `after` needs a callable: schedule the generation of the virtual event
        # (passing the event name itself as the callback never generates the event)
        rootGUI = OMFITaux['rootGUI']
        rootGUI.after(500, rootGUI.event_generate, "<<update_treeGUI>>")
        if updateGUI:
            OMFITaux['rootGUI'].event_generate("<<update_GUI>>")

    def command(val):
        return update_tree_values(location, val, updateGUI)

    e = TtkScale(frm, from_=start_stop_step[0], to=start_stop_step[1], tickinterval=start_stop_step[2],
                 value=eval(location), digits=digits, command=command, **kw)
    #e.after(500, command)
    e.pack(side=tk.LEFT, expand=tk.YES, fill=tk.X)
    if Lock(location, checkLock=True):
        e.scale.configure(state=tk.DISABLED)

    e.bind(sequence="<" + rightClick + ">", func=lambda event=None: _reveal(location, lbl, help))
    #e.bind("<ButtonRelease-1>", command)

    if not Lock(location, checkLock=True):
        # default button
        if default is not special1:
            def writeToEntry():
                e.scale.set(default)
                e.on_configure(None)
                _aux['parentGUI'].update_idletasks()

            # button text is 'D' when `default_at_import` is false, 'd' otherwise
            ttk.Button(frm,
                       text=['d', 'D'][not default_at_import],
                       command=writeToEntry,
                       style='flat.TButton',
                       takefocus=False,
                       state=kw.get('state', 'enabled')
                       ).pack(side=tk.LEFT, expand=tk.NO, fill=tk.NONE, padx=0, pady=0)

    # help button
    _helpButton(frm, help)

    # web button
    _urlButton(frm, url)

    return e
@_available_to_userGUI
def Lock(location, value=special1, checkLock=False, clear=False):
    """
    Set, clear or query the lock on one or more OMFIT tree locations.
    GUI elements use the lock to prevent users from editing a location.

    :param location: location in the OMFIT tree (a string or a list of strings).
                     If location is None, then all locks are cleared.

    :param value: when setting a lock, also write this value at the location
                  (one value per location if a list of locations is given)

    :param checkLock: False=set (or clear) the lock | True=return the lock value

    :param clear: False=set the lock | True=remove the lock

    :return: None if checkLock=False, otherwise True only if all of the locations are locked
    """

    def active_gui():
        # the locks are stored on the entry of the GUI that is currently being built
        return _GUIs[str(_aux['topGUI'])]

    single = isinstance(location, basestring)
    entries = numpy.atleast_1d(location).tolist()

    for index, entry in enumerate(entries):
        # query: replace the entry with its lock status
        if checkLock:
            entry = _absLocation(entry)
            entries[index] = entry in active_gui().locked
            continue

        # None wipes all of the locks
        if entry is None:
            active_gui().locked = []
            continue

        entry = _absLocation(entry)

        # clear: forget the location if it was locked
        if clear:
            if entry in active_gui().locked:
                active_gui().locked.remove(entry)
            continue

        # set: remember the location and optionally force its value
        active_gui().locked.append(entry)
        if value is special1:
            continue
        path = parseLocation(entry)
        if single:
            eval(buildLocation(path[:-1]))[path[-1]] = value
        else:
            eval(buildLocation(path[:-1]))[path[-1]] = value[index]

    if checkLock:
        return numpy.all(entries)
@_available_to_userGUI
def TitleGUI(title=None):
    '''
    Set the title of the user GUI window.
    Nothing is done when the GUI is a compound GUI.

    :param title: string containing the title

    :return: None
    '''
    if _aux['is_compoundGUI']:
        return
    window = _aux['topGUI']
    window.wm_title(title)
@_available_to_userGUI
def ShotTimeDevice(postcommand=None,
                   showDevice=True, showShot=True, showTime=True, showRunID=False,
                   multiShots=False, multiTimes=False, showSingleTime=False,
                   checkDevice=None, checkShot=None, checkTime=None, checkRunID=None,
                   subMillisecondTime=False, stopIfNotSet=True, updateGUI=True):
    '''
    This high level GUI allows setting of DEVICE/SHOT/TIME of each module
    (sets up OMFIT MainSettings if root['SETTINGS']['EXPERIMENT']['XXX'] is an expression)

    :param postcommand: command to be executed every time device,shot,time are changed (location is passed to postcommand)

    :param showDevice: True/False show device section or list of suggested devices

    :param showShot: True/False show shot section or list with list of suggested shots

    :param showTime: True/False show time section or list with list of suggested times

    :param showRunID: True/False show runID Entry

    :param multiShots: True/False show single/multi shots

    :param multiTimes: True/False show single/multi times

    :param showSingleTime: True/False if multiTimes, still show single time

    :param checkDevice: check if device user input satisfies condition

    :param checkShot: check if shot user input satisfies condition

    :param checkTime: check if time user input satisfies condition

    :param checkRunID: check if runID user input satisfies condition

    :param subMillisecondTime: Allow floats as times

    :param stopIfNotSet: Stop GUI visualization if shot/time/device are not set

    :param updateGUI: passed as `updateGUI` to the ComboBox/Entry elements created here

    :return: None
    '''
    # nothing to show
    if not numpy.any([True if numpy.iterable(showDevice) else showDevice,
                      True if numpy.iterable(showShot) else showShot,
                      True if numpy.iterable(showTime) else showTime]):
        return

    # NOTE: the local names `root`, `OMFITmodules` and `MainSettings` are referenced
    # by strings that are eval'ed/exec'ed below, so they must not be renamed
    tmp = relativeLocations(_GUIs[str(_aux['topGUI'])].pythonFile)
    root = module = tmp['root']
    rootName = moduleName = tmp['rootName']
    tmp = parseLocation(rootName)
    OMFITlocationName = [buildLocation(tmp[:k + 1]) for k, item in enumerate(tmp)]
    OMFITmodulesName = []
    for tmpName in OMFITlocationName:
        if eval(tmpName).__class__ is OMFITmodule and tmpName != 'OMFIT':
            OMFITmodulesName.append(tmpName)
    OMFITmodules = list(map(eval, OMFITmodulesName))
    MainSettings = OMFIT['MainSettings']

    # settings are 'linked' only if all of the experiment entries of this module are expressions
    linked = 'linked'
    for item in ['device', 'shot', 'time', 'runid', 'shots', 'times']:
        if item in root['SETTINGS']['EXPERIMENT']:
            if not isinstance(root['SETTINGS']['EXPERIMENT'][item], OMFITexpression):
                linked = 'unlinked'
                break

    # point to root['SETTINGS']['EXPERIMENT'][xx]
    # if root['SETTINGS']['EXPERIMENT'][xx] is not an expression
    # else point to the innermost module along the path whose entry is not an expression
    exp = {}
    for item in ['device', 'shot', 'time', 'runid', 'shots', 'times']:
        if item in root['SETTINGS']['EXPERIMENT']:
            exp[item] = root['SETTINGS']['EXPERIMENT'][item]
            exp['%s_location' % item] = "root['SETTINGS']['EXPERIMENT']"
            for moduleName in reversed(OMFITmodulesName):
                module = eval(moduleName)
                if not isinstance(module['SETTINGS']['EXPERIMENT'][item], OMFITexpression):
                    exp[item] = module['SETTINGS']['EXPERIMENT'][item]
                    exp['%s_location' % item] = moduleName + "['SETTINGS']['EXPERIMENT']"
                    break

    # make sure that `shots` and `times` are arrays
    if not is_none(MainSettings['EXPERIMENT']['shots']) and not numpy.iterable(MainSettings['EXPERIMENT']['shots']):
        MainSettings['EXPERIMENT']['shots'] = numpy.atleast_1d(MainSettings['EXPERIMENT']['shots'])
    if not is_none(eval(exp['shots_location'])['shots']) and not isinstance(eval(exp['shots_location'])['shots'], numpy.ndarray):
        exec(exp['shots_location'] + "['shots']=numpy.atleast_1d(" + exp['shots_location'] + "['shots'])", globals(), locals())
    if not is_none(MainSettings['EXPERIMENT']['times']) and not numpy.iterable(MainSettings['EXPERIMENT']['times']):
        MainSettings['EXPERIMENT']['times'] = numpy.atleast_1d(MainSettings['EXPERIMENT']['times'])
    # both tests must look at the location of `times` (which can differ from the one of `shots`)
    if not is_none(eval(exp['times_location'])['times']) and not isinstance(eval(exp['times_location'])['times'], numpy.ndarray):
        exec(exp['times_location'] + "['times']=numpy.atleast_1d(" + exp['times_location'] + "['times'])", globals(), locals())

    def postcommandArray(location=None):
        # store sorted unique values as an array before propagating to MainSettings
        exec(location + "=numpy.unique(numpy.atleast_1d(" + location + "))", globals(), locals())
        return postcommand_mainsettings(location)

    def postcommand_mainsettings(location):
        # mirror the new value in MainSettings and then call the user postcommand
        MainSettings['EXPERIMENT'][parseLocation(location)[-1]] = evalExpr(eval(location))
        if postcommand is not None:
            return postcommand(location)

    # the entries are packed in a sub-frame on the left, the link/bookmark buttons on the right
    oldParent = _aux['parentGUI']
    frm = ttk.Frame(oldParent)
    frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)
    _aux['parentGUI'] = ttk.Frame(frm)
    _aux['parentGUI'].pack(side=tk.LEFT, expand=tk.YES, fill=tk.X)

    set_values = {}

    if numpy.iterable(showDevice) or showDevice:
        def checkDeviceFunction(inv):
            if checkDevice is None:
                return is_string(inv)
            else:
                return is_string(inv) and checkDevice(inv)

        if numpy.iterable(showDevice):
            deviceList = tolist(showDevice)
        else:
            deviceList = list(OMFIT['shotBookmarks'].keys())
        deviceList = list(numpy.unique(deviceList))
        ComboBox(exp['device_location'] + "['device']", deviceList, 'Device', state=tk.NORMAL, updateGUI=updateGUI,
                 postcommand=postcommand_mainsettings, check=checkDeviceFunction)
        set_values['device'] = eval(exp['device_location'] + "['device']")

    if numpy.iterable(showShot) or showShot:
        if not multiShots:
            if numpy.iterable(showShot):
                shotList = list(numpy.unique(tolist(showShot)))
            else:
                # NOTE(review): `device` is not assigned anywhere in this function; unless a
                # module-level `device` exists the NameError is swallowed and the list is empty -- confirm
                try:
                    shotList = list(map(int, sorted(OMFIT['shotBookmarks'][device].keys())))
                except Exception:
                    shotList = []

            def checkShotFunction(inv):
                if checkShot is None:
                    return is_int(inv)
                else:
                    return is_int(inv) and checkShot(inv)

            ComboBox(exp['shot_location'] + "['shot']", shotList, 'Shot', state=tk.NORMAL, updateGUI=updateGUI,
                     postcommand=postcommand_mainsettings, check=checkShotFunction)
            set_values['shot'] = eval(exp['shot_location'] + "['shot']")
        else:
            def checkShotsFunction(inv):
                if checkShot is None:
                    return is_int_array(inv)
                else:
                    return is_int_array(inv) and checkShot(inv)

            Entry(exp['shots_location'] + "['shots']", 'Shots', updateGUI=updateGUI, postcommand=postcommandArray,
                  check=checkShotsFunction, multiline=True)
            set_values['shots'] = eval(exp['shots_location'] + "['shots']")

    if numpy.iterable(showTime) or showTime:
        if multiTimes:
            def checkTimesFunction(inv):
                if subMillisecondTime:
                    if checkTime is None:
                        valid = is_array(inv)
                    else:
                        valid = is_array(inv) and checkTime(inv)
                else:
                    if checkTime is None:
                        valid = is_int_array(inv)
                    else:
                        valid = is_int_array(inv) and checkTime(inv)
                # times must be monotonically increasing
                if valid:
                    return numpy.all(numpy.diff(inv) > 0)

            Entry(exp['times_location'] + "['times']", 'Times' + ' [ms]', updateGUI=updateGUI,
                  postcommand=postcommandArray, check=checkTimesFunction, multiline=True,
                  help='Enter list of monotonically increasing times for analysis.\nAccepts python statements.\n'
                       'Example: Enter\n  arange(start,stop,step)\nfor a uniform time base.\n\nIt may be useful to start with a smaller '
                       'subset of times to get a feel for what is needed, then change to the full set of desired times.')
            set_values['times'] = eval(exp['times_location'] + "['times']")

        if (multiTimes and showSingleTime) or not multiTimes:
            if numpy.iterable(showTime):
                timeList = list(numpy.unique(tolist(showTime)))
            else:
                # NOTE(review): `device` and `shot` are not assigned anywhere in this function; unless
                # module-level names exist the NameError is swallowed and the list is empty -- confirm
                try:
                    timeList = list(map(int, sorted(OMFIT['shotBookmarks'][device][str(shot)].keys())))
                except Exception:
                    timeList = []

            def checkTimeFunction(inv):
                if subMillisecondTime:
                    if checkTime is None:
                        return is_numeric(inv)
                    else:
                        return is_numeric(inv) and checkTime(inv)
                else:
                    if checkTime is None:
                        return is_int(inv)
                    else:
                        return is_int(inv) and checkTime(inv)

            ComboBox(exp['time_location'] + "['time']", timeList, 'Time' + ' [ms]', state=tk.NORMAL,
                     updateGUI=updateGUI, postcommand=postcommand_mainsettings, check=checkTimeFunction)
            set_values['time'] = eval(exp['time_location'] + "['time']")

    if showRunID:
        def checkRunIDFunction(inv):
            # same pattern as the device/shot/time checks: built-in check plus the optional user check
            if checkRunID is None:
                return is_alphanumeric(inv)
            else:
                return is_alphanumeric(inv) and checkRunID(inv)

        Entry(exp['runid_location'] + "['runid']", 'run-ID', updateGUI=updateGUI,
              postcommand=postcommand_mainsettings, check=checkRunIDFunction)

    def addBookmarks(device, shot, time, description=''):
        # store (or delete, if the description is empty) the bookmark for device/shot/time
        if device != None:
            OMFIT['shotBookmarks'].setdefault(device, namelist.NamelistName())
            if device != None and shot != None:
                OMFIT['shotBookmarks'][device].setdefault(str(shot), namelist.NamelistName())
                if device != None and shot != None and time != None:
                    if not description and str(time) in OMFIT['shotBookmarks'][device][str(shot)]:
                        del OMFIT['shotBookmarks'][device][str(shot)][str(time)]
                        printi('Deleted bookmark for %s #%s @ %s' % (device, str(shot), str(time)))
                    elif description:
                        OMFIT['shotBookmarks'][device][str(shot)][str(time)] = description
                else:
                    printi('Bookmarks: no time')
            else:
                printi('Bookmarks: no shot')
        else:
            printi('Bookmarks: no device')
        OMFIT['shotBookmarks'].save()

    def addBookmarksWithDescription(device, shot, time):
        # pop-up asking for the description of the bookmark; the window is closed
        # either by <Escape> or by the postcommand of the entry
        def onEscape(location=None, event=None):
            top.destroy()

        top = tk.Toplevel(_topGUI(_aux['parentGUI']))
        top.withdraw()
        top.transient(_aux['parentGUI'])
        top.wm_title('Store shot bookmark with description')
        oldParent = _aux['parentGUI']
        _aux['parentGUI'] = top
        OMFIT['shotBookmarks'].load()
        try:
            OMFIT['scratch']['bookmarksDescription'] = OMFIT['shotBookmarks'][device][str(shot)][str(time)]
        except Exception:
            description = ''
        e = Entry("OMFIT['scratch']['bookmarksDescription']", "Shot description", postcommand=onEscape, default='')
        e.configure(width=30)
        _aux['parentGUI'] = oldParent
        top.bind('<Escape>', onEscape)
        top.deiconify()
        top.wait_window(top)
        addBookmarks(device, shot, time, description=OMFIT['scratch']['bookmarksDescription'])
        del OMFIT['scratch']['bookmarksDescription']
        OMFITaux['rootGUI'].event_generate("<<update_GUI>>")

    def linkUnlinkSettings(linked):
        # toggle between module specific values and expressions inheriting from the parent module/MainSettings
        if linked == 'linked':
            printi(rootName + "['SETTINGS']['EXPERIMENT'] is now set only for this module")
            for item in ['device', 'shot', 'time', 'shots', 'times', 'runid']:
                root['SETTINGS']['EXPERIMENT'][item] = evalExpr(root['SETTINGS']['EXPERIMENT'][item])
        else:
            printi(rootName + "['SETTINGS']['EXPERIMENT'] inherits from the parent module or MainSettings")
            for item in ['device', 'shot', 'time', 'shots', 'times', 'runid']:
                root['SETTINGS']['EXPERIMENT'][item] = OMFITexpression("""try:
    return_variable=OMFITmodules[-2]['SETTINGS']['EXPERIMENT']['%s']
except Exception:
    return_variable=MainSettings['EXPERIMENT']['%s']
""" % (item, item))
        OMFITaux['rootGUI'].event_generate("<<update_GUI>>")
        OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")

    frm1 = ttk.Frame(frm)
    frm1.pack(side=tk.LEFT, expand=tk.NO, fill=tk.BOTH)

    # stack the two buttons vertically when more than two sections are shown
    side = [tk.LEFT, tk.TOP][int(numpy.sum([numpy.iterable(showShot) or showShot,
                                            numpy.iterable(showTime) or showTime,
                                            numpy.iterable(showDevice) or showDevice,
                                            showRunID]) > 2)]

    im = tk.PhotoImage(master=frm1, file=os.path.join(OMFITsrc, 'extras', 'graphics', linked + '.ppm'))
    b = ttk.Button(master=frm1, text=linked, image=im, command=lambda linked=linked: linkUnlinkSettings(linked),
                   takefocus=False)
    b._ntimage = im  # keep a reference to the image on the button object
    b.pack(side=side, expand=tk.NO, fill=tk.NONE)

    im = tk.PhotoImage(master=frm1, file=os.path.join(OMFITsrc, 'extras', 'graphics', 'bookmark.ppm'))
    b = ttk.Button(master=frm1, text='bookmark', image=im,
                   command=lambda device=exp['device'], shot=exp['shot'], time=exp['time']:
                   addBookmarksWithDescription(exp['device'], exp['shot'], exp['time']),
                   takefocus=False)
    b._ntimage = im  # keep a reference to the image on the button object
    b.pack(side=side, expand=tk.NO, fill=tk.NONE)

    _aux['parentGUI'] = oldParent

    if stopIfNotSet:
        if numpy.any([not len(list([_f for _f in tolist(k) if _f])) for k in list(set_values.values())]):
            Label("Device, shot(s) and time(s) entries can not be None!", foreground='red')
            End()

    # set data to be harvested
    # NOTE(review): `module` and `moduleName` may have been re-bound by the loop over
    # OMFITmodulesName above, so they do not necessarily refer to `root` -- confirm intent
    set_values['module'] = module['SETTINGS']['MODULE']['ID']
    _aux['harvest'].setdefault(moduleName, {})
    _aux['harvest'][moduleName] = set_values
@_available_to_userGUI
def CloseGUI():
    """
    Close the active user GUI and stop the execution of the running script
    (raises EndOMFITpython)
    """
    active_gui = _aux['topGUI']
    _clearClosedGUI(active_gui)
    raise EndOMFITpython()
@_available_to_userGUI
def End(what='single'):
    '''
    End execution of OMFITpython script

    :param what:
        * 'single' terminates the running script (raises EndOMFITpython)
        * 'all' terminates the whole workflow (raises EndAllOMFITpython)
    '''
    # flush the pending drawing tasks of the GUI, unless `prun_process` is not empty
    gui = _aux['topGUI']
    if gui is not None and not len(OMFITaux['prun_process']):
        gui.update_idletasks()

    terminate_all = str(what).lower() == 'all'
    if terminate_all:
        raise EndAllOMFITpython()
    raise EndOMFITpython()
@_available_to_userGUI
def Open(object):
    '''
    Open OMFITascii object in editor or OMFITweb in browser
    File extension behaviour can be specified in OMFIT['MainSettings']['SETUP']['EXTENSIONS']

    The request is delegated to the `openFile` method of the main OMFIT GUI.

    :param object: OMFIT object or filename to be opened in external editor
    '''
    main_gui = OMFITaux['GUI']
    main_gui.openFile(thisObject=object)
@_available_to_userGUI
def Figure(toolbar=True, returnFigure=False, fillX=False, **kw):
    r"""
    Embed a matplotlib figure in an OMFIT GUI

    :param toolbar: [True] show/hide the figure toolbar

    :param returnFigure: [False] function returns figure `f` or axis `f.add_subplot(111)`

    :param fillX: [False] fill X dimension of screen

    :param figsize: (5*2./3., 4*2./3.) figure size

    :param \**kw: keyword arguments passed to pyplot.Figure

    :return: the figure if `returnFigure` is true, otherwise the axis `f.add_subplot(111)`
    """
    # use the tk constant explicitly, like everywhere else in this file (a bare `X` may not be defined)
    if fillX:
        fillX = tk.X
    else:
        fillX = tk.NONE

    frm_top = ttk.Frame(_aux['parentGUI'])
    frm_top.pack(side=_aux['packing'], fill=tk.X, expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP])

    kw.setdefault('figsize', (5 * 2. / 3., 4 * 2. / 3.))
    f = pyplot.Figure(**kw)
    canvas = FigureCanvasTkAgg(f, master=frm_top)
    canvas.get_tk_widget().pack(side=tk.TOP, fill=fillX, expand=tk.YES)
    if toolbar:
        toolbar = NavigationToolbar2(canvas, frm_top)
        toolbar.update_idletasks()
        canvas._tkcanvas.pack(side=tk.TOP, fill=fillX, expand=tk.YES)

    if returnFigure:
        return f
    else:
        return f.add_subplot(111)
@_available_to_userGUI
def Dialog(*args, **kw):
    """
    Display a dialog box and wait for user input
    (wrapper of `dialog` which uses the GUI under construction as default `parent`)

    :param message: the text to be written in the label

    :param answers: list of possible answers

    :param icon: "question", "info", "warning", "error"

    :param title: title of the frame

    :param options: dictionary of True/False options that are displayed as checkbuttons in the dialog

    :param entries: dictionary of string options that are displayed as entries in the dialog

    :return: return the answer chosen by the user (a dictionary if options keyword was passed)
    """
    # an explicitly passed `parent` wins over the current parent GUI
    current_parent = _aux['parentGUI']
    if 'parent' not in kw:
        kw['parent'] = current_parent
    return dialog(*args, **kw)
def _setupModule(prefGUI, extraSettings=None):
    """
    Populate `prefGUI` with the `settings` / `storage` tool buttons of a module
    and, when one of them is toggled on, with the corresponding panel.

    The module is the `root` of the script that owns the current top GUI.
    The toggle state is kept in root['__scratch__']['showSettings'] and
    root['__scratch__']['showStorage']; every toggle rebuilds the panel.

    :param prefGUI: Tk container in which the tools are (re)built

    :param extraSettings: optional callable, invoked with no arguments after the
        server entries of the settings panel to add module-specific GUI elements
    """
    root = relativeLocations(_GUIs[str(_aux['topGUI'])].pythonFile)['root']

    # NOTE(review): the nesting of this function was reconstructed from a
    # whitespace-collapsed listing; the tools are assumed to be built only
    # for non-compound GUIs -- confirm against the original file.
    if _aux['is_compoundGUI']:
        return

    root['__scratch__'].setdefault('showSettings', False)
    root['__scratch__'].setdefault('showStorage', False)

    def showTools():
        # temporarily redirect GUI construction into prefGUI
        bckp_parent = _aux['parentGUI']
        try:
            _aux['parentGUI'] = prefGUI
            _clearKids(_aux['parentGUI'])

            frm = ttk.Frame(_aux['parentGUI'])
            frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)

            # buttons sit side by side when a panel is open, stacked otherwise
            orient = tk.TOP
            if root['__scratch__']['showStorage'] or root['__scratch__']['showSettings']:
                orient = tk.LEFT

            def showSettings():
                root['__scratch__']['showStorage'] = False
                root['__scratch__']['showSettings'] = not root['__scratch__']['showSettings']
                showTools()

            frm1 = ttk.Frame(frm)
            frm1.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)
            im = tk.PhotoImage(master=frm1, file=os.path.join(OMFITsrc, 'extras', 'graphics', 'settings.ppm'))
            b = ttk.Button(master=frm1, text='settings', image=im, command=showSettings, takefocus=False)
            b._ntimage = im  # keep a reference so the image is not garbage collected
            b.pack(side=orient, expand=tk.NO, fill=tk.NONE)

            def showStorage():
                root['__scratch__']['showSettings'] = False
                root['__scratch__']['showStorage'] = not root['__scratch__']['showStorage']
                showTools()

            im = tk.PhotoImage(master=frm1, file=os.path.join(OMFITsrc, 'extras', 'graphics', 'storage.ppm'))
            b = ttk.Button(master=frm1, text='storage', image=im, command=showStorage, takefocus=False)
            b._ntimage = im  # keep a reference so the image is not garbage collected
            b.pack(side=orient, expand=tk.NO, fill=tk.NONE)

            if root['__scratch__']['showStorage'] or root['__scratch__']['showSettings']:
                ttk.Separator(frm).pack(side=tk.TOP, expand=tk.NO, fill=tk.X, padx=5, pady=2)

            # ---------------- settings panel ----------------
            if root['__scratch__']['showSettings']:
                Label('SETTINGS: ' + treeLocation(root)[-1], font=OMFITfont('bold', -2))
                ComboBox("root['SETTINGS']['REMOTE_SETUP']['serverPicker']", list(SERVER.listServers().keys()), 'server', updateGUI=True)
                try:
                    location = "OMFIT['MainSettings']['SERVER']['%s']" % SERVER(root['SETTINGS']['REMOTE_SETUP']['serverPicker'])
                except Exception:
                    Label('Invalid server %s' % str(root['SETTINGS']['REMOTE_SETUP']['serverPicker']))
                else:
                    # only list the server entries when the server resolved;
                    # otherwise `location` is unbound and only the label is shown
                    for item in list(eval(location).keys()):
                        Entry(location + "[" + repr(item) + "]", item[0].upper() + item[1:], updateGUI=True)
                if extraSettings:
                    Separator()
                    extraSettings()

            # ---------------- storage panel ----------------
            if root['__scratch__']['showStorage']:
                Label('STORAGE: ' + treeLocation(root)[-1], font=OMFITfont('bold', -2))

                def next_runid():
                    # find the run_id with the maximum trailing number
                    index0 = 0
                    for k in root['__STORAGE__'].keys(filter=hide_ptrn):
                        digits = re.findall('[0-9]+$', k)
                        if not digits:
                            # run-ids without a trailing number cannot be ranked
                            continue
                        index = int(digits[-1])
                        if index > index0:
                            index0 = index
                            root['SETTINGS']['EXPERIMENT']['runid'] = k
                    # go to the next available runid
                    while root['SETTINGS']['EXPERIMENT']['runid'] in root['__STORAGE__'].keys(filter=hide_ptrn):
                        index = re.findall('[0-9]+$', root['SETTINGS']['EXPERIMENT']['runid'])
                        if len(index):
                            root['SETTINGS']['EXPERIMENT']['runid'] = re.sub(index[-1] + '$', str(int(index[-1]) + 1), root['SETTINGS']['EXPERIMENT']['runid'])
                        else:
                            root['SETTINGS']['EXPERIMENT']['runid'] += '1'
                    root['SETTINGS']['EXPERIMENT']['comment'] = ''

                def reload_comment(location=None):
                    # show the comment that was stored with the selected run-id
                    if root['SETTINGS']['EXPERIMENT']['runid'] in root['__STORAGE__'].keys(filter=hide_ptrn):
                        root['SETTINGS']['EXPERIMENT']['comment'] = root['__STORAGE__'][root['SETTINGS']['EXPERIMENT']['runid']]['comment']

                # handle runids
                if 'runid' not in root['SETTINGS']['EXPERIMENT'] or root['SETTINGS']['EXPERIMENT']['runid'] is None:
                    root['SETTINGS']['EXPERIMENT']['runid'] = 'sim1'
                if '__STORAGE__' in root and root['SETTINGS']['EXPERIMENT']['runid'] in root['__STORAGE__'].keys(filter=hide_ptrn):
                    Button('New run-ID', next_runid, updateGUI=True)
                runs = SortedDict()
                if '__STORAGE__' in root:
                    for k in root['__STORAGE__'].keys(filter=hide_ptrn):
                        if 'comment' in root['__STORAGE__'][k]:
                            runs[k + ': ' + root['__STORAGE__'][k]['comment']] = k
                ComboBox("root['SETTINGS']['EXPERIMENT']['runid']", runs, 'Run-ID', state='readonly', check=is_string, default='', updateGUI=True, postcommand=reload_comment)
                Separator()
                Separator()

                # handle operations: storing requires a non-empty comment
                state = 'normal'
                if 'comment' not in root['SETTINGS']['EXPERIMENT'] or not root['SETTINGS']['EXPERIMENT']['comment']:
                    state = 'disabled'
                Entry("root['SETTINGS']['EXPERIMENT']['comment']", 'Comment', check=is_string, default='', updateGUI=True)
                if '__STORAGE__' not in root or root['SETTINGS']['EXPERIMENT']['runid'] not in root['__STORAGE__'].keys(filter=hide_ptrn):
                    Button('Store %s (new)' % root['SETTINGS']['EXPERIMENT']['runid'], root.store, state=state, updateGUI=True)
                else:
                    # NOTE(review): restore/delete are assumed to be offered only
                    # for run-ids that already exist in the storage -- confirm
                    Button('Store %s (update)' % root['SETTINGS']['EXPERIMENT']['runid'], root.store, state=state, updateGUI=True)
                    Separator()
                    CheckButton("root['__STORAGE__']['__restoreScripts__']", 'Restore scripts', default=False)
                    Button('Restore %s' % root['SETTINGS']['EXPERIMENT']['runid'], root.restore, updateGUI=True)
                    Separator()
                    Button('Delete %s' % root['SETTINGS']['EXPERIMENT']['runid'], root.destore, updateGUI=True)
        finally:
            # always hand GUI construction back to the original parent
            _aux['parentGUI'] = bckp_parent

        OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")

    showTools()
def clc(tag=None):
    '''
    clear console (possible tags are)
    INFO : forest green
    HIST : dark slate gray
    WARNING : DarkOrange2
    HELP : PaleGreen4
    STDERR : red3
    STDOUT : black
    DEBUG : gold4
    PROGRAM_OUT : blue
    PROGRAM_ERR : purple

    :param tag: specific tag to clear (case-insensitive); if empty or None the
        whole console is cleared
    '''
    from classes.utils_base import _Streams

    if not tag:
        OMFITaux['console'].clear()
        return

    # The tag is validated case-insensitively, so use the same normalized
    # form for every call below (`clc('stderr')` behaves like `clc('STDERR')`)
    tag_name = tag.upper()
    if tag_name in _Streams.tags:
        OMFITaux['console'].clear(tag_name)
        if tag_name == 'STDERR':
            # STDERR also has numbered sub-streams STDERR0..STDERR9
            for k in range(10):
                OMFITaux['console'].clear('STDERR' + str(k))
    else:
        printe('clc: console tag `%s` is not recognized. Valid options are:' % tag)
        # separate loop name: do not rebind the `tag` argument
        for valid_tag in _Streams.tags:
            printe(' %s: %s' % (valid_tag.ljust(12), _Streams.tags[valid_tag]))
@_available_to_userGUI
def EditASCIIobject(location, lbl=None, comment=None, updateGUI=False, help='', postcommand=None, url='', **kw):
    """
    This method creates a GUI element that edits ASCII files in the OMFIT tree
    Sample usage::

        OMFITx.EditASCIIobject("root['INPUTS']['TRANSP']", 'edit namelist', postcommand=lambda location:eval(location).load())

    :param location: location of the ASCII OMFITobject in the OMFIT tree (notice that this is a string)

    :param lbl: Label which is put on the left of the entry

    :param comment: A comment which appears on top of the entry

    :param updateGUI: Force a re-evaluation of the GUI script when this parameter is changed

    :param help: help provided when user right-clicks on GUI element (adds GUI button)

    :param postcommand: command to be executed after the value in the tree is updated. This command will receive the OMFIT location string as an input

    :param url: open url in web-browser (adds GUI button)
        NOTE(review): this argument is accepted but not used by this function

    :param \**kw: extra keyword arguments forwarded to the text editor widget

    :return: associated ttk.Button object (disabled when the object at
        `location` has no `filename`)
    """
    location = _absLocation(location)
    postcommand_location = location
    # the file content is edited through a temporary entry of the scratch area
    scratchLocation = _absLocation("scratch['%s']" % ('editASCII_' + omfit_hash(location, 10)))

    try:
        filename = eval(postcommand_location).filename
    except Exception:
        # no object (or no file behind it): the button is created disabled
        filename = ''

    frm_top = ttk.Frame(_aux['parentGUI'])
    frm_top.pack(side=_aux['packing'], expand=[tk.YES, tk.NO][_aux['packing'] == tk.TOP], fill=tk.X, padx=5, pady=1)

    if comment is not None:
        frm = ttk.Frame(frm_top)
        frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)
        _Label(frm, text=comment).pack(side=tk.LEFT)

    frm = ttk.Frame(frm_top)
    frm.pack(side=tk.TOP, expand=tk.NO, fill=tk.X)

    if lbl is None:
        lbl = 'Edit ' + location

    def onDone(location):
        # write the edited text back to the file and drop the scratch entry
        with open(filename, 'w') as f:
            f.write(eval(scratchLocation))
        del eval(buildLocation(parseLocation(scratchLocation)[:-1]))[parseLocation(scratchLocation)[-1]]
        if postcommand is not None:
            postcommand(postcommand_location)

    def showText(event=None):
        # load the file in the scratch entry and open the editor on it
        with open(filename, 'r') as f:
            eval(buildLocation(parseLocation(scratchLocation)[:-1]))[parseLocation(scratchLocation)[-1]] = f.read()
        _Text(parent=frm, location=scratchLocation, lbl=lbl, updateGUI=updateGUI, help=help, preentry=None, postcommand=onDone, percolator=filename.endswith('.py'), **kw)

    # `pack()` returns None: create the button first, then pack it, so that
    # the widget itself can be disabled and returned to the caller
    bttn = ttk.Button(frm, text=lbl, command=showText, takefocus=False)
    bttn.pack(side=tk.LEFT, expand=tk.NO, fill=tk.X, padx=5, pady=1)
    if not filename:
        bttn.state(['disabled'])
    return bttn
#--------------------------- # file browsing #---------------------------
class FileDialog(object):
    """
    Standard remote file selection dialog -- no checks on selected file.

    Creating an instance builds the dialog, shows it and waits until its
    window is closed. The outcome is then available in `self.how`, which is
    `None` if the dialog was cancelled, or the tuple
    `(selection, server, tunnel)` otherwise.

    :param directory: directory where to start browsing

    :param serverPicker: serverPicker wins over server/tunnel settings
        serverpicker=None will reuse latest server/tunnel that the user browsed to

    :param server: server

    :param tunnel: tunnel

    :param pattern: glob regular expression for files selection

    :param default: default filename selection

    :param master: Tkinter master GUI

    :param lockServer: if True the server/tunnel widgets are disabled, so that
        the user cannot change the server settings

    :param focus: what to focus in the GUI ('filterDirs','filterFiles')

    :param favorite_list_location: OMFIT tree location which contains a possibly empty list of favorite file directories. To keep with the general omfit approach this should be a string.

    :param pattern_list_location: OMFIT tree location which contains a possibly empty list of favorite search patterns. To keep with the general omfit approach this should be a string.
    """

    def __init__(self, directory=None, serverPicker='', server='localhost', tunnel='', pattern='*', default='', master=None, lockServer=False, focus='filterDirs', favorite_list_location=None, pattern_list_location=None):
        if master is None:
            master=OMFITaux['rootGUI']
        self.master = master

        # resolve which server/tunnel to browse:
        #  * serverPicker=None : reuse the last browsed server/tunnel (if any)
        #  * serverPicker set  : take server/tunnel from the SERVER settings
        #  * no server at all  : fall back to localhost
        if serverPicker is None:
            server=OMFITaux['lastBrowsed'].setdefault('__lastServer__',server)
            tunnel=OMFITaux['lastBrowsed'].setdefault('__lastTunnel__',tunnel)
        if serverPicker:
            server=SERVER[serverPicker]['server']
            tunnel=SERVER[serverPicker]['tunnel']
        if not server:
            serverPicker='localhost'
            server='localhost'
            tunnel=''

        self.favorite_list = None
        if favorite_list_location is not None:
            # self.favorite_list is bound to the list object stored in the
            # tree; the list is modified in place (see manage_list), so any
            # update is reflected in the tree entry.
            #print("favorite_list_location:",favorite_list_location)
            favorite_list_location = _absLocation(favorite_list_location)
            try:
                self.favorite_list=eval(favorite_list_location)
            except Exception as _excp:
                printe(_excp)
                # if there is a problem with evaluation turn off the option
                print("favorite_list_Location could not be evaluated: %s"%favorite_list_location)
                favorite_list_location = None
            else:
                # if a list is not passed in then turn this off
                if not isinstance(self.favorite_list,list):
                    self.favorite_list = None
                    favorite_list_location=None

        self.pattern_list = None
        if pattern_list_location is not None:
            # self.pattern_list is bound to the list object stored in the
            # tree; the list is modified in place (see manage_list), so any
            # update is reflected in the tree entry.
            #print("pattern_list_location:",pattern_list_location)
            pattern_list_location = _absLocation(pattern_list_location)
            try:
                self.pattern_list=eval(pattern_list_location)
            except Exception as _excp:
                printe(_excp)
                # if there is a problem with evaluation turn off the option
                print("pattern_list_Location could not be evaluated: %s"%pattern_list_location)
                pattern_list_location = None
            else:
                # if a list is not passed in then turn this off
                if not isinstance(self.pattern_list,list):
                    self.pattern_list = None
                    pattern_list_location=None

        self.server0,self.tunnel0=server,tunnel
        self.pattern=pattern
        self.default=default

        # set the top level window (kept hidden until it is fully built)
        self.top = tk.Toplevel(master)
        self.top.withdraw()
        self.top.transient(master)

        def tab_complete(event=None):
            # <Tab> handler of the directory filter: complete the typed path
            # against the sub-directories that are currently listed
            def complete(what,options):
                # returns (common prefix of the matching options, matching options)
                options=list(map(os.path.abspath,options))
                what=os.path.abspath(what)
                possible_options=[item+os.sep for item in options if item.startswith(what)]
                return os.path.commonprefix(possible_options),possible_options
            filter_get=self.get_filter()[0]
            selection_get=self.selection.get()
            # refresh the listing when the typed path is not the one on display
            if os.path.abspath(os.path.split(filter_get)[0])!=os.path.abspath(selection_get) and os.path.abspath(filter_get)!=os.path.abspath(selection_get):
                self.filter_command(dir=os.path.abspath(os.path.split(filter_get)[0]))
            completed=complete(filter_get,[os.path.split(filter_get)[0]+os.sep+item for item in self.dirs.get(0, tk.END)])
            if completed[0]:
                self.filterDirs.delete(0, tk.END)
                self.filterDirs.insert(tk.END, re.sub('//','/',completed[0]))
                # a unique match: step into that directory
                if len(completed[1])==1 and completed[0].endswith('/'):
                    self.filter_command()
            self.filterDirs.xview(tk.END)
            # 'break' stops Tk from processing the event any further
            return 'break'

        def fill_server_tunnel(event=None):
            # combobox entries read '<name> -- <server>' (see `go`)
            tmp=self.comboBox.get().split(' -- ')[0]
            serverTK.set(str(SERVER[tmp]['server']))
            tunnelTK.set(str(SERVER[tmp].get('tunnel','')))
            set_server_tunnel()

        def set_server_tunnel(event=None):
            # apply the server/tunnel typed by the user and browse there
            serverTK.set(serverTK.get().strip())
            try:
                # the entry may hold the name of a server defined in SERVER
                serverTK.set(SERVER[serverTK.get().strip()]['server'])
                tunnelTK.set(SERVER[serverTK.get().strip()]['tunnel'])
            except Exception:
                tunnelTK.set(tunnelTK.get().strip())
            self.server0=serverTK.get()
            self.tunnel0=tunnelTK.get()
            self.go(directory=None)

        # Set server string
        serverTK=tk.StringVar()
        serverTK.set(self.server0)

        # Set tunnel string
        tunnelTK=tk.StringVar()
        tunnelTK.set(self.tunnel0)

        # set frame from top level window
        frm=ttk.Frame(self.top)
        frm.pack(side=tk.TOP,expand=tk.NO,fill=tk.X)

        # add another frame for the server and tunnel information
        frm1=ttk.Frame(frm)
        ttk.Label(frm1,text='Fill server/tunnel from: ').pack(side=tk.LEFT)
        tmp=list(SERVER.listServers().values())
        # case-insensitive sort - could be a problem if folks use case to distinguish servers
        tmp.sort(key=lambda x:x.lower())
        # move the localhost entry to the top of the list
        tmp=[tmp.pop(tmp.index(SERVER.listServers()['localhost']))]+tmp
        # setup a comboBox for choosing the server based on the list in mainSettings - SERVER is a global
        self.comboBox = ttk.Combobox(frm1, state='readonly', values=tmp)
        self.comboBox.pack(side=tk.LEFT,expand=tk.YES,fill=tk.X)
        self.comboBox.bind('<<ComboboxSelected>>', func=fill_server_tunnel)
        frm1.pack(side=tk.TOP,expand=tk.YES,fill=tk.X)

        # add another frame for the server label and entry
        frm1=ttk.Frame(frm)
        ttk.Label(frm1,text='On server: ').pack(side=tk.LEFT)
        serverTKGUI=ttk.Entry(frm1,textvariable=serverTK)
        serverTKGUI.pack(side=tk.LEFT,expand=tk.YES,fill=tk.X)
        serverTKGUI.bind('<Return>',set_server_tunnel)
        frm1.pack(side=tk.TOP,expand=tk.YES,fill=tk.X)

        # add another frame for the tunnel label and entry
        frm1=ttk.Frame(frm)
        ttk.Label(frm1,text='Via tunnel: ').pack(side=tk.LEFT)
        tunnelTKGUI=ttk.Entry(frm1,textvariable=tunnelTK)
        tunnelTKGUI.pack(side=tk.LEFT,expand=tk.YES,fill=tk.X)
        tunnelTKGUI.bind('<Return>',set_server_tunnel)
        frm1.pack(side=tk.TOP,expand=tk.YES,fill=tk.X)

        # if server is locked then turn off selection functionality
        if lockServer:
            self.comboBox.configure(state='disabled')
            serverTKGUI.configure(state='disabled')
            tunnelTKGUI.configure(state='disabled')

        # add a separator to the main frame
        ttk.Separator(frm).pack(side=tk.TOP,expand=tk.NO,fill=tk.X,padx=5,pady=2)

        # bottom frame: it will hold the Ok/Cancel buttons (added at the end)
        self.botframe = ttk.Frame(self.top)
        self.botframe.pack(side=tk.BOTTOM, fill=tk.X)

        # Add another frame to the main window
        frm=ttk.Frame(self.top)
        ttk.Separator(frm).pack(side=tk.TOP,expand=tk.NO,fill=tk.X,padx=5,pady=2)
        frm.pack(side=tk.BOTTOM, fill=tk.X)

        # add label and entry to show file selection at the bottom
        ttk.Label(frm,text='Selection: ').pack(side=tk.LEFT)
        self.selection = ttk.Entry(frm)
        self.selection.pack(side=tk.LEFT, fill=tk.X, expand=tk.YES)
        self.selection.bind('<Return>', self.ok_event)

        # create a middle frame
        self.midframe = ttk.Frame(self.top)
        self.midframe.pack(expand=tk.YES, fill=tk.BOTH)

        # create a scroll bar in the middle frame to scroll file names
        self.filesbar = ttk.Scrollbar(self.midframe)
        self.filesbar.pack(side=tk.RIGHT, fill=tk.Y)

        # create another frame as part of midframe to hold file filter and files
        frm=ttk.Frame(self.midframe)
        frm.pack(side=tk.RIGHT, fill=tk.BOTH, expand=tk.YES)

        # branch on whether pattern_list favorites are asked for
        if pattern_list_location is None:
            # add a file filter
            self.filterFiles = ttk.Entry(frm)
            self.filterFiles.pack(side=tk.TOP, fill=tk.X)
            self.filterFiles.bind('<Return>', self.filter_command)

            # add the files list box
            self.files = tk.Listbox(frm, exportselection=0, yscrollcommand=(self.filesbar, 'set'))
            self.files.pack(side=tk.RIGHT, expand=tk.YES, fill=tk.BOTH)
            # rotate the bindtags so that the first tag is processed last;
            # presumably this lets the default Listbox bindings update the
            # active item before the handlers below read it -- TODO confirm
            btags = self.files.bindtags()
            self.files.bindtags(btags[1:] + btags[:1])
            self.files.bind('<ButtonRelease-1>', self.files_select_event)
            self.files.bind('<Double-ButtonRelease-1>', self.files_double_event)
            self.files.bind('<Return>', self.files_double_event)

            # connect the files scrollbar
            self.filesbar.config(command=(self.files, 'yview'))
        else:
            # add extra frames to manage pattern list buttons
            # These are together because both frma and frmb are in frm

            # create frame for file filter and filter list buttons
            # expand is off for frma so it does not expand to fill half the space
            frma=ttk.Frame(frm)
            frma.pack(side=tk.TOP,fill=tk.X)

            # create frame for file list
            frmb=ttk.Frame(frm)
            frmb.pack(side=tk.BOTTOM,fill=tk.BOTH,expand=tk.YES)

            # setup a comboBox for choosing the file filter pattern from favorites - add to frame a
            self.filterFiles = ttk.Combobox(frma, state='normal', values=self.pattern_list)
            self.filterFiles.pack(side=tk.LEFT,expand=tk.YES,fill=tk.BOTH)
            self.filterFiles.bind('<<ComboboxSelected>>', self.filter_command)
            self.filterFiles.bind('<Return>', self.filter_command)

            # Add add_to_favorite_patterns button
            self.add_to_favorite_patterns = ttk.Button(frma,text="+", command= lambda: self.manage_list(self.pattern_list,self.filterFiles,'+'))
            self.add_to_favorite_patterns.pack(side=tk.LEFT)

            # Add remove_from_favorite_patterns button
            self.remove_from_favorite_patterns = ttk.Button(frma,text="-",command= lambda: self.manage_list(self.pattern_list,self.filterFiles,'-'))
            self.remove_from_favorite_patterns.pack(side=tk.LEFT)

            # add the files list box in frame b
            self.files = tk.Listbox(frmb, exportselection=0, yscrollcommand=(self.filesbar, 'set'))
            self.files.pack(side=tk.RIGHT, expand=tk.YES, fill=tk.BOTH)
            # rotate the bindtags so that the first tag is processed last
            # (same reason as in the branch above), then bind the handlers
            btags = self.files.bindtags()
            self.files.bindtags(btags[1:] + btags[:1])
            self.files.bind('<ButtonRelease-1>', self.files_select_event)
            self.files.bind('<Double-ButtonRelease-1>', self.files_double_event)
            self.files.bind('<Return>', self.files_double_event)

            # connect the files scrollbar
            self.filesbar.config(command=(self.files, 'yview'))

        # add directory list scroll bar
        self.dirsbar = ttk.Scrollbar(self.midframe)
        self.dirsbar.pack(side=tk.LEFT, fill=tk.Y)

        # add another frame for the directory information
        frm=ttk.Frame(self.midframe)
        frm.pack(side=tk.RIGHT, fill=tk.BOTH, expand=tk.YES)

        # branch on whether favorite list is requested for directories
        if favorite_list_location is None:
            # add the directory filter
            self.filterDirs = ttk.Entry(frm)
            self.filterDirs.pack(side=tk.TOP, fill=tk.X)
            self.filterDirs.bind('<Return>', self.filter_command)
            self.filterDirs.bind('<Tab>', tab_complete)
            self.filterDirs.bind('<Shift-Tab>', self.dirs_back_event)
            try:
                # this event name is not accepted by every Tk: ignore if so
                self.filterDirs.bind('<ISO_Left_Tab>', self.dirs_back_event)
            except tk.TclError:
                pass

            # add the directory list
            self.dirs = tk.Listbox(frm, exportselection=0, yscrollcommand=(self.dirsbar, 'set'))
            self.dirs.pack(side=tk.BOTTOM, expand=tk.YES, fill=tk.BOTH)

            # bind events for directory selection
            btags = self.dirs.bindtags()
            self.dirs.bindtags(btags[1:] + btags[:1])
            self.dirs.bind('<ButtonRelease-1>', self.dirs_select_event)
            self.dirs.bind('<Double-ButtonRelease-1>', self.dirs_double_event)
            self.dirs.bind('<Return>', self.dirs_double_event)
            self.dirs.bind('<BackSpace>', self.dirs_back_event)

            # connect the directory scrollbar
            self.dirsbar.config(command=(self.dirs, 'yview'))
        else:
            # if there is a favorite list add some frames

            # create frame for directory filter and directory filter buttons
            frma=ttk.Frame(frm)
            frma.pack(side=tk.TOP,fill=tk.X)

            # create frame for list of directories
            frmb=ttk.Frame(frm)
            frmb.pack(side=tk.BOTTOM,fill=tk.BOTH,expand=tk.YES)

            # setup a comboBox for choosing the directory filter pattern from favorites - add to frame a
            self.filterDirs = ttk.Combobox(frma, state='normal', values=self.favorite_list)
            self.filterDirs.pack(side=tk.LEFT,expand=tk.YES,fill=tk.BOTH)
            self.filterDirs.bind('<<ComboboxSelected>>', self.filter_command)
            self.filterDirs.bind('<Return>', self.filter_command)
            self.filterDirs.bind('<Tab>', tab_complete)
            self.filterDirs.bind('<Shift-Tab>', self.dirs_back_event)
            try:
                # this event name is not accepted by every Tk: ignore if so
                self.filterDirs.bind('<ISO_Left_Tab>', self.dirs_back_event)
            except tk.TclError:
                pass

            # Add add_to_favorite_dirs button
            self.add_to_favorite_dirs = ttk.Button(frma,text="+",command = lambda: self.manage_list(self.favorite_list,self.filterDirs,'+'))
            self.add_to_favorite_dirs.pack(side=tk.LEFT)

            # Add remove_from_favorite_dirs button
            self.remove_from_favorite_dirs = ttk.Button(frma,text="-",command= lambda: self.manage_list(self.favorite_list,self.filterDirs,'-'))
            self.remove_from_favorite_dirs.pack(side=tk.LEFT)

            # add the directory list
            self.dirs = tk.Listbox(frmb, exportselection=0, yscrollcommand=(self.dirsbar, 'set'))
            self.dirs.pack(side=tk.BOTTOM, expand=tk.YES, fill=tk.BOTH)

            # bind events for directory selection
            btags = self.dirs.bindtags()
            self.dirs.bindtags(btags[1:] + btags[:1])
            self.dirs.bind('<ButtonRelease-1>', self.dirs_select_event)
            self.dirs.bind('<Double-ButtonRelease-1>', self.dirs_double_event)
            self.dirs.bind('<Return>', self.dirs_double_event)
            self.dirs.bind('<BackSpace>', self.dirs_back_event)

            # connect the directory scrollbar
            self.dirsbar.config(command=(self.dirs, 'yview'))

        # Add ok button to the bottom of the frame
        self.ok_button = ttk.Button(self.botframe, text="Ok", command=self.ok_command)
        self.ok_button.pack(side=tk.LEFT,expand=tk.YES,fill=tk.X)

        # Add the cancel button to the bottom of the frame
        self.cancel_button = ttk.Button(self.botframe, text="Cancel", command=self.cancel_command)
        self.cancel_button.pack(side=tk.RIGHT,expand=tk.YES,fill=tk.X)

        # closing the window or pressing <Escape> is the same as cancelling
        self.top.protocol('WM_DELETE_WINDOW', self.cancel_command)
        self.top.bind('<Escape>', self.cancel_command)

        # show the dialog and fill it with the initial directory listing
        tk_center(self.top,self.master,800,600)
        self.top.deiconify()
        self.top.update_idletasks()
        self.go(directory)
        if focus=='filterFiles':
            self.filterFiles.focus_set()
            self.filterFiles.icursor(0)
        elif focus=='filterDirs':
            self.filterDirs.focus_set()

        # block here until the dialog window is destroyed (see `quit`)
        self.top.wait_window(self.top)
        self.top.destroy()

    def go(self, directory=None):
        """
        Browse to `directory` on the current server/tunnel and refresh the dialog

        :param directory: directory to show; if None the directory that was last
            browsed on this server/tunnel is used ('' if there is none)
        """
        if directory is None:
            if self.server0+'-'+self.tunnel0 in OMFITaux['lastBrowsed']:
                directory=OMFITaux['lastBrowsed'][self.server0+'-'+self.tunnel0]
            else:
                directory=''
        try:
            # show the matching SERVER entry in the combobox, if there is one
            serverPicker=SERVER(SERVER[self.server0]['server'])
            self.comboBox.set(serverPicker+' -- '+self.server0)
        except KeyError:
            pass
        self.curdir=directory
        self.directory = directory
        self.set_filter(self.directory, self.pattern)
        self.set_selection(self.default)
        self.how = None
        self.filter_command()
        self.filterDirs.xview(tk.END)

    def quit(self, how=None):
        """
        Close the dialog and record its outcome in `self.how`

        :param how: selected filename, or None if the dialog was cancelled.
            When not None, `self.how` is set to the tuple (how, server, tunnel)
        """
        if how is None:
            self.how=None
        else:
            self.how = how,self.server0,self.tunnel0
        # destroying the window releases the wait_window() call in __init__
        self.top.destroy()

    def dirs_double_event(self, event):
        """Enter the directory that is active in the directories list"""
        dir, pat = self.get_filter()
        subdir = self.dirs.get('active')
        dir = os.path.normpath(os.path.join(dir, subdir))
        self.set_filter(dir, pat)
        self.filter_command()

    def dirs_back_event(self, event):
        """Move up to the parent directory"""
        dir, pat = self.get_filter()
        dir = os.path.normpath(os.path.join(dir, '..'))
        self.set_filter(dir, pat)
        self.filter_command()
        # 'break' stops Tk from processing the event any further
        return 'break'

    def dirs_select_event(self, event):
        """Single click on a directory: nothing to do"""
        pass

    def files_double_event(self, event):
        """Double click (or <Return>) on a file: confirm the selection"""
        self.ok_command()

    def files_select_event(self, event):
        """Copy the file that is active in the files list to the selection entry"""
        file = self.files.get('active')
        self.set_selection(file)

    def ok_event(self, event):
        """<Return> in the selection entry: confirm the selection"""
        self.ok_command()

    def ok_command(self):
        """Close the dialog returning the current selection (no checks are done)"""
        self.quit(self.get_selection())

    def remote_command(self, command):
        """
        Execute a shell command on the server that is being browsed

        The command is run as it is on localhost, and through ssh otherwise.
        As a side effect the window title and the `self.username`,
        `self.server`, `self.port` attributes are updated.

        :param command: shell command string

        :return: tuple with stdout, stderr, and return code of the command

        :raise Exception: if the command fails without writing to stdout
            (a failure with some stdout only prints a warning)
        """
        self.top.title('File browser: %s' % (parse_server(self.server0)[2]))
        self.username, self.server, self.port = setup_ssh_tunnel(self.server0, self.tunnel0)

        def ssh_cd(inv):
            # wrap the command in an ssh call unless browsing localhost
            if is_localhost(self.server0):
                return inv
            return sshOptions() + controlmaster(self.username, self.server, self.port, self.server0) + " -Y -q -p " + self.port + " " + self.username + "@" + self.server + " '" + inv + "'"

        child = subprocess.Popen(ssh_cd(command), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = list(map(b2s, child.communicate()))
        if child.poll() == 0:
            pass
        elif child.poll() and len(out.strip()):
            printw('Warning in file browser:\n\n%s' % err)
        elif child.poll():
            raise Exception('Error in file browser:\n\n%s' % err)
        return out, err, child.poll()

    def filter_command(self, event=None, dir=None):
        """
        List the directory on the server and refresh the directories/files lists

        :param event: Tk event (unused; allows use as a binding callback)

        :param dir: directory to list; if None the one in the directory filter is used
        """
        dir0, pat = self.get_filter()
        if dir is None:
            dir=dir0
        strdir="\""+dir+"\""if dir else ''
        try:
            # first line of the output is the resolved directory (pwd),
            # followed by one entry per line, directories with a trailing '/'
            out,err,retcode=self.remote_command("cd {dir} && \\pwd && \\ls -1 -p -L {dir}".format(dir=strdir))
            self.filterDirs.configure(foreground='black')
        except Exception as _excp:
            # flag an invalid directory in red and leave the lists untouched
            printe(_excp)
            self.filterDirs.configure(foreground='red')
            return
        names = list([x.strip() for x in out.strip().split('\n')])
        dir=names.pop(0)+os.sep
        self.directory=dir
        self.set_filter(dir, pat)
        # remember where the user browsed to on this server/tunnel
        OMFITaux['lastBrowsed'][self.server0+'-'+self.tunnel0]=dir
        OMFITaux['lastBrowsed']['__lastServer__']=self.server0
        OMFITaux['lastBrowsed']['__lastTunnel__']=self.tunnel0
        names.sort(key=lambda x:x.lower())
        # offer '..' everywhere but in the root directory
        if self.directory!='/':
            subdirs = [os.pardir]
        else:
            subdirs = []
        matchingfiles = []
        for name in names:
            fullname = os.path.join(dir, name)
            # NOTE(review): os.path.isdir checks the local filesystem, also
            # when a remote server is browsed -- confirm this is intended
            if fullname.endswith('/') or os.path.isdir(fullname):
                subdirs.append(name)
            elif fnmatch.fnmatch(name, pat):
                matchingfiles.append(name)
        self.dirs.delete(0, tk.END)
        for name in subdirs:
            self.dirs.insert(tk.END, name)
        self.files.delete(0, tk.END)
        for name in matchingfiles:
            self.files.insert(tk.END, name)
        # keep the selected filename, now relative to the new directory
        head, tail = os.path.split(self.get_selection())
        if tail == self.curdir:
            tail = ''
        self.set_selection(tail)

    def get_filter(self):
        """
        :return: tuple with directory filter ('//' replaced by '/') and files pattern
        """
        return re.sub('//','/',self.filterDirs.get()),self.filterFiles.get()

    def get_selection(self):
        """
        :return: content of the selection entry ('//' replaced by '/')
        """
        return re.sub('//','/',self.selection.get())

    def cancel_command(self, event=None):
        """Close the dialog without a selection (`self.how` is set to None)"""
        self.quit()

    def set_filter(self, dir, pat):
        """
        Set the content of the directory and files filters

        :param dir: directory

        :param pat: files pattern; if empty the current pattern is kept,
            and '*' is used if also that is empty
        """
        pat=pat.strip()
        if not pat:
            pat = self.get_filter()[-1]
        if not pat:
            pat="*"
        self.filterDirs.delete(0, tk.END)
        self.filterDirs.insert(tk.END, re.sub('//','/',dir))
        self.filterFiles.delete(0, tk.END)
        self.filterFiles.insert(tk.END, re.sub('//','/',pat))

    def set_selection(self, file):
        """
        Set the selection entry to `file` joined to the current directory

        :param file: filename
        """
        self.selection.delete(0, tk.END)
        self.selection.insert(tk.END, re.sub('//','/',os.path.join(self.directory, file)))
        self.selection.xview(tk.END)

    def manage_list(self,fav_list,obj,op):
        """
        Add/remove the current value of a combobox to/from a list of favorites

        :param fav_list: list of favorites; it is modified in place

        :param obj: combobox of the FileDialog (either filterFiles or filterDirs)

        :param op: '+' to add the value, '-' to remove it
        """
        # obj is expected to be a combobox object from the FileDialog class (for either filterFiles or filterDirs)
        # get value from combobox object
        value = obj.get()
        if op=='+':
            printd('FileDialog:manage_list:add:',value)
            if value not in fav_list:
                fav_list.append(value)
        elif op == '-':
            printd('FileDialog:manage_list:remove:',value)
            if value in fav_list:
                fav_list.remove(value)
        # update combobox selections
        obj['values']=fav_list
class LoadFileDialog(FileDialog):
    """
    File selection dialog which checks that the file exists.

    :param transferRemoteFile: (keyword only) if evaluates to True, a file
        selected on a remote server is copied locally when the selection is
        confirmed. A string is used as the local destination directory;
        otherwise a new time-stamped directory under OMFITcwd is used.

    All other arguments are passed to FileDialog
    """

    def __init__(self, *args, **kw):
        # this option is ours: it must not reach FileDialog.__init__
        self.transferRemoteFile = kw.pop('transferRemoteFile', False)
        super(LoadFileDialog, self).__init__(*args, **kw)

    def ok_command(self):
        """
        Confirm the selection: list the file on the server (remote_command
        raises if that fails), download it if requested, and close the dialog
        """
        selected = self.get_selection()
        self.remote_command('\\ls "{}"'.format(selected))

        # nothing to transfer: return the selection as it is
        if not (self.transferRemoteFile and not is_localhost(self.server0)):
            self.quit(selected)
            return

        # transfer remote file locally
        if isinstance(self.transferRemoteFile, basestring):
            local_dir = self.transferRemoteFile
        else:
            local_dir = OMFITcwd + os.sep + 'dir_' + utils_base.now("%Y-%m-%d__%H:%M:%S__%f")
            # make sure that the automatic directory is a new one
            while os.path.exists(local_dir):
                local_dir += '_'
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)
        remote_downsync(self.server0, selected, local_dir + os.sep, self.tunnel0)

        # the file is now local: report it as such
        self.server0 = 'localhost'
        self.tunnel0 = ''
        self.quit(local_dir + os.sep + os.path.basename(selected))
class SaveFileDialog(FileDialog):
    """File selection dialog which checks that the file exists before saving."""

    def ok_command(self):
        """
        Confirm the selection. If the selected file already exists on the
        server the user is asked whether to overwrite it; the dialog is kept
        open if the answer is not 'Yes'.
        """
        file = self.get_selection()
        try:
            # remote_command raises when the listing fails,
            # which is taken to mean that the file does not exist yet
            self.remote_command('\\ls "{}"'.format(file))
        except Exception:
            answer = 'Yes'
        else:
            # Ask outside of the `try`: an error in the dialog itself must
            # not be mistaken for a permission to overwrite the file
            answer = Dialog('Overwrite %s ?' % file, answers=['Yes', 'No'], icon='question', title='File exists', parent=self.top)
        # compare by value: identity (`is`) of strings is an implementation detail
        if answer == 'Yes':
            self.quit(file)
def remoteFile(parent=None, transferRemoteFile=True, remoteFilename=None, server=None, tunnel=None, init_directory_location=None, init_pattern_location=None, favorite_list_location=None, pattern_list_location=None):
    '''
    Opens up a dialogue asking filename, server/tunnel for remote file transfer

    This function is mostly used within the framework; for use in OMFIT GUI scripts
    please consider using the OMFITx.FilePicker and OMFITx.ObjectPicker functions instead.

    :param parent: Tkinter parent GUI

    :param transferRemoteFile: [True,False,None] if True the remote file is transferred to the OMFITcwd directory

    :param remoteFilename: initial string for remote filename

    :param server: initial string for server

    :param tunnel: initial string for tunnel (None is treated as no tunnel)

    :param init_directory_location: The contents of this location are used to set the initial directory for file searches.
        If a file name is specified the directory will be determined from the file name and this input ignored.
        Otherwise, if set this will be used to set the initial directory.

    :param init_pattern_location: The default pattern is '*'. If this is specified then the contents of the
        tree location will replace the default intial pattern.

    :param favorite_list_location: OMFIT tree location which contains a possibly empty list of favorite file directories.
        To keep with the general omfit approach this should be a string.

    :param pattern_list_location: OMFIT tree location which contains a possibly empty list of favorite search patterns.
        To keep with the general omfit approach this should be a string.

    :return: None if the dialog is cancelled, otherwise it is controlled with transferRemoteFile parameter

        * string with local filename (if transferRemoteFile==True)

        * string with the filename (if transferRemoteFile==False)

        * tuple with the filename,server,tunnel (if transferRemoteFile==None)
    '''
    # if server is specified do not reuse last server
    serverPicker = None
    if server is not None:
        serverPicker = ''

    # The dialog builds keys as `server + '-' + tunnel`: a None tunnel would
    # raise TypeError there, so treat it as "no tunnel" like FileDialog's default
    if tunnel is None:
        tunnel = ''

    default = ''
    directory = None
    if remoteFilename is not None:
        if server is not None:
            directory, default = os.path.split(remoteFilename)
        else:
            # without a server only the file name is used: the directory
            # is the one that was last browsed
            default = os.path.split(remoteFilename)[1]
            directory = None

    # allow specification of initial directory
    if directory is None and init_directory_location is not None:
        # if the initial directory is a non-empty string then assign the
        # value to the starting directory
        init_directory_loc = _absLocation(init_directory_location)
        try:
            init_directory = eval(init_directory_loc)
            if init_directory != '' and isinstance(init_directory, basestring):
                directory = init_directory
        except Exception:
            # best effort: an invalid location only means no initial directory
            pass

    # allow specification of initial pattern
    pattern = '*'
    if init_pattern_location is not None:
        # if there is any issue with the evaluation set the search pattern to the default
        init_pattern_loc = _absLocation(init_pattern_location)
        try:
            pattern = eval(init_pattern_loc)
        except Exception:
            print("RemoteFile: Problem evaluating the contents of %s" % init_pattern_loc)
            pattern = '*'
        # the dialog calls string methods on the pattern: ignore anything else
        if not isinstance(pattern, basestring):
            pattern = '*'

    if transferRemoteFile:
        fd = LoadFileDialog(directory=directory, serverPicker=serverPicker, server=server, tunnel=tunnel, pattern=pattern, master=parent, default=default, transferRemoteFile=transferRemoteFile, favorite_list_location=favorite_list_location, pattern_list_location=pattern_list_location)
    else:
        # NOTE(review): serverPicker=None makes the dialog prefer the last
        # browsed server over `server`, unlike the branch above -- confirm
        # that this is intended
        fd = FileDialog(directory=directory, serverPicker=None, server=server, tunnel=tunnel, pattern=pattern, master=parent, default=default, favorite_list_location=favorite_list_location, pattern_list_location=pattern_list_location)

    # dialog was cancelled
    if fd.how is None:
        return

    # return either the filename, or a tuple with filename,server,tunnel if transferRemoteFile is None
    if transferRemoteFile is None:
        return fd.how[0], fd.how[1], fd.how[2]
    return fd.how[0]
#--------------------------- # tool functions #---------------------------
@_available_to_userTASK
def remote_sysinfo(server, tunnel='', quiet=False):
    r'''
    This function retrieves information from a remote server (like the shell which is running there)::

        {'ARG_MAX': 4611686018427387903,
         'QSTAT': '',
         'SQUEUE': '/opt/slurm/default/bin/squeue',
         'environment': OMFITenv([]),
         'id': 6216321643098941518,
         'login': ['.cshrc', '.login'],
         'logout': ['.logout'],
         'shell': 'csh',
         'shell_path': '/bin/csh',
         'sysinfo': 'csh\nARG_MAX=4611686018427387903\nQSTAT=\nSQUEUE=/opt/slurm/default/bin/squeue\necho: No match.'
        }

    Information from the remote server is stored in a dictionary.
    The dictionary is cached in `OMFITaux['sysinfo']` under a hash of username+server+port,
    so the remote probe only runs when there is no complete cached entry.

    :param server: remote server

    :param tunnel: via tunnel

    :param quiet: suppress output or not

    :return: dictionary with info from the server

    :raise Exception: if the probe command produced no output at all

    :raise OMFITexception: after the server `workDir` setting has been expanded with
        remote environment variables (the caller is expected to try again)
    '''
    server0 = server
    tunnel0 = tunnel
    username, server, port = setup_ssh_tunnel(server0, tunnel0)

    def ssh_cd(inv):
        # local commands run as they are, remote ones are wrapped in an ssh call
        if is_localhost(server0):
            return inv
        return sshOptions() + controlmaster(username, server, port, server0) + " -t -t -Y -q -p " + port + " " + username + "@" + server + " '" + inv + "'"

    id_ = omfit_hash(str(username + server + port))

    # drop cached entries that miss any of the mandatory keys
    required = set(['id', 'sysinfo', 'shell', 'login', 'logout'])
    if id_ in OMFITaux['sysinfo'] and len(required.difference(set(OMFITaux['sysinfo'][id_].keys()))):
        del OMFITaux['sysinfo'][id_]

    if id_ not in OMFITaux['sysinfo']:
        # one command per line: newlines are turned into ` ; ` separators below
        command = """
echo '-----------OMFIT-----------'
echo SHELL=$0
echo SHELL_PATH=`which $0`
echo ARG_MAX=`getconf ARG_MAX`
echo QSTAT=`which qstat | grep -v "not found"`
echo SQUEUE=`which squeue | grep -v "not found"`
echo '------------ENV------------'
env
echo '------------ENV------------'
""".strip()
        command = re.sub('\n', ' ; ', command)

        std_out = []
        if not quiet:
            printi('Collecting remote system info')
        _system(ssh_cd(command), message='Collecting remote system info', ignoreReturnCode=True, std_out=std_out, quiet=True)
        std_out = [_f for _f in std_out if _f]
        if not len(std_out):
            raise Exception(('Could not reach {username}@{server}:{port}\n\n'
                             ' Possible causes:\n'
                             '1. Do you have an account on `{server}` ?\n'
                             ' -> request an account\n'
                             '2. Is `{username}` your username on `{server}`?\n'
                             " -> edit your username under OMFIT['MainSettings']['SERVER']\n"
                             '3. Do you have your SSH keys setup on `{server}`?\n'
                             ' -> run `omfit -s` to setup password-less access\n'
                             ).format(username=username, server=server, port=port))

        OMFITaux['sysinfo'][id_] = {}
        info = OMFITaux['sysinfo'][id_]
        info['id'] = id_
        info['sysinfo'] = '\n'.join(std_out)

        # `env` is a three-state flag:
        #   None  -> inside the OMFIT header section (KEY=value lines)
        #   True  -> inside the `env` dump
        #   False -> outside of any section
        env = False
        ENV = []
        for line in std_out:
            if env is None:
                # partition keeps everything after the first `=` (values may contain `=`)
                key, sep, value = line.partition('=')
                if sep:
                    value = value.strip()
                    if key == 'SHELL':
                        info['shell'] = value
                    elif key == 'SHELL_PATH':
                        info['shell_path'] = value
                    elif key == 'ARG_MAX':
                        info['ARG_MAX'] = int(value)
                    elif key == 'QSTAT':
                        info['QSTAT'] = value
                    elif key == 'SQUEUE':
                        info['SQUEUE'] = value
            if '-----------OMFIT-----------' in line:
                env = None
            elif '------------ENV------------' in line:
                env = not env
            elif env is True:
                ENV.append(line)

        # apply defaults BEFORE they are read by the message below
        if 'shell' not in info:
            info['shell'] = ''
        if 'shell_path' not in info:
            info['shell_path'] = ''
        if not quiet:
            printi(info['shell'].upper() + ' shell detected: ' + info['shell_path'])

        # overview on http://kb.iu.edu/data/abdy.html
        if info['shell'] == 'tcsh':
            # from http://www.bo.infn.it/alice/alice-doc/mll-doc/impgde/node23.html
            login = ['/etc/csh.cshrc', '/etc/csh.login', '$HOME/.cshrc', '$HOME/.tcshrc', '$HOME/.login']
            logout = ['/etc/csh.logout', '$HOME/.logout']
        elif info['shell'] == 'csh':
            login = ['.cshrc', '.login']
            logout = ['.logout']
        elif info['shell'] in ['sh', 'ksh']:
            # https://blog.flowblok.id.au/2013-02/shell-startup-scripts.html
            login = ['/etc/profile', '/etc/ksh.kshrc', '$HOME/.profile']
            logout = []
        elif info['shell'] == 'bash':
            # https://blog.flowblok.id.au/2013-02/shell-startup-scripts.html
            # from http://shreevatsa.wordpress.com/2008/03/30/zshbash-startup-files-loading-order-bashrc-zshrc-etc/
            # echo exit | strace bash -li |& less | grep '^open'
            login = ['/etc/profile', '/etc/bash.bashrc', '/etc/bashrc', '$HOME/.bashrc', '$HOME/.bash_profile', '$HOME/.bash_login', '$HOME/.profile']
            logout = ['$HOME/.bash_logout', '/etc/bash.bash_logout']
        elif info['shell'] == 'zsh':
            # https://blog.flowblok.id.au/2013-02/shell-startup-scripts.html
            # from http://shreevatsa.wordpress.com/2008/03/30/zshbash-startup-files-loading-order-bashrc-zshrc-etc/
            login = ['/etc/zshenv', '$HOME/.zshenv', '/etc/zprofile', '$HOME/.zprofile', '/etc/zshrc', '$HOME/.zshrc', '/etc/zlogin', '$HOME/.zlogin']
            logout = ['$HOME/.zlogout', '/etc/zlogout']
        elif info['shell'] == 'rc':
            login = ['.rcrc']
            logout = []
        else:
            login_ok = login = []
            logout = []

        if len(login):
            # find what login files are there
            std_out = []
            std_err = []
            _system(ssh_cd('\\ls -1 ' + ' '.join(login)), message='Collecting remote system info', ignoreReturnCode=True, std_out=std_out, std_err=std_err, quiet=quiet)
            login_ok = []
            for line in std_out + std_err:
                line = line.strip()
                lowered = line.lower()
                # keep only plain file names, discard `ls` error messages
                if len(line) and 'cannot' not in lowered and 'no such file or directory' not in lowered and ': ' not in lowered:
                    login_ok.append(line)
            if not quiet:
                printi('Source files at login: ' + ' '.join(login_ok))

        info['login'] = login_ok
        info['logout'] = logout
        info['environment'] = OMFITenv(string='\n'.join(ENV))

        # expand environmental variables defined in the environment on the server side
        try:
            serverPicker = SERVER(server0)
        except (TypeError, NameError):
            # If running outside the framework
            pass
        else:
            if serverPicker in list(SERVER.keys()) and 'workDir' in SERVER[serverPicker] and '$' in SERVER[serverPicker]['workDir']:
                tmp = evalExpr(SERVER[serverPicker]['workDir'])
                for item in re.findall(r'\$\w+', tmp):
                    tmp = tmp.replace(item, info['environment'][item.strip('$')])
                OMFIT['MainSettings']['SERVER'][serverPicker]['workDir'] = tmp
                OMFIT.addMainSettings(updateUserSettings=True)
                raise OMFITexception('User configuration for `%s` server has been initialized. Try again.' % serverPicker)

    return OMFITaux['sysinfo'][id_]
def manage_user_errors(command, reportUsersErrorByEmail=False, **kw):
    """
    This method wraps around users calls of _OMFITpython scripts and manages printout of errors

    :param command: command to be executed

    :param reportUsersErrorByEmail: if True (and the other conditions listed in the body are met)
        an error report is emailed to the developers

    :param \**kw: keyword arguments passed to `command`

    :return: tuple `(result, error)`: `(whatever the command returned, False)` on success,
        `(None, False)` if the script ended via EndOMFITpython/EndAllOMFITpython,
        `(None, True)` if a user error was caught and printed.
        Exceptions for which no user frame is found in the stack are re-raised.
    """
    try:
        tmp = command(**kw)
        return tmp, False

    except (EndOMFITpython, EndAllOMFITpython):
        return None, False

    except Exception as handled_exception:
        etype, value, tb = sys.exc_info()
        excpStack = traceback.format_exception(etype, value, tb)

        # record the full exception stack
        OMFITaux['lastUserError'] = excpStack

        # find out what is the last user error
        # (traceback frame lines are indented, hence `\s*` in the anchored patterns)
        kuser = None
        showExcp = False
        inconsole = False
        for k, entry in enumerate(excpStack):
            if 'GlobLoc' in entry:
                showExcp = True
            elif showExcp:
                if OMFITcwd in entry or re.match(r'\s*File "OMFIT.*", line [0-9]*.*', entry):
                    kuser = k
                if '____console____.py' in entry:
                    inconsole = True

        # if kuser is not found report the whole error
        if kuser is None:
            raise

        # print and highlight the latest user error
        moduleExceptionText = ''
        if inconsole:
            printe("\nException in command box script:")
        else:
            try:
                if OMFITaux['lastRunModule'] != 'OMFIT':
                    # NOTE: eval of a framework-generated tree location string
                    eval(OMFITaux['lastRunModule'])
                    printe("\nException in script of module %s:" % str(OMFITaux['lastRunModule']))
                    moduleExceptionText = ' [' + str(eval(OMFITaux['lastRunModule']).ID) + ']'
            except Exception:
                printe("\nException in script:")

        # simplify exception by hiding the top part of the stack
        showExcp = False
        simpleExcp = ''
        for line in excpStack:
            if 'GlobLoc' in line:
                showExcp = True
            elif showExcp:
                if re.match(r'\s*File ".*____console____.py", line [0-9]*.*', line):
                    line = 'Error in command box' + '\n' + line.split('\n')[1]
                elif os.path.abspath(OMFITcwd) in line:
                    filename = re.findall(r' File "(.*)"', line)
                    if len(filename):
                        filename = os.path.split(filename[0])[1]
                    else:
                        filename = line
                    linenumber = re.findall(r'.*line ([0-9]+)*', line)
                    if len(linenumber):
                        linenumber = '" at line ' + linenumber[0] + '\n'
                    else:
                        linenumber = ''
                    line = 'Error in "' + filename + linenumber + line.split('\n')[1]
                elif re.match(r'\s*File "', line):
                    # frames that are neither console nor user files are hidden
                    continue
                simpleExcp += line + '\n'
        simpleExcp += '\nPress <F6> to see full error report...\n'

        # list of developers
        developers = tolist(OMFIT['MainSettings']['SETUP']['report_to'], empty_lists=[None, ''])
        try:
            if OMFITaux['lastRunModule'] != 'OMFIT':
                developers += eval(OMFITaux['lastRunModule']).contact
        except Exception as _excp1:
            printe(_excp1)
        developers = list(set(developers))

        # check if this error is the same as the last error that was reported
        lastSeen = '\n'.join(OMFITaux.setdefault('lastReportedUserError', [''])) == '\n'.join(excpStack)

        # files attachments (relevant python scripts, MainSettings, and module settings)
        files = []
        for line in re.findall(r' File ".*", line [0-9]+.*', '\n'.join(excpStack)):
            filename = re.sub(r' File "(.*)", line [0-9]+.*', r'\1', line)
            if OMFITcwd in filename:
                files.append(filename)
        files = unsorted_unique(files[::-1])[::-1]

        creator = ['']
        if files:
            # see who created the python file by parsing the header '# Created by ...'
            try:
                with open(files[-1], 'r') as f:
                    # readlines() keeps the line terminators, so a plain join rebuilds the header
                    header = ''.join(f.readlines()[:5])
            except (IOError, OSError):
                # an unreadable file must not hide the user error being reported
                header = ''
            creator = re.findall(r'\#\s+\w+\s+by\s+\w+\s*.*', header)
            creator = [re.sub(r'\#\s+.*\s+by\s+(\w+)\s*.*', r'\1', x) for x in creator]

        # Notify OMFIT and modules developers with an email
        # if error occurred when user pushed on a GUI button,
        # if it is not a SyntaxError (likely that a user has edited a script),
        # if the user is not part of the developers (of OMFIT or of the module),
        # if the user has not disabled the report error and
        # if it is not the same error as the last error that occured
        # if it is not the user that created the file
        if (not isinstance(handled_exception, (doNotReportException, SyntaxError))
                and reportUsersErrorByEmail
                and OMFIT['MainSettings']['SETUP']['error_report']
                and OMFIT['MainSettings']['SETUP']['email'] not in developers
                and not lastSeen
                and os.environ.get('USER') not in creator):

            # Add MainSettings and module settings to the list of files
            OMFIT['MainSettings'].save()
            files.append(OMFIT['MainSettings'].filename)
            try:
                if OMFITaux['lastRunModule'] != 'OMFIT':
                    eval(OMFITaux['lastRunModule'])['SETTINGS'].save()
                    files.append(eval(OMFITaux['lastRunModule'])['SETTINGS'].filename)
            except Exception:
                # module settings are a best-effort attachment
                pass

            # Note that .split('+------------------------------+')[-1] is done to avoid receiving:
            # +------------------------------+     +------------------------------+
            # | BELOW IS WHAT YOU WERE DOING | and | ABOVE IS WHAT YOU WERE DOING |
            # +------------------------------+     +------------------------------+
            console = OMFITaux['console'].get()[-10000:].split('+------------------------------+')[-1]

            message = ('OMFIT project: {project}\n'
                       'OMFIT directory: {directory}\n'
                       'Git commit: {commit}\n'
                       'Python executable: {executable}\n'
                       'Hostname: {hostname}\n'
                       'Error in module location: {module}\n\n'
                       '====================================\n'
                       'Console output\n'
                       '====================================\n'
                       '{console}\n\n'
                       '===================================\n'
                       'Exception stack\n'
                       '====================================\n'
                       '{exception}\n\n'
                       ''.format(
                           project=str(OMFIT.filename),
                           directory=OMFITsrc,
                           commit=repo_str,
                           executable=sys.executable,
                           hostname=' | '.join(platform.uname()),
                           module=str('' if OMFITaux['lastRunModule'] == 'OMFIT' else OMFITaux['lastRunModule']),
                           console=console,
                           exception='\n'.join(excpStack)))

            # send email (failures here propagate to the caller)
            send_email(to=developers,
                       fromm=OMFIT['MainSettings']['SETUP']['email'],
                       subject='OMFIT user error report' + moduleExceptionText,
                       message=message,
                       attachments=files)
            printt('OMFIT user error report sent to < ' + ', '.join(developers) + ' >')
            OMFITaux['lastReportedUserError'] = excpStack

        # print simplified exception to console
        printe(simpleExcp)
        # there is no GUI to refresh when running headless
        if OMFITaux['rootGUI'] is not None:
            OMFITaux['rootGUI'].event_generate("<<update_treeGUI>>")

        return None, True
def _system(command_line, message='Processing...', fixedWidthDetails='', ignoreReturnCode=False, std_out=None, std_err=None, executable=None, quiet=False, progressFunction=None, extraButtons=None, noGUI=False): """ :param command_line: command to be executed :param message: message to appear in the GUI :param fixedWidthDetails: further details to be formatted with a fixed width font, e.g., a script :param ignoreReturnCode: ignore return code of the command :param std_out: if a list is passed (e.g. []), the stdout of the program will be put there line by line :param std_err: if a list is passed (e.g. []), the stderr of the program will be put there line by line :param executable: shell executable, defaults to os.environ['SHELL'] and fallsback on default of subprocess.Popen() :param quiet: switch to turn ON/OFF print to console :param progressFunction: user function to which the std-out of the process is passed and returns values from 0 to 100 to indicate progress towards completion :param extraButtons: dictionary with key/function that is used to add extra buttons to the GUI. 
The function receives a dictionary with the process std_out and pid :param noGUI: do not show execution in GUI :return: """ if 'DYLD_LIBRARY_PATH' in os.environ: print('Removed DYLD_LIBRARY_PATH from os.environ') del os.environ['DYLD_LIBRARY_PATH'] if executable is None: executable=os.environ['SHELL'] if executable=='OMFITbash': bashlink=OMFITbinsDir+os.sep+'OMFITbash' if not os.path.exists(bashlink): with open(bashlink,'w') as f: f.write('''#!/bin/bash\n/bin/bash -l "$@"\n''') os.chmod(bashlink,0o700) executable=bashlink if extraButtons is None: extraButtons = {} child = subprocess.Popen(command_line, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, executable=executable, preexec_fn=os.setsid) #non blocking reading of buffers fcntl.fcntl(child.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) fcntl.fcntl(child.stderr.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) gbl={} gbl['std_out']='' gbl['pid']=None if len(OMFITaux['prun_process']) or noGUI or (len(OMFITaux['pythonRunWindows']) and OMFITaux['pythonRunWindows'][-1] is None): gbl['std_out'], std_err_ = list(map(b2s, child.communicate())) if std_out is not None: std_out.extend(gbl['std_out'].split('\n')) if std_err is not None: std_err.extend(std_err_.split('\n')) else: max_buffer_len=10000000 show=(OMFITaux['rootGUI'] is None or OMFITaux['console'].show) and not quiet printd(command_line,topic='framework') if OMFITaux['rootGUI'] is not None: tkMessage=tk.StringVar() if isinstance(message,basestring): tkMessage.set(message) elif isinstance(message, CollectionsCallable): tkMessage.set(message()) tkDetails = tk.StringVar() if fixedWidthDetails: tkDetails.set(fixedWidthDetails) def onKill(): killed.set(True) top.update_idletasks() finished.set(True) top.update_idletasks() def onAbort(): top.update_idletasks() aborted.set(True) finished.set(True) top.update_idletasks() def checkChild(): if gbl['pid'] is None: gbl['pid']=child.pid elif child.poll() is not None: try: 
p['value']=100 except tk.TclError: pass finished.set(True) if OMFITaux['rootGUI'] is not None: top.destroy() return elif select.select([child.stdout], [], [], 0)[0]: try: tmp=b2s(child.stdout.read()) #return output gbl['std_out']+=tmp gbl['std_out']=gbl['std_out'][:max_buffer_len] if progressFunction: p['value']=progressFunction(gbl['std_out']) #output to console if show: tag_print(tmp,tag='PROGRAM_OUT',end='') except IOError: tmp=None if OMFITaux['rootGUI'] is not None and not finished.get(): if isinstance(message, CollectionsCallable): tkMessage.set(message()) if OMFITaux['console'].show: OMFITaux['rootGUI'].after(1,checkChild) else: OMFITaux['rootGUI'].after(100,checkChild) p = {'value':0, 'maximum': 100} if OMFITaux['rootGUI'] is not None: try: if OMFITaux['pythonRunWindows'][-1] is None: raise RuntimeError('--') top=ttk.Frame(OMFITaux['pythonRunWindows'][-1],borderwidth=2,relief=tk.GROOVE) top.pack(side=tk.TOP, expand=tk.NO, fill=tk.BOTH, padx=5, pady=5) except Exception: top=tk.Toplevel(_topGUI(OMFITaux['rootGUI'])) top.transient(OMFITaux['rootGUI']) top.protocol("WM_DELETE_WINDOW", 'break') if isinstance(message,basestring): top.wm_title(message) elif isinstance(message, CollectionsCallable): top.wm_title(message()) top.update_idletasks() ttk.Label(top,textvar=tkMessage).pack(side=tk.TOP,expand=tk.NO,fill=tk.X) if fixedWidthDetails: fixedWidthDetailsText=tk.Text(top,background=ttk.Style().lookup('TFrame', 'background'),wrap=tk.NONE,font=OMFITfont(size=-2,family='Courier')) fixedWidthDetailsText.pack(padx=5) fixedWidthDetailsText.insert(1.0,tkDetails.get()) fixedWidthDetailsText.configure(state='disabled',height=min([10,len(tkDetails.get().split('\n'))]),width=80,relief=tk.FLAT) if progressFunction: p = ttk.Progressbar(top, orient=tk.HORIZONTAL, mode='determinate') p['value'] = 0 p['maximum'] = 100 else: p = ttk.Progressbar(top, orient=tk.HORIZONTAL, mode='indeterminate') p.start() p.pack(padx=5,pady=5,expand=tk.NO,fill=tk.X) frm=ttk.Frame(top) 
ttk.Button(frm, text="Kill local", command=onKill, takefocus=False).pack(side=tk.LEFT,expand=tk.NO,fill=tk.X) for btt in list(extraButtons.keys()): ttk.Button(frm, text=btt, command=lambda btt=btt:extraButtons[btt](gbl), takefocus=False).pack(side=tk.LEFT,expand=tk.NO,fill=tk.X) ttk.Button(frm, text="Abort", command=onAbort, takefocus=False).pack(side=tk.LEFT,expand=tk.NO,fill=tk.X) frm.pack(side=tk.TOP) top.update_idletasks() finished=tk.BooleanVar() finished.set(False) aborted=tk.BooleanVar() aborted.set(False) killed=tk.BooleanVar() killed.set(False) if OMFITaux['rootGUI'] is not None: checkChild() top.wait_variable(finished) top.destroy() else: while not finished.get(): checkChild() sleep(0.1) if child.poll() is None and (aborted.get() or killed.get()): try: os.killpg(gbl['pid'],signal.SIGTERM) for k in range(20): if child.poll() is not None: break if k==10: os.killpg(gbl['pid'],signal.SIGKILL) sleep(0.1) except OSError: pass #show the remaining standard output if std_out is not None or show: if select.select([child.stdout], [], [], 0)[0]: try: tmp=b2s(child.stdout.read()) gbl['std_out']+=tmp gbl['std_out']=gbl['std_out'][:max_buffer_len] if std_out is not None: std_out.extend(gbl['std_out'].split('\n')) if show: tag_print(tmp,tag='PROGRAM_OUT',end='') except IOError: tmp=None #show the standard error (all at the end) if std_err is not None or show: if select.select([child.stderr], [], [], 0)[0]: try: tmp=b2s(child.stderr.read()) if std_err is not None: std_err_=tmp[:max_buffer_len] std_err.extend(std_err_.split('\n')) if show: tag_print(tmp,tag='PROGRAM_ERR',end='') except IOError: tmp=None if OMFITaux['rootGUI'] is not None: OMFITaux['rootGUI'].update_idletasks() OMFITaux['rootGUI'].focus_set() if killed.get(): pass elif aborted.get(): raise EndAllOMFITpython('\n\n---> Aborted by user <---\n\n') elif not ignoreReturnCode and child.poll()!=0: raise ReturnCodeException('\n\nReturn code was '+str(child.poll())+' for command:\n\n'+command_line+'\n\nYou can ignore 
the return code by setting the keyword `ignoreReturnCode=True`') return_val = child.poll() child.stderr.close() child.stdout.close() child.stdin.close() return return_val def _xargs(command, data): data_string='' commands=[] data=copy.deepcopy(data) while len(data): data_string+=data.pop()+' ' if not len(data) or (len(command)+len(data_string)+len(data[-1])+len('\n echo Done ')+9) > ARG_MAX: commands.append(re.sub(r'\{\}', data_string.strip(), command)) data_string='' if len(commands)>1: printd('\n'.join(commands),topic='framework') commands=[item+'\necho Done '+str(k+1)+'/'+str(len(commands)) for k,item in enumerate(commands)] return commands def _remote_upsync(server, local, remote, tunnel=None, ignoreReturnCode=False, quiet=False): if not len(local): return server0=server tunnel0=tunnel username,server,port=setup_ssh_tunnel(server0,tunnel0) if not isinstance(local,list): local=[local] remote=str(remote) local = [re.sub(' ', r'\ ', x) for x in local] local_str=' '.join(local) if len(local_str)>128: local_str=local_str[:64]+'...'+local_str[-64:] if not len(server): ret_code=0 if not quiet: printi('Local copy of: '+' '.join(local)) if not os.path.exists(os.path.split(remote)[0]): os.makedirs(os.path.split(remote)[0]) upsync=system_executable('cp')+' -Rfp {} '+remote upsync=_xargs(upsync,local) while len(upsync): command=upsync.pop(0) ret_code=_system(command, 'Local copy of:', fixedWidthDetails=local_str, ignoreReturnCode=True, noGUI=True) if ret_code!=0 and not ignoreReturnCode: break else: msg = 'Up-sync to '+server0 if tunnel0 is not None and tunnel0!='': msg += ' (via '+tunnel0+')' msg += ' of: '+' '.join(local) if not quiet: printi(msg) mkdir=sshOptions()+controlmaster(username,server,port,server0)+" -Y -q -p "+port+" "+username+"@"+server+" 'mkdir -p "+remote+"'" _system(mkdir, 'Building remote directory structure...', ignoreReturnCode=True) upsync = "{path} {progress} --copy-links --update --inplace --compress --recursive -e 
'".format(**system_executable('rsync', True)) + sshOptions() + controlmaster(username, server, port, server0) + " -q -p " + port + "' {} " + username + "@" + server + ":" + remote upsync = _xargs(upsync, local) def progressFunction(out): n=float(len(upsync)+1) try: return int(100*segment/n+float(re.findall(' [0-9]+% ',out)[-1].strip(' %'))/n) except IndexError: return 0 std_out=[] std_err=[] segment=0 while len(upsync): command=upsync.pop(0) ret_code=_system(command, 'Up-Sync of:', fixedWidthDetails=local_str, ignoreReturnCode=True, progressFunction=progressFunction, quiet=True, std_out=std_out, std_err=std_err) if ret_code!=0 and not ignoreReturnCode: break segment+=1 if ret_code!=0: if not ignoreReturnCode: if ret_code==255 and (server is not None and len(server)): raise ReturnCodeException('\n\n'+command+'\n\n---> SSH connection to `%s` via `%s` was terminated <---\n\n'%(server0,tunnel0)) else: _streams['PROGRAM_ERR'].write(''.join(std_err)+'\n') raise ReturnCodeException('\n\nReturn code was '+str(ret_code)+' for UPSYNC command:\n\n'+command+'\n\nYou can ignore the return code by setting the keyword `ignoreReturnCode=True`') return ret_code def _remote_downsync(server, remote, local, tunnel=None, ignoreReturnCode=False, quiet=False): if not len(remote): return server0=server tunnel0=tunnel username,server,port=setup_ssh_tunnel(server0,tunnel0) if not isinstance(remote,list): remote=[remote] local=str(local) remote = [re.sub(' ', r'\ ', x) for x in remote] remote_str=' '.join(remote) if len(remote_str)>128: remote_str=remote_str[:64]+'...'+remote_str[-64:] if not len(server): ret_code=0 if not quiet: printi('Local copy of: '+' '.join(remote)) if not os.path.exists(os.path.split(local)[0]): os.makedirs(os.path.split(local)[0]) downsync=system_executable('cp')+' -Rfp {} '+local downsync=_xargs(downsync,remote) while len(downsync): command=downsync.pop(0) ret_code=_system(command, 'Local copy of:', fixedWidthDetails=remote_str, ignoreReturnCode=True, noGUI=True) if 
ret_code!=0 and not ignoreReturnCode: break else: msg = 'Down-sync from '+server0 if tunnel0 is not None and tunnel0!='': msg += ' (via '+tunnel0+')' msg += ' of: '+' '.join(remote) if not quiet: printi(msg) if not os.path.exists(os.path.split(local)[0]): os.makedirs(os.path.split(local)[0]) downsync = "{path} {progress} --copy-links --update --inplace --compress --recursive -e '".format(**system_executable('rsync', True)) + sshOptions() + controlmaster(username, server, port, server0) + " -q -p " + port + "' " + username + "@" + server + ":'{}' " + local downsync = _xargs(downsync, remote) def progressFunction(out): n=float(len(downsync)+1) try: return int(100*segment/n+float(re.findall(' [0-9]+% ',out)[-1].strip(' %'))/n) except IndexError: return 0 std_out=[] std_err=[] segment=0 while len(downsync): command=downsync.pop(0) ret_code=_system(command, 'Down-Sync of:', fixedWidthDetails=remote_str, ignoreReturnCode=True, progressFunction=progressFunction, quiet=True, std_out=std_out, std_err=std_err) if ret_code!=0 and not ignoreReturnCode: break segment+=1 if ret_code!=0: if not ignoreReturnCode: if ret_code==255 and (server is not None and len(server)): raise ReturnCodeException('\n\n'+command+'\n\n---> SSH connection to `%s` via `%s` was terminated <---\n\n'%(server0,tunnel0)) else: _streams['PROGRAM_ERR'].write(''.join(std_err)+'\n') raise ReturnCodeException('\n\nReturn code was '+str(ret_code)+' for DOWNSYNC command:\n\n'+command+'\n\nYou can ignore the return code by setting the keyword `ignoreReturnCode=True`') return ret_code def _shell_escape(command): return re.sub(r'([\$\\\`])', r'\\\1', command) def _bang_commander(command, use_bang_command='OMFIT_run_command.sh', shell='bash', xterm=False): if use_bang_command is True: use_bang_command = 'OMFIT_run_command.sh' bang_command = '\n\\rm -f %s\n' % use_bang_command command = command.strip() if command.startswith('#!'): shell = re.sub('^#!', '', command.split('\n')[0]) bang_command += "\n\\cat << 
__b__MATCHING_EOF__b__ > %s \n" % use_bang_command + _shell_escape(command) + "\n__b__MATCHING_EOF__b__\n" else: if shell == 'bash': shell = '/bin/bash -l' elif not shell.startswith('/'): shell = '/bin/' + shell bang_command += "\n\\cat << __b__MATCHING_EOF__b__ > %s \n" % use_bang_command + _shell_escape('#!%s\n'%shell+command) + "\n__b__MATCHING_EOF__b__\n" if xterm: bang_command += "chmod +x %s && xterm -e $PWD/%s" % (use_bang_command, use_bang_command) else: bang_command += "chmod +x %s && $PWD/%s" % (use_bang_command, use_bang_command) return bang_command, shell
def execute(command_line, interactive_input=None, ignoreReturnCode=False, std_out=None, std_err=None, quiet=False, arguments='', script=None, use_bang_command='OMFIT_run_command.sh', progressFunction=None, extraButtons=None):
    """
    This function allows execution of commands on the local workstation.

    :param command_line: string to be executed locally

    :param interactive_input: interactive input to be passed to the command

    :param ignoreReturnCode: ignore return code of the command

    :param std_out: if a list is passed (e.g. []), the stdout of the program will be put there line by line

    :param std_err: if a list is passed (e.g. []), the stderr of the program will be put there line by line

    :param quiet: print command to screen or not

    :param arguments: arguments that are passed to the `command_line` (string, or list of strings that are joined with spaces)

    :param script: string with script to be executed.
                   `script` option substitutes `%s` with the automatically generated name of the script
                   if `script` is a list or a tuple, then the first item should be the script itself and the second should be the script name

    :param use_bang_command: Execute commands via `OMFIT_run_command.sh` script (useful to execute scripts within a given shell: #!/bin/...)
                             If `use_bang_command` is a string, then the run script will take that filename.
                             Notice that setting `use_bang_command=True` is not safe for multiple processes running in the same directory.

    :param progressFunction: user function to which the std-out of the process is passed and returns values from 0 to 100 to indicate progress towards completion

    :param extraButtons: dictionary with key/function that is used to add extra buttons to the GUI. The function receives a dictionary with the process std_out and pid
                         (the dictionary that is passed is not modified)

    :return: return code of the command
    """
    # work on a copy: the `Open terminal` entry added below is bound to the current
    # working directory and must not stick to a dictionary that the caller may reuse
    extraButtons = {} if extraButtons is None else dict(extraButtons)

    # fallback for environments where SHELL is not defined
    user_shell = os.environ.get('SHELL', '/bin/sh')

    # add open-terminal button to GUI
    def remote_terminal(kw, terminal_command):
        subprocess.Popen(terminal_command, shell=True)

    terminal_command = "xterm -e \"%s\"" % ('cd ' + os.getcwd() + ';exec ' + user_shell)
    extraButtons.setdefault('Open terminal', lambda kw={}, terminal_command=terminal_command: remote_terminal(kw, terminal_command))

    # sort out the script and its name
    scriptName = ''
    if isinstance(script, (list, tuple)):
        scriptName = script[1]
        script = script[0]
    elif isinstance(script, OMFITascii):
        scriptName = os.path.split(script.filename)[1]
        with open(script.filename, 'r') as f:
            script = f.read()

    if isinstance(script, basestring):
        if not len(scriptName) and '%s' not in command_line:
            raise RuntimeError('`script` option requires that you place `%s` in the command line location where you want the script filename to appear')
        scriptFile = OMFITascii(scriptName)
        with open(scriptFile.filename, 'w') as f:
            f.write(script)
        os.chmod(scriptFile.filename, os.stat(scriptFile.filename).st_mode | stat.S_IEXEC)  # make the script executable by user
        if '%s' in command_line:
            command_line = command_line % (os.path.split(scriptFile.filename)[1])
        scriptFile.deploy()

    if isinstance(arguments, basestring):
        command_line = command_line + ' ' + arguments
    elif len(arguments):
        # the separating space keeps the first argument from being glued to the command
        command_line = command_line + ' ' + ' '.join(arguments)

    if interactive_input:
        # interactive input is fed to the command with a here-document
        command = str(command_line.rstrip()) + """ << __c__MATCHING_EOF__c__\n""" + \
                  _shell_escape(interactive_input) + \
                  """\n__c__MATCHING_EOF__c__\n"""
    else:
        command = command_line

    bang_command = command
    shell = os.path.split(user_shell)[1]
    if use_bang_command:
        bang_command, shell = _bang_commander(command, use_bang_command, shell=shell)
    bang_command = "\\uname -n\n\\pwd\n\\echo local_PID=$$\n\\echo\n" + bang_command

    msg = 'Local execute'
    msg += ' of:\n' + command_line.strip()
    msg += ' in:\n' + os.getcwd()
    if not quiet:
        printi(msg)

    printd(bang_command, topic='execution')
    # use /bin/bash because of syntax used by interactive_input
    ret_code = _system(bang_command, message='Running locally:', fixedWidthDetails=command_line,
                       ignoreReturnCode=ignoreReturnCode, std_out=std_out, std_err=std_err,
                       executable='OMFITbash', quiet=quiet, progressFunction=progressFunction,
                       extraButtons=extraButtons)
    return ret_code
@_available_to_userTASK
def remote_execute(server, command_line, remotedir, tunnel=None, interactive_input=None, ignoreReturnCode=False, std_out=None, std_err=None, quiet=False, arguments='', script=None, forceRemote=False, use_bang_command='OMFIT_run_command.sh', progressFunction=None, extraButtons=None, xterm=False):
    """
    This function allows execution of commands on remote workstations.
    It has the logic to check if the remote workstation is the local workstation and in that case executes locally.

    :param server: server to connect and execute the command

    :param command_line: string to be executed remotely (NOTE that if server='', the command is executed locally in the local directory)

    :param remotedir: remote working directory, if remote directory does not exist it will be created

    :param tunnel: tunnel to go through to connect to the server

    :param interactive_input: interactive input to be passed to the command

    :param ignoreReturnCode: ignore return code of the command

    :param std_out: if a list is passed (e.g. []), the stdout of the program will be put there line by line

    :param std_err: if a list is passed (e.g. []), the stderr of the program will be put there line by line

    :param quiet: print command to screen or not

    :param arguments: arguments that are passed to the `command_line` (string, or list of strings that are joined with spaces)

    :param script: string with script to be executed.
                   `script` option substitutes `%s` with the automatically generated name of the script
                   if `script` is a list or a tuple, then the first item should be the script itself and the second should be the script name

    :param forceRemote: force remote connection even if server is localhost

    :param use_bang_command: execute commands via `OMFIT_run_command.sh` script (useful to execute scripts within a given shell: #!/bin/...)
                             If `use_bang_command` is a string, then the run script will take that filename.
                             Notice that setting `use_bang_command=True` is not safe for multiple processes running in the same directory.

    :param progressFunction: user function to which the std-out of the process is passed and returns values from 0 to 100 to indicate progress towards completion

    :param extraButtons: dictionary with key/function that is used to add extra buttons to the GUI. The function receives a dictionary with the process std_out and pid

    :param xterm: if True, launch command in its own xterm

    :return: return code of the command
    """
    if extraButtons is None:
        extraButtons = {}

    server0 = server
    tunnel0 = tunnel
    username, server, port = setup_ssh_tunnel(server0, tunnel0, forceRemote=forceRemote)

    remotedir = evalExpr(remotedir)

    if not server:
        # local execution: run in `remotedir` and always restore the working directory
        oldDir = os.getcwd()
        if not os.path.exists(remotedir):
            os.makedirs(remotedir)
        os.chdir(remotedir)
        try:
            return execute(command_line, interactive_input=interactive_input, arguments=arguments,
                           ignoreReturnCode=ignoreReturnCode, std_out=std_out, std_err=std_err, quiet=quiet,
                           script=script, use_bang_command=use_bang_command,
                           progressFunction=progressFunction, extraButtons=extraButtons)
        finally:
            os.chdir(oldDir)

    # sort out the script and its name
    scriptName = ''
    if isinstance(script, (list, tuple)):
        scriptName = script[1]
        script = script[0]
    elif isinstance(script, OMFITascii):
        scriptName = os.path.split(script.filename)[1]
        with open(script.filename, 'r') as f:
            script = f.read()

    if isinstance(script, basestring):
        if not len(scriptName) and '%s' not in command_line:
            raise RuntimeError('`script` option requires that you place `%s` in the command line location where you want the script filename to appear')
        scriptFile = OMFITascii(scriptName)
        with open(scriptFile.filename, 'w') as f:
            f.write(script)
        if '%s' in command_line:
            command_line = command_line % (os.path.split(scriptFile.filename)[1])

    if isinstance(arguments, basestring):
        command_line = command_line + ' ' + arguments
    elif len(arguments):
        # the separating space keeps the first argument from being glued to the command
        command_line = command_line + ' ' + ' '.join(arguments)

    if interactive_input:
        # interactive input is fed to the command with a here-document
        command = str(command_line) + ' << __a__MATCHING_EOF__a__\n' + _shell_escape(interactive_input) + '\n__a__MATCHING_EOF__a__\n'
    else:
        command = command_line

    sysinfo = remote_sysinfo(server0, tunnel0, quiet=quiet)

    bang_command = command
    shell = sysinfo['shell']
    if use_bang_command or xterm:
        bang_command, shell = _bang_commander(command, use_bang_command, shell=shell, xterm=xterm)

    header = "\\uname -n;\\pwd;\\echo remote_PID=$$;\\echo;"

    tmp_files = []

    def ssh_cd(inv, tty):
        # the commands are written to a local temporary file that is piped
        # to the standard input of the remote shell (`<shell> -s`)
        if tty:
            tty = ' -t -t '
        else:
            tty = ''
        inv = "\nmkdir -p " + remotedir + "\ncd " + remotedir + "\n\n" + inv + "\n\n"
        # mkstemp atomically creates a uniquely named file
        handle, tmp_name = tempfile.mkstemp()
        tmp_files.append(tmp_name)
        with os.fdopen(handle, 'w') as f:
            f.write(inv)
        printd(inv, topic='execution')
        ssh_inv = 'cat %s | ' % tmp_name + sshOptions() + controlmaster(username, server, port, server0) + tty + "-Y -q -p " + port + " " + username + "@" + server + " '%s%s -s'" % (header, shell)
        printd(ssh_inv, topic='execution')
        return ssh_inv

    msg = 'Remote execute on ' + server0
    if tunnel0 is not None and tunnel0 != '':
        msg += ' (via ' + tunnel0 + ')'
    msg += ' of:\n' + command_line.strip()
    if not quiet:
        printi(msg)

    # add kill remote button to GUI
    def kill_remote(kw, kill_command):
        if kw['std_out'] is None or not len(kw['std_out']):
            printw('Remote process has not started yet. Nothing to kill.')
        else:
            # the remote PID is echoed by `header` at the very start of the output
            found = re.search('remote_PID=[0-9]+', kw['std_out'])
            if not found:
                printw('Remote process has not started yet. Nothing to kill.')
                return
            pid = found.group().split('=')[1]
            kill_command = kill_command.format(PID=pid)
            subprocess.Popen(kill_command, shell=True)
            printi('Issued kill remote PID ' + pid)

    # send INT to the process group, allow 2 seconds, send KILL to the process group
    kill_command = sshOptions() + controlmaster(username, server, port, server0) + " -q -p " + port + " " + username + "@" + server + " 'kill -INT -{PID}; sleep 2; kill -KILL -{PID}'"
    extraButtons.setdefault('Kill remote', lambda kw={}, kill_command=kill_command: kill_remote(kw, kill_command))

    # add open-terminal button to GUI
    def remote_terminal(kw, terminal_command):
        subprocess.Popen(terminal_command, shell=True)

    try:
        terminal_command = "xterm -e \"%s\"" % (ssh_cd('exec ' + sysinfo['shell'], True))
        extraButtons.setdefault('Open terminal', lambda kw={}, terminal_command=terminal_command: remote_terminal(kw, terminal_command))

        # logic for setting TTY
        tty = not use_bang_command and interactive_input

        if isinstance(script, basestring):
            _remote_upsync(server0, scriptFile.filename, remotedir, tunnel=tunnel0, ignoreReturnCode=ignoreReturnCode)

        ssh_command = ssh_cd(bang_command, tty)
        # use /bin/bash because of syntax used by interactive_input
        ret_code = _system(ssh_command, message='Running remotely:', fixedWidthDetails=command_line,
                           ignoreReturnCode=True, std_out=std_out, std_err=std_err, executable='OMFITbash',
                           quiet=quiet, progressFunction=progressFunction, extraButtons=extraButtons)
    finally:
        # the temporary command files are removed also when something goes wrong
        for file in tmp_files:
            if os.path.exists(file):
                os.remove(file)

    if ret_code != 0:
        if not ignoreReturnCode:
            if ret_code == 255:
                raise ReturnCodeException('\n\n' + command + '\n\n---> SSH connection to `%s` via `%s` was terminated <---\n\n' % (server0, tunnel0))
            else:
                raise ReturnCodeException('\n\nReturn code was ' + str(ret_code) + ' for REMOTE command:\n\n' + ssh_command + '\n\nYou can ignore the return code by setting the keyword `ignoreReturnCode=True`')

    return ret_code
@_available_to_userTASK
def remote_upsync(server, local, remote, tunnel=None, ignoreReturnCode=False, keepRelativeDirectoryStructure='', quiet=False):
    """
    Function to upload files/directories to remote server (possibly via tunnel connection)

    NOTE: this function relies on rsync.
    There is no way to arbitrarily rename files with rsync.
    All rsync can do is move files to a different directory.

    :param server: server to connect and execute the command

    :param local: local file(s) (string or list strings) to upsync

    :param remote: remote directory or file to save files to

    :param tunnel: tunnel to go through to connect to the server

    :param ignoreReturnCode: whether to ignore return code of the rsync command

    :param keepRelativeDirectoryStructure: string with common base directory of the local files to be removed (usually equals `local_dir`).
        Only the first occurrence of this string is stripped from each path.

    :param quiet: print command to screen or not

    :return: return code of the rsync command (or True if keepRelativeDirectoryStructure and ignoreReturnCode and some rsync fail)
    """
    local = tolist(local)

    # simple case: everything goes to the same remote location
    if not keepRelativeDirectoryStructure:
        return _remote_upsync(server, local, remote, tunnel=tunnel, ignoreReturnCode=ignoreReturnCode, quiet=quiet)

    base = str(keepRelativeDirectoryStructure)
    # Strip the base directory only ONCE: removing every occurrence would
    # corrupt paths that contain the same string again further down
    # (e.g. base `run` and file `run/out/run.log`).
    relative = [str(x).replace(base, '', 1).lstrip(os.sep) for x in local]

    # one rsync call per sub-directory, since rsync can only place files in a single target directory
    ret_codes = []
    for subdir in set([os.path.split(x)[0] for x in relative]):
        group = [base + os.sep + f for f in relative if os.path.split(f)[0] == subdir]
        ret_codes.append(_remote_upsync(server, group, remote + os.sep + subdir + os.sep,
                                        tunnel=tunnel, ignoreReturnCode=ignoreReturnCode, quiet=quiet))

    # 0 only if every individual transfer returned 0
    return 0 if not numpy.any(ret_codes) else True
@_available_to_userTASK
def remote_downsync(server, remote, local, tunnel=None, ignoreReturnCode=False, keepRelativeDirectoryStructure='', quiet=False):
    """
    Function to download files/directories from remote server (possibly via tunnel connection)

    NOTE: this function relies on rsync.
    There is no way to arbitrarily rename files with rsync.
    All rsync can do is move files to a different directory.

    :param server: server to connect and execute the command

    :param remote: remote file(s) (string or list strings) to downsync

    :param local: local directory or file to save files to

    :param tunnel: tunnel to go through to connect to the server

    :param ignoreReturnCode: whether to ignore return code of the rsync command

    :param keepRelativeDirectoryStructure: string with common base directory of the remote files to be removed (usually equals `remote_dir`).
        Only the first occurrence of this string is stripped from each path.

    :param quiet: print command to screen or not

    :return: return code of the rsync command (or True if keepRelativeDirectoryStructure and ignoreReturnCode and some rsync fail)
    """
    remote = tolist(remote)

    # simple case: everything is saved in the same local location
    if not keepRelativeDirectoryStructure:
        return _remote_downsync(server, remote, local, tunnel=tunnel, ignoreReturnCode=ignoreReturnCode, quiet=quiet)

    base = str(keepRelativeDirectoryStructure)
    # Strip the base directory only ONCE: removing every occurrence would
    # corrupt paths that contain the same string again further down
    # (e.g. base `run` and file `run/out/run.log`).
    relative = [str(x).replace(base, '', 1).lstrip(os.sep) for x in remote]

    # one rsync call per sub-directory, since rsync can only place files in a single target directory
    ret_codes = []
    for subdir in set([os.path.split(x)[0] for x in relative]):
        group = [base + os.sep + f for f in relative if os.path.split(f)[0] == subdir]
        ret_codes.append(_remote_downsync(server, group, local + os.sep + subdir + os.sep,
                                          tunnel=tunnel, ignoreReturnCode=ignoreReturnCode, quiet=quiet))

    # 0 only if every individual transfer returned 0
    return 0 if not numpy.any(ret_codes) else True
def mdsvalue(server, treename=None, shot=None, TDI=None):
    """
    Evaluate a TDI expression on a MDS+ server

    :param server: MDS+ server to connect to

    :param treename: name of the MDS+ tree

    :param shot: shot number

    :param TDI: TDI string to be executed

    :return: result of TDI command.
        String data is returned as a single newline-joined string,
        data with a single element is returned as its first item,
        anything else is returned as a numpy array
    """
    # let OMFIT resolve the server/tree/shot identifiers
    server, treename, shot = OMFITmdsObjects(server, treename, shot).interpret_id()

    tunneled = tunneled_MDSserver(server, quiet=True)
    raw = omfit_mds.OMFITmdsConnectionBase().mdsvalue(tunneled, treename, shot, TDI)
    data = numpy.atleast_1d(raw)

    # bytes/unicode arrays are collapsed into one multi-line string
    if data.dtype.char.lower() in ('s', 'u'):
        return '\n'.join([b2s(item) for item in data])

    if data.size == 1:
        return data[0]

    return data
@_available_to_userTASK
class IDL(object):
    """
    This class provides a live IDL session via the pidly module: https://pypi.python.org/pypi/pyIDL/
    In practice this class wraps the pidly.IDL session so that it can handle
    SERVERS remote connections (including tunneling) and directory management the OMFIT way.
    The IDL executable is taken from the `idl` entry of this server under OMFIT['MainSettings']['SERVER'].

    Local and remote working directories are specified in root['SETTINGS']['SETUP']['workDir'] and root['SETTINGS']['REMOTE_SETUP']['workDir'].

    Server and tunnel are specified in root['SETTINGS']['REMOTE_SETUP']['server'] and root['SETTINGS']['REMOTE_SETUP']['tunnel'].

    If the tunnel is an empty string, the connection to the remote server is direct.
    If server is an empty string, everything will occur locally and the remote working directory will be ignored.

    :param module_root: root of the module (e.g. `root`)

    :param server: override module server

    :param tunnel: override module tunnel

    :param executable: override the executable is taken from the `idl` entry of this server under OMFIT['MainSettings']['SERVER'].

    :param workdir: override module local working directory

    :param remotedir: override module remote working directory

    :param clean: clear local/remote working directories

        * "local": clean local working directory only

        * "local_force": force clean local working directory only

        * "remote": clean remote working directory only

        * "remote_force": force clean remote working directory only

        * True: clean both

        * "force": force clean both

        * False: clean neither [DEFAULT]

    >>> idl=OMFITx.IDL(OMFIT['EFIT'])
    >>> idl('$pwd')
    >>> idl('x = total([1, 1], /int)')
    >>> print(idl.x)
    >>> tmp=OMFITgeqdsk(OMFITsrc+'/../samples/g133221.01000')
    >>> idl.upsync([tmp])
    >>> idl.downsync(['g133221.01000'])
    """

    def __init__(self, module_root, server=None, tunnel=None, executable=None, workdir=None, remotedir=None, clean=False):
        # anything that is not given explicitly is read from the module settings
        if server is None:
            server = str(module_root['SETTINGS']['REMOTE_SETUP']['server'])
        if tunnel is None:
            tunnel = str(module_root['SETTINGS']['REMOTE_SETUP']['tunnel'])
        if executable is None:
            try:
                executable = SERVER[SERVER(server)]['idl']
            except Exception:
                raise OMFITexception("Server " + server + " does not have `idl` setup. You may need to update OMFIT['MainSettings']['SERVER'] to fix this.")
        if remotedir is None:
            remotedir = str(module_root['SETTINGS']['REMOTE_SETUP']['workDir'])
        if workdir is None:
            workdir = str(module_root['SETTINGS']['SETUP']['workDir'])

        # keep the user-facing server/tunnel names: setup_ssh_tunnel returns the
        # username/server/port that are actually used to build the ssh command
        server0 = server
        tunnel0 = tunnel
        username, server, port = setup_ssh_tunnel(server0, tunnel0)
        self._OMFITserver = server0
        self._OMFITtunnel = tunnel0

        initWorkdir(module_root, clean=clean, server=server0, tunnel=tunnel0, workdir=workdir, remotedir=remotedir)

        if len(server):
            # remote session: wrap the IDL executable in an ssh command that sources
            # the login scripts reported by remote_sysinfo and moves to the remote working directory
            sysinfo = remote_sysinfo(self._OMFITserver, self._OMFITtunnel)
            source = ''
            if len(sysinfo['login']):
                source = 'source ' + '\nsource '.join(sysinfo['login']) + '\n'
            executable = sshOptions() + controlmaster(username, server, port, server0) + " -t -t -Y -q -p " + port + " " + username + "@" + server + " '" + source + "mkdir -p " + remotedir + "\ncd " + remotedir + "\n\n" + executable + "\n\n'"

        self.idl = pidly.IDL(executable)

    def upsync(self, inputs=[], keepRelativeDirectoryStructure=False, ignoreReturnCode=False, quiet=False):
        '''
        Function used to upload files from the local working directory to remote IDL directory

        :param inputs: list of input objects or path to files, which will be deployed in the local or remote working directory.
                       To deploy objects with a different name one can specify tuples (inputObject,'deployName')

        :param keepRelativeDirectoryStructure: passed as is to `remote_upsync`

        :param ignoreReturnCode: whether to ignore return code of the rsync command

        :param quiet: print command to screen or not
        '''
        # ask the IDL session for its current directory, which is the upload target
        self.idl('CD, Current=OMFIT_idl_pwd')
        remotedir = str(self.idl.OMFIT_idl_pwd) + '/'

        # first place every input in the current local directory...
        upsyncFiles = []
        for item in inputs:
            if isinstance(item, (list, tuple)):
                # (object, 'deployName') pair
                item[0].deploy(item[1])
                upsyncFiles.append(item[1])
            elif hasattr(item, 'deploy'):
                item.deploy()
                upsyncFiles.append(os.path.split(item.filename)[1])
            elif isinstance(item, basestring):
                shutil.copy2(item, os.path.split(item)[1])
                upsyncFiles.append(os.path.split(item)[1])

        # ...then transfer them all at once
        remote_upsync(server=self._OMFITserver,
                      local=upsyncFiles,
                      remote=remotedir,
                      tunnel=self._OMFITtunnel,
                      ignoreReturnCode=ignoreReturnCode,
                      keepRelativeDirectoryStructure=keepRelativeDirectoryStructure,
                      quiet=quiet)

    def downsync(self, outputs=[], ignoreReturnCode=False, keepRelativeDirectoryStructure=False, quiet=False):
        '''
        Function used to download files from the remote IDL directory to the local working directory

        :param outputs: list of output files which will be fetched from the remote directory

        :param keepRelativeDirectoryStructure: [True/False] keep relative directory structure of the remote files

        :param ignoreReturnCode: whether to ignore return code of the rsync command

        :param quiet: print command to screen or not
        '''
        # ask the IDL session for its current directory, which is where the outputs are fetched from
        self.idl('CD, Current=OMFIT_idl_pwd')
        remotedir = str(self.idl.OMFIT_idl_pwd) + '/'
        downsyncFiles = [remotedir + file for file in outputs]

        if outputs:
            # when the structure must be kept, the remote IDL directory is the base to be stripped
            remote_downsync(server=self._OMFITserver,
                            remote=downsyncFiles,
                            local='./',
                            tunnel=self._OMFITtunnel,
                            ignoreReturnCode=ignoreReturnCode,
                            keepRelativeDirectoryStructure=remotedir if keepRelativeDirectoryStructure else False,
                            quiet=quiet)

    def __getattr__(self, attr):
        # __getattr__ is only called when the normal attribute lookup fails.
        # `idl` must never be forwarded: if it is not set yet (e.g. __init__ failed
        # before creating the session) reading `self.idl` below would call
        # __getattr__ again and recurse forever.
        if attr == 'idl' or attr in ['__tree_repr__', '_OMFITcopyOf', '_OMFITparent', '_OMFITkeyName', 'dynaLoad', 'dynaSave', '__len__', '__getitem__']:
            raise AttributeError('bad attribute `%s`' % attr)

        # everything else is looked up in the wrapped pidly session
        tmp = getattr(self.idl, attr)
        if tmp is None:
            raise AttributeError('bad attribute `%s`' % attr)
        return tmp

    def __call__(self, *args, **kw):
        # calling with no arguments at all is a no-op
        if not len(args) and not len(kw):
            return
        return self.idl(*args, **kw)
#--------------------------- # module tool functions #---------------------------
@_available_to_userTASK
def initWorkdir(module_root, server=None, tunnel=None, workdir=None, remotedir=None, clean=True, quiet=False):
    """
    High level function to simplify initialization of directories within a module. This function will:
    1) Create and clear local and remote working directories
    2) Change directory to local working directory

    Server and tunnel are specified in root['SETTINGS']['REMOTE_SETUP']['server'] and root['SETTINGS']['REMOTE_SETUP']['tunnel']

    Local and remote working directories are specified in root['SETTINGS']['SETUP']['workDir'] and root['SETTINGS']['REMOTE_SETUP']['workDir']

    :param module_root: root of the module

    :param server: string that overrides module server

    :param tunnel: string that overrides module tunnel

    :param workdir: string that overrides module local working directory

    :param remotedir: string that overrides module remote working directory

    :param clean: clear local/remote working directories

        * "local": clean local working directory only

        * "local_force": force clean local working directory only

        * "remote": clean remote working directory only

        * "remote_force": force clean remote working directory only

        * True: clean both

        * "force": force clean both

        * False: clean neither

    :param quiet: print command to screen or not

    :return: strings for local and remote directories

    :raises ValueError: if the local working directory is OMFITtmpDir

    :raises AttributeError: if `clean` is not one of the supported values

    :raises RuntimeError: if a directory should be cleaned but its path does not contain
        the word "OMFIT" (and cleaning is not forced), or if a forced remote cleaning
        targets an empty path or the filesystem root
    """
    if workdir is None:
        workdir = module_root['SETTINGS']['SETUP']['workDir']
    # cast to string BEFORE using string methods, since the setting may not be a plain string
    workdir = str(workdir)
    if workdir.strip() == OMFITtmpDir:
        raise ValueError('Setting workdir as OMFITtmpDir is a bad idea and would lead to problems')

    if remotedir is None:
        remotedir = module_root['SETTINGS']['REMOTE_SETUP']['workDir']
    remotedir = str(remotedir)

    # translate `clean` into what must be cleaned and whether the "OMFIT" safety check is overridden
    cleanIsString = isinstance(clean, basestring)
    forced = cleanIsString and 'force' in clean.lower()
    if clean is False:
        cleanLocal = False
        cleanRemote = False
    elif clean is True or (cleanIsString and 'force' == clean.lower()):
        cleanLocal = True
        cleanRemote = True
    elif cleanIsString and 'remote' in clean.lower():
        cleanLocal = False
        cleanRemote = True
    elif cleanIsString and 'local' in clean.lower():
        cleanLocal = True
        cleanRemote = False
    else:
        raise AttributeError('`clean` argument not valid')

    # safety mechanism to detect if workdir and remotedir coincide.
    # if that is the case, delete files only if both cleanRemote and cleanLocal are True
    if os.path.exists(workdir) and os.path.exists(remotedir):
        # write a probe file in workdir and check whether it can be read through remotedir
        tmpfile = omfit_hash(str(time.time()), 10)
        with open(workdir + '/' + tmpfile, 'w') as f:
            pass
        try:
            with open(remotedir + '/' + tmpfile, 'r') as f:
                pass
            if not quiet:
                printi('Local and remote directory coincide!')
            cleanAtAll = bool(cleanLocal) and bool(cleanRemote)
            cleanLocal = cleanAtAll
            cleanRemote = cleanAtAll
        except Exception:
            # the probe file could not be read through remotedir: the directories are treated as distinct
            pass
        os.remove(workdir + '/' + tmpfile)

    if cleanLocal and os.path.exists(workdir):
        if 'OMFIT' in workdir or forced:
            if not quiet:
                printi('Clearing local directory: ' + workdir)
            for dirTop, dirs, files in os.walk(workdir):
                for f in files:
                    os.unlink(os.path.join(dirTop, f))
                for d in dirs:
                    path = os.path.join(dirTop, d)
                    if os.path.islink(path):
                        # shutil.rmtree refuses symbolic links (and with ignore_errors=True it does so silently):
                        # remove the link itself, the directory it points to is left alone
                        os.unlink(path)
                    else:
                        shutil.rmtree(path, ignore_errors=True)
        else:
            raise RuntimeError('This is a safety measure. The local working directory ' + workdir + ' does not contain the word "OMFIT" in it.\nTo allow clearing of local directory data, add the word "OMFIT" as part of the path, or set clean="local_force".')

    if not os.path.exists(workdir):
        os.makedirs(workdir)
    if not quiet:
        printi('Changed to working directory: ' + workdir)
    os.chdir(workdir)

    if server is None:
        server = str(module_root['SETTINGS']['REMOTE_SETUP']['server'])
    if tunnel is None:
        tunnel = str(module_root['SETTINGS']['REMOTE_SETUP']['tunnel'])

    if cleanRemote:
        if 'OMFIT' in remotedir or forced:
            # a forced clean bypasses the "OMFIT" check: never let the command below become `rm -rf /*`
            if not remotedir.strip().strip('/'):
                raise RuntimeError('This is a safety measure. The remote working directory `' + remotedir + '` is empty or is the root of the filesystem and it will not be cleared.')
            if not quiet:
                printi('Clearing remote directory: ' + remotedir)
            # steps to make sure that the remote filesystem is writable by the user
            # and that future remote_execute calls can `cd` to the remote directory:
            # 1) create the directory
            # 2) touch a file in it
            # 3) remove all file in the directory
            remote_execute(server,
                           '\\mkdir -p ' + remotedir + '\n\\echo > ' + remotedir + '/' + utils_base.now("touch_%Y-%m-%d__%H:%M:%S__%f") + '\n\\rm -rf ' + remotedir + '/*',
                           './',
                           tunnel,
                           ignoreReturnCode=False,
                           quiet=True,
                           use_bang_command=False)
        else:
            raise RuntimeError('This is a safety measure. The remote working directory ' + remotedir + ' does not contain the word "OMFIT" in it.\nTo allow clearing of remote directory data, add the word "OMFIT" as part of the path, or set clean="remote_force".')
    else:
        # nothing to clean: only make sure that the remote directory exists
        remote_execute(server, '\\mkdir -p ' + remotedir, './', tunnel, ignoreReturnCode=False, quiet=True, use_bang_command=False)

    return workdir, remotedir
[docs]@_available_to_userTASK def executable(module_root=None, inputs=[], outputs=[], clean=True, interactive_input=None, server=None, tunnel=None, executable=None, workdir=None, remotedir=None, ignoreReturnCode=True, std_out=None, std_err=None, quiet=False, queued=False, keepRelativeDirectoryStructure=True, arguments='', script=None, forceRemote=False, progressFunction=None, use_bang_command='OMFIT_run_command.sh', extraButtons=None, xterm=False, clean_after=False): """ High level function that simplifies local/remote execution of software within a module. This function will: 1. cd to the local working directory 2. Clear local/remote working directories [True] by default 3. Deploy the the "input" objects to local working directory 4. Upload files them remotely 5. Executes the software 6. Download "output" files to local working directory Executable command is specified in root['SETTINGS']['SETUP']['executable'] Local and remote working directories are specified in root['SETTINGS']['SETUP']['workDir'] and root['SETTINGS']['REMOTE_SETUP']['workDir']. Server and tunnel are specified in root['SETTINGS']['REMOTE_SETUP']['server'] and root['SETTINGS']['REMOTE_SETUP']['tunnel']. If the tunnel is an empty string, the connection to the remote server is direct. If server is an empty string, everything will occur locally and the remote working directory will be ignored. :param module_root: root of the module (e.g. `root`) used to set default values for 'executable','server','tunnel','workdir', 'remotedir' if `module_root` is None or `module_root` is OMFIT then 'executable','server','tunnel','workdir', 'remotedir' must be specified :param inputs: list of input objects or path to files, which will be deployed in the local or remote working directory. 
To deploy objects with a different name one can specify tuples (inputObject,'deployName') :param outputs: list of output files which will be fetched from the remote directory :param clean: clear local/remote working directories * "local": clean local working directory only * "local_force": force clean local working directory only * "remote": clean remote working directory only * "remote_force": force clean remote working directory only * True: clean both [DEFAULT] * "force": force clean both * False: clean neither :param arguments: arguments which will be passed to the executable :param interactive_input: interactive input to be passed to the executable :param server: override module server :param tunnel: override module tunnel :param executable: override module executable :param workdir: override module local working directory :param remotedir: override module remote working directory :param ignoreReturnCode: ignore return code of executable :param std_out: if a list is passed (e.g. []), the stdout of the program will be put there line by line; if a string is passed and bool(queued), this should indicate the path of the file that gives the stdout of the queued job :param std_err: if a list is passed (e.g. []), the stderr of the program will be put there line by line; if a string is passed and bool(queued), this should indicate the path of the file that gives the stdout of the queued job :param quiet: if True, suppress output to the command box :param keepRelativeDirectoryStructure: [True/False] keep relative directory structure of the remote files :param script: string with script to be executed. 
`script` option requires `%s` in the command line location where you want the script filename to appear if `script` is a list or a tuple, then the first item should be the script itself and the second should be the script name :param forceRemote: force remote connection even if server is localhost :param progressFunction: user function to which the std-out of the process is passed and returns values from 0 to 100 to indicate progress towards completion :param queued: If cast as bool is True, invokes manage_job, using queued as qsub_findID keyword of manage_job, and also takes over std_out and std_err :param use_bang_command: Execute commands via `OMFIT_run_command.sh` script (useful to execute scripts within a given shell: #!/bin/...) If `use_bang_command` is a string, then the run script will take that filename. Notice that setting `use_bang_command=True` is not safe for multiple processes running in the same directory. :param extraButtons: dictionary with key/function that is used to add extra buttons to the GUI. The function receives a dictionary with the process std_out and pid :param xterm: if True, launch the command in its own xterm :param clean_after: (bool) If this flag is True, the remote working directory will be cleaned and deleted once the results have been transferred back to the local working directory. The remote directory must be on a remote server (not local) and have OMFIT in it's name. (default: False) :param use_scp: (bool) If this flag is True, the remote downsync of data will use the "scp" command instead of "rsync". This should be used for increased download speed. 
(default: False) :return: return code of the command """ if extraButtons is None: extraButtons = {} # if module_root is not set if module_root is None or module_root is OMFIT: from classes.omfit_python import OMFITworkDir if is_localhost(server): server = '' tunnel = '' # remotedir as relative path if remotedir is None and isinstance(workdir, basestring) and not workdir.strip().startswith(os.sep): remotedir = OMFITworkDir(root='out_of_module_runs' + os.sep + workdir, server=SERVER(server))