From d2b31f7c78b31a0d80364a3e4d9860b6c5d3d2de Mon Sep 17 00:00:00 2001
From: Elbez Hammouda <hammoudae@gmail.com>
Date: Fri, 18 Jun 2021 14:34:12 +0200
Subject: [PATCH] Update Spark operations and synapse heatmap rendering
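
Make Global_Var's analysis variables proper instance attributes, raise the
Spark executor count from 4 to 8, and load the final synapse weights through
the Spark MongoDB connector (with a per-layer $match pipeline) instead of a
pymongo aggregation. The preprocessing job now groups on (To, C) and keeps L
as a top-level field, additional indexes are created on the output
collection, and the global heatmap is drawn larger inside a scrollable
container.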

---
 src/Global_Var.py                | 34 ++++++++---------
 src/Modules/Synapse/callbacks.py | 65 +++++++++++++++++++-------------
 src/Modules/Synapse/layout.py    |  2 +-
 src/Modules/Synapse/spark.py     |  8 +++-
 4 files changed, 61 insertions(+), 48 deletions(-)

diff --git a/src/Global_Var.py b/src/Global_Var.py
index 66ae3c3..d3202bd 100755
--- a/src/Global_Var.py
+++ b/src/Global_Var.py
@@ -43,19 +43,19 @@ class Global_Var():
     def __init__(self):
         """ Initialize all the variables used in analysis
         """
-        stepMax = 0
-        Max = 0
-
-        LayersNeuronsInfo = []
-        Layer_Neuron = None
-        NeuronsNbr = 0
-        LayersNbr = 0
-        Dataset = ""
-        Input = 0
-        Date = ""
-        Accuracy = "0"
-        Labels = None
-        oldIdSpike = None
+        self.stepMax = 0
+        self.Max = 0
+
+        self.LayersNeuronsInfo = []
+        self.Layer_Neuron = None
+        self.NeuronsNbr = 0
+        self.LayersNbr = 0
+        self.Dataset = ""
+        self.Input = 0
+        self.Date = ""
+        self.Accuracy = "0"
+        self.Labels = None
+        self.oldIdSpike = None
 
     # MongoDB connection ---------------------------------------------
 
@@ -139,11 +139,6 @@ class Global_Var():
             col.create_index([("To", 1)])
             print("Synapses index done")
 
-        # Check if indexing took a lot of time, we reboot MongoDB (to free memory)
-        #if (int(round(time.time() * 1000))-millis) > 10000:
-        #    print("Restarting Mongo")
-        #    os.system('sudo service mongod restart')
-
     def getLabelTime(self, step, value):
         """ Return a string that represent time .
 
@@ -192,6 +187,7 @@ class Global_Var():
         heatmap[:] = None
         for d in data:
             heatmap[d[0]][d[1]] = d[2]
+
         return heatmap
 
     def createVerticalHeatMap(self, data):
@@ -256,7 +252,7 @@ class Global_Var():
             conf.setMaster('local[*]')
 
             conf.setAppName(self.name)
-            conf.set("spark.executor.instances", "4")
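+            # NOTE: assumes the host has resources for 8 executors of 8g each; with a
+            # local[*] master these executor settings may have little practical effect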
+            conf.set("spark.executor.instances", "8")
             conf.set("spark.executor.memory", "8g")
             conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
 
diff --git a/src/Modules/Synapse/callbacks.py b/src/Modules/Synapse/callbacks.py
index e0b0249..befa527 100644
--- a/src/Modules/Synapse/callbacks.py
+++ b/src/Modules/Synapse/callbacks.py
@@ -3,6 +3,7 @@
      Dash callbacks are responsible for updating graphs at each step.
 """
 
+import time
 from dash.dependencies import Input, Output, State, MATCH, ALL
 import dash
 import pymongo
@@ -86,7 +87,7 @@ class callbacks():
                                     data = self.getGlobalSynapseWeights(g,selectedLayer)
                                     super.globalHeatMap[selectedLayer] = data
 
-                            return [graphsArea, self.AddGlobalHeatmap(g, selectedLayer, data, HeatMapX, HeatMapY) if data != [] else []]
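+                            # data is now a pandas DataFrame (see getGlobalSynapseWeights),
+                            # so emptiness is tested with .empty rather than == []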
+                            return [graphsArea, self.AddGlobalHeatmap(g, selectedLayer, data, HeatMapX, HeatMapY) if not data.empty else []]
                     else:
                         if(context[0]['prop_id'] != "."):
                             # Delete item
@@ -347,26 +348,29 @@ class callbacks():
             Args:
                 g (Global_Var): reference to access global variables
                 selectedLayer (String): layer name
-                data (list): synapses weights of all neurons in the layer
+                data (DataFrame): synapse weights of all neurons in the layer
                 heatMapX (int): X dimension for heatmaps
                 heatMapY (int): Y dimension for heatmaps
 
             Returns:
                 html component that contains the global heatmaps
             """
-            df = pd.DataFrame(data)
-            dfTo = [i["To"] for i in df["_id"]]
-            dfC = [i["C"] for i in df["_id"]]
+
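+            # Infer the heatmap grid size from the largest stored (x, y) index,
+            # overriding the heatMapX/heatMapY arguments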
+            heatMapX = list(data["index"].max())[0] + 1
+            heatMapY = list(data["index"].max())[1] + 1
 
-            df["To"] = dfTo
-            df["C"] = dfC
+            dfTo = [i["To"] for i in data["_id"]]
+            dfC = [i["C"] for i in data["_id"]]
 
-            df = df.drop("_id", 1)
-            df = df.sort_values(["To", "C"])
-            df = df[["V", "index", "To"]]
+            data["To"] = dfTo
+            data["C"] = dfC
 
-            df["data"] = [[i["x"], i["y"], v] for i, v in zip(df["index"], df["V"])]
-            df = df[["To", "data"]]
+            data = data.drop("_id", axis=1)
+            data = data.sort_values(["To", "C"])
+            data = data[["V", "index", "To"]]
+
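+            # Pack each weight as an [x, y, value] triple for createHeatMap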
+            data["data"] = [[i["x"], i["y"], v] for i, v in zip(data["index"], data["V"])]
+            data = data[["To", "data"]]
 
             for i in g.LayersNeuronsInfo:
                 if(i["layer"] == selectedLayer):
@@ -386,17 +390,18 @@ class callbacks():
                                 zmin=0,
                                 zmax=1,
                                 z=g.createHeatMap(
-                                    heatMapX, heatMapY, df[df.To == index]["data"].to_numpy()),
+                                    heatMapX, heatMapY, data[data.To == index]["data"].to_numpy()),
                                 colorscale='jet',
                                 name=str(index)),
                             row=xx, col=yy)
                         fig.update_yaxes(autorange="reversed", row=xx, col=yy)
                         index += 1
 
-            fig.update_layout(height=800, width=800, title_text=selectedLayer)
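+            # Draw a larger figure; the Div below wraps it in a 500px scrollable viewport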
+            fig.update_layout(height=1200, width=1200, title_text=selectedLayer)
             fig.update_traces(showscale=False)
 
-            return html.Div([dcc.Graph(id="GlobalHeatmap", figure=fig, config={"displaylogo": False})])
+            return html.Div([dcc.Graph(id="GlobalHeatmap", figure=fig, config={"displaylogo": False}, style={'overflowY': 'scroll', 'height': 500})])
 
         def synapseFreqDrawGraph(self, g, index, data, xAxis, xAxisLabel, yAxisList, HeatMapSynapseFreqGraph, isOn):
             """ Create scatter plot with vertical heatmap for selected neuron synapses.
@@ -654,12 +664,12 @@ class callbacks():
 
             # MongoDB---------------------
             col = pymongo.collection.Collection(g.db, 'synapseWeight')
+
             SynapseWeight = col.aggregate([
                 {"$match": {"$and": [
-                    {"T": {'$gt': timestamp, '$lte': (timestamp+g.updateInterval)}},
                     {"L": {'$in': layer}},
-                    {"To": {'$in': neuron}}
-                                    ]}
+                    {"To": {'$in': neuron}},
+                    {"T": {'$gt': timestamp, '$lte': (timestamp+g.updateInterval)}}
+                                    ]}
                 }])
 
             # ToJson----------------------
@@ -682,18 +692,19 @@ class callbacks():
                 final synapses weights
             """
             # MongoDB---------------------
-            col = pymongo.collection.Collection(g.db, 'synapseWeightFinal')
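+            # Read the final weights through the Spark MongoDB connector instead of a
+            # pymongo aggregation; the pipeline pushes the per-layer $match down to MongoDB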
+            df = g.sparkSession.read.format("com.mongodb.spark.sql") \
+                .option("spark.mongodb.input.uri", g.MONGODBURL + g.name + ".synapseWeightFinal?authSource=admin&readPreference=primaryPreferred") \
+                .option("pipeline", "[{$match: {L: {$eq: '" + layer + "'}}}]")
 
-            globalSynapseWeights = col.aggregate([{"$match": {"_id.L": {'$in': [layer]}}},
-                                                    {"$group": {
-                                                        "_id": {"To": "$To", "C": "$C"},
-                                                        "T": {"$last": "$T"},
-                                                        "index": {"$last": "$index"},
-                                                        "V": {"$last": "$V"}}
-                                                    }], allowDiskUse=True)
+            df = df.load()
 
-            # ToJson----------------------
+            # ToPandas--------------------
-            globalSynapseWeights = loads(dumps(globalSynapseWeights))
+            globalSynapseWeights = df.toPandas()
             # ----------------------------
 
             return globalSynapseWeights
diff --git a/src/Modules/Synapse/layout.py b/src/Modules/Synapse/layout.py
index 88e77ad..df175ad 100644
--- a/src/Modules/Synapse/layout.py
+++ b/src/Modules/Synapse/layout.py
@@ -83,7 +83,7 @@ class layout():
                         ),style={"padding":"0px",}),
                         dbc.Collapse( dbc.CardBody([
                         html.Div(id={'type': "GlobalHeatMapAreaSynapse"}, children=[], 
-                        style={"textAlign": "-webkit-center", "paddingTop": "10px"})]),
+                        style={"textAlign": "-webkit-center"})]),
                         id=f"collapse-GlobalHeatMapAreaSynapse")])])),
                         html.Div(id={'type': "GraphsAreaSynapse"}, children=[], 
                         style={"textAlign": "-webkit-center", "paddingTop": "10px"})]),
diff --git a/src/Modules/Synapse/spark.py b/src/Modules/Synapse/spark.py
index 2e3af93..3118a9f 100644
--- a/src/Modules/Synapse/spark.py
+++ b/src/Modules/Synapse/spark.py
@@ -37,7 +37,7 @@ def preProcessing(g):
             # --------------------------------------------------
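+            # Group on (To, C) only and keep L as a plain top-level field so that
+            # later reads can $match on L directly; assumes a neuron id (To) is
+            # unique across layers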
             df = g.sparkSession.read.format("com.mongodb.spark.sql") \
                 .option("spark.mongodb.input.uri", MONGODB_URL + g.name + "."+DOCUMENT_NAME+"?authSource=admin&readPreference=primaryPreferred") \
-                .option("pipeline", "[{ $sort: { T: 1 } },{$group : { _id : {To:'$To', C:'$C', L:'$L'}, T : { $last: '$T'},V : { $last: '$V'},index : { $last: '$index'} } }]")
+                .option("pipeline", "[{ $sort: { T: 1 } },{$group : { _id : {To:'$To', C:'$C'}, L : { $last: '$L'}, T : { $last: '$T'},V : { $last: '$V'},index : { $last: '$index'} } }]")
 
             df = df.load()
           
@@ -53,6 +53,12 @@ def preProcessing(g):
 
             col = pymongo.collection.Collection(g.db, OUTPUT_DOCUMENT_NAME_FINAL)
             col.create_index([("_id.L", 1)])
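+            # L is now also a top-level field (see the pipeline above), so index it directly
+            col.create_index([("L", 1)])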
+            # the _id index is created by MongoDB automatically, so only the
+            # remaining query fields need explicit indexes
+            col.create_index([("_id.To", 1), ("_id.C", 1)])
+            col.create_index([("T", 1)])
+            col.create_index([("index", 1)])
+            col.create_index([("V", 1)])
 
             # --------------------------------------------------------
         else:
-- 
GitLab