Skip to content
Snippets Groups Projects
Commit cfbc45ab authored by Millian Poquet's avatar Millian Poquet
Browse files

Initial commit

parents
Branches
Tags
No related merge requests found
Showing
with 1712 additions and 0 deletions
*.user
build
cmake_minimum_required(VERSION 3.1.0 FATAL_ERROR)
project(batsched CXX)

# Make the find modules shipped with the project (cmake/Modules) visible to
# find_package().
list(APPEND CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/cmake/Modules/")

# Options
# Warning flags are attached to the target according to these options, so they
# must not be hard-coded into CMAKE_CXX_FLAGS here (doing so would make
# enable_warnings=OFF ineffective).
option(enable_warnings "Enable compilation warnings" ON)
option(treat_warnings_as_errors "Treat compilation warnings as compilation errors" OFF)

# Build type
# Default to Debug only when the user did not choose a build type and the
# generator is single-configuration. CMAKE_BUILD_TYPE must NOT be set
# unconditionally before this check: that would silently override
# -DCMAKE_BUILD_TYPE=... and make this block unreachable.
if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
  message(STATUS "Setting build type to 'Debug' as none was specified.")
  set(CMAKE_BUILD_TYPE Debug CACHE STRING "Choose the type of build." FORCE)
  # Set the possible values of build type for cmake-gui
  set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS
    "Debug" "Release" "MinSizeRel" "RelWithDebInfo")
endif()

# Let's enable C++11
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Dependencies
## Boost
# program_options is requested as a component so that it is located by
# FindBoost instead of being linked by bare library name.
find_package(Boost 1.48 REQUIRED COMPONENTS regex locale program_options)
## GMP
find_package(GMP REQUIRED)
## RapidJSON (header-only: only an include directory is provided)
find_package(rapidjson REQUIRED)
## Redox dependency
find_package(redox REQUIRED)
## Redox sub dependencies
find_package(hiredis REQUIRED)
find_package(libev REQUIRED)

# Sources
# NOTE: globbed source lists are only refreshed when CMake is re-run; re-run
# CMake after adding or removing a source file.
file(GLOB batsched_SRC
  "src/*.hpp"
  "src/*.cpp"
  "src/algo/*.hpp"
  "src/algo/*.cpp")

add_executable(batsched ${batsched_SRC})

target_include_directories(batsched PRIVATE
  ${Boost_INCLUDE_DIRS}
  ${GMP_INCLUDE_DIR}
  ${RAPIDJSON_INCLUDE_DIRS}
  ${REDOX_INCLUDE_DIR}
  ${HIREDIS_INCLUDE_DIRS}
  ${LIBEV_INCLUDE_DIRS})

# Boost_LIBRARIES holds every requested component and selects the variant
# matching the build configuration; the *_LIBRARY_DEBUG variables used to force
# the debug variants whatever the build type was.
# redox is listed before libev and hiredis, which it depends on.
target_link_libraries(batsched PRIVATE
  ${GMP_LIBRARIES}
  ${Boost_LIBRARIES}
  ${REDOX_LIBRARY}
  ${LIBEV_LIBRARY}
  ${HIREDIS_LIBRARY})

# Installation
# Installing the target (rather than a file path inside the build tree) lets
# CMake resolve the real output location and apply its install-time handling.
install(TARGETS batsched
  RUNTIME DESTINATION bin)
# Let's enable warnings if needed
# The flags are GCC/Clang style. They are still passed to other compilers (as
# before), but a warning is printed since they may not be understood.
if(enable_warnings)
  set(warning_flags -Wall -Wextra)
  if(treat_warnings_as_errors)
    list(APPEND warning_flags -Werror)
  endif()

  if(NOT CMAKE_CXX_COMPILER_ID MATCHES "^(GNU|Clang|AppleClang)$")
    message(WARNING "Unknown compiler. Warnings might not be enabled correctly.")
  endif()

  target_compile_options(batsched PRIVATE ${warning_flags})
endif()
# Try to find the GMP libraries
# GMP_FOUND - system has GMP lib
# GMP_INCLUDE_DIR - the GMP include directory
# GMP_LIBRARIES - Libraries needed to use GMP
# GMPXX_LIBRARIES - The GMP C++ bindings library, when available (optional:
#                   it does not take part in the GMP_FOUND decision)
if(GMP_INCLUDE_DIR AND GMP_LIBRARIES)
  # Already in cache, be silent
  set(GMP_FIND_QUIETLY TRUE)
endif()

find_path(GMP_INCLUDE_DIR NAMES gmp.h)
find_library(GMP_LIBRARIES NAMES gmp libgmp)
find_library(GMPXX_LIBRARIES NAMES gmpxx libgmpxx)

# Honour quiet mode (find_package(GMP QUIET) or values already cached).
if(NOT GMP_FIND_QUIETLY)
  message(STATUS "GMP libs: ${GMP_LIBRARIES} ${GMPXX_LIBRARIES}")
endif()

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(GMP DEFAULT_MSG GMP_INCLUDE_DIR GMP_LIBRARIES)

mark_as_advanced(GMP_INCLUDE_DIR GMP_LIBRARIES GMPXX_LIBRARIES)
# File got from redox repo, https://github.com/hmartiro/redox.
# Try to find hiredis
# Once done, this will define
#
# HIREDIS_FOUND - system has hiredis
# HIREDIS_INCLUDE_DIRS - hiredis include directories
# HIREDIS_LIBRARIES - libraries need to use hiredis
#
# HIREDIS_ROOT_DIR may be set to hint at a non-standard installation prefix.
if(HIREDIS_INCLUDE_DIRS AND HIREDIS_LIBRARIES)
  # Results already provided (by the caller or a previous run): be silent.
  set(HIREDIS_FIND_QUIETLY TRUE)
else()
  find_path(
    HIREDIS_INCLUDE_DIR
    NAMES hiredis/hiredis.h
    HINTS ${HIREDIS_ROOT_DIR}
    PATH_SUFFIXES include)
  find_library(
    HIREDIS_LIBRARY
    NAMES hiredis
    HINTS ${HIREDIS_ROOT_DIR}
    PATH_SUFFIXES ${CMAKE_INSTALL_LIBDIR})
  set(HIREDIS_INCLUDE_DIRS ${HIREDIS_INCLUDE_DIR})
  set(HIREDIS_LIBRARIES ${HIREDIS_LIBRARY})
  mark_as_advanced(HIREDIS_LIBRARY HIREDIS_INCLUDE_DIR)
endif()

# Always run the standard check so that HIREDIS_FOUND is defined on both
# paths; it used to be left unset when the results were already known.
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(
  hiredis DEFAULT_MSG HIREDIS_LIBRARIES HIREDIS_INCLUDE_DIRS)
# File got from redox repo, https://github.com/hmartiro/redox.
# Try to find libev
# Once done, this will define
#
# LIBEV_FOUND - system has libev
# LIBEV_INCLUDE_DIRS - libev include directories
# LIBEV_LIBRARIES - libraries needed to use libev
#
# LIBEV_ROOT_DIR may be set to hint at a non-standard installation prefix.
if(LIBEV_INCLUDE_DIRS AND LIBEV_LIBRARIES)
  # Results already provided (by the caller or a previous run): be silent.
  set(LIBEV_FIND_QUIETLY TRUE)
else()
  find_path(
    LIBEV_INCLUDE_DIR
    NAMES ev.h
    HINTS ${LIBEV_ROOT_DIR}
    PATH_SUFFIXES include)
  # The keyword is NAMES: the former "NAME ev" made find_library treat the
  # literal word "NAME" as one more library name to look for.
  find_library(
    LIBEV_LIBRARY
    NAMES ev
    HINTS ${LIBEV_ROOT_DIR}
    PATH_SUFFIXES ${CMAKE_INSTALL_LIBDIR})
  set(LIBEV_INCLUDE_DIRS ${LIBEV_INCLUDE_DIR})
  set(LIBEV_LIBRARIES ${LIBEV_LIBRARY})
  mark_as_advanced(LIBEV_LIBRARY LIBEV_INCLUDE_DIR)
endif()

# Always run the standard check so that LIBEV_FOUND is defined on both paths;
# it used to be left unset when the results were already known.
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(
  libev DEFAULT_MSG LIBEV_LIBRARIES LIBEV_INCLUDE_DIRS)
# Copyright (c) 2011 Milo Yip (miloyip@gmail.com)
# Copyright (c) 2013 Rafal Jeczalik (rjeczalik@gmail.com)
# Distributed under the MIT License (see license.txt file)
# -----------------------------------------------------------------------------------
#
# Finds the rapidjson library
#
# -----------------------------------------------------------------------------------
#
# Variables used by this module, they can change the default behaviour.
# Those variables need to be either set before calling find_package
# or exported as environment variables before running CMake:
#
# RAPIDJSON_INCLUDEDIR - Set custom include path, useful when rapidjson headers are
# outside system paths
# RAPIDJSON_USE_SSE2 - Configure rapidjson to take advantage of SSE2 capabilities
# RAPIDJSON_USE_SSE42 - Configure rapidjson to take advantage of SSE4.2 capabilities
#
# -----------------------------------------------------------------------------------
#
# Variables defined by this module:
#
# RAPIDJSON_FOUND - True if rapidjson was found
# RAPIDJSON_INCLUDE_DIRS - Path to rapidjson include directory
# RAPIDJSON_CXX_FLAGS - Extra C++ flags required for compilation with rapidjson
#
# -----------------------------------------------------------------------------------
#
# Example usage:
#
# set(RAPIDJSON_USE_SSE2 ON)
# set(RAPIDJSON_INCLUDEDIR "/opt/github.com/rjeczalik/rapidjson/include")
#
# find_package(rapidjson REQUIRED)
#
# include_directories("${RAPIDJSON_INCLUDE_DIRS}")
# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${RAPIDJSON_CXX_FLAGS}")
# add_executable(foo foo.cc)
#
# -----------------------------------------------------------------------------------
# Reconcile each module option with the environment variable of the same name.
# Inside the if() conditions, ${opt} expands to the option's NAME, which if()
# then evaluates as a variable.
foreach(opt RAPIDJSON_INCLUDEDIR RAPIDJSON_USE_SSE2 RAPIDJSON_USE_SSE42)
# The CMake variable is truthy and the environment defines a different value:
# the CMake value is kept and a warning is printed.
# NOTE(review): the message mentions a "cache entry", but a normal (non-cache)
# variable of that name triggers this branch as well.
if(${opt} AND DEFINED ENV{${opt}} AND NOT ${opt} STREQUAL "$ENV{${opt}}")
message(WARNING "Conflicting ${opt} values: ignoring environment variable and using CMake cache entry.")
# The CMake variable is unset or false and the environment defines the option:
# adopt the environment value.
elseif(DEFINED ENV{${opt}} AND NOT ${opt})
set(${opt} "$ENV{${opt}}")
endif()
endforeach()
# Locate the directory that contains rapidjson/rapidjson.h.
# RAPIDJSON_INCLUDEDIR, when set, is given to find_path as an extra search path.
find_path(RAPIDJSON_INCLUDE_DIRS
  NAMES rapidjson/rapidjson.h
  PATHS ${RAPIDJSON_INCLUDEDIR}
  DOC "Include directory for the rapidjson library.")

# The package is header-only: finding the headers is all it takes.
if(RAPIDJSON_INCLUDE_DIRS)
  set(RAPIDJSON_FOUND TRUE)
endif()

mark_as_advanced(RAPIDJSON_INCLUDE_DIRS RAPIDJSON_FOUND)
# Compute RAPIDJSON_CXX_FLAGS from the SSE options.
# SSE4.2 takes precedence over SSE2; when neither option is enabled the
# variable is not set by this block.
if(RAPIDJSON_USE_SSE42)
  if(MSVC)
    set(RAPIDJSON_CXX_FLAGS "-DRAPIDJSON_SSE42 /arch:SSE4.2")
  else()
    set(RAPIDJSON_CXX_FLAGS "-DRAPIDJSON_SSE42 -msse4.2")
  endif()
elseif(RAPIDJSON_USE_SSE2)
  if(MSVC)
    set(RAPIDJSON_CXX_FLAGS "-DRAPIDJSON_SSE2 /arch:SSE2")
  else()
    set(RAPIDJSON_CXX_FLAGS "-DRAPIDJSON_SSE2 -msse2")
  endif()
endif()

mark_as_advanced(RAPIDJSON_CXX_FLAGS)
# Report the outcome, honouring the REQUIRED and QUIET arguments that
# find_package() forwards through rapidjson_FIND_REQUIRED / rapidjson_FIND_QUIETLY.
if(NOT RAPIDJSON_FOUND)
  if(rapidjson_FIND_REQUIRED)
    # A required package that is missing aborts the configuration.
    message(FATAL_ERROR "Could not find rapidjson")
  else()
    message(STATUS "Optional package rapidjson was not found")
  endif()
elseif(NOT rapidjson_FIND_QUIETLY)
  message(STATUS "Found rapidjson header files in ${RAPIDJSON_INCLUDE_DIRS}")
  if(DEFINED RAPIDJSON_CXX_FLAGS)
    message(STATUS "Found rapidjson C++ extra compilation flags: ${RAPIDJSON_CXX_FLAGS}")
  endif()
endif()
# Try to find Redox.
# Once done, this will define:
# REDOX_FOUND
# REDOX_INCLUDE_DIRS
# REDOX_LIBRARIES
#
# REDOX_ROOT_DIR is used as a search hint for both the header and the library.

# Header
find_path(REDOX_INCLUDE_DIR NAMES redox.hpp HINTS ${REDOX_ROOT_DIR})

# Library
find_library(REDOX_LIBRARY
  NAMES redox
  HINTS ${REDOX_ROOT_DIR}
  PATH_SUFFIXES ${CMAKE_INSTALL_LIBDIR})

# Plural result variables mirror the single entries found above.
set(REDOX_INCLUDE_DIRS ${REDOX_INCLUDE_DIR})
set(REDOX_LIBRARIES ${REDOX_LIBRARY})

# Standard found / not-found handling (REQUIRED, QUIET, status message).
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(redox DEFAULT_MSG REDOX_LIBRARY REDOX_INCLUDE_DIR)

mark_as_advanced(REDOX_LIBRARY REDOX_INCLUDE_DIR)
#include "conservative_bf.hpp"
using namespace std;
/// Builds the algorithm; every argument is forwarded unchanged to ISchedulingAlgorithm.
ConservativeBackfilling::ConservativeBackfilling(Workload * workload,
                                                 SchedulingDecision * decision,
                                                 Queue * queue,
                                                 ResourceSelector * selector,
                                                 double rjms_delay,
                                                 rapidjson::Document * variant_options)
    : ISchedulingAlgorithm(workload, decision, queue, selector, rjms_delay, variant_options)
{}

/// Nothing to release explicitly.
ConservativeBackfilling::~ConservativeBackfilling()
{}

/// Replaces the schedule by a fresh one built from _nb_machines and the start date.
void ConservativeBackfilling::on_simulation_start(double date)
{
    _schedule = Schedule(_nb_machines, date);
}

/// No action is taken at the end of the simulation; the date is ignored.
void ConservativeBackfilling::on_simulation_end(double /*date*/)
{}
/**
 * @brief Takes the scheduling decisions for the current event.
 * @details Finished jobs are removed from the schedule, newly released jobs are
 *          either rejected (they request more machines than _nb_machines) or
 *          queued, then jobs are (re)inserted into the schedule with
 *          add_job_first_fit. Every job whose allocation starts in the first
 *          time slice is announced through _decision and leaves the queue.
 * @param[in] date The current date, used for the decisions and to update the first slice
 * @param[in] update_info Forwarded to the queue when appending and sorting jobs
 * @param[in] compare_info Forwarded to the queue when sorting jobs
 */
void ConservativeBackfilling::make_decisions(double date,
                                             SortableJobOrder::UpdateInformation *update_info,
                                             SortableJobOrder::CompareInformation *compare_info)
{
    // Let's remove finished jobs from the schedule
    for (const string & ended_job_id : _jobs_ended_recently)
        _schedule.remove_job((*_workload)[ended_job_id]);

    // Let's handle recently released jobs
    for (const string & new_job_id : _jobs_released_recently)
    {
        const Job * new_job = (*_workload)[new_job_id];

        if (new_job->nb_requested_resources > _nb_machines)
            _decision->add_rejection(new_job_id, date);
        else
            _queue->append_job(new_job, update_info);
    }

    // Let's update the schedule's present
    _schedule.update_first_slice(date);

    // Queue sorting
    _queue->sort_queue(update_info, compare_info);

    // If no resources have been released, we can just insert the new jobs into the schedule
    if (_jobs_ended_recently.empty())
    {
        for (const string & new_job_id : _jobs_released_recently)
        {
            const Job * new_job = (*_workload)[new_job_id];

            // Jobs rejected above never entered the queue: they must be neither
            // scheduled nor started (same test as the rejection test above).
            if (new_job->nb_requested_resources > _nb_machines)
                continue;

            Schedule::JobAlloc alloc = _schedule.add_job_first_fit(new_job, _selector);

            // If the job should start now, let's say it to the resource manager
            if (alloc.started_in_first_slice)
            {
                _decision->add_allocation(new_job->id, alloc.used_machines, date);
                _queue->remove_job(new_job);
            }
        }
    }
    else
    {
        // Since some resources have been freed,
        // Let's compress the schedule following conservative backfilling rules:
        // For each non running job j
        //   Remove j from the schedule
        //   Add j into the schedule
        //   If j should be executed now
        //     Take the decision to run j now
        for (auto job_it = _queue->begin(); job_it != _queue->end(); )
        {
            const Job * job = (*job_it)->job;

            _schedule.remove_job_if_exists(job);
            Schedule::JobAlloc alloc = _schedule.add_job_first_fit(job, _selector);

            if (alloc.started_in_first_slice)
            {
                _decision->add_allocation(job->id, alloc.used_machines, date);
                // remove_job returns the iterator to continue the traversal with
                job_it = _queue->remove_job(job_it);
            }
            else
                ++job_it;
        }
    }
}
#pragma once
#include <list>
#include "../isalgorithm.hpp"
#include "../json_workload.hpp"
#include "../locality.hpp"
#include "../schedule.hpp"
/**
 * @brief Scheduling algorithm built on ISchedulingAlgorithm which keeps its
 *        planning in a Schedule member.
 */
class ConservativeBackfilling : public ISchedulingAlgorithm
{
public:
    /// All the parameters are those of the ISchedulingAlgorithm constructor.
    ConservativeBackfilling(Workload * workload,
                            SchedulingDecision * decision,
                            Queue * queue,
                            ResourceSelector * selector,
                            double rjms_delay,
                            rapidjson::Document * variant_options);

    virtual ~ConservativeBackfilling();

    /// Called with the date at which the simulation starts.
    virtual void on_simulation_start(double date);

    /// Called with the date at which the simulation ends.
    virtual void on_simulation_end(double date);

    /// Takes the scheduling decisions at the given date.
    virtual void make_decisions(double date,
                                SortableJobOrder::UpdateInformation * update_info,
                                SortableJobOrder::CompareInformation * compare_info);

private:
    Schedule _schedule; //!< The planning of the jobs on the machines
};
#include "easy_bf.hpp"
#include "../pempek_assert.hpp"
using namespace std;
/// Builds the algorithm; every argument is forwarded unchanged to ISchedulingAlgorithm.
EasyBackfilling::EasyBackfilling(Workload * workload, SchedulingDecision * decision, Queue * queue,
                                 ResourceSelector * selector, double rjms_delay,
                                 rapidjson::Document * variant_options)
    : ISchedulingAlgorithm(workload, decision, queue, selector, rjms_delay, variant_options)
{}

/// Nothing to release explicitly.
EasyBackfilling::~EasyBackfilling()
{}

/// Replaces the schedule by a fresh one built from _nb_machines and the start date.
void EasyBackfilling::on_simulation_start(double date)
{
    _schedule = Schedule(_nb_machines, date);
}

/// No action is taken at the end of the simulation; the date is ignored.
void EasyBackfilling::on_simulation_end(double /*date*/)
{}
void EasyBackfilling::make_decisions(double date,
SortableJobOrder::UpdateInformation *update_info,
SortableJobOrder::CompareInformation *compare_info)
{
const Job * priority_job_before = _queue->first_job_or_nullptr();
// Let's remove finished jobs from the schedule
for (const string & ended_job_id : _jobs_ended_recently)
_schedule.remove_job((*_workload)[ended_job_id]);
// Let's handle recently released jobs
for (const string & new_job_id : _jobs_released_recently)
{
const Job * new_job = (*_workload)[new_job_id];
if (new_job->nb_requested_resources > _nb_machines)
_decision->add_rejection(new_job_id, date);
else
_queue->append_job(new_job, update_info);
}
// Let's update the schedule's present
_schedule.update_first_slice(date);
// Queue sorting
const Job * priority_job_after = nullptr;
sort_queue_while_handling_priority_job(priority_job_before, priority_job_after, update_info, compare_info);
// If no resources have been released, we can just try to backfill the newly-released jobs
if (_jobs_ended_recently.empty())
{
int nb_available_machines = _schedule.begin()->available_machines.size();
for (unsigned int i = 0; i < _jobs_released_recently.size() && nb_available_machines > 0; ++i)
{
const string & new_job_id = _jobs_released_recently[i];
const Job * new_job = (*_workload)[new_job_id];
// The job could have already been executed by sort_queue_while_handling_priority_job,
// that's why we check whether the queue contains the job.
if (_queue->contains_job(new_job) &&
new_job != priority_job_after &&
new_job->nb_requested_resources <= nb_available_machines)
{
Schedule::JobAlloc alloc = _schedule.add_job_first_fit(new_job, _selector);
if ( alloc.started_in_first_slice)
{
_decision->add_allocation(new_job_id, alloc.used_machines, date);
_queue->remove_job(new_job);
nb_available_machines -= new_job->nb_requested_resources;
}
else
_schedule.remove_job(new_job);
}
}
}
else
{
// Some resources have been released, the whole queue should be traversed.
auto job_it = _queue->begin();
int nb_available_machines = _schedule.begin()->available_machines.size();
// Let's try to backfill all the jobs
while (job_it != _queue->end() && nb_available_machines > 0)
{
const Job * job = (*job_it)->job;
if (_schedule.contains_job(job))
_schedule.remove_job(job);
if (job == priority_job_after) // If the current job is priority
{
Schedule::JobAlloc alloc = _schedule.add_job_first_fit(job, _selector);
if (alloc.started_in_first_slice)
{
_decision->add_allocation(job->id, alloc.used_machines, date);
job_it = _queue->remove_job(job_it); // Updating job_it to remove on traversal
priority_job_after = _queue->first_job_or_nullptr();
}
else
++job_it;
}
else // The job is not priority
{
Schedule::JobAlloc alloc = _schedule.add_job_first_fit(job, _selector);
if (alloc.started_in_first_slice)
{
_decision->add_allocation(job->id, alloc.used_machines, date);
job_it = _queue->remove_job(job_it);
}
else
{
_schedule.remove_job(job);
++job_it;
}
}
}
}
}
/**
 * @brief Sorts the queue, then makes sure the job at its head is in the schedule.
 * @details When the head of the queue changed, the former priority job (if any)
 *          is removed from the schedule and the new one is inserted. As long as
 *          the inserted priority job starts in the first time slice, it is
 *          executed, removed from the queue, and the next head is tried.
 * @param[in] priority_job_before The head of the queue before the sort (may be nullptr)
 * @param[out] priority_job_after The head of the queue when the function returns (may be nullptr)
 * @param[in] update_info Forwarded to the sort; its current_date dates the allocations
 * @param[in] compare_info Forwarded to the sort
 */
void EasyBackfilling::sort_queue_while_handling_priority_job(const Job * priority_job_before,
                                                             const Job *& priority_job_after,
                                                             SortableJobOrder::UpdateInformation * update_info,
                                                             SortableJobOrder::CompareInformation * compare_info)
{
    if (_debug)
        printf("sort_queue_while_handling_priority_job beginning, %s\n", _schedule.to_string().c_str());

    _queue->sort_queue(update_info, compare_info);
    priority_job_after = _queue->first_job_or_nullptr();

    const bool priority_job_changed = (priority_job_after != priority_job_before);
    if (priority_job_changed)
    {
        // The former priority job loses its place in the schedule
        if (priority_job_before != nullptr)
            _schedule.remove_job_if_exists(priority_job_before);

        // Each successive head of the queue is inserted; the loop stops at the
        // first one which cannot start now (it then stays in the schedule).
        while (priority_job_after != nullptr)
        {
            Schedule::JobAlloc alloc = _schedule.add_job_first_fit(priority_job_after, _selector);

            if (!alloc.started_in_first_slice)
                break;

            _decision->add_allocation(priority_job_after->id, alloc.used_machines, (double)update_info->current_date);
            _queue->remove_job(priority_job_after);
            priority_job_after = _queue->first_job_or_nullptr();
        }
    }

    if (_debug)
        printf("sort_queue_while_handling_priority_job ending, %s\n", _schedule.to_string().c_str());
}
#pragma once
#include <list>
#include "../isalgorithm.hpp"
#include "../json_workload.hpp"
#include "../locality.hpp"
#include "../schedule.hpp"
/**
 * @brief Scheduling algorithm built on ISchedulingAlgorithm which gives a
 *        special role to the first job of the queue (the "priority job").
 */
class EasyBackfilling : public ISchedulingAlgorithm
{
public:
    /// All the parameters are those of the ISchedulingAlgorithm constructor.
    EasyBackfilling(Workload * workload,
                    SchedulingDecision * decision,
                    Queue * queue,
                    ResourceSelector * selector,
                    double rjms_delay,
                    rapidjson::Document * variant_options);

    virtual ~EasyBackfilling();

    /// Called with the date at which the simulation starts.
    virtual void on_simulation_start(double date);

    /// Called with the date at which the simulation ends.
    virtual void on_simulation_end(double date);

    /// Takes the scheduling decisions at the given date.
    virtual void make_decisions(double date,
                                SortableJobOrder::UpdateInformation * update_info,
                                SortableJobOrder::CompareInformation * compare_info);

    /**
     * @brief Sorts the queue and handles the job at its head.
     * @param[in] priority_job_before The head of the queue before the call
     * @param[out] priority_job_after Receives the head of the queue after the call
     * @param[in] update_info Information given to the queue sort
     * @param[in] compare_info Information given to the queue sort
     */
    void sort_queue_while_handling_priority_job(const Job * priority_job_before,
                                                const Job *& priority_job_after,
                                                SortableJobOrder::UpdateInformation * update_info,
                                                SortableJobOrder::CompareInformation * compare_info);

protected:
    Schedule _schedule;  //!< The planning of the jobs on the machines
    bool _debug = false; //!< Enables the printing of debugging traces
};
#include "easy_bf_plot_liquid_load_horizon.hpp"
#include "../pempek_assert.hpp"
using namespace std;
/**
 * @brief Builds the algorithm and opens the CSV file that receives the metrics.
 * @details The variant options must contain a string member 'output_filename'.
 *          The file is opened and the CSV header line is written into it.
 *          The remaining arguments are forwarded to EasyBackfilling.
 */
EasyBackfillingPlotLiquidLoadHorizon::EasyBackfillingPlotLiquidLoadHorizon(Workload * workload,
                                                                           SchedulingDecision * decision,
                                                                           Queue * queue,
                                                                           ResourceSelector * selector,
                                                                           double rjms_delay,
                                                                           rapidjson::Document * variant_options)
    : EasyBackfilling(workload, decision, queue, selector, rjms_delay, variant_options)
{
    // Validation of the options
    PPK_ASSERT_ERROR(variant_options->HasMember("output_filename"),
                     "Invalid options JSON object: Member 'output_filename' cannot be found");
    PPK_ASSERT_ERROR((*variant_options)["output_filename"].IsString(),
                     "Invalid options JSON object: Member 'output_filename' must be a string");

    // Opening of the output file
    const string filename = (*variant_options)["output_filename"].GetString();
    _output_file.open(filename);
    PPK_ASSERT_ERROR(_output_file.is_open(), "Couldn't open file %s", filename.c_str());

    // CSV header: one column per value written by write_current_metrics_in_file
    const string csv_header = "date,nb_jobs_in_queue,load_in_queue,liquid_load_horizon\n";
    _output_file.write(csv_header.c_str(), csv_header.size());
}
/// Closes the metrics file.
EasyBackfillingPlotLiquidLoadHorizon::~EasyBackfillingPlotLiquidLoadHorizon()
{
    _output_file.close();
}

/**
 * @brief Takes the decisions exactly as EasyBackfilling does, then logs the metrics.
 * @param[in] date The current date (also written into the metrics line)
 * @param[in] update_info Forwarded to EasyBackfilling::make_decisions
 * @param[in] compare_info Forwarded to EasyBackfilling::make_decisions
 */
void EasyBackfillingPlotLiquidLoadHorizon::make_decisions(double date,
                                                          SortableJobOrder::UpdateInformation * update_info,
                                                          SortableJobOrder::CompareInformation * compare_info)
{
    // The metrics are measured once the decisions of this event have been taken
    EasyBackfilling::make_decisions(date, update_info, compare_info);
    write_current_metrics_in_file(date);
}
/**
 * @brief Appends one CSV line describing the current state to the output file.
 * @details The line contains, in order: the date, the number of jobs in the
 *          queue, the load estimation of the queue and the liquid load horizon.
 * @param[in] date The current date
 */
void EasyBackfillingPlotLiquidLoadHorizon::write_current_metrics_in_file(double date)
{
    Rational liquid_load_horizon = compute_liquid_load_horizon(_schedule, _queue, date);

    // A fixed-size stack buffer is used: nothing can leak if the assertion
    // below stops the normal control flow.
    const int buf_size = 256;
    char buf[buf_size];

    // The job count is cast explicitly so that it always matches "%d".
    // NOTE(review): assumes the number of queued jobs fits in an int - confirm.
    const int nb_printed = snprintf(buf, buf_size, "%g,%d,%g,%g\n", date,
                                    (int) _queue->nb_jobs(),
                                    (double) _queue->compute_load_estimation(),
                                    (double) liquid_load_horizon);

    // snprintf returns a negative value on error, and a value >= buf_size when
    // the output has been truncated. Anything else fits, terminator included.
    PPK_ASSERT_ERROR(nb_printed >= 0 && nb_printed < buf_size,
                     "Buffer too small, some information might have been lost!");

    _output_file.write(buf, nb_printed);
}
/**
 * @brief Computes how long it would take to absorb the queue load if it were
 *        poured as a fluid into the free areas of the schedule.
 * @details Starting from starting_time, the free area of each time slice
 *          (available machines multiplied by the considered duration) is
 *          subtracted from the load estimation of the queue until nothing
 *          remains. If the slices are exhausted first, the infinite horizon of
 *          the schedule is used as the end date.
 * @param[in] schedule The schedule whose free areas are filled
 * @param[in] queue The queue whose load estimation is distributed
 * @param[in] starting_time The date from which the load is distributed. It must
 *            lie in [first_slice_begin, infinite_horizon[ of the schedule.
 * @return The duration between starting_time and the date at which the whole load is absorbed
 */
Rational EasyBackfillingPlotLiquidLoadHorizon::compute_liquid_load_horizon(const Schedule & schedule,
                                                                           const Queue * queue,
                                                                           Rational starting_time)
{
    // Validity of the starting time
    PPK_ASSERT_ERROR(starting_time >= schedule.first_slice_begin());
    PPK_ASSERT_ERROR(starting_time < schedule.infinite_horizon());

    // Total load (area) which remains to be poured into the schedule
    Rational remaining_load = queue->compute_load_estimation();

    auto slice_it = schedule.find_last_time_slice_before_date(starting_time, false);
    Rational horizon_date = starting_time;

    while (remaining_load > 0 && slice_it != schedule.end())
    {
        const Schedule::TimeSlice & slice = *slice_it;

        // Only the part of the slice located after starting_time is usable
        Rational usable_duration = max(Rational(0), slice.end - max(starting_time, slice.begin));
        Rational free_area = slice.available_machines.size() * usable_duration;

        if (free_area > remaining_load)
        {
            // Last slice: it is only partially filled
            PPK_ASSERT_ERROR(slice.available_machines.size() > 0);
            Rational filling_duration = remaining_load / slice.available_machines.size();

            horizon_date += filling_duration;
            remaining_load = 0;
        }
        else
        {
            // The slice is entirely filled, let's go on with the next one
            remaining_load -= free_area;
            horizon_date = slice.end;
            ++slice_it;
        }
    }

    // Degenerate case: all the machines are probably in a sleep state
    if (remaining_load > 0)
        horizon_date = schedule.infinite_horizon();

    Rational liquid_load_horizon = horizon_date - starting_time;
    PPK_ASSERT_ERROR(liquid_load_horizon >= 0);

    return liquid_load_horizon;
}
#pragma once
#include <fstream>
#include "easy_bf.hpp"
#include "../queueing_theory_waiting_time_estimator.hpp"
/**
 * @brief EasyBackfilling variant which additionally writes metrics about the
 *        queue and the schedule into a file.
 */
class EasyBackfillingPlotLiquidLoadHorizon : public EasyBackfilling
{
public:
    /// All the parameters are those of the EasyBackfilling constructor.
    EasyBackfillingPlotLiquidLoadHorizon(Workload * workload, SchedulingDecision * decision, Queue * queue,
                                         ResourceSelector * selector, double rjms_delay,
                                         rapidjson::Document * variant_options);

    virtual ~EasyBackfillingPlotLiquidLoadHorizon();

    /// Takes the scheduling decisions at the given date.
    virtual void make_decisions(double date,
                                SortableJobOrder::UpdateInformation * update_info,
                                SortableJobOrder::CompareInformation * compare_info);

    /// Writes the metrics of the given date into the output file.
    void write_current_metrics_in_file(double date);

    /**
     * @brief Computes the liquid load horizon of a queue on a schedule.
     * @param[in] schedule The schedule
     * @param[in] queue The queue
     * @param[in] starting_time The date from which the computation starts
     * @return The liquid load horizon
     */
    static Rational compute_liquid_load_horizon(const Schedule & schedule,
                                                const Queue * queue,
                                                Rational starting_time);

private:
    std::ofstream _output_file; //!< The file into which the metrics are written
    // A QueueingTheoryWaitingTimeEstimator member is currently disabled:
    //QueueingTheoryWaitingTimeEstimator estimator;
};
This diff is collapsed.
#pragma once
#include <list>
#include <map>
#include "../isalgorithm.hpp"
#include "../json_workload.hpp"
#include "../locality.hpp"
#include "../schedule.hpp"
/**
 * @brief Scheduling algorithm declared on top of ISchedulingAlgorithm which
 *        tracks, besides its Schedule, the power state of the machines.
 * @details The states of the machines are represented in the schedule by
 *          per-machine jobs (switch ON, switch OFF, ensured sleep and
 *          potential sleep), see MachineInformation.
 */
class EnergyBackfilling : public ISchedulingAlgorithm
{
public:
//! The states in which a machine can be
enum MachineState
{
AWAKE,
ASLEEP,
SWITCHING_OFF,
SWITCHING_ON
};
//! Metrics about a schedule, as returned by compute_metrics_of_schedule. Every metric defaults to 0.
struct ScheduleMetrics
{
Rational makespan = 0;
Rational mean_waiting_time = 0;
Rational max_waiting_time = 0;
Rational mean_turnaround_time = 0;
Rational max_turnaround_time = 0;
Rational mean_slowdown = 0;
Rational max_slowdown = 0;
Rational mean_bounded_slowdown = 0;
Rational max_bounded_slowdown = 0;
};
//! Returns a textual representation of a MachineState
std::string machine_state_to_string(const MachineState & state);
//! Per-machine data: power characteristics and the jobs which represent the machine states in the schedule
struct MachineInformation
{
MachineInformation(int machine_number);
~MachineInformation();
// Creation and release of the jobs declared at the end of this struct.
// NOTE(review): the exact role of the two sleep time bounds is defined in the implementation file - confirm there.
void create_jobs(double rjms_delay,
Rational ensured_sleep_time_lower_bound,
Rational ensured_sleep_time_upper_bound);
void free_jobs();
private:
// Creation and release of limited_resource_selector (presumably; see the implementation file)
void create_selector();
void free_selector();
public:
int machine_number = -1; //!< The machine number the MachineInformation corresponds to
LimitedRangeResourceSelector * limited_resource_selector = nullptr;
MachineState state = AWAKE;
int compute_pstate = 0;
int sleep_pstate = 1;
Rational compute_epower = 0;
Rational idle_epower = 0;
Rational sleep_epower = 0;
Rational switch_on_seconds = 0;
Rational switch_on_energy = 0;
Rational switch_on_electrical_power = 0;
Rational switch_off_seconds = 0;
Rational switch_off_energy = 0;
Rational switch_off_electrical_power = 0;
Job * switch_on_job = nullptr; //!< This job corresponds to the switching ON state of the machine
Job * switch_off_job = nullptr; //!< This job corresponds to the switching OFF state of the machine
Job * ensured_sleep_job = nullptr; //!< This job corresponds to the sleeping state of the machine. It cannot be stopped. It is used to avoid pure loss of energy via too frequent switches OFF and ON
Job * potential_sleep_job = nullptr; //!< This job corresponds to the sleeping state of the machine. It can be stopped if a job cannot fit in the schedule otherwise
};
public:
//! All the parameters are those of the ISchedulingAlgorithm constructor
EnergyBackfilling(Workload * workload, SchedulingDecision * decision, Queue * queue, ResourceSelector * selector,
double rjms_delay, rapidjson::Document * variant_options);
virtual ~EnergyBackfilling();
// Event handlers. Each one receives the date at which the event occurs.
virtual void on_simulation_start(double date);
virtual void on_simulation_end(double date);
virtual void on_machine_state_changed(double date, MachineRange machines, int newState);
virtual void on_nop(double date);
virtual void make_decisions(double date,
SortableJobOrder::UpdateInformation * update_info,
SortableJobOrder::CompareInformation * compare_info);
protected:
// Management of _machine_informations
void generate_machine_informations(int nb_machines);
void clear_machine_informations();
void make_decisions_of_schedule(const Schedule & schedule, bool run_nop_me_later_on_nothing_to_do = true);
void update_first_slice_taking_sleep_jobs_into_account(Rational date);
void put_jobs_into_schedule(Schedule & schedule) const;
/**
* @brief Sedates machines as soon as they remain idle.
* @param[in,out] schedule The Schedule into which the sedatings should be done
* @param[in] machines_to_sedate The machines to sedate
* @return The earliest sedating decision date
*/
Rational sedate_machines_at_the_furthest_moment(Schedule & schedule,
const MachineRange & machines_to_sedate) const;
void sedate_machine(Schedule & schedule,
int machine_id,
std::list<Schedule::TimeSlice>::iterator time_slice,
bool insert_in_slice = true) const;
void sedate_machine_without_switch(Schedule & schedule,
int machine_id,
Rational when_it_should_start) const;
void awaken_machine(Schedule & schedule,
int machine_id,
Rational awakening_date) const;
/**
* @brief Awakens a machine as soon as possible
* @param[in,out] schedule The Schedule into which the awakening should be done
* @param[in] machine_id The machine to awaken
* @return The moment at which the machine should be awakened
*/
Rational awaken_machine_as_soon_as_possible(Schedule & schedule,
int machine_id) const;
//! Computes the ScheduleMetrics of a schedule. min_job_length defaults to 30.
ScheduleMetrics compute_metrics_of_schedule(const Schedule & schedule, Rational min_job_length = 30) const;
/**
* @brief Computes the machines that can be awakened inside a given time slice.
* @details These machines are the ones on which potential sleep jobs are allocated
* @param[in] time_slice The time slice
* @return The machines that can be awakened inside the given time slice
*/
static MachineRange compute_potentially_awaken_machines(const Schedule::TimeSlice & time_slice);
static MachineRange compute_sleeping_machines(const Schedule::TimeSlice & time_slice);
Rational find_earliest_moment_to_awaken_machines(Schedule & schedule, const MachineRange & machines_to_awaken) const;
Rational estimate_energy_of_schedule(const Schedule & schedule, Rational horizon) const;
// Classification of the job identifiers: these predicates tell whether an
// identifier is the one of a job created to represent a machine state.
static bool is_switch_on_job(const std::string & job_id);
static bool is_switch_off_job(const std::string & job_id);
static bool is_ensured_sleep_job(const std::string & job_id);
static bool is_potential_sleep_job(const std::string & job_id);
static bool is_fake_job(const std::string & job_id);
protected:
Schedule _schedule;
bool _debug = false;
int _nb_nop_me_later_running = 0;
int _nb_jobs_submitted = 0;
int _nb_jobs_completed = 0;
std::map<int, MachineInformation*> _machine_informations; //!< The MachineInformation of the machines, indexed by an int key (presumably the machine number)
MachineRange _all_machines; //!< All the machines that can be used for computing jobs
MachineRange _switching_on_machines; //!< The machines currently being switched ON
MachineRange _switching_off_machines; //!< The machines currently being switched OFF
MachineRange _asleep_machines; //!< The machines currently in a sleepy state. They cannot be used to compute jobs now. This is the union of _wakable_asleep_machines and _non_wakable_asleep_machines
MachineRange _wakable_asleep_machines; //!< Subset of _asleep_machines. Those machines have been sleeping for enough time, they can be awakened.
MachineRange _non_wakable_asleep_machines; //!< Subset of _asleep_machines. Those machines have NOT been sleeping for enough time, they cannot be awakened yet.
MachineRange _awake_machines; //!< The machines currently in a computation pstate. They can represent the machines which compute jobs, or idle machines.
};
#include "energy_bf_dicho.hpp"
#include <iostream>
#include "../pempek_assert.hpp"
using namespace std;
/**
 * @brief Builds the algorithm and reads its parameters from the variant options.
 * @details The options must contain a non-negative number named
 *          'tolerated_slowdown_loss_ratio'. They may contain a string named
 *          'comparison_type' whose value must be either "switch_on" or
 *          "remove_sleep_jobs". The remaining arguments are forwarded to
 *          EnergyBackfilling.
 */
EnergyBackfillingDichotomy::EnergyBackfillingDichotomy(Workload * workload,
                                                       SchedulingDecision * decision,
                                                       Queue * queue,
                                                       ResourceSelector * selector,
                                                       double rjms_delay,
                                                       rapidjson::Document * variant_options)
    : EnergyBackfilling(workload, decision, queue, selector, rjms_delay, variant_options)
{
    // The values accepted for the 'comparison_type' option
    _str_to_comparison_type["switch_on"] = SWITCH_ON;
    _str_to_comparison_type["remove_sleep_jobs"] = REMOVE_SLEEP_JOBS;

    // Mandatory option: the tolerated slowdown loss ratio
    PPK_ASSERT_ERROR(variant_options->HasMember("tolerated_slowdown_loss_ratio"),
                     "Invalid options JSON object: Member 'tolerated_slowdown_loss_ratio' cannot be found");

    const auto & ratio_value = (*variant_options)["tolerated_slowdown_loss_ratio"];
    PPK_ASSERT_ERROR(ratio_value.IsNumber(),
                     "Invalid options JSON object: Member 'tolerated_slowdown_loss_ratio' should be a number");

    _tolerated_slowdown_loss_ratio = ratio_value.GetDouble();
    PPK_ASSERT_ERROR(_tolerated_slowdown_loss_ratio >= 0,
                     "Invalid options JSON object: Member 'tolerated_slowdown_loss_ratio' should be positive or null (got %g)",
                     (double) _tolerated_slowdown_loss_ratio);

    // Optional option: the comparison type (_comparison_type is left untouched when absent)
    if (!variant_options->HasMember("comparison_type"))
        return;

    cout << "Comparison type found in options!" << endl;

    const auto & comparison_value = (*variant_options)["comparison_type"];
    PPK_ASSERT_ERROR(comparison_value.IsString(),
                     "Invalid options JSON object: Member 'comparison_type' should be a string");

    const string comparison_name = comparison_value.GetString();
    PPK_ASSERT_ERROR(_str_to_comparison_type.count(comparison_name) == 1,
                     "Invalid options JSON object: invalid 'comparison_type' value (%s)",
                     comparison_name.c_str());

    _comparison_type = _str_to_comparison_type.at(comparison_name);
}

/// Nothing to release explicitly.
EnergyBackfillingDichotomy::~EnergyBackfillingDichotomy()
{}
void EnergyBackfillingDichotomy::make_decisions(double date,
SortableJobOrder::UpdateInformation *update_info,
SortableJobOrder::CompareInformation *compare_info)
{
// Let's remove finished jobs from the schedule
for (const string & ended_job_id : _jobs_ended_recently)
{
const Job * ended_job = (*_workload)[ended_job_id];
++_nb_jobs_completed;
PPK_ASSERT_ERROR(_schedule.contains_job(ended_job),
"Invalid schedule: job '%s' just finished, "
"but it not in the schedule...\n%s",
ended_job_id.c_str(), _schedule.to_string().c_str());
PPK_ASSERT_ERROR(!_queue->contains_job(ended_job),
"Job '%s' just ended, but it is still in the "
"queue...\nQueue : %s",
ended_job_id.c_str(),
_queue->to_string().c_str());
// Let's remove the finished job from the schedule
_schedule.remove_job(ended_job);
}
// Let's handle recently released jobs
for (const string & new_job_id : _jobs_released_recently)
{
const Job * new_job = (*_workload)[new_job_id];
++_nb_jobs_submitted;
PPK_ASSERT_ERROR(!_schedule.contains_job(new_job),
"Invalid schedule: job '%s' already exists.\n%s",
new_job->id.c_str(), _schedule.to_string().c_str());
PPK_ASSERT_ERROR(!_queue->contains_job(new_job),
"Job '%s' is already in the queue!\nQueue:%s",
new_job->id.c_str(), _queue->to_string().c_str());
if (new_job->nb_requested_resources > _nb_machines)
{
_decision->add_rejection(new_job_id, date);
++_nb_jobs_completed;
}
else
_queue->append_job(new_job, update_info);
}
// Let's update the first slice of the schedule
update_first_slice_taking_sleep_jobs_into_account(date);
// Let's update the sorting of the queue
_queue->sort_queue(update_info, compare_info);
// Let's find the schedule which:
// - minimises the estimated energy consumption
// - respects the slowdown constraint: not worse in avg bds (for jobs in schedule) than
// tolerated_slowdown_loss_ratio times the bds of the schedule in which all nodes are awaken first.
// *******************************************************************
// Let's compute global informations about the current online schedule
// *******************************************************************
const Schedule::TimeSlice & online_first_slice = *_schedule.begin();
MachineRange sleeping_machines = compute_sleeping_machines(online_first_slice);
MachineRange awakenable_sleeping_machines = compute_potentially_awaken_machines(online_first_slice);
const MachineRange & available_machines = online_first_slice.available_machines;
// ****************************************************************
// Let's first compute the schedule in which all nodes are awakened
// ****************************************************************
Schedule awakened_schedule = _schedule;
if (_comparison_type == SWITCH_ON)
{
// Let's awaken every machine at the earliest moment for each machine
for (auto machine_it = sleeping_machines.elements_begin(); machine_it != sleeping_machines.elements_end(); ++machine_it)
{
int machine_id = *machine_it;
Rational wake_up_moment = find_earliest_moment_to_awaken_machines(awakened_schedule, MachineRange(machine_id));
awaken_machine(awakened_schedule, machine_id, wake_up_moment);
}
}
else if (_comparison_type == REMOVE_SLEEP_JOBS)
{
// Let's simply remove sleep jobs from the schedule!
for (auto machine_it = sleeping_machines.elements_begin(); machine_it != sleeping_machines.elements_end(); ++machine_it)
{
int machine_id = *machine_it;
MachineInformation *minfo = _machine_informations.at(machine_id);
while (awakened_schedule.remove_job_if_exists(minfo->ensured_sleep_job));
while (awakened_schedule.remove_job_if_exists(minfo->potential_sleep_job));
}
}
// Let's add jobs into the schedule
put_jobs_into_schedule(awakened_schedule);
// *********************************************************************************
// Let's determine if the online schedule respects our avg bds constraint.
// If so, we should try to sedate machines. Otherwise, we should try to awaken some.
// *********************************************************************************
Schedule online_schedule = _schedule;
put_jobs_into_schedule(online_schedule);
//Rational online_awakened_comparison_horizon = max(awakened_schedule.finite_horizon(), online_schedule.finite_horizon());
// Let's compute the jobs metrics and energy of the two schedules
ScheduleMetrics online_sched_metrics = compute_metrics_of_schedule(online_schedule);
ScheduleMetrics awakened_sched_metrics = compute_metrics_of_schedule(awakened_schedule);
//Rational online_sched_energy = estimate_energy_of_schedule(online_schedule, online_awakened_comparison_horizon);
//Rational awakened_sched_energy = estimate_energy_of_schedule(awakened_schedule, online_awakened_comparison_horizon);
bool should_sedate_machines = false;
if (online_sched_metrics.mean_slowdown <= awakened_sched_metrics.mean_slowdown * _tolerated_slowdown_loss_ratio)
should_sedate_machines = true;
// *****************************************************************************************
// Let's do a dichotomy on the number of machines to awaken/sedate to find the best solution
// *****************************************************************************************
cout << "should_sedate_machines " << should_sedate_machines << endl;
if (should_sedate_machines)
{
// Sleeping machines are not available since they compute "fake" jobs
MachineRange sedatable_machines = available_machines;
int nb_to_sedate_min = 1; // Online schedule is nb_to_sedate = 0
int nb_to_sedate_max = sedatable_machines.size();
Schedule best_schedule = online_schedule;
ScheduleMetrics best_sched_metrics = online_sched_metrics;
//Rational best_sched_energy = online_sched_energy;
int best_nb_to_sedate = 0;
// Dichotomy
while (nb_to_sedate_min < nb_to_sedate_max)
{
// Select machines to sedate
int nb_to_sedate = (nb_to_sedate_min + nb_to_sedate_max) / 2;
MachineRange machines_to_sedate;
_selector->select_resources_to_sedate(nb_to_sedate, available_machines, sedatable_machines, machines_to_sedate);
// Create the schedule with the desired sedated machines
Schedule schedule = _schedule;
for (auto machine_it = machines_to_sedate.elements_begin(); machine_it != machines_to_sedate.elements_end(); ++machine_it)
{
int machine_id = *machine_it;
sedate_machine(schedule, machine_id, schedule.begin());
}
put_jobs_into_schedule(schedule);
// Let's compute jobs metrics about the current schedule
ScheduleMetrics sched_metrics = compute_metrics_of_schedule(schedule);
// If the schedule respects the avg slowdown constraint
if (sched_metrics.mean_slowdown <= awakened_sched_metrics.mean_slowdown * _tolerated_slowdown_loss_ratio)
{
// Let's compute the energy of both schedules to compare them
Rational comparison_horizon = max(best_schedule.finite_horizon(), schedule.finite_horizon());
Rational best_sched_energy = estimate_energy_of_schedule(best_schedule, comparison_horizon);
Rational sched_energy = estimate_energy_of_schedule(schedule, comparison_horizon);
cout << "Current schedule respects the mean slowdown constraint. "
<< "(best_energy, curr_energy) : (" << (double) best_sched_energy
<< ", " << (double)sched_energy << ")" << endl;
// Let's update the best solution if needed
if ((sched_energy < best_sched_energy) ||
((sched_energy == best_sched_energy) && (nb_to_sedate > best_nb_to_sedate)))
{
best_schedule = schedule;
best_sched_metrics = sched_metrics;
best_sched_energy = sched_energy;
best_nb_to_sedate = nb_to_sedate;
}
nb_to_sedate_min = nb_to_sedate + 1;
}
else
nb_to_sedate_max = nb_to_sedate - 1;
}
// Let's apply the decisions of the best schedule
make_decisions_of_schedule(best_schedule);
}
else
{
int nb_to_awaken_min = 1; // online schedule is nb_to_awaken = 0
int nb_to_awaken_max = awakenable_sleeping_machines.size();
Schedule best_schedule = online_schedule;
ScheduleMetrics best_sched_metrics = online_sched_metrics;
int best_nb_to_awaken = 0;
// Dichotomy
while (nb_to_awaken_min < nb_to_awaken_max)
{
// Select machines to awaken
int nb_to_awaken = (nb_to_awaken_min + nb_to_awaken_max) / 2;
MachineRange machines_to_awaken;
_selector->select_resources_to_awaken(nb_to_awaken, available_machines, awakenable_sleeping_machines, machines_to_awaken);
// Create the schedule with the desired awakened machines
Schedule schedule = _schedule;
for (auto machine_it = machines_to_awaken.elements_begin(); machine_it != machines_to_awaken.elements_end(); ++machine_it)
{
int machine_id = *machine_it;
Rational wake_up_date = find_earliest_moment_to_awaken_machines(schedule, MachineRange(machine_id));
awaken_machine(schedule, machine_id, wake_up_date);
}
put_jobs_into_schedule(schedule);
ScheduleMetrics sched_metrics = compute_metrics_of_schedule(schedule);
// If the schedule respects our avg slowdown constraint
if (sched_metrics.mean_slowdown <= awakened_sched_metrics.mean_slowdown * _tolerated_slowdown_loss_ratio)
{
// Let's compute the energy of both schedules to compare them
Rational comparison_horizon = max(best_schedule.finite_horizon(), schedule.finite_horizon());
Rational best_sched_energy = estimate_energy_of_schedule(best_schedule, comparison_horizon);
Rational sched_energy = estimate_energy_of_schedule(schedule, comparison_horizon);
cout << "Current schedule respects the mean slowdown constraint. "
<< "(best_energy, curr_energy) : (" << (double) best_sched_energy
<< ", " << (double)sched_energy << ")" << endl;
// Let's update the best solution if needed
if ((sched_energy < best_sched_energy) ||
((sched_energy == best_sched_energy) && (nb_to_awaken < best_nb_to_awaken)))
{
best_schedule = schedule;
best_sched_metrics = sched_metrics;
best_nb_to_awaken = nb_to_awaken;
}
nb_to_awaken_max = nb_to_awaken - 1;
}
else
nb_to_awaken_min = nb_to_awaken + 1;
}
// Let's apply the decisions of the best schedule
make_decisions_of_schedule(best_schedule);
}
}
#pragma once
#include "energy_bf.hpp"
/**
 * @brief Energy backfilling variant which searches, by dichotomy, the number of machines
 *        to sedate or to awaken while bounding the loss in mean slowdown.
 */
class EnergyBackfillingDichotomy : public EnergyBackfilling
{
// How the reference ("awakened") schedule is built in make_decisions.
enum AwakeningComparisonType
{
// Every sleeping machine is awakened at its earliest possible moment
SWITCH_ON,
// The sleep jobs of the sleeping machines are removed from the schedule
REMOVE_SLEEP_JOBS
};
public:
// variant_options must contain the number 'tolerated_slowdown_loss_ratio' (>= 0) and
// may contain the string 'comparison_type'.
EnergyBackfillingDichotomy(Workload * workload, SchedulingDecision * decision, Queue * queue, ResourceSelector * selector,
double rjms_delay, rapidjson::Document * variant_options);
virtual ~EnergyBackfillingDichotomy();
protected:
// Updates the schedule with recent job events, then runs the dichotomy and applies
// the decisions of the best schedule found.
virtual void make_decisions(double date,
SortableJobOrder::UpdateInformation * update_info,
SortableJobOrder::CompareInformation * compare_info);
private:
// Multiplier applied to the mean slowdown of the awakened schedule to obtain the
// largest mean slowdown a candidate schedule may have.
Rational _tolerated_slowdown_loss_ratio;
// Used unless the 'comparison_type' option overrides it.
AwakeningComparisonType _comparison_type = REMOVE_SLEEP_JOBS;
// Maps the accepted 'comparison_type' option strings to their enum value.
std::map<std::string, AwakeningComparisonType> _str_to_comparison_type;
};
#include "energy_bf_idle_sleeper.hpp"
#include "../pempek_assert.hpp"
using namespace std;
/**
 * @brief Builds an EnergyBackfillingIdleSleeper.
 * @details Every argument is forwarded, unchanged, to the
 *          EnergyBackfillingMonitoringInertialShutdown constructor.
 */
EnergyBackfillingIdleSleeper::EnergyBackfillingIdleSleeper(
        Workload *workload, SchedulingDecision *decision,
        Queue *queue, ResourceSelector *selector,
        double rjms_delay, rapidjson::Document *variant_options)
    : EnergyBackfillingMonitoringInertialShutdown(workload, decision, queue, selector,
                                                  rjms_delay, variant_options)
{
}
// Nothing to release explicitly here: members clean up after themselves.
EnergyBackfillingIdleSleeper::~EnergyBackfillingIdleSleeper() = default;
/**
 * @brief Monitoring stage: refreshes the schedule and the idle states of the machines,
 *        then takes the idle-related sleep decisions.
 * @param[in] date The current date
 */
void EnergyBackfillingIdleSleeper::on_monitoring_stage(double date)
{
// The first slice must start at 'date': update_idle_states asserts it.
update_first_slice_taking_sleep_jobs_into_account(date);
// Decisions are taken on a copy of the schedule
_inertial_schedule = _schedule;
// Refreshes _idle_machines and _machines_idle_start_date from the first slice
update_idle_states(date, _inertial_schedule, _all_machines, _idle_machines,
_machines_idle_start_date);
make_idle_sleep_decisions(date);
}
/**
 * @brief Chooses the idle machines that should be sedated.
 * @details A machine is a candidate if its idle period started at or before
 *          (current_date - minimum_idle_time_to_sedate). The number of machines selected
 *          is capped so that, among the machines awake soon, as many as the priority job
 *          requests are left untouched.
 */
void EnergyBackfillingIdleSleeper::select_idle_machines_to_sedate(Rational current_date,
                                                                  const MachineRange &idle_machines,
                                                                  const MachineRange &machines_awake_soon,
                                                                  const Job *priority_job,
                                                                  const std::map<int, Rational> idle_machines_start_date,
                                                                  Rational minimum_idle_time_to_sedate,
                                                                  MachineRange &machines_to_sedate)
{
    // Idle periods which began at or before this date are long enough
    Rational latest_idle_start_allowed = current_date - minimum_idle_time_to_sedate;

    // Let's gather the machines which have been idle for long enough
    MachineRange candidates;
    for (auto it = idle_machines.elements_begin(); it != idle_machines.elements_end(); ++it)
    {
        const int id = *it;
        if (idle_machines_start_date.at(id) <= latest_idle_start_allowed)
            candidates.insert(id);
    }

    // Machines which must remain awake for the priority job (none if there is no such job)
    int nb_kept_for_priority_job = 0;
    if (priority_job != nullptr)
        nb_kept_for_priority_job = priority_job->nb_requested_resources;

    // To avoid blocking the priority job with switches OFF, the number of machines
    // to switch OFF is bounded by the number of spare awake machines.
    int nb_awake = machines_awake_soon.size();
    int nb_spare_awake = nb_awake - nb_kept_for_priority_job;
    int nb_selected = max(0, min(nb_spare_awake, (int)candidates.size()));

    machines_to_sedate.clear();
    if (nb_selected > 0)
        machines_to_sedate = candidates.left(nb_selected);
}
/**
 * @brief Selects which sleeping machines should be awakened so that queued jobs can run.
 * @details Nothing is selected if the queue is empty or if the policy is
 *          AWAKEN_FOR_PRIORITY_JOB_ONLY. Otherwise the queue is traversed in order and each
 *          job which fits in the remaining usable machines (idle + awakenable) gets
 *          machines, idle ones first. The awakenable machines given to jobs are the result.
 *          At most maximum_nb_machines_to_awaken machines are selected (asserted on exit).
 */
void EnergyBackfillingIdleSleeper::select_idle_machines_to_awaken(const Queue *queue,
const Schedule &schedule,
ResourceSelector * priority_job_selector,
const MachineRange &idle_machines,
AwakeningPolicy policy,
int maximum_nb_machines_to_awaken,
MachineRange &machines_to_awaken)
{
PPK_ASSERT_ERROR(maximum_nb_machines_to_awaken >= 0);
machines_to_awaken.clear();
// If there are no jobs to compute, there is nothing more to do.
if (queue->nb_jobs() <= 0)
return;
// If we only awaken for the priority job, this is already done by Inertial Shutdown.
if (policy == AWAKEN_FOR_PRIORITY_JOB_ONLY)
return;
// Work on a copy: the given schedule is const and must not be modified.
Schedule schedule_copy = schedule;
// Let's try to backfill some jobs into the awakenable machines, and wake them up if needed.
MachineRange awakable_machines = compute_potentially_awaken_machines(*schedule_copy.begin());
// Let's not consider more awakenable machines than we are allowed to awaken
if (awakable_machines.size() > (unsigned int)maximum_nb_machines_to_awaken)
awakable_machines = awakable_machines.left(maximum_nb_machines_to_awaken);
MachineRange usable_machines = idle_machines + awakable_machines;
MachineRange usable_idle_machines = idle_machines;
// Let's find the priority job and related stuff to avoid penalizing the priority job
// NOTE(review): the following locals are not initialized here; presumably they are
// output parameters of compute_priority_job_and_related_stuff - confirm.
const Job * priority_job;
bool priority_job_needs_awakenings;
Schedule::JobAlloc priority_job_alloc;
MachineRange priority_job_reserved_machines;
MachineRange machines_that_can_be_used_by_the_priority_job;
compute_priority_job_and_related_stuff(schedule_copy, queue, priority_job,
priority_job_selector,
priority_job_needs_awakenings,
priority_job_alloc,
priority_job_reserved_machines,
machines_that_can_be_used_by_the_priority_job);
if (policy == AWAKEN_FOR_ALL_JOBS_RESPECTING_PRIORITY_JOB)
{
// Let's remove the priority_job_reserved_machines from the usable_machines
usable_idle_machines -= priority_job_reserved_machines;
usable_machines -= priority_job_reserved_machines;
}
// Let's traverse the queue in order, giving machines to the jobs which fit
auto job_it = queue->begin();
while (usable_machines.size() > 0 && job_it != queue->end())
{
const Job * job = (*job_it)->job;
if (job->nb_requested_resources <= (int)usable_machines.size())
{
MachineRange machines_to_use_for_this_job;
// Idle machines are used first, since they do not need to be awakened
if (usable_idle_machines.size() > 0)
{
int nb_idle_machines_to_use = min((int)usable_idle_machines.size(), job->nb_requested_resources);
machines_to_use_for_this_job = usable_idle_machines.left(nb_idle_machines_to_use);
usable_idle_machines -= machines_to_use_for_this_job;
}
// NOTE(review): at this point usable_machines still contains the idle machines chosen
// just above, so left() may return machines already selected (and may be asked for
// 0 machines) - verify against the semantics of MachineRange::left.
machines_to_use_for_this_job += usable_machines.left(job->nb_requested_resources - (int)machines_to_use_for_this_job.size());
// Among the machines given to this job, only the sleeping ones must be awakened
MachineRange machines_to_awaken_for_this_job = (awakable_machines & machines_to_use_for_this_job);
usable_machines -= machines_to_use_for_this_job;
awakable_machines -= machines_to_awaken_for_this_job;
machines_to_awaken += machines_to_awaken_for_this_job;
}
++job_it;
}
PPK_ASSERT_ERROR((int)machines_to_awaken.size() <= maximum_nb_machines_to_awaken);
}
/**
 * @brief Refreshes the set of idle machines and the date at which each machine became idle,
 *        from the first slice of the given schedule.
 * @details Machines available in the first slice which were not idle become idle at
 *          current_date. Idle machines which are no longer available stop being idle, and
 *          their idle start date is set to the infinite horizon of the schedule.
 */
void EnergyBackfillingIdleSleeper::update_idle_states(Rational current_date,
                                                      const Schedule & schedule,
                                                      const MachineRange & all_machines,
                                                      MachineRange & idle_machines,
                                                      map<int,Rational> & machines_idle_start_date)
{
    PPK_ASSERT_ERROR(schedule.nb_slices() > 0);
    PPK_ASSERT_ERROR(schedule.first_slice_begin() == Rational(current_date));

    const Schedule::TimeSlice & slice = *schedule.begin();

    // Both transitions only depend on the previous idle set: let's compute them up front
    MachineRange machines_newly_available = (slice.available_machines & (all_machines - idle_machines));
    MachineRange machines_newly_busy = ((all_machines - slice.available_machines) & idle_machines);

    // Machines which just became idle start their idle period now
    for (auto it = machines_newly_available.elements_begin();
         it != machines_newly_available.elements_end(); ++it)
    {
        const int id = *it;
        machines_idle_start_date[id] = current_date;
    }

    // Machines which are no longer idle get an idle start date in the infinite future
    for (auto it = machines_newly_busy.elements_begin();
         it != machines_newly_busy.elements_end(); ++it)
    {
        const int id = *it;
        machines_idle_start_date[id] = schedule.infinite_horizon();
    }

    // Consistency checks on the two transitions
    PPK_ASSERT_ERROR((machines_newly_available & machines_newly_busy) == MachineRange::empty_range(),
                     "machines_newly_available=%s. machines_newly_busy=%s",
                     machines_newly_available.to_string_brackets().c_str(),
                     machines_newly_busy.to_string_brackets().c_str());
    PPK_ASSERT_ERROR((machines_newly_available & idle_machines) == MachineRange::empty_range(),
                     "machines_newly_available=%s. _idle_machines=%s",
                     machines_newly_available.to_string_brackets().c_str(),
                     idle_machines.to_string_brackets().c_str());
    PPK_ASSERT_ERROR((machines_newly_busy & idle_machines) == machines_newly_busy,
                     "machines_newly_busy=%s. _idle_machines=%s",
                     machines_newly_busy.to_string_brackets().c_str(),
                     idle_machines.to_string_brackets().c_str());

    // Finally, let's apply the transitions to the idle set
    idle_machines += machines_newly_available;
    idle_machines -= machines_newly_busy;
}
void EnergyBackfillingIdleSleeper::make_idle_sleep_decisions(double date)
{
const Job * priority_job = _queue->first_job_or_nullptr();
MachineRange machines_awake_soon = _awake_machines + _switching_on_machines + _machines_to_sedate;
MachineRange machines_to_sedate;
select_idle_machines_to_sedate(date, _idle_machines, machines_awake_soon,
priority_job, _machines_idle_start_date,
_needed_amount_of_idle_time_to_be_sedated,
machines_to_sedate);
if (machines_to_sedate.size() > 0)
{
_machines_to_sedate += machines_to_sedate;
_nb_machines_sedated_for_being_idle += machines_to_sedate.size();
MachineRange machines_sedated_this_turn, machines_awakened_this_turn;
handle_queued_switches(_inertial_schedule, _machines_to_sedate, _machines_to_awaken,
machines_sedated_this_turn, machines_awakened_this_turn);
if (machines_sedated_this_turn.size() > 0)
printf("Date=%g. Those machines should be put to sleep now: %s\n",
date, machines_sedated_this_turn.to_string_brackets().c_str());
if (machines_awakened_this_turn.size() > 0)
printf("Date=%g. Those machines should be awakened now: %s\n",
date, machines_awakened_this_turn.to_string_brackets().c_str());
_machines_to_sedate -= machines_sedated_this_turn;
_machines_to_awaken -= machines_awakened_this_turn;
_machines_sedated_since_last_monitoring_stage_inertia = machines_sedated_this_turn;
_machines_awakened_since_last_monitoring_stage_inertia = machines_awakened_this_turn;
PPK_ASSERT_ERROR((_machines_to_awaken & _machines_to_sedate) == MachineRange::empty_range());
make_decisions_of_schedule(_inertial_schedule, false);
}
}
#pragma once
#include "energy_bf_monitoring_inertial_shutdown.hpp"
#include <map>
/**
 * @brief Inertial shutdown variant which also sedates machines that have been idle for
 *        too long.
 */
class EnergyBackfillingIdleSleeper : public EnergyBackfillingMonitoringInertialShutdown
{
public:
/**
 * @brief How select_idle_machines_to_awaken chooses machines to awaken
 */
enum AwakeningPolicy
{
// No machine is selected by select_idle_machines_to_awaken
AWAKEN_FOR_PRIORITY_JOB_ONLY,
// Machines are selected for queued jobs; machines reserved for the priority job may be used
AWAKEN_FOR_ALL_JOBS_WITHOUT_RESPECTING_PRIORITY_JOB,
// Machines are selected for queued jobs, excluding those reserved for the priority job
AWAKEN_FOR_ALL_JOBS_RESPECTING_PRIORITY_JOB
};
public:
EnergyBackfillingIdleSleeper(Workload * workload, SchedulingDecision * decision,
Queue * queue, ResourceSelector * selector,
double rjms_delay, rapidjson::Document * variant_options);
virtual ~EnergyBackfillingIdleSleeper();
/**
 * @brief Updates the schedule and the idle states, then takes idle sleep decisions
 * @param[in] date The current date
 */
virtual void on_monitoring_stage(double date);
public:
/**
 * @brief Selects which machines should be sedated for being idle for too long
 * @param[in] current_date The current date
 * @param[in] idle_machines The machines currently idle
 * @param[in] machines_awake_soon The machines that will be awake soon (those awake now and those switching ON)
 * @param[in] priority_job The priority job (nullptr if there is no priority job)
 * @param[in] idle_machines_start_date A map which associates machine_ids to the starting time of their idle state.
 *            It must contain an entry for every machine of idle_machines.
 * @param[in] minimum_idle_time_to_sedate The machines must have been idle for at least this amount of time to be sedated
 * @param[out] machines_to_sedate The machines that can be sedated for being idle for too long (output parameter).
 *             Its previous content is discarded.
 */
static void select_idle_machines_to_sedate(Rational current_date,
const MachineRange & idle_machines,
const MachineRange & machines_awake_soon,
const Job * priority_job,
const std::map<int, Rational> idle_machines_start_date,
Rational minimum_idle_time_to_sedate,
MachineRange & machines_to_sedate);
/**
 * @brief Selects which machines should be awakened to compute some jobs
 * @param[in] queue The queue which contains jobs (or not)
 * @param[in] schedule The current schedule. It is not modified by this function (a copy is used).
 * @param[in,out] priority_job_selector The ResourceSelector to insert the priority_job into the schedule
 * @param[in] idle_machines The machines currently idle
 * @param[in] policy The AwakeningPolicy to apply
 * @param[in] maximum_nb_machines_to_awaken The maximum number of machines to awaken for this call. Must not be negative.
 * @param[out] machines_to_awaken The machines that can be awakened to execute jobs (output parameter).
 *             Its previous content is discarded.
 */
static void select_idle_machines_to_awaken(const Queue *queue,
const Schedule &schedule,
ResourceSelector * priority_job_selector,
const MachineRange &idle_machines,
AwakeningPolicy policy,
int maximum_nb_machines_to_awaken,
MachineRange &machines_to_awaken);
/**
 * @brief Updates whichever machines are idle and their idle starting time if needed,
 *        based on the first slice of the given schedule
 * @param[in] current_date The current date. The first slice of the schedule must begin at this date.
 * @param[in] schedule The current schedule. Its first slice must reflect the platform usage
 * @param[in] all_machines All computing machines
 * @param[in,out] idle_machines The machines currently being idle
 * @param[in,out] machines_idle_start_date The starting time of the idle period of the idle machines
 */
static void update_idle_states(Rational current_date,
const Schedule & schedule,
const MachineRange & all_machines,
MachineRange & idle_machines,
std::map<int,Rational> & machines_idle_start_date);
protected:
/**
 * @brief Sedates the machines which have been idle for too long, if any
 * @param[in] date The current date
 */
void make_idle_sleep_decisions(double date);
};
#include "energy_bf_machine_subpart_sleeper.hpp"
#include "../pempek_assert.hpp"
using namespace std;
/**
 * @brief Builds an EnergyBackfillingMachineSubpartSleeper.
 * @details All arguments are forwarded to the EnergyBackfillingMonitoringInertialShutdown
 *          constructor. variant_options must contain the member
 *          'fraction_of_machines_to_let_awake', a number in [0,1]; this is checked by
 *          assertions. The value read is printed on the standard output.
 */
EnergyBackfillingMachineSubpartSleeper::EnergyBackfillingMachineSubpartSleeper(Workload *workload, SchedulingDecision *decision,
Queue *queue, ResourceSelector *selector,
double rjms_delay, rapidjson::Document *variant_options) :
EnergyBackfillingMonitoringInertialShutdown(workload, decision, queue, selector, rjms_delay, variant_options)
{
// The option is mandatory and must be a number
PPK_ASSERT_ERROR(variant_options->HasMember("fraction_of_machines_to_let_awake"),
"Invalid options JSON object: Member 'fraction_of_machines_to_let_awake' cannot be found");
PPK_ASSERT_ERROR((*variant_options)["fraction_of_machines_to_let_awake"].IsNumber(),
"Invalid options JSON object: Member 'fraction_of_machines_to_let_awake' must be a number");
_fraction_of_machines_to_let_awake = (*variant_options)["fraction_of_machines_to_let_awake"].GetDouble();
// A fraction only makes sense within [0,1]
PPK_ASSERT_ERROR(_fraction_of_machines_to_let_awake >= 0 && _fraction_of_machines_to_let_awake <= 1,
"Invalid options JSON object: Member 'fraction_of_machines_to_let_awake' has an invalid "
"value (%g)", (double) _fraction_of_machines_to_let_awake);
printf("Fraction of machines to let awake: %g\n",
(double) _fraction_of_machines_to_let_awake);
}
// Nothing to release explicitly here: members clean up after themselves.
EnergyBackfillingMachineSubpartSleeper::~EnergyBackfillingMachineSubpartSleeper() = default;
/**
 * @brief Monitoring stage: sedates machines so that only a subpart of the platform
 *        remains awake.
 * @details The number of machines to let awake is the configured fraction of all machines
 *          (truncated to an integer), raised to the number of resources requested by the
 *          priority job if there is one. If more machines than that will be awake soon,
 *          the surplus is selected for sedation, queued switches are handled on the
 *          inertial schedule and its decisions are applied. Finally, the number of
 *          machines asleep soon is checked against the sedation counters.
 * @param[in] date The current date
 */
void EnergyBackfillingMachineSubpartSleeper::on_monitoring_stage(double date)
{
    update_first_slice_taking_sleep_jobs_into_account(date);
    _inertial_schedule = _schedule;

    const Job * priority_job = nullptr;

    // Machines which are awake or will be, minus those which are being or will be switched OFF
    MachineRange machines_awake_soon = _awake_machines + _switching_on_machines + _machines_to_awaken
                                       - _switching_off_machines - _machines_to_sedate;

    int nb_machines_to_let_awakened = (int) (_fraction_of_machines_to_let_awake * _all_machines.size());
    if (_inertial_shutdown_debug)
        printf("Date=%g. nb_machines_to_let_awakened=%d\n", date, nb_machines_to_let_awakened);

    MachineRange machines_that_can_be_used_by_the_priority_job;
    Schedule::JobAlloc priority_job_alloc;

    if (_inertial_shutdown_debug)
        printf("Schedule without priority_job.%s\n", _inertial_schedule.to_string().c_str());

    // Let's find in which time space the priority job should be executed
    if (!_queue->is_empty())
    {
        priority_job = _queue->first_job();

        // To do so, let's insert the priority job into the schedule.
        PPK_ASSERT_ERROR(!_inertial_schedule.contains_job(priority_job),
                         "The priority job is not supposed to be in the schedule, "
                         "but it is. Priority job = '%s'.\n%s",
                         priority_job->id.c_str(), _inertial_schedule.to_string().c_str());
        priority_job_alloc = _inertial_schedule.add_job_first_fit(priority_job, _selector);

        // Now we want to determine which machines the priority job can use at this period of time.
        // To do so, let's remove it from the schedule then compute all available machines during
        // this period of time.
        _inertial_schedule.remove_job(priority_job);
        machines_that_can_be_used_by_the_priority_job = _inertial_schedule.available_machines_during_period(priority_job_alloc.begin, priority_job_alloc.end);

        // Let's also update the number of machines to let awakened if needed
        nb_machines_to_let_awakened = max(nb_machines_to_let_awakened, priority_job->nb_requested_resources);
        if (_inertial_shutdown_debug)
            printf("Date=%g. nb_machines_to_let_awakened=%d\n", date, nb_machines_to_let_awakened);
    }

    // May be negative or null, in which case no machine is sedated
    int nb_machines_to_sedate = (int)machines_awake_soon.size() - nb_machines_to_let_awakened;
    if (_inertial_shutdown_debug)
        printf("Date=%g. nb_machines_to_sedate=%d\n", date, nb_machines_to_sedate);

    if (nb_machines_to_sedate > 0)
    {
        if (_inertial_shutdown_debug)
            printf("Date=%g. nb_machines_to_sedate=%d. machines_awake_soon=%s. "
                   "machines_that_can_be_used_by_the_priority_job=%s\n",
                   date, nb_machines_to_sedate,
                   machines_awake_soon.to_string_brackets().c_str(),
                   machines_that_can_be_used_by_the_priority_job.to_string_brackets().c_str());

        MachineRange machines_to_sedate;
        select_machines_to_sedate(nb_machines_to_sedate, machines_awake_soon,
                                  machines_that_can_be_used_by_the_priority_job,
                                  machines_to_sedate, priority_job);

        // The trailing newline keeps this trace on its own line, like the other debug traces.
        if (_inertial_shutdown_debug)
            printf("Date=%g. machines_to_sedate=%s\n",
                   date, machines_to_sedate.to_string_brackets().c_str());

        _machines_to_sedate += machines_to_sedate;
        printf("Machines to sedate are now %s\n", _machines_to_sedate.to_string_brackets().c_str());

        // Only sedations are handled here: an empty range is given as the machines to awaken,
        // hence no machine is expected to be awakened this turn.
        MachineRange machines_sedated_this_turn, machines_awakened_this_turn, empty_range;
        handle_queued_switches(_inertial_schedule, _machines_to_sedate,
                               empty_range, machines_sedated_this_turn,
                               machines_awakened_this_turn);

        PPK_ASSERT_ERROR(machines_awakened_this_turn == MachineRange::empty_range());

        if (machines_sedated_this_turn.size() > 0)
            printf("Date=%g. Those machines should be put to sleep now: %s\n",
                   date, machines_sedated_this_turn.to_string_brackets().c_str());

        // The sedations done this turn are no longer queued
        _machines_to_sedate -= machines_sedated_this_turn;
        _nb_machines_sedated_by_inertia += (int) machines_to_sedate.size();

        PPK_ASSERT_ERROR((_machines_to_awaken & _machines_to_sedate) == MachineRange::empty_range());

        make_decisions_of_schedule(_inertial_schedule, false);
    }

    // Consistency check: the machines asleep soon must match the sedation counters
    MachineRange machines_asleep_soon = _asleep_machines + _switching_off_machines + _machines_to_sedate
                                        - _machines_to_awaken - _switching_on_machines;
    PPK_ASSERT_ERROR((int)machines_asleep_soon.size() == _nb_machines_sedated_by_inertia +
                                                         _nb_machines_sedated_for_being_idle,
                     "Asleep machines inconsistency. nb_asleep_soon=%d (%s). nb_sedated_inertia=%d. "
                     "nb_sedated_idle=%d\n",
                     (int)machines_asleep_soon.size(), machines_asleep_soon.to_string_brackets().c_str(),
                     _nb_machines_sedated_by_inertia, _nb_machines_sedated_for_being_idle);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment