Skip to content
Snippets Groups Projects
Commit 1810bd29 authored by Maël Madon's avatar Maël Madon
Browse files

renaming and prepared to use python multiprocessing

parent ec1bf89c
No related branches found
No related tags found
No related merge requests found
from random import * from random import *
from time import * from time import *
import multiprocessing
from instance1 import start_instance
# Prepare the start date samples # Prepare the start date samples
...@@ -7,7 +10,11 @@ begin_trace = 1356994806 # according to original SWF header ...@@ -7,7 +10,11 @@ begin_trace = 1356994806 # according to original SWF header
jun1_unix_time, nov30_unix_time = mktime(strptime('Sun Jun 1 00:00:00 2014')), mktime(strptime('Sun Nov 30 23:59:59 2014')) jun1_unix_time, nov30_unix_time = mktime(strptime('Sun Jun 1 00:00:00 2014')), mktime(strptime('Sun Nov 30 23:59:59 2014'))
jun1, nov30 = (int) (jun1_unix_time - begin_trace), (int) (nov30_unix_time - begin_trace) jun1, nov30 = (int) (jun1_unix_time - begin_trace), (int) (nov30_unix_time - begin_trace)
start_date = randint(jun1, nov30 - 72 * 3600) random.seed(1997)
print(jun1)
instances = []
for i in range(2):
start_date = randint(jun1, nov30 - 72 * 3600)
instane = multiprocessing.Process(target=start_instance, args=)
# For every start date # For every start date
# Call the one_instance file with this date # Call the one_instance file with this date
\ No newline at end of file
...@@ -5,6 +5,7 @@ import time ...@@ -5,6 +5,7 @@ import time
import os import os
import subprocess import subprocess
import argparse import argparse
import json
# sys.path.insert(0, '/scripts') # sys.path.insert(0, '/scripts')
import scripts.swf_to_batsim_split_by_user as split_user import scripts.swf_to_batsim_split_by_user as split_user
...@@ -71,27 +72,21 @@ def run_expe(expe_num, user_category, window_size): ...@@ -71,27 +72,21 @@ def run_expe(expe_num, user_category, window_size):
success_timeout=10, failure_timeout=0 success_timeout=10, failure_timeout=0
) )
instance.to_file(EXPE_FILE) instance.to_file(EXPE_FILE)
print(f"before run {EXPE_FILE}")
ret = run_robin(EXPE_FILE) ret = run_robin(EXPE_FILE)
print(f"after run {EXPE_FILE}")
def start_instance(expe_num, start_date):
def main():
parser = argparse.ArgumentParser(
description='One expe instance. To launch for example with `oarsub -l walltime=2 "./1_one_instance arg1 arg2 arg3"`')
parser.add_argument('expe_num', type=int, help='The expe ID')
parser.add_argument('start_date', type=int,
help='Start of the 3-day window (in seconds since the start of the original trace')
args = parser.parse_args()
# Prepare workload # Prepare workload
prepare_input_data(args.expe_num, args.start_date) #prepare_input_data(expe_num, start_date)
# Create expe folder # Create expe folder
if not os.path.exists(f"out/expe{expe_num}"): if not os.path.exists(f"{ROOT_DIR}/out/expe{expe_num}"):
os.makedirs(f"out/expe{expe_num}") os.makedirs(f"{ROOT_DIR}/out/expe{expe_num}")
# Run with Rigid behavior (the demand response window has no influence here) # Run with Rigid behavior (the demand response window has no influence here)
run_expe(expe_num=args.expe_num, run_expe(expe_num=expe_num,
user_category="replay_user_rigid", user_category="replay_user_rigid",
window_size=1) window_size=1)
...@@ -105,5 +100,18 @@ def main(): ...@@ -105,5 +100,18 @@ def main():
###### Output data treatment ###### ###### Output data treatment ######
# Produce the utilisation viz? # Produce the utilisation viz?
def main():
parser = argparse.ArgumentParser(
description='One expe instance. To launch for example with `oarsub -l walltime=2 "./1_one_instance arg1 arg2 arg3"`')
parser.add_argument('expe_num', type=int, help='The expe ID')
parser.add_argument('start_date', type=int,
help='Start of the 3-day window (in seconds since the start of the original trace')
args = parser.parse_args()
start_instance(args.expe_num, args.start_date)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
...@@ -3,5 +3,5 @@ cd ~/demand-response-user ...@@ -3,5 +3,5 @@ cd ~/demand-response-user
sudo-g5k sudo-g5k
sudo su root --command "echo 1 > /proc/sys/kernel/unprivileged_userns_clone" sudo su root --command "echo 1 > /proc/sys/kernel/unprivileged_userns_clone"
curl https://nixos.org/releases/nix/nix-2.6.0/install | sh curl https://nixos.org/releases/nix/nix-2.6.0/install | sh
source ${HOME}/.nix-profile/etc/profile.d/nix.sh . ${HOME}/.nix-profile/etc/profile.d/nix.sh
nix-store --import < cache_nix nix-store --import < cache_nix
...@@ -28,6 +28,9 @@ def generate_workload(input_swf, output_folder, ...@@ -28,6 +28,9 @@ def generate_workload(input_swf, output_folder,
quiet=False, quiet=False,
job_size_function_string='1*nb_res'): job_size_function_string='1*nb_res'):
"""Generate a Batsim workload from a SWF trace.""" """Generate a Batsim workload from a SWF trace."""
print(f"Input file = {input_swf}")
print(f"output folder = {output_folder}")
element = '([-+]?\d+(?:\.\d+)?)' element = '([-+]?\d+(?:\.\d+)?)'
r = re.compile('\s*' + (element + '\s+') * 17 + element + '\s*') r = re.compile('\s*' + (element + '\s+') * 17 + element + '\s*')
...@@ -51,64 +54,65 @@ def generate_workload(input_swf, output_folder, ...@@ -51,64 +54,65 @@ def generate_workload(input_swf, output_folder,
# Let's loop over the lines of the input file # Let's loop over the lines of the input file
i = 1 i = 1
for line in input_swf: with open(input_swf, 'r') as swf:
i += 1 for line in swf:
if i % 100000 == 0: i += 1
print("Processing swf line", i) if i % 100000 == 0:
print("Processing swf line", i)
res = r.match(line)
res = r.match(line)
if res:
# Parsing... if res:
job_id = (int(float(res.group(SwfField.JOB_ID.value)))) # Parsing...
nb_res = int( job_id = (int(float(res.group(SwfField.JOB_ID.value))))
float(res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value))) nb_res = int(
run_time = float(res.group(SwfField.RUN_TIME.value)) float(res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value)))
submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value))) run_time = float(res.group(SwfField.RUN_TIME.value))
walltime = max(job_walltime_factor * run_time, submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
float(res.group(SwfField.REQUESTED_TIME.value))) walltime = max(job_walltime_factor * run_time,
user_id = str(res.group(SwfField.USER_ID.value)) float(res.group(SwfField.REQUESTED_TIME.value)))
partition_id = int(res.group(SwfField.PARTITION_ID.value)) user_id = str(res.group(SwfField.USER_ID.value))
partition_id = int(res.group(SwfField.PARTITION_ID.value))
# nb_res may be changed by calling a user-given function
nb_res = eval(job_size_function_string) # nb_res may be changed by calling a user-given function
nb_res = eval(job_size_function_string)
if given_walltime_only:
walltime = float(res.group(SwfField.REQUESTED_TIME.value)) if given_walltime_only:
walltime = float(res.group(SwfField.REQUESTED_TIME.value))
# Select jobs to keep
is_valid_job = (nb_res > 0 and walltime > # Select jobs to keep
run_time and run_time > 0 and submit_time >= 0) is_valid_job = (nb_res > 0 and walltime >
select_partition = ((partitions_to_select is None) or run_time and run_time > 0 and submit_time >= 0)
(partition_id in partitions_to_select)) select_partition = ((partitions_to_select is None) or
use_job = select_partition and ( (partition_id in partitions_to_select))
(keep_only is None) or eval(keep_only)) use_job = select_partition and (
(keep_only is None) or eval(keep_only))
if not is_valid_job:
not_valid += 1 if not is_valid_job:
if not use_job: not_valid += 1
not_selected["nb"] += 1 if not use_job:
not_selected["coreh"] += run_time * nb_res not_selected["nb"] += 1
not_selected["coreh"] += run_time * nb_res
else:
selected["nb"] += 1
selected["coreh"] += run_time * nb_res
if not(users.__contains__(user_id)):
users[user_id] = {}
users[user_id]["jobs"] = []
profile = int(((run_time // job_grain) + 1) * job_grain)
job = (current_id, nb_res, run_time,
submit_time, profile, walltime)
current_id = current_id + 1
minimum_observed_submit_time = min(minimum_observed_submit_time,
submit_time)
users[user_id]["jobs"].append(job)
else: else:
selected["nb"] += 1 not_line_match_format += 1
selected["coreh"] += run_time * nb_res
if not(users.__contains__(user_id)):
users[user_id] = {}
users[user_id]["jobs"] = []
profile = int(((run_time // job_grain) + 1) * job_grain)
job = (current_id, nb_res, run_time,
submit_time, profile, walltime)
current_id = current_id + 1
minimum_observed_submit_time = min(minimum_observed_submit_time,
submit_time)
users[user_id]["jobs"].append(job)
else:
not_line_match_format += 1
# Create a json file per user # Create a json file per user
if not os.path.exists(output_folder): if not os.path.exists(output_folder):
...@@ -183,7 +187,7 @@ def main(): ...@@ -183,7 +187,7 @@ def main():
""" """
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Reads a SWF (Standard Workload Format) file and transform it into a JSON Batsim workload (with delay jobs)') description='Reads a SWF (Standard Workload Format) file and transform it into a JSON Batsim workload (with delay jobs)')
parser.add_argument('input_swf', type=argparse.FileType('r'), parser.add_argument('input_swf', type=str,
help='The input SWF file') help='The input SWF file')
parser.add_argument('output_folder', type=str, parser.add_argument('output_folder', type=str,
help='The output folder for the JSON files') help='The output folder for the JSON files')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment