diff --git a/swf2userSessions.py b/swf2userSessions.py
index b100ea21ccc32dedb44ec460b3793cb8ba4053cd..d22f9a8d69d32f6432eced9b6c200780647ae33d 100755
--- a/swf2userSessions.py
+++ b/swf2userSessions.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 
-import argparse, json, re, os, sys
+import argparse, json, re, os, sys, warnings
 
 from src.user_session_builder import User
 from src.workload import SwfField, Job
@@ -14,6 +14,7 @@ def swf2sessions(input_swf, out_dir, delim_approach, delim_threshold,
     element = '([-+]?\d+(?:\.\d+)?)'
     r = re.compile('\s*' + (element + '\s+') * 17 + element + '\s*')
     i = 0
+    nb_jobs, not_valid = 0, 0
     for line in input_swf:
         i += 1
         if not quiet and i % 100000 == 0:
@@ -21,6 +22,8 @@
 
         res = r.match(line)
         if res:
+            nb_jobs += 1
+
             # Retreive values
             job_id = int(res.group((SwfField.JOB_ID.value)))
             submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
@@ -41,16 +44,18 @@
             job = Job(job_id, submit_time, finish_time, start_time,
                       nb_res, walltime, job_grain=job_grain)
 
-            if user_id not in users:
-                users[user_id] = User(user_id,
-                                      delim_approach,
-                                      delim_threshold,
-                                      dynamic_reduction=dynamic_reduction,
-                                      build_graph_rep=build_graph_rep)
-
-            user = users[user_id]
-            user.add_job(job)
-
+            if check_sanity(job):
+                if user_id not in users:
+                    users[user_id] = User(user_id,
+                                          delim_approach,
+                                          delim_threshold,
+                                          dynamic_reduction=dynamic_reduction,
+                                          build_graph_rep=build_graph_rep)
+                user = users[user_id]
+                user.add_job(job)
+
+            else:
+                not_valid += 1
 
     # SWF finished, write output files
     # - SABjson:
@@ -81,12 +86,29 @@
 
     if not quiet:
         print("\nSWF parsing done.")
+        print(f"{nb_jobs} jobs parsed, {not_valid} jobs were filtered out as they did not pass the sanity checks")
         print("Number of users: ", len(users))
         print("Number of sessions: ",
               sum([len(u.sessions) for u in users.values()]))
         print(f"The output files have been stored in the folder {out_dir}")
 
 
+def check_sanity(job:Job):
+    """Check basic properties that a recorded job should have"""
+
+    if job.res <= 0:
+        warnings.warn(f"Invalid job: job {job.id} has ALLOCATED_PROCESSOR_COUNT <= 0")
+        return False
+    if not( job.submit_time <= job.start_time <= job.finish_time ):
+        warnings.warn(f"Invalid job: job {job.id} does not satisfy submit <= start <= finish time")
+        return False
+    if job.walltime < (job.finish_time - job.start_time):
+        warnings.warn(f"Invalid job: job {job.id} runtime > walltime")
+        return False
+    return True
+
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Python script to read a workload trace in the Standard Workload Format (SWF), decompose it into user sessions, analyse the dependencies between sessions and store the results in the Session Annotated Batsim JSON format (SABjson).")
     parser.add_argument('input_swf',
@@ -172,6 +194,7 @@
                         help="(default: 1) Selects the level of detail we want for job profiles. "
                         "This parameter is used to group jobs that have close running times. "
                         "For example: a job grain of 10 will round up running times to the next ten.")
+
 
     args = parser.parse_args()
 
@@ -192,6 +215,9 @@
         raise argparse.ArgumentTypeError(
             "The threshold must be a positive value.")
 
+    if args.quiet:
+        warnings.filterwarnings("ignore")
+
     swf2sessions(input_swf=args.input_swf,
                  out_dir=args.output_dir,
                  delim_approach=delim,