Source code for omfit_classes.omfit_github

    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
        from startup_choice import *

import requests
import datetime
from urllib.parse import quote_plus
import numpy as np

__all__ = [

[docs]def get_git_branch_info( remote=None, org=None, destination_org=None, repository=None, branch=None, url=None, omfit_fallback=True, no_pr_lookup=False, return_pr_destination_org=True, server=None, ): """ Looks up local name for upstream remote repo, GitHub org, repository name, current branch, & open pull request info All parameters are optional and should only be provided if trying to override some results :param remote: string [optional] Local name for the upstream remote. If None, attempts to lookup with method based on git rev-parse. :param org: string [optional] The organization that the repo is under, like 'gafusion'. If None, attempts to lookup with method based on git rev-parse. Falls back to gafusion on failure. :param destination_org: string [optional] Used for cross-fork pull requests: specify the destination org of the pull request. The pull request actually exists on this org, but it is not where the source branch lives. If None it defaults to same as org :param repository: string [optional] The name of the repo on GitHub. If None, attempts to lookup with method based on git rev-parse. Falls back to OMFIT-source on failure. :param branch: string [optional] Local/remote name for the current branch NOTE: there is an assumption that the local and upstream branches have same name :param url: string [optional] Provided mainly for testing. Overrides the url that would be returned by `git config --get remote.origin.url`. :param omfit_fallback: bool Default org and repository are gafusion and OMFIT-source instead of None and None in case of failed lookup. :param no_pr_lookup: bool Improve speed by skipping lookup of pull request number :param return_pr_destination_org: bool If an open pull request is found, changes remote, org, repository, and branch to match the destination side of the pull request. If there is no pull request or this flag is False, remote/org/repo/branch will correspond to the source. :param server: string [optional] The server of the remote - usually, but could also be something like :return: tuple containing 4 strings and a dict, with elements to be replaced with None for lookup failure remote (str), org (str), repository (str), branch (str), pull_request_info (dict) """ def _parse_remote_url(url): """ A utility function to parse a git remote url :return: protocol, server, org, repo """ if not url.endswith('.git'): url = url + '.git' # remote_re = r'(git@(.+?):|https://(.+?)/)(.+?)/(.+?)\.git$' m =, url) if m: server = if else return, server,, raise ValueError('Unable to parse remote url %s' % url) pr_info = None # Default unless overwritten by something real if remote is not None and branch is not None: revparse = '{}/{}'.format(remote, branch) else: revparse = repo('rev-parse --abbrev-ref @{u}') # Jenkins server defines the GIT_BRANCH environment variable if len(revparse.split('/')) < 2: revparse = os.environ.get('GIT_BRANCH', revparse) # If this is a pr branch, then we can directly look up the relevant info (if requested) if '/pr/' in revparse and not no_pr_lookup and branch is None: dest_repo, _, number, __ = revparse.split('/') assert _ == 'pr', 'Upstream pr pattern %s not parsed correctly' % revparse assert __ == 'head', 'Upstream pr pattern %s not parsed correctly' % revparse url = repo('config --get remote.{}.url'.format(dest_repo)) protocol_, server_, org_, repository_ = _parse_remote_url(url) pr_info = OMFITgithub_paged_fetcher(org=org_, path='pulls/%s' % number, repository=repository_).fetch()[0] if not return_pr_destination_org: printw('pr branch detected, but return_pr_destination_org is False') branch = branch or pr_info.get('head', {}).get('ref', None) full_name = pr_info.get('head', {}).get('repo', {}).get('full_name', "").split('/') org = org or (full_name[0] if len(full_name) is 2 else None) repository = repository or (full_name[1] if len(full_name) is 2 else None) else: remote = remote or dest_repo org = org or org_ repository = repository or repository_ branch = branch or pr_info.get('base', {}).get('ref', None) return remote, org, repository, branch, pr_info # Handle case of local branch with no upstream: we can't do much here if len(revparse.split('/')) < 2: printw('Warning: failed to determine remote/org/repo/branch info from git rev-parse: {}'.format(revparse)) print('You probably are on a local branch that does not have an upstream.') branch = branch or repo.active_branch()[0] if omfit_fallback: if org is not None and repository is not None: print('org and repository were specified, so they are fine') else: print('org and repository will default to gafusion and OMFIT-source if not overridden') org = org or 'gafusion' repository = repository or 'OMFIT-source' printw('Returning: ({}, {}, {}, {}, {})'.format(remote, org, repository, branch, pr_info)) return remote, org, repository, branch, pr_info # Get primary remote and upstream branch information remote_ = revparse.split('/')[0] branch_ = revparse.split('/')[-1] remote = remote or remote_ branch = branch or branch_ url = url or repo('config --get remote.{}.url'.format(remote)) protocol_, server_, org_, repository_ = _parse_remote_url(url) org = org or org_ repository = repository or repository_ server = server or server_ if no_pr_lookup: printd('Returning remote, org, repository, and branch before pull request lookup; pr_info will be None') return remote, org, repository, branch, pr_info # Build a list of remotes to check, starting with the first one destination_org = destination_org or org remotes = [remote] urls = [url] destination_orgs = [destination_org] repositories = [repository] servers = [server] for new_remote in repo('remote').split('\n'): if new_remote not in remotes: url = repo('config --get remote.{}.url'.format(new_remote)) protocol, server, org_, repository_ = _parse_remote_url(url) remotes.append(new_remote) urls.append(url) destination_orgs.append(org_) repositories.append(repository_) servers.append(server) printd(' Found new remote to check: {} with org = {}, repo = {}'.format(new_remote, org, repository)) # Make sure gafusion/OMFIT-source is in the list if 'gafusion' not in destination_orgs: destination_orgs.insert(1, 'gafusion') remotes.insert(1, None) repositories.insert(1, 'OMFIT-source') branches.insert(1, 'unstable') servers.insert(1, '') # Prioritize gafusion and check it first or second, but not after a bunch of other forks if 'gafusion' not in destination_orgs[0:2]: g_idx = destination_orgs.index('gafusion') for thing in [destination_orgs, remotes, repositories, servers, urls]: thing.insert(1, thing.pop(g_idx)) # Loop through list of possible remote orgs and search for an open pull request head = '{org:}:{branch}'.format(org=org, branch=branch) # The source branch, not the destination i = j = 0 info = [{'empty': None}] printd('Searching for open pull requests for {}/{}:{} with destination_org in {}'.format(org, repository, branch, destination_orgs)) while i < len(destination_orgs) and info[0].get('number', None) is None: if servers[i] != '': i += 1 continue a = OMFITgithub_paged_fetcher(org=destination_orgs[i], path='pulls', selection=dict(head=head), repository=repository) j = i info = a.fetch() if not len(info): info = [{}] if not isinstance(info[0], dict): info = [{}] printd( " PR lookup attempt i = {i:}, destination_org = {do:}, info[0].get('number', None) = {num:}".format( i=i, do=destination_orgs[i], num=info[0].get('number', None) ) ) info[0].setdefault('org', destination_orgs[i]) i += 1 if info[0].get('number', None) is None: printd('This branch ({}:{}) does not seem to have an open pull request.'.format(org, branch)) else: pr_info = info[0] if return_pr_destination_org: printd( 'Open pull request {}#{} found! Reassigning remote, org, repository, and branch to correspond to ' 'the destination side of the pull request.'.format(destination_orgs[j], pr_info.get('number', None)) ) remote = remotes[j] org = destination_orgs[j] repository = repositories[j] branch = pr_info.get('base', {}).get('ref', None) printd(' remote = {remote:}, org = {org:}, repository = {repository:}, branch = {branch:}'.format(**locals())) return remote, org, repository, branch, pr_info
[docs]def on_latest_gh_commit(): """ Returns true if the current working commit is the same as the github latest commit for this branch. """ remote, org, repository, branch, pr_info = get_git_branch_info(return_pr_destination_org=False) if org and repo and branch: headers = {'Authorization': 'token ' + get_OMFIT_GitHub_token()} url = "{}/{}/commits?sha={}".format(org, repository, quote_plus(branch)) print("URL : " + url) response = requests.get(url, headers=headers) local_hash = repo.get_hash('HEAD~0') if response.status_code == 200: commits = response.json() if commits and commits[0].get('sha', local_hash) != local_hash: return False else: print("Could not find corresponding GitHub branch. Assuming current commit is latest.") return True
[docs]def get_gh_remote_org_repo_branch(**kw): """ Looks up local name for upstream remote repo, GitHub org, repository name, and current branch Like calling get_git_branch_info with `no_pr_lookup=True` :return: tuple containing 4 strings remote, org, repository, branch """ kw['no_pr_lookup'] = True # Make sure no one can override this; the function wouldn't make any sense otherwise. return get_git_branch_info(**kw)[0:4]
def _get_OMFIT_GitHub_credential(): user = OMFIT['MainSettings']['SERVER'].get('GITHUB_username', '') if not len(user): raise ValueError('No GitHub username has been set!') return user + ''
[docs]def get_OMFIT_GitHub_token(token=None): """ :param token: string or None Token for accessing GitHub None triggers attempt to decrypt from $ credential file Must be set up in advance with set_OMFIT_GitHub_token() function :return: GitHub token """ if not token: _, token = decrypt_credential(_get_OMFIT_GitHub_credential()) if len(token) == 40: return token if not len(token): raise ValueError( 'See to create a token, ' 'then use set_OMFIT_GitHub_token() to encrypt and store it.' ) raise ValueError('string `%s` is not a valid token' % token)
[docs]def set_OMFIT_GitHub_token(token): """ :param token: 40 chars Token string to be encrypted in $ credential file """ cred = _get_OMFIT_GitHub_credential() if not len(token): return reset_credential(credential=cred) if len(token) != 40: raise ValueError('string `%s` is not a valid token' % token) try: encrypt_credential(credential=cred, password='', otp=token) printi('GitHub token stored in:', cred) except Exception as e: printe('GitHub token save failed:', repr(e))
[docs]def convert_gh_time_str_datetime(t): """ Convert a GitHub (gh) time string to a datetime object :param t: A time string like """ if t is None: return None y, M, d = list(map(int, t.split('T')[0].split('-'))) h, m, s = list(map(int, t.split('T')[1].strip('Z').split(':'))) return datetime.datetime(y, M, d, h, m, s)
[docs]class OMFITgithub_paged_fetcher(list): """ Interact with GitHub via the GitHub api: to fetch data from a path{org}/{repo}/{path} that has paged results """ def __init__(self, org=None, repository=None, path='comments', token=None, selection=None): """{org}/{repo}/{path} :param org: string [optional] The organization that the repo is under, like 'gafusion'. If None, attempts to lookup with method based on git rev-parse. Falls back to gafusion on failure. :param repository: string [optional] The name of the repo on GitHub. If None, attempts to lookup with method based on git rev-parse. Falls back to OMFIT-source on failure. :param path: string The part of the repo api to access :param token: string or None Token for accessing GitHub None triggers attempt to decrypt from file (must be set up in advance). Passed to get_OMFIT_GitHub_token. :param selection: dict A dictionary such as {'state':'all'} """ self.token = get_OMFIT_GitHub_token(token) self.path = path # Avoid infinite recursion if org is None or repository is None: remote, org, repository, branch = get_gh_remote_org_repo_branch(org=org, repository=repository) self.url = '{org}/{repository}/{path}'.format(**locals()) if selection is None: self.selection = {} else: self.selection = selection self.current_page = 1 self.find_last_page()
[docs] def find_last_page(self): """ Find the last page number and sets self.last_page :returns: self.last_page """ req = requests.get(self.url + self.get_sel_str(page=1), headers={'Authorization': 'token %s' % self.token}) if hasattr(req, 'links') and 'last' in req.links and 'url' in req.links['last']: last = self.last_page_url = req.links['last']['url'] for s in last.split('?')[1].split('&'): if 'page' in s: self.last_page = int(s.split('=')[1].strip()) break else: self.last_page = 1 printd('Unable to find last page of GitHub paged results') return self.last_page
[docs] def fetch(self): """ Fetch the paged results from GitHub and store them in self :returns: self """ if (self.current_page == self.last_page) and (self.last_page > 1): printd('Already fetched last page; not fetching again') return start = self.current_page for i in range(self.current_page, self.last_page + 1): ascii_progress_bar(i, start, self.last_page, mess='Getting GitHub %s pages' % self.path, newline=False) try: req = requests.get(self.url + self.get_sel_str(page=i), headers={'Authorization': 'token %s' % self.token}) except KeyboardInterrupt: break json = req.json() # Some paths (such as requesting only a single pull request) only return a single dictionary if isinstance(json, dict): json = [json] self.extend(json) self.current_page = i return self
[docs] def get_sel_str(self, **kw): r""" Figure out how to get the selection part of the url, such as ?state=all&page=1, etc. :param \**kw: Any variables (such as page) to override self.selection set at initialization :return: An ampersand '&' separated string of key=value (and the question mark) """ sel = copy.deepcopy(self.selection) sel.update(kw) if len(sel): return '?' + '&'.join(['%s=%s' % (x[0], x[1]) for x in list(sel.items())]) else: return ''
@property def results(self): return self
[docs]def get_pull_request_number(return_info=False, **kw): """ Gets the pull request number associated for the current git branch if there is an open pull request. Passes parameters org, destination_org, branch, and repository to get_git_branch_info(). :param return_info: bool [optional] Return a dictionary of information instead of just the pull request number :return: int, dict-like, or None Pull request number if one can be found, otherwise None. If return_info: dictionary returned by OMFITgithub_paged_fetcher with 'org' key added. Contains 'number', too. """ kw['no_pr_lookup'] = False # Make sure no one can override this; the function wouldn't make any sense otherwise. pr_info = get_git_branch_info(**kw)[4] if return_info or pr_info is None: return pr_info else: return pr_info.get('number', None)
[docs]def find_gh_comments(thread=None, contains='automatic_regression_test_post', user=False, id_only=True, org=None, repository=None, **kw): r""" Looks up comments on a GitHub issue or pull request and searches for ones with body text matching `contains` :param thread: int or None int: issue or pull request number None: look up pull request number based on active branch name. Only works if a pull request is open. :param contains: string or list of strings Check for these strings within comment body text. They all must be present. :param user: bool or string [optional] True: only consider comments made with the current username (looks up GITHUB_username from MainSettings) string: only comments made by the specified username. :param id_only: bool Return only the comment ID numbers instead of full dictionary of comment info :param org: string [optional] The organization that the repo is under, like 'gafusion'. If None, attempts to lookup with method based on git rev-parse. Falls back to gafusion on failure. :param repository: string [optional] The name of the repo on GitHub. If None, attempts to lookup with method based on git rev-parse. Falls back to OMFIT-source on failure. :param \**kw: keywords to pass to OMFITgithub_paged_fetcher :return: list of dicts (id_only=False) or list of ints (id_only=True) """ assert contains is not None, 'Must specify text to match.' _, org, repository, _ = get_gh_remote_org_repo_branch(org=org, repository=repository) if thread is None: thread = get_pull_request_number(org=org, repository=repository) if thread is None: print( 'Automatic lookup of pull request # failed. Must provide a pull request or issue number to post ' 'comments to GitHub. Nothing will be posted. Goodbye.' ) info = OMFITgithub_paged_fetcher(org=org, path='issues/{}/comments'.format(thread), repository=repository, **kw).fetch() matching_comments = [cmt for cmt in info if np.all([c in cmt.get('body', '') for c in tolist(contains)])] if user is True: user = OMFIT['MainSettings']['SERVER'].get('GITHUB_username', False) if user: matching_comments = [cmt for cmt in matching_comments if cmt.get('user', {}).get('login', None) == user] userm = user else: userm = '(any user)' ids = [comment.get('id', None) for comment in matching_comments] printd( 'Found {} comments in {}/{}#{} containing {} and user matching {} with IDs: {}'.format( len(matching_comments), org, repository, thread, ' and '.join(tolist([repr(c) for c in contains])), userm, ids ), topic='omfit_github', ) if id_only: return ids return matching_comments
[docs]def delete_matching_gh_comments( thread=None, keyword=None, test=True, token=None, org=None, repository=None, quiet=False, exclude=None, exclude_contain=None, match_username=None, **kw, ): r""" Deletes GitHub comments that contain a keyword. Use CAREFULLY for clearing obsolete automatic test report posts. :param thread: int [optional] Supply issue or comment number or leave as None to look up an open pull request # for the active branch :param keyword: string or list of strings CAREFUL! All comments which match this string will be deleted. If a list is provided, every substring in the list must be present in a comment. :param test: bool Report which comments would be deleted without actually deleting them. :param token: string or None Token for accessing GitHub None triggers attempt to decrypt from $ credential file Must be set up in advance with set_OMFIT_GitHub_token() function :param org: string [optional] The organization that the repo is under, like 'gafusion'. If None, attempts to lookup with method based on git rev-parse. Falls back to gafusion on failure. :param repository: string [optional] The name of the repo on GitHub. If None, attempts to lookup with method based on git rev-parse. Falls back to OMFIT-source on failure. :param quiet: bool Suppress print output :param exclude: list of strings [optional] List of CIDs to exclude / protect from deletion. In addition to actual CIDs, the special value of 'latest' is accepted and will trigger lookup of the matching comment with the most recent timestamp. Its CID will replace 'latest' in the list. :param exclude_contain: list of strings [optional] If provided, comments must contain all of the strings listed in their body in order to qualify for exclusion. :param match_username: bool or string [optional] True: Only delete comments that match the current username. string: Only delete comments that match the specified username. :param \**kw: keywords to pass to find_gh_comments() :return: list of responses from requests (test=False) or list of dicts with comment info (test=True) response instances should have a `status_code` attribute, which is normally int(201) for successful GitHub posts and probably 4xx for failures. """ printd( 'delete_matching_gh_comments: thread = {thread:}, keyword = {keyword:}, test = {test:}, org = {org:}, ' 'repository = {repository:}, exclude = {exclude:}, exclude_contain = {exclude_contain:}, ' 'match_username = {match_username:}'.format(**locals()), topic='delete_comments', ) def printq(*args): if not quiet: print(*args) return gafusion_fallback_allowed = org is None # Don't allow fallback to gafusion if the original org is explicit # Setup github interfacing information token = get_OMFIT_GitHub_token(token=token) hed = {'Authorization': 'token ' + token} responses = [] remote, org, repository, branch = get_gh_remote_org_repo_branch(org=org, repository=repository) thread = thread or get_pull_request_number(org=org, repository=repository) # Lookup comments info = find_gh_comments( thread=thread, contains=keyword, id_only=False, token=token, org=org, repository=repository, user=match_username, **kw ) cids = [c['id'] for c in info] if (len(cids) == 0) and (org != 'gafusion') and gafusion_fallback_allowed: # This may be a cross-fork pull request where "org" is not gafusion but the pull request is on gafusion. # See if we can find the comments if we change the org to gafusion and search there. gafusion_info = find_gh_comments( thread=thread, contains=keyword, id_only=False, token=token, org='gafusion', repository=repository, user=match_username, **kw ) gafusion_cids = [c['id'] for c in gafusion_info] if len(gafusion_cids) > 0: info = gafusion_info cids = gafusion_cids printd('Found {} comments by changing org to gafusion'.format(len(cids)), topic='delete_comments') org = 'gafusion' else: printd('Changing org to gafusion did not help find comments', topic='delete_comments') elif len(cids) == 0: printd( "Nothing found in search for comments to delete; fallback method would've been nice, but wasn't attempted", topic='delete_comments', ) if org == 'gafusion': printd('Fallback to searching for comments on gafusion not attempted because org was already gafusion', topic='delete_comments') if not gafusion_fallback_allowed: printd( 'Fallback to searching for comments on gafusion not allowed because ' 'org was explicit in call to delete_matching_gh_comments()', topic='delete_comments', ) else: printd('Found some comments on the first attempt with org={}; skipping fallback search plan'.format(org), topic='delete_comments') # Handle exclusions exclude = tolist(exclude) if 'latest' in exclude and len(info): printq('Excluding latest matching comment...') dates = [datetime.datetime.strptime(c['created_at'], "%Y-%m-%dT%H:%M:%SZ") for c in info] exclude_ = cids[np.array(dates).argmax()] printq(' CID of latest comment is {}'.format(exclude_)) for i in range(len(exclude)): if exclude[i] == 'latest': exclude[i] = exclude_ if exclude_ not in exclude: printw('Warning: had to use fallback to put {} in exclude list'.format(exclude_)) exclude += [exclude_] else: printd('No exclusion for latest comment', topic='delete_comments') if exclude_contain is not None: for i in range(len(exclude)): for k in range(len(info)): if info[k]['id'] == exclude[i]: for ec in tolist(exclude_contain): if ec not in info[k]['body']: printq( 'Revoking protection for {} (it is no longer excluded from deletion) because ' 'it does not contain all of the substrings: {}'.format(exclude[i], exclude_contain) ) exclude[i] = None else: printd('No special requirements for excluded comments', topic='delete_comments') info = [info_ for info_ in info if info_['id'] not in exclude] cids = [c['id'] for c in info] # Test mode: return info without trying to delete anything if test: printq( 'Bad authorization; nothing would be deleted because of bad token.' if token is None else 'Authorization may be okay. You could actually delete these comments:' ) printq('{} comments would be deleted in #{}.'.format(len(cids), thread)) printq('No comments have actually been deleted because test=True') return info for cid in cids: url = '{org:}/{repo:}/issues/comments/{cid:}'.format(org=org, repo=repository, cid=cid) responses += [requests.delete(url, headers=hed)] printd('responses to deletion requests: {}'.format(responses), topic='delete_comments') printq('Ordered deletion of {} comments in {}/{}#{}'.format(len(responses), org, repository, thread)) return responses
def _define_new_content(old_content, new_content, mode='replace_between', separator='---', close_separator=None): """ Defines new content of a comment after editing. Used for assigning output of the edit_github_comment function and also predicting results. :param old_content: str Content of the comment's body before editing :param new_content: str New content to add (maybe as a replacement of some old content) :param mode: str Edit mode: 'replace', 'append', 'replace_between' separators :param separator: str Separator between old_content and new content :param close_separator: str [optional] A different separator at the end of the block of new content :return: str, None, or ValueError str: try to do the edit, using this string as the new comment body None: abort ValueError: This error should be raised. It's returned instead of raised here to help predict output in tests. """ if new_content is None: if mode == 'replace_between': if separator in old_content: before = old_content.split(separator)[0] after = separator.join(old_content.split(separator)[1:]) if close_separator is not None: if close_separator in old_content: after = close_separator.join(after.split(close_separator)[1:]) else: after = '' elif separator in after: after = separator.join(after.split(separator)[1:]) else: after = '' content = before + after else: print('No separators found in output. Aborting deletion of material between separators ' '({}).'.format(separator)) return None else: return ValueError("Can't accept None as content except in mode 'replace_between'") # new_content == '' is acceptable for modes replace and append, but has special behavior for replace_between elif mode == 'replace': content = new_content elif mode == 'append' and separator is None: content = old_content + '\n' + new_content elif mode == 'append': content = old_content + separator + new_content if close_separator is not None: content += close_separator elif mode == 'replace_between' and separator is None: return ValueError('Using mode: replace between separators requires specifying a separator.') elif mode == 'replace_between': if separator in old_content: if close_separator is None: content = old_content.split(separator) content[1] = new_content content = separator.join(content) else: before = old_content.split(separator)[0] after = separator.join(old_content.split(separator)[1:]) after = close_separator.join(after.split(close_separator)[1:]) content = before + separator + new_content + close_separator + after else: content = old_content + separator + new_content if close_separator is not None: content += close_separator else: return ValueError('Unrecognized mode: {}'.format(mode)) return content
[docs]def edit_github_comment( comment_mark=None, separator='---', new_content=None, mode='replace_between', thread=None, org=None, repository=None, token=None ): """ Edits GitHub comments to update automatically generated information, such as regression test reports. :param comment_mark: str or None None: edit top comment. str: Search for a comment (not including top comment) containing comment_mark as a substring. :param new_content: str New content to put into the comment Special cases: If content is None and mode == 'replace_between': Separate close separator and close separator present in target: del between 1st open sep and next close sep Separate close separator & close separator not present in target: del everything after 1st open sep All same separator & one instance present in target: delete it and everything after All same separator & multiple instances present: delete the first two and everything in between. If content is None and mode != 'replace_between', raise ValueError If None and mode == 'replace_between', but separator not in comment, abort but do not raise. :param separator: str or list of strings Substring that separates parts that should be edited from parts that should not be edited. `'---'` will put a horizontal rule in GitHub comments, which seems like a good idea for this application. If this is a list, the first and second elements will be used as the opening and closing separators to allow for different open/close around a section. :param mode: str Replacement behavior. 'replace': Replace entire comment. Ignores separator. 'append': Append new content to old comment. Places separator between old and new if separator is supplied. Closes with another separator if separate open/close separators are supplied; otherwise just places one separator between the original content and the addition. 'replace_between: Replace between first & second appearances of separator (or between open & close separators). Raises ValueError if separator is not specified. Acts like mode == 'append' if separator (or opening separator) is not already present in target comment. other: raises ValueError :param thread: int [optional] Issue or pull request number. Looked up automatically if not provided :param org: str [optional] GitHub org. Determined automatically if not supplied. :param repository: str [optional] GitHub repository. Determined automatically if not supplied. :param token: str Token for accessing GitHub. Decrypted automatically if not supplied. :return: response instance or None None if aborted before attempting to post, otherwise response instance, which is an object generated by the requests module. It should have a `status_code` attribute which is 2** on success and often 4** for failures. """ if separator is not None: seps = tolist(separator) separator = seps[0] if len(seps) > 1: close_separator = seps[1] else: close_separator = None else: close_separator = None if separator is not None and '\n' not in separator: printw('edit_github_comment: Your life would be better if you included a line break in your separator.') # Figure out remote, org, etc. so ensure it's consistent throughout all function calls token = get_OMFIT_GitHub_token(token=token) remote, org_, repository_, branch, pr_info = get_git_branch_info( destination_org=org, repository=repository, no_pr_lookup=False, return_pr_destination_org=True ) org = org or org_ repository = repository or repository_ thread = thread or (pr_info.get('number', None) if pr_info is not None else None) if thread is None: print( 'Automatic lookup of pull request # failed. Must provide a pull request or issue number to post ' 'comments to GitHub. Nothing will be posted. Goodbye.' ) return None # Get old content and cid if needed if comment_mark is not None: matching = find_gh_comments(contains=comment_mark, org=org, repository=repository, thread=thread, token=token, id_only=False) cid = matching[0]['id'] old_content = matching[0]['body'] elif mode in ['append', 'replace_between']: # The top "comment" isn't really a comment by GitHub's naming convention, apparently. old_content = OMFITgithub_paged_fetcher(org=org, repository=repository, path='issues/{}'.format(thread), token=token).fetch()[0][ 'body' ] cid = None else: old_content = '' # Mode 'replace' just replaces, so no one cares what this is and we can save time fetching it. cid = None # Deal with different line break conventions; \r\n moves to the start of the line and then breaks, \n just breaks. # On Unix, \n should be used. \r\n is used by Windows and apparently web stuff like GitHub. It's only important # here because it breaks the str comparison. GitHub will convert \n into whatever it needs. if old_content is None: old_content = '' old_content = old_content.replace('\r\n', '\n') # Define content to post content = _define_new_content(old_content, new_content, mode=mode, separator=separator, close_separator=close_separator) if content is None: return None elif isinstance(content, ValueError): raise content # Edit the comment on GitHub! hed = {'Authorization': 'token ' + token} data = {'body': content} if comment_mark is None: # The issue itself (not a real comment) url = '{org:}/{repo:}/issues/{thread:}'.format(org=org, thread=thread, repo=repository) else: # A comment on the issue url = '{org:}/{repo:}/issues/comments/{cid:}'.format(org=org, repo=repository, cid=cid) response = requests.patch(url, json=data, headers=hed) return response
[docs]def post_comment_to_github(thread=None, comment=None, org=None, fork=None, repository=None, token=None): """ Posts a comment to a thread (issue or pull request) on GitHub. Requires setup of a GitHub token to work. This function may be tested on fork='gafusion', thread=3208 if needed. Setup:: 1. Create a GitHub token with the "repo" (Full control of private repositories) box checked. See . 2. [Optional] Safely store the token to disk by executing: set_OMFIT_GitHub_token(token) This step allows you to skip passing the token to the function every time. :param thread: int [optional] The number of the issue or pull request within the fork of interest If not supplied, the current branch name will be used to search for open pull requests on GitHub. :param comment: string The comment to be posted :param org: string [optional] Leave this as gafusion to post to the main OMFIT repo. Enter something else to post on a fork. :param fork: string [optional] Redundant with org. Use org instead. Fork is provided for backwards compatibility :param repository: string [optional] The name of the repo on GitHub. If None, attempts to lookup with method based on git rev-parse. Falls back to OMFIT-source on failure. You should probably leave this as None unless you're doing some testing, in which case you may use the regression_notifications repository under gafusion. :param token: string or None Token for accessing GitHub None triggers attempt to decrypt from $ credential file Must be set up in advance with set_OMFIT_GitHub_token() function :return: response instance As generated by requests. It should have a `status_code` attribute, which is normally int(201) for successful GitHub posts and probably 4xx for failures. """ import requests if org is None: org = fork if comment is None: print('Provide a comment if you want to post a comment.') return remote, org_, repository_, branch, pr_info = get_git_branch_info( destination_org=org, repository=repository, no_pr_lookup=False, return_pr_destination_org=True ) org = org or org_ repository = repository or repository_ thread = thread or (pr_info.get('number', None) if pr_info is not None else None) if thread is None: print( 'Automatic lookup of pull request # failed. Must provide a pull request or issue number to post ' 'comments to GitHub. Nothing will be posted. Goodbye.' ) return token = get_OMFIT_GitHub_token(token=token) hed = {'Authorization': 'token ' + token} data = {'body': comment} url = '{org:}/{repo:}/issues/{thread:}/comments'.format(org=org, thread=thread, repo=repository) response =, json=data, headers=hed) return response
[docs]def set_gh_status( org=None, repository=None, commit=None, state=None, context='regression/auto', description='', target_url=None, destination_org=None ): """ Updates the status of a pull request on GitHub. Appears as green check mark or red X at the end of the thread. :param org: string [optional] The organization that the repo is under, like 'gafusion'. If None, attempts to lookup with method based on git rev-parse. Falls back to gafusion on failure. :param destination_org: string [optional] Used for cross-fork pull requests: specify the destination org of the pull request. The pull request actually exists on this org, but it is not where the source branch lives. Passed to get_pull_request_number when determining whether a pull request is open. Defines first org to check. If None it defaults to same as org :param repository: string [optional] The name of the repo on GitHub. If None, attempts to lookup with method based on git rev-parse. Falls back to OMFIT-source on failure. :param commit: commit hash or keyword 'latest' or 'HEAD~0': Look up latest commit. This is appropriate if you have reloaded modules and classes as needed. 'omfit' or None: use commit that was active when OMFIT launched. Appropriate if testing classes as loaded during launch and not reloaded since. else: treats input as a commit hash and will fail if it is not one. :param state: string or bool 'success' or True: success -> green check 'failure' or False: problem -> red X 'pending' -> yellow circle :param context: string Match the context later to update the status :param description: string A string with a description of the status. Up to 140 characters. Long strings will be truncated. Line breaks, quotes, parentheses, etc. are allowed. :param target_url: string URL to associate with the status :return: response instance s generated by `requests`. It should have a `status_code` attribute, which is normally int(201) for successful GitHub posts and probably 4xx for failures. """ if commit in ['latest', 'HEAD~0']: printd('Using latest commit (assuming modules and classes have been reloaded as needed)') sha = repo.get_hash('HEAD~0') elif commit in ['omfit', None]: printd('Using active commit at time of OMFIT launch') try: sha = repo.get_hash(repo_active_branch_commit) except NameError: repo_active_branch, repo_active_branch_commit, repo_str = repo.active_branch() sha = repo.get_hash(repo_active_branch_commit) else: sha = commit remote, org, repository, branch, pr_info = get_git_branch_info(org=org, repository=repository, destination_org=destination_org) url = '{org}/{repository}/statuses/{sha}' url = url.format(org=org, repository=repository, sha=sha) h = {'Authorization': 'token ' + get_OMFIT_GitHub_token()} final_state = 'success' if state is True else ('failure' if state is False else state) printd('set_gh_status(): state = {}, context = {}, description = {}, sha = {}'.format(final_state, context, description, sha)) if target_url is None: target_url = os.environ.get('BUILD_URL', None) data = { 'state': final_state, 'context': context, 'description': description[:140], # GitHub status has a 140 char length limit 'target_url': target_url, } response =, json=data, headers=h) return response
############################################ if '__main__' == __name__: test_classes_main_header() if False: # remove test since token is encrypted with ssh key which has passphrase#os.environ.get('USER','') in ['eldond','smithsp']: comments = OMFITgithub_paged_fetcher() assert comments.last_page > 1 assert convert_gh_time_str_datetime('2018-12-05T15:40:30Z') == datetime.datetime(2018, 12, 5, 15, 40, 30) post_comment_to_github(thread=3, comment='Command line import test comment', fork='smithsp') else: print('No tests of omfit_github, because no guaranteed token')