From 1e80c5981e1038599bda4d239398cf98417718c3 Mon Sep 17 00:00:00 2001 From: unknown <david.antunes-da-silva@irit.fr> Date: Wed, 20 Jul 2022 11:14:48 +0200 Subject: [PATCH] Add MonoThreadedCycling scheduler + some minor corrections in the generalization of the schedulers --- src/example/philosophes/MainPhilosophe.java | 37 +-- src/example/philosophes/Philosopher.java | 10 +- src/example/philosophes/Waste.java | 6 +- src/mas/core/Agent.java | 4 +- src/mas/core/Cyclable.java | 4 +- .../core/{Schedulable.java => Scheduler.java} | 6 +- .../schedulers/AsyncCycling.java | 23 +- .../schedulers/FairCycling.java | 21 +- .../schedulers/FairPosCycling.java | 6 +- .../schedulers/MonoThreadedCycling.java | 223 ++++++++++++++++++ .../schedulers/ThreeStepCycling.java | 4 +- .../schedulers/variations/TwoDCycling.java | 2 +- 12 files changed, 271 insertions(+), 75 deletions(-) rename src/mas/core/{Schedulable.java => Scheduler.java} (86%) create mode 100644 src/mas/implementation/schedulers/MonoThreadedCycling.java diff --git a/src/example/philosophes/MainPhilosophe.java b/src/example/philosophes/MainPhilosophe.java index a93df0d..d228b7a 100644 --- a/src/example/philosophes/MainPhilosophe.java +++ b/src/example/philosophes/MainPhilosophe.java @@ -1,9 +1,7 @@ package example.philosophes; import mas.core.Cyclable; -import mas.core.ThreeStepCyclable; -import mas.implementation.schedulers.FairCycling; -import mas.implementation.schedulers.ThreeStepCycling; +import mas.implementation.schedulers.MonoThreadedCycling; import java.util.ArrayList; import java.util.List; @@ -12,19 +10,19 @@ public class MainPhilosophe { public static void main(String[] args) { - class MyFairCycling extends FairCycling<Cyclable> { + class MyFairCycling extends MonoThreadedCycling<Cyclable> { long startTimeCycle = 0; List<Long> cycleTime = new ArrayList<>(); - public MyFairCycling(ThreeStepCyclable... _cyclables){ + public MyFairCycling(Cyclable... _cyclables){ super(_cyclables); } @Override public boolean stopCondition() { - return nbOfCycles == 100; + return nbOfCycles == 1000; } @Override @@ -44,7 +42,7 @@ public class MainPhilosophe { } int nAgents = 100; - int nbCycles = 100; + int nbCycles = 1000; Philosopher[] philosophers = new Philosopher[nAgents]; Fork[] forks = new Fork[nAgents]; @@ -92,33 +90,8 @@ public class MainPhilosophe { scheduler.start(); - /*try { - Thread.sleep(2000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - scheduler.pause(); - - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - scheduler.resume(); - - - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - scheduler.stop();*/ - scheduler.waitUntilFinish(); - final long endTime = System.nanoTime(); System.out.println("AMAk : Total execution time: " + (endTime / 1000 - startTime / 1000) + " microseconds"); System.out.println("\tCycle moyen : " + scheduler.getCycleTime() / 1000 + " microseconds"); diff --git a/src/example/philosophes/Philosopher.java b/src/example/philosophes/Philosopher.java index 23c6b04..555c9ba 100644 --- a/src/example/philosophes/Philosopher.java +++ b/src/example/philosophes/Philosopher.java @@ -1,7 +1,7 @@ package example.philosophes; import mas.core.Agent; -import mas.core.Schedulable; +import mas.core.Scheduler; import java.util.*; @@ -30,7 +30,7 @@ public class Philosopher extends Agent { */ private int id; - private Schedulable scheduler; + private Scheduler scheduler; /** * States philosophers can be in @@ -121,7 +121,7 @@ public class Philosopher extends Agent { public void act() { //System.out.println("Philosopher num " + id + " act"); fibonacciRecursion(23); - //scheduler.addCyclable(new Waste(id)); + scheduler.addCyclable(new Waste(id)); } public int fibonacciRecursion(int n){ @@ -218,11 +218,11 @@ public class Philosopher extends Agent { this.rightPhilosopher = rightPhilosopher; } - public void setScheduler(Schedulable scheduler) { + public void setScheduler(Scheduler scheduler) { this.scheduler = scheduler; } - public Schedulable getScheduler() { + public Scheduler getScheduler() { return scheduler; } diff --git a/src/example/philosophes/Waste.java b/src/example/philosophes/Waste.java index 01670d5..8d0f84e 100644 --- a/src/example/philosophes/Waste.java +++ b/src/example/philosophes/Waste.java @@ -1,13 +1,13 @@ package example.philosophes; import mas.core.Cyclable; -import mas.core.Schedulable; +import mas.core.Scheduler; public class Waste implements Cyclable { private int id; - Schedulable scheduler = null; + Scheduler scheduler = null; public Waste(int _id){ id = _id; @@ -24,7 +24,7 @@ public class Waste implements Cyclable { } @Override - public void setScheduler(Schedulable _scheduler) { + public void setScheduler(Scheduler _scheduler) { scheduler = _scheduler; } diff --git a/src/mas/core/Agent.java b/src/mas/core/Agent.java index 7afaba6..378da89 100644 --- a/src/mas/core/Agent.java +++ b/src/mas/core/Agent.java @@ -8,7 +8,7 @@ public class Agent implements ThreeStepCyclable{ /** * The scheduler of the agent. */ - protected Schedulable scheduler; + protected Scheduler scheduler; @Override public void perceive() { @@ -31,7 +31,7 @@ public class Agent implements ThreeStepCyclable{ } @Override - public void setScheduler(Schedulable _scheduler) { + public void setScheduler(Scheduler _scheduler) { scheduler = _scheduler; } } diff --git a/src/mas/core/Cyclable.java b/src/mas/core/Cyclable.java index c5785b3..93c6b22 100644 --- a/src/mas/core/Cyclable.java +++ b/src/mas/core/Cyclable.java @@ -1,7 +1,7 @@ package mas.core; /** - * A cyclable objet. This objet must be scheduled by a {@link Schedulable}. + * A cyclable objet. This objet must be scheduled by a {@link Scheduler}. */ public interface Cyclable { @@ -23,5 +23,5 @@ public interface Cyclable { * @param _scheduler * The scheduler in which the cyclable is going to be scheduled. */ - void setScheduler(Schedulable _scheduler); + void setScheduler(Scheduler _scheduler); } diff --git a/src/mas/core/Schedulable.java b/src/mas/core/Scheduler.java similarity index 86% rename from src/mas/core/Schedulable.java rename to src/mas/core/Scheduler.java index acad3b2..7251053 100644 --- a/src/mas/core/Schedulable.java +++ b/src/mas/core/Scheduler.java @@ -3,7 +3,7 @@ package mas.core; /** * A schedulable object. Made to code schedulers. */ -public interface Schedulable { +public interface Scheduler<T extends Cyclable> { /** * Launch the scheduler if it is not running. @@ -31,7 +31,7 @@ public interface Schedulable { * @param cyclable * The cyclable to add */ - void addCyclable(Cyclable cyclable); + void addCyclable(T cyclable); /** @@ -51,7 +51,7 @@ public interface Schedulable { /** * Pause the execution until the executor has finished. * - * This function needs to be called after a call to {@link Schedulable#stop()} or with the redefinition of {@link Schedulable#stopCondition()}. + * This function needs to be called after a call to {@link Scheduler#stop()} or with the redefinition of {@link Scheduler#stopCondition()}. */ void waitUntilFinish(); } diff --git a/src/mas/implementation/schedulers/AsyncCycling.java b/src/mas/implementation/schedulers/AsyncCycling.java index a99e265..650f455 100644 --- a/src/mas/implementation/schedulers/AsyncCycling.java +++ b/src/mas/implementation/schedulers/AsyncCycling.java @@ -1,10 +1,11 @@ package mas.implementation.schedulers; import mas.core.Cyclable; -import mas.core.Schedulable; +import mas.core.Scheduler; import mas.core.Sleepable; import java.util.LinkedHashSet; +import java.util.Queue; import java.util.Set; import java.util.concurrent.*; @@ -13,12 +14,12 @@ import java.util.concurrent.*; * * @author David Antunes */ -public class AsyncCycling implements Schedulable, Sleepable { +public class AsyncCycling<T extends Cyclable> implements Scheduler<T>, Sleepable { /** * The cyclable objects handled by the scheduler. */ - protected final Set<Cyclable> cyclables = new LinkedHashSet<>(); + protected final Queue<T> cyclables = new ConcurrentLinkedDeque<>(); /** * Time between two cycles. Default time in {@link Sleepable#DEFAULT_SLEEP}. @@ -41,9 +42,10 @@ public class AsyncCycling implements Schedulable, Sleepable { * @param _cyclables * The corresponding cyclables */ - public AsyncCycling(Cyclable... _cyclables){ + @SafeVarargs + public AsyncCycling(T... _cyclables){ - for(Cyclable cyclable : _cyclables){ + for(T cyclable : _cyclables){ cyclables.add(cyclable); cyclable.setScheduler(this); } @@ -51,7 +53,7 @@ public class AsyncCycling implements Schedulable, Sleepable { @Override public void start() { - for (Cyclable cyclable : cyclables){ + for (T cyclable : cyclables){ executor.execute(() -> manageCyclable(cyclable)); } } @@ -60,11 +62,6 @@ public class AsyncCycling implements Schedulable, Sleepable { public void stop() { mustStop = true; executor.shutdown(); - try { - executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } } @Override @@ -78,7 +75,7 @@ public class AsyncCycling implements Schedulable, Sleepable { } @Override - public void addCyclable(Cyclable cyclable) { + public void addCyclable(T cyclable) { cyclables.add(cyclable); cyclable.setScheduler(this); @@ -131,7 +128,7 @@ public class AsyncCycling implements Schedulable, Sleepable { * @param cyclable * The cyclable */ - protected void manageCyclable(Cyclable cyclable){ + protected void manageCyclable(T cyclable){ cyclable.cycle(); doSleep(); diff --git a/src/mas/implementation/schedulers/FairCycling.java b/src/mas/implementation/schedulers/FairCycling.java index b31efae..be6ec91 100644 --- a/src/mas/implementation/schedulers/FairCycling.java +++ b/src/mas/implementation/schedulers/FairCycling.java @@ -1,7 +1,7 @@ package mas.implementation.schedulers; import mas.core.Cyclable; -import mas.core.Schedulable; +import mas.core.Scheduler; import mas.core.Sleepable; import java.util.*; @@ -11,9 +11,12 @@ import java.util.concurrent.*; * The FairCycling scheduler schedules tasks using a {@link Executors#newCachedThreadPool()}. * Every cyclable executes its cycle once every system's cycle. * + * @param <T> + * Extended class of {@link Cyclable} + * * @author David Antunes */ -public class FairCycling<T extends Cyclable> implements Schedulable, Sleepable { +public class FairCycling<T extends Cyclable> implements Scheduler<T>, Sleepable { /** * The cyclable objects handled by the scheduler. @@ -21,7 +24,7 @@ public class FairCycling<T extends Cyclable> implements Schedulable, Sleepable { protected Set<T> cyclables = new LinkedHashSet<>(); /** - * The cyclables that must be add in the next cycle. + * The cyclables that must be added in the next cycle. */ protected Queue<T> pendingToAddCyclables = new ConcurrentLinkedQueue<>(); @@ -98,6 +101,12 @@ public class FairCycling<T extends Cyclable> implements Schedulable, Sleepable { } } + @Override + public void addCyclable(T cyclable) { + cyclable.setScheduler(this); + pendingToAddCyclables.add(cyclable); + } + @Override public boolean stopCondition(){ return false; @@ -117,12 +126,6 @@ public class FairCycling<T extends Cyclable> implements Schedulable, Sleepable { } } - @Override - public void addCyclable(Cyclable cyclable){ - cyclable.setScheduler(this); - pendingToAddCyclables.add((T) cyclable); - } - @Override public int getSleep() { return sleep; diff --git a/src/mas/implementation/schedulers/FairPosCycling.java b/src/mas/implementation/schedulers/FairPosCycling.java index 283d615..c87a5e1 100644 --- a/src/mas/implementation/schedulers/FairPosCycling.java +++ b/src/mas/implementation/schedulers/FairPosCycling.java @@ -1,13 +1,13 @@ package mas.implementation.schedulers; import mas.core.Cyclable; -import mas.core.Schedulable; +import mas.core.Scheduler; import mas.core.Sleepable; /** - * Même que fair + equitable sur position des Round Robin + * Même que MonoThreadedCycling + equitable sur position des Round Robin */ -public class FairPosCycling implements Schedulable, Sleepable { +public class FairPosCycling implements Scheduler, Sleepable { @Override public void start() { diff --git a/src/mas/implementation/schedulers/MonoThreadedCycling.java b/src/mas/implementation/schedulers/MonoThreadedCycling.java new file mode 100644 index 0000000..15af4a5 --- /dev/null +++ b/src/mas/implementation/schedulers/MonoThreadedCycling.java @@ -0,0 +1,223 @@ +package mas.implementation.schedulers; + +import mas.core.Cyclable; +import mas.core.Scheduler; +import mas.core.Sleepable; + +import java.util.LinkedHashSet; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; + +/** + * The MonoThreadedCycling scheduler schedules tasks using a {@link Thread}. The execution is sequential. + * This scheduler is very efficient with low complexity cyclables. + * Every cyclable executes its cycle once every system's cycle. + * + * @param <T> + * Extended class of {@link mas.core.Cyclable} + * + * @author David Antunes + */ +public class MonoThreadedCycling<T extends Cyclable> implements Scheduler<T>, Sleepable { + + /** + * The cyclable objects handled by the scheduler. + */ + protected Set<T> cyclables = new LinkedHashSet<>(); + + /** + * The cyclables that must be added in the next cycle. + */ + protected Queue<T> pendingToAddCyclables = new ConcurrentLinkedQueue<>(); + + /** + * Time between two cycles. Default time in {@link Sleepable#DEFAULT_SLEEP}. + */ + protected int sleep = DEFAULT_SLEEP; + + /** + * Number of system cycles. + */ + protected int nbOfCycles = 0; + + /** + * Condition to know if the scheduler must be stopped. + */ + protected boolean mustStop = false; + + /** + * Condition to know if the scheduler must be paused. + */ + protected boolean mustPause = false; + + /** + * Object used to pause the scheduler. + */ + protected CountDownLatch pauseLatch; + + /** + * The thread which executes the cycle. + */ + Thread executionThread = new Thread(this::doCycle); + + /** + * Constructor which set the initial cyclables. + * + * @param cyclables + * The corresponding cyclables + */ + @SafeVarargs + public MonoThreadedCycling(T... cyclables){ + for (T cyclable : cyclables) { + addCyclable(cyclable); + } + } + + @Override + public void start() { + executionThread.start(); + } + + @Override + public void stop() { + mustStop = true; + } + + @Override + public void pause() { + if(pauseLatch == null || pauseLatch.getCount() == 0){ + pauseLatch = new CountDownLatch(1); + } + mustPause = true; + } + + @Override + public void resume() { + if(pauseLatch != null){ + pauseLatch.countDown(); + } + } + + @Override + public void addCyclable(T cyclable) { + cyclable.setScheduler(this); + pendingToAddCyclables.add(cyclable); + } + + @Override + public boolean stopCondition() { + return false; + } + + @Override + public boolean isFinished() { + return !executionThread.isAlive(); + } + + @Override + public void waitUntilFinish() { + try { + executionThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public int getSleep() { + return sleep; + } + + @Override + public void setSleep(int sleep) { + this.sleep = sleep; + } + + @Override + public void doSleep() { + if (getSleep() != 0) { + try { + Thread.sleep(sleep); + } catch (final InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * Executes {@link #step()} until the scheduler must stop. + */ + protected void doCycle(){ + while(!mustStop){ + step(); + if(stopCondition()){ + this.stop(); + } + + if(mustPause){ + try{ + pauseLatch.await(); + } catch (InterruptedException e){ + throw new RuntimeException(e); + } + } + } + } + + /** + * Executes a system's cycle. + */ + protected void step(){ + onCycleStarts(); + + treatPendingCyclables(); + + for(T cyclable : cyclables){ + cyclable.cycle(); + if(!cyclable.terminate()){ + pendingToAddCyclables.add(cyclable); + } + } + + doSleep(); + + onCycleEnds(); + + cyclables.clear(); + + nbOfCycles++; + } + + /** + * Add the cyclables that are going to be scheduled on the current cycle. + */ + protected void treatPendingCyclables() { + cyclables.addAll(pendingToAddCyclables); + pendingToAddCyclables.clear(); + } + + /** + * This method is called at the end of every system's cycle. + */ + protected void onCycleEnds() { + + } + + /** + * This method is called at the start of every system's cycle. + */ + protected void onCycleStarts(){ + + } + + /** + * Getter for the number of cycles. + * + * @return the number of cycles performed by the system + */ + public int getNbOfCycles() { + return nbOfCycles; + } +} diff --git a/src/mas/implementation/schedulers/ThreeStepCycling.java b/src/mas/implementation/schedulers/ThreeStepCycling.java index 100923c..44edf77 100644 --- a/src/mas/implementation/schedulers/ThreeStepCycling.java +++ b/src/mas/implementation/schedulers/ThreeStepCycling.java @@ -36,7 +36,7 @@ public class ThreeStepCycling extends FairCycling<ThreeStepCyclable> { protected void step() { onCycleStarts(); - treatPendingCyclables(); + //treatPendingCyclables(); cycleLatch = new CountDownLatch(cyclables.size()); perceptionLatch = new CountDownLatch(cyclables.size()); @@ -56,7 +56,7 @@ public class ThreeStepCycling extends FairCycling<ThreeStepCyclable> { threeStepCyclable.act(); if(threeStepCyclable.terminate()){ - pendingToAddCyclables.add(threeStepCyclable); + //pendingToAddCyclables.add(threeStepCyclable); } cycleLatch.countDown(); }); diff --git a/src/mas/implementation/schedulers/variations/TwoDCycling.java b/src/mas/implementation/schedulers/variations/TwoDCycling.java index d08bb54..ce906a2 100644 --- a/src/mas/implementation/schedulers/variations/TwoDCycling.java +++ b/src/mas/implementation/schedulers/variations/TwoDCycling.java @@ -129,7 +129,7 @@ public class TwoDCycling extends FairCycling<Cyclable> { @Override public void pause() { state = State.IDLE; - if(pauseLatch == null){ + if(pauseLatch == null || pauseLatch.getCount() == 0){ pauseLatch = new CountDownLatch(1); } mustPause = true; -- GitLab