# Source code for omfit_classes.omfit_brainfuse

try:
    # framework is running
    from .startup_choice import *
except ImportError as _excp:
    # class is imported by itself
    if (
        'attempted relative import with no known parent package' in str(_excp)
        or 'No module named \'omfit_classes\'' in str(_excp)
        or "No module named '__main__.startup_choice'" in str(_excp)
    ):
        from startup_choice import *
    else:
        raise

try:
    from omfit_classes.brainfuse import brainfuse
    from omfit_classes.brainfuse import *
except ImportError:
    brainfuse = None
from omfit_classes.omfit_ascii import OMFITascii
from omfit_classes.utils_math import *

import random

if brainfuse is not None:

    class OMFITbrainfuse(brainfuse, OMFITascii):
        """
        OMFIT wrapper around a FANN neural-network file: ``OMFITascii`` handles the
        file management within the OMFIT framework, while ``brainfuse`` provides the
        neural-network functionality.
        """

        def __init__(self, filename, **kw):
            # OMFITascii.__init__ takes care of the file bookkeeping and sets
            # self.filename, which is then passed on to the brainfuse loader
            OMFITascii.__init__(self, filename, **kw)
            brainfuse.__init__(self, self.filename, **kw)

[docs] def train_net( self, dB, inputNames=[], outputNames=[], max_iterations=100, hidden_layer=None, connection_rate=1.0, noise=0.0, fraction=0.5, norm_output={}, output_mean_0=False, robust_stats=0.0, weight_decay=0.1, spring_decay=1.0, activation_function='SIGMOID_SYMMETRIC', ): """ :param dB: dictionary of input/output arrays :param inputNames: train on these inputs (use all arrays not starting with `OUT_` if not specified) :param outputNames: train on these outputs (use all arrays starting with `OUT_` if not specified) :param max_iterations: >0 max number of iterations <0 max number of iterations without improvement :param hidden_layer: list of integers defining the NN hidden layer topology :param connection_rate: float from 0. to 1. defining the density of the synapses :param noise: add gaussian noise to training set :param fraction: fraction of data used for training (the rest being for validation) fraction>0 fraction random splitting -1<fraction<0 fraction sequential splitting fraction>1 index sequential splitting :param norm_output: normalize outputs :param output_mean_0: force average of normalized outputs to have 0 mean :param robust_stats: 0<x<100 percentile of data to be considered =0 mean and std <0 median and mad :param weight_decay: exponential forget of the weight :param spring_decay: link training weight decay to the validation error :return: std_out of training process """ def f(inv1, inv2): try: return abs(np.sum(inv1 - inv2)) except Exception: return None, None, None forbidden = [] for k1 in sorted(dB.keys()): if k1[0] == '_' or np.all(dB[k1] == dB[k1][0]): forbidden.append(k1) for k2 in sorted(dB.keys()): if k1 in forbidden or k2 in forbidden: continue v = f(dB[k1], dB[k2]) if k1 != k2 and v == 0: forbidden.append(k2) if not len(inputNames): for k in sorted(dB.keys()): if k not in forbidden and not k.startswith('OUT_') and k[0] != '_': self.inputNames.append(k) else: self.inputNames = inputNames if not len(self.inputNames): raise 
OMFITexception('No inputs!') if not len(outputNames): for k in sorted(dB.keys()): if k[0] != '_': if np.std(dB[k] - np.mean(dB[k])) != 0: if k.startswith('OUT_') and k[0] != '_': self.outputNames.append(k) else: self.outputNames = outputNames if not len(self.outputNames): raise OMFITexception('No outputs!') inputs = [] for k in self.inputNames: inputs.append(dB[k].astype(float)) inputs = np.array(inputs).T outputs = [] for k in self.outputNames: outputs.append(dB[k].astype(float)) outputs = np.array(outputs).T def fann_mean(x, dim): if robust_stats > 0: tmp = [] for k in range(x.shape[-1]): i = np.where( (x[:, k] > np.percentile(x[:, k], robust_stats)) & (x[:, k] < np.percentile(x[:, k], 100 - robust_stats)) )[0] if len(i): tmp.append(np.mean(x[i, k])) else: tmp.append(np.mean(x[:, k])) elif robust_stats == 0: tmp = np.mean(x, 0) else: tmp = np.median(x, 0) return tmp def fann_std(x, dim): if robust_stats > 0: tmp = [] for k in range(x.shape[-1]): i = np.where( (x[:, k] > np.percentile(x[:, k], robust_stats)) & (x[:, k] < np.percentile(x[:, k], 100 - robust_stats)) )[0] if len(i): tmp.append(np.std(x[i, k])) else: tmp.append(np.std(x[:, k])) elif robust_stats == 0: tmp = np.std(x, 0) else: tmp = mad(x, 0) for k in range(x.shape[-1]): if tmp[k] == 0: tmp[k] = abs(np.mean(x[:, k])) if tmp[k] == 0: tmp[k] = 1.0 return tmp inputs_ = inputs outputs_ = outputs # normalize self.norm_output = [] for ko, itemo in enumerate(self.outputNames): self.norm_output.append([]) if isinstance(norm_output, dict): for ki, itemi in enumerate(self.inputNames): n0 = 0.0 if itemo in norm_output and itemi in norm_output[itemo]: n0 = norm_output[itemo][itemi] print('normalize %s with %s' % (itemo, itemi)) self.norm_output[-1].append(n0) elif norm_output is True: p, fitfunc = powerlaw_fit(inputs_.T, outputs_[:, ko]) print('normalize %s with %s' % (itemo, repr(p))) self.norm_output[-1].extend(p[:-1]) else: self.norm_output[-1].extend([0.0] * len(self.inputNames)) outputs_ = 
self.normOutputs(inputs_, outputs_) # scale inputs self.scale_mean_in = fann_mean(inputs_, 0) self.scale_deviation_in = fann_std(inputs_ - self.scale_mean_in, 0) inputs_ = (inputs_ - self.scale_mean_in) / self.scale_deviation_in # 0 output mean if not output_mean_0: self.scale_mean_out = fann_mean(outputs_, 0) self.scale_deviation_out = fann_std(outputs_ - self.scale_mean_out, 0) else: print('output mean 0') self.scale_mean_out = fann_mean(outputs_, 0) * 0 self.scale_deviation_out = np.sqrt(fann_mean(abs(outputs_) ** 2, 0)) outputs_ = (outputs_ - self.scale_mean_out) / self.scale_deviation_out if hidden_layer is None: hidden_layer = len(self.inputNames) * 2 print([len(self.inputNames)] + tolist(hidden_layer) + [len(self.outputNames)]) self.create_sparse_array(connection_rate, [len(self.inputNames)] + tolist(hidden_layer) + [len(self.outputNames)]) self.set_activation_function_output(libfann.LINEAR) self.set_activation_function_hidden(getattr(libfann, activation_function)) self.set_training_algorithm(libfann.TRAIN_RPROP) self.set_rprop_increase_factor(1.2) self.set_rprop_decrease_factor(0.5) self.set_train_error_function(libfann.ERRORFUNC_LINEAR) self.randomize_weights(-1.0, 1.0) if fraction > 0: i = np.array(rand(inputs_.shape[0])) t_tmpi = inputs_[np.where((i <= fraction))[0], :] t_tmpo = outputs_[np.where(i <= fraction)[0], :] v_tmpi = inputs_[np.where(i > fraction)[0], :] v_tmpo = outputs_[np.where(i > fraction)[0], :] elif fraction > -1 and fraction < 0: fraction = abs(fraction) n = inputs_.shape[0] t_tmpi = inputs_[: int(n * fraction), :] t_tmpo = outputs_[: int(n * fraction), :] v_tmpi = inputs_[int(n * fraction) :, :] v_tmpo = outputs_[int(n * fraction) :, :] elif abs(fraction) > 1: fraction = int(abs(fraction)) t_tmpi = inputs_[:fraction, :] t_tmpo = outputs_[:fraction, :] v_tmpi = inputs_[fraction:, :] v_tmpo = outputs_[fraction:, :] print('training set size: %d' % t_tmpo.shape[0]) print('validation set size: %d' % v_tmpo.shape[0]) os.remove(self.filename) 
if noise: t_tmpi = t_tmpi * (1 + self.scale_deviation_in[np.newaxis, :] * noise * randn(*t_tmpi.shape)) t_tmpo = t_tmpo * (1 + self.scale_deviation_out[np.newaxis, :] * noise * randn(*t_tmpo.shape)) print('Writing train data file') txt = [' '.join(map(str, [t_tmpi.shape[0], t_tmpi.shape[1], t_tmpo.shape[1]]))] tmp = list(range(t_tmpi.shape[0])) random.shuffle(tmp) for k in tmp: txt.append(' '.join(['%6.6f' % x for x in t_tmpi[k, :]])) txt.append(' '.join(['%6.6f' % x for x in t_tmpo[k, :]])) trainData = OMFITascii('brainfuse.train', fromString='\n'.join(txt)) print('Writing validation data file') txt = [' '.join(map(str, [v_tmpi.shape[0], v_tmpi.shape[1], v_tmpo.shape[1]]))] tmp = list(range(v_tmpi.shape[0])) random.shuffle(tmp) for k in tmp: txt.append(' '.join(['%6.6f' % x for x in v_tmpi[k, :]])) txt.append(' '.join(['%6.6f' % x for x in v_tmpo[k, :]])) validData = OMFITascii('brainfuse.valid', fromString='\n'.join(txt)) t0 = time.time() self.save() inputNames = copy.deepcopy(self.inputNames) outputNames = copy.deepcopy(self.outputNames) import omfit_classes.OMFITx as OMFITx if 'BRAINFUSE_ROOT' not in os.environ: raise OMFITexception('Error! BRAINFUSE_ROOT environmental variable is not defined') std_out = [] OMFITx.execute( os.environ['BRAINFUSE_ROOT'] + '/brainfuse_train.exe %s %s %s %d %f %f' % (self.filename, trainData.filename, validData.filename, max_iterations, weight_decay, spring_decay), std_out=std_out, ) self.load() self.inputNames = inputNames self.outputNames = outputNames data = libfann.training_data() data.set_train_data(inputs_, outputs_) self.MSE = self.test_data(data) data.destroy_train() print(self.MSE) self.save() print('Training timing:' + str(time.time() - t0)) return std_out
else: # associate OMFITbrainfuse to OMFITascii to bring the # files along even if they cannot be parsed and used
[docs] class OMFITbrainfuse(OMFITascii): pass
brainfuse = None activateNets = None activateNetsFile = None activateMergeNets = None __all__ = ['OMFITbrainfuse', 'brainfuse', 'activateNets', 'activateNetsFile', 'activateMergeNets'] ############################################ if __name__ == '__main__': test_classes_main_header() if not os.path.exists('brainfuse.net'): print('Unable to test omfit_brainfuse') sys.exit() OMFITbrainfuse('brainfuse.net')