// cloud_broker.cpp (10.18 KiB)
// Authored by Maël Madon
#include "cloud_broker.hpp"
#include "broker/user.hpp"
#include "json_workload.hpp"
#include <boost/format.hpp>
#include <fstream>
#include "../pempek_assert.hpp"
#include "rapidjson/rapidjson.h"
#include <loguru.hpp>
using namespace rapidjson;
/**
 * Builds the broker and the users it manages.
 *
 * When the JSON description is a non-empty object, it must contain a "users"
 * array; each entry needs a string "name", a string "category" and an object
 * "param", and is turned into the matching User subclass. An optional
 * "dm_window" field (array of two ints) creates the DMWindow handed to the
 * users that take one; it is mandatory for the "dm_user_*" categories.
 * When the description is an empty object, three hard-coded users are created
 * instead.
 *
 * Finally every user is put in user_queue, which is sorted with CompareUsers.
 *
 * NOTE(review): dm_window is only assigned here when the "dm_window" field is
 * present; presumably it is initialised to nullptr in the class declaration —
 * confirm.
 *
 * @param user_description_file parsed JSON description of the users; must be
 *        a JSON object (ObjectEmpty() is called on it unconditionally).
 */
CloudBroker::CloudBroker(rapidjson::Document *user_description_file)
{
    /* Parse description file and call constructor for each user */
    if (!user_description_file->ObjectEmpty())
    {
        if (user_description_file->HasMember("dm_window"))
        {
            const Value &dm_param = (*user_description_file)["dm_window"];
            PPK_ASSERT_ERROR(dm_param.IsArray()
                    && dm_param.GetArray().Size() == 2
                    && dm_param.GetArray()[0].IsInt()
                    && dm_param.GetArray()[1].IsInt(),
                "Invalid user_description file: field "
                "'dm_window' should be an array of int of size 2.");
            dm_window = new DMWindow(dm_param.GetArray()[0].GetInt(),
                dm_param.GetArray()[1].GetInt());
        }

        PPK_ASSERT_ERROR(user_description_file->HasMember("users"),
            "Invalid user_description file: should have field 'users'.");
        const Value &users_json = (*user_description_file)["users"];
        PPK_ASSERT_ERROR(users_json.IsArray(),
            "Invalid user_description file: field 'users' should be an array.");

        for (SizeType i = 0; i < users_json.Size(); i++)
        {
            const Value &user_json = users_json[i];

            PPK_ASSERT_ERROR(user_json.HasMember("name")
                    && user_json["name"].IsString(),
                "Invalid user_description file: user should have string field "
                "'name'.");
            string name = user_json["name"].GetString();

            PPK_ASSERT_ERROR(user_json.HasMember("category")
                    && user_json["category"].IsString(),
                "Invalid user_description file: user should have string field "
                "'category'.");
            string category = user_json["category"].GetString();

            PPK_ASSERT_ERROR(user_json.HasMember("param")
                    && user_json["param"].IsObject(),
                "Invalid user_description file: user should have json field "
                "'param'.");
            const Value &param = user_json["param"];

            User *user = nullptr;
            if (category == "routine_greedy")
                user = new RoutineGreedyUser(name, param);
            else if (category == "dicho_intersubmission_time")
                user = new DichoIntersubmitTimeUser(name, param);
            else if (category == "think_time_user")
                user = new ThinkTimeUser(name, param);
            else if (category == "replay_user_rigid")
                user = new ReplayUserRigid(name, param, dm_window);
            else if (category == "replay_user_reconfig")
                user = new ReplayUserReconfig(name, param, seed_generator());
            /* DM user */
            else if (category == "dm_user_reconfig"
                || category == "dm_user_degrad" || category == "dm_user_renonce"
                || category == "dm_user_delay")
            {
                PPK_ASSERT_ERROR(dm_window != nullptr,
                    "User %s is a demand response user. The field 'dm_window' "
                    "should be defined in the user_description_file.",
                    name.c_str());
                if (category == "dm_user_reconfig")
                    user = new DMUserReconfig(name, param, dm_window);
                else if (category == "dm_user_degrad")
                    user = new DMUserDegrad(name, param, dm_window);
                else if (category == "dm_user_renonce")
                    user = new DMUserRenonce(name, param, dm_window);
                else // category == "dm_user_delay"
                    user = new DMUserDelay(name, param, dm_window);
            }
            /* Feedback user */
            else if (category == "fb_user_think_time_only")
                user = new FBUserThinkTimeOnly(name, param);
            else
            {
                PPK_ASSERT_ERROR(false,
                    "Invalid user_description file: unknown user category.");
                /* Only reached if the assertion did not stop execution (e.g.
                 * assertions compiled out): there is no user to register, and
                 * dereferencing the null pointer below would crash. */
                continue;
            }

            /* emplace() refuses a second user with the same name: free the
             * rejected object instead of leaking it. */
            if (!users.emplace(user->user_name, user).second)
            {
                LOG_F(WARNING,
                    "Duplicate user name '%s' in user_description file: "
                    "ignoring the later definition.",
                    name.c_str());
                delete user;
            }
        }
    }
    /* Create users by hand */
    else
    {
        users.emplace(
            "geo", new RoutineGreedyUser("geo", 0.0, 10000.0, 1000.0, 2));
        // alice and bob
        users.emplace("alice",
            new DichoIntersubmitTimeUser("alice", 0.0, 10000.0, 1000.0));
        users.emplace(
            "bob", new DichoIntersubmitTimeUser("bob", 500.0, 10000.0, 1000.0));
    }

    /* Queue of users, sorted with CompareUsers */
    for (const auto &name_and_user : users)
    {
        user_queue.push_back(name_and_user.second);
    }
    user_queue.sort(CompareUsers());
}
/**
 * Frees every user registered in `users` and every job still tracked in
 * `dynamic_jobs`, then empties the bookkeeping containers.
 *
 * NOTE(review): the DMWindow allocated by the constructor is not freed here;
 * confirm whether another object owns it.
 */
CloudBroker::~CloudBroker()
{
    /* Users are owned by the broker */
    for (const auto &name_and_user : users)
        delete name_and_user.second;

    /* Jobs that are still tracked when the broker is destroyed */
    for (const auto &id_and_job : dynamic_jobs)
        delete id_and_job.second;
    dynamic_jobs.clear();

    users_to_wake.clear();
}
double CloudBroker::next_submission(double date) const
{
return user_queue.front()->next_submission();
}
void CloudBroker::jobs_to_submit(
double date, list<Job *> &jobs, list<Profile *> &profiles)
{
jobs = list<Job *>();
profiles = list<Profile *>();
User *user = user_queue.front();
double planned_date_submission = user->next_submission();
PPK_ASSERT_WARNING(date - planned_date_submission < 1.0,
"The user asked to be called at date=%f which is more than 1sec "
"earlier than now",
planned_date_submission);
/* There might be more than one user wanting to submit at this date */
while (planned_date_submission == user->next_submission())
{
user_queue.pop_front();
list<Job *> user_jobs;
list<Profile *> user_profiles;
user->jobs_to_submit(date, user_jobs, user_profiles);
for (Job *job : user_jobs)
{
jobs.push_back(job);
dynamic_jobs[job->id] = job;
job->status = WAITING;
}
profiles.splice(profiles.end(), user_profiles);
/* user->next_submission has been updated by side effeect */
user_queue.push_back(user);
user = user_queue.front();
}
/* Reorder the user queue */
user_queue.sort(CompareUsers());
/* Sort the jobs before sending them to submission, for determinism */
jobs.sort(JobComparator());
}
void CloudBroker::feedback_job_status(double date,
vector<string> &jobs_ended, vector<string> &jobs_killed,
vector<string> &jobs_released)
{
/* Jobs ended recently */
for (const string &job_id : jobs_ended)
{
update_status_if_dyn_job(job_id, FINISHED);
}
/* Jobs killed recently */
for (const string &job_id : jobs_killed)
{
update_status_if_dyn_job(job_id, KILLED);
}
/* Wake up affected users to give them feedback */
for (auto userJobs : users_to_wake)
{
userJobs.first->wake_on_feedback(date, userJobs.second);
}
users_to_wake.clear();
/* Dates of next submission may have changed in response to feedback: sort
* the queue */
user_queue.sort(CompareUsers());
}
void CloudBroker::update_status_if_dyn_job(
const string &job_id, JobStatus status)
{
auto it = dynamic_jobs.find(job_id);
if (it != dynamic_jobs.end())
{
string user_name = job_id.substr(0, job_id.find('!'));
User *user = users[user_name];
Job *current_job = it->second;
switch (status)
{
case WAITING:
PPK_ASSERT_ERROR(current_job->status == WAITING);
break;
case RUNNING:
PPK_ASSERT_ERROR(current_job->status == WAITING);
current_job->status = RUNNING;
LOG_F(2, "Updating status of job %s: RUNNING", it->first.c_str());
break;
case FINISHED:
PPK_ASSERT_ERROR(current_job->status == RUNNING);
current_job->status = FINISHED;
LOG_F(2, "Updating status of job %s: FINISHED", it->first.c_str());
/* Add potentially interested user to the map of users to wake up */
if (users_to_wake.find(user) == users_to_wake.end())
users_to_wake.emplace(user, list<Job *>());
/* ..and keep track of its recently ended jobs */
users_to_wake[user].push_back(current_job);
dynamic_jobs.erase(it); // job ended, free memory
break;
case KILLED:
PPK_ASSERT_ERROR(current_job->status == WAITING
|| current_job->status == RUNNING);
current_job->status = KILLED;
LOG_F(2, "Updating status of job %s: KILLED", it->first.c_str());
/* Add potentially interested user to the map of users to wake up */
if (users_to_wake.find(user) == users_to_wake.end())
users_to_wake.emplace(user, list<Job *>());
/* ..and keep track of its recently ended jobs */
users_to_wake[user].push_back(current_job);
dynamic_jobs.erase(it); // job ended, free memory
break;
}
}
}
/**
 * Sums the statistics returned by get_dm_stat() over all users and writes
 * them to "<log_folder>/user_stats.csv".
 *
 * The file has a header line ",nb_jobs,core_seconds" followed by one line per
 * category (rigid, renonced, delayed, degraded, reconf). For a category c,
 * the number of jobs is read at index 2*c and the core seconds at index
 * 2*c + 1 of the summed array.
 *
 * The totals are recomputed from scratch on every call.
 *
 * @param log_folder directory in which user_stats.csv is created
 */
void CloudBroker::log_user_stats(string log_folder)
{
    /* get_dm_stat() is read as an array of NB_STATS ints
     * (two values for each of the five categories written below). */
    const int NB_STATS = 10;

    /* Local, not static: a static accumulator would keep its content between
     * calls and count every user twice on a second call. */
    int stat[NB_STATS] = { 0 };

    for (const auto &name_and_user : users)
    {
        User *user = name_and_user.second;
        if (user == nullptr)
            continue;

        const int *dm_stat = user->get_dm_stat();
        for (int i = 0; i < NB_STATS; i++)
            stat[i] += dm_stat[i];
    }

    /* Write in file */
    const string path = log_folder + "/user_stats.csv";
    ofstream file(path);
    if (!file.is_open())
    {
        LOG_F(ERROR, "Could not open %s: user statistics not written",
            path.c_str());
        return;
    }

    /* One CSV line: label, number of jobs, core seconds */
    auto write_row = [&](const char *label, int category) {
        file << boost::format("%1%,%2%,%3%\n") % label % stat[2 * category]
                % stat[2 * category + 1];
    };

    file << ",nb_jobs,core_seconds\n";
    write_row("rigid", RIGID);
    write_row("renonced", RENONCED);
    write_row("delayed", DELAYED);
    write_row("degraded", DEGRADED);
    write_row("reconf", RECONF);

    file.close();
}