diff --git a/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java b/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java index 958b2eca79dfb162a54c8df0c330d2a14069fadf..5fccb7db6a6fda10262b0b4ecd42bde05fb24eb3 100644 --- a/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java +++ b/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java @@ -42,7 +42,7 @@ public class ClusterAgent<T extends DataPoint> extends Agent<ClusterAMAS<T>, Clu protected ClusterAgent(ClusterAMAS amas, T dataPoint) { super(amas); - this.cluster = new Cluster<T>(dataPoint); + this.setCluster(new Cluster<T>(dataPoint)); } public enum State {DORMANT, WAITING_FOR_REPLY, DECIDING, INIT, DEAD, ACTIVE} @@ -53,11 +53,11 @@ public class ClusterAgent<T extends DataPoint> extends Agent<ClusterAMAS<T>, Clu @Override protected void onReady() { - for (var agent : - amas.getAgents()) { - if (agent instanceof ClusterAgent otherClusterAgent && otherClusterAgent != this) { - if (isRoughlySimilarTo(otherClusterAgent)) { - clusterAgentsRoughlySimilarOnReady.add(otherClusterAgent); + for (var other : + amas.getEnvironment().getAgentsFromBucket(cluster.getRepresentative().getBucketId())) { + if (other != this) { + if (isRoughlySimilarTo(other)) { + clusterAgentsRoughlySimilarOnReady.add(other); } } } @@ -241,12 +241,20 @@ public class ClusterAgent<T extends DataPoint> extends Agent<ClusterAMAS<T>, Clu if (logger.isLoggable(Level.INFO)) logger.info(this + " fuses with " + other); - cluster = fuse(other.cluster); + setCluster(fuse(other.cluster)); + } + + private void setCluster(Cluster<T> newCluster) { + if (cluster != null) + getAmas().getEnvironment().removeFromBucket(cluster.getRepresentative().getBucketId(), this); + + cluster = newCluster; + getAmas().getEnvironment().assignToBucket(cluster.getRepresentative().getBucketId(), this); } private Cluster<T> fuse(Cluster<T> other) { T newRepresentative = (T) amas.getMasSettings().dataPointFuser().apply(cluster.getRepresentative(), other.getRepresentative()); - return new Cluster<T>(newRepresentative, cluster.getSize()+other.getSize()); + return new Cluster<T>(newRepresentative, cluster.getSize() + other.getSize()); } private List<ClusterAgent<T>> getRequestedAgentsWithSimilarityScoreAboveFusionThreshold(Map<ClusterAgent<T>, Float> similarityScoresReceived) { diff --git a/src/main/java/fr/irit/smac/clumate/amas/ClusterEnvironment.java b/src/main/java/fr/irit/smac/clumate/amas/ClusterEnvironment.java index 06b07263967983d48583951217494518d322f464..64246ba55aaf81948e12fd5ba16ac45599398134 100644 --- a/src/main/java/fr/irit/smac/clumate/amas/ClusterEnvironment.java +++ b/src/main/java/fr/irit/smac/clumate/amas/ClusterEnvironment.java @@ -4,9 +4,8 @@ import fr.irit.smac.amak.Environment; import fr.irit.smac.clumate.cluster.DataPoint; import lombok.Getter; -import java.util.List; -import java.util.Optional; -import java.util.Queue; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Logger; @@ -16,17 +15,34 @@ public class ClusterEnvironment<T extends DataPoint> extends Environment { private static final Logger logger = Logger.getLogger(ClusterEnvironment.class.getName()); @Getter private T lastPolledPendingDataPoint = null; + private final Map<Integer, List<ClusterAgent<T>>> buckets = new ConcurrentHashMap<>(); public boolean hasRemainingPendingAdditionDataPoints() { return !pendingAdditionDataPoints.isEmpty(); } + public Optional<T> pollPendingDataPoint() { var dataPoint = pendingAdditionDataPoints.poll(); if (dataPoint != null) lastPolledPendingDataPoint = dataPoint; return Optional.ofNullable(dataPoint); } + public void addDataPoints(List<T> newDataPoints) { pendingAdditionDataPoints.addAll(newDataPoints); } + + public void assignToBucket(int bucketId, ClusterAgent<T> tClusterAgent) { + this.buckets.computeIfAbsent(bucketId, b -> Collections.synchronizedList(new ArrayList<>())).add(tClusterAgent); + } + + public void removeFromBucket(int bucketId, ClusterAgent<T> tClusterAgent) { + if (!buckets.containsKey(bucketId)) + return; + this.buckets.get(bucketId).remove(tClusterAgent); + } + + public List<ClusterAgent<T>> getAgentsFromBucket(int bucketId) { + return this.buckets.getOrDefault(bucketId, new ArrayList<>()); + } } diff --git a/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java b/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java index 6189901916c975d4849f41a59578d04add126ac0..b6e11c09cd49ce4d2a057e9e36e89ba2257d1fe5 100644 --- a/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java +++ b/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java @@ -1,4 +1,5 @@ package fr.irit.smac.clumate.cluster; public interface DataPoint { + int getBucketId(); }