From 602b5138d887551a4060f94b1e4efba9189493d1 Mon Sep 17 00:00:00 2001
From: Maxime MORGE <maxime.morge@univ-lille.fr>
Date: Wed, 24 Jan 2024 17:58:53 +0100
Subject: [PATCH] Refactoring VDS such that we consider the objective function
 is the global flowtime penalized by the throughput

---
 .../balancer/local/HillClimbingBalancer.scala |  16 +--
 .../balancer/local/VDSBalancer.scala          | 131 +++++++++---------
 .../org/smastaplus/core/Allocation.scala      |   9 ++
 .../balancer/local/VDSBalancerEx1Spec.scala   |   2 +-
 .../balancer/local/VDSBalancerEx2Spec.scala   |   8 +-
 .../balancer/local/VDSBalancerSpec.scala      |   2 +-
 6 files changed, 88 insertions(+), 80 deletions(-)

diff --git a/src/main/scala/org/smastaplus/balancer/local/HillClimbingBalancer.scala b/src/main/scala/org/smastaplus/balancer/local/HillClimbingBalancer.scala
index 633c5b4a..d5dde9e3 100644
--- a/src/main/scala/org/smastaplus/balancer/local/HillClimbingBalancer.scala
+++ b/src/main/scala/org/smastaplus/balancer/local/HillClimbingBalancer.scala
@@ -19,8 +19,8 @@ class HillClimbingBalancer(stap: STAP, rule: SocialRule, name: String = "HillCli
     * Returns the best state which is a neighbor with the lowest objective function value
     */
   def highValueSuccessor(current: Allocation): Allocation = {
-    var bestAllocation = new Allocation(stap)
-    var bestSwap : Option[Swap] = None
+    var bestNeighbor = current
+    var bestMove : Option[Swap] = None
     var bestValue = Double.MaxValue
     stap.ds.computingNodes.foreach { initiator => // Foreach initiator
       current.bundle(initiator).foreach { task => // Foreach task in its bundle
@@ -35,8 +35,8 @@ class HillClimbingBalancer(stap: STAP, rule: SocialRule, name: String = "HillCli
           }
           if (nextValue <= bestValue) {
             nbSuccessfulDelegation += 1
-            bestSwap = Some(delegation)
-            bestAllocation = nextAllocation
+            bestMove = Some(delegation)
+            bestNeighbor = nextAllocation
             bestValue = nextValue
           }
           if (withSwap) { // Let us consider the swaps from this delegation
@@ -51,8 +51,8 @@ class HillClimbingBalancer(stap: STAP, rule: SocialRule, name: String = "HillCli
               }
               if (nextValue <= bestValue) {
                 nbSuccessfulSwap += 1
-                bestSwap = Some(swap)
-                bestAllocation = nextAllocation
+                bestMove = Some(swap)
+                bestNeighbor = nextAllocation
                 bestValue = nextValue
               }
             }
@@ -60,8 +60,8 @@ class HillClimbingBalancer(stap: STAP, rule: SocialRule, name: String = "HillCli
         }
       }
     }
-    if (trace && bestSwap.nonEmpty) println(bestSwap.get)
-    bestAllocation
+    if (trace && bestMove.nonEmpty) println(bestMove.get)
+    bestNeighbor
   }
 
   /**
diff --git a/src/main/scala/org/smastaplus/balancer/local/VDSBalancer.scala b/src/main/scala/org/smastaplus/balancer/local/VDSBalancer.scala
index 80b30bb6..de3c44d6 100644
--- a/src/main/scala/org/smastaplus/balancer/local/VDSBalancer.scala
+++ b/src/main/scala/org/smastaplus/balancer/local/VDSBalancer.scala
@@ -1,7 +1,7 @@
 // Copyright (C) Anne-Cécile CARON, Maxime MORGE 2023, 2024
 package org.smastaplus.balancer.local
 
-import org.smastaplus.core.{Allocation, GlobalFlowtime, STAP, SocialRule}
+import org.smastaplus.core.{Allocation, GlobalFlowtime, STAP}
 import org.smastaplus.process.{Deal, SingleDelegation, SingleSwap}
 
 import scala.annotation.tailrec
@@ -10,69 +10,63 @@ import scala.annotation.tailrec
  * VDS (variable depth search) is a generalization of the local search
  * method which adaptively change the size of neighborhood
  * @param stap instance to tackle
- * @param rule is the social rule to be applied
  * @param name of the balancer
- * @param withSwap is true if swap are allowed
- * @param alpha
+ * @param alpha is prespecified parameter
  *  See  Mutsunori Yagiura, Takashi Yamaguchi and Toshihide Ibaraki.
  *  "A variable depth search algorithm for the generalized assignment problem"
  *  Proc. of 2nd International Conference on Metaheuristics, 1997
  *
- *  The best feasible solution is estimated with the global flowtime
- *  cost(alloc) = local flowtime, so needs to know the 2 contractors
- *  pcost(alloc) = cost(alloc) + alpha * global flowtime
+ *  We consider the objective function is the global flowtime penalized by the throughput
  *
  */
-class VDSBalancer (stap: STAP, rule: SocialRule = GlobalFlowtime, name: String = "VDSBalancer",
-                   withSwap: Boolean = false, alpha: Double = 0.5)
-  extends LocalBalancer(stap, rule, name, withSwap) {
+class VDSBalancer(stap: STAP, name: String = "VDSBalancer", alpha: Double = 0.5)
+  extends LocalBalancer(stap, GlobalFlowtime, name, true) {
 
   private var newBestAllocation: Boolean = false //
-  private var bestAllocationForCost : Allocation = null
+  private var bestAllocationForCost : Option[Allocation] = None
   private var bestCost : Double = Double.MaxValue
 
   /**
-   *
-   * Compute the cost of an allocation, i.e. the local flowtime between two contractors of a deal
+   * Returns the cost of an allocation, i.e. the global flowtime
    */
-  def cost(allocation: Allocation, deal: Deal): Double = {
-    val node1 = deal.contractors.head
-    val node2 = deal.contractors(1)
-    // local flowtime between node1 and node2
-    allocation.jobs.foldLeft(0.0) { (sum, job) => sum + Math.max(allocation.completionTime(job, node1), allocation.completionTime(job, node2)) }
+  def cost(allocation: Allocation): Double = {
+    allocation.globalFlowtime
   }
 
   /**
-   *
-   * compute the "penalized" cost of an allocation, i.e. the local flowtime between two contractors of a deal, penalized by global flowtime
+   * Returns the objective function penalized by the throughput
    */
-  def penalizedCost(allocation: Allocation, deal: Deal): Double =
-    cost(allocation, deal) + alpha * allocation.globalFlowtime
+  private def penalizedCost(allocation: Allocation) =
+    cost(allocation) + alpha * allocation.throughput
 
   /**
    *
    * best feasible allocation between a1 and a2, estimated with Global Flowtime
    */
-  def bestBetween(a1 : Allocation, a2 : Allocation) : Allocation =
+  private def bestBetween(a1 : Allocation, a2 : Allocation) : Allocation =
     if (a1.globalFlowtime <= a2.globalFlowtime) a1 else a2
 
   /**
-   * returns a neighbor better than the current allocation (the first encountered)
-   */
-  def firstBetterNeighbor(current: Allocation, mycost: (Allocation, Deal) => Double, withSwap: Boolean, alreadyVisited : Set[Allocation]): Option[(Allocation, Deal)] = {
+   * Returns the best state which is a neighbor with the lowest objective function value
+   * @param current is the current allocation/state
+   * @param evaluationFunction is the objective function
+   * @param isSwap is true if the neighbourhood is the swap neighbourhood of false if it is the delegation neighbourhood
+   * @param alreadyVisited is the set of already visited states to avoid cycling
+   * */
+  private def firstBetterNeighbor(current: Allocation, evaluationFunction: Allocation => Double, isSwap: Boolean, alreadyVisited : Set[Allocation]): Option[(Allocation, Deal)] = {
     newBestAllocation = false
     stap.ds.computingNodes.foreach { initiator => // Foreach initiator
       current.bundle(initiator).foreach { task => // Foreach task in its bundle
         (stap.ds.computingNodes diff Set(initiator)).foreach { responder => // Foreach responder
-          if (withSwap) { // Let us consider the swaps from this delegation
+          if (isSwap) { // Let us consider the swaps from this delegation
             current.bundle(responder).foreach { counterpart => // Foreach counterpart
               val swap = new SingleSwap(stap, initiator, responder, task, counterpart)
               val nextAllocation = swap.execute(current)
-              val currentValue = mycost(current, swap)
-              val nextValue = mycost(nextAllocation, swap)
+              val currentValue = evaluationFunction(current)
+              val nextValue = evaluationFunction(nextAllocation)
               if (nextValue < currentValue && !alreadyVisited.contains(nextAllocation)) {
-                bestAllocationForCost = bestBetween(current, nextAllocation)
-                bestCost = bestAllocationForCost.globalFlowtime
+                bestAllocationForCost = Some(bestBetween(current, nextAllocation))
+                bestCost = bestAllocationForCost.get.globalFlowtime
                 if (bestCost > current.globalFlowtime) newBestAllocation = true else newBestAllocation = false
                 return Some((nextAllocation, swap))
               }
@@ -80,11 +74,11 @@ class VDSBalancer (stap: STAP, rule: SocialRule = GlobalFlowtime, name: String =
           } else { // gift, not swap
             val gift = new SingleDelegation(stap, initiator, responder, task)
             val nextAllocation = gift.execute(current)
-            val nextValue = mycost(nextAllocation, gift)
-            val currentValue = mycost(current, gift)
+            val nextValue = evaluationFunction(nextAllocation)
+            val currentValue = evaluationFunction(current)
             if (nextValue < currentValue && !alreadyVisited.contains(nextAllocation)) {
-              bestAllocationForCost = bestBetween(current, nextAllocation)
-              bestCost = bestAllocationForCost.globalFlowtime
+              bestAllocationForCost = Some(bestBetween(current, nextAllocation))
+              bestCost = bestAllocationForCost.get.globalFlowtime
               if (bestCost > current.globalFlowtime) newBestAllocation = true else newBestAllocation = false
               return Some((nextAllocation, gift))
             }
@@ -99,7 +93,7 @@ class VDSBalancer (stap: STAP, rule: SocialRule = GlobalFlowtime, name: String =
   /**
    * returns the best neighbor of the current allocation w.r.t. mycost
    */
-  def bestNeighbor(current: Allocation, mycost: (Allocation, Deal) => Double, withSwap: Boolean, alreadyVisited : Set[Allocation]): Option[(Allocation, Deal)] = {
+  private def bestNeighbor(current: Allocation, evaluationFunction: (Allocation, Deal) => Double, isSwap: Boolean, alreadyVisited : Set[Allocation]): Option[(Allocation, Deal)] = {
     newBestAllocation = false
     var bestAllocation : Allocation = null
     var bestValue : Double = Double.MaxValue
@@ -107,12 +101,12 @@ class VDSBalancer (stap: STAP, rule: SocialRule = GlobalFlowtime, name: String =
     stap.ds.computingNodes.foreach { initiator => // Foreach initiator
       current.bundle(initiator).foreach { task => // Foreach task in its bundle
         (stap.ds.computingNodes diff Set(initiator)).foreach { responder => // Foreach responder
-          if (withSwap) { // Let us consider the swaps from this delegation
+          if (isSwap) { // Let us consider the swaps from this delegation
             current.bundle(responder).foreach { counterpart => // Foreach counterpart
               val swap = new SingleSwap(stap, initiator, responder, task, counterpart)
               val nextAllocation = swap.execute(current)
-              val currentValue = mycost(current, swap)
-              val nextValue = mycost(nextAllocation, swap)
+              val currentValue = evaluationFunction(current, swap)
+              val nextValue = evaluationFunction(nextAllocation, swap)
               if (nextValue < currentValue && !alreadyVisited.contains(nextAllocation)) {
                 bestAllocation = nextAllocation
                 bestValue = nextValue
@@ -122,8 +116,8 @@ class VDSBalancer (stap: STAP, rule: SocialRule = GlobalFlowtime, name: String =
           } else { // gift, not swap
             val gift = new SingleDelegation(stap, initiator, responder, task)
             val nextAllocation = gift.execute(current)
-            val nextValue = mycost(nextAllocation, gift)
-            val currentValue = mycost(current, gift)
+            val nextValue = evaluationFunction(nextAllocation, gift)
+            val currentValue = evaluationFunction(current, gift)
             if (nextValue < currentValue && !alreadyVisited.contains(nextAllocation)) {
               bestAllocation = nextAllocation
               bestValue = nextValue
@@ -135,33 +129,36 @@ class VDSBalancer (stap: STAP, rule: SocialRule = GlobalFlowtime, name: String =
     }
     if (bestAllocation == null) None
     else {
-      bestAllocationForCost = bestBetween(current, bestAllocation)
-      bestCost = bestAllocationForCost.globalFlowtime
+      bestAllocationForCost = Some(bestBetween(current, bestAllocation))
+      bestCost = bestAllocationForCost.get.globalFlowtime
       if (bestCost > current.globalFlowtime) newBestAllocation = true else newBestAllocation = false
       Some((bestAllocation, bestDeal))
     }
   }
 
   /**
-   * returns a better allocation, found iterating firstBetterNeighbor
-   * Use reflexive transitive closure of neighborhood, i.e. If no better allocation is found in neighborhood, returns the allocation itself.
+   * Returns a locally optimal solution with respect to the neighborhood and the evaluation function
+   * @param current is the current allocation/state
+   * @param evaluationFunction is the objective function
+   * @param isSwap is true if the neighbourhood is the swap neighbourhood of false if it is the delegation neighbourhood
+   * @param alreadyVisited is the set of already visited states to avoid cycling
    */
-  def LS(allocation: Allocation, mycost: (Allocation, Deal) => Double, withSwap: Boolean, alreadyVisited : Set[Allocation]): (Allocation, Set[Allocation]) = {
-    //super.init(allocation)
-    val candidate = firstBetterNeighbor(allocation, mycost, withSwap, alreadyVisited)
+  @tailrec
+  private def localSearch(current: Allocation, evaluationFunction: Allocation => Double, isSwap: Boolean, alreadyVisited: Set[Allocation]) : (Allocation, Set[Allocation]) = {
+    val candidate = firstBetterNeighbor(current, evaluationFunction, isSwap, alreadyVisited)
     candidate match {
       case Some((alloc, _)) =>
-        if (alloc.equals(allocation)) {
-          (allocation, alreadyVisited)
+        if (alloc.equals(current)) {
+          (current, alreadyVisited)
         } else {
-          LS(alloc, mycost, withSwap, alreadyVisited+alloc)
+          localSearch(alloc, evaluationFunction, isSwap, alreadyVisited + alloc)
         }
-      case None => (allocation, alreadyVisited)
+      case None => (current, alreadyVisited)
     }
   }
 
   private def step2b(allocation: Allocation, alreadyVisited : Set[Allocation]): (Allocation, Set[Allocation]) = {
-    val result1 = bestNeighbor(allocation, penalizedCost, withSwap = true, alreadyVisited)
+    val result1 = bestNeighbor(allocation, (allocation: Allocation, deal: Deal) => penalizedCost(allocation), isSwap = true, alreadyVisited)
     result1 match{
       case Some((allocbis, _)) => (allocbis, alreadyVisited+allocbis)
       case None => (allocation, alreadyVisited)
@@ -171,30 +168,30 @@ class VDSBalancer (stap: STAP, rule: SocialRule = GlobalFlowtime, name: String =
   @tailrec
   private def variable_depth_search_step2(allocation: Allocation, alreadyVisited : Set[Allocation]): (Allocation, Set[Allocation]) ={
     // step 2a = a shift move
-    val step2a = bestNeighbor(allocation, penalizedCost, withSwap = false, alreadyVisited)
+    val step2a = bestNeighbor(allocation, (allocation: Allocation, deal: Deal) => penalizedCost(allocation), isSwap = false, alreadyVisited)
     step2a match {
       case Some((alloc, _)) => // step2b = sequence of swap moves
         if (newBestAllocation) //exit to step3
-          variable_depth_search_step2(bestAllocationForCost, alreadyVisited+bestAllocationForCost+alloc)
+          variable_depth_search_step2(bestAllocationForCost.get, alreadyVisited + alloc + bestAllocationForCost.get)
         else {
           val result1 = step2b(allocation, alreadyVisited + alloc)
           //val nextstep = step2b(alloc, alreadyVisited+alloc)
           result1 match {
             case (allocBis, visited) =>
               if (newBestAllocation) //step3
-                variable_depth_search_step2(bestAllocationForCost, visited + bestAllocationForCost)
+                variable_depth_search_step2(bestAllocationForCost.get, visited + bestAllocationForCost.get)
               else if (allocBis.equals(alloc) ) // return to step2a (i.e. try shift)
                 variable_depth_search_step2(allocBis, visited)
               else step2b(allocBis, visited+allocBis)
           }
         }
-      case None => (bestAllocationForCost, alreadyVisited) // exit to step 3 ?
+      case None => (bestAllocationForCost.get, alreadyVisited) // exit to step 3 ?
     }
   }
 
   /**
-   * Modify the current allocation
-   * iterate step 2/step 3 in VDS (no iteration with new initial allocation)
+   * Returns the closest local minima from a current state
+   * by iterating step 2/step 3 in VDS (no iteration with new initial allocation)
    */
   override def reallocate(allocation: Allocation): Allocation = {
     val current = scheduler.schedule(allocation)
@@ -204,16 +201,18 @@ class VDSBalancer (stap: STAP, rule: SocialRule = GlobalFlowtime, name: String =
 
   /**
    * Returns an initial allocation
-   * @note why we do not use a random one
+   * Generates randomly an allocation, then
+   * apply local search with respect to the delegation and the penalized cost,
+   * and apply local search with respect to the swap and the penalized cost,
    */
   override def init(allocation: Allocation): Allocation = {
-    val initialAllocation = super.init(allocation)
+    val initialAllocation: Allocation = super.init(allocation)
     bestCost = initialAllocation.globalFlowtime
-    bestAllocationForCost = initialAllocation
-    // LS modifies bestGF and bestAlloc :
-    val allocbisWithPath = LS(initialAllocation, penalizedCost, withSwap = false, Set[Allocation]()+initialAllocation)
-    LS(allocbisWithPath._1, penalizedCost, withSwap = true, allocbisWithPath._2)
-    bestAllocationForCost
+    bestAllocationForCost = Some(initialAllocation)
+    // LS modifies bestCost and bestAllocationForCost
+    val allocBisWithPath = localSearch(initialAllocation, penalizedCost, isSwap = false, Set[Allocation]() + initialAllocation)
+    localSearch(allocBisWithPath._1,  penalizedCost, isSwap = true, allocBisWithPath._2)
+    bestAllocationForCost.get
   }
 }
 
diff --git a/src/main/scala/org/smastaplus/core/Allocation.scala b/src/main/scala/org/smastaplus/core/Allocation.scala
index 3124c38b..87c71e04 100644
--- a/src/main/scala/org/smastaplus/core/Allocation.scala
+++ b/src/main/scala/org/smastaplus/core/Allocation.scala
@@ -258,6 +258,15 @@ class Allocation(val stap: STAP,
     Math.max(maxCompletionDate, completionDate(job))}
   */
 
+  /**
+   * Returns the sum of the workload
+   * The throughput is a utilitarian measure of system performance
+   * from the point of view of the executors.
+   * It does not depend on scheduling.
+   */
+  def throughput: Double = stap.ds.computingNodes.foldLeft(0.0){
+    (sumWorkload: Double, node: ComputingNode) => sumWorkload + workload(node)
+  }
   /**
    * Returns the size of local resources for a task
    */
diff --git a/src/test/scala/org/smastaplus/balancer/local/VDSBalancerEx1Spec.scala b/src/test/scala/org/smastaplus/balancer/local/VDSBalancerEx1Spec.scala
index 7f1b7146..bb171b7a 100644
--- a/src/test/scala/org/smastaplus/balancer/local/VDSBalancerEx1Spec.scala
+++ b/src/test/scala/org/smastaplus/balancer/local/VDSBalancerEx1Spec.scala
@@ -16,7 +16,7 @@ class VDSBalancerEx1Spec extends AnyFlatSpec {
 
   "VDSBalancer" should
     "perform TODO" in {
-    val balancer = new VDSBalancer(stap, GlobalFlowtime, withSwap = true)
+    val balancer = new VDSBalancer(stap)
     balancer.trace = true
     //println(a)
     val outcome = balancer.reallocate(a)
diff --git a/src/test/scala/org/smastaplus/balancer/local/VDSBalancerEx2Spec.scala b/src/test/scala/org/smastaplus/balancer/local/VDSBalancerEx2Spec.scala
index 39fde11c..0f4acfa4 100644
--- a/src/test/scala/org/smastaplus/balancer/local/VDSBalancerEx2Spec.scala
+++ b/src/test/scala/org/smastaplus/balancer/local/VDSBalancerEx2Spec.scala
@@ -16,15 +16,15 @@ class VDSBalancerEx2Spec extends AnyFlatSpec {
 
   "VDS" should
     "perform TODO" in {
-    val balancer = new VDSBalancer(stap, GlobalFlowtime, withSwap = true)
+    val balancer = new VDSBalancer(stap)
     balancer.trace = true
     println(a)
     val outcome = balancer.reallocate(a)
     println(outcome)
     assert(
-      (outcome.bundle(cn1) == List(t1, t4, t7) &&
-        outcome.bundle(cn2) == List(t8, t3, t2) &&
-        outcome.bundle(cn3) == List(t9, t5, t6))
+      (outcome.bundle(cn1) == List(t7, t1, t2) &&
+        outcome.bundle(cn2) == List(t3, t5, t4) &&
+        outcome.bundle(cn3) == List(t8, t9, t6))
     )
     assert(outcome.globalFlowtime <= a.globalFlowtime)
   }
diff --git a/src/test/scala/org/smastaplus/balancer/local/VDSBalancerSpec.scala b/src/test/scala/org/smastaplus/balancer/local/VDSBalancerSpec.scala
index baea7b48..c951a8ef 100644
--- a/src/test/scala/org/smastaplus/balancer/local/VDSBalancerSpec.scala
+++ b/src/test/scala/org/smastaplus/balancer/local/VDSBalancerSpec.scala
@@ -20,7 +20,7 @@ class VDSBalancerSpec extends AnyFlatSpec {
     val d = 3 // with d duplicated instances per resource
     val pb = STAP.randomProblem(l = l, m = m, n = n, o, d , Uncorrelated)
     val a = Allocation.randomAllocation(pb)
-    val balancer = new VDSBalancer(pb, GlobalFlowtime, withSwap = true)
+    val balancer = new VDSBalancer(pb)
     balancer.trace = false
     val outcome = balancer.reallocate(a)
     assert(outcome.isSound)
-- 
GitLab