From 430962bf75e0abcb9aecad6e79ac18bd3e6562e0 Mon Sep 17 00:00:00 2001 From: Alexandre <alexandre.perles@gmail.com> Date: Mon, 4 Sep 2023 13:21:56 +0200 Subject: [PATCH] Split cluster agents in two --- build.gradle | 2 +- .../irit/smac/amas4dc/amas/ClusterAgent.java | 302 ------------------ .../amas4dc/amas/DynamicClusteringAMAS.java | 9 +- .../amas/DynamicClusteringEnvironment.java | 3 +- .../smac/amas4dc/amas/agent/ClusterAgent.java | 209 ++++++++++++ .../amas4dc/amas/agent/DataPointAgent.java | 149 +++++++++ .../AMAS4DCCommandAndQueryHandler.java | 1 + .../amas/messages/EvaluatedScoreMessage.java | 10 +- ...mCandidateSimilarClusterAgentsMessage.java | 19 ++ .../messages/RequestSimilarityMessage.java | 10 +- .../fr/irit/smac/amas4dc/cluster/Cluster.java | 5 + .../smac/amas4dc/cluster/ExtendedCluster.java | 13 + .../smac/amas4dc/amas/ClusterAgentTest.groovy | 6 +- 13 files changed, 423 insertions(+), 315 deletions(-) delete mode 100644 src/main/java/fr/irit/smac/amas4dc/amas/ClusterAgent.java create mode 100644 src/main/java/fr/irit/smac/amas4dc/amas/agent/ClusterAgent.java create mode 100644 src/main/java/fr/irit/smac/amas4dc/amas/agent/DataPointAgent.java create mode 100644 src/main/java/fr/irit/smac/amas4dc/amas/messages/InformCandidateSimilarClusterAgentsMessage.java diff --git a/build.gradle b/build.gradle index bf1a5fd..4835731 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ tasks.register('experimentsTest', Test) { dependencies { // AMAK (Only one of the next two lines should be uncommented. Also, settings.gradle file should also be modified) - implementation 'com.github.alexandreprl:amak:3.1.0' // Uncomment this line to get AMAK from git repository (The last part matches a tag, a commit hash or the last commit of a branch : branchName-SNAPSHOT + implementation 'com.github.alexandreprl:amak:3.1.3' // Uncomment this line to get AMAK from git repository (The last part matches a tag, a commit hash or the last commit of a branch : branchName-SNAPSHOT // implementation project(':amak') // Uncomment this line to get AMAK from local implementation 'com.github.alexandreprl:lxplot:main-SNAPSHOT' diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/ClusterAgent.java b/src/main/java/fr/irit/smac/amas4dc/amas/ClusterAgent.java deleted file mode 100644 index 85037cf..0000000 --- a/src/main/java/fr/irit/smac/amas4dc/amas/ClusterAgent.java +++ /dev/null @@ -1,302 +0,0 @@ -package fr.irit.smac.amas4dc.amas; - -import fr.irit.smac.amak.Agent; -import fr.irit.smac.amas4dc.amas.messages.EvaluatedScoreMessage; -import fr.irit.smac.amas4dc.amas.messages.RequestSimilarityMessage; -import fr.irit.smac.amas4dc.cluster.Cluster; -import fr.irit.smac.amas4dc.cluster.DataPoint; -import fr.irit.smac.amas4dc.cluster.ExtendedCluster; -import fr.irit.smac.amas4dc.event.NewClusterEvent; -import fr.irit.smac.amas4dc.event.RemoveClusterEvent; -import lombok.Getter; - -import java.text.MessageFormat; -import java.util.*; -import java.util.logging.Level; -import java.util.logging.Logger; - -public class ClusterAgent<T extends DataPoint> extends Agent<DynamicClusteringAMAS<T>, DynamicClusteringEnvironment<T>> { - private static final float FLOAT_COMPARISON_EPSILON = 0.000001f; - @Getter - private Cluster<T> cluster; - private State state = State.INIT; - private State nextState = state; - - //region Perception variables - private Cluster<T> receivedClusterForRequestSimilarity; - private ClusterAgent<T> receivedRequestSimilarityRequester; - private final Map<ClusterAgent<T>, Float> similarityScoresReceived = new HashMap<>(); - //endregion - - //region Decision variables - private boolean decideToFuse; - private boolean decideToDie; - private float computedSimilarityScoreDuringThisCycle; - //endregion - - //region Action variables - private final List<ClusterAgent<T>> requestSimilarityMessageSentAgents = new ArrayList<>(); - //endregion - - private final List<ClusterAgent<T>> clusterAgentsRoughlySimilarOnReady = new ArrayList<>(); - private final Random random = new Random(); - private static final Logger logger = Logger.getLogger(ClusterAgent.class.getName()); - private int amountOfCyclesWithTheSameState; - - protected ClusterAgent(DynamicClusteringAMAS amas, T dataPoint) { - super(amas); - if (amas.getMasSettings().amasOptions().contains(AMASOption.KeepAllDataPoints)) - this.setCluster(new ExtendedCluster<T>(dataPoint)); - else - this.setCluster(new Cluster<T>(dataPoint)); - } - - public enum State {DORMANT, WAITING_FOR_REPLY, DECIDING, INIT, DEAD, ACTIVE} - - public State getState() { - return state; - } - - @Override - protected void onReady() { - for (var other : - amas.getEnvironment().getAgentsFromBucket(cluster.getRepresentative().getBucketId())) { - if (other != this) { - if (isRoughlySimilarTo(other)) { - clusterAgentsRoughlySimilarOnReady.add(other); - } - } - } - } - - private boolean isRoughlySimilarTo(ClusterAgent<T> otherClusterAgent) { - return getAmas().getMasSettings().similarityScoreMethod().areRoughlySimilar(cluster.getRepresentative(), otherClusterAgent.cluster.getRepresentative()); - } - - @Override - protected void onAgentCycleEnd() { - if (state != State.DORMANT) - if (logger.isLoggable(Level.INFO)) - logger.info(MessageFormat.format("---- End {0} cycle state= {1} ----", this, state)); - } - - public boolean readyToStop() { - return state == State.DORMANT && !mailbox.hasMessageOfType(RequestSimilarityMessage.class); - } - - @Override - protected void onPerceive() { - switch (state) { - case DORMANT -> { - var receivedRequestSimilarityMessage = this.getMailbox().read(RequestSimilarityMessage.class); - if (receivedRequestSimilarityMessage.isPresent()) { - receivedClusterForRequestSimilarity = receivedRequestSimilarityMessage.get().getCluster(); - receivedRequestSimilarityRequester = (ClusterAgent) receivedRequestSimilarityMessage.get().getSender(); - } else { - receivedClusterForRequestSimilarity = null; - receivedRequestSimilarityRequester = null; - } - } - case WAITING_FOR_REPLY -> { - while (mailbox.hasMessageOfType(EvaluatedScoreMessage.class)) { - var evaluatedScoreMessage = this.getMailbox().read(EvaluatedScoreMessage.class); - if (evaluatedScoreMessage.isPresent()) { - similarityScoresReceived.put((ClusterAgent) evaluatedScoreMessage.get().getSender(), evaluatedScoreMessage.get().getSimilarityScore()); - } - } - clearDeadAgentsFromThoseWeExpectAnAnswerFrom(); - - } - } - } - - private void clearDeadAgentsFromThoseWeExpectAnAnswerFrom() { - var itr = requestSimilarityMessageSentAgents.iterator(); - while (itr.hasNext()) { - var a = itr.next(); - if (a.state == State.DEAD && !similarityScoresReceived.keySet().contains(a)) - itr.remove(); - } - for (var key : - similarityScoresReceived.keySet()) { - if (!requestSimilarityMessageSentAgents.contains(key)) - similarityScoresReceived.remove(key); - } - } - - @Override - protected void onDecide() { - switch (state) { - case DORMANT -> { - decideToDie = false; - if (receivedClusterForRequestSimilarity == null && amas.getMasSettings().amasOptions().contains(AMASOption.Forget)) { - // TODO decide to die logic - - } - } - case DECIDING -> { - decideToFuse = false; - if (logger.isLoggable(Level.INFO)) { - for (var e : - similarityScoresReceived.entrySet()) { - logger.info(this + " received similarity score " + e.getValue() + " from " + e.getKey()); - } - } - if (similarityScoresReceived.values().stream().anyMatch(v -> v >= getAmas().getMasSettings().fusionThreshold())) - decideToFuse = true; - - if (logger.isLoggable(Level.INFO)) { - if (decideToFuse) - logger.info(this + " decides to fuse with one of them"); - else - logger.info(this + " decides NOT to fuse with one of them"); - } - } - case ACTIVE -> { - if (receivedRequestSimilarityRequester != null) { - computedSimilarityScoreDuringThisCycle = computeSimilarityScore(receivedClusterForRequestSimilarity); - if (logger.isLoggable(Level.INFO)) - logger.info(this + " computed a similarity of " + computedSimilarityScoreDuringThisCycle + " with " + receivedRequestSimilarityRequester); - } - } - } - } - - @Override - protected void onAct() { - switch (state) { - case DORMANT -> { - if (receivedClusterForRequestSimilarity != null) - nextState = State.ACTIVE; - else if (decideToDie) { - die(); - } - } - case INIT -> { - similarityScoresReceived.clear(); - requestSimilarityMessageSentAgents.clear(); - for (var agent : clusterAgentsRoughlySimilarOnReady) { - if (logger.isLoggable(Level.INFO)) - logger.info(this + " sends RequestSimilarityMessage to " + agent); - agent.getMailbox().receive(new RequestSimilarityMessage(this, cluster)); - requestSimilarityMessageSentAgents.add(agent); - } - - if (clusterAgentsRoughlySimilarOnReady.isEmpty()) - nextState = State.DORMANT; - else if (!requestSimilarityMessageSentAgents.isEmpty()) - nextState = State.WAITING_FOR_REPLY; - } - case ACTIVE -> { - if (receivedRequestSimilarityRequester != null) { - var message = new EvaluatedScoreMessage(this, computedSimilarityScoreDuringThisCycle); - if (logger.isLoggable(Level.INFO)) - logger.info(this + " --> " + receivedRequestSimilarityRequester + " " + message); - receivedRequestSimilarityRequester.getMailbox().receive(message); - receivedRequestSimilarityRequester = null; - - if (computedSimilarityScoreDuringThisCycle >= getAmas().getMasSettings().fusionThreshold()) { - die(); - } else { - nextState = State.DORMANT; - } - } - } - case DECIDING -> { - if (decideToFuse) { - if (logger.isLoggable(Level.INFO)) - logger.info(this + " decides to fuse"); - var requestedAgentsWithHighestSimilarityScore = getRequestedAgentsWithSimilarityScoreAboveFusionThreshold(similarityScoresReceived); - if (!requestedAgentsWithHighestSimilarityScore.isEmpty()) { - for (var e : similarityScoresReceived.entrySet()) { - if (requestedAgentsWithHighestSimilarityScore.contains(e.getKey())) { - fuse(e.getKey()); - } - } - } else { - throw new RuntimeException("Wanted to fuse with an agent but no agents are available"); - } - } - nextState = State.DORMANT; - } - case WAITING_FOR_REPLY -> { - if (requestSimilarityMessageSentAgents.size() == similarityScoresReceived.size()) - nextState = State.DECIDING; - } - } - applyNextState(); - } - - private void die() { - nextState = State.DEAD; - destroy(); - } - - private void applyNextState() { - if (state == nextState) { - amountOfCyclesWithTheSameState++; - if (amountOfCyclesWithTheSameState > 10 && state == State.ACTIVE) - throw new RuntimeException(this + " is in the state " + state + " for too long"); - } else { - amountOfCyclesWithTheSameState = 0; - } - state = nextState; - } - - private void fuse(ClusterAgent<T> other) { - if (logger.isLoggable(Level.INFO)) - logger.info(this + " fuses with " + other); - - setCluster(fuse(other.cluster)); - } - - private void setCluster(Cluster<T> newCluster) { - if (cluster != null) { - getAmas().getEnvironment().removeFromBucket(cluster.getRepresentative().getBucketId(), this); - if (getAmas().getMasSettings().eventStore() != null) - getAmas().getMasSettings().eventStore().add(new RemoveClusterEvent(cluster)); - } - - cluster = newCluster; - getAmas().getEnvironment().assignToBucket(cluster.getRepresentative().getBucketId(), this); - if (getAmas().getMasSettings().eventStore() != null) - getAmas().getMasSettings().eventStore().add(new NewClusterEvent(cluster)); - } - - @Override - protected void onDestroy() { - if (cluster != null) - if (getAmas().getMasSettings().eventStore() != null) - getAmas().getMasSettings().eventStore().add(new RemoveClusterEvent(cluster)); - } - - private Cluster<T> fuse(Cluster<T> other) { - T newRepresentative = (T) amas.getMasSettings().dataPointFuser().apply(cluster.getRepresentative(), other.getRepresentative()); - if (getAmas().getMasSettings().amasOptions().contains(AMASOption.KeepAllDataPoints)) - return new ExtendedCluster<>(newRepresentative, cluster, other); - else - return new Cluster<T>(newRepresentative, cluster, other); - } - - private List<ClusterAgent<T>> getRequestedAgentsWithSimilarityScoreAboveFusionThreshold(Map<ClusterAgent<T>, Float> similarityScoresReceived) { - float highestScore = Float.MIN_VALUE; - List<ClusterAgent<T>> agentsWithHighestScore = new ArrayList<>(); - for (var e : similarityScoresReceived.entrySet()) { - if (e.getValue() >= getAmas().getMasSettings().fusionThreshold()) { - if (Math.abs(e.getValue() - highestScore) < FLOAT_COMPARISON_EPSILON) { - agentsWithHighestScore.add(e.getKey()); - } else if (e.getValue() > highestScore) { - agentsWithHighestScore.clear(); - agentsWithHighestScore.add(e.getKey()); - highestScore = e.getValue(); - } - } - } - return agentsWithHighestScore; - } - - - private float computeSimilarityScore(Cluster<T> other) { - return getAmas().getMasSettings().similarityScoreMethod().apply(this.cluster.getRepresentative(), other.getRepresentative()); - } -} diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringAMAS.java b/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringAMAS.java index 1ec5d63..a1e3aa2 100644 --- a/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringAMAS.java +++ b/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringAMAS.java @@ -1,6 +1,8 @@ package fr.irit.smac.amas4dc.amas; import fr.irit.smac.amak.Amas; +import fr.irit.smac.amas4dc.amas.agent.ClusterAgent; +import fr.irit.smac.amas4dc.amas.agent.DataPointAgent; import fr.irit.smac.amas4dc.cluster.DataPoint; import lombok.Getter; import lombok.Setter; @@ -27,7 +29,7 @@ public class DynamicClusteringAMAS<T extends DataPoint> extends Amas<DynamicClus logger.info("---- BEGIN MAS CYCLE ----"); if (agents.isEmpty() || allAgentsAreDormant()) { var newDataPoint = environment.pollPendingDataPoint(); - newDataPoint.ifPresent(dataPoint -> new ClusterAgent<T>(this, dataPoint)); + newDataPoint.ifPresent(dataPoint -> new DataPointAgent<T>(this, dataPoint)); } } @@ -46,7 +48,8 @@ public class DynamicClusteringAMAS<T extends DataPoint> extends Amas<DynamicClus @Override public boolean stopCondition() { - return !environment.hasRemainingPendingAdditionDataPoints() && agents.stream() - .noneMatch(agent -> agent instanceof ClusterAgent clusterAgent && !clusterAgent.readyToStop()); + return !environment.hasRemainingPendingAdditionDataPoints() && + agents.stream().noneMatch(agent -> agent instanceof ClusterAgent clusterAgent && !clusterAgent.readyToStop()) && + agents.stream().noneMatch(agent -> agent instanceof DataPointAgent); } } diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringEnvironment.java b/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringEnvironment.java index e51bcea..83f61c6 100644 --- a/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringEnvironment.java +++ b/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringEnvironment.java @@ -1,6 +1,7 @@ package fr.irit.smac.amas4dc.amas; import fr.irit.smac.amak.Environment; +import fr.irit.smac.amas4dc.amas.agent.ClusterAgent; import fr.irit.smac.amas4dc.cluster.DataPoint; import lombok.Getter; @@ -42,7 +43,7 @@ public class DynamicClusteringEnvironment<T extends DataPoint> extends Environme this.buckets.get(bucketId).remove(tClusterAgent); } - public List<ClusterAgent<T>> getAgentsFromBucket(int bucketId) { + public List<ClusterAgent<T>> getClusterAgentsFromBucket(int bucketId) { return this.buckets.getOrDefault(bucketId, new ArrayList<>()); } } diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/agent/ClusterAgent.java b/src/main/java/fr/irit/smac/amas4dc/amas/agent/ClusterAgent.java new file mode 100644 index 0000000..75fc041 --- /dev/null +++ b/src/main/java/fr/irit/smac/amas4dc/amas/agent/ClusterAgent.java @@ -0,0 +1,209 @@ +package fr.irit.smac.amas4dc.amas.agent; + +import fr.irit.smac.amak.Agent; +import fr.irit.smac.amas4dc.amas.AMASOption; +import fr.irit.smac.amas4dc.amas.DynamicClusteringAMAS; +import fr.irit.smac.amas4dc.amas.DynamicClusteringEnvironment; +import fr.irit.smac.amas4dc.amas.messages.EvaluatedScoreMessage; +import fr.irit.smac.amas4dc.amas.messages.InformCandidateSimilarClusterAgentsMessage; +import fr.irit.smac.amas4dc.amas.messages.RequestSimilarityMessage; +import fr.irit.smac.amas4dc.cluster.Cluster; +import fr.irit.smac.amas4dc.cluster.DataPoint; +import fr.irit.smac.amas4dc.cluster.ExtendedCluster; +import fr.irit.smac.amas4dc.event.NewClusterEvent; +import fr.irit.smac.amas4dc.event.RemoveClusterEvent; +import lombok.Getter; + +import java.text.MessageFormat; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ClusterAgent<T extends DataPoint> extends Agent<DynamicClusteringAMAS<T>, DynamicClusteringEnvironment<T>> { + @Getter + private Cluster<T> cluster; + private State state = State.DORMANT; + private State nextState = state; + + //region Perception variables + private T receivedDataPointForRequestSimilarity; + private DataPointAgent<T> receivedRequestSimilarityRequester; + //endregion + + //region Decision variables + private boolean decideToDie; + private float computedSimilarityScoreDuringThisCycle; + //endregion + + //region Action variables + //endregion + private final Random random = new Random(); + private static final Logger logger = Logger.getLogger(ClusterAgent.class.getName()); + private int amountOfCyclesWithTheSameState; + private final List<Cluster<T>> receivedCandidateSimilarClusters = new ArrayList<>(); + + protected ClusterAgent(DynamicClusteringAMAS amas, T dataPoint) { + super(amas); + logger.info(this+" created"); + if (amas.getMasSettings().amasOptions().contains(AMASOption.KeepAllDataPoints)) + this.setCluster(new ExtendedCluster<T>(dataPoint)); + else + this.setCluster(new Cluster<T>(dataPoint)); + } + + public enum State {DORMANT, DECIDING, DEAD, ACTIVE} + + public State getState() { + return state; + } + + + @Override + protected void onAgentCycleEnd() { + if (state != State.DORMANT) + if (logger.isLoggable(Level.INFO)) + logger.info(MessageFormat.format("---- End {0} cycle state= {1} ----", this, state)); + } + + public boolean readyToStop() { + return state == State.DORMANT && + !mailbox.hasAMessage(); + } + + @Override + protected void onPerceive() { + switch (state) { + case DORMANT -> { + var receivedInformCandidateSimilarClusterAgentsMessage = this.getMailbox().read(InformCandidateSimilarClusterAgentsMessage.class); + if (receivedInformCandidateSimilarClusterAgentsMessage.isPresent()) { + logger.info(this+ " received a list of candidates similar cluster agents"); + receivedCandidateSimilarClusters.addAll(receivedInformCandidateSimilarClusterAgentsMessage.get().getCandidateSimilarCluster()); + return; + } + var receivedRequestSimilarityMessage = this.getMailbox().read(RequestSimilarityMessage.class); + if (receivedRequestSimilarityMessage.isPresent()) { + logger.info(this+ " received a request for similarity"); + RequestSimilarityMessage<T> message = receivedRequestSimilarityMessage.get(); + receivedDataPointForRequestSimilarity = message.getDataPoint(); + receivedRequestSimilarityRequester = message.getSender(); + return; + } else { + receivedDataPointForRequestSimilarity = null; + receivedRequestSimilarityRequester = null; + } + } + } + } + + + + @Override + protected void onDecide() { + switch (state) { + case DORMANT -> { + decideToDie = false; + if (receivedDataPointForRequestSimilarity == null && amas.getMasSettings().amasOptions().contains(AMASOption.Forget)) { + // TODO decide to die logic + + } + + } + case ACTIVE -> { + if (receivedRequestSimilarityRequester != null) { + computedSimilarityScoreDuringThisCycle = computeSimilarityScore(receivedDataPointForRequestSimilarity); + if (logger.isLoggable(Level.INFO)) + logger.info(this + " computed a similarity of " + computedSimilarityScoreDuringThisCycle + " with " + receivedRequestSimilarityRequester); + } + } + } + } + + @Override + protected void onAct() { + switch (state) { + case DORMANT -> { + if (receivedDataPointForRequestSimilarity != null) + nextState = State.ACTIVE; + else if (!receivedCandidateSimilarClusters.isEmpty()) { + for (var otherCluster : receivedCandidateSimilarClusters) { + absorb(otherCluster); + } + receivedCandidateSimilarClusters.clear(); + } else if (decideToDie) { + die(); + } + } + case ACTIVE -> { + if (receivedRequestSimilarityRequester != null) { + var message = new EvaluatedScoreMessage<T>(this, cluster, computedSimilarityScoreDuringThisCycle); + if (logger.isLoggable(Level.INFO)) + logger.info(this + " --> " + receivedRequestSimilarityRequester + " " + message); + receivedRequestSimilarityRequester.getMailbox().receive(message); + receivedRequestSimilarityRequester = null; + + if (computedSimilarityScoreDuringThisCycle >= getAmas().getMasSettings().fusionThreshold()) { + die(); + } else { + nextState = State.DORMANT; + } + } + } + } + applyNextState(); + } + + private void die() { + nextState = State.DEAD; + destroy(); + } + + private void applyNextState() { + if (state == nextState) { + amountOfCyclesWithTheSameState++; + if (amountOfCyclesWithTheSameState > 10 && state == State.ACTIVE) + throw new RuntimeException(this + " is in the state " + state + " for too long"); + } else { + amountOfCyclesWithTheSameState = 0; + } + state = nextState; + } + + private void absorb(Cluster<T> other) { + if (logger.isLoggable(Level.INFO)) + logger.info(this + " absorbs " + other); + + T newRepresentative = (T) amas.getMasSettings().dataPointFuser().apply(cluster.getRepresentative(), other.getRepresentative()); + Cluster<T> newCluster; + if (getAmas().getMasSettings().amasOptions().contains(AMASOption.KeepAllDataPoints)) + newCluster = new ExtendedCluster<>(newRepresentative, cluster, other); + else + newCluster = new Cluster<T>(newRepresentative, cluster, other); + setCluster(newCluster); + } + + private void setCluster(Cluster<T> newCluster) { + if (cluster != null) { + getAmas().getEnvironment().removeFromBucket(cluster.getRepresentative().getBucketId(), this); + if (getAmas().getMasSettings().eventStore() != null) + getAmas().getMasSettings().eventStore().add(new RemoveClusterEvent(cluster)); + } + + cluster = newCluster; + getAmas().getEnvironment().assignToBucket(cluster.getRepresentative().getBucketId(), this); + if (getAmas().getMasSettings().eventStore() != null) + getAmas().getMasSettings().eventStore().add(new NewClusterEvent(cluster)); + } + + @Override + protected void onDestroy() { + if (cluster != null) + if (getAmas().getMasSettings().eventStore() != null) + getAmas().getMasSettings().eventStore().add(new RemoveClusterEvent(cluster)); + } + + + + private float computeSimilarityScore(T other) { + return getAmas().getMasSettings().similarityScoreMethod().apply(this.cluster.getRepresentative(), other); + } +} diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/agent/DataPointAgent.java b/src/main/java/fr/irit/smac/amas4dc/amas/agent/DataPointAgent.java new file mode 100644 index 0000000..c331a3f --- /dev/null +++ b/src/main/java/fr/irit/smac/amas4dc/amas/agent/DataPointAgent.java @@ -0,0 +1,149 @@ +package fr.irit.smac.amas4dc.amas.agent; + +import fr.irit.smac.amak.Agent; +import fr.irit.smac.amas4dc.amas.DynamicClusteringAMAS; +import fr.irit.smac.amas4dc.amas.DynamicClusteringEnvironment; +import fr.irit.smac.amas4dc.amas.messages.EvaluatedScoreMessage; +import fr.irit.smac.amas4dc.amas.messages.InformCandidateSimilarClusterAgentsMessage; +import fr.irit.smac.amas4dc.amas.messages.RequestSimilarityMessage; +import fr.irit.smac.amas4dc.cluster.Cluster; +import fr.irit.smac.amas4dc.cluster.DataPoint; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class DataPointAgent<T extends DataPoint> extends Agent<DynamicClusteringAMAS<T>, DynamicClusteringEnvironment<T>> { + private static final float FLOAT_COMPARISON_EPSILON = 0.000001f; + private final T dataPoint; + private final List<ClusterAgent<T>> requestSimilarityMessageSentAgents = new ArrayList<>(); + private final Map<ClusterAgent<T>, ClusterSimilarityScore<T>> similarityScoresReceived = new HashMap<>(); + private static final Logger logger = Logger.getLogger(DataPointAgent.class.getName()); + private boolean decideToFuse, decideToCreateANewCluster; + + public DataPointAgent(DynamicClusteringAMAS<T> amas, T dataPoint) { + super(amas); + logger.info(this + " created"); + this.dataPoint = dataPoint; + } + + + private boolean isRoughlySimilarTo(ClusterAgent<T> otherClusterAgent) { + return getAmas().getMasSettings().similarityScoreMethod().areRoughlySimilar(dataPoint, otherClusterAgent.getCluster().getRepresentative()); + } + + @Override + protected void onReady() { + super.onReady(); + var clusterAgentsRoughlySimilarOnReady = new ArrayList<ClusterAgent<T>>(); + for (var other : amas.getEnvironment().getClusterAgentsFromBucket(dataPoint.getBucketId())) { + if (isRoughlySimilarTo(other)) { + clusterAgentsRoughlySimilarOnReady.add(other); + } + } + for (var agent : clusterAgentsRoughlySimilarOnReady) { + if (logger.isLoggable(Level.INFO)) + logger.info(this + " sends RequestSimilarityMessage to " + agent); + agent.getMailbox().receive(new RequestSimilarityMessage<T>(this, dataPoint)); + requestSimilarityMessageSentAgents.add(agent); + } + } + + @Override + protected void onPerceive() { + while (mailbox.hasMessageOfType(EvaluatedScoreMessage.class)) { + var evaluatedScoreMessage = this.getMailbox().read(EvaluatedScoreMessage.class); + if (evaluatedScoreMessage.isPresent()) { + similarityScoresReceived.put((ClusterAgent<T>) evaluatedScoreMessage.get().getSender(), new ClusterSimilarityScore<T>(evaluatedScoreMessage.get().getCluster(), evaluatedScoreMessage.get().getSimilarityScore())); + } + } + clearDeadAgentsFromThoseWeExpectAnAnswerFrom(); + } + + @Override + protected void onDecide() { + decideToFuse = false; + decideToCreateANewCluster = false; + if (requestSimilarityMessageSentAgents.size() == similarityScoresReceived.size()) { + if (logger.isLoggable(Level.INFO)) { + for (var e : similarityScoresReceived.entrySet()) { + logger.info(this + " received similarity score " + e.getValue() + " from " + e.getKey()); + } + } + if (similarityScoresReceived.values().stream().anyMatch(v -> v.score >= getAmas().getMasSettings().fusionThreshold())) + decideToFuse = true; + else + decideToCreateANewCluster = true; + + if (logger.isLoggable(Level.INFO)) { + if (decideToFuse) + logger.info(this + " decides to fuse with one of them"); + else if (decideToCreateANewCluster) + logger.info(this + " decides to create a new cluster"); + else + logger.info(this + " decides NOT to fuse not with one of them"); + } + } + } + + @Override + protected void onAct() { + if (decideToFuse) { + if (logger.isLoggable(Level.INFO)) + logger.info(this + " decides to fuse"); + var requestedAgentsWithHighestSimilarityScore = getReceivedClustersWithSimilarityScoreAboveFusionThreshold(similarityScoresReceived); + if (!requestedAgentsWithHighestSimilarityScore.isEmpty()) { + var newClusterAgent = new ClusterAgent<T>(amas, dataPoint); + newClusterAgent.getMailbox().receive(new InformCandidateSimilarClusterAgentsMessage<T>(this, requestedAgentsWithHighestSimilarityScore)); + destroy(); + } else { + throw new RuntimeException("Wanted to fuse with an agent but no agents are available"); + } + } else if (decideToCreateANewCluster) { + new ClusterAgent<T>(amas, dataPoint); + destroy(); + } + } + + @Override + protected void onDestroy() { + logger.info(MessageFormat.format("{0} dies", this)); + } + + private List<Cluster<T>> getReceivedClustersWithSimilarityScoreAboveFusionThreshold(Map<ClusterAgent<T>, ClusterSimilarityScore<T>> similarityScoresReceived) { + float highestScore = Float.MIN_VALUE; + List<Cluster<T>> clustersWithHighestScore = new ArrayList<>(); + for (var e : similarityScoresReceived.entrySet()) { + float clusterScore = e.getValue().score; + if (clusterScore >= getAmas().getMasSettings().fusionThreshold()) { + Cluster<T> cluster = e.getValue().cluster; + if (Math.abs(clusterScore - highestScore) < FLOAT_COMPARISON_EPSILON) { + clustersWithHighestScore.add(cluster); + } else if (clusterScore > highestScore) { + clustersWithHighestScore.clear(); + clustersWithHighestScore.add(cluster); + highestScore = clusterScore; + } + } + } + return clustersWithHighestScore; + } + + private void clearDeadAgentsFromThoseWeExpectAnAnswerFrom() { + requestSimilarityMessageSentAgents.removeIf(a -> a.getState() == ClusterAgent.State.DEAD && !similarityScoresReceived.keySet().contains(a)); + for (var key : + similarityScoresReceived.keySet()) { + if (!requestSimilarityMessageSentAgents.contains(key)) + similarityScoresReceived.remove(key); + } + } + + + private record ClusterSimilarityScore<T extends DataPoint>(Cluster<T> cluster, float score) { + + } +} diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/controller/AMAS4DCCommandAndQueryHandler.java b/src/main/java/fr/irit/smac/amas4dc/amas/controller/AMAS4DCCommandAndQueryHandler.java index 161e4f1..f984f25 100644 --- a/src/main/java/fr/irit/smac/amas4dc/amas/controller/AMAS4DCCommandAndQueryHandler.java +++ b/src/main/java/fr/irit/smac/amas4dc/amas/controller/AMAS4DCCommandAndQueryHandler.java @@ -2,6 +2,7 @@ package fr.irit.smac.amas4dc.amas.controller; import fr.irit.smac.amak.scheduling.Scheduler; import fr.irit.smac.amas4dc.amas.*; +import fr.irit.smac.amas4dc.amas.agent.ClusterAgent; import fr.irit.smac.amas4dc.amas.controller.command.NewDataPointCommand; import fr.irit.smac.amas4dc.amas.controller.command.ShutdownCommand; import fr.irit.smac.amas4dc.amas.controller.command.SolveCommand; diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/messages/EvaluatedScoreMessage.java b/src/main/java/fr/irit/smac/amas4dc/amas/messages/EvaluatedScoreMessage.java index f2a381b..9875556 100644 --- a/src/main/java/fr/irit/smac/amas4dc/amas/messages/EvaluatedScoreMessage.java +++ b/src/main/java/fr/irit/smac/amas4dc/amas/messages/EvaluatedScoreMessage.java @@ -2,16 +2,22 @@ package fr.irit.smac.amas4dc.amas.messages; import fr.irit.smac.amak.Agent; import fr.irit.smac.amak.messaging.Message; +import fr.irit.smac.amas4dc.amas.agent.ClusterAgent; +import fr.irit.smac.amas4dc.cluster.Cluster; +import fr.irit.smac.amas4dc.cluster.DataPoint; import lombok.Getter; import lombok.ToString; @ToString -public class EvaluatedScoreMessage extends Message { +public class EvaluatedScoreMessage<T extends DataPoint> extends Message<ClusterAgent<T>> { + @Getter + private final Cluster<T> cluster; @Getter private final float similarityScore; - public EvaluatedScoreMessage(Agent sender, float similarityScore) { + public EvaluatedScoreMessage(ClusterAgent<T> sender, Cluster<T> cluster, float similarityScore) { super(sender); + this.cluster = cluster; this.similarityScore = similarityScore; } } diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/messages/InformCandidateSimilarClusterAgentsMessage.java b/src/main/java/fr/irit/smac/amas4dc/amas/messages/InformCandidateSimilarClusterAgentsMessage.java new file mode 100644 index 0000000..675ce92 --- /dev/null +++ b/src/main/java/fr/irit/smac/amas4dc/amas/messages/InformCandidateSimilarClusterAgentsMessage.java @@ -0,0 +1,19 @@ +package fr.irit.smac.amas4dc.amas.messages; + +import fr.irit.smac.amak.messaging.Message; +import fr.irit.smac.amas4dc.amas.agent.DataPointAgent; +import fr.irit.smac.amas4dc.cluster.Cluster; +import fr.irit.smac.amas4dc.cluster.DataPoint; +import lombok.Getter; + +import java.util.List; + +public class InformCandidateSimilarClusterAgentsMessage<T extends DataPoint> extends Message<DataPointAgent<T>> { + @Getter + private final List<Cluster<T>> candidateSimilarCluster; + + public InformCandidateSimilarClusterAgentsMessage(DataPointAgent<T> sender, List<Cluster<T>> candidateSimilarCluster) { + super(sender); + this.candidateSimilarCluster = candidateSimilarCluster; + } +} diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/messages/RequestSimilarityMessage.java b/src/main/java/fr/irit/smac/amas4dc/amas/messages/RequestSimilarityMessage.java index f2846dd..8e6e694 100644 --- a/src/main/java/fr/irit/smac/amas4dc/amas/messages/RequestSimilarityMessage.java +++ b/src/main/java/fr/irit/smac/amas4dc/amas/messages/RequestSimilarityMessage.java @@ -1,16 +1,18 @@ package fr.irit.smac.amas4dc.amas.messages; +import fr.irit.smac.amas4dc.amas.agent.DataPointAgent; import fr.irit.smac.amas4dc.cluster.Cluster; import fr.irit.smac.amak.Agent; import fr.irit.smac.amak.messaging.Message; +import fr.irit.smac.amas4dc.cluster.DataPoint; import lombok.Getter; -public class RequestSimilarityMessage extends Message { +public class RequestSimilarityMessage<T extends DataPoint> extends Message<DataPointAgent<T>> { @Getter - private final Cluster cluster; + private final T dataPoint; - public RequestSimilarityMessage(Agent sender, Cluster cluster) { + public RequestSimilarityMessage(DataPointAgent<T> sender, T dataPoint) { super(sender); - this.cluster = cluster; + this.dataPoint = dataPoint; } } diff --git a/src/main/java/fr/irit/smac/amas4dc/cluster/Cluster.java b/src/main/java/fr/irit/smac/amas4dc/cluster/Cluster.java index 18bb680..ede936d 100644 --- a/src/main/java/fr/irit/smac/amas4dc/cluster/Cluster.java +++ b/src/main/java/fr/irit/smac/amas4dc/cluster/Cluster.java @@ -27,5 +27,10 @@ public class Cluster<T extends DataPoint> { this.representative = representative; this.size = c1.size + c2.size; } + + public Cluster(T representative, T newDataPoint, Cluster<T> c2) { + this.representative = representative; + this.size = 1 + c2.size; + } } diff --git a/src/main/java/fr/irit/smac/amas4dc/cluster/ExtendedCluster.java b/src/main/java/fr/irit/smac/amas4dc/cluster/ExtendedCluster.java index 241b121..d2bc0c0 100644 --- a/src/main/java/fr/irit/smac/amas4dc/cluster/ExtendedCluster.java +++ b/src/main/java/fr/irit/smac/amas4dc/cluster/ExtendedCluster.java @@ -26,4 +26,17 @@ public class ExtendedCluster<T extends DataPoint> extends Cluster<T> { this.content = Collections.unmodifiableList(content); } + + public ExtendedCluster(T representative, T newDataPoint, Cluster<T> c2) { + super(representative, newDataPoint, c2); + + if (!(c2 instanceof ExtendedCluster<T> ec2)) + throw new ExtendedClusterException("ExtendedCluster can only be made out of an ExtendedCluster object"); + + var content = new ArrayList<T>(); + content.add(newDataPoint); + content.addAll(ec2.getContent()); + + this.content = Collections.unmodifiableList(content); + } } diff --git a/src/test/groovy/fr/irit/smac/amas4dc/amas/ClusterAgentTest.groovy b/src/test/groovy/fr/irit/smac/amas4dc/amas/ClusterAgentTest.groovy index c858fd5..47f6e4b 100644 --- a/src/test/groovy/fr/irit/smac/amas4dc/amas/ClusterAgentTest.groovy +++ b/src/test/groovy/fr/irit/smac/amas4dc/amas/ClusterAgentTest.groovy @@ -2,6 +2,8 @@ package fr.irit.smac.amas4dc.amas import fr.irit.smac.amak.messaging.Mailbox +import fr.irit.smac.amas4dc.amas.agent.ClusterAgent +import fr.irit.smac.amas4dc.amas.agent.DataPointAgent import fr.irit.smac.amas4dc.amas.messages.EvaluatedScoreMessage import fr.irit.smac.amas4dc.amas.messages.RequestSimilarityMessage import fr.irit.smac.amas4dc.cluster.Cluster @@ -30,11 +32,11 @@ class ClusterAgentTest extends Specification { requesterMailboxMock.receive(_) >> { msg -> messageReceivedByRequester.set(msg) } - def requesterAgentMock = Mock(ClusterAgent) + def requesterAgentMock = Mock(DataPointAgent) requesterAgentMock.getMailbox() >> requesterMailboxMock when: - agent.mailbox.receive(new RequestSimilarityMessage(requesterAgentMock, Mock(Cluster))) + agent.mailbox.receive(new RequestSimilarityMessage(requesterAgentMock, Mock(DataPoint))) agent.cycle() agent.cycle() -- GitLab