Source code for omfit_classes.omfit_omas

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

from omfit_classes.utils_base import _streams
from omfit_classes.omfit_json import OMFITjson
from omfit_classes.omfit_base import OMFITexpression, evalExpr
from omfit_classes.omfit_mds import translate_MDSserver, tunneled_MDSserver

import numpy as np
import glob
import omas
from omas import *


# hook up function to process OMFITexpressions
def process_OMFITexpression(value):
    if isinstance(value, OMFITexpression):
        return evalExpr(value)
    return value


try:
    omas.omas_core.input_data_process_functions.append(process_OMFITexpression)
except AttributeError:
    # avoid OMFIT not starting if OMAS version is not the latest
    printe(f'WARNING: Your OMAS installation {os.path.dirname(omas.__file__)} is old. Please update it.')

# hook up omfit and omas print streams
omas.omas_utils._streams = _streams


def tunnel_mds(server, treename):
    """
    Resolve MDS+ server

    NOTE: This function makes use of the optional `omfit_classes` dependency to establish a SSH tunnel to the MDS+ server.

    :param server: MDS+ server address:port

    :param treename: treename (in case treename affects server to be used)

    :return: string with MDS+ server and port to be used
    """
    try:
        import omfit_classes.omfit_mds
    except (ImportError, ModuleNotFoundError):
        return server.format(**os.environ)
    else:
        server0 = omfit_classes.omfit_mds.translate_MDSserver(server, treename)
        tunneled_server = omfit_classes.omfit_mds.tunneled_MDSserver(server0, quiet=False)
        return tunneled_server

    return server.format(**os.environ)


try:
    omas.omas_machine.tunnel_mds = tunnel_mds
except AttributeError:
    # avoid OMFIT not starting if OMAS version is not the latest
    printe(f'WARNING: Your OMAS installation {os.path.dirname(omas.__file__)} is old. Please update it.')

# export __all__ from omas.__all__
__all__ = list([x for x in ['omas'] + list(omas.__dict__.keys()) if not x.startswith('_')])


[docs]def pprint_imas_data_dictionary_info(location): """ pretty print IMAS data dictionary info :param location: location in IMAS data dictionary """ location = omas.omas_utils.p2l(location) location = omas.omas_utils.l2u(location) info = omas_info_node(location.rstrip('.:')) printi('\n' + location) printi('-' * len(location)) prioritize = ['documentation', 'units', 'data_type', 'coordinates'] skip = ['full_path', 'structure_reference', 'type'] for item in prioritize: if item in info: printi(f'* {item}: ', end='') print(f'{info[item]}') for item in sorted(info): if item not in prioritize + skip: printi(f'* {item}: ', end='') print(f'{info[item]}')
def _base_omas_remote( serverPicker, target_function, python_script='from omas import *', python_prelude=None, python_command=None, quiet=False, forceRemote=False, IMAS_module=None, OMAS_module=None, UDA_module=None, **kw, ): r""" low-level function that uses OMFITx.remote_python to interface with remote IMAS servers :param serverPicker: remote server name :param target_function: 'save_omas_imas' or 'load_omas_imas' :param python_script: string with python script to execute 'from omas import *' :param python_prelude: anything before the python command :param python_command: python command :param quiet: verbose output or not :param forceRemote: force remote connection :param IMAS_module: allow specifying IMAS module version when doing `module load XXX` :param OMAS_module: allow specifying OMAS module version when doing `module load XXX` :param UDA_module: allow specifying UDA module version when doing `module load XXX` :param \**kw: parameters to be fed to the target_function :return: output of the target function """ serverPicker = SERVER(serverPicker) # set python_prelude and python_command based on what is run and where if python_prelude is not None and python_command is not None: pass elif python_prelude is not None and python_command is None: raise ValueError('If python_prelude is specified, so must be python_command') elif python_prelude is None and python_command is not None: raise ValueError('If python_command is specified, so must be python_prelude') elif 'iter' in target_function or is_server(serverPicker, ['iter']): if IMAS_module is None: IMAS_module = 'IMAS' if OMAS_module is None: OMAS_module = 'OMAS' python_prelude = '''#!/bin/sh -l module purge module load {IMAS_module} # module load {OMAS_module} ## disabled since OMAS module at ITER is not kept up-to-date export PYTHONPATH=/home/ITER/menghio/atom/omas:$PYTHONPATH export PYTHONPATH=/home/ITER/menghio/atom/omas/site-packages:$PYTHONPATH imasdb {machine} ''' elif is_server(serverPicker, ['altair', 'andromede']): if IMAS_module is None: IMAS_module = 'IMAS' if OMAS_module is None: OMAS_module = 'OMAS' python_prelude = '''#!/bin/sh -l module purge module load {IMAS_module}/3.30.0-4.8.4 # module load {OMAS_module} ## disabled since OMAS module at WEST does not exist export PYTHONPATH=/Applications/omfit/atom/omas:$PYTHONPATH export PYTHONPATH=/Applications/omfit/atom/omas/site-packages:$PYTHONPATH imasdb {machine} ''' elif is_server(serverPicker, 'itm_gateway'): if IMAS_module is None: IMAS_module = 'imasenv' if OMAS_module is None: OMAS_module = 'omas' # notice no `module purge` for itm_gateway python_prelude = '''#!/bin/sh -l module load {IMAS_module} module load {OMAS_module} imasdb {machine} ''' elif is_server(serverPicker, ['freia', 'heimdall']): python_prelude = '''#!/bin/sh -l module purge module load imas-modules module load imasenv/3.23.3 module load OMAS export PYTHONPATH=/common/transp_shared/omfit/atom-dev/omas/site-packages:$PYTHONPATH imasdb {machine} ''' elif is_server(serverPicker, ['sophie']): python_prelude = '''#!/bin/sh -l module purge module use /work/imas/etc/modulefiles module load java/jdk/1.8.0_231 module load python/3.6/9 module load mdsplus/alpha3 module load Blitz++/0.10p module load GCC/6.5.0 module load IMAS4OMAS imasdb {machine} ''' else: if is_localhost(serverPicker): python_command = sys.executable else: python_command = 'python' # use the default Python from the module load IMAS_module if python_command is None: python_command = 'python' if python_prelude is None: python_prelude = '' # if IMAS_module/OMAS_module have not been set yet, set them as imas/omas if IMAS_module is None: IMAS_module = 'imas' if OMAS_module is None: OMAS_module = 'omas' if UDA_module is None: UDA_module = 'uda' # note: user=None gets the user on remote user = kw.pop('user', None) if user is None: user = parse_server(SERVER[serverPicker]['server'])[0] if user: kw['user'] = user # substitute format strings python = ( python_prelude.strip().format( machine=kw.get('machine', ''), IMAS_module=IMAS_module, OMAS_module=OMAS_module, UDA_module=UDA_module, user=user ) + '\nexport OMAS_DEBUG_STDOUT=1\n' + python_command ) import omfit_classes.OMFITx as OMFITx return OMFITx.remote_python( None, executable=python + ' -u %s', python_script=python_script, target_function=target_function, workdir='./', server=SERVER[serverPicker]['server'], tunnel=SERVER[serverPicker]['tunnel'], remotedir=os.path.abspath(str(SERVER[serverPicker]['workdir']) + os.sep + 'OMAS_tmp') + os.sep, namespace=kw, quiet=quiet, forceRemote=forceRemote, )
[docs]def save_omas_imas_remote(serverPicker, ods, **kw): r""" Save OMAS data set to a remote IMAS server :param serverPicker: remote server name where to connect to :param \**kw: all other parameters are passed to the save_omas_imas function """ kw['ods'] = ods return _base_omas_remote(serverPicker, 'save_omas_imas', **kw)
try: save_omas_imas_remote.__doc__ += save_omas_imas.__doc__.replace('\n ', '\n ') except (NameError, AttributeError): pass
[docs]def load_omas_imas_remote(serverPicker, **kw): r""" Load OMAS data set from a remote IMAS server :param serverPicker: remote server name where to connect to :param \**kw: all other parameters are passed to the load_omas_imas function """ return _base_omas_remote(serverPicker, 'load_omas_imas', **kw)
try: load_omas_imas_remote.__doc__ += load_omas_imas.__doc__.replace('\n ', '\n ') except (NameError, AttributeError): pass
[docs]def load_omas_uda_remote(serverPicker, **kw): r""" Load OMAS data set from a remote UDA server :param serverPicker: remote server name where to connect to :param \**kw: all other parameters are passed to the load_omas_uda function """ kw['user'] = False return _base_omas_remote(serverPicker, 'load_omas_uda', **kw)
try: load_omas_uda_remote.__doc__ += load_omas_uda.__doc__.replace('\n ', '\n ') except (NameError, AttributeError): pass
[docs]def browse_imas_remote(serverPicker, **kw): r""" Browse available IMAS data (machine/pulse/run) for given user on remote IMAS server :param serverPicker: remote server name where to connect to :param \**kw: all other parameters are passed to the browse_imas function """ return _base_omas_remote(serverPicker, 'browse_imas', **kw)
try: load_omas_imas_remote.__doc__ += load_omas_imas.__doc__.replace('\n ', '\n ') except (NameError, AttributeError): pass
[docs]class OMFITiterscenario(OMFITjson): def __str__(self): output = [] space = 2 fields = {k: len(k) + space for k in list(self.values())[0]} for item in self: for k in fields: fields[k] = max([fields[k], len('%s' % self[item][k]) + space]) header = ''.join([k.ljust(fields[k]) for k in fields]) output.append(header) output.append('-' * len(header)) for item in self: row = [] for k in fields: row.append(('%s' % self[item][k]).ljust(fields[k])) output.append(''.join(row)) return '\n'.join(output)
[docs] def filter(self, conditions, squash=False): """ Filter database for certain conditions :param conditions: dictionary with conditions for returning a match. For example: {'List of IDSs':['equilibrium','core_profiles','core_sources','summary'], 'Workflow':'CORSICA', 'Fuelling':'D-T'} :param squash: remove attributes that are equal among all entries :return: OMFITiterscenario dictionary only with matching entries """ matches = OMFITiterscenario('filtered.json') for item in self: match = True for cnd in conditions: # mismatch of None if self[item][cnd] is not None and conditions[cnd] is None: match = False # mismatch of None elif self[item][cnd] is None and conditions[cnd] is not None: match = False # mismatch of strings elif isinstance(self[item][cnd], str) and conditions[cnd] not in self[item][cnd]: match = False # mismatch of numbers elif isinstance(self[item][cnd], (int, float)) and conditions[cnd] != self[item][cnd]: match = False # mismatch of entries in a list elif isinstance(self[item][cnd], list) and not np.all([v in self[item][cnd] for v in conditions[cnd]]): match = False if match: matches[item] = copy.deepcopy(self[item]) if squash and len(matches) > 1: fields = [k for k in list(self.values())[0]] different_fields = [] for k in fields: if np.all([matches[item][k] == list(matches.values())[0][k] for item in matches]): for item in matches: del matches[item][k] return matches
[docs]def iter_scenario_summary_remote(quiet=False, environment="module purge\nmodule load IMAS"): """ Access iter server and sun `scenario_summary` command :param quiet: print output of scenario_summary to screen or not :param environment: `module load {IMAS_module}` or the like :return: dictionary with info from available scenarios """ # fields to be read # 'composition', what = [ 'ref_name', 'ro_name', 'shot', 'run', 'type', 'workflow', 'database', 'confinement', 'ip', 'b0', 'fuelling', 'ne0', 'zeff', 'npeak', 'p_hcd', 'p_ec', 'p_ic', 'p_nbi', 'p_lh', 'location', 'idslist', ] # execute scenario_summary remotely executable = ( environment + ''' scenario_summary -c {what} '''.format( what=','.join(what) ) ) std_out = [] std_err = [] import omfit_classes.OMFITx as OMFITx try: OMFITx.remote_execute( SERVER['iter_login']['server'], executable, './', tunnel=SERVER['iter_login']['tunnel'], std_out=std_out, std_err=std_err, quiet=True, ) # identify scenario_summary block # (read data in reverse order because the ITER terminal prints out a lot of unwanted infos when connecting) count = 0 for k, line in enumerate(reversed(std_out)): if line.startswith('----'): if count != 0: break count = k header = std_out[-k - 2] headers = re.sub(r'\s\s+', '\t', header).strip().split('\t') header_start = [header.index(h) for h in headers] + [None] scenario_summary = std_out[-k : len(std_out) - count - 1] except Exception: tag_print('\n'.join(std_out), tag='PROGRAM_OUT') tag_print('\n'.join(std_err), tag='PROGRAM_ERR') raise # print original output to screen if not quiet: print('\n'.join(std_out[-k - 3 : len(std_out) - count])) # parse scenario_summary output scenarios = OMFITiterscenario('iter_scenarios.json') for li, line in enumerate(scenario_summary): items = [] for hi, start in enumerate(header_start[:-1]): items.append(line[start : header_start[hi + 1]].strip()) if len(items) != len(headers): print('items=', items) print('headers=', headers) print('Error in parsing table') raise OMFITexception('Did not parse scenario summary table correctly') scenario = dict(list(zip(map(lambda x: x.strip(), headers), items))) for k in headers: if scenario[k] == 'tbd': scenario[k] = None for k in [h for h in headers if '[' in h] + ['Zeff']: try: scenario[k] = float(scenario[k]) except (TypeError, ValueError): pass for k in ['Pulse', 'Run']: scenario[k] = int(scenario[k]) for k in ['List of IDSs']: scenario[k] = scenario[k].split() scenarios['%d_%d' % (scenario['Pulse'], scenario['Run'])] = scenario return scenarios
[docs]def load_omas_iter_scenario_remote(**kw): r""" Load OMAS iter scenario from a remote ITER IMAS server :param \**kw: all other parameters are passed to the load_omas_iter_scenario function """ kw['user'] = False return _base_omas_remote('iter_login', 'load_omas_iter_scenario', **kw)
try: load_omas_iter_scenario_remote.__doc__ += load_omas_iter_scenario.__doc__.replace('\n ', '\n ') except (NameError, AttributeError): pass ods_method_list = [func for func in dir(ODS) if callable(getattr(ODS, func)) and not func.startswith("__")]
[docs]class OMFITods(OMFITobject, ODS): def __init__(self, filename, ods=None, **kw): ODS_kw = function_arguments(ODS.__init__)[1] for kwarg in list(kw.keys()): if kwarg in ODS_kw: ODS_kw[kwarg] = kw.pop(kwarg) OMFITobject.__init__(self, filename, **kw) self.dynaLoad = False ODS.__init__(self, **ODS_kw) if ods is not None: self.update(ods) else: self.dynaLoad = True def __getstate__(self): from omas.omas_core import omas_dictstate state = {} for item in omas_dictstate + [ 'OMFITproperties', 'dynaLoad', 'modifyOriginal', 'readOnly', 'file_type', 'originalFilename', 'filename', 'link', ]: if item in self.__dict__: state[item] = self.__dict__[item] return state
[docs] @dynaLoad def load(self): ODS.load(self, self.filename)
[docs] @dynaSave def save(self): tmp = ODS() tmp.copy_attrs_from(self) tmp.update(self) tmp.save(self.filename)
for func in ods_method_list: if func not in ['save', 'load']: exec( ''' def {func}(self, *args, **kw): r"""{doc}""" dynaLoader(self) return ODS.{func}(self, *args, **kw) '''.format( func=func, doc=getattr(ODS, func).__doc__ ) )
# use OMFIT MDS+ tunneling within OMAS if compare_version(omas.__version__, '0.70') >= 0: def mds_machine_to_server_mapping(machine, treename, quiet=False): server0 = translate_MDSserver(machine, treename) tunneled_server = tunneled_MDSserver(server0, quiet=quiet) return tunneled_server from omas import omas_machine omas.omas_machine.mds_machine_to_server_mapping = mds_machine_to_server_mapping __all__.extend( [ 'save_omas_imas_remote', 'load_omas_imas_remote', 'load_omas_uda_remote', 'browse_imas_remote', 'iter_scenario_summary_remote', 'load_omas_iter_scenario_remote', 'OMFITiterscenario', 'OMFITods', 'pprint_imas_data_dictionary_info', ] ) if __name__ == '__main__': test_classes_main_header() # Sets output to PDF if no display & changes to temporary directory to avoid messes ods = ODS() # Just make sure it can initialize