diff --git a/swf2sessions.py b/swf2sessions.py index 0438bdb9a99b148d8c1faf192714d55249f6f401..5738d520c0929ed4134366bbf870ff2fa28d4754 100755 --- a/swf2sessions.py +++ b/swf2sessions.py @@ -9,8 +9,6 @@ def swf2sessions(input_swf, output_folder, delim_approach, delim_threshold, dynamic_reduction, build_graph_rep, quiet): users = {} - # if not os.path.exists(input_swf): - # raise FileExistsError(f"Input file '{input_swf}' does not exist") if not os.path.exists(output_folder): raise FileExistsError(f"Output folder '{output_folder}' does not exist") diff --git a/test/test_integration.py b/test/test_integration.py index cbddebe3a7a2937b2facee5edef91a9955fdca85..9bac0b0fa85c99d95a3a1acc35995ce031d95a13 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -19,7 +19,7 @@ def run_test(delim, create_dir_rec_if_needed(out_dir) cp = subprocess.run( - ['python3', 'swf2sessions.py', '-at', '30', input_swf, out_dir], + ['python3', 'swf2sessions.py', '-at', str(threshold), input_swf, out_dir], check=True) swf2sessions(input_swf, out_dir, delim, threshold, dyn_red, graph, False) diff --git a/test/test_output_sanity.py b/test/test_output_sanity.py index 7e8900155e66d56abfd47cf17cc533be77be3368..1bcff7e02fa73415689aa56a14caab22c020cba6 100644 --- a/test/test_output_sanity.py +++ b/test/test_output_sanity.py @@ -1,10 +1,8 @@ -from conftest import KTH_WL, thresholds - import json import os.path -import random -random.seed(64875) +from conftest import KTH_WL, thresholds + def SABjson_sanity_check(SABjson_file, arrival_threshold=None): """Perform some sanity checks on the given SABjson file. @@ -43,22 +41,23 @@ def SABjson_sanity_check(SABjson_file, arrival_threshold=None): if arrival_threshold is not None and previous_job_subtime != -1: interrarrival = job_sub_time - previous_job_subtime if job == session["jobs"][0]: - assert interrarrival > arrival_threshold, f"Session {s_id} was started but the interrarrival time was not suffisiant" + assert interrarrival > arrival_threshold * 60, f"Session {s_id} was started but the interrarrival time was not suffisiant" else: job_id = job["id"] - assert interrarrival <= arrival_threshold, f"Job {job_id} in session {s_id} should be in a new session" + assert interrarrival <= arrival_threshold * 60, f"Job {job_id} in session {s_id} should be in a new session" s_id += 1 fd.close() +def SABjsons(out_dir): + return [ + file for file in os.listdir(out_dir) if file[-8:] == ".SABjson" + ] + def nb_users(out_dir): """Return the number of users for which a SABjson file has been created""" assert os.path.exists(out_dir) - - SABjsons = [ - file for file in os.listdir(out_dir) if file[-8:] == ".SABjson" - ] - return len(SABjsons) + return len(SABjsons(out_dir)) def nb_jobs(out_dir): @@ -66,12 +65,8 @@ def nb_jobs(out_dir): assert os.path.exists(out_dir) - SABjsons = [ - file for file in os.listdir(out_dir) if file[-8:] == ".SABjson" - ] - job_count = 0 - for file in SABjsons: + for file in SABjsons(out_dir): with open(f"{out_dir}/{file}", 'r') as fp: user_data = json.load(fp) for sess in user_data["sessions"]: @@ -95,8 +90,7 @@ def test_sanity_kth_last_SABjsons(): kth_sanity_check(out_dir) - SABjsons_sample = random.sample(os.listdir(out_dir), k=10) - for file in SABjsons_sample: + for file in SABjsons(out_dir): SABjson_sanity_check(f"{out_dir}/{file}") @@ -106,8 +100,7 @@ def test_sanity_kth_max_SABjsons(): kth_sanity_check(out_dir) - SABjsons_sample = random.sample(os.listdir(out_dir), k=10) - for file in SABjsons_sample: + for file in SABjsons(out_dir): SABjson_sanity_check(f"{out_dir}/{file}") @@ -117,6 +110,5 @@ def test_sanity_kth_arrival_SABjsons(): kth_sanity_check(out_dir) - SABjsons_sample = random.sample(os.listdir(out_dir), k=10) - for file in SABjsons_sample: + for file in SABjsons(out_dir): SABjson_sanity_check(f"{out_dir}/{file}", arrival_threshold=threshold) diff --git a/user_session_builder.py b/user_session_builder.py index c375cf4f91c7cb7f5ba664a8e8dc9ef613360700..a416f00c6ac2418ad8344edcd96f92693d50b137 100644 --- a/user_session_builder.py +++ b/user_session_builder.py @@ -19,10 +19,10 @@ class User: self.id = user_id self.sessions = {} # dictionnary of session_id, Session object - self.active_session_id = 0 - self.last_submit_time = 0 - self.last_finish_time = 0 - self.max_finish_time = 0 + self.__active_session_id = 0 + self.__last_submit_time = 0 + self.__last_finish_time = 0 + self.__max_finish_time = 0 self.delim_approach = delim_approach self.delim_threshold = delim_threshold * 60 # minutes to sec @@ -46,15 +46,15 @@ class User: """Checks if a new session should be started according to the delimitation approach (see Zackay and Feitelson 2013)""" if self.delim_approach == 'arrival': - inter_arrival_time = date_now - self.last_submit_time + inter_arrival_time = date_now - self.__last_submit_time return (inter_arrival_time > self.delim_threshold) elif self.delim_approach == 'last': - think_time = date_now - self.last_finish_time + think_time = date_now - self.__last_finish_time return (think_time > self.delim_threshold) else: # 'max' delimitation approach - think_time = date_now - self.max_finish_time + think_time = date_now - self.__max_finish_time return (think_time > self.delim_threshold) def __sessions_finished_at_time(self, t): @@ -63,10 +63,10 @@ class User: def __update_session_graph(self, date_now): """Add the dependencies for the active session""" - active_session = self.sessions[self.active_session_id] + active_session = self.sessions[self.__active_session_id] if self.build_graph_rep: - self.G.add_node(self.active_session_id) + self.G.add_node(self.__active_session_id) if not self.dynamic_reduction: # For all sessions that were finished when the current session was @@ -78,7 +78,7 @@ class User: active_session.tt_after_prec_sess.append(think_time) if self.build_graph_rep: - self.G.add_edge(self.active_session_id, + self.G.add_edge(self.__active_session_id, finish_sess.id, weight=think_time) @@ -104,27 +104,27 @@ class User: active_session.tt_after_prec_sess.append(think_time) if self.build_graph_rep: - self.G.add_edge(self.active_session_id, + self.G.add_edge(self.__active_session_id, sess.id, weight=think_time) def __create_new_session(self, date_now): """Create a new active session and achive the old one""" - if self.active_session_id > 0: + if self.__active_session_id > 0: # Archive active session and increment active session_id - active_session = self.sessions[self.active_session_id] + active_session = self.sessions[self.__active_session_id] if self.dynamic_reduction and date_now <= active_session.max_finish_time: - self.sessions_in_progress.add(self.active_session_id) + self.sessions_in_progress.add(self.__active_session_id) - self.active_session_id += 1 + self.__active_session_id += 1 # Create new session - new_session = Session(self.active_session_id, date_now) - self.sessions[self.active_session_id] = new_session + new_session = Session(self.__active_session_id, date_now) + self.sessions[self.__active_session_id] = new_session def __add_job_to_active_session(self, job): - active_sess = self.sessions[self.active_session_id] + active_sess = self.sessions[self.__active_session_id] active_sess.add_job(job) ######## Public methods ######## @@ -143,9 +143,9 @@ class User: self.__add_job_to_active_session(job) # Updating variables - self.last_submit_time = job.submit_time - self.last_finish_time = job.finish_time - self.max_finish_time = max(self.max_finish_time, job.finish_time) + self.__last_submit_time = job.submit_time + self.__last_finish_time = job.finish_time + self.__max_finish_time = max(self.__max_finish_time, job.finish_time) def export_dependancy_graph(self, output_folder): """Write the dependancy graph as a gml file""" diff --git a/workload.py b/workload.py index c5494b8cb4bf20623914b97b95ff7870d8074ed6..b1c33706e27d87b25ddf34071362137316f28781 100644 --- a/workload.py +++ b/workload.py @@ -46,18 +46,19 @@ class Job: self.res = nb_requested_resources self.walltime = walltime - def to_dict(self): + def to_dict(self, session_start_offset=0): """Job object to dictionnary, for printing or json export.""" run_time = self.finish_time - self.start_time return { "id": self.id, "profile": str(run_time), "res": self.res, - "subtime": self.submit_time, + "subtime": self.submit_time - session_start_offset, "walltime": self.walltime } + class Session: """Class representing a user session.""" @@ -73,7 +74,6 @@ class Session: def add_job(self, job): """Add job to the session""" - self.jobs.append(job) self.max_finish_time = max(job.finish_time, self.max_finish_time) @@ -87,5 +87,6 @@ class Session: "preceding_sessions": self.preceding_sessions, "thinking_time_after_preceding_session": self.tt_after_prec_sess, "nb_jobs": len(self.jobs), - "jobs": [j.to_dict() for j in self.jobs] + "jobs": [j.to_dict(session_start_offset = self.first_submit) + for j in self.jobs] }