Commit 0d84d037 authored by Millian Poquet

script: generate usage_replay traces + parallel

parent f0a74acb
#!/usr/bin/env python3
import argparse
from concurrent.futures import ProcessPoolExecutor
import datetime
import json
import os
@@ -8,6 +9,7 @@ from sortedcontainers import SortedDict, SortedSet
EPOCH_PYTHON_DATETIME = datetime.datetime(year=1900, month=1, day=1)
EPOCH_M100 = datetime.datetime(year=2022, month=1, day=1)
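# module-level power time series dataframe, filled in main() and read by generate_node_file()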
POWER_DF = None
def determine_walltime(row):
time_limit = None
@@ -27,15 +29,39 @@ def determine_walltime(row)
return int(walltime_delta.total_seconds())
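# Writes the usage_replay trace of one node of a job: each measured power value is clamped
# into [min_power, max_power], then turned into a usage ratio and an amount of flops to compute.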
def generate_node_file(job_profile_dir, job_min_dt, job_max_dt, job_node, node_id, max_power, min_power, host_flops):
global POWER_DF
job_mask = (POWER_DF['timestamp'] >= job_min_dt) & (POWER_DF['timestamp'] <= job_max_dt)
node_mask = POWER_DF['node'] == str(job_node)
node_power_df = POWER_DF[job_mask & node_mask]
with open(f'{job_profile_dir}/node{node_id}.txt', 'w', buffering=4096) as f:
for _, power_row in node_power_df.iterrows():
desired_watts = max(min(float(power_row['value']), max_power), min_power)
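            # express the clamped power as a ratio of the configured power range width (max_power - min_power)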
usage = desired_watts / (max_power - min_power)
desired_duration = 20 # M100 traces have 1 value every 20 s
desired_flops = desired_duration * host_flops
f.write(f'{node_id} m_usage {usage:.4f} {desired_flops:.1e}\n')
def main():
datetime_parser = lambda f: datetime.datetime.strptime(f, '%Y-%m-%d %H:%M:%S')
parser = argparse.ArgumentParser()
parser.add_argument("input_jobs", help='path to the CSV file that contains the jobs information')
parser.add_argument("input_power_timeseries_prefix", help="filepath prefix to the location of the parquet files that contain node power consumption time series")
parser.add_argument("--begin", required=True, type=datetime_parser, help='the starting datetime of the interval to replay. example: 2022-01-01 00:00:00')
parser.add_argument("--end", required=True, type=datetime_parser, help='the end datetime of the interval to replay. example: 2022-01-31 23:59:59')
parser.add_argument("-p", "--profile_type", choices=['delay'], required=True, help='the profile type to use')
parser.add_argument("-p", "--profile_type", choices=['delay', 'usage_replay'], required=True, help='the profile type to use')
parser.add_argument("--max_job_duration", type=int, default=60*60*24, help='the maximum number of seconds of a job. longer jobs will be truncated when replayed')
parser.add_argument("--min_power", type=float, default=240.0, help='the minimum power value to use for power-related profiles')
parser.add_argument("--max_power", type=float, default=2100.0, help='the maximum power value to use for power-related profiles')
parser.add_argument("--host_flops", default=1e9, help='the maximum power value to use for power-related profiles')
parser.add_argument("--max_processes", type=int, default=64, help='the maximum number of processes to use to generate the workload files')
parser.add_argument("-o", "--output_dir", required=True, help="filepath where all the workload-related files should be generated")
args = parser.parse_args()
assert args.max_power > args.min_power
    assert args.max_job_duration < 28*60*60*24, 'this script does not handle jobs longer than 28 days'
begin_seconds_since_m100_epoch = int((args.begin - EPOCH_M100).total_seconds())
end_seconds_since_m100_epoch = int((args.end - EPOCH_M100).total_seconds())
@@ -62,33 +88,84 @@ def main():
delay_profiles_durations = SortedSet()
profiles = SortedDict()
match args.profile_type:
case 'usage_replay':
# read parquet files, only keeping values with a timestamp that can be useful to replay this workload
min_dt = args.begin
max_dt = args.end + datetime.timedelta(seconds=args.max_job_duration)
dt = min_dt
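            # monthly parquet files are expected to be named <prefix>/YY-MM_power_total.parquet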
power_filename = f'{args.input_power_timeseries_prefix}/{dt.year - 2000:02d}-{dt.month:02d}_power_total.parquet'
power_df = pd.read_parquet(power_filename)
power_df = power_df[(power_df['timestamp'] >= min_dt) & (power_df['timestamp'] <= max_dt)]
# need to load the beginning of the next month too?
if max_dt.month != min_dt.month:
dt = max_dt
power_filename = f'{args.input_power_timeseries_prefix}/{dt.year - 2000:02d}-{dt.month:02d}_power_total.parquet'
next_month_power_df = pd.read_parquet(power_filename)
next_month_power_df = next_month_power_df[(next_month_power_df['timestamp'] >= min_dt) & (next_month_power_df['timestamp'] <= max_dt)]
power_df = pd.concat([power_df, next_month_power_df])
del next_month_power_df
global POWER_DF
POWER_DF = power_df.sort_values(by='timestamp')
del power_df
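    # NOTE: worker processes read the global POWER_DF; this assumes the fork start method
    # (the default on Linux), so the dataframe loaded above is inherited by the workers.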
with ProcessPoolExecutor(max_workers=args.max_processes) as executor:
for wload, wload_type in [(running_jobs, 'init_run'), (queued_jobs, 'init_queue'), (toreplay_jobs, 'main')]:
for _, row in wload.iterrows():
job_duration = min(int(args.max_job_duration), int((row['end_time'] - row['start_time']).total_seconds()))
assert job_duration > 0
extra_data = {
'workload_type': wload_type,
'zero_power_estimation': float(row['zero_power_estimation']),
'mean_power_estimation': float(row['mean_power_estimation']),
'max_power_estimation': float(row['max_power_estimation']),
'upper_bound_power_estimation': float(row['upper_bound_power_estimation']),
}
job_walltime = determine_walltime(row)
job_profile = None
match args.profile_type:
case 'delay':
job_profile = f'delay{job_duration}'
delay_profiles_durations.add(job_duration)
case 'usage_replay':
job_starts_seconds_since_m100_epoch = int((row['start_time'] - EPOCH_M100).total_seconds())
job_profile = f'{row["job_id"]}_{job_starts_seconds_since_m100_epoch}'
job_profile_dir = f'{args.output_dir}/jobs/{row["job_id"]}_{job_starts_seconds_since_m100_epoch}'
profiles[job_profile] = {
'type': 'trace_replay',
'trace_type': 'usage',
'trace_file': f'{os.path.abspath(job_profile_dir)}/traces.txt',
}
# assume that already existing dirs have valid content
if not os.path.exists(job_profile_dir):
os.makedirs(job_profile_dir, exist_ok=True)
# write the main entry actions file
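                        # traces.txt lists the per-node trace files (node0.txt, node1.txt, ...) that compose this job profile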
with open(f'{job_profile_dir}/traces.txt', 'w') as f:
for node_id in range(int(row['num_nodes'])):
f.write(f'node{node_id}.txt\n')
job_min_dt = row['start_time']
job_max_dt = job_min_dt + datetime.timedelta(seconds=job_duration)
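                        # the 'nodes' column stores the allocated node list as text; eval() turns it back into a Python list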
job_nodes = eval(row['nodes'])
for node_id, job_node in enumerate(job_nodes):
executor.submit(generate_node_file, job_profile_dir, job_min_dt, job_max_dt, job_node, node_id, args.max_power, args.min_power, args.host_flops)
jobs.append({
'id': str(row['job_id']),
'subtime': int((row['submit_time'] - args.begin).total_seconds()),
'walltime': int(job_walltime),
'res': int(row['num_nodes']),
'profile': job_profile,
'extra_data': extra_data,
})
match args.profile_type:
case 'delay':
@@ -99,12 +176,12 @@ def main():
}
os.makedirs(args.output_dir, exist_ok=True)
with open(f'{args.output_dir}/{wload_name}', 'w') as f:
f.write(f'''{{
"description": "Automatically generated from a M100 trace from '{args.begin}' to '{args.end}' using '{args.profile_type}' profile types",
"replay_begin_seconds_from_2022-01-01": {begin_seconds_since_m100_epoch},
"replay_end_seconds_from_2022-01-01": {end_seconds_since_m100_epoch},
"nb_res": 980,
"jobs": {json.dumps(jobs, indent=4)},
"profiles": {json.dumps(profiles, indent=4, sort_keys=True)}
"description": "Automatically generated from a M100 trace from '{args.begin}' to '{args.end}' using '{args.profile_type}' profile types",
"replay_begin_seconds_from_2022-01-01": {begin_seconds_since_m100_epoch},
"replay_end_seconds_from_2022-01-01": {end_seconds_since_m100_epoch},
"nb_res": 980,
"jobs": {json.dumps(jobs, indent=4)},
"profiles": {json.dumps(profiles, indent=4, sort_keys=True)}
}}''')