""" This class contains global variables and functions.
"""
from pyspark.sql import SparkSession
from pymongo import MongoClient
from datetime import timedelta
from pyspark import SparkConf
from config import config
import numpy as np
import pymongo
import socket
import traceback
class Global_Var():
    name = None
    modules = []
    modulesNbr = 0
    allModules = []
    config = config()
    client = None
    MONGODBURL = None
    sparkSession = None
    data_loaded = None
    db = None
    updateInterval = 1.0  # 1 second
    stepMax = 0
    Max = 0
    nbrClasses = 0

    # General network information ------------------------------------
    LayersNeuronsInfo = []
    Layer_Neuron = None
    NeuronsNbr = 0
    LayersNbr = 0
    Dataset = ""
    Input = 0
    Date = ""
    Accuracy = 0
    finalLabels = None
    labelsExistance = False
    oldIdSpike = None
    def __init__(self):
        """Initialize all the variables used in the analysis."""
        self.stepMax = 0
        self.Max = 0
        self.LayersNeuronsInfo = []
        self.NeuronsSize = {"x": 0, "y": 0}
        self.Layer_Neuron = None
        self.NeuronsNbr = 0
        self.LayersNbr = 0
        self.Dataset = ""
        self.Input = 0
        self.Date = ""
        self.Accuracy = 0
        self.finalLabels = None
        self.labelsExistance = False
        self.oldIdSpike = None
    # MongoDB connection ---------------------------------------------
    def mongoConnect(self):
        """Connect to MongoDB.

        Returns:
            int: 1 if the connection succeeded, 0 otherwise
        """
        try:
            if self.config.USERNAME == "" and self.config.PASSWORD == "":
                self.client = MongoClient(self.config.DATABASE_URL,
                                          authSource='admin')
                self.MONGODBURL = self.config.DATABASE_URL
            else:
                self.client = MongoClient(self.config.DATABASE_URL,
                                          username=self.config.USERNAME,
                                          password=self.config.PASSWORD,
                                          authSource='admin',
                                          authMechanism='SCRAM-SHA-1')
                self.MONGODBURL = self.config.MONGODBURL
            self.client.server_info()  # raises if MongoDB is unreachable
            self.db = self.client.list_database_names()
            return 1
        except Exception:
            print("mongoConnect:" + traceback.format_exc())
            return 0
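    # Illustrative usage (a sketch, assuming config provides DATABASE_URL and
    # optional USERNAME/PASSWORD):
    #   gv = Global_Var()
    #   if gv.mongoConnect():
    #       print(gv.db)  # names of the databases found on the server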
    def checkExistance(self, app, component):
        """Check whether a component is registered in the Dash application.

        Args:
            app : instance of the Dash application
            component (String): name of one component of the app

        Returns:
            Boolean: whether the component exists in the app
        """
        for components in app.callback_map:
            if component in components:
                return True
        return False
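    # Illustrative usage (component id is hypothetical): Dash keeps one
    # callback_map entry per registered output, so this returns True as soon
    # as any entry mentions the component:
    #   checkExistance(app, 'interval-component')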
    def CreateIndexes(self, name):
        """Create additional indexes on the collections.

        Args:
            name (String): database name
        """
        self.db = self.client[name]
        indexSpecs = {
            'labels': (["T"], "Labels index done"),
            'spikes': (["T", "i.L", "i.N"], "Spikes index done"),
            'potential': (["T", "L", "N"], "Potential index done"),
            'synapseWeight': (["T", "C", "L", "To"], "Synapses index done")}
        existing = self.db.list_collection_names()
        for collection, (keys, message) in indexSpecs.items():
            if collection in existing:
                col = self.db[collection]
                for key in keys:
                    col.create_index([(key, pymongo.ASCENDING)])
                print(message)
    def getLabelTime(self, step, value):
        """Return a string that represents a time value.

        Args:
            step (int): step value used in the app
            value (int): slider value from the app

        Returns:
            String: time value formatted as H:MM:SS[.mmm]
        """
        label = str(timedelta(seconds=value * step))
        if "." in label:
            # keep milliseconds only (strip the last three microsecond digits)
            label = label[:-3]
        return label
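    # Illustrative usage (values are hypothetical): with a 1 ms step, slider
    # value 90000 maps to 90 s, and sub-second values keep three digits:
    #   getLabelTime(0.001, 90000)   -> '0:01:30'
    #   getLabelTime(0.001, 123456)  -> '0:02:03.456'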
    def createShape(self, nbrNeurons):
        """Organize neurons in a near-square shape for 2D visualization.

        Args:
            nbrNeurons (int): number of neurons

        Returns:
            (int, int): 2D shape dimensions
        """
        x = y = 1
        while x * y < nbrNeurons:
            if x != y:
                x = y
            else:
                y = y + 1
        return x, y
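    # Illustrative trace (hypothetical input): createShape(10) grows the grid
    # 1x1 -> 1x2 -> 2x2 -> 2x3 -> 3x3 -> 3x4 and returns (3, 4), the smallest
    # near-square grid with at least 10 cells.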
    def createHeatMap(self, x, y, data, depth, rotation):
        """Create a heatmap from a set of synapse values.

        Args:
            x, y (int): 2D shape dimensions
            data (Array): synapse values
            depth (int): 0 for a single depth; otherwise only rows at depth 0 are kept
            rotation (Boolean): whether to rotate the heatmap by 90°

        Returns:
            Array: heatmap matrix (unset cells are left at -1)
        """
        try:
            heatmap = np.full((x, y), -1.0)
            data = data.to_numpy()
            if depth == 0:  # single depth
                for d in data:
                    if rotation:
                        heatmap[int(d[0])][int(d[1])] = d[3]
                    else:
                        heatmap[int(d[1])][int(d[0])] = d[3]
            else:  # multiple dimensions: keep only the first depth slice
                for d in data:
                    if d[2] == 0:
                        if rotation:
                            heatmap[int(d[0])][int(d[1])] = d[3]
                        else:
                            heatmap[int(d[1])][int(d[0])] = d[3]
            return heatmap
        except Exception:
            print("createHeatMap: " + traceback.format_exc())
    def createVerticalHeatMap(self, data):
        """Create a vertical heatmap from a given data array.

        Args:
            data (Array): synapse weights array

        Returns:
            Array: [scaled bucket values (0-100), raw bucket counts]
        """
        z = data
        heatMapsZ = [0] * 10
        if np.ndim(z) > 0 and len(z) > 0:
            for Z in z:
                # bucket weights into ten 0.1-wide bins, clamped to [0, 9]
                index = min(max(int(Z * 10), 0), 9)
                heatMapsZ[index] += 1
            heatMapsZLabel = heatMapsZ
            maxHeatMaps = max(heatMapsZ)
            heatMapsZ = [(v * 100) / maxHeatMaps for v in heatMapsZ]
            return [heatMapsZ, heatMapsZLabel]
        else:
            return [heatMapsZ, heatMapsZ]
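    # Illustrative usage (weights are hypothetical): four weights fall into
    # buckets 0, 1, 1 and 9; the counts [1, 2, 0, ..., 1] are rescaled so the
    # fullest bucket reads 100:
    #   createVerticalHeatMap([0.05, 0.15, 0.12, 0.95])
    #   -> [[50.0, 100.0, 0.0, ..., 50.0], [1, 2, 0, ..., 1]]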
    @staticmethod
    def norm(data, Max):
        """Normalize data to a 0-100 scale.

        Args:
            data (Array): array of data
            Max (float): max value in the array

        Returns:
            Array: normalized data (as a percentage of Max)
        """
        return (data * 100) / Max if Max != 0 else data
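    # Illustrative usage: with @staticmethod the helper is callable from the
    # class itself:
    #   Global_Var.norm(np.array([1.0, 2.0, 4.0]), 4.0)  # -> [25., 50., 100.]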
    def createSparkSession(self):
        """Create a Spark session.

        Returns:
            String: the Spark version, or "" if the session could not be created
        """
        try:
            conf = SparkConf()
            conf.setMaster('local[*]')
            conf.setAppName(self.name)
            conf.set("spark.executor.instances", "8")
            conf.set("spark.executor.memory", "16g")
            conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
            self.sparkSession = SparkSession.builder.config(conf=conf) \
                .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.4') \
                .getOrCreate()
            if not self.config.DEBUG:
                self.sparkSession.sparkContext.setLogLevel("ERROR")
            return self.sparkSession.version
        except Exception:
            print("createSparkSession:" + traceback.format_exc())
            return ""
    def testPort(self, port):
        """Test whether a given port is available.

        Args:
            port (int): port number

        Returns:
            Boolean: True if the port is free (bind succeeded), False if it is in use
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        result = False
        try:
            sock.bind(("0.0.0.0", port))
            result = True
        except OSError:
            print("Port is in use")
        sock.close()
        return result
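# Illustrative usage (port number is arbitrary): a bind succeeds on a free
# port, so True means the port is available:
#   gv = Global_Var()
#   gv.testPort(8050)  # False if another process is already listening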