# swf_to_batsim_split_by_user.py
#!/usr/bin/env python3

"""
Transforms a SWF trace into a Batsim workload with computation-only jobs,
split by user. Does not include the job profiles in the output JSON.
"""

import argparse
import json
import re
import sys
import datetime
import os

from swf import SwfField


def generate_workload(input_swf, output_folder,
                      start_time=None,
                      job_walltime_factor=2,
                      given_walltime_only=False,
                      job_grain=1,
                      indent=None,
                      quiet=False,
                      job_size_function_string='1*nb_res'):
    """Generate one Batsim workload JSON file per user from a SWF trace.

    Parameters
    ----------
    input_swf : str
        Path of the input SWF (Standard Workload Format) file.
    output_folder : str
        Folder receiving the per-user JSON files (created if missing).
    start_time : int or None
        If set, submit times are translated towards zero by this value;
        otherwise the smallest observed submit time defines the zero.
    job_walltime_factor : float
        Walltime is max(requested walltime, job_walltime_factor * run_time).
    given_walltime_only : bool
        If True, only the walltime requested in the trace is used.
    job_grain : int
        Granularity (seconds) used to round run times into profiles, grouping
        jobs with close running times.
    indent : int or None
        Indentation forwarded to json.dump.
    quiet : bool
        If True, suppress progress output.
    job_size_function_string : str
        Python expression evaluated per job to transform its size; the
        variable ``nb_res`` holds the requested number of processors.
    """
    if not quiet:
        print(f"Input file = {input_swf}")
        print(f"output folder = {output_folder}")

    # Raw strings: '\s' and '\d' in plain strings are invalid escape
    # sequences (DeprecationWarning, an error in future Python versions).
    element = r'([-+]?\d+(?:\.\d+)?)'
    # An SWF line holds 18 whitespace-separated numeric fields.
    r = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')

    version = 0

    # A dictionary of users, each entry containing its jobs
    users = {}

    # Some job counters
    not_valid = 0
    not_line_match_format = 0

    minimum_observed_submit_time = float('inf')

    # Let's loop over the lines of the input file
    i = 0
    with open(input_swf, 'r') as swf:
        for line in swf:
            i += 1
            if not quiet and i % 10000 == 0:
                print(f"\r\033[KProcessing swf line {i}...", end="")

            res = r.match(line)

            if res:
                # Parsing...
                job_id = (int(float(res.group(SwfField.JOB_ID.value))))
                nb_res = int(
                    float(res.group(SwfField.REQUESTED_NUMBER_OF_PROCESSORS.value)))
                run_time = float(res.group(SwfField.RUN_TIME.value))
                submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
                walltime = max(job_walltime_factor * run_time,
                               float(res.group(SwfField.REQUESTED_TIME.value)))
                user_id = int(res.group(SwfField.USER_ID.value))

                # nb_res may be changed by calling a user-given function.
                # SECURITY: eval() runs an arbitrary user-supplied expression;
                # only use this script with trusted command-line arguments.
                nb_res = eval(job_size_function_string)

                if given_walltime_only:
                    walltime = float(res.group(SwfField.REQUESTED_TIME.value))

                is_valid_job = (nb_res > 0 and walltime >
                                run_time and run_time > 0 and submit_time >= 0)

                if not is_valid_job:
                    not_valid += 1

                else:
                    if user_id not in users:
                        users[user_id] = []

                    # Round run_time up to the next multiple of job_grain.
                    profile = int(((run_time // job_grain) + 1) * job_grain)

                    job = (job_id, nb_res, run_time,
                           submit_time, profile, walltime)
                    minimum_observed_submit_time = min(minimum_observed_submit_time,
                                                       submit_time)
                    users[user_id].append(job)

            else:
                not_line_match_format += 1
        if not quiet:
            print("\nEnd parsing.")

    # Create a json file per user
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    if start_time is None:
        translate_submit_times = minimum_observed_submit_time
    else:
        translate_submit_times = start_time

    for user_id, jobs in users.items():
        # Export JSON
        # Let's generate a list of dictionaries for the jobs
        djobs = list()
        for (job_id, nb_res, run_time, submit_time, profile, walltime) in jobs:
            djobs.append({'id': job_id,
                          'subtime': submit_time - translate_submit_times,
                          'walltime': walltime,
                          'res': nb_res,
                          'profile': str(profile)})

        biggest_job = max([nb_res for (_, nb_res, _, _, _, _) in jobs])

        data = {
            'version': version,
            'command': ' '.join(sys.argv[:]),
            'date': datetime.datetime.now().isoformat(' '),
            'description': f'Workload for user {user_id}',
            'nb_res': biggest_job,
            'jobs': djobs}

        output_json = f"{output_folder}/user{user_id}.json"
        try:
            # 'with' guarantees the file is flushed and closed even on error
            # (the original leaked the handle by never calling close()).
            with open(output_json, 'w') as out_file:
                json.dump(data, out_file, indent=indent, sort_keys=True)

            if not quiet:
                print(f"user{user_id:3d}: {len(djobs):10d} jobs")

        except IOError:
            print('Cannot write file', output_json)

    if not quiet:
        print('-------------------')
        print(f"Total {sum([len(jobs) for jobs in users.values()])} jobs and "
              f"{len(users)} users have been created.")
        print(f"{not_line_match_format} out of {i} lines in the file did not "
               "match the swf format")
        print(f"{not_valid} jobs were not valid")


def main():
    """
    Program entry point.

    Parses the command-line arguments then calls generate_workload.
    (The original docstring wrongly referenced generate_flat_platform.)
    """
    parser = argparse.ArgumentParser(
        description='Reads a SWF (Standard Workload Format) file and transform it into a JSON Batsim workload (with delay jobs)')
    parser.add_argument('input_swf', type=str,
                        help='The input SWF file')
    parser.add_argument('output_folder', type=str,
                        help='The output folder for the JSON files')

    parser.add_argument('--start_time', type=int, default=None,
                        help='If set, the submit times will be translated towards zero by this value. Otherwise, the first job sets the zero.')

    parser.add_argument('-jsf', '--job-size-function',
                        type=str,
                        default='1*nb_res',
                        help='The function to apply on the jobs size. '
                             'The identity is used by default.')
    parser.add_argument('-jwf', '--job_walltime_factor',
                        type=float, default=2,
                        help='Jobs walltimes are computed by the formula max(givenWalltime, jobWalltimeFactor*givenRuntime)')
    parser.add_argument('-gwo', '--given_walltime_only',
                        action="store_true",
                        help='If set, only the given walltime in the trace will be used')
    parser.add_argument('-jg', '--job_grain',
                        type=int, default=1,
                        help='Selects the level of detail we want for jobs. This parameter is used to group jobs that have close running time')
    parser.add_argument('-i', '--indent', type=int, default=None,
                        help='If set to a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0, or negative, will only insert newlines. The default value (None) selects the most compact representation.')

    parser.add_argument("-q", "--quiet", action="store_true")

    args = parser.parse_args()

    # Forward every parsed option to the generator (argparse turns
    # '--job-size-function' into the attribute 'job_size_function').
    generate_workload(input_swf=args.input_swf,
                      output_folder=args.output_folder,
                      start_time=args.start_time,
                      job_walltime_factor=args.job_walltime_factor,
                      given_walltime_only=args.given_walltime_only,
                      job_grain=args.job_grain,
                      indent=args.indent,
                      quiet=args.quiet,
                      job_size_function_string=args.job_size_function)


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()