Skip to content
Snippets Groups Projects
Commit d5737bdf authored by Maël Madon's avatar Maël Madon
Browse files

lots of progress on session builder. To be continued

parent 32268959
No related branches found
No related tags found
No related merge requests found
......@@ -3,14 +3,169 @@ The main building block of the program, where the cutting decisions are taken
according to the delimitation approach.
"""
from random import paretovariate
import threading
from workload import Job, Session
from datetime import date, datetime
import networkx as nx
class User:
    """Class representing a user of the workload.

    Jobs are handed to the user one at a time through `add_job`. Each job is
    either appended to the active session or opens a new one, depending on
    the delimitation approach ('arrival', 'last' or 'max', see Zakay and
    Feitelson 2013) and on the threshold. Whenever a session is opened, its
    dependencies on previously finished sessions are recorded on the session
    itself and, optionally, in a networkx directed graph.

    NOTE(review): the time arithmetic assumes jobs are added in
    non-decreasing submit-time order -- confirm with the caller.

    Methods other than `add_job`, `sessions_finished_at_time` and `to_dict`
    are internal helpers; they keep their original names so existing callers
    are not broken.
    """

    def __init__(self,
                 user_id,
                 delim_approach,
                 delim_threshold,
                 dynamic_reduction=True,
                 build_graph_rep=False):
        """Create a user with no session yet.

        Args:
            user_id: identifier of the user.
            delim_approach: 'arrival', 'last' or 'max'; any other value is
                treated as 'max' by `is_new_session`.
            delim_threshold: delimitation threshold, in minutes.
            dynamic_reduction: if True, maintain a minimal set of
                dependencies incrementally instead of linking every new
                session to all the finished ones.
            build_graph_rep: if True, also build the dependency graph `G`.
        """
        self.id = user_id
        self.sessions = {}  # dictionary of session_id -> Session object
        self.active_session_id = None  # None until the first job is added

        self.last_submit_time = None
        self.last_finish_time = None
        self.max_finish_time = None

        self.delim_approach = delim_approach
        self.delim_threshold = delim_threshold * 60  # minutes to sec

        self.dynamic_reduction = dynamic_reduction
        if dynamic_reduction:
            # archived sessions not yet known to be finished
            self.sessions_in_progress = set()
            # finished sessions, constituting the minimal list of
            # dependencies for a session starting at time t
            self.current_dep = set()

        self.build_graph_rep = build_graph_rep
        if build_graph_rep:
            self.G = nx.DiGraph()  # the (directed) dependency graph

        self.max_nb_res = 0

    def is_new_session(self, date_now):
        """Check if a new session should be started at time `date_now`
        according to the delimitation approach (see Zakay and Feitelson
        2013).

        Internal helper. Always returns True for the very first job, since
        there is no previous submit/finish time to compare with.
        """
        if self.active_session_id is None:
            return True

        if self.delim_approach == 'arrival':
            reference_time = self.last_submit_time  # inter-arrival time
        elif self.delim_approach == 'last':
            reference_time = self.last_finish_time  # think time after last job
        else:  # 'max' delimitation approach
            reference_time = self.max_finish_time

        return (date_now - reference_time) > self.delim_threshold

    def sessions_finished_at_time(self, t):
        """Return the list of sessions that were finished at time t.

        Sessions without any job yet (`max_finish_time` is None) are never
        considered finished.
        """
        return [
            s for s in self.sessions.values()
            if s.max_finish_time is not None and s.max_finish_time <= t
        ]

    def update_session_graph(self, date_now):
        """Add the dependencies for the active session.

        Internal helper, meant to be called right after
        `create_new_session`. For each dependency, the id of the preceding
        session and the think time since its end are appended to the active
        session and, if `build_graph_rep` is set, an edge
        active -> preceding weighted by the think time is added to `G`.
        """
        active_session = self.sessions[self.active_session_id]

        if self.build_graph_rep:
            self.G.add_node(self.active_session_id)

        if not self.dynamic_reduction:
            # Depend on all the sessions that were finished when the current
            # session was started (think time >= 0)
            dependencies = self.sessions_finished_at_time(date_now)
        else:
            # Move the sessions recently finished from the set `_in_progress`
            # to the set `current_dep`, removing all their neighbours in the
            # dependency graph from the set `current_dep` to keep it minimal.
            # Iterate over a snapshot: the set is modified inside the loop.
            # NOTE(review): "finished" is strict (>) here but inclusive (<=)
            # in `sessions_finished_at_time`; kept as in the original.
            for s_id in list(self.sessions_in_progress):
                sess = self.sessions[s_id]
                if date_now > sess.max_finish_time:  # ie sess is finished
                    self.sessions_in_progress.remove(s_id)
                    self.current_dep -= set(sess.preceding_sessions)
                    self.current_dep.add(s_id)

            # sorted() only makes the order of the dependencies deterministic
            dependencies = [
                self.sessions[s_id] for s_id in sorted(self.current_dep)
            ]

        for sess in dependencies:
            think_time = date_now - sess.max_finish_time
            active_session.preceding_sessions.append(sess.id)
            active_session.tt_after_prec_sess.append(think_time)

            if self.build_graph_rep:
                self.G.add_edge(self.active_session_id,
                                sess.id,
                                weight=think_time)

    def create_new_session(self, date_now):
        """Create a new active session and archive the old one.

        Internal helper. The first session gets id 0, the following ones
        are numbered consecutively.
        """
        if self.active_session_id is None:
            # Very first session: nothing to archive
            self.active_session_id = 0
        else:
            # Archive active session. It is always registered as "in
            # progress"; `update_session_graph` moves it to `current_dep`
            # as soon as it is finished, so that a session already finished
            # at `date_now` is not lost as a dependency.
            if self.dynamic_reduction:
                self.sessions_in_progress.add(self.active_session_id)

            # Increment active session_id
            self.active_session_id += 1

        # Create new session
        new_session = Session(self.active_session_id, date_now)
        self.sessions[self.active_session_id] = new_session

    def add_job_to_active_session(self, job):
        """Append `job` to the currently active session (internal helper)."""
        active_sess = self.sessions[self.active_session_id]
        active_sess.add_job(job)

    def add_job(self, job):
        """Add a job to the job list of the user, creating a new session and
        handling its dependencies, if needed.

        Reads `job.res`, `job.submit_time` and `job.finish_time`.
        """
        if job.res > self.max_nb_res:
            self.max_nb_res = job.res

        date_now = job.submit_time

        # does the job start a new session?
        if self.is_new_session(date_now):
            self.create_new_session(date_now)
            self.update_session_graph(date_now)

        self.add_job_to_active_session(job)

        # Updating variables
        self.last_submit_time = job.submit_time
        self.last_finish_time = job.finish_time
        # max_finish_time is None before the first job: max() would fail
        if (self.max_finish_time is None
                or job.finish_time > self.max_finish_time):
            self.max_finish_time = job.finish_time

    def to_dict(self, descr=None, cmd=None, version=1):
        """User object to dictionary, for printing or json export.

        Args:
            descr: description field; defaults to "user<id>".
            cmd: if given, stored under the "command" key.
            version: value of the "version" field.

        Returns:
            A dict with keys "description", "date", "nb_res", "version",
            "sessions" and, optionally, "command". "date" holds a
            `datetime` object, which `json.dumps` cannot serialize without
            a `default=` hook.
        """
        if descr is None:
            descr = f"user{self.id}"

        res = {
            "description": descr,
            "date": datetime.now(),
            "nb_res": self.max_nb_res,
            "version": version,
            "sessions": [s.to_dict() for s in self.sessions.values()]
        }

        if cmd is not None:
            res["command"] = cmd

        return res
......@@ -66,7 +66,17 @@ class Session:
self.first_submit = first_submit
self.preceding_sessions = []
self.tt_after_prec_sess = []
self.jobs = []
self.jobs = [] # list of Job objects
self.max_nb_res = 0
self.max_finish_time = None
def add_job(self, job):
    """Add job to the session.

    Appends `job` to `self.jobs` and keeps `self.max_finish_time` equal to
    the latest `finish_time` among the jobs added so far.
    """
    self.jobs.append(job)

    # max_finish_time is None until the first job is added, and
    # max(value, None) raises a TypeError in Python 3
    if (self.max_finish_time is None
            or job.finish_time > self.max_finish_time):
        self.max_finish_time = job.finish_time
def to_dict(self):
"""Session object to dictionnary, for printing or json export."""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment