Commit 42c94f58 authored by Maël Madon

campaign2 prepared and ready to be tested in grid5000

parent 85c7b0c5
Branches main
#!/usr/bin/env python3
import random
# from time import *
import datetime
from dateutil import parser
import concurrent.futures
from scripts.util import WL_DIR
from instance2 import start_instance


###############################
# Prepare the start date sample
###############################
begin_trace = 1356994806    # according to original SWF header
jun1 = parser.parse('Sun Jun 1 00:00:00 CEST 2014')
daylight_saving_day = parser.parse('Sun Oct 26 00:00:00 CEST 2014')
day = datetime.timedelta(days=1)
weekdays = [1, 2, 3, 4, 5]  # Mon to Fri

# We do one expe for every weekday between Jun 1 and Oct 26
# We fix the demand response window to be at 16 on day 2
expe_start_time = []
str_start_day = []
day1 = jun1
while day1 <= (daylight_saving_day - 3*day):
    day2 = day1 + day
    if day2.isoweekday() in weekdays:
        str_start_day.append(day1.ctime())
        expe_start_time.append(day1.timestamp() - begin_trace)
    day1 += day

# Save selected dates in a txt file
with open(f"{WL_DIR}/start_days_for_campaign2.txt", 'w') as f:
    for date in str_start_day:
        f.write(date + '\n')


###############################
# Launch the expe for every start date
###############################
nb_expe = len(expe_start_time)
nb_expe = 2   # override for testing: only run the first 2 expes

with concurrent.futures.ProcessPoolExecutor() as executor:
    instances = []
    for i in range(nb_expe):
        print(f"Submit expe {i}")
        # start_instance(expe_num, start_date, prepare_workload, clean_log)
        instances.append(executor.submit(
            start_instance, i, expe_start_time[i], False, True))

    for instance in concurrent.futures.as_completed(instances):
        print(f"Expe {instance.result()} terminated")
#!/usr/bin/env python3
import time
import os
import subprocess
import argparse
import json
import scripts.swf_to_batsim_split_by_user as split_user
from scripts.util import *
def prepare_input_data(expe_num, start_date):
    """Cut the original trace to extract 72h starting from this start date"""
    end_date = start_date + 72*3600
    to_keep = f"submit_time >= {start_date} and submit_time <= {end_date}"

    if not os.path.exists(f'{WL_DIR}/expe{expe_num}'):
        os.makedirs(f'{WL_DIR}/expe{expe_num}')

    split_user.generate_workload(
        input_swf=f'{WL_DIR}/MC_selection_article.swf',
        output_folder=f'{WL_DIR}/expe{expe_num}',
        keep_only=to_keep,
        job_grain=10,
        job_walltime_factor=8)
def run_expe(expe_num, user_category, window_size, clean_log):
    """Run batmen with given behavior and demand response window.
    expe_num should be a small integer (e.g. < 100)."""

    # Useful vars and output folder
    EXPE_DIR = f"{ROOT_DIR}/out/expe{expe_num}/{user_category}_window{window_size}"
    create_dir_rec_if_needed(EXPE_DIR)
    create_dir_rec_if_needed(f"{EXPE_DIR}/cmd")

    EXPE_FILE = f"{EXPE_DIR}/cmd/robinfile.yaml"
    wl_folder = f'{WL_DIR}/expe{expe_num}'
    pf = f"{ROOT_DIR}/platform/average_metacentrum.xml"
    wl = f"{WL_DIR}/empty_workload.json"
    uf = f"{EXPE_DIR}/cmd/user_description_file.json"

    # Demand response window, from 16 to (16 + window_size) on day 2
    dm_window = [(24+16)*3600, int((24+16+window_size)*3600)]

    # User description file
    def user_description(user):
        return {
            "name": user,
            "category": user_category,
            "param": {"input_json": f"{wl_folder}/{user}.json"}
        }

    user_names = [user_file.split('.')[0] for user_file in os.listdir(wl_folder)]

    data = {}
    data["dm_window"] = dm_window
    data["log_user_stats"] = True
    data["log_folder"] = EXPE_DIR
    data["users"] = [user_description(user) for user in user_names]

    with open(uf, 'w') as user_description_file:
        json.dump(data, user_description_file)

    # Generate and run robin instance
    socket_batsim = f"tcp://localhost:280{expe_num:02d}"
    socket_batsched = f"tcp://*:280{expe_num:02d}"
    batcmd = gen_batsim_cmd(
        pf, wl, EXPE_DIR, f"--socket-endpoint={socket_batsim} --energy --enable-compute-sharing --enable-dynamic-jobs --acknowledge-dynamic-jobs --enable-profile-reuse")
    schedcmd = f"batsched --socket-endpoint={socket_batsched} -v bin_packing_energy --queue_order=desc_size --variant_options_filepath={uf}"

    instance = RobinInstance(output_dir=EXPE_DIR,
                             batcmd=batcmd,
                             schedcmd=schedcmd,
                             simulation_timeout=604800, ready_timeout=10,
                             success_timeout=3600, failure_timeout=5
                             )

    instance.to_file(EXPE_FILE)

    print(f"Run robin {EXPE_FILE}")
    ret = run_robin(EXPE_FILE)
    print(f"Robin {EXPE_FILE} finished")

    # Remove the log files that can quickly become heavy...
    if clean_log:
        os.remove(f"{EXPE_DIR}/log/batsim.log")
        os.remove(f"{EXPE_DIR}/log/sched.err.log")
        os.remove(f"{EXPE_DIR}/log/sched.out.log")
def start_instance(expe_num, start_date, prepare_workload=True, clean_log=False):
    # Prepare workload
    if prepare_workload:
        prepare_input_data(expe_num, start_date)

    # Create expe folder
    create_dir_rec_if_needed(f"{ROOT_DIR}/out/expe{expe_num}")

    # Run with rigid behavior (the demand response window has no influence here)
    run_expe(expe_num=expe_num,
             user_category="replay_user_rigid",
             window_size=1, clean_log=clean_log)

    # 4 behaviors x 2 window sizes = 8 expes (plus the rigid run above)
    for behavior in ["dm_user_reconfig", "dm_user_degrad",
                     "dm_user_renonce", "dm_user_delay"]:
        for window_size in [1, 4]:
            run_expe(expe_num, behavior, window_size, clean_log=clean_log)

    ###### Output data treatment ######
    # Produce the utilisation viz?

    return expe_num
def main():
    parser = argparse.ArgumentParser(
        description='One expe instance. To launch for example with `oarsub -l walltime=2 "./1_one_instance expe_num start_date"`')
    parser.add_argument('expe_num', type=int, help='The expe ID')
    parser.add_argument('start_date', type=int,
                        help='Start of the 3-day window (in seconds since the start of the original trace)')
    args = parser.parse_args()

    start_instance(args.expe_num, args.start_date)


if __name__ == "__main__":
    main()
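This second script is meant to be launched once per instance from the command line, e.g. `./1_one_instance <expe_num> <start_date>`, or wrapped in `oarsub` as its own help text suggests. It can also be driven directly from Python; a minimal sketch, assuming this file is the `instance2` module imported by the campaign script above and using placeholder values for the experiment ID and start offset:

from instance2 import start_instance

# Placeholder values: expe ID 0, start offset 4_000_000 s after the trace begins.
# prepare_workload=True regenerates the 72 h per-user workload first;
# clean_log=True deletes the batsim/batsched logs once the runs finish.
start_instance(0, 4_000_000, prepare_workload=True, clean_log=True)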