Skip to content
Snippets Groups Projects
Commit 48cacb14 authored by Mazdak Fatahi's avatar Mazdak Fatahi
Browse files

Upload New File

parent 12a0e64a
No related branches found
No related tags found
No related merge requests found
import numpy as np
import math
from mpl_toolkits.axes_grid1 import ImageGrid
import matplotlib.pyplot as plt
import torch
import snntorch
from snntorch import utils, spikegen
from tqdm import tqdm
#from pyNN.utility.plotting import Figure, Panel
#import spynnaker.pyNN as sim
INPUT_SIZE=784
# Initialization
#Neurons Parameters
excitatory_neuron_parameters = {
'v_rest': -70.0,#V_REST, # Resting membrane potential in mV.
'cm': .3,# 1.0, # Capacity of the membrane in nF
'tau_m': 10.0, #100.0, # Membrane time constant in ms.
'tau_refrac': 10,#4.0,#5.0, # Duration of refractory period in ms.
'tau_syn_E': 1.0, # Decay time of the excitatory synaptic conductance in ms.
'tau_syn_I': 1.0,#2.0, # Decay time of the inhibitory synaptic conductance in ms.
# 'e_rev_E': 0.0, # Reversal potential for excitatory input in mV
# 'e_rev_I': -100.0, # Reversal potential for inhibitory input in mV
'v_thresh': -55.4,#-52.0, # Spike threshold in mV.
'v_reset': -65.0,#V_RESET, # Reset potential after a spike in mV.
'i_offset': 0.005# 0.0, # Offset current in nA
}
def img_grid(input_data, nrows_ncols, file_name=None, figsize=(10, 10), cmap="jet"):
"""
Using ImageGrid from mpl_toolkits.axes_grid1 to show multiple images in grids
Parameters
----------
input_data: An array of images.
nrows_ncols: A tuple (number of rows, number of columns)
file_name: A string to save the plot as
cmap: 'flag', 'prism', 'ocean', 'gist_earth', 'terrain',
'gist_stern', 'gnuplot', 'gnuplot2', 'CMRmap',
'cubehelix', 'brg', 'gist_rainbow', 'rainbow', 'jet',
'turbo', 'nipy_spectral', 'gist_ncar'
Ex_1:
p=np.random.random_integers(0,255,(100,10,10))
plot_and_save_grid(p, (10, 10), 'test.svg')
Ex_2:
p=[[[1,2,3],[1,2,3],[1,2,3]] for _ in range(8)]
plots.img_grid(p, (3,2), 'test.svg')
"""
fig = plt.figure(figsize=figsize)
grid = ImageGrid(fig, 111,
nrows_ncols,
axes_pad=0.1,
)
for ax, inx in zip(grid, list(range(nrows_ncols[0]*nrows_ncols[1]))):
ax.imshow(input_data[inx],cmap=cmap)
ax.set_axis_off()
if file_name:
plt.savefig(file_name)
return
def multi_titled_imgs(input_data, main_title, row_titles, col_titles, file_name=None, axis_off=True, figsize=(10,15), main_title_fontsize=20, rows_fontsize=15, cols_fontsize=10):
"""
Parameters
----------
input_data: An array of images.
main_title: A sting as top title
row_titles: An array of n strings for the title of n each row
col_titles: An array of 2 strings for the title of each columns
Ex_1:
p=np.random.random_integers(0,255,(6,10,10))# 6 random (10, 10) matrixes
main_title='Main Title'
row_titles=["A", "B", "C"]
col_titles=["First Col", "2nd Col"]
plots.multi_titled_imgs(p[:10],main_title,row_titles,col_titles, file_name='a.png', axis_off=False)
"""
fig = plt.figure(figsize=figsize,constrained_layout=True)
fig.suptitle(main_title, fontsize=main_title_fontsize)
subfigs = fig.subfigures(nrows=len(row_titles), ncols=1)
for row, subfig in enumerate(subfigs):
subfig.suptitle(f'{row_titles[row]}', fontsize=rows_fontsize)
axs = subfig.subplots(nrows=1, ncols=2)
for col, ax in enumerate(axs):
ax.imshow(input_data[row*2+col])
ax.set_title(f' {col_titles[col]}', fontsize=cols_fontsize)
if axis_off:
ax.set_axis_off()
if file_name:
plt.savefig(file_name)
return
def Extract_ConnectionFiles(Input_CSNN_Binary_Files=[], ConnectionFile_Names=[], Input_size=784, Output_size=400, n_samples_to_plot=0, Non_Zero=False, Transpoed=False, Inpute_shape={'x':28, 'y':28},):
"""
To generate connection files from the CSNN binary files.
----------
NOTE: For each layer you need to call the function separately.
----------
Parameters
----------
Authors Note: It is recommended to call the function with Transpoed=False and transpose the input before passing it to the SpikeSourceArray in PyNN.
-----
Input_CSNN_Binary_Files: Python list of binary file names (different binary files from the same layer)
ConnectionFile_Names: Python list of output file names (as text files compatible with PyNN FromFileConnector)
Input_size: Number of the neurons as the input of the layer
Output_size: Number of the neurons as the output of the layer
n_samples_to_plot: You can plot n_samples_to_plot weights between one input and all the output as plot
Non_Zero: If set to True, the weights with zero values will be removed from the output file.
Transpoed: Since CSNN binary file for the trained weights is transposed, is set Transpoed to True, the function will transpose the weights before generating the output file.
Inpute_shape: Python dictionary of shape of the input samples
"""
extracted_weights=[[] for _ in Input_CSNN_Binary_Files]
for i, file_name in enumerate(Input_CSNN_Binary_Files):
w = np.fromfile(file_name, dtype=np.dtype(np.float32))
print(f'Input file shape = {w.shape}')
f=open(ConnectionFile_Names[i], 'w')
f.write('# columns = ["i", "j", "weight", "delay"]\n')
if Transpoed:
w=w.reshape(Inpute_shape['x'], Inpute_shape['y'], -1)#(28, 28, 400)
w=np.array([w[:, :, j].transpose() for j in range(Output_size) ])#(400, 28, 28)
w=w.reshape( Output_size, Input_size).transpose()# 400 rows of (28, 28) to 400 rows of 784 then to 784*400
X,Y=w.shape
print(X,Y)
for x in range(X):
for y in range(Y):
if Non_Zero:
if w[x][y]:
line=f'{x} {y} {w[x][y]} 1\n'
f.write(line)
extracted_weights[i].append(w[x][y])
else:
line=f'{x} {y} {w[x][y]} 1\n'
f.write(line)
extracted_weights[i].append(w[x][y])
f.close()
if n_samples_to_plot:
w_to_plot=[w.reshape(Inpute_shape['x'], Inpute_shape['y'], -1)[ :, :, i] for i in range(n_samples_to_plot)]
img_grid(w_to_plot,(int(math.sqrt(n_samples_to_plot)),int(math.sqrt(n_samples_to_plot))))
else:
w=w.reshape(Input_size, Output_size)# 784*400
X,Y=w.shape
for x in range(X):
for y in range(Y):
if Non_Zero:
if w[x][y]:
line=f'{x} {y} {w[x][y]} 1\n'
f.write(line)
extracted_weights[i].append(w[x][y])
else:
line=f'{x} {y} {w[x][y]} 1\n'
f.write(line)
extracted_weights[i].append(w[x][y])
f.close()
if n_samples_to_plot:
w_to_plot=[w.reshape(Inpute_shape['x'], Inpute_shape['y'], -1)[ :, :, i] for i in range(n_samples_to_plot)]
img_grid(w_to_plot,(int(math.sqrt(n_samples_to_plot)),int(math.sqrt(n_samples_to_plot))))
return extracted_weights
def convert_to_latency_code_V4(input_data, tarnsposed=True, samples_intervel=5, time_step=5, num_steps=10):# for CSNN simulation since exracted weights are transposed
slice_interval=time_step
input_interval=samples_intervel
number_of_digits=len(input_data)
coded_digits=[]
if tarnsposed:
for i in range(number_of_digits):
s=spikegen.latency((torch.t(input_data[i][0:28,0:28].T/255)).float(),num_steps=num_steps,normalize=True,linear=True)##spikegen.rate((input_data[i][0:28,0:28]/255).float(), num_steps=num_steps)
#print(s)
coded_digits.append(s[:-1])
else:
for i in range(number_of_digits):
s=spikegen.latency((torch.t(input_data[i][0:28,0:28]/255)).float(),num_steps=num_steps,normalize=True,linear=True)##spikegen.rate((input_data[i][0:28,0:28]/255).float(), num_steps=num_steps)
#print(s)
coded_digits.append(s[:-1])
#print(f'coded_digits ={len(coded_digits)}')
current_time=5# 0 will make the strongest vlaue which will spike first to zero
INPUT_SIZE=784
spikes_time_source_array=[[] for _ in range(INPUT_SIZE)]
with tqdm(total=number_of_digits) as pbar:
for coded_digit_counter,coded_digit in enumerate(coded_digits):
for time_slice_counter, time_slice in enumerate(coded_digit):
timed_flatten_time_slice=time_slice.view(-1)*current_time
#print(timed_flatten_time_slice.shape)
for spike_time_index in torch.nonzero(timed_flatten_time_slice):
spikes_time_source_array[spike_time_index].append(timed_flatten_time_slice[spike_time_index].item())
current_time+=slice_interval
current_time+=input_interval
pbar.update(1)
return (current_time, spikes_time_source_array)
#========================================================================================================== extract_wieghts
def extract_wieghts(projection):
return (projection.get(["weight"],"list", with_address=False))
#========================================================================================================== extract_synaptic_parameters
def extract_synaptic_parameters(projection):
stored_kernel_connector=projection.get(['weight', 'delay'],"list")
connection_list=[]
for con in stored_kernel_connector:
connection_list.append((con['source'],con['target'],con['weight'],con['delay']))
return ( sorted(connection_list, key=lambda x: x[1])) # sort it according to index of neurons in the post population
#========================================================================================================== plot_and_save_grid
def plot_and_save_grid(input_data,nrows_ncols,save=False,file_name=None):
fig = plt.figure(figsize=(10, 10))
grid = ImageGrid(fig, 111, # similar to subplot(111)
nrows_ncols, # creates nrow x ncol grid of axes
axes_pad=0.1, # pad between axes in inch.
)
for ax, inx in zip(grid, list(range(nrows_ncols[0]*nrows_ncols[1]))):
ax.imshow(input_data[inx],cmap="jet")
ax.set_axis_off()
if save:
plt.savefig(file_name)
return
#========================================================================================================== make_population
def make_populations(inupu_spike_train, EXC_POP_SIZE, INH_POP_SIZE):
INPUT_POP = sim.Population(int(INPUT_SIZE), sim.SpikeSourceArray(inupu_spike_train), label="input")
EXC_POP=[]
INH_POP=[]
for exp_pop_number in range(len(EXC_POP_SIZE)):
print(f'exp_pop_number_{exp_pop_number} has {EXC_POP_SIZE[exp_pop_number]} neurons')
EXC_POP.append(sim.Population(int(EXC_POP_SIZE[exp_pop_number]),sim.IF_curr_exp(**excitatory_neuron_parameters), initial_values={'v': excitatory_neuron_parameters["v_rest"]}, label=f"EXC_POP_{exp_pop_number}"))
for inh_pop_number in range(len(INH_POP_SIZE)):
print(f'inh_pop_number_{inh_pop_number} has {INH_POP_SIZE[inh_pop_number]} neurons')
INH_POP.append(sim.Population(int(INH_POP_SIZE[inh_pop_number]),sim.IF_curr_exp(**inhibitory_neuron_parameters), initial_values={'v': inhibitory_neuron_parameters["v_rest"]}, label=f"INH_POP_{inh_pop_number}"))
return INPUT_POP, EXC_POP, INH_POP
#========================================================================================================== Make Network
def make_projections_from_file(INPUT_POP, EXC_POPs, INH_POPs, FILES):
EXC_EXC_PRJs=[]
EXC_INH_PRJs=[]
INH_EXC_PRJs=[]
connections_list_from_file=sim.FromFileConnector(FILES[0])
INPUT_EXC_PRJ=sim.Projection(INPUT_POP, EXC_POPs[0], connections_list_from_file, receptor_type = 'excitatory')
print(f'Porjection from Input ot Excitatory pop_0 is created')
print('============================================================')
connections_list_from_file=[]
for exc_pop_number, file in enumerate(FILES[1:]):
connections_list_from_file=sim.FromFileConnector(file)
EXC_EXC_PRJs.append(sim.Projection(EXC_POPs[exc_pop_number], EXC_POPs[exc_pop_number+1], connections_list_from_file, receptor_type = 'excitatory'))
print(f'Porjection from Excitatory pop_{exc_pop_number} ot Excitatory pop_{exc_pop_number+1} is created')
print('============================================================')
for inh_pop_number in range(len(INH_POPs)):
EXC_INH_PRJs.append(sim.Projection(EXC_POPs[inh_pop_number], INH_POPs[inh_pop_number], sim.OneToOneConnector(), synapse_type = sim.StaticSynapse(weight=2, delay=1.0),receptor_type = 'excitatory') )
print(f'Porjection from Excitatory pop_{inh_pop_number} ot Inhibatory pop_{inh_pop_number} is created')
inhibitory_excitatory_connection_list = []
for i in range(INH_POP_SIZE[inh_pop_number]):
for j in range(EXC_POP_SIZE[inh_pop_number]):
if not i==j:
inhibitory_excitatory_connection_list.append((i,j))
print(f'Connection list from Inhibatory_Pop_{inh_pop_number} to Exctatory_Pop_{inh_pop_number} = {INH_POP_SIZE[inh_pop_number]}*{EXC_POP_SIZE[inh_pop_number]}')
INH_EXC_PRJs.append(sim.Projection(INH_POPs[inh_pop_number], EXC_POPs[inh_pop_number],
sim.FromListConnector(inhibitory_excitatory_connection_list),
synapse_type = sim.StaticSynapse(weight=25,delay=1.0),
receptor_type = 'inhibitory'))
print('============================================================')
return INPUT_EXC_PRJ, EXC_EXC_PRJs
def make_projections_from_file_with_one_shut_inhibitory(INPUT_POP, EXC_POPs, INH_POPs, FILES):
EXC_EXC_PRJs=[]
EXC_INH_PRJs=[]
INH_EXC_PRJs=[]
connections_list_from_file=sim.FromFileConnector(FILES[0])
INPUT_EXC_PRJ=sim.Projection(INPUT_POP, EXC_POPs[0], connections_list_from_file, receptor_type = 'excitatory')
print(f'Porjection from Input ot Excitatory pop_0 is created')
print('============================================================')
connections_list_from_file=[]
for exc_pop_number, file in enumerate(FILES[1:]):
connections_list_from_file=sim.FromFileConnector(file)
EXC_EXC_PRJs.append(sim.Projection(EXC_POPs[exc_pop_number], EXC_POPs[exc_pop_number+1], connections_list_from_file, receptor_type = 'excitatory'))
print(f'Porjection from Excitatory pop_{exc_pop_number} ot Excitatory pop_{exc_pop_number+1} is created')
print('============================================================')
for inh_pop_number in range(len(INH_POPs)):
EXC_INH_PRJs.append(sim.Projection(EXC_POPs[inh_pop_number], INH_POPs[inh_pop_number], sim.OneToOneConnector(), synapse_type = sim.StaticSynapse(weight=50, delay=1.0),receptor_type = 'excitatory') )
print(f'Porjection from Excitatory pop_{inh_pop_number} ot Inhibatory pop_{inh_pop_number} is created')
print(f'Connecting ONE SHUT Inhibatory_Pop_{inh_pop_number} to all Exctatory_Pop_{inh_pop_number} = {INH_POP_SIZE[inh_pop_number]}*{EXC_POP_SIZE[inh_pop_number]}')
INH_EXC_PRJs.append(sim.Projection(INH_POPs[inh_pop_number], EXC_POPs[inh_pop_number],
sim.AllToAllConnector(),
synapse_type = sim.StaticSynapse(weight=100,delay=1.0),
receptor_type = 'inhibitory'))
print('============================================================')
return INPUT_EXC_PRJ, EXC_EXC_PRJs
#========================================================================================================== Make Network
# run simulator for 'time' and feed SpikeSourceArray with 'spike_list_flatten'
def init_sim(timestep, time_scale_factor, neuron_tpye, number_of_neurons_per_core):
sim.end()
sim.setup(timestep=timestep,time_scale_factor=time_scale_factor)#sim.setup(timestep=1,time_scale_factor=10)
sim.set_number_of_neurons_per_core(neuron_tpye, number_of_neurons_per_core)
return
def make_network(spikes_list_flatten, EXC_POP_SIZE, INH_POP_SIZE):
INPUT_POP, EXC_POPs, INH_POPs = make_populations(spikes_list_flatten, EXC_POP_SIZE, INH_POP_SIZE)
INPUT_EXC_PRJ, EXC_EXC_PRJs = make_projections_from_file(INPUT_POP, EXC_POPs, INH_POPs, weights_files)#make_projections_2(INPUT_POP)
return (INPUT_POP, EXC_POPs, INH_POPs, INPUT_EXC_PRJ, EXC_EXC_PRJs)
def record_activities(activities, INPUT_POP, EXC_POPs, INH_POPs):
if INPUT_POP:
INPUT_POP.record(activities[1])
for i in range(len(EXC_POPs)):
EXC_POPs[i].record(activities)
for i in range(len(INH_POPs)):
INH_POPs[i].record(activities)
return
#==================================================
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment