Skip to content
Snippets Groups Projects
Commit 7c145ddc authored by Maël Madon's avatar Maël Madon
Browse files

script finished, to be tested :)

parent d5737bdf
Branches
Tags
No related merge requests found
main.py 100644 → 100755
# a big CLI with all options
# by default:
# - session analysis
# - SABjson for each user
# + graph mode
# + stat mode
\ No newline at end of file
#!/usr/bin/env python3
import argparse, json, re
from user_session_builder import User
from workload import SwfField, Job
def main(input_swf, output_folder, delim_approach, delim_threshold,
         dynamic_reduction, build_graph_rep, quiet):
    """Split the jobs of an SWF workload into per-user sessions and export them.

    Every line of ``input_swf`` matching the 18-field numeric SWF record
    format is turned into a ``Job`` and added to the ``User`` that submitted
    it. Lines that do not match (e.g. header comments) are ignored. Once the
    input is exhausted, one ``<user_id>.SABjson`` file per user is written to
    ``output_folder``, plus the dependency graph when requested.

    Args:
        input_swf: Iterable of text lines, e.g. an open SWF file.
        output_folder: Folder receiving the output files; created if missing.
        delim_approach: Delimitation approach, forwarded to ``User``.
        delim_threshold: Delimitation threshold, forwarded to ``User``.
        dynamic_reduction: Forwarded to ``User``.
        build_graph_rep: Forwarded to ``User``; when true, each user's
            dependency graph is exported as well.
        quiet: When true, neither progress nor the final summary is printed.
    """
    # Local import so that this function does not rely on a change to the
    # import header of the file.
    import os

    users = {}

    # One SWF record = 18 whitespace-separated numeric fields. Raw strings
    # keep `\d` / `\s` as regex escapes instead of invalid string escapes.
    element = r'([-+]?\d+(?:\.\d+)?)'
    record = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')

    # Read SWF
    for line_no, line in enumerate(input_swf, start=1):
        if not quiet and line_no % 100000 == 0:
            print("Processing swf line", line_no)

        res = record.match(line)
        if not res:
            continue

        # Retrieve values
        job_id = str(res.group(SwfField.JOB_ID.value))
        # Negative submit times are clamped to 0
        submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
        wait_time = float(res.group(SwfField.WAIT_TIME.value))
        run_time = float(res.group(SwfField.RUN_TIME.value))
        user_id = str(res.group(SwfField.USER_ID.value))
        walltime = float(res.group(SwfField.REQUESTED_TIME.value))
        nb_res = int(res.group(SwfField.ALLOCATED_PROCESSOR_COUNT.value))

        start_time = submit_time + wait_time
        finish_time = submit_time + wait_time + run_time

        # NOTE(review): positional order kept from the original call; the
        # signature of Job is not visible from this file.
        job = Job(job_id, submit_time, finish_time, start_time, nb_res,
                  walltime)

        if user_id not in users:
            users[user_id] = User(user_id,
                                  delim_approach,
                                  delim_threshold,
                                  dynamic_reduction=dynamic_reduction,
                                  build_graph_rep=build_graph_rep)
        users[user_id].add_job(job)

    # SWF finished, output for each user. Make sure the destination exists
    # first, otherwise the first open() below fails.
    os.makedirs(output_folder, exist_ok=True)
    for user_id, user in users.items():
        with open(f"{output_folder}/{user_id}.SABjson", "w") as file:
            json.dump(user.to_dict(), file)

        if build_graph_rep:
            user.export_dependancy_graph(output_folder)

    if not quiet:
        print("\nSWF parsing done.")
        print("Number of users: ", len(users))
        print("Number of sessions: ",
              sum(len(u.sessions) for u in users.values()))
        print(f"The output files have been stored in the folder {output_folder}")
def build_parser():
    """Build the command-line parser of the script.

    Positional arguments are the input SWF file (opened for reading by
    argparse) and the output folder. Exactly one delimitation approach
    (``--arrival``, ``--last`` or ``--max``) must be selected.

    ``--threshold`` is mandatory: the value is handed to ``User``, which
    multiplies it by 60, so a missing value (``None``) would crash with a
    ``TypeError`` on the first job instead of giving a usage error.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description='TODO')
    parser.add_argument('input_swf',
                        type=argparse.FileType('r'),
                        help='The input SWF file')
    parser.add_argument('output_folder',
                        type=str,
                        help='The folder that will store the output files')

    # The three delimitation approaches are mutually exclusive
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-a',
                       '--arrival',
                       help='Arrival delimitation approach',
                       action="store_true")
    group.add_argument('-l',
                       '--last',
                       help='Last delimitation approach',
                       action="store_true")
    group.add_argument('-m',
                       '--max',
                       help='Max delimitation approach',
                       action="store_true")

    parser.add_argument(
        '-t',
        '--threshold',
        type=int,
        required=True,
        help=
        'Unit: minutes. The threshold (on think-time or inter-arrival time, depending on the delimiation approach) above which a job will be considered to be in a new session.'
    )
    parser.add_argument(
        '--no_dynamic_reduction',
        action="store_true",
        help=
        'Unless this option is specified, during the construction of the graph the algorithm dynamically avoids to add an edge between two nodes if a path already exists.'
    )
    parser.add_argument(
        '--graph',
        action="store_true",
        help=
        "Build a graphical representation of each session graph and save it in the output folder as a gml file"
    )
    parser.add_argument("-q", "--quiet", action="store_true")
    return parser


# Only parse the command line when executed as a script, so that importing
# this module has no side effect.
if __name__ == "__main__":
    args = build_parser().parse_args()

    if args.last:
        delim = 'last'
    elif args.max:
        delim = 'max'
    else:
        delim = 'arrival'

    # argparse opened the input file; close it once the analysis is done
    with args.input_swf:
        main(input_swf=args.input_swf,
             output_folder=args.output_folder,
             delim_approach=delim,
             delim_threshold=args.threshold,
             dynamic_reduction=not (args.no_dynamic_reduction),
             build_graph_rep=args.graph,
             quiet=args.quiet)
\ No newline at end of file
......@@ -3,13 +3,9 @@ The main building block of the program, where the cutting decisions are taken
according to the delimitation approach.
"""
from random import paretovariate
import threading
from workload import Job, Session
from datetime import date, datetime
from datetime import datetime
import networkx as nx
from workload import Session
class User:
"""Class representing a user of the workload."""
......@@ -23,10 +19,10 @@ class User:
self.id = user_id
self.sessions = {} # dictionnary of session_id, Session object
self.active_session_id = None
self.last_submit_time = None
self.last_finish_time = None
self.max_finish_time = None
self.active_session_id = 0
self.last_submit_time = 0
self.last_finish_time = 0
self.max_finish_time = 0
self.delim_approach = delim_approach
self.delim_threshold = delim_threshold * 60 # minutes to sec
......@@ -45,7 +41,7 @@ class User:
self.max_nb_res = 0
@private
#@private
def is_new_session(self, date_now):
"""Checks if a new session should be started according to the delimitation approach (see Zackay and Feitelson 2013)"""
......@@ -65,7 +61,7 @@ class User:
"""Return the list of session that were finished at time t"""
return [s for s in self.sessions.values() if s.max_finish_time <= t]
@private
#@private
def update_session_graph(self, date_now):
"""Add the dependencies for the active session"""
active_session = self.sessions[self.active_session_id]
......@@ -91,11 +87,11 @@ class User:
# Move the sessions recently finished from the set `sessions_in_progress`
# to the set `current_dep`, removing all their neighbours in the
# dependency graph from the set `current_dep` to keep it minimal
for s_id in self.sessions_in_progress:
for s_id in self.sessions_in_progress.copy():
sess = self.sessions[s_id]
if date_now > sess.max_finish_time: # ie sess is finished
self.sessions_in_progress.pop(s_id)
self.sessions_in_progress.discard(s_id)
self.current_dep = self.current_dep - set(
sess.preceding_sessions)
self.current_dep.add(s_id)
......@@ -113,30 +109,31 @@ class User:
sess.id,
weight=think_time)
@private
#@private
def create_new_session(self, date_now):
"""Create a new active session and achive the old one"""
# Archive active session
active_session = self.sessions[self.active_session_id]
if self.dynamic_reduction and date_now <= active_session.max_finish_time:
self.sessions_in_progress.add(self.active_session_id)
if self.active_session_id > 0:
# Archive active session and increment active session_id
active_session = self.sessions[self.active_session_id]
if self.dynamic_reduction and date_now <= active_session.max_finish_time:
self.sessions_in_progress.add(self.active_session_id)
# Increment active session_id
self.active_session_id += 1
# Create new session
new_session = Session(self.active_session_id, date_now)
self.sessions[self.active_session_id] = new_session
@private
#@private
def add_job_to_active_session(self, job):
active_sess = self.sessions[self.active_session_id]
active_sess.add_job(job)
@public
#@public
def add_job(self, job):
"""Add a job to the job list of the user, creating a new session and handling its dependencies, if needed."""
"""Add a job to the job list of the user, creating a new session and
handling its dependencies, if needed."""
if job.res > self.max_nb_res:
self.max_nb_res = job.res
......@@ -153,6 +150,12 @@ class User:
self.last_finish_time = job.finish_time
self.max_finish_time = max(self.max_finish_time, job.finish_time)
def export_dependancy_graph(self, output_folder):
"""Write the dependancy graph as a gml file"""
nx.write_gml(self.G, f"{output_folder}/{self.id}.gml")
def to_dict(self, descr=None, cmd=None, version=1):
"""User object to dictionnary, for printing or json export."""
......@@ -160,7 +163,7 @@ class User:
descr = f"user{self.id}"
res = {
"description": descr,
"date": datetime.now(),
"date": str(datetime.now()),
"nb_res": self.max_nb_res,
"version": version,
"sessions": [s.to_dict() for _, s in self.sessions.items()]
......
......@@ -69,7 +69,7 @@ class Session:
self.jobs = [] # list of Job objects
self.max_nb_res = 0
self.max_finish_time = None
self.max_finish_time = 0
def add_job(self, job):
"""Add job to the session"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment