Skip to content
Snippets Groups Projects
Commit ded541a8 authored by shinedday's avatar shinedday
Browse files

Version : IoTAMAK-0.0.6

MQTT client : now need to specify broker username and password.

SSh client / Amas : now need the path for the iotamak folder

Scheduler / Amas /env / Agent init args changed.

Communicating agent :
 * fix a lot of bug related to sending/receiving mails
 * improve interface to manage the mailbox
parent e0be72b1
Branches
No related tags found
No related merge requests found
......@@ -25,47 +25,53 @@
- [**Bibliography**](#bibliography)
## **Definition**
## **Introduction**
### **Definition**
MAS (multi-agent system):
> A multi-agent system (MAS or "self-organized system") is a computerized system composed of multiple interacting intelligent agents.
> Multi-agent systems can solve problems that are difficult or impossible for an individual agent or a monolithic system to solve.
> Intelligence may include methodic, functional, procedural approaches, algorithmic search or reinforcement learning.
## **Goal**
### **Subject**
### **Context**
> The goal of this project is to produce a wab-app to supervise remotly experiments distributed on multiple device for researcher in MAS of the IRIT. The application need to give in real time information to the user of the state of each devise / agent (activity, result..). The general architecture of the system is in 3 part :
> 1. Some devises that can work (Raspberry pi 3b)
> 2. A control device collecting information (server)
> 3. An application that can show the server information according to the user need.
>
> This project will be focused on the devedloppement of a system that can distribute a SMA defnie by AMAK on multiple raspberry PI. Once distributed, the server can start and stop the experiment. The 2nd goal of this project is to build a network architecure between those raspberry so each Agent can communicate between them and with the manager.
The SMAC team in Irit developed a tool called Amak that help scientist to develop MAS system in Java.
The wished to extend the tool in multiple programing language,
last year I participated in the conception and development of Py-AMAK (Python) during my internship,
an extension of AMAK (Java). Other version of AMAK exist in C# and C++.
### **Context**
This year the goal of the internship is to develop IoTAMAK, that will extend the application of Py-AMAK to use network and multiple device.
Last year I participated in the conception and development of Py-AMAK (Python) during my internship, an extention of AMAK (Java) a framework that helps developper to produce MAS system and visualize the result.
### **Problem and objective**
This year the goal of the intership is to developp IoTAMAK, that will extend the application of Py-AMAK to use network and multiple device.
So far all Amak system can only be used to simulate MAS system, but they canno't be use with an agent that would exist in our world.
IoTAMAK goal to connect any devices to the system as agent(s) and use the network to communicate.
Notable diference between IoTAMAK and Py-AMAK on the interface :
Notable difference between IoTAMAK and Py-AMAK on the interface :
* The structure need to be done in a more pythonic way (no getter/setter, no public/private attribute)
* The UI need to be a web app powered by a server
### **Constraint**
**Language :** Python
> The goal of this project is to produce a wab-app to supervise remotely experiments distributed on multiple device for researcher in MAS of the IRIT. The application need to give in real time information to the user of the state of each devise / agent (activity, result..). The general architecture of the system is in 3 part :
> 1. Some devises that can work (Raspberry pi 3b)
> 2. A control device collecting information (server)
> 3. An application that can show the server information according to the user need.
>
> This project will be focused on the development of a system that can distribute a SMA define by AMAK on multiple raspberry PI. Once distributed, the server can start and stop the experiment. The 2nd goal of this project is to build a network architecure between those raspberry so each Agent can communicate between them and with the manager.
The system need to follow MVC design patern, this mean that the core could work on his own, and a UI is not require.
Constraint :
* Communication between agent : MQTT
* Language : Python
* Follow the MVC patern to separate the UI with the core system, this mean that the core could work on his own, and a UI is not require.
## **Solution**
### **Solution**
The project will be splited into 2 parts :
The project will be split into 2 parts :
* IoTAMAK-core : a python module (that can be easily installed with pip) that provide all the basic method required to build a MAS experiment
* IoTAMAK UI : a django based server that can interact with any experiment developped with IoTAMAK-core.
* IoTAMAK UI : a django based server that can interact with any experiment developed with IoTAMAK-core.
## **Project managment**
### **Project management**
Meeting : weekly
......@@ -78,14 +84,14 @@ Tool :
### **IoTAMAK core**
The classes that the developper will interact with (Agent / Amas / Environment) need to be very similar to the Py-AMAK one, this is why the class looks very similar.
The classes that the developer will interact with (Agent / Amas / Environment) need to be very similar to the Py-AMAK one, this is why the class looks very similar.
#### **Code structure** :
In AMAK a SMA is composed of 4 main strucure :
* Scheduler : it's role is to make sure that everything is working in the right order, it's also play a huge role with the interface between the UI and the experiment, being able to pause it, or closing everyting.
In AMAK a SMA is composed of 4 main structure :
* Scheduler : it's a role is to make sure that everything is working in the right order, it's also play a huge role with the interface between the UI and the experiment, being able to pause it, or closing everyting.
* Agent : an agent can have various comportment that need to be define
* Amas : it's a superstructure, above the agents that provide conviniant way to add, remove or make communication between agents
* Amas : it's a superstructure, above the agents that provide convenient way to add, remove or make communication between agents
* Environment : a place where the agent live.
```mermaid
......
File added
......@@ -21,19 +21,19 @@ class Agent(Schedulable):
def __init__(self, arguments: str) -> None:
arguments = json.loads(arguments)
arguments: Dict = json.loads(arguments)
broker_ip: str = arguments.get("broker_ip")
identifier: int = int(arguments.get("identifier"))
seed: int = int(arguments.get("seed"))
broker_username: str = str(arguments.get("broker_username"))
broker_password: str = str(arguments.get("broker_password"))
self.id: int = identifier
random.seed(seed + 10 + self.id)
print(random.random())
Schedulable.__init__(self, broker_ip, "Agent" + str(self.id))
Schedulable.__init__(self, broker_ip, "Agent" + str(self.id), broker_username, broker_password)
self.subscribe("scheduler/agent/wakeup", self.wake_up)
......@@ -58,8 +58,7 @@ class Agent(Schedulable):
put the metric in a list that will be rad during on_perceive
param message: metric (dict) of the neighbor
"""
print(message.payload.decode("utf-8"))
result = literal_eval(message.payload.decode("utf-8"))
result: Dict = literal_eval(message.payload.decode("utf-8"))
self.next_neighbors.append(result)
def log(self, message: str) -> None:
......
......@@ -25,21 +25,21 @@ class Amas(Schedulable, SSHClient):
arguments = json.loads(arguments)
broker_ip: str = arguments.get("broker_ip")
self.broker_ip: str = arguments.get("broker_ip")
clients: str = arguments.get("clients")
seed: int = int(arguments.get("seed"))
self.seed: int = int(arguments.get("seed"))
self.broker_username: str = str(arguments.get("broker_username"))
self.broker_password: str = str(arguments.get("broker_password"))
iot_path: str = str(arguments.get("iot_path"))
random.seed(seed)
random.seed(self.seed)
Schedulable.__init__(self, broker_ip, "Amas")
Schedulable.__init__(self, self.broker_ip, "Amas", self.broker_username, self.broker_password)
true_client = [RemoteClient(i.get("hostname"), i.get("user"), i.get("password")) for i in literal_eval(clients)]
SSHClient.__init__(self, true_client)
SSHClient.__init__(self, true_client, iot_path)
self.seed: int = seed
self.broker_ip: str = broker_ip
self.subscribe("scheduler/schedulable/wakeup", self.wake_up)
self.next_id: int = 0
......@@ -78,10 +78,16 @@ class Amas(Schedulable, SSHClient):
if args is None:
args = []
arg_dict = {"broker_ip": str(self.broker_ip), "seed": self.seed, "identifier": self.next_id}
arg_dict = {
"broker_ip": str(self.broker_ip),
"seed": self.seed,
"identifier": self.next_id,
"broker_username": self.broker_username,
"broker_password": self.broker_password
}
command = "nohup python "
command += "\'Desktop/mqtt_goyon/example/" + experience_name + "/"+agent_name+"\' \'"
command += "\'" + self.iot_path + experience_name + "/"+agent_name+"\' \'"
command += json.dumps(arg_dict) + "\' "
for arg in args:
command += str(arg) + " "
......@@ -97,7 +103,7 @@ class Amas(Schedulable, SSHClient):
else:
have_found = False
for i_client in range(len(self.clients)):
if self.clients[i_client].hostname == i_client:
if self.clients[i_client].hostname == client_ip:
self.agents_cmd[i_client].append(Cmd(command))
have_found = True
break
......@@ -105,7 +111,6 @@ class Amas(Schedulable, SSHClient):
self.agents_cmd[0].append(Cmd(command))
self.subscribe("agent/" + str(self.next_id) + "/metric", self.agent_metric)
self.subscribe("agent/" + str(self.next_id) + "/log", self.agent_log)
self.client.publish("amas/agent/new", self.next_id)
self.next_id += 1
......@@ -115,6 +120,7 @@ class Amas(Schedulable, SSHClient):
Method used to start new agent trough ssh
"""
for i_client in range(len(self.clients)):
print(" Ip : ", self.clients[i_client].hostname)
self.run_cmd(i_client, self.agents_cmd[i_client])
print("Amas, push agent done")
......@@ -124,12 +130,6 @@ class Amas(Schedulable, SSHClient):
"""
self.publish("amas/agent/" + str(id_agent1) + "/neighbor", str(self.agents_metric[id_agent2]))
def agent_log(self, client, userdata, message) -> None:
"""
Called when the amas receive a log from any agent, print it in stdout
"""
print("[Log] " + str(message.payload.decode("utf-8")) + " on topic " + message.topic)
def agent_metric(self, client, userdata, message) -> None:
"""
Called when the amas receive new metrics from any agent
......
import json
import pathlib
import sys
from ast import literal_eval
from typing import List, Any
sys.path.insert(0, str(pathlib.Path(__file__).parent))
......@@ -17,13 +18,13 @@ class CommunicatingAgent(Agent):
def __init__(self, arguments: str) -> None:
self.mailbox: List[Mail] = []
Agent.__init__(self, arguments)
self.subscribe("mail", self.receive_mail)
self.subscribe("agent/" + str(self.id) + "/mail", self.receive_mail)
def receive_mail(self, client, userdata, message) -> None:
"""
Called when the agent receive a new message
"""
raw = json.loads(message)
raw = literal_eval(message.payload.decode("utf-8"))
self.mailbox.append(Mail(raw.get("id"), raw.get("cycle"), raw.get("payload")))
def send_mail(self, agent_id: int, payload: Any) -> None:
......@@ -34,4 +35,29 @@ class CommunicatingAgent(Agent):
"""
new_mail = {"id": self.id, "cycle": self.nbr_cycle, "payload": payload}
self.publish("agent/" + str(agent_id) + "/mail", json.dumps(new_mail))
self.client.publish("agent/" + str(agent_id) + "/mail", json.dumps(new_mail))
def next_mail(self) -> Mail:
"""
Convenient method to use to go through the mailbox
:return: the next mail, if no mail are in the mailbox, return None. Remove the mail from the mailbox
"""
if len(self.mailbox) == 0:
return None
return self.mailbox.pop(0)
def put_back(self, mail: Mail) -> None:
"""
Put the mail at the end of the mailbox
"""
self.mailbox.append(mail)
def put_front(self, mail: Mail) -> None:
"""
put the mail in the front of the mailbox
"""
self.mailbox = [mail] + self.mailbox
......@@ -22,10 +22,12 @@ class Environment(Schedulable):
broker_ip: str = arguments.get("broker_ip")
seed: int = int(arguments.get("seed"))
broker_username: str = str(arguments.get("broker_username"))
broker_password: str = str(arguments.get("broker_password"))
random.seed(seed + 1)
Schedulable.__init__(self, broker_ip, "Env")
Schedulable.__init__(self, broker_ip, "Env", broker_username, broker_password)
self.subscribe("scheduler/schedulable/wakeup", self.wake_up)
......
......@@ -17,12 +17,12 @@ class Scheduler(Schedulable):
Scheduler class, it's role is to make sure the amas, the env and all the agents stay in sync
"""
def __init__(self, broker_ip: str) -> None:
def __init__(self, broker_ip: str, broker_username: str, broker_password: str) -> None:
Schedulable.__init__(self, broker_ip, "Scheduler")
Schedulable.__init__(self, broker_ip, "Scheduler", broker_username, broker_password)
self.paused = True
self.nbr_agent = 0
self.nbr_agent: int = 0
self.subscribe("amas/agent/new", self.update_nbr_agent)
self.subscribe("amas/action_done", self.update_schedulable)
self.subscribe("env/action_done", self.update_schedulable)
......
......@@ -9,10 +9,10 @@ from iotAmak.tool.remote_client import RemoteClient
def read_ssh(path):
with open(path, 'r', encoding='utf-8') as f:
dict = json.load(f)
ssh_dict = json.load(f)
res = []
for client in dict.get("clients_ssh"):
for client in ssh_dict.get("clients_ssh"):
res.append(RemoteClient(
hostname=client.get("hostname"),
user=client.get("user"),
......
......@@ -2,6 +2,9 @@ from typing import Any
class Mail:
"""
Mail class used by the communicating agent
"""
def __init__(self, sender_id: int, date: int, payload: Any) -> None:
......
......@@ -11,9 +11,9 @@ class MqttClient:
Base class to any instance that need to interact with the broker
"""
def __init__(self, broker_ip: str, client_id: str):
def __init__(self, broker_ip: str, client_id: str, broker_username: str, broker_password: str):
self.client: Client = Client(client_id=client_id)
self.client.username_pw_set(username="goyon", password="mosquitto")
self.client.username_pw_set(username=broker_username, password=broker_password)
self.client.connect(host=broker_ip)
self.client.loop_start()
......
"""
Tool class that implement basic interaction that help to finish processes
"""
from time import sleep
import sys
import pathlib
......@@ -17,8 +16,8 @@ class Schedulable(MqttClient):
Base class for Agent/Amas/Env/scheduler
"""
def __init__(self, broker_ip: str, client_id: str):
MqttClient.__init__(self, broker_ip, client_id)
def __init__(self, broker_ip: str, client_id: str, broker_username: str, broker_password: str):
MqttClient.__init__(self, broker_ip, client_id, broker_username, broker_password)
self.exit_bool: bool = False
self.subscribe("ihm/exit", self.exit_procedure)
......
from typing import List
import paramiko
......@@ -23,8 +22,9 @@ class Cmd:
class SSHClient:
def __init__(self, clients: List[RemoteClient]):
def __init__(self, clients: List[RemoteClient], iot_path: str):
self.clients = clients
self.iot_path = iot_path
def run_cmd(self, client: int, cmd: list, repeat: bool = False) -> list[str]:
ret = []
......@@ -53,20 +53,22 @@ class SSHClient:
transport = paramiko.Transport((client.hostname, 22))
transport.connect(username=client.user, password=client.password)
sftp = MySFTPClient.from_transport(transport)
sftp.mkdir("Desktop/mqtt_goyon/example/" + experiment_name, ignore_existing=True)
sftp.mkdir(self.iot_path + experiment_name, ignore_existing=True)
sftp.put_dir(
path_to_experiment,
"Desktop/mqtt_goyon/example/" + experiment_name
self.iot_path + experiment_name
)
sftp.close()
class MySFTPClient(paramiko.SFTPClient):
def put_dir(self, source, target):
''' Uploads the contents of the source directory to the target path. The
target directory needs to exists. All subdirectories in source are
created under target.
'''
"""
Uploads the contents of the source directory to the target path. The
target directory needs to exists. All subdirectories in source are
created under target.
"""
for item in os.listdir(source):
if os.path.isfile(os.path.join(source, item)):
self.put(os.path.join(source, item), '%s/%s' % (target, item))
......@@ -80,7 +82,9 @@ class MySFTPClient(paramiko.SFTPClient):
print(os.path.join(source, item))
def mkdir(self, path, mode=511, ignore_existing=False):
''' Augments mkdir by adding an option to not fail if the folder exists '''
"""
Augments mkdir by adding an option to not fail if the folder exists
"""
try:
super(MySFTPClient, self).mkdir(path, mode)
except IOError:
......
......@@ -3,7 +3,7 @@ from setuptools import setup, find_packages
setup(
name='iotAmak',
packages=find_packages(),
version='0.0.5',
version='0.0.6',
description='AmakFramework in python',
author='SMAC - GOYON Sebastien',
install_requires=[
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment