diff --git a/batsim_edc.h b/batsim_edc.h
new file mode 100644
index 0000000000000000000000000000000000000000..f63f4b2fcd219b0735a79a2d88e6b99330b6cf42
--- /dev/null
+++ b/batsim_edc.h
@@ -0,0 +1,55 @@
+// This is free and unencumbered software released into the public domain.
+// For more information, please refer to <http://unlicense.org/>
+
+// This file describes the C API you can use to make your decision components
+// (schedulers, workload injectors...) callable by Batsim as dynamic libraries.
+
+#pragma once
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// These are the values supported by the flags argument of batsim_edc_init().
+#define BATSIM_EDC_FORMAT_BINARY 0x1 // Format is the flatbuffers binary format. Messages are pointers to buffers generated by a flatbuffers library.
+#define BATSIM_EDC_FORMAT_JSON 0x2 // Format is the flatbuffers JSON format. Messages are NULL-terminated C strings with JSON content.
+
+/**
+ * @brief The batsim_edc_init() function is called by Batsim to initialize your external decision component.
+ * @details This is typically used to initialize global data structures to de/serialize messages and to take decisions later on.
+ *
+ * @param[in] data The initialization data of your decision component. This is retrieved from Batsim's command-line arguments.
+ * @param[in] size The size of your initialization data. This is retrieved from Batsim's command-line arguments.
+ * @param[in] flags These flags tell you additional information on how to communicate with Batsim.
+ *                  Currently, this is only used to know which data format should be used (the flatbuffers binary format or the flatbuffers JSON format).
+ * @return Zero if and only if you could initialize yourself successfully.
+ */
+uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags);
+
+/**
+ * @brief The batsim_edc_deinit() function is called by Batsim when it stops calling your decision component.
+ * @details This is typically used to deallocate any memory allocated by batsim_edc_init() or batsim_edc_take_decisions().
+ * @return Zero if and only if you could deinitialize yourself successfully.
+ */
+uint8_t batsim_edc_deinit(void);
+
+/**
+ * @brief The batsim_edc_take_decisions() function is called by Batsim when it asks you to take decisions.
+ *
+ * @param[in] what_happened A Batsim protocol message that contains what happened in the simulation since the previous call to your decision component
+ *                          (that is to say, since last batsim_edc_take_decisions() call or since the initial batsim_edc_init() at the beginning of the simulation).
+ *                          The message format depends on what flags were given to batsim_edc_init().
+ * @param[in] what_happened_size The size (in bytes) of the what_happened input buffer.
+ * @param[out] decisions A Batsim protocol message that contains the decisions taken by this function.
+ *                       The buffer should be formatted according to the flags given to batsim_edc_init().
+ *                       This buffer must be allocated by you and must persist in memory at least until the next batsim_edc_take_decisions() or batsim_edc_deinit() call.
+ * @param[out] decisions_size The size (in bytes) of the decisions output buffer.
+ * @return Zero if and only if you could take decisions.
+ */
+uint8_t batsim_edc_take_decisions(const uint8_t * what_happened, uint32_t what_happened_size, uint8_t ** decisions, uint32_t * decisions_size);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/easypower.cpp b/easypower.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..b8a615b16af2bc97fdd8518c738afee204c493ca
--- /dev/null
+++ b/easypower.cpp
@@ -0,0 +1,312 @@
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <list>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <batprotocol.hpp>
+#include <intervalset.hpp>
+#include <nlohmann/json.hpp>
+
+#include "batsim_edc.h"
+
+using namespace batprotocol;
+using json = nlohmann::json;
+
+// A job known to this scheduler: either waiting in job_queue or stored in running_jobs.
+struct Job
+{
+    std::string id;
+    uint32_t nb_hosts;
+    double walltime;
+
+    IntervalSet alloc; // hosts given to the job, empty until the job is started
+    double maximum_finish_time; // start time + walltime, -1 until the job is started
+    double power_estimation; // estimated watts of the whole job, minus the idle watts of its hosts
+};
+
+MessageBuilder * mb = nullptr;
+bool format_binary = true; // whether flatbuffers binary or json format should be used
+
+uint32_t platform_nb_hosts = 0;
+std::list<::Job*> job_queue;
+std::unordered_map<std::string, ::Job*> running_jobs;
+uint32_t nb_available_hosts = 0;
+IntervalSet available_hosts;
+
+double platform_normal_dynamic_watts = -1;
+double platform_powercap_dynamic_watts = -1;
+double platform_current_powercap_dynamic_watts = -1;
+double platform_nb_available_watts = -1;
+double powercap_end_time = -1;
+bool powercap_window_finished = false; // becomes true when the end of the powercap window has been applied
+double idle_power_watts = -1; // per node
+std::string power_estimation_field;
+
+namespace {
+
+// Deletes every job still owned by the scheduler (queued or running) and empties both containers.
+void release_all_jobs()
+{
+    for (auto * job : job_queue)
+        delete job;
+    job_queue.clear();
+
+    for (const auto & entry : running_jobs)
+        delete entry.second;
+    running_jobs.clear();
+}
+
+// Starts a job right now: registers it as running, takes its hosts with available_hosts.left(),
+// asks Batsim to execute it and removes its hosts and watts from what is available.
+// The caller remains responsible for erasing the job from job_queue.
+void start_job(::Job * job, double now)
+{
+    running_jobs[job->id] = job;
+    job->maximum_finish_time = now + job->walltime;
+    job->alloc = available_hosts.left(job->nb_hosts);
+    mb->add_execute_job(job->id, job->alloc.to_string_hyphen());
+    available_hosts -= job->alloc;
+    nb_available_hosts -= job->nb_hosts;
+    platform_nb_available_watts -= job->power_estimation;
+}
+
+} // namespace
+
+// Reads the scheduler parameters from the JSON init string and creates the message builder.
+uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags)
+{
+    format_binary = ((flags & BATSIM_EDC_FORMAT_BINARY) != 0);
+    if ((flags & (BATSIM_EDC_FORMAT_BINARY | BATSIM_EDC_FORMAT_JSON)) != flags)
+    {
+        std::printf("Unknown flags used, cannot initialize myself.\n");
+        return 1;
+    }
+
+    // A null buffer is treated as an empty init string, which json::parse() rejects below.
+    std::string init_string;
+    if (data != nullptr)
+        init_string.assign(reinterpret_cast<const char *>(data), static_cast<size_t>(size));
+
+    // Failures are reported through the return value: this function is called through a C API,
+    // so no exception should leave it.
+    try {
+        const auto init_json = json::parse(init_string);
+        platform_normal_dynamic_watts = init_json.at("normal_dynamic_watts").get<double>();
+        platform_powercap_dynamic_watts = init_json.at("powercap_dynamic_watts").get<double>();
+        idle_power_watts = init_json.at("idle_watts").get<double>();
+        powercap_end_time = init_json.at("powercap_end_time_seconds").get<double>();
+        power_estimation_field = init_json.at("job_power_estimation_field").get<std::string>();
+    } catch (const json::exception & e) {
+        std::fprintf(stderr, "scheduler called with bad init string: %s\n", e.what());
+        return 1;
+    }
+
+    // simulation starts in the powercap constrained window
+    platform_current_powercap_dynamic_watts = platform_powercap_dynamic_watts;
+    platform_nb_available_watts = platform_current_powercap_dynamic_watts;
+    powercap_window_finished = false;
+
+    delete mb; // no-op unless init is called again without a deinit in between
+    mb = new MessageBuilder(!format_binary);
+
+    return 0;
+}
+
+// Frees the message builder and every job (allocated with new at submission) that is still known.
+uint8_t batsim_edc_deinit()
+{
+    delete mb;
+    mb = nullptr;
+
+    release_all_jobs();
+
+    return 0;
+}
+
+// Orders jobs by increasing maximum finish time.
+bool ascending_max_finish_time_job_order(const ::Job* a, const ::Job* b) {
+    return a->maximum_finish_time < b->maximum_finish_time;
+}
+
+// Handles the events received from Batsim, then runs an EASY backfilling pass constrained by the power budget.
+uint8_t batsim_edc_take_decisions(
+    const uint8_t * what_happened,
+    uint32_t what_happened_size,
+    uint8_t ** decisions,
+    uint32_t * decisions_size)
+{
+    (void) what_happened_size;
+    if (mb == nullptr)
+        return 1; // batsim_edc_init() has not completed successfully
+
+    auto * parsed = deserialize_message(*mb, !format_binary, what_happened);
+    const double now = parsed->now();
+    mb->clear(now);
+
+    bool need_scheduling = false;
+
+    // The end of the powercap window must be applied only once: without the guard, every call made
+    // after powercap_end_time would add the extra watts to the available budget again.
+    if (!powercap_window_finished && now >= powercap_end_time) {
+        powercap_window_finished = true;
+        platform_current_powercap_dynamic_watts = platform_normal_dynamic_watts;
+        platform_nb_available_watts += (platform_normal_dynamic_watts - platform_powercap_dynamic_watts);
+        need_scheduling = true; // queued jobs may fit in the larger power budget
+    }
+
+    const auto nb_events = parsed->events()->size();
+    for (unsigned int i = 0; i < nb_events; ++i) {
+        auto event = (*parsed->events())[i];
+        switch (event->event_type())
+        {
+        case fb::Event_BatsimHelloEvent: {
+            mb->add_edc_hello("easy-powercap", "0.1.0");
+        } break;
+        case fb::Event_SimulationBeginsEvent: {
+            auto simu_begins = event->event_as_SimulationBeginsEvent();
+            platform_nb_hosts = simu_begins->computation_host_number();
+            nb_available_hosts = platform_nb_hosts;
+            available_hosts = IntervalSet::ClosedInterval(0, platform_nb_hosts - 1);
+        } break;
+        case fb::Event_JobSubmittedEvent: {
+            auto parsed_job = event->event_as_JobSubmittedEvent();
+            ::Job job{
+                parsed_job->job_id()->str(),
+                parsed_job->job()->resource_request(),
+                parsed_job->job()->walltime(),
+                IntervalSet::empty_interval_set(),
+                -1,
+                -1
+            };
+
+            // A missing or malformed extra_data makes the whole call fail (non-zero return).
+            auto raw_extra_data = parsed_job->job()->extra_data();
+            if (raw_extra_data == nullptr) {
+                std::fprintf(stderr, "bad extra_data in job submitted: tried to read field %s\n", power_estimation_field.c_str());
+                return 1;
+            }
+            try {
+                const auto extra_data = json::parse(raw_extra_data->str());
+                const double job_watts = extra_data.at(power_estimation_field).get<double>();
+                job.power_estimation = std::max(0.0, job_watts - idle_power_watts * job.nb_hosts);
+            } catch (const json::exception & e) {
+                std::fprintf(stderr, "bad extra_data in job submitted: tried to read field %s (%s)\n", power_estimation_field.c_str(), e.what());
+                return 1;
+            }
+
+            const bool can_never_run = (job.nb_hosts > platform_nb_hosts) // usual EASY predicate
+                || (job.power_estimation > platform_normal_dynamic_watts) // powercap predicate
+                || (job.walltime <= 0);
+            if (can_never_run)
+                mb->add_reject_job(job.id);
+            else {
+                need_scheduling = true;
+                job_queue.emplace_back(new ::Job(job));
+            }
+        } break;
+        case fb::Event_JobCompletedEvent: {
+            const auto job_id = event->event_as_JobCompletedEvent()->job_id()->str();
+            const auto job_it = running_jobs.find(job_id);
+            if (job_it == running_jobs.end()) {
+                // not a job started by this scheduler: there are no hosts or watts to take back
+                std::fprintf(stderr, "completed job %s is not a running job, ignoring it\n", job_id.c_str());
+                break;
+            }
+
+            need_scheduling = true;
+            auto * job = job_it->second;
+            nb_available_hosts += job->nb_hosts; // usual EASY update
+            available_hosts += job->alloc;
+            platform_nb_available_watts += job->power_estimation; // powercap update
+
+            delete job;
+            running_jobs.erase(job_it);
+        } break;
+        default: break;
+        }
+    }
+
+    if (need_scheduling) {
+        ::Job* priority_job = nullptr;
+        uint32_t nb_available_hosts_at_priority_job_start = 0;
+        double nb_available_watts_at_priority_job_start = 0;
+        double priority_job_start_time = -1; // stays -1 if no start time could be computed
+
+        // First traversal, done until a job cannot be executed right now and is set as the priority job
+        // (or until all jobs have been executed)
+        auto job_it = job_queue.begin();
+        while (job_it != job_queue.end()) {
+            auto * job = *job_it;
+            if (   (job->nb_hosts <= nb_available_hosts) // usual EASY predicate
+                && (job->power_estimation <= platform_nb_available_watts) // powercap predicate
+               ) {
+                start_job(job, now);
+                job_it = job_queue.erase(job_it);
+                continue;
+            }
+
+            priority_job = job;
+            ++job_it;
+
+            // compute when the priority job can start, and the number of available machines at this time
+            std::vector<::Job*> running_jobs_asc_maximum_finish_time;
+            running_jobs_asc_maximum_finish_time.reserve(running_jobs.size());
+            for (const auto & entry : running_jobs)
+                running_jobs_asc_maximum_finish_time.push_back(entry.second);
+            std::sort(running_jobs_asc_maximum_finish_time.begin(), running_jobs_asc_maximum_finish_time.end(), ascending_max_finish_time_job_order);
+
+            nb_available_hosts_at_priority_job_start = nb_available_hosts;
+            nb_available_watts_at_priority_job_start = platform_nb_available_watts;
+            for (const auto * running_job : running_jobs_asc_maximum_finish_time) {
+                nb_available_hosts_at_priority_job_start += running_job->nb_hosts;
+                nb_available_watts_at_priority_job_start += running_job->power_estimation;
+                if (   (nb_available_hosts_at_priority_job_start >= priority_job->nb_hosts) // usual EASY predicate
+                    && (nb_available_watts_at_priority_job_start >= priority_job->power_estimation) // powercap predicate
+                   ) {
+                    nb_available_hosts_at_priority_job_start -= priority_job->nb_hosts;
+                    nb_available_watts_at_priority_job_start -= priority_job->power_estimation;
+                    priority_job_start_time = running_job->maximum_finish_time;
+                    break;
+                }
+            }
+
+            break;
+        }
+
+        // Continue traversal to backfill jobs
+        while (job_it != job_queue.end()) {
+            auto * job = *job_it;
+            // times are kept as doubles: rounding them to float could flip the comparisons below
+            const double job_finish_time = now + job->walltime;
+            const bool fits_now = (job->nb_hosts <= nb_available_hosts) // enough hosts now?
+                && (job->power_estimation <= platform_nb_available_watts); // enough power now?
+            // true if the backfilled job cannot hinder the priority job regardless of the backfilled job duration
+            const bool never_hinders_priority_job = (job->nb_hosts <= nb_available_hosts_at_priority_job_start) // regarding hosts
+                && (job->power_estimation <= nb_available_watts_at_priority_job_start); // regarding watts
+            // true if the backfilled job finishes before the priority job's expected start time
+            const bool finishes_before_priority_job = (job_finish_time <= priority_job_start_time);
+
+            if (fits_now && (never_hinders_priority_job || finishes_before_priority_job)) {
+                start_job(job, now);
+
+                if (job_finish_time > priority_job_start_time) {
+                    nb_available_hosts_at_priority_job_start -= job->nb_hosts;
+                    nb_available_watts_at_priority_job_start -= job->power_estimation;
+                }
+
+                job_it = job_queue.erase(job_it);
+            }
+            else if (nb_available_hosts == 0)
+                break;
+            else
+                ++job_it;
+        }
+    }
+
+    mb->finish_message(now);
+    serialize_message(*mb, !format_binary, const_cast<const uint8_t **>(decisions), decisions_size);
+    return 0;
+}
diff --git a/flake.lock b/flake.lock
new file mode 100644
index 0000000000000000000000000000000000000000..86fd1075d3e396ccda018284cb3d565dc32843bd
--- /dev/null
+++ b/flake.lock
@@ -0,0 +1,140 @@
+{
+  "nodes": {
+    "batprotocol-flake": {
+      "inputs": {
+        "flake-utils": [
+          "flake-utils"
+        ],
+        "nixpkgs": [
+          "nixpkgs"
+        ],
+        "nur-kapack": [
+          "nur-kapack"
+        ]
+      },
+      "locked": {
+        "lastModified": 1707316999,
+        "narHash": "sha256-+jGORWq0Vx9JKFfkMU2u5ZhnSD4Nc7eRDu9kBgZgJto=",
+        "ref": "refs/heads/master",
+        "rev": "25bc5bbf039c18a8024c4ab326047ba56800376a",
+        "revCount": 153,
+        "type": "git",
+        "url": "https://framagit.org/batsim/batprotocol"
+      },
+      "original": {
+        "type": "git",
+        "url": "https://framagit.org/batsim/batprotocol"
+      }
+    },
+    "flake-utils": {
+      "inputs": {
+        "systems": "systems"
+      },
+      "locked": {
+        "lastModified": 1710146030,
+        "narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
+        "owner": "numtide",
+        "repo": "flake-utils",
+        "rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
+        "type": "github"
+      },
+      "original": {
+        "owner": "numtide",
+        "repo": "flake-utils",
+        "type": "github"
+      }
+    },
+    "intervalset-flake": {
+      "inputs": {
+        "flake-utils": [
+          "flake-utils"
+        ],
+        "nixpkgs": [
+          "nixpkgs"
+        ],
+        "nur-kapack": [
+          "nur-kapack"
+        ]
+      },
+      "locked": {
+        "lastModified": 1707321891,
+        "narHash": "sha256-eEH9zxYphfl/0PPX39ChMpguF/yuR4z4ZLOlWPwkoHA=",
+        "ref": "refs/heads/main",
+        "rev": "13d8f2d7000d4dc4e3202422658b0b5d83f83679",
+        "revCount": 84,
+        "type": "git",
+        "url": "https://framagit.org/batsim/intervalset"
+      },
+      "original": {
+        "type": "git",
+        "url": "https://framagit.org/batsim/intervalset"
+      }
+    },
+    "nixpkgs": {
+      "locked": {
+        "lastModified": 1701282334,
+        "narHash": "sha256-MxCVrXY6v4QmfTwIysjjaX0XUhqBbxTWWB4HXtDYsdk=",
+        "owner": "nixos",
+        "repo": "nixpkgs",
+        "rev": "057f9aecfb71c4437d2b27d3323df7f93c010b7e",
+        "type": "github"
+      },
+      "original": {
+        "owner": "nixos",
+        "ref": "23.11",
+        "repo": "nixpkgs",
+        "type": "github"
+      }
+    },
+    "nur-kapack": {
+      "inputs": {
+        "flake-utils": [
+          "flake-utils"
+        ],
+        "nixpkgs": [
+          "nixpkgs"
+        ]
+      },
+      "locked": {
+        "lastModified": 1709732514,
+        "narHash": "sha256-beQGEh9mgRL4T7tdkSlT/MrhncmfICWt3LmfuydGL1o=",
+        "owner": "oar-team",
+        "repo": "nur-kapack",
+        "rev": "2c3f5d9f5078229de10394eead8f8ed28d17b6a9",
+        "type": "github"
+      },
+      "original": {
+        "owner": "oar-team",
+        "ref": "master",
+        "repo": "nur-kapack",
+        "type": "github"
+      }
+    },
+    "root": {
+      "inputs": {
+        "batprotocol-flake": "batprotocol-flake",
+        "flake-utils": "flake-utils",
+        "intervalset-flake": "intervalset-flake",
+        "nixpkgs": "nixpkgs",
+        "nur-kapack": "nur-kapack"
+      }
+    },
+    "systems": {
+      "locked": {
+        "lastModified": 1681028828,
+        "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
+        "owner": "nix-systems",
+        "repo": "default",
+        "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
+        "type": "github"
+      },
+      "original": {
+        "owner": "nix-systems",
+        "repo": "default",
+        "type": "github"
+      }
+    }
+  },
+  "root": "root",
+  "version": 7
+}
diff --git a/flake.nix b/flake.nix
new file mode 100644
index 0000000000000000000000000000000000000000..d77d0d5e3eeb14e5978b286c5e3a70598889c3f4
--- /dev/null
+++ b/flake.nix
@@ -0,0 +1,57 @@
+{
+  inputs = {
+    nixpkgs.url = "github:nixos/nixpkgs?ref=23.11";
+    flake-utils.url = "github:numtide/flake-utils";
+    nur-kapack = {
+      url = "github:oar-team/nur-kapack/master";
+      inputs.nixpkgs.follows = "nixpkgs";
+      inputs.flake-utils.follows = "flake-utils";
+    };
+    intervalset-flake = {
+      url = "git+https://framagit.org/batsim/intervalset";
+      inputs.nixpkgs.follows = "nixpkgs";
+      inputs.nur-kapack.follows = "nur-kapack";
+      inputs.flake-utils.follows = "flake-utils";
+    };
+    batprotocol-flake = {
+      url = "git+https://framagit.org/batsim/batprotocol";
+      inputs.nixpkgs.follows = "nixpkgs";
+      inputs.nur-kapack.follows = "nur-kapack";
+      inputs.flake-utils.follows = "flake-utils";
+    };
+  };
+
+  outputs = { self, nixpkgs, flake-utils, nur-kapack, intervalset-flake, batprotocol-flake }:
+    flake-utils.lib.eachSystem [ "x86_64-linux" ] (system:
+      let
+        pkgs = import nixpkgs { inherit system; };
+        batprotopkgs = batprotocol-flake.packages.${system};
+        intervalsetpkgs = intervalset-flake.packages.${system};
+      in rec {
+        packages = rec {
+          easypower = pkgs.stdenv.mkDerivation rec {
+            name = "easypower";
+            version = "0.1.0";
+            src = pkgs.lib.sourceByRegex ./. [
+              "^meson\.build"
+              "^.*\.?pp"
+              "^.*\.h"
+            ];
+            buildInputs = [
+              pkgs.nlohmann_json
+              batprotopkgs.batprotocol-cpp
+              intervalsetpkgs.intervalset
+            ];
+            nativeBuildInputs = [
+              pkgs.meson
+              pkgs.ninja
+              pkgs.pkg-config
+            ];
+          };
+          default = easypower;
+        };
+        devShells = {
+        };
+      }
+    );
+}
diff --git a/meson.build b/meson.build
new file mode 100644
index 0000000000000000000000000000000000000000..33dca2ad3bb96daff39894f4428b457137e6f559
--- /dev/null
+++ b/meson.build
@@ -0,0 +1,23 @@
+project('energusched', 'cpp',
+    version: '0.1.0',
+    license: 'LGPL-3.0',
+    default_options: ['cpp_std=c++17'],
+    meson_version: '>=0.40.0'
+)
+
+batprotocol_cpp_dep = dependency('batprotocol-cpp')
+boost_dep = dependency('boost')
+intervalset_dep = dependency('intervalset')
+nlohmann_json_dep = dependency('nlohmann_json')
+deps = [
+  batprotocol_cpp_dep
+]
+
+common = [
+  'batsim_edc.h'
+]
+
+easypower = shared_library('easypower', common + ['easypower.cpp'],
+    dependencies: deps + [boost_dep, intervalset_dep, nlohmann_json_dep],
+    install: true,
+)