Select Git revision
makefile.dep
-
Paul Ripault authored
spark.py 2.59 KiB
""" Spark pre-processing operations.
"""
import pandas as pd
import pymongo
import traceback
from pyspark.sql import functions as F
from src.templates.sparkOp import sparkOp
def init(g):
    """Module entry point: instantiate the Synapse pre-processing operation.

    Args:
        g (Global_Var): reference to access global variables
    """
    spark(g)  # the created instance is not kept by this function
class spark(sparkOp):
    """Pre-processing operation for the "Synapse" module.

    Configured with the source collection "synapseWeight" and the output
    collection "synapseWeightFinal"; ``preProcessing`` reduces the former to
    the latest weight entry per synapse and writes it to the latter.
    """

    def __init__(self, g):
        """
        Args:
            g (Global_Var): reference to access global variables
        """
        module = "Synapse"
        document = "synapseWeight"
        output_document = "synapseWeightFinal"
        super().__init__(g, module, document, output_document)

    # Spark operations and preprocessing----------------------------------
    def preProcessing(self):
        """Build the per-synapse "latest weight" collection in MongoDB.

        When the source collection (``self.DOCUMENT_NAME``) exists and the
        output collection (``self.OUTPUT_DOCUMENT_NAME``) does not, group the
        source documents by (To, C, index, L), keep the ``T`` and ``V`` of the
        document with the greatest ``T`` in each group, insert the groups into
        the output collection and index it. If the aggregation yields nothing,
        the output collection is not created, so a later run can retry.

        When the source collection is missing, the module is removed from
        ``self.g.modules``. Any exception is printed with its traceback
        instead of being propagated.
        """
        try:
            # Snapshot the collection names once instead of querying the
            # server up to three times.
            existing = set(self.g.db.list_collection_names())
            has_source = self.DOCUMENT_NAME in existing
            has_output = self.OUTPUT_DOCUMENT_NAME in existing

            if has_source and not has_output:
                # Spark setup---------------------------------------
                # NOTE(review): the session is not used in this method;
                # presumably later steps rely on it existing -- confirm.
                if self.g.sparkSession is None:
                    self.g.createSparkSession()
                # --------------------------------------------------
                pipeline = [
                    # Ascending T, so "$last" below picks the most recent entry.
                    {"$sort": {"T": 1}},
                    {"$group": {
                        "_id": {"To": "$To", "C": "$C", "index": "$index", "L": "$L"},
                        "T": {"$last": "$T"},
                        "V": {"$last": "$V"},
                    }},
                ]
                # Keep the documents exactly as MongoDB returns them: a pandas
                # DataFrame round trip would coerce values (int -> float,
                # None -> NaN) before re-insertion.
                latest_weights = list(
                    self.g.db[self.DOCUMENT_NAME].aggregate(pipeline)
                )
                if latest_weights:
                    # Data save into MongoDB ---------------------------------
                    # insert_many rejects an empty list, hence the guard above.
                    output = self.g.db[self.OUTPUT_DOCUMENT_NAME]
                    output.insert_many(latest_weights)
                    # Indexes creation ---------------------------------------
                    print("Indexes creation (please wait...)")
                    # No explicit "_id" index: MongoDB creates one on every
                    # collection automatically.
                    for keys in (
                        [("_id.L", 1)],
                        [("_id.To", 1), ("_id.C", 1)],
                        [("T", 1)],
                        [("_id.index", 1)],
                        [("V", 1)],
                    ):
                        output.create_index(keys)
                    # --------------------------------------------------------
                else:
                    print(self.DOCUMENT_NAME, "is empty")
            elif not has_source:
                print(self.DOCUMENT_NAME, "not found")
                self.g.modules = [
                    name for name in self.g.modules if name != self.MODULE_NAME
                ]
            print("done", self.MODULE_NAME)
        except Exception as e:
            # Best-effort boundary: report the failure instead of raising.
            print("Error:" + str(e))
            traceback.print_exc()