diff --git a/CHANGELOG.md b/CHANGELOG.md index e235322a8c21c85322735a2561c89383b1602b6a..b8c867d04480fbd46fcf64700d88eb5ae54e7a7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ Batsched adheres to [Semantic Versioning][semver] and its public API is the foll [//]: ========================================================================= ## [Unreleased] +- Add a new `fcfs` algorithm (copied from `fcfs_fast`) that takes into account + the resources selector given in parameter. ### Fixed - The `easy_bf_fast` did not try to backfill previously submitted jobs in many diff --git a/meson.build b/meson.build index 40560ac63282cc525a9ab9c8add2aca9180133cd..5000a6f66230f7033e3b0546bf02a3c4331f6632 100644 --- a/meson.build +++ b/meson.build @@ -54,6 +54,8 @@ src = [ 'src/algo/energy_watcher.hpp', 'src/algo/fcfs_fast.cpp', 'src/algo/fcfs_fast.hpp', + 'src/algo/fcfs.cpp', + 'src/algo/fcfs.hpp', 'src/algo/filler.cpp', 'src/algo/filler.hpp', 'src/algo/killer2.cpp', diff --git a/src/algo/fcfs.cpp b/src/algo/fcfs.cpp new file mode 100644 index 0000000000000000000000000000000000000000..97a948ed59c1309d7dd13055f76c8b4159697518 --- /dev/null +++ b/src/algo/fcfs.cpp @@ -0,0 +1,135 @@ +#include "fcfs.hpp" +#include <iostream> + +#include "../pempek_assert.hpp" + +FCFS::FCFS(Workload *workload, + SchedulingDecision *decision, Queue *queue, ResourceSelector *selector, + double rjms_delay, rapidjson::Document *variant_options) : + ISchedulingAlgorithm(workload, decision, queue, selector, rjms_delay, + variant_options) +{} + +FCFS::~FCFS() +{} + +void FCFS::on_simulation_start(double date, + const rapidjson::Value &batsim_config) +{ + (void) date; + (void) batsim_config; + + _available_machines.insert(IntervalSet::ClosedInterval(0, _nb_machines - 1)); + _nb_available_machines = _nb_machines; + PPK_ASSERT_ERROR(_available_machines.size() == (unsigned int) _nb_machines); +} + +void FCFS::on_simulation_end(double date) +{ + (void) date; +} + +void FCFS::make_decisions(double date, + SortableJobOrder::UpdateInformation *update_info, + SortableJobOrder::CompareInformation *compare_info) +{ + (void) update_info; + (void) compare_info; + + // This algorithm is a version of FCFS without backfilling. + // It is meant to be fast in the usual case, not to handle corner cases. + // It is not meant to be easily readable or hackable ;). + + // This fast FCFS variant in a few words: + // - only handles the FCFS queue order + // - only handles finite jobs (no switchoff) + // - only handles time as floating-point (-> precision errors). + + bool job_ended = false; + + // Handle newly finished jobs + for (const std::string & ended_job_id : _jobs_ended_recently) + { + job_ended = true; + + Job * finished_job = (*_workload)[ended_job_id]; + + // Update data structures + _available_machines.insert(_current_allocations[ended_job_id]); + _nb_available_machines += finished_job->nb_requested_resources; + _current_allocations.erase(ended_job_id); + } + + // If jobs have finished, execute jobs as long as they fit + if (job_ended) + { + for (auto job_it = _pending_jobs.begin(); + job_it != _pending_jobs.end(); ) + { + Job * pending_job = *job_it; + IntervalSet machines; + + if (_selector->fit(pending_job, _available_machines, machines)) + { + _decision->add_execute_job(pending_job->id, + machines, date); + + // Update data structures + _available_machines -= machines; + _nb_available_machines -= pending_job->nb_requested_resources; + _current_allocations[pending_job->id] = machines; + job_it = _pending_jobs.erase(job_it); + + } + else + { + // The job becomes priority! + // As there is no backfilling, we can simply leave this loop. + break; + } + } + } + + // Handle newly released jobs + for (const std::string & new_job_id : _jobs_released_recently) + { + Job * new_job = (*_workload)[new_job_id]; + + // Is this job valid? + if (new_job->nb_requested_resources > _nb_machines) + { + // Invalid! + _decision->add_reject_job(new_job_id, date); + continue; + } + + // Is there a waiting job? + if (!_pending_jobs.empty()) + { + // Yes. The new job is queued up. + _pending_jobs.push_back(new_job); + } + else + { + // No, the queue is empty. + // Can the new job be executed now? + if (new_job->nb_requested_resources <= _nb_available_machines) + { + // Yes, the job can be executed right away! + IntervalSet machines = _available_machines.left( + new_job->nb_requested_resources); + _decision->add_execute_job(new_job_id, machines, date); + + // Update data structures + _available_machines -= machines; + _nb_available_machines -= new_job->nb_requested_resources; + _current_allocations[new_job_id] = machines; + } + else + { + // No. The job is queued up. + _pending_jobs.push_back(new_job); + } + } + } +} diff --git a/src/algo/fcfs.hpp b/src/algo/fcfs.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e2d1c925a755aec1e8207a786505e539dcd6beaf --- /dev/null +++ b/src/algo/fcfs.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include <unordered_map> +#include <list> + +#include "../isalgorithm.hpp" +#include "../json_workload.hpp" +#include "../locality.hpp" + +class FCFS : public ISchedulingAlgorithm +{ +public: + FCFS(Workload * workload, SchedulingDecision * decision, + Queue * queue, ResourceSelector * selector, + double rjms_delay, + rapidjson::Document * variant_options); + virtual ~FCFS(); + + virtual void on_simulation_start(double date, + const rapidjson::Value & batsim_config); + + virtual void on_simulation_end(double date); + + virtual void make_decisions(double date, + SortableJobOrder::UpdateInformation * update_info, + SortableJobOrder::CompareInformation * compare_info); + +private: + // Machines currently available + IntervalSet _available_machines; + int _nb_available_machines = -1; + + // Pending jobs (queue) + std::list<Job *> _pending_jobs; + + // Allocations of running jobs + std::unordered_map<std::string, IntervalSet> _current_allocations; +}; diff --git a/src/main.cpp b/src/main.cpp index 8da64b1f8c07009c5f2237bb38c87fb9e292a717..52d18e8cab1c5637909c38416fe44e4735e6b13c 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -33,6 +33,7 @@ #include "algo/energy_bf_machine_subpart_sleeper.hpp" #include "algo/energy_watcher.hpp" #include "algo/filler.hpp" +#include "algo/fcfs.hpp" #include "algo/fcfs_fast.hpp" #include "algo/killer.hpp" #include "algo/killer2.hpp" @@ -78,7 +79,7 @@ int main(int argc, char ** argv) "energy_bf", "energy_bf_dicho", "energy_bf_idle_sleeper", "energy_bf_monitoring", "energy_bf_monitoring_inertial", "energy_bf_subpart_sleeper", - "energy_watcher", "fcfs_fast", + "energy_watcher", "fcfs", "fcfs_fast", "filler", "killer", "killer2", "random", "rejecter", "sequencer", "sleeper", "submitter", "waiting_time_estimator"}; const set<string> policies_set = {"basic", "contiguous"}; @@ -284,6 +285,8 @@ int main(int argc, char ** argv) algo = new EnergyBackfillingMachineSubpartSleeper(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options); else if (scheduling_variant == "energy_watcher") algo = new EnergyWatcher(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options); + else if (scheduling_variant == "fcfs") + algo = new FCFS(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options); else if (scheduling_variant == "fcfs_fast") algo = new FCFSFast(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options); else if (scheduling_variant == "killer")