Skip to content
Snippets Groups Projects
Commit d2b31f7c authored by Elbez Hammouda
Browse files

Updated Spark operations

parent 3d7a1da0
Branches
No related tags found
1 merge request: !22 V0.351
...@@ -43,19 +43,19 @@ class Global_Var(): ...@@ -43,19 +43,19 @@ class Global_Var():
def __init__(self): def __init__(self):
""" Initialize all the variables used in analysis """ Initialize all the variables used in analysis
""" """
stepMax = 0 self.stepMax = 0
Max = 0 self.Max = 0
LayersNeuronsInfo = [] self.LayersNeuronsInfo = []
Layer_Neuron = None self.Layer_Neuron = None
NeuronsNbr = 0 self.NeuronsNbr = 0
LayersNbr = 0 self.LayersNbr = 0
Dataset = "" self.Dataset = ""
Input = 0 self.Input = 0
Date = "" self.Date = ""
Accuracy = "0" self.Accuracy = "0"
Labels = None self.Labels = None
oldIdSpike = None self.oldIdSpike = None
# MongoDB connection --------------------------------------------- # MongoDB connection ---------------------------------------------
...@@ -139,11 +139,6 @@ class Global_Var(): ...@@ -139,11 +139,6 @@ class Global_Var():
col.create_index([("To", 1)]) col.create_index([("To", 1)])
print("Synapses index done") print("Synapses index done")
# Check if indexing took a lot of time, we reboot MongoDB (to free memory)
#if (int(round(time.time() * 1000))-millis) > 10000:
# print("Restarting Mongo")
# os.system('sudo service mongod restart')
def getLabelTime(self, step, value): def getLabelTime(self, step, value):
""" Return a string that represent time . """ Return a string that represent time .
...@@ -192,6 +187,7 @@ class Global_Var(): ...@@ -192,6 +187,7 @@ class Global_Var():
heatmap[:] = None heatmap[:] = None
for d in data: for d in data:
heatmap[d[0]][d[1]] = d[2] heatmap[d[0]][d[1]] = d[2]
return heatmap return heatmap
def createVerticalHeatMap(self, data): def createVerticalHeatMap(self, data):
...@@ -256,7 +252,7 @@ class Global_Var(): ...@@ -256,7 +252,7 @@ class Global_Var():
conf.setMaster('local[*]') conf.setMaster('local[*]')
conf.setAppName(self.name) conf.setAppName(self.name)
conf.set("spark.executor.instances", "4") conf.set("spark.executor.instances", "8")
conf.set("spark.executor.memory", "8g") conf.set("spark.executor.memory", "8g")
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
Dash callbacks are the responsible on updating graphs each step. Dash callbacks are the responsible on updating graphs each step.
""" """
import time
from dash.dependencies import Input, Output, State, MATCH, ALL from dash.dependencies import Input, Output, State, MATCH, ALL
import dash import dash
import pymongo import pymongo
...@@ -86,7 +87,7 @@ class callbacks(): ...@@ -86,7 +87,7 @@ class callbacks():
data = self.getGlobalSynapseWeights(g,selectedLayer) data = self.getGlobalSynapseWeights(g,selectedLayer)
super.globalHeatMap[selectedLayer] = data super.globalHeatMap[selectedLayer] = data
return [graphsArea, self.AddGlobalHeatmap(g, selectedLayer, data, HeatMapX, HeatMapY) if data != [] else []] return [graphsArea, self.AddGlobalHeatmap(g, selectedLayer, data, HeatMapX, HeatMapY) if not data.empty else []]
else: else:
if(context[0]['prop_id'] != "."): if(context[0]['prop_id'] != "."):
# Delete item # Delete item
...@@ -347,26 +348,29 @@ class callbacks(): ...@@ -347,26 +348,29 @@ class callbacks():
Args: Args:
g (Global_Var): reference to access global variables g (Global_Var): reference to access global variables
selectedLayer (String): layer name selectedLayer (String): layer name
data (list): synapses weights of all neurons in the layer data (dataframe): synapses weights of all neurons in the layer
heatMapX (int): X dimension for heatmaps heatMapX (int): X dimension for heatmaps
heatMapY (int): Y dimension for heatmaps heatMapY (int): Y dimension for heatmaps
Returns: Returns:
html component that contains the global heatmaps html component that contains the global heatmaps
""" """
df = pd.DataFrame(data)
dfTo = [i["To"] for i in df["_id"]]
dfC = [i["C"] for i in df["_id"]]
df["To"] = dfTo heatMapX = list(data["index"].max())[0] + 1
df["C"] = dfC heatMapY = list(data["index"].max())[1] + 1
df = df.drop("_id", 1) dfTo = [i["To"] for i in data["_id"]]
df = df.sort_values(["To", "C"]) dfC = [i["C"] for i in data["_id"]]
df = df[["V", "index", "To"]]
df["data"] = [[i["x"], i["y"], v] for i, v in zip(df["index"], df["V"])] data["To"] = dfTo
df = df[["To", "data"]] data["C"] = dfC
data = data.drop("_id", 1)
data = data.sort_values(["To", "C"])
data = data[["V", "index", "To"]]
data["data"] = [[i["x"], i["y"], v] for i, v in zip(data["index"], data["V"])]
data = data[["To", "data"]]
for i in g.LayersNeuronsInfo: for i in g.LayersNeuronsInfo:
if(i["layer"] == selectedLayer): if(i["layer"] == selectedLayer):
...@@ -386,17 +390,18 @@ class callbacks(): ...@@ -386,17 +390,18 @@ class callbacks():
zmin=0, zmin=0,
zmax=1, zmax=1,
z=g.createHeatMap( z=g.createHeatMap(
heatMapX, heatMapY, df[df.To == index]["data"].to_numpy()), heatMapX, heatMapY, data[data.To == index]["data"].to_numpy()),
colorscale='jet', colorscale='jet',
name=str(index)), name=str(index)),
row=xx, col=yy) row=xx, col=yy)
fig.update_yaxes(autorange="reversed", row=xx, col=yy) fig.update_yaxes(autorange="reversed", row=xx, col=yy)
fig.update_layout(height=80, width=80)
index += 1 index += 1
fig.update_layout(height=800, width=800, title_text=selectedLayer) fig.update_layout(height=1200, width=1200, title_text=selectedLayer)
fig.update_traces(showscale=False) fig.update_traces(showscale=False)
return html.Div([dcc.Graph(id="GlobalHeatmap", figure=fig, config={"displaylogo": False})]) return html.Div([dcc.Graph(id="GlobalHeatmap", figure=fig, config={"displaylogo": False}, style={'overflowY': 'scroll', 'height': 500})])
def synapseFreqDrawGraph(self, g, index, data, xAxis, xAxisLabel, yAxisList, HeatMapSynapseFreqGraph, isOn): def synapseFreqDrawGraph(self, g, index, data, xAxis, xAxisLabel, yAxisList, HeatMapSynapseFreqGraph, isOn):
""" Create scatter plot with vertical heatmap for selected neuron synapses. """ Create scatter plot with vertical heatmap for selected neuron synapses.
...@@ -574,6 +579,11 @@ class callbacks(): ...@@ -574,6 +579,11 @@ class callbacks():
heatmap content (data and layout) heatmap content (data and layout)
""" """
try: try:
#if(data != []):
# print("*")
# heatMapX = list(data["index"].max())[0] + 1
# heatMapY = list(data["index"].max())[1] + 1
heatMapWithIndexs = [] heatMapWithIndexs = []
layout = go.Layout( layout = go.Layout(
margin={'l': 0, 'r': 0, 't': 0, 'b': 25}, margin={'l': 0, 'r': 0, 't': 0, 'b': 25},
...@@ -654,12 +664,12 @@ class callbacks(): ...@@ -654,12 +664,12 @@ class callbacks():
# MongoDB--------------------- # MongoDB---------------------
col = pymongo.collection.Collection(g.db, 'synapseWeight') col = pymongo.collection.Collection(g.db, 'synapseWeight')
SynapseWeight = col.aggregate([ SynapseWeight = col.aggregate([
{"$match": {"$and": [ {"$match": {"$and": [
{"T": {'$gt': timestamp, '$lte': (timestamp+g.updateInterval)}},
{"L": {'$in': layer}}, {"L": {'$in': layer}},
{"To": {'$in': neuron}} {"To": {'$in': neuron}},
]} {"T": {'$gt': timestamp, '$lte': (timestamp+g.updateInterval)}} ]}
}]) }])
# ToJson---------------------- # ToJson----------------------
...@@ -682,18 +692,19 @@ class callbacks(): ...@@ -682,18 +692,19 @@ class callbacks():
final synapses weights final synapses weights
""" """
# MongoDB--------------------- # MongoDB---------------------
col = pymongo.collection.Collection(g.db, 'synapseWeightFinal') #col = pymongo.collection.Collection(g.db, 'synapseWeightFinal')
df = g.sparkSession.read.format("com.mongodb.spark.sql") \
.option("spark.mongodb.input.uri", g.MONGODBURL + g.name + ".synapseWeightFinal?authSource=admin&readPreference=primaryPreferred") \
.option("pipeline","[{$match: { L: {$eq: '"+layer+"'}}}]")
df = df.load()
globalSynapseWeights = col.aggregate([{"$match": {"_id.L": {'$in': [layer]}}}, #globalSynapseWeights = col.aggregate([{"$match": {"L": {'$eq': layer}}}],allowDiskUse = True)
{"$group": {
"_id": {"To": "$To", "C": "$C"},
"T": {"$last": "$T"},
"index": {"$last": "$index"},
"V": {"$last": "$V"}}
}], allowDiskUse=True)
# ToJson---------------------- # ToJson----------------------
globalSynapseWeights = loads(dumps(globalSynapseWeights)) globalSynapseWeights = df.toPandas()
# ---------------------------- # ----------------------------
return globalSynapseWeights return globalSynapseWeights
......
...@@ -83,7 +83,7 @@ class layout(): ...@@ -83,7 +83,7 @@ class layout():
),style={"padding":"0px",}), ),style={"padding":"0px",}),
dbc.Collapse( dbc.CardBody([ dbc.Collapse( dbc.CardBody([
html.Div(id={'type': "GlobalHeatMapAreaSynapse"}, children=[], html.Div(id={'type': "GlobalHeatMapAreaSynapse"}, children=[],
style={"textAlign": "-webkit-center", "paddingTop": "10px"})]), style={"textAlign": "-webkit-center"})]),
id=f"collapse-GlobalHeatMapAreaSynapse")])])), id=f"collapse-GlobalHeatMapAreaSynapse")])])),
html.Div(id={'type': "GraphsAreaSynapse"}, children=[], html.Div(id={'type': "GraphsAreaSynapse"}, children=[],
style={"textAlign": "-webkit-center", "paddingTop": "10px"})]), style={"textAlign": "-webkit-center", "paddingTop": "10px"})]),
......
...@@ -37,7 +37,7 @@ def preProcessing(g): ...@@ -37,7 +37,7 @@ def preProcessing(g):
# -------------------------------------------------- # --------------------------------------------------
df = g.sparkSession.read.format("com.mongodb.spark.sql") \ df = g.sparkSession.read.format("com.mongodb.spark.sql") \
.option("spark.mongodb.input.uri", MONGODB_URL + g.name + "."+DOCUMENT_NAME+"?authSource=admin&readPreference=primaryPreferred") \ .option("spark.mongodb.input.uri", MONGODB_URL + g.name + "."+DOCUMENT_NAME+"?authSource=admin&readPreference=primaryPreferred") \
.option("pipeline", "[{ $sort: { T: 1 } },{$group : { _id : {To:'$To', C:'$C', L:'$L'}, T : { $last: '$T'},V : { $last: '$V'},index : { $last: '$index'} } }]") .option("pipeline", "[{ $sort: { T: 1 } },{$group : { _id : {To:'$To', C:'$C'}, L : { $last: '$L'}, T : { $last: '$T'},V : { $last: '$V'},index : { $last: '$index'} } }]")
df = df.load() df = df.load()
...@@ -53,6 +53,12 @@ def preProcessing(g): ...@@ -53,6 +53,12 @@ def preProcessing(g):
col = pymongo.collection.Collection(g.db, OUTPUT_DOCUMENT_NAME_FINAL) col = pymongo.collection.Collection(g.db, OUTPUT_DOCUMENT_NAME_FINAL)
col.create_index([("_id.L", 1)]) col.create_index([("_id.L", 1)])
col.create_index([("_id", 1)])
col.create_index([("_id.To", 1),("_id.C", 1)])
col.create_index([("T", 1)])
col.create_index([("index", 1)])
col.create_index([("V", 1)])
# -------------------------------------------------------- # --------------------------------------------------------
else: else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment