Skip to content
Snippets Groups Projects
Commit e0be72b1 authored by shinedday's avatar shinedday
Browse files

Version : IoTAMAK 0.0.5

Amas :
 * Rework the way to add agents to give more option to user, can now choose multiple agents and choose agent destination

From : add_agent(self, experience_name: str, args: List = None)
To : add_agent(self, experience_name: str, client_ip: str = None, agent_name: str = "agent.py", args: List = None)

param experience_name: name of the experience folder
param client_ip: if the agent should be created in a specific device, you can specify an ip address, otherwise the Amas will try to share the work between the devices
param agent_name: if using multiple kind of agent, you can specify the relative path in the experiment directory to the agent file to use
param args: if any argument is needed to initiate the new agent

Added Mail class, that can be used to communicate between agents, a mail contain the sender id, the cycle of the mail creation as well as a payload that can be anything serializable.

Added CommunicatingAgent class, that can be used instead of agent
 * A mailbox that work like a stack of mail
 * A method to send mails
 * And a method de receaive them

Scheduler :
 * Fixed a bug where the mqtt client would lock himself making communication with the scheduler impossible
parent da9f815a
Branches
No related tags found
No related merge requests found
......@@ -7,3 +7,4 @@ venv
__pycache__/
build/
iotAmak.egg-info/
README.html
# IoTAMAK-core
# **IoTAMAK : a framework for distributed MAS**
- [IoTAMAK-core](#iotamak-core)
- [Reseau](#reseau)
- [Random note](#random-note)
- [Agent Cycle](#agent-cycle)
- [Shared work](#shared-work)
- [Wait function](#wait-function)
- [Experience format](#experience-format)
**Name :** Sebastien GOYON
**Group :** M1 CSA 2021-2022, UT3 Paul Sabatier
**Internship tutor :** Guilhem MARCILLAUD, IRIT SMAC team.
# Reseau
- [**IoTAMAK : a framework for distributed MAS**](#iotamak--a-framework-for-distributed-mas)
- [**Definition**](#definition)
- [**Goal**](#goal)
- [**Subject**](#subject)
- [**Context**](#context)
- [**Constraint**](#constraint)
- [**Solution**](#solution)
- [**Project managment**](#project-managment)
- [**Development**](#development)
- [**IoTAMAK core**](#iotamak-core)
- [**Code structure** :](#code-structure-)
- [**MAS behavious**](#mas-behavious)
- [**Version**](#version)
- [**IoTAMAK UI**](#iotamak-ui)
- [**Network**](#network)
- [**Raspberry network**](#raspberry-network)
- [**Server network**](#server-network)
- [**Bibliography**](#bibliography)
Port :
* Mqtt : 1883
* SSH : 22
**Mqtt:**
* Broker : Mosquitto (Windows)
* Client : python paho-mqtt
## **Definition**
MAS (multi-agent system):
> A multi-agent system (MAS or "self-organized system") is a computerized system composed of multiple interacting intelligent agents.
> Multi-agent systems can solve problems that are difficult or impossible for an individual agent or a monolithic system to solve.
> Intelligence may include methodic, functional, procedural approaches, algorithmic search or reinforcement learning.
**Configuration windows :**
WSL (pour pouvoir démarer l'amas)
## **Goal**
### **Subject**
# Random note
> The goal of this project is to produce a wab-app to supervise remotly experiments distributed on multiple device for researcher in MAS of the IRIT. The application need to give in real time information to the user of the state of each devise / agent (activity, result..). The general architecture of the system is in 3 part :
> 1. Some devises that can work (Raspberry pi 3b)
> 2. A control device collecting information (server)
> 3. An application that can show the server information according to the user need.
>
> This project will be focused on the devedloppement of a system that can distribute a SMA defnie by AMAK on multiple raspberry PI. Once distributed, the server can start and stop the experiment. The 2nd goal of this project is to build a network architecure between those raspberry so each Agent can communicate between them and with the manager.
ihm :
1. Loading (check if good format) -> Warning -> can't "start"
2. Ping (all in config) -> Warning -> can't start, can't agent
3. Agent -> Warning -> already running -> kill ?
4. Whatever (start)
### **Context**
Last year I participated in the conception and development of Py-AMAK (Python) during my internship, an extention of AMAK (Java) a framework that helps developper to produce MAS system and visualize the result.
Start procédure :
1. start broker
2. start scheduler
3. Start amas/env
This year the goal of the intership is to developp IoTAMAK, that will extend the application of Py-AMAK to use network and multiple device.
```mermaid
flowchart TD
subgraph server
amas
scheduler
end
client
amas <-->|mqtt| client
scheduler <-->|mqtt| client
```
```mermaid
flowchart TD
server -->|create n agent| remote_servers
```
Agent neighboor:
add neighboor :
-> subscribe to his metrics
remove neighboor :
-> unsuscribe to his metrics
on_perceive -> update neighbors
then wait
then send metrics
then phase 2
# Agent Cycle
```mermaid
flowchart TD
on_cycle_begin
subgraph on_perceive
find_neighbor
subscribe_neighbor_metric
find_neighbor --> subscribe_neighbor_metric
end
on_cycle_begin --> on_perceive
subgraph twophase
wait
end
subgraph onephasee
nothing
end
on_perceive -.-> twophase
on_perceive -.-> onephasee
twophase -.-> on_decide
onephasee -.-> on_decide
on_cycle_en[on_cycle_end]
on_decide --> on_act
on_act --> publish_metric
publish_metric --> on_cycle_en
```
`find_neighbor` :
> pas d'info de base ? Comment trouver les voisins
Solution :
* Tout le monde accepte toute les metrics mais ne regarde que ces voisins
* L'amas sait tout en distribue les données
* ?
Notable diference between IoTAMAK and Py-AMAK on the interface :
* The structure need to be done in a more pythonic way (no getter/setter, no public/private attribute)
* The UI need to be a web app powered by a server
### **Constraint**
Amas sait tout, il s'abonne aux metrics, calcules les voisins, et les envois aux agents.
Les agents stock les données et les lisent dans on_perceive
**Language :** Python
# Shared work
The system need to follow MVC design patern, this mean that the core could work on his own, and a UI is not require.
1. Python multiprocessing
2. Envoie du fichier par ssh
3. execution
## **Solution**
Probleme :
* 1 session ssh par agent
* Souci de dépendance
* Pas élégant
The project will be splited into 2 parts :
* IoTAMAK-core : a python module (that can be easily installed with pip) that provide all the basic method required to build a MAS experiment
* IoTAMAK UI : a django based server that can interact with any experiment developped with IoTAMAK-core.
Utiliser fabrique pour partager les fichier
## **Project managment**
Communication : pexpect -> linux only (wsl work)
Meeting : weekly
# Wait function
Tool :
* Gitlab
* Trello
* Discord
```py
while not condition:
pass
```
15 agents -> 43 sec/cycle
## **Development**
### **IoTAMAK core**
```py
while not condition:
sleep(0.01)
```
15 agents -> 0.09 sec/cycle
The classes that the developper will interact with (Agent / Amas / Environment) need to be very similar to the Py-AMAK one, this is why the class looks very similar.
Solution a trouvé !
#### **Code structure** :
# Experience format
```
/experienceNameFolder/
/agent.py
/amas.py
/env.py
/...
```
Les noms amas, agent et env sont obligatoire et ne peuvent pas etre modifier
Ils doivent contenir un code de la sorte :
```py
if __name__ == '__main__':
a = MyAgent(int(sys.argv[1]), ...) #MyAmas() MyEnv()
a.run()
```
Pour ajouter des attributs a l'initialisation aux agents il suffit de rajouter des `type(sys.argv[x])`.
**A AJOUTER**
un .config dans les experience avec la version du coeur
# diagrame de classe
In AMAK a SMA is composed of 4 main strucure :
* Scheduler : it's role is to make sure that everything is working in the right order, it's also play a huge role with the interface between the UI and the experiment, being able to pause it, or closing everyting.
* Agent : an agent can have various comportment that need to be define
* Amas : it's a superstructure, above the agents that provide conviniant way to add, remove or make communication between agents
* Environment : a place where the agent live.
```mermaid
classDiagram
......@@ -245,4 +156,267 @@ classDiagram
Schedulable <|-- Env
Schedulable <|-- Amas
SSHClient <|-- Amas
```
\ No newline at end of file
```
#### **MAS behavious**
```mermaid
graph TD
subgraph first_part:
amas_metrics(Amas : publish all stored metrics to database)
amas_begin(Amas : on_cycle_begin)
amas_not(Amas : notify scheduler)
env_begin(Env : on_cycle_begin)
env_not(Env : notify scheduler)
amas_metrics --> amas_begin
amas_begin --> amas_not
env_begin --> env_not
end
subgraph main_part:
agent1_begin(Agent 1 : on_cycle_begin)
agent1_perceive(Agent 1 : on_perceive)
agent1_decide(Agent 1 : on_decide)
agent1_act(Agent 1 : on_act)
agent1_finish(Agent 1 : on_cycle_end)
agent1_metric(Agent 1 : send metrics)
agent1_sche(Agent 1 : notify scheduler)
agent1_begin --> agent1_perceive
agent1_perceive --> agent1_decide
agent1_decide --> agent1_act
agent1_act --> agent1_finish
agent1_finish --> agent1_metric
agent1_metric --> agent1_sche
agent_mid(...)
agentn_begin(Agent n : on_cycle_begin)
agentn_perceive(Agent n : on_perceive)
agentn_decide(Agent n : on_decide)
agentn_act(Agent n : on_act)
agentn_finish(Agent n : on_cycle_end)
agentn_metric(Agent n : send metrics)
agentn_sche(Agent n : notify scheduler)
agentn_begin --> agentn_perceive
agentn_perceive --> agentn_decide
agentn_decide --> agentn_act
agentn_act --> agentn_finish
agentn_finish --> agentn_metric
agentn_metric --> agentn_sche
end
subgraph last_part:
amas_finish(Amas : on_cycle_end)
amas_noti(Amas : notify scheduler)
env_finish(Env : on_cycle_end)
env_noti(Env : notify scheduler)
amas_finish --> amas_noti
env_finish --> env_noti
end
scheduler_not_first(Scheduler : Notify Amas/env)
scheduler_wait_first(Scheduler : Wait Amas/env)
scheduler_not_main(Scheduler : Notify all Agents)
scheduler_wait_main(Scheduler : Wait all agents)
scheduler_not_last(Scheduler : Notify Amas/env)
scheduler_wait_last(Scheduler : Wait Amas/env)
scheduler_not_first --> amas_metrics
scheduler_not_first --> env_begin
amas_not --> scheduler_wait_first
env_not --> scheduler_wait_first
scheduler_wait_first --> scheduler_not_main
scheduler_not_main --> agent1_begin
scheduler_not_main --> agent_mid
scheduler_not_main --> agentn_begin
agent1_sche --> scheduler_wait_main
agent_mid --> scheduler_wait_main
agentn_sche --> scheduler_wait_main
scheduler_wait_main --> scheduler_not_last
scheduler_not_last --> amas_finish
scheduler_not_last --> env_finish
env_noti --> scheduler_wait_last
amas_noti --> scheduler_wait_last
```
#### **Version**
**0.0.1 :**
* Basic implementation of the system
**0.0.2 :**
Clean code :
* Provide way better interface for the developper
Amas :
* Optimize metric publish to the database
SSH Client :
* will try to connect once again if it have failed
**0.0.3 :**
Amas :
* Add : method agent_neighbour that publish the metric of agent2 to agent 1
Schedulable & Scheduler :
* Greatly improve the wait model, now using threading semaphore
**0.0.4**
Feature :
* Amas, Env and Agent now take only 1 argument to simplify arg managment for the user. (Not compatible with 0.0.3)
* It's now possible to seed the experiment.
* Scheduler will print exec time
Requirements.txt / Setup.py :
* add requirement version
Known bug :
* Scheduler : auto mode pause seem to lock the scheduler in a state where it's not possible to interact anymore with it.
### **IoTAMAK UI**
The Web application will be powered by a django server.
## **Network**
In order for the system to work a network architecture need to be conceptualize.
The communication between agents need to be MQTT.
There are 3 actore in this system : a server, some user and somre device(raspberry) that execute agents.
```mermaid
graph LR
raspberry_1
raspberry_n
client((User))
git[(Gitlab : iotamak-core)]
subgraph server
django
broker
end
broker-- MQTT 1883---raspberry_1
django-- SSH/SFTP: 22---raspberry_1
django-- ping---raspberry_1
broker-- MQTT 1883---raspberry_n
django-- SSH/SFTP: 22---raspberry_n
django-- ping---raspberry_n
django-- Git : pull --- git
raspberry_n-- Git : pull ---git
raspberry_1-- Git : pull ---git
client --> django
```
### **Raspberry network**
From the point of view of a raspberry the network look like this :
```mermaid
graph LR
subgraph raspberry
direction TB
os(OS)
client_1
client_2
client_n
os-- nohup/kill---client_1
os-- nohup/kill---client_2
os-- nohup/kill---client_n
end
server((Server))
git((Gitlab : iotamak-core))
server-- MQTT : 1883---client_1
server-- MQTT : 1883---client_2
server-- MQTT : 1883---client_n
server-- ping--- os
server-- SSH/SFTP: 22--- os
git-- Git : pull--- os
```
We can see that beside the agent communicate not only with MQTT to the server but also with ssh and sftp in order to share the experiment and control the experiment in case of abnormal beharvior(kill agents for example, or see the error stack).
### **Server network**
```mermaid
graph LR
subgraph Server
direction TB
subgraph DjangoApp
ihm[Mqtt : IHM]
django[Django server]
end
subgraph Experiment
amas[Amas]
env[Environment]
scheduler[Scheduler]
end
cache((Cache server : Redis))
broker((Broker : Mosquitto))
database[(Database : PostgreSQL)]
django-- Port : 6379 ---cache
django-- popen / kill ---amas
django-- popen / kill ---env
django-- popen / kill ---scheduler
django-- Port : 5432 ---database
amas-- MQTT : 1883---broker
env-- MQTT : 1883---broker
scheduler-- MQTT : 1883---broker
ihm-- MQTT : 1883---broker
end
raspberry
git((Gitlab : iotamak-core))
User
django-- SSH/SFTP: 22 / ping ---raspberry
django-- Git : pull ---git
amas-- SSH: 22 ---raspberry
broker-- MQTT : 1883---raspberry
User --> django
```
The server is composed of multiple componant :
* A MQTT broker (Mosquitto is used here) : to hold the communication between the agent, amas, scheduler, and env.
* A Django application composed of the main application and a MQTT client to ineteract with the experiment
* An experiment, composed of a Scheduler, an environment and an amas.
* A database : since the base Django database (Sqlite3) didn't meet the requirement for the system and external database is require, PoqstgreSQL.
* A cache server Redis : in order to have real time graph and canvas, a cahe server was needed.
## **Bibliography**
1. Smac : https://www.irit.fr/en/departement/dep-interaction-collective-intelligence/smac-team/
2. Irit : https://www.irit.fr/en/home/
3. MAS definition : https://en.wikipedia.org/wiki/Multi-agent_system
File added
......@@ -37,14 +37,14 @@ class Amas(Schedulable, SSHClient):
SSHClient.__init__(self, true_client)
self.seed = seed
self.seed: int = seed
self.broker_ip: str = broker_ip
self.subscribe("scheduler/schedulable/wakeup", self.wake_up)
self.next_id: int = 0
self.agents_cmd: List[Cmd] = []
self.agents_cmd: List[List[Cmd]] = [[] for _ in range(len(self.clients))]
self.on_initialization()
self.on_initial_agents_creation()
......@@ -58,10 +58,20 @@ class Amas(Schedulable, SSHClient):
"""
pass
def add_agent(self, experience_name: str, args: List = None) -> None:
def add_agent(
self,
experience_name: str,
client_ip: str = None,
agent_name: str = "agent.py",
args: List = None
) -> None:
"""
Function that need to be called to create a new agent
:param experience_name: name of the experience folder
:param client_ip: if the agent should be created in a specific device, you can specify an ip address,
otherwise the Amas will try to share the work between the devices
:param agent_name: if using multiple kind of agent, you can specify the relative path in the experiment
directory to the agent file to use
:param args: if any argument is needed to initiate the new agent
:return: None
"""
......@@ -71,13 +81,28 @@ class Amas(Schedulable, SSHClient):
arg_dict = {"broker_ip": str(self.broker_ip), "seed": self.seed, "identifier": self.next_id}
command = "nohup python "
command += "\'Desktop/mqtt_goyon/example/" + experience_name + "/agent.py\' \'"
command += "\'Desktop/mqtt_goyon/example/" + experience_name + "/"+agent_name+"\' \'"
command += json.dumps(arg_dict) + "\' "
for arg in args:
command += str(arg) + " "
command += "&"
self.agents_cmd.append(Cmd(command))
if client_ip is None:
# find the most suitable pi
i_min = 0
for elem in range(len(self.clients)):
if len(self.agents_cmd[i_min]) > len(self.agents_cmd[elem]):
i_min = elem
self.agents_cmd[i_min].append(Cmd(command))
else:
have_found = False
for i_client in range(len(self.clients)):
if self.clients[i_client].hostname == i_client:
self.agents_cmd[i_client].append(Cmd(command))
have_found = True
break
if not have_found:
self.agents_cmd[0].append(Cmd(command))
self.subscribe("agent/" + str(self.next_id) + "/metric", self.agent_metric)
self.subscribe("agent/" + str(self.next_id) + "/log", self.agent_log)
......@@ -89,13 +114,8 @@ class Amas(Schedulable, SSHClient):
"""
Method used to start new agent trough ssh
"""
total_pi = len(self.clients)
for client in range(total_pi):
self.run_cmd(client, list(self.agents_cmd[
client * len(self.agents_cmd) // total_pi:(client + 1) * len(
self.agents_cmd) // total_pi
])
)
for i_client in range(len(self.clients)):
self.run_cmd(i_client, self.agents_cmd[i_client])
print("Amas, push agent done")
def agent_neighbour(self, id_agent1: int, id_agent2: int) -> None:
......
import json
import pathlib
import sys
from typing import List, Any
sys.path.insert(0, str(pathlib.Path(__file__).parent))
from iotAmak.agent import Agent
from iotAmak.tool.mail import Mail
class CommunicatingAgent(Agent):
"""
Agent class that can communicate
"""
def __init__(self, arguments: str) -> None:
self.mailbox: List[Mail] = []
Agent.__init__(self, arguments)
self.subscribe("mail", self.receive_mail)
def receive_mail(self, client, userdata, message) -> None:
"""
Called when the agent receive a new message
"""
raw = json.loads(message)
self.mailbox.append(Mail(raw.get("id"), raw.get("cycle"), raw.get("payload")))
def send_mail(self, agent_id: int, payload: Any) -> None:
"""
Send a mail to agent id
:param agent_id: id of the receiver
:param payload: anything, must be serializable with json.dumps().
"""
new_mail = {"id": self.id, "cycle": self.nbr_cycle, "payload": payload}
self.publish("agent/" + str(agent_id) + "/mail", json.dumps(new_mail))
......@@ -41,7 +41,6 @@ class Scheduler(Schedulable):
Function called when the IHM pause the scheduler
"""
self.paused = True
self.ihm_semaphore.acquire()
def unpause(self, client, userdata, message) -> None:
"""
......
from typing import Any
class Mail:
def __init__(self, sender_id: int, date: int, payload: Any) -> None:
self.sender_id: int = sender_id
self.date: int = date
self.payload: Any = payload
......@@ -3,7 +3,7 @@ from setuptools import setup, find_packages
setup(
name='iotAmak',
packages=find_packages(),
version='0.0.4',
version='0.0.5',
description='AmakFramework in python',
author='SMAC - GOYON Sebastien',
install_requires=[
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment