Skip to content
Snippets Groups Projects
Commit 3485e378 authored by Pierre Falez's avatar Pierre Falez
Browse files

Init commit

parents
Branches
No related tags found
No related merge requests found
Showing
with 1466 additions and 0 deletions
/************************************************************************************************************
* Contributors:
* - created by Pierre Falez on 10/05/16.
***********************************************************************************************************/
package fr.univ_lille.cristal.emeraude.n2s3.core
import fr.univ_lille.cristal.emeraude.n2s3.support.GlobalTypesAlias.Timestamp
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.Message
object GetGlobalTimestamp extends Message
class GlobalTimestamp {
var timestamp : Timestamp = 0
}
trait TimestampGenerator {
var currentMaxTimestamp : Timestamp = 0
def getGlobalTimestamp : GlobalTimestamp
def toGlobalTimestamp(rawTimestamp : Timestamp) : Timestamp = {
currentMaxTimestamp = math.max(rawTimestamp, currentMaxTimestamp)
getGlobalTimestamp.timestamp+rawTimestamp
}
def endOfStage() : Unit = {
getGlobalTimestamp.timestamp += currentMaxTimestamp
currentMaxTimestamp = 0
}
}
\ No newline at end of file
package fr.univ_lille.cristal.emeraude.n2s3.core.actors
import fr.univ_lille.cristal.emeraude.n2s3.core.Neuron.NeuronMessage
import fr.univ_lille.cristal.emeraude.n2s3.core.Synchronizer.{AskInput, CleanQueue, ProcessUntil, ResponseInput}
import fr.univ_lille.cristal.emeraude.n2s3.core._
import fr.univ_lille.cristal.emeraude.n2s3.core.event.{EventHolder, EventHolderMessage, LabelChangeEvent, LabelChangeResponse}
import fr.univ_lille.cristal.emeraude.n2s3.features.io.input._
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.Message
/**
* Specialization of the NetworkContainer for the input layer
*/
class InputLayer(stream: StreamSupport[_ <: InputPacket, N2S3InputPacket]) extends NetworkContainer with EventHolder {
var lastDisplay : Long = 0L
val displayInterval : Long = 10000000000L
stream.setContainer(this)
/********************************************************************************************************************
* Declaration of events
******************************************************************************************************************/
addEvent(LabelChangeEvent)
override def receiveMessage(message: Message, sender: NetworkEntityReference): Unit = message match {
case AskInput =>
if(System.nanoTime() >= lastDisplay+displayInterval) {
println(stream)
lastDisplay = System.nanoTime()
}
if (!stream.atEnd()) {
val input = stream.next()
val dataList = input.getAllData
val metaDataList = input.getMetaData
if (dataList.isEmpty && metaDataList.isEmpty) {
println("Input returned no more data or metadata")
sender.send(CleanQueue)
sender.send(ResponseInput)
}
else {
dataList.foreach { case(index, l) =>
val target = childEntityAt(stream.shape.toIndex(index:_*))
l.foreach{
case spike : N2S3InputSpike =>
target.sendMessageFrom(NeuronMessage(spike.getStartTimestamp, spike.getMessage, null, null, None), TrashNetworkEntityReference)
case m => println("Warning : unhandled input stimuli "+m.toString)
}
}
metaDataList.foreach{
case label : N2S3InputLabel =>
this.triggerEventWith(LabelChangeEvent, LabelChangeResponse(label.getStartTimestamp, label.getStartTimestamp+label.getDuration, label.getLabel))
case m => println("Warning : unhandled meta data "+m.toString)
}
//this.stream.getStageManagementStrategy().itemProcessed()
sender.send(ResponseInput)
sender.send(ProcessUntil(input.getStartTimestamp+input.getDuration))
}
}
else{
sender.send(NoMoreInput)
sender.send(ProcessUntil(-1L))
}
case AskRemainInput =>
sender.send(WrapMessage(!stream.atEnd()))
case m: EventHolderMessage =>
processEventHolderMessage(m)
case _ => super.receiveMessage(message, sender)
}
}
package fr.univ_lille.cristal.emeraude.n2s3.core
import java.awt.event.{ActionEvent, ActionListener}
import javax.swing.Timer
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, ActorRef}
import fr.univ_lille.cristal.emeraude.n2s3.core.actors.{Done, Initialize}
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.Message
import scala.concurrent.Future
/**
* Created by falezp on 13/03/17.
*/
abstract class NetworkActor extends Actor {
def initialize() : Unit
def destroy() : Unit
def process(message : Message, sender : ActorRef) : Unit
def internInitialize() : Unit = {}
def internDestroy() : Unit = {}
def askTo(path : NetworkEntityPath, message: Message) : Any = {
ExternalSender.askTo(path, message)
}
def askFuture(path : NetworkEntityPath, message: Message) : Future[Any] = {
ExternalSender.askFuture(path, message)
}
def sendTo(path : NetworkEntityPath, message: Message) : Unit = {
ExternalSender.sendTo(path, message)
}
def getReferenceOf(path : NetworkEntityPath) : NetworkEntityReference = {
new ExternalNetworkEntityReference(path)
}
def getReferenceOf(path : NetworkEntityPath, sender : NetworkEntityPath) : NetworkEntityReference = {
new RemoteNetworkEntityReference(path, sender)
}
def receive : Receive = {
case Initialize =>
initialize()
internInitialize()
sender ! Done
case Stop =>
internDestroy()
destroy()
sender ! Done
context.stop(self)
case m : Message =>
process(m, sender)
case m => println("[Logger] Unknown message "+m)
}
}
/**
*
* @param refreshRate : refresh delay in milliseconds
* @param n : number of element to update
*/
abstract class AutoRefreshNetworkActor(refreshRate : Int, n : Int = 1) extends NetworkActor {
private var i = 0
private val taskPerformer = new ActionListener() {
override def actionPerformed(actionEvent: ActionEvent): Unit = {
update(i)
i = (i+1)%n
}
}
private val timer = new Timer(refreshRate, taskPerformer)
def update(n : Int) : Unit
override def internInitialize() : Unit = {
super.internInitialize()
timer.start()
}
override def internDestroy() : Unit = {
super.internDestroy()
timer.stop()
}
}
/************************************************************************************************************
* Contributors:
* - created by falezp
***********************************************************************************************************/
package fr.univ_lille.cristal.emeraude.n2s3.core
import fr.univ_lille.cristal.emeraude.n2s3.core.actors.NetworkEntityActor.AddChildEntity
import fr.univ_lille.cristal.emeraude.n2s3.core.exceptions.DuplicatedChildException
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.{Message, UnhandledMessageException}
import scala.collection.mutable
/**
* This class is used to contain several NetworkEntity children
* Each of them is associated to an getIdentifier, used to route messages
*/
class NetworkContainer extends NetworkEntity {
/**
* Map of all children entities associated to their identifiers
*/
private var childEntities = mutable.HashMap[Any, NetworkEntity]()
/**
* Return the number of direct children entities
*/
def numberOfChildEntity = childEntities.size
def isEmpty = this.childEntities.isEmpty
/**
* Return the entity child associated to the getIdentifier
*
* @param identifier is the getIdentifier of the child
* @throws NoSuchElementException if getIdentifier not associated to a child
*/
def childEntityAt(identifier : Any) = {
childEntities.get(identifier) match {
case Some(entity) => entity
case None => throw new NoSuchElementException(identifier.toString)
}
}
/**
* Retrieve the entity designated by the parameter path
*
* @param target is the local path, which is constituted of a sequence of getIdentifier
* @return the NetworkEntity symbolized by the parameter destination
*/
override def resolvePath(target : Traversable[Any]) : NetworkEntity = {
if(target.isEmpty){
this
} else {
this.childEntityAt(target.head).resolvePath(target.tail)
}
}
/**
* The only message handled by this class is AddChildEntity, which add a child entity to the current container
* All other message throws an exception
*
* @param message is the content to be processed
* @param sender is the reference of the sender
* @throws RuntimeException when a child with the same getIdentifier already exists in this container
* @throws UnhandledMessageException when the message is unrecognized
*/
def receiveMessage(message : Message, sender : NetworkEntityReference) : Unit = message match {
case AddChildEntity(identifier, entity) => addChild(identifier, entity)
case _ => throw new UnhandledMessageException(this.getClass, message)
}
def addChild(identifier: Any, entity: NetworkEntity): Unit = {
if (childEntities.isDefinedAt(identifier)) {
throw new DuplicatedChildException("Child \"" + identifier + "\" already exists")
}
else {
childEntities += identifier -> entity
entity.setParent(this, container)
entity.setLocalPath(localPath ++ Seq(identifier))
}
}
}
\ No newline at end of file
/************************************************************************************************************
* Contributors:
* - created by Pierre Falez on 02/05/16
***********************************************************************************************************/
package fr.univ_lille.cristal.emeraude.n2s3.core
import akka.actor.ActorRef
import fr.univ_lille.cristal.emeraude.n2s3.core.NetworkEntity.AskReference
import fr.univ_lille.cristal.emeraude.n2s3.core.actors.{Done, NetworkEntityActor, WrapMessage}
import fr.univ_lille.cristal.emeraude.n2s3.core.exceptions.UnknownPathException
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.Message
/**
* NetworkEntity companion object
*/
object NetworkEntity {
/**
* Message used to get the reference of the parameter path according to the receiver.
* Response of this message is a WrapMessage with the reference in it content
*
* @param path is the target of the asked reference
*/
case class AskReference(path: NetworkEntityPath) extends Message
}
/**
* A trait with common behaviour for entities in a N2S3 simulation.
*/
trait NetworkEntity {
/**
* The NetworkEntityActor in which the current entity is
*/
var container : NetworkEntityActor = _
private var parent: Option[NetworkEntity] = None
def getContainerActorRef: ActorRef = {
this.container match {
case null => null
case _ => this.container.self
}
}
/**
* Method used to send a message to this NetworkEntity with a sender
*/
final def sendMessageFrom(message : Message, sender : NetworkEntityReference) : Unit = {
processMessage(message, sender)
if(message.expectResponse && !sender.hasSendMessage) {
sender.send(Done)
}
}
/**
* The local path prefix of the current entity
*/
protected var localPath : Traversable[Any] = Nil
def getParent = this.parent.get
def setParent(container: NetworkEntity, containerActor: NetworkEntityActor) = {
this.parent = Some(container)
this.container = containerActor
}
def setLocalPath(localPath: Traversable[Any]): Unit = this.localPath = localPath
def getNetworkAddress = new NetworkEntityPath(this.getContainerActorRef, this.localPath)
/**
* Return the reference from a NetworkEntityPath. The sender is consider to be the current entity
*
* @param path is the target to reference
* @return a reference to the target (e.g. local or remote)
*/
def getReferenceOf(path : NetworkEntityPath) : NetworkEntityReference = {
getReferenceOf(path, getNetworkAddress)
}
def isLocalReference(path : NetworkEntityPath) : Boolean = {
this.getContainerActorRef == path.actor
}
/**
* Return the reference from a NetworkEntityPath. The sender is will be the sender parameter
*
* @param path is the target to reference
* @param sender is the sender of the reference
* @return
*/
def getReferenceOf(path : NetworkEntityPath, sender: NetworkEntityPath) : NetworkEntityReference = {
if(isLocalReference(path, sender))
new LocalNetworkEntityReference(container, container.resolvePath(path.local), container.resolvePath(sender.local))
else
new RemoteNetworkEntityReference(path,sender)
}
def isLocalReference(path : NetworkEntityPath, sender: NetworkEntityPath) : Boolean = {
if(container == null)
throw new RuntimeException("null container")
if(path == null)
throw new RuntimeException("null path")
if(sender == null)
throw new RuntimeException("null sender")
container.self == path.actor && sender.actor == container.self
}
/**
* If the current path is empty, return the current entity
* Otherwise throw an exception
*
* @param destination is a local path
* @return the NetworkEntity symbolized by the parameter destination
* @throws UnknownPathException is the local path is not empty
*/
def resolvePath(destination : Traversable[Any]) : NetworkEntity = {
if(destination.isEmpty)
this
else
throw new UnknownPathException(localPath++destination)
}
/**
* Process message AskReference
* All other messages are delegate to receiveMessage
*
* @param message is the content to be processed
* @param sender is a reference of the sender, which can be used to send response
*/
final def processMessage(message : Message, sender : NetworkEntityReference) : Unit = message match {
case AskReference(path) =>
val reference = getReferenceOf(path)
val wrappedMessage = WrapMessage(reference)
sender.send(wrappedMessage)
case _ =>
receiveMessage(message, sender)
}
/**
* Process a message not handled by ReferenceableNetworkEntity
*
* @param message is the content to be processed
* @param sender is a reference of the sender, which can be used to send response
*/
def receiveMessage(message : Message, sender : NetworkEntityReference) : Unit
def referenceToSynchronizer(synchronizer : NetworkEntityPath) : NetworkEntityReference = {
getReferenceOf(synchronizer)
}
}
/************************************************************************************************************
* Contributors:
* - created by Pierre Falez
***********************************************************************************************************/
package fr.univ_lille.cristal.emeraude.n2s3.core
import fr.univ_lille.cristal.emeraude.n2s3.core.Neuron.{Connection, NeuronEnds, NeuronMessage}
import fr.univ_lille.cristal.emeraude.n2s3.core.event.{Event, EventResponse}
import fr.univ_lille.cristal.emeraude.n2s3.support.GlobalTypesAlias.{ConnectionId, Timestamp}
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.Message
/**
* NeuronConnection Companion object
*/
object NeuronConnection {
/**
* Class used to manage communication with input and output neurons of a connection
*
* @param connection is the current connection object
* @param neuron is the output neuron, which contains the connection
* @param connectionId is the id of the current connection
*/
class ConnectionEnds(connection : Connection, val neuron : Neuron, val connectionId: ConnectionId) {
var directMessageSent : Int = 0
/**
* Send a message to the input neuron
*
* @param timestamp is the timestamp at which the message will be processed
* @param message is the content which will be sent
*/
def sendToInput(timestamp : Timestamp, message : Message) : Unit = {
if(timestamp >= getCurrentTimestamp) {
connection.preSyncRef.send(NeuronMessage(timestamp, message, connection.preSyncRef, connection.postSyncRef, None))
}
else
throw new RuntimeException("can't send message with past timestamp")
}
/**
* Send a message to the input neuron. In case of no delay, the output neuron will immediately process the message
*
* @param timestamp is the timestamp at which the message will be processed
* @param message is the content which will be sent
*/
def sendToOutput(timestamp : Timestamp, message : Message) : Unit = {
if(timestamp == getCurrentTimestamp) {
val ends = new NeuronEnds(neuron, timestamp)
neuron.processSomaMessage(timestamp, message, Some(connectionId -> neuron.getInputConnection(connectionId).connection), ends)
}
else if(timestamp >= getCurrentTimestamp) {
neuron.thisToSyncRef.send(NeuronMessage(timestamp, message, neuron.thisToSyncRef, neuron.syncToThisRef, Some(connectionId)))
}
else
throw new RuntimeException("can't send message with past timestamp")
}
def getCurrentTimestamp : Timestamp = neuron.getCurrentTimestamp
}
}
abstract class SynapseBuilder {
def createSynapse: NeuronConnection
}
/**
* Basic trait for all neuron connection
*
*/
trait NeuronConnection extends Serializable{
import NeuronConnection._
var ends : Option[ConnectionEnds] = None
private var fixedParameter = false
def initialize() : Unit = {
addProperty(ConnectionList)
addProperty(ConnectionClassName)
}
def reset() : Unit = {}
/**
* Method used to describe the behavior of the connection model
*
* @param timestamp is the current time
* @param message is the content
*/
def processConnectionMessage(timestamp : Timestamp, message : Message) : Unit
def getProperty[T](property : ConnectionProperty[T]) : Option[T] = property match {
case ConnectionClassName =>
Some(getClass.getCanonicalName.asInstanceOf[T])
case ConnectionList =>
Some('\u0000'.asInstanceOf[T])
case _ =>
None
}
def setProperty[T](property : ConnectionProperty[T], value : T) : Unit = {
}
def getEnds : ConnectionEnds = {
if(ends.isEmpty)
throw new RuntimeException("Synapse not connected")
ends.get
}
def triggerEvent[Response <: EventResponse](event : Event[Response], response : Response) : Unit = {
getEnds.neuron.triggerEventWith(event, response)
}
def addProperty[T](connectionProperty: ConnectionProperty[T]) : Unit = {
getEnds.neuron.addConnectionProperty(connectionProperty)
}
def addEvent(event : Event[_ <: EventResponse]) : Unit = {
getEnds.neuron.addEvent(event)
}
def hasFixedParameter : Boolean = this.fixedParameter
def setFixedParameter(state : Boolean) : Unit = this.fixedParameter = state
}
/************************************************************************************************************
* Contributors:
* - Pierre Falez
***********************************************************************************************************/
package fr.univ_lille.cristal.emeraude.n2s3.core
import akka.actor.ActorRef
import akka.util.Timeout
import fr.univ_lille.cristal.emeraude.n2s3.core.actors.NetworkEntityActor.{ExplicitSenderRoutedMessage, ImplicitSenderRoutedMessage}
import fr.univ_lille.cristal.emeraude.n2s3.core.actors.{Config, NetworkEntityActor}
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.Message
import scala.concurrent.{Await, Future}
object NetworkEntityPath {
def apply(actor : ActorRef, local : Traversable[Any]) = new NetworkEntityPath(actor, local)
def apply(actor : ActorRef) = new NetworkEntityPath(actor, Nil)
}
/********************************************************************************************************
* Represent an absolute path of a network entity
*
* @param actor which is the reference to the container actor of the targeted network entity
* @param local is the sequence of getIdentifier in order to pass through the local actor hierarchy
*
*******************************************************************************************************/
@SerialVersionUID(1L)
class NetworkEntityPath(val actor : ActorRef, val local : Traversable[Any]) extends Serializable {
def this(actor : ActorRef) = this(actor, Nil)
def /(child : Any) : NetworkEntityPath = NetworkEntityPath(actor, local++Seq(child))
override def equals(that : Any) = that match {
case null => false
case obj : NetworkEntityPath =>
if(isNullPath || obj.isNullPath)
isNullPath == obj.isNullPath
else
hashCode == obj.hashCode && actor.path == obj.actor.path && local == obj.local
case _ => false
}
override def hashCode : Int = {
actor.path.hashCode+31*local.hashCode
}
def isNullPath : Boolean = actor == null
override def toString : String = if(isNullPath)
"NullPath"
else
local.foldLeft(actor.path.name+":")((acc, curr) => acc+"/"+curr.toString)
}
/********************************************************************************************************
* Represent the reference of a targeted network entity according of the sender
*******************************************************************************************************/
trait NetworkEntityReference extends Serializable{
var hasSendMessage = false
def send(message: Message) : Unit
def ask(message : Message) : Message
def disableAutoResponse() : Unit = {
hasSendMessage = true
}
}
/********************************************************************************************************
* Specialization of the NetworkEntityReference dedicated to network entities which lives in the same actor.
* Messages can be directly sent to the object
*
* @param target is the direct access to the target network entity
*******************************************************************************************************/
class LocalNetworkEntityReference(val container : NetworkEntityActor, val target : NetworkEntity, val sender : NetworkEntity) extends NetworkEntityReference {
def send(message: Message) : Unit = {
container.addToQueue(message, target, new LocalNetworkEntityReference(container, sender, target))
hasSendMessage = true
}
def ask(message : Message) : Message = {
class DummyReference(var response : Message) extends NetworkEntityReference {
def send(message: Message) : Unit = {
assert(response == null)
response = message
}
def ask(message : Message) : Message = throw new UnsupportedOperationException
}
val sender = new DummyReference(null)
target.sendMessageFrom(message, sender)
assert(sender.response != null)
sender.response
}
override def toString = "LocalNetworkEntityReference(target="+target.getNetworkAddress+",sender="+sender.getNetworkAddress+")"
override def equals(that : Any): Boolean = that match {
case that: LocalNetworkEntityReference => container == that.container && target == that.target && sender == that.sender
case _ => false
}
}
/********************************************************************************************************
* Specialization of the NetworkEntityReference dedicated to network entities which lives in different actor
* Messages need to go through the actor mailbox before reach the destination network entity
*
* @param target is the absolute pass of the targeted network entity
*******************************************************************************************************/
class RemoteNetworkEntityReference(val target : NetworkEntityPath, val sender : NetworkEntityPath) extends NetworkEntityReference with Serializable {
implicit val timeout = Config.longTimeout
def send(message: Message) : Unit = {
target.actor ! ExplicitSenderRoutedMessage(target.local, message, sender)
hasSendMessage = true
}
def ask(message: Message) : Message = {
Await.result(akka.pattern.ask(target.actor, ImplicitSenderRoutedMessage(target.local, message)), timeout.duration).asInstanceOf[Message]
}
override def toString = "RemoteNetworkEntityReference(target="+target+",sender="+sender+")"
override def equals(that : Any): Boolean = that match {
case that: RemoteNetworkEntityReference => target == that.target && sender == that.sender
case _ => false
}
}
class ExternalNetworkEntityReference(val target : NetworkEntityPath) extends NetworkEntityReference with Serializable {
implicit val timeout: Timeout = Config.defaultTimeout
def send(message: Message) : Unit = {
target.actor ! ImplicitSenderRoutedMessage(target.local, message)
hasSendMessage = true
}
def ask(message: Message) : Message = {
Await.result(akka.pattern.ask(target.actor, ImplicitSenderRoutedMessage(target.local, message)), timeout.duration).asInstanceOf[Message]
}
override def toString: String = "ExternalNetworkEntityReference("+target+")"
override def equals(that : Any): Boolean = that match {
case that: ExternalNetworkEntityReference => target == that.target
case _ => false
}
}
/**
* Specialization of the NetworkEntityReference dedicated to unknown network entities
*
* @param target is the destination akka Actor
*/
class ExternalActorReference(val target : ActorRef) extends NetworkEntityReference {
implicit val timeout: Timeout = Config.longTimeout
def send(message: Message): Unit = {
target ! message
hasSendMessage = true
}
def ask(message: Message): Message = {
Await.result(akka.pattern.ask(target, message), timeout.duration).asInstanceOf[Message]
}
override def toString: String = "ExternalActorReference("+target.path+")"
override def equals(that : Any): Boolean = that match {
case that: ExternalActorReference => target == that.target
case _ => false
}
}
object NullNetworkEntityReference extends NetworkEntityReference {
def send(message: Message) : Unit = {
throw new UnsupportedOperationException
}
def ask(message: Message) : Message = {
throw new UnsupportedOperationException
}
override def equals(that : Any): Boolean = that match {
case NullNetworkEntityReference => true
case _ => false
}
}
object TrashNetworkEntityReference extends NetworkEntityReference {
def send(message: Message) : Unit = {
}
def ask(message: Message) : Message = {
object NullMessage extends Message
NullMessage
}
}
/**
* Helper for interact with system entities
*/
object ExternalSender {
implicit val timeout: Timeout = Config.longTimeout
def sendTo(path : NetworkEntityPath, message: Message) : Unit = {
path.actor ! ImplicitSenderRoutedMessage(path.local, message)
}
def askTo(path : NetworkEntityPath, message: Message, timeout: Timeout = timeout) : Any = {
Await.result(askFuture(path, message), timeout.duration)
}
def askFuture(path : NetworkEntityPath, message: Message) : Future[Any] = {
akka.pattern.ask(path.actor, ImplicitSenderRoutedMessage(path.local, message.setResponseMode()))
}
def getReference(actor: ActorRef) : NetworkEntityReference = new ExternalActorReference(actor)
}
\ No newline at end of file
/************************************************************************************************************
* Contributors:
* - Pierre
***********************************************************************************************************/
package fr.univ_lille.cristal.emeraude.n2s3.core
import fr.univ_lille.cristal.emeraude.n2s3.core.Neuron.{ConnectionID, InputConnection}
import fr.univ_lille.cristal.emeraude.n2s3.core.actors.WrapMessage
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.Message
import scala.collection.mutable.Map
import scala.collection.{AbstractMap, immutable}
abstract class PropertyMessage extends Message
/***********************************************************************************************************
* Property are visible attribute of a model.
* Properties can be get and set
*
* There are two type of property :
* Basic which are unique per actor
* Connection which can be several in an actor
**********************************************************************************************************/
/**********************************************************************************************************
* MANAGE PROPERTIES
*********************************************************************************************************/
object GetAllProperties extends PropertyMessage
case class PropertiesList(content : immutable.Map[Property[Any], Any]) extends PropertyMessage
object GetAllConnectionProperties extends PropertyMessage
case class ConnectionPropertiesList(content : immutable.Map[ConnectionProperty[Any], ConnectionPropertyValues[Any]]) extends PropertyMessage
object GetClassName extends PropertyMessage
/**********************************************************************************************************
* NEURONS PROPERTIES
*********************************************************************************************************/
case class SetProperty[A](property : Property[A], value : A) extends PropertyMessage
case class GetProperty[A](property : Property[A]) extends PropertyMessage
case class PropertyValue[A](value : A) extends PropertyMessage
/**********************************************************************************************************
* CONNECTION PROPERTIES
*********************************************************************************************************/
case class SetConnectionProperty[A](property : ConnectionProperty[A], value : A) extends PropertyMessage
case class GetConnectionProperty[A](property : ConnectionProperty[A]) extends PropertyMessage
case class GetAllConnectionProperty[A](property : ConnectionProperty[A]) extends PropertyMessage
case class ConnectionPropertyValues[A](values : Seq[(ConnectionID, NetworkEntityPath, A)]) extends PropertyMessage
case class SetAllConnectionProperty[A](property : ConnectionProperty[A], values : Seq[(ConnectionID, A)]) extends PropertyMessage
/*
case class SetAllConnectionProperty[A](property : ConnectionProperty[A], values : Seq[(NetworkEntityPath, A)]) extends PropertyMessage
case class GetAllConnectionProperty[A](property : ConnectionProperty[A]) extends PropertyMessage
*/
class PropertyHandler[A](val getter : () => A, val setter : A => Unit) extends Serializable {
def setValue(value: Any): Unit ={
setter(value.asInstanceOf[A])
}
def set(message : PropertyMessage) : Unit = {
message match {
case SetProperty(_, value) => this.setValue(value)
case other => throw new RuntimeException("Incorrect PropertyMessage")
}
}
def get(message : PropertyMessage) : A = {
getter()
}
}
class ConnectionPropertyHandler[A](val getter : NeuronConnection => Option[A], val setter : (NeuronConnection, A) => Unit) extends Serializable {
def set(connections : AbstractMap[ConnectionID, InputConnection], message : PropertyMessage, connectionID : ConnectionID) : Unit = {
message match {
case SetConnectionProperty(_, value) =>
setter(connections(connectionID).connection, value.asInstanceOf[A])
case other => throw new RuntimeException("Incorrect PropertyMessage")
}
}
def setAll(connections : AbstractMap[ConnectionID, InputConnection], message : PropertyMessage) : Unit = {
message match {
case SetAllConnectionProperty(_, values) =>
values.foreach{ case(id, value) =>
setter(connections(id).connection, value.asInstanceOf[A])
}
case other => throw new RuntimeException("Incorrect PropertyMessage")
}
}
def get(connections : AbstractMap[ConnectionID, InputConnection], message : PropertyMessage, connectionID : ConnectionID) : A = {
message match {
case GetConnectionProperty(_) => getter(connections(connectionID).connection).get
case other => throw new RuntimeException("Incorrect PropertyMessage")
}
}
def getAll(connections : AbstractMap[ConnectionID, InputConnection], message : PropertyMessage) : Seq[(ConnectionID, NetworkEntityPath, A)] = {
message match {
case GetAllConnectionProperty(_) =>
connections.toSeq.map{ case(connectionId, connectionObj) => (connectionId, connectionObj.path, getter(connectionObj.connection))}.filter(_._3.isDefined)
.map{ case(id, path, value) => (id, path, value.get) }
case other => throw new RuntimeException("Incorrect PropertyMessage")
}
}
}
abstract class Property[A] extends Serializable {
def create(getter : () => A, setter : A => Unit) : PropertyHandler[A] = {
new PropertyHandler[A](getter, setter)
}
}
abstract class ConnectionProperty[A] extends Serializable {
def create(getter : NeuronConnection => Option[A], setter : (NeuronConnection, A) => Unit) : ConnectionPropertyHandler[A] = {
new ConnectionPropertyHandler[A](getter, setter)
}
}
/*******************************************************************************************************
* PropertyHolder contains properties
******************************************************************************************************/
trait PropertyHolder {
val properties = Map[Property[_], PropertyHandler[_]]()
//val connectionProperties = Map[ConnectionProperty[_], ConnectionPropertyHandler[_]]()
//val connectionProperties = mutable.ArrayBuffer[ConnectionProperty[_]]()
var connections : AbstractMap[ConnectionID, InputConnection] = _
def initializePropertyHolder(connections : AbstractMap[ConnectionID, InputConnection]) : Unit = {
this.connections = connections
}
def addProperty[A](property : Property[A], getter : () => A, setter : A => Unit) : Unit = {
properties += (property -> property.create(getter, setter))
}
def addConnectionProperty[A](property : ConnectionProperty[A]) : Unit = {
//connectionProperties += property
}
def unableToFindProperty[A](property : Property[A]) : Unit = {
// println("Warning : Unable to find property \""+property+"\"")
throw new RuntimeException("Unable to find property \""+property+"\"")
}
def unableToFindConnection[A](property : ConnectionProperty[A], connectionID: ConnectionID) : Unit = {
// println("Warning : Unable to find property \""+property+"\"")
throw new RuntimeException("Unable to find connection "+connectionID)
}
def unableToFindConnectionProperty[A](property : ConnectionProperty[A], connectionID: ConnectionID, connectionObject : NeuronConnection) : Unit = {
// println("Warning : Unable to find property \""+property+"\"")
throw new RuntimeException("Unable to find connection property \""+property+"\" on connection "+connectionID+" (connection class="+connectionObject.getClass.getCanonicalName+")")
}
def setProperty(property: Property[_], value: Any): Unit = {
properties.get(property) match {
case Some(handler) => handler.setValue(value)
case None => unableToFindProperty(property)
}
}
def getConnectionProperty[T](property: ConnectionProperty[T], connectionID : ConnectionID) : T = {
if(!connections.isDefinedAt(connectionID))
unableToFindConnection(property, connectionID)
val ret = connections(connectionID).connection.getProperty[T](property)
if(ret.isEmpty)
unableToFindConnectionProperty(property, connectionID, connections(connectionID).connection)
ret.get
}
def setConnectionProperty[T](property: ConnectionProperty[T], value: T, connectionID: ConnectionID) : Unit = {
if(!connections.isDefinedAt(connectionID))
unableToFindConnection(property, connectionID)
connections(connectionID).connection.setProperty[T](property, value)
}
def getAllConnectionProperty[T](property: ConnectionProperty[T]) : Seq[(ConnectionID, NetworkEntityPath, T)] = {
connections.flatMap{ case(connectionId, connectionObj) =>
val ret = connectionObj.connection.getProperty[T](property)
if(ret.isDefined)
Some((connectionId, connectionObj.path, ret.get))
else
None
}.toSeq
}
def setAllConnectionProperty[T](property: ConnectionProperty[T], values : Seq[(ConnectionID, T)]) : Unit = {
values.foreach{ case(connectionId, value) =>
setConnectionProperty[T](property, value, connectionId)
}
}
def processConnectionPropertyMessage(message : PropertyMessage, sender : NetworkEntityReference, connectionID: ConnectionID) : Unit = message match {
case SetConnectionProperty(property, value) =>
setConnectionProperty(property, value, connectionID)
case GetConnectionProperty(property) =>
sender.send(PropertyValue(getConnectionProperty(property, connectionID)))
}
def processPropertyMessage(message : PropertyMessage, sender : NetworkEntityReference) : Unit = message match {
case SetProperty(property, _) =>
properties.get(property) match {
case Some(handler) => handler.set(message)
case None => unableToFindProperty(property)
}
case GetProperty(property) =>
properties.get(property) match {
case Some(handler) => sender.send(PropertyValue(handler.get(message)))
case None => unableToFindProperty(property)
}
case GetAllConnectionProperty(property) =>
sender.send(ConnectionPropertyValues(getAllConnectionProperty(property)))
case GetAllProperties =>
sender.send(PropertiesList(properties.map(entry => (entry._1.asInstanceOf[Property[Any]], entry._2 match {
case value : PropertyHandler[_] => value.getter()
})).toMap))
case GetAllConnectionProperties =>
throw new UnsupportedOperationException
/*
sender.send(ConnectionPropertiesList(connectionProperties
.map(property => property.asInstanceOf[ConnectionProperty[Any]] ->
ConnectionPropertyValues[Any](getAllConnectionProperty(property))).toMap
))*/
case SetAllConnectionProperty(property, values) =>
setAllConnectionProperty(property, values)
case GetClassName =>
sender.send(WrapMessage(this.getClass.getCanonicalName))
case other => throw new RuntimeException("Unknown message \""+message+"\"")
}
}
\ No newline at end of file
/************************************************************************************************************
* Contributors:
* - created by Pierre Falez on 03/05/16
***********************************************************************************************************/
package fr.univ_lille.cristal.emeraude.n2s3.core
import akka.actor.Props
import fr.univ_lille.cristal.emeraude.n2s3.core.actors._
import fr.univ_lille.cristal.emeraude.n2s3.core.event.{Event, EventHolder, EventTriggered}
import fr.univ_lille.cristal.emeraude.n2s3.features.io.input.GlobalStream
import fr.univ_lille.cristal.emeraude.n2s3.support.GlobalTypesAlias.Timestamp
import fr.univ_lille.cristal.emeraude.n2s3.support.Time
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.{ActorCompanion, Message, PropsBuilder, UnhandledMessageException}
/**
* Synchronizer companion object
*/
object Synchronizer {
/**
* Event used when an entity need to wait that synchronizer has ended all process
*/
object WaitComputationEvent extends Event[EventTriggered.type] {
override def defaultResponse = EventTriggered
override def isSingleUsage = true
}
/**
* Message send to synchronizer for wait end of process
* When synchronizer will terminate is task, he will response with a EventTriggered message
*/
object WaitEndOfActivity extends Message
/**
* Message send to synchronizer for wait until the duration is reached
* When synchronizer will terminate is task, he will response with a EventTriggered message
*/
case class WaitUntil(duration : Time) extends Message
/**
* Message used to ask synchronizer to stop asking input for new inputs
*/
object CleanQueue extends Message
/**
* Base class for all synchronizable messages
*/
abstract class SynchronizedMessage extends Message {
val postSync : NetworkEntityReference
val message : Message
val timestamp : Timestamp
}
/**
* Start the processing of messages by the synchronizer
* It will ask input until the provider is empty
*/
object Start extends Message
/**
* Ask only the next input to the provider and process it
*/
object Next extends Message
/**
* Stop the synchronizer
*/
object Stop extends Message
/**
* Ask new event to input
*/
object AskInput extends Message
/**
* Response sent when the new event has been sent
*/
object ResponseInput extends Message
case class ProcessUntil(timestamp : Timestamp) extends Message
}
/**
* Synchronizer is an entity which ensure the time causality of the system
* To do this, synchronizer will dequeue all message with the smallest timestamp, and send them
* Then it will wait, for each sent message, a Done response before process next messages
*
* Furthermore, synchronizer is responsible to ask input for new events when the number of remain messages
* falls below a threshold
*/
class Synchronizer extends NetworkEntity with EventHolder {
import Synchronizer._
addEvent(WaitComputationEvent)
val bufferSize = 128
/**
* internal queue contains all messages generated by the network and by inputs
*/
val queue = scala.collection.mutable.PriorityQueue[SynchronizedMessage]()(Ordering.fromLessThan((s1, s2) => s1.timestamp > s2.timestamp))
//val inputQueue = scala.collection.mutable.PriorityQueue[SynchronizedMessage]()(Ordering.fromLessThan((s1, s2) => s1.timestamp > s2.timestamp))
var processLimitTimestamp : Timestamp = 0
/**
* Indicated the number of remain acknowledgement before process next messages
*/
var ackRemain = 0
/**
* Reference of the input
*/
var input : NetworkEntityReference = _
/**
* State of the input
*/
var closedInput = true
/**
* True if a request for ask new input as already be sent
*/
var askedInput = false
/**
* True if the clean mode queue is activate
*/
var cleanQueueMode = false
/**
* Store the current timestamp for this synchronizer. All future received message should be upper or equals to that attribute
*/
var currentTimestamp : Timestamp = 0
/**
* duration limit (negative number is no limitation is set)
*/
var waitUntil : Timestamp = -1L
var lastMessage : Option[Message] = None
/**
* Add a message to the queue
*
* @param message is the message to synchronize
* @throws RuntimeException if causality error arise
*/
def enqueue(message : SynchronizedMessage) : Unit = {
if(message.timestamp < currentTimestamp){
println(/*throw new RuntimeException(*/"causality error : messageTimestamp="+message.timestamp+" < globalTime="+currentTimestamp+"\n"+
(if(lastMessage.isDefined) lastMessage.get else "/")+" - "+message)
}
queue += message
}
/**
* Dequeue all message with the smallest timestamp and send them
*/
def dequeue() : Unit = {
if(queue.nonEmpty && ackRemain == 0) {
currentTimestamp = queue.head.timestamp
if(waitUntil >= 0 && currentTimestamp > waitUntil) {
closedInput = true
askedInput = false
queue.clear()
}
var messageList = List[SynchronizedMessage]()
while(queue.nonEmpty && queue.head.timestamp == currentTimestamp && (closedInput || processLimitTimestamp < 0 || queue.head.timestamp <= processLimitTimestamp)) {
messageList = queue.dequeue() :: messageList
}
ackRemain = messageList.size
if(messageList.isEmpty && input != null && !closedInput && !askedInput) { // need new timestamps to process queue
input.send(AskInput)
ackRemain += 1
askedInput = true
}
messageList.foreach { sync: SynchronizedMessage =>
sync.postSync.send(sync)
}
lastMessage = messageList.lastOption
//println(messageList.mkString("\n"))
}
if(cleanQueueMode) {
if(queue.isEmpty) {
cleanQueueMode = false
if (input != null && !closedInput && !askedInput) {
input.send(AskInput)
askedInput = true
}
}
}
else {
if (input != null && queue.size <= bufferSize && !closedInput && !askedInput) {
input.send(AskInput)
ackRemain += 1
askedInput = true
}
}
if(!this.isActive){
GlobalStream.setPrefix(currentTimestamp)
triggerEvent(WaitComputationEvent)
}
}
/**
* Return true if it remains process to do
* Otherwise return false
*/
def isActive: Boolean = queue.nonEmpty || ackRemain > 0 || !closedInput
/**
* Handle all message required by synchronizer
*
* @param message is the content to be processed
* @param sender is a reference of the sender, which can be used to send response
*/
def receiveMessage(message : Message, sender : NetworkEntityReference) : Unit = message match {
case message : SynchronizedMessage =>
enqueue(message)
dequeue()
case ResponseInput =>
askedInput = false
ackRemain -= 1
dequeue()
case Done =>
ackRemain -= 1
dequeue()
case SetInput(path) =>
input = getReferenceOf(path)
closedInput = false
askedInput = false
case Start =>
ackRemain = 0
askedInput = false
closedInput = false
dequeue()
case Next =>
closedInput = true
input.send(AskInput)
ackRemain = 1
dequeue()
case Stop =>
closedInput = true
ackRemain = -1
case NoMoreInput =>
ackRemain -= 1
closedInput = true
dequeue()
case WaitEndOfActivity =>
if (this.isActive){
sender.disableAutoResponse()
subscribeTo(sender, WaitComputationEvent) }
else {
sender.send(WaitComputationEvent.defaultResponse)
}
case WaitUntil(duration) =>
waitUntil = duration.timestamp+currentTimestamp
case CleanQueue =>
cleanQueueMode = true
case ProcessUntil(limit) =>
processLimitTimestamp = limit
case _ => throw new UnhandledMessageException(this.getClass, message)
}
}
/**
* SynchronizerActor companion object
*/
object SynchronizerActor extends ActorCompanion {
override def newPropsBuilder() = new PropsBuilder{
def build() : Props = Props(new SynchronizerActor)
}
}
/**
* mixin of Synchronizer with NetworkEntityActor
*/
class SynchronizerActor extends NetworkEntityActor(new Synchronizer)
\ No newline at end of file
package fr.univ_lille.cristal.emeraude.n2s3.core
import akka.actor.Props
import fr.univ_lille.cristal.emeraude.n2s3.core.Synchronizer.SynchronizedMessage
import fr.univ_lille.cristal.emeraude.n2s3.core.SynchronizerEvent.{AskEvents, ResponseEvents}
import fr.univ_lille.cristal.emeraude.n2s3.core.actors.NetworkEntityActor
import fr.univ_lille.cristal.emeraude.n2s3.core.event.EventResponse
import fr.univ_lille.cristal.emeraude.n2s3.support.GlobalTypesAlias.Timestamp
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.{ActorCompanion, Message, PropsBuilder}
import scala.collection.mutable
/**
* Created by pfalez on 28/03/17.
*/
object SynchronizerEvent {
case class AskEvents(until : Timestamp) extends Message
case class ResponseEvents(events : Seq[EventResponse]) extends Message
}
class SynchronizerEvent extends NetworkEntity{
val queue : mutable.HashMap[NetworkEntityReference, mutable.ArrayBuffer[SynchronizedMessage]] = mutable.HashMap()
var currentTimestamp : Timestamp = 0L
def enqueue(message: SynchronizedMessage) : Unit = {
if(message.timestamp < currentTimestamp){
throw new RuntimeException("[SynchronizerEvent] causality error : messageTimestamp="+message.timestamp+" < globalTime="+currentTimestamp+"\n")
}
queue.getOrElseUpdate(message.postSync, mutable.ArrayBuffer[SynchronizedMessage]()) += message
val n = queue.foldLeft(0){case(acc, curr) => acc+curr._2.size}
if(n%1000==0)
println(queue.size+" "+n)
}
def dequeue(sender: NetworkEntityReference, timestamp: Timestamp) : Unit = {
val (valid, invalid) = queue.getOrElseUpdate(sender, mutable.ArrayBuffer[SynchronizedMessage]()).partition(_.timestamp <= timestamp)
queue(sender).clear()
queue(sender) ++= invalid
sender.send(ResponseEvents(valid.map(_.message.asInstanceOf[EventResponse])))
}
/**
* Process a message not handled by ReferenceableNetworkEntity
*
*/
object RefreshMessage extends Message
var counter = 0
var refreshCounter = 0
override def receiveMessage(message: Message, sender: NetworkEntityReference): Unit = message match {
//case PullUntil(timestamp) =>..
case message : SynchronizedMessage =>
if(counter==0)
getReferenceOf(getNetworkAddress).send(RefreshMessage)
counter += 1
if(counter%1000==0)
println(counter)
//enqueue(message)
case AskEvents(timestamp) =>
dequeue(sender, timestamp)
case RefreshMessage =>
refreshCounter += 1
if(refreshCounter%1000==0)
println("refresh "+refreshCounter)
getReferenceOf(getNetworkAddress).send(RefreshMessage)
case m => throw new RuntimeException("[SynchronizerEvent] Unknown message : \""+m+"\"")
}
}
/**
* SynchronizerActor companion object
*/
object SynchronizerEventActor extends ActorCompanion {
override def newPropsBuilder() = new PropsBuilder{
def build() : Props = Props(new SynchronizerEventActor)
}
}
/**
* mixin of Synchronizer with NetworkEntityActor
*/
class SynchronizerEventActor extends NetworkEntityActor(new SynchronizerEvent)
\ No newline at end of file
package fr.univ_lille.cristal.emeraude.n2s3.core.actors
import akka.actor.ActorRef
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.Message
/**
* Created by guille on 10/14/16.
*/
case class AddLoggerTo(actor : ActorRef) extends Message
package fr.univ_lille.cristal.emeraude.n2s3.core.actors
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.Message
/**
* Created by guille on 10/14/16.
*/
object AskRemainInput extends Message
package fr.univ_lille.cristal.emeraude.n2s3.core.actors
import squants.electro.ElectricCharge
/**
* Created by pfalez on 12/12/17.
*/
case class BackpropSpike(charge : ElectricCharge) extends Spike
\ No newline at end of file
package fr.univ_lille.cristal.emeraude.n2s3.core.actors
import akka.actor.ActorRef
import fr.univ_lille.cristal.emeraude.n2s3.support.GlobalTypesAlias.Timestamp
import fr.univ_lille.cristal.emeraude.n2s3.support.actors.Message
/**********************************************************************************************
* Message send for wait end of computation of the receiver
*********************************************************************************************/
case class CurrentLabel(timestamp : Timestamp, end : Timestamp, destination : ActorRef) extends Message
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment