From c6bdbe2e223ef7721e785af9560742a41adc507b Mon Sep 17 00:00:00 2001
From: Alexandre <alexandre.perles@gmail.com>
Date: Wed, 7 Jun 2023 09:15:40 +0200
Subject: [PATCH] Push data to event store (Event Sourcing)

---
 build.gradle                                    |  5 ++++-
 src/main/java/fr/irit/smac/clumate/CluMATE.java |  2 ++
 .../fr/irit/smac/clumate/amas/ClusterAMAS.java  |  2 ++
 .../fr/irit/smac/clumate/amas/ClusterAgent.java | 17 ++++++++++++++++-
 .../fr/irit/smac/clumate/amas/MASSettings.java  | 14 ++++++++++++--
 .../amas/controller/CluMATEController.java      |  2 ++
 .../fr/irit/smac/clumate/cluster/Cluster.java   |  4 ++++
 .../fr/irit/smac/clumate/cluster/DataPoint.java |  4 +++-
 .../smac/clumate/event/NewClusterEvent.java     |  8 ++++++++
 .../smac/clumate/event/RemoveClusterEvent.java  |  8 ++++++++
 src/test/groovy/SimpleClusteringIT.groovy       |  2 ++
 .../smac/clumate/amas/ClusterAgentTest.groovy   |  3 ++-
 12 files changed, 65 insertions(+), 6 deletions(-)
 create mode 100644 src/main/java/fr/irit/smac/clumate/event/NewClusterEvent.java
 create mode 100644 src/main/java/fr/irit/smac/clumate/event/RemoveClusterEvent.java

diff --git a/build.gradle b/build.gradle
index 98f190c..78213ff 100644
--- a/build.gradle
+++ b/build.gradle
@@ -13,8 +13,11 @@ repositories {
 
 dependencies {
     // AMAK (Only one of the next two lines should be uncommented. Also, settings.gradle file should also be modified)
-     implementation 'com.github.alexandreprl:amak:3.0.4'// Uncomment this line to get AMAK from git repository (The last part matches a tag, a commit hash or the last commit of a branch : branchName-SNAPSHOT
+     implementation 'com.github.alexandreprl:amak:3.1.0'
+// Uncomment this line to get AMAK from git repository (The last part matches a tag, a commit hash or the last commit of a branch : branchName-SNAPSHOT
 //    implementation project(':amak') // Uncomment this line to get AMAK from local
+
+    
     testImplementation platform('org.junit:junit-bom:5.9.1')
     testImplementation 'org.junit.jupiter:junit-jupiter'
 
diff --git a/src/main/java/fr/irit/smac/clumate/CluMATE.java b/src/main/java/fr/irit/smac/clumate/CluMATE.java
index d1c1d42..8dfbabf 100644
--- a/src/main/java/fr/irit/smac/clumate/CluMATE.java
+++ b/src/main/java/fr/irit/smac/clumate/CluMATE.java
@@ -1,11 +1,13 @@
 package fr.irit.smac.clumate;
 
+import fr.irit.smac.amak.event.EventStore;
 import fr.irit.smac.clumate.amas.MASSettings;
 import fr.irit.smac.clumate.amas.controller.CluMATEController;
 import fr.irit.smac.clumate.amas.controller.command.NewDataPointCommand;
 import fr.irit.smac.clumate.amas.controller.command.SolveCommand;
 import fr.irit.smac.clumate.amas.controller.query.Result;
 import fr.irit.smac.clumate.amas.controller.query.RetrieveClustersResultQuery;
+import fr.irit.smac.clumate.cluster.Cluster;
 import fr.irit.smac.clumate.cluster.DataPoint;
 
 import java.util.List;
diff --git a/src/main/java/fr/irit/smac/clumate/amas/ClusterAMAS.java b/src/main/java/fr/irit/smac/clumate/amas/ClusterAMAS.java
index ed241f0..e127ef6 100644
--- a/src/main/java/fr/irit/smac/clumate/amas/ClusterAMAS.java
+++ b/src/main/java/fr/irit/smac/clumate/amas/ClusterAMAS.java
@@ -1,6 +1,8 @@
 package fr.irit.smac.clumate.amas;
 
 import fr.irit.smac.amak.Amas;
+import fr.irit.smac.amak.event.EventStore;
+import fr.irit.smac.clumate.cluster.Cluster;
 import fr.irit.smac.clumate.cluster.DataPoint;
 import lombok.Getter;
 import lombok.Setter;
diff --git a/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java b/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java
index 5fccb7d..7255ce5 100644
--- a/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java
+++ b/src/main/java/fr/irit/smac/clumate/amas/ClusterAgent.java
@@ -1,10 +1,13 @@
 package fr.irit.smac.clumate.amas;
 
 import fr.irit.smac.amak.Agent;
+import fr.irit.smac.amak.event.EventStore;
 import fr.irit.smac.clumate.amas.messages.EvaluatedScoreMessage;
 import fr.irit.smac.clumate.amas.messages.RequestSimilarityMessage;
 import fr.irit.smac.clumate.cluster.Cluster;
 import fr.irit.smac.clumate.cluster.DataPoint;
+import fr.irit.smac.clumate.event.NewClusterEvent;
+import fr.irit.smac.clumate.event.RemoveClusterEvent;
 import lombok.Getter;
 
 import java.text.MessageFormat;
@@ -245,11 +248,23 @@ public class ClusterAgent<T extends DataPoint> extends Agent<ClusterAMAS<T>, Clu
 	}
 
 	private void setCluster(Cluster<T> newCluster) {
-		if (cluster != null)
+		if (cluster != null) {
 			getAmas().getEnvironment().removeFromBucket(cluster.getRepresentative().getBucketId(), this);
+			if (getAmas().getMasSettings().eventStore() != null)
+				getAmas().getMasSettings().eventStore().add(new RemoveClusterEvent(cluster));
+		}
 
 		cluster = newCluster;
 		getAmas().getEnvironment().assignToBucket(cluster.getRepresentative().getBucketId(), this);
+		if (getAmas().getMasSettings().eventStore() != null)
+			getAmas().getMasSettings().eventStore().add(new NewClusterEvent(cluster));
+	}
+
+	@Override
+	protected void onDestroy() {
+		if (cluster != null)
+			if (getAmas().getMasSettings().eventStore() != null)
+				getAmas().getMasSettings().eventStore().add(new RemoveClusterEvent(cluster));
 	}
 
 	private Cluster<T> fuse(Cluster<T> other) {
diff --git a/src/main/java/fr/irit/smac/clumate/amas/MASSettings.java b/src/main/java/fr/irit/smac/clumate/amas/MASSettings.java
index c8721cf..9adf535 100644
--- a/src/main/java/fr/irit/smac/clumate/amas/MASSettings.java
+++ b/src/main/java/fr/irit/smac/clumate/amas/MASSettings.java
@@ -1,11 +1,21 @@
 package fr.irit.smac.clumate.amas;
 
+import fr.irit.smac.amak.event.EventStore;
+import fr.irit.smac.clumate.cluster.Cluster;
 import fr.irit.smac.clumate.cluster.DataPoint;
 import fr.irit.smac.clumate.cluster.DataPointFuser;
 import fr.irit.smac.clumate.cluster.SimilarityScoreMethod;
 
 import java.util.EnumSet;
 
-public record MASSettings<T extends DataPoint>(SimilarityScoreMethod<T> similarityScoreMethod, float fusionThreshold, EnumSet<AMASOption> amasOptions, DataPointFuser<T> dataPointFuser) {
-
+public record MASSettings<T extends DataPoint>(
+		SimilarityScoreMethod<T> similarityScoreMethod,
+		float fusionThreshold,
+		EnumSet<AMASOption> amasOptions,
+		DataPointFuser<T> dataPointFuser,
+		EventStore<Cluster<T>> eventStore
+) {
+	public MASSettings(SimilarityScoreMethod<T> similarityScoreMethod, float fusionThreshold, EnumSet<AMASOption> amasOptions, DataPointFuser<T> dataPointFuser) {
+		this(similarityScoreMethod, fusionThreshold, amasOptions, dataPointFuser, null);
+	}
 }
diff --git a/src/main/java/fr/irit/smac/clumate/amas/controller/CluMATEController.java b/src/main/java/fr/irit/smac/clumate/amas/controller/CluMATEController.java
index 64bcb2b..7f14015 100644
--- a/src/main/java/fr/irit/smac/clumate/amas/controller/CluMATEController.java
+++ b/src/main/java/fr/irit/smac/clumate/amas/controller/CluMATEController.java
@@ -1,5 +1,6 @@
 package fr.irit.smac.clumate.amas.controller;
 
+import fr.irit.smac.amak.event.EventStore;
 import fr.irit.smac.amak.scheduling.Scheduler;
 import fr.irit.smac.clumate.amas.controller.command.NewDataPointCommand;
 import fr.irit.smac.clumate.amas.controller.command.ShutdownCommand;
@@ -10,6 +11,7 @@ import fr.irit.smac.clumate.amas.ClusterAMAS;
 import fr.irit.smac.clumate.amas.ClusterAgent;
 import fr.irit.smac.clumate.amas.ClusterEnvironment;
 import fr.irit.smac.clumate.amas.MASSettings;
+import fr.irit.smac.clumate.cluster.Cluster;
 import fr.irit.smac.clumate.cluster.DataPoint;
 import lombok.Getter;
 
diff --git a/src/main/java/fr/irit/smac/clumate/cluster/Cluster.java b/src/main/java/fr/irit/smac/clumate/cluster/Cluster.java
index 3c3576a..bf5eb79 100644
--- a/src/main/java/fr/irit/smac/clumate/cluster/Cluster.java
+++ b/src/main/java/fr/irit/smac/clumate/cluster/Cluster.java
@@ -3,8 +3,12 @@ package fr.irit.smac.clumate.cluster;
 import lombok.Getter;
 import lombok.ToString;
 
+import java.util.UUID;
+
 @ToString
 public class Cluster<T extends DataPoint> {
+	@Getter
+	private final String id = UUID.randomUUID().toString();
 	@Getter
 	private final T representative;
 	@Getter
diff --git a/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java b/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java
index b6e11c0..923b8b1 100644
--- a/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java
+++ b/src/main/java/fr/irit/smac/clumate/cluster/DataPoint.java
@@ -1,5 +1,7 @@
 package fr.irit.smac.clumate.cluster;
 
 public interface DataPoint {
-	int getBucketId();
+	default int getBucketId() {
+		return 0;
+	}
 }
diff --git a/src/main/java/fr/irit/smac/clumate/event/NewClusterEvent.java b/src/main/java/fr/irit/smac/clumate/event/NewClusterEvent.java
new file mode 100644
index 0000000..2ef7580
--- /dev/null
+++ b/src/main/java/fr/irit/smac/clumate/event/NewClusterEvent.java
@@ -0,0 +1,8 @@
+package fr.irit.smac.clumate.event;
+
+import fr.irit.smac.amak.event.Event;
+import fr.irit.smac.clumate.cluster.Cluster;
+import fr.irit.smac.clumate.cluster.DataPoint;
+
+public record NewClusterEvent<T extends DataPoint>(Cluster<T> cluster) implements Event {
+}
diff --git a/src/main/java/fr/irit/smac/clumate/event/RemoveClusterEvent.java b/src/main/java/fr/irit/smac/clumate/event/RemoveClusterEvent.java
new file mode 100644
index 0000000..6137a81
--- /dev/null
+++ b/src/main/java/fr/irit/smac/clumate/event/RemoveClusterEvent.java
@@ -0,0 +1,8 @@
+package fr.irit.smac.clumate.event;
+
+import fr.irit.smac.amak.event.Event;
+import fr.irit.smac.clumate.cluster.Cluster;
+import fr.irit.smac.clumate.cluster.DataPoint;
+
+public record RemoveClusterEvent<T extends DataPoint>(Cluster<T> cluster) implements Event {
+}
diff --git a/src/test/groovy/SimpleClusteringIT.groovy b/src/test/groovy/SimpleClusteringIT.groovy
index f77a4ba..a27cd58 100644
--- a/src/test/groovy/SimpleClusteringIT.groovy
+++ b/src/test/groovy/SimpleClusteringIT.groovy
@@ -1,6 +1,8 @@
+import fr.irit.smac.amak.event.EventStore
 import fr.irit.smac.clumate.CluMATE
 import fr.irit.smac.clumate.amas.AMASOption
 import fr.irit.smac.clumate.amas.MASSettings
+import fr.irit.smac.clumate.cluster.Cluster
 import fr.irit.smac.clumate.cluster.DataPoint
 import fr.irit.smac.clumate.cluster.SimilarityScoreMethod
 import spock.lang.Specification
diff --git a/src/test/groovy/fr/irit/smac/clumate/amas/ClusterAgentTest.groovy b/src/test/groovy/fr/irit/smac/clumate/amas/ClusterAgentTest.groovy
index 473cbe6..699e79b 100644
--- a/src/test/groovy/fr/irit/smac/clumate/amas/ClusterAgentTest.groovy
+++ b/src/test/groovy/fr/irit/smac/clumate/amas/ClusterAgentTest.groovy
@@ -1,6 +1,6 @@
 package fr.irit.smac.clumate.amas
 
-
+import fr.irit.smac.amak.event.EventStore
 import fr.irit.smac.amak.messaging.Mailbox
 import fr.irit.smac.amak.messaging.Message
 import fr.irit.smac.clumate.amas.messages.EvaluatedScoreMessage
@@ -22,6 +22,7 @@ class ClusterAgentTest extends Specification {
 
 		def amasMock = Mock(ClusterAMAS<DataPoint>)
 		amasMock.getMasSettings() >> new MASSettings(similarityScoreMethodMock, 0.5f, EnumSet.noneOf(AMASOption), Mock(DataPointFuser))
+		amasMock.getEnvironment() >> Mock(ClusterEnvironment)
 
 		def agent = new ClusterAgent(amasMock, Mock(DataPoint))
 		agent.state = ClusterAgent.State.DORMANT
-- 
GitLab