Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
I
iotamak-core
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
SMAC
AMAK
Python
IoTAMAK
iotamak-core
Commits
36fd1a0c
Commit
36fd1a0c
authored
3 years ago
by
shinedday
Browse files
Options
Downloads
Patches
Plain Diff
Add comment for Amas/scheduler/Env files
parent
cd0ecd2d
No related branches found
No related tags found
No related merge requests found
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
agent.py
+4
-3
4 additions, 3 deletions
agent.py
amas.py
+32
-11
32 additions, 11 deletions
amas.py
environment.py
+4
-2
4 additions, 2 deletions
environment.py
scheduler.py
+45
-14
45 additions, 14 deletions
scheduler.py
with
85 additions
and
30 deletions
agent.py
+
4
−
3
View file @
36fd1a0c
...
...
@@ -10,13 +10,13 @@ class Agent(Schedulable):
def
__init__
(
self
,
identifier
:
int
)
->
None
:
self
.
id
=
identifier
Schedulable
.
__init__
(
self
,
client_id
=
"
Agent
"
+
str
(
self
.
id
))
Schedulable
.
__init__
(
self
,
client_id
=
"
Agent
"
+
str
(
self
.
id
))
self
.
subscribe
(
"
scheduler/agent/wakeup
"
,
self
.
wake_up
)
self
.
neighbors
=
[]
self
.
next_neighbors
=
[]
self
.
subscribe
(
"
amas/agent/
"
+
str
(
self
.
id
)
+
"
/neighbor
"
,
self
.
add_neighbor
)
self
.
subscribe
(
"
amas/agent/
"
+
str
(
self
.
id
)
+
"
/neighbor
"
,
self
.
add_neighbor
)
self
.
on_initialization
()
...
...
@@ -82,8 +82,9 @@ class Agent(Schedulable):
self
.
publish
(
"
metric
"
,
str
(
self
.
send_metric
()))
self
.
publish
(
"
cycle_done
"
,
""
)
if
__name__
==
'
__main__
'
:
print
(
"
id agent:
"
,
int
(
sys
.
argv
[
1
]))
a
=
Agent
(
int
(
sys
.
argv
[
1
]))
a
.
run
()
\ No newline at end of file
a
.
run
()
This diff is collapsed.
Click to expand it.
amas.py
+
32
−
11
View file @
36fd1a0c
...
...
@@ -2,6 +2,7 @@
Amas class
"""
from
ast
import
literal_eval
from
typing
import
List
from
tool.schedulable
import
Schedulable
from
tool.ssh_client
import
SSHClient
,
Cmd
...
...
@@ -12,7 +13,7 @@ class Amas(Schedulable, SSHClient):
Amas class
"""
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
Schedulable
.
__init__
(
self
,
client_id
=
"
Amas
"
)
SSHClient
.
__init__
(
self
)
...
...
@@ -29,10 +30,20 @@ class Amas(Schedulable, SSHClient):
self
.
client
.
publish
(
"
amas/action_done
"
,
""
)
def
on_initial_agents_creation
(
self
):
"""
Convenient method to initially create the agents, is called at the end of initialization
"""
pass
def
add_agent
(
self
,
experience_name
,
args
):
def
add_agent
(
self
,
experience_name
:
str
,
args
:
List
=
None
)
->
None
:
"""
Function that need to be called to create a new agent
:param experience_name: name of the experience folder
:param args: if any argument is needed to initiate the new agent
:return: None
"""
if
args
is
None
:
args
=
[]
command
=
"
nohup python
"
command
+=
"
\"
Desktop/mqtt_goyon/iotamak-core/
"
+
experience_name
+
"
/agent.py
\"
"
command
+=
str
(
self
.
next_id
)
+
"
"
...
...
@@ -48,20 +59,28 @@ class Amas(Schedulable, SSHClient):
self
.
client
.
publish
(
"
amas/agent/new
"
,
self
.
next_id
)
self
.
next_id
+=
1
def
push_agent
(
self
):
def
push_agent
(
self
)
->
None
:
"""
Method used to start new agent trough ssh
"""
total_pi
=
len
(
self
.
clients
)
for
client
in
range
(
total_pi
):
self
.
run_cmd
(
client
,
list
(
self
.
agents_cmd
[
client
*
len
(
self
.
agents_cmd
)
//
total_pi
:(
client
+
1
)
*
len
(
self
.
agents_cmd
)
//
total_pi
])
client
*
len
(
self
.
agents_cmd
)
//
total_pi
:(
client
+
1
)
*
len
(
self
.
agents_cmd
)
//
total_pi
])
)
def
agent_log
(
self
,
client
,
userdata
,
message
):
def
agent_log
(
self
,
client
,
userdata
,
message
)
->
None
:
"""
Called when the amas receive a log from any agent, print it in stdout
"""
print
(
"
[Log]
"
+
str
(
message
.
payload
.
decode
(
"
utf-8
"
))
+
"
on topic
"
+
message
.
topic
)
def
agent_metric
(
self
,
client
,
userdata
,
message
):
def
agent_metric
(
self
,
client
,
userdata
,
message
)
->
None
:
"""
Called when the amas receive new metrics from any agent
"""
# print("Received message ", literal_eval(message.payload.decode("utf-8")), " on topic '" + message.topic)
result
=
literal_eval
(
message
.
payload
.
decode
(
"
utf-8
"
))
agent_id
=
result
.
get
(
"
id
"
)
...
...
@@ -86,7 +105,9 @@ class Amas(Schedulable, SSHClient):
pass
def
run
(
self
)
->
None
:
"""
Main function of the amas class
"""
self
.
push_agent
()
while
not
self
.
exit_bool
:
...
...
This diff is collapsed.
Click to expand it.
environment.py
+
4
−
2
View file @
36fd1a0c
...
...
@@ -10,7 +10,7 @@ class Environment(Schedulable):
Environment class
"""
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
Schedulable
.
__init__
(
self
,
client_id
=
"
Env
"
)
self
.
subscribe
(
"
scheduler/schedulable/wakeup
"
,
self
.
wake_up
)
...
...
@@ -38,7 +38,9 @@ class Environment(Schedulable):
pass
def
run
(
self
)
->
None
:
"""
Main function of the env
"""
while
not
self
.
exit_bool
:
self
.
wait
()
if
self
.
exit_bool
:
...
...
This diff is collapsed.
Click to expand it.
scheduler.py
+
45
−
14
View file @
36fd1a0c
"""
Scheduler class file
"""
import
sys
from
time
import
sleep
...
...
@@ -5,8 +8,11 @@ from tool.schedulable import Schedulable
class
Scheduler
(
Schedulable
):
"""
Scheduler class, it
'
s role is to make sure the amas, the env and all the agents stay in sync
"""
def
__init__
(
self
,
execution_policy
:
int
):
def
__init__
(
self
,
execution_policy
:
int
)
->
None
:
Schedulable
.
__init__
(
self
,
client_id
=
"
Scheduler
"
)
self
.
sleep_between_cycle
=
0
...
...
@@ -28,44 +34,69 @@ class Scheduler(Schedulable):
print
(
"
Init done
"
)
def
pause
(
self
,
client
,
userdata
,
message
):
def
pause
(
self
,
client
,
userdata
,
message
)
->
None
:
"""
Function called when the IHM pause/unpause the scheduler
"""
self
.
paused
=
not
self
.
paused
self
.
ihm_token
+=
1
def
step
(
self
,
client
,
userdata
,
message
):
def
step
(
self
,
client
,
userdata
,
message
)
->
None
:
"""
Function called by the IHM when the scheduler is in Step by Step mode
"""
self
.
ihm_token
+=
1
def
update_schedulable
(
self
,
client
,
userdata
,
message
):
def
update_schedulable
(
self
,
client
,
userdata
,
message
)
->
None
:
"""
Function called whenever the amas/env have finished an action and is waiting
"""
self
.
schedulable_waiting
+=
1
print
(
"
__Schedulable is waiting
"
)
def
update_nbr_agent
(
self
,
client
,
userdata
,
message
):
def
update_nbr_agent
(
self
,
client
,
userdata
,
message
)
->
None
:
"""
Called when a new agent is added to the system
param message: id of the new agent
"""
self
.
nbr_agent
+=
1
self
.
subscribe
(
"
agent/
"
+
str
(
message
.
payload
.
decode
(
"
utf-8
"
))
+
"
/cycle_done
"
,
self
.
agent_done
)
print
(
"
__Update agent :
"
,
self
.
nbr_agent
,
str
(
message
.
payload
.
decode
(
"
utf-8
"
)))
print
(
"
__Update agent :
"
,
self
.
nbr_agent
,
str
(
message
.
payload
.
decode
(
"
utf-8
"
)))
def
agent_done
(
self
,
client
,
userdata
,
message
):
def
agent_done
(
self
,
client
,
userdata
,
message
)
->
None
:
"""
Called whenever an agent have done an action and is waiting
"""
self
.
agent_waiting
+=
1
print
(
"
__Agent done
"
)
def
wait_agent
(
self
):
def
wait_agent
(
self
)
->
None
:
"""
Called when the scheduler is waiting for all agent do have finished their action
"""
while
self
.
agent_waiting
<
self
.
nbr_agent
:
sleep
(
self
.
wait_delay
)
self
.
agent_waiting
=
0
def
wait_schedulable
(
self
):
def
wait_schedulable
(
self
)
->
None
:
"""
Called when the scheduler is waiting for both amas and env to have finished their action
"""
while
self
.
schedulable_waiting
<
2
:
sleep
(
self
.
wait_delay
)
self
.
schedulable_waiting
=
0
def
wait_ihm
(
self
):
def
wait_ihm
(
self
)
->
None
:
"""
Called when the scheduler is waiting for an action of the IHM
"""
while
self
.
ihm_token
==
0
:
sleep
(
self
.
wait_delay
)
self
.
ihm_token
-=
1
def
first_part
(
self
)
->
None
:
"""
first part of a cycle
first part of a cycle
: Amas/Env on_cycle_begin
"""
self
.
client
.
publish
(
"
scheduler/schedulable/wakeup
"
,
""
)
# Amas on cycle begin
...
...
@@ -74,7 +105,7 @@ class Scheduler(Schedulable):
def
main_part
(
self
)
->
None
:
"""
main part of a cycle
main part of a cycle
: Agent cycle
"""
self
.
client
.
publish
(
"
scheduler/agent/wakeup
"
,
""
)
# Agent doing phase 1
...
...
@@ -85,7 +116,7 @@ class Scheduler(Schedulable):
def
last_part
(
self
)
->
None
:
"""
last part of a cycle
last part of a cycle
: Amas/Env on_cycle_end
"""
self
.
client
.
publish
(
"
scheduler/schedulable/wakeup
"
,
""
)
# Amas on cycle end
...
...
@@ -94,7 +125,7 @@ class Scheduler(Schedulable):
def
run
(
self
)
->
None
:
"""
m
ain
part of amak core
M
ain
function of the scheduler
"""
# wait that all schedulable have init
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment