# Source code for omfit_classes.brainfuse

import ast
import glob
import os
import sys

import numpy as np

# Import the FANN bindings; if that fails for any reason, define a stand-in
# ``libfann`` namespace so that the rest of this module can still be imported.
try:
    from fann2 import libfann
except Exception as libfannImportError:

    class libfann:
        """Stand-in for ``fann2.libfann`` used when the real import failed."""

        class neural_net:
            """Stand-in base class; constructing it retries the real import."""

            def __init__(self, *args, **kw):
                # Re-run the import here: if fann2 is still unavailable the
                # exception is raised at instantiation time rather than at
                # module import time (presumably the intent -- confirm).
                from fann2 import libfann


# Names exported by ``from <this module> import *``.
__all__ = ['libfann', 'brainfuse', 'activateNets', 'activateNetsFile', 'activateMergeNets', 'brainfuseDataset']


class brainfuse(libfann.neural_net):
    """
    FANN neural network whose file also carries extra metadata.

    On top of what ``libfann.neural_net`` stores, the network file holds the
    input/output scaling (``scale_*``), the names of inputs and outputs, the
    power-law output normalization exponents (``norm_output``) and ``MSE``.

    :param filename: path of the network file; it is read at construction and
        overwritten by :meth:`save`
    :param kw: accepted for call compatibility and ignored
    """

    # Keys of the per-input / per-output scaling arrays read back by `load`.
    _SCALE_KEYS = ('scale_mean_in', 'scale_mean_out', 'scale_deviation_in', 'scale_deviation_out')

    def __init__(self, filename, **kw):
        libfann.neural_net.__init__(self)
        self.filename = filename
        self.scale_mean_in = None
        self.scale_mean_out = None
        self.scale_deviation_in = None
        self.scale_deviation_out = None
        self.inputNames = []
        self.outputNames = []
        self.norm_output = []
        self.MSE = None
        self.load()

    def denormOutputs(self, inputs, outputs):
        """
        Undo :meth:`normOutputs`; same arguments, ``outputs`` modified in place.

        :return: ``outputs``
        """
        return self.normOutputs(inputs, outputs, denormalize=True)

    def normOutputs(self, inputs, outputs, denormalize=False):
        """
        Scale each output column by a product of powers of the input columns.

        For output ``ko`` the factor is ``prod_ki inputs[:, ki] ** norm_output[ko][ki]``.
        Outputs are divided by it, or multiplied when ``denormalize`` is True.

        :param inputs: 2D array indexed as ``[sample, input]``
        :param outputs: 2D array indexed as ``[sample, output]``; modified in place
        :param denormalize: multiply instead of divide
        :return: ``outputs``
        """
        # No exponents stored (constructor default, or the file had no
        # ``norm_output`` line): nothing to apply. Indexing the empty table
        # used to raise IndexError here.
        if not len(self.norm_output):
            return outputs

        for ko in range(len(self.outputNames)):
            norm = 1.0
            for ki in range(len(self.inputNames)):
                norm *= inputs[:, ki] ** self.norm_output[ko][ki]
            # `norm` is still the plain float 1.0 when there are no inputs
            if isinstance(norm, np.ndarray):
                if denormalize:
                    outputs[:, ko] *= norm
                else:
                    outputs[:, ko] /= norm
        return outputs

    def activate(self, dB, verbose=False):
        """
        Run the network on every sample of ``dB``.

        :param dB: either a dictionary with one entry per name in
            ``inputNames`` (entries for ``outputNames`` are returned as
            targets when present), or an array indexed as ``[sample, input]``
        :param verbose: print the name of every output missing from ``dB``
        :return: tuple ``(out, targets)`` with the denormalized network
            outputs and the targets found in ``dB`` (empty if none)
        """
        is_dict = isinstance(dB, dict)

        inputs = dB
        if is_dict:
            inputs = np.array([np.atleast_1d(dB[k]).astype(float) for k in self.inputNames]).T

        targets = []
        if is_dict:
            for k in self.outputNames:
                # Targets are optional: deliberately best-effort, a missing or
                # non-numeric entry is skipped rather than treated as an error.
                try:
                    targets.append(np.atleast_1d(dB[k]).astype(float))
                except Exception:
                    if verbose:
                        print(k + ' not in dB')
        targets = np.array(targets).T

        out = []
        for sample in inputs:
            scaled = (sample - self.scale_mean_in) / self.scale_deviation_in
            result = np.array(self.run(scaled))
            out.append(result * self.scale_deviation_out + self.scale_mean_out)
        out = np.array(out)

        out = self.denormOutputs(inputs, out)
        return out, targets

    def __getstate__(self):
        # Pickling writes the network to `filename`; only the path is pickled.
        self.save()
        return {'filename': self.filename}

    def __setstate__(self, tmp):
        # Unpickling re-reads the network from the pickled path.
        self.__init__(tmp['filename'])

    @staticmethod
    def _format_floats(values):
        """Return `values` as a space separated string of ``%6.6f`` numbers."""
        return ' '.join('%6.6f' % k for k in values)

    @staticmethod
    def _parse_floats(value):
        """Parse a whitespace separated list of numbers into a float array."""
        return np.array(value.split(), dtype=float)

    @staticmethod
    def _parse_names(value):
        """Parse the space separated ``repr`` strings written by `save`."""
        # literal_eval only accepts Python literals, so a tampered network
        # file cannot execute code the way the previous eval() call allowed.
        return list(ast.literal_eval(('[' + value + ']').replace("' '", "','")))

    def save(self):
        """
        Write the network to ``self.filename``.

        The file is first written by ``libfann.neural_net.save`` and then
        rewritten: when scaling is defined the existing ``scale_*`` lines are
        dropped and fresh ones inserted before each line containing
        ``neurons``; names, ``norm_output`` and ``MSE`` are appended.
        """
        super().save(self.filename)
        with open(self.filename, 'r') as f:
            original = f.readlines()

        has_scaling = self.scale_mean_in is not None
        tmp = []
        for line in original:
            if has_scaling and 'neurons' in line:
                tmp.append('scale_included=1')
                tmp.append('scale_mean_in=' + self._format_floats(self.scale_mean_in))
                tmp.append('scale_deviation_in=' + self._format_floats(self.scale_deviation_in))
                tmp.append('scale_new_min_in=' + ' '.join(['%6.6f' % -1.0] * self.get_num_input()))
                tmp.append('scale_factor_in=' + ' '.join(['%6.6f' % 1.0] * self.get_num_input()))
                tmp.append('scale_mean_out=' + self._format_floats(self.scale_mean_out))
                tmp.append('scale_deviation_out=' + self._format_floats(self.scale_deviation_out))
                tmp.append('scale_new_min_out=' + ' '.join(['%6.6f' % -1.0] * self.get_num_output()))
                tmp.append('scale_factor_out=' + ' '.join(['%6.6f' % 1.0] * self.get_num_output()))
                tmp.append(line.strip())
            elif not has_scaling or 'scale_' not in line:
                tmp.append(line.strip())

        if len(self.inputNames):
            tmp.append('input_names=' + ' '.join([repr(k) for k in self.inputNames]))
        if len(self.outputNames):
            tmp.append('output_names=' + ' '.join([repr(k) for k in self.outputNames]))
        if len(self.norm_output):
            tmp.append('norm_output=' + ' '.join([repr(float(k)) for k in np.array(self.norm_output).flatten()]))
        if self.MSE is not None:
            tmp.append('MSE= ' + str(self.MSE))

        with open(self.filename, 'w') as f:
            f.write('\n'.join(tmp))

    def load(self):
        """
        Read the network and its metadata from ``self.filename``.

        An empty file is left untouched so that a new network can be created.

        :return: self
        """
        if not os.stat(self.filename).st_size:
            return self

        super().create_from_file(self.filename)
        with open(self.filename, 'r') as f:
            lines = f.readlines()

        flat_norm = None
        for line in lines:
            # partition keeps any further '=' inside the value
            key, sep, value = line.rstrip().partition('=')
            if not sep:
                continue
            key = key.strip()

            if key == 'input_names':
                self.inputNames = self._parse_names(value)
            elif key == 'output_names':
                self.outputNames = self._parse_names(value)
            elif key == 'norm_output':
                flat_norm = self._parse_floats(value)
            elif key == 'MSE':
                try:
                    self.MSE = float(value)
                except ValueError:
                    # not a plain number: accept any other Python literal
                    self.MSE = ast.literal_eval(value.strip())
            elif key in self._SCALE_KEYS:
                setattr(self, key, self._parse_floats(value))

        # Reshape only once both name lists are known, so the result does not
        # depend on the order of the lines in the file.
        if flat_norm is not None:
            self.norm_output = np.reshape(flat_norm, (len(self.outputNames), len(self.inputNames)))
        return self
def activateNets(nets, dB):
    """
    Run every network of an ensemble on the same data.

    :param nets: dictionary with brainfuse objects (or glob pattern of the
        files where to load the NNs from)

    :param dB: data to run on, in any form accepted by ``brainfuse.activate``

    :return: tuple with (out,sut,targets,nets,out_) where ``out`` and ``sut``
        are the mean and standard deviation over the networks, ``targets`` are
        the ones returned by the last network, and ``out_`` holds the
        individual outputs indexed as ``[sample, output, network]``

    :raises IndexError: if there is no network to run
    """
    if isinstance(nets, str):
        nets = {k: brainfuse(file) for k, file in enumerate(glob.glob(nets))}
    if not nets:
        raise IndexError('no neural networks to activate')

    # Stack what the networks return instead of pre-allocating from the first
    # entry of dB: that entry need not be an input, and dB need not be a dict.
    outputs = []
    targets = None
    for net in nets.values():
        net_out, targets = net.activate(dB)
        outputs.append(net_out)
    out_ = np.asarray(np.stack(outputs, axis=-1), dtype=float)

    out = np.mean(out_, -1)
    sut = np.std(out_, -1)
    return out, sut, targets, nets, out_
def activateNetsFile(nets, inputFile, targetFile=None):
    """
    Run an ensemble of networks on data read from ASCII files.

    Both files have one header line (skipped) followed by whitespace separated
    numeric columns, in the order of the first network's ``inputNames``
    (``outputNames`` for the target file).

    :param nets: dictionary with brainfuse objects (or glob pattern of the
        files where to load the NNs from)

    :param inputFile: ASCII file where to load the inputs to run the NN

    :param targetFile: ASCII file where to load the targets for validating the NN

    :return: tuple with (out,sut,targets,nets,out_)
    """
    if isinstance(nets, str):
        # `brainfuse`, as in activateNets: OMFITbrainfuse is not defined here
        nets = {k: brainfuse(file) for k, file in enumerate(glob.glob(nets))}
    net = nets[list(nets.keys())[0]]

    def read_columns(path, names):
        # Return {name: list of floats}, one entry per column of the file.
        columns = {name: [] for name in names}
        with open(path, 'r') as f:
            next(f, None)  # header line
            for line in f:
                for k, item in enumerate(line.split()):
                    columns[names[k]].append(float(item))
        return columns

    dB = read_columns(inputFile, net.inputNames)
    if targetFile is not None:
        dB.update(read_columns(targetFile, net.outputNames))
    for k in net.inputNames:
        dB[k] = np.array(dB[k])

    return activateNets(nets, dB)
def activateMergeNets(nets, dB, merge_nets):
    """
    Run an ensemble of networks and blend their outputs along one input.

    Each network is weighted, sample by sample, with a Gaussian of the
    distance between ``dB[merge_nets]`` and the network's ``scale_mean_in``
    for that input, using its ``scale_deviation_in`` as width. The weights
    are normalized to sum to one over the networks.

    :param nets: dictionary with brainfuse objects (or glob pattern of the
        files where to load the NNs from)

    :param dB: dictionary with entries to run on

    :param merge_nets: name of the input along which the networks are merged;
        its position is looked up in the first network's ``inputNames``

    :return: tuple with (out,sut,targets,nets,out_0) where ``out`` is the
        weighted sum of the outputs, ``sut`` the weighted standard deviation
        around it, ``targets`` the ones returned by the last network, and
        ``out_0`` the unweighted outputs indexed as ``[sample, output, network]``
    """
    if isinstance(nets, str):
        # `brainfuse`, as in activateNets: OMFITbrainfuse is not defined here
        nets = {k: brainfuse(file) for k, file in enumerate(glob.glob(nets))}
    net = nets[list(nets.keys())[0]]
    index = net.inputNames.index(merge_nets)

    outputs = []
    centers = np.empty(len(nets))
    merge_norm = np.empty(len(nets))
    targets = None
    for k, member in enumerate(nets.values()):
        member_out, targets = member.activate(dB)
        outputs.append(member_out)
        centers[k] = member.scale_mean_in[index]
        merge_norm[k] = member.scale_deviation_in[index]
    # Stacking avoids sizing the result from an arbitrary entry of dB.
    out_0 = np.asarray(np.stack(outputs, axis=-1), dtype=float)

    # atleast_1d: accept lists and scalars, like brainfuse.activate does
    coordinate = np.atleast_1d(dB[merge_nets]).astype(float)
    w = coordinate[:, np.newaxis] - centers[np.newaxis, :]
    w = np.exp(-((w / merge_norm[np.newaxis, :]) ** 2))
    w = w / np.sum(w, 1, keepdims=True)

    weights = w[:, np.newaxis, :]
    out = np.sum(out_0 * weights, -1)
    sut = np.sqrt(np.sum(weights * (out_0 - out[:, :, np.newaxis]) ** 2, -1))
    return out, sut, targets, nets, out_0
class brainfuseDataset(dict):
    """
    Dictionary of data columns keyed by name.

    Keys starting with ``OUT_`` are treated as outputs, all others as inputs.
    """

    def inputs(self):
        """
        :return: list of the keys that do not start with ``OUT_``
        """
        return [k for k in self if not k.startswith('OUT_')]

    def outputs(self):
        """
        :return: list of the keys that start with ``OUT_``
        """
        return [k for k in self if k.startswith('OUT_')]

    def astable(self, columns, every=1):
        """
        Gather some of the columns in a single array.

        :param columns: names of the columns to include (a single name can be
            given as a string); names that are not in the dataset are ignored

        :param every: keep only one sample every this many

        :return: array with one row per selected column, in the order of the
            dictionary keys (not in the order of ``columns``)
        """
        # A bare string would otherwise be searched character-wise, selecting
        # every key that happens to be a substring of it.
        if isinstance(columns, str):
            columns = [columns]
        return np.array([self[k][::every] for k in self if k in columns])