Commit d94517cd authored by Millian Poquet

initial implementation

parent a4fe84f6
// This is free and unencumbered software released into the public domain.
// For more information, please refer to <http://unlicense.org/>
// This file describes the C API you can use to make your decision components
// (schedulers, workload injectors...) callable by Batsim as dynamic libraries.
// A minimal stub implementation illustrating this API is sketched just after this header.
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
// These are the flags supported by batsim_edc_init() argument.
#define BATSIM_EDC_FORMAT_BINARY 0x1 // Format is flatbuffers' binary format. Messages are pointers to buffers generated by a flatbuffers library.
#define BATSIM_EDC_FORMAT_JSON 0x2 // Format is flatbuffers' JSON format. Messages are NULL-terminated C strings with JSON content.
/**
* @brief The batsim_edc_init() function is called by Batsim to initialize your external decision component.
* @details This is typically used to initialize global data structures to de/serialize messages and to take decisions later on.
*
* @param[in] data The initialization data of your decision component. This is retrieved from Batsim's command-line arguments.
* @param[in] size The size of your initialization data. This is retrieved from Batsim's command-line arguments.
* @param[in] flags These flags tell you additional information on how to communicate with Batsim.
* Currently, this is only used to know which data format should be used (flatbuffers' binary format or flatbuffers' JSON format).
* @return Zero if and only if you could initialize yourself successfully.
*/
uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags);
/**
* @brief The batsim_edc_deinit() function is called by Batsim when it stops calling your decision component.
* @details This is typically used to deallocate any memory allocated by batsim_edc_init() or batsim_edc_take_decisions().
* @return Zero if and only if you could deinitialize yourself successfully.
*/
uint8_t batsim_edc_deinit();
/**
* @brief The batsim_edc_take_decisions() function is called by Batsim when it asks you to take decisions.
*
* @param[in] what_happened A Batsim protocol message that contains what happened in the simulation since the previous call to your decision component
* (that is to say, since the last batsim_edc_take_decisions() call or since the initial batsim_edc_init() call at the beginning of the simulation).
* The message format depends on what flags were given to batsim_edc_init().
* @param[in] what_happened_size The size (in bytes) of the what_happened input buffer.
* @param[out] decisions A Batsim protocol message that contains the decisions taken by this function.
* The buffer should be formatted according to the flags given to batsim_edc_init().
* This buffer must be allocated by you and must persist in memory at least until the next batsim_edc_take_decisions() or batsim_edc_deinit() call.
* @param[out] decisions_size The size (in bytes) of the decisions output buffer.
* @return Zero if and only if you could take decisions.
*/
uint8_t batsim_edc_take_decisions(const uint8_t * what_happened, uint32_t what_happened_size, uint8_t ** decisions, uint32_t * decisions_size);
#ifdef __cplusplus
}
#endif
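As a quick orientation before the actual scheduler below, here is a minimal stub decision component written against this API. It is only a sketch and not part of the commit: the names stub_format and stub_buffer are hypothetical, and the empty buffer it returns is a placeholder, not a valid Batsim protocol message. A real component must answer with a properly formatted protocol message, as the easypower scheduler below does through batprotocol's MessageBuilder.

// Minimal stub sketch (illustration only, not part of the commit).
#include <cstdint>
#include "batsim_edc.h"

static uint32_t stub_format = 0;         // format flags negotiated at init time
static uint8_t stub_buffer[1] = {0};     // placeholder output buffer, not a valid protocol message

uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags)
{
    (void) data; (void) size;            // this stub ignores its initialization data
    stub_format = flags;
    return 0;                            // zero means successful initialization
}

uint8_t batsim_edc_deinit()
{
    return 0;                            // nothing was allocated, nothing to free
}

uint8_t batsim_edc_take_decisions(const uint8_t * what_happened, uint32_t what_happened_size,
                                  uint8_t ** decisions, uint32_t * decisions_size)
{
    (void) what_happened; (void) what_happened_size;
    *decisions = stub_buffer;            // must stay valid until the next call, which a static buffer does
    *decisions_size = 0;
    return 0;
}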
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <list>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>
#include <batprotocol.hpp>
#include <intervalset.hpp>
#include <nlohmann/json.hpp>
#include "batsim_edc.h"
using namespace batprotocol;
using json = nlohmann::json;
struct Job
{
    std::string id;
    uint32_t nb_hosts;            // number of hosts requested by the job
    double walltime;              // maximum execution time requested by the job (in seconds)
    IntervalSet alloc;            // hosts allocated to the job once it has been started
    double maximum_finish_time;   // start time + walltime, set when the job is started
    double power_estimation;      // estimated dynamic power of the job (in watts), above the idle power of its hosts
};
MessageBuilder * mb = nullptr;
bool format_binary = true; // whether flatbuffers binary or json format should be used
uint32_t platform_nb_hosts = 0;                        // total number of computation hosts on the platform
std::list<::Job*> job_queue;                           // jobs waiting to be scheduled, in submission order
std::unordered_map<std::string, ::Job*> running_jobs;  // jobs currently running, keyed by job id
uint32_t nb_available_hosts = 0;                       // number of hosts not currently allocated to a job
IntervalSet available_hosts;                           // hosts not currently allocated to a job
double platform_normal_dynamic_watts = -1;             // dynamic power budget outside the powercap window
double platform_powercap_dynamic_watts = -1;           // dynamic power budget during the powercap window
double platform_current_powercap_dynamic_watts = -1;   // dynamic power budget currently in effect
double platform_nb_available_watts = -1;               // part of the current budget not consumed by running jobs
double powercap_end_time = -1;                         // simulation time at which the powercap window ends (in seconds)
double idle_power_watts = -1;                          // idle power of a single node (in watts)
std::string power_estimation_field;                    // name of the job extra_data field that holds the job's power estimation
uint8_t batsim_edc_init(const uint8_t * data, uint32_t size, uint32_t flags)
{
    format_binary = ((flags & BATSIM_EDC_FORMAT_BINARY) != 0);
    if ((flags & (BATSIM_EDC_FORMAT_BINARY | BATSIM_EDC_FORMAT_JSON)) != flags)
    {
        printf("Unknown flags used, cannot initialize myself.\n");
        return 1;
    }
    mb = new MessageBuilder(!format_binary);
    std::string init_string((const char *)data, static_cast<size_t>(size));
    try {
        auto init_json = json::parse(init_string);
        platform_normal_dynamic_watts = init_json["normal_dynamic_watts"];
        platform_powercap_dynamic_watts = init_json["powercap_dynamic_watts"];
        idle_power_watts = init_json["idle_watts"];
        platform_current_powercap_dynamic_watts = platform_powercap_dynamic_watts; // simulation starts in the powercap constrained window
        platform_nb_available_watts = platform_current_powercap_dynamic_watts;
        powercap_end_time = init_json["powercap_end_time_seconds"];
        power_estimation_field = init_json["job_power_estimation_field"];
    } catch (const json::exception & e) {
        throw std::runtime_error("scheduler called with bad init string: " + std::string(e.what()));
    }
    return 0;
}
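// Example (illustrative, not part of the commit): an init string carrying the fields
// parsed above could look like the following JSON; all values are hypothetical.
// {
//   "normal_dynamic_watts": 10000.0,
//   "powercap_dynamic_watts": 4000.0,
//   "idle_watts": 50.0,
//   "powercap_end_time_seconds": 3600.0,
//   "job_power_estimation_field": "power_estimation_watts"
// }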
uint8_t batsim_edc_deinit()
{
    delete mb;
    mb = nullptr;
    return 0;
}
bool ascending_max_finish_time_job_order(const ::Job* a, const ::Job* b) {
    return a->maximum_finish_time < b->maximum_finish_time;
}
uint8_t batsim_edc_take_decisions(
    const uint8_t * what_happened,
    uint32_t what_happened_size,
    uint8_t ** decisions,
    uint32_t * decisions_size)
{
    (void) what_happened_size;
    auto * parsed = deserialize_message(*mb, !format_binary, what_happened);
    mb->clear(parsed->now());
    // lift the powercap exactly once, when the powercap window finishes
    if (parsed->now() >= powercap_end_time && platform_current_powercap_dynamic_watts != platform_normal_dynamic_watts) {
        platform_current_powercap_dynamic_watts = platform_normal_dynamic_watts;
        platform_nb_available_watts += (platform_normal_dynamic_watts - platform_powercap_dynamic_watts);
    }
    bool need_scheduling = false;
    auto nb_events = parsed->events()->size();
    for (unsigned int i = 0; i < nb_events; ++i) {
        auto event = (*parsed->events())[i];
        switch (event->event_type())
        {
        case fb::Event_BatsimHelloEvent: {
            mb->add_edc_hello("easy-powercap", "0.1.0");
        } break;
        case fb::Event_SimulationBeginsEvent: {
            auto simu_begins = event->event_as_SimulationBeginsEvent();
            platform_nb_hosts = simu_begins->computation_host_number();
            nb_available_hosts = platform_nb_hosts;
            available_hosts = IntervalSet::ClosedInterval(0, platform_nb_hosts - 1);
        } break;
        case fb::Event_JobSubmittedEvent: {
            auto parsed_job = event->event_as_JobSubmittedEvent();
            ::Job job{
                parsed_job->job_id()->str(),
                parsed_job->job()->resource_request(),
                parsed_job->job()->walltime(),
                IntervalSet::empty_interval_set(),
                -1,
                -1
            };
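            // The job's extra_data is expected to be a JSON object containing the field named by
            // power_estimation_field (set from the init string). Illustrative example, with a
            // hypothetical field name and value:
            //   extra_data = {"power_estimation_watts": 420.0}
            // The idle power of the requested hosts is subtracted below, so job.power_estimation
            // only accounts for dynamic power.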
            try {
                auto extra_data = json::parse(parsed_job->job()->extra_data()->str());
                job.power_estimation = std::max(0.0, (double)extra_data[power_estimation_field] - idle_power_watts * job.nb_hosts);
            } catch (const json::exception & e) {
                throw std::runtime_error("bad extra_data in job submitted: tried to read field " + power_estimation_field);
            }
            if ( (job.nb_hosts > platform_nb_hosts) // usual EASY predicate
              || (job.power_estimation > platform_normal_dynamic_watts) // powercap predicate
               )
                mb->add_reject_job(job.id);
            else if (job.walltime <= 0)
                mb->add_reject_job(job.id);
            else {
                need_scheduling = true;
                job_queue.emplace_back(new ::Job(job));
            }
        } break;
        case fb::Event_JobCompletedEvent: {
            need_scheduling = true;
            auto job_id = event->event_as_JobCompletedEvent()->job_id()->str();
            auto job_it = running_jobs.find(job_id);
            auto job = job_it->second;
            nb_available_hosts += job->nb_hosts; // usual EASY update
            available_hosts += job->alloc;
            platform_nb_available_watts += job->power_estimation; // powercap update
            delete job;
            running_jobs.erase(job_it);
        } break;
        default: break;
        }
    }
    if (need_scheduling) {
        ::Job* priority_job = nullptr;
        uint32_t nb_available_hosts_at_priority_job_start = 0;
        double nb_available_watts_at_priority_job_start = 0;
        float priority_job_start_time = -1;
        // First traversal, done until a job cannot be executed right now and is set as the priority job
        // (or until all jobs have been executed)
        auto job_it = job_queue.begin();
        for (; job_it != job_queue.end(); ) {
            auto job = *job_it;
            if ( (job->nb_hosts <= nb_available_hosts) // usual EASY predicate
              && (job->power_estimation <= platform_nb_available_watts) // powercap predicate
               ) {
                running_jobs[job->id] = *job_it;
                job->maximum_finish_time = parsed->now() + job->walltime;
                job->alloc = available_hosts.left(job->nb_hosts);
                mb->add_execute_job(job->id, job->alloc.to_string_hyphen());
                available_hosts -= job->alloc;
                nb_available_hosts -= job->nb_hosts;
                platform_nb_available_watts -= job->power_estimation;
                job_it = job_queue.erase(job_it);
            }
            else
            {
                priority_job = *job_it;
                ++job_it;
                // compute when the priority job can start, and the number of available machines at this time
                std::vector<::Job*> running_jobs_asc_maximum_finish_time;
                running_jobs_asc_maximum_finish_time.reserve(running_jobs.size());
                for (const auto & it : running_jobs)
                    running_jobs_asc_maximum_finish_time.push_back(it.second);
                std::sort(running_jobs_asc_maximum_finish_time.begin(), running_jobs_asc_maximum_finish_time.end(), ascending_max_finish_time_job_order);
                nb_available_hosts_at_priority_job_start = nb_available_hosts;
                nb_available_watts_at_priority_job_start = platform_nb_available_watts;
                for (const auto & job : running_jobs_asc_maximum_finish_time) {
                    nb_available_hosts_at_priority_job_start += job->nb_hosts;
                    nb_available_watts_at_priority_job_start += job->power_estimation;
                    if ( (nb_available_hosts_at_priority_job_start >= priority_job->nb_hosts) // usual EASY predicate
                      && (nb_available_watts_at_priority_job_start >= priority_job->power_estimation) // powercap predicate
                       ) {
                        nb_available_hosts_at_priority_job_start -= priority_job->nb_hosts;
                        nb_available_watts_at_priority_job_start -= priority_job->power_estimation;
                        priority_job_start_time = job->maximum_finish_time;
                        break;
                    }
                }
                break;
            }
        }
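        // Backfilling rule (illustrative summary): a remaining queued job may start now if it fits in
        // the hosts and watts that are free right now, and if it cannot delay the priority job, i.e.
        // either it also fits in the hosts and watts reserved for the priority job's expected start,
        // or it is expected to finish before that start time.
        // Hypothetical example: at t=600 s, with the priority job expected to start at t=1000 s, a
        // queued job with a 300 s walltime that fits in the currently free hosts and watts can be
        // backfilled, since it should finish at t=900 s, before the priority job starts.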
        // Continue traversal to backfill jobs
        for (; job_it != job_queue.end(); ) {
            auto job = *job_it;
            // should the job be backfilled?
            float job_finish_time = parsed->now() + job->walltime;
            if ( (job->nb_hosts <= nb_available_hosts) // enough hosts now?
              && (job->power_estimation <= platform_nb_available_watts) // enough power now?
              && (
                   ( (job->nb_hosts <= nb_available_hosts_at_priority_job_start) // cannot hinder priority job at all regarding hosts
                  && (job->power_estimation <= nb_available_watts_at_priority_job_start) // cannot hinder priority job at all regarding watts
                   ) // previous block is true if the backfilled job cannot hinder the priority job regardless of the backfilled job's duration
                || (job_finish_time <= priority_job_start_time) // the backfilled job finishes before the priority job's expected start time
                 )
               ) {
                running_jobs[job->id] = *job_it;
                job->maximum_finish_time = job_finish_time;
                job->alloc = available_hosts.left(job->nb_hosts);
                mb->add_execute_job(job->id, job->alloc.to_string_hyphen());
                available_hosts -= job->alloc;
                nb_available_hosts -= job->nb_hosts;
                platform_nb_available_watts -= job->power_estimation;
                if (job_finish_time > priority_job_start_time) {
                    nb_available_hosts_at_priority_job_start -= job->nb_hosts;
                    nb_available_watts_at_priority_job_start -= job->power_estimation;
                }
                job_it = job_queue.erase(job_it);
            }
            else if (nb_available_hosts == 0)
                break;
            else
                ++job_it;
        }
    }
    mb->finish_message(parsed->now());
    serialize_message(*mb, !format_binary, const_cast<const uint8_t **>(decisions), decisions_size);
    return 0;
}
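The scheduler above is built as a shared library (see the meson.build target below) that Batsim loads at runtime and drives through the three batsim_edc_* entry points. The following sketch only illustrates that dynamic-library contract and is not Batsim's actual loading code: the library path ./libeasypower.so, the init values, and the decision to skip building a real protocol message are hypothetical placeholders.

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <dlfcn.h>
#include "batsim_edc.h"

// Function pointer types matching the declarations in batsim_edc.h.
typedef uint8_t (*edc_init_fn)(const uint8_t *, uint32_t, uint32_t);
typedef uint8_t (*edc_deinit_fn)();
typedef uint8_t (*edc_take_decisions_fn)(const uint8_t *, uint32_t, uint8_t **, uint32_t *);

int main()
{
    // Hypothetical library path; meson builds the 'easypower' shared library.
    void * lib = dlopen("./libeasypower.so", RTLD_NOW);
    if (lib == nullptr) { fprintf(stderr, "dlopen failed: %s\n", dlerror()); return 1; }

    auto init = (edc_init_fn) dlsym(lib, "batsim_edc_init");
    auto deinit = (edc_deinit_fn) dlsym(lib, "batsim_edc_deinit");
    auto take_decisions = (edc_take_decisions_fn) dlsym(lib, "batsim_edc_take_decisions");
    if (init == nullptr || deinit == nullptr || take_decisions == nullptr) { fprintf(stderr, "dlsym failed\n"); return 1; }

    // Hypothetical init string; field values are placeholders.
    const char * init_string = R"({"normal_dynamic_watts": 10000.0, "powercap_dynamic_watts": 4000.0,
        "idle_watts": 50.0, "powercap_end_time_seconds": 3600.0,
        "job_power_estimation_field": "power_estimation_watts"})";
    init((const uint8_t *) init_string, (uint32_t) strlen(init_string), BATSIM_EDC_FORMAT_JSON);

    // In a real run, Batsim would now repeatedly call take_decisions with a protocol message
    // describing what happened (JSON here, since BATSIM_EDC_FORMAT_JSON was requested), e.g.:
    //   uint8_t * decisions = nullptr;
    //   uint32_t decisions_size = 0;
    //   take_decisions(what_happened, what_happened_size, &decisions, &decisions_size);
    (void) take_decisions; // not called here: building a valid protocol message is out of scope

    deinit();
    dlclose(lib);
    return 0;
}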
{
"nodes": {
"batprotocol-flake": {
"inputs": {
"flake-utils": [
"flake-utils"
],
"nixpkgs": [
"nixpkgs"
],
"nur-kapack": [
"nur-kapack"
]
},
"locked": {
"lastModified": 1707316999,
"narHash": "sha256-+jGORWq0Vx9JKFfkMU2u5ZhnSD4Nc7eRDu9kBgZgJto=",
"ref": "refs/heads/master",
"rev": "25bc5bbf039c18a8024c4ab326047ba56800376a",
"revCount": 153,
"type": "git",
"url": "https://framagit.org/batsim/batprotocol"
},
"original": {
"type": "git",
"url": "https://framagit.org/batsim/batprotocol"
}
},
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"intervalset-flake": {
"inputs": {
"flake-utils": [
"flake-utils"
],
"nixpkgs": [
"nixpkgs"
],
"nur-kapack": [
"nur-kapack"
]
},
"locked": {
"lastModified": 1707321891,
"narHash": "sha256-eEH9zxYphfl/0PPX39ChMpguF/yuR4z4ZLOlWPwkoHA=",
"ref": "refs/heads/main",
"rev": "13d8f2d7000d4dc4e3202422658b0b5d83f83679",
"revCount": 84,
"type": "git",
"url": "https://framagit.org/batsim/intervalset"
},
"original": {
"type": "git",
"url": "https://framagit.org/batsim/intervalset"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1701282334,
"narHash": "sha256-MxCVrXY6v4QmfTwIysjjaX0XUhqBbxTWWB4HXtDYsdk=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "057f9aecfb71c4437d2b27d3323df7f93c010b7e",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "23.11",
"repo": "nixpkgs",
"type": "github"
}
},
"nur-kapack": {
"inputs": {
"flake-utils": [
"flake-utils"
],
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1709732514,
"narHash": "sha256-beQGEh9mgRL4T7tdkSlT/MrhncmfICWt3LmfuydGL1o=",
"owner": "oar-team",
"repo": "nur-kapack",
"rev": "2c3f5d9f5078229de10394eead8f8ed28d17b6a9",
"type": "github"
},
"original": {
"owner": "oar-team",
"ref": "master",
"repo": "nur-kapack",
"type": "github"
}
},
"root": {
"inputs": {
"batprotocol-flake": "batprotocol-flake",
"flake-utils": "flake-utils",
"intervalset-flake": "intervalset-flake",
"nixpkgs": "nixpkgs",
"nur-kapack": "nur-kapack"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}
{
inputs = {
nixpkgs.url = "github:nixos/nixpkgs?ref=23.11";
flake-utils.url = "github:numtide/flake-utils";
nur-kapack = {
url = "github:oar-team/nur-kapack/master";
inputs.nixpkgs.follows = "nixpkgs";
inputs.flake-utils.follows = "flake-utils";
};
intervalset-flake = {
url = "git+https://framagit.org/batsim/intervalset";
inputs.nixpkgs.follows = "nixpkgs";
inputs.nur-kapack.follows = "nur-kapack";
inputs.flake-utils.follows = "flake-utils";
};
batprotocol-flake = {
url = "git+https://framagit.org/batsim/batprotocol";
inputs.nixpkgs.follows = "nixpkgs";
inputs.nur-kapack.follows = "nur-kapack";
inputs.flake-utils.follows = "flake-utils";
};
};
outputs = { self, nixpkgs, flake-utils, nur-kapack, intervalset-flake, batprotocol-flake }:
flake-utils.lib.eachSystem [ "x86_64-linux" ] (system:
let
pkgs = import nixpkgs { inherit system; };
batprotopkgs = batprotocol-flake.packages.${system};
intervalsetpkgs = intervalset-flake.packages.${system};
in rec {
packages = rec {
easypower = pkgs.stdenv.mkDerivation rec {
name = "easypower";
version = "0.1.0";
src = pkgs.lib.sourceByRegex ./. [
"^meson\.build"
"^.*\.?pp"
"^.*\.h"
];
buildInputs = [
pkgs.nlohmann_json
batprotopkgs.batprotocol-cpp
intervalsetpkgs.intervalset
];
nativeBuildInputs = [
pkgs.meson
pkgs.ninja
pkgs.pkg-config
];
};
default = easypower;
};
devShells = {
};
}
);
}
project('energusched', 'cpp',
version: '0.1.0',
license: 'LGPL-3.0',
default_options: ['cpp_std=c++17'],
meson_version: '>=0.40.0'
)
batprotocol_cpp_dep = dependency('batprotocol-cpp')
boost_dep = dependency('boost')
intervalset_dep = dependency('intervalset')
nlohmann_json_dep = dependency('nlohmann_json')
deps = [
batprotocol_cpp_dep
]
common = [
'batsim_edc.h'
]
easypower = shared_library('easypower', common + ['easypower.cpp'],
dependencies: deps + [boost_dep, intervalset_dep, nlohmann_json_dep],
install: true,
)