From fbd79605c547a9d705923fa26a772c0b5c1e378d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ma=C3=ABl=20Madon?= <mael.madon@irit.fr>
Date: Thu, 3 Nov 2022 16:46:32 +0100
Subject: [PATCH] dev: add job output options, closes #2

---
 src/workload.py               |  13 +++-
 swf2userSessions.py           | 134 +++++++++++++++++++++++++---------
 test/conftest.py              |   4 +-
 test/test_expected_outputs.py |   2 +-
 4 files changed, 111 insertions(+), 42 deletions(-)

diff --git a/src/workload.py b/src/workload.py
index 3dab542..5536aa7 100644
--- a/src/workload.py
+++ b/src/workload.py
@@ -3,6 +3,7 @@
 """SWF types and functions."""
 
 from enum import Enum, unique
+from math import ceil
 
 
 @unique
@@ -38,20 +39,26 @@ class Job:
                  finish_time:float,
                  start_time:float=None,
                  nb_requested_resources:int=None,
-                 walltime:float=None):
+                 walltime:float=None,
+                 job_grain:int=1):
         self.id = job_id
         self.submit_time = submit_time
         self.start_time = start_time
         self.finish_time = finish_time
         self.res = nb_requested_resources
         self.walltime = walltime
+        self.jg = job_grain
+
+    def rounded_up_profile(self):
+        """Return the run time rounded up to a multiple of the job grain."""
+        run_time = self.finish_time - self.start_time
+        return int(ceil(run_time / self.jg) * self.jg)
 
     def to_dict(self, session_start_offset=0):
         """Job object to dictionnary, for printing or json export."""
-        run_time = self.finish_time - self.start_time
         return {
             "id": self.id,
-            "profile": str(run_time),
+            "profile": str(self.rounded_up_profile()),
             "res": self.res,
             "subtime": self.submit_time - session_start_offset,
             "walltime": self.walltime
diff --git a/swf2userSessions.py b/swf2userSessions.py
index fa23bf1..087b235 100755
--- a/swf2userSessions.py
+++ b/swf2userSessions.py
@@ -6,10 +6,10 @@ from src.workload import SwfField, Job
 
 
 def swf2sessions(input_swf, output_dir, delim_approach, delim_threshold,
-         dynamic_reduction, build_graph_rep, quiet):
+                 dynamic_reduction, build_graph_rep, job_walltime_factor,
+                 given_walltime_only, job_grain, quiet):
     users = {}
 
-
     if not os.path.exists(output_dir):
         raise FileExistsError(f"Output folder '{output_dir}' does not exist")
 
@@ -34,22 +34,30 @@ def swf2sessions(input_swf, output_dir, delim_approach, delim_threshold,
             wait_time = float(res.group(SwfField.WAIT_TIME.value))
             run_time = float(res.group(SwfField.RUN_TIME.value))
             user_id = str(res.group(SwfField.USER_ID.value))
-            walltime = float(res.group(SwfField.REQUESTED_TIME.value))
+
+            if given_walltime_only:
+                walltime = float(res.group(SwfField.REQUESTED_TIME.value))
+            else:
+                walltime = max(job_walltime_factor * run_time,
+                               float(res.group(SwfField.REQUESTED_TIME.value)))
             nb_res = int(res.group(SwfField.ALLOCATED_PROCESSOR_COUNT.value))
 
             start_time = submit_time + wait_time
             finish_time = submit_time + wait_time + run_time
 
-            job = Job(job_id, submit_time, finish_time, start_time, nb_res, walltime)
+            job = Job(job_id, submit_time, finish_time, start_time, nb_res,
+                      walltime, job_grain=job_grain)
 
             if user_id not in users:
-                users[user_id] = User(user_id, delim_approach, delim_threshold, dynamic_reduction=dynamic_reduction, build_graph_rep=build_graph_rep)
+                users[user_id] = User(user_id,
+                                      delim_approach,
+                                      delim_threshold,
+                                      dynamic_reduction=dynamic_reduction,
+                                      build_graph_rep=build_graph_rep)
 
             user = users[user_id]
             user.add_job(job)
 
-            
-
     # SWF finished, output for each user
     for user_id, user in users.items():
         with open(f"{output_dir}/user{user_id}.SABjson", "w") as file:
@@ -61,10 +69,12 @@ def swf2sessions(input_swf, output_dir, delim_approach, delim_threshold,
     if not quiet:
         print("\nSWF parsing done.")
         print("Number of users:    ", len(users))
-        print("Number of sessions: ", sum([len(u.sessions) for u in users.values()]))
+        print("Number of sessions: ",
+              sum([len(u.sessions) for u in users.values()]))
         print(f"The output files have been stored in the folder {output_dir}")
 
-if __name__== "__main__":
+
+if __name__ == "__main__":
     parser = argparse.ArgumentParser(description='TODO')
     parser.add_argument('input_swf',
                         type=argparse.FileType('r'),
@@ -73,24 +83,71 @@ if __name__== "__main__":
                         type=str,
                         help='The folder that will store the output files')
 
-    group = parser.add_mutually_exclusive_group(required=True)
-    group.add_argument('-a', '--arrival', metavar="THRESHOLD", type=float,
-                    help="'Arrival' delimitation approach. A job starts a new session if the inter-arrival time with the last job is above the threshold (in minutes)")
-    group.add_argument("-l", "--last", metavar="THRESHOLD", type=float,
-                    help="'Last' delimitation approach: a job starts a new session if the think time after the last job is above the threshold (in minutes)")
-    group.add_argument("-m", "--max", metavar="THRESHOLD", type=float,
-                    help="'Max' delimitation approach: a job starts a new session if the think time after the previous job with the highest finish time is above the threshold (in minutes)")
-
-    parser.add_argument('--no_dynamic_reduction', action="store_true",
-        help=
-        'Unless this option is specified, during the construction of the graph the algorithm dynamically avoids to add an edge between two nodes if a path already exists.'
-    )
-    parser.add_argument('--graph', action="store_true",
-        help=
-        "Build a graphical representation of each session graph and save them in a subfolder as gml files"
-    )
-
-    parser.add_argument("-q", "--quiet", action="store_true", help="Lowest verbosity level.")
+    delim_options = parser.add_mutually_exclusive_group(required=True)
+    delim_options.add_argument(
+        '-a',
+        '--arrival',
+        metavar="THRESHOLD",
+        type=float,
+        help="'Arrival' delimitation approach: a job starts a new "
+        "session if the inter-arrival time with the last job "
+        "is above the threshold (in minutes)")
+    delim_options.add_argument(
+        "-l",
+        "--last",
+        metavar="THRESHOLD",
+        type=float,
+        help="'Last' delimitation approach: a job starts a new "
+        "session if the think time after the last job is above "
+        "the threshold (in minutes)")
+    delim_options.add_argument(
+        "-m",
+        "--max",
+        metavar="THRESHOLD",
+        type=float,
+        help="'Max' delimitation approach: a job starts a new "
+        "session if the think time after the previous job with "
+        "the highest finish time is above the threshold (in minutes)")
+
+    parser.add_argument(
+        '--no_dynamic_reduction',
+        action="store_true",
+        help="Unless this option is specified, during the construction of the "
+        "graph the algorithm dynamically avoids to add an edge between two "
+        "nodes if a path already exists.")
+    parser.add_argument(
+        '--graph',
+        action="store_true",
+        help="Build a graphical representation of each session graph and save "
+        "them in a subfolder as gml files")
+
+    parser.add_argument("-q",
+                        "--quiet",
+                        action="store_true",
+                        help="Lowest verbosity level.")
+
+    job_options = parser.add_argument_group("job output options")
+    job_options.add_argument(
+        '-jwf',
+        '--job_walltime_factor',
+        type=float,
+        default=2,
+        help="Jobs walltimes are computed by the formula "
+        "max(givenWalltime, jobWalltimeFactor*givenRuntime)")
+    job_options.add_argument(
+        '-gwo',
+        '--given_walltime_only',
+        action="store_true",
+        help="If set, only the given walltime in the trace "
+        "will be used")
+    job_options.add_argument(
+        '-jg',
+        '--job_grain',
+        type=int,
+        default=1,
+        help="(default: 1) Selects the level of detail we want for job profiles. "
+        "This parameter is used to group jobs that have close running times. "
+        "For example: a job grain of 10 will round up running times to the next ten.")
 
     args = parser.parse_args()
 
@@ -103,16 +160,21 @@ if __name__== "__main__":
     elif args.arrival is not None:
         delim = 'arrival'
         threshold = args.arrival
-    else: # should never happen
-        raise argparse.ArgumentError("You should specify a delimitation approach")
+    else:  # should never happen
+        raise argparse.ArgumentError(
+            "You should specify a delimitation approach.")
 
     if threshold < 0:
-        raise argparse.ArgumentTypeError("The threshold must be a positive value.")
+        raise argparse.ArgumentTypeError(
+            "The threshold must be a positive value.")
 
     swf2sessions(input_swf=args.input_swf,
-        output_dir=args.output_dir,
-        delim_approach=delim,
-        delim_threshold=threshold,
-        dynamic_reduction=not (args.no_dynamic_reduction),
-        build_graph_rep=args.graph,
-        quiet=args.quiet)
\ No newline at end of file
+                 output_dir=args.output_dir,
+                 delim_approach=delim,
+                 delim_threshold=threshold,
+                 dynamic_reduction=not (args.no_dynamic_reduction),
+                 build_graph_rep=args.graph,
+                 job_walltime_factor=args.job_walltime_factor,
+                 given_walltime_only=args.given_walltime_only,
+                 job_grain=args.job_grain,
+                 quiet=args.quiet)
diff --git a/test/conftest.py b/test/conftest.py
index ecc9802..8db24b8 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -30,8 +30,8 @@ def run_script(delim, threshold,
     out_dir = os.path.abspath(f'test-out/{test_name}')
     create_dir_rec_if_needed(out_dir)
 
-    args = ['python3', 'swf2userSessions.py', 
-            f'--{delim}', str(threshold), '-q', input_swf, out_dir]
+    args = ['python3', 'swf2userSessions.py', f'--{delim}', str(threshold), 
+            '-q', '--given_walltime_only', input_swf, out_dir]
     if graph:
         args.append("--graph")
     if not dyn_red:
diff --git a/test/test_expected_outputs.py b/test/test_expected_outputs.py
index 5cebc78..31baeef 100644
--- a/test/test_expected_outputs.py
+++ b/test/test_expected_outputs.py
@@ -20,7 +20,7 @@ def compare_with_expected_output(test_name):
     assert obtained["nb_res"] == expected["nb_res"]
     assert len(obtained["sessions"]) == len(expected["sessions"])
 
-    for s_e, s_o in zip(obtained["sessions"], expected["sessions"]):
+    for s_o, s_e in zip(obtained["sessions"], expected["sessions"]):
         assert int(s_e["id"]) == int(s_o["id"]), f"\
             In session {s_e['id']}"
         assert float(s_e["first_submit_time"]) == float(s_o["first_submit_time"]), f"\
-- 
GitLab