Commit 9b53db46 authored by Maël Madon

do simple validity checks on the SWF

parent c5080e48
 #!/usr/bin/env python3
-import argparse, json, re, os, sys
+import argparse, json, re, os, sys, warnings
 from src.user_session_builder import User
 from src.workload import SwfField, Job
@@ -14,6 +14,7 @@ def swf2sessions(input_swf, out_dir, delim_approach, delim_threshold,
     element = '([-+]?\d+(?:\.\d+)?)'
     r = re.compile('\s*' + (element + '\s+') * 17 + element + '\s*')
     i = 0
+    nb_jobs, not_valid = 0, 0
     for line in input_swf:
         i += 1
         if not quiet and i % 100000 == 0:
@@ -21,6 +22,8 @@
         res = r.match(line)
         if res:
+            nb_jobs += 1
             # Retrieve values
             job_id = int(res.group(SwfField.JOB_ID.value))
             submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
@@ -41,16 +44,18 @@
             job = Job(job_id, submit_time, finish_time, start_time, nb_res,
                       walltime, job_grain=job_grain)
-            if user_id not in users:
-                users[user_id] = User(user_id,
-                                      delim_approach,
-                                      delim_threshold,
-                                      dynamic_reduction=dynamic_reduction,
-                                      build_graph_rep=build_graph_rep)
-            user = users[user_id]
-            user.add_job(job)
+            if check_sanity(job):
+                if user_id not in users:
+                    users[user_id] = User(user_id,
+                                          delim_approach,
+                                          delim_threshold,
+                                          dynamic_reduction=dynamic_reduction,
+                                          build_graph_rep=build_graph_rep)
+                user = users[user_id]
+                user.add_job(job)
+            else:
+                not_valid += 1

     # SWF finished, write output files
     # - SABjson:
@@ -81,12 +86,29 @@
     if not quiet:
         print("\nSWF parsing done.")
+        print(f"{nb_jobs} jobs parsed, {not_valid} jobs were filtered out as they did not pass the sanity checks")
         print("Number of users: ", len(users))
         print("Number of sessions: ",
               sum([len(u.sessions) for u in users.values()]))
         print(f"The output files have been stored in the folder {out_dir}")
+
+
+def check_sanity(job: Job):
+    """Check basic properties that a recorded job should have."""
+    if job.res <= 0:
+        warnings.warn(f"Invalid job: job {job.id} has ALLOCATED_PROCESSOR_COUNT <= 0")
+        return False
+    if not (job.submit_time <= job.start_time <= job.finish_time):
+        warnings.warn(f"Invalid job: job {job.id} does not satisfy submit <= start <= finish time")
+        return False
+    if job.walltime < (job.finish_time - job.start_time):
+        warnings.warn(f"Invalid job: job {job.id} has a runtime greater than its walltime")
+        return False
+    return True
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Python script to read a workload trace in the Standard Workload Format (SWF), decompose it into user sessions, analyse the dependencies between sessions and store the results in the Session Annotated Batsim JSON format (SABjson).")
     parser.add_argument('input_swf',
@@ -172,6 +194,7 @@
                         help="(default: 1) Selects the level of detail we want for job profiles. "
                              "This parameter is used to group jobs that have close running times. "
                              "For example: a job grain of 10 will round up running times to the next ten.")

     args = parser.parse_args()
@@ -192,6 +215,9 @@
         raise argparse.ArgumentTypeError(
             "The threshold must be a positive value.")

+    if args.quiet:
+        warnings.filterwarnings("ignore")
+
     swf2sessions(input_swf=args.input_swf,
                  out_dir=args.output_dir,
                  delim_approach=delim,
......
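
For readers unfamiliar with the SWF layout, here is a minimal, self-contained sketch (not part of the commit) of how the new sanity filter behaves on a single SWF record. SimpleJob is a hypothetical stand-in for src.workload.Job, and the field positions follow the Standard Workload Format convention (18 whitespace-separated fields); the actual SwfField mapping lives in src/workload.py, which is not shown in this diff.

# Standalone sketch (not from the repository): parse one SWF record and apply
# the same validity conditions that check_sanity enforces in the commit above.
# SimpleJob is a hypothetical stand-in for src.workload.Job.
import re
from collections import namedtuple

SimpleJob = namedtuple("SimpleJob",
                       "id submit_time start_time finish_time res walltime")

element = r'([-+]?\d+(?:\.\d+)?)'
swf_line = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')

def check_sanity(job):
    """Positive resource count, ordered timestamps, runtime within walltime."""
    return (job.res > 0
            and job.submit_time <= job.start_time <= job.finish_time
            and (job.finish_time - job.start_time) <= job.walltime)

# 18-field SWF record: job 1 is submitted at t=0, waits 5 s, runs 60 s on
# 4 processors, with a requested time (walltime) of 120 s.
record = "1 0 5 60 4 60 -1 4 120 -1 1 1 1 -1 -1 -1 -1 -1"
m = swf_line.match(record)
if m:
    f = [float(x) for x in m.groups()]
    job = SimpleJob(id=int(f[0]), submit_time=f[1],
                    start_time=f[1] + f[2],          # submit + wait
                    finish_time=f[1] + f[2] + f[3],  # submit + wait + run
                    res=int(f[4]), walltime=f[8])
    print(check_sanity(job))  # True: this record passes all three checks

A record failing any of these checks would increment not_valid and be excluded from the session building, as in the hunk above.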