Skip to content
Snippets Groups Projects
Commit 1d7f20b8 authored by Hammouda Elbez's avatar Hammouda Elbez :computer:
Browse files

Synapse module updated

parent 7f592df3
No related branches found
No related tags found
1 merge request!25VS2N 0.36
...@@ -69,8 +69,6 @@ class layout(layoutOp): ...@@ -69,8 +69,6 @@ class layout(layoutOp):
options=[{'label': str(i), 'value': str(i)} for i in ( options=[{'label': str(i), 'value': str(i)} for i in (
i for i in self.g.Layer_Neuron if i != "Input")], i for i in self.g.Layer_Neuron if i != "Input")],
multi=False, multi=False,
value=[{'label': str(i), 'value': str(i)} for i in (
i for i in self.g.Layer_Neuron if i != "Input")][0]["value"],
style={'width': '150px', "marginLeft": "10px", "textAlign": "start"}), style={'width': '150px', "marginLeft": "10px", "textAlign": "start"}),
dcc.Dropdown( dcc.Dropdown(
id='NeuronFilterNeuron', id='NeuronFilterNeuron',
......
""" Spark pre-processing operations. """ Spark pre-processing operations.
""" """
import pandas as pd
import pymongo import pymongo
import traceback import traceback
from pyspark.sql import functions as F from pyspark.sql import functions as F
...@@ -33,23 +34,18 @@ class spark(sparkOp): ...@@ -33,23 +34,18 @@ class spark(sparkOp):
if self.g.sparkSession == None: if self.g.sparkSession == None:
self.g.createSparkSession() self.g.createSparkSession()
# -------------------------------------------------- # --------------------------------------------------
df = self.g.sparkSession.read.format("com.mongodb.spark.sql") \ col = pymongo.collection.Collection(self.g.db, self.DOCUMENT_NAME)
.option("spark.mongodb.input.uri", self.MONGODBURL + self.g.name + "."+self.DOCUMENT_NAME+"?authSource=admin&readPreference=primaryPreferred") \ globalSynapseWeights = col.aggregate([{ "$sort": { "T": 1 } },{"$group" : { "_id" : {"To":'$To', "C":'$C', "index":'$index', "L":'$L'}, "T" : { "$last": '$T'},"V" : { "$last": '$V'} } }])
.option("pipeline", "[{ $sort: { T: 1 } },{$group : { _id : {To:'$To', C:'$C', index:'$index', L:'$L'}, T : { $last: '$T'},V : { $last: '$V'} } }]")
df = df.load()
# Data save into MongoDB --------------------------------- # Data save into MongoDB ---------------------------------
col = pymongo.collection.Collection(self.g.db, self.OUTPUT_DOCUMENT_NAME)
df.write.format("com.mongodb.spark.sql.DefaultSource") \ globalSynapseWeights = pd.DataFrame(list(globalSynapseWeights))
.option("spark.mongodb.output.uri", col.insert_many(globalSynapseWeights.to_dict('records'))
self.MONGODBURL + self.g.name + "."+self.OUTPUT_DOCUMENT_NAME+"?authSource=admin&readPreference=primaryPreferred").mode('append').save()
# Indexes creation --------------------------------------- # Indexes creation ---------------------------------------
print("Indexes creation (please wait...)") print("Indexes creation (please wait...)")
col = pymongo.collection.Collection(self.g.db, self.OUTPUT_DOCUMENT_NAME)
col.create_index([("_id.L", 1)]) col.create_index([("_id.L", 1)])
col.create_index([("_id", 1)]) col.create_index([("_id", 1)])
col.create_index([("_id.To", 1),("_id.C", 1)]) col.create_index([("_id.To", 1),("_id.C", 1)])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment