#!/usr/bin/env python3
"""
Transforms an SWF trace into a Batsim workload with computation-only jobs,
split by user. Job profiles are not included in the output JSON.
Optionally keeps only a given set of partitions.
"""
import argparse
import json
import re
import sys
import datetime
import os
from swf import SwfField
def generate_workload(input_swf, output_folder,
partitions_to_select=None,
start_time=None,
job_walltime_factor=2,
given_walltime_only=False,
job_grain=1,
indent=None,
keep_only=None,
verbose=False,
quiet=False,
job_size_function_string='1*nb_res'):
"""Generate a Batsim workload from a SWF trace."""
    element = r'([-+]?\d+(?:\.\d+)?)'
    r = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')
current_id = 0
version = 0
# A dictionary of users, each entry containing the jobs and the profiles
# used by this user
users = {}
# Let a job be a tuple (job_id, nb_res, run_time, submit_time, profile,
# walltime)
jobs = []
# Some job counters
not_selected = {"nb": 0, "coreh": 0}
selected = {"nb": 0, "coreh": 0}
not_valid = 0
not_line_match_format = 0
minimum_observed_submit_time = float('inf')
# Let's loop over the lines of the input file
    i = 0
for line in input_swf:
i += 1
if i % 100000 == 0:
print("Processing swf line", i)
res = r.match(line)
if res:
# Parsing...
job_id = (int(float(res.group(SwfField.JOB_ID.value))))
nb_res = int(
float(res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value)))
run_time = float(res.group(SwfField.RUN_TIME.value))
submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
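            # The walltime is the requested time from the trace, but at least
            # job_walltime_factor times the actual run time (default factor: 2).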
walltime = max(job_walltime_factor * run_time,
float(res.group(SwfField.REQUESTED_TIME.value)))
user_id = str(res.group(SwfField.USER_ID.value))
partition_id = int(res.group(SwfField.PARTITION_ID.value))
# nb_res may be changed by calling a user-given function
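            # e.g. job_size_function_string='2*nb_res' doubles every job size;
            # the expression is evaluated with the current job's fields in scope.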
nb_res = eval(job_size_function_string)
if given_walltime_only:
walltime = float(res.group(SwfField.REQUESTED_TIME.value))
# Select jobs to keep
is_valid_job = (nb_res > 0 and walltime >
run_time and run_time > 0 and submit_time >= 0)
select_partition = ((partitions_to_select is None) or
(partition_id in partitions_to_select))
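            # keep_only is an arbitrary Python expression evaluated per job,
            # e.g. --keep_only 'submit_time < 7*24*3600' keeps only the jobs
            # submitted during the first week of the trace.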
use_job = select_partition and (
(keep_only is None) or eval(keep_only))
            # Skip invalid jobs entirely; valid jobs are then kept or discarded
            # according to the partition/keep_only filters.
            if not is_valid_job:
                not_valid += 1
            elif not use_job:
not_selected["nb"] += 1
not_selected["coreh"] += run_time * nb_res
else:
selected["nb"] += 1
selected["coreh"] += run_time * nb_res
                if user_id not in users:
users[user_id] = {}
users[user_id]["jobs"] = []
profile = int(((run_time // job_grain) + 1) * job_grain)
job = (current_id, nb_res, run_time,
submit_time, profile, walltime)
current_id = current_id + 1
minimum_observed_submit_time = min(minimum_observed_submit_time,
submit_time)
users[user_id]["jobs"].append(job)
else:
not_line_match_format += 1
# Create a json file per user
if not os.path.exists(output_folder):
os.makedirs(output_folder)
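    # Shift all submit times so that t=0 corresponds to --start_time if given,
    # or to the earliest submit time observed in the trace otherwise.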
if start_time is None:
translate_submit_times = minimum_observed_submit_time
else:
translate_submit_times = start_time
for user_id in users:
jobs = users[user_id]["jobs"]
# Export JSON
# Let's generate a list of dictionaries for the jobs
djobs = list()
for (job_id, nb_res, run_time, submit_time, profile, walltime) in jobs:
djobs.append({'id': job_id,
'subtime': submit_time - translate_submit_times,
'walltime': walltime,
'res': nb_res,
'profile': str(profile)})
biggest_job = max([nb_res for (job_id, nb_res, run_time, submit_time,
profile, walltime) in jobs])
platform_size = biggest_job
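        # The per-user 'nb_res' field is set to that user's largest job,
        # presumably so that every job of this workload fits on the platform.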
data = {
'version': version,
'command': ' '.join(sys.argv[:]),
'date': datetime.datetime.now().isoformat(' '),
'description': 'Workload for user {}'.format(user_id),
'nb_res': platform_size,
'jobs': djobs}
        output_json = os.path.join(output_folder, "user" + user_id + ".json")
        try:
            with open(output_json, 'w') as out_file:
                json.dump(data, out_file, indent=indent, sort_keys=True)
            if not quiet:
                print('user {}:'.format(user_id))
                print(' {} jobs have been created'.format(len(djobs)))
if keep_only:
print(' {} jobs have been removed by keep_only'.format(
len(jobs) - len(djobs)))
except IOError:
print('Cannot write file', output_json)
print('-------------------\nEnd parsing')
print('Total {} jobs and {} users have been created.'.format(
selected["nb"], len(users)))
print(
'Total number of core-hours: {:.0f}'.format(selected["coreh"] / 3600))
    print('{} valid jobs were not selected (partition/keep_only filters), '
          'representing {:.0f} core-hours'.format(
              not_selected["nb"], not_selected["coreh"] / 3600))
    if selected["nb"] + not_selected["nb"] > 0:
        print("Jobs not selected: {:.1f}% in number, {:.1f}% in core-hours"
              .format(not_selected["nb"] / (not_selected["nb"] + selected["nb"]) * 100,
                      not_selected["coreh"] / (selected["coreh"] + not_selected["coreh"]) * 100))
    print('{} out of {} lines in the file did not match the SWF format'.format(
        not_line_match_format, i))
print('{} jobs were not valid'.format(not_valid))
def main():
"""
Program entry point.
    Parses the input arguments, then calls generate_workload.
"""
parser = argparse.ArgumentParser(
        description='Reads a SWF (Standard Workload Format) file and transforms it into a JSON Batsim workload (with delay jobs)')
parser.add_argument('input_swf', type=argparse.FileType('r'),
help='The input SWF file')
parser.add_argument('output_folder', type=str,
help='The output folder for the JSON files')
parser.add_argument('-sp', '--partitions_to_select',
type=int, nargs='+', default=None,
                        help='List of partitions to consider in the input trace; jobs running in the other partitions will be discarded.')
parser.add_argument('--start_time', type=int, default=None,
help='If set, the submit times will be translated towards zero by this value. Otherwise, the first job sets the zero.')
parser.add_argument('-jsf', '--job-size-function',
type=str,
default='1*nb_res',
                        help='A Python expression used to recompute each job '
                             'size (evaluated with nb_res in scope). '
                             'The identity (1*nb_res) is used by default.')
parser.add_argument('-jwf', '--job_walltime_factor',
type=float, default=2,
                        help='Job walltimes are computed as max(givenWalltime, jobWalltimeFactor*givenRuntime)')
parser.add_argument('-gwo', '--given_walltime_only',
action="store_true",
help='If set, only the given walltime in the trace will be used')
parser.add_argument('-jg', '--job_grain',
type=int, default=1,
                        help='Selects the level of detail for job profiles: this parameter is used to group jobs that have close running times')
parser.add_argument('-i', '--indent', type=int, default=None,
help='If set to a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0, or negative, will only insert newlines. The default value (None) selects the most compact representation.')
parser.add_argument('--keep_only',
type=str,
default=None,
                        help='If set, this Python expression is evaluated for each job to decide whether it is kept')
group = parser.add_mutually_exclusive_group()
group.add_argument("-v", "--verbose", action="store_true")
group.add_argument("-q", "--quiet", action="store_true")
args = parser.parse_args()
generate_workload(input_swf=args.input_swf,
output_folder=args.output_folder,
partitions_to_select=args.partitions_to_select,
start_time=args.start_time,
job_walltime_factor=args.job_walltime_factor,
given_walltime_only=args.given_walltime_only,
job_grain=args.job_grain,
indent=args.indent,
keep_only=args.keep_only,
verbose=args.verbose,
quiet=args.quiet,
job_size_function_string=args.job_size_function)
if __name__ == "__main__":
main()