Source code for omfit_classes.utils_fit

import sys

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

if framework:
    print('Loading fit utility functions...')

from omfit_classes.utils_base import *
from omfit_classes.utils_base import _available_to_user_math
from omfit_classes.utils_math import *
import numpy as np


# -----------
# profiles fitting
# -----------
[docs]@_available_to_user_math def autoknot(x, y, x0, evaluate=False, minDist=None, minKnotSpacing=0, s=3, w=None, allKnots=False, userFunc=None, *args, **kwargs): """ This function returns the optimal location of the inner-knots for a nth degree spline interpolation of y=f(x) :param x: input x array :param y: input y array :param x0: initial knots distribution (list) or number of knots (integer) :param s: order of the spline :param w: input weights array :param allKnots: returns all knots or only the central ones exluding the extremes :param userFunc: autoknot with user defined function with signature `y0=userFunc(x,y)(x0)` :param minDist: a number between >0 and infinity (though usually <1), which sets the minimum distance between knots. If small knots will be allowed to be close to one another, if large knots will be equispaced. Use `None` to automatically determine this parameter based on: `0.01*len(knots)` If minDist is a string than it will be evaluated (the knots locations in the string can be accessed as `knots`). :param minKnotSpacing: a number in x input units that denotes the minimal inter-knot space that autoknot should aim for. It shows up as an addition term in the cost function and is heavily punished against. If x is too large it will force autoknow to output evenly spaced knots. Default to 0. (ie no limit) :return: x0 optimal location of inner knots to spline interpolate y=f(x) f1=interpolate.LSQUnivariateSpline(x,y,x0,k=s,w=w) """ def _func_arbitrary(xd, s): x0 = np.cumsum([0] + xd.tolist()) x0 = x0 / max(x0) if x0[1] < ux[1] or x0[-2] > ux[-2]: x0 = (x0[1:-1] - x0[1]) / (x0[-2] - x0[1]) * (ux[-2] - ux[1]) + ux[1] x0 = [ux[0]] + x0.tolist() + [ux[-1]] x0 = np.array(x0) x0 = x0 / max(x0) y0 = userFunc(x, y)(x0) def cost(y0, func, w): y1 = func(x0, y0)(x) return np.sqrt(np.mean((abs(y - y1) * w) ** 2)) bounds = np.reshape([min(y), max(y)] * len(y0), (-1, 2)) y0 = optimize.fmin_tnc(cost, y0, args=(userFunc, w), bounds=bounds, approx_grad=True, pgtol=1e-1)[0] y1 = userFunc(x0, y0)(x) return y1, y0, x0 def _func(xd, s): x0 = np.cumsum([0] + xd.tolist()) x0 = x0 / max(x0) if x0[1] < ux[1] or x0[-2] > ux[-2]: x0 = (x0[1:-1] - x0[1]) / (x0[-2] - x0[1]) * (ux[-2] - ux[1]) + ux[1] x0 = [ux[0]] + x0.tolist() + [ux[-1]] x0 = np.array(x0) x0 = x0 / max(x0) try: f1 = interpolate.LSQUnivariateSpline(x, y, x0[1:-1], k=s, w=w) except ValueError as _excp: raise OMFITexception(_excp) y0 = f1(x0) y1 = f1(x) return y1, y0, x0 if userFunc is not None: func = _func_arbitrary else: func = _func def cost(xd, s, w, minKnotSpacing=0): # xd: space between knots (will be normalized) # s: spline degree # w: weight array # minKnotSpacing: default to no limit try: y1, y0, x0 = _func(xd, s) # y1: fit at data locs (x) # y0: fit at knots (x0) # x0: knot locs (normalized) except ValueError: return np.inf c = np.sqrt(np.mean((abs(y - y1) * w) ** 2)) if np.isnan(c): c = max(abs(y)) ** 2 if any((x0[1:] - x0[:-1]) < minKnotSpacing): c += (max(abs(y)) * (minDist / min(x0[1:] - x0[:-1]))) ** 2 return c if w is None: w = x * 0 + 1 xm = min(x) xM = max(x) ux = (np.unique(x) - xm) / (xM - xm) if is_int(x0): # initial guess if allKnots: n = x0 - 2 else: n = x0 x0 = [] xt = np.linspace(np.unique(x)[1], np.unique(x)[-2], int(n * (3 + np.log(n)))).tolist() for kkk in range(n): x0.append(np.nan) started = True for xt_chosen in xt: x0[kkk] = xt_chosen try: y1 = interpolate.LSQUnivariateSpline(x, y, sorted(x0), k=s, w=w)(x) except ValueError as _excp: raise OMFITexception(_excp) c = np.sqrt(np.mean((abs(y - y1) * w) ** 2)) if started or c < mincost: mincost = c minxost_x = xt_chosen started = False xt.pop(xt.index(minxost_x)) x0[kkk] = minxost_x x0 = [xm, xM] + x0 x0 = np.array(sorted(x0)) x0 = np.atleast_1d(x0).tolist() if min(x0) != xm: x0 = [xm] + x0 if max(x0) != xM: x0 = x0 + [xM] x0 = np.atleast_1d(x0) x = (x - xm) / (xM - xm) x0 = (x0 - xm) / (xM - xm) x[-1] = x0[-1] = 1 x[0] = x0[0] = 0 x0 = sorted(x0) xd = np.diff(x0) n = len(xd) # set minimum knots distance knots = x0 if minDist is None: minDist = 0.01 * len(knots) elif isinstance(minDist, str): minDist = eval(minDist) bounds = np.reshape([minDist, 1] * n, (-1, 2)) if evaluate: y1_, y0_, x0 = func(xd, s) if allKnots: return y1_, y0_ else: return y1_, y0_[1:-1] minKnotSpacing = minKnotSpacing / (xM - xm) # normalize before passing in xd = optimize.fmin_l_bfgs_b(cost, xd, args=(s, w, minKnotSpacing), bounds=bounds, approx_grad=True)[0] y1, y0, x0 = func(xd, s) x0 = x0 * (xM - xm) + xm if allKnots: return x0, y0 else: return x0[1:-1]
[docs]class knotted_fit_base(object): """ The base class for the types of fits that have free knots and locations """ kw_vars = ['min_slope', 'monotonic', 'min_dist', 'first_knot', 'knots', 'fixed_knots', 'fit_SOL', 'outliers'] kw_defaults = [None, False, 0, None, 3, False, False, 3] def __init__(self, x, y, yerr): """ Does basic checking for x,y,yerr then stores them in self.x, self.y, self.yerr such that x is monotonically increasing """ valid_indices = ~np.isnan(x) & ~np.isnan(y) & ~np.isnan(yerr) & (yerr > 0) if not np.any(valid_indices): raise OMFITexception('No valid data passed to ' + self.__class__.__name__) sort_index = np.argsort(x[valid_indices]) self.x_orig = self.x = x[valid_indices][sort_index] self.y_orig = self.y = y[valid_indices][sort_index] self.e_orig = self.e = yerr[valid_indices][sort_index] self.all_one_sign = not (min(self.y) < 0 and max(self.y) > 0)
[docs] def fit_knot_range(self, knots_range, **kw): r""" Try a range of number of knots Stop the loop if the current number of knots did no better than the previous best :param knots_range: A tuple, ``(min_num_knots, max_num_knots)`` passed to ``range`` :param \**kw: The keywords passed to ``fit_single`` """ redchi = 1e4 tag = None for i in range(*knots_range): kw['knots'] = i self.fit_single(**kw) best_tag, best_redchi = self.get_best_fit() if best_redchi == redchi: return else: redchi = best_redchi tag = best_tag
[docs] def get_tag(self, **kw): r""" Get the tag for the settings given by ``**kw`` :param \**kw: The Fit Keywords documented in the ``__init__`` method """ tag = [] for default, var in zip(self.kw_defaults, self.kw_vars): val = kw.get(var, self.kw_orig[var]) if np.array(val != self.kw_orig[var]).any(): tag.append('%s=%s' % (var, val)) if tag: return ';'.join(tag) return 'default'
[docs] def fit_single(self, **keyw): r""" Perform a single fit for the given ``**keyw`` :param \**keyw: The Fit Keywords documented in the ``__init__`` method :return: The ``lmfit.MinimizerResult`` instance for the fit The fit is also stored in ``self.fits[self.get_tag(**keyw)]`` """ kw = copy.deepcopy(self.kw_orig) kw.update(keyw) outliers = kw['outliers'] # For variable knots, get a guess on the knot values from the fixed_knot solution if not kw['fixed_knots']: kw['fixed_knots'] = True kw['outliers'] = 0 self.fit_single(**kw) kw['fixed_knots'] = False # Determine whether to use the scrape off layer points ind_valid = self.x_orig >= 0 if not kw['fit_SOL']: ind_valid = ind_valid & (self.x_orig <= 1) # Always start with original data self.restore_orig_data() # Do the first fit with all data if outliers and outliers > 0: kw['outliers'] = 0 params = self.build_params(**kw) tag_no_outliers = self.get_tag(**kw) self.fit_single(**kw) kw['outliers'] = outliers self.restore_orig_data() diff = self.residual(self.fits[tag_no_outliers].params) ind_valid = ind_valid & (diff <= outliers) | (self.x_orig == 0) # Only fit the valid data (stored in self.x, self.y, self.e) self.x = self.x_orig[ind_valid] self.y = self.y_orig[ind_valid] self.e = self.e_orig[ind_valid] params = self.build_params(**kw) tag = self.get_tag(**kw) if tag in self.fits: return self.fits[tag] # Store the lmfit.MinimizerResult in self.fits, along with data, outliers, and kw # out = lmfit.minimize(self.residual,params,method='nelder') self.fits[tag] = lmfit.minimize(self.residual, params) self.fits[tag].x = self.x self.fits[tag].y = self.y self.fits[tag].e = self.e self.fits[tag].xo = self.x_orig[~ind_valid] self.fits[tag].yo = self.y_orig[~ind_valid] self.fits[tag].eo = self.e_orig[~ind_valid] self.fits[tag].kw = kw self.restore_orig_data() return self.fits[tag]
[docs] def restore_orig_data(self): '''Restore the original data''' self.x = self.x_orig self.y = self.y_orig self.e = self.e_orig
[docs] def get_zk(self, params): """ Return the array of knot values from the ``lmfit.Parameters`` object """ return np.array([params['zk%d' % i].value for i in range(params['nk'].value)])
[docs] def get_xk(self, params): """ Return the array of knot locations from the ``lmfit.Parameters`` object """ return np.array([params['xk%d' % i].value for i in range(params['nk'].value)])
[docs] def valid_xk(self, params): """ :param params: An ``lmfit.Paramaters`` object to check :return: True if np.all(min_dist< xk_i-xk_(i-1))) """ dxk = np.diff(self.get_xk(params)) return (dxk > min([params['dxk%d' % i].min for i in range(1, params['nk'].value)])).all()
[docs] def residual(self, params): """ Return the array of np.sqrt(((ymodel-y)/yerr)**2) given the ``lmfit.Parameters`` instance Developers note: For the default data set tested, choosing the sqrt of the square did better than the signed residual """ ymodel = self.model(params, self.x) return np.sqrt((((ymodel - self.y) / self.e)) ** 2)
[docs] def get_param_unc(self, lmfit_out): """ Get parameters with correlated uncertainties :param lmfit_out: ``lmfit.MinimizerResult`` instance :return: tuple of xk, zk, ybc, xbc """ result = {} for k in lmfit_out.params: result[k] = lmfit_out.params[k].value if lmfit_out.errorbars: corr_params = uncertainties.correlated_values( [lmfit_out.params[k].value for k in lmfit_out.var_names], lmfit_out.covar ) # ,labels=lmfit_out.var_map) for vi, var in enumerate(lmfit_out.var_names): result[var] = corr_params[vi] nk = result['nk'] xk = np.array([result['xk%d' % i] for i in range(nk)]) zk = np.array([result['zk%d' % i] for i in range(nk)]) ybc = result['ybc'] xbc = result['xbc'] return xk, zk, ybc, xbc
[docs] def plot(self, **kw): r""" Plot all fits calculated so far, each in its own tab of a ``FigureNotebook``, where the tab is labeled by the shortened tag of the tag of the fit :param \**kw: Dictionary passed to self.plot_individual_fit :return: The ``FigureNotebook`` instance created """ from omfit_plot import FigureNotebook fn = FigureNotebook(nfig=len(self.fits), labels=list(map(self.short_tag, sorted(self.fits.keys())))) for k, v in list(self.fits.items()): fg = fn.add_figure(label=self.short_tag(k)) ax = fg.use_subplot(111) self.plot_individual_fit(k, ax=ax, **kw) fn.draw() return fn
[docs] def short_tag(self, tag): '''Return a shortened version of the tag''' return tag.replace('knots', 'k').replace('fixed', 'f').replace('monotonic', 'm').replace('fit_', '')
[docs] def plot_individual_fit(self, tag, ax=None, x=np.linspace(0, 1.1, 1001)): """ Plot a single fit, characterized by ``tag`` :param tag: The tag of the fit that is to be plotted, must be in self.fits.keys() :param ax: The axes to plot into (one is created if ``None``) :param x: The x values to use for plotting the fitted curve """ from omfit_classes.utils_plot import uband if tag not in self.fits: printe('%s is not a valid tag' % tag) printe('Use one of:') printe('\n'.join(list(self.fits.keys()))) return k = tag v = self.fits[k] if ax is None: from matplotlib import pyplot ax = pyplot.gca() ax.set_title(r'%s: Reduced $\chi^2$=%g' % (self.short_tag(k), v.redchi)) # Plot data ax.errorbar(v.x, v.y, v.e, linestyle='', label='Data') max_x = max(v.x) * 1.01 # Plot outliers if len(v.xo): ax.errorbar(v.xo, v.yo, v.eo, color='m', linestyle='', label='Outlier') max_x = max([max(v.x), max(v.xo)]) * 1.01 ax.set_xlim(left=0, right=max_x) # Plot fit uband(x, self.model(v.params, x, lmfit_out=v), zorder=100, label='Fit', ax=ax)
[docs] def has_large_xerrorbars(self, lmfit_out): """ :param lmfit_out: A ``lmfit.MinimizerResult`` object :return: True if the errorbars of the knot locations are larger than the distance between knots """ v = lmfit_out if v.errorbars: xk = self.get_param_unc(v)[0] dxk = np.diff(nominal_values(xk)) if (std_devs(xk[1:-1]) > dxk[:-1]).any(): return True if (std_devs(xk[1:-1]) > dxk[1:]).any(): return True return False return None
[docs] def has_large_errorbars(self, lmfit_out, verbose=False): """ :param lmfit_out: A ``lmfit.MinimizerResult`` object :return: True if any of the following are False: 1) the errorbars of the knot locations are smaller than the distance between the knots 2) the errorbars in the fit at the data locations is not larger than the range in data 3) the errorbars in the fit at the data locations is not larger than the fit value at that location, if the data are all of one sign """ v = lmfit_out if v.errorbars: if self.has_large_xerrorbars(v): if verbose: printi('Fit has large xerrorbars') return True ymodel = self.model(v.params, v.x, v) if self.all_one_sign and (std_devs(ymodel) > abs(nominal_values(ymodel))).any(): if verbose: printi('Fit has errorbars larger than the fit itself:') return True if (std_devs(ymodel) > (abs(v.y).max() - abs(v.y).min())).any(): if verbose: printi('Fit has errorbars larger than the range in data') return True return False return None
[docs] def get_best_fit(self, verbose=False, allow_no_errorbar=None): """ Figure out which is the best fit so far The best fit is characterized as being the fit with the lowest reduced chi^2 that is valid. The definition of valid is 1) the knots are in order 2) the knots are at least min_dist apart 3) the errorbars on the fit parameters were able to be determined 4) the errorbars of the knot locations are smaller than the distance between the knots 5) the errorbars in the fit at the data locations is not larger than the range in data 6) the errorbars in the fit at the data locations is not larger than the fit value at that location :param verbose: If ``True``, print the tag and reduced chi2 of all fits :param allow_no_errorbar: If ``True``, if there is no valid fit found with errorbars, return the best fit without errorbars :return: A tuple of (best_tag, best_reduced_chi2) """ if allow_no_errorbar is None: allow_no_errorbar = self.allow_no_errorbar best_tag = None best_redchi = 1e4 if verbose: printi('Reduced chi^2; fit type') for k, v in list(self.fits.items()): if verbose: printi(v.redchi, k) if not v.errorbars: if verbose: printi('%s rejected because no errorbars' % k) continue if v.redchi > best_redchi: if verbose: printi('%s rejected because redchi too large' % k) continue xk = self.get_param_unc(v)[0] dxk = np.diff(nominal_values(xk)) if v.kw['min_dist'] < 0 and not (dxk >= abs(v.kw['min_dist'])).all(): if verbose: printi('%s rejected because knots too close together' % k) continue if self.has_large_errorbars(v, verbose=verbose): if verbose: printi('%s rejected because errorbars too large' % k) continue best_tag = k best_redchi = v.redchi if best_tag is None and allow_no_errorbar: if verbose: printi('Looking for best fit without errorbars') for k, v in list(self.fits.items()): if verbose: printi(v.redchi, v.errorbars, k) if not v.errorbars and (v.redchi < best_redchi): best_tag = k best_redchi = v.redchi return best_tag, best_redchi
[docs] def plot_best_fit(self, **kw): '''A convenience function for plotting the best fit''' best_tag, best_redchi = self.get_best_fit() if best_tag: self.plot_individual_fit(best_tag, **kw) else: printi('No valid fit:') self.get_best_fit(verbose=True)
def __call__(self, x): """ Evaluate the best fit at ``x`` """ best_tag = self.get_best_fit()[0] if best_tag: return self.model(self.fits[best_tag].params, x, self.fits[best_tag]) return np.array([np.nan] * len(x))
[docs]class fitSL(knotted_fit_base): """ Fit a profile of data using integrated scale lengths, ideally obtaining uncertainties in the fitting parameters. Due to the nature of integrating scale lengths, this fitter is only good for data > 0 or data < 0. :Examples: >>> pkl = OMFITpickle(OMFITsrc+'/../samples/data_pickled.pkl') >>> x = pkl['x'][0,:] >>> y = pkl['y'][0,:] >>> yerr = pkl['e'][0,:] >>> fit = fitSL(x,y,yerr,fixed_knots=True,knots=-7,plot_best=True) Along the way of obtaining the fit with the desired parameters, other intermediate fits may be obtained. These are stored in the ``fits`` attribute (a ``dict``), the value of whose keys provide an indication of how the fit was obtained, relative to the starting fit. For instance, to provide a variable knot fit, a fixed knot (equally spaced) fit is performed first. Also an initial fit is necessary to know if there are any outliers, and then the outliers can be detected. The ``get_best_fit`` method is useful for determing which of all of the fits is the best, meaning the valid fit with the lowest reduced chi^2. Here valid means 1. the knots are in order 2. the knots are at least min_dist apart 3. the errorbars on the fit parameters were able to be determined 4. the errorbars of the knot locations are smaller than the distance between the knots Note that 1) and 2) should be satisfied by using lmfit Parameter constraints, but it doesn't hurt to double check :-) Developer note: If the fitter is always failing to find the errorbars due to tolerance problems, there are some tolerance keywords that can be passed to ``lmfit.minimize``: ``xtol``, ``ftol``, ``gtol`` that could be exposed. """ def __init__( self, x, y, yerr, knots=3, min_dist=0, first_knot=None, fixed_knots=False, fit_SOL=False, monotonic=False, min_slope=None, outliers=3, plot_best=False, allow_no_errorbar=False, ): """ Initialize the fitSL object, including calculating the first fit(s) :param x: The x values of the data :param y: The values of the data :param yerr: The errors of the data Fit Keywords: :param knots: * Positive integer: Use this number of knots as default (>=3) * Negative integer: Invoke the ``fit_knot_range`` method for the range (3,abs(knots)) * list-like: Use this list as the starting point for the knot locations :param min_dist: The minimum distance between knot locations * min_dist > 0 (faster) enforced by construction * min_dist < 0 (much slower) enforced with lmfit :param first_knot: The first knot can be constrained to be above first_knot. The default is above ``min(x)+min_dist`` (The zeroth knot is at 0.) :param fixed_knots: If ``True``, do not allow the knot locations to change :param fit_SOL: If ``True``, include data points with x>1 :param monotonic: If ``True``, only allow positive scale lengths :param min_slope: Constrain the scale lengths to be above ``min_slope`` :param outliers: Do an initial fit, then throw out any points a factor of ``outliers`` standard deviations away from the fit Convenience Keywords: :param plot_best: Plot the best fit :param allow_no_errorbar: If ``True``, ``get_best_fit`` will return the best fit without errorbars if no valid fit with errorbars exists """ knotted_fit_base.__init__(self, x, y, yerr) y = self.y_orig if (np.array(y) < 0).any() and (np.array(y) > 0).any(): raise OMFITexception('The scale length fitter is not appropriate for mixed sign data (y < 0 and y > 0)') self.sign = 1 if (np.array(y) < 0).any(): self.sign = -1 self.y_max = max(abs(self.y_orig)) orig_knots = knots knots_range = None if isinstance(knots, int) and knots < 0: knots = abs(knots) knots_range = (3, knots + 1) self.allow_no_errorbar = allow_no_errorbar self.kw_orig = {} for var in self.kw_vars: self.kw_orig[var] = eval(var) self.fits = {} if knots_range: self.fit_knot_range(knots_range, **self.kw_orig) else: self.fit_single(**self.kw_orig) if plot_best: self.plot_best_fit()
[docs] def build_params(self, **keyw): r""" Build the ``lmfit.Parameters`` object needed for the fitting :param \**keyw: The Fit Keywords documented in the ``__init__`` method :return: The ``lmfit.Parameters`` translation of the settings given by ``**keyw`` """ kw = copy.deepcopy(self.kw_orig) kw.update(keyw) tag = self.get_tag(**kw) # Put the Fit Keywords in the current namespace min_slope = kw['min_slope'] monotonic = kw['monotonic'] min_dist = kw['min_dist'] first_knot = kw['first_knot'] knots = kw['knots'] fixed_knots = kw['fixed_knots'] fit_SOL = kw['fit_SOL'] outliers = kw['outliers'] # Determine the maximum x to be fit max_x = max([max(self.x), 1]) if not fit_SOL: max_x = 1 # Determine whether knots is the number of knots or the knot locations if isinstance(knots, int): if knots < 3: raise ValueError('The value of knots must be >= 3') nk = knots if first_knot is not None: knots = np.array([0] + list(np.linspace(first_knot, max_x, nk - 1))) else: knots = np.linspace(0, max_x, nk) elif np.iterable(knots): nk = len(knots) if (knots[0] != 0 or knots[-1] != max_x) and not fixed_knots: print('Resetting knots[0],knots[-1] to 0,%g' % max_x) knots[-1] = max_x knots[0] = 0 else: raise ValueError('knots must be an integer >= 3 or an iterable') # Check initial knots if min_dist < 0 and (np.diff(knots) < abs(min_dist)).any(): raise ValueError('Initial knots violated min_dist: %s' % knots) # Convert monotonic to min_slope if monotonic: if min_slope: min_slope = max([min_slope, 0]) else: min_slope = 0 # add lmfit parameters params = lmfit.Parameters() # number of knots params.add('nk', value=len(knots), vary=False) nk = params['nk'].value if min_dist >= 0: # knot locations for i in range(nk): params.add( 'xk%d' % i, value=knots[i], min=abs(min_dist) * i, max=knots[-1] - (nk - i - 1) * abs(min_dist), vary=not fixed_knots ) params['xk0'].vary = False params['xk%d' % (nk - 1)].vary = False else: # knot difference constraints params.add('xk0', value=knots[0], vary=False) for i in range(1, nk): params.add( 'dxk%d' % i, value=knots[i] - knots[i - 1], min=abs(min_dist), max=(knots[-1] - knots[0] - abs(min_dist) * (nk - 2)), vary=not fixed_knots, ) denom = '(' + '+'.join(['dxk%d' % k for k in range(1, nk)]) + ')' num = '(%s-xk0)' % (knots[-1]) for i in range(1, nk - 1): params.add( 'xk%d' % i, value=knots[i], expr='xk0+(%s)/%s*%s' % ('+'.join(['dxk%d' % k for k in range(1, i + 1)]), denom, num) ) params.add('xk%d' % (nk - 1), value=knots[-1], vary=False) # guess of value of z guess_z = [(max(abs(self.y)) - min(abs(self.y))) / (max(self.x) - min(self.x)) / (np.mean(abs(self.y)))] * len(knots) guess_ybc = abs(self.y[0]) / self.y_max # If the fixed knot solution exists, use it as a starting guess if not fixed_knots: kw['fixed_knots'] = True fixed_tag = self.get_tag(**kw) kw['fixed_knots'] = False if fixed_tag in self.fits: guess_z = self.get_zk(self.fits[fixed_tag].params) guess_ybc = self.fits[fixed_tag].params['ybc'].value # zeroth knot value if knots[0] == 0: params.add('zk0', value=0, vary=False) else: params.add('zk0', value=guess_z[0], min=min_slope) # knot values for i in range(1, nk): params.add('zk%d' % i, value=guess_z[i], min=min_slope) # knot boundary condition value params.add('ybc', value=guess_ybc, min=max([0, min(abs(self.y)) / 2.0 / self.y_max]), max=max(abs(self.y)) * 2.0 / self.y_max) # knot boundary condition location params.add('xbc', value=0, vary=False) # constraint on first knot location if first_knot: if 'dxk1' in params: params['dxk1'].min = max([first_knot, abs(min_dist)]) params['xk1'].min = first_knot else: if 'dxk1' in params: params['dxk1'].min = min(self.x) + abs(min_dist) params['xk1'].min = min(self.x) + abs(min_dist) # keeping last knot value positive if min_slope: params['zk%d' % (nk - 1)].min = max([0, min_slope]) else: params['zk%d' % (nk - 1)].min = 0 return params
[docs] def model(self, params, x, lmfit_out=None): """ Return the model integrated scale length curve at x :param params: The `lmfit.Parameters` object :param x: evaluate model at x :param lmfit_out: ``lmfit.MinimizerResult`` instance to use for getting uncertainties in the curve """ if not lmfit_out: xk = self.get_xk(params) zk = self.get_zk(params) xbc = params['xbc'].value ybc = params['ybc'].value return self.sign * self.y_max * integz(xk, zk, xbc, ybc, x) else: xk, zk, ybc, xbc = self.get_param_unc(lmfit_out) def wrappable_integz(xkzk, x): xbc = xkzk[0, -1] xk = xkzk[0, :-1] zk = xkzk[1, :-1] ybc = xkzk[1, -1] return integz(xk, zk, xbc, ybc, x) wrapped_integz = uncertainties.unumpy.core.wrap_array_func(wrappable_integz) return self.sign * self.y_max * wrapped_integz(np.array([xk.tolist() + [xbc], zk.tolist() + [ybc]]), x)
[docs] def plot(self, showZ=True, x=np.linspace(0, 1.1, 111)): """ Plot all fits calculated so far, each in its own tab of a ``FigureNotebook``, where the tab is labeled by the shortened tag of the tag of the fit :param showZ: Overplot the values of the inverse scale lengths in red :param x: The x values to use for plotting the fitted curve :return: The ``FigureNotebook`` instance created """ return knotted_fit_base.plot(self, showZ=showZ, x=x)
[docs] def plot_individual_fit(self, tag, ax=None, showZ=True, x=np.linspace(0, 1.1, 1001)): """ Plot a single fit, characterized by ``tag`` :param tag: The tag of the fit that is to be plotted, must be in self.fits.keys() :param ax: The axes to plot into (one is created if ``None``) :param showZ: Overplot the values of the inverse scale lengths in red :param x: The x values to use for plotting the fitted curve """ from omfit_classes.utils_plot import uerrorbar knotted_fit_base.plot_individual_fit(self, tag, x=x, ax=ax) k = tag v = self.fits[k] if ax is None: from matplotlib import pyplot ax = pyplot.gca() # Plot scale lengths if showZ: ax.spines['right'].set_color('red') ax2 = ax.twinx() xk, zk, ybc, xbc = self.get_param_unc(v) uerrorbar(xk, zk, color='red', label='Scale length', ax=ax2) ax2.set_ylim([min(nominal_values(zk) - std_devs(zk)), max(nominal_values(zk) + std_devs(zk))]) ax2.tick_params(axis='y', colors='red') ax2.set_xlim([0, 1.1]) ax2.axvline(1.0, color='k') ax2.set_ylabel(r'$\partial\log()/\partial\rho$', color='red')
[docs]class fitSpline(knotted_fit_base): """ Fit a spline to some data; return the fit with uncertainties """ def __init__( self, x, y, yerr, knots=3, min_dist=0, first_knot=None, fixed_knots=False, fit_SOL=False, monotonic=False, min_slope=None, outliers=3, plot_best=False, allow_no_errorbar=False, ): knotted_fit_base.__init__(self, x, y, yerr) self.y_max = self.y[np.argmax(abs(self.y))] orig_knots = knots knots_range = None if isinstance(knots, int) and knots < 0: knots = abs(knots) knots_range = (3, knots + 1) self.allow_no_errorbar = allow_no_errorbar self.kw_orig = {} for var in self.kw_vars: self.kw_orig[var] = eval(var) self.fits = {} if knots_range: self.fit_knot_range(knots_range, **self.kw_orig) else: self.fit_single(**self.kw_orig) if plot_best: self.plot_best_fit()
[docs] def build_params(self, **keyw): r""" Build the ``lmfit.Parameters`` object needed for the fitting :param \**keyw: The Fit Keywords documented in the ``__init__`` method :return: The ``lmfit.Parameters`` translation of the settings given by ``**keyw`` """ kw = copy.deepcopy(self.kw_orig) kw.update(keyw) tag = self.get_tag(**kw) # Put the Fit Keywords in the current namespace min_slope = kw['min_slope'] monotonic = kw['monotonic'] min_dist = kw['min_dist'] first_knot = kw['first_knot'] knots = kw['knots'] fixed_knots = kw['fixed_knots'] fit_SOL = kw['fit_SOL'] outliers = kw['outliers'] # Determine the maximum x to be fit max_x = max([max(self.x), 1]) if not fit_SOL: max_x = 1 # Determine whether knots is the number of knots or the knot locations if isinstance(knots, int): if knots < 3: raise ValueError('The value of knots must be >= 3') nk = knots if first_knot is not None: knots = np.array([0] + list(np.linspace(first_knot, max_x, nk - 1))) else: knots = np.linspace(0, max_x, nk) elif np.iterable(knots): nk = len(knots) if (knots[0] != 0 or knots[-1] != max_x) and not fixed_knots: print('Resetting knots[0],knots[-1] to 0,%g' % max_x) knots[-1] = max_x knots[0] = 0 else: raise ValueError('knots must be an integer >= 3 or an iterable') # Check initial knots if min_dist < 0 and (np.diff(knots) < abs(min_dist)).any(): raise ValueError('Initial knots violated min_dist: %s' % knots) # Convert monotonic to min_slope if monotonic: raise NotImplementedError('Not yet available') # add lmfit parameters params = lmfit.Parameters() # number of knots params.add('nk', value=len(knots), vary=False) nk = params['nk'].value if False: # min_dist>=0: # knot locations for i in range(nk): params.add( 'xk%d' % i, value=knots[i], min=abs(min_dist) * i, max=knots[-1] - (nk - i - 1) * abs(min_dist), vary=not fixed_knots ) params['xk0'].vary = False params['xk%d' % (nk - 1)].vary = False if True: # knot difference constraints params.add('xk0', value=knots[0], vary=False) for i in range(1, nk): params.add( 'dxk%d' % i, value=knots[i] - knots[i - 1], min=abs(min_dist), max=(knots[-1] - knots[0] - abs(min_dist) * (nk - 2)), vary=not fixed_knots, ) denom = '(' + '+'.join(['dxk%d' % k for k in range(1, nk)]) + ')' num = '(%s-xk0)' % (knots[-1]) for i in range(1, nk - 1): params.add( 'xk%d' % i, value=None, # knots[i], expr='xk0+(%s)/%s*%s' % ('+'.join(['dxk%d' % k for k in range(1, i + 1)]), denom, num), ) params.add('xk%d' % (nk - 1), value=knots[-1], vary=False) # guess of value of knot values guess_z = [] for k in knots: guess_z.append(self.y[closestIndex(self.x, k)] / self.y_max) guess_z = np.array(guess_z) # If the fixed knot solution exists, use it as a starting guess if not fixed_knots: kw['fixed_knots'] = True fixed_tag = self.get_tag(**kw) kw['fixed_knots'] = False if fixed_tag in self.fits: guess_z = self.get_zk(self.fits[fixed_tag].params) guess_ybc = self.fits[fixed_tag].params['ybc'].value # knot values for i in range(0, nk): params.add('zk%d' % i, value=guess_z[i]) # knot boundary condition value (not active currently) params.add('ybc', value=0, vary=False) # knot boundary condition location params.add('xbc', value=0, vary=False) # constraint on first knot location if first_knot: if 'dxk1' in params: params['dxk1'].min = max([first_knot, abs(min_dist)]) # params['xk1'].min = first_knot else: if 'dxk1' in params: params['dxk1'].min = min(self.x) + abs(min_dist) # params['xk1'].min = min(self.x) + abs(min_dist) return params
[docs] def model(self, params, x, lmfit_out=None): """ Return the model spline curve at x :param params: The `lmfit.Parameters` object :param x: evaluate model at x :param lmfit_out: ``lmfit.MinimizerResult`` instance to use for getting uncertainties in the curve """ if not lmfit_out: xk = self.get_xk(params) zk = self.get_zk(params) xbc = params['xbc'].value ybc = params['ybc'].value try: return self.y_max * CubicSpline(xk, zk, bc_type=((1, 0), (2, 0)))(x) except ValueError: raise else: xk, zk, ybc, xbc = self.get_param_unc(lmfit_out) def wrappable_cubic_spline(xkzk, x): xk, zk = xkzk try: return CubicSpline(xk, zk, bc_type=((1, 0), (2, 0)))(x) except ValueError: raise wrapped_cubic_spline = uncertainties.unumpy.core.wrap_array_func(wrappable_cubic_spline) return self.y_max * wrapped_cubic_spline(np.array((xk, zk)), x)
[docs]def xy_outliers(x, y, cutoff=1.2, return_valid=False): """ This function returns the index of the outlier x,y data useful to run before doing a fit of experimental data to remove outliers. This function works assuming that the first and the last samples of the x/y data set are valid data points (i.e. not outliers). :param x: x data (e.g. rho) :param y: y data (e.g. ne) :param cutoff: sensitivity of the cutoff (smaller numbers -> more sensitive [min=1]) :param return_valid: if False returns the index of the outliers, if True returns the index of the valid data :return: index of outliers or valid data depending on `return_valid` switch """ index = np.argsort(y) x = x[index] y = y[index] index = np.argsort(x) x = x[index] y = y[index] x = (x - np.min(x)) / (np.max(x) - np.min(x)) y = (y - np.min(y)) / (np.max(y) - np.min(y)) def step(k1=0, path=[], dpath=[]): if len(x) - 1 in path and 0 in path: return dist = np.sqrt((x[k1] - x) ** 2 + (y[k1] - y) ** 2) dist[path] = 1e100 i = np.argmin(dist) path.append(i) dpath.append(dist[i]) step(i, path, dpath) return val = np.zeros(np.size(x)) tension = 0.0 for k in range(len(x)): path = [] dpath = [] step(k1=k, path=path, dpath=dpath) clustering = 1 - np.array(dpath) / np.sqrt(2.0) val[path] += tension + clustering outliers = np.where(val < (np.mean(val) / np.max([1, cutoff])))[0].tolist() if 0 in outliers: outliers.remove(0) if len(x) - 1 in outliers: outliers.remove(len(x) - 1) if return_valid: return [kk for kk in range(len(x)) if kk not in outliers] else: return outliers
# GPR fitting class using gptools package
[docs]class fitGP(object): """ Inputs: -------- x: array or array of arrays Independent variable data points. y: array or array of arrays Dependent variable data points. e: array or array of arrays Uncertainty in the dependent variable data points. noise_ub: float, optional Upper bound on a multiplicative factor that will be optimized to infer the most probable systematic underestimation of uncertainties. Note that this factor is applied over the entire data profile, although diagnostic uncertainties are expected to be heteroschedastic. Default is 2 (giving significant freedom to the optimizer). random_starts: int, optional Number of random starts for the optimization of the hyperparameters of the GP. Each random starts begins sampling the posterior distribution in a different way. The optimization that gives the largest posterior probability is chosen. It is recommended to increase this value if the fit results difficult. If the regression fails, it might be necessary to vary the constraints given in the _fit method of the class GPfit2 below, which has been kept rather general for common usage. Default is 20. zero_value_outside: bool, optional Set to True if the profile to be evaluated is expected to go to zero beyond the LCFS, e.g. for electron temperature and density; note that this option does NOT force the value to be 0 at the LCFS, but only attempts to constrain the fit to stabilize to 0 well beyond rho=1. Profiles like those of omega_tor_12C6 and T_12C6 are experimentally observed not to go to 0 at the LCFS, so this option should be set to False for these. Default is True. ntanh: integer, optional Set to 2 if an internal transport barrier is expected. Default is 1 (no ITB expected). This parameter has NOT been tested recently. verbose: bool, optional If set to True, outputs messages from non-linear kernel optimization. Default is False. Returns: ---------- (object) fit: call at points at which the profile is to be evaluated, e.g. if locations are stored in an array ``xo'', call fo = fit(xo). For an example, see 7_fit.py in OMFITprofiles. """ def __init__(self, xx, yy, ey, noise_ub=2.0, random_starts=20, zero_value_outside=True, ntanh=1, verbose=False): self.xx = np.atleast_2d(xx) self.yy = np.atleast_2d(yy) self.ey = np.atleast_2d(ey) self.ntanh = ntanh self.noise_ub = noise_ub self.random_starts = random_starts self.initial_params = [ 2.0, 0.5, 0.05, 0.1, 0.5, ] # for random_starts!= 0, the initial state of the hyperparameters is not actually used. self.verbose = verbose self.zero_value_outside = zero_value_outside self.gp = [] if not self.xx.size: return for k in range(self.xx.shape[0]): if verbose: printi('fitting profile ' + str(k + 1) + ' of ' + str(self.xx.shape[0])) i = ~np.isnan(self.yy[k, :]) & ~np.isnan(self.xx[k, :]) & ~np.isnan(self.ey[k, :]) self.gp.append(self._fit(self.xx[k, i], self.yy[k, i], self.ey[k, i])) def _fit(self, xx, yy, ey): import gptools import copy norm = np.mean(abs(yy)) yy = yy / norm ey = ey / norm for kk in range(self.ntanh): hprior = ( # Set a uniform prior for sigmaf gptools.UniformJointPrior([(0, 10)]) * # Set Gamma distribution('alternative form') for the other 4 priors of the Gibbs 1D Tanh kernel gptools.GammaJointPriorAlt([1.0, 0.5, 0.0, 1.0], [0.3, 0.25, 0.1, 0.05]) ) k = gptools.GibbsKernel1dTanh( # = ====== ======================================================================= # 0 sigmaf Amplitude of the covariance function # 1 l1 Small-X saturation value of the length scale. # 2 l2 Large-X saturation value of the length scale. # 3 lw Length scale of the transition between the two length scales. # 4 x0 Location of the center of the transition between the two length scales. # = ====== ======================================================================= initial_params=self.initial_params, fixed_params=[False] * 5, hyperprior=hprior, ) if kk == 0: nk = gptools.DiagonalNoiseKernel( 1, n=0, initial_noise=np.mean(ey) * self.noise_ub, fixed_noise=False, noise_bound=(min(ey), max(ey) * self.noise_ub) ) # , enforce_bounds=True) # printd "noise_ub= [", min(ey), ",",max(ey)*self.noise_ub,"]" ke = k else: # the following is from Orso's initial implementation. Not tested on ITBs! nk = gptools.DiagonalNoiseKernel(1, n=0, initial_noise=gp.noise_k.params[0], fixed_noise=False) k1 = gptools.GibbsKernel1dTanh(initial_params=copy.deepcopy(gp.k.params[-5:]), fixed_params=[False] * 5) ke += k1 # Create and populate GP: gp = gptools.GaussianProcess(ke, noise_k=nk) gp.add_data(xx, yy, err_y=ey) gp.add_data(0, 0, n=1, err_y=0.0) # zero derivative on axis # ================= Add constraints ==================== # Impose constraints on values in the SOL if self.zero_value_outside: gp.add_data(max([1.1, max(xx)]) + 0.1, 0, n=0, err_y=0) # zero beyond edge gp.add_data(max([1.1, max(xx)]) + 0.2, 0, n=0, err_y=0) # zero beyond edge # Impose constraints on derivatives in the SOL # grad=gradient(yy,xx) # rough estimate of gradients -- this seems broken... grad = np.gradient(yy, xx) # alternative rough estimte of gradients gp.add_data(max([1.1, max(xx)]), 0, n=1, err_y=abs(max(grad) * max(ey / yy))) # added uncertainty in derivative # printd "Added {:.0f}% of max(gradient) in max(grad) on GPR derivative constraints outside of the LCFS".format(max(ey/yy)*100) gp.add_data(max([1.1, max(xx)]) + 0.1, 0, n=1) # zero derivative far beyond at edge for kk1 in range(1, 3): if self.zero_value_outside: gp.add_data(max([1.1, max(xx)]) + 0.1 * kk1, 0, n=0, err_y=np.mean(ey)) # zero at edge gp.add_data(max([1.1, max(xx)]) + 0.1 * kk1, 0, n=1) # zero derivative beyond the edge # In shots where data is missing at the edge, attempt forcing outer stabilization if max(xx) < 0.8: print("Missing data close to the edge. Fit at rho>0.8 might be rather wild.") if self.zero_value_outside: if max(ey / yy) < 0.1: gp.add_data(1.0, 0, n=0, err_y=max(ey) * 2) else: gp.add_data(1.0, 0, n=0, err_y=max(ey)) # pad SOL with zero-derivative constraints for i in np.arange(5): gp.add_data(1.05 + 0.02 * i, 0, n=1) # exact derivative=0 # ============ Optimization of hyperparameters =========== print('Number of random starts: ', self.random_starts) if kk == 0: # Optimize hyperparameters: gp.optimize_hyperparameters( method='SLSQP', verbose=self.verbose, num_proc=None, # if 0, optimization with 1 processor in series; if None, use all available processors random_starts=self.random_starts, opt_kwargs={'bounds': (ke + nk).free_param_bounds}, ) else: # Optimize hyperparameters: gp.optimize_hyperparameters( method='SLSQP', verbose=self.verbose, num_proc=None, random_starts=self.random_starts, opt_kwargs={'bounds': ke.free_param_bounds}, ) gp.norm = norm self.inferred_params = copy.deepcopy(gp.k.params) self.final_noise = copy.deepcopy(gp.noise_k.params) print('------> self.inferred_params: ', self.inferred_params) print('-------> self.final_noise: ', self.final_noise) print('-------> np.mean(ey) =', np.mean(ey)) print('-------> self.final_noise/ np.mean(ey) =', self.final_noise / np.mean(ey)) return gp def __call__(self, Xstar, n=0, use_MCMC=False, profile=None): """ Evaluate the fit at specific locations. Inputs: ---------- Xstar: array Independent variable values at which we wish to evaluate the fit. n: int, optional Order of derivative to evaluate. Default is 0 (data profile) use_MCMC: bool, optional Set whether MCMC sampling and a fully-Bayesian estimate for a fitting should be used. This is recommended for accurate computations of gradients and uncertainties. Profile: int, optional Profile to evaluate if more than one has been computed and include in the gp object. To call the nth profile, set profile=n. If None, it will return an array of arrays. Outputs: ---------- Value and error of the fit evaluated at the Xstar points """ if profile is None: profile = list(range(len(self.gp))) print("len(profile) = ", len(profile)) M = [] D = [] run_on_engaging = False for k in np.atleast_1d(profile): if n > 1: print('Trying to evaluate higher derivatives than 1. Warning: *NOT* TESTED!') else: print('Proceeding with the evaluation of {:}-derivative'.format(n)) predict_data = {'Xstar': Xstar, 'n': n, 'gp': self.gp[k]} if use_MCMC: print('*************** Using MCMC for predictions ********************') if run_on_engaging: # set up to run on engaging. This is only preliminary! tmp_python_script = ''' def predict_MCMC(Xstar, n, gp): """ Helper function to call gptool's predict method with MCMC """ out=gp.predict_MCMC(Xstar,n=n,full_output=True, noise=True, return_std=True, full_MCMC=True) return out''' out = OMFITx.remote_python( module_root=None, python_script=tmp_python_script, target_function=predict_MCMC, namespace=predict_data, remotedir=OMFITtmpDir, workdir=OMFITtmpDir, server=OMFIT['MainSettings']['SERVER']['engaging']['server'], tunnel=OMFIT['MainSettings']['SERVER']['engaging']['tunnel'], ) else: out = OMFITx.remote_python(root, python_script=tmp_python_script, target_function=predict_MCMC, namespace=predict_data) else: out = self.gp[k].predict(Xstar, n=n, full_output=True, noise=True) m = out['mean'] # covd=out['cov'] has size len(Xstar) x len(Xstar) std = out['std'] # equivalent to np.squeeze(np.sqrt(diagonal(covd))) # Multiply the outputs by the norm, since data were divided by this before fitting m = m * self.gp[k].norm d = std * self.gp[k].norm M.append(m) D.append(d) M = np.squeeze(M) D = np.squeeze(D) return unumpy.uarray(M, D)
[docs] def plot(self, profile=None, ngauss=1): """ Function to conveniently plot the input data and the result of the fit. Inputs: ----------- Profile: int, optional Profile to evaluate if more than one has been computed and included in the gp object. To call the nth profile, set profile=n. If None, it will return an array of arrays. ngauss: int, optional Number of shaded standard deviations Outputs: ----------- None """ from matplotlib import pyplot pyplot.figure() if profile is None: profile = list(range(len(self.gp))) Xstar = np.linspace(0, np.nanmin([1.2, np.nanmax(self.xx + 0.1)]), 1000) for k in profile: ua = self(Xstar, 0, k) m, d = nominal_values(ua), std_devs(ua) pyplot.errorbar(self.xx[k, :], self.yy[k, :], self.ey[k, :], color='b', linestyle='') pyplot.plot(Xstar, m, linewidth=2, color='g') for kk in range(1, ngauss + 1): pyplot.fill_between(Xstar, m - d * kk, m + d * kk, alpha=0.25, facecolor='g') pyplot.axvline(0, color='k') pyplot.axhline(0, color='k')
[docs]class fitCH(object): """ Fitting of kinetic profiles by Chebyshev polynomials Adapted from MATLAB function by A. Marinoni <marinoni@fusion.gat.com> :param x: radial coordinate :param y: data :param yerr: data uncertainties :param m: Polynomial degree """ def __init__(self, x, y, yerr, m=18): # Beginning of the actual routine m0 = [] # polynomial coefficients to be discarded (can be empty, so keep all coefficients) n = len(x) ##trick/cheat: the fit is better data are mirrored as we avoid cusp and salient points on # axis spuriously generated by chebycheff nodes packed at the end of the # interval # x = [-x(end:-1:1);x] # y = [y(end:-1:1);y] # yerr = [yerr(end:-1:1);yerr] x = np.hstack((np.flipud(-x), x)) y = np.hstack((np.flipud(y), y)) yerr = np.hstack((np.flipud(yerr), yerr)) # errorbar(x,y,yerr) # axvline(0,ls='--',color='k') ## Generate the z variable as a mapping of input x data range into the interval [-1,1] z = ((x - np.min(x)) - (np.max(x) - x)) / (np.max(x) - np.min(x)) jacob = 2 / (np.max(x) - np.min(x)) ##Defining data variables like in manuscript b = y / yerr ##Building the Vandermonde matrix A_d = np.zeros((len(z), m + 1)) A_d[:, 1] = np.ones((1, len(z))) if m > 1: A_d[:, 2] = z if m > 2: for k in range(3, m + 1): A_d[:, k] = 2 * z * A_d[:, k - 1] - A_d[:, k - 2] ## recurrence relation A_d = np.dot(np.diag(1 / yerr), A_d) # A_d = A_d(:,~ismember([1:m+1],m0)) a = np.linalg.lstsq(A_d, b)[0] ##Computing unnormalized chi2 and quantities that might be of interest yfit_data = np.dot(np.dot(np.diag(yerr), A_d), a) # Fit on the data radial points res = y - yfit_data # residual db = res / yerr chisq = np.sum(db**2) deg3dom = max(0, len(y) - (m + 2 - len(m0))) normres = norm(res) C = np.linalg.pinv(np.dot(A_d.T, A_d)) ##De-mirroring y = y[n + 1 :] x = x[n + 1 :] yerr = yerr[n + 1 :] z = z[n + 1 :] ##Computing uncertainties on the coefficients (this is not necessary for ##the fit and can be commented out) da = np.dot(np.dot(C, A_d.T), db) self.jacob = jacob self.C = C self.a = a self.x = x self.y = y self.yerr = yerr self.m = m def __call__(self, rho): """ Calculate fit and uncertainties :param rho: rho grid of the fit :return: value and error of the fit evaluated at the rho points """ jacob = self.jacob C = self.C a = self.a m = self.m ##Fitted profiles on new cordinate and remapping it with the same jacobian zz = np.dot(rho, jacob) ##Computing the A matrix on the new radial cordinate and its gradient A = np.zeros((len(zz), m + 1)) W = np.zeros((len(zz), m + 1)) A[:, 1] = np.ones((1, len(zz))) if m > 1: A[:, 2] = zz W[:, 2] = np.ones(len(zz)) * jacob if m > 2: for k in range(3, m + 1): A[:, k] = 2 * zz * A[:, k - 1] - A[:, k - 2] ## recurrence relation W[:, k] = gradient(A[:, k], rho) ##Computing along rho instead of along zz and then multiply by jacobian # A = A(:,~ismember([1:m+1],m0)) # W = W(:,~ismember([1:m+1],m0)) yfit = np.dot(A, a) yfit_g = np.dot(W, a) ##Scale length yfit_sl = -yfit_g / yfit # Computing covariance matrices between quantities computed at points (x1,x2) cov_gy = np.dot(np.dot(A, C), W.T) var_yy = np.dot(np.dot(A, C), A.T) var_gg = np.dot(np.dot(W, C), W.T) ##Computing covariance vectors at same points (x,x), i.e. taking the diagonal of the matrix sig2p = np.diag(var_yy) sig2g = np.diag(var_gg) cov = np.diag(cov_gy) ##Sigmas dyfit = np.sqrt(sig2p) dyfit_g = np.sqrt(sig2g) ##Formulas to estimate uncertainties on scale length...do not implement as ##is still generally wrong inside rho=0.1-0.2. The simplified equation ##seems to be better?! dump1 = 6 * cov**2 / yfit**4 - 4 * yfit_g * cov / yfit**3 - 24 * yfit_g * sig2p * cov / yfit_g**5 dump2 = sig2g + yfit_g**2 dump3 = sig2p / yfit**4 + 8 * sig2p**2 / yfit**6 + (1 / yfit + sig2p / yfit**3 + 3 * sig2p**2 / yfit**5) ** 2 dump4 = ( -cov / yfit**2 - 3 * cov * sig2p / yfit**4 + yfit_g / yfit + yfit_g / yfit**3 * sig2p + 3 * yfit_g * sig2p**2 / yfit**5 ) ** 2 sig2sl = dump1 + dump2 * dump3 - dump4 # Eq. 17 sig2sl_sim = (yfit_g / yfit) ** 2 * (sig2g / yfit_g**2 + sig2p / yfit**2) # Eq 18 dyfit_sl_an = np.sqrt(sig2sl) # Analythical uncertainty dyfit_sl_an_sim = np.sqrt(sig2sl_sim) # Simplified analythical uncertainty self.rho = rho self.yfit = yfit self.dyfit = dyfit return yfit, dyfit
[docs] def plot(self): """ Plotting of the raw data and fit with uncertainties :return: None """ from matplotlib import pyplot pyplot.errorbar(self.x, self.y, self.yerr, color='b', linestyle='', label="Raw data") pyplot.plot(self.rho, self.yfit, 'g', label="fit") pyplot.fill_between(self.rho, self.yfit - self.dyfit, self.yfit + self.dyfit, alpha=0.25, facecolor='b') pyplot.legend(loc=0)
[docs]class fitLG(object): """ This class provides linear fitting of experimental profiles, with gaussian bluring for smoothing. This procedure was inspired by discussions with David Eldon about the `Weighted Average of Interpolations to a Common base` (WAIC) technique that he describes in his thesis. However the implementation here is quite a bit different, in that instead of using a weighted average the median profile is taken, which allows for robust rejection of outliers. In this implementation the profiles smoothing is obtained by radially perturbing the measurements based on the farthest distance to their neighboring point. :param x: x values of the experimental data :param y: experimental data :param e: uncertainties of the experimental data :param d: data time identifier :param ng: number of gaussian instances :param sm: smoothing :param nmax: take no more than nmax data points """ def __init__(self, x, y, e, d, ng=100, sm=1, nmax=None): if nmax: i = pylab.randint(0, len(x), np.min([nmax, len(x)])) x = x[i] y = y[i] e = e[i] d = d[i] i = np.argsort(x) self.x = x[i] self.y = y[i] self.e = e[i] self.d = d[i] self.ux = np.unique(x) self.dx = np.max([np.hstack((0, np.abs(np.diff(self.ux)))), np.hstack((0, np.abs(np.diff(self.ux))))], 0) self.ng = ng self.sm = sm self._doPlot = False def __call__(self, x0): x = self.x y = self.y e = self.e d = self.d ng = self.ng X = [] Y = [] E = [] for k in np.unique(d): i = np.where(d == k)[0] if not len(i) or not len(x[i]): continue X.append(x[i]) Y.append(y[i]) E.append(e[i]) if self._doPlot: pyplot.ioff() if ng > 0: dx = interp1e(self.ux, self.dx)(x[i]) R = np.reshape(np.random.randn(ng * i.size), (ng, i.size)) X.extend(x[i][np.newaxis, :] + R * dx[np.newaxis, :] * self.sm) R = np.reshape(np.random.randn(ng * i.size), (ng, i.size)) Y.extend(y[i][np.newaxis, :] + R * e[i][np.newaxis, :]) E.extend(np.tile(e[i], (ng, 1))) if self._doPlot: for k in range(1, ng + 1): pyplot.errorbar(X[-k], Y[-k], E[-k], ls='', color='g', alpha=1.0 / ng) if self._doPlot: pyplot.ion() Y0 = np.zeros((len(X), len(x0))) Y0[:] = np.nan E0 = np.zeros((len(X), len(x0))) E0[:] = np.nan for k in range(len(X)): inside = np.where((x0 >= min(X[k])) & (x0 <= max(X[k])))[0] if len(inside) > 1: Y0[k, inside] = interp1e(X[k], Y[k])(x0[inside]) E0[k, inside] = interp1e(X[k], E[k])(x0[inside]) y0 = np.nanmedian(Y0, 0) e0 = np.nanmedian(E0, 0) # pyplot.plot(x0[i0],y0[i0],'ob') ok = np.where(np.isnan(y0) == 0)[0] # handle the core i00 = np.where(x0 == 0)[0] i01 = np.argmin(x0[ok]) y0[i00] = y0[ok][i01] # handle the edge i00 = np.where(x0 == 1)[0] i01 = np.argmin(1 - x0[ok]) y0[i00] = y0[ok][i01] # interpolate between ok = np.where(np.isnan(y0) == 0)[0] no = np.where(np.isnan(y0) == 1)[0] y0[no] = interp1e(x0[ok], y0[ok])(x0[no]) e0[no] = interp1e(x0[ok], e0[ok])(x0[no]) return y0, e0
[docs] def plot(self, variations=True): x = self.x y = self.y e = self.e x0 = np.linspace(0, max(x), 201) try: self._doPlot = variations y0, e0 = self(x0) finally: self._doPlot = False pyplot.errorbar(x, y, e, color='r', ls='') pyplot.errorbar(x0, y0, e0, color='k')
[docs]@_available_to_user_math def mtanh(c, x, y=None, e=1.0, a2_plus_a3=None): """ Modified tanh function >> if len(c)==6: >> y=a0*(a1+tanh((a2-x)/a3))+a4*(a5-x)*(x<a5) a0: scale a1: offset in y a2: shift in x a3: width a4: slope of linear a5: when linear part takes off >> if len(c)==5: >> y=a0*(a1+tanh((a2-x)/a3))+a4*(a2-a3-x)*(x<a2-a3) a0: scale a1: offset in y a2: shift in x a3: width a4: slope of linear :param c: array of coefficients [a0,a1,a2,a3,a4,(a5)] :param x: x data to fit :param y: y data to fit :param e: y error of the data to fit :param a2_plus_a3: force sum of a2 and a3 to be some value NOTE: In this case `a3` should be removed from input vector `c` :return: cost, or evaluates y if y==None """ if (len(c) == 6 and a2_plus_a3 is None) or (len(c) == 5 and a2_plus_a3 is not None): if a2_plus_a3 is not None: a0, a1, a2, a4, a5 = c a3 = a2_plus_a3 - a2 else: a0, a1, a2, a3, a4, a5 = c yt = a0 * (a1 + np.tanh((a2 - x) / a3)) ytm = yt + a4 * (a5 - x) * (x < a5) if y is not None: cost = np.sqrt(np.mean(((y - ytm) / e) ** 2)) cost *= 1 + abs(a2 - a5) / a3 / 10.0 return cost else: return ytm elif (len(c) == 5 and a2_plus_a3 is None) or (len(c) == 4 and a2_plus_a3 is not None): if a2_plus_a3 is not None: a0, a1, a2, a4 = c a3 = a2_plus_a3 - a2 else: a0, a1, a2, a3, a4 = c yt = a0 * (a1 + np.tanh((a2 - x) / a3)) aa4 = a2 + a4 y4 = a0 * (a1 + np.tanh((a2 - aa4) / a3)) d = -a0 / np.cosh((a2 - aa4) / a3) ** 2 / a3 ytm = yt * (x > aa4) + (y4 + d * (x - aa4)) * (x <= aa4) if y is not None: cost = np.sqrt(np.mean(((y - ytm) / e) ** 2)) return cost else: return ytm
[docs]def mtanh_gauss_model(x, bheight, bsol, bpos, bwidth, bslope, aheight, awidth, aexp): """ Modified hyperbolic tangent function for fitting pedestal with gaussian function for the fitting of the core. Stefanikova, E., et al., RewSciInst, 87 (11), Nov 2016 This function is design to fit H-mode density and temeprature profiles as a function of psi_n. """ # To be sure psi > 0 x = abs(x) mtanh = lambda x, bslope: (1 + bslope * x) * (np.exp(x) - np.exp(-x)) / (np.exp(x) + np.exp(-x)) fped = lambda x, bheight, bsol, bpos, bwidth, bslope: (bheight - bsol) / 2.0 * (mtanh((bpos - x) / (2 * bwidth), bslope) + 1) + bsol ffull = lambda x, bheight, bsol, bpos, bwidth, bslope, aheight, awidth, aexp: fped(x, bheight, bsol, bwidth, bslope, bpos) + ( aheight - fped(x, bheight, bsol, bwidth, bslope, bpos) ) * np.exp(-((x / awidth) ** aexp)) mtanh_un = lambda x, bslope: (1 + bslope * x) * (unumpy.exp(x) - unumpy.exp(-x)) / (unumpy.exp(x) + unumpy.exp(-x)) fped_un = ( lambda x, bheight, bsol, bpos, bwidth, bslope: (bheight - bsol) / 2.0 * (mtanh_un((bpos - x) / (2 * bwidth), bslope) + 1) + bsol ) ffull_un = lambda x, bheight, bsol, bpos, bwidth, bslope, aheight, awidth, aexp: fped_un(x, bheight, bsol, bwidth, bslope, bpos) + ( aheight - fped_un(x, bheight, bsol, bwidth, bslope, bpos) ) * unumpy.exp(-((x / awidth) ** aexp)) if np.any(is_uncertain([bheight, bsol, bpos, bwidth, bslope, aheight, awidth, aexp])): return ffull_un(x, bheight, bsol, bpos, bwidth, bslope, aheight, awidth, aexp) return ffull(x, bheight, bsol, bpos, bwidth, bslope, aheight, awidth, aexp)
[docs]def tanh_model(x, a1, a2, a3, c): if is_uncertain([a1, a2, a3, c]): return a1 * unumpy.tanh((a2 - x) / a3) + c return a1 * np.tanh((a2 - x) / a3) + c
[docs]def tanh_poly_model(x, a1, a2, a3, c, p2, p3, p4): if is_uncertain([a1, a2, a3, c, p2, p3, p4]): return tanh_model(x, a1, a2, a3, c) + a1 * (-1 * unumpy.tanh(a2 / a3) ** 2 + 1) / a3 * x + p2 * x**2 + p3 * x**3 + p4 * x**4 return tanh_model(x, a1, a2, a3, c) + a1 * (-1 * np.tanh(a2 / a3) ** 2 + 1) / a3 * x + p2 * x**2 + p3 * x**3 + p4 * x**4
[docs]class fit_base(object): def __init__(self, x, y, yerr): valid_ind = ~np.isnan(x) & ~np.isnan(y) & ~np.isnan(yerr) & (yerr > 0) self.x = x[valid_ind] self.ymax = max(abs(y[valid_ind])) self.y = y[valid_ind] / self.ymax self.yerr = yerr[valid_ind] / self.ymax
[docs] def get_param_unc(self, lmfit_out): result = {} for k in lmfit_out.params: result[k] = lmfit_out.params[k].value if lmfit_out.errorbars: corr_params = uncertainties.correlated_values([lmfit_out.params[k].value for k in lmfit_out.var_names], lmfit_out.covar) for vi, var in enumerate(lmfit_out.var_names): result[var] = corr_params[vi] return result
@property def uparams(self): return self.get_param_unc(self.best_fit) @property def params(self): tmp = self.get_param_unc(self.best_fit) for item in tmp: tmp[item] = nominal_values(tmp[item]) return tmp def __call__(self, x): if not hasattr(self, 'best_fit'): return x * np.nan try: return self.best_fit.eval(x=x, **self.get_param_unc(self.best_fit)) * self.ymax except Exception: return self.best_fit.eval(x=x) * self.ymax
[docs] def valid_fit(self, model_out): y = model_out.eval(x=self.x, **self.get_param_unc(model_out)) return np.all(abs(nominal_values(y)) > std_devs(y))
[docs]class fit_mtanh_gauss(fit_base): def __init__(self, x, y, yerr, **kw): fit_base.__init__(self, x, y, yerr) x = self.x y = self.y yerr = self.yerr kw.setdefault('verbose', False) kw.setdefault('bheight', max(y) / 5.0) kw.setdefault('bsol', max(y) / 60.0) kw.setdefault('bpos', 1.0) kw.setdefault('bwidth', 1.02) kw.setdefault('bslope', max(y) / 500.0) kw.setdefault('aheight', max(y)) kw.setdefault('awidth', 0.5) kw.setdefault('aexp', 3.0) self.best_fit = lmfit.Model(mtanh_gauss_model).fit(y, x=x, weights=1 / yerr, **kw)
[docs]class fit_tanh(fit_base): def __init__(self, x, y, yerr, **kw): fit_base.__init__(self, x, y, yerr) y = self.y yerr = self.yerr kw.setdefault('verbose', False) kw.setdefault('a3', 0.05) kw.setdefault('a2', 0.95) kw.setdefault('c', y[np.argmin(abs(y))]) kw.setdefault('a1', y[np.argmax(abs(y))] - y[np.argmin(abs(y))]) self.best_fit = lmfit.Model(tanh_model).fit(y, x=x, weights=1 / yerr, **kw)
[docs]class fit_tanh_poly(fit_base): def __init__(self, x, y, yerr, **kw): fit_base.__init__(self, x, y, yerr) y = self.y yerr = self.yerr kw.setdefault('a3', 0.05) kw.setdefault('a2', 0.95) kw.setdefault('c', min(abs(y))) kw.setdefault('a1', np.mean(y)) kw.setdefault('p2', 0.5 * np.mean(y)) kw.setdefault('p3', 1 * np.mean(y)) kw.setdefault('p4', 1 * np.mean(y)) kw.setdefault('verbose', False) self.best_fit4 = lmfit.Model(tanh_poly_model).fit(y, x=x, weights=1 / yerr, **kw) params = copy.deepcopy(self.best_fit4.params) params['p4'].vary = False params['p4'].value = 0 self.best_fit3 = lmfit.Model(tanh_poly_model).fit(y, x=x, weights=1 / yerr, params=params) params = copy.deepcopy(self.best_fit3.params) params['p3'].vary = False params['p3'].value = 0 self.best_fit2 = lmfit.Model(tanh_poly_model).fit(y, x=x, weights=1 / yerr, params=params) best_redchi = 1e30 for p in range(2, 5): bf = getattr(self, 'best_fit%d' % p) if bf.redchi < best_redchi: if self.valid_fit(bf): self.best_fit = bf best_redchi = bf.redchi
[docs]def tanh_through_2_points(x0, y0, x=None): """ Find tanh passing through two points x0=[x_0,x_1] and y0=[y_0,y_1] y=a1*tanh((a2-x)/a3)+c :param x0: iterable of two values (x coords) :param y0: iterable of two values (y coords) :param x: array of coordinates where to evaluate tanh. If None function will return fit parameters :return: if `x` is None then return tanh coefficients (a1,a2,a3,c), otherwise returns y(x) points """ y = np.array(y0) - y0[1] a1 = y[0] / np.tanh(1) a2 = x0[1] a3 = x0[1] - x0[0] c = y0[1] if x is None: return a1, a2, a3, c else: return a1 * np.tanh((a2 - np.array(x)) / a3) + c
[docs]def toq_profiles(psin, width, core, ped, edge, expin, expout): """ This is a direct fortran->python translation from the TOQ source :param psin: psi grid :param width: width of tanh :param core: core value :param ped: pedestal value :param edge: separatrix value :param expin: inner exponent (EPED1: 1.1 for n_e, and 1.2 for T_e) :param expout: outner exponent (EPED1: 1.1 for n_e, and 1.4 for T_e) :return: profile """ # psi_N at tanh symmetry point xphalf = 1.0 - width # pconst=? pconst = 1.0 - np.tanh((1.0 - xphalf) / width) # normalization so that n_ped=ped a_n = 2.0 * (ped - edge) / (1.0 + np.tanh(1.0) - pconst) # psi_N at pedestal xped = xphalf - width # core density coretanh = 0.5 * a_n * (1.0 - np.tanh(-xphalf / width) - pconst) + edge data = np.zeros(psin.shape) # for all flux surfaces for i in range(len(psin)): # psi at flux surface xpsi = psin[i] # density at flux surface data[i] = _data = 0.5 * a_n * (1.0 - np.tanh((xpsi - xphalf) / width) - pconst) + edge # xtoped is proportional to psi, but normed to equal 1 at top of ped xtoped = xpsi / xped # if core is set, then add polynomial piece to get desired core value (add only inside pedestal) if core > 0.0 and xpsi < xped: # density at flux surface data[i] = _data = _data + (core - coretanh) * (1.0 - xtoped**expin) ** expout return data
[docs]@_available_to_user_math def mtanh_polyexp(x, params): """ given lmfit Parameters object, generate mtanh_poly """ nc = params['pow_c'].value + 1 # always a fixed value core = np.zeros(nc) keys = list(params.keys()) for key in keys: if 'core' in key: core[int(key.split('core')[1])] = params[key].value core_poly = np.zeros(len(x)) + core[0] for i in range(1, len(core)): core_poly += core[i] * x ** (i) xsym = params['xsym'].value blend_width = params['blend_width'].value edge_width = params['edge_width'].value offset = params['offset'].value A = params['A'].value edge_func = offset + A * np.exp(-(x - xsym) / edge_width) z = (xsym - x) / blend_width y_fit = (core_poly * np.exp(z) + edge_func * np.exp(-z)) / (np.exp(z) + np.exp(-z)) return y_fit
[docs]def mtanh_polyexp_res(params, x_data, y_data, y_err): y_model = mtanh_polyexp(x_data, params) res = (y_data - y_model) / y_err return res
[docs]@_available_to_user_math class GASpline(object): """ Python based replacement for GAprofiles IDL spline routine "spl_mod" * Code accepts irregularly spaced (x,y,e) data and returns fit on regularly spaced grid * Numerical spline procedure based on Numerical Recipes Sec. 3.3 equations 3.3.2, 3.3.4, 3.3.5, 3.3.7 * Auto-knotting uses LMFIT minimization with chosen scheme * Boundary conditions enforced with matrix elements The logic of this implementation is as follows: * The defualt is to auto-knot, which uses least-squares minimization to choose the knot locations. * If auto-knotting, then there are options of the guess of the knot locations or option to bias the knots. Else manual knots are used and LMFIT is not called * If the knot guess is present, then it is used else there are two options. Else we use the knot guess. * If the knot bias is None or >-1 then the knot guess is uniformly distributed using linspace. Else we use linspace with a knot bias. For the edge data, the logic is as follows: * We can have auto/manual knots, free/fixed boundary value, and fit/ignore edge data. * When we fit edge data, then that edge data places a constraint on boundary value. When monte-carlo is used, the return value is an unumpy uncertainties array that contains the mean and standard-deviation of the monte-carlo trials. >>> x = np.linspace(0,1,21) >>> y = np.sin(2*np.pi*x) >>> e = np.repeat(0.1,len(x)) >>> xo = np.linspace(0,1,101) >>> fit_obj = GASpline(x,y,e,xo,numknot=4,doPlot=True,monte=20) >>> uerrorbar(x,uarray(y,e)) >>> pyplot.plot(xo,nominal_values(fit_obj(xo))) >>> uband(xo,fit_obj(xo)) """ def __init__( self, xdata, ydata, ydataerr, xufit, y0=None, y0p=0.0, y1=None, y1p=None, sol=True, solc=False, numknot=3, autoknot=True, knotloc=None, knotbias=0, sepval_min=None, sepval_max=None, scrapewidth_min=None, scrapewidth_max=None, method='leastsq', maxiter=2000, monte=0, verbose=False, doPlot=False, ): # On initialization create the parameters and set up all options for the fit. # First there is the data # All data self.xdata = xdata self.ydata = ydata self.ydataerr = ydataerr self.ydatawgt = 1.0 / self.ydataerr # Core data edgemask = xdata <= 1.0 self.xdatacore = xdata[edgemask] self.ydatacore = ydata[edgemask] self.ydatacoreerr = ydataerr[edgemask] self.ydatacorewgt = 1.0 / self.ydatacoreerr self.edgemask = edgemask # Edge data coremask = xdata > 1.0 self.xdataedge = xdata[coremask] self.ydataedge = ydata[coremask] self.ydataedgeerr = ydataerr[coremask] self.ydataedgewgt = 1.0 / self.ydataedgeerr self.coremask = coremask # Then each keyword is stored self.numknot = numknot self.bcs = {'y0': y0, 'y0p': y0p, 'y1': y1, 'y1p': y1p} # If there are edge points, then we can fit them, else disable. if np.sum(coremask) > 0: self.sol = sol else: self.sol = False self.solc = solc self.sepval_min = sepval_min self.sepval_max = sepval_max self.scrapewidth = 0.0 self.scrapewidth_min = scrapewidth_min self.scrapewidth_max = scrapewidth_max self.autoknot = autoknot self.knotbias = knotbias self.method = method self.maxiter = maxiter self.monte = monte self.verbose = verbose #### # Now the output #### # knot locations for basic fit, or average of monte-carlo trials self.knotloc = knotloc # The knot locations of the monte-carlo trials self.knotlocs = None # The knot locations for each iteration self.iknotloc = None # The knot values, or the mean of the knot values from monte-carlo self.knotval = None # The knot values of the monte-carlo trials self.knotvals = None # The data used in the fits if self.sol: self.xdatafit = self.xdata self.ydatafit = self.ydata self.ydatafiterr = self.ydataerr self.ydatafitwgt = self.ydatawgt else: self.xdatafit = self.xdatacore self.ydatafit = self.ydatacore self.ydatafiterr = self.ydatacoreerr self.ydatafitwgt = self.ydatacorewgt # The data used in the monte-carlo trials self.ydatafits = None # Fit on data axis or mean of monte-carlo trials self.yfit = None # The fits from the monte-carlo trials on data axis self.yfits = None # The uncertainty in the fit from the stddev of the monte-carlo trials self.yfiterr = np.zeros(len(self.xdata)) # Uniform / user x axis self.xufit = xufit # Fit on user axis self.yufit = None self.yufits = None self.yufiterr = np.zeros(len(self.xdata)) # First derivative or mean of monte-carlo trials on user axis self.ypufit = None self.ypufits = None self.ypufiterr = np.zeros(len(self.xdata)) # Fit quantities consistent with LMFIT # Number of observations self.ndata = None # Number of free parameters self.nvarys = None # Degrees of freedom = number of observations - number of free parameters self.nfree = None self.rchi2 = None self.irchi2 = None self.rchi2s = None # Do the fit self.do_fit() if monte is not None and monte > 0: knotlocs = np.zeros((self.numknot, monte)) knotvals = np.zeros((self.numknot, monte)) ydatafits = np.zeros((len(self.xdatafit), monte)) yfits = np.zeros((len(self.xdatafit), monte)) yufits = np.zeros((len(self.xufit), monte)) ypufits = np.zeros((len(self.xufit), monte)) rchi2s = np.zeros(monte) for im in range(monte): if self.sol: self.ydatafit = self.ydata + np.random.normal(0.0, 1.0, len(self.xdata)) * self.ydataerr else: self.ydatafit = self.ydatacore + np.random.normal(0.0, 1.0, len(self.xdatacore)) * self.ydatacoreerr self.knotloc = knotloc self.do_fit() knotlocs[:, im] = self.knotloc knotvals[:, im] = self.knotval ydatafits[:, im] = self.ydatafit yfits[:, im] = self.yfit yufits[:, im] = self.yufit ypufits[:, im] = self.ypufit rchi2s[im] = self.rchi2 # Replace the data if self.sol: self.ydatafit = self.ydata else: self.ydatafit = self.ydatacore self.ydatafits = ydatafits # Replace the knot locations for the nominal unperturbed data self.knotlocs = knotlocs self.knotloc = np.mean(knotlocs, axis=1) self.knotval = np.mean(knotvals, axis=1) self.rchi2 = np.mean(rchi2s) # Store the fit as the mean of the monte-carlo trials self.yfits = yfits self.yfit = np.mean(yfits, axis=1) self.yufits = yufits self.yufit = np.mean(yufits, axis=1) self.ypufits = ypufits self.ypufit = np.mean(ypufits, axis=1) # Store the uncertianty as the standard deviation of the monte-carlo trials self.yfiterr = np.std(yfits, axis=1) self.yufiterr = np.std(yufits, axis=1) self.ypufiterr = np.std(ypufits, axis=1) # Do the plot if doPlot: self.plot()
[docs] def design_matrix(self, xcore, knotloc, bcs): """Design Matrix for cubic spline interpolation Numerical Recipes Sec. 3.3 :param xcore: rho values for data on [0,1] :param knotloc: knot locations on [0,1] :param bcs: Dictionary of boundary conditions :return: design matrix for cubic interpolating spline """ nx = len(xcore) numknot = len(knotloc) a = np.zeros((nx, numknot)) b = np.zeros((nx, numknot)) gee = np.zeros((numknot, numknot)) h = np.zeros((numknot, numknot)) geeinvh = np.zeros((numknot, numknot)) rowa = np.zeros(numknot) rowb = np.zeros(numknot) c = np.zeros(numknot) fx = np.zeros(numknot) for i in range(len(xcore)): rowa[:] = 0.0 rowb[:] = 0.0 bt = np.where((knotloc >= xcore[i]) & (knotloc > 0.0))[0] j = bt[0] # Eq. 3.3.2 rowa[j - 1] = (knotloc[j] - xcore[i]) / (knotloc[j] - knotloc[j - 1]) rowa[j] = 1.0 - rowa[j - 1] # Eq. 3.3.4 rowb[j - 1] = (rowa[j - 1] ** 3 - rowa[j - 1]) * (knotloc[j] - knotloc[j - 1]) ** 2 / 6.0 rowb[j] = (rowa[j] ** 3 - rowa[j]) * (knotloc[j] - knotloc[j - 1]) ** 2 / 6.0 a[i, :] = rowa b[i, :] = rowb # Apply B.C. for y(0) if bcs['y0'] is not None: fx[0] = bcs['y0'] # Apply B.C. for y'(0) if bcs['y0p'] is not None: gee[0, 0] = (knotloc[1] - knotloc[0]) / 3.0 gee[0, 1] = gee[0, 0] / 2.0 h[0, 0] = -1.0 / (knotloc[1] - knotloc[0]) h[0, 1] = -1.0 * h[0, 0] c[0] = -1.0 * bcs['y0p'] else: gee[0, 0] = 1.0 # Apply B.C. for y(1) if bcs['y1'] is not None: fx[numknot - 1] = bcs['y1'] # Apply B.C. for y'(1) if bcs['y1p'] is not None: gee[numknot - 1, numknot - 1] = -1.0 * (knotloc[numknot - 1] - knotloc[numknot - 2]) / 3.0 gee[numknot - 1, numknot - 2] = gee[numknot - 1, numknot - 1] / 2.0 h[numknot - 1, numknot - 1] = 1.0 / (knotloc[numknot - 1] - knotloc[numknot - 2]) h[numknot - 1, numknot - 2] = -1.0 * h[numknot - 1, numknot - 1] c[numknot - 1] = -1.0 * bcs['y1p'] else: gee[numknot - 1, numknot - 1] = 1.0 # Internal knots # Eq. 3.3.7 for i in range(1, numknot - 1, 1): gee[i, i - 1] = (knotloc[i] - knotloc[i - 1]) / 6.0 gee[i, i] = (knotloc[i + 1] - knotloc[i - 1]) / 3.0 gee[i, i + 1] = (knotloc[i + 1] - knotloc[i]) / 6.0 h[i, i - 1] = 1.0 / (knotloc[i] - knotloc[i - 1]) h[i, i] = -1.0 * (1.0 / (knotloc[i + 1] - knotloc[i]) + 1.0 / (knotloc[i] - knotloc[i - 1])) h[i, i + 1] = 1.0 / (knotloc[i + 1] - knotloc[i]) # LU Decomposition lu, piv = scipy.linalg.lu_factor(gee, overwrite_a=True) # Solve system for i in range(numknot): geeinvh[:, i] = scipy.linalg.lu_solve((lu, piv), h[:, i]) # c is non-zero for derivative constraints geeinvc = scipy.linalg.lu_solve((lu, piv), c) d = a + np.dot(b, geeinvh) ap = np.zeros((nx, numknot)) bp = np.zeros((nx, numknot)) rowap = np.zeros(numknot) rowbp = np.zeros(numknot) for i in range(len(xcore)): rowap[:] = 0.0 rowbp[:] = 0.0 bt = np.where((knotloc >= xcore[i]) & (knotloc > 0.0))[0] j = bt[0] # Eq. 3.3.5 rowap[j - 1] = -1.0 / (knotloc[j] - knotloc[j - 1]) rowap[j] = -1.0 * rowap[j - 1] rowbp[j - 1] = (knotloc[j] - knotloc[j - 1]) * (1.0 - 3.0 * a[i, j - 1] ** 2) / 6.0 rowbp[j] = (knotloc[j] - knotloc[j - 1]) * (3.0 * a[i, j] ** 2 - 1.0) / 6.0 ap[i, :] = rowap bp[i, :] = rowbp dp = ap + np.dot(bp, geeinvh) dpp = np.dot(a, geeinvh) return d, dp, dpp, geeinvc, fx, b
[docs] def get_knotvals(self, xcore, ycore, wgt, d, geeinvc, fx, b, knotloc, bcs): """ Get the spline y-values at the knot locations that best fit the data :param xdata: x values of measured data :param ydata: values of measured data :param wgt: weight of measured data :param knotloc: location of spline knots [0, ..., 1] :param bcs: dictionary of boundary conditions :param d, geeinvc, fx, b: Return values from design_matrix :return: values of the cubic interpolating spline at knot locations that best match the data """ nx = len(xcore) numknots = len(knotloc) dwgt = np.zeros((nx, numknots)) sub = np.dot(b, geeinvc) for i in range(nx): dwgt[i, :] = d[i, :] * wgt[i] # Here is where normalization would happen dnorm = np.ones(nx) datatofit = dnorm * ycore ydatawgt = (datatofit - sub - np.dot(d, fx)) * wgt if bcs['y0'] is not None: mini = 1 else: mini = 0 if bcs['y1'] is not None: maxi = len(knotloc) - 1 else: maxi = len(knotloc) decom = np.dot(dwgt[:, mini:maxi].T, dwgt[:, mini:maxi]) # Make A = U s V' u, s, v = np.linalg.svd(decom) # Solve Ax = B using # C = u' B # S = solve(diag(s),C) B = np.dot(dwgt[:, mini:maxi].T, ydatawgt) C = np.dot(u.T, B) S = np.linalg.solve(np.diag(s), C) knotval = np.dot(v.T, S) if mini == 1: knotval = np.insert(knotval, 0, bcs['y0']) if maxi == len(knotloc) - 1: knotval = np.append(knotval, bcs['y1']) return knotval
[docs] def get_spl(self, x, y, w, bcs, k): """ :param xdata: rho values :param ydata: data values :param wgt: data weight (1/uncertainty) :param knotloc: location of knots :param bcs: boundary conditions :return: spline fit on rho values """ # Get the design matrix d, dp, dpp, geeinvc, fx, b = self.design_matrix(x, k, bcs) knotval = self.get_knotvals(x, y, w, d, geeinvc, fx, b, k, bcs) yfit = np.dot(d, knotval) + np.dot(b, geeinvc) yfitp = np.dot(dp, knotval) yfitpp = np.dot(dpp, knotval) return knotval, yfit, yfitp, yfitpp
[docs] def do_fit(self): def params_to_array(params): """ Convert params dictionary to array of knot locations :param params: lmfit parameters dictionary :return: knot locations np.ndarray """ keys = list(params.keys()) if 'sepval' in keys: keys.remove('sepval') if 'scrapewidth' in keys: keys.remove('scrapewidth') knotloc = np.zeros(len(keys)) for i, k in enumerate(keys): knotloc[i] = params[k].value sor = np.argsort(knotloc) knotloc = knotloc[sor] return knotloc def callback(params, iter, resid, *args, **kw): r""" :param params: parameter dictionary :param iter: iteration number :param resid: residual :param \*args: extra arguments :param \**kw: extra keywords :return: """ p = np.zeros(len(params)) for i, key in enumerate(params.keys()): p[i] = params[key].value def linearfit(params): """ Function to minimize for auto-knotting and edge fitting :param params: lmfit parameters dictionary :return: residual for least-squares minimization """ # Turn lmfit params into array knotloc = params_to_array(params) # Store the knot locations for each iteration of the solver. if self.iknotloc is None: self.iknotloc = np.atleast_2d(knotloc) else: self.iknotloc = np.append(self.iknotloc, knotloc[np.newaxis, :], axis=0) if self.sol: # The core part x = self.xdatafit[self.edgemask] y = self.ydatafit[self.edgemask] w = self.ydatafitwgt[self.edgemask] # Set the boundary conditions to the least-squares current iteration self.bcs['y1'] = params['sepval'].value # Do the core fit with the specified knots and boundary knotval, yfit, yfitp, yfitpp = self.get_spl(x, y, w, self.bcs, knotloc) # Must make sure that we evaluate the core spline at x=1.0 for the exponential decay # to be accurately captured starting at the boundary in the least-squares if x[-1] != 1.0: xx = np.append(x, 1.0) d, dp, dpp, geeinvc, fx, b = self.design_matrix(xx, knotloc, self.bcs) tmp = np.dot(d, knotval) + np.dot(b, geeinvc) else: tmp = y # Compute the edge part scrapewidth = params['scrapewidth'].value yfitedge = tmp[-1] * np.exp((1.0 - self.xdataedge) / scrapewidth) # Set the edge derivative to the least-squares current iteration if self.solc: self.bcs['y1p'] = -(tmp[-1] / scrapewidth) # Do the fit again with constrained edge value and derivative knotval, yfit, yfitp, yfitpp = self.get_spl(x, y, w, self.bcs, knotloc) # Form residual resid = (y - yfit) * w resid = np.append(resid, (self.ydataedge - yfitedge) * self.ydataedgewgt) # Number of observations ndata = len(x) + len(self.xdataedge) # Number of core variables in the number of internal knots nvarys = len(knotloc) - 2 # Add the two edge values as a free parameters nvarys += 2 else: x = self.xdatafit y = self.ydatafit w = self.ydatafitwgt bcs = self.bcs knotval, yfit, yfitp, yfitpp = self.get_spl(x, y, w, bcs, knotloc) # Form residual resid = (y - yfit) * w # Number of observations ndata = len(x) # Number of variables is the number of knots minus the two knots at the boundaries nvarys = len(knotloc) - 2 # Degrees of freedom is the number of data points less the number of variables nfree = ndata - nvarys # Store reduced chi2 as a function of iteration if self.irchi2 is None: self.irchi2 = np.atleast_1d(np.sum(resid**2) / nfree) else: self.irchi2 = np.append(self.irchi2, np.sum(resid**2) / nfree) return resid # If there is no knot bias then it is uniform or user, else we bias it to the edge if self.knotbias == 0: # If there are no knot locations then they are uniformly spaced if self.knotloc is None: self.knotloc = np.linspace(0, 1, self.numknot) else: knottmp = 1.0 / np.linspace(1, self.numknot, self.numknot) ** self.knotbias self.knotloc = np.abs((knottmp - knottmp[0]) / (knottmp[-1] - knottmp[0])) # Create parameters for lmfit params = lmfit.Parameters() for i in range(len(self.knotloc)): params.add('k{}'.format(i), value=self.knotloc[i], min=0.01, max=0.99) if not self.autoknot: params['k{}'.format(i)].vary = False # rho locations [0, 1] always fixed params['k0'].vary = False params['k0'].min = 0.0 params['k0'].max = 1.0 params['k0'].value = 0.0 params['k{}'.format(self.numknot - 1)].vary = False params['k{}'.format(self.numknot - 1)].min = 0.0 params['k{}'.format(self.numknot - 1)].max = 1.0 params['k{}'.format(self.numknot - 1)].value = 1.0 if self.sol: params.add('scrapewidth') params['scrapewidth'].vary = True params['scrapewidth'].value = 0.1 if self.scrapewidth_min is not None: params['scrapewidth'].min = self.scrapewidth_min if self.scrapewidth_max is not None: params['scrapewidth'].max = self.scrapewidth_max params.add('sepval') params['sepval'].vary = True params['sepval'].value = 0.1 * np.mean(self.ydatacore) if self.sepval_min is not None: params['sepval'].min = self.sepval_min if self.sepval_max is not None: params['sepval'].max = self.sepval_max #### # Perform the fit #### # Note that in general the lmfit MinimizerResult object cannot be stored fitter = lmfit.Minimizer(linearfit, params, iter_cb=callback) # v1.0.1 introduced a new argument max_nfev to uniformly specify the maximum number of function evalutions # https://lmfit.github.io/lmfit-py/whatsnew.html#version-1-0-1-release-notes if compare_version(lmfit.__version__, '1.0.1') >= 0: fit_kws = {'max_nfev': self.maxiter} else: if self.method == 'leastsq': fit_kws = {'maxfev': self.maxiter} else: fit_kws = {'options': {'maxiter': self.maxiter}} out = fitter.minimize(method=self.method, **fit_kws) #### # Process and gather the fit outputs #### if self.verbose: print(lmfit.fit_report(out)) # Store number of variables self.nvarys = out.nvarys # Store the number of degrees of freedom (ndata - nvarys) self.nfree = out.nfree # Store the knot locations and the scrape off layer width self.knotloc = params_to_array(out.params) if self.sol: self.scrapewidth = out.params['scrapewidth'].value #### # Get the spline fit on the data coordinates and rchi2 #### if self.sol: x = self.xdatafit[self.edgemask] y = self.ydatafit[self.edgemask] w = self.ydatafitwgt[self.edgemask] else: x = self.xdatafit y = self.ydatafit w = self.ydatafitwgt knotval, yfit, yfitp, yfitpp = self.get_spl(x, y, w, self.bcs, self.knotloc) self.knotval = knotval if self.sol: if x[-1] != 1.0: x = np.append(x, 1.0) d, dp, dpp, geeinvc, fx, b = self.design_matrix(x, self.knotloc, self.bcs) tmp = np.dot(d, knotval) + np.dot(b, geeinvc) self.yfit = np.append(yfit, tmp[-1] * np.exp((1.0 - self.xdataedge) / self.scrapewidth)) else: self.yfit = yfit self.resid = (self.ydatafit - self.yfit) * self.ydatafitwgt self.rchi2 = np.sum(self.resid**2) / self.nfree #### # Get the spline fit on the user axis coordinates #### edgemask = self.xufit <= 1.0 coremask = self.xufit > 1.0 d, dp, dpp, geeinvc, fx, b = self.design_matrix(self.xufit[edgemask], self.knotloc, self.bcs) yufit = np.dot(d, self.knotval) + np.dot(b, geeinvc) ypufit = np.dot(dp, self.knotval) if self.sol: self.yufit = np.append(yufit, yufit[-1] * np.exp((1.0 - self.xufit[coremask]) / self.scrapewidth)) self.ypufit = np.append(ypufit, -1.0 * yufit[-1] * np.exp((1.0 - self.xufit[coremask]) / self.scrapewidth) / self.scrapewidth) else: self.yufit = np.append(yufit, np.repeat(np.nan, np.sum(coremask))) self.ypufit = np.append(ypufit, np.repeat(np.nan, np.sum(coremask)))
[docs] def plot(self): # Plot the data and fit from omfit_plot import FigureNotebook from omfit_classes.utils_plot import uband, uerrorbar fn = FigureNotebook('GASpline') fig, ax = fn.subplots(nrows=2, sharex=True, label='Data and Fit') uerrorbar(self.xdata, uarray(self.ydata, self.ydataerr), ax=ax[0], markersize=3.0, label='All data') ax[0].plot(self.xdatafit, nominal_values(self.yfit), marker='o', markersize=3.0, label='Fit on Data') ax[0].plot(self.xufit, nominal_values(self.yufit), label='Fit on User') ax[0].plot(self.knotloc, self.knotval, marker='o', label='Knotloc,val') ax[0].legend() ax[1].plot(self.xdatafit, self.resid, marker='o', label='Weighted Residual') ax[1].axhline(0.0, color='k', ls='dashed') ax[1].legend() fig, ax = fn.subplots(nrows=2, sharex=True, label='Convergence') ax[0].plot(self.iknotloc, marker='o', mec='none') ax[0].set_ylabel('Knot Locations') ax[1].plot(self.irchi2, marker='o', mec='none') ax[1].set_ylabel('Reduced chi2') ax[1].set_xlabel('Iteration') ax[1].set_yscale('log') if self.monte is not None and self.monte > 0: fig, ax = fn.subplots(nrows=2, sharex=True, label='Monte-Carlo Fits') for i in range(self.monte): uerrorbar( self.xdatafit, uarray(self.ydatafits[:, i], self.ydatafiterr), ax=ax[0], markersize=3.0, alpha=0.2, markeredgecolor='none', ) ax[0].plot(self.xdatafit, self.yfits[:, i], color='black') ax[0].plot(self.knotlocs[:, i], np.zeros(self.numknot), marker='o', ls='None', color='black') ax[1].plot(self.xufit, self.ypufits[:, i], color='black') fig, ax = fn.subplots(nrows=2, sharex=True, label='Monte-Carlo Results') uerrorbar(self.xdata, uarray(self.ydata, self.ydataerr), ax=ax[0], markersize=3.0, label='Data', markeredgecolor='none') for i in range(self.numknot): ax[0].axvline(self.knotloc[i], color='k', ls='dashed', label='_' * (i == 0) + 'Knot Location') ax[1].axvline(self.knotloc[i], color='k', ls='dashed') for i in range(self.monte): ax[0].plot(self.xufit, self.yufits[:, i], color='black', alpha=0.2, ls='dashed') uband(self.xufit, uarray(self.yufit, self.yufiterr), ax=ax[0], label='Spline fit') ax[0].legend(loc='best', frameon=False) for i in range(self.monte): ax[1].plot(self.xufit, self.ypufits[:, i], color='black', alpha=0.2, ls='dashed') ax[1].plot([0], [0], lw=0) # dummy to sync colors uband(self.xufit, uarray(self.ypufit, self.ypufiterr), ax=ax[1], label='Spline derivative') # This line below shows why the numerical uncertainties derivative is incorrect. # uerrorbar(self.xufit, deriv(self.xufit, uarray(self.yufit, self.yufiterr)),ax=ax[1], color='black',alpha=0.2) ax[1].legend(loc='best', frameon=False)
def __call__(self, x=None, n=0): """ Returns y values of the spline fit. :param x: np.ndarray. Must be subset of the fit x. Default is all fit x. :param n: int. Order of the derivative returned :return: uarray. nth derivative of fit at x """ if x is None: x = self.xufit * 1.0 if not set(x) <= set(self.xufit): raise ValueError('Requested x must be in fit x') if n == 0: if self.monte is not None and self.monte > 0: return uarray(self.yufit, self.yufiterr) else: return uarray(self.yufit, self.yufit * 0) elif n == 1: if self.monte is not None and self.monte > 0: return uarray(self.ypufit, self.ypufiterr) else: return uarray(self.ypufit, self.ypufit * 0) else: raise ValueError('Only returns values and first order derivatives')
[docs]@_available_to_user_math class MtanhPolyExpFit(object): """ Generalized fitter derived from B. Grierson tools fits core with pow_core polynomial C(x), and edge with offset exponential of form E(x) = offset + A*np.exp(-(x - xsym)/edge_width) blends functions together about x=x_sym with tanh-like behavior y_fit = (C(x)*np.exp(z) + E(x)*np.exp(-z))/(np.exp(z) + np.exp(-z)) where z = (xsym - x)/blend_width :param method: minimization method to use :param verbose: turns on details of set flags :param onAxis_gradzero: turn on to force C'(0) = 0 (effectively y'(0) for xsym/blend_width >> 1) :param onAxis_value: set to force y(0) = onAxis_value :param fitEdge: set = False to require E(x) = offset, A=edge_width=0 :param edge_value: set to force y(x=1) = edge_value :param maxiter: controls maximum # of iterations :param blend_width_min: minimum value for the core edge blending :param edge_width_min: minimum value for the edge :param sym_guess: guess for the x location of the pedestal symmetry point :param sym_min: constraint for minimum x location for symmetry point :param sym_max: constraint for maximum x location for symmetry point :pos_edge_exp: force exponential to be positively valued so that exponential will have a negative slope in the SOL :Methods: :__call__(x): Evaluate mtanh_polyexp at x, propagating correlated uncertainties in the fit parameters. """ def __init__( self, x_data, y_data, y_err, pow_core=2, method='leastsq', verbose=False, onAxis_gradzero=False, onAxis_value=None, fitEdge=True, edge_value=None, maxiter=None, blend_width_min=1.0e-3, edge_width_min=1.0e-3, edge_width_max=None, sym_guess=0.975, sym_min=0.9, sym_max=1.0, pos_edge_exp=False, ): # Create input parameters list params = lmfit.Parameters() # Core poly initial guess = max(y_data)*(1 - x^2) # Note onAxis_value will overwrite core[0] params.add('pow_c', value=pow_core, vary=False) core = np.zeros(pow_core + 1) core[:] = 1.0 if pow_core >= 2: core[2] = -1.0 maxy = y_data[np.argmax(abs(y_data))] # Keeps sign to handle negative profiles e.g V_tor miny = y_data[np.argmin(abs(y_data))] core *= maxy for i in range(len(core)): params.add('core{}'.format(i), value=core[i]) # "blending" and edge parameters params.add('xsym', value=sym_guess, min=sym_min, max=sym_max) params.add('blend_width', value=max(0.05, 1.5 * blend_width_min), min=blend_width_min) if edge_width_max is None: params.add('edge_width', value=max(0.05, 1.5 * edge_width_min), min=edge_width_min) else: params.add('edge_width', value=0.5 * (edge_width_max + edge_width_min), min=edge_width_min, max=edge_width_max) params.add('offset', value=miny) params.add('A', value=(maxy + miny) / 2.0) # Add some dependant parameters for use in constraints params.add('em2z0', expr='exp(-2*xsym/blend_width)') params.add('E0', expr='offset + A*exp(xsym/edge_width)') params.add('E0p', expr='-A*exp(xsym/edge_width)/edge_width') params.add('y0', expr='(core0 + E0*em2z0)/(1. + em2z0)') params.add('em2z1', expr='exp(-2*(xsym-1)/blend_width)') params.add('E1', expr='offset + A*exp((xsym-1)/edge_width)') core_sum = '+'.join(['core{}'.format(i) for i in range(pow_core + 1)]) params.add('y1', expr='(' + core_sum + ' + E1*em2z1)/(1. + em2z1)') # Set up constraints if onAxis_value is not None: if verbose: print('constrain on-axis value = %f' % onAxis_value) params['y0'].set(value=onAxis_value, vary=False) params['core0'].set(expr='(1. + em2z0)*y0 - E0*em2z0') else: if verbose: print('on-axis value unconstrained') if onAxis_gradzero and ('core1' in params): if verbose: print('constrain on-axis gradient = 0') params['core1'].set(expr='(-E0p -(2*E0)/blend_width + (2*y0)/blend_width)*em2z0') else: if verbose: print('on-axis gradient unconstrained') if edge_value is not None: if verbose: print('constrain edge value = %f' % edge_value) params['y1'].set(value=edge_value, vary=False) core_sub = '-'.join(['core{}'.format(i) for i in range(pow_core)]) params['core{}'.format(pow_core)].set(expr='(1. + em2z1)*y1 - E1*em2z1 - ' + core_sub) else: if verbose: print('edge value unconstrained') if (not fitEdge) or (max(x_data) <= 1.0): # Definitely needs better tuning for initial guesses # turns out better to keep A and edge width even if only doing x <= 1 if verbose: print('only fitting x <= 1') # ; setting A=0,edge_width=blend_width' # params['A'].set(value=0, vary=False) # params['edge_width'].set(expr='blend_width') # since A=0, value doesn't matter if maxy >= 0.0: params['A'].set(min=0.0) params['offset'].set(min=0.0) else: params['A'].set(max=0.0) params['offset'].set(max=0.0) idx = np.where(x_data <= 1.0) x_core = x_data[idx] y_core = y_data[idx] err_core = y_err[idx] fitter = lmfit.Minimizer(mtanh_polyexp_res, params, fcn_args=(x_core, y_core, err_core)) else: # force edge exponential function to be always >=0 so that the slope will always be <=0 if pos_edge_exp: params['A'].set(min=0.0) fitter = lmfit.Minimizer(mtanh_polyexp_res, params, fcn_args=(x_data, y_data, y_err)) # v1.0.1 introduced a new argument max_nfev to uniformly specify the maximum number of function evalutions # https://lmfit.github.io/lmfit-py/whatsnew.html#version-1-0-1-release-notes if compare_version(lmfit.__version__, '1.0.1') >= 0: fit_kws = {'max_nfev': maxiter} else: if method == 'leastsq': fit_kws = {'maxfev': maxiter} else: fit_kws = {'options': {'maxiter': maxiter}} fitout = fitter.minimize(method=method, params=params, **fit_kws) # force errors to zero for now. todo: proper errorbars. fitout.errorbars = False # store the MinimizerResult and make its properties directly accessible in this class self._MinimizerResult = fitout self.__dict__.update(fitout.__dict__) def __call__(self, x): """ Generate mtanh_poly including correlated uncertainties from the lmfit MinimizerResult formed at initialization. :param x: np.ndarray. Output grid. :return y: UncertaintiesArray. Fit values at x. """ lmfit_out = self._MinimizerResult # error check- if no error bars, just return function w/o error bars if lmfit_out.errorbars == False: y_fit = mtanh_polyexp(x, lmfit_out.params) return y_fit # take from OMFIT fit_base object uparams = {} for k in lmfit_out.params: uparams[k] = lmfit_out.params[k].value if lmfit_out.errorbars: corr_params = uncertainties.correlated_values([lmfit_out.params[k].value for k in lmfit_out.var_names], lmfit_out.covar) for vi, var in enumerate(lmfit_out.var_names): uparams[var] = corr_params[vi] nc = uparams['pow_c'] + 1 # always a fixed value core = np.zeros(nc) # write like this to ensure starts as complex array in case on-axis value set from uncertainties import ufloat core_poly = np.zeros(len(x)) + ufloat(0.0, 0) core_poly += uparams['core0'] for ii in range(1, nc): core_poly += uparams['core%s' % ii] * (x**ii) xsym = uparams['xsym'] blend_width = uparams['blend_width'] edge_width = uparams['edge_width'] offset = uparams['offset'] A = uparams['A'] edge_func = offset + A * unumpy.exp((xsym - x) / edge_width) z = (xsym - x) / blend_width y_fit = (core_poly * unumpy.exp(z) + edge_func * unumpy.exp(-z)) / (unumpy.exp(z) + unumpy.exp(-z)) return y_fit
[docs]class UncertainRBF(object): r""" A class for radial basis function fitting of n-dimensional uncertain scattered data Parameters: :param \*args: arrays `x, y, z, ..., d, e` where `x, y, z, ...` are the coordinates of the nodes `d` is the array of values at the nodes, and `e` is the standard deviation error of the values at the nodes :param centers: None the RBFs are centered on the input data points (can be very expensive for large number of nodes points) -N: N nodes randomly distributed in the domain N: N*N nodes uniformly distributed in the domain np.array(N,X): user-defined array with X coordinates of the N nodes :param epsilon: float Adjustable constant for gaussian - defaults to approximate average distance between nodes :param function: 'multiquadric': np.sqrt((r / self.epsilon) ** 2 + 1) #<--- default 'inverse': 1.0 / np.sqrt((r / self.epsilon) ** 2 + 1) 'gaussian': np.exp(-(r**2 / self.epsilon)) 'linear': r 'cubic': r ** 3 'quintic': r ** 5 'thin_plate': r ** 2 * np.log(r) :param norm: default "distance" is the euclidean norm (2-norm) :Examples: >>> x=np.linspace(0,1,21) >>> y=np.linspace(0,1,21) >>> e=x*0+.5 >>> e[abs(x-0.5)<0.1]=8 >>> y[abs(x-0.5)<0.1]=10 >>> >>> x1 = np.linspace(0,1,100) >>> y1 = UncertainRBF(x, y, e, centers=None, epsilon=1)(x1) >>> y0 = UncertainRBF(x, y, e*0+1, centers=None, epsilon=1)(x1) >>> >>> pyplot.subplot(2,2,1) >>> errorbar(x,y,e,ls='',marker='.',label='raw 1D data') >>> uband(x1,y1,label='1D RBF w/ uncertainty') >>> pyplot.plot(x1,nominal_values(y0),label='1D RBF w/o uncertainty') >>> pyplot.title('1D') >>> legend(loc=0) >>> >>> x = np.random.rand(1000)*4.0-2.0 >>> y = np.random.rand(1000)*4.0-2.0 >>> e = np.random.rand(1000) >>> z = x*np.exp(-x**2-y**2) >>> ti = np.linspace(-2.0, 2.0, 100) >>> XI, YI = np.meshgrid(ti, ti) >>> >>> rbf = UncertainRBF(x, y, z+e, abs(e), centers=5, epsilon=1) >>> ZI = nominal_values(rbf(XI, YI)) >>> >>> rbf = UncertainRBF(x, y, z+e, abs(e)*0+1, centers=5, epsilon=1) >>> ZC = nominal_values(rbf(XI, YI)) >>> >>> pyplot.subplot(2,2,3) >>> pyplot.scatter(x, y, c=z, s=100, edgecolor='none') >>> pyplot.xlim(-2, 2) >>> pyplot.ylim(-2, 2) >>> pyplot.colorbar() >>> pyplot.title('raw 2D data (w/o noise)') >>> >>> pyplot.subplot(2,2,2) >>> pyplot.pcolor(XI, YI, ZI) >>> pyplot.xlim(-2, 2) >>> pyplot.ylim(-2, 2) >>> pyplot.colorbar() >>> pyplot.title('2D RBF w/ uncertainty') >>> >>> pyplot.subplot(2,2,4) >>> pyplot.pcolor(XI, YI, ZC) >>> pyplot.xlim(-2, 2) >>> pyplot.ylim(-2, 2) >>> pyplot.colorbar() >>> pyplot.title('2D RBF w/o uncertainty') """ def __init__(self, x, d, e, centers=None, function='multiquadric', epsilon=None, norm=None): if not np.all([xx.shape == d.shape for xx in x]) and (e.shape == d.shape): raise ValueError("Array lengths must be equal") # npts by ndim self.xi = np.asarray([np.asarray(a, dtype=np.float_).flatten() for a in x]).T self.di = np.atleast_2d(np.asarray(d).flatten()).T self.ei = np.atleast_2d(np.asarray(e).flatten()).T self.indim = self.xi.shape[1] self.outdim = self.di.shape[1] if centers is None: self.centers = self.xi elif np.iterable(centers): self.centers = centers elif centers > 0: self.centers = np.array( [x.flatten() for x in meshgrid(*[np.linspace(min(self.xi[:, k]), max(self.xi[:, k]), centers) for k in range(self.indim)])] ).T else: self.centers = np.array([np.random.uniform(min(self.xi[i]), max(self.xi[i]), self.indim) for i in range(abs(centers))]) self.numCenters = self.centers.shape[0] self.centers = self.centers.T # ndim by numcenters # default "distance" is the euclidean norm (2-norm) if norm is None: self.norm = lambda ctrs, pts: np.linalg.norm(ctrs[:, np.newaxis, ...] - pts[..., np.newaxis], axis=0) else: self.norm = norm self.epsilon = epsilon if self.epsilon is None: # default epsilon is the "the average distance between nodes" based on a bounding hypercube dim = self.xi.shape[0] ximax = np.max(self.xi, axis=1) ximin = np.min(self.xi, axis=1) edges = ximax - ximin edges = edges[np.nonzero(edges)] if not len(edges): self.epsilon = (np.max(self.xi) - np.min(self.xi)) / self.xi.size else: self.epsilon = 1.0 / np.power(np.prod(edges) / self.indim, 1.0 / edges.size) # function options function_options = { 'multiquadric': lambda r: np.sqrt((r / self.epsilon) ** 2 + 1), 'inverse': lambda r: 1.0 / np.sqrt((r / self.epsilon) ** 2 + 1), 'gaussian': lambda r: np.exp(-(r**2 / self.epsilon)), 'linear': lambda r: r, 'cubic': lambda r: r**3, 'quintic': lambda r: r**5, 'thin_plate': lambda r: r**2 * np.log(r), } if isinstance(function, str): self.fun = function_options[function] # demands it be one of the defined keys else: self.fun = function G = self._calcAct(self.xi.T) D = np.linalg.pinv(np.matrix(G / self.ei[np.newaxis, :])) self.Wd = np.dot(D, self.di / self.ei) self.We = np.dot(D, self.ei * 0 + 1.0) def _calcAct(self, X): """ Calculate the radial basis function of "radius" between points X and the centers. :param X: np.ndarray. Shape (ndims, npts) :return: np.ndarray. Shape (npts, numcenters) """ return self.fun(self.norm(self.centers, X)) def __call__(self, *args): r""" evaluate interpolation at coordinates :param \*args: arrays `x, y, z, ...` the coordinates of the nodes :return: uncertain array, of the same shape of coordinates """ args = [np.asarray(x) for x in args] if not np.all([x.shape == y.shape for x in args for y in args]): raise ValueError("Array lengths must be equal") shp = args[0].shape X = np.asarray([a.flatten() for a in args], dtype=np.float_) # ndim, npts G = self._calcAct(X) return uarray(np.array(np.dot(G, self.Wd)), np.array(abs(np.dot(G, self.We)))).reshape(shp)
# ------------ # More fitting # ------------
[docs]@_available_to_user_math def bimodal_gaussian(xx, center0, center1, sigma0, sigma1, amp0, amp1, delta0=None, delta1=None, debug=False): """ Calculates bimodal gaussian function. There are two gaussians added together. :param xx: Independent variable :param center0: Center of first gaussian before log transform :param center1: Center of second gaussian before log transform :param sigma0: Sigma of first gaussian before log transform :param sigma1: Sigma of second gaussian before log transform :param amp0: Amplitude of first gaussian :param amp1: Amplitude of second gaussian :param delta0: The fitter uses this variable to help it set up limits internally. The deltas are not actually used in the model. :param delta1: Not used in the model; here to help the fitter. :param debug: T/F: print debugging stuff (keep this off during fit, but maybe on for testing) :return: Model function y(x) """ # Calculate arguments for the exponentials arg0 = ((xx - center0) / sigma0) ** 2 / 2.0 arg1 = ((xx - center1) / sigma1) ** 2 / 2.0 # Floating underflow protection: enforce limits pad = max([abs(amp0 * 2), abs(amp1 * 2)]) # Padding gives a margin below overflow limit to allow for amplitude minpad = 1e3 if pad < minpad: pad = minpad pad *= 1e4 # Pad a little more too_big = sys.float_info.max / pad # Take the operating systems floating max and reduce it by pad big = np.log(too_big) # We will use np.exp() on arg, so take np.log(too_big) to compare to arg before np.exp() def apply_limits(c): c = np.atleast_1d(c) c[c > big] = big c[c < -big] = -big return c arg0 = apply_limits(arg0) arg1 = apply_limits(arg1) if debug: printd(' Calculating model...') printd(' arg0 : min = {:}, mean = {:}, max = {:}'.format(arg0.min(), np.mean(arg0), arg0.max())) printd(' arg1 : min = {:}, mean = {:}, max = {:}'.format(arg1.min(), np.mean(arg1), arg1.max())) printd(' amp0 = {:}, amp1 = {:}'.format(amp0, amp1)) model_y = np.exp(-arg0) * amp0 + np.exp(-arg1) * amp1 if debug: printd(' Calculated model: min = {:}, mean = {:}, max = {:}'.format(model_y.min(), np.mean(model_y), model_y.max())) return model_y
[docs]@_available_to_user_math class BimodalGaussianFit(object): """ The fit model is a sum of two Gaussians. If a middle value is provided, limits will be imposed to try to keep the Gaussians separated. """ default_guess = {} def __init__(self, x=None, pdf=None, guess=default_guess, middle=None, spammy=False, limits=None): """ Initialize variables and call functions :param x: Independent variable :param pdf: Probability distribution function :param guess: Guess for parameters. Use {} or leave as default for auto guess. :param middle: X value of a dividing line that is known to be between two separate Gaussians. Sets up additional limits on centers. No effect if None. :param spammy: Many debugging print statements :param limits: None for default limits or dictionary with PARAM_LIM keys where PARAM is center0, center1, etc. and LIM is min or max """ from lmfit import Model self.x = x self.pdf = pdf self.middle = middle self.spammy = spammy self.limits = limits self.guess = guess self.make_guess() self.model = Model(bimodal_gaussian) self.result = self.bimodal_gaussian_fit()
[docs] def make_guess(self): """ Given a dictionary with some guesses (can be incomplete or even empty), fills in any missing values with defaults, makes consistent deltas, and then defines a parameter set. Produces a parameter instance suitable for input to lmfit .fit() method and stores it as self.guess. """ from lmfit import Parameters # Make default guesses if self.middle is not None and np.any(np.atleast_1d(self.x <= self.middle)) and np.any(np.atleast_1d(self.x >= self.middle)): printd('middle was defined: {:}'.format(self.middle)) amp0g = pdf[self.x <= self.middle].max() amp1g = pdf[self.x >= self.middle].max() cen0g = self.x[self.pdf[self.x <= self.middle].argmax()] cen1g = self.x[self.pdf[self.x >= self.middle].argmax() + np.where(self.x >= self.middle)[0][0]] sig1g = cen1g / 8.0 if sig1g < 0.5: sig1g = 0.5 if sig1g > 20: sig1g = 20 c0max = self.middle * 0.9 c1min = self.middle * 1.1 d1min = self.middle else: amp0g = self.pdf.max() amp1g = self.pdf.max() cen0g = self.x[self.pdf.argmax()] cen1g = 15.0 sig1g = 1.5 c0max = self.x.max() c1min = self.x.min() d1min = 2 c0min = 0.1 c1max = self.x.max() * 2 sig0g = cen0g / 6.0 if sig0g < 0.2: sig0g = 0.2 if sig0g > 10: sig0g = 10 # Define limits min_amp = self.pdf.max() / 25.0 max_amp = self.pdf.max() * 4.0 if self.limits is None: self.limits = {} c0min = self.limits.get('center0_min', c0min) c0max = self.limits.get('center0_max', c0max) c1min = self.limits.get('center1_min', c1min) c1max = self.limits.get('center1_max', c1max) a0min = self.limits.get('amp0_min', min_amp) a0max = self.limits.get('amp0_max', max_amp) a1min = self.limits.get('amp1_min', min_amp) a1max = self.limits.get('amp1_max', max_amp) d0min = self.limits.get('delta0_min', 0) d0max = self.limits.get('delta1_min', c0max) d1min = self.limits.get('delta1_min', d1min) d1max = self.limits.get('delta1_min', c1max) printd('initial guess for center1, cen1g = {:}'.format(cen1g)) # Fill in dictionary of guesses self.guess.setdefault('center0', cen0g) self.guess.setdefault('center1', cen1g) self.guess.setdefault('amp0', amp0g) self.guess.setdefault('amp1', amp1g) if 'delta0' not in list(self.guess.keys()): self.guess.setdefault('sigma0', sig0g) else: self.guess.setdefault('sigma0', self.guess['center0'] - self.guess['delta0']) # Update delta0, which might change delta0 if inconsistent delta0 & sigma0 were supplied. printd('guess = {:}'.format(self.guess)) self.guess['delta0'] = self.guess['center0'] - self.guess['sigma0'] if 'delta1' not in list(self.guess.keys()): self.guess.setdefault('sigma1', sig1g) else: self.guess.setdefault('sigma1', self.guess['center1'] - self.guess['delta1']) # Update delta0, which might change delta0 if inconsistent delta0 & sigma0 were supplied. self.guess['delta1'] = self.guess['center1'] - self.guess['sigma1'] pars = Parameters() pars.add('amp0', value=self.guess['amp0'], vary=True, min=a0min, max=a0max) pars.add('amp1', value=self.guess['amp1'], vary=True, min=a1min, max=a1max) pars.add('center0', value=self.guess['center0'], vary=True, min=c0min, max=c0max) pars.add('center1', value=self.guess['center1'], vary=True, min=c1min) pars.add('delta0', value=self.guess['delta0'], vary=True, min=d0min, max=d0max) pars.add('delta1', value=self.guess['delta1'], vary=True, min=d1min, max=d1max) pars.add('sigma0', expr='center0-delta0') pars.add('sigma1', expr='center1-delta1') pars.add('debug', value=self.spammy, vary=False) self.guess = pars
[docs] def bimodal_gaussian_fit(self): """ Fits a probability distribution function (like a histogram output, maybe) with a two gaussian function :return: Minimizer result """ # Do the fit printd('fitting...') result = self.model.fit(self.pdf, xx=self.x, params=self.guess, method='nelder') # ms return result