Skip to content
Snippets Groups Projects
Commit 324816dc authored by Maël Madon's avatar Maël Madon
Browse files

renaming file

parent 1f372813
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
"""
Transforms a SWF to a Batsim workload with computation-only jobs.
Split by user. Do not include the jobs profiles in the output JSON.
"""
import argparse
import json
import re
import sys
import datetime
import os
from swf import SwfField
def generate_workload(input_swf, output_folder,
                      start_time=None,
                      job_walltime_factor=2,
                      given_walltime_only=False,
                      job_grain=1,
                      indent=None,
                      quiet=False,
                      job_size_function_string='1*nb_res'):
    """Generate one Batsim workload JSON file per user from a SWF trace.

    Parses the SWF file line by line, groups valid jobs by user id and
    writes one ``user<ID>.json`` file per user into *output_folder*
    (profiles are not included in the output).

    Parameters:
        input_swf (str): path to the input SWF file.
        output_folder (str): output directory, created if missing.
        start_time (int | None): if set, submit times are translated
            towards zero by this value; otherwise the earliest observed
            submit time defines the zero.
        job_walltime_factor (float): walltime is computed as
            max(job_walltime_factor * run_time, requested_time).
        given_walltime_only (bool): if True, use only the requested
            walltime found in the trace.
        job_grain (int): granularity used to round run times up when
            building the profile identifier (groups jobs with close
            running times).
        indent (int | None): JSON indentation passed to ``json.dump``.
        quiet (bool): suppress progress and summary output.
        job_size_function_string (str): Python expression evaluated with
            ``nb_res`` in scope to transform each job's size.
    """
    if not quiet:
        print(f"Input file = {input_swf}")
        print(f"output folder = {output_folder}")

    # One capture group per SWF column: 18 whitespace-separated numbers.
    # Raw strings avoid invalid escape sequences for '\d' and '\s'.
    element = r'([-+]?\d+(?:\.\d+)?)'
    r = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')

    version = 0

    # A dictionary of users, each entry containing the user's jobs
    users = {}

    # Some job counters
    not_valid = 0
    not_line_match_format = 0
    minimum_observed_submit_time = float('inf')

    # Let's loop over the lines of the input file
    i = 0
    with open(input_swf, 'r') as swf:
        for line in swf:
            i += 1
            if not quiet and i % 10000 == 0:
                print(f"\r\033[KProcessing swf line {i}...", end="")

            res = r.match(line)
            if res:
                # Parsing...
                job_id = (int(float(res.group(SwfField.JOB_ID.value))))
                nb_res = int(
                    float(res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value)))
                run_time = float(res.group(SwfField.RUN_TIME.value))
                submit_time = max(
                    0, float(res.group(SwfField.SUBMIT_TIME.value)))
                walltime = max(job_walltime_factor * run_time,
                               float(res.group(SwfField.REQUESTED_TIME.value)))
                user_id = int(res.group(SwfField.USER_ID.value))

                # nb_res may be changed by calling a user-given function.
                # SECURITY NOTE: eval() of a CLI-provided expression —
                # never feed untrusted input to this parameter.
                nb_res = eval(job_size_function_string)

                if given_walltime_only:
                    walltime = float(res.group(SwfField.REQUESTED_TIME.value))

                is_valid_job = (nb_res > 0 and walltime >
                                run_time and run_time > 0 and submit_time >= 0)
                if not is_valid_job:
                    not_valid += 1
                else:
                    if user_id not in users:
                        users[user_id] = []
                    # Round run_time up to the next multiple of job_grain.
                    profile = int(((run_time // job_grain) + 1) * job_grain)
                    job = (job_id, nb_res, run_time,
                           submit_time, profile, walltime)
                    minimum_observed_submit_time = min(
                        minimum_observed_submit_time, submit_time)
                    users[user_id].append(job)
            else:
                not_line_match_format += 1

    if not quiet:
        print("\nEnd parsing.")

    # Create a json file per user
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    if start_time is None:
        translate_submit_times = minimum_observed_submit_time
    else:
        translate_submit_times = start_time

    for user_id, jobs in users.items():
        # Export JSON: build the list of job dictionaries for this user.
        djobs = [{'id': job_id,
                  'subtime': submit_time - translate_submit_times,
                  'walltime': walltime,
                  'res': nb_res,
                  'profile': str(profile)}
                 for (job_id, nb_res, run_time,
                      submit_time, profile, walltime) in jobs]

        biggest_job = max(nb_res for (_, nb_res, _, _, _, _) in jobs)
        data = {
            'version': version,
            'command': ' '.join(sys.argv),
            'date': datetime.datetime.now().isoformat(' '),
            'description': f'Workload for user {user_id}',
            'nb_res': biggest_job,
            'jobs': djobs}

        output_json = f"{output_folder}/user{user_id}.json"
        try:
            # Context manager guarantees the handle is closed even if
            # json.dump raises (the original leaked the open file).
            with open(output_json, 'w') as out_file:
                json.dump(data, out_file, indent=indent, sort_keys=True)
            if not quiet:
                print(f"user{user_id:3d}: {len(djobs):10d} jobs")
        except IOError:
            print('Cannot write file', output_json)

    if not quiet:
        print('-------------------')
        print(f"Total {sum(len(jobs) for jobs in users.values())} jobs and "
              f"{len(users)} users have been created.")
        print(f"{not_line_match_format} out of {i} lines in the file did not "
              "match the swf format")
        print(f"{not_valid} jobs were not valid")
def main():
    """
    Program entry point.
    Parses the input arguments then calls generate_workload.
    """
    parser = argparse.ArgumentParser(
        description='Reads a SWF (Standard Workload Format) file and transform it into a JSON Batsim workload (with delay jobs)')
    parser.add_argument('input_swf', type=str,
                        help='The input SWF file')
    parser.add_argument('output_folder', type=str,
                        help='The output folder for the JSON files')
    parser.add_argument('--start_time', type=int, default=None,
                        help='If set, the submit times will be translated towards zero by this value. Otherwise, the first job sets the zero.')
    parser.add_argument('-jsf', '--job-size-function',
                        type=str,
                        default='1*nb_res',
                        help='The function to apply on the jobs size. '
                             'The identity is used by default.')
    parser.add_argument('-jwf', '--job_walltime_factor',
                        type=float, default=2,
                        help='Jobs walltimes are computed by the formula max(givenWalltime, jobWalltimeFactor*givenRuntime)')
    parser.add_argument('-gwo', '--given_walltime_only',
                        action="store_true",
                        help='If set, only the given walltime in the trace will be used')
    parser.add_argument('-jg', '--job_grain',
                        type=int, default=1,
                        help='Selects the level of detail we want for jobs. This parameter is used to group jobs that have close running time')
    parser.add_argument('-i', '--indent', type=int, default=None,
                        help='If set to a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0, or negative, will only insert newlines. The default value (None) selects the most compact representation.')
    parser.add_argument("-q", "--quiet", action="store_true")

    args = parser.parse_args()

    # Forward every parsed option to the workload generator.
    generate_workload(input_swf=args.input_swf,
                      output_folder=args.output_folder,
                      start_time=args.start_time,
                      job_walltime_factor=args.job_walltime_factor,
                      given_walltime_only=args.given_walltime_only,
                      job_grain=args.job_grain,
                      indent=args.indent,
                      quiet=args.quiet,
                      job_size_function_string=args.job_size_function)
# Standard script entry guard: run main() only when executed directly.
if __name__ == "__main__":
    main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment