From 1810bd29af0fe2822d191dba3b1ba707b355f68f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ma=C3=ABl=20Madon?= <mael.madon@irit.fr>
Date: Fri, 28 Jan 2022 20:24:03 +0100
Subject: [PATCH] rename scripts and prepare to use Python multiprocessing

---
 1_full_expe.py => campaign1.py         |  11 ++-
 1_one_instance.py => instance1.py      |  34 ++++---
 scripts/install_nix.sh                 |   2 +-
 scripts/swf_to_batsim_split_by_user.py | 118 +++++++++++++------------
 4 files changed, 92 insertions(+), 73 deletions(-)
 rename 1_full_expe.py => campaign1.py (64%)
 rename 1_one_instance.py => instance1.py (90%)

diff --git a/1_full_expe.py b/campaign1.py
similarity index 64%
rename from 1_full_expe.py
rename to campaign1.py
index f6532df..5fbc957 100644
--- a/1_full_expe.py
+++ b/campaign1.py
@@ -1,5 +1,8 @@
 from random import *
 from time import *
+import multiprocessing
+
+from instance1 import start_instance
 
 # Prepare the start date samples
 
@@ -7,7 +10,11 @@ begin_trace = 1356994806 # according to original SWF header
 jun1_unix_time, nov30_unix_time = mktime(strptime('Sun Jun  1 00:00:00 2014')), mktime(strptime('Sun Nov 30 23:59:59 2014'))
 jun1, nov30 = (int) (jun1_unix_time - begin_trace), (int) (nov30_unix_time - begin_trace)
 
-start_date = randint(jun1, nov30 - 72 * 3600)
-print(jun1)
+seed(1997)  # fix the RNG seed so the sampled start dates are reproducible
+
+instances = []
+for i in range(2):
+    start_date = randint(jun1, nov30 - 72 * 3600)
+    instances.append(multiprocessing.Process(target=start_instance, args=(i, start_date)))
 # For every start date
 #   Call the one_instance file with this date
\ No newline at end of file
diff --git a/1_one_instance.py b/instance1.py
similarity index 90%
rename from 1_one_instance.py
rename to instance1.py
index 49fd848..cc3fb94 100755
--- a/1_one_instance.py
+++ b/instance1.py
@@ -5,6 +5,7 @@ import time
 import os
 import subprocess
 import argparse
+import json
 # sys.path.insert(0, '/scripts')
 
 import scripts.swf_to_batsim_split_by_user as split_user
@@ -71,27 +72,21 @@ def run_expe(expe_num, user_category, window_size):
                             success_timeout=10, failure_timeout=0
                             )
     instance.to_file(EXPE_FILE)
+    print(f"before run {EXPE_FILE}")
     ret = run_robin(EXPE_FILE)
+    print(f"after run {EXPE_FILE}")
 
 
-
-def main():
-    parser = argparse.ArgumentParser(
-        description='One expe instance. To launch for example with `oarsub -l walltime=2 "./1_one_instance arg1 arg2 arg3"`')
-    parser.add_argument('expe_num', type=int, help='The expe ID')
-    parser.add_argument('start_date', type=int, 
-                        help='Start of the 3-day window (in seconds since the start of the original trace')
-    args = parser.parse_args()
-
+def start_instance(expe_num, start_date):
     # Prepare workload
-    prepare_input_data(args.expe_num, args.start_date)
+    #prepare_input_data(expe_num, start_date)
 
     # Create expe folder
-    if not os.path.exists(f"out/expe{expe_num}"):
-        os.makedirs(f"out/expe{expe_num}")
+    if not os.path.exists(f"{ROOT_DIR}/out/expe{expe_num}"):
+        os.makedirs(f"{ROOT_DIR}/out/expe{expe_num}")
 
     # Run with Rigid behavior (the demand response window has no influence here)
-    run_expe(expe_num=args.expe_num, 
+    run_expe(expe_num=expe_num, 
              user_category="replay_user_rigid",
              window_size=1)
 
@@ -105,5 +100,18 @@ def main():
     ###### Output data treatment ######
     # Produce the utilisation viz?
 
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='One expe instance. Launch it for example with `oarsub -l walltime=2 "./instance1.py expe_num start_date"`')
+    parser.add_argument('expe_num', type=int, help='The expe ID')
+    parser.add_argument('start_date', type=int,
+                        help='Start of the 3-day window (in seconds since the start of the original trace)')
+    args = parser.parse_args()
+
+    start_instance(args.expe_num, args.start_date)
+
+
+
 if __name__ == "__main__":
     main()
diff --git a/scripts/install_nix.sh b/scripts/install_nix.sh
index 3199a8b..fa86af0 100644
--- a/scripts/install_nix.sh
+++ b/scripts/install_nix.sh
@@ -3,5 +3,5 @@ cd ~/demand-response-user
 sudo-g5k
 sudo su root --command "echo 1 > /proc/sys/kernel/unprivileged_userns_clone"
 curl https://nixos.org/releases/nix/nix-2.6.0/install | sh
-source ${HOME}/.nix-profile/etc/profile.d/nix.sh
+. ${HOME}/.nix-profile/etc/profile.d/nix.sh
 nix-store --import < cache_nix
diff --git a/scripts/swf_to_batsim_split_by_user.py b/scripts/swf_to_batsim_split_by_user.py
index d69ea78..f23839e 100755
--- a/scripts/swf_to_batsim_split_by_user.py
+++ b/scripts/swf_to_batsim_split_by_user.py
@@ -28,6 +28,9 @@ def generate_workload(input_swf, output_folder,
                       quiet=False,
                       job_size_function_string='1*nb_res'):
     """Generate a Batsim workload from a SWF trace."""
+    print(f"Input file = {input_swf}")
+    print(f"output folder = {output_folder}")
+
     element = '([-+]?\d+(?:\.\d+)?)'
     r = re.compile('\s*' + (element + '\s+') * 17 + element + '\s*')
 
@@ -51,64 +54,65 @@ def generate_workload(input_swf, output_folder,
 
     # Let's loop over the lines of the input file
     i = 1
-    for line in input_swf:
-        i += 1
-        if i % 100000 == 0:
-            print("Processing swf line", i)
-
-        res = r.match(line)
-
-        if res:
-            # Parsing...
-            job_id = (int(float(res.group(SwfField.JOB_ID.value))))
-            nb_res = int(
-                float(res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value)))
-            run_time = float(res.group(SwfField.RUN_TIME.value))
-            submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
-            walltime = max(job_walltime_factor * run_time,
-                           float(res.group(SwfField.REQUESTED_TIME.value)))
-            user_id = str(res.group(SwfField.USER_ID.value))
-            partition_id = int(res.group(SwfField.PARTITION_ID.value))
-
-            # nb_res may be changed by calling a user-given function
-            nb_res = eval(job_size_function_string)
-
-            if given_walltime_only:
-                walltime = float(res.group(SwfField.REQUESTED_TIME.value))
-
-            # Select jobs to keep
-            is_valid_job = (nb_res > 0 and walltime >
-                            run_time and run_time > 0 and submit_time >= 0)
-            select_partition = ((partitions_to_select is None) or
-                                (partition_id in partitions_to_select))
-            use_job = select_partition and (
-                (keep_only is None) or eval(keep_only))
-
-            if not is_valid_job:
-                not_valid += 1
-            if not use_job:
-                not_selected["nb"] += 1
-                not_selected["coreh"] += run_time * nb_res
+    with open(input_swf, 'r') as swf:
+        for line in swf:
+            i += 1
+            if i % 100000 == 0:
+                print("Processing swf line", i)
+
+            res = r.match(line)
+
+            if res:
+                # Parsing...
+                job_id = (int(float(res.group(SwfField.JOB_ID.value))))
+                nb_res = int(
+                    float(res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value)))
+                run_time = float(res.group(SwfField.RUN_TIME.value))
+                submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
+                walltime = max(job_walltime_factor * run_time,
+                               float(res.group(SwfField.REQUESTED_TIME.value)))
+                user_id = str(res.group(SwfField.USER_ID.value))
+                partition_id = int(res.group(SwfField.PARTITION_ID.value))
+
+                # nb_res may be changed by calling a user-given function
+                nb_res = eval(job_size_function_string)
+
+                if given_walltime_only:
+                    walltime = float(res.group(SwfField.REQUESTED_TIME.value))
+
+                # Select jobs to keep
+                is_valid_job = (nb_res > 0 and walltime >
+                                run_time and run_time > 0 and submit_time >= 0)
+                select_partition = ((partitions_to_select is None) or
+                                    (partition_id in partitions_to_select))
+                use_job = select_partition and (
+                    (keep_only is None) or eval(keep_only))
+
+                if not is_valid_job:
+                    not_valid += 1
+                if not use_job:
+                    not_selected["nb"] += 1
+                    not_selected["coreh"] += run_time * nb_res
+
+                else:
+                    selected["nb"] += 1
+                    selected["coreh"] += run_time * nb_res
+
+                    if user_id not in users:
+                        users[user_id] = {}
+                        users[user_id]["jobs"] = []
+
+                    profile = int(((run_time // job_grain) + 1) * job_grain)
+
+                    job = (current_id, nb_res, run_time,
+                           submit_time, profile, walltime)
+                    current_id = current_id + 1
+                    minimum_observed_submit_time = min(minimum_observed_submit_time,
+                                                       submit_time)
+                    users[user_id]["jobs"].append(job)
 
             else:
-                selected["nb"] += 1
-                selected["coreh"] += run_time * nb_res
-
-                if not(users.__contains__(user_id)):
-                    users[user_id] = {}
-                    users[user_id]["jobs"] = []
-
-                profile = int(((run_time // job_grain) + 1) * job_grain)
-
-                job = (current_id, nb_res, run_time,
-                       submit_time, profile, walltime)
-                current_id = current_id + 1
-                minimum_observed_submit_time = min(minimum_observed_submit_time,
-                                                   submit_time)
-                users[user_id]["jobs"].append(job)
-
-        else:
-            not_line_match_format += 1
+                not_line_match_format += 1
 
     # Create a json file per user
     if not os.path.exists(output_folder):
@@ -183,7 +187,7 @@ def main():
     """
     parser = argparse.ArgumentParser(
         description='Reads a SWF (Standard Workload Format) file and transform it into a JSON Batsim workload (with delay jobs)')
-    parser.add_argument('input_swf', type=argparse.FileType('r'),
+    parser.add_argument('input_swf', type=str,
                         help='The input SWF file')
     parser.add_argument('output_folder', type=str,
                         help='The output folder for the JSON files')
-- 
GitLab