Skip to content
Snippets Groups Projects
Commit 6fea48a8 authored by Maël Madon's avatar Maël Madon
Browse files

dev: remove slow version of easy_bf and replace with fast. Adapt releted tests

parent 58b917c9
Branches
Tags
1 merge request!19Use the faster implementation of easy backfilling
Pipeline #6070 passed
......@@ -26,8 +26,6 @@ batmen_deps = [
# Source files
src = [
'src/scheds/easy_bf.cpp',
'src/scheds/easy_bf.hpp',
'src/scheds/easy_bf_fast.cpp',
'src/scheds/easy_bf_fast.hpp',
'src/scheds/fcfs.cpp',
......
......@@ -20,7 +20,6 @@
#include "pempek_assert.hpp"
#include "queue.hpp"
#include "scheds/easy_bf.hpp"
#include "scheds/easy_bf_fast.hpp"
#include "scheds/fcfs.hpp"
#include "scheds/bin_packing.hpp"
......@@ -49,7 +48,7 @@ void run(Network &n, ISchedulingAlgorithm *algo, SchedulingDecision &d, Workload
int main(int argc, char **argv)
{
const set<string> variants_set = { "fcfs", "easy_bf", "easy_bf_fast", "multicore_filler", "bin_packing", "bin_packing_energy" };
const set<string> variants_set = { "fcfs", "easy_bf", "multicore_filler", "bin_packing", "bin_packing_energy" };
const set<string> policies_set = { "basic", "contiguous" };
const set<string> verbosity_levels_set = { "debug", "info", "quiet", "silent" };
......@@ -198,7 +197,7 @@ int main(int argc, char **argv)
LOG_F(1, "variant_options = '%s'", variant_options.c_str());
// Job order
if (scheduling_variant == "easy_bf" || scheduling_variant == "easy_bf_fast" || scheduling_variant == "multicore_filler"){
if (scheduling_variant == "easy_bf" || scheduling_variant == "multicore_filler"){
order = new FCFSOrder;
queue = new Queue(order);
}
......@@ -212,9 +211,6 @@ int main(int argc, char **argv)
algo = new FCFS(&w, &decision, nullptr, selector, rjms_delay, &json_doc_variant_options);
else if (scheduling_variant == "easy_bf")
algo = new EasyBackfilling(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options);
else if (scheduling_variant == "easy_bf_fast")
algo = new EasyBackfillingFast(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options);
else if (scheduling_variant == "multicore_filler")
......
#include "easy_bf.hpp"
#include <loguru.hpp>
#include "../pempek_assert.hpp"
using namespace std;
EasyBackfilling::EasyBackfilling(Workload * workload,
SchedulingDecision * decision,
Queue * queue,
ResourceSelector * selector,
double rjms_delay,
rapidjson::Document * variant_options) :
DynScheduler(workload, decision, queue, selector, rjms_delay, variant_options)
{
}
EasyBackfilling::~EasyBackfilling()
{
}
void EasyBackfilling::on_simulation_start(double date, const rapidjson::Value & batsim_config)
{
/* Call superclass. If broker enabled, submit jobs date=0. */
DynScheduler::on_simulation_start(date, batsim_config);
_schedule = Schedule(_nb_machines, date);
(void) batsim_config;
}
void EasyBackfilling::on_simulation_end(double date)
{
(void) date;
}
void EasyBackfilling::make_decisions(double date,
SortableJobOrder::UpdateInformation *update_info,
SortableJobOrder::CompareInformation *compare_info)
{
const Job * priority_job_before = _queue->first_job_or_nullptr();
// Let's remove finished jobs from the schedule
for (const string & ended_job_id : _jobs_ended_recently)
_schedule.remove_job((*_workload)[ended_job_id]);
// Let's handle recently released jobs
std::vector<std::string> recently_queued_jobs;
for (const string & new_job_id : _jobs_released_recently)
{
const Job * new_job = (*_workload)[new_job_id];
if (new_job->nb_requested_resources > _nb_machines)
{
_decision->add_reject_job(new_job_id, date);
if (broker_enabled)
broker->update_status_if_dyn_job(new_job_id, KILLED);
}
else if (!new_job->has_walltime)
{
LOG_SCOPE_FUNCTION(INFO);
LOG_F(INFO, "Date=%g. Rejecting job '%s' as it has no walltime", date, new_job_id.c_str());
_decision->add_reject_job(new_job_id, date);
if (broker_enabled)
broker->update_status_if_dyn_job(new_job_id, KILLED);
}
else
{
_queue->append_job(new_job, update_info);
recently_queued_jobs.push_back(new_job_id);
}
}
// Let's update the schedule's present
_schedule.update_first_slice(date);
// Queue sorting
const Job * priority_job_after = nullptr;
sort_queue_while_handling_priority_job(priority_job_before, priority_job_after, update_info, compare_info);
// If no resources have been released, we can just try to backfill the newly-released jobs
if (_jobs_ended_recently.empty())
{
int nb_available_machines = _schedule.begin()->available_machines.size();
for (unsigned int i = 0; i < recently_queued_jobs.size() && nb_available_machines > 0; ++i)
{
const string & new_job_id = recently_queued_jobs[i];
const Job * new_job = (*_workload)[new_job_id];
// The job could have already been executed by sort_queue_while_handling_priority_job,
// that's why we check whether the queue contains the job.
if (_queue->contains_job(new_job) &&
new_job != priority_job_after &&
new_job->nb_requested_resources <= nb_available_machines)
{
Schedule::JobAlloc alloc = _schedule.add_job_first_fit(new_job, _selector);
if ( alloc.started_in_first_slice)
{
_decision->add_execute_job(new_job_id, alloc.used_machines, date);
if (broker_enabled)
broker->update_status_if_dyn_job(new_job_id, RUNNING);
_queue->remove_job(new_job);
nb_available_machines -= new_job->nb_requested_resources;
}
else
_schedule.remove_job(new_job);
}
}
}
else
{
// Some resources have been released, the whole queue should be traversed.
auto job_it = _queue->begin();
int nb_available_machines = _schedule.begin()->available_machines.size();
// Let's try to backfill all the jobs
while (job_it != _queue->end() && nb_available_machines > 0)
{
const Job * job = (*job_it)->job;
if (_schedule.contains_job(job))
_schedule.remove_job(job);
if (job == priority_job_after) // If the current job is priority
{
Schedule::JobAlloc alloc = _schedule.add_job_first_fit(job, _selector);
if (alloc.started_in_first_slice)
{
_decision->add_execute_job(job->id, alloc.used_machines, date);
if (broker_enabled)
broker->update_status_if_dyn_job(job->id, RUNNING);
job_it = _queue->remove_job(job_it); // Updating job_it to remove on traversal
priority_job_after = _queue->first_job_or_nullptr();
}
else
++job_it;
}
else // The job is not priority
{
Schedule::JobAlloc alloc = _schedule.add_job_first_fit(job, _selector);
if (alloc.started_in_first_slice)
{
_decision->add_execute_job(job->id, alloc.used_machines, date);
if (broker_enabled)
broker->update_status_if_dyn_job(job->id, RUNNING);
job_it = _queue->remove_job(job_it);
}
else
{
_schedule.remove_job(job);
++job_it;
}
}
}
}
/* make_decisions from superclass, will in particular take care of sending
* feedback about job status to the users, if broker enabled */
DynScheduler::make_decisions(date, update_info, compare_info);
}
void EasyBackfilling::sort_queue_while_handling_priority_job(const Job * priority_job_before,
const Job *& priority_job_after,
SortableJobOrder::UpdateInformation * update_info,
SortableJobOrder::CompareInformation * compare_info)
{
if (_debug)
LOG_F(1, "sort_queue_while_handling_priority_job beginning, %s", _schedule.to_string().c_str());
// Let's sort the queue
_queue->sort_queue(update_info, compare_info);
// Let the new priority job be computed
priority_job_after = _queue->first_job_or_nullptr();
// If the priority job has changed
if (priority_job_after != priority_job_before)
{
// If there was a priority job before, let it be removed from the schedule
if (priority_job_before != nullptr)
_schedule.remove_job_if_exists(priority_job_before);
// Let us ensure the priority job is in the schedule.
// To do so, while the priority job can be executed now, we keep on inserting it into the schedule
for (bool could_run_priority_job = true; could_run_priority_job && priority_job_after != nullptr; )
{
could_run_priority_job = false;
// Let's add the priority job into the schedule
Schedule::JobAlloc alloc = _schedule.add_job_first_fit(priority_job_after, _selector);
if (alloc.started_in_first_slice)
{
_decision->add_execute_job(priority_job_after->id, alloc.used_machines, (double)update_info->current_date);
if (broker_enabled)
broker->update_status_if_dyn_job(priority_job_after->id, RUNNING);
_queue->remove_job(priority_job_after);
priority_job_after = _queue->first_job_or_nullptr();
could_run_priority_job = true;
}
}
}
if (_debug)
LOG_F(1, "sort_queue_while_handling_priority_job ending, %s", _schedule.to_string().c_str());
}
#pragma once
#include <list>
#include "../users/dynscheduler.hpp"
#include "../locality.hpp"
#include "../schedule.hpp"
class EasyBackfilling : public DynScheduler
{
public:
EasyBackfilling(Workload * workload, SchedulingDecision * decision, Queue * queue, ResourceSelector * selector,
double rjms_delay, rapidjson::Document * variant_options);
virtual ~EasyBackfilling();
virtual void on_simulation_start(double date, const rapidjson::Value & batsim_config);
virtual void on_simulation_end(double date);
virtual void make_decisions(double date,
SortableJobOrder::UpdateInformation * update_info,
SortableJobOrder::CompareInformation * compare_info);
void sort_queue_while_handling_priority_job(const Job * priority_job_before,
const Job *& priority_job_after,
SortableJobOrder::UpdateInformation * update_info,
SortableJobOrder::CompareInformation * compare_info);
protected:
Schedule _schedule;
bool _debug = false;
};
......@@ -36,8 +36,7 @@ def pytest_generate_tests(metafunc):
if 'sched_mono' in metafunc.fixturenames:
scheds = [
Scheduler('easy_bf', 'easy'),
Scheduler('easy_bf_fast', 'easy_fast'),
Scheduler('fcfs', 'fcfs')
Scheduler('fcfs', 'fcfs')
]
metafunc.parametrize('sched_mono', scheds)
......
......@@ -26,14 +26,6 @@ def test_broker_mono(platform_monoC, workload_static, sched_mono):
if has_expected_output(test_name):
assert_expected_output(test_name)
def test_easy_bf_fast(platform_monoC, workload_static):
test_name_easy = f'broker-easy-{platform_monoC.name}-{workload_static.name}'
test_name_easy_fast = f'broker-easy_fast-{platform_monoC.name}-{workload_static.name}'
expected = 'test-out/' + test_name_easy + '/_jobs.csv'
obtained = 'test-out/' + test_name_easy_fast + '/_jobs.csv'
assert_same_outputs(expected, obtained)
def test_broker_multi(platform_multiC, workload_static, sched_multi):
"""Test the broker mode with multicore platform files and schedulers"""
......
......@@ -39,30 +39,4 @@ def test_easy_bf(platform_monoC, workload_static):
instance.to_file(robin_filename)
ret = run_robin(robin_filename)
assert ret.returncode == 0
def test_easy_bf_fast(platform_monoC, workload_static):
"""Tests if easy_bf and easy_bf_fast behave the same"""
sched = "easy_bf_fast"
test_name = f'{sched}-{platform_monoC.name}-{workload_static.name}'
output_dir, robin_filename, _ = init_instance(test_name)
batcmd = gen_batsim_cmd(platform_monoC.filename, workload_static.filename, output_dir, "")
instance = RobinInstance(output_dir=output_dir,
batcmd=batcmd,
schedcmd=f"batmen -v '{sched}'",
simulation_timeout=30, ready_timeout=5,
success_timeout=10, failure_timeout=0
)
instance.to_file(robin_filename)
ret = run_robin(robin_filename)
assert ret.returncode == 0
# Test if the output are the same than with easy_bf
test_easy = f'easy_bf-{platform_monoC.name}-{workload_static.name}'
obtained = f'test-out/{test_name}/_jobs.csv'
expected = f'test-out/{test_easy}/_jobs.csv'
assert_same_outputs(expected, obtained)
\ No newline at end of file
assert ret.returncode == 0
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment