Commit 10e83aa9 authored by Millian Poquet

script: generate batsim workload (delay for now)

parent 888096cf
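For reference, one possible invocation of the new entry point once the package is installed (the input CSV and output directory names below are hypothetical; the flags match the argparse definition in the script):

m100-generate-batsim-workload jobs.csv \
    --begin '2022-01-01 00:00:00' \
    --end '2022-01-31 23:59:59' \
    --profile_type delay \
    --output_dir out/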
@@ -56,6 +56,7 @@
];
propagatedBuildInputs = with pyPkgs; [
packages.fastparquet-2402
pyPkgs.sortedcontainers
];
};
fastparquet-2402 = pyPkgs.fastparquet.overrideAttrs(final: prev: rec {
new file: expe_energumen/m100_generate_batsim_workload.py
#!/usr/bin/env python3
import argparse
import datetime
import json
import os
import pandas as pd
from sortedcontainers import SortedDict, SortedSet
EPOCH_PYTHON_DATETIME = datetime.datetime(year=1900, month=1, day=1)
EPOCH_M100 = datetime.datetime(year=2022, month=1, day=1)
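# EPOCH_PYTHON_DATETIME is strptime's implicit default date: parsing a bare
# time of day yields a datetime on 1900-01-01, so subtracting this epoch
# turns the parsed value into a timedelta. EPOCH_M100 anchors the M100 trace
# so that begin/end can be expressed as seconds since 2022-01-01.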
def determine_walltime(row):
time_limit = None
try:
time_limit = datetime.datetime.strptime(row['time_limit_str'], '%H:%M:%S')
except ValueError:
try:
time_limit = datetime.datetime.strptime(row['time_limit_str'], '%M:%S')
except ValueError:
pass
if time_limit is None:
        # Could not parse the walltime value: fall back to the job's execution duration in the trace plus 5 minutes.
walltime_delta = row['end_time'] - row['start_time'] + datetime.timedelta(minutes=5)
else:
walltime_delta = time_limit - EPOCH_PYTHON_DATETIME
return int(walltime_delta.total_seconds())
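# Editorial note: '%H:%M:%S' covers limits such as '12:34:56' and '%M:%S'
# covers short limits such as '34:56'; values fitting neither format
# (e.g. day-based notations) take the duration-plus-5-minutes fallback.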
def main():
datetime_parser = lambda f: datetime.datetime.strptime(f, '%Y-%m-%d %H:%M:%S')
parser = argparse.ArgumentParser()
parser.add_argument("input_jobs", help='path to the CSV file that contains the jobs information')
parser.add_argument("--begin", required=True, type=datetime_parser, help='the starting datetime of the interval to replay. example: 2022-01-01 00:00:00')
parser.add_argument("--end", required=True, type=datetime_parser, help='the end datetime of the interval to replay. example: 2022-01-31 23:59:59')
parser.add_argument("-p", "--profile_type", choices=['delay'], required=True, help='the profile type to use')
parser.add_argument("-o", "--output_dir", required=True, help="filepath where all the workload-related files should be generated")
args = parser.parse_args()
begin_seconds_since_m100_epoch = int((args.begin - EPOCH_M100).total_seconds())
end_seconds_since_m100_epoch = int((args.end - EPOCH_M100).total_seconds())
wload_name = f'wload_{args.profile_type}_{begin_seconds_since_m100_epoch}.json'
input_df = pd.read_csv(args.input_jobs)
input_df['start_time'] = input_df['start_time'].apply(datetime_parser)
input_df['end_time'] = input_df['end_time'].apply(datetime_parser)
input_df['submit_time'] = input_df['submit_time'].apply(datetime_parser)
running_mask = (input_df['start_time'] < args.begin) & (input_df['end_time'] > args.begin)
running_jobs = input_df[running_mask].copy().reset_index()
    running_jobs['start_time'] = args.begin # running jobs are truncated: only their remaining part is simulated
running_jobs['submit_time'] = args.begin
queued_mask = (input_df['submit_time'] < args.begin) & (input_df['start_time'] > args.begin)
queued_jobs = input_df[queued_mask].copy().reset_index()
queued_jobs['submit_time'] = args.begin
toreplay_mask = (input_df['submit_time'] >= args.begin) & (input_df['submit_time'] < args.end)
toreplay_jobs = input_df[toreplay_mask].copy().reset_index()
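    # The trace is now split into three sub-workloads: jobs already running at
    # --begin (truncated so only their remainder is simulated), jobs queued but
    # not yet started at --begin, and jobs submitted within [--begin, --end).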
jobs = list()
delay_profiles_durations = SortedSet()
profiles = SortedDict()
for wload, wload_type in [(running_jobs, 'init_run'), (queued_jobs, 'init_queue'), (toreplay_jobs, 'main')]:
for _, row in wload.iterrows():
job_duration = int((row['end_time'] - row['start_time']).total_seconds())
assert job_duration > 0
extra_data = {
'workload_type': wload_type,
'zero_power_estimation': float(row['zero_power_estimation']),
'mean_power_estimation': float(row['mean_power_estimation']),
'max_power_estimation': float(row['max_power_estimation']),
'upper_bound_power_estimation': float(row['upper_bound_power_estimation']),
}
job_walltime = determine_walltime(row)
job_profile = None
match args.profile_type:
case 'delay':
job_profile = f'delay{job_duration}'
delay_profiles_durations.add(job_duration)
jobs.append({
'id': str(row['job_id']),
'subtime': int((row['submit_time'] - args.begin).total_seconds()),
'walltime': int(job_walltime),
'res': int(row['num_nodes']),
'profile': job_profile,
'extra_data': extra_data,
})
match args.profile_type:
case 'delay':
for duration in delay_profiles_durations:
profiles[f'delay{duration}'] = {
'type': 'delay',
'delay': int(duration),
}
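            # Note: one delay profile is emitted per distinct duration; the
            # SortedSet both deduplicates durations and keeps the profile
            # ordering deterministic.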
os.makedirs(args.output_dir, exist_ok=True)
    with open(f'{args.output_dir}/{wload_name}', 'w') as f:
f.write(f'''{{
"description": "Automatically generated from a M100 trace from '{args.begin}' to '{args.end}' using '{args.profile_type}' profile types",
"replay_begin_seconds_from_2022-01-01": {begin_seconds_since_m100_epoch},
"replay_end_seconds_from_2022-01-01": {end_seconds_since_m100_epoch},
"nb_res": 980,
"jobs": {json.dumps(jobs, indent=4)},
"profiles": {json.dumps(profiles, indent=4, sort_keys=True)}
}}''')
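A minimal sketch of how a generated workload file could be sanity-checked (the file name below is hypothetical; the keys match the template above):

import json

with open('out/wload_delay_0.json') as f:  # hypothetical output path
    workload = json.load(f)
for job in workload['jobs']:
    # every job must reference a declared profile and fit on the platform
    assert job['profile'] in workload['profiles']
    assert 0 < job['res'] <= workload['nb_res']
    assert job['subtime'] >= 0
print(len(workload['jobs']), 'jobs and', len(workload['profiles']), 'profiles look consistent')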
pyproject.toml
@@ -5,6 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "expe_energumen"
authors = [{name = "Millian Poquet", email = "millian.poquet@univ-tlse3.fr"}]
dependencies = ["sortedcontainers"]
version = "0.1.0"
[project.scripts]
@@ -14,3 +15,4 @@ m100-agg-danilo-estimations = "expe_energumen.m100_agg_danilo_estimations:agg_al
m100-agg-jobs-info = "expe_energumen.m100_agg_jobs_info:several_months"
m100-join-usable-jobs-info = "expe_energumen.m100_join_usable_jobs_info:join"
m100-generate-sg-platform = "expe_energumen.m100_generate_simgrid_platform:main"
m100-generate-batsim-workload = "expe_energumen.m100_generate_batsim_workload:main"