Commit 62dcb642 authored by Millian Poquet

script: fix wld gen + compute real max/mean job W

parent df3963e8
@@ -32,7 +32,7 @@ def determine_walltime(row):
 def generate_node_file(job_profile_dir, job_min_dt, job_max_dt, job_node, node_id, max_power, min_power, host_flops):
     global POWER_DF
     job_mask = (POWER_DF['timestamp'] >= job_min_dt) & (POWER_DF['timestamp'] <= job_max_dt)
-    node_mask = POWER_DF['node'] == str(job_node)
+    node_mask = POWER_DF['node'] == job_node
     node_power_df = POWER_DF[job_mask & node_mask]
     with open(f'{job_profile_dir}/node{node_id}.txt', 'w', buffering=4096) as f:
@@ -45,6 +45,24 @@ def generate_node_file(job_profile_dir, job_min_dt, job_max_dt, job_node, node_id, max_power, min_power, host_flops):
         f.write(f'{node_id} m_usage {usage:.4f} {desired_flops:.1e}\n')

+def generate_job_power_time_series(job_profile_dir, job_min_dt, job_max_dt, job_nodes, min_power):
+    global POWER_DF
+    time_mask = (POWER_DF['timestamp'] >= job_min_dt) & (POWER_DF['timestamp'] <= job_max_dt)
+    space_mask = POWER_DF['node'].isin(job_nodes)
+    job_power_df = POWER_DF[time_mask & space_mask]
+
+    job_summed_power_df = job_power_df.groupby('timestamp')['value'].sum()
+    job_real_power_mean = float(job_summed_power_df.mean())
+    job_real_power_max = float(job_summed_power_df.max())
+
+    # remove static part
+    static_watts = len(job_nodes) * min_power
+    job_summed_power_df = job_summed_power_df - static_watts
+
+    os.makedirs(job_profile_dir, exist_ok=True)
+    job_summed_power_df.to_csv(f'{job_profile_dir}/dynpower.csv', index=False, header=['job_total_dynamic_power'])
+
+    return job_real_power_mean, job_real_power_max # return dynamic values
+
 def main():
     datetime_parser = lambda f: datetime.datetime.strptime(f, '%Y-%m-%d %H:%M:%S')
     parser = argparse.ArgumentParser()
@@ -65,7 +83,8 @@ def main():
     begin_seconds_since_m100_epoch = int((args.begin - EPOCH_M100).total_seconds())
     end_seconds_since_m100_epoch = int((args.end - EPOCH_M100).total_seconds())
-    wload_name = f'wload_{args.profile_type}_{begin_seconds_since_m100_epoch}.json'
+    wload_prefix = f'wload_{args.profile_type}_{begin_seconds_since_m100_epoch}'
+    wload_name = f'{wload_prefix}.json'

     input_df = pd.read_csv(args.input_jobs)
     input_df['start_time'] = input_df['start_time'].apply(datetime_parser)
@@ -84,59 +103,70 @@ def main():
     toreplay_mask = (input_df['submit_time'] >= args.begin) & (input_df['submit_time'] < args.end)
     toreplay_jobs = input_df[toreplay_mask].copy().reset_index()

+    jobs_dict = dict()
     jobs = list()
     delay_profiles_durations = SortedSet()
     profiles = SortedDict()

-    match args.profile_type:
-        case 'usage_replay':
-            # read parquet files, only keeping values with a timestamp that can be useful to replay this workload
-            min_dt = args.begin
-            max_dt = args.end + datetime.timedelta(seconds=args.max_job_duration)
-
-            dt = min_dt
-            power_filename = f'{args.input_power_timeseries_prefix}/{dt.year - 2000:02d}-{dt.month:02d}_power_total.parquet'
-            power_df = pd.read_parquet(power_filename)
-            power_df = power_df[(power_df['timestamp'] >= min_dt) & (power_df['timestamp'] <= max_dt)]
-
-            # need to load the beginning of the next month too?
-            if max_dt.month != min_dt.month:
-                dt = max_dt
-                power_filename = f'{args.input_power_timeseries_prefix}/{dt.year - 2000:02d}-{dt.month:02d}_power_total.parquet'
-                next_month_power_df = pd.read_parquet(power_filename)
-                next_month_power_df = next_month_power_df[(next_month_power_df['timestamp'] >= min_dt) & (next_month_power_df['timestamp'] <= max_dt)]
-                power_df = pd.concat([power_df, next_month_power_df])
-                del next_month_power_df
-
-            global POWER_DF
-            POWER_DF = power_df.sort_values(by='timestamp')
-            del power_df
+    # read parquet files, only keeping values with a timestamp that can be useful to replay this workload
+    min_dt = args.begin
+    max_dt = max(queued_jobs['end_time'].max(), toreplay_jobs['end_time'].max(), args.end + datetime.timedelta(seconds=args.max_job_duration))
+
+    dt = min_dt
+    power_filename = f'{args.input_power_timeseries_prefix}/{dt.year - 2000:02d}-{dt.month:02d}_power_total.parquet'
+    power_df = pd.read_parquet(power_filename)
+    power_df = power_df[(power_df['timestamp'] >= min_dt) & (power_df['timestamp'] <= max_dt)]
+
+    # need to load the beginning of the next month too?
+    if max_dt.month != min_dt.month:
+        assert max_dt.month == min_dt.month + 1
+        dt = max_dt
+        power_filename = f'{args.input_power_timeseries_prefix}/{dt.year - 2000:02d}-{dt.month:02d}_power_total.parquet'
+        next_month_power_df = pd.read_parquet(power_filename)
+        next_month_power_df = next_month_power_df[(next_month_power_df['timestamp'] >= min_dt) & (next_month_power_df['timestamp'] <= max_dt)]
+        power_df = pd.concat([power_df, next_month_power_df])
+        del next_month_power_df
+
+    global POWER_DF
+    POWER_DF = power_df.sort_values(by='timestamp')
+    del power_df
+
+    job_power_stats_futures = dict()
     with ProcessPoolExecutor(max_workers=args.max_processes) as executor:
         for wload, wload_type in [(running_jobs, 'init_run'), (queued_jobs, 'init_queue'), (toreplay_jobs, 'main')]:
             for _, row in wload.iterrows():
                 job_duration = min(int(args.max_job_duration), int((row['end_time'] - row['start_time']).total_seconds()))
                 assert job_duration > 0
-                job_starts_seconds_since_m100_epoch = int((row['start_time'] - EPOCH_M100).total_seconds())
-                job_profile = f'{row["job_id"]}_{job_starts_seconds_since_m100_epoch}'
-                job_profile_dir_suffix = f'jobs/{row["job_id"]}_{job_starts_seconds_since_m100_epoch}'
-                job_profile_dir = f'{args.output_dir}/{job_profile_dir_suffix}'
                 extra_data = {
                     'workload_type': wload_type,
                     'zero_power_estimation': float(row['zero_power_estimation']),
-                    'mean_power_estimation': float(row['mean_power_estimation']),
-                    'max_power_estimation': float(row['max_power_estimation']),
+                    'mean_power_estimation': float(row['mean_power_estimation'] * row['num_nodes']),
+                    'max_power_estimation': float(row['max_power_estimation'] * row['num_nodes']),
                     'upper_bound_power_estimation': float(row['upper_bound_power_estimation']),
+                    'job_details_filepath': job_profile_dir_suffix,
                 }
                 job_walltime = determine_walltime(row)
+                job_min_dt = row['start_time']
+                job_max_dt = job_min_dt + datetime.timedelta(seconds=job_duration)
+                job_nodes = [str(x) for x in eval(row['nodes'])] # yay!
                 job_profile = None
                 match args.profile_type:
                     case 'delay':
                         job_profile = f'delay{job_duration}'
                         delay_profiles_durations.add(job_duration)
                     case 'usage_replay':
+                        job_starts_seconds_since_m100_epoch = int((row['start_time'] - EPOCH_M100).total_seconds())
+                        job_profile = f'{row["job_id"]}_{job_starts_seconds_since_m100_epoch}'
+                        job_profile_dir_suffix = f'jobs/{row["job_id"]}_{job_starts_seconds_since_m100_epoch}'
+                        job_profile_dir = f'{args.output_dir}/{job_profile_dir_suffix}'
+
+                        # compute the power consumption of the job over time in parallel, and write it into a CSV file
+                        job_power_stats_futures[str(row['job_id'])] = executor.submit(generate_job_power_time_series, job_profile_dir, job_min_dt, job_max_dt, job_nodes, args.min_power)
+
                         profiles[job_profile] = {
                             'type': 'trace_replay',
                             'trace_type': 'usage',
@@ -147,35 +177,42 @@ def main():
                         if not os.path.exists(job_profile_dir):
                             os.makedirs(job_profile_dir, exist_ok=True)

-                        # write the main entry actions file
+                        # write the main entry actions file (small)
                         with open(f'{job_profile_dir}/traces.txt', 'w') as f:
                             for node_id in range(int(row['num_nodes'])):
                                 f.write(f'node{node_id}.txt\n')

-                        job_min_dt = row['start_time']
-                        job_max_dt = job_min_dt + datetime.timedelta(seconds=job_duration)
-                        job_nodes = eval(row['nodes'])
+                        # write the node action files in parallel
                         for node_id, job_node in enumerate(job_nodes):
                             executor.submit(generate_node_file, job_profile_dir, job_min_dt, job_max_dt, job_node, node_id, args.max_power, args.min_power, args.host_flops)

-                jobs.append({
+                jobs_dict[str(row['job_id'])] = {
                     'id': str(row['job_id']),
                     'subtime': int((row['submit_time'] - args.begin).total_seconds()),
                     'walltime': int(job_walltime),
                     'res': int(row['num_nodes']),
                     'profile': job_profile,
                     'extra_data': extra_data,
-                })
+                }

     match args.profile_type:
         case 'delay':
             for duration in delay_profiles_durations:
                 profiles[f'delay{duration}'] = {
                     'type': 'delay',
                     'delay': int(duration),
                 }

+    for job_id in jobs_dict:
+        job_real_mean_power, job_real_max_power = job_power_stats_futures[job_id].result()
+        job = jobs_dict[job_id]
+        job['extra_data']['real_mean_power_estimation'] = job_real_mean_power
+        job['extra_data']['real_max_power_estimation'] = job_real_max_power
+        jobs.append(job)
+
     os.makedirs(args.output_dir, exist_ok=True)
     with open(f'{args.output_dir}/{wload_name}', 'w') as f:
         f.write(f'''{{
@@ -186,3 +223,20 @@ def main():
     "jobs": {json.dumps(jobs, indent=4)},
     "profiles": {json.dumps(profiles, indent=4, sort_keys=True)}
 }}''')
+
+    match args.profile_type:
+        case 'delay':
+            job_watts_list = list()
+            for job in jobs:
+                job_watts_list.append({
+                    'id': job['id'],
+                    'res': job['res'],
+                    'zero': job['extra_data']['zero_power_estimation'],
+                    'mean': job['extra_data']['mean_power_estimation'],
+                    'max': job['extra_data']['max_power_estimation'],
+                    'real_mean': job['extra_data']['real_mean_power_estimation'],
+                    'real_max': job['extra_data']['real_max_power_estimation'],
+                    'upper_bound': job['extra_data']['upper_bound_power_estimation'],
+                })
+            df = pd.DataFrame.from_records(job_watts_list)
+            df.to_csv(f'{args.output_dir}/{wload_prefix}_input_watts.csv', index=False)
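
Below is a minimal, standalone sketch of the aggregation that the new generate_job_power_time_series() performs: per-node power samples sharing a timestamp are summed into the job's total power series, the real mean/max are taken from that series, and the static part (number of nodes times min_power) is subtracted before the series is written to dynpower.csv. The dataframe, node names and wattage values below are illustrative assumptions, not data from the commit.

import pandas as pd

# hypothetical per-node samples; the real script loads monthly parquet files into POWER_DF
power_df = pd.DataFrame({
    'timestamp': pd.to_datetime(['2022-01-01 00:00:00', '2022-01-01 00:00:00',
                                 '2022-01-01 00:00:20', '2022-01-01 00:00:20']),
    'node': ['node001', 'node002', 'node001', 'node002'],
    'value': [300.0, 320.0, 310.0, 330.0],  # watts per node
})
job_nodes = ['node001', 'node002']  # nodes allocated to the job (illustrative)
min_power = 240.0                   # static watts per node, i.e. args.min_power

# sum the per-node samples that share a timestamp -> total job power over time
job_summed = power_df[power_df['node'].isin(job_nodes)].groupby('timestamp')['value'].sum()
real_mean, real_max = float(job_summed.mean()), float(job_summed.max())

# subtract the static part to keep only the dynamic power (what dynpower.csv stores)
dynamic = job_summed - len(job_nodes) * min_power
print(real_mean, real_max, dynamic.tolist())  # 630.0 640.0 [140.0, 160.0]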