Commit f21c2562 authored by Millian Poquet

Merge branch 'artifacts-prediction'

parents 79b886ec 2ac9c5a8
artifact-overview.pdf
@@ -281,12 +281,11 @@ The step-by-step instructions of this document can be used in several ways *depe
+ You can *check* the final analyses (code + plots) done in Article @lightpredenergy by reading the provided pre-rendered notebooks available on #link(zenodo-url)[Zenodo].
+ You can *reproduce* the *final analyses* by first downloading the provided aggregated results of the experiments from #link(zenodo-url)[Zenodo], and then by running the notebooks yourself.
This enables you to *edit* our notebooks before running them, so that you can modify the analyses done or add your own.
// - Refer to #todo[link to Danilo's notebook section] for the machine learning experiment.
- Refer to @sec-analyze-prediction-results for instructions to analyze the results of the machine learning experiment.
- Refer to @sec-analyze-simu-campaign-results for instructions to analyze the results of the scheduling experiment.
+ You can *reproduce* our *experimental campaigns* by downloading the provided experiment input files from #link(zenodo-url)[Zenodo],
and then by running the experiment yourself.
This enables you to verify that our experiment can be reproduced with the *exact same parameters and configuration*.
//- Refer to #todo[link to Danilo's expe section?] for the machine learning experiment.
- Refer to @sec-run-simu-campaign for instructions to reproduce the scheduling experiment.
+ You can *fully reproduce* our *experimental campaigns* by downloading the original traces of the Marconi100,
by generating the experimental campaign parameters yourself (enabling you to hack the provided command-line parameters or code),
@@ -294,19 +293,6 @@ The step-by-step instructions of this document can be used in several ways *depe
You can follow all steps below in this case,
but *please do note that this is disk/bandwidth/computation-intensive.*
== Job power prediction <sec-job-power-pred>
How to reproduce the power predictions of jobs has not been written yet.
The expected output data of this section has however been stored on #link(zenodo-url)[Zenodo].
//#tododanilo[how to reproduce this experiment?]
#fullbox(footer: [Disk: 82 Mo.])[
#filehashes((
"fdcc47998a7e998abde325162833b23e", "power_pred_users_allmethods_max.tar.gz",
"954f782a75c9a5b21c53a95c0218e220", "power_pred_users_allmethods_mean.tar.gz",
))
]
== Analysis and modeling of the power behavior of Marconi100 nodes
=== Get Marconi100 power and job traces on your disk <sec-m100-power-job-traces>
This section downloads parts of the Marconi100 trace as archives from #link("https://gitlab.com/ecs-lab/exadata")[the ExaData Zenodo files], checks that the archives have the right content (via MD5 checksums), extracts the data needed by later stages of the pipeline (node power usage traces, jobs information traces), then finally removes unneeded extracted files and the downloaded archives.
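If you want to verify an archive manually, the MD5 checks performed by our scripts can be mimicked with a few lines of Python. This is a minimal sketch (the `md5sum` helper is illustrative; the file name and hash are the ones listed for the job power prediction archives later in this document):

```python
import hashlib

def md5sum(path, chunk_size=1 << 20):
    # Read the file in 1 MiB chunks to keep memory usage flat.
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

assert md5sum("power_pred_users_allmethods_max.tar.gz") == "fdcc47998a7e998abde325162833b23e"
```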
@@ -363,7 +349,7 @@ The notebook also generates a power model of the Marconi100 nodes, which is requ
Required input files.
- `m100-data/22-agg_power_total.csv` (output of @sec-agg-power-traces-per-node).
#fullbox(footer:[Disk: 1.7 Mo. Time (laptop): 00:00:10])[
#fullbox(footer:[Disk: 1.7 Mo. Time (laptop): 00:00:10.])[
```sh
nix develop .#r-notebook --command \
Rscript notebooks/run-rmarkdown-notebook.R \
@@ -381,6 +367,147 @@ Required input files.
Their content should be completely reproducible though.
]
== Job power prediction <sec-job-power-pred>
The experimental workflow consists of two parts: (i) preprocessing of the original data, and
(ii) prediction of the mean and maximum power consumption.
Please note that reproducing this section involves *heavy computations* and *big data*.
We have *not* made intermediate files available on #link(zenodo-url)[Zenodo] as they were too big.
=== Pre-processing
==== Step 1
#fullbox(footer:[#emph-overhead[Memory: 128 Go. Time (sequential): 18:00:00.]])[
```sh
for month in 22-01 22-02 22-03 22-04 22-05 22-06 22-07 22-08 22-09; do
nix develop .#py-scripts --command m100-pred-preprocess1 \
-j ./m100-data/${month}_jobs.parquet \
-m ./m100-data/${month}_power_total.parquet
done
```
]
==== Step 2
#fullbox(footer:[#emph-overhead[Memory: 128 Go. Time (sequential): 66:00:00.]])[
```sh
for month in 22-01 22-02 22-03 22-04 22-05 22-06 22-07 22-08 22-09; do
nix develop .#py-scripts --command m100-pred-preprocess2 \
-js ./m100-data/${month}_filter12_singlenode.csv \
-jm ./m100-data/${month}_filter12_multinode.csv \
-m ./m100-data/${month}_power_total.parquet
done
```
]
=== Aggregate step 2 output into a single file
#fullbox(footer: [#emph-overhead[Disk: 32 Go.]])[
```sh
find . -name '*filter123*' | \
tar -zcvf exadata_job_energy_profiles.tar.gz --files-from -
```
]
=== Compute power metrics and add job information
#fullbox(footer: [#emph-overhead[Disk: 32 Go.]])[
```sh
for month in 22-01 22-02 22-03 22-04 22-05 22-06 22-07 22-08 22-09; do
nix develop .#py-scripts --command m100-pred-jobs-extract-power-metrics \
-d ./m100-data/${month}
done
```
]
=== Merge files into a single CSV file
This step outputs the `filter123_all_jobs_aggmetrics.csv.gz` file needed by the prediction script.
#fullbox(footer: [Disk: 82 Mo.])[
```sh
nix develop .#py-scripts --command m100-pred-merge-jobfiles -d ./m100-data/
```
]
=== Predicting job mean and maximum power consumption
#fullbox(footer:[#emph-overhead[Memory: 128 Go. Time (sequential): 72:00:00.]])[
```sh
mkdir ./m100-data/total_power_mean_predictions_users_allmethods_mean
mkdir ./m100-data/total_power_mean_predictions_users_allmethods_max
nix develop .#py-scripts --command \
run-prediction-per-user-allmethods-mean \
-j ./m100-data/filter123_all_jobs_aggmetrics.csv.gz \
-o ./m100-data/total_power_mean_predictions_users_allmethods_mean
nix develop .#py-scripts --command \
run-prediction-per-user-allmethods-max \
-j ./m100-data/filter123_all_jobs_aggmetrics.csv.gz \
-o ./m100-data/total_power_mean_predictions_users_allmethods_max
```
]
=== Compressing prediction output into single files
The expected output data has been stored on #link(zenodo-url)[Zenodo].
#fullbox(footer:[Disk: 82 Mo.])[
```sh
tar -cvzf ./m100-data/power_pred_users_allmethods_max.tar.gz \
./m100-data/total_power_mean_predictions_users_allmethods_max
tar -cvzf ./m100-data/power_pred_users_allmethods_mean.tar.gz \
./m100-data/total_power_mean_predictions_users_allmethods_mean
```
#filehashes((
"fdcc47998a7e998abde325162833b23e", "power_pred_users_allmethods_max.tar.gz",
"954f782a75c9a5b21c53a95c0218e220", "power_pred_users_allmethods_mean.tar.gz",
))
]
== Analyzing prediction results <sec-analyze-prediction-results>
This analysis requires that the two job power prediction archives (outputs of @sec-job-power-pred, available on #link(zenodo-url)[Zenodo]) are available on your disk in the `./user-power-predictions` directory.
The following command populates the `./user-power-predictions/data` directory by extracting the archives and uncompressing all the required files on your disk.
#fullbox(footer: [Disk: 519 Mo. Time: 00:00:05.])[
```sh
mkdir ./user-power-predictions/data
nix develop .#merge-m100-power-predictions --command \
tar xf ./user-power-predictions/*mean.tar.gz --directory ./user-power-predictions/data
nix develop .#merge-m100-power-predictions --command \
tar xf ./user-power-predictions/*max.tar.gz --directory ./user-power-predictions/data
nix develop .#merge-m100-power-predictions --command \
gunzip ./user-power-predictions/data/*/*.gz
```
]
The analysis of the predictions, which also generates Figures 2 and 3 of Article @lightpredenergy, can be reproduced with the following command.
#fullbox(footer:[Time (laptop): 00:00:20.])[
```sh
nix develop .#r-py-notebook --command \
Rscript notebooks/run-rmarkdown-notebook.R \
notebooks/prediction-results-analysis.Rmd
```
#filehashes((
"6ce534dd1bf017444f05c354a1b3b767", "notebooks/fig2a-distrib-job-power-mean.svg",
"0b1cdfcf017c2cba7057a544e19cd698", "notebooks/fig2b-distrib-job-power-max.svg",
"0bc88e65ae495a8d6ec7d3cbcfca12ae", "notebooks/fig3a-pred-mape-mean-power.svg",
"a19b1a7c5dc72ec73a5349d85fc68fa3", "notebooks/fig3b-pred-mape-max-power.svg",
"04c2d5ef412b791a4d5515ec0719b3d0", "notebooks/prediction-results-analysis.html",
), fill: (x, y) => {
if y > 0 { red.lighten(80%) }
},
)
We could not make HTML notebooks and Python-generated images binary reproducible despite our best efforts.
Their content should be completely reproducible though.
]
== Job scheduling with power prediction <sec-sched>
This section shows how to reproduce Sections 6.4 and 6.5 of Article @lightpredenergy.
@@ -499,7 +626,7 @@ Required input files.
- The `/tmp/wlds` directory (#emph-overhead[1.4 Go]) that contains all the workload files (output of @sec-gen-workloads).
Please note that all input files can be downloaded from #link(zenodo-url)[Zenodo] if you have not generated them yourself.
In particular to populate the `/tmp/wlds`directory you can *download file* `workloads.tar.xz` and then *extract it* into `/tmp/` via a command such as the following. `tar xf workloads.tar.xz --directory=/tmp/`
In particular, to populate the `/tmp/wlds` directory you can *download file* `workloads.tar.xz` and then *extract it* into `/tmp/` via a command such as: `tar xf workloads.tar.xz --directory=/tmp/`
#fullbox(footer: [#emph-overhead[Disk: 7.6 Go.] Time: 00:06:00.])[
```sh
......
@@ -70,6 +70,8 @@
propagatedBuildInputs = with pyPkgs; [
packages.fastparquet-2402
pyPkgs.sortedcontainers
pyPkgs.scipy
pyPkgs.scikit-learn
];
};
fastparquet-2402 = pyPkgs.fastparquet.overrideAttrs(final: prev: rec {
@@ -83,7 +85,7 @@
});
easypower-sched-lib = easy-powercap-pkgs.easypower;
};
devShells = {
devShells = rec {
download-m100-months = pkgs.mkShell {
buildInputs = [
packages.python-scripts
@@ -125,6 +127,14 @@
pkgs.rPackages.viridis
];
};
r-py-notebook = pkgs.mkShell {
buildInputs = r-notebook.buildInputs ++ [
pkgs.rPackages.reticulate
pyPkgs.pandas
pyPkgs.seaborn
pyPkgs.scikit-learn
];
};
typst-shell = pkgs.mkShell {
buildInputs = [
typstpkgs.typst-dev
......
---
title: "Job power prediction result analysis"
author: "Danilo Carastan-Santos"
date: "2024-05-15"
output:
rmdformats::readthedown
---
## Processing the mean power prediction results
Outputs of script `run_prediction_per_user_allmethods_mean.py`.
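For reference, the per-user error metric computed in the chunk below is scikit-learn's `mean_absolute_percentage_error`, i.e.

$$\mathrm{MAPE}(y, \hat{y}) = \frac{1}{n} \sum_{i=1}^{n} \frac{\left| y_i - \hat{y}_i \right|}{\left| y_i \right|}$$

reported as a ratio (not multiplied by 100).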
```{python}
import pandas as pd
import seaborn as sns
import os
RESULTS_PATH = "../user-power-predictions/data/total_power_mean_predictions_users_allmethods_mean/"
PRED_COLS = ["hist_pred_total_power_mean",
"LinearRegression_total_power_mean_watts",
"RandomForestRegressor_total_power_mean_watts",
"LinearSVR_total_power_mean_watts",
"SGDRegressor_total_power_mean_watts"]
result_filenames = os.listdir(RESULTS_PATH)
df_all_results = pd.concat([pd.read_csv(RESULTS_PATH+filename, low_memory=False) for filename in result_filenames])
df_all_results = df_all_results.dropna(subset=PRED_COLS)
df_all_results
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
lst_users = df_all_results["user_id"].drop_duplicates().to_list()
#print(lst_users)
df_results_user_group = df_all_results.groupby("user_id")
lst_stats_per_user = []
for user in lst_users:
results_user = df_results_user_group.get_group(user)
hist_mape = mean_absolute_percentage_error(results_user["total_power_mean_watts"], results_user["hist_pred_total_power_mean"])
LR_mape = mean_absolute_percentage_error(results_user["total_power_mean_watts"], results_user["LinearRegression_total_power_mean_watts"])
RF_mape = mean_absolute_percentage_error(results_user["total_power_mean_watts"], results_user["RandomForestRegressor_total_power_mean_watts"])
LSVR_mape = mean_absolute_percentage_error(results_user["total_power_mean_watts"], results_user["LinearSVR_total_power_mean_watts"])
SGD_mape = mean_absolute_percentage_error(results_user["total_power_mean_watts"], results_user["SGDRegressor_total_power_mean_watts"])
res = {"user_id": user,
"hist_mape": hist_mape,
"LinearRegression_mape": LR_mape,
"RandomForestRegressor_mape": RF_mape,
"LinearSVR_mape": LSVR_mape,
"SGDRegressor_mape": SGD_mape}
lst_stats_per_user.append(res)
#break
df_stats_per_user = pd.DataFrame(lst_stats_per_user)
df_stats_per_user
COLS = ["hist_mape","LinearRegression_mape","RandomForestRegressor_mape","LinearSVR_mape","SGDRegressor_mape"]
df_stats_per_user[COLS].describe()
COLS = ["hist_mape","LinearRegression_mape","RandomForestRegressor_mape","LinearSVR_mape","SGDRegressor_mape"]
df_stats_per_user_pivot = pd.melt(df_stats_per_user, id_vars="user_id")
df_stats_per_user_pivot
```
### Figure 3 (a)
```{python}
import matplotlib.pyplot as plt
TINY_SIZE = 2
SMALL_SIZE = 5
MEDIUM_SIZE = 20
BIGGER_SIZE = 50
FIG_WIDTH = 40
FIG_HEIGHT = 10
#plt.rc('font', size=16) # controls default text sizes
plt.rc('font', size=20) # controls default text sizes
plt.rc('axes', titlesize=MEDIUM_SIZE) # fontsize of the axes title
plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels
plt.rc('xtick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels
plt.rc('ytick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels
plt.rc('legend', fontsize=MEDIUM_SIZE) # legend fontsize
plt.rc('figure', titlesize=MEDIUM_SIZE) # fontsize of the figure title
plt.rc('figure', figsize=(8,4))
#g = sns.boxplot(x="variable", y="value", data=df_stats_per_user_pivot, showfliers=False)
#plt.xticks(ticks=[0,1,2,3,4],labels=["History", "LinearRegression", "RandomForest", "LinearSVR", "SGDRegressor"],rotation=30)
g = sns.boxplot(y="variable", x="value", data=df_stats_per_user_pivot, showfliers=False)
plt.yticks(ticks=[0,1,2,3,4],labels=["History", "LinearRegression", "RandomForest", "LinearSVR", "SGDRegressor"],rotation=0)
g.set_ylabel("Prediction Method")
g.set_xlabel("Mean Absolute Percentage Error (MAPE) ")
plt.tight_layout(pad=0)
plt.savefig("./fig3a-pred-mape-mean-power.svg")
```
## Processing the max power prediction results
Outputs of script `run_prediction_per_user_allmethods_max.py`.
```{python}
import pandas as pd
import seaborn as sns
import os
RESULTS_PATH = "../user-power-predictions/data/total_power_mean_predictions_users_allmethods_max/"
PRED_COLS = ["hist_pred_total_power_max",
"LinearRegression_total_power_max_watts",
"RandomForestRegressor_total_power_max_watts",
"LinearSVR_total_power_max_watts",
"SGDRegressor_total_power_max_watts"]
result_filenames = os.listdir(RESULTS_PATH)
df_all_results = pd.concat([pd.read_csv(RESULTS_PATH+filename, low_memory=False) for filename in result_filenames])
df_all_results = df_all_results.dropna(subset=PRED_COLS)
#df_all_results
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
lst_users = df_all_results["user_id"].drop_duplicates().to_list()
#print(lst_users)
df_results_user_group = df_all_results.groupby("user_id")
lst_stats_per_user = []
for user in lst_users:
results_user = df_results_user_group.get_group(user)
hist_mape = mean_absolute_percentage_error(results_user["total_power_max_watts"], results_user["hist_pred_total_power_max"])
LR_mape = mean_absolute_percentage_error(results_user["total_power_max_watts"], results_user["LinearRegression_total_power_max_watts"])
RF_mape = mean_absolute_percentage_error(results_user["total_power_max_watts"], results_user["RandomForestRegressor_total_power_max_watts"])
LSVR_mape = mean_absolute_percentage_error(results_user["total_power_max_watts"], results_user["LinearSVR_total_power_max_watts"])
SGD_mape = mean_absolute_percentage_error(results_user["total_power_max_watts"], results_user["SGDRegressor_total_power_max_watts"])
res = {"user_id": user,
"hist_mape": hist_mape,
"LinearRegression_mape": LR_mape,
"RandomForestRegressor_mape": RF_mape,
"LinearSVR_mape": LSVR_mape,
"SGDRegressor_mape": SGD_mape}
lst_stats_per_user.append(res)
#break
df_stats_per_user = pd.DataFrame(lst_stats_per_user)
#df_stats_per_user
COLS = ["hist_mape","LinearRegression_mape","RandomForestRegressor_mape","LinearSVR_mape","SGDRegressor_mape"]
df_stats_per_user[COLS].describe()
COLS = ["hist_mape","LinearRegression_mape","RandomForestRegressor_mape","LinearSVR_mape","SGDRegressor_mape"]
df_stats_per_user_pivot = pd.melt(df_stats_per_user, id_vars="user_id")
df_stats_per_user_pivot
```
### Figure 3 (b)
```{python}
import matplotlib.pyplot as plt
TINY_SIZE = 2
SMALL_SIZE = 5
MEDIUM_SIZE = 20
BIGGER_SIZE = 50
FIG_WIDTH = 40
FIG_HEIGHT = 10
plt.rc('font', size=20) # controls default text sizes
plt.rc('axes', titlesize=MEDIUM_SIZE) # fontsize of the axes title
plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels
plt.rc('xtick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels
plt.rc('ytick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels
plt.rc('legend', fontsize=MEDIUM_SIZE) # legend fontsize
plt.rc('figure', titlesize=MEDIUM_SIZE) # fontsize of the figure title
plt.rc('figure', figsize=(8,4))
#g = sns.boxplot(x="variable", y="value", data=df_stats_per_user_pivot, showfliers=False)
#plt.xticks(ticks=[0,1,2,3,4],labels=["History", "LinearRegression", "RandomForest", "LinearSVR", "SGDRegressor"],rotation=30)
#g.set_xlabel("Prediction Method")
#g.set_ylabel("Mean Absolute Percentage Error (MAPE) ")
g = sns.boxplot(y="variable", x="value", data=df_stats_per_user_pivot, showfliers=False)
plt.yticks(ticks=[0,1,2,3,4],labels=["History", "LinearRegression", "RandomForest", "LinearSVR", "SGDRegressor"],rotation=0)
g.set_ylabel("Prediction Method")
g.set_xlabel("Mean Absolute Percentage Error (MAPE)")
plt.tight_layout(pad=0)
plt.savefig("./fig3b-pred-mape-max-power.svg")
```
## Getting the actual mean and max power distributions
### Mean: Figure 2 (a)
```{python}
import matplotlib.pyplot as plt
import seaborn as sns
TINY_SIZE = 2
SMALL_SIZE = 5
MEDIUM_SIZE = 20
BIGGER_SIZE = 50
FIG_WIDTH = 40
FIG_HEIGHT = 10
plt.clf()
plt.rc('figure', figsize=(8, 6))
plt.rc('font', size=MEDIUM_SIZE) # controls default text sizes
plt.rc('axes', titlesize=MEDIUM_SIZE) # fontsize of the axes title
plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels
plt.rc('xtick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels
plt.rc('ytick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels
plt.rc('legend', fontsize=MEDIUM_SIZE) # legend fontsize
plt.rc('figure', titlesize=MEDIUM_SIZE) # fontsize of the figure title
plt.rc('figure', figsize=(6,4))
g = sns.histplot(x="total_power_mean_watts", data=df_all_results, bins=25, fill=False)
#g.ax.set_yscale('log')
g.set_xlabel("Total Power (watts)")
g.set_ylabel("Number of Jobs")
plt.xticks(ticks=[0,250,500,750,1000,1250,1500], rotation=30)
plt.tight_layout(pad=0)
plt.savefig("./fig2a-distrib-job-power-mean.svg")
```
### Max: Figure 2 (b)
```{python}
import matplotlib.pyplot as plt
import seaborn as sns
TINY_SIZE = 2
SMALL_SIZE = 5
MEDIUM_SIZE = 20
BIGGER_SIZE = 50
FIG_WIDTH = 40
FIG_HEIGHT = 10
plt.clf()
plt.rc('figure', figsize=(8, 6))
plt.rc('font', size=MEDIUM_SIZE) # controls default text sizes
plt.rc('axes', titlesize=MEDIUM_SIZE) # fontsize of the axes title
plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels
plt.rc('xtick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels
plt.rc('ytick', labelsize=MEDIUM_SIZE) # fontsize of the tick labels
plt.rc('legend', fontsize=MEDIUM_SIZE) # legend fontsize
plt.rc('figure', titlesize=MEDIUM_SIZE) # fontsize of the figure title
plt.rc('figure', figsize=(6,4))
#g = sns.displot(x="total_power_max_watts", data=df_all_results)
g = sns.histplot(x="total_power_max_watts", data=df_all_results, bins=25, fill=False)
#g.ax.set_yscale('log')
g.set_xlabel("Total Power (watts)")
g.set_ylabel("Number of Jobs")
plt.xticks(ticks=[0,250,500,750,1000,1250,1500,1750,2000], rotation=30)
plt.tight_layout(pad=0)
plt.savefig("./fig2b-distrib-job-power-max.svg")
```
import argparse
import sys
import pandas as pd
import numpy as np
from scipy.stats import iqr
"""
Read input data
"""
def read_data(rootdir):
df_jobs_single = pd.read_csv(rootdir+"_filter123_singlenode.csv")
df_jobs_multi = pd.read_csv(rootdir+"_filter123_multinode.csv")
#power profiles are written by the preprocessing step as <rootdir>a_0_filter123_*.csv
df_power_single = pd.read_csv(rootdir+"a_0_filter123_singlenode.csv")
df_power_multi = pd.read_csv(rootdir+"a_0_filter123_multinode.csv")
df_jobs = pd.concat([df_jobs_single, df_jobs_multi]).reset_index(drop=True)
df_power = pd.concat([df_power_single, df_power_multi]).reset_index(drop=True)
df_power['node'] = pd.to_numeric(df_power['node'])
df_power['value'] = pd.to_numeric(df_power['value'])
return df_jobs, df_power
"""
Calculate jobs' power aggregation metrics
"""
def calculate_agg_metrics(df_jobs, df_power):
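# Gather each job's power samples by matching its job_id in the power trace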
powertrace_jobs = [df_power[df_power["job_id"]==x]['value'].values for x in df_jobs['job_id'].values]
df_jobs["total_power_max_watts"] = [x.max() for x in powertrace_jobs]
df_jobs["total_power_mean_watts"] = [x.mean() for x in powertrace_jobs]
df_jobs["total_power_median_watts"] = [np.median(x) for x in powertrace_jobs]
df_jobs["total_power_std_watts"] = [x.std() for x in powertrace_jobs]
df_jobs["total_power_iqr_watts"] = [iqr(x, rng=(25, 75)) for x in powertrace_jobs]
df_jobs["total_power_1090range_watts"] = [iqr(x, rng=(10, 90)) for x in powertrace_jobs]
return df_jobs
"""
Save results
"""
def save_results(df_jobs_aggmetrics, rootdir):
df_jobs_aggmetrics.to_csv(rootdir+"_filter123_aggmetrics.csv")
"""
Run workflow
"""
def run_workflow(rootdir):
df_jobs, df_power = read_data(rootdir)
df_jobs_aggmetrics = calculate_agg_metrics(df_jobs, df_power)
print(df_jobs_aggmetrics[["run_time",
"num_nodes",
"total_power_max_watts",
"total_power_mean_watts",
"total_power_median_watts"]])
save_results(df_jobs_aggmetrics, rootdir)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Process ExaData data to extract per-job power metrics. This script uses the output from process_marconi_jobs_2.py')
p.add_argument("--rootdir", "-d", type=str, required=True,
help="Root directory of the trace")
return(p.parse_args())
def main():
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
args = read_cli()
run_workflow(args.rootdir)
if __name__ == '__main__':
main()
import glob
import pandas as pd
import sys
import argparse
"""
Read job files spread in the months folders
"""
def read_jobfiles(rootdir):
#DATASET_PATH = "/home/dancarastan/Documentos/exadata_job_energy_profiles/"
jobfiles_list = glob.glob(rootdir+"*"+"_filter123_aggmetrics.csv")
#print(len(jobfiles_list))
df_jobs = pd.concat([pd.read_csv(jobfile) for jobfile in jobfiles_list]).reset_index(drop=True)
return df_jobs
"""
Save results to compressed csv
"""
def save_results(df_jobs, rootdir):
df_jobs.to_csv(rootdir+"/filter123_all_jobs_aggmetrics.csv.gz", index=False)
"""
Run workflow
"""
def run_workflow(rootdir):
df_jobs = read_jobfiles(rootdir)
save_results(df_jobs, rootdir)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Concatenate job table files into a single csv file')
p.add_argument("--rootdir", "-d", type=str, required=True,
help="Root directory of the trace")
return(p.parse_args())
def main():
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
args = read_cli()
run_workflow(args.rootdir)
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import sys
import argparse
import ast
import pandas as pd
import numpy as np
"""
Read Data
"""
def read_data(jobfile, metricfile):
df_jobs = pd.read_parquet(jobfile)
df_power = pd.read_parquet(metricfile)
## I call here df_power, but in reality it can be any metric
return df_jobs, df_power
"""
Convert some string values to numeric
Convert timestamps to seconds for job runtime
"""
def preprocess_data(df_jobs, df_power):
df_power['node'] = pd.to_numeric(df_power['node'])
df_power['value'] = pd.to_numeric(df_power['value'])
df_jobs["run_time"] = (df_jobs['end_time']-df_jobs['start_time']) / np.timedelta64(1, 's')
print("Total number of jobs: ", len(df_jobs))
return df_jobs, df_power
"""
Filter 1: remove jobs that take less than a minute
"""
def filter1(df_jobs):
SECONDS_ONE_MINUTE = 60.0
df_jobs_f1 = df_jobs.loc[df_jobs["run_time"] >= SECONDS_ONE_MINUTE]
print("Number of jobs after filter 1: ", str(len(df_jobs_f1)))
return df_jobs_f1
"""
Filter 2: remove jobs with no energy profile
Method for single-node jobs
"""
def filter2_single(df_jobs, df_power):
df_jobs_f1_single = df_jobs[df_jobs["num_nodes"]==1]
#removing malformed "nodes"
df_jobs_f1_single = df_jobs_f1_single[df_jobs_f1_single["nodes"].str.match(r"\[\d+\]")].copy().reset_index(drop=True)
df_jobs_f1_single["node"] = [ast.literal_eval(x)[0] for x in df_jobs_f1_single['nodes']]
#use this for smaller inputs
#sample = df_jobs_f1_single[0:10000].copy().reset_index(drop=True)
sample = df_jobs_f1_single
group = df_power.groupby(by="node")
available_nodes = group.groups.keys()
#for debugging
#result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]
#i know that 0 is node, 1 is start_time, and 2 is end_time
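#i.e. keep the job if its node has at least one power sample within [start_time, end_time]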
result = [any(group.get_group(x[0])["timestamp"].between(x[1], x[2])) if x[0] in available_nodes else False for x in sample[["node", "start_time", "end_time"]].values.tolist()]
sample["has_profile"] = result
df_jobs_f12_single = sample[sample["has_profile"]==True]
print("Number of jobs after filter 2 - single: ", str(len(df_jobs_f12_single)))
return df_jobs_f12_single
"""
Filter 2: remove jobs with no energy profile
Method for multi-node jobs
"""
def filter2_multi(df_jobs, df_power):
df_jobs_f1_multi = df_jobs[df_jobs["num_nodes"] > 1].copy().reset_index(drop=True)
#removing malformed "nodes"
df_jobs_f1_multi["node"] = [ast.literal_eval(x) for x in df_jobs_f1_multi['nodes']]
#use this for smaller inputs
#sample = df_jobs_f1_multi[0:10].copy().reset_index(drop=True)
sample = df_jobs_f1_multi
group = df_power.groupby(by="node")
available_nodes = set(group.groups.keys())
#for debugging
#result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]
#i know that 0 is node, 1 is start_time, and 2 is end_time
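#i.e. keep the job only if every allocated node has at least one power sample within [start_time, end_time]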
result = [all([any(group.get_group(y)["timestamp"].between(x[1], x[2])) for y in x[0]]) if set(x[0]).issubset(available_nodes) else False for x in sample[["node", "start_time", "end_time"]].values.tolist()]
sample["has_profile"] = result
df_jobs_f12_multi = sample[sample["has_profile"]==True]
print("Number of jobs after filter 2 - multi: ", str(len(df_jobs_f12_multi)))
return df_jobs_f12_multi
"""
Save intermediate results to csv
"""
def save_results(df_jobs_single, df_jobs_multi, jobfile, metricfile):
jobfile_out = jobfile[:-len("_jobs.parquet")] # strip the "_jobs.parquet" suffix from the path
df_jobs_single.to_csv(jobfile_out+"_filter12_singlenode.csv", index=False)
df_jobs_multi.to_csv(jobfile_out+"_filter12_multinode.csv", index=False)
"""
Run workflow
"""
def run_workflow(metricfile, jobfile):
df_jobs, df_power = read_data(jobfile=jobfile, metricfile=metricfile)
df_jobs, df_power = preprocess_data(df_jobs=df_jobs, df_power=df_power)
df_jobs = filter1(df_jobs)
df_jobs_single = filter2_single(df_jobs, df_power)
df_jobs_multi = filter2_multi(df_jobs, df_power)
save_results(df_jobs_single, df_jobs_multi, jobfile, metricfile)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Process ExaData data to extract per-job energy profiles')
p.add_argument("--metricfile", "-m", type=str, required=True,
help="Metric file")
p.add_argument("--jobfile", "-j", type=str, required=True,
help="Job table file")
return(p.parse_args())
def main():
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
args = read_cli()
run_workflow(args.metricfile, args.jobfile)
if __name__ == '__main__':
main()
import argparse
import sys
import ast
import multiprocessing
import functools
import numpy as np
import pandas as pd
"""
Read data
Output from process_marconi_jobs_1.py
"""
def read_data(jobfile_single, jobfile_multi, metricfile):
df_jobs_single = pd.read_csv(jobfile_single)
df_jobs_multi = pd.read_csv(jobfile_multi)
## Here i refer to total_power but in reality it can be any metric
df_total_power = pd.read_parquet(metricfile)
df_total_power['node'] = pd.to_numeric(df_total_power['node'])
df_total_power['value'] = pd.to_numeric(df_total_power['value'])
return df_jobs_single, df_jobs_multi, df_total_power
"""
Filter 3: find the jobs that share a node
The idea is to add a new column to `df_total_power`
with a list of jobs that run on each node at each timestamp.
Then we filter jobs that appear sharing with another job
"""
def filter3_1_single(df_jobs_single, df_total_power):
#removing malformed "nodes"
df_jobs_single["node"] = [ast.literal_eval(x)[0] for x in df_jobs_single['nodes']]
#use this for smaller inputs
#TEST_SAMPLE_SIZE = 1000
TEST_SAMPLE_SIZE = len(df_jobs_single)
sample = df_jobs_single[0:TEST_SAMPLE_SIZE].copy().reset_index(drop=True)
#sample = df_jobs_single
group = df_total_power.groupby(by="node")
#for debugging
#result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]
#i know that x[0] is node, x[1] is start_time, and x[2] is end_time
#each element of the list is a subseries of the total_power of a certain mode
#with "1" at the timestamps where the "job_id" ran
result = [[group.get_group(x[0])["timestamp"].between(x[1], x[2]).astype(int).replace(0, np.nan).dropna().astype(str), x[3]] for x in sample[["node", "start_time", "end_time", "job_id"]].values.tolist()]
# now i replace the "1"s with the "job_id"
#each element of the list is a subseries of the total_power of a certain mode
#with the "job_id" value at the timestamps where the "job_id" ran
result2 = [x[0].replace("1.0", str(x[1])) for x in result]
#print(result2)
#finally i concatenate each of the series by index
# while joining the values where indices overlap
# i hope the indices here are consistent with the indices in df_total_power
#concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).replace(to_replace=r'^(0,)*0$', value="0", regex=True)
concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).rename("job_ids")
joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="inner")
print(joined_total_power)
return joined_total_power
"""
Filter 3: find the jobs that do not share a node
Based on filter3_1_single, return the list of job ids that
did not share a node with another job
"""
def filter3_2(joined_total_power):
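#each "job_ids" entry is a comma-separated list of jobs observed on a node at one timestamp; entries with more than one id reveal node sharing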
all_job_ids=[]
_ = pd.Series([[all_job_ids.append(y) for y in x.split(",")] for x in joined_total_power["job_ids"].drop_duplicates().values.tolist()])
all_job_ids = pd.Series(all_job_ids).drop_duplicates()
nodeshare_job_ids = []
_ = pd.Series([[nodeshare_job_ids.append(y) for y in x.split(",") if len(x.split(",")) > 1] for x in joined_total_power["job_ids"].drop_duplicates().values.tolist()])
nodeshare_job_ids = pd.Series(nodeshare_job_ids).drop_duplicates()
exclusive_job_ids = all_job_ids[~all_job_ids.isin(nodeshare_job_ids)]
print("Nodeshare Job ratio: ", len(nodeshare_job_ids)/len(all_job_ids))
print(exclusive_job_ids)
return exclusive_job_ids
"""
Filter 3: find the jobs that do not share a node
Getting the profiles of the jobs that did not share a node
and the details of the exclusive jobs
"""
def filter3_3(df_jobs_single, exclusive_job_ids, joined_total_power):
result = [x for x in joined_total_power.values if np.any(pd.Series(x[3].split(",")).isin(exclusive_job_ids).values) == True]
df_total_power_exclusive_single = pd.DataFrame(result, columns=["timestamp", "value", "node", "job_id"])
df_exclusive_jobs = df_jobs_single[df_jobs_single["job_id"].isin(exclusive_job_ids.astype(int))]
return df_total_power_exclusive_single, df_exclusive_jobs
"""
Filter 3: Define the function to check the condition in parallel
"""
def check_condition_f3(x, exclusive_job_ids=None):
return np.any(pd.Series(x[3].split(",")).isin(exclusive_job_ids).values)
"""
Filter 3: find the jobs that do not share a node
Getting the profiles of the jobs that did not share a node
and the details of the exclusive jobs
Attempt to do this in parallel
"""
def filter3_3_par(df_jobs, exclusive_job_ids, joined_total_power):
# Use multiprocessing.Pool to parallelize the list comprehension
pool = multiprocessing.Pool()
result = pool.map(functools.partial(check_condition_f3, exclusive_job_ids=exclusive_job_ids), joined_total_power.values)
# Filter the values based on the condition
result = [x for x, res in zip(joined_total_power.values, result) if res]
df_total_power_exclusive_single = pd.DataFrame(result, columns=["timestamp", "value", "node", "job_id"])
df_exclusive_jobs = df_jobs[df_jobs["job_id"].isin(exclusive_job_ids.astype(int))]
return df_total_power_exclusive_single, df_exclusive_jobs
"""
Filter 3: find the jobs that share a node
The idea is to add a new column to `df_total_power`
with a list of jobs that run on each node at each timestamp.
Then we filter jobs that appear sharing with another job
Version for multi-node jobs
"""
def filter3_1_multi(df_jobs_multi, df_total_power):
df_jobs_multi["node"] = [ast.literal_eval(x) for x in df_jobs_multi['nodes']]
#use this for smaller inputs
#TEST_SAMPLE_SIZE = 1000
TEST_SAMPLE_SIZE = len(df_jobs_multi)
sample = df_jobs_multi[0:TEST_SAMPLE_SIZE].copy().reset_index(drop=True)
#sample = df_jobs_single
group = df_total_power.groupby(by="node")
#for debugging
#result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]
#i know that x[0] is the node list, x[1] is start_time, and x[2] is end_time
#each element of the list is a subseries of the total_power of a certain mode
#with "1" at the timestamps where the "job_id" ran
# i replace the "0"s (where the timestamp is not between x[1] and x[2]) with nans just to remove these values with dropna.
result = [[[group.get_group(y)["timestamp"].between(x[1], x[2]).astype(int).replace(0, np.nan).dropna().astype(str) for y in x[0]], x[3]] for x in sample[["node", "start_time", "end_time", "job_id"]].values.tolist()]
#print(type(group.get_group(512)))
#group.get_group(x[0])
#result[0] = Series with timestamps with True where the job run, and False otherwise
#result[1] = job id
#result.values.to_list()
# now i replace the "1"s with the "job_id"
#each element of the list is a subseries of the total_power of a certain mode
#with the "job_id" value at the timestamps where the "job_id" ran
result2 = [pd.concat([y.replace("1.0", str(x[1])) for y in x[0]]) for x in result]
#finally i concatenate each of the series by index
# while joining the values where indices overlap
# i hope the indices here are consistent with the indices in df_total_power
#concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).replace(to_replace=r'^(0,)*0$', value="0", regex=True)
concatenated_series = pd.concat(result2).groupby(level=0).apply(','.join).rename("job_ids")
#print(concatenated_series)
# use the below code for the full run
#joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="left")
#use the below code for a test run
joined_total_power = df_total_power.merge(concatenated_series, left_index=True, right_index=True, how="inner")
print(joined_total_power)
return joined_total_power
"""
Save results to csv
"""
def save_results(df_exclusive_jobs_single, df_exclusive_jobs_multi, df_total_power_exclusive_single, df_total_power_exclusive_multi, jobfile_single, metricfile):
jobfile_out = jobfile_single[:-len("_filter12_singlenode.csv")] # strip the suffix, keeping e.g. "./m100-data/22-01"
metricfile_out = metricfile[:-len("_power_total.parquet")]
df_exclusive_jobs_single.to_csv(jobfile_out+"_filter123_singlenode.csv", index=False)
df_exclusive_jobs_multi.to_csv(jobfile_out+"_filter123_multinode.csv", index=False)
df_total_power_exclusive_single.to_csv(metricfile_out+"a_0_filter123_singlenode.csv", index=False)
df_total_power_exclusive_multi.to_csv(metricfile_out+"a_0_filter123_multinode.csv", index=False)
"""
Run workflow
"""
def run_workflow(metricfile, jobfile_single, jobfile_multi):
df_jobs_single, df_jobs_multi, df_total_power = read_data(jobfile_single, jobfile_multi, metricfile)
#Single-node jobs workflow
joined_total_power = filter3_1_single(df_jobs_single, df_total_power)
exclusive_job_ids = filter3_2(joined_total_power)
#df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_single_3(df_jobs_single, exclusive_job_ids, joined_total_power)
df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_3_par(df_jobs_single, exclusive_job_ids, joined_total_power)
print(df_total_power_exclusive_single)
print(df_exclusive_jobs_single)
###############################
#Multi-node jobs workflow
joined_total_power = filter3_1_multi(df_jobs_multi, df_total_power)
exclusive_job_ids = filter3_2(joined_total_power)
#df_total_power_exclusive_single, df_exclusive_jobs_single = filter3_single_3(df_jobs_single, exclusive_job_ids, joined_total_power)
df_total_power_exclusive_multi, df_exclusive_jobs_multi = filter3_3_par(df_jobs_multi, exclusive_job_ids, joined_total_power)
print(df_total_power_exclusive_multi)
print(df_exclusive_jobs_multi)
save_results(df_exclusive_jobs_single, df_exclusive_jobs_multi, df_total_power_exclusive_single, df_total_power_exclusive_multi, jobfile_single, metricfile)
###############################
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Process ExaData data to extract per-job energy profiles')
p.add_argument("--metricfile", "-m", type=str, required=True,
help="Metric file")
p.add_argument("--jobfilesingle", "-js", type=str, required=True,
help="Job table file for single-node jobs (output from process_marconi_jobs_1.py)")
p.add_argument("--jobfilemulti", "-jm", type=str, required=True,
help="Job table file for multi-node jobs (output from process_marconi_jobs_1.py)")
return(p.parse_args())
def main():
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
args = read_cli()
run_workflow(args.metricfile, args.jobfilesingle, args.jobfilemulti)
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import sys
import argparse
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, Lasso, Ridge, SGDRegressor
from sklearn.svm import SVR, LinearSVR
from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestRegressor
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.feature_selection import RFE
from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_absolute_error, mean_squared_error
from itertools import product
"""
Initial variables
"""
## Time constants
NANOSECONDS_ONE_SECOND = 1e9
SEC_ONE_MINUTE=60
SEC_ONE_HOUR=60*SEC_ONE_MINUTE
SEC_ONE_DAY=24*SEC_ONE_HOUR
SEC_ONE_WEEK=7*SEC_ONE_DAY
# Experiments parameters
## values for job submission time difference penalization
POWER_FACTORS={"sublinear": 0.5,
"linear": 1.0,
"quadratic": 2.0}
## Predict using only jobs with the same nb of resources
SAME_NB_RESOURCES={"yes": True,
"no": False}
## List of models used in the regression
MODELS=[
{'key':'LinearRegression', 'value':LinearRegression, 'kwargs': {'n_jobs' : -1}},
{'key':'RandomForestRegressor', 'value':RandomForestRegressor, 'kwargs': {'n_jobs' : -1}},
{'key':'LinearSVR', 'value':LinearSVR, 'kwargs': {'max_iter' : 100000}},
{'key':'SGDRegressor', 'value':SGDRegressor, 'kwargs': {'max_iter' : 100000}},
]
MODEL_NAMES=['LinearRegression','RandomForestRegressor','LinearSVR','SGDRegressor']
## train model only if the user's number of jobs is at least MIN_NB_JOBS
MIN_NB_JOBS=[10]
## Further reduce train set size (used for testing, default must be 0.0)
## For info: regression train/test split is 0.66 train and 0.33 test
#PERCENTAGE_TRAINING_DATA=[0.75,0.5,0.25,0.0]
PERCENTAGE_TRAINING_DATA=[0.0]
## Columns for regression training experiments parameters
EXP_REG_COLUMNS=['min_reg_nb_jobs', 'model', 'percentage_training_data']
## Columns for historical prediction experiments parameters
EXP_HIST_COLUMNS=['power_factor', 'same_nb_resources']
## Job Features
# I could add "time_limit_str" and "tres_per_node" but this needs more preprocessing
FEATURE_LABELS=['num_cpus', 'num_nodes', 'submission_date_hour', "walltime"]
#FEATURE_LABELS=['num_cpus','submission_date_hour', 'nb_sockets']
## Lag features used in regression
LAG_FEATURE_LABELS=['lag_total_power_mean', 'lag_total_power_std', 'lag_total_power_prev']
## The value we want to predict
TARGET_LABEL='total_power_max_watts'
SVR_TO_LINEARSVR_THRESHOLD = 10000
# Define the parameter grid
param_grid = {
'SVR':{
'svr__C': np.logspace(-3, 3, 7),
'svr__gamma': np.logspace(-3, 3, 7),
'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS)))
},
'LinearSVR': {
'linearsvr__C': np.logspace(-3, 3, 7), # Use 'linearsvr__C' for LinearSVR
'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS)))
},
'RandomForestRegressor': {
'randomforestregressor__n_estimators': [10, 50, 100, 200],
'randomforestregressor__max_depth': [None, 10, 20, 30],
'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS)))
},
'LinearRegression': {
#'linearregression__': {},
'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS)))
},
'SGDRegressor': {
'sgdregressor__alpha': np.logspace(-3, 3, 7),
'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS)))
}
}
## Create SKlearn pipeline according to the model
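## The pipeline chains StandardScaler -> RFE feature selection (with a LinearRegression estimator) -> the model under test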
def create_pipeline(model_conf):
model_obj = model_conf["value"](**model_conf["kwargs"])
pipeline = make_pipeline(StandardScaler(), RFE(LinearRegression(n_jobs=-1)), model_obj)
return pipeline
# Function to convert time strings to total seconds
def time_str_to_seconds(time_str):
#print(time_str)
components = time_str.split('-')
if len(components) == 1: # No days component
time_parts = time_str.split(':')
if len(time_parts) == 1: # Format M (minutes only)
time_str = "0:" + time_str
elif len(time_parts) == 2: # Format MM:SS
time_str = "00:" + time_str
return pd.to_timedelta(time_str).total_seconds()
elif len(components) == 2: # Format DD-HH:MM:SS
days, time_part = components
time_seconds = pd.to_timedelta(time_part).total_seconds()
return int(days) * 24 * 3600 + time_seconds
else:
return 180 * SEC_ONE_DAY # Handle infinite = 180 days = 6 months
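# Examples: time_str_to_seconds("15:00") -> 900.0 (15 minutes),
# time_str_to_seconds("1-00:00:00") -> 86400.0 (one day)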
"""
Read and prepare data to use in the prediction
"""
def read_data(jobfile, user):
## Holds the first job of each user (key=user_id)
dct_users_first_job_timestamp={}
## Holds a dataframe of jobs for each user (key=user_id)
jobs_users={}
## Read jobfile
df_jobs = pd.read_csv(jobfile, low_memory=False)
## Convert datetime columns to integer timestamps in seconds
df_jobs["submit_time"] = pd.to_datetime(df_jobs['submit_time']).astype(int) / NANOSECONDS_ONE_SECOND
df_jobs["start_time"] = pd.to_datetime(df_jobs['start_time']).astype(int) / NANOSECONDS_ONE_SECOND
df_jobs["end_time"] = pd.to_datetime(df_jobs['end_time']).astype(int) / NANOSECONDS_ONE_SECOND
df_jobs["walltime"] = df_jobs['time_limit_str'].apply(time_str_to_seconds)
lst_users = [user]
## Sort jobs by submission time
df_jobs=df_jobs.sort_values(by='submit_time').reset_index(drop=True)
## Get (back) datetime objects
df_jobs['submission_datetime'] = pd.to_datetime(df_jobs['submit_time'], unit='s')
df_jobs['submission_date_hour'] = pd.DatetimeIndex(df_jobs['submission_datetime']).hour
## Initialize new columns
df_jobs['user_submission_week']=np.nan
df_jobs['lag_total_power_mean']=np.nan
df_jobs['lag_total_power_std']=np.nan
df_jobs['lag_total_power_prev']=np.nan
df_jobs['hist_pred_total_power_max']=np.nan
#df_jobs['reg_pred_total_power_mean']=np.nan
col_names = [model_name+"_"+TARGET_LABEL for model_name in MODEL_NAMES]
for col in col_names: df_jobs[col] = np.nan # initialize one prediction column per model
## Populate jobs_users and dct_users_first_job_timestamp
## Also computing the week number of the job submission
for user in lst_users:
jobs_user = df_jobs.loc[df_jobs['user_id']==user].copy().sort_values(by='submit_time').reset_index()
dct_users_first_job_timestamp[user] = jobs_user.head(1)['submit_time'].values[0]
first_job_timestamp = dct_users_first_job_timestamp[user]
jobs_user['user_submission_week'] = (jobs_user['submit_time']-first_job_timestamp)//SEC_ONE_WEEK
jobs_users[user]=jobs_user
#print(user, len(jobs_users[user]))
#print(jobs_user)
#break
return lst_users, jobs_users, dct_users_first_job_timestamp
"""
Run prediction experiments
"""
def run_experiments(hist_design,
reg_design,
jobs_users,
dct_users_jobs,
dct_users_first_job_timestamp,
lst_results,
lst_predicted_jobs,
user=None):
## Initialize the dict for the week
dct_users_jobs[user] = {}
## Get the jobs dataframe of the user
jobs_user = jobs_users[user]
## Get the list of weeks that user submitted jobs (e.g., [0.0, 1.0, 3.0])
jobs_weeks=jobs_user['user_submission_week'].drop_duplicates().sort_values().to_list()
## Get the timestamp of the user's first job
first_job_timestamp = dct_users_first_job_timestamp[user]
## Run the prediction routine for each week the user submitted jobs
for week in jobs_weeks:
## Get the dataframe of user's jobs at week
user_jobs_week = jobs_user.loc[jobs_user['user_submission_week']==week].copy()
## for each job in week
for index, job in user_jobs_week.iterrows():
if index == 0:
continue
else:
##################################################################
### Weighted mean of previously finished jobs in the job history
### add the mean power of previous user's jobs
### (weighted history approach) as features
##################################################################
## Submission timestamp of the job
job_subm_time = job['submit_time']
## Timestamp where the history will start
history_start = first_job_timestamp
## History window size (start counting since the first user job submission)
history_size = job_subm_time - history_start
## Get only jobs that finished before the row submission time
prev_jobs = jobs_user.loc[jobs_user['end_time']<=job_subm_time].copy().sort_values(by='end_time').reset_index(drop=True)
## Further filter jobs if same_nb_resources (evals to False by default)
if hist_design['same_nb_resources'].values[0] == True:
prev_jobs=prev_jobs.loc[prev_jobs['nb_resources']==job['nb_resources']]
if len(prev_jobs) < 1:
## No job has finished before job_subm_time
continue
else:
## Compute the standard deviation of the jobs in the history (lag feature)
user_jobs_week.loc[index, 'lag_total_power_std']=prev_jobs[TARGET_LABEL].std()
## Assign weights for the previous jobs in the history according to the power_factor
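## i.e. weight_i = (1 - (job_subm_time - end_time_i) / history_size) ** power_factor, so recent jobs weigh more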
prev_jobs['weight'] = 1 - ((job_subm_time - prev_jobs['end_time'])/history_size)
prev_jobs['weight'] = prev_jobs['weight'].pow(hist_design['power_factor'].values[0])
## Compute the mean of the jobs in the history (lag feature)
user_jobs_week.loc[index,'lag_total_power_mean'] = (prev_jobs[TARGET_LABEL]*prev_jobs['weight']).sum()/prev_jobs['weight'].sum()
## Compute the mean of the jobs in the history (historical prediction result)
user_jobs_week.loc[index,'hist_pred_total_power_max'] = user_jobs_week.loc[index,'lag_total_power_mean']
## Get the power metric of the most recent user job (lag feature, not used by default)
user_jobs_week.loc[index,'lag_total_power_prev']=prev_jobs.iloc[-1][TARGET_LABEL]
## Add the user's week jobs in the dict
dct_users_jobs[user][week]=user_jobs_week
prev_weeks=[]
####################################################################################
### Regression multi-model power prediction
### For each week >= 1 use the first 2/3 of the user's finished job history as train
### and the remaining 1/3 as test
### Run regression fitting for each of the models in MODELS
### Select the best regression model regarding the test mean squared error (MSE)
### Compare with the MSE of the historic prediction on the test
### For week+1 use the model (regression or historic) with the lowest MSE
####################################################################################
for i, week in enumerate(jobs_weeks):
## We skip the first week for regression
if i == 0:
prev_weeks.append(week)
continue
## Get the user's jobs at current week (only jobs with historic data prediction)
user_jobs_week=dct_users_jobs[user][week]
## Get all previous jobs for regression fitting
model_jobs_user=pd.concat([dct_users_jobs[user][w] for w in prev_weeks]).dropna(subset=LAG_FEATURE_LABELS)
## Add week in the list of previous weeks to get data
### This will be used at the next week iteration
prev_weeks.append(week)
##DEBUG
#print(len(model_jobs_user))
regression=False
## Iterate over the experiments parameters (which vary the models)
for index, exp in reg_design.iterrows():
## Only do regression training if the number of jobs to train > min_reg_nb_jobs (10 by default)
if len(model_jobs_user) >= exp['min_reg_nb_jobs']:
regression=True
## Append result data to the experiments parameters dict
dct_result = exp.to_dict()
dct_result['user_id']=user
dct_result['user_week']=week
dct_result['hist_power_factor']=hist_design['power_factor'].values[0]
dct_result['hist_same_nb_resources']=hist_design['same_nb_resources'].values[0]
## Select features and target label
X=model_jobs_user[FEATURE_LABELS+LAG_FEATURE_LABELS]
y=model_jobs_user[TARGET_LABEL]
## Train test split
## Test split is important if we want to switch between algorithms at each week
#X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, shuffle=False)
## With this we only evaluate the training performance.
## This is ok, since X, y are jobs from previous weeks.
## We "test" the models at the current week, which is not present in X, y
X_train, y_train = X, y
## exp['percentage_training_data'] evals to 0 -> nothing changes
X_train = X_train.iloc[int(len(X_train)*exp['percentage_training_data']):len(X_train)]
y_train = y_train.iloc[int(len(y_train)*exp['percentage_training_data']):len(y_train)]
## Create an ML pipeline of standard scaling -> model and then train the model
pipeline = create_pipeline(exp["model"])
#print("Week ", week, "Model: ", model, "Train Size: ", len(X_train))
reg = GridSearchCV(pipeline, param_grid[exp["model"]["key"]], cv=5, scoring='neg_mean_squared_error', n_jobs=-1).fit(X_train, y_train)
## Append result data to the experiments parameters dict
dct_result['model_name']=exp['model']['key']
best_predictor = reg.best_estimator_
dct_result['model_object']=best_predictor
## Predict on the training data (the current week, absent from X_train, serves as the test set)
test_predictions=best_predictor.predict(X_train)
## Append result data to the experiments parameters dict
dct_result['model']=exp['model']['key']
# Access the selected features using the RFE estimator from the best model
best_rfe = reg.best_estimator_.named_steps['rfe']
selected_features = np.where(best_rfe.support_)[0]
cols = FEATURE_LABELS+LAG_FEATURE_LABELS
feature_names = [cols[x] for x in selected_features]
dct_result['features']=feature_names
#print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names)
dct_result['reg_total_power_max_mape']=mean_absolute_percentage_error(y_train, test_predictions)
dct_result['reg_total_power_max_mae']=mean_absolute_error(y_train, test_predictions)
dct_result['reg_total_power_max_mse']=mean_squared_error(y_train, test_predictions)
print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names, "MAPE: ", dct_result['reg_total_power_max_mape'], "MAE: ", dct_result['reg_total_power_max_mae'], "MSE: ", dct_result['reg_total_power_max_mse'])
### Results of the historic data power prediction
dct_result['hist_total_power_max_mape']=mean_absolute_percentage_error(y_train, X_train['lag_total_power_mean'])
dct_result['hist_total_power_max_mse']=mean_squared_error(y_train, X_train['lag_total_power_mean'])
## Append to the global list of training results
lst_results.append(dct_result)
## If regression training was performed using prev_weeks, select the best model for week
if regression == True:
## Get the week prediction results from the global list lst_results
df_week_results=pd.DataFrame(lst_results)
df_week_results=df_week_results.loc[df_week_results['user_week']==week]
#print(df_week_results)
## Predict for week using all models
for _, result in df_week_results.iterrows():
model_name = result['model_name']
model_object = result['model_object']
col_label = model_name+"_"+TARGET_LABEL
user_jobs_week[col_label]=model_object.predict(user_jobs_week[FEATURE_LABELS+LAG_FEATURE_LABELS])
lst_predicted_jobs.append(user_jobs_week)
return lst_predicted_jobs
"""
Start prediction routine
"""
def predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp):
# Construct the design of experiments for the prediction
## Historical prediction design
## (POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"]) evals to (2.0, False)
design_hist_pred=[(POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"])]
## Regression design
## MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA evals to [10], MODELS, [0.0]
## I switch between SVR and LinearSVR if the training dataset is larger than 10k, so MODELS are temporarily ignored
design_reg_pred=list(product(MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA))
df_hist_design=pd.DataFrame(design_hist_pred, columns=EXP_HIST_COLUMNS)
df_reg_design=pd.DataFrame(design_reg_pred, columns=EXP_REG_COLUMNS)
## Holds the list of regression results (TO VERIFY)
lst_results=[]
## Holds the list of predicted jobs
## Each position is a dataframe of predicted jobs of a user
lst_predicted_jobs=[]
## Holds the weekly job submission for each user
## Key is [user][week]
dct_users_jobs={}
#for index, exp in df_ff_design.iterrows():
for user in lst_users:
#print('user: ', user)
dct_users_jobs[user] = {}
lst_predicted_jobs = run_experiments(df_hist_design,
df_reg_design,
jobs_users,
dct_users_jobs,
dct_users_first_job_timestamp,
lst_results,
lst_predicted_jobs,
user=user)
#print(str(index) + ' of ' + str(len(df_ff_design)), end='\r')
df_predicted_jobs = []
if lst_predicted_jobs:
df_predicted_jobs=pd.concat(lst_predicted_jobs)
if len(df_predicted_jobs) != 0:
df_predicted_jobs=df_predicted_jobs.dropna(subset=['total_power_max_watts','hist_pred_total_power_max'])
## Print results stats, no longer needed
"""
for user in lst_users:
#print('user: ', user)
df_predicted_user=df_predicted_jobs.loc[df_predicted_jobs['user_id']==user]
print('number of predicted jobs: ', len(df_predicted_user))
if len(df_predicted_user) == 0:
continue
df_predicted_user_hist = df_predicted_user.loc[df_predicted_user['pred_method'] == 'history']
if(len(df_predicted_user_hist) > 0):
print('historic MSE when used only: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean']))
print('historic MAPE when used only: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean']))
print('historic MSE when selected: ', mean_squared_error(df_predicted_user_hist['total_power_mean_watts'], df_predicted_user_hist['hist_pred_total_power_mean']))
#df_predicted_jobs_notnan=df_predicted_user.dropna(subset='reg_pred_pp0_mean')
df_predicted_user_reg = df_predicted_user.loc[df_predicted_user['pred_method'] == 'regression']
if(len(df_predicted_user_reg) > 0):
print('regression MSE when selected: ', mean_squared_error(df_predicted_user_reg['total_power_mean_watts'], df_predicted_user_reg['reg_pred_total_power_mean']))
print('final MSE: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean']))
print('final MAPE: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean']))
print('final MAE: ', mean_absolute_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean']))
"""
return df_predicted_jobs
"""
Save predicted jobs data to csv
"""
def save_results(df_predicted_jobs, outputfile):
if len(df_predicted_jobs) == 0:
return
else:
df_predicted_jobs.to_csv(outputfile, index=False)
"""
Run workflow
"""
def run_workflow(jobfile, outputfile, user):
lst_users, jobs_users, dct_users_first_job_timestamp = read_data(jobfile, user)
df_predicted_jobs = predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp)
save_results(df_predicted_jobs, outputfile)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Run the per-user job power prediction (all methods, max power metric)')
p.add_argument("--jobfile", "-j", type=str, required=True,
help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)")
p.add_argument("--outputfile", "-o", type=str, required=True,
help="Output file name (optionally with global path)")
p.add_argument("--user", "-u", type=int, required=True,
help="User ID")
return(p.parse_args())
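## Example invocation via the console script declared in pyproject.toml
## (the file names here are hypothetical):
##   predict-jobs-power-allmethods-max -j jobs_power_metrics.csv -o user_42_pred.csv.gz -u 42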
def main():
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
args = read_cli()
run_workflow(args.jobfile, args.outputfile, args.user)
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import sys
import argparse
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, Lasso, Ridge, SGDRegressor
from sklearn.svm import SVR, LinearSVR
from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestRegressor
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.feature_selection import RFE
from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_absolute_error, mean_squared_error
from itertools import product
"""
Initial variables
"""
## Time constants
NANOSECONDS_ONE_SECOND = 1e9
SEC_ONE_MINUTE=60
SEC_ONE_HOUR=60*SEC_ONE_MINUTE
SEC_ONE_DAY=24*SEC_ONE_HOUR
SEC_ONE_WEEK=7*SEC_ONE_DAY
# Experiments parameters
## values for job submission time difference penalization
POWER_FACTORS={"sublinear": 0.5,
"linear": 1.0,
"quadratic": 2.0}
## Predict using only jobs with the same nb of resources
SAME_NB_RESOURCES={"yes": True,
"no": False}
## List of models used in the regression
MODELS=[
{'key':'LinearRegression', 'value':LinearRegression, 'kwargs': {'n_jobs' : -1}},
{'key':'RandomForestRegressor', 'value':RandomForestRegressor, 'kwargs': {'n_jobs' : -1}},
{'key':'LinearSVR', 'value':LinearSVR, 'kwargs': {'max_iter' : 100000}},
{'key':'SGDRegressor', 'value':SGDRegressor, 'kwargs': {'max_iter' : 100000}},
]
MODEL_NAMES=['LinearRegression','RandomForestRegressor','LinearSVR','SGDRegressor']
## train model only if the user's number of jobs is at least MIN_NB_JOBS
MIN_NB_JOBS=[10]
## Further reduce train set size (used for testing, default must be 0.0)
## For info: regression train/test split is 0.66 train and 0.33 test
#PERCENTAGE_TRAINING_DATA=[0.75,0.5,0.25,0.0]
PERCENTAGE_TRAINING_DATA=[0.0]
## Columns for regression training experiments parameters
EXP_REG_COLUMNS=['min_reg_nb_jobs', 'model', 'percentage_training_data']
## Columns for historical prediction experiments parameters
EXP_HIST_COLUMNS=['power_factor', 'same_nb_resources']
## Job Features
# I could add "sime_limit_str" and "tres_per_node" but this needs more preprocessing
FEATURE_LABELS=['num_cpus', 'num_nodes', 'submission_date_hour', "walltime"]
#FEATURE_LABELS=['num_cpus','submission_date_hour', 'nb_sockets']
## Lag features used in regression
LAG_FEATURE_LABELS=['lag_total_power_mean', 'lag_total_power_std', 'lag_total_power_prev']
## The value we want to predict
TARGET_LABEL='total_power_mean_watts'
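## Intended cut-off for switching from SVR to the faster LinearSVR on large
## training sets (note that SVR is not in the default MODELS list above)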
SVR_TO_LINEARSVR_THRESHOLD = 10000
# Define the parameter grid
param_grid = {
'SVR':{
'svr__C': np.logspace(-3, 3, 7),
'svr__gamma': np.logspace(-3, 3, 7),
'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS)))
},
'LinearSVR': {
'linearsvr__C': np.logspace(-3, 3, 7), # Use 'linearsvr__C' for LinearSVR
'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS)))
},
'RandomForestRegressor': {
'randomforestregressor__n_estimators': [10, 50, 100, 200],
'randomforestregressor__max_depth': [None, 10, 20, 30],
'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS)))
},
'LinearRegression': {
#'linearregression__': {},
'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS)))
},
'SGDRegressor': {
'sgdregressor__alpha': np.logspace(-3, 3, 7),
'rfe__n_features_to_select': list(range(1,len(FEATURE_LABELS)+len(LAG_FEATURE_LABELS)))
}
}
## Create SKlearn pipeline according to the model
def create_pipeline(model_conf):
model_obj = model_conf["value"](**model_conf["kwargs"])
pipeline = make_pipeline(StandardScaler(), RFE(LinearRegression(n_jobs=-1)), model_obj)
return pipeline
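## For illustration (assumed call): create_pipeline(MODELS[0]) yields the pipeline
## StandardScaler -> RFE(LinearRegression) -> LinearRegression; GridSearchCV later
## tunes both the RFE feature count and the model hyper-parameters from param_grid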
# Function to convert time strings to total seconds
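# Examples of Slurm-style time limit strings:
#   "45"         -> 2700.0  seconds (bare minutes)
#   "45:30"      -> 2730.0  seconds (MM:SS)
#   "02:03:04"   -> 7384.0  seconds (HH:MM:SS)
#   "1-02:03:04" -> 93784.0 seconds (DD-HH:MM:SS)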
def time_str_to_seconds(time_str):
#print(time_str)
components = time_str.split('-')
if len(components) == 1: # No days component
time_parts = time_str.split(':')
if len(time_parts) == 1: # Bare minutes (e.g., "45")
time_str = "0:" + time_str
elif len(time_parts) == 2: # Format MM:SS
time_str = "00:" + time_str
return pd.to_timedelta(time_str).total_seconds()
elif len(components) == 2: # Format DD-HH:MM:SS
days, time_part = components
time_seconds = pd.to_timedelta(time_part).total_seconds()
return int(days) * 24 * 3600 + time_seconds
else:
return 180 * SEC_ONE_DAY # Any other format is treated as unlimited: 180 days (6 months)
"""
Read and prepare data to use in the prediction
"""
def read_data(jobfile, user):
## Holds the first job of each user (key=user_id)
dct_users_first_job_timestamp={}
## Holds a dataframe of jobs for each user (key=user_id)
jobs_users={}
## Read jobfile
df_jobs = pd.read_csv(jobfile, low_memory=False)
## Convert datetime columns to integer timestamps in seconds
df_jobs["submit_time"] = pd.to_datetime(df_jobs['submit_time']).astype(int) / NANOSECONDS_ONE_SECOND
df_jobs["start_time"] = pd.to_datetime(df_jobs['start_time']).astype(int) / NANOSECONDS_ONE_SECOND
df_jobs["end_time"] = pd.to_datetime(df_jobs['end_time']).astype(int) / NANOSECONDS_ONE_SECOND
df_jobs["walltime"] = df_jobs['time_limit_str'].apply(time_str_to_seconds)
lst_users = [user]
## Sort jobs by submission time
df_jobs=df_jobs.sort_values(by='submit_time').reset_index(drop=True)
## Get (back) datetime objects
df_jobs['submission_datetime'] = pd.to_datetime(df_jobs['submit_time'], unit='s')
df_jobs['submission_date_hour'] = pd.DatetimeIndex(df_jobs['submission_datetime']).hour
## Initialize new columns
df_jobs['user_submission_week']=np.nan
df_jobs['lag_total_power_mean']=np.nan
df_jobs['lag_total_power_std']=np.nan
df_jobs['lag_total_power_prev']=np.nan
df_jobs['hist_pred_total_power_mean']=np.nan
#df_jobs['reg_pred_total_power_mean']=np.nan
col_names = [model_name+"_"+TARGET_LABEL for model_name in MODEL_NAMES]
## One prediction column per model, e.g. 'LinearSVR_total_power_mean_watts'
for col_name in col_names:
df_jobs[col_name] = np.nan
## Populate jobs_users and dct_users_first_job_timestamp
## Also computing the week number of the job submission
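## e.g., a job submitted 10 days after the user's first job lands in week 1.0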
for user in lst_users:
jobs_user = df_jobs.loc[df_jobs['user_id']==user].copy().sort_values(by='submit_time').reset_index()
dct_users_first_job_timestamp[user] = jobs_user.head(1)['submit_time'].values[0]
first_job_timestamp = dct_users_first_job_timestamp[user]
jobs_user['user_submission_week'] = (jobs_user['submit_time']-first_job_timestamp)//SEC_ONE_WEEK
jobs_users[user]=jobs_user
#print(user, len(jobs_users[user]))
#print(jobs_user)
#break
return lst_users, jobs_users, dct_users_first_job_timestamp
"""
Run prediction experiments
"""
def run_experiments(hist_design,
reg_design,
jobs_users,
dct_users_jobs,
dct_users_first_job_timestamp,
lst_results,
lst_predicted_jobs,
user=None):
## Initialize the dict for the week
dct_users_jobs[user] = {}
## Get the jobs dataframe of the user
jobs_user = jobs_users[user]
## Get the list of weeks that user submitted jobs (e.g., [0.0, 1.0, 3.0])
jobs_weeks=jobs_user['user_submission_week'].drop_duplicates().sort_values().to_list()
## Get the timestamp of the user's first job
first_job_timestamp = dct_users_first_job_timestamp[user]
## Run the prediction routine for each week the user submitted jobs
for week in jobs_weeks:
## Get the dataframe of user's jobs at week
user_jobs_week = jobs_user.loc[jobs_user['user_submission_week']==week].copy()
## for each job in week
for index, job in user_jobs_week.iterrows():
if index == 0:
continue
else:
##################################################################
### Weighted mean of the user's previously finished jobs:
### add the mean power of the user's previous jobs
### (weighted-history approach) as lag features
##################################################################
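## Worked example (quadratic power factor, assumed values): with a history
## window of 100 s, a job that ended 20 s before the current submission gets
## weight (1 - 20/100)^2 = 0.64, one that ended 90 s before gets
## (1 - 90/100)^2 = 0.01, and the prediction is the weight-normalized mean
## of their 'total_power_mean_watts' values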
## Submission timestamp of the job
job_subm_time = job['submit_time']
## Timestamp where the history will start
history_start = first_job_timestamp
## History window size (start counting since the first user job submission)
history_size = job_subm_time - history_start
## Get only jobs that finished before the row submission time
prev_jobs = jobs_user.loc[jobs_user['end_time']<=job_subm_time].copy().sort_values(by='end_time').reset_index(drop=True)
## Further filter jobs if same_nb_resources (evaluates to False by default)
if hist_design['same_nb_resources'].values[0] == True:
prev_jobs=prev_jobs.loc[prev_jobs['nb_resources']==job['nb_resources']]
if len(prev_jobs) < 1:
## No job has finished before job_subm_time
continue
else:
## Compute the standard deviation of the jobs in the history (lag feature)
user_jobs_week.loc[index, 'lag_total_power_std']=prev_jobs['total_power_mean_watts'].std()
## Assign weights for the previous jobs in the history according to the power_factor
prev_jobs['weight'] = 1 - ((job_subm_time - prev_jobs['end_time'])/history_size)
prev_jobs['weight'] = prev_jobs['weight'].pow(hist_design['power_factor'].values[0])
## Compute the mean of the jobs in the history (lag feature)
user_jobs_week.loc[index,'lag_total_power_mean'] = (prev_jobs['total_power_mean_watts']*prev_jobs['weight']).sum()/prev_jobs['weight'].sum()
## Compute the mean of the jobs in the history (historical prediction result)
user_jobs_week.loc[index,'hist_pred_total_power_mean'] = user_jobs_week.loc[index,'lag_total_power_mean']
## Get the power metric of the most recent user job (lag feature, not used by default)
user_jobs_week.loc[index,'lag_total_power_prev']=prev_jobs.iloc[-1]['total_power_mean_watts']
## Add the user's week jobs in the dict
dct_users_jobs[user][week]=user_jobs_week
prev_weeks=[]
####################################################################################
### Regression multi-model power prediction
### For each week >= 1 use the first 2/3 of the user's finished job history as train
### and the remaining 1/3 as test
### Run regression fitting for each of the models in MODELS
### Select the best regression model regarding the test mean squared error (MSE)
### Compare with the MSE of the historic prediction on the test
### For week+1 use the model (regression or historic) with the lowest MSE
####################################################################################
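## Walk-forward sketch: for week w_k (k >= 1) the training set is the union of
## weeks w_0..w_{k-1}; every model of the design is grid-search tuned on it, and
## each tuned model then predicts the jobs of w_k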
for i, week in enumerate(jobs_weeks):
## We skip the first week for regression
if i == 0:
prev_weeks.append(week)
continue
## Get the user's jobs at current week (only jobs with historic data prediction)
user_jobs_week=dct_users_jobs[user][week]
## Get all previous jobs for regression fitting
model_jobs_user=pd.concat([dct_users_jobs[user][w] for w in prev_weeks]).dropna(subset=LAG_FEATURE_LABELS)
## Add week in the list of previous weeks to get data
### This will be used at the next week iteration
prev_weeks.append(week)
##DEBUG
#print(len(model_jobs_user))
regression=False
## Iterate over the experiments parameters (which vary the models)
for index, exp in reg_design.iterrows():
## Only do regression training if the number of jobs to train > min_reg_nb_jobs (10 by default)
if len(model_jobs_user) >= exp['min_reg_nb_jobs']:
regression=True
## Append result data to the experiments parameters dict
dct_result = exp.to_dict()
dct_result['user_id']=user
dct_result['user_week']=week
dct_result['hist_power_factor']=hist_design['power_factor'].values[0]
dct_result['hist_same_nb_resources']=hist_design['same_nb_resources'].values[0]
## Select features and target label
X=model_jobs_user[FEATURE_LABELS+LAG_FEATURE_LABELS]
y=model_jobs_user[TARGET_LABEL]
## Train test split
## A test split is important if we want to switch between algorithms at each week
#X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, shuffle=False)
## With this we only evaluate the training performance.
## This is ok, since X, y are jobs from previous weeks.
## We "test" the models at the current week, which is not present in X, y
X_train, y_train = X, y
## exp['percentage_training_data'] evaluates to 0.0 by default, so no rows are dropped
X_train = X_train.iloc[int(len(X_train)*exp['percentage_training_data']):len(X_train)]
y_train = y_train.iloc[int(len(y_train)*exp['percentage_training_data']):len(y_train)]
## Create an ML pipeline (standard scaling -> RFE feature selection -> model), then tune and train it with grid search
pipeline = create_pipeline(exp["model"])
#print("Week ", week, "Model: ", model, "Train Size: ", len(X_train))
reg = GridSearchCV(pipeline, param_grid[exp["model"]["key"]], cv=5, scoring='neg_mean_squared_error', n_jobs=-1).fit(X_train, y_train)
## Append result data to the experiments parameters dict
dct_result['model_name']=exp['model']['key']
#dct_result['model_name']=model
best_predictor = reg.best_estimator_
dct_result['model_object']=best_predictor
## Predict on the training dataset (the held-out evaluation happens on the current week)
train_predictions=best_predictor.predict(X_train)
## Append result data to the experiments parameters dict
dct_result['model']=exp['model']['key']
# Access the selected features using the RFE estimator from the best model
best_rfe = reg.best_estimator_.named_steps['rfe']
selected_features = np.where(best_rfe.support_)[0]
cols = FEATURE_LABELS+LAG_FEATURE_LABELS
feature_names = [cols[x] for x in selected_features]
dct_result['features']=feature_names
#print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names)
dct_result['reg_total_power_mean_mape']=mean_absolute_percentage_error(y_train, train_predictions)
dct_result['reg_total_power_mean_mae']=mean_absolute_error(y_train, train_predictions)
dct_result['reg_total_power_mean_mse']=mean_squared_error(y_train, train_predictions)
print("User: ", user, "Week ", week, "Model: ", exp["model"]["key"], "Features: ", feature_names, "MAPE: ", dct_result['reg_total_power_mean_mape'], "MAE: ", dct_result['reg_total_power_mean_mae'], "MSE: ", dct_result['reg_total_power_mean_mse'])
### Results of the historic data power prediction
dct_result['hist_total_power_mean_mape']=mean_absolute_percentage_error(y_train, X_train['lag_total_power_mean'])
dct_result['hist_total_power_mean_mae']=mean_absolute_error(y_train, X_train['lag_total_power_mean'])
dct_result['hist_total_power_mean_mse']=mean_squared_error(y_train, X_train['lag_total_power_mean'])
#print("User: ", user, "Week ", week, "Model: ", "History", "MAPE: ", dct_result['hist_total_power_mean_mape'], "MAE: ", dct_result['hist_total_power_mean_mae'], "MSE: ", dct_result['hist_total_power_mean_mse'])
## Append to the global list of training results
lst_results.append(dct_result)
## If regression training was performed using prev_weeks, select the best model for week
if regression == True:
## Get the week prediction results from the global list lst_results
df_week_results=pd.DataFrame(lst_results)
df_week_results=df_week_results.loc[df_week_results['user_week']==week]
#print(df_week_results)
## Predict for week using all models
for _, result in df_week_results.iterrows():
model_name = result['model_name']
model_object = result['model_object']
col_label = model_name+"_"+TARGET_LABEL
user_jobs_week[col_label]=model_object.predict(user_jobs_week[FEATURE_LABELS+LAG_FEATURE_LABELS])
lst_predicted_jobs.append(user_jobs_week)
return lst_predicted_jobs
"""
Start prediction routine
"""
def predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp):
# Construct the design of experiments for the prediction
## Historical prediction design
## (POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"]) evals to (2.0, False)
design_hist_pred=[(POWER_FACTORS["quadratic"], SAME_NB_RESOURCES["no"])]
## Regression design
## MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA evaluate to [10], MODELS, [0.0]
## SVR is switched to LinearSVR when the training dataset is larger than 10k jobs, so MODELS is temporarily ignored
design_reg_pred=list(product(MIN_NB_JOBS, MODELS, PERCENTAGE_TRAINING_DATA))
df_hist_design=pd.DataFrame(design_hist_pred, columns=EXP_HIST_COLUMNS)
df_reg_design=pd.DataFrame(design_reg_pred, columns=EXP_REG_COLUMNS)
#print(len(df_ff_design))
## Holds the list of regression training results (one dict per user/week/model)
lst_results=[]
## Holds the list of predicted jobs
## Each position is a dataframe of predicted jobs of a user
lst_predicted_jobs=[]
## Holds the weekly job submission for each user
## Key is [user][week]
dct_users_jobs={}
#for index, exp in df_ff_design.iterrows():
for user in lst_users:
#print('user: ', user)
dct_users_jobs[user] = {}
lst_predicted_jobs = run_experiments(df_hist_design,
df_reg_design,
jobs_users,
dct_users_jobs,
dct_users_first_job_timestamp,
lst_results,
lst_predicted_jobs,
user=user)
#print(str(index) + ' of ' + str(len(df_ff_design)), end='\r')
df_predicted_jobs = []
if lst_predicted_jobs:
df_predicted_jobs=pd.concat(lst_predicted_jobs)
if len(df_predicted_jobs) != 0:
df_predicted_jobs=df_predicted_jobs.dropna(subset=['total_power_mean_watts','hist_pred_total_power_mean'])
## Per-user result statistics (disabled; kept for reference)
"""
for user in lst_users:
#print('user: ', user)
df_predicted_user=df_predicted_jobs.loc[df_predicted_jobs['user_id']==user]
print('number of predicted jobs: ', len(df_predicted_user))
if len(df_predicted_user) == 0:
continue
df_predicted_user_hist = df_predicted_user.loc[df_predicted_user['pred_method'] == 'history']
if(len(df_predicted_user_hist) > 0):
print('historic MSE when used only: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean']))
print('historic MAPE when used only: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['hist_pred_total_power_mean']))
print('historic MSE when selected: ', mean_squared_error(df_predicted_user_hist['total_power_mean_watts'], df_predicted_user_hist['hist_pred_total_power_mean']))
#df_predicted_jobs_notnan=df_predicted_user.dropna(subset='reg_pred_pp0_mean')
df_predicted_user_reg = df_predicted_user.loc[df_predicted_user['pred_method'] == 'regression']
if(len(df_predicted_user_reg) > 0):
print('regression MSE when selected: ', mean_squared_error(df_predicted_user_reg['total_power_mean_watts'], df_predicted_user_reg['reg_pred_total_power_mean']))
print('final MSE: ', mean_squared_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean']))
print('final MAPE: ', mean_absolute_percentage_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean']))
print('final MAE: ', mean_absolute_error(df_predicted_user['total_power_mean_watts'], df_predicted_user['final_pred_total_power_mean']))
"""
return df_predicted_jobs
"""
Save predicted jobs data to csv
"""
def save_results(df_predicted_jobs, outputfile):
if len(df_predicted_jobs) == 0:
return
else:
df_predicted_jobs.to_csv(outputfile, index=False)
"""
Run workflow
"""
def run_workflow(jobfile, outputfile, user):
lst_users, jobs_users, dct_users_first_job_timestamp = read_data(jobfile, user)
df_predicted_jobs = predict_jobs(lst_users, jobs_users, dct_users_first_job_timestamp)
save_results(df_predicted_jobs, outputfile)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Run the per-user job power prediction (all methods, mean power metric)')
p.add_argument("--jobfile", "-j", type=str, required=True,
help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)")
p.add_argument("--outputfile", "-o", type=str, required=True,
help="Output file name (optionally with global path)")
p.add_argument("--user", "-u", type=int, required=True,
help="User ID")
return(p.parse_args())
def main():
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
args = read_cli()
run_workflow(args.jobfile, args.outputfile, args.user)
if __name__ == '__main__':
main()
import pandas as pd
import subprocess
import sys
import argparse
import os
"""
Run Workflow
"""
def run_workflow(jobfile, outdir):
# Load the dataframe with "user_id" column
df = pd.read_csv(jobfile)
# Iterate over each user_id and invoke the predict-jobs-power-allmethods-max console script
for user_id in df['user_id'].drop_duplicates():
outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz'
# Skip user if prediction was already performed
if os.path.isfile(outfile) == True:
continue
command = ['predict-jobs-power-allmethods-max', "-j", jobfile , '-o', outfile, '-u', str(user_id)]
subprocess.run(command)
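## Example invocation via the console script declared in pyproject.toml
## (paths are hypothetical):
##   run-prediction-per-user-allmethods-max -j jobs_power_metrics.csv -o preds/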
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Run the power prediction method (max variant), one subprocess per user')
p.add_argument("--jobfile", "-j", type=str, required=True,
help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)")
p.add_argument("--outdir", "-o", type=str, required=True,
help="Directory of the output files")
return(p.parse_args())
def main():
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
args = read_cli()
run_workflow(args.jobfile, args.outdir)
if __name__ == '__main__':
main()
import pandas as pd
import subprocess
import sys
import argparse
import os
"""
Run Workflow
"""
def run_workflow(jobfile, outdir):
# Load the dataframe with "user_id" column
df = pd.read_csv(jobfile)
# Iterate over each user_id and invoke the predict-jobs-power-allmethods-mean console script
for user_id in df['user_id'].drop_duplicates():
outfile = outdir+'/filter123_user_'+str(user_id)+'_total_power_mean_pred.csv.gz'
# Skip user if prediction was already performed
if os.path.isfile(outfile) == True:
continue
command = ['predict-jobs-power-allmethods-mean', "-j", jobfile , '-o', outfile, '-u', str(user_id)]
subprocess.run(command)
"""
Read Command line interface
"""
def read_cli():
# Make parser object
p = argparse.ArgumentParser(description='Run the power prediction method (mean variant), one subprocess per user')
p.add_argument("--jobfile", "-j", type=str, required=True,
help="Job table file with power aggregation metrics (output from marconi_jobs_extract_power_metrics.py)")
p.add_argument("--outdir", "-o", type=str, required=True,
help="Directory of the output files")
return(p.parse_args())
def main():
if sys.version_info<(3,5,0):
sys.stderr.write("You need python 3.5 or later to run this script\n")
sys.exit(1)
args = read_cli()
run_workflow(args.jobfile, args.outdir)
if __name__ == '__main__':
main()
......@@ -11,6 +11,14 @@ version = "0.1.0"
[project.scripts]
m100-data-downloader = "expe_energumen.m100_data_downloader:main"
m100-agg-power-months = "expe_energumen.m100_agg_month_power_values:several_months"
m100-pred-preprocess1 = "expe_energumen.m100_pred_preprocess_1:main"
m100-pred-preprocess2 = "expe_energumen.m100_pred_preprocess_2:main"
m100-pred-jobs-extract-power-metrics = "expe_energumen.m100_pred_jobs_extract_power_metrics:main"
m100-pred-merge-jobfiles = "expe_energumen.m100_pred_merge_jobfiles:main"
run-prediction-per-user-allmethods-mean = "expe_energumen.run_prediction_per_user_allmethods_mean:main"
run-prediction-per-user-allmethods-max = "expe_energumen.run_prediction_per_user_allmethods_max:main"
predict-jobs-power-allmethods-mean = "expe_energumen.predict_jobs_power_allmethods_mean:main"
predict-jobs-power-allmethods-max = "expe_energumen.predict_jobs_power_allmethods_max:main"
m100-agg-power-predictions = "expe_energumen.m100_agg_power_predictions:agg_all_files"
m100-agg-jobs-info = "expe_energumen.m100_agg_jobs_info:several_months"
m100-join-usable-jobs-info = "expe_energumen.m100_join_usable_jobs_info:join"