Skip to content
Snippets Groups Projects
Commit 5c8dae2b authored by Maël Madon's avatar Maël Madon
Browse files

Merge branch '20-easy-bf-fast' into 'master'

Use the faster implementation of easy backfilling

Closes #20 and #7

See merge request !19
parents 90fe3865 d912deef
Branches
Tags
1 merge request!19Use the faster implementation of easy backfilling
Pipeline #6073 passed
......@@ -39,6 +39,8 @@ Dynamic submission is supported by all the schedulers inheriting from the class
- *bin_packing_energy*: same as *bin_packing*, but saves power by switching off the machines as soon as they are idle and powering them on again when needed
- *multicore_filler*: for test purposes. Schedules only one job at a time on only one multicore host.
For guidance on how to add a new dynamic scheduler, refer to [this readme](src/scheds/readme.md).
## Install
With [Nix] package manager. At the root of the project:
......
......@@ -26,8 +26,8 @@ batmen_deps = [
# Source files
src = [
'src/scheds/easy_bf.cpp',
'src/scheds/easy_bf.hpp',
'src/scheds/easy_bf_fast.cpp',
'src/scheds/easy_bf_fast.hpp',
'src/scheds/fcfs.cpp',
'src/scheds/fcfs.hpp',
'src/scheds/multicore_filler.cpp',
......
......@@ -20,7 +20,7 @@
#include "pempek_assert.hpp"
#include "queue.hpp"
#include "scheds/easy_bf.hpp"
#include "scheds/easy_bf_fast.hpp"
#include "scheds/fcfs.hpp"
#include "scheds/bin_packing.hpp"
#include "scheds/bin_packing_energy.hpp"
......@@ -209,8 +209,9 @@ int main(int argc, char **argv)
// Scheduling variant
if (scheduling_variant == "fcfs")
algo = new FCFS(&w, &decision, nullptr, selector, rjms_delay, &json_doc_variant_options);
else if (scheduling_variant == "easy_bf")
algo = new EasyBackfilling(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options);
algo = new EasyBackfillingFast(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options);
else if (scheduling_variant == "multicore_filler")
algo = new MulticoreFiller(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options);
......
#include "easy_bf.hpp"
#include <loguru.hpp>
#include "../pempek_assert.hpp"
using namespace std;
// Constructs the scheduler; all construction work is delegated to the
// DynScheduler base class.
EasyBackfilling::EasyBackfilling(Workload * workload,
                                 SchedulingDecision * decision,
                                 Queue * queue,
                                 ResourceSelector * selector,
                                 double rjms_delay,
                                 rapidjson::Document * variant_options)
    : DynScheduler(workload, decision, queue, selector, rjms_delay, variant_options)
{
}
// Defaulted destructor: this class owns no raw resources.
EasyBackfilling::~EasyBackfilling() = default;
/**
 * @brief Called once when the simulation starts.
 *
 * Initializes the reservation schedule over all the machines of the
 * platform, starting at the current date.
 *
 * @param date Current simulation date.
 * @param batsim_config Batsim configuration, forwarded to the superclass.
 */
void EasyBackfilling::on_simulation_start(double date, const rapidjson::Value & batsim_config)
{
    /* Call superclass. If broker enabled, submit jobs date=0. */
    DynScheduler::on_simulation_start(date, batsim_config);

    _schedule = Schedule(_nb_machines, date);
    // NOTE: the former `(void) batsim_config;` unused-parameter suppression
    // was dead code (the parameter is used above) and has been removed.
}
// Called once when the simulation ends. Nothing to clean up; the parameter
// is intentionally unnamed since it is unused.
void EasyBackfilling::on_simulation_end(double)
{
}
// Main scheduling entry point, called at each Batsim event date.
// Maintains the EASY-backfilling invariant: the head of the queue (the
// "priority job") holds a reservation in the schedule, and other jobs may
// only be started now ("backfilled") if that does not delay it.
void EasyBackfilling::make_decisions(double date,
    SortableJobOrder::UpdateInformation *update_info,
    SortableJobOrder::CompareInformation *compare_info)
{
    // Remember the queue head before anything changes, so that
    // sort_queue_while_handling_priority_job can detect a new priority job.
    const Job * priority_job_before = _queue->first_job_or_nullptr();

    // Let's remove finished jobs from the schedule
    for (const string & ended_job_id : _jobs_ended_recently)
        _schedule.remove_job((*_workload)[ended_job_id]);

    // Let's handle recently released jobs
    std::vector<std::string> recently_queued_jobs;
    for (const string & new_job_id : _jobs_released_recently)
    {
        const Job * new_job = (*_workload)[new_job_id];

        if (new_job->nb_requested_resources > _nb_machines)
        {
            // The job can never fit on this platform: reject it outright.
            _decision->add_reject_job(new_job_id, date);
            if (broker_enabled)
                broker->update_status_if_dyn_job(new_job_id, KILLED);
        }
        else if (!new_job->has_walltime)
        {
            // A walltime is required to compute reservations, so jobs
            // without one are rejected.
            LOG_SCOPE_FUNCTION(INFO);
            LOG_F(INFO, "Date=%g. Rejecting job '%s' as it has no walltime", date, new_job_id.c_str());
            _decision->add_reject_job(new_job_id, date);
            if (broker_enabled)
                broker->update_status_if_dyn_job(new_job_id, KILLED);
        }
        else
        {
            // Valid job: enqueue it and remember it for the backfilling pass.
            _queue->append_job(new_job, update_info);
            recently_queued_jobs.push_back(new_job_id);
        }
    }

    // Let's update the schedule's present
    _schedule.update_first_slice(date);

    // Queue sorting; this may already execute priority jobs that fit now.
    const Job * priority_job_after = nullptr;
    sort_queue_while_handling_priority_job(priority_job_before, priority_job_after, update_info, compare_info);

    // If no resources have been released, we can just try to backfill the newly-released jobs
    if (_jobs_ended_recently.empty())
    {
        int nb_available_machines = _schedule.begin()->available_machines.size();

        for (unsigned int i = 0; i < recently_queued_jobs.size() && nb_available_machines > 0; ++i)
        {
            const string & new_job_id = recently_queued_jobs[i];
            const Job * new_job = (*_workload)[new_job_id];

            // The job could have already been executed by sort_queue_while_handling_priority_job,
            // that's why we check whether the queue contains the job.
            if (_queue->contains_job(new_job) &&
                new_job != priority_job_after &&
                new_job->nb_requested_resources <= nb_available_machines)
            {
                // Tentatively insert the job; keep it only if it starts in
                // the current (first) time slice, i.e. it is a true backfill.
                Schedule::JobAlloc alloc = _schedule.add_job_first_fit(new_job, _selector);
                if ( alloc.started_in_first_slice)
                {
                    _decision->add_execute_job(new_job_id, alloc.used_machines, date);
                    if (broker_enabled)
                        broker->update_status_if_dyn_job(new_job_id, RUNNING);
                    _queue->remove_job(new_job);
                    nb_available_machines -= new_job->nb_requested_resources;
                }
                else
                    // It would start later: that would be a reservation, not
                    // a backfill, so undo the tentative insertion.
                    _schedule.remove_job(new_job);
            }
        }
    }
    else
    {
        // Some resources have been released, the whole queue should be traversed.
        auto job_it = _queue->begin();
        // NOTE(review): this counter is computed once and never decremented
        // in the loop below; correctness relies on add_job_first_fit's
        // started_in_first_slice flag, the counter is only an early-exit
        // heuristic — confirm this is intentional.
        int nb_available_machines = _schedule.begin()->available_machines.size();

        // Let's try to backfill all the jobs
        while (job_it != _queue->end() && nb_available_machines > 0)
        {
            const Job * job = (*job_it)->job;

            // Drop any stale reservation before re-inserting the job.
            if (_schedule.contains_job(job))
                _schedule.remove_job(job);

            if (job == priority_job_after) // If the current job is priority
            {
                Schedule::JobAlloc alloc = _schedule.add_job_first_fit(job, _selector);

                if (alloc.started_in_first_slice)
                {
                    _decision->add_execute_job(job->id, alloc.used_machines, date);
                    if (broker_enabled)
                        broker->update_status_if_dyn_job(job->id, RUNNING);
                    job_it = _queue->remove_job(job_it); // Updating job_it to remove on traversal
                    // The next queued job becomes the new priority job.
                    priority_job_after = _queue->first_job_or_nullptr();
                }
                else
                    // The priority job keeps its reservation in the schedule.
                    ++job_it;
            }
            else // The job is not priority
            {
                Schedule::JobAlloc alloc = _schedule.add_job_first_fit(job, _selector);

                if (alloc.started_in_first_slice)
                {
                    _decision->add_execute_job(job->id, alloc.used_machines, date);
                    if (broker_enabled)
                        broker->update_status_if_dyn_job(job->id, RUNNING);
                    job_it = _queue->remove_job(job_it);
                }
                else
                {
                    // Non-priority jobs do not keep reservations: undo the
                    // tentative insertion.
                    _schedule.remove_job(job);
                    ++job_it;
                }
            }
        }
    }

    /* make_decisions from superclass, will in particular take care of sending
     * feedback about job status to the users, if broker enabled */
    DynScheduler::make_decisions(date, update_info, compare_info);
}
// Sorts the waiting queue and handles the (possibly new) priority job, i.e.
// the head of the sorted queue: while the current priority job can start
// immediately it is executed (and the next queued job becomes priority);
// otherwise its tentative insertion remains in the schedule as a reservation.
//
// priority_job_before: queue head before this call (may be nullptr).
// priority_job_after:  output parameter; queue head after sorting and after
//                      any immediate executions (may be nullptr).
void EasyBackfilling::sort_queue_while_handling_priority_job(const Job * priority_job_before,
    const Job *& priority_job_after,
    SortableJobOrder::UpdateInformation * update_info,
    SortableJobOrder::CompareInformation * compare_info)
{
    if (_debug)
        LOG_F(1, "sort_queue_while_handling_priority_job beginning, %s", _schedule.to_string().c_str());

    // Let's sort the queue
    _queue->sort_queue(update_info, compare_info);

    // Let the new priority job be computed
    priority_job_after = _queue->first_job_or_nullptr();

    // If the priority job has changed
    if (priority_job_after != priority_job_before)
    {
        // If there was a priority job before, let it be removed from the schedule
        if (priority_job_before != nullptr)
            _schedule.remove_job_if_exists(priority_job_before);

        // Let us ensure the priority job is in the schedule.
        // To do so, while the priority job can be executed now, we keep on inserting it into the schedule
        for (bool could_run_priority_job = true; could_run_priority_job && priority_job_after != nullptr; )
        {
            could_run_priority_job = false;

            // Let's add the priority job into the schedule
            Schedule::JobAlloc alloc = _schedule.add_job_first_fit(priority_job_after, _selector);

            if (alloc.started_in_first_slice)
            {
                // The priority job fits right now: execute it and promote
                // the next queued job to priority on the next iteration.
                _decision->add_execute_job(priority_job_after->id, alloc.used_machines, (double)update_info->current_date);
                if (broker_enabled)
                    broker->update_status_if_dyn_job(priority_job_after->id, RUNNING);
                _queue->remove_job(priority_job_after);
                priority_job_after = _queue->first_job_or_nullptr();
                could_run_priority_job = true;
            }
            // Otherwise the insertion stays in the schedule as the priority
            // job's reservation, and the loop terminates.
        }
    }

    if (_debug)
        LOG_F(1, "sort_queue_while_handling_priority_job ending, %s", _schedule.to_string().c_str());
}
#pragma once
#include <list>
#include "../users/dynscheduler.hpp"
#include "../locality.hpp"
#include "../schedule.hpp"
class EasyBackfilling : public DynScheduler
{
public:
EasyBackfilling(Workload * workload, SchedulingDecision * decision, Queue * queue, ResourceSelector * selector,
double rjms_delay, rapidjson::Document * variant_options);
virtual ~EasyBackfilling();
virtual void on_simulation_start(double date, const rapidjson::Value & batsim_config);
virtual void on_simulation_end(double date);
virtual void make_decisions(double date,
SortableJobOrder::UpdateInformation * update_info,
SortableJobOrder::CompareInformation * compare_info);
void sort_queue_while_handling_priority_job(const Job * priority_job_before,
const Job *& priority_job_after,
SortableJobOrder::UpdateInformation * update_info,
SortableJobOrder::CompareInformation * compare_info);
protected:
Schedule _schedule;
bool _debug = false;
};
#include "easy_bf_fast.hpp"
// #include <loguru.hpp>
#include "../pempek_assert.hpp"
// Constructs the scheduler; all construction work is delegated to the
// DynScheduler base class.
EasyBackfillingFast::EasyBackfillingFast(Workload *workload,
                                         SchedulingDecision *decision,
                                         Queue *queue,
                                         ResourceSelector *selector,
                                         double rjms_delay,
                                         rapidjson::Document *variant_options)
    : DynScheduler(workload, decision, queue, selector, rjms_delay, variant_options)
{
}
// Defaulted destructor: this class owns no raw resources.
EasyBackfillingFast::~EasyBackfillingFast() = default;
// Called once when the simulation starts. Initializes the resource-tracking
// structures: every machine of the platform is available at this point.
void EasyBackfillingFast::on_simulation_start(
    double date, const rapidjson::Value &batsim_config)
{
    /* Call superclass. If broker enabled, submit jobs date=0. */
    DynScheduler::on_simulation_start(date, batsim_config);

    _nb_available_machines = _nb_machines;
    _available_machines.insert(IntervalSet::ClosedInterval(0, _nb_machines - 1));

    // Sanity check: the interval set and the counter must agree.
    PPK_ASSERT_ERROR(_available_machines.size() == (unsigned int)_nb_machines);
}
// Called once when the simulation ends. Nothing to clean up; the parameter
// is intentionally unnamed since it is unused.
void EasyBackfillingFast::on_simulation_end(double)
{
}
// Main scheduling entry point, called at each Batsim event date.
// Keeps O(1)/O(queue) bookkeeping (available-machine count, sorted release
// horizons) instead of a full reservation schedule.
void EasyBackfillingFast::make_decisions(double date,
    SortableJobOrder::UpdateInformation *update_info,
    SortableJobOrder::CompareInformation *compare_info)
{
    (void)update_info;
    (void)compare_info;

    // This algorithm is a fast version of EASY backfilling.
    // It is meant to be fast in the usual case, not to handle corner cases
    // (use the other easy backfilling available in batsched for this purpose).
    // It is not meant to be easily readable or hackable ;).

    // This fast EASY backfilling variant in a few words:
    // - only handles the FCFS queue order
    // - only handles the basic resource selection policy
    // - only handles finite jobs (no switchoff), with walltimes
    // - only handles one priority job (the first of the queue)
    // - only handles time as floating-point (-> precision errors).

    // Hacks:
    // - uses priority job's completion time to store its expected starting time

    bool job_ended = false;

    // Handle newly finished jobs: give their machines back and drop their
    // release-horizon point.
    for (const std::string &ended_job_id : _jobs_ended_recently)
    {
        job_ended = true;

        Job *finished_job = (*_workload)[ended_job_id];
        const Allocation &alloc = _current_allocations[ended_job_id];

        // Update data structures
        _available_machines.insert(alloc.machines);
        _nb_available_machines += finished_job->nb_requested_resources;
        _horizons.erase(alloc.horizon_it);
        _current_allocations.erase(ended_job_id);
    }

    // If jobs have finished, let's execute jobs as long as they are priority
    if (job_ended)
    {
        if (_priority_job != nullptr)
        {
            Allocation alloc;
            FinishedHorizonPoint point;

            if (_priority_job->nb_requested_resources <= _nb_available_machines)
            {
                // LOG_F(INFO, "Priority job fits!");
                // The priority job now fits: run it on the leftmost
                // available machines (basic selection policy).
                alloc.machines = _available_machines.left(
                    _priority_job->nb_requested_resources);
                _decision->add_execute_job(
                    _priority_job->id, alloc.machines, date);
                if (broker_enabled)
                    broker->update_status_if_dyn_job(
                        _priority_job->id, RUNNING);
                point.nb_released_machines
                    = _priority_job->nb_requested_resources;
                point.date = date + (double)_priority_job->walltime;
                alloc.horizon_it = insert_horizon_point(point);

                // Update data structures
                _available_machines -= alloc.machines;
                _nb_available_machines -= _priority_job->nb_requested_resources;
                _current_allocations[_priority_job->id] = alloc;
                _priority_job = nullptr;

                // Execute the whole queue until a priority job cannot fit
                for (auto job_it = _pending_jobs.begin();
                     job_it != _pending_jobs.end();)
                {
                    Job *pending_job = *job_it;
                    if (pending_job->nb_requested_resources
                        <= _nb_available_machines)
                    {
                        // FCFS order: the job fits now, execute it directly.
                        alloc.machines = _available_machines.left(
                            pending_job->nb_requested_resources);
                        _decision->add_execute_job(
                            pending_job->id, alloc.machines, date);
                        if (broker_enabled)
                            broker->update_status_if_dyn_job(
                                pending_job->id, RUNNING);
                        point.nb_released_machines
                            = pending_job->nb_requested_resources;
                        point.date = date + (double)pending_job->walltime;
                        alloc.horizon_it = insert_horizon_point(point);

                        // Update data structures
                        _available_machines -= alloc.machines;
                        _nb_available_machines
                            -= pending_job->nb_requested_resources;
                        _current_allocations[pending_job->id] = alloc;
                        job_it = _pending_jobs.erase(job_it);
                    }
                    else
                    {
                        // The job becomes priority!
                        _priority_job = pending_job;
                        // Hack (see header comment): completion_time stores
                        // the priority job's expected *starting* time.
                        _priority_job->completion_time
                            = compute_priority_job_expected_earliest_starting_time();
                        _pending_jobs.erase(job_it);

                        // Stop first queue traversal.
                        break;
                    }
                }
            }

            // Backfill jobs that do not hinder the priority job.
            if (_nb_available_machines > 0)
            {
                // Update priority job expected starting time (might have changed if a recently ended job
                // completed before its walltime)
                if (_priority_job != nullptr)
                    _priority_job->completion_time = compute_priority_job_expected_earliest_starting_time();

                // NOTE(review): _priority_job may be nullptr here only when
                // the loop above emptied _pending_jobs, in which case this
                // loop does not run; the dereference of _priority_job below
                // relies on that invariant — verify if queue handling changes.
                for (auto job_it = _pending_jobs.begin();
                     job_it != _pending_jobs.end();)
                {
                    const Job *pending_job = *job_it;

                    // Can the job be executed now ?
                    // (enough machines AND finishes before the priority
                    // job's expected starting time)
                    if (pending_job->nb_requested_resources
                            <= _nb_available_machines
                        && date + pending_job->walltime
                            <= _priority_job->completion_time)
                    {
                        // Yes, it can be backfilled!
                        alloc.machines = _available_machines.left(
                            pending_job->nb_requested_resources);
                        _decision->add_execute_job(
                            pending_job->id, alloc.machines, date);
                        if (broker_enabled)
                            broker->update_status_if_dyn_job(
                                pending_job->id, RUNNING);
                        point.nb_released_machines
                            = pending_job->nb_requested_resources;
                        point.date = date + (double)pending_job->walltime;
                        alloc.horizon_it = insert_horizon_point(point);

                        // Update data structures
                        _available_machines -= alloc.machines;
                        _nb_available_machines
                            -= pending_job->nb_requested_resources;
                        _current_allocations[pending_job->id] = alloc;
                        job_it = _pending_jobs.erase(job_it);

                        // Directly get out of the backfilling loop if all
                        // machines are busy.
                        if (_nb_available_machines <= 0)
                            break;
                    }
                    else
                    {
                        ++job_it;
                    }
                }
            }
        }
    }

    // Handle newly released jobs
    for (const std::string &new_job_id : _jobs_released_recently)
    {
        Job *new_job = (*_workload)[new_job_id];

        // Is the job valid on this platform?
        if (new_job->nb_requested_resources > _nb_machines)
        {
            // Too big for the whole platform: reject.
            _decision->add_reject_job(new_job_id, date);
            if (broker_enabled)
                broker->update_status_if_dyn_job(new_job_id, KILLED);
        }
        else if (!new_job->has_walltime)
        {
            // Walltimes are mandatory for this variant (needed to compute
            // the release horizons): reject.
            _decision->add_reject_job(new_job_id, date);
            if (broker_enabled)
                broker->update_status_if_dyn_job(new_job_id, KILLED);
        }
        // Can the job be executed right now?
        else if (new_job->nb_requested_resources <= _nb_available_machines)
        {
            // LOG_F(INFO, "There are enough available resources (%d) to execute
            // job %s", _nb_available_machines, new_job->id.c_str());

            // Can it be executed now (without hindering priority job?)
            if (_priority_job == nullptr
                || date + new_job->walltime <= _priority_job->completion_time)
            {
                // LOG_F(INFO, "Job %s can be started right away!",
                // new_job->id.c_str());

                // Yes, the job can be executed right away!
                Allocation alloc;

                alloc.machines
                    = _available_machines.left(new_job->nb_requested_resources);
                _decision->add_execute_job(new_job_id, alloc.machines, date);
                if (broker_enabled)
                    broker->update_status_if_dyn_job(new_job_id, RUNNING);

                FinishedHorizonPoint point;
                point.nb_released_machines = new_job->nb_requested_resources;
                point.date = date + (double)new_job->walltime;
                alloc.horizon_it = insert_horizon_point(point);

                // Update data structures
                _available_machines -= alloc.machines;
                _nb_available_machines -= new_job->nb_requested_resources;
                _current_allocations[new_job_id] = alloc;
            }
            else
            {
                // No, the job cannot be executed (hinders priority job.)
                /*LOG_F(INFO, "Not enough time to execute job %s (walltime=%g,
                   priority job expected starting time=%g)",
                   new_job->id.c_str(), (double)new_job->walltime,
                   _priority_job->completion_time);*/
                _pending_jobs.push_back(new_job);
            }
        }
        else
        {
            // The job is too big to fit now.
            if (_priority_job == nullptr)
            {
                // The job becomes priority.
                _priority_job = new_job;
                // Hack: completion_time stores the expected starting time.
                _priority_job->completion_time
                    = compute_priority_job_expected_earliest_starting_time();
            }
            else
            {
                // The job is queued up.
                _pending_jobs.push_back(new_job);
            }
        }
    }

    /* make_decisions from superclass, will in particular take care of sending
     * feedback about job status to the users, if broker enabled */
    DynScheduler::make_decisions(date, update_info, compare_info);
}
double
EasyBackfillingFast::compute_priority_job_expected_earliest_starting_time()
{
int nb_available = _nb_available_machines;
int required = _priority_job->nb_requested_resources;
for (auto it = _horizons.begin(); it != _horizons.end(); ++it)
{
nb_available += it->nb_released_machines;
if (nb_available >= required)
{
return it->date;
}
}
PPK_ASSERT_ERROR(false, "The job will never be executable.");
return 0;
}
// Inserts a release-horizon point while keeping _horizons sorted by
// increasing date. Returns an iterator to the inserted element, so the
// owning Allocation can erase it in O(1) when the job finishes.
std::list<EasyBackfillingFast::FinishedHorizonPoint>::iterator
EasyBackfillingFast::insert_horizon_point(
    const EasyBackfillingFast::FinishedHorizonPoint &point)
{
    // The list is sorted: skip every point that is not strictly later than
    // the new one, then insert before the first later point (or at the end
    // if none is later).
    auto insertion_pos = _horizons.begin();
    while (insertion_pos != _horizons.end() && !(point.date < insertion_pos->date))
        ++insertion_pos;

    return _horizons.insert(insertion_pos, point);
}
#pragma once
#include <unordered_map>
#include <list>
#include "../users/dynscheduler.hpp"
#include "../locality.hpp"
class EasyBackfillingFast : public DynScheduler
{
public:
EasyBackfillingFast(Workload * workload, SchedulingDecision * decision,
Queue * queue, ResourceSelector * selector,
double rjms_delay,
rapidjson::Document * variant_options);
virtual ~EasyBackfillingFast();
virtual void on_simulation_start(double date,
const rapidjson::Value & batsim_config);
virtual void on_simulation_end(double date);
virtual void make_decisions(double date,
SortableJobOrder::UpdateInformation * update_info,
SortableJobOrder::CompareInformation * compare_info);
private:
struct FinishedHorizonPoint
{
double date;
int nb_released_machines;
};
struct Allocation
{
IntervalSet machines;
std::list<FinishedHorizonPoint>::iterator horizon_it;
};
private:
double compute_priority_job_expected_earliest_starting_time();
std::list<FinishedHorizonPoint>::iterator insert_horizon_point(const FinishedHorizonPoint & point);
private:
// Machines currently available
IntervalSet _available_machines;
int _nb_available_machines = -1;
// Pending jobs (queue; without the priority job)
std::list<Job *> _pending_jobs;
// Allocations of running jobs
std::unordered_map<std::string, Allocation> _current_allocations;
// When running jobs are expected to finish.
// Always sorted by increasing date.
std::list<FinishedHorizonPoint> _horizons;
// At any time, null if there is no priority job (no waiting job)
Job * _priority_job = nullptr;
};
# Steps to create a new dynamic scheduler
For now, if someone wants to create a new dynamic scheduler (or transform a static scheduler into a dynamic one), here is what they should do:
- change super class from `ISchedulingAlgorithm` to `DynScheduler`
- add the call `DynScheduler::on_simulation_start(date, batsim_config);` at the beginning of `on_simulation_start`
- add the call `DynScheduler::make_decisions(date, update_info, compare_info);` at the end of `make_decisions`
- update the status of jobs with `if (broker_enabled) broker->update_status_if_dyn_job(new_job_id, STATUS);` each time the scheduler adds or rejects a job
See for example [this commit](https://gitlab.irit.fr/sepia-pub/mael/batmen/-/commit/5b4e30a631c7544ad009b2368512e78b79ffa5b9).
......@@ -40,7 +40,7 @@ def pytest_generate_tests(metafunc):
if 'sched_mono' in metafunc.fixturenames:
scheds = [
Scheduler('easy_bf', 'easy'),
Scheduler('fcfs', 'fcfs')
Scheduler('fcfs', 'fcfs')
]
metafunc.parametrize('sched_mono', scheds)
......
......@@ -5,6 +5,7 @@ import subprocess
import filecmp
from collections import namedtuple
from os.path import abspath
import pandas as pd
empty_workload = abspath('test/workloads/static/empty.json')
......@@ -79,4 +80,15 @@ def assert_expected_behavior(test_file) :
assert filecmp.cmp(expected, obtained), f"\
Files {expected} and {obtained} should be equal but are not.\n\
Run `diff {expected} {obtained}` to investigate why.\n\
Run `cp {obtained} {expected}` to override the expected file with the obtained."
\ No newline at end of file
Run `cp {obtained} {expected}` to override the expected file with the obtained."
def assert_same_outputs(expected, obtained):
    """Assert that the relevant columns of two _jobs.csv outputs are identical.

    Only the scheduling-related columns are compared, so that columns that may
    legitimately differ between runs are ignored.

    :param expected: path (or file-like object) of the expected _jobs.csv
    :param obtained: path (or file-like object) of the obtained _jobs.csv
    :raises AssertionError: if the selected columns differ
    """
    cols = ["job_id", "submission_time", "final_state", "starting_time", "execution_time"]
    df_o = pd.read_csv(obtained)[cols]
    df_e = pd.read_csv(expected)[cols]
    assert df_o.equals(df_e), f"\
Files {expected} and {obtained} should be equal but are not.\n\
Run `diff {expected} {obtained}` to investigate why."
\ No newline at end of file
......@@ -71,6 +71,4 @@ def test_easy_bf(platform_monoC, workload_static):
instance.to_file(robin_filename)
ret = run_robin(robin_filename)
assert ret.returncode == 0
assert ret.returncode == 0
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment