"""
This file contains classes and functions for setting up regression test suites with convenient utilities
Usage:
 - Subclass a test case from OMFITtest
 - Override any of the default settings you like at the top of the class
 - Write test methods
 - Supply your test case (or a list of OMFITtest-based test cases) to manage_tests() as the first argument
See also: OMFIT-source/regression/test_template.py
"""
try:
    # Framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise
import traceback
import numpy as np
import unittest
import warnings
from matplotlib import pyplot
from matplotlib.cbook import MatplotlibDeprecationWarning
import yaml
import traceback
from omfit_classes.omfit_github import (
    set_gh_status,
    post_comment_to_github,
    get_pull_request_number,
    get_gh_remote_org_repo_branch,
    get_OMFIT_GitHub_token,
    delete_matching_gh_comments,
    on_latest_gh_commit,
)
__all__ = [
    'OMFITtest',
    'manage_tests',
    'setup_warnings',
    'get_server_name',
    'clear_old_test_report_comments',
    'auto_comment_marker',
    'run_test_outside_framework',
    'RedirectStdStreams',
    'auto_test_mode',
    'standard_test_keywords',
]
# Make a class for standard_test_keywords so it can have a docstring that describes the keywords
class StandardTestKeywords(dict):
    """
    Set of standard keywords to be accepted by defaultVars in OMFIT regression tests
    Provided as a central feature to make developing new tests easier.
    :param run_tests: bool
        Switch for enabling execution of tests. Disabling execution allows
        importing of objects from the test script.
    :param allow_gui: bool
        Allows test units that require GUI interactions to execute. Recommended:
        decorate affected units with:
            @unittest.skipUnless(allow_gui, 'GUIs not allowed')
            def test_blah_blah(...
    :param allow_long: bool
        Allows long test units to run. What qualifies as long is subjective.
        Recommended: decorate with:
            @unittest.skipUnless(allow_long, 'Long tests are not allowed')
            def test_blah(...
    :param failfast: bool
        Halt execution at the first failure instead of testing remaining units.
        Passed to unittest.
    :param force_gh_status: bool [optional]
        If not None, override default GitHub status posting behavior and force posting or not posting.
    :param force_gh_comment: int [optional]
        If not None, override default GitHub comment posting behavior with this value.
        The value is interpreted as bits, so 5 means 1 and 4.
            1 (bit 0): do a post or update
            2 (bit 1): if there's a failure to report, do a post or update
            4 (bit 2): edit an existing comment instead of posting if possible, otherwise post
            8 (bit 3): edit the top comment to add or update the test report (ignored if not combined with bit 2)
    :param only_these: string or list of strings [optional]
        Names of test units to run (with or without leading `test_`).
        Other test units will not be run. (None to run all tests)
    :param there_can_be_only_one: int or bool
        Set test report comment cleanup behavior; not very relevant if bit 3 of
        gh_comment isn't set. This value is interpreted as a set of binary flags,
        so 6 should be interpreted as options 2 and 4 being active.
        A value of True is converted into 255 (all the bits are True, including unused bits).
        A value of None is replaced by the default value, which is True or 255.
        A float or string will work if it can be converted safely by int().
        1: Any of the flags will activate this feature. The 1 bit has no special meaning beyond activation.
            If active, old github comments will be deleted. The newest report may be retained.
        2: Limit deletion to reports that match the combined context of the test being run.
        4: Only protect the latest comment if it reports a failure; if the last test passed, all comments may be deleted
        8: Limit scope to comments with matching username
    """
    def __init__(self):
        self.update(
            dict(
                run_tests=True,
                allow_gui=bool(('rootGUI' in OMFITaux) and OMFITaux['rootGUI']),
                allow_long=True,
                failfast=False,
                force_gh_status=None,
                force_gh_comment=None,
                only_these=None,
                there_can_be_only_one=True,
            )
        )
standard_test_keywords = StandardTestKeywords()
auto_comment_marker = (
    '<!--This comment was automatically generated by OMFIT and was not posted directly by a human. ' 'A2ZZJfxk2910x2AZZf -->'
)
# auto_test_mode is a flag that indicates that the tests are being run automatically, such as by morti, and so behavior should be modified.
try:
    auto_test_mode = bool(is_server('localhost', 'morti')) and repo is not None
except KeyError:
    auto_test_mode = False
[docs]class setup_warnings(object):
    """
    A context manager like `catch_warnings`, that copies and restores the warnings
    filter upon exiting the context, with preset levels of warnings that turn some
    warnings into exceptions.
    :param record: specifies whether warnings should be captured by a
        custom implementation of warnings.showwarning() and be appended to a list
        returned by the context manager. Otherwise None is returned by the context
        manager. The objects appended to the list are arguments whose attributes
        mirror the arguments to showwarning().
    :param module: to specify an alternative module to the module
        named 'warnings' and imported under that name. This argument is only useful
        when testing the warnings module itself.
    :param level: (int) Controls how many warnings should throw errors
        -1: Do nothing at all and return immediately
        0: No warnings are promoted to exceptions. Specific warnings defined in
            higher levels are ignored and the rest appear as warnings, but with
            'always' instead of 'default' behavior: they won't disappear after
            the first instance.
        All higher warning levels turn all warnings into exceptions and then
        selectively ignore some of them:
        1: Ignores everything listed in level 2, but also ignores many common
            math errors that produce NaN.
        2: RECOMMENDED: In addition to level 3, also ignores several warnings
            of low-importance, but still leaves many math warnings (divide by 0)
            as errors.
        3: Ignores warnings which are truly irrelevant to almost any normal
            regression testing, such as the warning about not being able to make
            backup copies of scripts that are loaded in developer mode. Should
            be about as brutal as level 4 during the actual tests, but somewhat
            more convenient while debugging afterward.
        4: No warnings are ignored. This will be really annoying and not useful
            for many OMFIT applications.
    """
    def __init__(self, level=2, record=False, module=None):
        """Specify whether to record warnings and if an alternative module
        should be used other than sys.modules['warnings'].
        For compatibility with Python 3.0, please consider all arguments to be keyword-only.
        """
        self._level = level
        self._record = record
        self._module = module or sys.modules['warnings']
        self._entered = False
    def __enter__(self):
        if self._entered:
            raise RuntimeError("Cannot enter %r twice" % self)
        self._entered = True
        self._filters = self._module.filters
        self._module.filters = self._filters[:]
        self._showwarning = self._module.showwarning
        if self._record:
            log = []
            def showwarning(*args, **kwargs):
                log.append(WarningMessage(*args, **kwargs))
            self._module.showwarning = showwarning
        else:
            log = None
        level = self._level
        if level < 0:
            if self._record:
                return log
            else:
                return None
        warnings.resetwarnings()
        np.seterr(all='warn')
        if level <= 0:
            warnings.simplefilter('always')
        else:
            warnings.simplefilter('error')
        if level <= 3:
            warnings.filterwarnings('ignore', message='OMFIT is unable to create script backup copies')
            warnings.filterwarnings('ignore', message='findfont: Font family*')
            # Future
            warnings.filterwarnings('ignore', category=FutureWarning)
        if level <= 2:
            # tight_layout()
            warnings.filterwarnings('ignore', message='tight_layout cannot make axes width small*')
            warnings.filterwarnings('ignore', message='tight_layout : falling back to Agg renderer')
            warnings.filterwarnings('ignore', message='Tight layout not applied*')
            # Underflow
            warnings.filterwarnings('ignore', message="underflow encountered in *")
            np.seterr(under='ignore')
            # Deprecation
            warnings.filterwarnings('ignore', category=DeprecationWarning)
            warnings.filterwarnings('ignore', category=MatplotlibDeprecationWarning)
            warnings.filterwarnings('ignore', category=np.VisibleDeprecationWarning)
            # xarray and numpy
            warnings.filterwarnings('ignore', category=DeprecationWarning, message='Using or importing the ABCs*')
            warnings.filterwarnings('ignore', category=FutureWarning, message='The Panel class is removed*')
            warnings.filterwarnings('ignore', category=PendingDeprecationWarning)
            warnings.filterwarnings('ignore', category=UserWarning, message="Warning: converting a masked element to nan.")
            warnings.filterwarnings('ignore', category=DeprecationWarning, message='distutils Version classes are deprecated')
            # psycopg2
            warnings.filterwarnings('ignore', message='The psycopg2 wheel package will be renamed from release 2.8*')
            # pandas
            warnings.filterwarnings('ignore', message="Pandas doesn't allow columns to be created via a new attribute name*")
            # omas
            warnings.filterwarnings('ignore', message='omas cython failed*')
        if level <= 1:
            # Divide by 0
            warnings.filterwarnings('ignore', message='divide by zero encountered.*')
            warnings.filterwarnings('ignore', message='invalid value encountered in divide')
            warnings.filterwarnings('ignore', message='invalid value encountered in true_divide')
            # Too many NaNs
            warnings.filterwarnings('ignore', message="All-NaN slice encountered")
            # Bad comparisons
            warnings.filterwarnings('ignore', message="invalid value encountered in *", category=RuntimeWarning)
            np.seterr(divide='ignore', invalid='ignore')
            # These warnings come out of NetCDF4
            warnings.filterwarnings(
                'ignore', message=r"tostring\(\) is deprecated\. Use tobytes\(\) instead\.", category=DeprecationWarning
            )
            # psycopg2 forward compatibility (which has already taken care of)
            warnings.filterwarnings(
                'ignore', message=r'The psycopg2 wheel package will be renamed from release 2\.8*"', category=UserWarning
            )
        if self._record:
            return log
        else:
            return None
    def __repr__(self):
        args = []
        if self._record:
            args.append("record=True")
        if self._module is not sys.modules['warnings']:
            args.append("module=%r" % self._module)
        name = type(self).__name__
        return "%s(%s)" % (name, ", ".join(args))
    def __exit__(self, *exc_info):
        if not self._entered:
            raise RuntimeError("Cannot exit %r without entering first" % self)
        self._module.filters = self._filters
        self._module.showwarning = self._showwarning
        return 
[docs]def get_server_name(hostname=socket.gethostname()):
    """
    Returns the hostname.
    For known servers, it sanitizes hostname; e.g. omega-b.cluster --> omega.gat.com
    """
    try:
        SERVER
    except NameError:
        printd(
            'get_server_name could not look up SERVER dictionary; returning unmodified hostname {}'.format(hostname), topic='omfit_testing'
        )
        return hostname
    else:
        server = is_server(hostname, list(SERVER.keys()))
        if server == 'localhost':
            printd('server is localhost so returning it unmodified', topic='omfit_testing')
            return hostname
        elif server in list(SERVER.keys()):
            printd('server {} is in SERVER dictionary, so parsing it'.format(server), topic='omfit_testing')
            return parse_server(SERVER[server]['server'])[2]
        else:
            printd('server {} is not in the server dictionary; will be returned without modification'.format(server), topic='omfit_testing')
            printd(SERVER, topic='omfit_testing')
            return hostname 
def enforce_test_report_header_in_top_comment(thread=None, **kw):
    """
    Checks for a test report header in the top of a pull request and appends it if it's missing.
    :param thread: int
        Pull request number. If not provided, it will be looked up.
    :param kw: Keywords passed to GitHub functions.
        These may include org, repository, token. Most of the functions will figure this stuff out on their own.
    """
    from omfit_classes.omfit_github import OMFITgithub_paged_fetcher, edit_github_comment
    test_report_header = '### Test reports\n\n---\n'
    if thread is None:
        thread = get_pull_request_number(**kw)
    if thread is None:
        print('Could not find an open pull request: no top comment to edit. Aborting.')
        return
    content = OMFITgithub_paged_fetcher(path='issues/{}'.format(thread), **kw).fetch()[0]['body']
    if content is None:  # Top comment is blank
        content = ''
    # Change line break style; otherwise the report header won't match after comment has been edited manually on GitHub.
    content = content.replace('\r\n', '\n')
    if test_report_header not in content:
        printd('Adding test report header to top comment of #{}'.format(thread), topic='omfit_testing')
        edit_github_comment(new_content=test_report_header, mode='append', separator=None)
    else:
        printd('#{} already has a test report header; nothing to do about this.'.format(thread), topic='omfit_testing')
    return
def gh_test_report_update(
    the_gh_comment='',
    contexts=None,
    gh_comment=None,
    skipped_comment=False,
    rtf=None,
    there_can_be_only_one=True,
    errors=0,
    post_comment_to_github_keywords=None,
    total_test_count_report='',
):
    """
    Handles github comment posts, edits, and cleanup
    :param the_gh_comment: str
        The comment to maybe post, including the test report
    :param contexts: str or list of strings
        Context(s) to consider when clearing old test report comments
    :param gh_comment: int or list of ints
        Comment behavior bits. Interpretation: 5 means do behaviors associated with 1 and 4.
        If a list is provided, the bits are combined with or. So [1, 4] comes out to 5, and [1, 5] comes out to 5.
        1 (bit 0): do a post or update (this function shouldn't be called by manage_tests unless bit 0 or bit 1 is set)
        2 (bit 1): if there's a failure to report, do a post or update
        4 (bit 2): edit an existing comment instead of posting if possible, otherwise post
        8 (bit 3): edit the top comment to add or update the test report (ignored if not combined with bit 2)
    :param skipped_comment: bool
        Flag indicating whether building the comment was already skipped; manage_tests has already interpreted
        there_can_be_only_one to mean it should suppress building of content to post as a comment and thus if it
        passes in nothing to post, we should act as if the last comment isn't really the last comment, because we're
        aware of a "true" last comment that would've existed if we hadn't skipped its creation and immediate deletion.
        So, the actual last comment doesn't get protected for being the last comment.
    :param rtf: str
        Formatted results table
    :param there_can_be_only_one: int
        Old report cleanup behavior bits. See clear_old_test_report_comments
    :param errors: int
        Number of errors encountered. Mostly matters whether this is 0 or not.
    :param post_comment_to_github_keywords: dict
        Keywords like org, repository to pass to post_comment_to_github() or edit_github_comment
    :param total_test_count_report: str
        A brief summary of the total number of test units executed and skipped
    """
    from omfit_classes.omfit_github import edit_github_comment
    import functools
    printd('Trying to update test report comments...', topic='omfit_testing')
    # Combine github comment bits
    gh_comment = functools.reduce(lambda x, y: x | y, gh_comment)
    ghc_bits = [int(bb) for bb in np.binary_repr(int(gh_comment), 8)[::-1]]
    # Now ghc_bits[0] is 1 if any of the 2^0 bits were set for any of the test suites in the list
    # Some setup stuff
    notice = 'Some tests failed!' if errors > 0 else 'All tests passed!'
    comment_would_be_deleted = int(np.binary_repr(int(there_can_be_only_one), 8)[::-1][2]) and (errors == 0)
    clear_shortcut = False
    # Update obsolete fork keyword into org, if it exists. Newer functions aren't being built with compatibility for the
    # old convention.
    if 'fork' in post_comment_to_github_keywords:
        post_comment_to_github_keywords['org'] = post_comment_to_github_keywords.pop('fork')
    jcontexts = ' + '.join(contexts)
    keytag = 'errors were detected while testing `||{}||`'.format(jcontexts)
    if ghc_bits[2] and ghc_bits[3]:
        # This is being inserted into the top comment, so apply some extra formatting and define separators.
        open_separator = '\n<details><summary>{}</summary>\n\n'.format(jcontexts)
        close_separator = '\n\n</details><!--{}-->\n'.format(jcontexts)
        comment_mark = None
        separator = [open_separator, close_separator]
        mode = 'replace_between'
        new_content = None  # Default new content to cause clearing of old report; overwrite if not clearing
    elif ghc_bits[2]:
        # Edit existing comment, completely replacing contents
        comment_mark = keytag
        mode = 'replace'
        separator = None
        new_content = ''
    else:
        mode = 'no_edit'  # Not a valid mode; don't run edit_github_comment
        separator = comment_mark = new_content = None
    edits_handled = False
    printd('  Update comments has edit mode = {}, and searches for comment_mark = {}'.format(mode, comment_mark), topic='omfit_testing')
    if len(the_gh_comment) and comment_would_be_deleted:
        # There's some content to post as a comment, but it would be cleaned up, so let's just skip everything.
        printd(
            'Skipping comment posting because it would be instantly deleted because the test passed and '
            'there_can_be_only_one = {}, which will cause the deletion of all passing test report comments '
            'for this context.'.format(there_can_be_only_one),
            topic='delete_comments',
        )
        clear_shortcut = True  # Don't preserve the last comment because we already resolved last-comment behavior
    elif len(the_gh_comment) and not comment_would_be_deleted:
        # There's content to post that wouldn't be automatically cleaned up after, so proceed with posting.
        # Add some header stuff to the comment
        the_gh_comment = '## {}\n{} {}\n{}\n{}'.format(notice, errors, keytag, total_test_count_report, the_gh_comment)
        the_gh_comment += '<details><summary>Test environment details</summary>\n\n```\n{}```\n</details>\n\n'.format(
            test_comp_info_summary(long_form=True)
        )
        if rtf is not None:
            the_gh_comment += '\n\n```\n{}\n```'.format(rtf)
        the_gh_comment += (
            '\n{} <!--The presence of the previous statement makes this comment vulnerable to automatic '
            'deletion. To protect it, delete the thing about it being automatically generated. -->'.format(auto_comment_marker)
        )
        # Figure out how to target it
        if ghc_bits[2] and ghc_bits[3]:
            # This is being inserted into the top comment, so apply some extra formatting.
            line_prefix = '> '  # Indents nested <details> tags to make them easier to interpret
            new_content = the_gh_comment.replace('\n', '\n' + line_prefix)
            enforce_test_report_header_in_top_comment()
        elif ghc_bits[2]:
            # Edit existing comment, completely replacing contents
            new_content = the_gh_comment
        else:
            new_content = ''
        if mode == 'no_edit':  # This isn't a real option in edit_github_comment, so we use it internally
            post_needed = True
        else:
            edits_handled = True
            post_needed = False
            printd('  Attempting to edit a comment...', topic='omfit_testing')
            try:
                edit_github_comment(
                    new_content=new_content, mode=mode, separator=separator, comment_mark=comment_mark, **post_comment_to_github_keywords
                )
            except KeyError:
                if not ghc_bits[3]:
                    printd('Could not find the GitHub comment to edit; create it instead', topic='omfit_testing')
                    # Trying to edit a regular comment and couldn't find a matching one, so matching[0] throws KeyError
                    # Post the comment instead of editing
                    post_needed = True
                else:
                    print('Attempt to edit GitHub comment has failed!')
            else:
                printd('  Success! Comment edited.', topic='omfit_testing')
        if post_needed:
            printd('Posting GitHub comment:\n{}'.format(the_gh_comment), topic='omfit_testing')
            try:
                post_comment_to_github(comment=the_gh_comment, **post_comment_to_github_keywords)
            except ValueError:  # Bad/missing token (expected common problem) results in ValueError
                print('GitHub comment post failed')
    elif (not len(the_gh_comment)) and skipped_comment:
        printd('No comment to post because it was already suppressed', topic='delete_comments')
        clear_shortcut = True
    if (np.any(gh_comment) or skipped_comment) and there_can_be_only_one != 0:
        printd('  Running cleanup tasks at the end of gh_test_report_update...', topic='omfit_testing')
        # Clear old GitHub comments
        relevant_keywords = ['thread', 'org', 'repository', 'token']
        if isinstance(post_comment_to_github_keywords, dict):
            clear_kw = {k: v for k, v in post_comment_to_github_keywords.items() if k in relevant_keywords}
        else:
            clear_kw = {}
        if int(np.binary_repr(int(there_can_be_only_one), 8)[-1]):
            lvl = there_can_be_only_one
        else:
            lvl = there_can_be_only_one + 1  # Make sure it's not a test
        printd(
            'Processing clearing of old github comments... '
            'lvl={lvl:}, clear_shortcut={clear_shortcut:}, contexts={contexts:}'.format(**locals()),
            topic='delete_comments',
        )
        if mode == 'no_edit':
            # We aren't editing comments, so we can do normal comment deletion behavior
            clear_old_test_report_comments(lvl=lvl, contexts=contexts, remove_all=clear_shortcut, **clear_kw)
        elif not edits_handled:
            printd('  Editing comments to clear old reports', topic='omfit_testing')
            # We're editing, so do cleanup by erasing parts of comments. new_content should've already been
            # appropriately defined as None (delete material from top comment) or '' (replace regular comment body
            # with blank).
            # In edit mode, cleanup isn't needed unless edits were skipped above. There is an overly complicated
            # interaction with the pre-existing system that handled comment creation and deletion; if we only ever did
            # edits, the whole thing could be simpler.
            edit_github_comment(
                new_content=new_content, mode=mode, separator=separator, comment_mark=comment_mark, **post_comment_to_github_keywords
            )
    else:
        printd('Skipped processing old github comment clearing', topic='delete_comments')
    return
[docs]class RedirectStdStreams(object):
    """Redirects stdout and stderr streams so you can merge them and get an easier to read log from a test."""
    # https://stackoverflow.com/a/6796752/6605826
    def __init__(self, stdout=None, stderr=None):
        self._stdout = stdout or sys.stdout
        self._stderr = stderr or sys.stderr
    def __enter__(self):
        self.old_stdout, self.old_stderr = sys.stdout, sys.stderr
        self.old_stdout.flush()
        self.old_stderr.flush()
        sys.stdout, sys.stderr = self._stdout, self._stderr
    def __exit__(self, exc_type, exc_value, traceback):
        self._stdout.flush()
        self._stderr.flush()
        sys.stdout = self.old_stdout
        sys.stderr = self.old_stderr 
    # End of class RedirectStdStreams
[docs]def run_test_outside_framework(test_script, catch_exp_reports=True):
    """
    Deploys a test script outside the framework. Imports will be different.
    To include this in a test unit, do
    >>> return_code, log_tail = run_test_outside_framework(__file__)
    :param test_script: string
        Path to the file you want to test.
        If a test has a unit to test itself outside the framework, then this should be __file__.
        Also make sure a test can't run itself this way if it's already outside the framework.
    :param catch_exp_reports: bool
        Try to grab the end of the log starting with exception reports.
        Only works if test_script merges stdout and stderr; otherwise the exception reports will be somewhere else.
        You can use `with RedirectedStdStreams(stderr=sys.stdout):` in your code to do the merging.
    :return: (int, string)
        Return code (0 is success)
        End of output
    """
    print('Running {} in a separate process (no framework)'.format(os.path.basename(test_script)))
    result = subprocess.Popen(['python3', test_script], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
    result.wait()
    return_code = result.returncode
    print('Run complete. Return code = {}'.format(return_code))
    output = result.communicate()[0]
    table_mark = 'Table of results for all tests:'
    excp_mark = 'Exception reports from failed tests'
    if catch_exp_reports and (excp_mark in output):
        log_tail = excp_mark + output.split(excp_mark)[-1]
    elif table_mark in output:
        log_tail = table_mark + output.split(table_mark)[-1]
    else:
        lines = output.split('\n')
        log_tail = '\n'.join(lines[-(min([10, len(lines)])) :])
    return return_code, log_tail 
def _is_subtype(expected, basetype):
    """Stolen from unittest to support _AssertRaiseSimilarContext"""
    if isinstance(expected, tuple):
        return all(_is_subtype(e, basetype) for e in expected)
    return isinstance(expected, type) and issubclass(expected, basetype)
class _AssertRaiseSimilarContext(object):
    """
    Copied from unittest and modified to support loose matching
    """
    _base_type = BaseException
    _base_type_str = 'an exception type or tuple of exception types'
    def __init__(self, similar_exc, test_case, expected_regex=None):
        assert issubclass(similar_exc, Exception), f'similar_exc should be subclass of Exception, not {similar_exc}'
        self.similar_exc = similar_exc
        self.test_case = test_case
        if expected_regex is not None:
            expected_regex = re.compile(expected_regex)
        self.expected_regex = expected_regex
        self.obj_name = None
        self.msg = None
    def _raiseFailure(self, standard_msg):
        raise self.test_case.failureException(f'{self.msg} : {standard_msg}')
    def __enter__(self):
        return self
    def check_similarity(self, exc_type):
        """
        Checks whether an exception is similar to the nominally expected similar exception
        :param exc_type: class
            An exception class to be compared to self.similar_exc
        """
        if issubclass(exc_type, Exception):
            same_name = exc_type.__name__ == self.similar_exc.__name__
            same_bases = exc_type.__bases__ == self.similar_exc.__bases__
            return same_name and same_bases
        else:
            return False
    def __exit__(self, exc_type, exc_value, tb):
        if exc_type is None:
            try:
                exc_name = self.similar_exc.__name__
            except AttributeError:
                exc_name = str(self.similar_exc)
            if self.obj_name:
                self._raiseFailure("{} not raised by {}".format(exc_name, self.obj_name))
            else:
                self._raiseFailure("{} not raised".format(exc_name))
        else:
            traceback.clear_frames(tb)
        if (not issubclass(exc_type, self.similar_exc)) and (not self.check_similarity(exc_type)):
            # let unexpected exceptions pass through if their names don't match expectation
            return False
        # store exception, without traceback, for later retrieval
        self.exception = exc_value.with_traceback(None)
        if self.expected_regex is None:
            return True
        expected_regex = self.expected_regex
        if not expected_regex.search(str(exc_value)):
            self._raiseFailure('"{}" does not match "{}"'.format(expected_regex.pattern, str(exc_value)))
        return True
    def handle(self, name, args, kwargs):
        """
        If args is empty, assertRaises/Warns is being used as a
        context manager, so check for a 'msg' kwarg and return self.
        If args is not empty, call a callable passing positional and keyword
        arguments.
        """
        try:
            if not _is_subtype(self.similar_exc, self._base_type):
                raise TypeError('%s() arg 1 must be %s' % (name, self._base_type_str))
            if not args:
                self.msg = kwargs.pop('msg', None)
                if kwargs:
                    raise TypeError('%r is an invalid keyword argument for this function' % (next(iter(kwargs)),))
                return self
            callable_obj, *args = args
            try:
                self.obj_name = callable_obj.__name__
            except AttributeError:
                self.obj_name = str(callable_obj)
            with self:
                callable_obj(*args, **kwargs)
        finally:
            # bpo-23890: manually break a reference cycle
            self = None
[docs]class OMFITtest(unittest.TestCase):
    """
    Test case with some extra methods to help with OMFIT testing tasks
    To use this class, make your own class that is a subclass of this one:
    >> class TestMyStuff(OMFITtest):
    >>     notify_gh_status = True  # Example of a test setting you can override
    In the top of your file, override key test settings as needed
    Test settings you can override by defining them at the top of your class:
    -------------------------------
    :param warning_level: int
        Instructions for turning some warnings into exceptions & ignoring others
        -1: Make no changes to warnings
        0: No exceptions from warnings
        1: Ignores some math warnings related to NaNs & some which should be
            benign. Exceptions for other warnings.
        2: (RECOMMENDED) Ignores a small set of warnings which are probably
            benign, but exceptions for everything else.
        3: Exceptions for practically every warning. Only ignores some really
            inconsequential ones from OMFIT.
        4: No warnings allowed. Always throw exceptions!
        The warnings are changed before your test starts, so you can still
        override or change them in s() or __init__().
    :param count_figs: bool
        Enable counting of figures. Manage using collect_figs(n) after opening
        n figures. The actual figure count will be compared to the expected
        count (supplied by you as the argument), resulting in an AssertionError
        if the count does not match.
    :param count_guis: bool
        Enable counting of GUIs. Manage using collect_guis(n) after opening
        n GUIs. AssertionError if GUI count does not match expectation given
        via argument.
    :param leave_figs_open: bool
        Don't close figures at the end of each test (can lead to clutter)
    :param modules_to_load: list of strings or tuples
        Orders OMFIT to load the modules as indicated.
        Strings: modules ID. Tuples: (module ID, key)
    :param report_table: bool
        Keep a table of test results to include in final report
    :param table_sorting_columns: list of strings
        Names of columns to use for sorting table. Passed to table's group_by().
        This is most useful if you are adding extra columns to the results table
        during your test class's __init__() and overriding tearDown() to
        populate them.
    :param notify_gh_comment: int
        Turn on automatic report in a GitHub comment
        0: off
        1: always try to post or edit
        2: try to post or edit on failure only
        4: edit a comment instead of posting a new one if possible; only post if necessary
        8: edit the top comments and append test report or replace existing report with same context
        5 = behaviors of 4 and 1 (for example)
    :param notify_gh_status: bool
        Turn on automatic GitHub commit status updates
    :param gh_individual_status: int
        0 or False: No individual status contexts.
            Maximum of one status report if notify_gh_status is set.
        1 or True: Every test gets its own status context, including a pending
            status report when the test starts.
        2: Tests get their own status context only if they fail.
            No pending status for individual tests.
        3: Like 2 but ignores notify_gh_status and posts failed status reports
            even if reports are otherwise disabled.
    :param gh_overall_status_after_individual: bool
        Post the overall status even if individual status reports are enabled
        (set to 1 or True). Otherwise, the individual contexts replace the
        overall context. The overall status will be posted if individual status
        reports are set to 2 (only on failure).
    :param notify_email: bool
        Automatically send a test report to the user via email
    :param save_stats_to: dict-like
        Container for catching test statistics
    :param stats_key: string [optional]
        Test statistics are saved to save_stats_to[stats_key].
        stats_key will be automatically generated using the subclass name and a
        timestamp if left as None.
    :param topics_skipped: list of strings
        Provide a list of skipped test topics in order to have them included in
        the test report. The logic for skipping probably happens in the setup of
        your subclass, so we can't easily generate this list here.
    :param omfitx: reference to OMFITx
        Provide a reference to OMFITx to enable GUI counting and closing.
        This class might not be able to find OMFITx by itself, depending on how
        it is loaded.
    """
    # Test setup: All of these settings can be overridden in the subclass
    # Define them here instead of in init so they're available to class methods
    count_figs = False
    count_guis = False
    leave_figs_open = False
    modules_to_load = []
    omfitx = None
    report_table = True
    table_sorting_columns = ['Result', 'Time']
    notify_gh_status = 0
    notify_gh_comment = 14 * auto_test_mode  # 14 is append report of failures only to top comment
    gh_individual_status = 2 + auto_test_mode  # 2 is only on failure, 3 is on failure even if overall status is off.
    gh_overall_status_after_individual = True
    notify_email = False
    save_stats_to = None
    stats_key = None
    topics_skipped = []
    warning_level = 2
    verbosity = 1
    debug_topic = None
    # Tracking
    stats = {
        'figures_expected': 0,
        'figures_detected': 0,
        'pre_opened_figs': None,  # We don't know this when the class gets defined. Update it in init.
        'already_saw_figures': None,
        'guis_expected': 0,  # Keep count the number of GUIs that should be opened by the test so the user can confirm
        'guis_detected': 0,
        'time_elapsed': {},
        'fig_count_discrepancy': 0,
        'gui_count_discrepancy': 0,
    }
    # Timing
    test_timing_t0 = 0
    test_timing_t1 = 0
    force_gh_status = None
    set_gh_status_keywords = {}  # This will be overwritten by manage_tests; don't control from here.
    def __init__(self, *args, **kwargs):
        import astropy.table
        super().__init__(*args, **kwargs)
        # Define sample experiment information: override later if needed
        experiment = dict(device='DIII-D', shot=154749, shots=[154749, 154754], time=2500.0, times=[2500.0, 3000.0])
        self.default_attr(experiment)
        # Reset stats
        for thing in self.stats.keys():
            if is_numeric(self.stats[thing]):
                self.stats[thing] = 0
        self.stats['pre_opened_figs'] = pyplot.get_fignums()
        self.stats['time_elapsed'] = {}
        self.force_gh_status = None  # manage_tests() might set this differently when running
        # Manage figures and GUIs
        if self.count_guis and self.omfitx is not None:
            try:
                self.omfitx.CloseAllGUIs()
            except NameError:
                # OMFITx not defined because running outside of GUI
                self.count_guis = False
            else:
                assert len(self.omfitx._GUIs) == 0, 'GUIs should have all been closed just now.'
        if len(self.modules_to_load):
            for m2l in self.modules_to_load:
                if isinstance(m2l, tuple):
                    m2l, tag = m2l
                else:
                    tag = '{}_test_copy'.format(m2l)
                try:
                    okay_ = OMFIT[tag]['SETTINGS']['MODULE']['ID'] == m2l
                except KeyError:
                    okay_ = False
                if okay_:
                    setattr(self, m2l, OMFIT[tag])
                else:
                    if hasattr(OMFIT, 'loadModule'):
                        OMFIT.loadModule(m2l, location="{}['{}']".format(treeLocation(OMFIT)[-1], tag), quiet=True)
                        setattr(self, m2l, OMFIT[tag])
                    else:
                        printe('UNABLE TO LOAD MODULES OUTSIDE OF THE FRAMEWORK. Failed to load {}.'.format(m2l))
                        setattr(self, m2l, None)
        if self.stats_key is None:
            self.stats_key = 'stats_{}_{}'.format(self.__class__.__name__, str(datetime.datetime.now()).replace(' ', '_'))
        if self.report_table:
            self.results_tables = [astropy.table.Table(names=('Name', 'Result', 'Time', 'Notes'), dtype=('S50', 'S30', 'f8', 'S50'))]
            self.results_tables[0]['Time'].unit = 'ms'
            self.results_tables[0]['Time'].format = '0.3e'
        else:
            self.results_tables = [None]
        # Make a place to allow for collection of test meta data to be printed with exception reports
        self.meta_data = {}
        # Tell the user about potential problems with their setup
        show_general = False
        if self.omfitx is None and self.count_guis:
            self.printv(
                'To count GUIs, please define self.omfitx = OMFITx in namespace that has access to OMFITx.\n'
                'This message was shown because count_guis was set but no reference to OMFITx was provided.',
                print_function=printw,
            )
            show_general = True
        if show_general:
            self.printv(
                '\nYou can just define things for your test class right at the top of the class definition.\n'
                'For example, for OMFITx, just put omfitx = OMFITx right after class.\n'
                'You can also put self.omfitx = OMFITx within __init__().',
                print_function=printw,
            )
        return
[docs]    def assertRaisesSimilar(self, similar_exc, *args, **kwargs):
        """
        Assert that some code raises an exception similar to the one provided.
        The purpose is to bypass the way OMFIT's .importCode() will provide a new reference to an exception
        that differs from the exception received by a script that does from OMFITlib_whatever import Thing
        """
        context = _AssertRaiseSimilarContext(similar_exc, self)
        try:
            return context.handle('assertRaises', args, kwargs)
        finally:
            # bpo-23890: manually break a reference cycle
            context = None 
[docs]    def default_attr(self, attr, value=None):
        """
        Sets an attribute to a default value if it is not already set
        :param attr: string or dict
            string: the name of the attribute to set
            dict: call self once for each key/value pair, using keys for attr and values for value
        :param value: object
            The value to assign to the attribute
        """
        if isinstance(attr, dict):
            for k, v in attr.items():
                self.default_attr(k, v)
            return
        if getattr(self, attr, None) is None:
            setattr(self, attr, value)
        return 
[docs]    def collect_guis(self, count=0):
        """
        Counts and then closes GUIs
        :param count: int
            Number of GUIs expected since last call.
            Actual and expected counts will be accumulated in self.stats.
        """
        if self.omfitx is None:
            return
        self.stats['guis_expected'] += int(bool(self.count_guis)) * count
        self.stats['guis_detected'] += len(self.omfitx._GUIs) * int(bool(self.count_guis))
        self.omfitx.CloseAllGUIs()
        if self.count_guis:
            prior_discrepancy = copy.copy(self.stats['gui_count_discrepancy'])
            expect_mod = self.stats['guis_expected'] + prior_discrepancy
            # Update the discrepancy so the same mismatch doesn't trigger another error during teardown.
            self.stats['gui_count_discrepancy'] = self.stats['guis_detected'] - self.stats['guis_expected']
            # fmt: off
            assert self.stats['guis_detected'] == expect_mod, (
                "{} GUIs detected doesn't match {} expected "
                "(expectation modified to include prior discrepancies: {})".format(
                    self.stats['guis_detected'], expect_mod, prior_discrepancy
                )
            )
            # fmt: on
        elif count > 0:
            self.printv('GUI counting is disabled. To count GUIs opened and compare to expectations, set self.count_guis')
        return 
[docs]    def collect_figs(self, count=0):
        """
        Counts and/or closes figures
        :param count: int
            Number of figures expected since last call.
            Actual and expected counts will be accumulated in self.stats.
        """
        draw_problems = []
        draw_tracebacks = []
        self.stats['figures_expected'] += count * int(self.count_figs)
        if (count > 0) and (not self.count_figs):
            self.printv('Figure counting is disabled. To count figures opened & compare to expectations, set self.count_figs')
        if not self.leave_figs_open:
            for fignum in pyplot.get_fignums():
                if fignum not in self.stats['pre_opened_figs']:
                    try:
                        figure(fignum).canvas.draw()
                    except Exception as exc:
                        draw_problems += [exc]
                        draw_tracebacks += [''.join(traceback.format_exception(*sys.exc_info()))]
                    close(figure(fignum))
                    self.stats['figures_detected'] += int(self.count_figs)
            if self.count_figs:
                expect_mod = self.stats['figures_expected'] + self.stats['fig_count_discrepancy']
                # Update the discrepancy so the same mismatch doesn't trigger another error during teardown.
                self.stats['fig_count_discrepancy'] = self.stats['figures_detected'] - self.stats['figures_expected']
                # fmt: off
                assert self.stats['figures_detected'] == expect_mod, (
                    "{} figs detected doesn't match {} expected "
                    "(expectation modified to include prior discrepancies)".format(self.stats['figures_detected'], expect_mod)
                )
                # fmt: on
        else:
            try:
                pyplot.gcf().canvas.draw()
            except Exception as exc:
                draw_problems += [exc]
                draw_tracebacks += [''.join(traceback.format_exception(*sys.exc_info()))]
        if len(draw_problems) > 0:
            message = f'{len(draw_problems)} problem(s) detected while rendering the present group of plots:'
            printe(message)
            for idp, draw_problem in enumerate(draw_problems):
                print(f'Draw problem {idp + 1}/{len(draw_problems)}:')
                printe(draw_tracebacks[idp])
            print('Raising the first draw problem...')
            raise draw_problems[0]
        return 
[docs]    def setUp(self):
        # Announce start of test
        test_id = self.id()
        test_name = '.'.join(test_id.split('.')[-2:])
        header = '\n'.join(['', '~' * len(test_name), test_name, '~' * len(test_name)])
        print(header)
        self.printdq(header)
        if self.count_figs:
            self.stats['fig_count_discrepancy'] = self.stats['figures_detected'] - self.stats['figures_expected']
            self.printdq(
                '    Starting {} with pre-existing overall figure count discrepancy of {}'.format(
                    test_name, self.stats['fig_count_discrepancy']
                )
            )
        if self.count_guis:
            self.stats['gui_count_discrepancy'] = self.stats['guis_detected'] - self.stats['guis_expected']
            self.printdq(
                '    Starting {} with pre-existing overall GUI count discrepancy of {}'.format(
                    test_name, self.stats['gui_count_discrepancy']
                )
            )
        if self.notify_gh_status and (int(self.gh_individual_status) == 1) and (self.force_gh_status is not False):
            context = self.get_context()
            response = set_gh_status(state='pending', context=context, **self.set_gh_status_keywords)
            self.printdq('gh_post', 'pending', context, response)
        self.stats['already_saw_figures'] = pyplot.get_fignums()
        self.test_timing_t0 = time.time()
        return 
[docs]    def tearDown(self):
        import astropy.table
        # Announce end of test with debug print
        self.test_timing_t1 = time.time()
        dt = self.test_timing_t1 - self.test_timing_t0
        test_name = '.'.join(self.id().split('.')[-2:])
        self.printdq('    {} done.'.format(test_name))
        self.printdq('    {} took {:0.6f} s\n'.format(test_name, dt))
        self.stats['time_elapsed'][test_name] = dt
        # Cleanup figures and GUIs between tests
        try:
            self.collect_figs()
            self.collect_guis()
        except AssertionError as fg_exc:
            fig_gui_mismatch = fg_exc
        else:
            fig_gui_mismatch = False
        # Handle individual GitHub status reports & Results table
        # Check for errors/failures in order to get state & description.  https://stackoverflow.com/a/39606065/6605826
        if hasattr(self, '_outcome'):  # Python 3.4+
            result = self.defaultTestResult()  # these 2 methods have no side effects
            self._feedErrorsToResult(result, self._outcome.errors)
            problem = result.errors or result.failures
            state = (not problem) and (not fig_gui_mismatch)
            if result.errors:
                exc_note = result.errors[0][1].strip().split('\n')[-1]
            elif result.failures:
                exc_note = result.failures[0][1].strip().split('\n')[-1]
            else:
                exc_note = '' if state else "(couldn't get exception report)"
        else:  # Python 3.2 - 3.3 or 3.0 - 3.1 and 2.7
            # result = getattr(self, '_outcomeForDoCleanups', self._resultForDoCleanups)  # DOESN'T WORK RELIABLY
            exc_type, exc_value, exc_traceback = sys.exc_info()
            state = (exc_type in [None, unittest.case._ExpectedFailure]) and not fig_gui_mismatch
            exc_note = '' if exc_value is None else '{}: {}'.format(exc_type.__name__, exc_value)
        gis = int(self.gh_individual_status)
        ngs = self.notify_gh_status
        if ((gis == 1) and ngs) or ((gis == 2) and (not state) and ngs) or ((gis == 3) and (not state)):
            description = test_comp_info_summary() + exc_note
            context = self.get_context()
            # Post it
            try:
                response = set_gh_status(state=state, context=context, description=description, **self.set_gh_status_keywords)
            except ValueError:  # Bad/missing token (expected common problem) results in ValueError
                self.printv('Failed to update GitHub status')
            else:
                self.printdq('gh_post', state, context, description, response)
        if self.results_tables[0] is not None:
            test_result = 'pass' if state else 'FAIL'
            self.printdq(
                '  Adding row to results_table: {}, {}, {:0.2e} ms, {} | {}'.format(test_name, test_result, dt * 1000, exc_note, self.id())
            )
            self.results_tables[0].add_row()
            self.results_tables[0][-1]['Time'] = dt * 1000  # Convert to ms
            self.results_tables[0][-1]['Result'] = test_result
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore', category=astropy.table.StringTruncateWarning)
                self.results_tables[0][-1]['Name'] = test_name
                self.results_tables[0][-1]['Notes'] = exc_note
        if self.leave_figs_open:
            # If figures aren't being closed, give them nicer titles at least
            new_plots = [plot for plot in pyplot.get_fignums() if plot not in self.stats['already_saw_figures']]
            for i, plot in enumerate(new_plots):
                pyplot.figure(plot).canvas.set_window_title('{} {}'.format(test_name, i))
        if fig_gui_mismatch:
            raise fig_gui_mismatch
        self.printdq('-' * 80 + '\n')
        return 
[docs]    def get_context(self):
        """Sanitize test id() so it doesn't start with omfit_classes.omfit_python or who knows what --> define context"""
        class_name = self.__class__.__name__
        test_name = self.id().split(class_name)[-1][1:]  # First char will be .
        pyver = '.'.join(map(str, sys.version_info[0:2]))
        context = 'p{} {}.{}@{}'.format(pyver, class_name, test_name, get_server_name())
        return context 
[docs]    @classmethod
    def printv(cls, *args, **kw):
        if cls.verbosity > 0:
            kw.pop('print_function', print)(*args)
        return 
[docs]    def printdq(self, *args):
        topic = self.debug_topic or self.__class__.__name__
        printd(*args, topic=topic)
        return 
[docs]    @classmethod
    def tearDownClass(cls):
        if cls.omfitx is not None and cls.count_guis:
            cls.printv('  This test should have opened {:} GUI(s)'.format(cls.stats['guis_expected']))
            guis_during_teardown = len(cls.omfitx._GUIs)
            cls.stats['guis_detected'] += guis_during_teardown
            if not cls.leave_figs_open:
                cls.omfitx.CloseAllGUIs()
            gui_count_match = cls.stats['guis_detected'] == cls.stats['guis_expected']
            cls.printv(
                '\n{} GUIs were counted, with {} counted while tearing down the test class.'.format(
                    cls.stats['guis_detected'], guis_during_teardown
                )
            )
            cls.printv(
                '{} GUIs were expected. There is {} problem here.'.format(cls.stats['guis_expected'], ['a', 'no'][int(gui_count_match)])
            )
        else:
            gui_count_match = True
        if cls.count_figs:
            # Cleanup any remaining figures (they probably were all taken care of in tearDown() after each test)
            if cls.leave_figs_open:
                cls.stats['figures_detected'] = len(pyplot.get_fignums()) - len(cls.stats['pre_opened_figs'])
            else:
                for fignum in pyplot.get_fignums():
                    if fignum not in cls.stats['pre_opened_figs']:
                        close(figure(fignum))
                        cls.stats['figures_detected'] += 1
            count_match = cls.stats['figures_detected'] == cls.stats['figures_expected']
            cls.printv('\n{} figures were counted while tearing down the test class.'.format(cls.stats['figures_detected']))
            cls.printv(
                '{} figures should have been opened. There is {} problem here.'.format(
                    cls.stats['figures_expected'], ['a', 'no'][int(count_match)]
                )
            )
        else:
            count_match = True
        if isinstance(cls.save_stats_to, dict):
            cls.printv('Saving test statistics within user-provided dict-like object with key {}'.format(stats_key))
            cls.save_stats_to[stats_key] = cls.stats
        # Put backup experiment settings back in place
        if OMFIT is not None:
            OMFIT['MainSettings']['Experiment'].update(getattr(cls, 'backup_experiment', {}))
            backups = [k for k in dir(cls) if k.startswith('backup_experiment_')]
            for backup in backups:
                module = backup.split('backup_experiment_')[-1]
                OMFIT['{}_test_copy'.format(module)].experiment(**getattr(cls, backup, {}))
        if cls.leave_figs_open:
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore', category=UserWarning, message='.*non-GUI backend.*')
                pyplot.show()
        # Get angry?
        assert count_match, 'Number of figures detected ({}) does not match number of figures expected ({})'.format(
            cls.stats['figures_detected'], cls.stats['figures_expected']
        )
        assert gui_count_match, 'Number of GUIs detected ({}) does not match number of GUIs expected ({})'.format(
            cls.stats['guis_detected'], cls.stats['guis_expected']
        )
        return  
    # End of class OMFITtest
# noinspection PyBroadException
def test_comp_info_summary(short=False, long_form=False):
    """
    Generates a short summary of computing environment setup
    Potentially includes hostname, git branch, current time, and commit hashes,
    depending on short & long_form keywords
    :param short: bool
        Return compact form: squeeze onto one line and drop branch info. Since
        this is intended for GitHub status reports, the branch is probably
        already known, and it won't fit, anyway.
    :param long_form: bool
        Add more info to the end, concerning commit hashes. This is intended for
        non-GitHub uses like email, where the report won't be attached
        to a commit history in an obvious way. This triggers a simple append
        operation that will work even if short=True.
    :return: string
        Intended to be appended to a test report
    """
    try:
        test_branch = repo.active_branch()[0]
    except Exception:
        test_branch = '<<<Failed to look up current branch>>>'
    try:
        omfit_sha = repo.get_hash(repo_active_branch_commit)
        branch_sha = repo.get_hash('HEAD~0')
    except Exception:
        omfit_sha = branch_sha = '<<<Failed to determine commit hash>>>'
    pyver = '.'.join(map(str, sys.version_info[0 : 2 if short else 3]))
    form = 'p{v:} {h:} @ {t:} ' if short else 'Py ver {v:}\nHost: {h:}\nCompleted {t:}\n' 'On branch {b:}\n'
    if long_form:
        form += 'Most recent commit @ OMFIT start time: {omfit_sha:}\n' 'Most recent commit on the branch now: {branch_sha:}\n'
    return form.format(
        h=socket.gethostname(),
        t=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        b=test_branch,
        omfit_sha=omfit_sha,
        branch_sha=branch_sha,
        v=pyver,
    )
[docs]def manage_tests(
    tests,
    failfast=False,
    separate=True,
    combined_context_name=None,
    force_gh_status=None,
    force_gh_comment=None,
    force_email=None,
    force_warning_level=None,
    print_report=True,
    there_can_be_only_one=True,
    raise_if_errors=True,
    max_table_width=-1,
    set_gh_status_keywords=None,
    post_comment_to_github_keywords=None,
    ut_verbosity=1,
    ut_stream=None,
    only_these=None,
    **kw,
):
    """
    Utility for running a set of OMFITtest-based test suites
    Example usage:
    >> class TestOmfitThingy(OMFITtest):
    >>     def test_thingy_init(self):
    >>         assert 1 != 0, '1 should not be 0'
    >> manage_tests(TestOmfitThingy)
    :param tests: OMFITtest instance or list of OMFITtest instances
        Define tests to run
    :param failfast: bool
        Passed straight to unittest. Causes execution to stop at the first error
        instead of running all tests and reporting which pass/fail.
    :param separate: bool
        Run each test suite separately and give it a separate context. Otherwise
        they'll have a single combined context.
    :param combined_context_name: string [optional]
        If not separate, override the automatic name
        ('+'.join([test.__class__.__name__) for test in tests]))
    :param force_gh_status: bool [optional]
        If None, use GitHub status post settings from the items in tests.
        If True or False: force status updates on/off.
    :param force_gh_comment: bool or int [optional]
        Like force_gh_status, but for comments, and with extra options:
        Set to 2 to post comments only on failure.
    :param force_email: bool [optional]
        None: notify_email on/off defined by test.
        True or False: force email notifications to be on or off.
    :param force_warning_level: int [optional]
        None: warning_level defined by test.
        int: Force the warning level for all tests to be this value.
    :param print_report: bool
        Print to console / command line?
    :param there_can_be_only_one: True, None or castable as int
        This value is interpreted as a set of binary flags. So 6 should be interpreted as options 2 and 4 active.
        A value of True is converted into 255 (all the bits are True, including unused bits).
        A value of None is replaced by the default value, which is True or 255.
        A float or string will work if it can be converted safely by int().
        1: Any of the flags will activate this feature. The 1 bit has no special meaning beyond activation.
            If active, old github comments will be deleted. The newest report may be retained.
        2: Limit deletion to reports that match the combined context of the test being run.
        4: Only protect the latest comment if it reports a failure; if the last test passed, all comments may be deleted
        8: Limit scope to comments with matching username
    :param raise_if_errors: bool
        Raise an OMFITexception at the end if there were any errors
    :param max_table_width: int
        Width in columns for the results table, if applicable. If too small,
        some columns will be hidden. Set to -1 to allow the table to be any
        width.
    :param set_gh_status_keywords: dict [optional]
        Dictionary of keywords to pass to set_gh_status()
    :param post_comment_to_github_keywords: dict [optional]
        Dictionary of keywords to pass to post_comment_to_github(), like thread, org, repository, and token
    :param ut_verbosity: int
        Verbosity level for unittest. 1 is normal, 0 suppresses ., E, and F reports from unittest as it runs.
    :param ut_stream:
        Output stream for unittest, such as StingIO() or sys.stdout
    :param only_these: string or list of strings
        Names of test units to run (with or without leading `test_`). Other test units will not be run. (None to run all tests)
    :param kw: quietly ignores other keywords
    :return: tuple containing:
        list of unittest results
        astropy.table.Table instance containing test results
        string reporting test results, including a formatted version of the table
    """
    import astropy.table
    import functools
    from omfit_classes.utils_base import encrypt, decrypt
    assert decrypt(encrypt('OMFIT')) == 'OMFIT'
    # Sanitize
    tests = tolist(tests)  # It's just easier to enter 'Test' than ['Test'] for only one. Let's allow that.
    if only_these is not None:  # It's just easier to enter 'asd' than ['test_asd']. Let's allow that.
        only_these = tolist(only_these)
        only_these = list(map(lambda x: x if x.startswith('test_') else 'test_' + x, only_these))
    server_name = get_server_name()
    set_gh_status_keywords = set_gh_status_keywords or {}
    post_comment_to_github_keywords = post_comment_to_github_keywords or {}
    for test in tests:
        test.set_gh_status_keywords = set_gh_status_keywords
    # Figure out flags
    gh_status = [
        force_gh_status or (test.notify_gh_status * bool(test.gh_overall_status_after_individual or (test.gh_individual_status != 1)))
        for test in tests
    ]
    gh_comment = [force_gh_comment or test.notify_gh_comment for test in tests]
    notify_email = [force_email or test.notify_email for test in tests]
    warning_levels = [force_warning_level or test.warning_level for test in tests]
    if there_can_be_only_one is True or there_can_be_only_one is None:
        # True turns on all the bits, even unused ones for future compatibility.
        # None does default behavior, which is True which is 255
        there_can_be_only_one = 255
    else:
        there_can_be_only_one = int(there_can_be_only_one)
    for test in tests:
        test.force_gh_status = force_gh_status
    # Look up more settings from the tests
    skipped_topics = [test.topics_skipped for test in tests]
    table_sorting_columns = common_in_list(['\n'.join(test.table_sorting_columns) for test in tests]).split('\n')
    # Prepare context information
    contexts = [test.__name__ + ('*' if len(test.topics_skipped) or only_these is not None else '') for test in tests]
    pyver_header = 'p' + '.'.join(map(str, sys.version_info[0:2]))
    ut_stream = ut_stream or StringIO()
    def post_pending_status():
        for ghs, context_ in zip(gh_status, contexts):
            if ghs:
                desc = test_comp_info_summary(short=True)
                resp = set_gh_status(state='pending', context=context_, description=desc, **set_gh_status_keywords)
                printd(
                    'Got response {res:} when posting GitHub status state = {s:}, context = {c:}, '
                    'description = {d:}'.format(s='pending', c=context_, d=desc, res=resp),
                    topic='omfit_testing',
                )
    # Run tests and collect results and settings
    meta_data = {}
    if separate:
        # Do each OMFITtest instance separately and give it its own context
        contexts = ['{} {}@{}'.format(pyver_header, context, server_name) for context in contexts]
        post_pending_status()
        results = []
        results_tables = []
        suite_tests = []
        for i, test in enumerate(tests):
            with setup_warnings(warning_levels[i]):
                if only_these is None:
                    suite = unittest.makeSuite(test)
                else:
                    suite = unittest.TestSuite()
                    for method in tolist(only_these):
                        try:
                            suite.addTest(test(method))
                        except ValueError as excp:
                            printd(excp)
                suite_tests += suite._tests
                results += [unittest.TextTestRunner(verbosity=ut_verbosity, stream=ut_stream, failfast=failfast).run(suite)]
            # Add results tables (probably just a one element list of 1 table) to the list
            for suite_test in suite_tests:
                meta_data.update(suite_test.meta_data)
                if getattr(suite_test, 'results_tables', [None])[0] is not None:
                    results_tables += copy.copy(suite_test.results_tables)
                    suite_test.results_tables = [None]  # Reset
                    printd('Cleared results table', topic='omfit_testing')
    else:
        # Merge all OMFITtest instances into one context containing all of the sub-tests together
        gh_status = [functools.reduce(lambda x, y: x | y, gh_status)]
        gh_comment = [functools.reduce(lambda x, y: x | y, gh_comment)]
        contexts = [combined_context_name or '{} {}@{}'.format(pyver_header, '+'.join(contexts), server_name)]
        post_pending_status()
        # Form test suite and grab list of tests within the suite so they can be interrogated later
        # Making a suite with makeSuite() & then adding to it with suite.addTest() can result in multiple copies of test
        # So do this instead:
        if only_these is None:
            suite_list = []
        else:
            suite_list = [unittest.TestSuite()]
        for test in tests:
            if only_these is None:
                suite_list.append(unittest.TestLoader().loadTestsFromTestCase(test))
            else:
                for method in tolist(only_these):
                    try:
                        suite_list[0].addTest(test(method))
                    except ValueError as excp:
                        printe(excp)
        combo_suite = unittest.TestSuite(suite_list)
        suite_tests = []
        for suite in suite_list:
            suite_tests += suite._tests
        with setup_warnings(max(warning_levels)):
            results = [unittest.TextTestRunner(verbosity=ut_verbosity, stream=ut_stream, failfast=failfast).run(combo_suite)]
        new_skip = []
        for context, skipped in zip(contexts, skipped_topics):
            new_skip += ['.'.join([context, skip]) for skip in skipped]
        skipped_topics = [new_skip]  # [[context+'.'+skipped for context, skipped in zip(contexts, skipped_topics)]]
        notify_email = [np.any(notify_email)]
        results_tables = []
        for suite_test in suite_tests:
            meta_data.update(suite_test.meta_data)
            printd(
                'suite_test = {}, has results table = {}, type = {}'.format(
                    suite_test, hasattr(suite_test, 'results_tables'), type(suite_test)
                ),
                topic='omfit_testing',
            )
            results_tables += copy.copy(getattr(suite_test, 'results_tables', [None]))
        results_tables = [res_tab for res_tab in results_tables if res_tab is not None]
    printd('meta_data = {}'.format(meta_data), topic='omfit_testing')
    warn_context = '{}: {}'.format('separate' if separate else 'combined', ', '.join([test.__name__ for test in tests]))
    if len(results_tables):
        a = []
        while (len(a) == 0) and len(results_tables):
            a = results_tables.pop(0)  # Skip empty tables, if any
        results_table = a
        for rt in results_tables:
            if len(rt):
                with warnings.catch_warnings():
                    warnings.filterwarnings('ignore', category=DeprecationWarning)
                    results_table = astropy.table.join(results_table, rt, join_type='outer')
        # noinspection PyBroadException
        try:
            results_table = results_table.group_by(table_sorting_columns)
        except Exception as excp:
            printw('Error sorting test results table. Columns may not be in the preferred order. ({})'.format(warn_context))
            excp_stack = traceback.format_exception(*sys.exc_info())
            printe('    The error that caused this was {}'.format(excp))
            printe(''.join(excp_stack))
        column_names = list(results_table.columns.keys())
        alignments = ['<' if cn == 'Notes' else '>' for cn in column_names]
        if len(results_table):
            rtf = '\n'.join(results_table.pformat(align=alignments, max_width=max_table_width, max_lines=-1))
            exp_res = np.sum([result.testsRun - len(result.skipped) for result in results])
            if len(results_table) != exp_res:
                printw(
                    'ERROR forming results table. Expected {} results, but table length is {}. ({})'.format(
                        exp_res, len(results_table), warn_context
                    )
                )
        else:
            rtf = None
    else:
        results_table = rtf = None
    # Prepare reports
    the_gh_comment = ''
    email_body = ''
    errors = 0
    total_tests_run = 0
    total_tests_skipped = 0
    skipped_comment = False
    # There will usually be one result. To have more, must supply list of test suites and set separate=False.
    for i in range(len(results)):
        # Gather general information
        result = results[i]
        context = contexts[i]
        failed_tests = [f[0] for f in result.failures] + [f[0] for f in result.errors]
        failed_long_names = [ft.__class__.__name__ + '.' + getattr(ft, '_testMethodName', '?') for ft in failed_tests]
        failed_names = [getattr(ft, '_testMethodName', '?') for ft in failed_tests]
        meta_data_chunks = [meta_data.get(fln, None) for fln in failed_long_names]
        error_reports = [f[1] for f in result.failures] + [f[1] for f in result.errors]
        error_summaries = [[a for a in er.split('\n') if len(a)][-1] for er in error_reports]
        errors += len(failed_tests)
        printd(
            'manage_tests: i = {}, result = {}, failed_long_names = {}, failed_names = {}, failed_tests = {}, '
            'error_reports = {}, meta_data_chunks = {}'.format(
                i, result, failed_long_names, failed_names, failed_tests, error_reports, meta_data_chunks
            ),
            topic='omfit_testing',
        )
        tests_run = result.testsRun
        tests_skipped = len(result.skipped)
        total_tests_run += tests_run
        total_tests_skipped += tests_skipped
        skiplist = [ssk[0]._testMethodName for ssk in result.skipped]
        state = (len(failed_tests)) == 0
        conclusion = context + (' PASSED! Congratulations!' if state else ' FAILED!')
        comp_info = test_comp_info_summary(short=True)
        if len(skipped_topics[i]) or tests_skipped:
            skipped_topics_report_short = ', skip{}t:{}i'.format(len(skipped_topics[i]), tests_skipped)
        else:
            skipped_topics_report_short = ''
        skipped_topics_report = '{} test topics were skipped'.format(len(skipped_topics[i]))
        skipped_topics_report_2 = '{} individual tests were skipped'.format(tests_skipped)
        skipped_topics_report_long = skipped_topics_report + (
            ':\n  - ' + '\n  - '.join(skipped_topics[i]) if len(skipped_topics[i]) else ''
        )
        skipped_topics_report_long += '\n\n' + skipped_topics_report_2 + (':\n  - ' + '\n  - '.join(skiplist) if tests_skipped else '')
        skipped_topics_report += skipped_topics_report_2
        test_count_report = '{} individual tests were considered'.format(tests_run)
        test_count_report += '\n{} individual tests were actually executed'.format(tests_run - tests_skipped)
        # Handle GitHub status
        if gh_status[i]:
            description = comp_info + ' ran {} test(s)'.format(tests_run) + skipped_topics_report_short
            for ft, er in zip(failed_names, error_summaries):
                description += '\n{}: {}'.format(ft, er)
            try:
                response = set_gh_status(state=state, context=context, description=description, **set_gh_status_keywords)
            except ValueError:  # Bad/missing token (expected common problem) results in ValueError
                print('GitHub comment post failed!')
            else:
                printd(
                    'Got response {res:} when posting GitHub status state = {s:}, context = {c:}, '
                    'description = {d:}'.format(s=state, c=context, d=description, res=response),
                    topic='omfit_testing',
                )
        # Handle GitHub comments
        ghc_bits = [int(bb) for bb in np.binary_repr(int(gh_comment[i]), 8)[::-1]]
        if ghc_bits[0] or (ghc_bits[1] and len(failed_tests)):
            comment_details_form = '<details><summary>{}</summary>\n{}\n</details>\n'
            comment_part = '### {conclusion:}\n{tests:}\n{skip:}\n'.format(
                conclusion=conclusion,
                tests=test_count_report,
                skip=comment_details_form.format('skipped tests', skipped_topics_report_long),
            )
            if len(failed_tests):
                comment_part += '<details><summary>Exception reports</summary>'
                for ft, er, mdc in zip(failed_long_names, error_reports, meta_data_chunks):
                    comment_part += '\n{}\n\n```\n{}```\n\nMeta data for {}:\n{}\n'.format(ft, er, ft, mdc)
                comment_part += '</details>\n\n'
            the_gh_comment += comment_part
        elif ghc_bits[1] and (not len(failed_tests)):
            skipped_comment = True
        # Handle email body
        if notify_email[i]:
            email_part = '{conclusion:}\n{tests:}\n{skip:}'.format(
                conclusion=conclusion, tests=test_count_report, skip=skipped_topics_report_long
            )
            if len(failed_tests):
                email_part += '\nException reports:\n-----\n'
                for ft, er, mdc in zip(failed_long_names, error_reports, meta_data_chunks):
                    email_part += '\n{}\n\n{}\n\nMeta data for {}:\n{}\n'.format(ft, er, ft, mdc)
            email_body += email_part + '-----'
        # Print to console
        if print_report:
            if state:
                printi(conclusion)
                print(test_count_report)
                print(skipped_topics_report_long)
            else:
                printe(conclusion)
                print(test_count_report)
                print(skipped_topics_report_long)
                print('Exception reports from failed tests in {}'.format(context))
                for ft, er, mdc in zip(failed_tests, error_reports, meta_data_chunks):
                    print(ft)
                    printe(er)
                    print('Meta data for {}:'.format(ft))
                    printw(mdc)
    # Finalize reports
    notice = 'Some tests failed!' if errors > 0 else 'All tests passed!'
    total_test_count_report = '{} individual tests were executed & {} were skipped while running all test cases'.format(
        total_tests_run, total_tests_skipped
    )
    # Finalize and post GitHub comment (if applicable)
    gh_test_report_update(
        the_gh_comment=the_gh_comment,
        contexts=contexts,
        gh_comment=gh_comment,
        there_can_be_only_one=there_can_be_only_one,
        rtf=rtf,
        errors=errors,
        skipped_comment=skipped_comment,
        post_comment_to_github_keywords=post_comment_to_github_keywords,
        total_test_count_report=total_test_count_report,
    )
    # Finalize and send email (if applicable)
    if len(email_body):
        prn = get_pull_request_number()
        remote, org, repository, branch = get_gh_remote_org_repo_branch()
        if prn is None:
            pr_info = 'Did not find an open pull request for this branch.'
        else:
            pr_info = 'Open pull request for this branch: {prn:} . URL = https://github.com/{org:}/{repo:}/pull/{prn:}'
        pr_info = pr_info.format(prn=prn, repo=repository, org=org)
        email_body = '{n:}\n\n{e:} errors were detected while testing {c:}\n\n{run:}\n{b:}\n=====\n{ci:}\n{pr:}'.format(
            n=notice,
            e=errors,
            c=' + '.join(contexts),
            r=total_test_count_report,
            b=email_body,
            ci=test_comp_info_summary(long_form=True),
            run=total_test_count_report,
            pr=pr_info,
        )
        if results_table is not None:
            email_body += '\n\n' + rtf
        email_subject = '{notice:} {context:} had {errors:} errors'.format(notice=notice, context=' + '.join(contexts), errors=errors)
        printd('Sending email: subject = {}, body = {}'.format(email_subject, email_body), topic='omfit_testing')
        send_email(to=MainSettings['SETUP']['email'], subject=email_subject, message=email_body, fromm=MainSettings['SETUP']['email'])
    # Closing remarks to console
    err_count = '\n{er:} error(s) were detected while testing {ct:}'.format(er=errors, ct=' + '.join(contexts))
    error_function = printe if errors > 0 else printi
    report_strings = [err_count]
    report_functions = [error_function]
    if results_table is not None:
        report_strings += ['\nTable of results for all tests:\n\n{}'.format(rtf), err_count]
        report_functions += [print, error_function]
    if print_report:
        for report_function, report_string in zip(report_functions, report_strings):
            report_function(report_string)
    if raise_if_errors and errors > 0:
        raise OMFITexception('{ct:} failed with {er:} errors'.format(er=errors, ct=' + '.join(contexts)))
    return results, results_table, '\n'.join(report_strings) 
def run_test_series(exit_at_end=False, post_gh_comment=False, post_gh_status=False, tests=None, test_plan_file=None, script_timeout=0):
    """
    Run series of regression tests and store results directly in OMFIT tree
    :param exit_at_end: bool
        Quit python session on exit
    :param post_gh_comment: bool
        Post comment to github
    :param post_gh_status: bool or 'final'
        Post status to github for each test (or only final status of whole test with 'final')
    :param tests: list of strings
        names of regression tests to be run
    :param test_plan_file: string
        Filename with a YAML file containing a test plan
    :param script_timeout: int
        Number of seconds before an exception is thrown to kill the script for taking too long. 0 to disable.
    """
    from omfit_classes.omfit_python import OMFITpythonTest
    from omfit_classes.omfit_base import OMFITtree
    # Tuning parameters for prioritization system. Higher priority tests run before lower priority tests. The group of
    # tests that failed last time is sorted within itself and goes before the group of tests that passes last time.
    initial_priority = 1  # When new tests are added to the series, they start with this priority
    # This is just a backup. The setting that's more likely to be used is in bin/verify_build.py
    priority_fail_increment = 1  # x>0 Increases priority on fail; repeat offenders tend to run first
    priority_success_multiplier = 0.7165  # 0<x<1 Priority decays on success; prioritize recent problems
    priority_cap = 5  # Priority can't wind up too far
    priority_failfast_thresh = 1  # Elevated priority triggers failfast behavior, even if test passed last time.
    #                               Set higher than cap to disable failfast on tests that passed on the last iteration.
    priority_failfast_thresh_include = priority_failfast_thresh  # High priority tests aren't skipped by failfast
    priority_speed_bonus_scale = 0.01  # Maximum size of the speed bonus
    priority_speed_bonus_timescale = 10.0  # Duration (s) that gets max speed bonus; no benefit for shorter than this
    if test_plan_file is not None:
        with open(test_plan_file, 'r') as f:
            test_plan = yaml.load(f.read(), Loader=yaml.Loader)
            exit_at_end = test_plan.get('exit_at_end', exit_at_end)
            post_gh_comment = test_plan.get('post_gh_comment', post_gh_comment)
            post_gh_status = test_plan.get('post_gh_status', post_gh_status)
            script_timeout = test_plan.get('script_timeout', script_timeout)
    else:
        tests = tests or []
        test_plan = {'tests': [{"name": test} for test in tests]}
    gh = OMFIT['MainSettings']['SERVER']['GITHUB_username'] and (post_gh_comment or post_gh_status)
    gh_all = gh and (post_gh_status != 'final')
    if gh:
        print('GitHub interaction was requested. Making sure you have a valid token (or else Exception!)...')
        get_OMFIT_GitHub_token()
        # noinspection PyBroadException
        try:
            test_branch = repo.active_branch()[0]
        except Exception:
            test_branch = '<<<Failed to look up current branch>>>'
        # Get a hostname for the server for use in setting context
        hostname = socket.gethostname()
        servername = get_server_name(hostname)
        pyverheader = 'p' + '.'.join(map(str, sys.version_info[0:2]))
        # TODO: Remove context_legacy after merging requires the new context.
        context_legacy = '{} run_test_series@{}'.format(pyverheader, servername)
        context = 'regression_test_runs@{}'.format(servername)
        c_fn = OMFITsrc + os.path.sep + '.regression_test_runs.context'
        with open(c_fn, 'w') as f:
            f.write(context)
        comp_info_start = 'Executing on host {h:}\nTest started {t:}\nRunning on branch {b:}'.format(
            h=hostname, t=datetime.datetime.now(), b=test_branch
        )
        if post_gh_status:
            set_gh_status(state='pending', context=context, description=comp_info_start)
            # TODO: Remove the next line after merging requires the new context.
            set_gh_status(state='pending', context=context_legacy, description=comp_info_start)
    else:
        servername = hostname = test_branch = pyverheader = context = comp_info_start = None
    success = []
    # regression scripts, errors and times
    OMFIT['regression_errors'] = errors = SortedDict()
    OMFIT['regression_allowed_errors'] = allowed_errors = SortedDict()
    OMFIT['regression_stacks'] = stacks = SortedDict()
    OMFIT['regression_times'] = times = SortedDict()
    OMFIT['regression_scripts'] = OMFITtree(sorted=True)
    # record what keys are in the OMFIT tree now to cleanup later
    old_keys = list(OMFIT.keys())
    def timeout_handler(signum, frame):
        signal.alarm(0)
        raise KeyboardInterrupt("Script timed out")
    signal.signal(signal.SIGALRM, timeout_handler)
    test_plan['results'] = {}
    print('Test plan:')
    for test in test_plan['tests']:
        # Define status and priority prior to failfast check, so priorities aren't forgotten when tests are skipped
        test_plan['results'][test['name']] = dict(status='not started', priority=test.get('priority', initial_priority))
        print(test['name'], 'starting priority:', test_plan['results'][test['name']]['priority'])
    last_commit_check_time = time.time()
    tests_since_last_commit_check = 0
    for test in test_plan['tests']:
        # Check github for a newer commit and abort if it's newer than the current ones if the option is enabled.
        if test_plan.get('newer_commit_abort', False) and (
            (time.time() - last_commit_check_time > test_plan.get('newer_commit_check_period', 120))
            or (tests_since_last_commit_check > test_plan.get('newer_commit_test_num_period', 10))
        ):
            print("Checking for newer github commit.")
            if not on_latest_gh_commit():
                print("Newer commit to branch detected, aborting remaining tests.")
                break
            tests_since_last_commit_check = 0
            last_commit_check_time = time.time()
        # If we are on the first previously non-failing test and
        # There have been other failing tests beforehand, abort to
        # save time (unless the test has built up high enough priority).
        # If there haven't been any errors yet then test everything else in the plan
        if (
            test_plan.get('fail_fast', False)
            and (test.get('priority', initial_priority) < priority_failfast_thresh_include)
            and (test.get('last_status', 'not run') != 'failure')
        ):
            if len(errors) > 0:
                break
            else:
                test_plan['fail_fast'] = False
        test_plan['results'][test['name']]['status'] = 'incomplete'
        # Individual tests can change their own warning level. 1 is a tolerant choice for generic tests.
        with setup_warnings(1):
            OMFIT['regression_scripts'][test['name']] = OMFITpythonTest(
                OMFITsrc + '/../regression/{}.py'.format(test['name']), modifyOriginal=True
            )
        if gh_all:
            t_context = '{} {}@{}'.format(pyverheader, test['name'], servername)
            set_gh_status(state='pending', context=t_context, description=comp_info_start)
        else:
            t_context = None
        t0 = time.time()
        try:
            print('\n' * 3)
            print('=' * 80)
            print('START OF TEST: %s' % test['name'])
            print('=' * 80)
            print('\n' * 3)
            timeout = (
                test_plan.get('yaml_timeout_factor', 0) * test.get('runtime', 0)
                if test_plan.get('yaml_timeout_factor', 0)
                else script_timeout
            )
            signal.alarm(timeout)
            OMFIT['regression_scripts'][test['name']].run(**test.get('params', {}))
        except (KeyboardInterrupt, Exception) as excp:
            # Need to disable alarm both here and after else: instead of the finally block
            # in case getting stack and posting to github moves us over the timeout threshold.
            signal.alarm(0)
            etype, value, tb = sys.exc_info()
            excp_stack = ''.join(traceback.format_exception(etype, value, tb))
            print(excp_stack)  # print the full stack on failure
            allow_failure = test.get('allow_failure', None)
            print(f'allow_failure = {allow_failure}')
            stacks[test['name']] = excp_stack
            test_plan['results'][test['name']]['stack'] = excp_stack
            if allow_failure:
                allowed_errors[test['name']] = repr(excp)
                test_plan['results'][test['name']]['status'] = 'allowed failure'
            else:
                errors[test['name']] = repr(excp)
                test_plan['results'][test['name']]['status'] = 'failure'
                if test_plan['results'][test['name']]['priority'] >= priority_failfast_thresh:  # Priority before increment
                    test_plan['fail_fast'] = True
                    print('fail_fast has been activated because this test has a poor track record')
                test_plan['results'][test['name']]['priority'] += priority_fail_increment
                if test_plan['results'][test['name']]['priority'] > priority_cap:
                    test_plan['results'][test['name']]['priority'] = priority_cap
            if gh_all:
                t_desc = comp_info_start
                if isinstance(excp, KeyboardInterrupt):
                    t_desc = 'Timeout:' + t_desc
                set_gh_status(state=False, context=t_context, description=t_desc)
        else:
            # Disabling the alarm should happen before anything else as to not risk moving over the timeout threshold.
            signal.alarm(0)
            if False:
                mod_list = sorted(list(set([mod['ID'] for mod in list(OMFIT.moduleDict().values())])))
                test_content = OMFIT['regression_scripts'][test['name']].read()
                if 'modules:' in test_content:
                    OMFIT['regression_scripts'][test['name']].write(re.sub('modules:.*\n', 'modules: %r\n' % mod_list, test_content))
                else:
                    OMFIT['regression_scripts'][test['name']].write(
                        re.sub('labels:(.*)\n', 'labels:\1\nmodules: %r\n' % mod_list, test_content)
                    )
            success.append(test['name'])
            test_plan['results'][test['name']]['priority'] *= priority_success_multiplier
            test_plan['results'][test['name']]['status'] = 'success'
            if gh_all:
                set_gh_status(state=True, context=t_context, description=comp_info_start)
        finally:
            times[test['name']] = time.time() - t0
            # Slightly increase priority for fast tests so they go first; if all tests had an even failure potential,
            # this would reveal problems sooner.
            priority_speed_bonus = priority_speed_bonus_scale * min([1.0, priority_speed_bonus_timescale / times[test['name']]])
            test_plan['results'][test['name']]['priority'] += priority_speed_bonus
            tests_since_last_commit_check += 1
            test_plan['results'][test['name']]['time'] = times[test['name']]
            close('all')
            os.environ['OMFIT_DEBUG'] = "0"
            for k in list(OMFIT.keys()):
                if k not in old_keys:
                    del OMFIT[k]
    # update test plan with results
    class YAMLDumper(yaml.Dumper):
        def increase_indent(self, flow=False, indentless=False):
            return super().increase_indent(flow, False)
    if test_plan_file:
        test_plan_yaml_update = yaml.dump(test_plan, Dumper=YAMLDumper, default_flow_style=False)
        print("Test Plan With Results:\n" + test_plan_yaml_update)
        with open(test_plan_file, 'w') as f:
            f.write(test_plan_yaml_update)
    # print test summary results
    print('\n' * 3)
    print('=' * 80)
    print('TESTS SUMMARY')
    print('=' * 80)
    print('\n' * 3)
    out_mess = ['Timing information:', '-' * 40]
    for i in np.argsort(list(times.values())):
        out_mess.append((test_plan['tests'][i]['name'], list(times.values())[i]))
    total_tests = len(test_plan['tests'])
    remaining_tests = total_tests - len(success) - len(errors) - len(allowed_errors)
    out_mess.append('-' * 40)
    out_mess.append('{}/{} Tests succeeded'.format(len(success), total_tests))
    out_mess.append('{}/{} Tests failed'.format(len(errors), total_tests))
    out_mess.append(f'{len(allowed_errors)}/{total_tests} Tests failed but were tolerated anyway.')
    out_mess.append('{}/{} Tests not run'.format(remaining_tests, total_tests))
    if len(errors):
        for test, error in errors.items():
            out_mess.append('== Error of %s ==' % test)
            out_mess.append(error)
    print('\n'.join(map(str, out_mess)))
    # Post final comment/status to GitHub
    if gh:
        if post_gh_comment:
            post_comment_to_github(comment='```\n%s\n```' % ('\n'.join(map(str, out_mess))))
        if post_gh_status:
            comp_info = '{s:}/{tot:} success on host {h:} at {t:} on branch {b:}'.format(
                s=len(success), tot=len(test_plan['tests']), h=hostname, t=time.ctime(), b=test_branch
            )
            gh_state = not len(errors)
            if gh_state and remaining_tests:
                comp_info = 'Aborted early due to new github commit. Host {h:} at {t:} on branch {b:}'.format(
                    h=hostname, t=time.ctime(), b=test_branch
                )
                gh_state = 'pending'
            set_gh_status(state=gh_state, context=context, description=comp_info)
            # TODO: Remove the next line after merging requires the new context.
            set_gh_status(state=gh_state, context=context_legacy, description=comp_info)
            if os.path.exists(c_fn):
                os.remove(c_fn)
    print('run_test_series() completed.')
    if exit_at_end:
        if len(errors):
            sys.exit(1)
        sys.exit(0)
############################################
if __name__ == '__main__':
    test_classes_main_header()
    # To run the unit tests, provide a number > 0 or the word test as a command line argument while calling
    if len(sys.argv) > 1:
        try:
            test_flag = int(sys.argv[1])
        except ValueError:
            test_flag = int(sys.argv[1] == 'test')
    else:
        test_flag = 0
    if test_flag > 0:
        sys.argv.pop(1)
        unittest.main(failfast=False)
    else:
        pass
elif __name__ == 'omfit_classes.omfit_python':
    with open(OMFITsrc + '/../regression/test_omfit_testing.py') as f_:
        exec(compile(f_.read(), OMFITsrc + '/../regression/test_omfit_testing.py', 'exec'))