Select Git revision
callbacks.py
-
Hammouda Elbez authoredHammouda Elbez authored
callbacks.py 38.53 KiB
""" This class contains Dash callbacks
Dash callbacks are the responsible on updating graphs each step.
"""
from dash.dependencies import Input, Output, State, MATCH, ALL
import dash
import pymongo
import traceback
from dash import dcc, html
import dash_daq as daq
from bson.json_util import loads
from dash.exceptions import PreventUpdate
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from collections import deque
import numpy as np
import pandas as pd
from src.templates.callbacksOp import callbacksOp
class callbacks(callbacksOp):
""" Callbacks class
"""
def __init__(self, super, app, g):
""" Initialize the callback .
Args:
app : Flask app
g (Global_Var): reference to access global variables
"""
# ------------------------------------------------------------
# Callbacks
# ------------------------------------------------------------
# to prevent creating duplicate callbacks next time
if not g.checkExistance(app, "NeuronFilterSynapse"):
try:
# Callback to handle insert and remove of elements from screen
@app.callback([Output({"type": "GraphsAreaSynapse"}, "children"), Output({"type": "GlobalHeatMapAreaSynapse"}, "children")],
              [Input("AddComponentSynapse", "n_clicks"),
               Input({"type": "DeleteComponent", "index": ALL}, "n_clicks")],
              [State({"type": "GraphsAreaSynapse"}, "children"), State({"type": "GlobalHeatMapAreaSynapse"}, "children"),
               State("LayerFilterSynapse", "value"), State("NeuronFilterSynapse", "value"), State("heatmapEditInfo", "data")])
def InsertRemoveNewElement(addButtonClick, deleteButtonsClick, graphsArea, GlobalHeatmap, selectedLayer, selectedNeuron, heatmapEditInfo):
    """ Insert or remove an element.

    Args:
        addButtonClick (int): number of clicks on the button
        deleteButtonsClick (list): number of clicks on all delete buttons
        graphsArea (list): list contains all neurons graphs
        GlobalHeatmap (list): list of GlobalHeatmap component children
        selectedLayer (String): name of the selected layer
        selectedNeuron (String): id of selected neuron
        heatmapEditInfo (Array): heatmap edit stats

    Raises:
        PreventUpdate: when the add button is clicked while no layer is selected

    Returns:
        the updated content of 'GraphsAreaSynapse' and 'GlobalHeatMapAreaSynapse'
    """
    triggeredProp = dash.callback_context.triggered[0]['prop_id']

    if "AddComponentSynapse" in triggeredProp:
        if selectedLayer is None:
            # Without a layer there is neither a neuron area to add nor a
            # global heatmap to load; the original code failed here on
            # 'data.empty' with data still being None.
            raise PreventUpdate

        if selectedNeuron is not None and len(selectedNeuron) != 0:
            # Add item
            newArea = self.addSynapseArea(addButtonClick, selectedLayer, selectedNeuron)
            spaceHolders = [item for item in graphsArea if item["props"]["id"] == "spaceHolder"]
            if len(spaceHolders) == 1:
                # the new area takes the place of the space holder
                return [[newArea], GlobalHeatmap]
            graphsArea.append(newArea)
            return [graphsArea, GlobalHeatmap]

        # Layer selected but no neuron: show the global heatmap of the layer,
        # loading the weights only once per layer.
        if selectedLayer in super.globalHeatMap:
            data = super.globalHeatMap[selectedLayer]
        else:
            data = self.getGlobalSynapseWeights(g, selectedLayer)
            super.globalHeatMap[selectedLayer] = data
        if data.empty:
            return [graphsArea, []]
        return [graphsArea, self.AddGlobalHeatmap(g, selectedLayer, data, heatmapEditInfo)]

    if triggeredProp != ".":
        # Delete item
        deletedKey = str(loads(str(triggeredProp).split(".")[0])["index"])

        def withoutDeleted(store):
            # copy of 'store' without the entry of the deleted area
            return {item: store.get(str(item)) for item in store if item != deletedKey}

        super.xAxisSynapseFreqGraph = withoutDeleted(super.xAxisSynapseFreqGraph)
        super.yAxisSynapseFreqGraph = withoutDeleted(super.yAxisSynapseFreqGraph)
        # was assigned to self.xAxisLabel, which left the shared labels untouched
        super.xAxisLabel = withoutDeleted(super.xAxisLabel)
        super.heatmaps = withoutDeleted(super.heatmaps)
        super.HeatMapSynapseFreqGraph = withoutDeleted(super.HeatMapSynapseFreqGraph)

        newbody = [item for item in graphsArea if item["props"]["id"] != "VisComponent"+deletedKey]
        if len(newbody) == 0:
            newbody.append(self.addSpaceHolder(g, app))
        return [newbody, GlobalHeatmap]

    # No triggering input: make sure an empty area shows the space holder
    if len(graphsArea) == 0:
        graphsArea.append(self.addSpaceHolder(g, app))
    return [graphsArea, GlobalHeatmap]
except Exception:
print("InsertRemoveNewElement: "+ traceback.format_exc())
try:
# Callback to get neurons of selected layer
@app.callback([Output("NeuronFilterSynapse", "options"), Output("NeuronFilterSynapse", "value")],
              [Input("LayerFilterSynapse", "value")])
def NeuronSelection(selectedLayer):
    """ Fill the neurons menu with the neurons of the selected layer.

    Args:
        selectedLayer (String): selected layer name

    Returns:
        the options of the neurons menu (one per neuron index of the layer,
        none when no layer is selected) and an empty selection
    """
    neuronOptions = []
    if selectedLayer is not None:
        for neuronId in range(g.Layer_Neuron[selectedLayer]):
            neuronLabel = str(neuronId)
            neuronOptions.append({'label': neuronLabel, 'value': neuronLabel})
    return [neuronOptions, ""]
except Exception:
print("NeuronSelection: " + traceback.format_exc())
try:
# Callback to handle clearing content when received a clear signal
@app.callback([Output("display-Synapse", "children"), Output("clear-Synapse", "children")],
              [Input("v-step", "children")],
              [State("interval", "value"), State("clear", "children"),
               State("clear-Synapse", "children"), State("display-Synapse", "children"),
               State({"type": "GraphsAreaSynapse"}, "children")])
def clearProcess(sliderValue, updateInterval, clearGraphs, localClear, displayContent, graphContent):
    """ Clear the stored graph data when a clear signal was received.

    Args:
        sliderValue (int): value of the slider
        updateInterval (int): update Interval (s)
        clearGraphs (boolean): a dash state to pass information to all visualization about clearing content (if needed)
        localClear (boolean): local value of the global clear variable stored for comparison purposes
        displayContent (boolean): when this value change all graphs will be updated
        graphContent : displayed neurons information

    Returns:
        the unchanged display content and the clear value to store locally
    """
    if localClear != clearGraphs:
        # indexes of the neuron areas currently on screen
        shownIndexes = []
        for component in graphContent:
            componentId = component["props"]["id"]
            if "VisComponent" in componentId:
                shownIndexes.append(componentId.split("VisComponent")[1])
        super.clearData(shownIndexes)
    return [displayContent, clearGraphs]
except Exception:
print("clearProcess: "+ traceback.format_exc())
try:
# Callback to handle the synapse frequency graph and the heatmap of each displayed neuron
@app.callback([Output({"index": MATCH, "layer": MATCH, "neuron": MATCH, "type": "SynapseFreqGraph"}, "figure"),
               Output({"index": MATCH, "layer": MATCH, "neuron": MATCH, "type": "Heatmap"}, "figure")],
              [Input("display-Synapse", "children")],
              [State("v-step", "children"), State("interval", "value"),
               State({"index": MATCH, "layer": MATCH, "neuron": MATCH, "type": "SynapseFreqGraph"}, "id"),
               State({"index": MATCH, "layer": MATCH, "neuron": MATCH, "type": 'Synapse-frequency-switch'}, 'on'),
               State({"index": MATCH, "layer": MATCH, "neuron": MATCH, "type": 'Synapse-HeatMap-switch'}, 'on'),
               State("heatmapEditInfo", "data")])
def processSynapseData(displayContent, sliderValue, updateInterval, selectedItem, isOnSynapseFreqGraph, isOnHeatmap, heatmapEditInfo):
    """ Create the two graphs of a neuron from its synapses ('SynapseFreqGraph' and 'Heatmap').

    Args:
        displayContent (boolean): when this value change all graphs will be updated
        sliderValue (int): value of the slider
        updateInterval (int): update Interval (s)
        selectedItem (dict): id of the graph, holding the area index, layer and neuron
        isOnSynapseFreqGraph (bool): whether SynapseFreq graph is active or not
        isOnHeatmap (bool): whether Heatmap graph is active or not
        heatmapEditInfo (Array): heatmap edit stats

    Raises:
        PreventUpdate: in case we don't want to update the content we raise this exception

    Returns:
        synapse frequency graph and heatmap content (data and layout)
    """
    itemIndex = selectedItem["index"]

    def loadData():
        # synapse weights of the interval that starts at the current step
        return self.getSynapseWeights(int(sliderValue)*float(updateInterval), g, selectedItem)

    def stepLabel():
        # label of the interval covered by the current step
        return "["+g.getLabelTime(g.updateInterval, sliderValue)+","+g.getLabelTime(g.updateInterval, sliderValue+1)+"]"

    def drawGraphs(data):
        return [self.synapseFreqDrawGraph(g, itemIndex, data,
                                          super.xAxisSynapseFreqGraph[itemIndex], super.xAxisLabel[itemIndex],
                                          super.yAxisSynapseFreqGraph, super.HeatMapSynapseFreqGraph,
                                          isOnSynapseFreqGraph),
                self.heatMap(g, itemIndex, data, super.heatmaps, isOnHeatmap, heatmapEditInfo)]

    if dash.callback_context.triggered[0]["prop_id"] == '.':
        # after adding to the screen
        if itemIndex in super.xAxisLabel:
            raise PreventUpdate
        data = loadData()
        super.xAxisSynapseFreqGraph[itemIndex] = deque(maxlen=100)
        super.xAxisLabel[itemIndex] = deque(maxlen=100)
        super.xAxisSynapseFreqGraph[itemIndex].append(sliderValue)
        super.xAxisLabel[itemIndex].append(stepLabel())
        return drawGraphs(data)

    # data processing
    if not (sliderValue > 0 and sliderValue <= g.stepMax):
        if sliderValue != 0:
            raise PreventUpdate
        data = loadData()
        super.xAxisSynapseFreqGraph[itemIndex].append(sliderValue)
        super.xAxisLabel[itemIndex].append(stepLabel())
        return drawGraphs(data)

    data = loadData()
    labels = super.xAxisLabel[itemIndex]
    currentLabel = stepLabel()
    if len(labels) != 0 and labels[-1] == currentLabel:
        # this step is already drawn
        raise PreventUpdate
    steps = super.xAxisSynapseFreqGraph[itemIndex]
    if len(steps) > 0:
        steps.append(steps[-1]+1)
    else:
        steps.append(0)
    labels.append(currentLabel)
    return drawGraphs(data)
except Exception:
print("processSynapseData: "+ traceback.format_exc())
try:
# Callback to handle global heatmaps tab (open or close)
@app.callback([Output("collapse-GlobalHeatMapAreaSynapse", "is_open")],
              [Input("group-GlobalHeatMapAreaSynapse-toggle", "n_clicks")],
              [State("collapse-GlobalHeatMapAreaSynapse", "is_open")])
def GlobalHeatMapTab(nbrClicks, isTabOpen):
    """ Function called when the user clicks on the global heatmaps tab.

    Args:
        nbrClicks (int): number of clicks on the tab
        isTabOpen (bool): whether tab is open or not

    Returns:
        if global heatmaps tab should be opened or closed
    """
    # a trigger without value is not a click: keep the tab as it is
    wasClicked = dash.callback_context.triggered[0]["value"] is not None
    return [not isTabOpen] if wasClicked else [isTabOpen]
except Exception:
print("GlobalHeatMapTab: "+ traceback.format_exc())
try:
# Callback to handle heatmap flipping if needed
@app.callback([Output("heatmapEditInfo", "data")],
              [Input('FlipHeatmapH', 'n_clicks'), Input('FlipHeatmapV', 'n_clicks'),
               Input('RotateHeatmap', 'n_clicks'), State("heatmapEditInfo", "data")])
def HeatmapFlip(FlipHeatmapH, FlipHeatmapV, RotateHeatmap, heatmapEditInfo):
    """ Function called when a flip or rotate button is clicked.

    Args:
        FlipHeatmapH (int): number of clicks on the FlipHeatmapH button
        FlipHeatmapV (int): number of clicks on the FlipHeatmapV button
        RotateHeatmap (int): number of clicks on the RotateHeatmap button
        heatmapEditInfo (Array): heatmap edit stats

    Returns:
        the heatmap edit stats with the flag of the triggering button toggled
    """
    triggeredProp = dash.callback_context.triggered[0]['prop_id']
    if "FlipHeatmapH" in triggeredProp:
        flagPosition = 0
    elif "FlipHeatmapV" in triggeredProp:
        flagPosition = 1
    else:
        # NOTE(review): every trigger that is not one of the two flip
        # buttons toggles the rotate flag, including a call without a
        # triggering input if Dash performs one - confirm this is intended.
        flagPosition = 2
    heatmapEditInfo[flagPosition] = not heatmapEditInfo[flagPosition]
    return [heatmapEditInfo]
except Exception:
print("HeatmapFlip: "+ traceback.format_exc())
try:
# ------------------------------------------------------------
# Helper functions
# ------------------------------------------------------------
def addSynapseArea(self, index, layer, neuron):
    """ Build the area that holds the two graphs of one neuron.

    Args:
        index (int): index of the new added area
        layer (String): layer id
        neuron (String): neuron id

    Returns:
        html component that contains the synapse area graphs
    """
    areaIndex = str(index)
    graphStyle = {"width": "100%", "height": "290px"}
    columnClass = "col-lg-6 col-sm-12 col-xs-12"

    # --- left column: synapse frequency graph with its header -------------
    frequencyTitle = html.P("Synapses", style={"textAlign": "start", "marginLeft": "20px",
                                               "marginTop": "6px", "fontSize": "13px"})
    frequencySwitch = daq.PowerButton(
        id={"index": areaIndex, "layer": layer, "neuron": neuron,
            "type": "Synapse-frequency-switch"},
        on='True', size=25, color="#28a745", style={"marginLeft": "10px"})
    deleteButton = html.Button(
        html.I(className="fa-solid fa-trash"),
        style={"fontWeight": "500", "fontSize": "16px", "marginLeft": "10px", "color": "red",
               "backgroundColor": "#00110000", "border": "0"},
        id={"index": areaIndex, "type": "DeleteComponent"})
    neuronBadge = html.Div(
        [html.Span(layer+" N: "+neuron,
                   style={"fontSize": "13px", "paddingTop": "10px", "marginRight": "10px"},
                   className="badge alert-info")],
        className="d-flex", style={"direction": "rtl", "width": "100%"})
    frequencyHeader = html.Div(
        [frequencyTitle, frequencySwitch, deleteButton, neuronBadge],
        className="d-flex", style={"height": "35px"})
    frequencyGraph = dcc.Graph(
        id={"index": areaIndex, "type": "SynapseFreqGraph", "layer": layer, "neuron": neuron},
        style=graphStyle, className="col-12", animate=False, config={"displaylogo": False})
    frequencyColumn = html.Div([frequencyHeader, frequencyGraph], className=columnClass)

    # --- right column: heatmap with its header ----------------------------
    heatmapTitle = html.P("HeatMap", style={"textAlign": "start", "marginLeft": "10px",
                                            "marginTop": "6px", "fontSize": "13px"})
    heatmapSwitch = daq.PowerButton(
        id={"index": areaIndex, "type": "Synapse-HeatMap-switch",
            "layer": layer, "neuron": neuron},
        on='True', size=25, color="#28a745", style={"marginLeft": "10px"})
    heatmapHeader = html.Div(
        [heatmapTitle, heatmapSwitch],
        className="d-flex", style={"height": "35px", "direction": "rtl", "paddingRight": "30px"})
    heatmapGraph = dcc.Graph(
        id={"index": areaIndex, "type": "Heatmap", "layer": layer, "neuron": neuron},
        style={"width": "100%", "height": "290px"}, className="col-12", animate=False,
        config={"displaylogo": False})
    heatmapColumn = html.Div([heatmapHeader, heatmapGraph], className=columnClass)

    graphsRow = html.Div([frequencyColumn, heatmapColumn], className="row", style={"padding": "10px"})
    areaStyle = {"background": "rgb(242, 248, 255)", "paddingBottom": "10px", "marginBottom": "10px",
                 "margin": "5px", "borderRadius": "10px"}
    return html.Div([graphsRow], style=areaStyle, id="VisComponent"+areaIndex)
def addSpaceHolder(self,globalVar,app):
    """ Build the space holder area shown when no graphs are selected.

    Args:
        globalVar (Global_Var): reference to access global variables
        app : app used to resolve the url of the holder image

    Returns:
        html component that contains the space holder image
    """
    holderImage = html.Img(src=app.get_asset_url('holderImg.png'),
                           className="col-6", style={"marginTop":"60px"})
    imageRow = html.Div([holderImage], className="col-12", style={"width": "100%"})
    holderStyle = {"background": "rgb(244 244 244)", "paddingBottom": "10px", "marginBottom": "10px",
                   "margin": "5px", "borderRadius": "10px", "height":"350px"}
    return html.Div([imageRow], style=holderStyle, id="spaceHolder")
def AddGlobalHeatmap(self, g, selectedLayer, data, heatmapEditInfo):
    """ Add global heatmap for selected layer.

    One heatmap per neuron of the layer is drawn in a grid of subplots whose
    shape is given by g.createShape.

    Args:
        g (Global_Var): reference to access global variables
        selectedLayer (String): layer name
        data (dataframe): synapses weights of all neurons in the layer
            (columns "x", "y", "C", "V" and "To" are read)
        heatmapEditInfo (Array): heatmap edit stats
            ([0]: reverse x axes, [1]: reverse y axes, [2]: passed to g.createHeatMap)

    Raises:
        ValueError: if selectedLayer is not listed in g.LayersNeuronsInfo

    Returns:
        html component that contains the global heatmaps
    """
    heatMapX = data["x"].max() + 1
    heatMapY = data["y"].max() + 1
    depth = data["C"].max()
    data = data[["x", "y", "C", "V", "To"]].sort_values(["C"])

    # the last matching entry wins, as before
    neuronsNbr = None
    for layerInfo in g.LayersNeuronsInfo:
        if layerInfo["layer"] == selectedLayer:
            neuronsNbr = layerInfo["neuronNbr"]
    if neuronsNbr is None:
        # previously surfaced as an UnboundLocalError further down
        raise ValueError("AddGlobalHeatmap: unknown layer " + str(selectedLayer))

    neuronsX, neuronsY = g.createShape(neuronsNbr)
    fig = make_subplots(rows=neuronsX, cols=neuronsY)

    # fill the grid row by row, one neuron per cell
    cells = [(xx, yy) for xx in range(1, neuronsX+1) for yy in range(1, neuronsY+1)]
    for index, (xx, yy) in enumerate(cells):
        if index >= neuronsNbr:
            break
        fig.add_trace(
            go.Heatmap(
                zmin=0,
                zmax=1,
                z=g.createHeatMap(
                    heatMapX, heatMapY, data[data.To == index], depth, heatmapEditInfo[2]),
                colorscale='jet',
                name=str(index)),
            row=xx, col=yy)

    # These updates address every axis of the figure, so running them once
    # after the loop gives the same figure as running them after each trace.
    if heatmapEditInfo[0]:
        fig.update_xaxes(autorange="reversed")
    if heatmapEditInfo[1]:
        fig.update_yaxes(autorange="reversed")

    fig.update_layout(height=1200, width=1200, title_text=selectedLayer)
    fig.update_traces(showscale=False)
    return html.Div([dcc.Graph(id="GlobalHeatmap", figure=fig, config={"displaylogo": False}, style={'overflowY': 'scroll', 'height': 500})])
def synapseFreqDrawGraph(self, g, index, data, xAxis, xAxisLabel, yAxisList, HeatMapSynapseFreqGraph, isOn):
    """ Create scatter plot with vertical heatmap for selected neuron synapses.

    When the graph is active a new point is stored for the current step: the
    mean of data["V"] when there is data, otherwise the previous point again
    (or 0 when there is none). When the graph is not active the stored
    history is drawn without adding a point.

    Args:
        g (Global_Var): reference to access global variables
        index (int): index of the new added area
        data (dataframe): synapse weights of the step, column "V" is read
        xAxis (deque): X axis values
        xAxisLabel (deque): X axis labels
        yAxisList (dict): dict contains Y axis values, updated in place
        HeatMapSynapseFreqGraph (dict): values of the vertical heatmaps per area, updated in place
        isOn (bool): whether this graph is active or not

    Returns:
        scatter plot with vertical heatmap content (data and layout),
        or None if an error occurred (the traceback is printed)
    """
    try:
        key = str(index)
        if key not in yAxisList:
            yAxisList[key] = deque(maxlen=100)
            HeatMapSynapseFreqGraph[key] = deque(maxlen=100)
        meanValues = yAxisList[key]
        weightsHistory = HeatMapSynapseFreqGraph[key]

        # 'data' is only inspected when the graph is active
        hasData = bool(isOn) and not data.empty

        if hasData:
            # add data
            meanValues.append(float("{:.6f}".format(data["V"].mean())))
            weightsHistory.append([float(d) for d in data["V"].values])
        elif isOn:
            if len(meanValues) > 0:
                # nothing new in this step: repeat the last known point
                meanValues.append(meanValues[-1])
                weightsHistory.append(weightsHistory[-1])
            else:
                meanValues.append(0)
                weightsHistory.append(0)

        # Y X or the vertical Heatmaps
        heatMapsZ = []
        heatMapsZLabel = []
        for weights in weightsHistory:
            tmp = g.createVerticalHeatMap(weights)
            # element 0 holds the values and element 1 their labels; the
            # inactive branch used to append the whole result as values
            heatMapsZ.append(tmp[0])
            heatMapsZLabel.append(tmp[1])

        heatmapArgs = {
            "zmin": 0,
            "zmax": 100,
            "z": np.array(heatMapsZ).flatten(),
            "x": [x for x in list(xAxis) for _ in range(0, 10)],
            "y": [0.1*i for i in range(10)] * len(xAxis),
            "showscale": False,
            "zsmooth": "fast",
            "colorscale": 'jet'}
        if isOn:
            heatmapArgs["text"] = np.array(heatMapsZLabel).flatten()
            heatmapArgs["hovertemplate"] = "%{text}"
            if hasData:
                heatmapArgs["name"] = ''
            margin = {'l': 60, 'r': 0, 't': 0, 'b': 40}
        else:
            heatmapArgs["hoverinfo"] = "z"
            margin = {'l': 50, 'r': 0, 't': 5, 'b': 40}

        return {'data': [
            go.Heatmap(**heatmapArgs),
            go.Scatter(
                x=list(xAxis),
                y=list(meanValues),
                name='',
                text=list(xAxisLabel),
                hovertemplate="%{text} <b><br>%{y}</b>",
                mode='lines+markers',
                marker_color='rgb(120, 255, 120)'
            )],
            'layout': go.Layout(
                margin=margin,
                uirevision='no reset of zoom',
                yaxis={'title': 'Mean Synapses Potential',
                       'range': [0, 1]},
                paper_bgcolor= "rgba(255, 255, 255,0)",
                plot_bgcolor= "rgba(255, 255, 255,0)",
                xaxis={'title': 'Step'}
            )}
    except Exception:
        print("synapseFreqDrawGraph: "+ traceback.format_exc())
def heatMap(self, g, index, data, heatmaps, isOn, heatmapEditInfo):
    """ Create heatmap for selected neuron synapses.

    Args:
        g (Global_Var): reference to access global variables
        index (int): index of the new added area
        data (dataframe): synapse weights of the step
            (columns "x", "y", "C", "T", "To", "L" and "V" are read)
        heatmaps (dict): stored heatmaps per area, each entry being None or
            [grid, heatMapX, heatMapY]; updated in place
        isOn (bool): whether this graph is active or not
        heatmapEditInfo (Array): heatmap edit stats
            ([0]: reverse x axis, [1]: reverse y axis, [2]: passed to g.createHeatMap)

    Returns:
        heatmap content (data and layout),
        or None if an error occurred (the traceback is printed)
    """
    try:
        key = str(index)

        xaxis = {'showticklabels': False, "ticks": "inside"}
        if heatmapEditInfo[0]:
            xaxis["autorange"] = "reversed"
        layout = go.Layout(
            margin={'l': 0, 'r': 0, 't': 0, 'b': 25},
            uirevision='no reset of zoom',
            xaxis=xaxis,
            yaxis={"autorange": "reversed"} if heatmapEditInfo[1] else {},
            width=400, height=300,
            paper_bgcolor= "rgba(255, 255, 255,0)",
            plot_bgcolor= "rgba(255, 255, 255,0)")

        def storedGrid(default):
            # grid of the stored heatmap, 'default' when nothing is stored yet
            stored = heatmaps[key] if key in heatmaps else None
            return default if stored is None else stored[0]

        if isOn and not data.empty:
            heatMapX = data["x"].max() + 1
            heatMapY = data["y"].max() + 1
            # update heatmap in heatmaps
            if key not in heatmaps or heatmaps[key] is None:
                heatmaps[key] = [np.zeros((heatMapX, heatMapY)), heatMapX, heatMapY]
            depth = data["C"].max()
            # keep only the latest record of every (C, x, y) synapse
            data = data.sort_values(["T"])
            data = data.groupby(["C","x","y"], as_index=False).agg({"To":"last","T":"max","L":"last","V":"last"})
            grid = heatmaps[key][0]
            heatMapWithIndexs = []
            for X, Y, V, C in zip(data["x"], data["y"], data["V"], data["C"]):
                try:
                    # TODO: multi-D
                    grid[X][Y] = V
                    heatMapWithIndexs.append([X, Y, C, V])
                except Exception:
                    # best effort: skip the value that could not be stored
                    print("heatMap1: "+ traceback.format_exc())
            z = g.createHeatMap(heatMapX, heatMapY, pd.DataFrame(heatMapWithIndexs), depth, heatmapEditInfo[2])
        elif isOn:
            if key not in heatmaps:
                heatmaps[key] = None
            # show the stored grid; the whole [grid, X, Y] entry used to be
            # passed as z here
            z = storedGrid(None)
        else:
            # an entry may be None (registered above without data), which
            # used to fail on subscripting
            z = storedGrid([])

        return {'data': [
            go.Heatmap(
                z=z,
                zmin=0,
                zmax=1,
                colorscale="jet")],
            'layout': layout}
    except Exception:
        print("heatMap2: "+ traceback.format_exc())
# ------------------------------------------------------------
# MongoDB operations
# ------------------------------------------------------------
def getSynapseWeights(self, timestamp, g, neuronInfo):
    """ Get synapse weights in a given interval.

    Reads from the 'synapseWeight' collection the documents of the given
    layer ("L") and target neuron ("To") whose time "T" lies in
    (timestamp, timestamp + g.updateInterval].

    Args:
        timestamp (int): timestamp value
        g (Global_Var): reference to access global variables
        neuronInfo (dict): selected neuron information ("neuron" and "layer" are read)

    Returns:
        dataframe of the synapse weights, with the "index" field of the
        documents split into the columns "x" and "y"; empty dataframe
        when nothing matches
    """
    neuron = [int(neuronInfo["neuron"])]
    layer = [neuronInfo["layer"]]

    # MongoDB---------------------
    col = pymongo.collection.Collection(g.db, 'synapseWeight')
    SynapseWeight = col.aggregate([
        {"$match": {"$and": [
            {"L": {'$in': layer}},
            {"To": {'$in': neuron}},
            {"T": {'$gt': timestamp, '$lte': (timestamp+g.updateInterval)}}]}
         }])

    # ToDF----------------------
    SynapseWeight = pd.DataFrame(list(SynapseWeight))
    if not SynapseWeight.empty:
        SynapseWeight["x"] = SynapseWeight["index"].map(lambda i: i["x"])
        SynapseWeight["y"] = SynapseWeight["index"].map(lambda i: i["y"])
        # drop returns a new dataframe; its result used to be discarded
        SynapseWeight = SynapseWeight.drop(columns=["index"])
    # ----------------------------
    return SynapseWeight
def getGlobalSynapseWeights(self, g, layer):
    """ Get final synapses weights.

    Reads from the 'synapseWeightFinal' collection the documents whose
    "_id.L" equals the given layer.

    Args:
        g (Global_Var): reference to access global variables
        layer (String): selected layer

    Returns:
        dataframe of the final synapses weights, with the "_id" field of the
        documents split into the columns "To", "C", "x", "y" and "L";
        empty dataframe when nothing matches
    """
    # MongoDB---------------------
    col = pymongo.collection.Collection(g.db, 'synapseWeightFinal')
    globalSynapseWeights = col.aggregate([{"$match": {"_id.L": {"$eq": layer}}}])

    # ToDF----------------------
    globalSynapseWeights = pd.DataFrame(list(globalSynapseWeights))
    if not globalSynapseWeights.empty:
        ids = globalSynapseWeights["_id"]
        globalSynapseWeights["To"] = ids.map(lambda i: i["To"])
        globalSynapseWeights["C"] = ids.map(lambda i: i["C"])
        globalSynapseWeights["x"] = ids.map(lambda i: i["index"]["x"])
        globalSynapseWeights["y"] = ids.map(lambda i: i["index"]["y"])
        globalSynapseWeights["L"] = ids.map(lambda i: i["L"])
        # drop returns a new dataframe; its result used to be discarded
        globalSynapseWeights = globalSynapseWeights.drop(columns=["_id"])
    # ----------------------------
    return globalSynapseWeights
except Exception:
print("Data process loading: "+ traceback.format_exc())