Skip to content
Snippets Groups Projects
Commit 6da224cd authored by Alexandre's avatar Alexandre
Browse files

Allow packing of agents by bucket

parent b23186c3
Branches
Tags
No related merge requests found
......@@ -42,7 +42,7 @@ public class ClusterAgent<T extends DataPoint> extends Agent<ClusterAMAS<T>, Clu
protected ClusterAgent(ClusterAMAS amas, T dataPoint) {
super(amas);
this.cluster = new Cluster<T>(dataPoint);
this.setCluster(new Cluster<T>(dataPoint));
}
public enum State {DORMANT, WAITING_FOR_REPLY, DECIDING, INIT, DEAD, ACTIVE}
......@@ -53,11 +53,11 @@ public class ClusterAgent<T extends DataPoint> extends Agent<ClusterAMAS<T>, Clu
@Override
protected void onReady() {
for (var agent :
amas.getAgents()) {
if (agent instanceof ClusterAgent otherClusterAgent && otherClusterAgent != this) {
if (isRoughlySimilarTo(otherClusterAgent)) {
clusterAgentsRoughlySimilarOnReady.add(otherClusterAgent);
for (var other :
amas.getEnvironment().getAgentsFromBucket(cluster.getRepresentative().getBucketId())) {
if (other != this) {
if (isRoughlySimilarTo(other)) {
clusterAgentsRoughlySimilarOnReady.add(other);
}
}
}
......@@ -241,12 +241,20 @@ public class ClusterAgent<T extends DataPoint> extends Agent<ClusterAMAS<T>, Clu
if (logger.isLoggable(Level.INFO))
logger.info(this + " fuses with " + other);
cluster = fuse(other.cluster);
setCluster(fuse(other.cluster));
}
private void setCluster(Cluster<T> newCluster) {
if (cluster != null)
getAmas().getEnvironment().removeFromBucket(cluster.getRepresentative().getBucketId(), this);
cluster = newCluster;
getAmas().getEnvironment().assignToBucket(cluster.getRepresentative().getBucketId(), this);
}
private Cluster<T> fuse(Cluster<T> other) {
T newRepresentative = (T) amas.getMasSettings().dataPointFuser().apply(cluster.getRepresentative(), other.getRepresentative());
return new Cluster<T>(newRepresentative, cluster.getSize()+other.getSize());
return new Cluster<T>(newRepresentative, cluster.getSize() + other.getSize());
}
private List<ClusterAgent<T>> getRequestedAgentsWithSimilarityScoreAboveFusionThreshold(Map<ClusterAgent<T>, Float> similarityScoresReceived) {
......
......@@ -4,9 +4,8 @@ import fr.irit.smac.amak.Environment;
import fr.irit.smac.clumate.cluster.DataPoint;
import lombok.Getter;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Logger;
......@@ -16,17 +15,34 @@ public class ClusterEnvironment<T extends DataPoint> extends Environment {
private static final Logger logger = Logger.getLogger(ClusterEnvironment.class.getName());
@Getter
private T lastPolledPendingDataPoint = null;
private final Map<Integer, List<ClusterAgent<T>>> buckets = new ConcurrentHashMap<>();
public boolean hasRemainingPendingAdditionDataPoints() {
return !pendingAdditionDataPoints.isEmpty();
}
public Optional<T> pollPendingDataPoint() {
var dataPoint = pendingAdditionDataPoints.poll();
if (dataPoint != null)
lastPolledPendingDataPoint = dataPoint;
return Optional.ofNullable(dataPoint);
}
public void addDataPoints(List<T> newDataPoints) {
pendingAdditionDataPoints.addAll(newDataPoints);
}
public void assignToBucket(int bucketId, ClusterAgent<T> tClusterAgent) {
this.buckets.computeIfAbsent(bucketId, b -> Collections.synchronizedList(new ArrayList<>())).add(tClusterAgent);
}
public void removeFromBucket(int bucketId, ClusterAgent<T> tClusterAgent) {
if (!buckets.containsKey(bucketId))
return;
this.buckets.get(bucketId).remove(tClusterAgent);
}
public List<ClusterAgent<T>> getAgentsFromBucket(int bucketId) {
return this.buckets.getOrDefault(bucketId, new ArrayList<>());
}
}
package fr.irit.smac.clumate.cluster;
public interface DataPoint {
int getBucketId();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment