#!/usr/bin/env python3

"""
Transforms an SWF trace into a Batsim workload with computation-only jobs,
split by user (one JSON file per user). The job profiles are not included
in the output JSON files.
Optionally keeps only a given set of partitions.
"""

import argparse
import json
import re
import sys
import datetime
import os

from scripts.swf import SwfField
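
# Example invocation (file names are illustrative):
#   ./swf_to_batsim_split_by_user.py input.swf out_workloads/ \
#       -sp 1 2 --job_walltime_factor 2 --indent 2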


def generate_workload(input_swf, output_folder,
                      partitions_to_select=None,
                      start_time=None,
                      job_walltime_factor=2,
                      given_walltime_only=False,
                      job_grain=1,
                      indent=None,
                      keep_only=None,
                      verbose=False,
                      quiet=False,
                      job_size_function_string='1*nb_res'):
    """Generate a Batsim workload from a SWF trace."""
    print(f"Input file = {input_swf}")
    print(f"Output folder = {output_folder}")

    element = r'([-+]?\d+(?:\.\d+)?)'
    r = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')
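    # An SWF data line holds 18 whitespace-separated numeric fields (as
    # defined by the Standard Workload Format); header/comment lines (e.g.
    # starting with ';') do not match and are counted separately below.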

    current_id = 0
    version = 0

    # A dictionary of users; each entry contains the jobs of this user.
    # Let a job be a tuple (job_id, nb_res, run_time, submit_time, profile,
    # walltime).
    users = {}

    # Some job counters
    not_selected = {"nb": 0, "coreh": 0}
    selected = {"nb": 0, "coreh": 0}
    not_valid = 0
    not_line_match_format = 0

    minimum_observed_submit_time = float('inf')

    # Let's loop over the lines of the input file
    i = 0
    with open(input_swf, 'r') as swf:
        for line in swf:
            i += 1
            if i % 100000 == 0:
                print("Processing swf line", i)

            res = r.match(line)

            if res:
                # Parsing...
                job_id = int(float(res.group(SwfField.JOB_ID.value)))
                nb_res = int(float(
                    res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value)))
                run_time = float(res.group(SwfField.RUN_TIME.value))
                submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
                walltime = max(job_walltime_factor * run_time,
                               float(res.group(SwfField.REQUESTED_TIME.value)))
                user_id = str(res.group(SwfField.USER_ID.value))
                partition_id = int(res.group(SwfField.PARTITION_ID.value))

                # nb_res may be changed by evaluating a user-given Python
                # expression (beware: eval runs arbitrary code, so only use
                # trusted expressions)
                nb_res = eval(job_size_function_string)
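                # Illustrative expressions: '2*nb_res' doubles every job's
                # size, 'max(1, nb_res // 2)' halves it; any field parsed
                # above (run_time, partition_id, ...) may be referenced.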

                if given_walltime_only:
                    walltime = float(res.group(SwfField.REQUESTED_TIME.value))

                # Select jobs to keep
                is_valid_job = (nb_res > 0 and run_time > 0 and
                                submit_time >= 0 and walltime > run_time)
                select_partition = ((partitions_to_select is None) or
                                    (partition_id in partitions_to_select))
                use_job = select_partition and (
                    (keep_only is None) or eval(keep_only))
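                # Illustrative keep_only expressions: 'nb_res <= 64' keeps
                # only jobs requesting at most 64 processors, 'run_time < 3600'
                # only jobs shorter than one hour.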

                if not is_valid_job:
                    not_valid += 1
                elif not use_job:
                    not_selected["nb"] += 1
                    not_selected["coreh"] += run_time * nb_res
                else:
                    selected["nb"] += 1
                    selected["coreh"] += run_time * nb_res

                    if user_id not in users:
                        users[user_id] = {"jobs": []}

                    # Round the runtime up to the next multiple of job_grain so
                    # jobs with close runtimes share a profile (e.g. with
                    # job_grain=10, run_time=42 yields profile "50")
                    profile = int(((run_time // job_grain) + 1) * job_grain)

                    job = (current_id, nb_res, run_time,
                           submit_time, profile, walltime)
                    current_id += 1
                    minimum_observed_submit_time = min(minimum_observed_submit_time,
                                                       submit_time)
                    users[user_id]["jobs"].append(job)

            else:
                not_line_match_format += 1

    # Create a json file per user
    os.makedirs(output_folder, exist_ok=True)

    # Shift all submit times so that the workload starts at time zero,
    # relative either to the earliest observed submit time or to a
    # user-given start_time
    if start_time is None:
        translate_submit_times = minimum_observed_submit_time
    else:
        translate_submit_times = start_time

    for user_id in users:
        jobs = users[user_id]["jobs"]
        # Export JSON
        # Let's generate a list of dictionaries for the jobs
        djobs = list()
        for (job_id, nb_res, run_time, submit_time, profile, walltime) in jobs:
            djobs.append({'id': job_id,
                          'subtime': submit_time - translate_submit_times,
                          'walltime': walltime,
                          'res': nb_res,
                          'profile': str(profile)})

        # The per-user platform must be at least as large as its biggest job
        platform_size = max(nb_res for (job_id, nb_res, run_time, submit_time,
                                        profile, walltime) in jobs)

        data = {
            'version': version,
            'command': ' '.join(sys.argv[:]),
            'date': datetime.datetime.now().isoformat(' '),
            'description': 'Workload for user {}'.format(user_id),
            'nb_res': platform_size,
            'jobs': djobs}
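        # Illustrative shape of the file written below:
        # {"version": 0, "command": "...", "date": "...",
        #  "description": "Workload for user 42", "nb_res": 128,
        #  "jobs": [{"id": 0, "subtime": 0.0, "walltime": 7200.0,
        #            "res": 64, "profile": "3600"}, ...]}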

        output_json = os.path.join(output_folder, "user{}.json".format(user_id))
        try:
            with open(output_json, 'w') as out_file:
                json.dump(data, out_file, indent=indent, sort_keys=True)

            if not quiet:
                print('user {}:'.format(user_id))
                print('   {} jobs have been created'.format(len(djobs)))

        except IOError:
            print('Cannot write file', output_json)

    print('-------------------\nEnd parsing')
    print('Total {} jobs and {} users have been created.'.format(
        selected["nb"], len(users)))
    print('Total number of core-hours: {:.0f}'.format(selected["coreh"] / 3600))
    print('{} valid jobs were not selected (partition/keep_only filters) '
          'for {:.0f} core-hours'.format(not_selected["nb"],
                                         not_selected["coreh"] / 3600))
    total_nb = selected["nb"] + not_selected["nb"]
    total_coreh = selected["coreh"] + not_selected["coreh"]
    if total_nb > 0:
        print("Jobs not selected: {:.1f}% in number, {:.1f}% in core-hours"
              .format(not_selected["nb"] / total_nb * 100,
                      not_selected["coreh"] / total_coreh * 100))
    print('{} out of {} lines in the file did not match the swf format'.format(
        not_line_match_format, i))
    print('{} jobs were not valid'.format(not_valid))


def main():
    """
    Program entry point.

    Parses the input arguments then calls generate_workload.
    """
    parser = argparse.ArgumentParser(
        description='Reads an SWF (Standard Workload Format) file and '
                    'transforms it into one JSON Batsim workload per user '
                    '(with delay jobs; the corresponding profiles are not '
                    'written to the output)')
    parser.add_argument('input_swf', type=str,
                        help='The input SWF file')
    parser.add_argument('output_folder', type=str,
                        help='The output folder for the JSON files')

    parser.add_argument('-sp', '--partitions_to_select',
                        type=int, nargs='+', default=None,
                        help='If set, only the jobs of these partitions are kept; jobs running in other partitions are discarded.')

    parser.add_argument('--start_time', type=int, default=None,
                        help='If set, this value is subtracted from all submit times. Otherwise, the earliest submit time defines time zero.')

    parser.add_argument('-jsf', '--job-size-function',
                        type=str,
                        default='1*nb_res',
                        help='A Python expression evaluated to recompute each '
                             'job size; the parsed fields (e.g. nb_res) are '
                             'in scope. The identity is used by default.')
    parser.add_argument('-jwf', '--job_walltime_factor',
                        type=float, default=2,
                        help='Job walltimes are computed as max(given_walltime, job_walltime_factor * given_runtime)')
    parser.add_argument('-gwo', '--given_walltime_only',
                        action="store_true",
                        help='If set, only the given walltime in the trace will be used')
    parser.add_argument('-jg', '--job_grain',
                        type=int, default=1,
                        help='Selects the level of detail of job profiles: runtimes are rounded up to the next multiple of this value, which groups jobs with close running times')
    parser.add_argument('-i', '--indent', type=int, default=None,
                        help='If set to a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0, or negative, will only insert newlines. The default value (None) selects the most compact representation.')
    parser.add_argument('--keep_only',
                        type=str,
                        default=None,
                        help='If set, this Python expression is evaluated for each job to decide whether it is kept')

    group = parser.add_mutually_exclusive_group()
    group.add_argument("-v", "--verbose", action="store_true")
    group.add_argument("-q", "--quiet", action="store_true")

    args = parser.parse_args()

    generate_workload(input_swf=args.input_swf,
                      output_folder=args.output_folder,
                      partitions_to_select=args.partitions_to_select,
                      start_time=args.start_time,
                      job_walltime_factor=args.job_walltime_factor,
                      given_walltime_only=args.given_walltime_only,
                      job_grain=args.job_grain,
                      indent=args.indent,
                      keep_only=args.keep_only,
                      verbose=args.verbose,
                      quiet=args.quiet,
                      job_size_function_string=args.job_size_function)


if __name__ == "__main__":
    main()