diff --git a/easypower.cpp b/easypower.cpp index 093336fee1df0d54b349317c8445b0206c95a2d9..1d5a2501f6287b3b9ef4907b1ee16474a6ca2241 100644 --- a/easypower.cpp +++ b/easypower.cpp @@ -21,14 +21,29 @@ struct Job IntervalSet alloc; double maximum_finish_time; double power_estimation; + double mean; + double std; }; -MessageBuilder * mb = nullptr; +enum TypePowerPrediction +{ + BY_FIELD = 0, + GAUSSIAN = 1, + REAL_GAUSSIAN = 2, +}; + +enum JobOrder +{ + FCFS = 0, + SAF = 1 +}; + +MessageBuilder *mb = nullptr; bool format_binary = true; // whether flatbuffers binary or json format should be used uint32_t platform_nb_hosts = 0; -std::list<::Job*> job_queue; -std::unordered_map<std::string, ::Job*> running_jobs; +std::list<::Job *> job_queue; +std::unordered_map<std::string, ::Job *> running_jobs; uint32_t nb_available_hosts = 0; IntervalSet available_hosts; @@ -39,8 +54,13 @@ double platform_nb_available_watts = -1; double powercap_end_time = -1; double idle_power_watts = -1; // per node std::string power_estimation_field; +TypePowerPrediction typeprediction; +JobOrder jobOrder; +double sigma_times = -1; +double mean_running_jobs = 0; +double std_running_jobs = 0; -uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags) +uint8_t batsim_edc_init(const uint8_t *data, uint32_t size, uint32_t flags) { format_binary = ((flags & BATSIM_EDC_FORMAT_BINARY) != 0); if ((flags & (BATSIM_EDC_FORMAT_BINARY | BATSIM_EDC_FORMAT_JSON)) != flags) @@ -52,7 +72,8 @@ uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags) mb = new MessageBuilder(!format_binary); std::string init_string((const char *)data, static_cast<size_t>(size)); - try { + try + { auto init_json = json::parse(init_string); platform_normal_dynamic_watts = init_json["normal_dynamic_watts"]; platform_powercap_dynamic_watts = init_json["powercap_dynamic_watts"]; @@ -61,7 +82,28 @@ uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags) platform_nb_available_watts = platform_current_powercap_dynamic_watts; powercap_end_time = init_json["powercap_end_time_seconds"]; power_estimation_field = init_json["job_power_estimation_field"]; - } catch (const json::exception & e) { + if (power_estimation_field == "gaussian") + { + typeprediction = TypePowerPrediction::GAUSSIAN; + sigma_times = init_json["sigma_times"]; + } + else if (power_estimation_field == "real_gaussian") + { + typeprediction = TypePowerPrediction::REAL_GAUSSIAN; + sigma_times = init_json["sigma_times"]; + } + else + typeprediction = TypePowerPrediction::BY_FIELD; + + if (init_json["order"] == "fcfs") + jobOrder = JobOrder::FCFS; + else if (init_json["order"] == "saf") + jobOrder = JobOrder::SAF; + else + throw std::runtime_error("Order error " + std::string(init_json["order"])); + } + catch (const json::exception &e) + { throw std::runtime_error("scheduler called with bad init string: " + std::string(e.what())); } @@ -76,42 +118,72 @@ uint8_t batsim_edc_deinit() return 0; } -bool ascending_max_finish_time_job_order(const ::Job* a, const ::Job* b) { +bool verify_power_capping(::Job *j, double power_capping, double mean_total, double std_total) +{ + if (typeprediction == TypePowerPrediction::BY_FIELD) + { + if (j->power_estimation <= power_capping) + return true; + else + return false; + } + // fprintf(stderr, "Job mean %f std %f. Total mean %f std %f. Prevision %f\n", j->mean, j->std, mean_total, std_total, (j->mean + mean_total) + (sigma_times * (sqrt(pow(j->std, 2) + std_total)))); + double prevision = (j->mean + mean_total) + (sigma_times * (sqrt(pow(j->std, 2) + std_total))); + if (prevision <= platform_current_powercap_dynamic_watts) + return true; + else + return false; +} + +bool ascending_max_finish_time_job_order(const ::Job *a, const ::Job *b) +{ return a->maximum_finish_time < b->maximum_finish_time; } +bool lower_ascending_saf(const ::Job *a, const ::Job *b) +{ + return (a->walltime * a->nb_hosts) < (b->walltime * b->nb_hosts); +} + uint8_t batsim_edc_take_decisions( - const uint8_t * what_happened, + const uint8_t *what_happened, uint32_t what_happened_size, - uint8_t ** decisions, - uint32_t * decisions_size) + uint8_t **decisions, + uint32_t *decisions_size) { - (void) what_happened_size; - auto * parsed = deserialize_message(*mb, !format_binary, what_happened); + (void)what_happened_size; + auto *parsed = deserialize_message(*mb, !format_binary, what_happened); mb->clear(parsed->now()); // should only become true once, when the powercap window finishes - if (parsed->now() >= powercap_end_time) { + if (parsed->now() >= powercap_end_time) + { platform_current_powercap_dynamic_watts = platform_normal_dynamic_watts; platform_nb_available_watts += (platform_normal_dynamic_watts - platform_powercap_dynamic_watts); } bool need_scheduling = false; auto nb_events = parsed->events()->size(); - for (unsigned int i = 0; i < nb_events; ++i) { + for (unsigned int i = 0; i < nb_events; ++i) + { auto event = (*parsed->events())[i]; switch (event->event_type()) { - case fb::Event_BatsimHelloEvent: { + case fb::Event_BatsimHelloEvent: + { mb->add_edc_hello("easy-powercap", "0.1.0"); - } break; - case fb::Event_SimulationBeginsEvent: { + } + break; + case fb::Event_SimulationBeginsEvent: + { auto simu_begins = event->event_as_SimulationBeginsEvent(); platform_nb_hosts = simu_begins->computation_host_number(); nb_available_hosts = platform_nb_hosts; available_hosts = IntervalSet::ClosedInterval(0, platform_nb_hosts - 1); - } break; - case fb::Event_JobSubmittedEvent: { + } + break; + case fb::Event_JobSubmittedEvent: + { auto parsed_job = event->event_as_JobSubmittedEvent(); ::Job job{ parsed_job->job_id()->str(), @@ -119,28 +191,54 @@ uint8_t batsim_edc_take_decisions( parsed_job->job()->walltime(), IntervalSet::empty_interval_set(), -1, - -1 - }; + -1, + -1, + -1}; - try { + try + { auto extra_data = json::parse(parsed_job->job()->extra_data()->str()); - job.power_estimation = std::max(0.0, (double)extra_data[power_estimation_field] - idle_power_watts * job.nb_hosts); - } catch (const json::exception & e) { + + if (typeprediction == TypePowerPrediction::GAUSSIAN) + { + job.mean = std::max(0.0, (double)extra_data["mean_power_estimation"] - (idle_power_watts * job.nb_hosts)); + job.std = (double)extra_data["std_power_estimation"]; + } + if (typeprediction == TypePowerPrediction::REAL_GAUSSIAN) + { + job.mean = std::max(0.0, (double)extra_data["real_mean_power_estimation"] - (idle_power_watts * job.nb_hosts)); + job.std = (double)extra_data["std_power_real"]; + } + + if (typeprediction == TypePowerPrediction::GAUSSIAN || typeprediction == TypePowerPrediction::REAL_GAUSSIAN) + { + job.power_estimation = job.mean + job.std; + } + else + { + job.power_estimation = std::max(0.0, (double)extra_data[power_estimation_field] - (idle_power_watts * job.nb_hosts)); + } + } + catch (const json::exception &e) + { throw std::runtime_error("bad extra_data in job submitted: tried to read field " + power_estimation_field); } - if ( (job.nb_hosts > platform_nb_hosts) // usual EASY predicate + if ((job.nb_hosts > platform_nb_hosts) // usual EASY predicate || (job.power_estimation > platform_normal_dynamic_watts) // powercap predicate - ) + ) mb->add_reject_job(job.id); else if (job.walltime <= 0) mb->add_reject_job(job.id); - else { + else + { need_scheduling = true; job_queue.emplace_back(new ::Job(job)); } - } break; - case fb::Event_JobCompletedEvent: { + } + break; + case fb::Event_JobCompletedEvent: + { need_scheduling = true; auto job_id = event->event_as_JobCompletedEvent()->job_id()->str(); @@ -149,28 +247,40 @@ uint8_t batsim_edc_take_decisions( nb_available_hosts += job->nb_hosts; // usual EASY update available_hosts += job->alloc; platform_nb_available_watts += job->power_estimation; // powercap update + mean_running_jobs -= job->mean; + std_running_jobs -= pow(job->std, 2); delete job; running_jobs.erase(job_it); - } break; - default: break; + } + break; + default: + break; } } - if (need_scheduling) { - ::Job* priority_job = nullptr; + if (need_scheduling) + { + ::Job *priority_job = nullptr; uint32_t nb_available_hosts_at_priority_job_start = 0; double nb_available_watts_at_priority_job_start = 0; + double mean_watts_at_priority_job_start = 0; + double std_watts_at_priority_job_start = 0; float priority_job_start_time = -1; // First traversal, done until a job cannot be executed right now and is set as the priority job // (or until all jobs have been executed) + if (jobOrder == JobOrder::SAF) + job_queue.sort(lower_ascending_saf); auto job_it = job_queue.begin(); - for (; job_it != job_queue.end(); ) { + for (; job_it != job_queue.end();) + { auto job = *job_it; - if ( (job->nb_hosts <= nb_available_hosts) // usual EASY predicate - && (job->power_estimation <= platform_nb_available_watts) // powercap predicate - ) { + // fprintf(stderr, "Priority order Job %s mean %f std %f\n", job->id.c_str(), job->mean, job->std); + if ((job->nb_hosts <= nb_available_hosts) // usual EASY predicate + && (verify_power_capping(job, platform_nb_available_watts, mean_running_jobs, std_running_jobs)) // powercap predicate + ) + { running_jobs[job->id] = *job_it; job->maximum_finish_time = parsed->now() + job->walltime; job->alloc = available_hosts.left(job->nb_hosts); @@ -178,6 +288,8 @@ uint8_t batsim_edc_take_decisions( available_hosts -= job->alloc; nb_available_hosts -= job->nb_hosts; platform_nb_available_watts -= job->power_estimation; + mean_running_jobs += job->mean; + std_running_jobs += pow(job->std, 2); job_it = job_queue.erase(job_it); } @@ -187,23 +299,36 @@ uint8_t batsim_edc_take_decisions( ++job_it; // compute when the priority job can start, and the number of available machines at this time - std::vector<::Job*> running_jobs_asc_maximum_finish_time; + std::vector<::Job *> running_jobs_asc_maximum_finish_time; running_jobs_asc_maximum_finish_time.reserve(running_jobs.size()); - for (const auto & it : running_jobs) + for (const auto &it : running_jobs) running_jobs_asc_maximum_finish_time.push_back(it.second); std::sort(running_jobs_asc_maximum_finish_time.begin(), running_jobs_asc_maximum_finish_time.end(), ascending_max_finish_time_job_order); nb_available_hosts_at_priority_job_start = nb_available_hosts; nb_available_watts_at_priority_job_start = platform_nb_available_watts; - for (const auto & job : running_jobs_asc_maximum_finish_time) { + mean_watts_at_priority_job_start = mean_running_jobs; + std_watts_at_priority_job_start = std_running_jobs; + + for (const auto &job : running_jobs_asc_maximum_finish_time) + { nb_available_hosts_at_priority_job_start += job->nb_hosts; nb_available_watts_at_priority_job_start += job->power_estimation; - if ( (nb_available_hosts_at_priority_job_start >= priority_job->nb_hosts) // usual EASY predicate - && (nb_available_watts_at_priority_job_start >= priority_job->power_estimation) // powercap predicate - ) { + mean_watts_at_priority_job_start -= job->mean; + std_watts_at_priority_job_start -= pow(job->std, 2); + + if ((nb_available_hosts_at_priority_job_start >= priority_job->nb_hosts) // usual EASY predicate + && (verify_power_capping(priority_job, + nb_available_watts_at_priority_job_start, + mean_watts_at_priority_job_start, + std_watts_at_priority_job_start)) // powercap predicate + ) + { nb_available_hosts_at_priority_job_start -= priority_job->nb_hosts; nb_available_watts_at_priority_job_start -= priority_job->power_estimation; priority_job_start_time = job->maximum_finish_time; + mean_watts_at_priority_job_start += priority_job->mean; + std_watts_at_priority_job_start += pow(priority_job->std, 2); break; } } @@ -213,19 +338,25 @@ uint8_t batsim_edc_take_decisions( } // Continue traversal to backfill jobs - for (; job_it != job_queue.end(); ) { + for (; job_it != job_queue.end();) + { auto job = *job_it; // should the job be backfilled? float job_finish_time = parsed->now() + job->walltime; - if ( (job->nb_hosts <= nb_available_hosts) // enough hosts now? - && (job->power_estimation <= platform_nb_available_watts) // enough power now? - && ( - ( (job->nb_hosts <= nb_available_hosts_at_priority_job_start) // cannot hinder priority job at all regarding hosts - && (job->power_estimation <= nb_available_watts_at_priority_job_start) // cannot hinder priority job at all regarding watts - ) // previous block is true if the backfilled job cannot hinder the priority job regardless of the backfilled job duration - || (job_finish_time <= priority_job_start_time) // the backfilled job finishes before the priority job's expected start time - ) - ) { + if ((job->nb_hosts <= nb_available_hosts) // enough hosts now? + && (verify_power_capping(job, + platform_nb_available_watts, + mean_running_jobs, + std_running_jobs)) // enough power now? + && (((job->nb_hosts <= nb_available_hosts_at_priority_job_start) // cannot hinder priority job at all regarding hosts + && (verify_power_capping(job, + nb_available_watts_at_priority_job_start, + mean_watts_at_priority_job_start, + std_watts_at_priority_job_start)) // cannot hinder priority job at all regarding watts + ) // previous block is true if the backfilled job cannot hinder the priority job regardless of the backfilled job duration + || (job_finish_time <= priority_job_start_time) // the backfilled job finishes before the priority job's expected start time + )) + { running_jobs[job->id] = *job_it; job->maximum_finish_time = job_finish_time; job->alloc = available_hosts.left(job->nb_hosts); @@ -233,10 +364,15 @@ uint8_t batsim_edc_take_decisions( available_hosts -= job->alloc; nb_available_hosts -= job->nb_hosts; platform_nb_available_watts -= job->power_estimation; + mean_running_jobs += job->mean; + std_running_jobs += pow(job->std, 2); - if (job_finish_time > priority_job_start_time) { + if (job_finish_time > priority_job_start_time) + { nb_available_hosts_at_priority_job_start -= job->nb_hosts; nb_available_watts_at_priority_job_start -= job->power_estimation; + mean_watts_at_priority_job_start += priority_job->mean; + std_watts_at_priority_job_start += pow(priority_job->std, 2); } job_it = job_queue.erase(job_it); diff --git a/knapsack_greedy.cpp b/knapsack_greedy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bc0f5ff7828ddc86182ba40f437cb01584f978a8 --- /dev/null +++ b/knapsack_greedy.cpp @@ -0,0 +1,499 @@ +#include <cstdint> +#include <list> +#include <string> +#include <unordered_map> + +#include <batprotocol.hpp> +#include <intervalset.hpp> +#include <nlohmann/json.hpp> +#include <random> +#include "batsim_edc.h" +#include <iostream> + +using namespace batprotocol; +using json = nlohmann::json; + +struct Job +{ + std::string id; + uint32_t nb_hosts; + double walltime; + double arrival; + IntervalSet alloc; + double maximum_finish_time; + double power_estimation; + double mean; + double std; +}; + +enum TypePowerPrediction +{ + BY_FIELD = 0, + GAUSSIAN = 1, + REAL_GAUSSIAN = 2, +}; + +enum TypeKnapsackValue +{ + WAITING_TIME = 0, + WAITING_TIME_RATIO = 1, +}; + +// Class to represent an item +class Item +{ +public: + double weight; + double value; + int idx; + ::Job *job; +}; + +// Comparator to sort items based on value/weight ratio +class sortByRatio +{ +public: + bool operator()(const Item &a, const Item &b) const + { + return static_cast<double>(a.value) / a.weight > static_cast<double>(b.value) / b.weight; + } +}; + +MessageBuilder *mb = nullptr; +bool format_binary = true; // whether flatbuffers binary or json format should be used + +uint32_t platform_nb_hosts = 0; +std::list<::Job *> job_queue; +std::unordered_map<std::string, ::Job *> running_jobs; +uint32_t nb_available_hosts = 0; +IntervalSet available_hosts; + +double platform_normal_dynamic_watts = -1; +double platform_powercap_dynamic_watts = -1; +double platform_current_powercap_dynamic_watts = -1; +double platform_nb_available_watts = -1; +double powercap_end_time = -1; +double idle_power_watts = -1; // per node +std::string power_estimation_field; +TypePowerPrediction typeprediction; +TypeKnapsackValue type_knapsack_value; +double sigma_times = -1; +double mean_running_jobs = 0; +double std_running_jobs = 0; + +uint8_t batsim_edc_init(const uint8_t *data, uint32_t size, uint32_t flags) +{ + format_binary = ((flags & BATSIM_EDC_FORMAT_BINARY) != 0); + if ((flags & (BATSIM_EDC_FORMAT_BINARY | BATSIM_EDC_FORMAT_JSON)) != flags) + { + printf("Unknown flags used, cannot initialize myself.\n"); + return 1; + } + + mb = new MessageBuilder(!format_binary); + + std::string init_string((const char *)data, static_cast<size_t>(size)); + try + { + auto init_json = json::parse(init_string); + platform_normal_dynamic_watts = init_json["normal_dynamic_watts"]; + platform_powercap_dynamic_watts = init_json["powercap_dynamic_watts"]; + idle_power_watts = init_json["idle_watts"]; + platform_current_powercap_dynamic_watts = platform_powercap_dynamic_watts; // simulation starts in the powercap constrained window + platform_nb_available_watts = platform_current_powercap_dynamic_watts; + powercap_end_time = init_json["powercap_end_time_seconds"]; + power_estimation_field = init_json["job_power_estimation_field"]; + + if (init_json["type_knapsack"] == "waiting_time") + { + type_knapsack_value = TypeKnapsackValue::WAITING_TIME; + } + else if (init_json["type_knapsack"] == "waiting_time_ratio") + { + type_knapsack_value = TypeKnapsackValue::WAITING_TIME_RATIO; + } + + if (power_estimation_field == "gaussian") + { + typeprediction = TypePowerPrediction::GAUSSIAN; + sigma_times = init_json["sigma_times"]; + } + else if (power_estimation_field == "real_gaussian") + { + typeprediction = TypePowerPrediction::REAL_GAUSSIAN; + sigma_times = init_json["sigma_times"]; + } + else + typeprediction = TypePowerPrediction::BY_FIELD; + } + catch (const json::exception &e) + { + throw std::runtime_error("scheduler called with bad init string: " + std::string(e.what())); + } + + return 0; +} + +uint8_t batsim_edc_deinit() +{ + delete mb; + mb = nullptr; + + return 0; +} + +bool verify_power_capping(::Job *j, double power_capping, double mean_total, double std_total) +{ + if (typeprediction == TypePowerPrediction::BY_FIELD) + { + if (j->power_estimation <= power_capping) + return true; + else + return false; + } + // fprintf(stderr, "Job mean %f std %f. Total mean %f std %f. Prevision %f\n", j->mean, j->std, mean_total, std_total, (j->mean + mean_total) + (sigma_times * (sqrt(pow(j->std, 2) + std_total)))); + double prevision = (j->mean + mean_total) + (sigma_times * (sqrt(pow(j->std, 2) + std_total))); + if (prevision <= platform_current_powercap_dynamic_watts) + return true; + else + return false; +} + +// Function to solve the 0/1 knapsack problem using greedy algo +void knapsack_greedy(Item arr[], int size, std::vector<int> &list_jobs, double actual_mean, double actual_std) +{ + // Sort the items based on the value/weight ratio + std::sort(arr, arr + size, sortByRatio()); + double capacity = platform_nb_available_watts; + double profit = 0; + double mean = actual_mean; + double std = actual_std; + + for (int i = 0; i < size; i++) + { + if (!verify_power_capping(arr[i].job, capacity, mean, std)) + { + break; + // continue; + } + fprintf(stderr, "Job %s jobs mean %f jobs std %f jobs value %f actual_mean %f actual std %f platform_current_powercap_dynamic_watts %f\n", + arr[i].job->id.c_str(), + arr[i].job->mean, + arr[i].job->std, + arr[i].value, + mean, + std, + platform_current_powercap_dynamic_watts); + capacity -= arr[i].weight; + profit += arr[i].value; + mean += arr[i].job->mean; + std += pow(arr[i].job->std, 2); + list_jobs.push_back(arr[i].idx); + } + fprintf(stderr, "Maximum profit is %f\n", profit); +} + +bool ascending_max_finish_time_job_order(const ::Job *a, const ::Job *b) +{ + return a->maximum_finish_time < b->maximum_finish_time; +} + +uint8_t batsim_edc_take_decisions( + const uint8_t *what_happened, + uint32_t what_happened_size, + uint8_t **decisions, + uint32_t *decisions_size) +{ + (void)what_happened_size; + auto *parsed = deserialize_message(*mb, !format_binary, what_happened); + mb->clear(parsed->now()); + + // should only become true once, when the powercap window finishes + if (parsed->now() >= powercap_end_time) + { + platform_current_powercap_dynamic_watts = platform_normal_dynamic_watts; + platform_nb_available_watts += (platform_normal_dynamic_watts - platform_powercap_dynamic_watts); + } + + bool need_scheduling = false; + auto nb_events = parsed->events()->size(); + for (unsigned int i = 0; i < nb_events; ++i) + { + auto event = (*parsed->events())[i]; + switch (event->event_type()) + { + case fb::Event_BatsimHelloEvent: + { + mb->add_edc_hello("easy-powercap", "0.1.0"); + } + break; + case fb::Event_SimulationBeginsEvent: + { + auto simu_begins = event->event_as_SimulationBeginsEvent(); + platform_nb_hosts = simu_begins->computation_host_number(); + nb_available_hosts = platform_nb_hosts; + available_hosts = IntervalSet::ClosedInterval(0, platform_nb_hosts - 1); + } + break; + case fb::Event_JobSubmittedEvent: + { + auto parsed_job = event->event_as_JobSubmittedEvent(); + ::Job job{ + parsed_job->job_id()->str(), + parsed_job->job()->resource_request(), + parsed_job->job()->walltime(), + parsed_job->submission_time(), + IntervalSet::empty_interval_set(), + -1, + -1, + -1, + -1}; + + try + { + auto extra_data = json::parse(parsed_job->job()->extra_data()->str()); + if (typeprediction == TypePowerPrediction::GAUSSIAN || typeprediction == TypePowerPrediction::REAL_GAUSSIAN) + { + job.power_estimation = 0; + } + else + { + job.power_estimation = std::max(0.0, (double)extra_data[power_estimation_field] - (idle_power_watts * job.nb_hosts)); + } + + if (typeprediction == TypePowerPrediction::GAUSSIAN) + { + job.mean = std::max(0.0, (double)extra_data["mean_power_estimation"] - (idle_power_watts * job.nb_hosts)); + job.std = (double)extra_data["std_power_estimation"]; + } + if (typeprediction == TypePowerPrediction::REAL_GAUSSIAN) + { + job.mean = std::max(0.0, (double)extra_data["real_mean_power_estimation"] - (idle_power_watts * job.nb_hosts)); + job.std = (double)extra_data["std_power_real"]; + } + } + catch (const json::exception &e) + { + throw std::runtime_error("bad extra_data in job submitted: tried to read field " + power_estimation_field); + } + + if ((job.nb_hosts > platform_nb_hosts) // usual EASY predicate + || (job.power_estimation > platform_normal_dynamic_watts) // powercap predicate + ) + mb->add_reject_job(job.id); + else if (job.walltime <= 0) + mb->add_reject_job(job.id); + else + { + need_scheduling = true; + job_queue.emplace_back(new ::Job(job)); + } + } + break; + case fb::Event_JobCompletedEvent: + { + need_scheduling = true; + + auto job_id = event->event_as_JobCompletedEvent()->job_id()->str(); + auto job_it = running_jobs.find(job_id); + auto job = job_it->second; + nb_available_hosts += job->nb_hosts; // usual EASY update + available_hosts += job->alloc; + platform_nb_available_watts += job->power_estimation; // powercap update + mean_running_jobs -= job->mean; + std_running_jobs -= pow(job->std, 2); + + delete job; + running_jobs.erase(job_it); + } + break; + default: + break; + } + } + + if (need_scheduling) + { + ::Job *priority_job = nullptr; + uint32_t nb_available_hosts_at_priority_job_start = 0; + double nb_available_watts_at_priority_job_start = 0; + double mean_watts_at_priority_job_start = 0; + double std_watts_at_priority_job_start = 0; + float priority_job_start_time = -1; + if (parsed->now() < powercap_end_time) + { + uint32_t hostsAvailableNow = available_hosts.size(); + std::vector<::Job *> jobs_to_choose; + for (const auto &it : job_queue) + { + if (it->nb_hosts > hostsAvailableNow) + { + continue; + } + jobs_to_choose.push_back(it); + } + if (!jobs_to_choose.empty()) + { + int nb = jobs_to_choose.size(); + Item jobs_array[nb]; + int index = 0; + for (auto job_it : jobs_to_choose) + { + if (typeprediction == TypePowerPrediction::GAUSSIAN || typeprediction == TypePowerPrediction::REAL_GAUSSIAN) + jobs_array[index].weight = job_it->mean + (sigma_times * job_it->std); + else + jobs_array[index].weight = job_it->power_estimation; + jobs_array[index].job = job_it; + if (type_knapsack_value == TypeKnapsackValue::WAITING_TIME_RATIO) + jobs_array[index].value = ((parsed->now() - job_it->arrival) + job_it->walltime) / job_it->walltime; + else if (type_knapsack_value == TypeKnapsackValue::WAITING_TIME) + jobs_array[index].value = parsed->now() - job_it->arrival; + jobs_array[index].idx = index; + index++; + } + std::vector<int> list_jobs; + knapsack_greedy(jobs_array, nb, list_jobs, mean_running_jobs, std_running_jobs); + // Finish the placement + for (auto job_index : list_jobs) + { + auto job = jobs_to_choose[job_index]; + if (job->nb_hosts > available_hosts.size()) + { + fprintf(stderr, "Ignore job %s because we do not have enough machines!\n", job->id.c_str()); + continue; + } + // fprintf(stderr, "(time %f) Execute job id %s power %f resources %d\n", parsed->now(), job->id.c_str(), job->power_estimation, job->nb_hosts); + running_jobs[job->id] = job; + job->maximum_finish_time = parsed->now() + job->walltime; + job->alloc = available_hosts.left(job->nb_hosts); + mb->add_execute_job(job->id, job->alloc.to_string_hyphen()); + available_hosts -= job->alloc; + nb_available_hosts -= job->nb_hosts; + platform_nb_available_watts -= job->power_estimation; + mean_running_jobs += job->mean; + std_running_jobs += pow(job->std, 2); + std::string job_id = job->id; + job_queue.remove_if([job_id](auto &element) + { return element->id == job_id; }); + } + } + } + else + { + // First traversal, done until a job cannot be executed right now and is set as the priority job + // (or until all jobs have been executed) + auto job_it = job_queue.begin(); + for (; job_it != job_queue.end();) + { + auto job = *job_it; + if ((job->nb_hosts <= nb_available_hosts) // usual EASY predicate + && (verify_power_capping(job, platform_nb_available_watts, mean_running_jobs, std_running_jobs)) // powercap predicate + ) + { + running_jobs[job->id] = *job_it; + job->maximum_finish_time = parsed->now() + job->walltime; + job->alloc = available_hosts.left(job->nb_hosts); + mb->add_execute_job(job->id, job->alloc.to_string_hyphen()); + available_hosts -= job->alloc; + nb_available_hosts -= job->nb_hosts; + platform_nb_available_watts -= job->power_estimation; + mean_running_jobs += job->mean; + std_running_jobs += pow(job->std, 2); + + job_it = job_queue.erase(job_it); + } + else + { + priority_job = *job_it; + ++job_it; + + // compute when the priority job can start, and the number of available machines at this time + std::vector<::Job *> running_jobs_asc_maximum_finish_time; + running_jobs_asc_maximum_finish_time.reserve(running_jobs.size()); + for (const auto &it : running_jobs) + running_jobs_asc_maximum_finish_time.push_back(it.second); + std::sort(running_jobs_asc_maximum_finish_time.begin(), running_jobs_asc_maximum_finish_time.end(), ascending_max_finish_time_job_order); + + nb_available_hosts_at_priority_job_start = nb_available_hosts; + nb_available_watts_at_priority_job_start = platform_nb_available_watts; + mean_watts_at_priority_job_start = mean_running_jobs; + std_watts_at_priority_job_start = std_running_jobs; + + for (const auto &job : running_jobs_asc_maximum_finish_time) + { + nb_available_hosts_at_priority_job_start += job->nb_hosts; + nb_available_watts_at_priority_job_start += job->power_estimation; + mean_watts_at_priority_job_start -= job->mean; + std_watts_at_priority_job_start -= pow(job->std, 2); + + if ((nb_available_hosts_at_priority_job_start >= priority_job->nb_hosts) // usual EASY predicate + && (verify_power_capping(priority_job, + nb_available_watts_at_priority_job_start, + mean_watts_at_priority_job_start, + std_watts_at_priority_job_start)) // powercap predicate + ) + { + nb_available_hosts_at_priority_job_start -= priority_job->nb_hosts; + nb_available_watts_at_priority_job_start -= priority_job->power_estimation; + priority_job_start_time = job->maximum_finish_time; + mean_watts_at_priority_job_start += priority_job->mean; + std_watts_at_priority_job_start += pow(priority_job->std, 2); + + break; + } + } + + break; + } + } + // Continue traversal to backfill jobs + for (; job_it != job_queue.end();) + { + auto job = *job_it; + // should the job be backfilled? + float job_finish_time = parsed->now() + job->walltime; + if ((job->nb_hosts <= nb_available_hosts) // enough hosts now? + && (verify_power_capping(job, + platform_nb_available_watts, + mean_running_jobs, + std_running_jobs)) // enough power now? + && (((job->nb_hosts <= nb_available_hosts_at_priority_job_start) // cannot hinder priority job at all regarding hosts + && (verify_power_capping(job, + nb_available_watts_at_priority_job_start, + mean_watts_at_priority_job_start, + std_watts_at_priority_job_start)) // cannot hinder priority job at all regarding watts + ) // previous block is true if the backfilled job cannot hinder the priority job regardless of the backfilled job duration + || (job_finish_time <= priority_job_start_time) // the backfilled job finishes before the priority job's expected start time + )) + { + running_jobs[job->id] = *job_it; + job->maximum_finish_time = job_finish_time; + job->alloc = available_hosts.left(job->nb_hosts); + mb->add_execute_job(job->id, job->alloc.to_string_hyphen()); + available_hosts -= job->alloc; + nb_available_hosts -= job->nb_hosts; + platform_nb_available_watts -= job->power_estimation; + mean_running_jobs += job->mean; + std_running_jobs += pow(job->std, 2); + + if (job_finish_time > priority_job_start_time) + { + nb_available_hosts_at_priority_job_start -= job->nb_hosts; + nb_available_watts_at_priority_job_start -= job->power_estimation; + mean_watts_at_priority_job_start += priority_job->mean; + std_watts_at_priority_job_start += pow(priority_job->std, 2); + } + + job_it = job_queue.erase(job_it); + } + else if (nb_available_hosts == 0) + break; + else + ++job_it; + } + } + } + mb->finish_message(parsed->now()); + serialize_message(*mb, !format_binary, const_cast<const uint8_t **>(decisions), decisions_size); + return 0; +} diff --git a/meson.build b/meson.build index 33dca2ad3bb96daff39894f4428b457137e6f559..1907afe059e88f76bb5a8f9d7655fc4ae6355dcf 100644 --- a/meson.build +++ b/meson.build @@ -21,3 +21,8 @@ easypower = shared_library('easypower', common + ['easypower.cpp'], dependencies: deps + [boost_dep, intervalset_dep, nlohmann_json_dep], install: true, ) + +knapsack_greedy = shared_library('knapsack_greedy', common + ['knapsack_greedy.cpp'], + dependencies: deps + [boost_dep, intervalset_dep, nlohmann_json_dep], + install: true, +) \ No newline at end of file