#!/usr/bin/env python3

"""
Transforms an SWF trace into a Batsim workload with computation-only jobs,
split by user (one JSON file per user). Job profiles are not included in the
output JSON. Optionally keeps only a given set of partitions.
"""

import argparse
import json
import re
import sys
import datetime
import os

from swf import SwfField


def generate_workload(input_swf, output_folder,
                      partitions_to_select=None,
                      start_time=None,
                      job_walltime_factor=2,
                      given_walltime_only=False,
                      job_grain=1,
                      indent=None,
                      keep_only=None,
                      verbose=False,
                      quiet=False,
                      job_size_function_string='1*nb_res'):
    """Generate a Batsim workload from a SWF trace."""
    element = r'([-+]?\d+(?:\.\d+)?)'
    r = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')
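    # An SWF data line holds 18 whitespace-separated numeric fields; each
    # field is captured as one regex group and looked up through SwfField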

    current_id = 0
    version = 0

    # A dictionary of users, each entry containing the jobs and the profiles
    # used by this user
    users = {}
    # Let a job be a tuple (job_id, nb_res, run_time, submit_time, profile,
    # walltime)

    # Some job counters
    not_selected = {"nb": 0, "coreh": 0}
    selected = {"nb": 0, "coreh": 0}
    not_valid = 0
    not_line_match_format = 0

    minimum_observed_submit_time = float('inf')

    # Let's loop over the lines of the input file
    i = 0
    for i, line in enumerate(input_swf, start=1):
        if i % 100000 == 0:
            print("Processing swf line", i)

        res = r.match(line)

        if res:
            # Parsing...
            job_id = int(float(res.group(SwfField.JOB_ID.value)))
            nb_res = int(
                float(res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value)))
            run_time = float(res.group(SwfField.RUN_TIME.value))
            submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
            walltime = max(job_walltime_factor * run_time,
                           float(res.group(SwfField.REQUESTED_TIME.value)))
            user_id = str(res.group(SwfField.USER_ID.value))
            partition_id = int(res.group(SwfField.PARTITION_ID.value))

            # nb_res may be changed by evaluating a user-given Python
            # expression (--job-size-function); eval runs arbitrary code,
            # so only pass trusted expressions
            nb_res = eval(job_size_function_string)
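            # Example: passing -jsf 'nb_res * 2' would double every job
            # size; any Python expression over the parsed fields works here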

            if given_walltime_only:
                walltime = float(res.group(SwfField.REQUESTED_TIME.value))

            # Select jobs to keep
            is_valid_job = (nb_res > 0 and run_time > 0 and
                            walltime > run_time and submit_time >= 0)
            select_partition = ((partitions_to_select is None) or
                                (partition_id in partitions_to_select))
            use_job = select_partition and (
                (keep_only is None) or eval(keep_only))
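            # Example: --keep_only 'nb_res <= 64 and run_time < 3600' would
            # keep only small, short jobs (hypothetical expression; the
            # parsed fields above are in scope when it is evaluated)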

            if not is_valid_job:
                not_valid += 1
            elif not use_job:
                not_selected["nb"] += 1
                not_selected["coreh"] += run_time * nb_res
            else:
                selected["nb"] += 1
                selected["coreh"] += run_time * nb_res

                if user_id not in users:
                    users[user_id] = {"jobs": []}

                profile = int(((run_time // job_grain) + 1) * job_grain)
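                # (the profile name is run_time rounded up to the next
                # multiple of job_grain: e.g. run_time=130 with job_grain=60
                # gives profile 180, so jobs with close runtimes share one
                # profile)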

                job = (current_id, nb_res, run_time,
                       submit_time, profile, walltime)
                current_id += 1
                minimum_observed_submit_time = min(minimum_observed_submit_time,
                                                   submit_time)
                users[user_id]["jobs"].append(job)

        else:
            not_line_match_format += 1

    # Create a json file per user
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    if start_time is None:
        translate_submit_times = minimum_observed_submit_time
    else:
        translate_submit_times = start_time
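    # Subtimes in the output are shifted by translate_submit_times, so by
    # default the earliest kept job is submitted at time 0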

    for user_id in users:
        jobs = users[user_id]["jobs"]
        # Export JSON
        # Let's generate a list of dictionaries for the jobs
        djobs = []
        for (job_id, nb_res, run_time, submit_time, profile, walltime) in jobs:
            djobs.append({'id': job_id,
                          'subtime': submit_time - translate_submit_times,
                          'walltime': walltime,
                          'res': nb_res,
                          'profile': str(profile)})

        biggest_job = max(nb_res for (_, nb_res, _, _, _, _) in jobs)

        platform_size = biggest_job

        data = {
            'version': version,
            'command': ' '.join(sys.argv[:]),
            'date': datetime.datetime.now().isoformat(' '),
            'description': 'Workload for user {}'.format(user_id),
            'nb_res': platform_size,
            'jobs': djobs}
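        # Schematic result (illustrative values): {"version": 0,
        # "command": "...", "date": "...", "description": "Workload for
        # user 3", "nb_res": 64, "jobs": [{"id": 0, "subtime": 0.0,
        # "walltime": 7200.0, "res": 16, "profile": "3600"}, ...]}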

        output_json = os.path.join(output_folder, 'user' + user_id + '.json')
        try:
            with open(output_json, 'w') as out_file:
                json.dump(data, out_file, indent=indent, sort_keys=True)

            if not quiet:
                print('user {}:'.format(user_id))
                print('   {} jobs have been created'.format(len(djobs)))

        except IOError:
            print('Cannot write file', output_json)

    print('-------------------\nEnd parsing')
    print('Total {} jobs and {} users have been created.'.format(
        selected["nb"], len(users)))
    print(
        'Total number of core-hours: {:.0f}'.format(selected["coreh"] / 3600))
    print('{} valid jobs were not selected (keep_only) for {:.0f} core-hours'.format(
        not_selected["nb"], not_selected["coreh"] / 3600))
    total_nb = selected["nb"] + not_selected["nb"]
    if total_nb > 0:
        print("Jobs not selected: {:.1f}% in number, {:.1f}% in core-hours"
              .format(not_selected["nb"] / total_nb * 100,
                      not_selected["coreh"] /
                      (selected["coreh"] + not_selected["coreh"]) * 100))
    print('{} out of {} lines in the file did not match the SWF format'.format(
        not_line_match_format, i))
    print('{} jobs were not valid'.format(not_valid))


def main():
    """
    Program entry point.

    Parses the input arguments then calls generate_workload.
    """
    parser = argparse.ArgumentParser(
        description='Reads a SWF (Standard Workload Format) file and transforms it into a JSON Batsim workload (with delay jobs)')
    parser.add_argument('input_swf', type=argparse.FileType('r'),
                        help='The input SWF file')
    parser.add_argument('output_folder', type=str,
                        help='The output folder for the JSON files')

    parser.add_argument('-sp', '--partitions_to_select',
                        type=int, nargs='+', default=None,
                        help='The partitions to keep from the input trace. Jobs that ran in any other partition are discarded.')

    parser.add_argument('--start_time', type=int, default=None,
                        help='If set, submit times are shifted so that this value becomes time zero. Otherwise, the earliest submit time in the trace defines time zero.')

    parser.add_argument('-jsf', '--job-size-function',
                        type=str,
                        default='1*nb_res',
                        help='A Python expression evaluated to transform each job size; the parsed fields (e.g. nb_res) are in scope. '
                             'The identity (1*nb_res) is used by default.')
    parser.add_argument('-jwf', '--job_walltime_factor',
                        type=float, default=2,
                        help='Job walltimes are computed as max(given_walltime, job_walltime_factor * run_time)')
    parser.add_argument('-gwo', '--given_walltime_only',
                        action="store_true",
                        help='If set, only the given walltime in the trace will be used')
    parser.add_argument('-jg', '--job_grain',
                        type=int, default=1,
                        help='Selects the level of detail for job profiles. Runtimes are rounded up to the next multiple of this grain, grouping jobs with similar running times.')
    parser.add_argument('-i', '--indent', type=int, default=None,
                        help='If set to a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0, or negative, will only insert newlines. The default value (None) selects the most compact representation.')
    parser.add_argument('--keep_only',
                        type=str,
                        default=None,
                        help='If set, this Python expression is evaluated for each job (with parsed fields such as nb_res and run_time in scope) to decide whether the job is kept')

    group = parser.add_mutually_exclusive_group()
    group.add_argument("-v", "--verbose", action="store_true")
    group.add_argument("-q", "--quiet", action="store_true")

    args = parser.parse_args()

    generate_workload(input_swf=args.input_swf,
                      output_folder=args.output_folder,
                      partitions_to_select=args.partitions_to_select,
                      start_time=args.start_time,
                      job_walltime_factor=args.job_walltime_factor,
                      given_walltime_only=args.given_walltime_only,
                      job_grain=args.job_grain,
                      indent=args.indent,
                      keep_only=args.keep_only,
                      verbose=args.verbose,
                      quiet=args.quiet,
                      job_size_function_string=args.job_size_function)


if __name__ == "__main__":
    main()