Skip to content
Snippets Groups Projects
Commit 08030529 authored by Maël Madon's avatar Maël Madon
Browse files

merging add_easy_bf branch into new_sched

parents e7950e83 8260e577
No related branches found
No related tags found
1 merge request !5: New dynamic schedulers: fcfs and easy-bf
Pipeline #4614 passed
Showing
with 866 additions and 832 deletions
......@@ -12,7 +12,7 @@ EasyBackfilling::EasyBackfilling(Workload * workload,
ResourceSelector * selector,
double rjms_delay,
rapidjson::Document * variant_options) :
ISchedulingAlgorithm(workload, decision, queue, selector, rjms_delay, variant_options)
DynScheduler(workload, decision, queue, selector, rjms_delay, variant_options)
{
}
......@@ -23,6 +23,9 @@ EasyBackfilling::~EasyBackfilling()
void EasyBackfilling::on_simulation_start(double date, const rapidjson::Value & batsim_config)
{
/* Call superclass. If broker enabled, submit jobs date=0. */
DynScheduler::on_simulation_start(date, batsim_config);
_schedule = Schedule(_nb_machines, date);
(void) batsim_config;
}
......@@ -51,12 +54,16 @@ void EasyBackfilling::make_decisions(double date,
if (new_job->nb_requested_resources > _nb_machines)
{
_decision->add_reject_job(new_job_id, date);
if (broker_enabled)
broker->update_status_if_dyn_job(new_job_id, KILLED);
}
else if (!new_job->has_walltime)
{
LOG_SCOPE_FUNCTION(INFO);
LOG_F(INFO, "Date=%g. Rejecting job '%s' as it has no walltime", date, new_job_id.c_str());
_decision->add_reject_job(new_job_id, date);
if (broker_enabled)
broker->update_status_if_dyn_job(new_job_id, KILLED);
}
else
{
......@@ -92,6 +99,10 @@ void EasyBackfilling::make_decisions(double date,
if ( alloc.started_in_first_slice)
{
_decision->add_execute_job(new_job_id, alloc.used_machines, date);
if (broker_enabled)
broker->update_status_if_dyn_job(new_job_id, RUNNING);
_queue->remove_job(new_job);
nb_available_machines -= new_job->nb_requested_resources;
}
......@@ -121,6 +132,9 @@ void EasyBackfilling::make_decisions(double date,
if (alloc.started_in_first_slice)
{
_decision->add_execute_job(job->id, alloc.used_machines, date);
if (broker_enabled)
broker->update_status_if_dyn_job(job->id, RUNNING);
job_it = _queue->remove_job(job_it); // Updating job_it to remove on traversal
priority_job_after = _queue->first_job_or_nullptr();
}
......@@ -134,6 +148,9 @@ void EasyBackfilling::make_decisions(double date,
if (alloc.started_in_first_slice)
{
_decision->add_execute_job(job->id, alloc.used_machines, date);
if (broker_enabled)
broker->update_status_if_dyn_job(job->id, RUNNING);
job_it = _queue->remove_job(job_it);
}
else
......@@ -144,6 +161,10 @@ void EasyBackfilling::make_decisions(double date,
}
}
}
/* make_decisions from superclass, will in particular take care of sending
* feedback about job status to the users, if broker enabled */
DynScheduler::make_decisions(date, update_info, compare_info);
}
......@@ -180,6 +201,8 @@ void EasyBackfilling::sort_queue_while_handling_priority_job(const Job * priorit
if (alloc.started_in_first_slice)
{
_decision->add_execute_job(priority_job_after->id, alloc.used_machines, (double)update_info->current_date);
if (broker_enabled)
broker->update_status_if_dyn_job(priority_job_after->id, RUNNING);
_queue->remove_job(priority_job_after);
priority_job_after = _queue->first_job_or_nullptr();
could_run_priority_job = true;
......
......@@ -2,12 +2,11 @@
#include <list>
#include "../isalgorithm.hpp"
#include "../json_workload.hpp"
#include "../broker/dynscheduler.hpp"
#include "../locality.hpp"
#include "../schedule.hpp"
class EasyBackfilling : public ISchedulingAlgorithm
class EasyBackfilling : public DynScheduler
{
public:
EasyBackfilling(Workload * workload, SchedulingDecision * decision, Queue * queue, ResourceSelector * selector,
......
......@@ -40,13 +40,13 @@ CloudBroker::CloudBroker(rapidjson::Document *user_description_file)
&& users_json[i]["name"].IsString(),
"Invalid user_description file: user should have string field "
"'name'.");
std::string name = users_json[i]["name"].GetString();
string name = users_json[i]["name"].GetString();
PPK_ASSERT_ERROR(users_json[i].HasMember("category")
&& users_json[i]["category"].IsString(),
"Invalid user_description file: user should have string field "
"'category'.");
std::string category = users_json[i]["category"].GetString();
string category = users_json[i]["category"].GetString();
PPK_ASSERT_ERROR(users_json[i].HasMember("param")
&& users_json[i]["param"].IsObject(),
......@@ -125,7 +125,7 @@ CloudBroker::CloudBroker(rapidjson::Document *user_description_file)
// int nb_users = 2;
// for (int i=0; i<nb_users; i++)
// {
// std::string user_name = "user" + std::to_string(i)
// string user_name = "user" + to_string(i)
// users.push_back(new DichoIntersubmitTimeUser(user_name, ));
// }
......@@ -149,16 +149,16 @@ CloudBroker::~CloudBroker()
users_to_wake.clear();
}
double CloudBroker::next_submission(double date)
double CloudBroker::next_submission(double date) const
{
return user_queue.front()->next_submission();
}
void CloudBroker::jobs_to_submit(
double date, std::list<Job *> &jobs, std::list<Profile *> &profiles)
double date, list<Job *> &jobs, list<Profile *> &profiles)
{
jobs = std::list<Job *>();
profiles = std::list<Profile *>();
jobs = list<Job *>();
profiles = list<Profile *>();
User *user = user_queue.front();
double planned_date_submission = user->next_submission();
......@@ -171,8 +171,8 @@ void CloudBroker::jobs_to_submit(
while (planned_date_submission == user->next_submission())
{
user_queue.pop_front();
std::list<Job *> user_jobs;
std::list<Profile *> user_profiles;
list<Job *> user_jobs;
list<Profile *> user_profiles;
user->jobs_to_submit(date, user_jobs, user_profiles);
for (Job *job : user_jobs)
......@@ -192,17 +192,17 @@ void CloudBroker::jobs_to_submit(
}
void CloudBroker::feedback_job_status(double date,
std::vector<std::string> &jobs_ended, std::vector<std::string> &jobs_killed,
std::vector<std::string> &jobs_released)
vector<string> &jobs_ended, vector<string> &jobs_killed,
vector<string> &jobs_released)
{
/* Jobs ended recently */
for (const std::string &job_id : jobs_ended)
for (const string &job_id : jobs_ended)
{
update_status_if_dyn_job(job_id, FINISHED);
}
/* Jobs killed recently */
for (const std::string &job_id : jobs_killed)
for (const string &job_id : jobs_killed)
{
update_status_if_dyn_job(job_id, KILLED);
}
......@@ -220,12 +220,12 @@ void CloudBroker::feedback_job_status(double date,
}
void CloudBroker::update_status_if_dyn_job(
const std::string &job_id, JobStatus status)
const string &job_id, JobStatus status)
{
auto it = dynamic_jobs.find(job_id);
if (it != dynamic_jobs.end())
{
std::string user_name = job_id.substr(0, job_id.find('!'));
string user_name = job_id.substr(0, job_id.find('!'));
User *user = users[user_name];
Job *current_job = it->second;
......@@ -246,7 +246,7 @@ void CloudBroker::update_status_if_dyn_job(
/* Add potentially interested user to the map of users to wake up */
if (users_to_wake.find(user) == users_to_wake.end())
users_to_wake.emplace(user, std::list<Job *>());
users_to_wake.emplace(user, list<Job *>());
/* ..and keep track of its recently ended jobs */
users_to_wake[user].push_back(current_job);
......@@ -261,7 +261,7 @@ void CloudBroker::update_status_if_dyn_job(
/* Add potentially interested user to the map of users to wake up */
if (users_to_wake.find(user) == users_to_wake.end())
users_to_wake.emplace(user, std::list<Job *>());
users_to_wake.emplace(user, list<Job *>());
/* ..and keep track of its recently ended jobs */
users_to_wake[user].push_back(current_job);
......@@ -272,7 +272,7 @@ void CloudBroker::update_status_if_dyn_job(
}
}
void CloudBroker::log_user_stats(std::string log_folder)
void CloudBroker::log_user_stats(string log_folder)
{
static int stat[10] = { 0 };
int *dm_stat;
......@@ -285,7 +285,7 @@ void CloudBroker::log_user_stats(std::string log_folder)
}
/* Write in file */
std::ofstream file(log_folder + "/user_stats.csv");
ofstream file(log_folder + "/user_stats.csv");
file << ",nb_jobs,core_seconds\n";
file << boost::format("rigid,%1%,%2%\n") % stat[2 * RIGID]
......
......@@ -9,6 +9,8 @@
#include <string>
#include <vector>
using namespace std;
class CloudBroker
{
public:
......@@ -22,7 +24,7 @@ public:
* DATE_UNKNOWN = -2.0 if the broker doesn't have a next submission date
* yet (user waiting for feedback)
*/
double next_submission(double date);
double next_submission(double date) const;
/**
* @brief Return the list of jobs to submit by any of the users.
......@@ -32,28 +34,28 @@ public:
* @param[out] profiles The list of profiles used by the jobs, if new.
*/
void jobs_to_submit(
double date, std::list<Job *> &jobs, std::list<Profile *> &profiles);
double date, list<Job *> &jobs, list<Profile *> &profiles);
/**
* @brief Ackowledge the latest execution-related activity and forward the
* info to interested users.
*/
void feedback_job_status(double date, std::vector<std::string> &jobs_ended,
std::vector<std::string> &jobs_killed,
std::vector<std::string> &jobs_released);
void log_user_stats(std::string user_stat_file);
void update_status_if_dyn_job(const std::string &job_id, JobStatus status);
void feedback_job_status(double date, vector<string> &jobs_ended,
vector<string> &jobs_killed,
vector<string> &jobs_released);
void log_user_stats(string user_stat_file);
void update_status_if_dyn_job(const string &job_id, JobStatus status);
private:
std::map<std::string, User *> users;
std::list<User *> user_queue;
std::map<std::string, Job *> dynamic_jobs = std::map<std::string, Job *>();
std::map<User *, std::list<Job *>> users_to_wake
= std::map<User *, std::list<Job *>>();
map<string, User *> users;
list<User *> user_queue;
map<string, Job *> dynamic_jobs = map<string, Job *>();
map<User *, list<Job *>> users_to_wake
= map<User *, list<Job *>>();
private:
/* Deterministic generation of seeds for users that use randomness */
std::mt19937 seed_generator = std::mt19937(1997);
mt19937 seed_generator = mt19937(1997);
/* (Optional) The demand response window for the DM users */
DMWindow *dm_window = nullptr;
......
......@@ -10,7 +10,7 @@ User::~User()
{
}
double User::next_submission()
double User::next_submission() const
{
return date_of_next_submission;
}
......
......@@ -17,7 +17,7 @@ public:
* DATE_NEVER if she has finished to submit or DATE_UNKNOWN if she is
* waiting for feedback.
*/
double next_submission();
double next_submission() const;
/**
* @brief Return the jobs to submit by the user at this date
......
......@@ -73,7 +73,8 @@ void DichoIntersubmitTimeUser::jobs_to_submit(
job->profile = "100_sec";
job->submission_time = date;
job->nb_requested_resources = 1;
job->has_walltime = false;
job->has_walltime = true;
job->walltime = 3600;
last_job_submitted = job;
jobs.push_back(job);
......@@ -141,11 +142,11 @@ RoutineGreedyUser::~RoutineGreedyUser()
{
}
bool RoutineGreedyUser::all_jobs_finished()
bool RoutineGreedyUser::all_jobs_finished_successfully() const
{
for (Job *job : last_jobs_submitted)
for (const Job *job : last_jobs_submitted)
{
if (job->status == WAITING || job->status == RUNNING
if (job->status == WAITING || job->status == RUNNING
|| job->status == KILLED)
return false;
}
......@@ -166,7 +167,7 @@ void RoutineGreedyUser::jobs_to_submit(
}
/* All job finished: submit twice as many jobs */
else if (all_jobs_finished())
else if (all_jobs_finished_successfully())
nb_jobs_to_submit
= (nb_jobs_to_submit == 0) ? 1 : nb_jobs_to_submit * 2;
......@@ -182,7 +183,8 @@ void RoutineGreedyUser::jobs_to_submit(
job->profile = "100_sec";
job->submission_time = date;
job->nb_requested_resources = 1;
job->has_walltime = false;
job->has_walltime = true;
job->walltime = 3600;
last_jobs_submitted.push_back(job);
jobs.push_back(job);
......
......@@ -25,7 +25,7 @@ private:
double delay_sup;
double delay_between_sumbit;
int job_id = 0;
Job *last_job_submitted = nullptr;
const Job *last_job_submitted = nullptr;
};
/**
......@@ -55,8 +55,8 @@ private:
bool first_submit = true;
int job_id = 0;
std::list<Job *> last_jobs_submitted;
bool all_jobs_finished();
std::list<const Job *> last_jobs_submitted;
bool all_jobs_finished_successfully() const;
};
/**
......
......@@ -177,12 +177,7 @@ Job *Workload::job_from_json_object(const Value &object)
bool JobComparator::operator()(const Job *j1, const Job *j2) const
{
PPK_ASSERT_ERROR(j1->id.find('!') != string::npos,
"I thought jobID had always a form 'wl!id'...");
string id1 = j1->id.substr(j1->id.find('!') + 1, j1->id.size());
string id2 = j2->id.substr(j2->id.find('!') + 1, j2->id.size());
return id1 < id2;
return j1->unique_number < j2->unique_number;
}
bool SessionComparator::operator()(const Session *s1, const Session *s2) const
......
......@@ -581,6 +581,9 @@ Schedule::TimeSliceConstIterator Schedule::find_first_occurence_of_job(
for (auto slice_it = starting_point; slice_it != _profile.end(); ++slice_it)
{
const TimeSlice &slice = *slice_it;
if (_debug)
LOG_F(1, "Looking for job %s. TimeSlice allocated jobs: %s", job->id.c_str(), slice.to_string_allocated_jobs().c_str());
if (slice.allocated_jobs.count(job) == 1)
return slice_it;
}
......
......@@ -4,6 +4,7 @@ import pytest
import subprocess
from collections import namedtuple
from os.path import abspath, basename
from os import listdir
Workload = namedtuple('Workload', ['name', 'filename'])
Platform = namedtuple('Platform', ['name', 'filename'])
......@@ -47,6 +48,14 @@ def pytest_generate_tests(metafunc):
]
metafunc.parametrize('sched_multi', scheds)
if 'test_instance' in metafunc.fixturenames:
instance_files = glob.glob('test/expected_log/*')
instances = [basename(instance_file).replace('_jobs.csv', '')
for instance_file in instance_files]
metafunc.parametrize('test_instance', instances)
# def pytest_cmdline_preparse(config, args):
# html_file = "test-out/testreport.html"
# print('HTML report file:', html_file)
......
This diff is collapsed.
This diff is collapsed.
from helper import *
def test_broker_mono(platform_monoC, workload_static):
Scheduler = namedtuple('Scheduler', ['name', 'short_name'])
sched_mono = Scheduler("fcfs", "fcfs")
# def test_broker_mono(platform_monoC, workload_static):
# Scheduler = namedtuple('Scheduler', ['name', 'short_name'])
# sched_mono = Scheduler("fcfs", "fcfs")
# def test_broker_mono(platform_monoC, workload_static, sched_mono):
# """Test the broker mode with monocore platform files and schedulers"""
def test_broker_mono(platform_monoC, workload_static, sched_mono):
"""Test the broker mode with monocore platform files and schedulers"""
test_name = f'broker-{sched_mono.short_name}-{platform_monoC.name}-{workload_static.name}'
output_dir, robin_filename, _ = init_instance(test_name)
......
from helper import *
def test_expected_output():
for log_file in os.listdir('test/expected_log'):
instance_name = log_file[:-9]
assert_expected_output(instance_name)
def test_expected_output(test_instance):
"""Test for each expected log (thanks to pytest fixtures) that the test was
run and the _jobs.csv outputs are strictly equal"""
assert_expected_output(test_instance)
0% Loading… (or reload the page).
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment