Skip to content
Snippets Groups Projects
Commit 16368e56 authored by Maël Madon's avatar Maël Madon
Browse files

refac: core_per_user checked by broker instead of dynsched

parent f0979d2e
Branches
Tags
1 merge request!10Better memory management and float rounding issues
......@@ -433,7 +433,7 @@ Job *Parser::job_from_json_object(string user_name, const Value &object)
return j;
}
string Parser::json_description_string_from_job(Job *job)
string Parser::json_description_string_from_job(const Job *job)
{
PPK_ASSERT_ERROR(job->id != "", "A job must have the field 'id'.");
PPK_ASSERT_ERROR(
......
......@@ -13,7 +13,7 @@
#include <utility>
struct JobAlloc;
struct Session;
class Session;
#define DATE_NEVER -1.0
#define DATE_UNKNOWN -2.0
......@@ -129,7 +129,7 @@ struct Parser
static Job *job_from_json_object(
const std::string user_name, const rapidjson::Value &object);
// with format 'user-name!jobID'
static std::string json_description_string_from_job(Job *job);
static std::string json_description_string_from_job(const Job *job);
static Session *session_from_json_object(const std::string user_name,
const rapidjson::Value &object, std::list<unsigned int> *prec_sessions,
std::list<double> *tt_after_prec_sessions);
......
......@@ -15,6 +15,16 @@ Broker::Broker(rapidjson::Document *user_description_file)
/* Parse description file and call constructor for each user */
if (!user_description_file->ObjectEmpty())
{
if (user_description_file->HasMember("core_limit_per_user"))
{
PPK_ASSERT_ERROR(
(*user_description_file)["core_limit_per_user"].IsInt(),
"Invalid user_description file: field "
"'core_limit_per_user' should be an int.");
core_limit_per_user
= (*user_description_file)["core_limit_per_user"].GetInt();
// otherwise, initialized at std::numeric_limits<int>::max()
}
if (user_description_file->HasMember("dm_window"))
{
const Value &dm_param = (*user_description_file)["dm_window"];
......@@ -171,15 +181,25 @@ void Broker::jobs_to_submit(
while (planned_date_submission == user->next_submission())
{
user_queue.pop_front();
unsigned int nb_core_requested = 0;
list<Job *> user_jobs;
list<const Profile *> user_profiles;
user->jobs_to_submit(date, user_jobs, user_profiles);
for (Job *job : user_jobs)
{
jobs.push_back(job);
dynamic_jobs[job->id] = job;
job->status = WAITING;
nb_core_requested = nb_core_requested + job->nb_requested_resources;
if (nb_core_requested > core_limit_per_user)
{
job->status = KILLED;
}
else
{
jobs.push_back(job);
dynamic_jobs[job->id] = job;
job->status = WAITING;
}
}
profiles.splice(profiles.end(), user_profiles);
......
......@@ -48,6 +48,7 @@ public:
private:
map<string, User *> users;
unsigned int core_limit_per_user = std::numeric_limits<int>::max();
list<User *> user_queue;
map<string, Job *> dynamic_jobs = map<string, Job *>();
map<User *, list<Job *>> users_to_wake
......
......@@ -41,16 +41,6 @@ void DynScheduler::on_simulation_start(
/* Check for optional parameters */
if (!_variant_options->ObjectEmpty())
{
if (_variant_options->HasMember("core_limit_per_user"))
{
PPK_ASSERT_ERROR(
(*_variant_options)["core_limit_per_user"].IsInt(),
"Invalid user_description file: field "
"'core_limit_per_user' should be an int.");
core_limit_per_user
= (*_variant_options)["core_limit_per_user"].GetInt();
// otherwise, initialized at std::numeric_limits<int>::max()
}
if (_variant_options->HasMember("log_user_stats")
&& (*_variant_options)["log_user_stats"].GetBool())
{
......@@ -163,11 +153,9 @@ void DynScheduler::submit_broker_jobs(double date)
dyn_submit(jobs, date);
}
void DynScheduler::dyn_submit(std::list<Job *> jobs, double date)
void DynScheduler::dyn_submit(std::list<const Job *> jobs, double date)
{
std::map<std::string, int> core_per_user;
for (Job *current_job : jobs)
for (const Job *current_job : jobs)
{
std::string fullID = current_job->id;
PPK_ASSERT_ERROR(fullID.find('!') != std::string::npos,
......@@ -176,26 +164,15 @@ void DynScheduler::dyn_submit(std::list<Job *> jobs, double date)
std::string user_name = fullID.substr(0, fullID.find('!'));
fullID.erase(0, fullID.find('!') + 1);
std::string jobID = fullID;
std::string job_description
= Parser::json_description_string_from_job(current_job);
std::string empty_profile_description
= std::string(); // has already been sent
// REFAC: move to broker
core_per_user[user_name]
= core_per_user[user_name] + current_job->nb_requested_resources;
/* Register the job to batsim */
_decision->add_submit_job(user_name, jobID, current_job->profile,
job_description, empty_profile_description, date, true);
if (core_per_user[user_name] > core_limit_per_user)
{
current_job->status = KILLED;
}
else
{
std::string job_description
= Parser::json_description_string_from_job(current_job);
std::string empty_profile_description
= std::string(); // has already been sent
/* Register the job to batsim */
_decision->add_submit_job(user_name, jobID, current_job->profile,
job_description, empty_profile_description, date, true);
}
}
}
......
......@@ -53,7 +53,7 @@ protected:
/**
* Dynamically registers to batsim the job `jobs` to submit.
*/
void dyn_submit(std::list<Job *> jobs, double date);
void dyn_submit(std::list<const Job *> jobs, double date);
/**
* Registers to batsim the profiles that will be used by the broker.
......@@ -67,6 +67,5 @@ protected:
bool next_submission_has_changed = false;
private:
int core_limit_per_user = std::numeric_limits<int>::max();
std::string log_folder = "";
};
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment