Skip to content
Snippets Groups Projects
Commit e747d2ee authored by Maël Madon's avatar Maël Madon
Browse files

Script dependency graph: dynamic reduction by default

parent cced96a8
Branches
Tags
No related merge requests found
...@@ -32,11 +32,13 @@ class User: ...@@ -32,11 +32,13 @@ class User:
def init_dyn_reduc(self): def init_dyn_reduc(self):
"""Necessary data to keep track of to do dynamic transitive reduction""" """Necessary data to keep track of to do dynamic transitive reduction"""
self.jobs_in_progress = set() # jobs submitted but not finished, at time t self.jobs_in_progress = set(
self.current_dep = set() # jobs finished, constituting the minimal list ) # jobs submitted but not finished, at time t
# of dependencies for a job sumbitted at time t self.current_dep = set(
) # jobs finished, constituting the minimal list
# of dependencies for a job sumbitted at time t
def add_job(self, job_id, submit_time, finish_time, dynamic_reduction): def add_job(self, job_id, submit_time, finish_time, no_dynamic_reduction):
# Add the job to the user's list # Add the job to the user's list
current_job = Job(job_id, submit_time, finish_time) current_job = Job(job_id, submit_time, finish_time)
self.jobs[job_id] = current_job self.jobs[job_id] = current_job
...@@ -44,7 +46,7 @@ class User: ...@@ -44,7 +46,7 @@ class User:
# Add a node representing that job in the dependancy graph: # Add a node representing that job in the dependancy graph:
self.G.add_node(job_id) self.G.add_node(job_id)
if not dynamic_reduction: if no_dynamic_reduction:
# For all jobs that were finished when the current job was submitted, # For all jobs that were finished when the current job was submitted,
# add a directed edge weighted by the thinking time # add a directed edge weighted by the thinking time
for ended_job in self.jobs_finished_at_time(submit_time): for ended_job in self.jobs_finished_at_time(submit_time):
...@@ -53,21 +55,19 @@ class User: ...@@ -53,21 +55,19 @@ class User:
ended_job.id, ended_job.id,
weight=think_time) weight=think_time)
else: else:
# Move the jobs recently finished from the set `jobs_in_progress` # Move the jobs recently finished from the set `jobs_in_progress`
# to the set `current_dep`, removing all their neighbours in the # to the set `current_dep`, removing all their neighbours in the
# dependancy graph from the set `current_dep` to keep it minimal # dependancy graph from the set `current_dep` to keep it minimal
for j in self.jobs_in_progress: for j in self.jobs_in_progress:
if current_job.submit_time - j.finish_time >= 0: # ie j is finished if current_job.submit_time - j.finish_time >= 0: # ie j is finished
self.jobs_in_progress.pop(j) self.jobs_in_progress.pop(j)
self.current_dep = self.current_dep - self.dependancies_of(j) self.current_dep = self.current_dep - self.dependancies_of(
j)
self.current_dep.add(j) self.current_dep.add(j)
for job_dep in self.current_dep: for job_dep in self.current_dep:
think_time = current_job.submit_time - job_dep.finish_time think_time = current_job.submit_time - job_dep.finish_time
self.G.add_edge(current_job.id, self.G.add_edge(current_job.id, job_dep.id, weight=think_time)
job_dep.id,
weight=think_time)
def jobs_finished_at_time(self, t): def jobs_finished_at_time(self, t):
"""Return the list of jobs that are finished at time t.""" """Return the list of jobs that are finished at time t."""
...@@ -85,18 +85,18 @@ class User: ...@@ -85,18 +85,18 @@ class User:
def build_dep_graph(job_id, submit_time, finish_time, user_id, def build_dep_graph(job_id, submit_time, finish_time, user_id,
dynamic_reduction): no_dynamic_reduction):
if user_id not in users: if user_id not in users:
users[user_id] = User(user_id) users[user_id] = User(user_id)
if dynamic_reduction: if not no_dynamic_reduction:
(users[user_id]).init_dyn_reduc() (users[user_id]).init_dyn_reduc()
user = users[user_id] user = users[user_id]
user.add_job(job_id, submit_time, finish_time, dynamic_reduction) user.add_job(job_id, submit_time, finish_time, no_dynamic_reduction)
def parse_swf_and_build_graph(input_swf, output_folder, transitive_reduction, def parse_swf_and_build_graph(input_swf, output_folder, transitive_reduction,
dynamic_reduction, quiet): no_dynamic_reduction, quiet):
# Read SWF # Read SWF
element = '([-+]?\d+(?:\.\d+)?)' element = '([-+]?\d+(?:\.\d+)?)'
r = re.compile('\s*' + (element + '\s+') * 17 + element + '\s*') r = re.compile('\s*' + (element + '\s+') * 17 + element + '\s*')
...@@ -118,7 +118,7 @@ def parse_swf_and_build_graph(input_swf, output_folder, transitive_reduction, ...@@ -118,7 +118,7 @@ def parse_swf_and_build_graph(input_swf, output_folder, transitive_reduction,
finish_time = submit_time + wait_time + run_time finish_time = submit_time + wait_time + run_time
build_dep_graph(job_id, submit_time, finish_time, user_id, build_dep_graph(job_id, submit_time, finish_time, user_id,
dynamic_reduction) no_dynamic_reduction)
# SWF finished, output the dependancy graph of each user # SWF finished, output the dependancy graph of each user
for user_id, user in users.items(): for user_id, user in users.items():
...@@ -137,14 +137,16 @@ def main(): ...@@ -137,14 +137,16 @@ def main():
type=str, type=str,
help='The output folder that will store the graphs') help='The output folder that will store the graphs')
parser.add_argument( parser.add_argument(
'--transitive_reduction', '--no_dynamic_reduction',
action="store_true", action="store_true",
help='Gives the transitive reduction version of the graphs in output.') help=
'Unless this option is specified, during the construction of the graph the algorithm dynamically avoids to add an edge between two nodes if a path already exists.'
)
parser.add_argument( parser.add_argument(
'--dynamic_reduction', '--transitive_reduction',
action="store_true", action="store_true",
help= help=
'While building the graph, dynamically avoids to add an edge between two nodes if a path already exist.' 'Gives the transitive reduction version of the graphs in output (without the option --no_dynamic_reduction, the graphs in output are already minimal in the sense of the transitive reduction).'
) )
parser.add_argument("-q", "--quiet", action="store_true") parser.add_argument("-q", "--quiet", action="store_true")
...@@ -154,7 +156,7 @@ def main(): ...@@ -154,7 +156,7 @@ def main():
parse_swf_and_build_graph(input_swf=args.input_swf, parse_swf_and_build_graph(input_swf=args.input_swf,
output_folder=args.output_folder, output_folder=args.output_folder,
transitive_reduction=args.transitive_reduction, transitive_reduction=args.transitive_reduction,
dynamic_reduction=args.dynamic_reduction, no_dynamic_reduction=args.no_dynamic_reduction,
quiet=args.quiet) quiet=args.quiet)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment