diff --git a/pyAmakCore/classes/communicating_agent.py b/pyAmakCore/classes/communicating_agent.py index 72a643115a174065aad808ab72adbdbb56cc0954..e31b4ccce66e9a84a2856456e0f0e25c8558610e 100644 --- a/pyAmakCore/classes/communicating_agent.py +++ b/pyAmakCore/classes/communicating_agent.py @@ -5,10 +5,9 @@ from typing import Any, List import pathlib import sys -from pyAmakCore.exception.mailbox import ReceiverIsNotSelf, ReceiverDontExist - sys.path.insert(0, str(pathlib.Path(__file__).parent)) +from pyAmakCore.exception.mailbox import ReceiverIsNotSelf, ReceiverDontExist from pyAmakCore.classes.agent import Agent diff --git a/pyAmakCore/classes/scheduler.py b/pyAmakCore/classes/scheduler.py index 4143e5bcf4f820455476644005a6234fc73f5269..b16b13ffedf7dd8bfbaedff0972635e1364138a6 100644 --- a/pyAmakCore/classes/scheduler.py +++ b/pyAmakCore/classes/scheduler.py @@ -1,21 +1,17 @@ """ Scheduler class """ +import sys +import pathlib import pickle from threading import Semaphore, Thread -from typing import List - from time import sleep -import sys -import pathlib - sys.path.insert(0, str(pathlib.Path(__file__).parent)) -from pyAmakCore.classes.scheduler_tool.agent_thread import AgentThread from pyAmakCore.classes.amas import Amas from pyAmakCore.classes.scheduler_tool.schedulable_thread import SchedulableThread -from pyAmakCore.enumeration.executionPolicy import ExecutionPolicy +from pyAmakCore.classes.scheduler_tool.amas_thread import AmasThread class Scheduler: @@ -33,60 +29,27 @@ class Scheduler: self.schedulables = [] self.schedulables_threads = [] - self.add_schedulables(amas) - self.add_schedulables(amas.get_environment()) - - self.agents: List[AgentThread] = [] - self.agents_thread: List[Thread] = [] + self.add_schedulables(amas, AmasThread) + self.add_schedulables(amas.get_environment(), SchedulableThread) self.sleep_time = 0 - AgentThread.execution_policy = self.amas.get_execution_policy() - - self.amas.add_pending_agent() - for agent in amas.get_agents(): - self.add_agent(agent) - def get_amas(self): return self.amas - def add_schedulables(self, schedulable): - schedulable_thread = SchedulableThread(schedulable) + def add_schedulables(self, schedulable, cls): + schedulable_thread = cls(schedulable) self.schedulables.append(schedulable_thread) current_thread = Thread(target=schedulable_thread.run) self.schedulables_threads.append(current_thread) current_thread.start() - def add_agent(self, agent): - agent_thread = AgentThread(agent) - self.agents.append(agent_thread) - current_thread = Thread(target=agent_thread.run) - self.agents_thread.append(current_thread) - current_thread.start() - - def add_agents(self): - added_agent = self.amas.add_pending_agent() - for agent in added_agent: - self.add_agent(agent) - - def remove_agents(self): - removed_agent = self.amas.remove_pending_agent() - for agent in removed_agent: - for i in range(len(self.agents)): - if agent == self.agents[i].agent: - self.agents[i].exit_bool = True - self.agents[i].is_waiting.release() - self.agents_thread[i].join() - - self.agents.remove(self.agents[i]) - self.agents_thread.remove(self.agents_thread[i]) - def wait_schedulables(self) -> None: """ wait for all schedulable to release a token """ - for i in range(len(self.schedulables)): - SchedulableThread.action_done.acquire() + for schedulable in self.schedulables: + schedulable.action_done.acquire() def start_schedulables(self) -> None: """ @@ -95,47 +58,30 @@ class Scheduler: for schedulable in self.schedulables: schedulable.is_waiting.release() - def wait_agents(self) -> None: + def first_part(self) -> None: """ - wait for all agent to release a token + first part of a cycle """ - for i in range(len(self.agents)): - AgentThread.action_done.acquire() + self.start_schedulables() + # on cycle begin + self.wait_schedulables() - def start_agents(self) -> None: + def main_part(self) -> None: """ - wait for all agent to release a token + main part of a cycle """ - for agent in self.agents: - agent.is_waiting.release() - - def first_part(self): - self.add_agents() - - self.amas.to_csv(self.amas.get_cycle(), self.amas.get_agents()) - self.start_schedulables() - # on cycle begin + # agent cycle self.wait_schedulables() - def main_part(self): - self.start_agents() - # all agent run - - if self.amas.get_execution_policy() == ExecutionPolicy.TWO_PHASES: - self.wait_agents() - # agents are doing phase 2 - self.start_agents() - - self.wait_agents() - - def last_part(self): + def last_part(self) -> None: + """ + last part of a cycle + """ self.start_schedulables() # on cycle end self.wait_schedulables() - self.remove_agents() - def run(self) -> None: """ main part of amak core @@ -145,8 +91,7 @@ class Scheduler: self.semaphore_start_stop.acquire() if self.exit_bool: - self.close_childs() - return + break self.semaphore_start_stop.release() self.first_part() @@ -157,20 +102,16 @@ class Scheduler: self.close_childs() def close_childs(self): - - for agent in self.agents: - agent.exit_bool = True - agent.is_waiting.release() - - for thread in self.agents_thread: - thread.join(0) - - SchedulableThread.exit_bool = True for schedulable in self.schedulables: + schedulable.exit_bool = True schedulable.is_waiting.release() for thread in self.schedulables_threads: thread.join(0) + """ + program interface + """ + def exit_program(self): self.exit_bool = True self.semaphore_start_stop.release() @@ -184,6 +125,10 @@ class Scheduler: def set_sleep(self, sleep_time: int): self.sleep_time = sleep_time + """ + load & save program + """ + def save(self): with open('filename.pickle', 'wb') as handle: pickle.dump(self.amas, handle, protocol=pickle.HIGHEST_PROTOCOL) diff --git a/pyAmakCore/classes/scheduler_tool/agent_thread.py b/pyAmakCore/classes/scheduler_tool/agent_thread.py index ed1e1054d8734029f2c69612d5cc9fad781a0c05..72098d2bb31d8433ac47ee3525e97e3e5b680b1d 100644 --- a/pyAmakCore/classes/scheduler_tool/agent_thread.py +++ b/pyAmakCore/classes/scheduler_tool/agent_thread.py @@ -5,9 +5,10 @@ from threading import Semaphore import sys import pathlib -from pyAmakCore.classes.communicating_agent import CommunicatingAgent sys.path.insert(0, str(pathlib.Path(__file__).parent.parent)) + +from pyAmakCore.classes.communicating_agent import CommunicatingAgent from pyAmakCore.classes.agent import Agent from pyAmakCore.enumeration.executionPolicy import ExecutionPolicy diff --git a/pyAmakCore/classes/scheduler_tool/amas_thread.py b/pyAmakCore/classes/scheduler_tool/amas_thread.py new file mode 100644 index 0000000000000000000000000000000000000000..1d70f237eca2a161bff77ff0eafde8878cb23d7c --- /dev/null +++ b/pyAmakCore/classes/scheduler_tool/amas_thread.py @@ -0,0 +1,122 @@ +""" +thread class for amas +""" +from threading import Thread +from typing import List + +import sys +import pathlib + +from pyAmakCore.enumeration.executionPolicy import ExecutionPolicy + +sys.path.insert(0, str(pathlib.Path(__file__).parent.parent)) + +from pyAmakCore.classes.scheduler_tool.agent_thread import AgentThread +from pyAmakCore.classes.amas import Amas +from pyAmakCore.classes.agent import Agent +from pyAmakCore.classes.scheduler_tool.schedulable_thread import SchedulableThread + + +class AmasThread(SchedulableThread): + """ + thread class used to thread amas + """ + + def __init__(self, amas: Amas) -> None: + super().__init__(amas) + + self.agents: List[AgentThread] = [] + self.agents_thread: List[Thread] = [] + + AgentThread.execution_policy = self.schedulable.get_execution_policy() + + self.schedulable.add_pending_agent() + for agent in self.schedulable.get_agents(): + self.add_agent(agent) + + def add_agent(self, agent: Agent) -> None: + """ + make agent a thread and start the thread + """ + agent_thread = AgentThread(agent) + self.agents.append(agent_thread) + current_thread = Thread(target=agent_thread.run) + self.agents_thread.append(current_thread) + current_thread.start() + + def add_agents(self) -> None: + """ + if there are new agents, add them + """ + added_agent = self.schedulable.add_pending_agent() + for agent in added_agent: + self.add_agent(agent) + + def remove_agents(self) -> None: + """ + if some agents are removed, close their thread + """ + removed_agent = self.schedulable.remove_pending_agent() + for agent in removed_agent: + for i in range(len(self.agents)): + if agent == self.agents[i].agent: + self.agents[i].exit_bool = True + self.agents[i].is_waiting.release() + self.agents_thread[i].join() + + self.agents.remove(self.agents[i]) + self.agents_thread.remove(self.agents_thread[i]) + + def on_cycle_begin(self) -> None: + """ + start of cycle + """ + self.add_agents() + self.schedulable.on_cycle_begin() + + def main_cycle_part(self) -> None: + """ + main part of the cycle + """ + for agent in self.agents: + agent.is_waiting.release() + # agent cycle + + if self.schedulable.get_execution_policy() == ExecutionPolicy.TWO_PHASES: + # wait agents + for i in range(len(self.agents)): + AgentThread.action_done.acquire() + # start phase 2 for all agents + for agent in self.agents: + agent.is_waiting.release() + + for i in range(len(self.agents)): + AgentThread.action_done.acquire() + + def on_cycle_end(self) -> None: + """ + end of cycle + """ + self.schedulable.on_cycle_end() + + self.remove_agents() + self.schedulable.to_csv(self.schedulable.get_cycle(), self.schedulable.get_agents()) + + self.schedulable.cycle() + + def run(self) -> None: + super().run() + self.close_child() + + def close_child(self) -> None: + """ + tell all child threads to close + """ + for agent in self.agents: + agent.exit_bool = True + agent.is_waiting.release() + + for thread in self.agents_thread: + thread.join(0) + + diff --git a/pyAmakCore/classes/scheduler_tool/schedulable_thread.py b/pyAmakCore/classes/scheduler_tool/schedulable_thread.py index 69223bc115bf3cc7c9d278253e738264a3e7a783..17f347ab275403fbe548ec6390baeab4b8e4525a 100644 --- a/pyAmakCore/classes/scheduler_tool/schedulable_thread.py +++ b/pyAmakCore/classes/scheduler_tool/schedulable_thread.py @@ -15,32 +15,39 @@ class SchedulableThread: """ thread class used to thread schedulable """ - action_done: Semaphore = Semaphore(0) - exit_bool: bool = False def __init__(self, schedulable: Schedulable): self.schedulable: Schedulable = schedulable self.is_waiting: Semaphore = Semaphore(0) + self.exit_bool: bool = False + self.action_done: Semaphore = Semaphore(0) + + def on_cycle_begin(self): + self.schedulable.on_cycle_begin() + + def main_cycle_part(self): + pass + + def on_cycle_end(self): + self.schedulable.on_cycle_end() + self.schedulable.cycle() def run(self) -> None: """ main part of a schedulable thread """ - while not SchedulableThread.exit_bool: - + while not self.exit_bool: self.is_waiting.acquire() - if SchedulableThread.exit_bool: - return + if self.exit_bool: + break + self.on_cycle_begin() + self.action_done.release() - self.schedulable.on_cycle_begin() - - SchedulableThread.action_done.release() - # les agents cycles self.is_waiting.acquire() + self.main_cycle_part() + self.action_done.release() - self.schedulable.on_cycle_end() - - self.schedulable.cycle() - - SchedulableThread.action_done.release() + self.is_waiting.acquire() + self.on_cycle_end() + self.action_done.release() diff --git a/pyAmakCore/classes/tools/loggable.py b/pyAmakCore/classes/tools/loggable.py index 094e4f0ab9ce6789398d85a181a4872de0e8b58b..5ac77e22b2350d9b629a4ecce8ca0ed964fc5ec0 100644 --- a/pyAmakCore/classes/tools/loggable.py +++ b/pyAmakCore/classes/tools/loggable.py @@ -3,7 +3,6 @@ class allowing to save the state of the system at a given moment """ from os import path from typing import List - from pandas import DataFrame diff --git a/pyAmakCore/classes/tools/schedulerIHM.py b/pyAmakCore/classes/tools/schedulerIHM.py index 62dc73583bc966bbfca66eedd6eba0b1f3e5bf3a..9390671fd74934ee13020da1a0b4c51099fe6a76 100644 --- a/pyAmakCore/classes/tools/schedulerIHM.py +++ b/pyAmakCore/classes/tools/schedulerIHM.py @@ -1,8 +1,13 @@ """ Scheduler class that need to be used for pyAmakIhm """ +import pathlib from time import sleep +import sys + +sys.path.insert(0, str(pathlib.Path(__file__).parent.parent)) + from pyAmakCore.classes.amas import Amas from pyAmakCore.classes.scheduler import Scheduler @@ -16,7 +21,7 @@ class SchedulerIHM(Scheduler): self.__observer = None super().__init__(amas) - def last_part(self): + def last_part(self) -> None: super().last_part() self.__observer.updateCycle() diff --git a/pyAmakCore/tests/memory_leak/main.py b/pyAmakCore/tests/memory_leak/main.py index e6c1af277ebf4daa75ea6ded2cfbb09543708a20..a1a804ff4b60c888679878d63c78422a05d2768f 100644 --- a/pyAmakCore/tests/memory_leak/main.py +++ b/pyAmakCore/tests/memory_leak/main.py @@ -10,12 +10,13 @@ class SimpleAgent(Agent): class SimpleAmas(Amas): + def on_initialization(self) -> None: + self.set_do_log(True) def on_initial_agents_creation(self) -> None: for i in range(10): self.add_agent(SimpleAgent(self)) - class SimpleEnv(Environment): pass