From 6da224cd3713576215e6b8b9095e2933f211b9b6 Mon Sep 17 00:00:00 2001 From: Alexandre <alexandre.perles@gmail.com> Date: Tue, 6 Jun 2023 16:28:52 +0200 Subject: [PATCH] Allow packing of agents by bucket --- .../irit/smac/clumate/amas/ClusterAgent.java | 24 ++++++++++++------- .../smac/clumate/amas/ClusterEnvironment.java | 22 ++++++++++++++--- .../irit/smac/clumate/cluster/DataPoint.java | 1 + 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java b/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java index 958b2ec..5fccb7d 100644 --- a/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java +++ b/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java @@ -42,7 +42,7 @@ public class ClusterAgent<T extends DataPoint> extends Agent<ClusterAMAS<T>, Clu protected ClusterAgent(ClusterAMAS amas, T dataPoint) { super(amas); - this.cluster = new Cluster<T>(dataPoint); + this.setCluster(new Cluster<T>(dataPoint)); } public enum State {DORMANT, WAITING_FOR_REPLY, DECIDING, INIT, DEAD, ACTIVE} @@ -53,11 +53,11 @@ public class ClusterAgent<T extends DataPoint> extends Agent<ClusterAMAS<T>, Clu @Override protected void onReady() { - for (var agent : - amas.getAgents()) { - if (agent instanceof ClusterAgent otherClusterAgent && otherClusterAgent != this) { - if (isRoughlySimilarTo(otherClusterAgent)) { - clusterAgentsRoughlySimilarOnReady.add(otherClusterAgent); + for (var other : + amas.getEnvironment().getAgentsFromBucket(cluster.getRepresentative().getBucketId())) { + if (other != this) { + if (isRoughlySimilarTo(other)) { + clusterAgentsRoughlySimilarOnReady.add(other); } } } @@ -241,12 +241,20 @@ public class ClusterAgent<T extends DataPoint> extends Agent<ClusterAMAS<T>, Clu if (logger.isLoggable(Level.INFO)) logger.info(this + " fuses with " + other); - cluster = fuse(other.cluster); + setCluster(fuse(other.cluster)); + } + + private void setCluster(Cluster<T> newCluster) { + if (cluster != null) + getAmas().getEnvironment().removeFromBucket(cluster.getRepresentative().getBucketId(), this); + + cluster = newCluster; + getAmas().getEnvironment().assignToBucket(cluster.getRepresentative().getBucketId(), this); } private Cluster<T> fuse(Cluster<T> other) { T newRepresentative = (T) amas.getMasSettings().dataPointFuser().apply(cluster.getRepresentative(), other.getRepresentative()); - return new Cluster<T>(newRepresentative, cluster.getSize()+other.getSize()); + return new Cluster<T>(newRepresentative, cluster.getSize() + other.getSize()); } private List<ClusterAgent<T>> getRequestedAgentsWithSimilarityScoreAboveFusionThreshold(Map<ClusterAgent<T>, Float> similarityScoresReceived) { diff --git a/src/main/java/fr/irit/smac/clumate/amas/ClusterEnvironment.java b/src/main/java/fr/irit/smac/clumate/amas/ClusterEnvironment.java index 06b0726..64246ba 100644 --- a/src/main/java/fr/irit/smac/clumate/amas/ClusterEnvironment.java +++ b/src/main/java/fr/irit/smac/clumate/amas/ClusterEnvironment.java @@ -4,9 +4,8 @@ import fr.irit.smac.amak.Environment; import fr.irit.smac.clumate.cluster.DataPoint; import lombok.Getter; -import java.util.List; -import java.util.Optional; -import java.util.Queue; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Logger; @@ -16,17 +15,34 @@ public class ClusterEnvironment<T extends DataPoint> extends Environment { private static final Logger logger = Logger.getLogger(ClusterEnvironment.class.getName()); @Getter private T lastPolledPendingDataPoint = null; + private final Map<Integer, List<ClusterAgent<T>>> buckets = new ConcurrentHashMap<>(); public boolean hasRemainingPendingAdditionDataPoints() { return !pendingAdditionDataPoints.isEmpty(); } + public Optional<T> pollPendingDataPoint() { var dataPoint = pendingAdditionDataPoints.poll(); if (dataPoint != null) lastPolledPendingDataPoint = dataPoint; return Optional.ofNullable(dataPoint); } + public void addDataPoints(List<T> newDataPoints) { pendingAdditionDataPoints.addAll(newDataPoints); } + + public void assignToBucket(int bucketId, ClusterAgent<T> tClusterAgent) { + this.buckets.computeIfAbsent(bucketId, b -> Collections.synchronizedList(new ArrayList<>())).add(tClusterAgent); + } + + public void removeFromBucket(int bucketId, ClusterAgent<T> tClusterAgent) { + if (!buckets.containsKey(bucketId)) + return; + this.buckets.get(bucketId).remove(tClusterAgent); + } + + public List<ClusterAgent<T>> getAgentsFromBucket(int bucketId) { + return this.buckets.getOrDefault(bucketId, new ArrayList<>()); + } } diff --git a/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java b/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java index 6189901..b6e11c0 100644 --- a/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java +++ b/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java @@ -1,4 +1,5 @@ package fr.irit.smac.clumate.cluster; public interface DataPoint { + int getBucketId(); } -- GitLab