Skip to content
Snippets Groups Projects
Commit f7dbc8c8 authored by Maël Madon
Browse files

remove job filters from swf_to_batsim_split_by_user

parent 70a24b12
No related branches found
No related tags found
No related merge requests found
......@@ -3,7 +3,6 @@
"""
Transforms a SWF to a Batsim workload with computation-only jobs.
Split by user. Do not include the jobs profiles in the output JSON.
(optional) keeps only a given set of partition.
"""
import argparse
......@@ -17,49 +16,41 @@ from swf import SwfField
def generate_workload(input_swf, output_folder,
partitions_to_select=None,
start_time=None,
job_walltime_factor=2,
given_walltime_only=False,
job_grain=1,
indent=None,
keep_only=None,
verbose=False,
quiet=False,
job_size_function_string='1*nb_res'):
"""Generate a Batsim workload from a SWF trace."""
print(f"Input file = {input_swf}")
print(f"output folder = {output_folder}")
if not quiet:
print(f"Input file = {input_swf}")
print(f"output folder = {output_folder}")
element = '([-+]?\d+(?:\.\d+)?)'
r = re.compile('\s*' + (element + '\s+') * 17 + element + '\s*')
current_id = 0
version = 0
# A dictionary of users, each entry containing the jobs and the profiles
# used by this user
    # A dictionary of users, each entry containing its jobs
users = {}
# Let a job be a tuple (job_id, nb_res, run_time, submit_time, profile,
# walltime)
jobs = []
# Some job counters
not_selected = {"nb": 0, "coreh": 0}
selected = {"nb": 0, "coreh": 0}
not_valid = 0
not_line_match_format = 0
minimum_observed_submit_time = float('inf')
# Let's loop over the lines of the input file
i = 1
i = 0
with open(input_swf, 'r') as swf:
for line in swf:
i += 1
if i % 100000 == 0:
print("Processing swf line", i)
if not quiet and i % 10000 == 0:
print(f"\r\033[KProcessing swf line {i}...", end="")
res = r.match(line)
if res:
......@@ -72,7 +63,6 @@ def generate_workload(input_swf, output_folder,
walltime = max(job_walltime_factor * run_time,
float(res.group(SwfField.REQUESTED_TIME.value)))
user_id = str(res.group(SwfField.USER_ID.value))
partition_id = int(res.group(SwfField.PARTITION_ID.value))
# nb_res may be changed by calling a user-given function
nb_res = eval(job_size_function_string)
......@@ -80,39 +70,28 @@ def generate_workload(input_swf, output_folder,
if given_walltime_only:
walltime = float(res.group(SwfField.REQUESTED_TIME.value))
# Select jobs to keep
is_valid_job = (nb_res > 0 and walltime >
run_time and run_time > 0 and submit_time >= 0)
select_partition = ((partitions_to_select is None) or
(partition_id in partitions_to_select))
use_job = select_partition and (
(keep_only is None) or eval(keep_only))
if not is_valid_job:
not_valid += 1
if not use_job:
not_selected["nb"] += 1
not_selected["coreh"] += run_time * nb_res
else:
selected["nb"] += 1
selected["coreh"] += run_time * nb_res
if not(users.__contains__(user_id)):
users[user_id] = {}
users[user_id]["jobs"] = []
if user_id not in users:
users[user_id] = []
profile = int(((run_time // job_grain) + 1) * job_grain)
job = (current_id, nb_res, run_time,
job = (job_id, nb_res, run_time,
submit_time, profile, walltime)
current_id = current_id + 1
minimum_observed_submit_time = min(minimum_observed_submit_time,
submit_time)
users[user_id]["jobs"].append(job)
users[user_id].append(job)
else:
not_line_match_format += 1
if not quiet:
print("\nEnd parsing.")
# Create a json file per user
if not os.path.exists(output_folder):
......@@ -123,8 +102,7 @@ def generate_workload(input_swf, output_folder,
else:
translate_submit_times = start_time
for user_id in users:
jobs = users[user_id]["jobs"]
for user_id, jobs in users.items():
# Export JSON
# Let's generate a list of dictionaries for the jobs
djobs = list()
......@@ -135,17 +113,14 @@ def generate_workload(input_swf, output_folder,
'res': nb_res,
'profile': str(profile)})
biggest_job = max([nb_res for (job_id, nb_res, run_time, submit_time,
profile, walltime) in jobs])
platform_size = biggest_job
biggest_job = max([nb_res for (_, nb_res, _, _, _, _) in jobs])
data = {
'version': version,
'command': ' '.join(sys.argv[:]),
'date': datetime.datetime.now().isoformat(' '),
'description': 'Workload for user {}'.format(user_id),
'nb_res': platform_size,
'nb_res': biggest_job,
'jobs': djobs}
try:
......@@ -157,26 +132,18 @@ def generate_workload(input_swf, output_folder,
if not quiet:
print('user {}:'.format(user_id))
print(' {} jobs had been created'.format(len(djobs)))
if keep_only:
print(' {} jobs have been removed by keep_only'.format(
len(jobs) - len(djobs)))
except IOError:
print('Cannot write file', output_json)
print('-------------------\nEnd parsing')
print('Total {} jobs and {} users have been created.'.format(
selected["nb"], len(users)))
print(
'Total number of core-hours: {:.0f}'.format(selected["coreh"] / 3600))
print('{} valid jobs were not selected (keep_only) for {:.0f} core-hour'.format(
not_selected["nb"], not_selected["coreh"] / 3600))
print("Jobs not selected: {:.1f}% in number, {:.1f}% in core-hour"
.format(not_selected["nb"] / (not_selected["nb"]+selected["nb"]) * 100,
not_selected["coreh"] / (selected["coreh"]+not_selected["coreh"]) * 100))
print('{} out of {} lines in the file did not match the swf format'.format(
not_line_match_format, i))
print('{} jobs were not valid'.format(not_valid))
if not quiet:
print('-------------------')
print('Total {} jobs and {} users have been created.'.format(
sum([len(jobs) for jobs in users.values()]), len(users)))
print('{} out of {} lines in the file did not match the swf format'.format(
not_line_match_format, i))
print('{} jobs were not valid'.format(not_valid))
def main():
......@@ -192,10 +159,6 @@ def main():
parser.add_argument('output_folder', type=str,
help='The output folder for the JSON files')
parser.add_argument('-sp', '--partitions_to_select',
type=int, nargs='+', default=None,
help='List of partitions to only consider in the input trace. The jobs running in the other partitions will be discarded.')
parser.add_argument('--start_time', type=int, default=None,
help='If set, the submit times will be translated towards zero by this value. Otherwise, the first job sets the zero.')
......@@ -215,27 +178,18 @@ def main():
help='Selects the level of detail we want for jobs. This parameter is used to group jobs that have close running time')
parser.add_argument('-i', '--indent', type=int, default=None,
help='If set to a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0, or negative, will only insert newlines. The default value (None) selects the most compact representation.')
parser.add_argument('--keep_only',
type=str,
default=None,
help='If set, this parameter is evaluated to choose which jobs should be kept')
group = parser.add_mutually_exclusive_group()
group.add_argument("-v", "--verbose", action="store_true")
group.add_argument("-q", "--quiet", action="store_true")
parser.add_argument("-q", "--quiet", action="store_true")
args = parser.parse_args()
generate_workload(input_swf=args.input_swf,
output_folder=args.output_folder,
partitions_to_select=args.partitions_to_select,
start_time=args.start_time,
job_walltime_factor=args.job_walltime_factor,
given_walltime_only=args.given_walltime_only,
job_grain=args.job_grain,
indent=args.indent,
keep_only=args.keep_only,
verbose=args.verbose,
quiet=args.quiet,
job_size_function_string=args.job_size_function)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment