Skip to content
Snippets Groups Projects
Commit 0e4f203a authored by Maël Madon's avatar Maël Madon
Browse files

copy swf parsers from other projects

parent 9cc09ae9
No related branches found
No related tags found
No related merge requests found
swf.py 0 → 100644
#!/usr/bin/env python3
"""SWF types and functions."""
from enum import Enum, unique
@unique
class SwfField(Enum):
    """Maps SWF columns and their meaning.

    Indices are 1-based and follow the Standard Workload Format: each
    data line of an SWF trace holds 18 whitespace-separated numeric
    fields, matched positionally by the parsers' regex group numbers.
    """
    JOB_ID = 1
    SUBMIT_TIME = 2
    WAIT_TIME = 3
    RUN_TIME = 4
    ALLOCATED_PROCESSOR_COUNT = 5
    AVERAGE_CPU_TIME_USED = 6
    USED_MEMORY = 7
    REQUESTED_NUMBER_OF_PROCESSORS = 8
    REQUESTED_TIME = 9
    REQUESTED_MEMORY = 10
    STATUS = 11
    USER_ID = 12
    GROUP_ID = 13
    APPLICATION_ID = 14
    # NOTE(review): typo for QUEUE_ID ("Queue Number" in the SWF spec);
    # kept as-is because renaming would break any caller using this name.
    QUEUD_ID = 15
    PARTITION_ID = 16
    PRECEDING_JOB_ID = 17
    THINK_TIME_FROM_PRECEDING_JOB = 18
#!/usr/bin/env python3
"""
SWF parser to make selections and obtain some stats.
Inspired from https://gitlab.inria.fr/batsim/batsim/-/blob/master/tools/swf_to_batsim_workload_compute_only.py
"""
import argparse
import json
import re
from swf import SwfField
def generate_workload(input_swf, output_swf=None,
                      partitions_to_select=None,
                      job_walltime_factor=2,
                      given_walltime_only=False,
                      job_grain=1,
                      platform_size=None,
                      indent=None,
                      translate_submit_times=False,
                      keep_only=None,
                      verbose=False,
                      quiet=False,
                      job_size_function_string='1*nb_res'):
    """Make a selection from a SWF trace, optionally outputing it as SWF.

    Reads *input_swf* line by line, parses each line as an 18-field SWF
    record, selects jobs according to *partitions_to_select* and
    *keep_only*, and prints summary statistics. Selected lines are
    written verbatim to *output_swf* when it is given.

    :param input_swf: iterable of SWF lines (e.g. an open file object)
    :param output_swf: optional writable file object for selected lines
    :param partitions_to_select: optional list of partition ids to keep
    :param job_walltime_factor: walltime = max(requested_time,
        job_walltime_factor * run_time)
    :param given_walltime_only: if True, use the trace's requested time
        as the walltime unchanged
    :param keep_only: optional Python expression evaluated per job to
        decide whether to keep it.
        SECURITY: evaluated with eval(); never pass untrusted input.
    :param job_size_function_string: Python expression computing the job
        size from 'nb_res'. Also evaluated with eval(); same caveat.

    The remaining parameters (job_grain, platform_size, indent,
    translate_submit_times, verbose, quiet) are accepted for interface
    compatibility with the Batsim-export variant but are unused here.
    """
    # One SWF field: an optionally signed integer or decimal number.
    element = r'([-+]?\d+(?:\.\d+)?)'
    # A full SWF line: exactly 18 whitespace-separated fields.
    r = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')

    # Some counters...
    not_selected = {"nb": 0, "coreh": 0}
    selected = {"nb": 0, "coreh": 0}
    not_valid = 0
    not_line_match_format = 0
    users = set()   # set: O(1) membership instead of list scan per job

    # Let's loop over the lines of the input file.
    # enumerate fixes the original off-by-one (counter started at 1 and
    # was incremented before processing, overcounting lines by one).
    nb_lines = 0
    for nb_lines, line in enumerate(input_swf, start=1):
        if nb_lines % 100000 == 0:
            print("Processing swf line", nb_lines)

        res = r.match(line)
        if res is None:
            not_line_match_format += 1
            continue

        # Parsing...
        job_id = int(float(res.group(SwfField.JOB_ID.value)))
        nb_res = int(
            float(res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value)))
        run_time = float(res.group(SwfField.RUN_TIME.value))
        submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
        walltime = max(job_walltime_factor * run_time,
                       float(res.group(SwfField.REQUESTED_TIME.value)))
        user_id = str(res.group(SwfField.USER_ID.value))
        partition_id = int(res.group(SwfField.PARTITION_ID.value))

        # nb_res may be changed by calling a user-given function
        # (eval: trusted input only — see docstring).
        nb_res = eval(job_size_function_string)

        if given_walltime_only:
            walltime = float(res.group(SwfField.REQUESTED_TIME.value))

        # Select jobs to keep.
        # NOTE(review): validity is only counted, it does not filter the
        # selection — preserved from the original behavior.
        is_valid_job = (nb_res > 0 and walltime > run_time
                        and run_time > 0 and submit_time >= 0)
        select_partition = (partitions_to_select is None
                            or partition_id in partitions_to_select)
        use_job = select_partition and (keep_only is None or eval(keep_only))

        if not is_valid_job:
            not_valid += 1

        if not use_job:
            not_selected["nb"] += 1
            not_selected["coreh"] += run_time * nb_res
        else:
            # Increment counters
            selected["nb"] += 1
            selected["coreh"] += run_time * nb_res
            users.add(user_id)
            # Output in the swf
            if output_swf is not None:
                output_swf.write(line)

    print('-------------------\nEnd parsing')
    print('Total {} jobs and {} users have been created.'.format(
        selected["nb"], len(users)))
    print(
        'Total number of core-hours: {:.0f}'.format(selected["coreh"] / 3600))
    print('{} valid jobs were not selected (keep_only) for {:.0f} core-hour'.format(
        not_selected["nb"], not_selected["coreh"] / 3600))
    # Guard against division by zero when no line produced a job.
    total_nb = not_selected["nb"] + selected["nb"]
    total_coreh = not_selected["coreh"] + selected["coreh"]
    if total_nb > 0 and total_coreh > 0:
        print("Jobs not selected: {:.1f}% in number, {:.1f}% in core-hour"
              .format(not_selected["nb"] / total_nb * 100,
                      not_selected["coreh"] / total_coreh * 100))
    print('{} out of {} lines in the file did not match the swf format'.format(
        not_line_match_format, nb_lines))
    print('{} jobs were not valid'.format(not_valid))
def main():
    """Program entry point.

    Parses command-line arguments and forwards them to
    generate_workload.
    """
    cli = argparse.ArgumentParser(
        description='Reads a SWF (Standard Workload Format) file and outputs some stats')
    cli.add_argument('input_swf', type=argparse.FileType('r'),
                     help='The input SWF file')
    cli.add_argument('-o', '--output_swf',
                     type=argparse.FileType('w'), default=None,
                     help='The optional output SWF file')
    cli.add_argument('-sp', '--partitions_to_select',
                     type=int, nargs='+', default=None,
                     help='List of partitions to only consider in the input trace. The jobs running in the other partitions will be discarded.')
    cli.add_argument('-jsf', '--job-size-function',
                     type=str,
                     default='1*nb_res',
                     help='The function to apply on the jobs size. '
                          'The identity is used by default.')
    cli.add_argument('-jwf', '--job_walltime_factor',
                     type=float, default=2,
                     help='Jobs walltimes are computed by the formula max(givenWalltime, jobWalltimeFactor*givenRuntime)')
    cli.add_argument('-gwo', '--given_walltime_only',
                     action="store_true",
                     help='If set, only the given walltime in the trace will be used')
    cli.add_argument('-jg', '--job_grain',
                     type=int, default=1,
                     help='Selects the level of detail we want for jobs. This parameter is used to group jobs that have close running time')
    cli.add_argument('-pf', '--platform_size', type=int, default=None,
                     help='If set, the number of machines to put in the output JSON files is set by this parameter instead of taking the maximum job size')
    cli.add_argument('-i', '--indent', type=int, default=None,
                     help='If set to a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0, or negative, will only insert newlines. The default value (None) selects the most compact representation.')
    cli.add_argument('-t', '--translate_submit_times',
                     action="store_true",
                     help="If set, the jobs' submit times will be translated towards 0")
    cli.add_argument('--keep_only',
                     type=str,
                     default=None,
                     help='If set, this parameter is evaluated to choose which jobs should be kept')

    # -v and -q are mutually exclusive.
    verbosity = cli.add_mutually_exclusive_group()
    verbosity.add_argument("-v", "--verbose", action="store_true")
    verbosity.add_argument("-q", "--quiet", action="store_true")

    opts = cli.parse_args()
    generate_workload(input_swf=opts.input_swf,
                      output_swf=opts.output_swf,
                      partitions_to_select=opts.partitions_to_select,
                      job_walltime_factor=opts.job_walltime_factor,
                      given_walltime_only=opts.given_walltime_only,
                      job_grain=opts.job_grain,
                      platform_size=opts.platform_size,
                      indent=opts.indent,
                      translate_submit_times=opts.translate_submit_times,
                      keep_only=opts.keep_only,
                      verbose=opts.verbose,
                      quiet=opts.quiet,
                      job_size_function_string=opts.job_size_function)
#!/usr/bin/env python3
"""
Transforms a SWF to a Batsim workload with computation-only jobs.
Split by user. Do not include the jobs profiles in the output JSON.
(optional) keeps only a given set of partition.
"""
import argparse
import json
import re
import sys
import datetime
import os
from swf import SwfField
def generate_workload(input_swf, output_folder,
                      partitions_to_select=None,
                      start_time=None,
                      job_walltime_factor=2,
                      given_walltime_only=False,
                      job_grain=1,
                      indent=None,
                      keep_only=None,
                      verbose=False,
                      quiet=False,
                      job_size_function_string='1*nb_res'):
    """Generate a Batsim workload from a SWF trace, split by user.

    Parses *input_swf* as 18-field SWF records, groups the selected jobs
    per user, and writes one JSON workload file per user in
    *output_folder* (named ``user<ID>.json``). Job profiles themselves
    are not included in the output.

    :param input_swf: path to the input SWF file
    :param output_folder: folder receiving the per-user JSON files
        (created if missing)
    :param partitions_to_select: optional list of partition ids to keep
    :param start_time: if set, submit times are translated by this
        value; otherwise the earliest kept submit time becomes zero
    :param job_walltime_factor: walltime = max(requested_time,
        job_walltime_factor * run_time)
    :param given_walltime_only: if True, use the trace's requested time
        as the walltime unchanged
    :param job_grain: granularity used to round run times into profiles
    :param indent: indent argument forwarded to json.dump
    :param keep_only: optional Python expression evaluated per job to
        decide whether to keep it.
        SECURITY: evaluated with eval(); never pass untrusted input.
    :param job_size_function_string: Python expression computing the job
        size from 'nb_res'. Also evaluated with eval(); same caveat.

    *verbose* is accepted for interface compatibility but unused.
    """
    print(f"Input file = {input_swf}")
    print(f"output folder = {output_folder}")

    # One SWF field: an optionally signed integer or decimal number.
    element = r'([-+]?\d+(?:\.\d+)?)'
    # A full SWF line: exactly 18 whitespace-separated fields.
    r = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')

    current_id = 0
    version = 0

    # A dictionary of users, each entry containing the jobs of this
    # user: user_id -> {"jobs": [(id, nb_res, run_time, submit_time,
    # profile, walltime), ...]}
    users = {}

    # Some job counters
    not_selected = {"nb": 0, "coreh": 0}
    selected = {"nb": 0, "coreh": 0}
    not_valid = 0
    not_line_match_format = 0
    minimum_observed_submit_time = float('inf')

    # Let's loop over the lines of the input file.
    # enumerate fixes the original off-by-one (counter started at 1 and
    # was incremented before processing, overcounting lines by one).
    nb_lines = 0
    with open(input_swf, 'r') as swf:
        for nb_lines, line in enumerate(swf, start=1):
            if nb_lines % 100000 == 0:
                print("Processing swf line", nb_lines)

            res = r.match(line)
            if res is None:
                not_line_match_format += 1
                continue

            # Parsing...
            nb_res = int(
                float(res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value)))
            run_time = float(res.group(SwfField.RUN_TIME.value))
            submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
            walltime = max(job_walltime_factor * run_time,
                           float(res.group(SwfField.REQUESTED_TIME.value)))
            user_id = str(res.group(SwfField.USER_ID.value))
            partition_id = int(res.group(SwfField.PARTITION_ID.value))

            # nb_res may be changed by calling a user-given function
            # (eval: trusted input only — see docstring).
            nb_res = eval(job_size_function_string)

            if given_walltime_only:
                walltime = float(res.group(SwfField.REQUESTED_TIME.value))

            # Select jobs to keep.
            # NOTE(review): validity is only counted, it does not filter
            # the selection — preserved from the original behavior.
            is_valid_job = (nb_res > 0 and walltime > run_time
                            and run_time > 0 and submit_time >= 0)
            select_partition = (partitions_to_select is None
                                or partition_id in partitions_to_select)
            use_job = select_partition and (
                keep_only is None or eval(keep_only))

            if not is_valid_job:
                not_valid += 1

            if not use_job:
                not_selected["nb"] += 1
                not_selected["coreh"] += run_time * nb_res
            else:
                selected["nb"] += 1
                selected["coreh"] += run_time * nb_res
                if user_id not in users:
                    users[user_id] = {"jobs": []}
                # Round the run time up to the next multiple of job_grain
                # to group jobs with close running times.
                profile = int(((run_time // job_grain) + 1) * job_grain)
                # Jobs are renumbered sequentially across the whole trace.
                job = (current_id, nb_res, run_time,
                       submit_time, profile, walltime)
                current_id += 1
                minimum_observed_submit_time = min(
                    minimum_observed_submit_time, submit_time)
                users[user_id]["jobs"].append(job)

    # Create a json file per user
    os.makedirs(output_folder, exist_ok=True)

    if start_time is not None:
        translate_submit_times = start_time
    elif minimum_observed_submit_time != float('inf'):
        translate_submit_times = minimum_observed_submit_time
    else:
        translate_submit_times = 0  # no job kept at all

    for user_id, user_entry in users.items():
        jobs = user_entry["jobs"]
        # Let's generate a list of dictionaries for the jobs
        djobs = [{'id': job_id,
                  'subtime': submit_time - translate_submit_times,
                  'walltime': walltime,
                  'res': nb_res,
                  'profile': str(profile)}
                 for (job_id, nb_res, run_time,
                      submit_time, profile, walltime) in jobs]

        # The per-user platform size is the biggest job of this user.
        platform_size = max(nb_res for (_, nb_res, _, _, _, _) in jobs)

        data = {
            'version': version,
            'command': ' '.join(sys.argv[:]),
            'date': datetime.datetime.now().isoformat(' '),
            'description': 'Workload for user {}'.format(user_id),
            'nb_res': platform_size,
            'jobs': djobs}

        output_json = output_folder + "/user" + user_id + ".json"
        try:
            # Context manager fixes the original's unclosed file handle.
            with open(output_json, 'w') as out_file:
                json.dump(data, out_file, indent=indent, sort_keys=True)
            if not quiet:
                print('user {}:'.format(user_id))
                print(' {} jobs had been created'.format(len(djobs)))
                if keep_only:
                    # NOTE(review): always 0 here since keep_only filtering
                    # happens before jobs are stored — kept for parity.
                    print(' {} jobs have been removed by keep_only'.format(
                        len(jobs) - len(djobs)))
        except IOError:
            print('Cannot write file', output_json)

    print('-------------------\nEnd parsing')
    print('Total {} jobs and {} users have been created.'.format(
        selected["nb"], len(users)))
    print(
        'Total number of core-hours: {:.0f}'.format(selected["coreh"] / 3600))
    print('{} valid jobs were not selected (keep_only) for {:.0f} core-hour'.format(
        not_selected["nb"], not_selected["coreh"] / 3600))
    # Guard against division by zero when no line produced a job.
    total_nb = not_selected["nb"] + selected["nb"]
    total_coreh = not_selected["coreh"] + selected["coreh"]
    if total_nb > 0 and total_coreh > 0:
        print("Jobs not selected: {:.1f}% in number, {:.1f}% in core-hour"
              .format(not_selected["nb"] / total_nb * 100,
                      not_selected["coreh"] / total_coreh * 100))
    print('{} out of {} lines in the file did not match the swf format'.format(
        not_line_match_format, nb_lines))
    print('{} jobs were not valid'.format(not_valid))
def main():
    """Program entry point.

    Parses command-line arguments and forwards them to
    generate_workload.
    """
    cli = argparse.ArgumentParser(
        description='Reads a SWF (Standard Workload Format) file and transform it into a JSON Batsim workload (with delay jobs)')
    cli.add_argument('input_swf', type=str,
                     help='The input SWF file')
    cli.add_argument('output_folder', type=str,
                     help='The output folder for the JSON files')
    cli.add_argument('-sp', '--partitions_to_select',
                     type=int, nargs='+', default=None,
                     help='List of partitions to only consider in the input trace. The jobs running in the other partitions will be discarded.')
    cli.add_argument('--start_time', type=int, default=None,
                     help='If set, the submit times will be translated towards zero by this value. Otherwise, the first job sets the zero.')
    cli.add_argument('-jsf', '--job-size-function',
                     type=str,
                     default='1*nb_res',
                     help='The function to apply on the jobs size. '
                          'The identity is used by default.')
    cli.add_argument('-jwf', '--job_walltime_factor',
                     type=float, default=2,
                     help='Jobs walltimes are computed by the formula max(givenWalltime, jobWalltimeFactor*givenRuntime)')
    cli.add_argument('-gwo', '--given_walltime_only',
                     action="store_true",
                     help='If set, only the given walltime in the trace will be used')
    cli.add_argument('-jg', '--job_grain',
                     type=int, default=1,
                     help='Selects the level of detail we want for jobs. This parameter is used to group jobs that have close running time')
    cli.add_argument('-i', '--indent', type=int, default=None,
                     help='If set to a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0, or negative, will only insert newlines. The default value (None) selects the most compact representation.')
    cli.add_argument('--keep_only',
                     type=str,
                     default=None,
                     help='If set, this parameter is evaluated to choose which jobs should be kept')

    # -v and -q are mutually exclusive.
    verbosity = cli.add_mutually_exclusive_group()
    verbosity.add_argument("-v", "--verbose", action="store_true")
    verbosity.add_argument("-q", "--quiet", action="store_true")

    opts = cli.parse_args()
    generate_workload(input_swf=opts.input_swf,
                      output_folder=opts.output_folder,
                      partitions_to_select=opts.partitions_to_select,
                      start_time=opts.start_time,
                      job_walltime_factor=opts.job_walltime_factor,
                      given_walltime_only=opts.given_walltime_only,
                      job_grain=opts.job_grain,
                      indent=opts.indent,
                      keep_only=opts.keep_only,
                      verbose=opts.verbose,
                      quiet=opts.quiet,
                      job_size_function_string=opts.job_size_function)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment