diff --git a/build.gradle b/build.gradle index bf1a5fd186c16a465a7ea492ba18510391f07b2e..483573198dba99a353faad01663d2f35b58536db 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ tasks.register('experimentsTest', Test) { dependencies { // AMAK (Only one of the next two lines should be uncommented. Also, settings.gradle file should also be modified) - implementation 'com.github.alexandreprl:amak:3.1.0' // Uncomment this line to get AMAK from git repository (The last part matches a tag, a commit hash or the last commit of a branch : branchName-SNAPSHOT + implementation 'com.github.alexandreprl:amak:3.1.3' // Uncomment this line to get AMAK from git repository (The last part matches a tag, a commit hash or the last commit of a branch : branchName-SNAPSHOT // implementation project(':amak') // Uncomment this line to get AMAK from local implementation 'com.github.alexandreprl:lxplot:main-SNAPSHOT' diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/ClusterAgent.java b/src/main/java/fr/irit/smac/amas4dc/amas/ClusterAgent.java deleted file mode 100644 index 85037cfcbc32edf86eff9ab32f7a51446c51fcca..0000000000000000000000000000000000000000 --- a/src/main/java/fr/irit/smac/amas4dc/amas/ClusterAgent.java +++ /dev/null @@ -1,302 +0,0 @@ -package fr.irit.smac.amas4dc.amas; - -import fr.irit.smac.amak.Agent; -import fr.irit.smac.amas4dc.amas.messages.EvaluatedScoreMessage; -import fr.irit.smac.amas4dc.amas.messages.RequestSimilarityMessage; -import fr.irit.smac.amas4dc.cluster.Cluster; -import fr.irit.smac.amas4dc.cluster.DataPoint; -import fr.irit.smac.amas4dc.cluster.ExtendedCluster; -import fr.irit.smac.amas4dc.event.NewClusterEvent; -import fr.irit.smac.amas4dc.event.RemoveClusterEvent; -import lombok.Getter; - -import java.text.MessageFormat; -import java.util.*; -import java.util.logging.Level; -import java.util.logging.Logger; - -public class ClusterAgent<T extends DataPoint> extends Agent<DynamicClusteringAMAS<T>, DynamicClusteringEnvironment<T>> { - private static final float FLOAT_COMPARISON_EPSILON = 0.000001f; - @Getter - private Cluster<T> cluster; - private State state = State.INIT; - private State nextState = state; - - //region Perception variables - private Cluster<T> receivedClusterForRequestSimilarity; - private ClusterAgent<T> receivedRequestSimilarityRequester; - private final Map<ClusterAgent<T>, Float> similarityScoresReceived = new HashMap<>(); - //endregion - - //region Decision variables - private boolean decideToFuse; - private boolean decideToDie; - private float computedSimilarityScoreDuringThisCycle; - //endregion - - //region Action variables - private final List<ClusterAgent<T>> requestSimilarityMessageSentAgents = new ArrayList<>(); - //endregion - - private final List<ClusterAgent<T>> clusterAgentsRoughlySimilarOnReady = new ArrayList<>(); - private final Random random = new Random(); - private static final Logger logger = Logger.getLogger(ClusterAgent.class.getName()); - private int amountOfCyclesWithTheSameState; - - protected ClusterAgent(DynamicClusteringAMAS amas, T dataPoint) { - super(amas); - if (amas.getMasSettings().amasOptions().contains(AMASOption.KeepAllDataPoints)) - this.setCluster(new ExtendedCluster<T>(dataPoint)); - else - this.setCluster(new Cluster<T>(dataPoint)); - } - - public enum State {DORMANT, WAITING_FOR_REPLY, DECIDING, INIT, DEAD, ACTIVE} - - public State getState() { - return state; - } - - @Override - protected void onReady() { - for (var other : - amas.getEnvironment().getAgentsFromBucket(cluster.getRepresentative().getBucketId())) { - if (other != this) { - if (isRoughlySimilarTo(other)) { - clusterAgentsRoughlySimilarOnReady.add(other); - } - } - } - } - - private boolean isRoughlySimilarTo(ClusterAgent<T> otherClusterAgent) { - return getAmas().getMasSettings().similarityScoreMethod().areRoughlySimilar(cluster.getRepresentative(), otherClusterAgent.cluster.getRepresentative()); - } - - @Override - protected void onAgentCycleEnd() { - if (state != State.DORMANT) - if (logger.isLoggable(Level.INFO)) - logger.info(MessageFormat.format("---- End {0} cycle state= {1} ----", this, state)); - } - - public boolean readyToStop() { - return state == State.DORMANT && !mailbox.hasMessageOfType(RequestSimilarityMessage.class); - } - - @Override - protected void onPerceive() { - switch (state) { - case DORMANT -> { - var receivedRequestSimilarityMessage = this.getMailbox().read(RequestSimilarityMessage.class); - if (receivedRequestSimilarityMessage.isPresent()) { - receivedClusterForRequestSimilarity = receivedRequestSimilarityMessage.get().getCluster(); - receivedRequestSimilarityRequester = (ClusterAgent) receivedRequestSimilarityMessage.get().getSender(); - } else { - receivedClusterForRequestSimilarity = null; - receivedRequestSimilarityRequester = null; - } - } - case WAITING_FOR_REPLY -> { - while (mailbox.hasMessageOfType(EvaluatedScoreMessage.class)) { - var evaluatedScoreMessage = this.getMailbox().read(EvaluatedScoreMessage.class); - if (evaluatedScoreMessage.isPresent()) { - similarityScoresReceived.put((ClusterAgent) evaluatedScoreMessage.get().getSender(), evaluatedScoreMessage.get().getSimilarityScore()); - } - } - clearDeadAgentsFromThoseWeExpectAnAnswerFrom(); - - } - } - } - - private void clearDeadAgentsFromThoseWeExpectAnAnswerFrom() { - var itr = requestSimilarityMessageSentAgents.iterator(); - while (itr.hasNext()) { - var a = itr.next(); - if (a.state == State.DEAD && !similarityScoresReceived.keySet().contains(a)) - itr.remove(); - } - for (var key : - similarityScoresReceived.keySet()) { - if (!requestSimilarityMessageSentAgents.contains(key)) - similarityScoresReceived.remove(key); - } - } - - @Override - protected void onDecide() { - switch (state) { - case DORMANT -> { - decideToDie = false; - if (receivedClusterForRequestSimilarity == null && amas.getMasSettings().amasOptions().contains(AMASOption.Forget)) { - // TODO decide to die logic - - } - } - case DECIDING -> { - decideToFuse = false; - if (logger.isLoggable(Level.INFO)) { - for (var e : - similarityScoresReceived.entrySet()) { - logger.info(this + " received similarity score " + e.getValue() + " from " + e.getKey()); - } - } - if (similarityScoresReceived.values().stream().anyMatch(v -> v >= getAmas().getMasSettings().fusionThreshold())) - decideToFuse = true; - - if (logger.isLoggable(Level.INFO)) { - if (decideToFuse) - logger.info(this + " decides to fuse with one of them"); - else - logger.info(this + " decides NOT to fuse with one of them"); - } - } - case ACTIVE -> { - if (receivedRequestSimilarityRequester != null) { - computedSimilarityScoreDuringThisCycle = computeSimilarityScore(receivedClusterForRequestSimilarity); - if (logger.isLoggable(Level.INFO)) - logger.info(this + " computed a similarity of " + computedSimilarityScoreDuringThisCycle + " with " + receivedRequestSimilarityRequester); - } - } - } - } - - @Override - protected void onAct() { - switch (state) { - case DORMANT -> { - if (receivedClusterForRequestSimilarity != null) - nextState = State.ACTIVE; - else if (decideToDie) { - die(); - } - } - case INIT -> { - similarityScoresReceived.clear(); - requestSimilarityMessageSentAgents.clear(); - for (var agent : clusterAgentsRoughlySimilarOnReady) { - if (logger.isLoggable(Level.INFO)) - logger.info(this + " sends RequestSimilarityMessage to " + agent); - agent.getMailbox().receive(new RequestSimilarityMessage(this, cluster)); - requestSimilarityMessageSentAgents.add(agent); - } - - if (clusterAgentsRoughlySimilarOnReady.isEmpty()) - nextState = State.DORMANT; - else if (!requestSimilarityMessageSentAgents.isEmpty()) - nextState = State.WAITING_FOR_REPLY; - } - case ACTIVE -> { - if (receivedRequestSimilarityRequester != null) { - var message = new EvaluatedScoreMessage(this, computedSimilarityScoreDuringThisCycle); - if (logger.isLoggable(Level.INFO)) - logger.info(this + " --> " + receivedRequestSimilarityRequester + " " + message); - receivedRequestSimilarityRequester.getMailbox().receive(message); - receivedRequestSimilarityRequester = null; - - if (computedSimilarityScoreDuringThisCycle >= getAmas().getMasSettings().fusionThreshold()) { - die(); - } else { - nextState = State.DORMANT; - } - } - } - case DECIDING -> { - if (decideToFuse) { - if (logger.isLoggable(Level.INFO)) - logger.info(this + " decides to fuse"); - var requestedAgentsWithHighestSimilarityScore = getRequestedAgentsWithSimilarityScoreAboveFusionThreshold(similarityScoresReceived); - if (!requestedAgentsWithHighestSimilarityScore.isEmpty()) { - for (var e : similarityScoresReceived.entrySet()) { - if (requestedAgentsWithHighestSimilarityScore.contains(e.getKey())) { - fuse(e.getKey()); - } - } - } else { - throw new RuntimeException("Wanted to fuse with an agent but no agents are available"); - } - } - nextState = State.DORMANT; - } - case WAITING_FOR_REPLY -> { - if (requestSimilarityMessageSentAgents.size() == similarityScoresReceived.size()) - nextState = State.DECIDING; - } - } - applyNextState(); - } - - private void die() { - nextState = State.DEAD; - destroy(); - } - - private void applyNextState() { - if (state == nextState) { - amountOfCyclesWithTheSameState++; - if (amountOfCyclesWithTheSameState > 10 && state == State.ACTIVE) - throw new RuntimeException(this + " is in the state " + state + " for too long"); - } else { - amountOfCyclesWithTheSameState = 0; - } - state = nextState; - } - - private void fuse(ClusterAgent<T> other) { - if (logger.isLoggable(Level.INFO)) - logger.info(this + " fuses with " + other); - - setCluster(fuse(other.cluster)); - } - - private void setCluster(Cluster<T> newCluster) { - if (cluster != null) { - getAmas().getEnvironment().removeFromBucket(cluster.getRepresentative().getBucketId(), this); - if (getAmas().getMasSettings().eventStore() != null) - getAmas().getMasSettings().eventStore().add(new RemoveClusterEvent(cluster)); - } - - cluster = newCluster; - getAmas().getEnvironment().assignToBucket(cluster.getRepresentative().getBucketId(), this); - if (getAmas().getMasSettings().eventStore() != null) - getAmas().getMasSettings().eventStore().add(new NewClusterEvent(cluster)); - } - - @Override - protected void onDestroy() { - if (cluster != null) - if (getAmas().getMasSettings().eventStore() != null) - getAmas().getMasSettings().eventStore().add(new RemoveClusterEvent(cluster)); - } - - private Cluster<T> fuse(Cluster<T> other) { - T newRepresentative = (T) amas.getMasSettings().dataPointFuser().apply(cluster.getRepresentative(), other.getRepresentative()); - if (getAmas().getMasSettings().amasOptions().contains(AMASOption.KeepAllDataPoints)) - return new ExtendedCluster<>(newRepresentative, cluster, other); - else - return new Cluster<T>(newRepresentative, cluster, other); - } - - private List<ClusterAgent<T>> getRequestedAgentsWithSimilarityScoreAboveFusionThreshold(Map<ClusterAgent<T>, Float> similarityScoresReceived) { - float highestScore = Float.MIN_VALUE; - List<ClusterAgent<T>> agentsWithHighestScore = new ArrayList<>(); - for (var e : similarityScoresReceived.entrySet()) { - if (e.getValue() >= getAmas().getMasSettings().fusionThreshold()) { - if (Math.abs(e.getValue() - highestScore) < FLOAT_COMPARISON_EPSILON) { - agentsWithHighestScore.add(e.getKey()); - } else if (e.getValue() > highestScore) { - agentsWithHighestScore.clear(); - agentsWithHighestScore.add(e.getKey()); - highestScore = e.getValue(); - } - } - } - return agentsWithHighestScore; - } - - - private float computeSimilarityScore(Cluster<T> other) { - return getAmas().getMasSettings().similarityScoreMethod().apply(this.cluster.getRepresentative(), other.getRepresentative()); - } -} diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringAMAS.java b/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringAMAS.java index 1ec5d63c109e4cbd0e55a4c1de8da448efe1df48..a1e3aa2247f47f6b1e2d436095f87f5bb99ec103 100644 --- a/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringAMAS.java +++ b/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringAMAS.java @@ -1,6 +1,8 @@ package fr.irit.smac.amas4dc.amas; import fr.irit.smac.amak.Amas; +import fr.irit.smac.amas4dc.amas.agent.ClusterAgent; +import fr.irit.smac.amas4dc.amas.agent.DataPointAgent; import fr.irit.smac.amas4dc.cluster.DataPoint; import lombok.Getter; import lombok.Setter; @@ -27,7 +29,7 @@ public class DynamicClusteringAMAS<T extends DataPoint> extends Amas<DynamicClus logger.info("---- BEGIN MAS CYCLE ----"); if (agents.isEmpty() || allAgentsAreDormant()) { var newDataPoint = environment.pollPendingDataPoint(); - newDataPoint.ifPresent(dataPoint -> new ClusterAgent<T>(this, dataPoint)); + newDataPoint.ifPresent(dataPoint -> new DataPointAgent<T>(this, dataPoint)); } } @@ -46,7 +48,8 @@ public class DynamicClusteringAMAS<T extends DataPoint> extends Amas<DynamicClus @Override public boolean stopCondition() { - return !environment.hasRemainingPendingAdditionDataPoints() && agents.stream() - .noneMatch(agent -> agent instanceof ClusterAgent clusterAgent && !clusterAgent.readyToStop()); + return !environment.hasRemainingPendingAdditionDataPoints() && + agents.stream().noneMatch(agent -> agent instanceof ClusterAgent clusterAgent && !clusterAgent.readyToStop()) && + agents.stream().noneMatch(agent -> agent instanceof DataPointAgent); } } diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringEnvironment.java b/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringEnvironment.java index e51bceaa64a117c86b7643facfd91c7a2434ca5e..83f61c6a46c74dfd5c6c85ffadc7ba654fb8c1de 100644 --- a/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringEnvironment.java +++ b/src/main/java/fr/irit/smac/amas4dc/amas/DynamicClusteringEnvironment.java @@ -1,6 +1,7 @@ package fr.irit.smac.amas4dc.amas; import fr.irit.smac.amak.Environment; +import fr.irit.smac.amas4dc.amas.agent.ClusterAgent; import fr.irit.smac.amas4dc.cluster.DataPoint; import lombok.Getter; @@ -42,7 +43,7 @@ public class DynamicClusteringEnvironment<T extends DataPoint> extends Environme this.buckets.get(bucketId).remove(tClusterAgent); } - public List<ClusterAgent<T>> getAgentsFromBucket(int bucketId) { + public List<ClusterAgent<T>> getClusterAgentsFromBucket(int bucketId) { return this.buckets.getOrDefault(bucketId, new ArrayList<>()); } } diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/agent/ClusterAgent.java b/src/main/java/fr/irit/smac/amas4dc/amas/agent/ClusterAgent.java new file mode 100644 index 0000000000000000000000000000000000000000..75fc04165970f85794e61467ce082489bd52d987 --- /dev/null +++ b/src/main/java/fr/irit/smac/amas4dc/amas/agent/ClusterAgent.java @@ -0,0 +1,209 @@ +package fr.irit.smac.amas4dc.amas.agent; + +import fr.irit.smac.amak.Agent; +import fr.irit.smac.amas4dc.amas.AMASOption; +import fr.irit.smac.amas4dc.amas.DynamicClusteringAMAS; +import fr.irit.smac.amas4dc.amas.DynamicClusteringEnvironment; +import fr.irit.smac.amas4dc.amas.messages.EvaluatedScoreMessage; +import fr.irit.smac.amas4dc.amas.messages.InformCandidateSimilarClusterAgentsMessage; +import fr.irit.smac.amas4dc.amas.messages.RequestSimilarityMessage; +import fr.irit.smac.amas4dc.cluster.Cluster; +import fr.irit.smac.amas4dc.cluster.DataPoint; +import fr.irit.smac.amas4dc.cluster.ExtendedCluster; +import fr.irit.smac.amas4dc.event.NewClusterEvent; +import fr.irit.smac.amas4dc.event.RemoveClusterEvent; +import lombok.Getter; + +import java.text.MessageFormat; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ClusterAgent<T extends DataPoint> extends Agent<DynamicClusteringAMAS<T>, DynamicClusteringEnvironment<T>> { + @Getter + private Cluster<T> cluster; + private State state = State.DORMANT; + private State nextState = state; + + //region Perception variables + private T receivedDataPointForRequestSimilarity; + private DataPointAgent<T> receivedRequestSimilarityRequester; + //endregion + + //region Decision variables + private boolean decideToDie; + private float computedSimilarityScoreDuringThisCycle; + //endregion + + //region Action variables + //endregion + private final Random random = new Random(); + private static final Logger logger = Logger.getLogger(ClusterAgent.class.getName()); + private int amountOfCyclesWithTheSameState; + private final List<Cluster<T>> receivedCandidateSimilarClusters = new ArrayList<>(); + + protected ClusterAgent(DynamicClusteringAMAS amas, T dataPoint) { + super(amas); + logger.info(this+" created"); + if (amas.getMasSettings().amasOptions().contains(AMASOption.KeepAllDataPoints)) + this.setCluster(new ExtendedCluster<T>(dataPoint)); + else + this.setCluster(new Cluster<T>(dataPoint)); + } + + public enum State {DORMANT, DECIDING, DEAD, ACTIVE} + + public State getState() { + return state; + } + + + @Override + protected void onAgentCycleEnd() { + if (state != State.DORMANT) + if (logger.isLoggable(Level.INFO)) + logger.info(MessageFormat.format("---- End {0} cycle state= {1} ----", this, state)); + } + + public boolean readyToStop() { + return state == State.DORMANT && + !mailbox.hasAMessage(); + } + + @Override + protected void onPerceive() { + switch (state) { + case DORMANT -> { + var receivedInformCandidateSimilarClusterAgentsMessage = this.getMailbox().read(InformCandidateSimilarClusterAgentsMessage.class); + if (receivedInformCandidateSimilarClusterAgentsMessage.isPresent()) { + logger.info(this+ " received a list of candidates similar cluster agents"); + receivedCandidateSimilarClusters.addAll(receivedInformCandidateSimilarClusterAgentsMessage.get().getCandidateSimilarCluster()); + return; + } + var receivedRequestSimilarityMessage = this.getMailbox().read(RequestSimilarityMessage.class); + if (receivedRequestSimilarityMessage.isPresent()) { + logger.info(this+ " received a request for similarity"); + RequestSimilarityMessage<T> message = receivedRequestSimilarityMessage.get(); + receivedDataPointForRequestSimilarity = message.getDataPoint(); + receivedRequestSimilarityRequester = message.getSender(); + return; + } else { + receivedDataPointForRequestSimilarity = null; + receivedRequestSimilarityRequester = null; + } + } + } + } + + + + @Override + protected void onDecide() { + switch (state) { + case DORMANT -> { + decideToDie = false; + if (receivedDataPointForRequestSimilarity == null && amas.getMasSettings().amasOptions().contains(AMASOption.Forget)) { + // TODO decide to die logic + + } + + } + case ACTIVE -> { + if (receivedRequestSimilarityRequester != null) { + computedSimilarityScoreDuringThisCycle = computeSimilarityScore(receivedDataPointForRequestSimilarity); + if (logger.isLoggable(Level.INFO)) + logger.info(this + " computed a similarity of " + computedSimilarityScoreDuringThisCycle + " with " + receivedRequestSimilarityRequester); + } + } + } + } + + @Override + protected void onAct() { + switch (state) { + case DORMANT -> { + if (receivedDataPointForRequestSimilarity != null) + nextState = State.ACTIVE; + else if (!receivedCandidateSimilarClusters.isEmpty()) { + for (var otherCluster : receivedCandidateSimilarClusters) { + absorb(otherCluster); + } + receivedCandidateSimilarClusters.clear(); + } else if (decideToDie) { + die(); + } + } + case ACTIVE -> { + if (receivedRequestSimilarityRequester != null) { + var message = new EvaluatedScoreMessage<T>(this, cluster, computedSimilarityScoreDuringThisCycle); + if (logger.isLoggable(Level.INFO)) + logger.info(this + " --> " + receivedRequestSimilarityRequester + " " + message); + receivedRequestSimilarityRequester.getMailbox().receive(message); + receivedRequestSimilarityRequester = null; + + if (computedSimilarityScoreDuringThisCycle >= getAmas().getMasSettings().fusionThreshold()) { + die(); + } else { + nextState = State.DORMANT; + } + } + } + } + applyNextState(); + } + + private void die() { + nextState = State.DEAD; + destroy(); + } + + private void applyNextState() { + if (state == nextState) { + amountOfCyclesWithTheSameState++; + if (amountOfCyclesWithTheSameState > 10 && state == State.ACTIVE) + throw new RuntimeException(this + " is in the state " + state + " for too long"); + } else { + amountOfCyclesWithTheSameState = 0; + } + state = nextState; + } + + private void absorb(Cluster<T> other) { + if (logger.isLoggable(Level.INFO)) + logger.info(this + " absorbs " + other); + + T newRepresentative = (T) amas.getMasSettings().dataPointFuser().apply(cluster.getRepresentative(), other.getRepresentative()); + Cluster<T> newCluster; + if (getAmas().getMasSettings().amasOptions().contains(AMASOption.KeepAllDataPoints)) + newCluster = new ExtendedCluster<>(newRepresentative, cluster, other); + else + newCluster = new Cluster<T>(newRepresentative, cluster, other); + setCluster(newCluster); + } + + private void setCluster(Cluster<T> newCluster) { + if (cluster != null) { + getAmas().getEnvironment().removeFromBucket(cluster.getRepresentative().getBucketId(), this); + if (getAmas().getMasSettings().eventStore() != null) + getAmas().getMasSettings().eventStore().add(new RemoveClusterEvent(cluster)); + } + + cluster = newCluster; + getAmas().getEnvironment().assignToBucket(cluster.getRepresentative().getBucketId(), this); + if (getAmas().getMasSettings().eventStore() != null) + getAmas().getMasSettings().eventStore().add(new NewClusterEvent(cluster)); + } + + @Override + protected void onDestroy() { + if (cluster != null) + if (getAmas().getMasSettings().eventStore() != null) + getAmas().getMasSettings().eventStore().add(new RemoveClusterEvent(cluster)); + } + + + + private float computeSimilarityScore(T other) { + return getAmas().getMasSettings().similarityScoreMethod().apply(this.cluster.getRepresentative(), other); + } +} diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/agent/DataPointAgent.java b/src/main/java/fr/irit/smac/amas4dc/amas/agent/DataPointAgent.java new file mode 100644 index 0000000000000000000000000000000000000000..c331a3f21fcfafa03ba01f490ead7d1921214aee --- /dev/null +++ b/src/main/java/fr/irit/smac/amas4dc/amas/agent/DataPointAgent.java @@ -0,0 +1,149 @@ +package fr.irit.smac.amas4dc.amas.agent; + +import fr.irit.smac.amak.Agent; +import fr.irit.smac.amas4dc.amas.DynamicClusteringAMAS; +import fr.irit.smac.amas4dc.amas.DynamicClusteringEnvironment; +import fr.irit.smac.amas4dc.amas.messages.EvaluatedScoreMessage; +import fr.irit.smac.amas4dc.amas.messages.InformCandidateSimilarClusterAgentsMessage; +import fr.irit.smac.amas4dc.amas.messages.RequestSimilarityMessage; +import fr.irit.smac.amas4dc.cluster.Cluster; +import fr.irit.smac.amas4dc.cluster.DataPoint; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class DataPointAgent<T extends DataPoint> extends Agent<DynamicClusteringAMAS<T>, DynamicClusteringEnvironment<T>> { + private static final float FLOAT_COMPARISON_EPSILON = 0.000001f; + private final T dataPoint; + private final List<ClusterAgent<T>> requestSimilarityMessageSentAgents = new ArrayList<>(); + private final Map<ClusterAgent<T>, ClusterSimilarityScore<T>> similarityScoresReceived = new HashMap<>(); + private static final Logger logger = Logger.getLogger(DataPointAgent.class.getName()); + private boolean decideToFuse, decideToCreateANewCluster; + + public DataPointAgent(DynamicClusteringAMAS<T> amas, T dataPoint) { + super(amas); + logger.info(this + " created"); + this.dataPoint = dataPoint; + } + + + private boolean isRoughlySimilarTo(ClusterAgent<T> otherClusterAgent) { + return getAmas().getMasSettings().similarityScoreMethod().areRoughlySimilar(dataPoint, otherClusterAgent.getCluster().getRepresentative()); + } + + @Override + protected void onReady() { + super.onReady(); + var clusterAgentsRoughlySimilarOnReady = new ArrayList<ClusterAgent<T>>(); + for (var other : amas.getEnvironment().getClusterAgentsFromBucket(dataPoint.getBucketId())) { + if (isRoughlySimilarTo(other)) { + clusterAgentsRoughlySimilarOnReady.add(other); + } + } + for (var agent : clusterAgentsRoughlySimilarOnReady) { + if (logger.isLoggable(Level.INFO)) + logger.info(this + " sends RequestSimilarityMessage to " + agent); + agent.getMailbox().receive(new RequestSimilarityMessage<T>(this, dataPoint)); + requestSimilarityMessageSentAgents.add(agent); + } + } + + @Override + protected void onPerceive() { + while (mailbox.hasMessageOfType(EvaluatedScoreMessage.class)) { + var evaluatedScoreMessage = this.getMailbox().read(EvaluatedScoreMessage.class); + if (evaluatedScoreMessage.isPresent()) { + similarityScoresReceived.put((ClusterAgent<T>) evaluatedScoreMessage.get().getSender(), new ClusterSimilarityScore<T>(evaluatedScoreMessage.get().getCluster(), evaluatedScoreMessage.get().getSimilarityScore())); + } + } + clearDeadAgentsFromThoseWeExpectAnAnswerFrom(); + } + + @Override + protected void onDecide() { + decideToFuse = false; + decideToCreateANewCluster = false; + if (requestSimilarityMessageSentAgents.size() == similarityScoresReceived.size()) { + if (logger.isLoggable(Level.INFO)) { + for (var e : similarityScoresReceived.entrySet()) { + logger.info(this + " received similarity score " + e.getValue() + " from " + e.getKey()); + } + } + if (similarityScoresReceived.values().stream().anyMatch(v -> v.score >= getAmas().getMasSettings().fusionThreshold())) + decideToFuse = true; + else + decideToCreateANewCluster = true; + + if (logger.isLoggable(Level.INFO)) { + if (decideToFuse) + logger.info(this + " decides to fuse with one of them"); + else if (decideToCreateANewCluster) + logger.info(this + " decides to create a new cluster"); + else + logger.info(this + " decides NOT to fuse not with one of them"); + } + } + } + + @Override + protected void onAct() { + if (decideToFuse) { + if (logger.isLoggable(Level.INFO)) + logger.info(this + " decides to fuse"); + var requestedAgentsWithHighestSimilarityScore = getReceivedClustersWithSimilarityScoreAboveFusionThreshold(similarityScoresReceived); + if (!requestedAgentsWithHighestSimilarityScore.isEmpty()) { + var newClusterAgent = new ClusterAgent<T>(amas, dataPoint); + newClusterAgent.getMailbox().receive(new InformCandidateSimilarClusterAgentsMessage<T>(this, requestedAgentsWithHighestSimilarityScore)); + destroy(); + } else { + throw new RuntimeException("Wanted to fuse with an agent but no agents are available"); + } + } else if (decideToCreateANewCluster) { + new ClusterAgent<T>(amas, dataPoint); + destroy(); + } + } + + @Override + protected void onDestroy() { + logger.info(MessageFormat.format("{0} dies", this)); + } + + private List<Cluster<T>> getReceivedClustersWithSimilarityScoreAboveFusionThreshold(Map<ClusterAgent<T>, ClusterSimilarityScore<T>> similarityScoresReceived) { + float highestScore = Float.MIN_VALUE; + List<Cluster<T>> clustersWithHighestScore = new ArrayList<>(); + for (var e : similarityScoresReceived.entrySet()) { + float clusterScore = e.getValue().score; + if (clusterScore >= getAmas().getMasSettings().fusionThreshold()) { + Cluster<T> cluster = e.getValue().cluster; + if (Math.abs(clusterScore - highestScore) < FLOAT_COMPARISON_EPSILON) { + clustersWithHighestScore.add(cluster); + } else if (clusterScore > highestScore) { + clustersWithHighestScore.clear(); + clustersWithHighestScore.add(cluster); + highestScore = clusterScore; + } + } + } + return clustersWithHighestScore; + } + + private void clearDeadAgentsFromThoseWeExpectAnAnswerFrom() { + requestSimilarityMessageSentAgents.removeIf(a -> a.getState() == ClusterAgent.State.DEAD && !similarityScoresReceived.keySet().contains(a)); + for (var key : + similarityScoresReceived.keySet()) { + if (!requestSimilarityMessageSentAgents.contains(key)) + similarityScoresReceived.remove(key); + } + } + + + private record ClusterSimilarityScore<T extends DataPoint>(Cluster<T> cluster, float score) { + + } +} diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/controller/AMAS4DCCommandAndQueryHandler.java b/src/main/java/fr/irit/smac/amas4dc/amas/controller/AMAS4DCCommandAndQueryHandler.java index 161e4f1d34d0d2945c3dc95400af2f2eb33a0b00..f984f25546a9bf10c3b78e8e4bb1e1161b95efb0 100644 --- a/src/main/java/fr/irit/smac/amas4dc/amas/controller/AMAS4DCCommandAndQueryHandler.java +++ b/src/main/java/fr/irit/smac/amas4dc/amas/controller/AMAS4DCCommandAndQueryHandler.java @@ -2,6 +2,7 @@ package fr.irit.smac.amas4dc.amas.controller; import fr.irit.smac.amak.scheduling.Scheduler; import fr.irit.smac.amas4dc.amas.*; +import fr.irit.smac.amas4dc.amas.agent.ClusterAgent; import fr.irit.smac.amas4dc.amas.controller.command.NewDataPointCommand; import fr.irit.smac.amas4dc.amas.controller.command.ShutdownCommand; import fr.irit.smac.amas4dc.amas.controller.command.SolveCommand; diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/messages/EvaluatedScoreMessage.java b/src/main/java/fr/irit/smac/amas4dc/amas/messages/EvaluatedScoreMessage.java index f2a381b37d31cf10ff80c9306d57c9ff59fea5e4..98755561c3208b4aa08f61cc0e8da3368d95ecf5 100644 --- a/src/main/java/fr/irit/smac/amas4dc/amas/messages/EvaluatedScoreMessage.java +++ b/src/main/java/fr/irit/smac/amas4dc/amas/messages/EvaluatedScoreMessage.java @@ -2,16 +2,22 @@ package fr.irit.smac.amas4dc.amas.messages; import fr.irit.smac.amak.Agent; import fr.irit.smac.amak.messaging.Message; +import fr.irit.smac.amas4dc.amas.agent.ClusterAgent; +import fr.irit.smac.amas4dc.cluster.Cluster; +import fr.irit.smac.amas4dc.cluster.DataPoint; import lombok.Getter; import lombok.ToString; @ToString -public class EvaluatedScoreMessage extends Message { +public class EvaluatedScoreMessage<T extends DataPoint> extends Message<ClusterAgent<T>> { + @Getter + private final Cluster<T> cluster; @Getter private final float similarityScore; - public EvaluatedScoreMessage(Agent sender, float similarityScore) { + public EvaluatedScoreMessage(ClusterAgent<T> sender, Cluster<T> cluster, float similarityScore) { super(sender); + this.cluster = cluster; this.similarityScore = similarityScore; } } diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/messages/InformCandidateSimilarClusterAgentsMessage.java b/src/main/java/fr/irit/smac/amas4dc/amas/messages/InformCandidateSimilarClusterAgentsMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..675ce923b45041e54ee69df64210f4909b0fe946 --- /dev/null +++ b/src/main/java/fr/irit/smac/amas4dc/amas/messages/InformCandidateSimilarClusterAgentsMessage.java @@ -0,0 +1,19 @@ +package fr.irit.smac.amas4dc.amas.messages; + +import fr.irit.smac.amak.messaging.Message; +import fr.irit.smac.amas4dc.amas.agent.DataPointAgent; +import fr.irit.smac.amas4dc.cluster.Cluster; +import fr.irit.smac.amas4dc.cluster.DataPoint; +import lombok.Getter; + +import java.util.List; + +public class InformCandidateSimilarClusterAgentsMessage<T extends DataPoint> extends Message<DataPointAgent<T>> { + @Getter + private final List<Cluster<T>> candidateSimilarCluster; + + public InformCandidateSimilarClusterAgentsMessage(DataPointAgent<T> sender, List<Cluster<T>> candidateSimilarCluster) { + super(sender); + this.candidateSimilarCluster = candidateSimilarCluster; + } +} diff --git a/src/main/java/fr/irit/smac/amas4dc/amas/messages/RequestSimilarityMessage.java b/src/main/java/fr/irit/smac/amas4dc/amas/messages/RequestSimilarityMessage.java index f2846dd4cd721171a1c1e133b4c05c03b3561119..8e6e694bbefe2f2012776f659886a2fc083e3f3b 100644 --- a/src/main/java/fr/irit/smac/amas4dc/amas/messages/RequestSimilarityMessage.java +++ b/src/main/java/fr/irit/smac/amas4dc/amas/messages/RequestSimilarityMessage.java @@ -1,16 +1,18 @@ package fr.irit.smac.amas4dc.amas.messages; +import fr.irit.smac.amas4dc.amas.agent.DataPointAgent; import fr.irit.smac.amas4dc.cluster.Cluster; import fr.irit.smac.amak.Agent; import fr.irit.smac.amak.messaging.Message; +import fr.irit.smac.amas4dc.cluster.DataPoint; import lombok.Getter; -public class RequestSimilarityMessage extends Message { +public class RequestSimilarityMessage<T extends DataPoint> extends Message<DataPointAgent<T>> { @Getter - private final Cluster cluster; + private final T dataPoint; - public RequestSimilarityMessage(Agent sender, Cluster cluster) { + public RequestSimilarityMessage(DataPointAgent<T> sender, T dataPoint) { super(sender); - this.cluster = cluster; + this.dataPoint = dataPoint; } } diff --git a/src/main/java/fr/irit/smac/amas4dc/cluster/Cluster.java b/src/main/java/fr/irit/smac/amas4dc/cluster/Cluster.java index 18bb68045cc70133cbdef4269a43e60e41f6f11c..ede936d4e8d0b63c36fb368b898f4460442b9c1c 100644 --- a/src/main/java/fr/irit/smac/amas4dc/cluster/Cluster.java +++ b/src/main/java/fr/irit/smac/amas4dc/cluster/Cluster.java @@ -27,5 +27,10 @@ public class Cluster<T extends DataPoint> { this.representative = representative; this.size = c1.size + c2.size; } + + public Cluster(T representative, T newDataPoint, Cluster<T> c2) { + this.representative = representative; + this.size = 1 + c2.size; + } } diff --git a/src/main/java/fr/irit/smac/amas4dc/cluster/ExtendedCluster.java b/src/main/java/fr/irit/smac/amas4dc/cluster/ExtendedCluster.java index 241b12133f852229acb06bd4dcfa29ddf8a4826c..d2bc0c0f3493b200b31ebd2efc6f56d87ebc0376 100644 --- a/src/main/java/fr/irit/smac/amas4dc/cluster/ExtendedCluster.java +++ b/src/main/java/fr/irit/smac/amas4dc/cluster/ExtendedCluster.java @@ -26,4 +26,17 @@ public class ExtendedCluster<T extends DataPoint> extends Cluster<T> { this.content = Collections.unmodifiableList(content); } + + public ExtendedCluster(T representative, T newDataPoint, Cluster<T> c2) { + super(representative, newDataPoint, c2); + + if (!(c2 instanceof ExtendedCluster<T> ec2)) + throw new ExtendedClusterException("ExtendedCluster can only be made out of an ExtendedCluster object"); + + var content = new ArrayList<T>(); + content.add(newDataPoint); + content.addAll(ec2.getContent()); + + this.content = Collections.unmodifiableList(content); + } } diff --git a/src/test/groovy/fr/irit/smac/amas4dc/amas/ClusterAgentTest.groovy b/src/test/groovy/fr/irit/smac/amas4dc/amas/ClusterAgentTest.groovy index c858fd57f69270ca4e87189b84ce08f109071313..47f6e4b9690c7b2f4961918d622c0dfcd6006427 100644 --- a/src/test/groovy/fr/irit/smac/amas4dc/amas/ClusterAgentTest.groovy +++ b/src/test/groovy/fr/irit/smac/amas4dc/amas/ClusterAgentTest.groovy @@ -2,6 +2,8 @@ package fr.irit.smac.amas4dc.amas import fr.irit.smac.amak.messaging.Mailbox +import fr.irit.smac.amas4dc.amas.agent.ClusterAgent +import fr.irit.smac.amas4dc.amas.agent.DataPointAgent import fr.irit.smac.amas4dc.amas.messages.EvaluatedScoreMessage import fr.irit.smac.amas4dc.amas.messages.RequestSimilarityMessage import fr.irit.smac.amas4dc.cluster.Cluster @@ -30,11 +32,11 @@ class ClusterAgentTest extends Specification { requesterMailboxMock.receive(_) >> { msg -> messageReceivedByRequester.set(msg) } - def requesterAgentMock = Mock(ClusterAgent) + def requesterAgentMock = Mock(DataPointAgent) requesterAgentMock.getMailbox() >> requesterMailboxMock when: - agent.mailbox.receive(new RequestSimilarityMessage(requesterAgentMock, Mock(Cluster))) + agent.mailbox.receive(new RequestSimilarityMessage(requesterAgentMock, Mock(DataPoint))) agent.cycle() agent.cycle()