From 8fa039fd403929983941d7de923b0f81be8bcdd6 Mon Sep 17 00:00:00 2001
From: Faure Adrien <adrien.faure2@gmail.com>
Date: Wed, 25 Mar 2020 08:27:03 +0100
Subject: [PATCH] [code] create FCFS using resource selector

---
 CHANGELOG.md      |   2 +
 meson.build       |   2 +
 src/algo/fcfs.cpp | 135 ++++++++++++++++++++++++++++++++++++++++++++++
 src/algo/fcfs.hpp |  38 +++++++++++++
 src/main.cpp      |   5 +-
 5 files changed, 181 insertions(+), 1 deletion(-)
 create mode 100644 src/algo/fcfs.cpp
 create mode 100644 src/algo/fcfs.hpp

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e235322..b8c867d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,8 @@ Batsched adheres to [Semantic Versioning][semver] and its public API is the foll
 
 [//]: =========================================================================
 ## [Unreleased]
+- Add a new `fcfs` algorithm (copied from `fcfs_fast`) that takes into account
+  the resources selector given in parameter.
 
 ### Fixed
 - The `easy_bf_fast` did not try to backfill previously submitted jobs in many
diff --git a/meson.build b/meson.build
index 40560ac..5000a6f 100644
--- a/meson.build
+++ b/meson.build
@@ -54,6 +54,8 @@ src = [
     'src/algo/energy_watcher.hpp',
     'src/algo/fcfs_fast.cpp',
     'src/algo/fcfs_fast.hpp',
+    'src/algo/fcfs.cpp',
+    'src/algo/fcfs.hpp',
     'src/algo/filler.cpp',
     'src/algo/filler.hpp',
     'src/algo/killer2.cpp',
diff --git a/src/algo/fcfs.cpp b/src/algo/fcfs.cpp
new file mode 100644
index 0000000..97a948e
--- /dev/null
+++ b/src/algo/fcfs.cpp
@@ -0,0 +1,135 @@
+#include "fcfs.hpp"
+#include <iostream>
+
+#include "../pempek_assert.hpp"
+
+FCFS::FCFS(Workload *workload,
+    SchedulingDecision *decision, Queue *queue, ResourceSelector *selector,
+    double rjms_delay, rapidjson::Document *variant_options) :
+    ISchedulingAlgorithm(workload, decision, queue, selector, rjms_delay,
+        variant_options)
+{}
+
+FCFS::~FCFS()
+{}
+
+void FCFS::on_simulation_start(double date,
+    const rapidjson::Value &batsim_config)
+{
+    (void) date;
+    (void) batsim_config;
+
+    _available_machines.insert(IntervalSet::ClosedInterval(0, _nb_machines - 1));
+    _nb_available_machines = _nb_machines;
+    PPK_ASSERT_ERROR(_available_machines.size() == (unsigned int) _nb_machines);
+}
+
+void FCFS::on_simulation_end(double date)
+{
+    (void) date;
+}
+
+void FCFS::make_decisions(double date,
+    SortableJobOrder::UpdateInformation *update_info,
+    SortableJobOrder::CompareInformation *compare_info)
+{
+    (void) update_info;
+    (void) compare_info;
+
+    // This algorithm is a version of FCFS without backfilling.
+    // It is meant to be fast in the usual case, not to handle corner cases.
+    // It is not meant to be easily readable or hackable ;).
+
+    // This fast FCFS variant in a few words:
+    // - only handles the FCFS queue order
+    // - only handles finite jobs (no switchoff)
+    // - only handles time as floating-point (-> precision errors).
+
+    bool job_ended = false;
+
+    // Handle newly finished jobs
+    for (const std::string & ended_job_id : _jobs_ended_recently)
+    {
+        job_ended = true;
+
+        Job * finished_job = (*_workload)[ended_job_id];
+
+        // Update data structures
+        _available_machines.insert(_current_allocations[ended_job_id]);
+        _nb_available_machines += finished_job->nb_requested_resources;
+        _current_allocations.erase(ended_job_id);
+    }
+
+    // If jobs have finished, execute jobs as long as they fit
+    if (job_ended)
+    {
+        for (auto job_it = _pending_jobs.begin();
+             job_it != _pending_jobs.end(); )
+        {
+            Job * pending_job = *job_it;
+            IntervalSet machines;
+
+            if (_selector->fit(pending_job, _available_machines, machines))
+           {
+                _decision->add_execute_job(pending_job->id,
+                    machines, date);
+
+                // Update data structures
+                _available_machines -= machines;
+                _nb_available_machines -= pending_job->nb_requested_resources;
+                _current_allocations[pending_job->id] = machines;
+                job_it = _pending_jobs.erase(job_it);
+
+            }
+            else
+            {
+                // The job becomes priority!
+                // As there is no backfilling, we can simply leave this loop.
+                break;
+            }
+        }
+    }
+
+    // Handle newly released jobs
+    for (const std::string & new_job_id : _jobs_released_recently)
+    {
+        Job * new_job = (*_workload)[new_job_id];
+
+        // Is this job valid?
+        if (new_job->nb_requested_resources > _nb_machines)
+        {
+            // Invalid!
+            _decision->add_reject_job(new_job_id, date);
+            continue;
+        }
+
+        // Is there a waiting job?
+        if (!_pending_jobs.empty())
+        {
+            // Yes. The new job is queued up.
+            _pending_jobs.push_back(new_job);
+        }
+        else
+        {
+            // No, the queue is empty.
+            // Can the new job be executed now?
+            if (new_job->nb_requested_resources <= _nb_available_machines)
+            {
+                // Yes, the job can be executed right away!
+                IntervalSet machines = _available_machines.left(
+                    new_job->nb_requested_resources);
+                _decision->add_execute_job(new_job_id, machines, date);
+
+                // Update data structures
+                _available_machines -= machines;
+                _nb_available_machines -= new_job->nb_requested_resources;
+                _current_allocations[new_job_id] = machines;
+            }
+            else
+            {
+                // No. The job is queued up.
+                _pending_jobs.push_back(new_job);
+            }
+        }
+    }
+}
diff --git a/src/algo/fcfs.hpp b/src/algo/fcfs.hpp
new file mode 100644
index 0000000..e2d1c92
--- /dev/null
+++ b/src/algo/fcfs.hpp
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <unordered_map>
+#include <list>
+
+#include "../isalgorithm.hpp"
+#include "../json_workload.hpp"
+#include "../locality.hpp"
+
+class FCFS : public ISchedulingAlgorithm
+{
+public:
+    FCFS(Workload * workload, SchedulingDecision * decision,
+        Queue * queue, ResourceSelector * selector,
+        double rjms_delay,
+        rapidjson::Document * variant_options);
+    virtual ~FCFS();
+
+    virtual void on_simulation_start(double date,
+        const rapidjson::Value & batsim_config);
+
+    virtual void on_simulation_end(double date);
+
+    virtual void make_decisions(double date,
+        SortableJobOrder::UpdateInformation * update_info,
+        SortableJobOrder::CompareInformation * compare_info);
+
+private:
+    // Machines currently available
+    IntervalSet _available_machines;
+    int _nb_available_machines = -1;
+
+    // Pending jobs (queue)
+    std::list<Job *> _pending_jobs;
+
+    // Allocations of running jobs
+    std::unordered_map<std::string, IntervalSet> _current_allocations;
+};
diff --git a/src/main.cpp b/src/main.cpp
index 8da64b1..52d18e8 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -33,6 +33,7 @@
 #include "algo/energy_bf_machine_subpart_sleeper.hpp"
 #include "algo/energy_watcher.hpp"
 #include "algo/filler.hpp"
+#include "algo/fcfs.hpp"
 #include "algo/fcfs_fast.hpp"
 #include "algo/killer.hpp"
 #include "algo/killer2.hpp"
@@ -78,7 +79,7 @@ int main(int argc, char ** argv)
                                       "energy_bf", "energy_bf_dicho", "energy_bf_idle_sleeper",
                                       "energy_bf_monitoring",
                                       "energy_bf_monitoring_inertial", "energy_bf_subpart_sleeper",
-                                      "energy_watcher", "fcfs_fast",
+                                      "energy_watcher", "fcfs", "fcfs_fast",
                                       "filler", "killer", "killer2", "random", "rejecter",
                                       "sequencer", "sleeper", "submitter", "waiting_time_estimator"};
     const set<string> policies_set = {"basic", "contiguous"};
@@ -284,6 +285,8 @@ int main(int argc, char ** argv)
             algo = new EnergyBackfillingMachineSubpartSleeper(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options);
         else if (scheduling_variant == "energy_watcher")
             algo = new EnergyWatcher(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options);
+        else if (scheduling_variant == "fcfs")
+            algo = new FCFS(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options);
         else if (scheduling_variant == "fcfs_fast")
             algo = new FCFSFast(&w, &decision, queue, selector, rjms_delay, &json_doc_variant_options);
         else if (scheduling_variant == "killer")
-- 
GitLab