Source code for conn2res.reservoir

# -*- coding: utf-8 -*-
"""
Functionality for simulating reservoirs
"""
from abc import ABCMeta, abstractmethod
import numpy as np
from numpy.linalg import pinv
from .connectivity import Conn
from .utils import *

[docs]class Reservoir(metaclass=ABCMeta): """ Class that represents a general Reservoir object ... Attributes ---------- w : numpy.ndarray reservoir connectivity matrix (source, target) _state : numpy.ndarray reservoir activation states n_nodes : int dimension of the reservoir Methods ---------- # TODO simulate add_washout_time """ def __init__(self, w): """ Constructor class for general Reservoir Networks Parameters ---------- w : (N, N) numpy.ndarray reservoir connectivity matrix (source, target) N: number of nodes in the network. If w is directed, then rows (columns) should correspond to source (target) nodes. """ self.w = w self._state = None self.n_nodes = len(self.w)
[docs] @abstractmethod def simulate(self, *args, **kwargs): pass
[docs] def add_washout_time(self, *args, idx_washout=0): """ Add washout time to reservoir states and corresponding arrays (e.g., label, sample weight) 'ext_input' Parameters ---------- idx_washout: int index up to which the values of arrays should be deleted args: numpy.ndarray reservoir states and any additional arrays where washout is to be applied Returns ------- args: numpy.ndarray same arrays as in args but after washout is applied """ # delete initial indexes of arrays in args argout = tuple(a[idx_washout:] for a in args) return argout
[docs]class EchoStateNetwork(Reservoir): """ Class that represents an Echo State Network ... Attributes ---------- w : numpy.ndarray reservoir connectivity matrix (source, target) _state : numpy.ndarray reservoir activation states n_nodes : int dimension of the reservoir activation_function : {'tanh', 'piecewise'} type of activation function Methods ------- # TODO simulate set_activation_function """ def __init__(self, *args, activation_function='tanh', **kwargs): """ Constructor class for Echo State Networks Parameters ---------- w: (N, N) numpy.ndarray Reservoir connectivity matrix (source, target) N: number of nodes in the network. If w is directed, then rows (columns) should correspond to source (target) nodes. activation_function: str {'linear', 'elu', 'relu', 'leaky_relu', 'sigmoid', 'tanh', 'step'}, default 'tanh' Activation function (nonlinearity of the system's units) """ super().__init__(*args, **kwargs) # activation function self.activation_function = self.set_activation_function( activation_function)
[docs] def simulate( self, ext_input, w_in, input_gain=None, ic=None, output_nodes=None, return_states=True, **kwargs ): """ Simulates reservoir dynamics given an external input signal 'ext_input' and an input connectivity matrix 'w_in' Parameters ---------- ext_input : (time, N_inputs) numpy.ndarray External input signal N_inputs: number of external input signals w_in : (N_inputs, N) numpy.ndarray Input connectivity matrix (source, target) N_inputs: number of external input signals N: number of nodes in the network input_gain : float Constant gain that scales w_in ic : (N,) numpy.ndarray, optional Initial conditions N: number of nodes in the network. If w is directed, then rows (columns) should correspond to source (target) nodes. output_nodes : list or numpy.ndarray, optional List of nodes for which reservoir states will be returned if 'return_states' is True. return_states : bool, optional If True, simulated resrvoir states are returned. True by default. kwargs: Other keyword arguments are passed to self.activation_function Returns ------- self._state : (time, N) numpy.ndarray Activation states of the reservoir. N: number of nodes in the network if output_nodes is None, else number of output_nodes """ # print('\n GENERATING RESERVOIR STATES ...') # if ext_input is list or tuple convert to numpy.ndarray if isinstance(ext_input, (list, tuple)): sections = get_sections(ext_input) ext_input = concat(ext_input) convert_to_list = True else: convert_to_list = False # initialize reservoir states timesteps = range(1, len(ext_input) + 1) self._state = np.zeros((len(timesteps) + 1, self.n_nodes)) # set initial conditions if ic is not None: self._state[0, :] = ic # simulate dynamics for t in timesteps: # if (t > 0) and (t % 100 == 0): # print(f'\t ----- timestep = {t}') synap_input = np.dot( self._state[t-1, :], self.w) + np.dot(ext_input[t-1, :], w_in) self._state[t, :] = self.activation_function(synap_input, **kwargs) # remove initial condition (to match the time index of _state # and ext_input) self._state = self._state[1:] # convert back to list or tuple if convert_to_list: self._state = split(self._state, sections) # return the same type if return_states: if output_nodes is not None: if convert_to_list: return [state[:, output_nodes] for state in self._state] else: return self._state[:, output_nodes] else: return self._state
[docs] def set_activation_function(self, function): def linear(x, m=1): return m * x def elu(x, alpha=0.5): x[x <= 0] = alpha*(np.exp(x[x <= 0]) - 1) return x def relu(x): return np.maximum(0, x) def leaky_relu(x, alpha=0.5): return np.maximum(alpha * x, x) def sigmoid(x): return 1.0 / (1 + np.exp(-x)) def tanh(x): return np.tanh(x) def step(x, thr=0.5, vmin=0, vmax=1): return np.piecewise(x, [x < thr, x >= thr], [vmin, vmax]).astype(int) if function == 'linear': return linear elif function == 'elu': return elu elif function == 'relu': return relu elif function == 'leaky_relu': return leaky_relu elif function == 'sigmoid': return sigmoid elif function == 'tanh': return tanh elif function == 'step': return step
[docs]class MemristiveReservoir: """ Class that represents a general Memristive Reservoir ... Attributes ---------- _W : numpy.ndarray reservoir's binary connectivity matrix _I : numpy.ndarray indices of internal nodes _E : numpy.ndarray indices of external nodes _GR : numpy.ndarray indices of grounded nodes n_internal_nodes : int number of internal nodes n_external_nodes : int number of external nodes n_grounded_nodes : int number of gorunded nodes n_nodes : int total number of nodes (internal, external, and ground) G : numpy.ndarray matrix of conductances save_conductance : bool Indicates whether to save conductance state after each simulation step. If True, then will be stored in self._G_history. This will increase memory demands. _state : numpy.ndarray reservoir activation states Methods ---------- # TODO References ---------- # TODO """ def __init__(self, w, int_nodes, ext_nodes, gr_nodes, save_conductance=False, *args, **kwargs): """ Constructor class for Memristive Networks. Memristive networks are an abstraction for physical networks of memristive elements. Parameters ---------- w : (N, N) numpy.ndarray reservoir's binary connectivity matrix N: total number of nodes in the network (internal + external + grounded nodes) int_nodes : (n_internal_nodes,) numpy.ndarray indexes of internal nodes n_internal_nodes: number of internal nodes ext_nodes : (n_external_nodes,) numpy.ndarray indexes of external nodes n_external_nodes: number of external nodes gr_nodes : (n_grounded_nodes,) numpy.ndarray indexes of grounded nodes n_grounded_nodes: number of grounded nodes save_conductance : bool, optional Indicates whether to save conductance state after each simulation step. If True, then will be stored in self._G_history. This will increase memory demands. Default: False """ # super().__init__(*args, **kwargs) self._W = self.setW(w) self._I = np.asarray(int_nodes) self._E = np.asarray(ext_nodes) self._GR = np.asarray(gr_nodes) self._n_internal_nodes = len(self._I) self._n_external_nodes = len(self._E) self._n_grounded_nodes = len(self._GR) self._n_nodes = len(self._W) self._G = None self.save_conductance = save_conductance self._G_history = None self._state = None def setW(self, w): """ # TODO This function guarantees that W is binary and symmetric. Converts directed connectivity matrices in undirected. Parameters ---------- """ # convert to binary w = w.astype(bool).astype(int) # make sure the diagonal is zero np.fill_diagonal(w, 0) # make symmetric if w is directed if not check_symmetric(w): # connections in upper diagonal upper_diag = w[np.triu_indices_from(w, 1)] # connections in lower diagonal lower_diag = w.T[np.triu_indices_from(w, 1)] # matrix of undirected connections W = np.zeros_like(w).astype(int) W[np.triu_indices_from(w, 1)] = np.logical_or(upper_diag, lower_diag ).astype(int) return make_symmetric(W, copy_lower=False) else: return w def init_property(self, mean, std=0.1, seed=None): """ This function initializes property matrices following a normal distribution with mean = 'mean' and standard deviation = 'mean' * 'std' Parameters ---------- # TODO Returns ------- # TODO """ # use random number generator for reproducibility rng = np.random.default_rng(seed=seed) p = rng.normal(mean, std*mean, size=self._W.shape) p = make_symmetric(p) return p * self._W # ma.masked_array(p, mask=np.logical_not(self._W)) def solveVi(self, Ve, Vgr=None, G=None, **kwargs): """ This function uses Kirchhoff's law to estimate voltage at the internal nodes based on the current state of the conductance matrix G, a given external input voltage 'V_E' and the ground level voltage 'V_GR' Parameters ---------- # TODO Returns ------- # TODO """ if Vgr is None: Vgr = np.zeros((self._n_grounded_nodes)) if G is None: G = self._G # TODO: verify that the axis along which the sum is performed is correct # matrix N N = np.zeros((self._n_nodes, self._n_nodes)) np.fill_diagonal(N, np.sum(G, axis=1)) # matrix A A = N - G # inverse matrix A_II A_II = A[np.ix_(self._I, self._I)] # print(matrix_rank(A_II, hermitian=check_symmetric(A_II))) A_II_inv = pinv(A_II) # matrix HI H_IE = np.dot(G[np.ix_(self._I, self._E)], Ve) H_IGR = np.dot(G[np.ix_(self._I, self._GR)], Vgr) H_I = H_IE + H_IGR # return voltage at internal nodes return np.dot(A_II_inv, H_I) def getV(self, Vi, Ve, Vgr=None): """ Given the nodal voltage at the internal, external and grounded nodes, this function estimates the voltage across all existent memristive elements Parameters ---------- # TODO Returns ------- # TODO """ # set voltage at grounded nodes if Vgr is None: Vgr = np.zeros((self._n_grounded_nodes)) # set of all nodal voltages voltage = np.concatenate( [Vi[:, np.newaxis], Ve[:, np.newaxis], Vgr[:, np.newaxis]]).squeeze() # set of all nodes (internal + external + grounded) nodes = np.concatenate( [self._I[:, np.newaxis], self._E[:, np.newaxis], self._GR[:, np.newaxis]]).squeeze() # dictionary that groups pairs of voltage nv_dict = {n: v for n, v in zip(nodes, voltage)} # voltage across memristors V = np.zeros_like(self._W).astype(float) for i, j in list(zip(*np.where(self._W != 0))): if j > i: V[i, j] = nv_dict[i] - nv_dict[j] else: V[i, j] = nv_dict[j] - nv_dict[i] return mask(self, V) def simulate(self, Vext, ic=None, mode='forward'): """ Simulates the dynamics of a memristive reservoir given an external voltage signal V_E Parameters ---------- Vext : (time, N_external_nodes) numpy.ndarray External voltage signal N_external_nodes: number of external (input) nodes ic : (N_internal_nodes,) numpy.ndarray Initial conditions N_internal_nodes: number of internal (output) nodes mode : {'forward', 'backward'} Refers to the method used to solve the system of equations. Use 'forward' for explicit Euler method, and 'backward' for implicit Euler method. Returns ------- self._state : (time, N) numpy.ndarray activation states of the reservoir; includes all the nodes N: number of nodes in the network """ # print('\n GENERATING RESERVOIR STATES ...') # print(f'\n SIMULATING STATES IN {mode.upper()} MODE ...') # initialize reservoir states self._state = np.zeros((len(Vext), self._n_nodes)) # initialize array for storing conductance history if needed if self.save_conductance: self._G_history = np.zeros((len(Vext), self._n_nodes, self._n_nodes)) for t, Ve in enumerate(Vext): if mode == 'forward': if (t > 0) and (t % 100 == 0): print(f'\t ----- timestep = {t}') # get voltage at internal nodes Vi = self.solveVi(Ve) # update matrix of voltages across memristors V = self.getV(Vi, Ve) # update conductance self.updateG(V=V, update=True) elif mode == 'backward': if (t > 0) and (t % 100 == 0): print(f'\t ----- timestep = {t}') # get voltage at internal nodes Vi = self.iterate(Ve) # store activation states self._state[t, self._E] = Ve self._state[t, self._I] = Vi # store conductance if self.save_conductance: self._G_history[t] = self._G return self._state def iterate(self, Ve, tol=5e-2, iters=100): """ # TODO """ # initial guess for voltage at internal nodes Vi = [self.solveVi(Ve=Ve, G=self._G)] # initial guess for conductance G = [self._G] convergence = False n_iters = 0 while not convergence: assert n_iters < iters, 'There is no convergence !!!' # get voltage across memristors # and update conductance V = self.getV(Vi[-1].copy(), Ve) G_tmp = self.updateG(V, G[-1], update=False) # solve voltage at internal nodes Vi Vi.append(self.solveVi(Ve=Ve, G=G_tmp)) # update conductance with G_tmp # supposedly correct G.append(self.updateG(V, G_tmp, update=False)) # G.append(self.updateG(self.getV(Vi[-1].copy(), Ve), G[-1], update=False)) # G.append(self.updateG(self.getV(Vi[-1].copy(), Ve), G_tmp, update=False)) # estimate error err_Vi = self.getErr(Vi[-2], Vi[-1]) err_G = self.getErr(G[-2], G[-1]) max_err = np.max((np.max(err_Vi), np.max(err_G))) if max_err < tol: self.updateG(self.getV(Vi[-1].copy(), Ve), G[-1], update=True) convergence = True else: del G[0] del Vi[0] n_iters += 1 # print(f'\t\t n_iter = {n_iters}') # print(f'\t\t\t max error = {max_err}') return Vi[-1] def getErr(self, x_0, x_1): """ # TODO """ err = 2 * np.abs(x_1-x_0)/(np.abs(x_1)+np.abs(x_0)) err[np.isnan(err)] = 0.0 return err def mask(self, a): """ This functions converts to zero all entries in matrix 'a' for which there is no existent connection # TODO """ a[np.where(self._W == 0)] = 0 return a
[docs]class MSSNetwork(MemristiveReservoir): """ Class that represents a Metastable Switch Memristive network (see Nugent and Molter, 2014 for details) ... Attributes ---------- w : numpy.ndarray reservoir's binary connectivity matrix I : numpy.ndarray indices of internal nodes E : numpy.ndarray indices of external nodes GR : numpy.ndarray indices of grounded nodes n_internal_nodes : int number of internal nodes n_external_nodes : int number of external nodes n_grounded_nodes : int number of gorunded nodes n_nodes : int total number of nodes (internal, external, and ground) G : numpy.ndarray matrix of conductances save_conductance : bool Indicates whether to save conductance state after each simulation step. If True, then will be stored in self._G_history. This will increase memory demands. vA : numpy.ndarray of floats vB : numpy.ndarray of floats tc : numpy.ndarray of floats NMSS : numpy.ndarray of ints Woff : numpy.ndarray of floats Won : numpy.ndarray of floats Ga : numpy.ndarray of floats Gb : numpy.ndarray of floats Nb : numpy.ndarray of ints Methods ---------- # TODO """ # physical parameters of the MMS model k = 1.3806503e-23 # Boltzman's constant Q = 1.60217646e-19 # electron charge Temp = 298 # temperature b = Q/(k*Temp) VT = 1/b def __init__(self, vA=0.17, vB=0.22, tc=0.32e-3, NMSS=10000, Woff=0.91e-3, Won=0.87e-2, Nb=2000, noise=0.1, *args, **kwargs): """ Constructor class for Memristive Networks following the Generalized Memristive Switch Model proposed in Nugent and Molter, 2014. Default parameter values correspond to an Ag-chalcogenide memristive device taken from Nugent and Molter, 2014. Parameters ---------- w : (N, N) numpy.ndarray reservoir's binary connectivity matrix N: total number of nodes in the network (internal + external + grounded nodes) int_nodes : (n_internal_nodes,) numpy.ndarray indexes of internal nodes n_internal_nodes: number of internal nodes ext_nodes : (n_external_nodes,) numpy.ndarray indexes of external nodes n_external_nodes: number of external nodes gr_nodes : (n_grounded_nodes,) numpy.ndarray indexes of grounded nodes n_grounded_nodes: number of grounded nodes save_conductance : bool, optional Indicates whether to save conductance state after each simulation step. If True, then will be stored in self._G_history. This will increase memory demands. Default: False vA : float. Default: 0.17 vB : float. Default: 0.22 tc : float. Default: 0.32e-3 NMSS : int. Default: 10000 Woff : float. Default: 0.91e-3 Won : float. Default: 0.87e-2 Nb : int. Default: 2000 noise : float. Default: 0.1 # TODO """ super().__init__(*args, **kwargs) self.vA = self.init_property(vA, noise) # constant self.vB = self.init_property(vB, noise) # constant self.tc = self.init_property(tc, noise) # constant self.NMSS = self.init_property(NMSS, noise) # constant self.Woff = self.init_property(Woff, noise) # constant self.Won = self.init_property(Won, noise) # constant self._Ga = np.divide(self.Woff, self.NMSS, where=self.NMSS != 0, out=np.zeros_like(self.Woff)) # constant self._Gb = np.divide(self.Won, self.NMSS, where=self.NMSS != 0, out=np.zeros_like(self.Won)) # constant self._Nb = self.init_property(Nb, noise) self._G = self._Nb * (self._Gb - self._Ga) + self.NMSS * self._Ga def dG(self, V, G=None, dt=1e-4, seed=None): """ # TODO This function updates the conductance matrix G given V Parameters ---------- V : (N,N) numpy.ndarray matrix of voltages accross memristors Returns ------- References ---------- """ # set Nb values if G is not None: Gdiff1 = G - self.NMSS * self._Ga Gdiff2 = self._Gb - self._Ga Nb = np.divide(Gdiff1, Gdiff2, where=Gdiff2 != 0, out=np.zeros_like(Gdiff1)) else: Nb = self._Nb # ration of dt to characterictic time of the device tc alpha = np.divide(dt, self.tc, where=self.tc != 0, out=np.zeros_like(self.tc)) # compute Pa exponent = -1 * (V - self.vA) / self.VT Pa = alpha / (1 + np.exp(exponent)) # compute Pb exponent = -1 * (V + self.vB) / self.VT Pb = alpha * (1 - 1 / (1 + np.exp(exponent))) # compute dNb Na = self.NMSS - Nb # use random number generator for reproducibility rng = np.random.default_rng(seed=seed) Gab = rng.binomial(Na.astype(int), mask(self, Pa)) Gba = rng.binomial(Nb.astype(int), mask(self, Pb)) if check_symmetric(self._W): Gab = make_symmetric(Gab) Gba = make_symmetric(Gba) dNb = (Gab-Gba) return dNb def updateG(self, V, G=None, update=False): """ # TODO """ if G is None: G = self._G # compute dG dNb = self.dG(V=V, G=G) dG = dNb * (self._Gb-self._Ga) if update: # update Nb self._Nb += dNb # update G self._G = self._Nb * (self._Gb - self._Ga) + self.NMSS * self._Ga # self._G += dG else: return self._G.copy() + dG # updated conductance