Skip to content
Snippets Groups Projects
Commit 96fd8f53 authored by Maël Madon's avatar Maël Madon
Browse files

fix: job subtime in output SABjsons not in relative value

parent 3c686992
Branches
No related tags found
No related merge requests found
......@@ -9,8 +9,6 @@ def swf2sessions(input_swf, output_folder, delim_approach, delim_threshold,
dynamic_reduction, build_graph_rep, quiet):
users = {}
# if not os.path.exists(input_swf):
# raise FileExistsError(f"Input file '{input_swf}' does not exist")
if not os.path.exists(output_folder):
raise FileExistsError(f"Output folder '{output_folder}' does not exist")
......
......@@ -19,7 +19,7 @@ def run_test(delim,
create_dir_rec_if_needed(out_dir)
cp = subprocess.run(
['python3', 'swf2sessions.py', '-at', '30', input_swf, out_dir],
['python3', 'swf2sessions.py', '-at', str(threshold), input_swf, out_dir],
check=True)
swf2sessions(input_swf, out_dir, delim, threshold, dyn_red, graph, False)
......
from conftest import KTH_WL, thresholds
import json
import os.path
import random
random.seed(64875)
from conftest import KTH_WL, thresholds
def SABjson_sanity_check(SABjson_file, arrival_threshold=None):
"""Perform some sanity checks on the given SABjson file.
......@@ -43,22 +41,23 @@ def SABjson_sanity_check(SABjson_file, arrival_threshold=None):
if arrival_threshold is not None and previous_job_subtime != -1:
interrarrival = job_sub_time - previous_job_subtime
if job == session["jobs"][0]:
assert interrarrival > arrival_threshold, f"Session {s_id} was started but the interrarrival time was not suffisiant"
assert interrarrival > arrival_threshold * 60, f"Session {s_id} was started but the interrarrival time was not suffisiant"
else:
job_id = job["id"]
assert interrarrival <= arrival_threshold, f"Job {job_id} in session {s_id} should be in a new session"
assert interrarrival <= arrival_threshold * 60, f"Job {job_id} in session {s_id} should be in a new session"
s_id += 1
fd.close()
def SABjsons(out_dir):
return [
file for file in os.listdir(out_dir) if file[-8:] == ".SABjson"
]
def nb_users(out_dir):
"""Return the number of users for which a SABjson file has been created"""
assert os.path.exists(out_dir)
SABjsons = [
file for file in os.listdir(out_dir) if file[-8:] == ".SABjson"
]
return len(SABjsons)
return len(SABjsons(out_dir))
def nb_jobs(out_dir):
......@@ -66,12 +65,8 @@ def nb_jobs(out_dir):
assert os.path.exists(out_dir)
SABjsons = [
file for file in os.listdir(out_dir) if file[-8:] == ".SABjson"
]
job_count = 0
for file in SABjsons:
for file in SABjsons(out_dir):
with open(f"{out_dir}/{file}", 'r') as fp:
user_data = json.load(fp)
for sess in user_data["sessions"]:
......@@ -95,8 +90,7 @@ def test_sanity_kth_last_SABjsons():
kth_sanity_check(out_dir)
SABjsons_sample = random.sample(os.listdir(out_dir), k=10)
for file in SABjsons_sample:
for file in SABjsons(out_dir):
SABjson_sanity_check(f"{out_dir}/{file}")
......@@ -106,8 +100,7 @@ def test_sanity_kth_max_SABjsons():
kth_sanity_check(out_dir)
SABjsons_sample = random.sample(os.listdir(out_dir), k=10)
for file in SABjsons_sample:
for file in SABjsons(out_dir):
SABjson_sanity_check(f"{out_dir}/{file}")
......@@ -117,6 +110,5 @@ def test_sanity_kth_arrival_SABjsons():
kth_sanity_check(out_dir)
SABjsons_sample = random.sample(os.listdir(out_dir), k=10)
for file in SABjsons_sample:
for file in SABjsons(out_dir):
SABjson_sanity_check(f"{out_dir}/{file}", arrival_threshold=threshold)
......@@ -19,10 +19,10 @@ class User:
self.id = user_id
self.sessions = {} # dictionnary of session_id, Session object
self.active_session_id = 0
self.last_submit_time = 0
self.last_finish_time = 0
self.max_finish_time = 0
self.__active_session_id = 0
self.__last_submit_time = 0
self.__last_finish_time = 0
self.__max_finish_time = 0
self.delim_approach = delim_approach
self.delim_threshold = delim_threshold * 60 # minutes to sec
......@@ -46,15 +46,15 @@ class User:
"""Checks if a new session should be started according to the delimitation approach (see Zackay and Feitelson 2013)"""
if self.delim_approach == 'arrival':
inter_arrival_time = date_now - self.last_submit_time
inter_arrival_time = date_now - self.__last_submit_time
return (inter_arrival_time > self.delim_threshold)
elif self.delim_approach == 'last':
think_time = date_now - self.last_finish_time
think_time = date_now - self.__last_finish_time
return (think_time > self.delim_threshold)
else: # 'max' delimitation approach
think_time = date_now - self.max_finish_time
think_time = date_now - self.__max_finish_time
return (think_time > self.delim_threshold)
def __sessions_finished_at_time(self, t):
......@@ -63,10 +63,10 @@ class User:
def __update_session_graph(self, date_now):
"""Add the dependencies for the active session"""
active_session = self.sessions[self.active_session_id]
active_session = self.sessions[self.__active_session_id]
if self.build_graph_rep:
self.G.add_node(self.active_session_id)
self.G.add_node(self.__active_session_id)
if not self.dynamic_reduction:
# For all sessions that were finished when the current session was
......@@ -78,7 +78,7 @@ class User:
active_session.tt_after_prec_sess.append(think_time)
if self.build_graph_rep:
self.G.add_edge(self.active_session_id,
self.G.add_edge(self.__active_session_id,
finish_sess.id,
weight=think_time)
......@@ -104,27 +104,27 @@ class User:
active_session.tt_after_prec_sess.append(think_time)
if self.build_graph_rep:
self.G.add_edge(self.active_session_id,
self.G.add_edge(self.__active_session_id,
sess.id,
weight=think_time)
def __create_new_session(self, date_now):
"""Create a new active session and achive the old one"""
if self.active_session_id > 0:
if self.__active_session_id > 0:
# Archive active session and increment active session_id
active_session = self.sessions[self.active_session_id]
active_session = self.sessions[self.__active_session_id]
if self.dynamic_reduction and date_now <= active_session.max_finish_time:
self.sessions_in_progress.add(self.active_session_id)
self.sessions_in_progress.add(self.__active_session_id)
self.active_session_id += 1
self.__active_session_id += 1
# Create new session
new_session = Session(self.active_session_id, date_now)
self.sessions[self.active_session_id] = new_session
new_session = Session(self.__active_session_id, date_now)
self.sessions[self.__active_session_id] = new_session
def __add_job_to_active_session(self, job):
active_sess = self.sessions[self.active_session_id]
active_sess = self.sessions[self.__active_session_id]
active_sess.add_job(job)
######## Public methods ########
......@@ -143,9 +143,9 @@ class User:
self.__add_job_to_active_session(job)
# Updating variables
self.last_submit_time = job.submit_time
self.last_finish_time = job.finish_time
self.max_finish_time = max(self.max_finish_time, job.finish_time)
self.__last_submit_time = job.submit_time
self.__last_finish_time = job.finish_time
self.__max_finish_time = max(self.__max_finish_time, job.finish_time)
def export_dependancy_graph(self, output_folder):
"""Write the dependancy graph as a gml file"""
......
......@@ -46,18 +46,19 @@ class Job:
self.res = nb_requested_resources
self.walltime = walltime
def to_dict(self):
def to_dict(self, session_start_offset=0):
"""Job object to dictionnary, for printing or json export."""
run_time = self.finish_time - self.start_time
return {
"id": self.id,
"profile": str(run_time),
"res": self.res,
"subtime": self.submit_time,
"subtime": self.submit_time - session_start_offset,
"walltime": self.walltime
}
class Session:
"""Class representing a user session."""
......@@ -73,7 +74,6 @@ class Session:
def add_job(self, job):
"""Add job to the session"""
self.jobs.append(job)
self.max_finish_time = max(job.finish_time, self.max_finish_time)
......@@ -87,5 +87,6 @@ class Session:
"preceding_sessions": self.preceding_sessions,
"thinking_time_after_preceding_session": self.tt_after_prec_sess,
"nb_jobs": len(self.jobs),
"jobs": [j.to_dict() for j in self.jobs]
"jobs": [j.to_dict(session_start_offset = self.first_submit)
for j in self.jobs]
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment