Skip to content
Snippets Groups Projects
schedule.cpp 41.51 KiB
#include "schedule.hpp"

#include <boost/algorithm/string/join.hpp>
#include <fstream>
#include <stdio.h>

#include <loguru.hpp>

#include "pempek_assert.hpp"

using namespace std;

/**
 * @brief Builds a schedule whose profile is one single time slice in which every machine is free
 * @param[in] nb_machines The number of machines of the schedule. Must be strictly positive
 * @param[in] initial_time The date at which the only time slice begins
 */
Schedule::Schedule(int nb_machines, Rational initial_time)
{
    PPK_ASSERT_ERROR(nb_machines > 0);
    _nb_machines = nb_machines;

    // Temporal bounds of the unique initial slice: from initial_time to a very distant date
    TimeSlice slice;
    slice.begin = initial_time;
    slice.end = 1e19; // greater than the number of seconds elapsed since the big bang
    slice.length = slice.end - slice.begin;

    // Machines are numbered from 0 to nb_machines - 1 and are all available
    slice.nb_available_machines = nb_machines;
    slice.available_machines.insert(IntervalSet::ClosedInterval(0, nb_machines - 1));
    PPK_ASSERT_ERROR(slice.available_machines.size() == (unsigned int)nb_machines);

    _profile.push_back(slice);

    generate_colors();
}

/**
 * @brief Copy constructor, implemented on top of the copy assignment operator
 * @param[in] other The schedule to copy
 */
Schedule::Schedule(const Schedule &other)
{
    operator=(other);
}

/**
 * @brief Copy assignment: duplicates the profile, the machine count, the output counter and the colors
 * @param[in] other The schedule to copy
 * @return A reference to the current schedule
 */
Schedule &Schedule::operator=(const Schedule &other)
{
    // NOTE(review): only these four members are copied; confirm that any other member
    // (e.g. the debug flag) is meant to keep its current value.
    _nb_machines = other._nb_machines;
    _output_number = other._output_number;
    _colors = other._colors;
    _profile = other._profile;

    return *this;
}

/**
 * @brief Moves the beginning of the first time slice forward to the given date
 * @details The allocations of the jobs of the first slice which were registered (in Job::allocations)
 *          at the previous beginning date of the slice are re-registered at the new date.
 * @param[in] current_time The new beginning of the first slice. Must lie within [slice begin, slice end]
 */
void Schedule::update_first_slice(Rational current_time)
{
    auto slice = _profile.begin();

    PPK_ASSERT_ERROR(
        current_time >= slice->begin, "current_time=%g, slice->begin=%g", (double)current_time, (double)slice->begin);
    PPK_ASSERT_ERROR(
        current_time <= slice->end, "current_time=%g, slice->end=%g", (double)current_time, (double)slice->end);

    const Rational old_time = slice->begin;
    slice->begin = current_time;
    slice->length = slice->end - slice->begin;

    // If the date did not change there is nothing to re-key. Without this guard, the entry would be
    // assigned onto itself and then erased, which would silently drop the job's allocation.
    if (current_time == old_time)
        return;

    for (const auto &allocated : slice->allocated_jobs)
    {
        const Job *job_ref = allocated.first;
        auto alloc_it = job_ref->allocations.find(old_time);

        if (alloc_it != job_ref->allocations.end())
        {
            // Move the allocation from its old key to the new one
            job_ref->allocations[current_time] = alloc_it->second;
            job_ref->allocations.erase(alloc_it);
        }
    }
}
/**
 * @brief Drops the time slices that end at or before the given date, then makes the first
 *        remaining slice begin at that date
 * @param[in] current_time The new beginning of the schedule. Must be strictly before the infinite horizon
 */
void Schedule::update_first_slice_removing_remaining_jobs(Rational current_time)
{
    PPK_ASSERT_ERROR(current_time < infinite_horizon());

    auto slice = _profile.begin();
    PPK_ASSERT_ERROR(
        current_time >= slice->begin, "current_time=%g, slice->begin=%g", (double)current_time, (double)slice->begin);

    // Slices entirely in the past are erased one by one.
    // The first assertion guarantees that the last slice is never erased.
    for (; current_time >= slice->end; slice = _profile.erase(slice))
    {
    }

    // The surviving slice is shortened so that it starts at current_time
    slice->begin = current_time;
    slice->length = slice->end - slice->begin;
}

/**
 * @brief Removes a job from the schedule (shorthand for remove_job_first_occurence)
 * @param[in] job The job to remove
 */
void Schedule::remove_job(const Job *job)
{
    // Only the first occurrence of the job is removed
    remove_job_first_occurence(job);
}

/**
 * @brief Removes the first occurrence of a job, doing nothing if the job is not in the schedule
 * @param[in] job The job to remove
 * @return true if the job has been found (and removed), false otherwise
 */
bool Schedule::remove_job_if_exists(const Job *job)
{
    auto occurrence = find_first_occurence_of_job(job, _profile.begin());

    // Unknown job: the schedule is left untouched
    if (occurrence == _profile.end())
        return false;

    remove_job_internal(job, occurrence);
    return true;
}

/**
 * @brief Removes every occurrence of a job from the schedule
 * @param[in] job The job to remove
 */
void Schedule::remove_job_all_occurences(const Job *job)
{
    auto occurrence = find_first_occurence_of_job(job, _profile.begin());

    while (occurrence != _profile.end())
    {
        remove_job_internal(job, occurrence);

        // NOTE(review): remove_job_internal is not visible here; if it erases or merges slices,
        // 'occurrence' may no longer be a valid iterator. Restarting the search from the beginning
        // of the profile finds the same next occurrence without relying on the old iterator.
        occurrence = find_first_occurence_of_job(job, _profile.begin());
    }
}

/**
 * @brief Removes the first occurrence of a job. The job must be in the schedule
 * @param[in] job The job to remove
 */
void Schedule::remove_job_first_occurence(const Job *job)
{
    // The search covers the whole profile
    auto job_first_slice = find_first_occurence_of_job(job, _profile.begin());

    PPK_ASSERT_ERROR(job_first_slice != _profile.end(),
        "Cannot remove job '%s' from the schedule since it is not in it", job->id.c_str());

    remove_job_internal(job, job_first_slice);
}

/**
 * @brief Removes the last occurrence of a job. The job must be in the schedule
 * @param[in] job The job to remove
 */
void Schedule::remove_job_last_occurence(const Job *job)
{
    // Despite its name, this iterator designates the slice returned by find_last_occurence_of_job
    auto job_first_slice = find_last_occurence_of_job(job, _profile.begin());

    PPK_ASSERT_ERROR(job_first_slice != _profile.end(),
        "Cannot remove job '%s' from the schedule since it is not in it", job->id.c_str());

    remove_job_internal(job, job_first_slice);
}

/**
 * @brief Inserts a job at the first place of the whole profile where it fits
 * @param[in] job The job to insert. It must not already be in the schedule
 * @param[in] selector The object deciding which machines the job uses
 * @param[in] assert_insertion_successful Forwarded to add_job_first_fit_after_time_slice
 * @return The allocation returned by add_job_first_fit_after_time_slice
 */
Schedule::JobAlloc Schedule::add_job_first_fit(
    const Job *job, ResourceSelector *selector, bool assert_insertion_successful)
{
    PPK_ASSERT_ERROR(!contains_job(job),
        "Invalid Schedule::add_job_first_fit call: Cannot add "
        "job '%s' because it is already in the schedule. %s",
        job->id.c_str(), to_string().c_str());

    // The scan starts at the very first time slice
    auto scan_start = _profile.begin();
    return add_job_first_fit_after_time_slice(job, scan_start, selector, assert_insertion_successful);
}

/**
 * @brief Inserts a job at the first position, scanning from a given time slice, where it fits
 * @details A position fits if enough machines stay available during the whole walltime of the job
 *          and if the resource selector accepts the candidate machines. On success, the allocation is
 *          registered in job->allocations (keyed by its starting date), the profile is split at the
 *          job's expected end and the chosen machines are removed from the traversed slices.
 *          If the job has no walltime, its walltime is set so that it lasts until the infinite horizon.
 * @param[in] job The job to insert
 * @param[in] first_time_slice The time slice from which the profile is scanned
 * @param[in] selector The object deciding which machines the job uses
 * @param[in] assert_insertion_successful If true, an assertion fails when the job cannot be inserted
 * @return A copy of the created allocation, or an allocation whose has_been_inserted is false
 *         if the job could not be inserted
 */
Schedule::JobAlloc Schedule::add_job_first_fit_after_time_slice(const Job *job,
    std::list<TimeSlice>::iterator first_time_slice, ResourceSelector *selector, bool assert_insertion_successful)
{
    if (_debug)
    {
        LOG_F(1, "Adding job '%s' (size=%d, walltime=%g). Output number %d. %s",
            job->id.c_str(), job->nb_requested_resources, (double)job->walltime,
            _output_number, to_string().c_str());
        output_to_svg();
    }

    // Let's scan the profile for an anchor point.
    // An anchor point is a point where enough processors are available to run this job
    for (auto pit = first_time_slice; pit != _profile.end(); ++pit)
    {
        // If the current time slice is an anchor point
        if ((int)pit->nb_available_machines >= job->nb_requested_resources)
        {
            // Let's continue to scan the profile to ascertain that
            // the machines remain available until the job's expected termination

            // If the job has no walltime, its size will be "infinite"
            if (!job->has_walltime)
            {
                // TODO: remove this ugly const_cast?
                const_cast<Job *>(job)->walltime = infinite_horizon() - pit->begin;
            }

            int availableMachinesCount = pit->nb_available_machines;
            Rational totalTime = pit->length;

            // If the job fits in the current time slice (temporarily speaking)
            if (totalTime >= job->walltime)
            {
                // Let's create the job allocation
                Schedule::JobAlloc *alloc = new Schedule::JobAlloc;

                // If the job fits in the current time slice (according to the fitting function)
                if (selector->fit(job, pit->available_machines, alloc->used_machines))
                {
                    Rational beginning = pit->begin;
                    alloc->begin = beginning;
                    alloc->end = alloc->begin + job->walltime;
                    alloc->started_in_first_slice = (pit == _profile.begin()) ? true : false;
                    alloc->job = job;
                    // From now on, the allocation is referenced by the job
                    job->allocations[beginning] = alloc;

                    // Let's split the current time slice if needed
                    TimeSliceIterator first_slice_after_split;
                    TimeSliceIterator second_slice_after_split;
                    Rational split_date = pit->begin + job->walltime;
                    split_slice(pit, split_date, first_slice_after_split, second_slice_after_split);

                    // Let's remove the allocated machines from the available machines of the time slice
                    first_slice_after_split->available_machines.remove(alloc->used_machines);
                    first_slice_after_split->nb_available_machines -= job->nb_requested_resources;
                    first_slice_after_split->allocated_jobs[job] = alloc->used_machines;

                    if (_debug)
                    {
                        LOG_F(1, "Added job '%s' (size=%d, walltime=%g). Output number %d. %s", job->id.c_str(),
                            job->nb_requested_resources, (double)job->walltime, _output_number, to_string().c_str());
                        output_to_svg();
                    }

                    // The job has been placed, we can leave this function
                    return *alloc;
                }

                // The selector rejected this anchor point: the allocation has not been registered
                // anywhere, so it must be freed here to avoid leaking it.
                delete alloc;
            }
            else
            {
                // TODO : merge this big else with its if, as the "else" is a more general case of the "if"
                // The job does not fit in the current time slice (temporarily speaking)
                auto availableMachines = pit->available_machines;

                auto pit2 = pit;
                ++pit2;

                for (; (pit2 != _profile.end()) && ((int)pit2->nb_available_machines >= job->nb_requested_resources);
                     ++pit2)
                {
                    // Only the machines available in all the slices [pit, pit2] can be used
                    availableMachines &= pit2->available_machines;
                    availableMachinesCount = (int)availableMachines.size();
                    totalTime += pit2->length;

                    if (availableMachinesCount
                        < job->nb_requested_resources) // We don't have enough machines to run the job
                        break;
                    else if (totalTime
                        >= job->walltime) // The job fits in the slices [pit, pit2[ (temporarily speaking)
                    {
                        // Let's create the job allocation
                        JobAlloc *alloc = new JobAlloc;

                        // If the job fits in the current time slice (according to the fitting function)
                        if (selector->fit(job, availableMachines, alloc->used_machines))
                        {
                            alloc->begin = pit->begin;
                            alloc->end = alloc->begin + job->walltime;
                            alloc->started_in_first_slice = (pit == _profile.begin()) ? true : false;
                            alloc->job = job;
                            // From now on, the allocation is referenced by the job
                            job->allocations[alloc->begin] = alloc;

                            // Let's remove the used machines from the slices before pit2
                            auto pit3 = pit;
                            for (; pit3 != pit2; ++pit3)
                            {
                                pit3->available_machines -= alloc->used_machines;
                                pit3->nb_available_machines -= job->nb_requested_resources;
                                pit3->allocated_jobs[job] = alloc->used_machines;
                            }

                            // Let's split the current time slice if needed
                            TimeSliceIterator first_slice_after_split;
                            TimeSliceIterator second_slice_after_split;
                            Rational split_date = pit->begin + job->walltime;
                            split_slice(pit2, split_date, first_slice_after_split, second_slice_after_split);

                            // Let's remove the allocated machines from the available machines of the time slice
                            first_slice_after_split->available_machines -= alloc->used_machines;
                            first_slice_after_split->nb_available_machines -= job->nb_requested_resources;
                            first_slice_after_split->allocated_jobs[job] = alloc->used_machines;

                            if (_debug)
                            {
                                LOG_F(1, "Added job '%s' (size=%d, walltime=%g). Output number %d. %s", job->id.c_str(),
                                    job->nb_requested_resources, (double)job->walltime, _output_number,
                                    to_string().c_str());
                                output_to_svg();
                            }

                            // The job has been placed, we can leave this function
                            return *alloc;
                        }

                        // The selector rejected this candidate: free the unregistered allocation
                        // before trying with one more time slice.
                        delete alloc;
                    }
                }
            }
        }
    }

    if (assert_insertion_successful)
        PPK_ASSERT_ERROR(false, "Error in Schedule::add_job_first_fit: could not add job '%s' into %s", job->id.c_str(),
            to_string().c_str());

    // The job could not be placed anywhere
    JobAlloc failed_alloc;
    failed_alloc.has_been_inserted = false;
    return failed_alloc;
}

/**
 * @brief Inserts a job at the first place where it fits, not earlier than a given date
 * @param[in] job The job to insert
 * @param[in] date The earliest date at which the job may start. It must lie within the profile
 * @param[in] selector The object deciding which machines the job uses
 * @param[in] assert_insertion_successful Forwarded to add_job_first_fit_after_time_slice
 * @return The allocation returned by add_job_first_fit_after_time_slice
 */
Schedule::JobAlloc Schedule::add_job_first_fit_after_time(
    const Job *job, Rational date, ResourceSelector *selector, bool assert_insertion_successful)
{
    if (_debug)
    {
        LOG_F(1, "Adding job '%s' (size=%d, walltime=%g) after date %g. Output number %d. %s", job->id.c_str(),
            job->nb_requested_resources, (double)job->walltime, (double)date, _output_number, to_string().c_str());
        output_to_svg();
    }

    // Look for the time slice [begin, end[ which contains the date
    auto containing_slice = _profile.begin();
    for (; containing_slice != _profile.end(); ++containing_slice)
    {
        if ((date >= containing_slice->begin) && (date < containing_slice->end))
            break;
    }

    const bool insertion_slice_found = (containing_slice != _profile.end());
    PPK_ASSERT_ERROR(insertion_slice_found, "Cannot find the insertion slice of date %g. Schedule : %s\n", (double)date,
        to_string().c_str());

    // Cut the slice at the date if the date is strictly inside it
    TimeSliceIterator before_date;
    TimeSliceIterator from_date;
    split_slice(containing_slice, date, before_date, from_date);

    // Whether a split occurred or not, the second returned slice is the one from which the scan must start
    return add_job_first_fit_after_time_slice(job, from_date, selector, assert_insertion_successful);
}

double Schedule::query_wait(int size, Rational time, ResourceSelector *selector)
{
    // very similar to job insertions...

    Job fake_job;
    fake_job.id = "fake";
    fake_job.unique_number = -1;
    fake_job.nb_requested_resources = size;
    fake_job.walltime = time;

    // Let's scan the profile for an anchor point.
    // An anchor point is a point where enough processors are available to run this job
    for (auto pit = _profile.begin(); pit != _profile.end(); ++pit)
    {
        // If the current time slice is an anchor point
        if ((int)pit->nb_available_machines >= size)
        {
            // Let's continue to scan the profile to ascertain that
            // the machines remain available until the job's expected termination

            int availableMachinesCount = pit->nb_available_machines;
            Rational totalTime = pit->length;

            // If the job fits in the current time slice (temporarily speaking)
            if (totalTime >= time)
            {
                IntervalSet used_machines;

                // If the job fits in the current time slice (according to the fitting function)
                if (selector->fit(&fake_job, pit->available_machines, used_machines))
                {
                    return static_cast<double>(pit->begin);
                }
            }
            else
            {
                // TODO : merge this big else with its if, as the "else" is a more general case of the "if"
                // The job does not fit in the current time slice (temporarily speaking)
                auto availableMachines = pit->available_machines;

                auto pit2 = pit;
                ++pit2;

                for (; (pit2 != _profile.end()) && ((int)pit2->nb_available_machines >= size); ++pit2)
                {
                    availableMachines &= pit2->available_machines;
                    availableMachinesCount = (int)availableMachines.size();
                    totalTime += pit2->length;

                    if (availableMachinesCount < size) // We don't have enough machines to run the job
                        break;
                    else if (totalTime >= time) // The job fits in the slices [pit, pit2[ (temporarily speaking)
                    {

                        IntervalSet used_machines;

                        // If the job fits in the current time slice (according to the fitting function)
                        if (selector->fit(&fake_job, availableMachines, used_machines))
                        {
                            return static_cast<double>(pit->begin);
                        }
                    }
                }
            }
        }
    }

    return -1;
}

/**
 * @brief Returns the date at which the first time slice begins
 * @return The beginning of the first time slice of the profile
 */
Rational Schedule::first_slice_begin() const
{
    PPK_ASSERT_ERROR(_profile.size() > 0);

    return _profile.front().begin;
}

/**
 * @brief Returns the date at which the last time slice begins
 * @return The beginning of the last time slice of the profile
 */
Rational Schedule::finite_horizon() const
{
    PPK_ASSERT_ERROR(_profile.size() > 0);

    return _profile.back().begin;
}

/**
 * @brief Returns the date at which the last time slice ends
 * @return The end of the last time slice of the profile
 */
Rational Schedule::infinite_horizon() const
{
    PPK_ASSERT_ERROR(_profile.size() > 0);

    return _profile.back().end;
}

std::multimap<std::string, Schedule::JobAlloc> Schedule::jobs_allocations() const
{
    multimap<std::string, Schedule::JobAlloc> res;

    map<const Job *, Rational> jobs_starting_times;
    map<const Job *, Rational> jobs_ending_times;
    set<const Job *> current_jobs;
    for (auto mit : _profile.begin()->allocated_jobs)
    {
        const Job *allocated_job = mit.first;
        current_jobs.insert(allocated_job);
        jobs_starting_times[allocated_job] = _profile.begin()->begin;
    }

    // Let's traverse the profile to find the beginning of each job
    for (auto slice_it = _profile.begin(); slice_it != _profile.end(); ++slice_it)
    {
        const TimeSlice &slice = *slice_it;
        set<const Job *> allocated_jobs;
        for (auto mit : slice.allocated_jobs)
        {
            const Job *job = mit.first;
            allocated_jobs.insert(job);
        }

        set<const Job *> finished_jobs;
        set_difference(current_jobs.begin(), current_jobs.end(), allocated_jobs.begin(), allocated_jobs.end(),
            std::inserter(finished_jobs, finished_jobs.end()));

        for (const Job *job : finished_jobs)
        {
            jobs_ending_times[job] = slice_it->begin;

            // Let's find where the job has been allocated
            PPK_ASSERT_ERROR(slice_it != _profile.begin());
            auto previous_slice_it = slice_it;
            --previous_slice_it;

            JobAlloc alloc;
            alloc.job = job;
            alloc.begin = jobs_starting_times[job];
            alloc.end = jobs_ending_times[job];
            alloc.started_in_first_slice = (alloc.begin == first_slice_begin());
            alloc.used_machines = previous_slice_it->allocated_jobs.at(job);

            res.insert({ job->id, alloc });
        }

        set<const Job *> new_jobs;
        set_difference(allocated_jobs.begin(), allocated_jobs.end(), current_jobs.begin(), current_jobs.end(),
            std::inserter(new_jobs, new_jobs.end()));

        for (const Job *job : new_jobs)
        {
            jobs_starting_times[job] = slice.begin;
        }

        // Update current_jobs
        for (const Job *job : finished_jobs)
            current_jobs.erase(job);
        for (const Job *job : new_jobs)
            current_jobs.insert(job);
    }

    return res;
}

/**
 * @brief Tells whether a job appears in at least one time slice of the profile
 * @param[in] job The job to look for
 * @return true if the job is in the schedule, false otherwise
 */
bool Schedule::contains_job(const Job *job) const
{
    auto first_occurrence = find_first_occurence_of_job(job, _profile.begin());
    return first_occurrence != _profile.end();
}

/**
 * @brief Cuts a time slice in two at a given date, if the date is strictly inside the slice
 * @param[in] slice_to_split The time slice to cut
 * @param[in] date The date at which the cut is done
 * @param[out] first_slice_after_split The slice ending at the date (slice_to_split itself)
 * @param[out] second_slice_after_split The slice beginning at the date. If no cut is done,
 *             it is slice_to_split as well
 * @return true if the slice has been cut, false otherwise
 */
bool Schedule::split_slice(Schedule::TimeSliceIterator slice_to_split, Rational date,
    Schedule::TimeSliceIterator &first_slice_after_split, Schedule::TimeSliceIterator &second_slice_after_split)
{
    const bool date_strictly_inside = (date > slice_to_split->begin) && (date < slice_to_split->end);

    if (!date_strictly_inside)
    {
        // Nothing to cut: both returned iterators designate the untouched slice
        first_slice_after_split = slice_to_split;
        second_slice_after_split = slice_to_split;
        return false;
    }

    // The new slice is a copy of the original one, restricted to [date, end[
    TimeSlice new_slice = *slice_to_split;
    new_slice.begin = date;
    new_slice.length = new_slice.end - new_slice.begin;
    PPK_ASSERT_ERROR(new_slice.length > 0);

    // The original slice is restricted to [begin, date[
    slice_to_split->end = date;
    slice_to_split->length = slice_to_split->end - slice_to_split->begin;
    PPK_ASSERT_ERROR(slice_to_split->length > 0);

    // list::insert inserts BEFORE the given position, hence the position just after slice_to_split
    auto position_after = slice_to_split;
    ++position_after;

    second_slice_after_split = _profile.insert(position_after, new_slice);
    first_slice_after_split = slice_to_split;

    return true;
}

/**
 * @brief Finds the first time slice, from a given starting point, in which a job is allocated
 * @param[in] job The job to look for
 * @param[in] starting_point The time slice from which the search begins
 * @return The first time slice containing the job, or _profile.end() if there is none
 */
Schedule::TimeSliceIterator Schedule::find_first_occurence_of_job(
    const Job *job, Schedule::TimeSliceIterator starting_point)
{
    PPK_ASSERT_ERROR(_profile.size() > 0);

    auto cursor = starting_point;
    while (cursor != _profile.end())
    {
        if (cursor->allocated_jobs.count(job) == 1)
            return cursor;

        ++cursor;
    }

    return _profile.end();
}

/**
 * @brief Finds the time slice in which the last occurrence of a job begins
 * @details The profile is traversed backwards, from its last slice down to starting_point.
 * @param[in] job The job to look for
 * @param[in] starting_point The time slice at which the backward traversal stops
 * @return The first slice of the last run of consecutive slices containing the job,
 *         or _profile.end() if the job has not been found
 */
Schedule::TimeSliceIterator Schedule::find_last_occurence_of_job(
    const Job *job, Schedule::TimeSliceIterator starting_point)
{
    PPK_ASSERT_ERROR(_profile.size() > 0);

    bool job_seen = false;
    auto cursor = _profile.end();

    for (;;)
    {
        --cursor;

        if (cursor->allocated_jobs.count(job) == 1)
            job_seen = true;
        else if (job_seen) // The job is no longer found: its occurrence starts just after this slice
            return ++cursor;

        if (cursor == starting_point)
            break;
    }

    // If the job has been seen down to starting_point, its occurrence starts there
    if (!job_seen)
        return _profile.end();

    return starting_point;
}

/**
 * @brief Finds the first time slice, from a given starting point, in which a job is allocated
 *        (const version)
 * @param[in] job The job to look for
 * @param[in] starting_point The time slice from which the search begins
 * @return The first time slice containing the job, or _profile.end() if there is none
 */
Schedule::TimeSliceConstIterator Schedule::find_first_occurence_of_job(
    const Job *job, Schedule::TimeSliceConstIterator starting_point) const
{
    PPK_ASSERT_ERROR(_profile.size() > 0);

    auto cursor = starting_point;
    while (cursor != _profile.end())
    {
        if (cursor->allocated_jobs.count(job) == 1)
            return cursor;

        ++cursor;
    }

    return _profile.end();
}

/**
 * @brief Finds the time slice in which the last occurrence of a job begins (const version)
 * @details The profile is traversed backwards, from its last slice down to starting_point.
 * @param[in] job The job to look for
 * @param[in] starting_point The time slice at which the backward traversal stops
 * @return The first slice of the last run of consecutive slices containing the job,
 *         or _profile.end() if the job has not been found
 */
Schedule::TimeSliceConstIterator Schedule::find_last_occurence_of_job(
    const Job *job, Schedule::TimeSliceConstIterator starting_point) const
{
    PPK_ASSERT_ERROR(_profile.size() > 0);

    bool job_seen = false;
    auto cursor = _profile.end();

    for (;;)
    {
        --cursor;

        if (cursor->allocated_jobs.count(job) == 1)
            job_seen = true;
        else if (job_seen) // The job is no longer found: its occurrence starts just after this slice
            return ++cursor;

        if (cursor == starting_point)
            break;
    }

    // If the job has been seen down to starting_point, its occurrence starts there
    if (!job_seen)
        return _profile.end();

    return starting_point;
}

/**
 * @brief Finds the last time slice which begins at or before a given date
 * @param[in] date The date
 * @param[in] assert_not_found If true, an assertion fails when no such slice exists
 * @return The last slice whose beginning is not after the date. If there is none (and no assertion
 *         stopped the execution), the first slice of the profile
 */
Schedule::TimeSliceIterator Schedule::find_last_time_slice_before_date(Rational date, bool assert_not_found)
{
    PPK_ASSERT_ERROR(_profile.size() > 0);

    // Backward traversal, from the last slice to the first one
    auto slice_it = _profile.end();
    while (slice_it != _profile.begin())
    {
        --slice_it;

        if (slice_it->begin <= date)
            return slice_it;
    }

    if (assert_not_found)
        PPK_ASSERT_ERROR(false, "No time slice beginning before date %g could be found. Schedule: %s\n", (double)date,
            to_string().c_str());

    return _profile.begin();
}

/**
 * @brief Finds the last time slice which begins at or before a given date (const version)
 * @param[in] date The date
 * @param[in] assert_not_found If true, an assertion fails when no such slice exists
 * @return The last slice whose beginning is not after the date. If there is none (and no assertion
 *         stopped the execution), the first slice of the profile
 */
Schedule::TimeSliceConstIterator Schedule::find_last_time_slice_before_date(Rational date, bool assert_not_found) const
{
    PPK_ASSERT_ERROR(_profile.size() > 0);

    // Backward traversal, from the last slice to the first one
    auto slice_it = _profile.end();
    while (slice_it != _profile.begin())
    {
        --slice_it;

        if (slice_it->begin <= date)
            return slice_it;
    }

    if (assert_not_found)
        PPK_ASSERT_ERROR(false, "No time slice beginning before date %g could be found. Schedule: %s\n", (double)date,
            to_string().c_str());

    // Here, the traversal stopped on the first slice
    return slice_it;
}

/**
 * @brief Finds the first time slice which begins at or after a given date
 * @param[in] date The date
 * @param[in] assert_not_found If true, an assertion fails when no such slice exists
 * @return The first slice whose beginning is not before the date, or _profile.end() if there is none
 */
Schedule::TimeSliceIterator Schedule::find_first_time_slice_after_date(Rational date, bool assert_not_found)
{
    for (auto slice_it = _profile.begin(); slice_it != _profile.end(); ++slice_it)
    {
        if (slice_it->begin >= date)
            return slice_it;
    }

    if (assert_not_found)
        PPK_ASSERT_ERROR(false, "No time slice starting after date %g could be found. Schedule: %s\n", (double)date,
            to_string().c_str());

    return _profile.end();
}

/**
 * @brief Finds the first time slice which begins at or after a given date (const version)
 * @param[in] date The date
 * @param[in] assert_not_found If true, an assertion fails when no such slice exists
 * @return The first slice whose beginning is not before the date, or _profile.end() if there is none
 */
Schedule::TimeSliceConstIterator Schedule::find_first_time_slice_after_date(Rational date, bool assert_not_found) const
{
    for (auto slice_it = _profile.begin(); slice_it != _profile.end(); ++slice_it)
    {
        if (slice_it->begin >= date)
            return slice_it;
    }

    if (assert_not_found)
        PPK_ASSERT_ERROR(false, "No time slice starting after date %g could be found. Schedule: %s\n", (double)date,
            to_string().c_str());

    return _profile.end();
}

/**
 * @brief Computes the machines which are available during a whole period of time
 * @details The result is the intersection of the available machines of every time slice which
 *          overlaps the period, starting with the slice which contains the beginning of the period.
 * @param[in] begin The beginning of the period. Must not be before the beginning of the first slice
 * @param[in] end The end of the period. Must not be after the infinite horizon
 * @return The machines available in all the time slices overlapping the period
 */
IntervalSet Schedule::available_machines_during_period(Rational begin, Rational end) const
{
    PPK_ASSERT_ERROR(
        begin >= first_slice_begin(), "begin=%f, first_slice_begin()=%f", (double)begin, (double)first_slice_begin());
    PPK_ASSERT_ERROR(
        end <= infinite_horizon(), "end=%f, infinite_horizon()=%f", (double)end, (double)infinite_horizon());

    // The traversal must start at the slice which CONTAINS 'begin', i.e. the last slice beginning at
    // or before it. Starting at the first slice beginning after 'begin' would ignore the jobs running
    // when the period starts, and would find no slice at all when 'begin' lies in the last slice.
    // Thanks to the first assertion, such a slice always exists.
    auto slice_it = find_last_time_slice_before_date(begin, true);
    IntervalSet available_machines = slice_it->available_machines;

    // Intersect with every following slice which begins before the end of the period
    for (++slice_it; slice_it != _profile.end() && slice_it->begin < end; ++slice_it)
        available_machines &= slice_it->available_machines;

    return available_machines;
}

/**
 * @brief Gives access to the first time slice of the profile
 * @return A mutable iterator on the beginning of the profile
 */
std::list<Schedule::TimeSlice>::iterator Schedule::begin()
{
    return _profile.begin();
}

/**
 * @brief Gives the past-the-end position of the profile
 * @return A mutable iterator on the end of the profile
 */
std::list<Schedule::TimeSlice>::iterator Schedule::end()
{
    return _profile.end();
}

/**
 * @brief Gives read-only access to the first time slice of the profile
 * @return A constant iterator on the beginning of the profile
 */
std::list<Schedule::TimeSlice>::const_iterator Schedule::begin() const
{
    // In a const method, begin() on the list already yields a const_iterator
    return _profile.begin();
}

/**
 * @brief Gives the read-only past-the-end position of the profile
 * @return A constant iterator on the end of the profile
 */
std::list<Schedule::TimeSlice>::const_iterator Schedule::end() const
{
    // In a const method, end() on the list already yields a const_iterator
    return _profile.end();
}

/**
 * @brief Returns the number of time slices of the profile
 * @return The number of time slices, converted to int
 */
int Schedule::nb_slices() const
{
    return static_cast<int>(_profile.size());
}

/**
 * @brief Builds a textual description of the schedule
 * @return A header line followed by the description of each time slice, in profile order
 */
string Schedule::to_string() const
{
    string description("Schedule:\n");

    for (auto slice_it = _profile.begin(); slice_it != _profile.end(); ++slice_it)
        description += slice_it->to_string(2, 2);

    return description;
}

/**
 * @brief Builds an SVG representation of the schedule
 * @details Time goes along the x axis and machines along the y axis. One background stripe is drawn
 *          per machine, then one rectangle per contiguous set of machines of each job occurrence.
 *          A job occurrence is drawn when the first slice which no longer contains the job is reached.
 * @return The SVG document, as a string
 */
string Schedule::to_svg() const
{
    PPK_ASSERT_ERROR(_profile.size() > 0);

    auto last_finite_slice = _profile.end();
    --last_finite_slice;

    // Scale of the drawing
    const Rational second_width = 10;
    const Rational machine_height = 10;
    const Rational space_between_machines_ratio(1, 8);
    PPK_ASSERT_ERROR(space_between_machines_ratio >= 0 && space_between_machines_ratio <= 1);

    // Bounding box: from the beginning of the first slice to the beginning of the last one
    const Rational x0 = _profile.begin()->begin * second_width;
    const Rational x1 = last_finite_slice->begin * second_width;
    const Rational y0 = 0 * machine_height;
    const Rational y1 = _nb_machines * machine_height;

    const Rational width = x1 - x0 + 10;
    const Rational height = y1 - y0;

    // Formatting buffer with automatic storage: nothing to release, even if an exception is thrown
    const int buf_size = 4096;
    char buf[buf_size];

    // header
    snprintf(buf, buf_size,
        "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n"
        "<svg xmlns=\"http://www.w3.org/2000/svg\" width=\"%g\" height=\"%g\">\n"
        "<title>Schedule</title>\n",
        (double)width, (double)height);

    string res = buf;

    // machines background color (alternating greys)
    for (int i = 0; i < _nb_machines; ++i)
    {
        string machine_color;

        if (i % 2 == 0)
            machine_color = "#EEEEEE";
        else
            machine_color = "#DDDDDD";

        snprintf(buf, buf_size,
            "  <rect x=\"%g\" y=\"%g\" width=\"%g\" height=\"%g\" style=\"stroke:none; fill:%s;\"/>\n", (double)0,
            (double)(i * machine_height), (double)width, (double)machine_height, machine_color.c_str());
        res += buf;
    }

    // The jobs of the first slice are considered to start when the first slice begins
    map<const Job *, Rational> jobs_starting_times;
    set<const Job *> current_jobs;
    for (const auto &mit : _profile.begin()->allocated_jobs)
    {
        const Job *allocated_job = mit.first;
        current_jobs.insert(allocated_job);
        jobs_starting_times[allocated_job] = _profile.begin()->begin;
    }

    // Let's traverse the profile to find the beginning of each job
    for (auto slice_it = _profile.begin(); slice_it != _profile.end(); ++slice_it)
    {
        const TimeSlice &slice = *slice_it;
        set<const Job *> allocated_jobs;
        for (const auto &mit : slice.allocated_jobs)
        {
            const Job *job = mit.first;
            allocated_jobs.insert(job);
        }

        // Jobs which were running but are not in the current slice
        set<const Job *> finished_jobs;
        set_difference(current_jobs.begin(), current_jobs.end(), allocated_jobs.begin(), allocated_jobs.end(),
            std::inserter(finished_jobs, finished_jobs.end()));

        for (const Job *job : finished_jobs)
        {
            Rational rect_x0 = jobs_starting_times[job] * second_width - x0;
            Rational rect_x1 = slice.begin * second_width - x0;
            Rational rect_width = rect_x1 - rect_x0;
            string rect_color = _colors[job->unique_number % (int)_colors.size()];

            // Let's find where the job has been allocated
            PPK_ASSERT_ERROR(slice_it != _profile.begin());
            auto previous_slice_it = slice_it;
            --previous_slice_it;

            IntervalSet job_machines = previous_slice_it->allocated_jobs.at(job);

            // Let's create a rectangle for each contiguous part of the allocation
            for (auto it = job_machines.intervals_begin(); it != job_machines.intervals_end(); ++it)
            {
                PPK_ASSERT_ERROR(it->lower() <= it->upper());
                Rational rect_y0 = it->lower() * machine_height - y0;
                Rational rect_y1 = ((it->upper() + Rational(1)) * machine_height)
                    - (space_between_machines_ratio * machine_height) - y0;
                Rational rect_height = rect_y1 - rect_y0;

                // CSS declarations are written "property:value": "stroke-width=..." is an invalid
                // declaration, which makes the requested stroke width ignored by SVG renderers.
                snprintf(buf, buf_size,
                    "  <rect x=\"%g\" y=\"%g\" width=\"%g\" height=\"%g\" style=\"stroke:black; stroke-width:%g; "
                    "fill:%s;\"/>\n",
                    (double)rect_x0, (double)rect_y0, (double)rect_width, (double)rect_height,
                    (double)(std::min(second_width, machine_height) / 10), rect_color.c_str());

                res += buf;
            }
        }

        // Jobs which are in the current slice but were not running
        set<const Job *> new_jobs;
        set_difference(allocated_jobs.begin(), allocated_jobs.end(), current_jobs.begin(), current_jobs.end(),
            std::inserter(new_jobs, new_jobs.end()));

        for (const Job *job : new_jobs)
        {
            jobs_starting_times[job] = slice.begin;
        }

        // Update current_jobs
        for (const Job *job : finished_jobs)
            current_jobs.erase(job);
        for (const Job *job : new_jobs)
            current_jobs.insert(job);
    }

    res += "</svg>";

    return res;
}

void Schedule::write_svg_to_file(const string &filename) const
{
    ofstream f(filename);

    if (f.is_open())
        f << to_svg() << "\n";

    f.close();
}

void Schedule::output_to_svg(const string &filename_prefix)
{
    const int bufsize = 4096;
    char *buf = new char[bufsize];

    snprintf(buf, bufsize, "%s%06d.svg", filename_prefix.c_str(), _output_number);
    ++_output_number %= 10000000;

    write_svg_to_file(buf);

    delete[] buf;
}

void Schedule::dump_to_batsim_jobs_file(const string &filename) const
{
    ofstream f(filename);
    if (f.is_open())
    {
        f << "job_id,submission_time,requested_number_of_resources,requested_time,starting_time,finish_time,allocated_resources\n";

        PPK_ASSERT_ERROR(_profile.size() > 0);

        const int buf_size = 4096;
        char *buf = new char[buf_size];

        map<const Job *, Rational> jobs_starting_times;
        set<const Job *> current_jobs;
        for (auto mit : _profile.begin()->allocated_jobs)
        {
            const Job *allocated_job = mit.first;
            current_jobs.insert(allocated_job);
            jobs_starting_times[allocated_job] = _profile.begin()->begin;
        }

        // Let's traverse the profile to find the beginning of each job
        for (auto slice_it = _profile.begin(); slice_it != _profile.end(); ++slice_it)
        {
            const TimeSlice &slice = *slice_it;
            set<const Job *> allocated_jobs;
            for (auto mit : slice.allocated_jobs)
            {
                const Job *job = mit.first;
                allocated_jobs.insert(job);
            }

            set<const Job *> finished_jobs;
            set_difference(current_jobs.begin(), current_jobs.end(), allocated_jobs.begin(), allocated_jobs.end(),
                std::inserter(finished_jobs, finished_jobs.end()));

            for (const Job *job : finished_jobs)
            {
                // Find where the job has been allocated
                PPK_ASSERT_ERROR(slice_it != _profile.begin());
                auto previous_slice_it = slice_it;
                --previous_slice_it;
                IntervalSet job_machines = previous_slice_it->allocated_jobs.at(job);

                snprintf(buf, buf_size, "%s,%g,%d,%g,%g,%g,%s\n",
                         job->id.c_str(),
                         job->submission_time,
                         job->nb_requested_resources,
                         (double)job->walltime,
                         (double)jobs_starting_times[job],
                         (double)slice_it->begin,
                         job_machines.to_string_hyphen(" ", "-").c_str());
                f << buf;
            }

            set<const Job *> new_jobs;
            set_difference(allocated_jobs.begin(), allocated_jobs.end(), current_jobs.begin(), current_jobs.end(),
                std::inserter(new_jobs, new_jobs.end()));

            for (const Job *job : new_jobs)
            {
                jobs_starting_times[job] = slice.begin;
            }

            // Update current_jobs
            for (const Job *job : finished_jobs)
                current_jobs.erase(job);
            for (const Job *job : new_jobs)
                current_jobs.insert(job);
        }
        delete[] buf;
    }

    f.close();
}

void Schedule::incremental_dump_as_batsim_jobs_file(const string &filename_prefix)
{
    const int bufsize = 4096;
    char *buf = new char[bufsize];

    snprintf(buf, bufsize, "%s%06d.csv", filename_prefix.c_str(), _output_number);
    _output_number = (_output_number + 1) % 10000000;

    dump_to_batsim_jobs_file(buf);

    delete[] buf;
}

int Schedule::nb_machines() const
{
    return _nb_machines;
}

/**
 * @brief Appends nb_colors colors to _colors, then shuffles _colors
 * @details The hues of the generated colors are evenly spread over [0,360) degrees,
 *          with saturation and value both set to 1. Each color is stored as a
 *          "#rrggbb" lowercase hexadecimal string.
 * @param[in] nb_colors The number of colors to generate. Must be strictly positive (asserted).
 */
void Schedule::generate_colors(int nb_colors)
{
    PPK_ASSERT_ERROR(nb_colors > 0);
    _colors.reserve(nb_colors);

    double h, s = 1, v = 1, r, g, b;
    const int color_bufsize = 16;
    char color_buf[color_bufsize];

    double hue_fraction = 360.0 / nb_colors;
    for (int i = 0; i < nb_colors; ++i)
    {
        h = i * hue_fraction;
        hsvToRgb(h, s, v, r, g, b);

        // Each channel is scaled to an integer and clamped into [0,255]
        unsigned int red = std::max(0, std::min((int)(floor(256 * r)), 255));
        unsigned int green = std::max(0, std::min((int)(floor(256 * g)), 255));
        // The blue channel must read b (reading g here would make blue always equal to green)
        unsigned int blue = std::max(0, std::min((int)(floor(256 * b)), 255));

        snprintf(color_buf, color_bufsize, "#%02x%02x%02x", red, green, blue);
        _colors.push_back(color_buf);
    }

    // NOTE(review): std::random_shuffle is deprecated since C++14 and removed in C++17.
    // Moving to std::shuffle requires <random> and a generator, which is a file-level change.
    random_shuffle(_colors.begin(), _colors.end());
}

void Schedule::remove_job_internal(const Job *job, Schedule::TimeSliceIterator removal_point)
{
    // Let's retrieve the machines used by the job
    PPK_ASSERT_ERROR(removal_point->allocated_jobs.count(job) == 1);
    IntervalSet job_machines = removal_point->allocated_jobs.at(job);

    if (_debug)
    {
        LOG_F(1, "Removing job '%s'. Output number %d. %s", job->id.c_str(), _output_number, to_string().c_str());
        output_to_svg();
    }

    // Let's iterate the time slices until the job is found
    for (auto pit = removal_point; pit != _profile.end(); ++pit)
    {
        // If the job was succesfully erased from the current slice (the job was in it)
        if (pit->allocated_jobs.erase(job) == 1)
        {
            pit->available_machines.insert(job_machines);
            pit->nb_available_machines += job->nb_requested_resources;

            // If the slice is not the first one, let's try to merge it with its preceding slice
            if (pit != _profile.begin())
            {
                auto previous = pit;
                previous--;

                // The slices are merged if they have the same jobs
                if (previous->allocated_jobs == pit->allocated_jobs)
                {
                    PPK_ASSERT_ERROR(previous->available_machines == pit->available_machines,
                        "Two consecutive time slices, do NOT use the same resources "
                        "whereas they contain the same jobs. Slices:\n%s%s",
                        previous->to_string(2).c_str(), pit->to_string(2).c_str());

                    pit->begin = previous->begin;
                    pit->length = pit->end - pit->begin;

                    // pit is updated to ensure --pit points to a valid location after erasure
                    pit = _profile.erase(previous);
                }
            }

            // Let's iterate the slices while the job is in it, and erase it
            for (++pit; pit != _profile.end() && pit->allocated_jobs.erase(job) == 1; ++pit)
            {
                pit->available_machines.insert(job_machines);
                pit->nb_available_machines += job->nb_requested_resources;

                // If the slice is not the first one, let's try to merge it with its preceding slice
                if (pit != _profile.begin())
                {
                    auto previous = pit;
                    previous--;

                    // The slices are merged if they have the same jobs
                    if (previous->allocated_jobs == pit->allocated_jobs)
                    {
                        PPK_ASSERT_ERROR(previous->available_machines == pit->available_machines,
                            "Two consecutive time slices, do NOT use the same resources "
                            "whereas they contain the same jobs. Slices:\n%s%s",
                            previous->to_string(2).c_str(), pit->to_string(2).c_str());

                        pit->begin = previous->begin;
                        pit->length = pit->end - pit->begin;

                        // pit is updated to ensure --pit points to a valid location after erasure
                        pit = _profile.erase(previous);
                    }
                }
            }

            // pit is either profile.end() or does NOT contain the job
            // Let's try to merge it with its previous slice
            if (pit != _profile.end())
            {
                if (pit != _profile.begin())
                {
                    auto previous = pit;
                    previous--;

                    // The slices are merged if they have the same jobs
                    if (previous->allocated_jobs == pit->allocated_jobs)
                    {
                        PPK_ASSERT_ERROR(previous->available_machines == pit->available_machines,
                            "Two consecutive time slices, do NOT use the same resources "
                            "whereas they contain the same jobs. Slices:\n%s%s",
                            previous->to_string(2).c_str(), pit->to_string(2).c_str());

                        pit->begin = previous->begin;
                        pit->length = pit->end - pit->begin;

                        // pit is updated to ensure --pit points to a valid location after erasure
                        pit = _profile.erase(previous);
                    }
                }
            }

            if (_debug)
            {
                LOG_F(1, "Removed job '%s'. Output number %d. %s", job->id.c_str(), _output_number, to_string().c_str());
                output_to_svg();
            }

            return;
        }
    }
}

/**
 * @brief Checks whether a job is allocated in the time slice
 * @param[in] job The job to look for
 * @return true if job is a key of allocated_jobs, false otherwise
 */
bool Schedule::TimeSlice::contains_job(const Job *job) const
{
    return allocated_jobs.count(job) > 0;
}

/**
 * @brief Checks whether at least one job allocated in the time slice matches a predicate
 * @details The jobs are tested in the iteration order of allocated_jobs, and the search
 *          stops at the first match.
 * @param[in] matching_function The predicate, called with each allocated job
 * @return true if matching_function returned true for one job, false otherwise
 */
bool Schedule::TimeSlice::contains_matching_job(std::function<bool(const Job *)> matching_function) const
{
    for (const auto &entry : allocated_jobs)
    {
        if (matching_function(entry.first))
            return true;
    }

    return false;
}

/**
 * @brief Finds a job allocated in the time slice from its identifier
 * @param[in] job_id The identifier of the job to look for
 * @return The first job of allocated_jobs (in iteration order) whose id equals job_id,
 *         or nullptr if there is no such job
 */
const Job *Schedule::TimeSlice::job_from_job_id(string job_id) const
{
    for (auto it = allocated_jobs.begin(); it != allocated_jobs.end(); ++it)
    {
        const Job *candidate = it->first;
        if (candidate->id == job_id)
            return candidate;
    }

    return nullptr;
}

/**
 * @brief Returns a string representation of the time interval of the slice
 * @details The bounds and the length are converted to double before being printed.
 * @return A string formatted as "[begin,end] (length=length)"
 */
string Schedule::TimeSlice::to_string_interval() const
{
    const size_t text_size = 256;
    char text[text_size];

    snprintf(text, text_size, "[%f,%f] (length=%f)", begin.convert_to<double>(), end.convert_to<double>(),
        length.convert_to<double>());

    return string(text);
}

/**
 * @brief Returns a string representation of the jobs allocated in the time slice
 * @return The identifiers of the allocated jobs, separated by commas,
 *         in the iteration order of allocated_jobs
 */
string Schedule::TimeSlice::to_string_allocated_jobs() const
{
    vector<string> job_ids;
    job_ids.reserve(allocated_jobs.size());

    for (const auto &entry : allocated_jobs)
        job_ids.push_back(entry.first->id);

    return boost::algorithm::join(job_ids, ",");
}

/**
 * @brief Returns a multi-line string representation of the time slice
 * @details The first line contains the time interval. The two following lines
 *          contain the available machines and the allocated jobs.
 * @param[in] initial_indent The number of spaces put in front of every line
 * @param[in] indent The number of additional spaces put in front of the second and third lines
 * @return The string representation of the time slice
 */
string Schedule::TimeSlice::to_string(int initial_indent, int indent) const
{
    // Non-positive indentations produce no padding
    const string outer_pad(static_cast<size_t>(initial_indent > 0 ? initial_indent : 0), ' ');
    const string inner_pad = outer_pad + string(static_cast<size_t>(indent > 0 ? indent : 0), ' ');

    string text = outer_pad + "Time slice: " + to_string_interval() + "\n";
    text += inner_pad + "available machines: " + available_machines.to_string_brackets() + "\n";
    text += inner_pad + "allocated jobs: {" + to_string_allocated_jobs() + "}\n";

    return text;
}

/**
 * @brief Converts a color from HSV to RGB
 * @details The hue is wrapped into [0,360) before the conversion, so 360 gives the same
 *          color as 0 and negative hues are accepted. Saturation and value are not
 *          range-checked. All the computations are done in double precision.
 * @param[in] h The hue, in degrees
 * @param[in] s The saturation (0 yields a grey whose three components equal v)
 * @param[in] v The value
 * @param[out] r The red component
 * @param[out] g The green component
 * @param[out] b The blue component
 */
void hsvToRgb(double h, double s, double v, double &r, double &g, double &b)
{
    if (s == 0) // Achromatic (grey)
    {
        r = g = b = v;
        return;
    }

    // Wrap the hue into [0,360). Hues already in this range are left untouched.
    h -= 360.0 * floor(h / 360.0);
    if (h >= 360.0) // the rounding of a tiny negative hue may land exactly on 360
        h = 0;

    h /= 60; // sector 0 to 5
    const int i = (int)floor(h);
    const double f = h - i; // fractional part of h
    const double p = v * (1 - s);
    const double q = v * (1 - s * f);
    const double t = v * (1 - s * (1 - f));

    switch (i)
    {
    case 0:
        r = v;
        g = t;
        b = p;
        break;
    case 1:
        r = q;
        g = v;
        b = p;
        break;
    case 2:
        r = p;
        g = v;
        b = t;
        break;
    case 3:
        r = p;
        g = q;
        b = v;
        break;
    case 4:
        r = t;
        g = p;
        b = v;
        break;
    default: // case 5:
        r = v;
        g = p;
        b = q;
        break;
    }
}