diff --git a/campaign2.py b/campaign2.py index 364e8d9b68e1b19786ba436f7577f631f3442850..e70a479653d2897e5284efd3fb01437d0b12479b 100755 --- a/campaign2.py +++ b/campaign2.py @@ -1,36 +1,52 @@ #!/usr/bin/env python3 import random -from time import * +# from time import * +import datetime +from dateutil import parser import concurrent.futures -from instance1 import start_instance +from scripts.util import WL_DIR +from instance2 import start_instance -# Prepare the start date samples +############################### +# Prepare the start date sample +############################### begin_trace = 1356994806 # according to original SWF header -jun1_unix_time = mktime(strptime('Sun Jun 1 00:00:00 2014')) -nov30_unix_time = mktime(strptime('Sun Nov 30 23:59:59 2014')) -jun1 = (int)(jun1_unix_time - begin_trace) -nov30 = (int)(nov30_unix_time - begin_trace) - -random.seed(4186341) -nb_expe = 50 - - -# TODO -# We fix the demand response window to be at 16 on day 2 - - -# start_dates = [random.randint(jun1, nov30 - 72 * 3600) -# for _ in range(nb_expe)] - -# with concurrent.futures.ProcessPoolExecutor() as executor: -# instances = [] -# for i in range(nb_expe): -# print(f"Submit expe {i}") -# # start_instance(expe_num, start_date, prepare_workload, clean_log) -# instances.append(executor.submit( -# start_instance, i, start_dates[i], False, True)) - -# for instance in concurrent.futures.as_completed(instances): -# print(f"Expe {instance.result()} terminated") +jun1 = parser.parse('Sun Jun 1 00:00:00 CEST 2014') +daylight_saving_day = parser.parse('Sun Oct 26 00:00:00 CEST 2014') +day = datetime.timedelta(days=1) +weekdays = [1, 2, 3, 4, 5] # Mon to Fri + +# We do one expe for every weekday beween Jun 1 and Oct 26 +expe_start_time = [] +str_start_day = [] +day1 = jun1 +while day1 <= (daylight_saving_day - 3*day): + day2 = day1 + day + if day2.isoweekday() in weekdays: + str_start_day.append(day1.ctime()) + expe_start_time.append(day1.timestamp() - begin_trace) + day1 += day + +# Save slected dates in a txt file +with open(f"{WL_DIR}/start_days_for_campaign2.txt", 'w') as f: + for date in str_start_day: + f.write(date + '\n') + +############################### +# Launch the expe for every start date +############################### +nb_expe = len(expe_start_time) +nb_expe = 2 + +with concurrent.futures.ProcessPoolExecutor() as executor: + instances = [] + for i in range(nb_expe): + print(f"Submit expe {i}") + # start_instance(expe_num, start_date, prepare_workload, clean_log) + instances.append(executor.submit( + start_instance, i, expe_start_time[i], False, True)) + + for instance in concurrent.futures.as_completed(instances): + print(f"Expe {instance.result()} terminated") diff --git a/instance2.py b/instance2.py new file mode 100755 index 0000000000000000000000000000000000000000..8f34c51ec069e7949666b937bdafd6e26ea88f86 --- /dev/null +++ b/instance2.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 + +import time +import os +import subprocess +import argparse +import json + +import scripts.swf_to_batsim_split_by_user as split_user +from scripts.util import * + + +def prepare_input_data(expe_num, start_date): + """Cut the original trace to extract 72h starting from this start date""" + end_date = start_date + 72*3600 + to_keep = f"submit_time >= {start_date} and submit_time <= {end_date}" + + if not os.path.exists(f'{WL_DIR}/expe{expe_num}'): + os.makedirs(f'{WL_DIR}/expe{expe_num}') + split_user.generate_workload( + input_swf=f'{WL_DIR}/MC_selection_article.swf', + output_folder=f'{WL_DIR}/expe{expe_num}', + keep_only=to_keep, + job_grain=10, + job_walltime_factor=8) + + +def run_expe(expe_num, user_category, window_size, clean_log): + """Run batmen with given behavior and demand response window. + Expe_num should be a small integer (eg < 100)""" + # Useful vars and output folder + EXPE_DIR = f"{ROOT_DIR}/out/expe{expe_num}/{user_category}_window{window_size}" + create_dir_rec_if_needed(EXPE_DIR) + create_dir_rec_if_needed(f"{EXPE_DIR}/cmd") + EXPE_FILE = f"{EXPE_DIR}/cmd/robinfile.yaml" + wl_folder = f'{WL_DIR}/expe{expe_num}' + pf = f"{ROOT_DIR}/platform/average_metacentrum.xml" + wl = f"{WL_DIR}/empty_workload.json" + uf = f"{EXPE_DIR}/cmd/user_description_file.json" + + # Demand response window, from 16 to (16 + window_size) on day2 + dm_window = [(24+16)*3600, (int) ((24+16+window_size)*3600)] + + # User description file + def user_description(user): + return { + "name": user, + "category": user_category, + "param": {"input_json": f"{wl_folder}/{user}.json"} + } + user_names = [user_file.split('.')[0] for user_file in os.listdir(wl_folder)] + data = {} + data["dm_window"] = dm_window + data["log_user_stats"] = True + data["log_folder"] = EXPE_DIR + data["users"] = [user_description(user) for user in user_names] + with open(uf, 'w') as user_description_file: + json.dump(data, user_description_file) + + # Generate and run robin instance + socket_batsim = f"tcp://localhost:280{expe_num:02d}" + socket_batsched = f"tcp://*:280{expe_num:02d}" + batcmd = gen_batsim_cmd( + pf, wl, EXPE_DIR, f"--socket-endpoint={socket_batsim} --energy --enable-compute-sharing --enable-dynamic-jobs --acknowledge-dynamic-jobs --enable-profile-reuse") + schedcmd = f"batsched --socket-endpoint={socket_batsched} -v bin_packing_energy --queue_order=desc_size --variant_options_filepath={uf}" + instance = RobinInstance(output_dir=EXPE_DIR, + batcmd=batcmd, + schedcmd=schedcmd, + simulation_timeout=604800, ready_timeout=10, + success_timeout=3600, failure_timeout=5 + ) + instance.to_file(EXPE_FILE) + print(f"Run robin {EXPE_FILE}") + ret = run_robin(EXPE_FILE) + print(f"Robin {EXPE_FILE} finished") + + # Remove the log files that can quickly become heavy... + if clean_log: + os.remove(f"{EXPE_DIR}/log/batsim.log") + os.remove(f"{EXPE_DIR}/log/sched.err.log") + os.remove(f"{EXPE_DIR}/log/sched.out.log") + + + +def start_instance(expe_num, start_date, prepare_workload=True, clean_log=False): + # Prepare workload + if prepare_workload: + prepare_input_data(expe_num, start_date) + + # Create expe folder + create_dir_rec_if_needed(f"{ROOT_DIR}/out/expe{expe_num}") + + # Run with Rigid behavior (the demand response window has no influence here) + run_expe(expe_num=expe_num, + user_category="replay_user_rigid", + window_size=1, clean_log=clean_log) + + # 4*2 = 8 expe + for behavior in ["dm_user_reconfig","dm_user_degrad", + "dm_user_renonce","dm_user_delay"]: + for window_size in [1, 4]: + run_expe(expe_num, behavior, window_size, clean_log=clean_log) + + + ###### Output data treatment ###### + # Produce the utilisation viz? + return expe_num + +def main(): + parser = argparse.ArgumentParser( + description='One expe instance. To launch for example with `oarsub -l walltime=2 "./1_one_instance arg1 arg2 arg3"`') + parser.add_argument('expe_num', type=int, help='The expe ID') + parser.add_argument('start_date', type=int, + help='Start of the 3-day window (in seconds since the start of the original trace)') + args = parser.parse_args() + + start_instance(args.expe_num, args.start_date) + + + +if __name__ == "__main__": + main()