Skip to content
Snippets Groups Projects
Commit d4898aa4 authored by Maël Madon's avatar Maël Madon
Browse files

delete old scripts, their functionality being included as an option in swf2userSessions

parent 7a96da06
Branches
Tags
No related merge requests found
Pipeline #4516 failed
Source diff could not be displayed: it is too large. Options to address this: view the blob.
#!/usr/bin/env python3
"""
Performs the dependency graph analysis of a workload, for each of its users.
"""
import argparse
import json
import re
import networkx as nx
from src.workload import SwfField
# Global dictionary
users = {}
class Job:
    """Lightweight record of one job: its id and lifecycle timestamps."""

    def __init__(self, job_id, submit_time, finish_time):
        # The two timestamps are all the dependency analysis needs.
        self.id, self.submit_time, self.finish_time = (job_id, submit_time,
                                                       finish_time)
class User:
    """A user of the workload, holding their jobs and dependency graph.

    Nodes of the graph are job ids; a directed edge (a, b) weighted by
    the think time means job a was submitted after job b had finished.
    """

    def __init__(self, user_id, no_dynamic_reduction=False):
        self.id = user_id
        # All jobs of this user, indexed by job id
        self.jobs = {}
        # The (directed) dependency graph
        self.G = nx.DiGraph()
        if not no_dynamic_reduction:
            # Jobs submitted but not finished, at time t
            self.jobs_in_progress = set()
            # Jobs finished, constituting the minimal list of
            # dependencies for a job submitted at time t
            self.current_dep = set()

    def add_job(self, job_id, submit_time, finish_time, no_dynamic_reduction):
        """Add a job to this user and connect it to its dependencies."""
        current_job = Job(job_id, submit_time, finish_time)
        self.jobs[job_id] = current_job
        # Add a node representing that job in the dependency graph:
        self.G.add_node(job_id)
        if no_dynamic_reduction:
            # For all jobs that were finished when the current job was
            # submitted, add a directed edge weighted by the think time
            for ended_job in self.jobs_finished_at_time(submit_time):
                think_time = current_job.submit_time - ended_job.finish_time
                self.G.add_edge(current_job.id,
                                ended_job.id,
                                weight=think_time)
        else:
            # Move the jobs recently finished from `jobs_in_progress` to
            # `current_dep`, removing their own dependencies from
            # `current_dep` to keep it minimal.
            # BUGFIX: iterate over a snapshot -- the original code both
            # mutated the set while iterating (RuntimeError) and called
            # set.pop(j), a TypeError since set.pop takes no argument.
            for j in list(self.jobs_in_progress):
                if current_job.submit_time - j.finish_time >= 0:  # j finished
                    self.jobs_in_progress.discard(j)
                    self.current_dep -= self.dependancies_of(j)
                    self.current_dep.add(j)
            for job_dep in self.current_dep:
                think_time = current_job.submit_time - job_dep.finish_time
                self.G.add_edge(current_job.id, job_dep.id, weight=think_time)
            # BUGFIX: track the new job as in progress; the original never
            # populated `jobs_in_progress`, so `current_dep` stayed empty
            # and no edge was ever created in this mode.
            self.jobs_in_progress.add(current_job)

    def jobs_finished_at_time(self, t):
        """Return the list of jobs that are finished at time t."""
        return [j for j in self.jobs.values() if j.finish_time <= t]

    def dependancies_of(self, job):
        """Return the set of direct dependencies of the job."""
        return {self.jobs[n] for n in self.G.successors(job.id)}

    def export_dependancy_graph(self, output_folder, transitive_reduction):
        """Write the dependency graph as a gml file named after the user.

        NOTE(review): nx.transitive_reduction returns a graph stripped of
        edge attributes, so the 'weight' data is lost in that case --
        confirm this is the intended output.
        """
        if transitive_reduction:
            self.G = nx.transitive_reduction(self.G)
        nx.write_gml(self.G, f"{output_folder}/{self.id}.gml")
def build_dep_graph(job_id, submit_time, finish_time, user_id,
                    no_dynamic_reduction):
    """Route one job to its owner's graph, creating the User on demand."""
    owner = users.get(user_id)
    if owner is None:
        # First job seen for this user: register a new User entry.
        owner = users[user_id] = User(user_id, no_dynamic_reduction)
    owner.add_job(job_id, submit_time, finish_time, no_dynamic_reduction)
def parse_swf_and_build_graph(input_swf, output_folder, transitive_reduction,
                              no_dynamic_reduction, quiet):
    """Read the SWF file, then build and export every user's graph.

    :param input_swf: an open, readable SWF file object
    :param output_folder: folder that receives one .gml file per user
    :param transitive_reduction: export transitively-reduced graphs
    :param no_dynamic_reduction: disable the on-line edge reduction
    :param quiet: suppress progress output
    """
    # One capture group per SWF field (18 whitespace-separated numbers).
    # BUGFIX: use raw strings -- '\d' and '\s' in plain string literals
    # are invalid escape sequences (SyntaxWarning since Python 3.12).
    element = r'([-+]?\d+(?:\.\d+)?)'
    r = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')
    for i, line in enumerate(input_swf, start=1):
        if not quiet and i % 100000 == 0:
            print("Processing swf line", i)
        res = r.match(line)
        if res:
            # Retrieve values
            job_id = str(res.group(SwfField.JOB_ID.value))
            submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
            wait_time = float(res.group(SwfField.WAIT_TIME.value))
            run_time = float(res.group(SwfField.RUN_TIME.value))
            user_id = str(res.group(SwfField.USER_ID.value))
            finish_time = submit_time + wait_time + run_time
            build_dep_graph(job_id, submit_time, finish_time, user_id,
                            no_dynamic_reduction)
    # SWF finished, output the dependency graph of each user
    for user in users.values():
        user.export_dependancy_graph(output_folder, transitive_reduction)
def main():
    """Command-line entry point: parse arguments and run the analysis."""
    arg_parser = argparse.ArgumentParser(
        description=
        'Performs the dependancy graph analysis of a workload, for each of its users.'
    )
    arg_parser.add_argument('input_swf',
                            type=argparse.FileType('r'),
                            help='The input SWF file')
    arg_parser.add_argument('output_folder',
                            type=str,
                            help='The output folder that will store the graphs')
    arg_parser.add_argument(
        '--no_dynamic_reduction',
        action="store_true",
        help=
        'Unless this option is specified, during the construction of the graph the algorithm dynamically avoids to add an edge between two nodes if a path already exists.'
    )
    arg_parser.add_argument(
        '--transitive_reduction',
        action="store_true",
        help=
        'Gives the transitive reduction version of the graphs in output (without the option --no_dynamic_reduction, the graphs in output are already minimal in the sense of the transitive reduction).'
    )
    arg_parser.add_argument("-q", "--quiet", action="store_true")
    cli = arg_parser.parse_args()
    # Hand every option over to the parsing/graph-building routine.
    parse_swf_and_build_graph(cli.input_swf, cli.output_folder,
                              cli.transitive_reduction,
                              cli.no_dynamic_reduction, cli.quiet)


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
Parser to read an input SWF and compute stats about the users of the log
Inspired from https://gitlab.inria.fr/batsim/batsim/-/blob/master/tools/swf_to_batsim_workload_compute_only.py
"""
import argparse
import json
import re
from copy import deepcopy
from src.workload import SwfField
# Global dictionary
users = {}
def update_user_stats(submit_time, finish_time, user_id, delim_approach,
                      delim_threshold):
    """Record one job for `user_id`, grouping jobs into sessions.

    Mutates the global `users` dict. A job either extends the user's
    current session or, when the gap chosen by `delim_approach` exceeds
    `delim_threshold` minutes, archives it and starts a new one.
    """
    if user_id not in users:
        # First job ever seen for this user: open the first session.
        users[user_id] = {
            'first_submit_time': submit_time,
            'max_finish_time': finish_time,
            'nb_sessions': 0,
            'sessions': [],
            'last_submit_time': submit_time,
            'last_finish_time': finish_time,
            'current_session': {
                'first_submit_time': submit_time,
                'last_submit_time': submit_time,
                'finish_time': finish_time,
                'nb_jobs': 1,
            },
        }
    else:
        user = users[user_id]
        session = user['current_session']
        # Gap measured according to the session delimitation approach
        # (Zakay and Feitelson 2013).
        if delim_approach == 'arrival':
            gap = submit_time - user['last_submit_time']  # inter-arrival
        elif delim_approach == 'last':
            gap = submit_time - user['last_finish_time']  # think time
        else:
            gap = submit_time - user['max_finish_time']   # think time (max)
        if gap > delim_threshold * 60:
            # Gap too large: archive the current session...
            user['sessions'].append(deepcopy(session))
            user['nb_sessions'] += 1
            # ...and restart it for the new job.
            session['first_submit_time'] = submit_time
            session['last_submit_time'] = submit_time
            session['finish_time'] = finish_time
            session['nb_jobs'] = 1
        else:
            # Same session: just extend it with this job.
            session['last_submit_time'] = submit_time
            session['finish_time'] = max(finish_time, session['finish_time'])
            session['nb_jobs'] += 1
    # Refresh the per-user running variables.
    user = users[user_id]
    user['last_submit_time'] = submit_time
    user['last_finish_time'] = finish_time
    user['max_finish_time'] = max(finish_time, user['max_finish_time'])
def parse_swf(input_swf, delim_approach, delim_threshold=None, quiet=False):
    """Read the SWF file and accumulate per-user session stats.

    :param input_swf: an open, readable SWF file object
    :param delim_approach: 'arrival', 'last' or 'max' session delimitation
    :param delim_threshold: threshold in minutes for session delimitation
    :param quiet: suppress progress output
    """
    # One capture group per SWF field (18 whitespace-separated numbers).
    # BUGFIX: use raw strings -- '\d' and '\s' in plain string literals
    # are invalid escape sequences (SyntaxWarning since Python 3.12).
    element = r'([-+]?\d+(?:\.\d+)?)'
    r = re.compile(r'\s*' + (element + r'\s+') * 17 + element + r'\s*')
    for i, line in enumerate(input_swf, start=1):
        if not quiet and i % 100000 == 0:
            print("Processing swf line", i)
        res = r.match(line)
        if res:
            # Retrieve values
            submit_time = max(0, float(res.group(SwfField.SUBMIT_TIME.value)))
            wait_time = float(res.group(SwfField.WAIT_TIME.value))
            run_time = float(res.group(SwfField.RUN_TIME.value))
            user_id = str(res.group(SwfField.USER_ID.value))
            finish_time = submit_time + wait_time + run_time
            update_user_stats(submit_time, finish_time, user_id,
                              delim_approach, delim_threshold)
    # SWF finished, close all the user sessions
    # (keys are unused here, so iterate over the values only).
    for user in users.values():
        user['sessions'].append(deepcopy(user['current_session']))
        user['nb_sessions'] += 1
        user.pop('current_session')
def main():
    """Command-line entry point: parse arguments, compute stats, dump JSON."""
    cli_parser = argparse.ArgumentParser(
        description=
        'Reads a SWF (Standard Workload Format) file and compute stats about the users of the log.'
    )
    cli_parser.add_argument('input_swf',
                            type=argparse.FileType('r'),
                            help='The input SWF file')
    cli_parser.add_argument('output_json',
                            type=argparse.FileType('w'),
                            help='The output json file containing the stats')
    # Exactly one session delimitation approach must be chosen.
    mode = cli_parser.add_mutually_exclusive_group(required=True)
    mode.add_argument('-a',
                      '--arrival',
                      help='Arrival delimitation approach',
                      action="store_true")
    mode.add_argument('-l',
                      '--last',
                      help='Last delimitation approach',
                      action="store_true")
    mode.add_argument('-m',
                      '--max',
                      help='Max delimitation approach',
                      action="store_true")
    cli_parser.add_argument(
        '-t',
        '--threshold',
        type=int,
        help=
        'Unit: minutes. The threshold (on think-time or inter-arrival time, depending on the delimiation approach) above which a job will be considered to be in a new session.'
    )
    cli_parser.add_argument("-q", "--quiet", action="store_true")
    cli = cli_parser.parse_args()
    # Map the mutually-exclusive flags onto the approach name.
    if cli.last:
        approach = 'last'
    elif cli.max:
        approach = 'max'
    else:
        approach = 'arrival'
    parse_swf(cli.input_swf, approach, cli.threshold, cli.quiet)
    # Write output json
    json.dump(users, cli.output_json)


if __name__ == "__main__":
    main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment