// cloud_broker.cpp
#include "cloud_broker.hpp"
#include "broker/user.hpp"
#include "json_workload.hpp"

#include <boost/format.hpp>
#include <fstream>

#include "../pempek_assert.hpp"
#include "rapidjson/rapidjson.h"
#include <loguru.hpp>
using namespace rapidjson;

/**
 * Builds the broker and its set of users.
 *
 * If the JSON object pointed to by user_description_file is not empty, it is
 * parsed as follows:
 *   - "dm_window" (optional): array of exactly two ints, used to build the
 *     DMWindow handed to the users that take one;
 *   - "users" (mandatory): array of objects, each with a string "name", a
 *     string "category" and an object "param". The category selects the
 *     concrete User class that is instantiated.
 * If the JSON object is empty, three hard-coded users ("geo", "alice" and
 * "bob") are created instead.
 *
 * Every created user is stored in `users` under its user_name and appended to
 * `user_queue`, which is finally sorted with CompareUsers.
 *
 * Malformed descriptions are reported through PPK_ASSERT_ERROR.
 */
CloudBroker::CloudBroker(rapidjson::Document *user_description_file)
{
    /* Parse description file and call constructor for each user */
    if (!user_description_file->ObjectEmpty())
    {
        if (user_description_file->HasMember("dm_window"))
        {
            const Value &dm_param = (*user_description_file)["dm_window"];
            PPK_ASSERT_ERROR(dm_param.IsArray()
                    && dm_param.GetArray().Size() == 2
                    && dm_param.GetArray()[0].IsInt()
                    && dm_param.GetArray()[1].IsInt(),
                "Invalid user_description file: field "
                "'dm_window' should be an array of int of size 2.");
            dm_window = new DMWindow(dm_param.GetArray()[0].GetInt(),
                dm_param.GetArray()[1].GetInt());
        }

        PPK_ASSERT_ERROR(user_description_file->HasMember("users"),
            "Invalid user_description file: should have field 'users'.");
        const Value &users_json = (*user_description_file)["users"];
        PPK_ASSERT_ERROR(users_json.IsArray(),
            "Invalid user_description file: field 'users' should be an array.");

        for (SizeType i = 0; i < users_json.Size(); i++)
        {
            const Value &user_json = users_json[i];

            /* HasMember() is only valid on json objects: check it first */
            PPK_ASSERT_ERROR(user_json.IsObject(),
                "Invalid user_description file: each element of 'users' "
                "should be a json object.");

            PPK_ASSERT_ERROR(
                user_json.HasMember("name") && user_json["name"].IsString(),
                "Invalid user_description file: user should have string field "
                "'name'.");
            string name = user_json["name"].GetString();

            PPK_ASSERT_ERROR(user_json.HasMember("category")
                    && user_json["category"].IsString(),
                "Invalid user_description file: user should have string field "
                "'category'.");
            string category = user_json["category"].GetString();

            PPK_ASSERT_ERROR(
                user_json.HasMember("param") && user_json["param"].IsObject(),
                "Invalid user_description file: user should have json field "
                "'param'.");
            const Value &param = user_json["param"];

            User *user = nullptr;
            if (category == "routine_greedy")
                user = new RoutineGreedyUser(name, param);

            else if (category == "dicho_intersubmission_time")
                user = new DichoIntersubmitTimeUser(name, param);

            else if (category == "think_time_user")
                user = new ThinkTimeUser(name, param);

            else if (category == "replay_user_rigid")
                user = new ReplayUserRigid(name, param, dm_window);

            else if (category == "replay_user_reconfig")
                user = new ReplayUserReconfig(name, param, seed_generator());

            /* DM user */
            else if (category == "dm_user_reconfig"
                || category == "dm_user_degrad" || category == "dm_user_renonce"
                || category == "dm_user_delay")
            {
                PPK_ASSERT_ERROR(dm_window != nullptr,
                    "User %s is a demand response user. The field 'dm_window' "
                    "should be defined in the user_description_file.",
                    name.c_str());

                if (category == "dm_user_reconfig")
                    user = new DMUserReconfig(name, param, dm_window);

                else if (category == "dm_user_degrad")
                    user = new DMUserDegrad(name, param, dm_window);

                else if (category == "dm_user_renonce")
                    user = new DMUserRenonce(name, param, dm_window);

                else // category == "dm_user_delay"
                    user = new DMUserDelay(name, param, dm_window);
            }

            /* Feedback user */
            else if (category == "fb_user_think_time_only")
                user = new FBUserThinkTimeOnly(name, param);

            else
            {
                PPK_ASSERT_ERROR(false,
                    "Invalid user_description file: unknown user category.");
            }

            /* Only reachable with a null user if the assertion above did not
             * stop the execution: never dereference it */
            if (user == nullptr)
                continue;

            /* emplace() refuses to overwrite an existing key: without this
             * check a second user with the same name would be leaked and
             * silently left out of the queue */
            if (!users.emplace(user->user_name, user).second)
            {
                delete user;
                PPK_ASSERT_ERROR(false,
                    "Invalid user_description file: user name '%s' is used "
                    "more than once.",
                    name.c_str());
            }
        }
    }

    /* Create users by hand */
    else
    {
        users.emplace(
            "geo", new RoutineGreedyUser("geo", 0.0, 10000.0, 1000.0, 2));

        // alice and bob
        users.emplace("alice",
            new DichoIntersubmitTimeUser("alice", 0.0, 10000.0, 1000.0));
        users.emplace(
            "bob", new DichoIntersubmitTimeUser("bob", 500.0, 10000.0, 1000.0));
    }

    /* Fill the queue with every registered user, then order it */
    for (const auto &name_and_user : users)
    {
        user_queue.push_back(name_and_user.second);
    }
    user_queue.sort(CompareUsers());
}

/**
 * Destroys the users registered in `users` and the jobs still referenced by
 * `dynamic_jobs`, then empties the bookkeeping containers.
 *
 * NOTE(review): dm_window, which the constructor allocates with new, is not
 * released here - confirm whether it is freed elsewhere.
 */
CloudBroker::~CloudBroker()
{
    /* Users */
    for (const auto &name_and_user : users)
        delete name_and_user.second;

    /* Jobs that are still tracked by the broker */
    for (const auto &id_and_job : dynamic_jobs)
        delete id_and_job.second;
    dynamic_jobs.clear();

    users_to_wake.clear();
}

double CloudBroker::next_submission(double date) const
{
    return user_queue.front()->next_submission();
}

void CloudBroker::jobs_to_submit(
    double date, list<Job *> &jobs, list<Profile *> &profiles)
{
    jobs = list<Job *>();
    profiles = list<Profile *>();

    User *user = user_queue.front();
    double planned_date_submission = user->next_submission();
    PPK_ASSERT_WARNING(date - planned_date_submission < 1.0,
        "The user asked to be called at date=%f which is more than 1sec "
        "earlier than now",
        planned_date_submission);

    /* There might be more than one user wanting to submit at this date */
    while (planned_date_submission == user->next_submission())
    {
        user_queue.pop_front();
        list<Job *> user_jobs;
        list<Profile *> user_profiles;
        user->jobs_to_submit(date, user_jobs, user_profiles);

        for (Job *job : user_jobs)
        {
            jobs.push_back(job);
            dynamic_jobs[job->id] = job;
            job->status = WAITING;
        }
        profiles.splice(profiles.end(), user_profiles);

        /* user->next_submission has been updated by side effeect */
        user_queue.push_back(user);
        user = user_queue.front();
    }

    /* Reorder the user queue */
    user_queue.sort(CompareUsers());

    /* Sort the jobs before sending them to submission, for determinism */
    jobs.sort(JobComparator());
}

/**
 * Propagates the latest job terminations to the broker and to the users.
 *
 * @param date          current date, forwarded to the users that are woken up
 * @param jobs_ended    ids of the jobs that ended recently (marked FINISHED)
 * @param jobs_killed   ids of the jobs that were killed recently (marked
 *                      KILLED)
 * @param jobs_released not used by this implementation
 *
 * Ids that are not tracked in `dynamic_jobs` are ignored by
 * update_status_if_dyn_job. Every user with at least one recorded
 * termination is woken up once with the list of its jobs, then
 * `users_to_wake` is emptied and `user_queue` is re-sorted.
 */
void CloudBroker::feedback_job_status(double date,
    vector<string> &jobs_ended, vector<string> &jobs_killed,
    vector<string> &jobs_released)
{
    /* Jobs ended recently */
    for (const string &job_id : jobs_ended)
    {
        update_status_if_dyn_job(job_id, FINISHED);
    }

    /* Jobs killed recently */
    for (const string &job_id : jobs_killed)
    {
        update_status_if_dyn_job(job_id, KILLED);
    }

    /* Wake up affected users to give them feedback. Iterate by reference:
     * iterating by value would copy each user's whole job list */
    for (auto &user_and_jobs : users_to_wake)
    {
        user_and_jobs.first->wake_on_feedback(date, user_and_jobs.second);
    }
    users_to_wake.clear();

    /* Dates of next submission may have changed in response to feedback: sort
     * the queue */
    user_queue.sort(CompareUsers());
}

void CloudBroker::update_status_if_dyn_job(
    const string &job_id, JobStatus status)
{
    auto it = dynamic_jobs.find(job_id);
    if (it != dynamic_jobs.end())
    {
        string user_name = job_id.substr(0, job_id.find('!'));
        User *user = users[user_name];
        Job *current_job = it->second;

        switch (status)
        {
        case WAITING:
            PPK_ASSERT_ERROR(current_job->status == WAITING);
            break;
        case RUNNING:
            PPK_ASSERT_ERROR(current_job->status == WAITING);
            current_job->status = RUNNING;
            LOG_F(2, "Updating status of job %s: RUNNING", it->first.c_str());
            break;
        case FINISHED:
            PPK_ASSERT_ERROR(current_job->status == RUNNING);
            current_job->status = FINISHED;
            LOG_F(2, "Updating status of job %s: FINISHED", it->first.c_str());

            /* Add potentially interested user to the map of users to wake up */
            if (users_to_wake.find(user) == users_to_wake.end())
                users_to_wake.emplace(user, list<Job *>());

            /* ..and keep track of its recently ended jobs */
            users_to_wake[user].push_back(current_job);

            dynamic_jobs.erase(it); // job ended, free memory
            break;
        case KILLED:
            PPK_ASSERT_ERROR(current_job->status == WAITING
                || current_job->status == RUNNING);
            current_job->status = KILLED;
            LOG_F(2, "Updating status of job %s: KILLED", it->first.c_str());

            /* Add potentially interested user to the map of users to wake up */
            if (users_to_wake.find(user) == users_to_wake.end())
                users_to_wake.emplace(user, list<Job *>());

            /* ..and keep track of its recently ended jobs */
            users_to_wake[user].push_back(current_job);

            dynamic_jobs.erase(it); // job ended, free memory
            break;
        }
    }
}

/**
 * Sums the statistics returned by get_dm_stat() over all users and writes
 * them to "<log_folder>/user_stats.csv".
 *
 * @param log_folder directory in which the CSV file is created
 *
 * The file has one header line (",nb_jobs,core_seconds") followed by one line
 * per behavior (rigid, renonced, delayed, degraded, reconf). For a behavior
 * b, column nb_jobs is read from index 2 * b of the summed array and column
 * core_seconds from index 2 * b + 1.
 */
void CloudBroker::log_user_stats(string log_folder)
{
    /* Number of counters read from each user's get_dm_stat() array */
    const int nb_stats = 10;

    /* Plain local accumulator: a static one would keep its content between
     * calls, so calling this function twice would write doubled totals */
    int stat[nb_stats] = { 0 };
    for (const auto &name_and_user : users)
    {
        const int *dm_stat = name_and_user.second->get_dm_stat();
        for (int i = 0; i < nb_stats; i++)
            stat[i] += dm_stat[i];
    }

    /* Write in file */
    ofstream file(log_folder + "/user_stats.csv");
    if (!file.is_open())
    {
        LOG_F(ERROR, "Cannot open file %s/user_stats.csv to log user stats",
            log_folder.c_str());
        return;
    }

    file << ",nb_jobs,core_seconds\n";
    file << boost::format("rigid,%1%,%2%\n") % stat[2 * RIGID]
            % stat[2 * RIGID + 1];
    file << boost::format("renonced,%1%,%2%\n") % stat[2 * RENONCED]
            % stat[2 * RENONCED + 1];
    file << boost::format("delayed,%1%,%2%\n") % stat[2 * DELAYED]
            % stat[2 * DELAYED + 1];
    file << boost::format("degraded,%1%,%2%\n") % stat[2 * DEGRADED]
            % stat[2 * DEGRADED + 1];
    file << boost::format("reconf,%1%,%2%\n") % stat[2 * RECONF]
            % stat[2 * RECONF + 1];

    file.close();
}