Skip to content
Snippets Groups Projects
Select Git revision
  • 1d7f20b82db4a29d5447f411603b0078726b4c39
  • master default protected
2 results

spark.py

Blame
  • spark.py 2.59 KiB
    """ Spark pre-processing operations.
    """
    
    import pandas as pd
    import pymongo
    import traceback
    from pyspark.sql import functions as F
    from src.templates.sparkOp import sparkOp
    
    def init(g):
        """ Instantiate the ``spark`` operation with the shared globals.

        The created instance is neither stored nor returned here; whatever
        needs to outlive this call presumably happens inside the constructor
        chain (``sparkOp.__init__``) -- TODO confirm against the base class.

        Args:
            g (Global_Var): reference to access global variables
        """
        spark(g)
        
    class spark(sparkOp):
        """ Pre-processing operation of the "Synapse" module.

        Reduces the source collection to one document per synapse (the last
        recorded "T"/"V" pair) and stores the result in the output collection.

        NOTE(review): ``self.g``, ``self.MODULE_NAME``, ``self.DOCUMENT_NAME``
        and ``self.OUTPUT_DOCUMENT_NAME`` are presumably set by
        ``sparkOp.__init__`` from the constructor arguments -- confirm there.
        """

        def __init__(self, g):
            """ Forward the module name and the source/output document names
            to ``sparkOp``.

            Args:
                g (Global_Var): reference to access global variables
            """
            module = "Synapse"
            document = "synapseWeight"
            output_document = "synapseWeightFinal"
            super().__init__(g, module, document, output_document)

        # Spark operations and preprocessing----------------------------------

        def preProcessing(self):
            """ Build the output collection with the latest weight per synapse.

            * Source collection present, output collection absent: the source
              documents are sorted by ascending "T" and grouped by
              (To, C, index, L); the last "T" and "V" of every group are
              inserted into the output collection, which is then indexed.
            * Source collection absent: a notice is printed and this module is
              removed from ``self.g.modules``.
            * Output collection already present: nothing is recomputed.

            Any exception is reported on stdout (message plus traceback) and
            is not re-raised, so a failure here does not stop the caller.
            """
            try:
                # Single snapshot of the collection names: one server
                # round-trip instead of up to three, and both tests see the
                # same state.
                existing = set(self.g.db.list_collection_names())
                has_source = self.DOCUMENT_NAME in existing
                has_output = self.OUTPUT_DOCUMENT_NAME in existing

                if has_source and not has_output:

                    # Spark setup---------------------------------------
                    if self.g.sparkSession is None:
                        self.g.createSparkSession()
                    # --------------------------------------------------
                    source = self.g.db[self.DOCUMENT_NAME]
                    pipeline = [
                        # Ascending "T" so that "$last" below picks the most
                        # recent entry of each group.
                        {"$sort": {"T": 1}},
                        {"$group": {
                            "_id": {"To": '$To', "C": '$C', "index": '$index', "L": '$L'},
                            "T": {"$last": '$T'},
                            "V": {"$last": '$V'},
                        }},
                    ]
                    # allowDiskUse lets the server spill the full-collection
                    # sort to disk instead of failing on its in-memory limit.
                    globalSynapseWeights = list(
                        source.aggregate(pipeline, allowDiskUse=True)
                    )

                    if globalSynapseWeights:
                        # Data save into MongoDB -------------------------
                        # Documents are inserted exactly as returned by the
                        # aggregation; a DataFrame round-trip could coerce
                        # value types (e.g. ints to floats next to nulls).
                        target = self.g.db[self.OUTPUT_DOCUMENT_NAME]
                        target.insert_many(globalSynapseWeights)

                        # Indexes creation -------------------------------

                        print("Indexes creation (please wait...)")

                        target.create_index([("_id.L", 1)])
                        target.create_index([("_id", 1)])
                        target.create_index([("_id.To", 1), ("_id.C", 1)])
                        target.create_index([("T", 1)])
                        target.create_index([("_id.index", 1)])
                        target.create_index([("V", 1)])

                        # ------------------------------------------------
                    else:
                        # insert_many() rejects an empty list; report the
                        # situation explicitly instead of via a traceback.
                        # The output collection is left absent, so a later
                        # run will try again.
                        print(self.DOCUMENT_NAME, "is empty, nothing to pre-process")
                elif not has_source:
                    print(self.DOCUMENT_NAME, "not found")
                    # Drop this module from the active list since it has no
                    # data to work on.
                    self.g.modules = [module for module in self.g.modules if module != self.MODULE_NAME]
                print("done", self.MODULE_NAME)

            except Exception as e:
                # Deliberately best-effort: report and carry on.
                print("Error:" + str(e))
                traceback.print_exc()