Skip to content
Snippets Groups Projects
Commit e13754a5 authored by Igor Fontana de Nardin's avatar Igor Fontana de Nardin
Browse files

Modifications for TPDS

parent 659660c3
Branches
No related tags found
No related merge requests found
......@@ -21,14 +21,29 @@ struct Job
IntervalSet alloc;
double maximum_finish_time;
double power_estimation;
double mean;
double std;
};
MessageBuilder * mb = nullptr;
/// How a job's power demand is estimated for the powercap check.
/// The mode is chosen in batsim_edc_init from "job_power_estimation_field".
enum TypePowerPrediction
{
    /// Any field name other than "gaussian"/"real_gaussian": the estimate is
    /// read from the job's extra_data entry named by power_estimation_field.
    BY_FIELD = 0,
    /// "gaussian" (value 1): uses extra_data "mean_power_estimation" and
    /// "std_power_estimation".
    GAUSSIAN,
    /// "real_gaussian" (value 2): uses extra_data "real_mean_power_estimation"
    /// and "std_power_real".
    REAL_GAUSSIAN
};
/// Order in which the waiting queue is traversed, selected by the "order"
/// entry of the init JSON.
enum JobOrder
{
    /// "order": "fcfs" (value 0) - the queue is left in submission order.
    FCFS = 0,
    /// "order": "saf" (value 1) - the queue is sorted with lower_ascending_saf
    /// before each scheduling pass.
    SAF
};
MessageBuilder *mb = nullptr;
bool format_binary = true; // whether flatbuffers binary or json format should be used
uint32_t platform_nb_hosts = 0;
std::list<::Job*> job_queue;
std::unordered_map<std::string, ::Job*> running_jobs;
std::list<::Job *> job_queue;
std::unordered_map<std::string, ::Job *> running_jobs;
uint32_t nb_available_hosts = 0;
IntervalSet available_hosts;
......@@ -39,8 +54,13 @@ double platform_nb_available_watts = -1;
double powercap_end_time = -1;
double idle_power_watts = -1; // per node
std::string power_estimation_field;
TypePowerPrediction typeprediction;
JobOrder jobOrder;
double sigma_times = -1;
double mean_running_jobs = 0;
double std_running_jobs = 0;
uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags)
uint8_t batsim_edc_init(const uint8_t *data, uint32_t size, uint32_t flags)
{
format_binary = ((flags & BATSIM_EDC_FORMAT_BINARY) != 0);
if ((flags & (BATSIM_EDC_FORMAT_BINARY | BATSIM_EDC_FORMAT_JSON)) != flags)
......@@ -52,7 +72,8 @@ uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags)
mb = new MessageBuilder(!format_binary);
std::string init_string((const char *)data, static_cast<size_t>(size));
try {
try
{
auto init_json = json::parse(init_string);
platform_normal_dynamic_watts = init_json["normal_dynamic_watts"];
platform_powercap_dynamic_watts = init_json["powercap_dynamic_watts"];
......@@ -61,7 +82,28 @@ uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags)
platform_nb_available_watts = platform_current_powercap_dynamic_watts;
powercap_end_time = init_json["powercap_end_time_seconds"];
power_estimation_field = init_json["job_power_estimation_field"];
} catch (const json::exception & e) {
if (power_estimation_field == "gaussian")
{
typeprediction = TypePowerPrediction::GAUSSIAN;
sigma_times = init_json["sigma_times"];
}
else if (power_estimation_field == "real_gaussian")
{
typeprediction = TypePowerPrediction::REAL_GAUSSIAN;
sigma_times = init_json["sigma_times"];
}
else
typeprediction = TypePowerPrediction::BY_FIELD;
if (init_json["order"] == "fcfs")
jobOrder = JobOrder::FCFS;
else if (init_json["order"] == "saf")
jobOrder = JobOrder::SAF;
else
throw std::runtime_error("Order error " + std::string(init_json["order"]));
}
catch (const json::exception &e)
{
throw std::runtime_error("scheduler called with bad init string: " + std::string(e.what()));
}
......@@ -76,42 +118,72 @@ uint8_t batsim_edc_deinit()
return 0;
}
bool ascending_max_finish_time_job_order(const ::Job* a, const ::Job* b) {
/**
 * Powercap predicate: tells whether job `j` fits under the power constraint.
 *
 * - BY_FIELD mode: the job fits when its `power_estimation` does not exceed
 *   `power_capping` (the watts still available).
 * - Gaussian modes: `power_capping` is NOT consulted. The predicted load is
 *   (j->mean + mean_total) + sigma_times * sqrt(j->std^2 + std_total)
 *   and is compared with the global platform_current_powercap_dynamic_watts.
 *
 * @param j             job to test (its mean/std/power_estimation are read)
 * @param power_capping available watts; only used in BY_FIELD mode
 * @param mean_total    sum of the means of the jobs already accounted for
 * @param std_total     despite its name, this is added to j->std^2 before the
 *                      square root, i.e. it is used as a sum of variances
 * @return true when the job can run without exceeding the cap
 */
bool verify_power_capping(::Job *j, double power_capping, double mean_total, double std_total)
{
    if (typeprediction == TypePowerPrediction::BY_FIELD)
        return j->power_estimation <= power_capping;

    // The running variance is maintained elsewhere by repeated additions and
    // subtractions, so rounding can leave it a hair below zero. sqrt() of a
    // negative value is NaN, and `NaN <= cap` is false, which would refuse the
    // job forever. Clamp only genuinely negative values (NaN is left alone so
    // bad inputs still fail the check as before).
    double combined_variance = (j->std * j->std) + std_total;
    if (combined_variance < 0.0)
        combined_variance = 0.0;

    // fprintf(stderr, "Job mean %f std %f. Total mean %f std %f. Prevision %f\n", j->mean, j->std, mean_total, std_total, (j->mean + mean_total) + (sigma_times * sqrt(combined_variance)));
    const double prevision = (j->mean + mean_total) + (sigma_times * sqrt(combined_variance));
    return prevision <= platform_current_powercap_dynamic_watts;
}
/// Strict-weak-ordering predicate for std::sort: `a` goes before `b` when its
/// maximum_finish_time is strictly smaller.
bool ascending_max_finish_time_job_order(const ::Job *a, const ::Job *b)
{
    const auto &lhs_finish = a->maximum_finish_time;
    const auto &rhs_finish = b->maximum_finish_time;
    return lhs_finish < rhs_finish;
}
/// Queue-ordering predicate used when jobOrder is SAF: `a` goes before `b`
/// when its walltime * nb_hosts product is strictly smaller.
/// NOTE(review): "SAF" presumably stands for smallest-area-first - confirm.
bool lower_ascending_saf(const ::Job *a, const ::Job *b)
{
    const auto area_a = a->walltime * a->nb_hosts;
    const auto area_b = b->walltime * b->nb_hosts;
    return area_a < area_b;
}
uint8_t batsim_edc_take_decisions(
const uint8_t * what_happened,
const uint8_t *what_happened,
uint32_t what_happened_size,
uint8_t ** decisions,
uint32_t * decisions_size)
uint8_t **decisions,
uint32_t *decisions_size)
{
(void) what_happened_size;
auto * parsed = deserialize_message(*mb, !format_binary, what_happened);
(void)what_happened_size;
auto *parsed = deserialize_message(*mb, !format_binary, what_happened);
mb->clear(parsed->now());
// should only become true once, when the powercap window finishes
if (parsed->now() >= powercap_end_time) {
if (parsed->now() >= powercap_end_time)
{
platform_current_powercap_dynamic_watts = platform_normal_dynamic_watts;
platform_nb_available_watts += (platform_normal_dynamic_watts - platform_powercap_dynamic_watts);
}
bool need_scheduling = false;
auto nb_events = parsed->events()->size();
for (unsigned int i = 0; i < nb_events; ++i) {
for (unsigned int i = 0; i < nb_events; ++i)
{
auto event = (*parsed->events())[i];
switch (event->event_type())
{
case fb::Event_BatsimHelloEvent: {
case fb::Event_BatsimHelloEvent:
{
mb->add_edc_hello("easy-powercap", "0.1.0");
} break;
case fb::Event_SimulationBeginsEvent: {
}
break;
case fb::Event_SimulationBeginsEvent:
{
auto simu_begins = event->event_as_SimulationBeginsEvent();
platform_nb_hosts = simu_begins->computation_host_number();
nb_available_hosts = platform_nb_hosts;
available_hosts = IntervalSet::ClosedInterval(0, platform_nb_hosts - 1);
} break;
case fb::Event_JobSubmittedEvent: {
}
break;
case fb::Event_JobSubmittedEvent:
{
auto parsed_job = event->event_as_JobSubmittedEvent();
::Job job{
parsed_job->job_id()->str(),
......@@ -119,28 +191,54 @@ uint8_t batsim_edc_take_decisions(
parsed_job->job()->walltime(),
IntervalSet::empty_interval_set(),
-1,
-1
};
-1,
-1,
-1};
try {
try
{
auto extra_data = json::parse(parsed_job->job()->extra_data()->str());
job.power_estimation = std::max(0.0, (double)extra_data[power_estimation_field] - idle_power_watts * job.nb_hosts);
} catch (const json::exception & e) {
if (typeprediction == TypePowerPrediction::GAUSSIAN)
{
job.mean = std::max(0.0, (double)extra_data["mean_power_estimation"] - (idle_power_watts * job.nb_hosts));
job.std = (double)extra_data["std_power_estimation"];
}
if (typeprediction == TypePowerPrediction::REAL_GAUSSIAN)
{
job.mean = std::max(0.0, (double)extra_data["real_mean_power_estimation"] - (idle_power_watts * job.nb_hosts));
job.std = (double)extra_data["std_power_real"];
}
if (typeprediction == TypePowerPrediction::GAUSSIAN || typeprediction == TypePowerPrediction::REAL_GAUSSIAN)
{
job.power_estimation = job.mean + job.std;
}
else
{
job.power_estimation = std::max(0.0, (double)extra_data[power_estimation_field] - (idle_power_watts * job.nb_hosts));
}
}
catch (const json::exception &e)
{
throw std::runtime_error("bad extra_data in job submitted: tried to read field " + power_estimation_field);
}
if ( (job.nb_hosts > platform_nb_hosts) // usual EASY predicate
if ((job.nb_hosts > platform_nb_hosts) // usual EASY predicate
|| (job.power_estimation > platform_normal_dynamic_watts) // powercap predicate
)
)
mb->add_reject_job(job.id);
else if (job.walltime <= 0)
mb->add_reject_job(job.id);
else {
else
{
need_scheduling = true;
job_queue.emplace_back(new ::Job(job));
}
} break;
case fb::Event_JobCompletedEvent: {
}
break;
case fb::Event_JobCompletedEvent:
{
need_scheduling = true;
auto job_id = event->event_as_JobCompletedEvent()->job_id()->str();
......@@ -149,28 +247,40 @@ uint8_t batsim_edc_take_decisions(
nb_available_hosts += job->nb_hosts; // usual EASY update
available_hosts += job->alloc;
platform_nb_available_watts += job->power_estimation; // powercap update
mean_running_jobs -= job->mean;
std_running_jobs -= pow(job->std, 2);
delete job;
running_jobs.erase(job_it);
} break;
default: break;
}
break;
default:
break;
}
}
if (need_scheduling) {
::Job* priority_job = nullptr;
if (need_scheduling)
{
::Job *priority_job = nullptr;
uint32_t nb_available_hosts_at_priority_job_start = 0;
double nb_available_watts_at_priority_job_start = 0;
double mean_watts_at_priority_job_start = 0;
double std_watts_at_priority_job_start = 0;
float priority_job_start_time = -1;
// First traversal, done until a job cannot be executed right now and is set as the priority job
// (or until all jobs have been executed)
if (jobOrder == JobOrder::SAF)
job_queue.sort(lower_ascending_saf);
auto job_it = job_queue.begin();
for (; job_it != job_queue.end(); ) {
for (; job_it != job_queue.end();)
{
auto job = *job_it;
if ( (job->nb_hosts <= nb_available_hosts) // usual EASY predicate
&& (job->power_estimation <= platform_nb_available_watts) // powercap predicate
) {
// fprintf(stderr, "Priority order Job %s mean %f std %f\n", job->id.c_str(), job->mean, job->std);
if ((job->nb_hosts <= nb_available_hosts) // usual EASY predicate
&& (verify_power_capping(job, platform_nb_available_watts, mean_running_jobs, std_running_jobs)) // powercap predicate
)
{
running_jobs[job->id] = *job_it;
job->maximum_finish_time = parsed->now() + job->walltime;
job->alloc = available_hosts.left(job->nb_hosts);
......@@ -178,6 +288,8 @@ uint8_t batsim_edc_take_decisions(
available_hosts -= job->alloc;
nb_available_hosts -= job->nb_hosts;
platform_nb_available_watts -= job->power_estimation;
mean_running_jobs += job->mean;
std_running_jobs += pow(job->std, 2);
job_it = job_queue.erase(job_it);
}
......@@ -187,23 +299,36 @@ uint8_t batsim_edc_take_decisions(
++job_it;
// compute when the priority job can start, and the number of available machines at this time
std::vector<::Job*> running_jobs_asc_maximum_finish_time;
std::vector<::Job *> running_jobs_asc_maximum_finish_time;
running_jobs_asc_maximum_finish_time.reserve(running_jobs.size());
for (const auto & it : running_jobs)
for (const auto &it : running_jobs)
running_jobs_asc_maximum_finish_time.push_back(it.second);
std::sort(running_jobs_asc_maximum_finish_time.begin(), running_jobs_asc_maximum_finish_time.end(), ascending_max_finish_time_job_order);
nb_available_hosts_at_priority_job_start = nb_available_hosts;
nb_available_watts_at_priority_job_start = platform_nb_available_watts;
for (const auto & job : running_jobs_asc_maximum_finish_time) {
mean_watts_at_priority_job_start = mean_running_jobs;
std_watts_at_priority_job_start = std_running_jobs;
for (const auto &job : running_jobs_asc_maximum_finish_time)
{
nb_available_hosts_at_priority_job_start += job->nb_hosts;
nb_available_watts_at_priority_job_start += job->power_estimation;
if ( (nb_available_hosts_at_priority_job_start >= priority_job->nb_hosts) // usual EASY predicate
&& (nb_available_watts_at_priority_job_start >= priority_job->power_estimation) // powercap predicate
) {
mean_watts_at_priority_job_start -= job->mean;
std_watts_at_priority_job_start -= pow(job->std, 2);
if ((nb_available_hosts_at_priority_job_start >= priority_job->nb_hosts) // usual EASY predicate
&& (verify_power_capping(priority_job,
nb_available_watts_at_priority_job_start,
mean_watts_at_priority_job_start,
std_watts_at_priority_job_start)) // powercap predicate
)
{
nb_available_hosts_at_priority_job_start -= priority_job->nb_hosts;
nb_available_watts_at_priority_job_start -= priority_job->power_estimation;
priority_job_start_time = job->maximum_finish_time;
mean_watts_at_priority_job_start += priority_job->mean;
std_watts_at_priority_job_start += pow(priority_job->std, 2);
break;
}
}
......@@ -213,19 +338,25 @@ uint8_t batsim_edc_take_decisions(
}
// Continue traversal to backfill jobs
for (; job_it != job_queue.end(); ) {
for (; job_it != job_queue.end();)
{
auto job = *job_it;
// should the job be backfilled?
float job_finish_time = parsed->now() + job->walltime;
if ( (job->nb_hosts <= nb_available_hosts) // enough hosts now?
&& (job->power_estimation <= platform_nb_available_watts) // enough power now?
&& (
( (job->nb_hosts <= nb_available_hosts_at_priority_job_start) // cannot hinder priority job at all regarding hosts
&& (job->power_estimation <= nb_available_watts_at_priority_job_start) // cannot hinder priority job at all regarding watts
) // previous block is true if the backfilled job cannot hinder the priority job regardless of the backfilled job duration
|| (job_finish_time <= priority_job_start_time) // the backfilled job finishes before the priority job's expected start time
)
) {
if ((job->nb_hosts <= nb_available_hosts) // enough hosts now?
&& (verify_power_capping(job,
platform_nb_available_watts,
mean_running_jobs,
std_running_jobs)) // enough power now?
&& (((job->nb_hosts <= nb_available_hosts_at_priority_job_start) // cannot hinder priority job at all regarding hosts
&& (verify_power_capping(job,
nb_available_watts_at_priority_job_start,
mean_watts_at_priority_job_start,
std_watts_at_priority_job_start)) // cannot hinder priority job at all regarding watts
) // previous block is true if the backfilled job cannot hinder the priority job regardless of the backfilled job duration
|| (job_finish_time <= priority_job_start_time) // the backfilled job finishes before the priority job's expected start time
))
{
running_jobs[job->id] = *job_it;
job->maximum_finish_time = job_finish_time;
job->alloc = available_hosts.left(job->nb_hosts);
......@@ -233,10 +364,15 @@ uint8_t batsim_edc_take_decisions(
available_hosts -= job->alloc;
nb_available_hosts -= job->nb_hosts;
platform_nb_available_watts -= job->power_estimation;
mean_running_jobs += job->mean;
std_running_jobs += pow(job->std, 2);
if (job_finish_time > priority_job_start_time) {
if (job_finish_time > priority_job_start_time)
{
nb_available_hosts_at_priority_job_start -= job->nb_hosts;
nb_available_watts_at_priority_job_start -= job->power_estimation;
mean_watts_at_priority_job_start += priority_job->mean;
std_watts_at_priority_job_start += pow(priority_job->std, 2);
}
job_it = job_queue.erase(job_it);
......
This diff is collapsed.
......@@ -21,3 +21,8 @@ easypower = shared_library('easypower', common + ['easypower.cpp'],
dependencies: deps + [boost_dep, intervalset_dep, nlohmann_json_dep],
install: true,
)
# Shared library 'knapsack_greedy': built from the common sources plus
# knapsack_greedy.cpp, with the same dependency list as the 'easypower'
# target, and installed with the project.
knapsack_greedy = shared_library('knapsack_greedy', common + ['knapsack_greedy.cpp'],
dependencies: deps + [boost_dep, intervalset_dep, nlohmann_json_dep],
install: true,
)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment